123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635 |
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #ifndef BUFFERS_H_
- #define BUFFERS_H_
- #include "lib/Streams.h"
- #include "lib/Compressions.h"
- #include "lib/Constants.h"
- namespace NativeTask {
- /**
- * A lightweight read buffer, act as buffered input stream
- */
- class ReadBuffer {
- protected:
- char * _buff;
- uint32_t _remain;
- uint32_t _size;
- uint32_t _capacity;
- InputStream * _stream;
- InputStream * _source;
- protected:
- inline char * current() {
- return _buff + _size - _remain;
- }
- char * fillGet(uint32_t count);
- int32_t fillRead(char * buff, uint32_t len);
- int64_t fillReadVLong();
- public:
- ReadBuffer();
- void init(uint32_t size, InputStream * stream, const string & codec);
- ~ReadBuffer();
- /**
- * use get() to get inplace continuous memory of small object
- */
- inline char * get(uint32_t count) {
- if (likely(count <= _remain)) {
- char * ret = current();
- _remain -= count;
- return ret;
- }
- return fillGet(count);
- }
- /**
- * read to outside buffer
- */
- inline int32_t read(char * buff, uint32_t len) {
- if (likely(len <= _remain)) {
- memcpy(buff, current(), len);
- _remain -= len;
- return len;
- }
- return fillRead(buff, len);
- }
- /**
- * read to outside buffer, use simple_memcpy
- */
- inline void readUnsafe(char * buff, uint32_t len) {
- if (likely(len <= _remain)) {
- simple_memcpy(buff, current(), len);
- _remain -= len;
- return;
- }
- fillRead(buff, len);
- }
- /**
- * read VUInt
- */
- inline int64_t readVLong() {
- if (likely(_remain > 0)) {
- char * mark = current();
- if (*(int8_t*)mark >= (int8_t)-112) {
- _remain--;
- return (int64_t)*mark;
- }
- }
- return fillReadVLong();
- }
- /**
- * read uint32_t little endian
- */
- inline uint32_t read_uint32_le() {
- return *(uint32_t*)get(4);
- }
- /**
- * read uint32_t big endian
- */
- inline uint32_t read_uint32_be() {
- return hadoop_be32toh(read_uint32_le());
- }
- };
- /**
- * A light weighted append buffer, used as buffered output streams
- */
- class AppendBuffer {
- protected:
- char * _buff;
- uint32_t _remain;
- uint32_t _capacity;
- uint64_t _counter;
- OutputStream * _stream;
- OutputStream * _dest;
- bool _compression;
- protected:
- void flushd();
- inline char * current() {
- return _buff + _capacity - _remain;
- }
- void write_inner(const void * data, uint32_t len);
- void write_vlong_inner(int64_t v);
- void write_vuint2_inner(uint32_t v1, uint32_t v2);
- public:
- AppendBuffer();
- ~AppendBuffer();
- void init(uint32_t size, OutputStream * stream, const string & codec);
- CompressStream * getCompressionStream();
- uint64_t getCounter() {
- return _counter;
- }
- inline char * borrowUnsafe(uint32_t len) {
- if (likely(_remain >= len)) {
- return current();
- }
- if (likely(_capacity >= len)) {
- flushd();
- return _buff;
- }
- return NULL;
- }
- inline void useUnsafe(uint32_t count) {
- _remain -= count;
- }
- inline void write(char c) {
- if (unlikely(_remain == 0)) {
- flushd();
- }
- *current() = c;
- _remain--;
- }
- inline void write(const void * data, uint32_t len) {
- if (likely(len <= _remain)) { // append directly
- simple_memcpy(current(), data, len);
- _remain -= len;
- return;
- }
- write_inner(data, len);
- }
- inline void write_uint32_le(uint32_t v) {
- if (unlikely(4 > _remain)) {
- flushd();
- }
- *(uint32_t*)current() = v;
- _remain -= 4;
- return;
- }
- inline void write_uint32_be(uint32_t v) {
- write_uint32_le(hadoop_be32toh(v));
- }
- inline void write_uint64_le(uint64_t v) {
- if (unlikely(8 > _remain)) {
- flushd();
- }
- *(uint64_t*)current() = v;
- _remain -= 8;
- return;
- }
- inline void write_uint64_be(uint64_t v) {
- write_uint64_le(hadoop_be64toh(v));
- }
- inline void write_vlong(int64_t v) {
- if (likely(_remain > 0 && v <= 127 && v >= -112)) {
- *(char*)current() = (char)v;
- _remain--;
- return;
- }
- write_vlong_inner(v);
- }
- inline void write_vuint(uint32_t v) {
- if (likely(_remain > 0 && v <= 127)) {
- *(char*)current() = (char)v;
- _remain--;
- return;
- }
- write_vlong_inner(v);
- }
- inline void write_vuint2(uint32_t v1, uint32_t v2) {
- if (likely(_remain >= 2 && v1 <= 127 && v2 <= 127)) {
- *(char*)current() = (char)v1;
- *(char*)(current() + 1) = (char)v2;
- _remain -= 2;
- return;
- }
- write_vuint2_inner(v1, v2);
- }
- /**
- * flush current buffer, clear content
- */
- inline void flush() {
- if (_remain < _capacity) {
- flushd();
- }
- }
- };
- /**
- * Memory Key-Value buffer pair with direct address content, so can be
- * easily copied or dumped to file
- */
- struct KVBuffer {
- uint32_t keyLength;
- uint32_t valueLength;
- char content[1];
- char * getKey() {
- return content;
- }
- char * getValue() {
- return content + keyLength;
- }
- KVBuffer * next() {
- return ((KVBuffer*)(content + keyLength + valueLength));
- }
- std::string str() {
- return std::string(content, keyLength) + "\t" + std::string(getValue(), valueLength);
- }
- uint32_t length() {
- return keyLength + valueLength + SIZE_OF_KV_LENGTH;
- }
- uint32_t lengthConvertEndium() {
- return hadoop_be32toh(keyLength) + hadoop_be32toh(valueLength) + SIZE_OF_KV_LENGTH;
- }
- void fill(const void * key, uint32_t keylen, const void * value, uint32_t vallen) {
- keyLength = keylen;
- valueLength = vallen;
- if (keylen > 0) {
- simple_memcpy(getKey(), key, keylen);
- }
- if (vallen > 0) {
- simple_memcpy(getValue(), value, vallen);
- }
- }
- static uint32_t headerLength() {
- return SIZE_OF_KV_LENGTH;
- }
- };
- struct KVBufferWithParititionId {
- uint32_t partitionId;
- KVBuffer buffer;
- inline static uint32_t minLength() {
- return SIZE_OF_PARTITION_LENGTH + SIZE_OF_KV_LENGTH;
- }
- int length() {
- return 4 + buffer.length();
- }
- int lengthConvertEndium() {
- return 4 + buffer.lengthConvertEndium();
- }
- };
- /**
- * Native side abstraction of java ByteBuffer
- */
- class ByteBuffer {
- private:
- char * _buff;
- uint32_t _limit;
- uint32_t _position;
- uint32_t _capacity;
- public:
- ByteBuffer()
- : _buff(NULL), _limit(0), _position(0), _capacity(0) {
- }
- ~ByteBuffer() {
- }
- void reset(char * buff, uint32_t inputCapacity) {
- this->_buff = buff;
- this->_capacity = inputCapacity;
- this->_position = 0;
- this->_limit = 0;
- }
- int capacity() {
- return this->_capacity;
- }
- uint32_t remain() {
- return _limit - _position;
- }
- uint32_t limit() {
- return _limit;
- }
- uint32_t advance(int positionOffset) {
- _position += positionOffset;
- return _position;
- }
- uint32_t position() {
- return this->_position;
- }
- void position(uint32_t newPos) {
- this->_position = newPos;
- }
- void rewind(uint32_t newPos, uint32_t newLimit) {
- this->_position = newPos;
- if (newLimit > this->_capacity) {
- THROW_EXCEPTION(IOException, "length larger than input buffer capacity");
- }
- this->_limit = newLimit;
- }
- char * current() {
- return _buff + _position;
- }
- char * base() {
- return _buff;
- }
- };
/**
 * Owning, resizable byte array.
 *
 * NOTE(review): the class owns _buff but declares no copy constructor or
 * assignment operator, so copying an instance would free the memory twice.
 */
class ByteArray {
private:
  char * _buff;
  uint32_t _length;
  uint32_t _capacity;

public:
  ByteArray()
      : _buff(NULL), _length(0), _capacity(0) {
  }

  ~ByteArray() {
    delete[] _buff; // deleting NULL is a no-op
    _buff = NULL;
    _length = 0;
    _capacity = 0;
  }

  /**
   * Set the logical size to newSize.
   *
   * When newSize fits into the current capacity only the size changes and
   * the content is kept. Otherwise the old memory is released and a fresh,
   * uninitialized area of twice the requested size is allocated; previous
   * content is NOT preserved in that case.
   *
   * @param newSize requested size in bytes
   */
  void resize(uint32_t newSize) {
    if (newSize <= _capacity) {
      _length = newSize;
      return;
    }

    // Doubling wraps around for sizes of 2^31 and above; fall back to the
    // exact size so the capacity is never smaller than the reported length.
    uint32_t newCapacity = 2 * newSize;
    if (newCapacity < newSize) {
      newCapacity = newSize;
    }

    // Become a valid empty array before allocating, so a failing new[]
    // cannot leave a non-zero capacity paired with a NULL buffer.
    delete[] _buff;
    _buff = NULL;
    _capacity = 0;
    _length = 0;

    _buff = new char[newCapacity];
    _capacity = newCapacity;
    _length = newSize;
  }

  /** Start of the array; NULL until the first growing resize(). */
  char * buff() {
    return _buff;
  }

  /** Logical size set by the last resize(). */
  uint32_t size() {
    return _length;
  }
};
- class FixSizeContainer {
- private:
- char * _buff;
- uint32_t _pos;
- uint32_t _size;
- public:
- FixSizeContainer()
- : _buff(NULL), _pos(0), _size(0) {
- }
- ~FixSizeContainer() {
- }
- void wrap(char * buff, uint32_t size) {
- _size = size;
- _buff = buff;
- _pos = 0;
- }
- void rewind() {
- _pos = 0;
- }
- uint32_t remain() {
- return _size - _pos;
- }
- char * current() {
- return _buff + _pos;
- }
- char * base() {
- return _buff;
- }
- uint32_t size() {
- return _size;
- }
- /**
- * return the length of actually filled data.
- */
- uint32_t fill(const char * source, uint32_t maxSize) {
- if (_pos > _size) {
- return 0;
- }
- uint32_t remain = _size - _pos;
- uint32_t length = (maxSize < remain) ? maxSize : remain;
- simple_memcpy(_buff + _pos, source, length);
- _pos += length;
- return length;
- }
- uint32_t position() {
- return _pos;
- }
- void position(int pos) {
- _pos = pos;
- }
- };
/**
 * Growable byte buffer with independent read and write positions.
 *
 * Integers are stored in host byte order at whatever offset the write point
 * currently has. Strings are stored as a 4-byte length followed by the raw
 * bytes. Reads perform no bounds checking.
 *
 * NOTE(review): the class owns _buff but declares no copy constructor or
 * assignment operator, so copying an instance would free the memory twice.
 * Offsets are 32-bit; more than 4 GiB of content is not supported.
 */
class ReadWriteBuffer {
private:
  static const uint32_t INITIAL_LENGTH = 16;

  uint32_t _readPoint;
  uint32_t _writePoint;
  char * _buff;
  uint32_t _buffLength;
  bool _newCreatedBuff; // true once _buff was allocated by this object

public:
  /**
   * Create a buffer with length bytes preallocated (none when length is 0).
   */
  ReadWriteBuffer(uint32_t length)
      : _readPoint(0), _writePoint(0), _buff(NULL), _buffLength(length), _newCreatedBuff(false) {
    if (_buffLength > 0) {
      _buff = new char[_buffLength];
      _newCreatedBuff = true;
    }
  }

  /**
   * Create an empty buffer; memory is allocated on the first write.
   */
  ReadWriteBuffer()
      : _readPoint(0), _writePoint(0), _buff(NULL), _buffLength(0), _newCreatedBuff(false) {
  }

  ~ReadWriteBuffer() {
    if (_newCreatedBuff) {
      delete[] _buff;
      _buff = NULL;
    }
  }

  void setReadPoint(uint32_t pos) {
    _readPoint = pos;
  }

  void setWritePoint(uint32_t pos) {
    _writePoint = pos;
  }

  char * getBuff() {
    return _buff;
  }

  uint32_t getWritePoint() {
    return _writePoint;
  }

  uint32_t getReadPoint() {
    return _readPoint;
  }

  /** Append a 4-byte integer. */
  void writeInt(uint32_t param) {
    checkWriteSpaceAndResizeIfNecessary(4);
    // The write point is not necessarily 4-byte aligned (strings of any
    // length may precede it), so copy the bytes instead of storing through
    // a cast pointer.
    memcpy(_buff + _writePoint, &param, sizeof(param));
    _writePoint += 4;
  }

  /** Append an 8-byte integer. */
  void writeLong(uint64_t param) {
    checkWriteSpaceAndResizeIfNecessary(8);
    memcpy(_buff + _writePoint, &param, sizeof(param));
    _writePoint += 8;
  }

  /**
   * Append a length-prefixed byte string.
   *
   * @param param  bytes to store; not read when length is 0
   * @param length number of bytes at param
   */
  void writeString(const char * param, uint32_t length) {
    writeInt(length);
    checkWriteSpaceAndResizeIfNecessary(length);
    if (length > 0) {
      memcpy(_buff + _writePoint, param, length);
    }
    _writePoint += length;
  }

  /** Append the content of *param as a length-prefixed byte string. */
  void writeString(std::string * param) {
    writeString(param->c_str(), (uint32_t)param->size());
  }

  /** Append a pointer value as an 8-byte integer. */
  void writePointer(void * param) {
    uint64_t written = (uint64_t)(param);
    writeLong(written);
  }

  /** Read a 4-byte integer at the read point and advance past it. */
  uint32_t readInt() {
    uint32_t result;
    memcpy(&result, _buff + _readPoint, sizeof(result));
    _readPoint += 4;
    return result;
  }

  /** Read an 8-byte integer at the read point and advance past it. */
  uint64_t readLong() {
    uint64_t result;
    memcpy(&result, _buff + _readPoint, sizeof(result));
    _readPoint += 8;
    return result;
  }

  /**
   * Read a length-prefixed byte string.
   *
   * @return a newly allocated std::string; the caller must delete it
   */
  std::string * readString() {
    uint32_t len = readInt();
    char * strBegin = _buff + _readPoint;
    _readPoint += len;
    return new std::string(strBegin, len);
  }

  /** Read an 8-byte integer and return it as a pointer. */
  void * readPointer() {
    uint64_t result = readLong();
    return (void *)(result);
  }

private:
  /**
   * Make sure toBeWritten bytes fit behind the write point, reallocating
   * when necessary.
   *
   * The buffer grows geometrically (at least doubling) so that a sequence
   * of small writes does not reallocate and copy on every call. Content
   * before the write point is preserved, as far as it lies inside the old
   * buffer.
   */
  void checkWriteSpaceAndResizeIfNecessary(uint32_t toBeWritten) {
    // 64-bit arithmetic: also correct when the write point was moved past
    // the end of the buffer, where a 32-bit subtraction would wrap around.
    const uint64_t required = (uint64_t)_writePoint + toBeWritten;
    if (NULL != _buff && required <= _buffLength) {
      return;
    }

    uint64_t newLength = INITIAL_LENGTH;
    const uint64_t doubled = 2 * (uint64_t)_buffLength;
    if (newLength < doubled) {
      newLength = doubled;
    }
    if (newLength < required) {
      newLength = required;
    }
    if (newLength > 0xFFFFFFFFu) {
      newLength = 0xFFFFFFFFu; // lengths are stored in 32 bits
    }

    // Allocate first; the object is only modified after new[] succeeded.
    char * newBuff = new char[(uint32_t)newLength];
    const uint32_t preserved = (_writePoint < _buffLength) ? _writePoint : _buffLength;
    if (NULL != _buff && preserved > 0) {
      memcpy(newBuff, _buff, preserved);
    }
    delete[] _buff;
    _buff = newBuff;
    _buffLength = (uint32_t)newLength;
    _newCreatedBuff = true;
  }
};
- typedef ReadWriteBuffer ParameterBuffer;
- typedef ReadWriteBuffer ResultBuffer;
- } // namespace NativeTask
- #endif /* BUFFERS_H_ */
|