@@ -442,7 +442,11 @@ class MapTask extends Task {
     private final Counters.Counter mapOutputRecordCounter;
     private final Counters.Counter combineInputCounter;
     private final Counters.Counter combineOutputCounter;
-
+
+    private ArrayList<IndexRecord[]> indexCacheList;
+    private int totalIndexCacheMemory;
+    private static final int INDEX_CACHE_MEMORY_LIMIT = 1024*1024;
+
     @SuppressWarnings("unchecked")
     public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
                            Reporter reporter) throws IOException {
@@ -454,6 +458,8 @@ class MapTask extends Task {
 
       rfs = ((LocalFileSystem)localFs).getRaw();
 
+      indexCacheList = new ArrayList<IndexRecord[]>();
+
       //sanity checks
       final float spillper = job.getFloat("io.sort.spill.percent",(float)0.8);
       final float recper = job.getFloat("io.sort.record.percent",(float)0.05);
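
The hunks above add a small per-spill index cache to MapOutputBuffer: instead of always writing a spill's index to a separate file, the per-partition index records of a spill can be kept in memory, up to a fixed byte budget. Below is a minimal, standalone sketch of that bookkeeping. The class and method names are illustrative only (they are not the Hadoop types), and the 24-byte record size reflects the assumption that MAP_OUTPUT_INDEX_RECORD_LENGTH is three longs per partition.

import java.util.ArrayList;
import java.util.List;

// Stand-in for the per-partition index entry the patch caches in memory.
class SpillIndexRecord {
  final long startOffset;  // where this partition's data starts in the spill file
  final long rawLength;    // uncompressed length of the partition
  final long partLength;   // on-disk (possibly compressed) length

  SpillIndexRecord(long startOffset, long rawLength, long partLength) {
    this.startOffset = startOffset;
    this.rawLength = rawLength;
    this.partLength = partLength;
  }
}

// Sketch of the cache bookkeeping: one record array per spill, bounded by a byte budget.
class SpillIndexCache {
  static final int RECORD_LENGTH = 3 * 8;       // three longs per partition
  static final int MEMORY_LIMIT = 1024 * 1024;  // 1 MB budget, as in the patch

  private final List<SpillIndexRecord[]> cache = new ArrayList<SpillIndexRecord[]>();
  private int cachedBytes = 0;

  /** True if the next spill's index should be kept in memory. */
  boolean hasRoom() {
    return cachedBytes < MEMORY_LIMIT;
  }

  /** Reserve an in-memory index for one spill covering the given number of partitions. */
  SpillIndexRecord[] addSpill(int partitions) {
    SpillIndexRecord[] records = new SpillIndexRecord[partitions];
    cache.add(records);
    cachedBytes += partitions * RECORD_LENGTH;
    return records;
  }
}
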
@@ -902,16 +908,31 @@ class MapTask extends Task {
                       partitions * APPROX_HEADER_LENGTH;
       FSDataOutputStream out = null;
       FSDataOutputStream indexOut = null;
+      IFileOutputStream indexChecksumOut = null;
+      IndexRecord[] irArray = null;
       try {
         // create spill file
         Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
                                       numSpills, size);
         out = rfs.create(filename);
-        // create spill index
-        Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
-                             getTaskID(), numSpills,
-                             partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
-        indexOut = localFs.create(indexFilename);
+        // All records (reducers) of a given spill go to
+        // the same destination (memory or file).
+        if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
+          // create spill index file
+          Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
+                               getTaskID(), numSpills,
+                               partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+
+          indexOut = rfs.create(indexFilename);
+          indexChecksumOut = new IFileOutputStream(indexOut);
+        }
+        else {
+          irArray = new IndexRecord[partitions];
+          indexCacheList.add(numSpills,irArray);
+          totalIndexCacheMemory += partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
+        }
+
+
         final int endPosition = (kvend > kvstart)
           ? kvend
           : kvoffsets.length + kvend;
@@ -957,8 +978,14 @@ class MapTask extends Task {
           // close the writer
           writer.close();
 
-          // write the index as <offset, raw-length, compressed-length>
-          writeIndexRecord(indexOut, out, segmentStart, writer);
+          if (indexChecksumOut != null) {
+            // write the index as <offset, raw-length, compressed-length>
+            writeIndexRecord(indexChecksumOut, segmentStart, writer);
+          }
+          else {
+            irArray[i] = new IndexRecord(segmentStart, writer.getRawLength(),
+                                         writer.getCompressedLength());
+          }
           writer = null;
         } finally {
           if (null != writer) writer.close();
@@ -968,6 +995,9 @@ class MapTask extends Task {
         ++numSpills;
       } finally {
         if (out != null) out.close();
+        if (indexChecksumOut != null) {
+          indexChecksumOut.close();
+        }
         if (indexOut != null) indexOut.close();
       }
     }
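
Within a single spill, the destination chosen above applies to every partition: hunk @@ -957,8 +978,14 @@ writes each partition's record either through writeIndexRecord (file case) or into the cached IndexRecord array (memory case). A compact sketch of that per-partition step, using simplified stand-in types rather than the Hadoop classes:

import java.io.DataOutputStream;
import java.io.IOException;

// Sketch: once a spill's index destination is chosen, every partition's
// <offset, raw-length, part-length> triple goes to that same destination.
class PartitionIndexSink {
  static void record(DataOutputStream indexStream, long[][] cachedRecords,
                     int partition, long offset, long rawLength, long partLength)
      throws IOException {
    if (indexStream != null) {
      // this spill's index lives on disk: append the triple
      indexStream.writeLong(offset);
      indexStream.writeLong(rawLength);
      indexStream.writeLong(partLength);
    } else {
      // this spill's index lives in memory: fill the slot for this partition
      cachedRecords[partition] = new long[] { offset, rawLength, partLength };
    }
  }
}
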
@@ -982,17 +1012,30 @@ class MapTask extends Task {
       long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
       FSDataOutputStream out = null;
       FSDataOutputStream indexOut = null;
+      IFileOutputStream indexChecksumOut = null;
+      IndexRecord[] irArray = null;
       final int partition = partitioner.getPartition(key, value, partitions);
       try {
         // create spill file
         Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
                                       numSpills, size);
         out = rfs.create(filename);
-        // create spill index
-        Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
-                             getTaskID(), numSpills,
-                             partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
-        indexOut = localFs.create(indexFilename);
+
+        if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
+          // create spill index
+          Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
+                               getTaskID(), numSpills,
+                               partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+
+          indexOut = rfs.create(indexFilename);
+          indexChecksumOut = new IFileOutputStream(indexOut);
+        }
+        else {
+          irArray = new IndexRecord[partitions];
+          indexCacheList.add(numSpills,irArray);
+          totalIndexCacheMemory += partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
+        }
+
         // we don't run the combiner for a single record
         for (int i = 0; i < partitions; ++i) {
           IFile.Writer<K, V> writer = null;
@@ -1010,8 +1053,14 @@ class MapTask extends Task {
             }
             writer.close();
 
-            // index record
-            writeIndexRecord(indexOut, out, segmentStart, writer);
+            if (indexChecksumOut != null) {
+              writeIndexRecord(indexChecksumOut,segmentStart,writer);
+            }
+            else {
+              irArray[i] = new IndexRecord(segmentStart, writer.getRawLength(),
+                                           writer.getCompressedLength());
+            }
+            writer = null;
           } catch (IOException e) {
             if (null != writer) writer.close();
             throw e;
@@ -1020,6 +1069,7 @@ class MapTask extends Task {
         ++numSpills;
       } finally {
         if (out != null) out.close();
+        if (indexChecksumOut != null) indexChecksumOut.close();
         if (indexOut != null) indexOut.close();
       }
     }
@@ -1116,20 +1166,23 @@ class MapTask extends Task {
       long finalOutFileSize = 0;
       long finalIndexFileSize = 0;
       Path [] filename = new Path[numSpills];
-      Path [] indexFileName = new Path[numSpills];
-      FileSystem localFs = FileSystem.getLocal(job);
 
       for(int i = 0; i < numSpills; i++) {
         filename[i] = mapOutputFile.getSpillFile(getTaskID(), i);
-        indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskID(), i);
         finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
       }
 
       if (numSpills == 1) { //the spill is the final output
-        rfs.rename(filename[0],
-            new Path(filename[0].getParent(), "file.out"));
-        localFs.rename(indexFileName[0],
-            new Path(indexFileName[0].getParent(),"file.out.index"));
+        rfs.rename(filename[0],
+            new Path(filename[0].getParent(), "file.out"));
+        if (indexCacheList.size() == 0) {
+          rfs.rename(mapOutputFile.getSpillIndexFile(getTaskID(), 0),
+              new Path(filename[0].getParent(),"file.out.index"));
+        }
+        else {
+          writeSingleSpillIndexToFile(getTaskID(),
+              new Path(filename[0].getParent(),"file.out.index"));
+        }
         return;
       }
       //make correction in the length to include the sequence file header
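
For the common single-spill case in the hunk above, the final map output index is produced in one of two ways: if the spill's index was written to disk (nothing cached), the spill index file is simply renamed to file.out.index; if it is still in memory, writeSingleSpillIndexToFile serializes the cached records instead. A rough standalone sketch of that branch, with illustrative names and plain java.io types standing in for the Hadoop file-system API:

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;

// Sketch of the single-spill shortcut: rename the on-disk index if it exists,
// otherwise write the cached <offset, raw-length, part-length> triples out.
class SingleSpillFinalizer {
  static void finalizeIndex(List<long[][]> cache, File spillIndex, File finalIndex)
      throws IOException {
    if (cache.isEmpty()) {
      // index already on disk: a rename is enough
      if (!spillIndex.renameTo(finalIndex)) {
        throw new IOException("rename failed: " + spillIndex + " -> " + finalIndex);
      }
      return;
    }
    // index still in memory: serialize the cached records of spill 0
    DataOutputStream out = new DataOutputStream(new FileOutputStream(finalIndex));
    try {
      for (long[] record : cache.get(0)) {
        out.writeLong(record[0]); // start offset
        out.writeLong(record[1]); // raw length
        out.writeLong(record[2]); // part length
      }
    } finally {
      out.close();
    }
  }
}
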
@@ -1149,8 +1202,12 @@ class MapTask extends Task {
                          4096);
 
       //The final index file output stream
-      FSDataOutputStream finalIndexOut = localFs.create(finalIndexFile, true,
+      FSDataOutputStream finalIndexOut = rfs.create(finalIndexFile, true,
                          4096);
+
+      IFileOutputStream finalIndexChecksumOut =
+        new IFileOutputStream(finalIndexOut);
+
       if (numSpills == 0) {
         //create dummy files
         for (int i = 0; i < partitions; i++) {
@@ -1158,9 +1215,10 @@ class MapTask extends Task {
           Writer<K, V> writer = new Writer<K, V>(job, finalOut,
                                                  keyClass, valClass, codec);
           writer.close();
-          writeIndexRecord(finalIndexOut, finalOut, segmentStart, writer);
+          writeIndexRecord(finalIndexChecksumOut, segmentStart, writer);
         }
         finalOut.close();
+        finalIndexChecksumOut.close();
         finalIndexOut.close();
         return;
       }
@@ -1169,13 +1227,15 @@ class MapTask extends Task {
         //create the segments to be merged
         List<Segment<K, V>> segmentList =
           new ArrayList<Segment<K, V>>(numSpills);
+        TaskAttemptID mapId = getTaskID();
         for(int i = 0; i < numSpills; i++) {
-          FSDataInputStream indexIn = localFs.open(indexFileName[i]);
-          indexIn.seek(parts * MAP_OUTPUT_INDEX_RECORD_LENGTH);
-          long segmentOffset = indexIn.readLong();
-          long rawSegmentLength = indexIn.readLong();
-          long segmentLength = indexIn.readLong();
-          indexIn.close();
+          IndexRecord indexRecord =
+            getIndexInformation(mapId, i, parts);
+
+          long segmentOffset = indexRecord.startOffset;
+          long rawSegmentLength = indexRecord.rawLength;
+          long segmentLength = indexRecord.partLength;
+
           FSDataInputStream in = rfs.open(filename[i]);
           in.seek(segmentOffset);
 
@@ -1185,9 +1245,11 @@ class MapTask extends Task {
           segmentList.add(i, s);
 
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Index: (" + indexFileName[i] + ", " + segmentOffset +
+            LOG.debug("MapId=" + mapId + " Reducer=" + parts +
+                " Spill=" + i + " (" + segmentOffset + ", " +
               rawSegmentLength + ", " + segmentLength + ")");
           }
+          indexRecord = null;
         }
 
         //merge
@@ -1214,20 +1276,20 @@ class MapTask extends Task {
          writer.close();
 
          //write index record
-          writeIndexRecord(finalIndexOut, finalOut, segmentStart, writer);
+          writeIndexRecord(finalIndexChecksumOut, segmentStart, writer);
        }
        finalOut.close();
+        finalIndexChecksumOut.close();
        finalIndexOut.close();
        //cleanup
        for(int i = 0; i < numSpills; i++) {
          rfs.delete(filename[i],true);
-          localFs.delete(indexFileName[i], true);
        }
      }
    }
 
-    private void writeIndexRecord(FSDataOutputStream indexOut,
-                                  FSDataOutputStream out, long start,
+    private void writeIndexRecord(IFileOutputStream indexOut,
+                                  long start,
                                   Writer<K, V> writer)
                                   throws IOException {
       //when we write the offset/decompressed-length/compressed-length to
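
The next hunk reworks writeIndexRecord so the three longs are written through the IFileOutputStream (via a DataOutputStream wrapper) rather than straight to the FSDataOutputStream, which is what lets the index data be checksummed on its way to disk. The following standalone illustration uses java.util.zip's CheckedOutputStream and CRC32 purely as stand-ins for IFileOutputStream; it shows the wrapping pattern only, not Hadoop's actual checksum format.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.zip.CRC32;
import java.util.zip.CheckedOutputStream;

// Rough illustration of writing an index record through a checksumming layer.
// CheckedOutputStream/CRC32 are stand-ins for Hadoop's IFileOutputStream.
public class ChecksummedIndexWrite {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream raw = new ByteArrayOutputStream();   // plays the FSDataOutputStream
    CheckedOutputStream checked = new CheckedOutputStream(raw, new CRC32());
    DataOutputStream wrapper = new DataOutputStream(checked);  // same wrapping as the patch

    // one index record: <offset, raw-length, part-length>
    wrapper.writeLong(0L);
    wrapper.writeLong(4096L);
    wrapper.writeLong(1024L);
    wrapper.flush();

    // the real IFileOutputStream appends its checksum when closed; here we
    // simply show that a CRC can travel alongside the record bytes
    wrapper.writeLong(checked.getChecksum().getValue());
    wrapper.close();

    System.out.println("index bytes written: " + raw.size());  // 24 record bytes + 8 CRC bytes
  }
}
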
@@ -1238,14 +1300,73 @@ class MapTask extends Task {
       //file by doing this as opposed to writing VLong but it helps us later on.
       // index record: <offset, raw-length, compressed-length>
       //StringBuffer sb = new StringBuffer();
-      indexOut.writeLong(start);
-      indexOut.writeLong(writer.getRawLength());
+
+      DataOutputStream wrapper = new DataOutputStream(indexOut);
+      wrapper.writeLong(start);
+      wrapper.writeLong(writer.getRawLength());
       long segmentLength = writer.getCompressedLength();
-      indexOut.writeLong(segmentLength);
+      wrapper.writeLong(segmentLength);
       LOG.info("Index: (" + start + ", " + writer.getRawLength() + ", " +
                segmentLength + ")");
     }
 
+    /**
+     * Returns the index information for the given mapId, spill number and
+     * reducer combination. The information is obtained transparently from
+     * either the in-memory index cache or the underlying spill index file.
+     * @param mapId the map task attempt whose index is being looked up
+     * @param spillNum the spill number
+     * @param reducer the reduce partition
+     * @return the index record for the given reducer
+     * @throws IOException
+     */
+    private IndexRecord getIndexInformation(TaskAttemptID mapId,
+                                            int spillNum,
+                                            int reducer)
+        throws IOException {
+      IndexRecord[] irArray = null;
+
+      if (indexCacheList.size() > spillNum) {
+        irArray = indexCacheList.get(spillNum);
+      }
+      else {
+        Path indexFileName = mapOutputFile.getSpillIndexFile(mapId, spillNum);
+        irArray = IndexRecord.readIndexFile(indexFileName, job);
+        indexCacheList.add(spillNum, irArray);
+        rfs.delete(indexFileName, false);
+      }
+      return irArray[reducer];
+    }
+
+    /**
+     * Writes the in-memory index information for the single spill to the
+     * given index file. Used by mergeParts when there is exactly one spill.
+     * @param mapId the map task attempt
+     * @param finalName the final index file to write
+     * @throws IOException
+     */
+    private void writeSingleSpillIndexToFile(TaskAttemptID mapId,
+                                             Path finalName)
+        throws IOException {
+
+      IndexRecord[] irArray = null;
+
+      irArray = indexCacheList.get(0);
+
+      FSDataOutputStream indexOut = rfs.create(finalName);
+      IFileOutputStream indexChecksumOut = new IFileOutputStream(indexOut);
+      DataOutputStream wrapper = new DataOutputStream(indexChecksumOut);
+
+      for (int i = 0; i < irArray.length; i++) {
+        wrapper.writeLong(irArray[i].startOffset);
+        wrapper.writeLong(irArray[i].rawLength);
+        wrapper.writeLong(irArray[i].partLength);
+      }
+
+      wrapper.close();
+      indexOut.close();
+    }
+
   } // MapOutputBuffer
 
   /**
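
To round out the read side: getIndexInformation above serves a record from indexCacheList when the spill's index was cached, and otherwise falls back to IndexRecord.readIndexFile, caches the result, and deletes the now-redundant file. Below is a standalone sketch of that lookup, assuming the spill index file is simply the sequence of three-long records written at spill time (the real file also carries the IFileOutputStream checksum, which this sketch ignores); the names and types are illustrative, not the Hadoop API.

import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Sketch of the lookup path: serve from the in-memory cache when present,
// otherwise load the whole spill index file once and cache it.
class SpillIndexLookup {
  private final List<long[][]> cache = new ArrayList<long[][]>();

  long[] getIndexInformation(int spillNum, int reducer, File indexFile,
                             int partitions) throws IOException {
    if (cache.size() > spillNum) {
      return cache.get(spillNum)[reducer];   // cache hit
    }
    // cache miss: read the spill's index file and keep it in memory
    long[][] records = new long[partitions][3];
    DataInputStream in = new DataInputStream(new FileInputStream(indexFile));
    try {
      for (int p = 0; p < partitions; p++) {
        records[p][0] = in.readLong();       // start offset
        records[p][1] = in.readLong();       // raw length
        records[p][2] = in.readLong();       // part length
      }
    } finally {
      in.close();
    }
    // pad so the spill number maps to a stable slot (the patch itself relies
    // on cache misses arriving in increasing spill order and simply appends)
    while (cache.size() <= spillNum) {
      cache.add(null);
    }
    cache.set(spillNum, records);
    indexFile.delete();                      // mirror the patch: drop the file once cached
    return records[reducer];
  }
}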