binarchive.cc 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  #include "binarchive.hh"
  #include <rpc/types.h>
  #include <rpc/xdr.h>
  #include <limits>
  21. using namespace hadoop;
  22. template <typename T>
  23. static void serialize(T t, OutStream& stream)
  24. {
  25. if (sizeof(T) != stream.write((const void *) &t, sizeof(T))) {
  26. throw new IOException("Error serializing data.");
  27. }
  28. }
  29. template <typename T>
  30. static void deserialize(T& t, InStream& stream)
  31. {
  32. if (sizeof(T) != stream.read((void *) &t, sizeof(T))) {
  33. throw new IOException("Error deserializing data.");
  34. }
  35. }
  36. static void serializeLong(int64_t t, OutStream& stream)
  37. {
  38. if (t >= -112 && t <= 127) {
  39. int8_t b = t;
  40. stream.write(&b, 1);
  41. return;
  42. }
  43. int8_t len = -112;
  44. if (t < 0) {
  45. t ^= 0xFFFFFFFFFFFFFFFFLL; // take one's complement
  46. len = -120;
  47. }
  48. uint64_t tmp = t;
  49. while (tmp != 0) {
  50. tmp = tmp >> 8;
  51. len--;
  52. }
  53. stream.write(&len, 1);
  54. len = (len < -120) ? -(len + 120) : -(len + 112);
  55. for (uint32_t idx = len; idx != 0; idx--) {
  56. uint32_t shiftbits = (idx - 1) * 8;
  57. uint64_t mask = 0xFFLL << shiftbits;
  58. uint8_t b = (t & mask) >> shiftbits;
  59. stream.write(&b, 1);
  60. }
  61. }
  62. static void deserializeLong(int64_t& t, InStream& stream)
  63. {
  64. int8_t b;
  65. if (1 != stream.read(&b, 1)) {
  66. throw new IOException("Error deserializing long.");
  67. }
  68. if (b >= -112) {
  69. t = b;
  70. return;
  71. }
  72. bool isNegative = (b < -120);
  73. b = isNegative ? -(b + 120) : -(b + 112);
  74. uint8_t barr[b];
  75. if (b != stream.read(barr, b)) {
  76. throw new IOException("Error deserializing long.");
  77. }
  78. t = 0;
  79. for (int idx = 0; idx < b; idx++) {
  80. t = t << 8;
  81. t |= (barr[idx] & 0xFF);
  82. }
  83. if (isNegative) {
  84. t ^= 0xFFFFFFFFFFFFFFFFLL;
  85. }
  86. }
  87. static void serializeInt(int32_t t, OutStream& stream)
  88. {
  89. int64_t longVal = t;
  90. ::serializeLong(longVal, stream);
  91. }
  92. static void deserializeInt(int32_t& t, InStream& stream)
  93. {
  94. int64_t longVal;
  95. ::deserializeLong(longVal, stream);
  96. t = longVal;
  97. }
  98. static void serializeFloat(float t, OutStream& stream)
  99. {
  100. char buf[sizeof(float)];
  101. XDR xdrs;
  102. xdrmem_create(&xdrs, buf, sizeof(float), XDR_ENCODE);
  103. xdr_float(&xdrs, &t);
  104. stream.write(buf, sizeof(float));
  105. }
  106. static void deserializeFloat(float& t, InStream& stream)
  107. {
  108. char buf[sizeof(float)];
  109. if (sizeof(float) != stream.read(buf, sizeof(float))) {
  110. throw new IOException("Error deserializing float.");
  111. }
  112. XDR xdrs;
  113. xdrmem_create(&xdrs, buf, sizeof(float), XDR_DECODE);
  114. xdr_float(&xdrs, &t);
  115. }
  116. static void serializeDouble(double t, OutStream& stream)
  117. {
  118. char buf[sizeof(double)];
  119. XDR xdrs;
  120. xdrmem_create(&xdrs, buf, sizeof(double), XDR_ENCODE);
  121. xdr_double(&xdrs, &t);
  122. stream.write(buf, sizeof(double));
  123. }
  124. static void deserializeDouble(double& t, InStream& stream)
  125. {
  126. char buf[sizeof(double)];
  127. stream.read(buf, sizeof(double));
  128. XDR xdrs;
  129. xdrmem_create(&xdrs, buf, sizeof(double), XDR_DECODE);
  130. xdr_double(&xdrs, &t);
  131. }
  132. static void serializeString(const std::string& t, OutStream& stream)
  133. {
  134. ::serializeInt(t.length(), stream);
  135. if (t.length() > 0) {
  136. stream.write(t.data(), t.length());
  137. }
  138. }
  139. static void deserializeString(std::string& t, InStream& stream)
  140. {
  141. int32_t len = 0;
  142. ::deserializeInt(len, stream);
  143. if (len > 0) {
  144. // resize the string to the right length
  145. t.resize(len);
  146. // read into the string in 64k chunks
  147. const int bufSize = 65536;
  148. int offset = 0;
  149. char buf[bufSize];
  150. while (len > 0) {
  151. int chunkLength = len > bufSize ? bufSize : len;
  152. stream.read((void *)buf, chunkLength);
  153. t.replace(offset, chunkLength, buf, chunkLength);
  154. offset += chunkLength;
  155. len -= chunkLength;
  156. }
  157. }
  158. }
  159. void hadoop::IBinArchive::deserialize(int8_t& t, const char* tag)
  160. {
  161. ::deserialize(t, stream);
  162. }
  163. void hadoop::IBinArchive::deserialize(bool& t, const char* tag)
  164. {
  165. ::deserialize(t, stream);
  166. }
  167. void hadoop::IBinArchive::deserialize(int32_t& t, const char* tag)
  168. {
  169. int64_t longVal = 0LL;
  170. ::deserializeLong(longVal, stream);
  171. t = longVal;
  172. }
  173. void hadoop::IBinArchive::deserialize(int64_t& t, const char* tag)
  174. {
  175. ::deserializeLong(t, stream);
  176. }
  177. void hadoop::IBinArchive::deserialize(float& t, const char* tag)
  178. {
  179. ::deserializeFloat(t, stream);
  180. }
  181. void hadoop::IBinArchive::deserialize(double& t, const char* tag)
  182. {
  183. ::deserializeDouble(t, stream);
  184. }
  185. void hadoop::IBinArchive::deserialize(std::string& t, const char* tag)
  186. {
  187. ::deserializeString(t, stream);
  188. }
  189. void hadoop::IBinArchive::deserialize(std::string& t, size_t& len, const char* tag)
  190. {
  191. ::deserializeString(t, stream);
  192. len = t.length();
  193. }
  194. void hadoop::IBinArchive::startRecord(Record& s, const char* tag)
  195. {
  196. }
  197. void hadoop::IBinArchive::endRecord(Record& s, const char* tag)
  198. {
  199. }
  200. Index* hadoop::IBinArchive::startVector(const char* tag)
  201. {
  202. int32_t len;
  203. ::deserializeInt(len, stream);
  204. BinIndex *idx = new BinIndex((size_t) len);
  205. return idx;
  206. }
  207. void hadoop::IBinArchive::endVector(Index* idx, const char* tag)
  208. {
  209. delete idx;
  210. }
  211. Index* hadoop::IBinArchive::startMap(const char* tag)
  212. {
  213. int32_t len;
  214. ::deserializeInt(len, stream);
  215. BinIndex *idx = new BinIndex((size_t) len);
  216. return idx;
  217. }
  218. void hadoop::IBinArchive::endMap(Index* idx, const char* tag)
  219. {
  220. delete idx;
  221. }
  222. hadoop::IBinArchive::~IBinArchive()
  223. {
  224. }
  225. void hadoop::OBinArchive::serialize(int8_t t, const char* tag)
  226. {
  227. ::serialize(t, stream);
  228. }
  229. void hadoop::OBinArchive::serialize(bool t, const char* tag)
  230. {
  231. ::serialize(t, stream);
  232. }
  233. void hadoop::OBinArchive::serialize(int32_t t, const char* tag)
  234. {
  235. int64_t longVal = t;
  236. ::serializeLong(longVal, stream);
  237. }
  238. void hadoop::OBinArchive::serialize(int64_t t, const char* tag)
  239. {
  240. ::serializeLong(t, stream);
  241. }
  242. void hadoop::OBinArchive::serialize(float t, const char* tag)
  243. {
  244. ::serializeFloat(t, stream);
  245. }
  246. void hadoop::OBinArchive::serialize(double t, const char* tag)
  247. {
  248. ::serializeDouble(t, stream);
  249. }
  250. void hadoop::OBinArchive::serialize(const std::string& t, const char* tag)
  251. {
  252. ::serializeString(t, stream);
  253. }
  254. void hadoop::OBinArchive::serialize(const std::string& t, size_t len, const char* tag)
  255. {
  256. ::serializeString(t, stream);
  257. }
  258. void hadoop::OBinArchive::startRecord(const Record& s, const char* tag)
  259. {
  260. }
  261. void hadoop::OBinArchive::endRecord(const Record& s, const char* tag)
  262. {
  263. }
  264. void hadoop::OBinArchive::startVector(size_t len, const char* tag)
  265. {
  266. ::serializeInt(len, stream);
  267. }
  268. void hadoop::OBinArchive::endVector(size_t len, const char* tag)
  269. {
  270. }
  271. void hadoop::OBinArchive::startMap(size_t len, const char* tag)
  272. {
  273. ::serializeInt(len, stream);
  274. }
  275. void hadoop::OBinArchive::endMap(size_t len, const char* tag)
  276. {
  277. }
  278. hadoop::OBinArchive::~OBinArchive()
  279. {
  280. }