@@ -335,44 +335,30 @@ class ReduceTask extends Task {
       return;
     }

-    FileSystem lfs = FileSystem.getLocal(job);
-    FileSystem rfs = ((LocalFileSystem)lfs).getRaw();
-
     // Initialize the codec
     codec = initCodec();

-    boolean isLocal = true;
-    if (!job.get("mapred.job.tracker", "local").equals("local")) {
+    boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));
+    if (!isLocal) {
       reduceCopier = new ReduceCopier(umbilical, job);
       if (!reduceCopier.fetchOutputs()) {
         throw new IOException(getTaskID() + "The reduce copier failed");
       }
-      isLocal = false;
     }
     copyPhase.complete();   // copy is already complete
-
-
-    // get the input files for the reducer to merge
-    Path[] mapFiles = getMapFiles(lfs, isLocal);
-
-    Path tempDir = new Path(getTaskID().toString());
-
-    setPhase(TaskStatus.Phase.SORT);
-
-
-    // sort the input file
-    LOG.info("Initiating final on-disk merge with " + mapFiles.length +
-             " files");
-    RawKeyValueIterator rIter =
-      Merger.merge(job,rfs,
-                   job.getMapOutputKeyClass(), job.getMapOutputValueClass(),
-                   codec, mapFiles, !conf.getKeepFailedTaskFiles(),
-                   job.getInt("io.sort.factor", 100), tempDir,
-                   job.getOutputKeyComparator(), reporter);
+    setPhase(TaskStatus.Phase.SORT);
+
+    final FileSystem rfs = FileSystem.getLocal(job).getRaw();
+    RawKeyValueIterator rIter = isLocal
+      ? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
+          job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
+          !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
+          new Path(getTaskID().toString()), job.getOutputKeyComparator(),
+          reporter)
+      : reduceCopier.createKVIterator(job, rfs, reporter);

     // free up the data structures
     mapOutputFilesOnDisk.clear();
-    mapFiles = null;

     sortPhase.complete();   // sort is complete
     setPhase(TaskStatus.Phase.REDUCE);
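The hunk above derives `isLocal` from a single null-safe expression and folds the two merge paths into one ternary. As a standalone illustration of that idiom (plain Java; `java.util.Properties` stands in for `JobConf` purely for the example):

    import java.util.Properties;

    public class LocalModeCheck {
      public static void main(String[] args) {
        Properties conf = new Properties();   // stand-in for JobConf
        // conf.setProperty("mapred.job.tracker", "host:9001");  // distributed mode

        String tracker = conf.getProperty("mapred.job.tracker", "local");
        // Putting the constant first is null-safe and reads as the positive
        // condition, unlike the old !tracker.equals("local") guard that
        // flipped a mutable flag as a side effect of the copy branch.
        boolean isLocal = "local".equals(tracker);

        System.out.println(isLocal
            ? "local mode: merge map outputs straight from the local filesystem"
            : "distributed mode: shuffle via ReduceCopier, then createKVIterator()");
      }
    }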
@@ -528,10 +514,21 @@ class ReduceTask extends Task {
     private volatile boolean exitLocalFSMerge = false;

     /**
-     * When we accumulate mergeThreshold number of files in ram, we merge/spill
+     * When we accumulate maxInMemOutputs number of files in ram, we merge/spill
      */
-    private int mergeThreshold = 500;
-
+    private final int maxInMemOutputs;
+
+    /**
+     * Usage threshold for in-memory output accumulation.
+     */
+    private final float maxInMemCopyPer;
+
+    /**
+     * Maximum memory usage of map outputs to merge from memory into
+     * the reduce, in bytes.
+     */
+    private final long maxInMemReduce;
+
     /**
      * The threads for fetching the files.
      */
@@ -566,12 +563,7 @@ class ReduceTask extends Task {
       Collections.synchronizedSet(new TreeSet<TaskAttemptID>());

     private Random random = null;
-
-    /**
-     * the max size of the merge output from ramfs
-     */
-    private long ramfsMergeOutputSize;
-
+
     /**
      * the max of all the map completion times
      */
@@ -849,8 +841,16 @@ class ReduceTask extends Task {
       private int numClosed = 0;
       private boolean closed = false;

-      public ShuffleRamManager(Configuration conf) {
-        maxSize = conf.getInt("fs.inmemory.size.mb", 100) * 1024 * 1024;
+      public ShuffleRamManager(Configuration conf) throws IOException {
+        final float maxInMemCopyUse =
+          conf.getFloat("mapred.job.shuffle.input.buffer.percent", 0.70f);
+        if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
+          throw new IOException("mapred.job.shuffle.input.buffer.percent" +
+                                maxInMemCopyUse);
+        }
+        maxSize = (int)Math.min(
+            Runtime.getRuntime().maxMemory() * maxInMemCopyUse,
+            Integer.MAX_VALUE);
         maxSingleShuffleLimit = (int)(maxSize * MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION);
         LOG.info("ShuffleRamManager: MemoryLimit=" + maxSize +
                  ", MaxSingleShuffleLimit=" + maxSingleShuffleLimit);
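The constructor above now sizes the shuffle buffer as a fraction of the task JVM's heap instead of the fixed `fs.inmemory.size.mb`. A minimal standalone sketch of that arithmetic (the 0.25 single-shuffle fraction is an assumed stand-in for `MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION`, which is not shown in this hunk):

    public class ShuffleBufferSizing {
      public static void main(String[] args) {
        float bufferPercent = 0.70f;   // mapred.job.shuffle.input.buffer.percent
        if (bufferPercent < 0.0f || bufferPercent > 1.0f) {
          throw new IllegalArgumentException("buffer percent out of range: " + bufferPercent);
        }
        // Capped at Integer.MAX_VALUE because the shuffle buffer is addressed with an int.
        int maxSize = (int) Math.min(
            Runtime.getRuntime().maxMemory() * bufferPercent, Integer.MAX_VALUE);
        // Assumed fraction; the real constant is defined elsewhere in ReduceTask.
        int maxSingleShuffleLimit = (int) (maxSize * 0.25f);
        System.out.println("MemoryLimit=" + maxSize
            + ", MaxSingleShuffleLimit=" + maxSingleShuffleLimit);
      }
    }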
@@ -912,14 +912,11 @@ class ReduceTask extends Task {
               &&
               // In-memory threshold exceeded and at least two segments
               // have been fetched
-              (getPercentUsed() < MAX_INMEM_FILESYS_USE ||
-               numClosed <
-                 (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)
-              )
+              (getPercentUsed() < maxInMemCopyPer || numClosed < 2)
               &&
               // More than "mapred.inmem.merge.threshold" map outputs
               // have been fetched into memory
-              (mergeThreshold <= 0 || numClosed < mergeThreshold)
+              (maxInMemOutputs <= 0 || numClosed < maxInMemOutputs)
               &&
               // More than MAX... threads are blocked on the RamManager
               // or the blocked threads are the last map outputs to be
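The simplified wait condition above lets copiers keep filling RAM until the buffer crosses `mapred.job.shuffle.merge.percent` with at least two closed segments available, or until `mapred.inmem.merge.threshold` outputs have accumulated. A standalone restatement of that trigger, with illustrative names:

    public class MergeTrigger {
      // True while the in-memory merger should keep waiting for more map
      // outputs, mirroring the structure of the rewritten condition above.
      static boolean keepWaiting(float percentUsed, int numClosed,
                                 float maxInMemCopyPer, int maxInMemOutputs) {
        boolean belowUsageThreshold = percentUsed < maxInMemCopyPer || numClosed < 2;
        boolean belowCountThreshold = maxInMemOutputs <= 0 || numClosed < maxInMemOutputs;
        return belowUsageThreshold && belowCountThreshold;
      }

      public static void main(String[] args) {
        System.out.println(keepWaiting(0.50f, 10, 0.66f, 1000));   // true: keep accumulating
        System.out.println(keepWaiting(0.70f, 10, 0.66f, 1000));   // false: usage threshold crossed
        System.out.println(keepWaiting(0.10f, 1000, 0.66f, 1000)); // false: count threshold reached
      }
    }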
@@ -1545,13 +1542,21 @@ class ReduceTask extends Task {
       // optimizing for the base 2
       this.maxFetchRetriesPerMap = getClosestPowerOf2((this.maxBackoff * 1000
                                                        / BACKOFF_INIT) + 1);
-      this.mergeThreshold = conf.getInt("mapred.inmem.merge.threshold", 1000);
+      this.maxInMemOutputs = conf.getInt("mapred.inmem.merge.threshold", 1000);
+      this.maxInMemCopyPer =
+        conf.getFloat("mapred.job.shuffle.merge.percent", 0.66f);
+      final float maxRedPer =
+        conf.getFloat("mapred.job.reduce.input.buffer.percent", 0f);
+      if (maxRedPer > 1.0 || maxRedPer < 0.0) {
+        throw new IOException("mapred.job.reduce.input.buffer.percent" +
+                              maxRedPer);
+      }
+      this.maxInMemReduce = (int)Math.min(
+          Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE);

       // Setup the RamManager
       ramManager = new ShuffleRamManager(conf);
-      ramfsMergeOutputSize =
-        (long)(MAX_INMEM_FILESYS_USE * ramManager.getMemoryLimit());
-
+
       localFileSys = FileSystem.getLocal(conf);

       rfs = ((LocalFileSystem)localFileSys).getRaw();
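For reference, a standalone sketch of how the constructor above interprets the new knobs (again with `Properties` standing in for `JobConf`); the `mapred.job.reduce.input.buffer.percent` default of 0 means every map output is flushed to disk before the reduce begins:

    import java.util.Properties;

    public class ReduceBufferConfig {
      public static void main(String[] args) {
        Properties conf = new Properties();   // stand-in for JobConf
        int maxInMemOutputs = Integer.parseInt(
            conf.getProperty("mapred.inmem.merge.threshold", "1000"));
        float maxInMemCopyPer = Float.parseFloat(
            conf.getProperty("mapred.job.shuffle.merge.percent", "0.66"));
        float maxRedPer = Float.parseFloat(
            conf.getProperty("mapred.job.reduce.input.buffer.percent", "0"));
        if (maxRedPer < 0.0f || maxRedPer > 1.0f) {
          throw new IllegalArgumentException(
              "mapred.job.reduce.input.buffer.percent: " + maxRedPer);
        }
        // Raising the percent lets that share of the heap stay resident
        // as map output when the reduce starts.
        long maxInMemReduce = (long) Math.min(
            Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE);
        System.out.println("maxInMemOutputs=" + maxInMemOutputs
            + " maxInMemCopyPer=" + maxInMemCopyPer
            + " maxInMemReduce=" + maxInMemReduce);
      }
    }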
@@ -2002,13 +2007,21 @@ class ReduceTask extends Task {
       return mergeThrowable == null && copiedMapOutputs.size() == numMaps;
     }

-    private List<Segment<K, V>> createInMemorySegments() {
-      List<Segment<K, V>> inMemorySegments =
-        new LinkedList<Segment<K, V>>();
+    private long createInMemorySegments(
+        List<Segment<K, V>> inMemorySegments, long leaveBytes)
+        throws IOException {
+      long totalSize = 0L;
       synchronized (mapOutputsFilesInMemory) {
-        while(mapOutputsFilesInMemory.size() > 0) {
+        // fullSize could come from the RamManager, but files can be
+        // closed but not yet present in mapOutputsFilesInMemory
+        long fullSize = 0L;
+        for (MapOutput mo : mapOutputsFilesInMemory) {
+          fullSize += mo.data.length;
+        }
+        while(fullSize > leaveBytes) {
           MapOutput mo = mapOutputsFilesInMemory.remove(0);
-
+          totalSize += mo.data.length;
+          fullSize -= mo.data.length;
           Reader<K, V> reader =
             new InMemoryReader<K, V>(ramManager, mo.mapAttemptId,
                                      mo.data, 0, mo.data.length);
@@ -2017,9 +2030,160 @@ class ReduceTask extends Task {
           inMemorySegments.add(segment);
         }
       }
-      return inMemorySegments;
+      return totalSize;
     }
-
+
+    /**
+     * Create a RawKeyValueIterator from copied map outputs. All copying
+     * threads have exited, so all of the map outputs are available either in
+     * memory or on disk. We also know that no merges are in progress, so
+     * synchronization is more lax, here.
+     *
+     * The iterator returned must satisfy the following constraints:
+     *   1. Fewer than io.sort.factor files may be sources
+     *   2. No more than maxInMemReduce bytes of map outputs may be resident
+     *      in memory when the reduce begins
+     *
+     * If we must perform an intermediate merge to satisfy (1), then we can
+     * keep the excluded outputs from (2) in memory and include them in the
+     * first merge pass. If not, then said outputs must be written to disk
+     * first.
+     */
+    @SuppressWarnings("unchecked")
+    private RawKeyValueIterator createKVIterator(
+        JobConf job, FileSystem fs, Reporter reporter) throws IOException {
+
+      // merge config params
+      Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
+      Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();
+      boolean keepInputs = job.getKeepFailedTaskFiles();
+      final Path tmpDir = new Path(getTaskID().toString());
+      final RawComparator<K> comparator =
+        (RawComparator<K>)job.getOutputKeyComparator();
+
+      // segments required to vacate memory
+      List<Segment<K,V>> memDiskSegments = new ArrayList<Segment<K,V>>();
+      long inMemToDiskBytes = 0;
+      if (mapOutputsFilesInMemory.size() > 0) {
+        TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;
+        inMemToDiskBytes = createInMemorySegments(memDiskSegments,
+            maxInMemReduce);
+        final int numMemDiskSegments = memDiskSegments.size();
+        if (numMemDiskSegments > 0 &&
+            ioSortFactor > mapOutputFilesOnDisk.size()) {
+          // must spill to disk, but can't retain in-mem for intermediate merge
+          final Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
+              reduceTask.getTaskID(), inMemToDiskBytes);
+          final RawKeyValueIterator rIter = Merger.merge(job, fs,
+              keyClass, valueClass, memDiskSegments, numMemDiskSegments,
+              tmpDir, comparator, reporter);
+          final Writer writer = new Writer(job, fs, outputPath,
+              keyClass, valueClass, codec);
+          try {
+            Merger.writeFile(rIter, writer, reporter);
+            addToMapOutputFilesOnDisk(fs.getFileStatus(outputPath));
+          } catch (Exception e) {
+            if (null != outputPath) {
+              fs.delete(outputPath, true);
+            }
+            throw new IOException("Final merge failed", e);
+          } finally {
+            if (null != writer) {
+              writer.close();
+            }
+          }
+          LOG.info("Merged " + numMemDiskSegments + " segments, " +
+                   inMemToDiskBytes + " bytes to disk to satisfy " +
+                   "reduce memory limit");
+          inMemToDiskBytes = 0;
+          memDiskSegments.clear();
+        } else if (inMemToDiskBytes != 0) {
+          LOG.info("Keeping " + numMemDiskSegments + " segments, " +
+                   inMemToDiskBytes + " bytes in memory for " +
+                   "intermediate, on-disk merge");
+        }
+      }
+
+      // segments on disk
+      List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>();
+      long onDiskBytes = inMemToDiskBytes;
+      Path[] onDisk = getMapFiles(fs, false);
+      for (Path file : onDisk) {
+        onDiskBytes += fs.getFileStatus(file).getLen();
+        diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs));
+      }
+      LOG.info("Merging " + onDisk.length + " files, " +
+               onDiskBytes + " bytes from disk");
+      Collections.sort(diskSegments, new Comparator<Segment<K,V>>() {
+        public int compare(Segment<K, V> o1, Segment<K, V> o2) {
+          if (o1.getLength() == o2.getLength()) {
+            return 0;
+          }
+          return o1.getLength() < o2.getLength() ? -1 : 1;
+        }
+      });
+
+      // build final list of segments from merged backed by disk + in-mem
+      List<Segment<K,V>> finalSegments = new ArrayList<Segment<K,V>>();
+      long inMemBytes = createInMemorySegments(finalSegments, 0);
+      LOG.info("Merging " + finalSegments.size() + " segments, " +
+               inMemBytes + " bytes from memory into reduce");
+      if (0 != onDiskBytes) {
+        final int numInMemSegments = memDiskSegments.size();
+        diskSegments.addAll(0, memDiskSegments);
+        memDiskSegments.clear();
+        RawKeyValueIterator diskMerge = Merger.merge(
+            job, fs, keyClass, valueClass, diskSegments,
+            ioSortFactor, numInMemSegments, tmpDir, comparator,
+            reporter, false);
+        diskSegments.clear();
+        if (0 == finalSegments.size()) {
+          return diskMerge;
+        }
+        finalSegments.add(new Segment<K,V>(
+            new RawKVIteratorReader(diskMerge, onDiskBytes), true));
+      }
+      return Merger.merge(job, fs, keyClass, valueClass,
+                 finalSegments, finalSegments.size(), tmpDir,
+                 comparator, reporter);
+    }
+
+    class RawKVIteratorReader extends IFile.Reader<K,V> {
+
+      private final RawKeyValueIterator kvIter;
+
+      public RawKVIteratorReader(RawKeyValueIterator kvIter, long size)
+          throws IOException {
+        super(null, null, size, null);
+        this.kvIter = kvIter;
+      }
+
+      public boolean next(DataInputBuffer key, DataInputBuffer value)
+          throws IOException {
+        if (kvIter.next()) {
+          final DataInputBuffer kb = kvIter.getKey();
+          final DataInputBuffer vb = kvIter.getValue();
+          final int kp = kb.getPosition();
+          final int klen = kb.getLength() - kp;
+          key.reset(kb.getData(), kp, klen);
+          final int vp = vb.getPosition();
+          final int vlen = vb.getLength() - vp;
+          value.reset(vb.getData(), vp, vlen);
+          bytesRead += klen + vlen;
+          return true;
+        }
+        return false;
+      }
+
+      public long getPosition() throws IOException {
+        return bytesRead;
+      }
+
+      public void close() throws IOException {
+        kvIter.close();
+      }
+    }
+
     private CopyResult getCopyResult(int numInFlight) {
       synchronized (copyResults) {
         while (copyResults.isEmpty()) {
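The javadoc above states the two constraints the final merge plan must satisfy: at most io.sort.factor input streams, and at most maxInMemReduce bytes of map output resident when the reduce starts. A standalone sketch (hypothetical sizes, no Hadoop types) of how those constraints interact:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class FinalMergePlan {
      public static void main(String[] args) {
        int ioSortFactor = 3;                 // io.sort.factor
        long maxInMemReduce = 4L << 20;       // bytes allowed to stay resident for the reduce
        List<Long> inMem = new ArrayList<>(Arrays.asList(3L << 20, 2L << 20, 1L << 20));
        List<Long> onDisk = new ArrayList<>(Arrays.asList(8L << 20, 6L << 20));

        // Constraint 2: evict in-memory outputs (front of the list first, as
        // createInMemorySegments does) until what remains fits the budget.
        long resident = inMem.stream().mapToLong(Long::longValue).sum();
        List<Long> evicted = new ArrayList<>();
        while (resident > maxInMemReduce) {
          long seg = inMem.remove(0);
          evicted.add(seg);
          resident -= seg;
        }

        // Constraint 1 decision, mirroring createKVIterator: with fewer on-disk
        // files than io.sort.factor there is no intermediate disk merge for the
        // evicted segments to join, so they must be spilled to one disk file now.
        boolean mustSpillEvicted = !evicted.isEmpty() && ioSortFactor > onDisk.size();
        System.out.println("evicted " + evicted.size() + " segment(s), "
            + (mustSpillEvicted ? "spilled to disk" : "kept for the first merge pass")
            + "; " + resident + " bytes stay in memory for the reduce");
      }
    }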
@@ -2258,7 +2422,9 @@ class ReduceTask extends Task {
           boolean exit = false;
           do {
             exit = ramManager.waitForDataToMerge();
-            doInMemMerge();
+            if (!exit) {
+              doInMemMerge();
+            }
           } while (!exit);
         } catch (Throwable t) {
           LOG.warn(reduceTask.getTaskID() +
@@ -2284,19 +2450,20 @@ class ReduceTask extends Task {

       //figure out the mapId
       TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;
-
+
+      List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K,V>>();
+      long mergeOutputSize = createInMemorySegments(inMemorySegments, 0);
+      int noInMemorySegments = inMemorySegments.size();
+
       Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
-          reduceTask.getTaskID(), ramfsMergeOutputSize);
+          reduceTask.getTaskID(), mergeOutputSize);

       Writer writer =
         new Writer(conf, rfs, outputPath,
                    conf.getMapOutputKeyClass(),
                    conf.getMapOutputValueClass(),
                    codec);
-
-      List<Segment<K, V>> inMemorySegments = createInMemorySegments();
-      int noInMemorySegments = inMemorySegments.size();
-
+
       RawKeyValueIterator rIter = null;
       final Reporter reporter = getReporter(umbilical);
       try {