|
@@ -76,9 +76,9 @@ void CombineRunnerWrapper::combine(CombineContext type, KVIterator * kvIterator,
|
|
/////////////////////////////////////////////////////////////////
|
|
/////////////////////////////////////////////////////////////////
|
|
|
|
|
|
MapOutputCollector::MapOutputCollector(uint32_t numberPartitions, SpillOutputService * spillService)
|
|
MapOutputCollector::MapOutputCollector(uint32_t numberPartitions, SpillOutputService * spillService)
|
|
- : _config(NULL), _buckets(NULL), _keyComparator(NULL), _defaultBlockSize(0),
|
|
|
|
- _combineRunner(NULL), _spilledRecords(NULL), _spillOutput(spillService), _pool(NULL),
|
|
|
|
- _numPartitions(numberPartitions) {
|
|
|
|
|
|
+ : _config(NULL), _numPartitions(numberPartitions), _buckets(NULL), _keyComparator(NULL),
|
|
|
|
+ _combineRunner(NULL), _spilledRecords(NULL), _spillOutput(spillService),
|
|
|
|
+ _defaultBlockSize(0), _pool(NULL) {
|
|
_pool = new MemoryPool();
|
|
_pool = new MemoryPool();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -186,7 +186,7 @@ KVBuffer * MapOutputCollector::allocateKVBuffer(uint32_t partitionId, uint32_t k
|
|
delete spillpath;
|
|
delete spillpath;
|
|
}
|
|
}
|
|
|
|
|
|
- dest = dest = partition->allocateKVBuffer(kvlength);
|
|
|
|
|
|
+ dest = partition->allocateKVBuffer(kvlength);
|
|
if (NULL == dest) {
|
|
if (NULL == dest) {
|
|
// io.sort.mb too small, cann't proceed
|
|
// io.sort.mb too small, cann't proceed
|
|
// should not get here, cause get_buffer_to_put can throw OOM exception
|
|
// should not get here, cause get_buffer_to_put can throw OOM exception
|
|
@@ -294,7 +294,7 @@ void MapOutputCollector::middleSpill(const std::string & spillOutput,
|
|
uint64_t spillTime = timer.now() - timer.last() - metrics.sortTime;
|
|
uint64_t spillTime = timer.now() - timer.last() - metrics.sortTime;
|
|
|
|
|
|
LOG(
|
|
LOG(
|
|
- "[MapOutputCollector::mid_spill] Sort and spill: {spilled file path: %s, id: %d, collect: %llu ms, sort: %llu ms, spill: %llu ms, records: %llu, uncompressed total bytes: %llu, compressed total bytes: %llu}",
|
|
|
|
|
|
+ "[MapOutputCollector::mid_spill] Sort and spill: {spilled file path: %s, id: %d, collect: %"PRIu64" ms, sort: %"PRIu64" ms, spill: %"PRIu64" ms, records: %"PRIu64", uncompressed total bytes: %"PRIu64", compressed total bytes: %"PRIu64"}",
|
|
info->path.c_str(), _spillInfos.getSpillCount(), collecttime / M, metrics.sortTime / M, spillTime / M,
|
|
info->path.c_str(), _spillInfos.getSpillCount(), collecttime / M, metrics.sortTime / M, spillTime / M,
|
|
metrics.recordCount, info->getEndPosition(), info->getRealEndPosition());
|
|
metrics.recordCount, info->getEndPosition(), info->getRealEndPosition());
|
|
|
|
|
|
@@ -339,7 +339,7 @@ void MapOutputCollector::finalSpill(const std::string & filepath,
|
|
|
|
|
|
SortMetrics metrics;
|
|
SortMetrics metrics;
|
|
sortPartitions(_spec.sortOrder, _spec.sortAlgorithm, NULL, metrics);
|
|
sortPartitions(_spec.sortOrder, _spec.sortAlgorithm, NULL, metrics);
|
|
- LOG("[MapOutputCollector::mid_spill] Sort final in memory kvs: {sort: %llu ms, records: %llu}",
|
|
|
|
|
|
+ LOG("[MapOutputCollector::mid_spill] Sort final in memory kvs: {sort: %"PRIu64" ms, records: %"PRIu64"}",
|
|
metrics.sortTime / M, metrics.recordCount);
|
|
metrics.sortTime / M, metrics.recordCount);
|
|
|
|
|
|
merger->addMergeEntry(new MemoryMergeEntry(_buckets, _numPartitions));
|
|
merger->addMergeEntry(new MemoryMergeEntry(_buckets, _numPartitions));
|
|
@@ -347,7 +347,7 @@ void MapOutputCollector::finalSpill(const std::string & filepath,
|
|
Timer timer;
|
|
Timer timer;
|
|
merger->merge();
|
|
merger->merge();
|
|
LOG(
|
|
LOG(
|
|
- "[MapOutputCollector::final_merge_and_spill] Merge and Spill:{spilled file id: %d, merge and spill time: %llu ms}",
|
|
|
|
|
|
+ "[MapOutputCollector::final_merge_and_spill] Merge and Spill:{spilled file id: %d, merge and spill time: %"PRIu64" ms}",
|
|
_spillInfos.getSpillCount(), (timer.now() - timer.last()) / M);
|
|
_spillInfos.getSpillCount(), (timer.now() - timer.last()) / M);
|
|
|
|
|
|
delete merger;
|
|
delete merger;
|