SerialUtils.cc 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "hadoop/SerialUtils.hh"
  19. #include "hadoop/StringUtils.hh"
  20. #include <errno.h>
  21. #include <rpc/types.h>
  22. #include <rpc/xdr.h>
  23. #include <string>
  24. #include <string.h>
  25. using std::string;
  26. namespace HadoopUtils {
  27. Error::Error(const std::string& msg): error(msg) {
  28. }
  29. Error::Error(const std::string& msg,
  30. const std::string& file, int line,
  31. const std::string& function) {
  32. error = msg + " at " + file + ":" + toString(line) +
  33. " in " + function;
  34. }
  35. const std::string& Error::getMessage() const {
  36. return error;
  37. }
  38. FileInStream::FileInStream()
  39. {
  40. mFile = NULL;
  41. isOwned = false;
  42. }
  43. bool FileInStream::open(const std::string& name)
  44. {
  45. mFile = fopen(name.c_str(), "rb");
  46. isOwned = true;
  47. return (mFile != NULL);
  48. }
  49. bool FileInStream::open(FILE* file)
  50. {
  51. mFile = file;
  52. isOwned = false;
  53. return (mFile != NULL);
  54. }
  55. void FileInStream::read(void *buf, size_t len)
  56. {
  57. size_t result = fread(buf, len, 1, mFile);
  58. if (result == 0) {
  59. if (feof(mFile)) {
  60. HADOOP_ASSERT(false, "end of file");
  61. } else {
  62. HADOOP_ASSERT(false, string("read error on file: ") + strerror(errno));
  63. }
  64. }
  65. }
  66. bool FileInStream::skip(size_t nbytes)
  67. {
  68. return (0==fseek(mFile, nbytes, SEEK_CUR));
  69. }
  70. bool FileInStream::close()
  71. {
  72. int ret = 0;
  73. if (mFile != NULL && isOwned) {
  74. ret = fclose(mFile);
  75. }
  76. mFile = NULL;
  77. return (ret==0);
  78. }
  79. FileInStream::~FileInStream()
  80. {
  81. if (mFile != NULL) {
  82. close();
  83. }
  84. }
  85. FileOutStream::FileOutStream()
  86. {
  87. mFile = NULL;
  88. isOwned = false;
  89. }
  90. bool FileOutStream::open(const std::string& name, bool overwrite)
  91. {
  92. if (!overwrite) {
  93. mFile = fopen(name.c_str(), "rb");
  94. if (mFile != NULL) {
  95. fclose(mFile);
  96. return false;
  97. }
  98. }
  99. mFile = fopen(name.c_str(), "wb");
  100. isOwned = true;
  101. return (mFile != NULL);
  102. }
  103. bool FileOutStream::open(FILE* file)
  104. {
  105. mFile = file;
  106. isOwned = false;
  107. return (mFile != NULL);
  108. }
  109. void FileOutStream::write(const void* buf, size_t len)
  110. {
  111. size_t result = fwrite(buf, len, 1, mFile);
  112. HADOOP_ASSERT(result == 1,
  113. string("write error to file: ") + strerror(errno));
  114. }
  115. bool FileOutStream::advance(size_t nbytes)
  116. {
  117. return (0==fseek(mFile, nbytes, SEEK_CUR));
  118. }
  119. bool FileOutStream::close()
  120. {
  121. int ret = 0;
  122. if (mFile != NULL && isOwned) {
  123. ret = fclose(mFile);
  124. }
  125. mFile = NULL;
  126. return (ret == 0);
  127. }
  128. void FileOutStream::flush()
  129. {
  130. fflush(mFile);
  131. }
  132. FileOutStream::~FileOutStream()
  133. {
  134. if (mFile != NULL) {
  135. close();
  136. }
  137. }
  138. StringInStream::StringInStream(const std::string& str): buffer(str) {
  139. itr = buffer.begin();
  140. }
  141. void StringInStream::read(void *buf, size_t buflen) {
  142. size_t bytes = 0;
  143. char* output = (char*) buf;
  144. std::string::const_iterator end = buffer.end();
  145. while (bytes < buflen) {
  146. output[bytes++] = *itr;
  147. ++itr;
  148. if (itr == end) {
  149. break;
  150. }
  151. }
  152. HADOOP_ASSERT(bytes == buflen, "unexpected end of string reached");
  153. }
  154. void serializeInt(int32_t t, OutStream& stream) {
  155. serializeLong(t,stream);
  156. }
  157. void serializeLong(int64_t t, OutStream& stream)
  158. {
  159. if (t >= -112 && t <= 127) {
  160. int8_t b = t;
  161. stream.write(&b, 1);
  162. return;
  163. }
  164. int8_t len = -112;
  165. if (t < 0) {
  166. t ^= -1ll; // reset the sign bit
  167. len = -120;
  168. }
  169. uint64_t tmp = t;
  170. while (tmp != 0) {
  171. tmp = tmp >> 8;
  172. len--;
  173. }
  174. stream.write(&len, 1);
  175. len = (len < -120) ? -(len + 120) : -(len + 112);
  176. for (uint32_t idx = len; idx != 0; idx--) {
  177. uint32_t shiftbits = (idx - 1) * 8;
  178. uint64_t mask = 0xFFll << shiftbits;
  179. uint8_t b = (t & mask) >> shiftbits;
  180. stream.write(&b, 1);
  181. }
  182. }
  183. int32_t deserializeInt(InStream& stream) {
  184. return deserializeLong(stream);
  185. }
  186. int64_t deserializeLong(InStream& stream)
  187. {
  188. int8_t b;
  189. stream.read(&b, 1);
  190. if (b >= -112) {
  191. return b;
  192. }
  193. bool negative;
  194. int len;
  195. if (b < -120) {
  196. negative = true;
  197. len = -120 - b;
  198. } else {
  199. negative = false;
  200. len = -112 - b;
  201. }
  202. uint8_t barr[len];
  203. stream.read(barr, len);
  204. int64_t t = 0;
  205. for (int idx = 0; idx < len; idx++) {
  206. t = t << 8;
  207. t |= (barr[idx] & 0xFF);
  208. }
  209. if (negative) {
  210. t ^= -1ll;
  211. }
  212. return t;
  213. }
  214. void serializeFloat(float t, OutStream& stream)
  215. {
  216. char buf[sizeof(float)];
  217. XDR xdrs;
  218. xdrmem_create(&xdrs, buf, sizeof(float), XDR_ENCODE);
  219. xdr_float(&xdrs, &t);
  220. stream.write(buf, sizeof(float));
  221. }
  222. void deserializeFloat(float& t, InStream& stream)
  223. {
  224. char buf[sizeof(float)];
  225. stream.read(buf, sizeof(float));
  226. XDR xdrs;
  227. xdrmem_create(&xdrs, buf, sizeof(float), XDR_DECODE);
  228. xdr_float(&xdrs, &t);
  229. }
  230. void serializeString(const std::string& t, OutStream& stream)
  231. {
  232. serializeInt(t.length(), stream);
  233. if (t.length() > 0) {
  234. stream.write(t.data(), t.length());
  235. }
  236. }
  237. void deserializeString(std::string& t, InStream& stream)
  238. {
  239. int32_t len = deserializeInt(stream);
  240. if (len > 0) {
  241. // resize the string to the right length
  242. t.resize(len);
  243. // read into the string in 64k chunks
  244. const int bufSize = 65536;
  245. int offset = 0;
  246. char buf[bufSize];
  247. while (len > 0) {
  248. int chunkLength = len > bufSize ? bufSize : len;
  249. stream.read(buf, chunkLength);
  250. t.replace(offset, chunkLength, buf, chunkLength);
  251. offset += chunkLength;
  252. len -= chunkLength;
  253. }
  254. } else {
  255. t.clear();
  256. }
  257. }
  258. }