@@ -36,6 +36,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.SortedSet;
+import java.util.Comparator;
 import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
@@ -47,6 +49,8 @@ import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.ChecksumFileSystem;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.InputBuffer;
@@ -65,7 +69,6 @@ import org.apache.hadoop.metrics.Updater;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
 import static org.apache.hadoop.mapred.Task.Counter.*;
 
@@ -98,6 +101,29 @@ class ReduceTask extends Task {
     getCounters().findCounter(REDUCE_INPUT_RECORDS);
   private Counters.Counter reduceOutputCounter =
     getCounters().findCounter(REDUCE_OUTPUT_RECORDS);
+
+  // A custom comparator for map output files. Here the ordering is determined
+  // by the file's size and path. When two files have the same size but
+  // different paths, the first argument is considered smaller than the second
+  // one; files with the same size and the same path are considered equal.
+  private Comparator<FileStatus> mapOutputFileComparator =
+    new Comparator<FileStatus>() {
+      public int compare(FileStatus a, FileStatus b) {
+        if (a.getLen() < b.getLen())
+          return -1;
+        else if (a.getLen() == b.getLen())
+          if (a.getPath().toString().equals(b.getPath().toString()))
+            return 0;
+          else
+            return -1;
+        else
+          return 1;
+      }
+    };
+
+  // A sorted set for keeping a set of map output files on disk
+  private final SortedSet<FileStatus> mapOutputFilesOnDisk =
+    new TreeSet<FileStatus>(mapOutputFileComparator);
 
   public ReduceTask() {
     super();
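Note on the comparator added above: because two files of the same size but different paths never compare as equal, the TreeSet can hold many same-sized map outputs at once, while re-adding the very same path is a no-op. A minimal, Hadoop-free sketch of that behaviour (FakeStatus and the sample paths are made up for illustration):

import java.util.Comparator;
import java.util.SortedSet;
import java.util.TreeSet;

public class MapOutputSetDemo {
  // Hypothetical stand-in for FileStatus: just a path and a length.
  static class FakeStatus {
    final String path;
    final long len;
    FakeStatus(String path, long len) { this.path = path; this.len = len; }
  }

  // Same ordering rule as mapOutputFileComparator: smaller files first;
  // same-size files with different paths never compare as equal, so the
  // TreeSet keeps them all; an identical (size, path) pair collapses to one.
  static final Comparator<FakeStatus> BY_SIZE_THEN_PATH =
    new Comparator<FakeStatus>() {
      public int compare(FakeStatus a, FakeStatus b) {
        if (a.len < b.len)
          return -1;
        else if (a.len == b.len)
          return a.path.equals(b.path) ? 0 : -1;
        else
          return 1;
      }
    };

  public static void main(String[] args) {
    SortedSet<FakeStatus> onDisk = new TreeSet<FakeStatus>(BY_SIZE_THEN_PATH);
    onDisk.add(new FakeStatus("/local/map_0.out", 1024));
    onDisk.add(new FakeStatus("/local/map_1.out", 1024)); // same size, new path: kept
    onDisk.add(new FakeStatus("/local/map_0.out", 1024)); // same size and path: ignored
    onDisk.add(new FakeStatus("/local/map_2.out", 512));
    System.out.println(onDisk.size());         // 3
    System.out.println(onDisk.first().path);   // /local/map_2.out, the smallest file
  }
}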
@@ -138,6 +164,24 @@ class ReduceTask extends Task {
 
     numMaps = in.readInt();
   }
+
+  // Get the input files for the reducer.
+  private Path[] getMapFiles(FileSystem fs, boolean isLocal)
+  throws IOException {
+    List<Path> fileList = new ArrayList<Path>();
+    if (isLocal) {
+      // for local jobs
+      for(int i = 0; i < numMaps; ++i) {
+        fileList.add(mapOutputFile.getInputFile(i, getTaskId()));
+      }
+    } else {
+      // for non local jobs
+      for (FileStatus filestatus : mapOutputFilesOnDisk) {
+        fileList.add(filestatus.getPath());
+      }
+    }
+    return fileList.toArray(new Path[0]);
+  }
 
   /** Iterates values while keys match in sorted input. */
   static class ValuesIterator implements Iterator {
@@ -253,34 +297,19 @@ class ReduceTask extends Task {
     startCommunicationThread(umbilical);
 
     FileSystem lfs = FileSystem.getLocal(job);
+    boolean isLocal = true;
     if (!job.get("mapred.job.tracker", "local").equals("local")) {
       reduceCopier = new ReduceCopier(umbilical, job);
       if (!reduceCopier.fetchOutputs()) {
         throw new IOException(getTaskId() + "The reduce copier failed");
       }
+      isLocal = false;
     }
     copyPhase.complete();                       // copy is already complete
 
 
-    // open a file to collect map output
-    // since we don't know how many map outputs got merged in memory, we have
-    // to check whether a given map output exists, and if it does, add it in
-    // the list of files to merge, otherwise not.
-    List<Path> mapFilesList = new ArrayList<Path>();
-    for(int i=0; i < numMaps; i++) {
-      Path f;
-      try {
-        //catch and ignore DiskErrorException, since some map outputs will
-        //really be absent (inmem merge).
-        f = mapOutputFile.getInputFile(i, getTaskId());
-      } catch (DiskErrorException d) {
-        continue;
-      }
-      if (lfs.exists(f))
-        mapFilesList.add(f);
-    }
-    Path[] mapFiles = new Path[mapFilesList.size()];
-    mapFiles = mapFilesList.toArray(mapFiles);
+    // get the input files for the reducer to merge
+    Path[] mapFiles = getMapFiles(lfs, isLocal);
 
     Path tempDir = new Path(getTaskId());
 
@@ -298,6 +327,10 @@ class ReduceTask extends Task {
     rIter = sorter.merge(mapFiles, tempDir,
                          !conf.getKeepFailedTaskFiles()); // sort
 
+    // free up the data structures
+    mapOutputFilesOnDisk.clear();
+    mapFiles = null;
+
     sortPhase.complete();                       // sort is complete
     setPhase(TaskStatus.Phase.REDUCE);
 
@@ -420,11 +453,21 @@ class ReduceTask extends Task {
      */
    private SequenceFile.Sorter sorter;
 
+    /**
+     * Number of files to merge at a time
+     */
+    private int ioSortFactor;
+
    /**
     * A reference to the throwable object (if merge throws an exception)
     */
    private volatile Throwable mergeThrowable;
 
+    /**
+     * A flag to indicate that localFS merge is in progress
+     */
+    private volatile boolean localFSMergeInProgress = false;
+
    /**
     * A flag to indicate that merge is in progress
     */
@@ -755,12 +798,12 @@ class ReduceTask extends Task {
         if (tmpFilename == null)
           throw new IOException("File " + filename + "-" + id +
                                 " not created");
+        // This file could have been created in the inmemory
+        // fs or the localfs. So need to get the filesystem owning the path.
+        FileSystem fs = tmpFilename.getFileSystem(conf);
         long bytes = -1;
         // lock the ReduceTask while we do the rename
         synchronized (ReduceTask.this) {
-          // This file could have been created in the inmemory
-          // fs or the localfs. So need to get the filesystem owning the path.
-          FileSystem fs = tmpFilename.getFileSystem(conf);
           if (!neededOutputs.contains(loc.getMapId())) {
             fs.delete(tmpFilename);
             return CopyResult.OBSOLETE;
@@ -802,6 +845,16 @@ class ReduceTask extends Task {
           }
           neededOutputs.remove(loc.getMapId());
         }
+
+        // Check whether the map output file landed on the local file system
+        // by comparing the schemes of the two file systems
+        String localFSScheme = localFileSys.getUri().getScheme();
+        String outputFileScheme = fs.getUri().getScheme();
+        if (localFSScheme.equals(outputFileScheme)) {
+          synchronized (mapOutputFilesOnDisk) {
+            mapOutputFilesOnDisk.add(fs.getFileStatus(filename));
+          }
+        }
         return bytes;
       }
 
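The scheme comparison added above is what separates outputs that were shuffled straight to local disk from those still sitting in the in-memory file system; only the former get registered in mapOutputFilesOnDisk. A tiny stand-alone illustration of the idea using plain java.net.URI (the URIs and the "ramfs" scheme are examples, not taken from this patch):

import java.net.URI;

public class SchemeCheckDemo {
  public static void main(String[] args) {
    // Illustrative URIs only: "file" is what a local file system reports,
    // "ramfs" stands in for an in-memory file system's scheme.
    URI localFs  = URI.create("file:///");
    URI onDisk   = URI.create("file:///tmp/map_3.out");
    URI inMemory = URI.create("ramfs://mapoutputs/map_4.out");

    System.out.println(localFs.getScheme().equals(onDisk.getScheme()));   // true  -> track in mapOutputFilesOnDisk
    System.out.println(localFs.getScheme().equals(inMemory.getScheme())); // false -> leave for the in-memory merge
  }
}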
@@ -861,6 +914,7 @@ class ReduceTask extends Task {
       this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
       this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
 
+      this.ioSortFactor = conf.getInt("io.sort.factor", 10);
       // the exponential backoff formula
       //    backoff (t) = init * base^(t-1)
       // so for max retries we get
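As a side note on the backoff comment retained above, the formula is a plain geometric progression. A toy calculation with assumed numbers (init and base here are illustrative only; the surrounding constructor derives the real values from mapred.reduce.copy.backoff and the retry limit):

public class BackoffDemo {
  public static void main(String[] args) {
    double init = 4.0;   // assumed first backoff, in seconds (illustrative)
    double base = 2.0;   // assumed growth factor (illustrative)
    for (int t = 1; t <= 5; t++) {
      // backoff(t) = init * base^(t-1)
      System.out.printf("retry %d -> wait %.0f s%n", t, init * Math.pow(base, t - 1));
    }
    // prints 4, 8, 16, 32, 64 seconds
  }
}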
@@ -931,6 +985,13 @@ class ReduceTask extends Task {
       copiers = new MapOutputCopier[numCopiers];
 
       Reporter reporter = getReporter(umbilical);
+      // create an instance of the sorter for merging the on-disk files
+      SequenceFile.Sorter localFileSystemSorter =
+        new SequenceFile.Sorter(localFileSys, conf.getOutputKeyComparator(),
+                                conf.getMapOutputKeyClass(),
+                                conf.getMapOutputValueClass(), conf);
+      localFileSystemSorter.setProgressable(reporter);
+
       // start all the copying threads
       for (int i=0; i < copiers.length; i++) {
         copiers[i] = new MapOutputCopier(reporter);
@@ -1040,6 +1101,23 @@ class ReduceTask extends Task {
                  " of " + numKnown + " known outputs (" + numSlow +
                  " slow hosts and " + numDups + " dup hosts)");
 
+        // Check if an on-disk merge can be done. This will help if there
+        // are no copies to be fetched but sufficient copies to be merged.
+        synchronized (mapOutputFilesOnDisk) {
+          if (!localFSMergeInProgress
+              && (mapOutputFilesOnDisk.size() >= (2 * ioSortFactor - 1))) {
+            // make sure that only one thread merges the disk files
+            localFSMergeInProgress = true;
+            // start the on-disk-merge process
+            LocalFSMerger lfsm =
+              new LocalFSMerger((LocalFileSystem)localFileSys,
+                                localFileSystemSorter);
+            lfsm.setName("Thread for merging on-disk files");
+            lfsm.setDaemon(true);
+            lfsm.start();
+          }
+        }
+
         // if we have no copies in flight and we can't schedule anything
         // new, just wait for a bit
         try {
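With the default io.sort.factor of 10 read in the constructor above, the guard fires once 2 * 10 - 1 = 19 map outputs sit on disk; one LocalFSMerger pass then folds 10 of them into a single file, leaving 10 behind, so the final merge is never asked to handle much more than a factor's worth of extra streams at once. A quick check of that arithmetic (plain Java, no Hadoop types):

public class MergeThresholdDemo {
  public static void main(String[] args) {
    int ioSortFactor = 10;                  // default value of io.sort.factor
    int threshold = 2 * ioSortFactor - 1;   // 19: when the on-disk merge kicks in
    int onDisk = threshold;
    // One round of LocalFSMerger: io.sort.factor inputs become a single output.
    onDisk = onDisk - ioSortFactor + 1;
    System.out.println("threshold = " + threshold
                       + ", files left after one merge = " + onDisk); // 10
  }
}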
@@ -1214,6 +1292,11 @@ class ReduceTask extends Task {
       //Do a merge of in-memory files (if there are any)
       if (mergeThrowable == null) {
         try {
+          // Wait for the on-disk merge to complete
+          while (localFSMergeInProgress) {
+            Thread.sleep(200);
+          }
+
           //wait for an ongoing merge (if it is in flight) to complete
           while (mergeInProgress) {
             Thread.sleep(200);
@@ -1266,6 +1349,11 @@ class ReduceTask extends Task {
                    " Merge of the " + inMemClosedFiles.length +
                    " files in InMemoryFileSystem complete." +
                    " Local file is " + outputPath);
+
+          FileStatus status = localFileSys.getFileStatus(outputPath);
+          synchronized (mapOutputFilesOnDisk) {
+            mapOutputFilesOnDisk.add(status);
+          }
         } catch (Throwable t) {
           LOG.warn(reduceTask.getTaskId() +
                    " Final merge of the inmemory files threw an exception: " +
@@ -1393,6 +1481,75 @@ class ReduceTask extends Task {
     }
 
 
+    /** Starts merging the local copy (on disk) of the map's output so that
+     * most of the reducer's input is already sorted, i.e. overlapping the
+     * shuffle and merge phases.
+     */
+    private class LocalFSMerger extends Thread {
+      private LocalFileSystem localFileSys;
+      private SequenceFile.Sorter sorter;
+
+      public LocalFSMerger(LocalFileSystem fs, SequenceFile.Sorter sorter) {
+        this.localFileSys = fs;
+        this.sorter = sorter;
+      }
+
+      public void run() {
+        try {
+          Path[] mapFiles = new Path[ioSortFactor];
+          long approxOutputSize = 0;
+          int bytesPerSum =
+            reduceTask.getConf().getInt("io.bytes.per.checksum", 512);
+          LOG.info(reduceTask.getTaskId()
+                   + " Merging map output files on disk");
+          // 1. Prepare the list of files to be merged. This list is prepared
+          // using a list of map output files on disk. Currently we merge
+          // io.sort.factor files into 1.
+          synchronized (mapOutputFilesOnDisk) {
+            for (int i = 0; i < ioSortFactor; ++i) {
+              FileStatus filestatus = mapOutputFilesOnDisk.first();
+              mapOutputFilesOnDisk.remove(filestatus);
+              mapFiles[i] = filestatus.getPath();
+              approxOutputSize += filestatus.getLen();
+            }
+          }
+          // add the checksum length
+          approxOutputSize += ChecksumFileSystem
+                              .getChecksumLength(approxOutputSize,
+                                                 bytesPerSum);
+
+          // 2. Start the on-disk merge process
+          Path outputPath =
+            lDirAlloc.getLocalPathForWrite(mapFiles[0].toString(),
+                                           approxOutputSize, conf)
+            .suffix(".merged");
+          SequenceFile.Writer writer =
+            sorter.cloneFileAttributes(mapFiles[0], outputPath, null);
+          SequenceFile.Sorter.RawKeyValueIterator iter;
+          Path tmpDir = new Path(reduceTask.getTaskId());
+          iter = sorter.merge(mapFiles, true, ioSortFactor, tmpDir);
+          sorter.writeFile(iter, writer);
+          writer.close();
+
+          synchronized (mapOutputFilesOnDisk) {
+            mapOutputFilesOnDisk.add(localFileSys.getFileStatus(outputPath));
+          }
+
+          LOG.info(reduceTask.getTaskId()
+                   + " Finished merging map output files on disk.");
+        } catch (IOException ioe) {
+          LOG.warn(reduceTask.getTaskId()
+                   + " Merging of the local FS files threw an exception: "
+                   + StringUtils.stringifyException(ioe));
+          if (mergeThrowable == null) {
+            mergeThrowable = ioe;
+          }
+        } finally {
+          localFSMergeInProgress = false;
+        }
+      }
+    }
+
     private class InMemFSMergeThread extends Thread {
       private InMemoryFileSystem inMemFileSys;
       private LocalFileSystem localFileSys;
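Since every merged output is added back into mapOutputFilesOnDisk, the smallest files are always picked first and merge products can themselves be merged again in later rounds. A rough, Hadoop-free sketch of that bookkeeping, modelling each file by its length alone (the sizes and the small factor are made up, and the checksum overhead that the real code adds via ChecksumFileSystem.getChecksumLength is ignored):

import java.util.TreeSet;

public class DiskMergeSketch {
  public static void main(String[] args) {
    int ioSortFactor = 3;                       // small factor to keep the trace short
    TreeSet<Long> sizes = new TreeSet<Long>();  // stand-in for mapOutputFilesOnDisk
    long[] initial = {40, 10, 30, 20, 50, 65, 75};  // distinct sizes, so TreeSet<Long> suffices
    for (long s : initial) sizes.add(s);

    // Mimic LocalFSMerger: while enough files are on disk, merge the smallest
    // ioSortFactor of them and put the (approximate) result back into the set.
    while (sizes.size() >= 2 * ioSortFactor - 1) {
      long merged = 0;
      for (int i = 0; i < ioSortFactor; i++) {
        long smallest = sizes.first();
        sizes.remove(smallest);
        merged += smallest;
      }
      sizes.add(merged);
      System.out.println("merged " + ioSortFactor + " files into one of ~" + merged + " bytes");
    }
    System.out.println("files left for the final merge: " + sizes.size()); // 3
  }
}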
@@ -1451,6 +1608,11 @@ class ReduceTask extends Task {
                  " Merge of the " + inMemClosedFiles.length +
                  " files in InMemoryFileSystem complete." +
                  " Local file is " + outputPath);
+
+        FileStatus status = localFileSys.getFileStatus(outputPath);
+        synchronized (mapOutputFilesOnDisk) {
+          mapOutputFilesOnDisk.add(status);
+        }
         }
         else {
           LOG.info(reduceTask.getTaskId() + " Nothing to merge from " +