Buffers.h 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #ifndef BUFFERS_H_
  19. #define BUFFERS_H_
  20. #include "lib/Streams.h"
  21. #include "lib/Compressions.h"
  22. #include "lib/Constants.h"
  23. namespace NativeTask {
  24. /**
  25. * A lightweight read buffer, act as buffered input stream
  26. */
  27. class ReadBuffer {
  28. protected:
  29. char * _buff;
  30. uint32_t _remain;
  31. uint32_t _size;
  32. uint32_t _capacity;
  33. InputStream * _stream;
  34. InputStream * _source;
  35. protected:
  36. inline char * current() {
  37. return _buff + _size - _remain;
  38. }
  39. char * fillGet(uint32_t count);
  40. int32_t fillRead(char * buff, uint32_t len);
  41. int64_t fillReadVLong();
  42. public:
  43. ReadBuffer();
  44. void init(uint32_t size, InputStream * stream, const string & codec);
  45. ~ReadBuffer();
  46. /**
  47. * use get() to get inplace continuous memory of small object
  48. */
  49. inline char * get(uint32_t count) {
  50. if (likely(count <= _remain)) {
  51. char * ret = current();
  52. _remain -= count;
  53. return ret;
  54. }
  55. return fillGet(count);
  56. }
  57. /**
  58. * read to outside buffer
  59. */
  60. inline int32_t read(char * buff, uint32_t len) {
  61. if (likely(len <= _remain)) {
  62. memcpy(buff, current(), len);
  63. _remain -= len;
  64. return len;
  65. }
  66. return fillRead(buff, len);
  67. }
  68. /**
  69. * read to outside buffer, use simple_memcpy
  70. */
  71. inline void readUnsafe(char * buff, uint32_t len) {
  72. if (likely(len <= _remain)) {
  73. simple_memcpy(buff, current(), len);
  74. _remain -= len;
  75. return;
  76. }
  77. fillRead(buff, len);
  78. }
  79. /**
  80. * read VUInt
  81. */
  82. inline int64_t readVLong() {
  83. if (likely(_remain > 0)) {
  84. char * mark = current();
  85. if (*(int8_t*)mark >= (int8_t)-112) {
  86. _remain--;
  87. return (int64_t)*mark;
  88. }
  89. }
  90. return fillReadVLong();
  91. }
  92. /**
  93. * read uint32_t little endian
  94. */
  95. inline uint32_t read_uint32_le() {
  96. return *(uint32_t*)get(4);
  97. }
  98. /**
  99. * read uint32_t big endian
  100. */
  101. inline uint32_t read_uint32_be() {
  102. return hadoop_be32toh(read_uint32_le());
  103. }
  104. };
  105. /**
  106. * A light weighted append buffer, used as buffered output streams
  107. */
  108. class AppendBuffer {
  109. protected:
  110. char * _buff;
  111. uint32_t _remain;
  112. uint32_t _capacity;
  113. uint64_t _counter;
  114. OutputStream * _stream;
  115. OutputStream * _dest;
  116. bool _compression;
  117. protected:
  118. void flushd();
  119. inline char * current() {
  120. return _buff + _capacity - _remain;
  121. }
  122. void write_inner(const void * data, uint32_t len);
  123. void write_vlong_inner(int64_t v);
  124. void write_vuint2_inner(uint32_t v1, uint32_t v2);
  125. public:
  126. AppendBuffer();
  127. ~AppendBuffer();
  128. void init(uint32_t size, OutputStream * stream, const string & codec);
  129. CompressStream * getCompressionStream();
  130. uint64_t getCounter() {
  131. return _counter;
  132. }
  133. inline char * borrowUnsafe(uint32_t len) {
  134. if (likely(_remain >= len)) {
  135. return current();
  136. }
  137. if (likely(_capacity >= len)) {
  138. flushd();
  139. return _buff;
  140. }
  141. return NULL;
  142. }
  143. inline void useUnsafe(uint32_t count) {
  144. _remain -= count;
  145. }
  146. inline void write(char c) {
  147. if (unlikely(_remain == 0)) {
  148. flushd();
  149. }
  150. *current() = c;
  151. _remain--;
  152. }
  153. inline void write(const void * data, uint32_t len) {
  154. if (likely(len <= _remain)) { // append directly
  155. simple_memcpy(current(), data, len);
  156. _remain -= len;
  157. return;
  158. }
  159. write_inner(data, len);
  160. }
  161. inline void write_uint32_le(uint32_t v) {
  162. if (unlikely(4 > _remain)) {
  163. flushd();
  164. }
  165. *(uint32_t*)current() = v;
  166. _remain -= 4;
  167. return;
  168. }
  169. inline void write_uint32_be(uint32_t v) {
  170. write_uint32_le(hadoop_be32toh(v));
  171. }
  172. inline void write_uint64_le(uint64_t v) {
  173. if (unlikely(8 > _remain)) {
  174. flushd();
  175. }
  176. *(uint64_t*)current() = v;
  177. _remain -= 8;
  178. return;
  179. }
  180. inline void write_uint64_be(uint64_t v) {
  181. write_uint64_le(hadoop_be64toh(v));
  182. }
  183. inline void write_vlong(int64_t v) {
  184. if (likely(_remain > 0 && v <= 127 && v >= -112)) {
  185. *(char*)current() = (char)v;
  186. _remain--;
  187. return;
  188. }
  189. write_vlong_inner(v);
  190. }
  191. inline void write_vuint(uint32_t v) {
  192. if (likely(_remain > 0 && v <= 127)) {
  193. *(char*)current() = (char)v;
  194. _remain--;
  195. return;
  196. }
  197. write_vlong_inner(v);
  198. }
  199. inline void write_vuint2(uint32_t v1, uint32_t v2) {
  200. if (likely(_remain >= 2 && v1 <= 127 && v2 <= 127)) {
  201. *(char*)current() = (char)v1;
  202. *(char*)(current() + 1) = (char)v2;
  203. _remain -= 2;
  204. return;
  205. }
  206. write_vuint2_inner(v1, v2);
  207. }
  208. /**
  209. * flush current buffer, clear content
  210. */
  211. inline void flush() {
  212. if (_remain < _capacity) {
  213. flushd();
  214. }
  215. }
  216. };
  217. /**
  218. * Memory Key-Value buffer pair with direct address content, so can be
  219. * easily copied or dumped to file
  220. */
  221. struct KVBuffer {
  222. uint32_t keyLength;
  223. uint32_t valueLength;
  224. char content[1];
  225. char * getKey() {
  226. return content;
  227. }
  228. char * getValue() {
  229. return content + keyLength;
  230. }
  231. KVBuffer * next() {
  232. return ((KVBuffer*)(content + keyLength + valueLength));
  233. }
  234. std::string str() {
  235. return std::string(content, keyLength) + "\t" + std::string(getValue(), valueLength);
  236. }
  237. uint32_t length() {
  238. return keyLength + valueLength + SIZE_OF_KV_LENGTH;
  239. }
  240. uint32_t lengthConvertEndium() {
  241. return hadoop_be32toh(keyLength) + hadoop_be32toh(valueLength) + SIZE_OF_KV_LENGTH;
  242. }
  243. void fill(const void * key, uint32_t keylen, const void * value, uint32_t vallen) {
  244. keyLength = keylen;
  245. valueLength = vallen;
  246. if (keylen > 0) {
  247. simple_memcpy(getKey(), key, keylen);
  248. }
  249. if (vallen > 0) {
  250. simple_memcpy(getValue(), value, vallen);
  251. }
  252. }
  253. static uint32_t headerLength() {
  254. return SIZE_OF_KV_LENGTH;
  255. }
  256. };
  257. struct KVBufferWithParititionId {
  258. uint32_t partitionId;
  259. KVBuffer buffer;
  260. inline static uint32_t minLength() {
  261. return SIZE_OF_PARTITION_LENGTH + SIZE_OF_KV_LENGTH;
  262. }
  263. int length() {
  264. return 4 + buffer.length();
  265. }
  266. int lengthConvertEndium() {
  267. return 4 + buffer.lengthConvertEndium();
  268. }
  269. };
  270. /**
  271. * Native side abstraction of java ByteBuffer
  272. */
  273. class ByteBuffer {
  274. private:
  275. char * _buff;
  276. uint32_t _limit;
  277. uint32_t _position;
  278. uint32_t _capacity;
  279. public:
  280. ByteBuffer()
  281. : _buff(NULL), _limit(0), _position(0), _capacity(0) {
  282. }
  283. ~ByteBuffer() {
  284. }
  285. void reset(char * buff, uint32_t inputCapacity) {
  286. this->_buff = buff;
  287. this->_capacity = inputCapacity;
  288. this->_position = 0;
  289. this->_limit = 0;
  290. }
  291. int capacity() {
  292. return this->_capacity;
  293. }
  294. uint32_t remain() {
  295. return _limit - _position;
  296. }
  297. uint32_t limit() {
  298. return _limit;
  299. }
  300. uint32_t advance(int positionOffset) {
  301. _position += positionOffset;
  302. return _position;
  303. }
  304. uint32_t position() {
  305. return this->_position;
  306. }
  307. void position(uint32_t newPos) {
  308. this->_position = newPos;
  309. }
  310. void rewind(uint32_t newPos, uint32_t newLimit) {
  311. this->_position = newPos;
  312. if (newLimit > this->_capacity) {
  313. THROW_EXCEPTION(IOException, "length larger than input buffer capacity");
  314. }
  315. this->_limit = newLimit;
  316. }
  317. char * current() {
  318. return _buff + _position;
  319. }
  320. char * base() {
  321. return _buff;
  322. }
  323. };
  /**
   * Heap-allocated byte array that can be resized.
   *
   * The array owns its storage (new[] in resize(), delete[] in the
   * destructor). No copy constructor or assignment operator is declared, so
   * copying an instance would make two objects free the same buffer.
   */
  class ByteArray {
  private:
    char * _buff;
    uint32_t _length;
    uint32_t _capacity;
  public:
    ByteArray()
        : _buff(NULL), _length(0), _capacity(0) {
    }
    ~ByteArray() {
      delete[] _buff; // delete[] on NULL is a no-op
      _buff = NULL;
      _length = 0;
      _capacity = 0;
    }
    /**
     * Change the logical size to newSize.
     *
     * When newSize fits the current capacity only the size changes. Otherwise
     * the old storage is released and a fresh block of twice the requested
     * size is allocated; previous contents are NOT carried over.
     *
     * If the allocation throws, the array is left empty (NULL buffer, zero
     * size and capacity) instead of advertising a capacity it does not have.
     */
    void resize(uint32_t newSize) {
      if (newSize > _capacity) {
        // 2 * newSize would wrap around above this bound and yield a block
        // smaller than newSize; fall back to an exact-size allocation then.
        const uint32_t kMaxDoublable = 0x7FFFFFFFu;
        const uint32_t grown = (newSize <= kMaxDoublable) ? 2 * newSize : newSize;

        // Drop the old block first (keeps peak memory low) and bring the
        // bookkeeping to a consistent empty state before allocating.
        delete[] _buff;
        _buff = NULL;
        _capacity = 0;
        _length = 0;

        _buff = new char[grown];
        _capacity = grown;
      }
      _length = newSize;
    }
    /** @return start of the storage, NULL before the first allocation */
    char * buff() {
      return _buff;
    }
    /** @return the logical size set by the last resize() */
    uint32_t size() {
      return _length;
    }
  };
  361. class FixSizeContainer {
  362. private:
  363. char * _buff;
  364. uint32_t _pos;
  365. uint32_t _size;
  366. public:
  367. FixSizeContainer()
  368. : _buff(NULL), _pos(0), _size(0) {
  369. }
  370. ~FixSizeContainer() {
  371. }
  372. void wrap(char * buff, uint32_t size) {
  373. _size = size;
  374. _buff = buff;
  375. _pos = 0;
  376. }
  377. void rewind() {
  378. _pos = 0;
  379. }
  380. uint32_t remain() {
  381. return _size - _pos;
  382. }
  383. char * current() {
  384. return _buff + _pos;
  385. }
  386. char * base() {
  387. return _buff;
  388. }
  389. uint32_t size() {
  390. return _size;
  391. }
  392. /**
  393. * return the length of actually filled data.
  394. */
  395. uint32_t fill(const char * source, uint32_t maxSize) {
  396. if (_pos > _size) {
  397. return 0;
  398. }
  399. uint32_t remain = _size - _pos;
  400. uint32_t length = (maxSize < remain) ? maxSize : remain;
  401. simple_memcpy(_buff + _pos, source, length);
  402. _pos += length;
  403. return length;
  404. }
  405. uint32_t position() {
  406. return _pos;
  407. }
  408. void position(int pos) {
  409. _pos = pos;
  410. }
  411. };
  /**
   * Growable byte buffer with independent read and write cursors, used to
   * serialize integers, strings and pointers back to back.
   *
   * Values are stored in host byte order with no padding, so a field may
   * start at any offset; all loads and stores therefore go through memcpy.
   * Reads are not bounds checked. No copy constructor or assignment operator
   * is declared, so copying an instance would free the storage twice.
   */
  class ReadWriteBuffer {
  private:
    static const uint32_t INITIAL_LENGTH = 16;
    uint32_t _readPoint;
    uint32_t _writePoint;
    char * _buff;
    uint32_t _buffLength;
    bool _newCreatedBuff; // true when _buff was allocated by this object
  public:
    /**
     * Create a buffer with length bytes preallocated (nothing is allocated
     * when length is 0).
     */
    ReadWriteBuffer(uint32_t length)
        : _readPoint(0), _writePoint(0), _buff(NULL), _buffLength(0), _newCreatedBuff(false) {
      _buffLength = length;
      if (_buffLength > 0) {
        _buff = new char[_buffLength];
        _newCreatedBuff = true;
      }
    }
    /** Create an empty buffer; storage is allocated on the first write. */
    ReadWriteBuffer()
        : _readPoint(0), _writePoint(0), _buff(NULL), _buffLength(0), _newCreatedBuff(false) {
    }
    ~ReadWriteBuffer() {
      if (_newCreatedBuff) {
        delete[] _buff;
        _buff = NULL;
      }
    }
    void setReadPoint(uint32_t pos) {
      _readPoint = pos;
    }
    void setWritePoint(uint32_t pos) {
      _writePoint = pos;
    }
    char * getBuff() {
      return _buff;
    }
    uint32_t getWritePoint() {
      return _writePoint;
    }
    uint32_t getReadPoint() {
      return _readPoint;
    }
    /** Append a 4-byte integer in host byte order. */
    void writeInt(uint32_t param) {
      checkWriteSpaceAndResizeIfNecessary(4);
      memcpy(_buff + _writePoint, &param, sizeof(param));
      _writePoint += 4;
    }
    /** Append an 8-byte integer in host byte order. */
    void writeLong(uint64_t param) {
      checkWriteSpaceAndResizeIfNecessary(8);
      memcpy(_buff + _writePoint, &param, sizeof(param));
      _writePoint += 8;
    }
    /** Append a 4-byte length followed by length raw bytes. */
    void writeString(const char * param, uint32_t length) {
      writeInt(length);
      checkWriteSpaceAndResizeIfNecessary(length);
      if (length > 0) { // memcpy must not be handed a NULL source, even for 0 bytes
        memcpy(_buff + _writePoint, param, length);
      }
      _writePoint += length;
    }
    /** Append the content of *param as a length-prefixed string. */
    void writeString(std::string * param) {
      writeString(param->c_str(), static_cast<uint32_t>(param->size()));
    }
    /** Append a pointer value as an 8-byte integer. */
    void writePointer(void * param) {
      uint64_t written = (uint64_t)(param);
      writeLong(written);
    }
    /** Read back a 4-byte integer and advance the read cursor. */
    uint32_t readInt() {
      uint32_t result;
      memcpy(&result, _buff + _readPoint, sizeof(result));
      _readPoint += 4;
      return result;
    }
    /** Read back an 8-byte integer and advance the read cursor. */
    uint64_t readLong() {
      uint64_t result;
      memcpy(&result, _buff + _readPoint, sizeof(result));
      _readPoint += 8;
      return result;
    }
    /**
     * Read back a length-prefixed string.
     * @return a string allocated with new; the caller must delete it
     */
    std::string * readString() {
      uint32_t len = readInt();
      char * strBegin = _buff + _readPoint;
      _readPoint += len;
      return new std::string(strBegin, len);
    }
    /** Read back a pointer written by writePointer(). */
    void * readPointer() {
      uint64_t result = readLong();
      return (void *)(result);
    }
  private:
    /**
     * Make sure toBeWritten bytes fit behind the write cursor, allocating or
     * enlarging the storage when necessary.
     *
     * Bookkeeping is only updated after an allocation succeeded, so a failed
     * new[] leaves the buffer in its previous, consistent state.
     */
    void checkWriteSpaceAndResizeIfNecessary(uint32_t toBeWritten) {
      if (_buffLength == 0) {
        uint32_t initial = INITIAL_LENGTH;
        if (initial < toBeWritten) {
          initial = toBeWritten;
        }
        _buff = new char[initial];
        _buffLength = initial;
        _newCreatedBuff = true;
      }
      if (_buffLength - _writePoint >= toBeWritten) {
        return;
      }
      // Grow by at least the current length so repeated small writes do not
      // reallocate and copy every time; never grow by less than toBeWritten.
      const uint32_t step = (toBeWritten > _buffLength) ? toBeWritten : _buffLength;
      uint32_t grown = _buffLength + step;
      if (grown < _buffLength) {
        // Doubling wrapped around 32 bits; use the minimal growth instead.
        grown = _buffLength + toBeWritten;
      }
      char * newBuff = new char[grown];
      memcpy(newBuff, _buff, _writePoint);
      delete[] _buff;
      _buff = newBuff;
      _buffLength = grown;
      _newCreatedBuff = true;
    }
  };
  520. typedef ReadWriteBuffer ParameterBuffer;
  521. typedef ReadWriteBuffer ResultBuffer;
  522. } // namespace NativeTask
  523. #endif /* BUFFERS_H_ */