|
@@ -18,16 +18,22 @@
|
|
package org.apache.hadoop.mapred;
|
|
package org.apache.hadoop.mapred;
|
|
|
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
|
+import org.apache.hadoop.fs.InMemoryFileSystem;
|
|
|
|
+import org.apache.hadoop.fs.LocalFileSystem;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.Path;
|
|
|
|
+import org.apache.hadoop.fs.PathFilter;
|
|
|
|
+import org.apache.hadoop.io.SequenceFile;
|
|
|
|
+import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
|
|
import org.apache.hadoop.util.*;
|
|
import org.apache.hadoop.util.*;
|
|
|
|
|
|
import java.io.*;
|
|
import java.io.*;
|
|
import java.util.*;
|
|
import java.util.*;
|
|
|
|
+import java.net.*;
|
|
import java.text.DecimalFormat;
|
|
import java.text.DecimalFormat;
|
|
import org.apache.hadoop.util.Progressable;
|
|
import org.apache.hadoop.util.Progressable;
|
|
|
|
|
|
/** Runs a reduce task. */
|
|
/** Runs a reduce task. */
|
|
-class ReduceTaskRunner extends TaskRunner {
|
|
|
|
|
|
+class ReduceTaskRunner extends TaskRunner implements MRConstants {
|
|
/** Number of ms before timing out a copy */
|
|
/** Number of ms before timing out a copy */
|
|
private static final int STALLED_COPY_TIMEOUT = 3 * 60 * 1000;
|
|
private static final int STALLED_COPY_TIMEOUT = 3 * 60 * 1000;
|
|
|
|
|
|
@@ -79,11 +85,31 @@ class ReduceTaskRunner extends TaskRunner {
|
|
*/
|
|
*/
|
|
private long lastPollTime;
|
|
private long lastPollTime;
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * A reference to the in memory file system for writing the map outputs to.
|
|
|
|
+ */
|
|
|
|
+ private InMemoryFileSystem inMemFileSys;
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* A reference to the local file system for writing the map outputs to.
|
|
* A reference to the local file system for writing the map outputs to.
|
|
*/
|
|
*/
|
|
private FileSystem localFileSys;
|
|
private FileSystem localFileSys;
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * An instance of the sorter used for doing merge
|
|
|
|
+ */
|
|
|
|
+ private SequenceFile.Sorter sorter;
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * A reference to the throwable object (if merge throws an exception)
|
|
|
|
+ */
|
|
|
|
+ private volatile Throwable mergeThrowable;
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * A flag to indicate that merge is in progress
|
|
|
|
+ */
|
|
|
|
+ private volatile boolean mergeInProgress = false;
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* The threads for fetching the files.
|
|
* The threads for fetching the files.
|
|
*/
|
|
*/
|
|
@@ -120,9 +146,28 @@ class ReduceTaskRunner extends TaskRunner {
|
|
public MapOutputLocation getLocation() { return loc; }
|
|
public MapOutputLocation getLocation() { return loc; }
|
|
}
|
|
}
|
|
|
|
|
|
- private class PingTimer implements Progressable {
|
|
|
|
|
|
+ private class PingTimer extends Thread implements Progressable {
|
|
Task task = getTask();
|
|
Task task = getTask();
|
|
TaskTracker tracker = getTracker();
|
|
TaskTracker tracker = getTracker();
|
|
|
|
+
|
|
|
|
+ public void run() {
|
|
|
|
+ LOG.info(task.getTaskId() + " Started thread: " + getName());
|
|
|
|
+ while (true) {
|
|
|
|
+ try {
|
|
|
|
+ progress();
|
|
|
|
+ Thread.sleep(Task.PROGRESS_INTERVAL);
|
|
|
|
+ }
|
|
|
|
+ catch (InterruptedException i) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ catch (Throwable e) {
|
|
|
|
+ LOG.info(task.getTaskId() + " Thread Exception in " +
|
|
|
|
+ "reporting sort progress\n" +
|
|
|
|
+ StringUtils.stringifyException(e));
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
public void progress() {
|
|
public void progress() {
|
|
task.reportProgress(tracker);
|
|
task.reportProgress(tracker);
|
|
@@ -134,7 +179,6 @@ class ReduceTaskRunner extends TaskRunner {
|
|
/** Copies map outputs as they become available */
|
|
/** Copies map outputs as they become available */
|
|
private class MapOutputCopier extends Thread {
|
|
private class MapOutputCopier extends Thread {
|
|
|
|
|
|
- private PingTimer pingTimer = new PingTimer();
|
|
|
|
private MapOutputLocation currentLocation = null;
|
|
private MapOutputLocation currentLocation = null;
|
|
private int id = nextMapOutputCopierId++;
|
|
private int id = nextMapOutputCopierId++;
|
|
|
|
|
|
@@ -196,7 +240,7 @@ class ReduceTaskRunner extends TaskRunner {
|
|
|
|
|
|
try {
|
|
try {
|
|
start(loc);
|
|
start(loc);
|
|
- size = copyOutput(loc, pingTimer);
|
|
|
|
|
|
+ size = copyOutput(loc);
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
LOG.warn(reduceTask.getTaskId() + " copy failed: " +
|
|
LOG.warn(reduceTask.getTaskId() + " copy failed: " +
|
|
loc.getMapTaskId() + " from " + loc.getHost());
|
|
loc.getMapTaskId() + " from " + loc.getHost());
|
|
@@ -215,13 +259,11 @@ class ReduceTaskRunner extends TaskRunner {
|
|
|
|
|
|
/** Copies a map output from a remote host, using raw RPC.
|
|
/** Copies a map output from a remote host, using raw RPC.
|
|
* @param loc the map output location to be copied
|
|
* @param loc the map output location to be copied
|
|
- * @param pingee a status object to ping as we make progress
|
|
|
|
- * @return the size of the copied file
|
|
|
|
|
|
+ * @return the size, in bytes, of the copied file
|
|
* @throws IOException if there is an error copying the file
|
|
* @throws IOException if there is an error copying the file
|
|
* @throws InterruptedException if the copier should give up
|
|
* @throws InterruptedException if the copier should give up
|
|
*/
|
|
*/
|
|
- private long copyOutput(MapOutputLocation loc,
|
|
|
|
- Progressable pingee
|
|
|
|
|
|
+ private long copyOutput(MapOutputLocation loc
|
|
) throws IOException, InterruptedException {
|
|
) throws IOException, InterruptedException {
|
|
|
|
|
|
String reduceId = reduceTask.getTaskId();
|
|
String reduceId = reduceTask.getTaskId();
|
|
@@ -233,21 +275,42 @@ class ReduceTaskRunner extends TaskRunner {
|
|
// a working filename that will be unique to this attempt
|
|
// a working filename that will be unique to this attempt
|
|
Path tmpFilename = new Path(finalFilename + "-" + id);
|
|
Path tmpFilename = new Path(finalFilename + "-" + id);
|
|
// this copies the map output file
|
|
// this copies the map output file
|
|
- long bytes = loc.getFile(localFileSys, tmpFilename,
|
|
|
|
- reduceTask.getPartition(), pingee,
|
|
|
|
|
|
+ tmpFilename = loc.getFile(inMemFileSys, localFileSys, tmpFilename,
|
|
|
|
+ reduceTask.getPartition(),
|
|
STALLED_COPY_TIMEOUT);
|
|
STALLED_COPY_TIMEOUT);
|
|
|
|
+ if (tmpFilename == null)
|
|
|
|
+ throw new IOException("File " + finalFilename + "-" + id +
|
|
|
|
+ " not created");
|
|
|
|
+ long bytes = -1;
|
|
// lock the ReduceTaskRunner while we do the rename
|
|
// lock the ReduceTaskRunner while we do the rename
|
|
synchronized (ReduceTaskRunner.this) {
|
|
synchronized (ReduceTaskRunner.this) {
|
|
- // if we can't rename the file, something is broken
|
|
|
|
- if (!(new File(tmpFilename.toString()).
|
|
|
|
- renameTo(new File(finalFilename.toString())))) {
|
|
|
|
- localFileSys.delete(tmpFilename);
|
|
|
|
|
|
+ // if we can't rename the file, something is broken (and IOException
|
|
|
|
+ // will be thrown). This file could have been created in the inmemory
|
|
|
|
+ // fs or the localfs. So need to get the filesystem owning the path.
|
|
|
|
+ FileSystem fs = tmpFilename.getFileSystem(conf);
|
|
|
|
+ if (!fs.rename(tmpFilename, finalFilename)) {
|
|
|
|
+ fs.delete(tmpFilename);
|
|
throw new IOException("failure to rename map output " + tmpFilename);
|
|
throw new IOException("failure to rename map output " + tmpFilename);
|
|
}
|
|
}
|
|
|
|
+ bytes = fs.getLength(finalFilename);
|
|
|
|
+ LOG.info(reduceId + " done copying " + loc.getMapTaskId() +
|
|
|
|
+ " output from " + loc.getHost() + ".");
|
|
|
|
+ //Create a thread to do merges. Synchronize access/update to
|
|
|
|
+ //mergeInProgress
|
|
|
|
+ if (!mergeInProgress && inMemFileSys.getPercentUsed() >=
|
|
|
|
+ MAX_INMEM_FILESYS_USE) {
|
|
|
|
+ LOG.info(reduceId + " InMemoryFileSystem " +
|
|
|
|
+ inMemFileSys.getUri().toString() +
|
|
|
|
+ " is " + inMemFileSys.getPercentUsed() +
|
|
|
|
+ " full. Triggering merge");
|
|
|
|
+ InMemFSMergeThread m = new InMemFSMergeThread(inMemFileSys,
|
|
|
|
+ (LocalFileSystem)localFileSys, sorter);
|
|
|
|
+ m.setName("Thread for merging in memory files");
|
|
|
|
+ m.setDaemon(true);
|
|
|
|
+ mergeInProgress = true;
|
|
|
|
+ m.start();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- LOG.info(reduceTask.getTaskId() + " done copying " + loc.getMapTaskId() +
|
|
|
|
- " output from " + loc.getHost() + ".");
|
|
|
|
-
|
|
|
|
return bytes;
|
|
return bytes;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -258,7 +321,6 @@ class ReduceTaskRunner extends TaskRunner {
|
|
super(task, tracker, conf);
|
|
super(task, tracker, conf);
|
|
this.mapOutputFile = new MapOutputFile();
|
|
this.mapOutputFile = new MapOutputFile();
|
|
this.mapOutputFile.setConf(conf);
|
|
this.mapOutputFile.setConf(conf);
|
|
- localFileSys = FileSystem.getLocal(conf);
|
|
|
|
|
|
|
|
this.reduceTask = (ReduceTask)getTask();
|
|
this.reduceTask = (ReduceTask)getTask();
|
|
this.scheduledCopies = new ArrayList(100);
|
|
this.scheduledCopies = new ArrayList(100);
|
|
@@ -266,6 +328,18 @@ class ReduceTaskRunner extends TaskRunner {
|
|
this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
|
|
this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
|
|
this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
|
|
this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
|
|
|
|
|
|
|
|
+ //we want to distinguish inmem fs instances for different reduces. Hence,
|
|
|
|
+ //append a unique string in the uri for the inmem fs name
|
|
|
|
+ URI uri = URI.create("ramfs://mapoutput" + reduceTask.hashCode());
|
|
|
|
+ inMemFileSys = (InMemoryFileSystem)FileSystem.get(uri, conf);
|
|
|
|
+ LOG.info(reduceTask.getTaskId() + " Created an InMemoryFileSystem, uri: " +
|
|
|
|
+ uri);
|
|
|
|
+ localFileSys = FileSystem.getLocal(conf);
|
|
|
|
+ //create an instance of the sorter
|
|
|
|
+ sorter =
|
|
|
|
+ new SequenceFile.Sorter(inMemFileSys, conf.getOutputKeyComparator(),
|
|
|
|
+ conf.getMapOutputValueClass(), conf);
|
|
|
|
+
|
|
// hosts -> next contact time
|
|
// hosts -> next contact time
|
|
this.penaltyBox = new Hashtable();
|
|
this.penaltyBox = new Hashtable();
|
|
|
|
|
|
@@ -311,9 +385,14 @@ class ReduceTaskRunner extends TaskRunner {
|
|
// start the clock for bandwidth measurement
|
|
// start the clock for bandwidth measurement
|
|
long startTime = System.currentTimeMillis();
|
|
long startTime = System.currentTimeMillis();
|
|
long currentTime = startTime;
|
|
long currentTime = startTime;
|
|
-
|
|
|
|
|
|
+ PingTimer pingTimer = new PingTimer();
|
|
|
|
+ pingTimer.setName("Map output copy reporter for task " +
|
|
|
|
+ reduceTask.getTaskId());
|
|
|
|
+ pingTimer.setDaemon(true);
|
|
|
|
+ pingTimer.start();
|
|
|
|
+ try {
|
|
// loop until we get all required outputs or are killed
|
|
// loop until we get all required outputs or are killed
|
|
- while (!killed && numCopied < numOutputs) {
|
|
|
|
|
|
+ while (!killed && numCopied < numOutputs && mergeThrowable == null) {
|
|
|
|
|
|
LOG.info(reduceTask.getTaskId() + " Need " + (numOutputs-numCopied) +
|
|
LOG.info(reduceTask.getTaskId() + " Need " + (numOutputs-numCopied) +
|
|
" map output(s)");
|
|
" map output(s)");
|
|
@@ -382,12 +461,11 @@ class ReduceTaskRunner extends TaskRunner {
|
|
// new, just wait for a bit
|
|
// new, just wait for a bit
|
|
try {
|
|
try {
|
|
if (numInFlight == 0 && numScheduled == 0) {
|
|
if (numInFlight == 0 && numScheduled == 0) {
|
|
- getTask().reportProgress(getTracker());
|
|
|
|
Thread.sleep(5000);
|
|
Thread.sleep(5000);
|
|
}
|
|
}
|
|
} catch (InterruptedException e) { } // IGNORE
|
|
} catch (InterruptedException e) { } // IGNORE
|
|
|
|
|
|
- while (!killed && numInFlight > 0) {
|
|
|
|
|
|
+ while (!killed && numInFlight > 0 && mergeThrowable == null) {
|
|
LOG.debug(reduceTask.getTaskId() + " numInFlight = " + numInFlight);
|
|
LOG.debug(reduceTask.getTaskId() + " numInFlight = " + numInFlight);
|
|
CopyResult cr = getCopyResult();
|
|
CopyResult cr = getCopyResult();
|
|
|
|
|
|
@@ -404,7 +482,6 @@ class ReduceTaskRunner extends TaskRunner {
|
|
copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs +
|
|
copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs +
|
|
" at " +
|
|
" at " +
|
|
mbpsFormat.format(transferRate) + " MB/s)");
|
|
mbpsFormat.format(transferRate) + " MB/s)");
|
|
- getTask().reportProgress(getTracker());
|
|
|
|
} else {
|
|
} else {
|
|
// this copy failed, put it back onto neededOutputs
|
|
// this copy failed, put it back onto neededOutputs
|
|
neededOutputs.add(new Integer(cr.getMapId()));
|
|
neededOutputs.add(new Integer(cr.getMapId()));
|
|
@@ -454,7 +531,64 @@ class ReduceTaskRunner extends TaskRunner {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- return numCopied == numOutputs && !killed;
|
|
|
|
|
|
+ if (mergeThrowable != null) {
|
|
|
|
+ //set the task state to FAILED
|
|
|
|
+ TaskTracker tracker = ReduceTaskRunner.this.getTracker();
|
|
|
|
+ TaskTracker.TaskInProgress tip =
|
|
|
|
+ tracker.runningTasks.get(reduceTask.getTaskId());
|
|
|
|
+ tip.runstate = TaskStatus.State.FAILED;
|
|
|
|
+ try {
|
|
|
|
+ tip.cleanup();
|
|
|
|
+ } catch (Throwable ie2) {
|
|
|
|
+ // Ignore it, we are just trying to cleanup.
|
|
|
|
+ }
|
|
|
|
+ inMemFileSys.close();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ //Do a merge of in-memory files (if there are any)
|
|
|
|
+ if (!killed && mergeThrowable == null) {
|
|
|
|
+ try {
|
|
|
|
+ //wait for an ongoing merge (if it is in flight) to complete
|
|
|
|
+ while (mergeInProgress) {
|
|
|
|
+ Thread.sleep(200);
|
|
|
|
+ }
|
|
|
|
+ LOG.info(reduceTask.getTaskId() +
|
|
|
|
+ " Copying of all map outputs complete. " +
|
|
|
|
+ "Initiating the last merge on the remaining files in " +
|
|
|
|
+ inMemFileSys.getUri());
|
|
|
|
+ //initiate merge
|
|
|
|
+ Path[] inMemClosedFiles = inMemFileSys.getFiles(MAP_OUTPUT_FILTER);
|
|
|
|
+ if (inMemClosedFiles.length == 0) {
|
|
|
|
+ LOG.info(reduceTask.getTaskId() + "Nothing to merge from " +
|
|
|
|
+ inMemFileSys.getUri());
|
|
|
|
+ return numCopied == numOutputs;
|
|
|
|
+ }
|
|
|
|
+ RawKeyValueIterator rIter = sorter.merge(inMemClosedFiles, true,
|
|
|
|
+ inMemClosedFiles.length);
|
|
|
|
+ //name this output file same as the name of the first file that is
|
|
|
|
+ //there in the current list of inmem files (this is guaranteed to be
|
|
|
|
+ //absent on the disk currently. So we don't overwrite a prev.
|
|
|
|
+ //created spill)
|
|
|
|
+ SequenceFile.Writer writer = sorter.cloneFileAttributes(
|
|
|
|
+ inMemFileSys.makeQualified(inMemClosedFiles[0]),
|
|
|
|
+ localFileSys.makeQualified(inMemClosedFiles[0]), null);
|
|
|
|
+ sorter.writeFile(rIter, writer);
|
|
|
|
+ writer.close();
|
|
|
|
+ LOG.info(reduceTask.getTaskId() +
|
|
|
|
+ " Merge of the " +inMemClosedFiles.length +
|
|
|
|
+ " files in InMemoryFileSystem complete." +
|
|
|
|
+ " Local file is " + inMemClosedFiles[0]);
|
|
|
|
+ } catch (Throwable t) {
|
|
|
|
+ LOG.warn("Merge of the inmemory files threw an exception: " +
|
|
|
|
+ StringUtils.stringifyException(t));
|
|
|
|
+ inMemFileSys.close();
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return mergeThrowable == null && numCopied == numOutputs && !killed;
|
|
|
|
+ } finally {
|
|
|
|
+ pingTimer.interrupt();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -529,4 +663,60 @@ class ReduceTaskRunner extends TaskRunner {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private class InMemFSMergeThread extends Thread {
|
|
|
|
+ private InMemoryFileSystem inMemFileSys;
|
|
|
|
+ private LocalFileSystem localFileSys;
|
|
|
|
+ private SequenceFile.Sorter sorter;
|
|
|
|
+
|
|
|
|
+ public InMemFSMergeThread(InMemoryFileSystem inMemFileSys,
|
|
|
|
+ LocalFileSystem localFileSys, SequenceFile.Sorter sorter) {
|
|
|
|
+ this.inMemFileSys = inMemFileSys;
|
|
|
|
+ this.localFileSys = localFileSys;
|
|
|
|
+ this.sorter = sorter;
|
|
|
|
+ }
|
|
|
|
+ public void run() {
|
|
|
|
+ LOG.info(reduceTask.getTaskId() + " Thread started: " + getName());
|
|
|
|
+ try {
|
|
|
|
+ //initiate merge
|
|
|
|
+ Path[] inMemClosedFiles = inMemFileSys.getFiles(MAP_OUTPUT_FILTER);
|
|
|
|
+ //Note that the above Path[] could be of length 0 if all copies are
|
|
|
|
+ //in flight. So we make sure that we have some 'closed' map
|
|
|
|
+ //output files to merge to get the benefit of in-memory merge
|
|
|
|
+ if (inMemClosedFiles.length >=
|
|
|
|
+ (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) {
|
|
|
|
+ RawKeyValueIterator rIter = sorter.merge(inMemClosedFiles, true,
|
|
|
|
+ inMemClosedFiles.length);
|
|
|
|
+ //name this output file same as the name of the first file that is
|
|
|
|
+ //there in the current list of inmem files (this is guaranteed to be
|
|
|
|
+ //absent on the disk currently. So we don't overwrite a prev.
|
|
|
|
+ //created spill)
|
|
|
|
+ SequenceFile.Writer writer = sorter.cloneFileAttributes(
|
|
|
|
+ inMemFileSys.makeQualified(inMemClosedFiles[0]),
|
|
|
|
+ localFileSys.makeQualified(inMemClosedFiles[0]), null);
|
|
|
|
+ sorter.writeFile(rIter, writer);
|
|
|
|
+ writer.close();
|
|
|
|
+ LOG.info(reduceTask.getTaskId() +
|
|
|
|
+ " Merge of the " +inMemClosedFiles.length +
|
|
|
|
+ " files in InMemoryFileSystem complete." +
|
|
|
|
+ " Local file is " + inMemClosedFiles[0]);
|
|
|
|
+ }
|
|
|
|
+ else {
|
|
|
|
+ LOG.info(reduceTask.getTaskId() + " Nothing to merge from " +
|
|
|
|
+ inMemFileSys.getUri());
|
|
|
|
+ }
|
|
|
|
+ } catch (Throwable t) {
|
|
|
|
+ LOG.warn("Merge of the inmemory files threw an exception: " +
|
|
|
|
+ StringUtils.stringifyException(t));
|
|
|
|
+ ReduceTaskRunner.this.mergeThrowable = t;
|
|
|
|
+ }
|
|
|
|
+ finally {
|
|
|
|
+ mergeInProgress = false;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ final private static PathFilter MAP_OUTPUT_FILTER = new PathFilter() {
|
|
|
|
+ public boolean accept(Path file) {
|
|
|
|
+ return file.toString().endsWith(".out");
|
|
|
|
+ }
|
|
|
|
+ };
|
|
}
|
|
}
|