@@ -24,7 +24,6 @@ import static org.apache.hadoop.mapred.Task.Counter.MAP_INPUT_BYTES;
import static org.apache.hadoop.mapred.Task.Counter.MAP_INPUT_RECORDS;
import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_BYTES;
import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_RECORDS;
-import static org.apache.hadoop.mapred.Task.Counter.MAP_FIRST_LEVEL_SPILLS;

import java.io.DataInput;
import java.io.DataOutput;
@@ -68,7 +67,7 @@ class MapTask extends Task {
/**
 * The size of each record in the index file for the map-outputs.
 */
- public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 32;
+ public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;

private BytesWritable split = new BytesWritable();
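The new constant value follows from the on-disk layout of one index entry after this patch: with the per-partition record count gone, each entry is three 8-byte longs (segment start offset, raw length, compressed length), i.e. 3 * 8 = 24 bytes. The following is a minimal illustrative sketch of that layout, not code from the patch; the class and method names are made up:

    // Illustrative sketch only -- mirrors the three writeLong() calls in
    // writeIndexRecord()/writeSingleSpillIndexToFile() after this patch.
    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    class IndexEntrySketch {
      static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;  // 3 longs * 8 bytes

      static byte[] encode(long startOffset, long rawLength, long partLength)
          throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeLong(startOffset);  // where the partition's segment starts in file.out
        out.writeLong(rawLength);    // uncompressed segment length
        out.writeLong(partLength);   // compressed (on-disk) segment length
        out.flush();
        assert buf.size() == MAP_OUTPUT_INDEX_RECORD_LENGTH;  // 24, was 32 with numRecords
        return buf.toByteArray();
      }
    }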
@@ -448,7 +447,6 @@ class MapTask extends Task {
private final Counters.Counter mapOutputRecordCounter;
private final Counters.Counter combineInputCounter;
private final Counters.Counter combineOutputCounter;
- private final Counters.Counter firstLevelSpillsCounter;

private ArrayList<IndexRecord[]> indexCacheList;
private int totalIndexCacheMemory;
@@ -511,7 +509,6 @@ class MapTask extends Task {
mapOutputRecordCounter = counters.findCounter(MAP_OUTPUT_RECORDS);
combineInputCounter = counters.findCounter(COMBINE_INPUT_RECORDS);
combineOutputCounter = counters.findCounter(COMBINE_OUTPUT_RECORDS);
- firstLevelSpillsCounter = counters.findCounter(MAP_FIRST_LEVEL_SPILLS);
// compression
if (job.getCompressMapOutput()) {
Class<? extends CompressionCodec> codecClass =
@@ -951,10 +948,7 @@ class MapTask extends Task {
IFile.Writer<K, V> writer = null;
try {
long segmentStart = out.getPos();
- long numRecordsInThisPartition = 0;
writer = new Writer<K, V>(job, out, keyClass, valClass, codec);
- long prevCnt = writer.getNumRecordsWritten();
-
if (null == combinerClass) {
// spill directly
DataInputBuffer key = new DataInputBuffer();
@@ -985,19 +979,17 @@ class MapTask extends Task {
combineAndSpill(kvIter, combineInputCounter);
}
}
- numRecordsInThisPartition = Writer.getNumRecordsWritten() - prevCnt;
+
// close the writer
writer.close();

if (indexChecksumOut != null) {
- // write the index as
- // <offset, raw-length, compressed-length, numRecords>
- writeIndexRecord(indexChecksumOut, segmentStart, writer,
- numRecordsInThisPartition);
+ // write the index as <offset, raw-length, compressed-length>
+ writeIndexRecord(indexChecksumOut, segmentStart, writer);
}
else {
irArray[i] = new IndexRecord(segmentStart, writer.getRawLength(),
- writer.getCompressedLength(), numRecordsInThisPartition);
+ writer.getCompressedLength());
}
writer = null;
} finally {
@@ -1066,13 +1058,12 @@ class MapTask extends Task {
}
writer.close();

- long numRecords = (i == partition) ? 1:0;
if (indexChecksumOut != null) {
- writeIndexRecord(indexChecksumOut,segmentStart,writer,numRecords);
+ writeIndexRecord(indexChecksumOut,segmentStart,writer);
}
else {
irArray[i] = new IndexRecord(segmentStart, writer.getRawLength(),
- writer.getCompressedLength(), numRecords);
+ writer.getCompressedLength());
}
writer = null;
} catch (IOException e) {
@@ -1181,8 +1172,6 @@ class MapTask extends Task {
long finalIndexFileSize = 0;
Path [] filename = new Path[numSpills];

- firstLevelSpillsCounter.increment(numSpills);
-
for(int i = 0; i < numSpills; i++) {
filename[i] = mapOutputFile.getSpillFile(getTaskID(), i);
finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
@@ -1199,8 +1188,7 @@ class MapTask extends Task {
writeSingleSpillIndexToFile(getTaskID(),
new Path(filename[0].getParent(),"file.out.index"));
}
- spilledRecordsCounter.increment(Writer.getNumRecordsWritten());
- return;
+ return;
}
//make correction in the length to include the sequence file header
//lengths for each partition
@@ -1232,7 +1220,7 @@ class MapTask extends Task {
Writer<K, V> writer = new Writer<K, V>(job, finalOut,
keyClass, valClass, codec);
writer.close();
- writeIndexRecord(finalIndexChecksumOut, segmentStart, writer, 0);
+ writeIndexRecord(finalIndexChecksumOut, segmentStart, writer);
}
finalOut.close();
finalIndexChecksumOut.close();
@@ -1268,6 +1256,7 @@ class MapTask extends Task {
}
indexRecord = null;
}
+
//merge
@SuppressWarnings("unchecked")
RawKeyValueIterator kvIter =
@@ -1281,37 +1270,19 @@ class MapTask extends Task {
long segmentStart = finalOut.getPos();
Writer<K, V> writer =
new Writer<K, V>(job, finalOut, keyClass, valClass, codec);
- long numRecordsInThisPartition;
- long prevCnt = Writer.getNumRecordsWritten();
if (null == combinerClass || numSpills < minSpillsForCombine) {
Merger.writeFile(kvIter, writer, reporter);
} else {
combineCollector.setWriter(writer);
combineAndSpill(kvIter, combineInputCounter);
}
- numRecordsInThisPartition = Writer.getNumRecordsWritten() - prevCnt;

//close
writer.close();

//write index record
- writeIndexRecord(finalIndexChecksumOut,segmentStart, writer,
- numRecordsInThisPartition);
+ writeIndexRecord(finalIndexChecksumOut,segmentStart, writer);
}
-
- // In Map Phase, Spills to disk are done at 3 places:
- // (1) First Level Spills in sortAndSpill() - either
- // (a) without combiner
- // or (b) with combiner
- // (2) Outputs of Intermediate(multi-level) merges in Merger.merge
- // (3) Output of final level merge - See above if-else
- // (a) Merger.writeFile
- // or (b) combineAndSpill
- // In all the cases, IFile.Writer.append() takes care of counting
- // the records written to disk
-
- spilledRecordsCounter.increment(Writer.getNumRecordsWritten());
-
finalOut.close();
finalIndexChecksumOut.close();
finalIndexOut.close();
@@ -1324,7 +1295,7 @@ class MapTask extends Task {

private void writeIndexRecord(IFileOutputStream indexOut,
long start,
- Writer<K, V> writer, long numRecords)
+ Writer<K, V> writer)
throws IOException {
//when we write the offset/decompressed-length/compressed-length to
//the final index file, we write longs for both compressed and
@@ -1340,9 +1311,8 @@ class MapTask extends Task {
wrapper.writeLong(writer.getRawLength());
long segmentLength = writer.getCompressedLength();
wrapper.writeLong(segmentLength);
- wrapper.writeLong(numRecords);
LOG.info("Index: (" + start + ", " + writer.getRawLength() + ", " +
- segmentLength + "," + numRecords + ")");
+ segmentLength + ")");
}

/**
@@ -1396,7 +1366,6 @@ class MapTask extends Task {
wrapper.writeLong(irArray[i].startOffset);
wrapper.writeLong(irArray[i].rawLength);
wrapper.writeLong(irArray[i].partLength);
- wrapper.writeLong(irArray[i].numRecords);
}

wrapper.close();
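For context on how the fixed-size entries are consumed: assuming the reader uses the usual fixed-stride seek (partition index times MAP_OUTPUT_INDEX_RECORD_LENGTH) and ignoring the IFileOutputStream checksum wrapper used above, a hypothetical reader of one entry would look roughly like this. It is a sketch under those assumptions, not code from this patch:

    // Hypothetical reader sketch -- reads back the three longs per entry written
    // above, after skipping (partition * 24) bytes to reach the wanted entry.
    // Checksum handling of the real index stream is deliberately ignored here.
    import java.io.DataInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;

    class IndexEntryReaderSketch {
      static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;

      static long[] read(String indexFile, int partition) throws IOException {
        DataInputStream in = new DataInputStream(new FileInputStream(indexFile));
        try {
          in.skipBytes(partition * MAP_OUTPUT_INDEX_RECORD_LENGTH);
          long startOffset = in.readLong();  // segment start in file.out
          long rawLength = in.readLong();    // uncompressed length
          long partLength = in.readLong();   // compressed length
          return new long[] { startOffset, rawLength, partLength };
        } finally {
          in.close();
        }
      }
    }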