// CombineHandler.cc
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
  18. #include "CombineHandler.h"
  19. namespace NativeTask {
  20. const char * REFILL = "refill";
  21. const int LENGTH_OF_REFILL_STRING = 6;
  22. const Command CombineHandler::COMBINE(4, "Combine");
  23. CombineHandler::CombineHandler()
  24. : _combineContext(NULL), _kvIterator(NULL), _writer(NULL), _kType(UnknownType),
  25. _vType(UnknownType), _config(NULL), _kvCached(false), _combineInputRecordCount(0),
  26. _combineInputBytes(0), _combineOutputRecordCount(0), _combineOutputBytes(0) {
  27. }
  28. CombineHandler::~CombineHandler() {
  29. }
  30. void CombineHandler::configure(Config * config) {
  31. _config = config;
  32. MapOutputSpec::getSpecFromConfig(_config, _mapOutputSpec);
  33. _kType = _mapOutputSpec.keyType;
  34. _vType = _mapOutputSpec.valueType;
  35. }
  36. uint32_t CombineHandler::feedDataToJavaInWritableSerialization() {
  37. uint32_t written = 0;
  38. bool firstKV = true;
  39. _out.position(0);
  40. if (_kvCached) {
  41. uint32_t kvLength = _key.outerLength + _value.outerLength + KVBuffer::headerLength();
  42. outputInt(bswap(_key.outerLength));
  43. outputInt(bswap(_value.outerLength));
  44. outputKeyOrValue(_key, _kType);
  45. outputKeyOrValue(_value, _vType);
  46. written += kvLength;
  47. _kvCached = false;
  48. firstKV = false;
  49. }
  50. uint32_t recordCount = 0;
  51. while (nextKeyValue(_key, _value)) {
  52. //::sleep(5);
  53. _kvCached = false;
  54. recordCount++;
  55. uint32_t kvLength = _key.outerLength + _value.outerLength + KVBuffer::headerLength();
  56. if (!firstKV && kvLength > _out.remain()) {
  57. _kvCached = true;
  58. break;
  59. } else {
  60. firstKV = false;
  61. //write final key length and final value length
  62. outputInt(bswap(_key.outerLength));
  63. outputInt(bswap(_value.outerLength));
  64. outputKeyOrValue(_key, _kType);
  65. outputKeyOrValue(_value, _vType);
  66. written += kvLength;
  67. }
  68. }
  69. if (_out.position() > 0) {
  70. flushOutput();
  71. }
  72. _combineInputRecordCount += recordCount;
  73. _combineInputBytes += written;
  74. return written;
  75. }
  76. /**
  77. * KV: key or value
  78. */
  79. void CombineHandler::outputKeyOrValue(SerializeInfo & KV, KeyValueType type) {
  80. switch (type) {
  81. case TextType:
  82. output(KV.varBytes, KV.outerLength - KV.buffer.length());
  83. output(KV.buffer.data(), KV.buffer.length());
  84. break;
  85. case BytesType:
  86. outputInt(bswap(KV.buffer.length()));
  87. output(KV.buffer.data(), KV.buffer.length());
  88. break;
  89. default:
  90. output(KV.buffer.data(), KV.buffer.length());
  91. break;
  92. }
  93. }
  94. bool CombineHandler::nextKeyValue(SerializeInfo & key, SerializeInfo & value) {
  95. if (!_kvIterator->next(key.buffer, value.buffer)) {
  96. return false;
  97. }
  98. uint32_t varLength = 0;
  99. switch (_kType) {
  100. case TextType:
  101. WritableUtils::WriteVInt(key.buffer.length(), key.varBytes, varLength);
  102. key.outerLength = key.buffer.length() + varLength;
  103. break;
  104. case BytesType:
  105. key.outerLength = key.buffer.length() + 4;
  106. break;
  107. default:
  108. key.outerLength = key.buffer.length();
  109. break;
  110. }
  111. //prepare final value length
  112. uint32_t varValueLength = 0;
  113. switch (_vType) {
  114. case TextType:
  115. WritableUtils::WriteVInt(value.buffer.length(), value.varBytes, varValueLength);
  116. value.outerLength = value.buffer.length() + varValueLength;
  117. break;
  118. case BytesType:
  119. value.outerLength = value.buffer.length() + 4;
  120. break;
  121. default:
  122. value.outerLength = value.buffer.length();
  123. break;
  124. }
  125. return true;
  126. }
  127. uint32_t CombineHandler::feedDataToJava(SerializationFramework serializationType) {
  128. if (serializationType == WRITABLE_SERIALIZATION) {
  129. return feedDataToJavaInWritableSerialization();
  130. }
  131. THROW_EXCEPTION(IOException, "Native Serialization not supported");
  132. }
  133. void CombineHandler::handleInput(ByteBuffer & in) {
  134. char * buff = in.current();
  135. uint32_t length = in.remain();
  136. uint32_t remain = length;
  137. char * pos = buff;
  138. if (_asideBuffer.remain() > 0) {
  139. uint32_t filledLength = _asideBuffer.fill(pos, length);
  140. pos += filledLength;
  141. remain -= filledLength;
  142. }
  143. if (_asideBuffer.size() > 0 && _asideBuffer.remain() == 0) {
  144. _asideBuffer.position(0);
  145. write(_asideBuffer.current(), _asideBuffer.size());
  146. _asideBuffer.wrap(NULL, 0);
  147. }
  148. if (remain == 0) {
  149. return;
  150. }
  151. KVBuffer * kvBuffer = (KVBuffer *)pos;
  152. if (unlikely(remain < kvBuffer->headerLength())) {
  153. THROW_EXCEPTION(IOException, "k/v meta information incomplete");
  154. }
  155. uint32_t kvLength = kvBuffer->lengthConvertEndium();
  156. if (kvLength > remain) {
  157. _asideBytes.resize(kvLength);
  158. _asideBuffer.wrap(_asideBytes.buff(), _asideBytes.size());
  159. _asideBuffer.fill(pos, remain);
  160. pos += remain;
  161. remain = 0;
  162. } else {
  163. write(pos, remain);
  164. }
  165. }
  166. void CombineHandler::write(char * buf, uint32_t length) {
  167. KVBuffer * kv = NULL;
  168. char * pos = buf;
  169. uint32_t remain = length;
  170. uint32_t outputRecordCount = 0;
  171. while (remain > 0) {
  172. kv = (KVBuffer *)pos;
  173. kv->keyLength = bswap(kv->keyLength);
  174. kv->valueLength = bswap(kv->valueLength);
  175. _writer->write(kv->getKey(), kv->keyLength, kv->getValue(), kv->valueLength);
  176. outputRecordCount++;
  177. remain -= kv->length();
  178. pos += kv->length();
  179. }
  180. _combineOutputRecordCount += outputRecordCount;
  181. _combineOutputBytes += length;
  182. }
  /**
   * Copies the in-memory bytes of a 32-bit value into a string.
   *
   * @param length value whose object representation is copied
   * @return a string of exactly sizeof(uint32_t) characters holding the bytes
   *         of `length` in the order they are stored in memory
   */
  string toString(uint32_t length)
  {
    const char * firstByte = reinterpret_cast<const char *>(&length);
    return string(firstByte, firstByte + sizeof(length));
  }
  189. void CombineHandler::onLoadData() {
  190. feedDataToJava(WRITABLE_SERIALIZATION);
  191. }
  192. ResultBuffer * CombineHandler::onCall(const Command& command, ParameterBuffer * param) {
  193. THROW_EXCEPTION(UnsupportException, "Command not supported by RReducerHandler");
  194. }
  195. void CombineHandler::combine(CombineContext type, KVIterator * kvIterator, IFileWriter * writer) {
  196. _combineInputRecordCount = 0;
  197. _combineOutputRecordCount = 0;
  198. _combineInputBytes = 0;
  199. _combineOutputBytes = 0;
  200. this->_combineContext = &type;
  201. this->_kvIterator = kvIterator;
  202. this->_writer = writer;
  203. call(COMBINE, NULL);
  204. LOG("[CombineHandler] input Record Count: %d, input Bytes: %d, "
  205. "output Record Count: %d, output Bytes: %d",
  206. _combineInputRecordCount, _combineInputBytes,
  207. _combineOutputRecordCount, _combineOutputBytes);
  208. return;
  209. }
  210. void CombineHandler::finish() {
  211. }
  212. } /* namespace NativeTask */