@@ -448,10 +448,10 @@ class MapTask extends Task {
     private final Counters.Counter combineInputCounter;
     private final Counters.Counter combineOutputCounter;
 
-    private ArrayList<IndexRecord[]> indexCacheList;
+    private ArrayList<SpillRecord> indexCacheList;
     private int totalIndexCacheMemory;
-    private static final int INDEX_CACHE_MEMORY_LIMIT = 1024*1024;
-
+    private static final int INDEX_CACHE_MEMORY_LIMIT = 1024 * 1024;
+
     @SuppressWarnings("unchecked")
     public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
                            Reporter reporter) throws IOException {
@@ -463,7 +463,7 @@ class MapTask extends Task {
 
       rfs = ((LocalFileSystem)localFs).getRaw();
 
-      indexCacheList = new ArrayList<IndexRecord[]>();
+      indexCacheList = new ArrayList<SpillRecord>();
 
       //sanity checks
       final float spillper = job.getFloat("io.sort.spill.percent",(float)0.8);
@@ -912,37 +912,19 @@ class MapTask extends Task {
           : (bufvoid - bufend) + bufstart) +
                   partitions * APPROX_HEADER_LENGTH;
       FSDataOutputStream out = null;
-      FSDataOutputStream indexOut = null;
-      IFileOutputStream indexChecksumOut = null;
-      IndexRecord[] irArray = null;
       try {
         // create spill file
-        Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
-                                                           numSpills, size);
+        final SpillRecord spillRec = new SpillRecord(partitions);
+        final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
+            numSpills, size);
         out = rfs.create(filename);
-        // All records (reducers) of a given spill go to
-        // the same destination (memory or file).
-        if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
-          // create spill index file
-          Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
-              getTaskID(), numSpills,
-              partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
 
-          indexOut = rfs.create(indexFilename);
-          indexChecksumOut = new IFileOutputStream(indexOut);
-        }
-        else {
-          irArray = new IndexRecord[partitions];
-          indexCacheList.add(numSpills,irArray);
-          totalIndexCacheMemory += partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
-        }
-
-
         final int endPosition = (kvend > kvstart)
           ? kvend
           : kvoffsets.length + kvend;
         sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
         int spindex = kvstart;
+        IndexRecord rec = new IndexRecord();
         InMemValBytes value = new InMemValBytes();
         for (int i = 0; i < partitions; ++i) {
           IFile.Writer<K, V> writer = null;
@@ -983,28 +965,34 @@ class MapTask extends Task {
 
             // close the writer
             writer.close();
-
-            if (indexChecksumOut != null) {
-              // write the index as <offset, raw-length, compressed-length>
-              writeIndexRecord(indexChecksumOut, segmentStart, writer);
-            }
-            else {
-              irArray[i] = new IndexRecord(segmentStart, writer.getRawLength(),
-                                           writer.getCompressedLength());
-            }
+
+            // record offsets
+            rec.startOffset = segmentStart;
+            rec.rawLength = writer.getRawLength();
+            rec.partLength = writer.getCompressedLength();
+            spillRec.putIndex(rec, i);
+
             writer = null;
           } finally {
             if (null != writer) writer.close();
           }
         }
+
+        if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
+          // create spill index file
+          Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
+              getTaskID(), numSpills,
+              partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+          spillRec.writeToFile(indexFilename, job);
+        } else {
+          indexCacheList.add(spillRec);
+          totalIndexCacheMemory +=
+            spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
+        }
         LOG.info("Finished spill " + numSpills);
         ++numSpills;
       } finally {
         if (out != null) out.close();
-        if (indexChecksumOut != null) {
-          indexChecksumOut.close();
-        }
-        if (indexOut != null) indexOut.close();
       }
     }
 
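// -------------------------------------------------------------------------
// Aside (not part of the patch): a minimal, self-contained sketch of the
// cache-or-page policy the hunks above introduce. Names ending in "Sketch"
// are invented for illustration; the real types are MapTask's IndexRecord
// and SpillRecord, and the real SpillRecord does checksummed file I/O via
// IFileOutputStream rather than the no-op stub shown here.
import java.util.ArrayList;
import java.util.List;

class IndexRecordSketch {
  long startOffset; // where this partition's segment begins in the spill file
  long rawLength;   // uncompressed length of the segment
  long partLength;  // on-disk (possibly compressed) length of the segment
}

class SpillIndexCacheSketch {
  // three longs per partition, as in the removed writeIndexRecord()
  static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
  static final int INDEX_CACHE_MEMORY_LIMIT = 1024 * 1024;

  private final List<IndexRecordSketch[]> indexCacheList =
    new ArrayList<IndexRecordSketch[]>();
  private int totalIndexCacheMemory;

  /** Keep a spill's index in memory while under the limit, else page it out. */
  void recordSpill(IndexRecordSketch[] spillIndex) {
    if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
      writeIndexFile(spillIndex); // paged to disk; re-read at merge time
    } else {
      indexCacheList.add(spillIndex);
      totalIndexCacheMemory +=
        spillIndex.length * MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }
  }

  private void writeIndexFile(IndexRecordSketch[] spillIndex) {
    // stand-in for SpillRecord.writeToFile(indexFilename, job)
  }
}
// -------------------------------------------------------------------------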
@@ -1017,38 +1005,22 @@ class MapTask extends Task {
         throws IOException {
       long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
       FSDataOutputStream out = null;
-      FSDataOutputStream indexOut = null;
-      IFileOutputStream indexChecksumOut = null;
-      IndexRecord[] irArray = null;
       final int partition = partitioner.getPartition(key, value, partitions);
       try {
         // create spill file
-        Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
-                                                           numSpills, size);
+        final SpillRecord spillRec = new SpillRecord(partitions);
+        final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
+            numSpills, size);
         out = rfs.create(filename);
 
-        if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
-          // create spill index
-          Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
-              getTaskID(), numSpills,
-              partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
-
-          indexOut = rfs.create(indexFilename);
-          indexChecksumOut = new IFileOutputStream(indexOut);
-        }
-        else {
-          irArray = new IndexRecord[partitions];
-          indexCacheList.add(numSpills,irArray);
-          totalIndexCacheMemory += partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
-        }
-
         // we don't run the combiner for a single record
+        IndexRecord rec = new IndexRecord();
         for (int i = 0; i < partitions; ++i) {
           IFile.Writer<K, V> writer = null;
           try {
             long segmentStart = out.getPos();
             // Create a new codec, don't care!
-            writer = new IFile.Writer<K, V>(job, out, keyClass, valClass, codec,
+            writer = new IFile.Writer<K,V>(job, out, keyClass, valClass, codec,
                                             spilledRecordsCounter);
 
             if (i == partition) {
@@ -1060,24 +1032,32 @@ class MapTask extends Task {
             }
             writer.close();
 
-            if (indexChecksumOut != null) {
-              writeIndexRecord(indexChecksumOut,segmentStart,writer);
-            }
-            else {
-              irArray[i] = new IndexRecord(segmentStart, writer.getRawLength(),
-                                           writer.getCompressedLength());
-            }
+            // record offsets
+            rec.startOffset = segmentStart;
+            rec.rawLength = writer.getRawLength();
+            rec.partLength = writer.getCompressedLength();
+            spillRec.putIndex(rec, i);
+
             writer = null;
           } catch (IOException e) {
             if (null != writer) writer.close();
             throw e;
           }
         }
+        if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
+          // create spill index file
+          Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
+              getTaskID(), numSpills,
+              partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+          spillRec.writeToFile(indexFilename, job);
+        } else {
+          indexCacheList.add(spillRec);
+          totalIndexCacheMemory +=
+            spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
+        }
         ++numSpills;
       } finally {
         if (out != null) out.close();
-        if (indexChecksumOut != null) indexChecksumOut.close();
-        if (indexOut != null) indexOut.close();
       }
     }
 
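// -------------------------------------------------------------------------
// Aside (not part of the patch): sizing the cache. Each partition's index
// entry is three longs (offset, raw length, part length), so
// MAP_OUTPUT_INDEX_RECORD_LENGTH is 24 bytes and the 1 MiB limit holds
// 1,048,576 / 24 ~= 43,690 partition entries before spills start paging
// their index files to disk -- e.g. roughly 436 in-memory spill indices
// for a job with 100 reduce partitions. mergeParts() below reads any
// paged indices back before merging.
// -------------------------------------------------------------------------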
@@ -1172,102 +1152,99 @@ class MapTask extends Task {
       // get the approximate size of the final output/index files
       long finalOutFileSize = 0;
       long finalIndexFileSize = 0;
-      Path [] filename = new Path[numSpills];
-
+      final Path[] filename = new Path[numSpills];
+      final TaskAttemptID mapId = getTaskID();
+
       for(int i = 0; i < numSpills; i++) {
-        filename[i] = mapOutputFile.getSpillFile(getTaskID(), i);
+        filename[i] = mapOutputFile.getSpillFile(mapId, i);
         finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
       }
-
       if (numSpills == 1) { //the spill is the final output
         rfs.rename(filename[0],
             new Path(filename[0].getParent(), "file.out"));
         if (indexCacheList.size() == 0) {
-          rfs.rename(mapOutputFile.getSpillIndexFile(getTaskID(), 0),
-              new Path(filename[0].getParent(),"file.out.index"));
-        }
-        else {
-          writeSingleSpillIndexToFile(getTaskID(),
+          rfs.rename(mapOutputFile.getSpillIndexFile(mapId, 0),
              new Path(filename[0].getParent(),"file.out.index"));
+        } else {
+          indexCacheList.get(0).writeToFile(
+            new Path(filename[0].getParent(),"file.out.index"), job);
         }
         return;
       }
+
+      // read in paged indices
+      for (int i = indexCacheList.size(); i < numSpills; ++i) {
+        Path indexFileName = mapOutputFile.getSpillIndexFile(mapId, i);
+        indexCacheList.add(new SpillRecord(indexFileName, job));
+      }
+
       //make correction in the length to include the sequence file header
       //lengths for each partition
       finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
-
      finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
-
-      Path finalOutputFile = mapOutputFile.getOutputFileForWrite(getTaskID(),
+      Path finalOutputFile = mapOutputFile.getOutputFileForWrite(mapId,
                              finalOutFileSize);
       Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(
-                            getTaskID(), finalIndexFileSize);
-
-      //The output stream for the final single output file
-
-      FSDataOutputStream finalOut = rfs.create(finalOutputFile, true,
-                                               4096);
+                            mapId, finalIndexFileSize);
 
-      //The final index file output stream
-      FSDataOutputStream finalIndexOut = rfs.create(finalIndexFile, true,
-                                                    4096);
-
-      IFileOutputStream finalIndexChecksumOut =
-        new IFileOutputStream(finalIndexOut);
+      //The output stream for the final single output file
+      FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
 
       if (numSpills == 0) {
         //create dummy files
-        for (int i = 0; i < partitions; i++) {
-          long segmentStart = finalOut.getPos();
-          Writer<K, V> writer = new Writer<K, V>(job, finalOut, keyClass,
-                                                 valClass, codec, null);
-          writer.close();
-          writeIndexRecord(finalIndexChecksumOut, segmentStart, writer);
+        IndexRecord rec = new IndexRecord();
+        SpillRecord sr = new SpillRecord(partitions);
+        try {
+          for (int i = 0; i < partitions; i++) {
+            long segmentStart = finalOut.getPos();
+            Writer<K, V> writer =
+              new Writer<K, V>(job, finalOut, keyClass, valClass, codec, null);
+            writer.close();
+            rec.startOffset = segmentStart;
+            rec.rawLength = writer.getRawLength();
+            rec.partLength = writer.getCompressedLength();
+            sr.putIndex(rec, i);
+          }
+          sr.writeToFile(finalIndexFile, job);
+        } finally {
+          finalOut.close();
         }
-        finalOut.close();
-        finalIndexChecksumOut.close();
-        finalIndexOut.close();
         return;
       }
       {
-        for (int parts = 0; parts < partitions; parts++){
+        IndexRecord rec = new IndexRecord();
+        final SpillRecord spillRec = new SpillRecord(partitions);
+        for (int parts = 0; parts < partitions; parts++) {
          //create the segments to be merged
-          List<Segment<K, V>> segmentList =
+          List<Segment<K,V>> segmentList =
            new ArrayList<Segment<K, V>>(numSpills);
-          TaskAttemptID mapId = getTaskID();
          for(int i = 0; i < numSpills; i++) {
-            final IndexRecord indexRecord =
-              getIndexInformation(mapId, i, parts);
+            IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
 
-            long segmentOffset = indexRecord.startOffset;
-            long segmentLength = indexRecord.partLength;
-
-            Segment<K, V> s =
-              new Segment<K, V>(job, rfs, filename[i], segmentOffset,
-                                segmentLength, codec, true);
+            Segment<K,V> s =
+              new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset,
+                               indexRecord.partLength, codec, true);
            segmentList.add(i, s);
-
+
            if (LOG.isDebugEnabled()) {
-              long rawSegmentLength = indexRecord.rawLength;
              LOG.debug("MapId=" + mapId + " Reducer=" + parts +
-                  "Spill =" + i + "(" + segmentOffset + ","+
-                  rawSegmentLength + ", " + segmentLength + ")");
+                  "Spill =" + i + "(" + indexRecord.startOffset + "," +
+                  indexRecord.rawLength + ", " + indexRecord.partLength + ")");
            }
          }
-
+
          //merge
          @SuppressWarnings("unchecked")
-          RawKeyValueIterator kvIter =
-            Merger.merge(job, rfs,
+          RawKeyValueIterator kvIter = Merger.merge(job, rfs,
                        keyClass, valClass,
-                       segmentList, job.getInt("io.sort.factor", 100),
-                       new Path(getTaskID().toString()),
+                        segmentList, job.getInt("io.sort.factor", 100),
+                        new Path(mapId.toString()),
                        job.getOutputKeyComparator(), reporter,
                        null, spilledRecordsCounter);
 
          //write merged output to disk
          long segmentStart = finalOut.getPos();
-          Writer<K, V> writer = 
+          Writer<K, V> writer =
              new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
                               spilledRecordsCounter);
          if (null == combinerClass || numSpills < minSpillsForCombine) {
@@ -1279,99 +1256,21 @@ class MapTask extends Task {
 
          //close
          writer.close();
-
-          //write index record
-          writeIndexRecord(finalIndexChecksumOut,segmentStart, writer);
+
+          // record offsets
+          rec.startOffset = segmentStart;
+          rec.rawLength = writer.getRawLength();
+          rec.partLength = writer.getCompressedLength();
+          spillRec.putIndex(rec, parts);
        }
+        spillRec.writeToFile(finalIndexFile, job);
        finalOut.close();
-        finalIndexChecksumOut.close();
-        finalIndexOut.close();
-        //cleanup
        for(int i = 0; i < numSpills; i++) {
          rfs.delete(filename[i],true);
        }
      }
    }
 
-    private void writeIndexRecord(IFileOutputStream indexOut,
-                                  long start,
-                                  Writer<K, V> writer)
-    throws IOException {
-      //when we write the offset/decompressed-length/compressed-length to
-      //the final index file, we write longs for both compressed and
-      //decompressed lengths. This helps us to reliably seek directly to
-      //the offset/length for a partition when we start serving the
-      //byte-ranges to the reduces. We probably waste some space in the
-      //file by doing this as opposed to writing VLong but it helps us later on.
-      // index record: <offset, raw-length, compressed-length>
-      //StringBuffer sb = new StringBuffer();
-
-      DataOutputStream wrapper = new DataOutputStream(indexOut);
-      wrapper.writeLong(start);
-      wrapper.writeLong(writer.getRawLength());
-      long segmentLength = writer.getCompressedLength();
-      wrapper.writeLong(segmentLength);
-      LOG.info("Index: (" + start + ", " + writer.getRawLength() + ", " +
-               segmentLength + ")");
-    }
-
-    /**
-     * This function returns the index information for the given mapId, Spill
-     * number and reducer combination. Index Information is obtained
-     * transparently from either the indexMap or the underlying indexFile
-     * @param mapId
-     * @param spillNum
-     * @param reducer
-     * @return
-     * @throws IOException
-     */
-    private IndexRecord getIndexInformation( TaskAttemptID mapId,
-                                             int spillNum,
-                                             int reducer)
-    throws IOException {
-      IndexRecord[] irArray = null;
-
-      if (indexCacheList.size() > spillNum) {
-        irArray = indexCacheList.get(spillNum);
-      }
-      else {
-        Path indexFileName = mapOutputFile.getSpillIndexFile(mapId, spillNum);
-        irArray = IndexRecord.readIndexFile(indexFileName, job);
-        indexCacheList.add(spillNum,irArray);
-        rfs.delete(indexFileName,false);
-      }
-      return irArray[reducer];
-    }
-
-    /**
-     * This function writes index information from the indexMap to the
-     * index file that could be used by mergeParts
-     * @param mapId
-     * @param finalName
-     * @throws IOException
-     */
-    private void writeSingleSpillIndexToFile(TaskAttemptID mapId,
-                                             Path finalName)
-    throws IOException {
-
-      IndexRecord[] irArray = null;
-
-      irArray = indexCacheList.get(0);
-
-      FSDataOutputStream indexOut = rfs.create(finalName);
-      IFileOutputStream indexChecksumOut = new IFileOutputStream (indexOut);
-      DataOutputStream wrapper = new DataOutputStream(indexChecksumOut);
-
-      for (int i = 0; i < irArray.length; i++) {
-        wrapper.writeLong(irArray[i].startOffset);
-        wrapper.writeLong(irArray[i].rawLength);
-        wrapper.writeLong(irArray[i].partLength);
-      }
-
-      wrapper.close();
-      indexOut.close();
-    }
-
   } // MapOutputBuffer
 
   /**