123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254 |
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #include "CombineHandler.h"
- namespace NativeTask {
- const char * REFILL = "refill";
- const int LENGTH_OF_REFILL_STRING = 6;
- const Command CombineHandler::COMBINE(4, "Combine");
- CombineHandler::CombineHandler()
- : _combineContext(NULL), _kvIterator(NULL), _writer(NULL), _kType(UnknownType),
- _vType(UnknownType), _config(NULL), _kvCached(false), _combineInputRecordCount(0),
- _combineInputBytes(0), _combineOutputRecordCount(0), _combineOutputBytes(0) {
- }
- CombineHandler::~CombineHandler() {
- }
- void CombineHandler::configure(Config * config) {
- _config = config;
- MapOutputSpec::getSpecFromConfig(_config, _mapOutputSpec);
- _kType = _mapOutputSpec.keyType;
- _vType = _mapOutputSpec.valueType;
- }
- uint32_t CombineHandler::feedDataToJavaInWritableSerialization() {
- uint32_t written = 0;
- bool firstKV = true;
- _out.position(0);
- if (_kvCached) {
- uint32_t kvLength = _key.outerLength + _value.outerLength + KVBuffer::headerLength();
- outputInt(bswap(_key.outerLength));
- outputInt(bswap(_value.outerLength));
- outputKeyOrValue(_key, _kType);
- outputKeyOrValue(_value, _vType);
- written += kvLength;
- _kvCached = false;
- firstKV = false;
- }
- uint32_t recordCount = 0;
- while (nextKeyValue(_key, _value)) {
- //::sleep(5);
- _kvCached = false;
- recordCount++;
- uint32_t kvLength = _key.outerLength + _value.outerLength + KVBuffer::headerLength();
- if (!firstKV && kvLength > _out.remain()) {
- _kvCached = true;
- break;
- } else {
- firstKV = false;
- //write final key length and final value length
- outputInt(bswap(_key.outerLength));
- outputInt(bswap(_value.outerLength));
- outputKeyOrValue(_key, _kType);
- outputKeyOrValue(_value, _vType);
- written += kvLength;
- }
- }
- if (_out.position() > 0) {
- flushOutput();
- }
- _combineInputRecordCount += recordCount;
- _combineInputBytes += written;
- return written;
- }
- /**
- * KV: key or value
- */
- void CombineHandler::outputKeyOrValue(SerializeInfo & KV, KeyValueType type) {
- switch (type) {
- case TextType:
- output(KV.varBytes, KV.outerLength - KV.buffer.length());
- output(KV.buffer.data(), KV.buffer.length());
- break;
- case BytesType:
- outputInt(bswap(KV.buffer.length()));
- output(KV.buffer.data(), KV.buffer.length());
- break;
- default:
- output(KV.buffer.data(), KV.buffer.length());
- break;
- }
- }
- bool CombineHandler::nextKeyValue(SerializeInfo & key, SerializeInfo & value) {
- if (!_kvIterator->next(key.buffer, value.buffer)) {
- return false;
- }
- uint32_t varLength = 0;
- switch (_kType) {
- case TextType:
- WritableUtils::WriteVInt(key.buffer.length(), key.varBytes, varLength);
- key.outerLength = key.buffer.length() + varLength;
- break;
- case BytesType:
- key.outerLength = key.buffer.length() + 4;
- break;
- default:
- key.outerLength = key.buffer.length();
- break;
- }
- //prepare final value length
- uint32_t varValueLength = 0;
- switch (_vType) {
- case TextType:
- WritableUtils::WriteVInt(value.buffer.length(), value.varBytes, varValueLength);
- value.outerLength = value.buffer.length() + varValueLength;
- break;
- case BytesType:
- value.outerLength = value.buffer.length() + 4;
- break;
- default:
- value.outerLength = value.buffer.length();
- break;
- }
- return true;
- }
- uint32_t CombineHandler::feedDataToJava(SerializationFramework serializationType) {
- if (serializationType == WRITABLE_SERIALIZATION) {
- return feedDataToJavaInWritableSerialization();
- }
- THROW_EXCEPTION(IOException, "Native Serialization not supported");
- }
- void CombineHandler::handleInput(ByteBuffer & in) {
- char * buff = in.current();
- uint32_t length = in.remain();
- uint32_t remain = length;
- char * pos = buff;
- if (_asideBuffer.remain() > 0) {
- uint32_t filledLength = _asideBuffer.fill(pos, length);
- pos += filledLength;
- remain -= filledLength;
- }
- if (_asideBuffer.size() > 0 && _asideBuffer.remain() == 0) {
- _asideBuffer.position(0);
- write(_asideBuffer.current(), _asideBuffer.size());
- _asideBuffer.wrap(NULL, 0);
- }
- if (remain == 0) {
- return;
- }
- KVBuffer * kvBuffer = (KVBuffer *)pos;
- if (unlikely(remain < kvBuffer->headerLength())) {
- THROW_EXCEPTION(IOException, "k/v meta information incomplete");
- }
- uint32_t kvLength = kvBuffer->lengthConvertEndium();
- if (kvLength > remain) {
- _asideBytes.resize(kvLength);
- _asideBuffer.wrap(_asideBytes.buff(), _asideBytes.size());
- _asideBuffer.fill(pos, remain);
- pos += remain;
- remain = 0;
- } else {
- write(pos, remain);
- }
- }
- void CombineHandler::write(char * buf, uint32_t length) {
- KVBuffer * kv = NULL;
- char * pos = buf;
- uint32_t remain = length;
- uint32_t outputRecordCount = 0;
- while (remain > 0) {
- kv = (KVBuffer *)pos;
- kv->keyLength = bswap(kv->keyLength);
- kv->valueLength = bswap(kv->valueLength);
- _writer->write(kv->getKey(), kv->keyLength, kv->getValue(), kv->valueLength);
- outputRecordCount++;
- remain -= kv->length();
- pos += kv->length();
- }
- _combineOutputRecordCount += outputRecordCount;
- _combineOutputBytes += length;
- }
/**
 * Returns the 4 raw bytes of `length` in host byte order. This is NOT a
 * decimal rendering of the number.
 */
string toString(uint32_t length) {
  return string(reinterpret_cast<const char *>(&length), sizeof(length));
}
- void CombineHandler::onLoadData() {
- feedDataToJava(WRITABLE_SERIALIZATION);
- }
- ResultBuffer * CombineHandler::onCall(const Command& command, ParameterBuffer * param) {
- THROW_EXCEPTION(UnsupportException, "Command not supported by RReducerHandler");
- }
- void CombineHandler::combine(CombineContext type, KVIterator * kvIterator, IFileWriter * writer) {
- _combineInputRecordCount = 0;
- _combineOutputRecordCount = 0;
- _combineInputBytes = 0;
- _combineOutputBytes = 0;
- this->_combineContext = &type;
- this->_kvIterator = kvIterator;
- this->_writer = writer;
- call(COMBINE, NULL);
- LOG("[CombineHandler] input Record Count: %d, input Bytes: %d, "
- "output Record Count: %d, output Bytes: %d",
- _combineInputRecordCount, _combineInputBytes,
- _combineOutputRecordCount, _combineOutputBytes);
- return;
- }
- void CombineHandler::finish() {
- }
- } /* namespace NativeTask */
|