
HADOOP-968. Move shuffle and sort code to run in child JVM, rather than in TaskTracker. Contributed by Devaraj.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@529742 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent
commit
b89bba0acb
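
With this change the reduce task's child JVM, not the TaskTracker, drives the shuffle and sort, so the task-to-tracker umbilical gains one RPC for discovering finished maps. Below is a minimal sketch of that method, reconstructed from the stub signatures added in this patch; TaskUmbilicalProtocol itself is not shown in the diff, so the enclosing interface here is illustrative only:

    import java.io.IOException;

    // Sketch only: the single method this patch adds to the umbilical
    // surface, mirroring the stubs added to IsolationRunner and
    // LocalJobRunner below.
    interface UmbilicalShuffleFragment {
      /** Poll the TaskTracker for up to maxLocs map completion events
       *  of jobId, starting at fromEventId. */
      TaskCompletionEvent[] getMapCompletionEvents(String jobId,
          int fromEventId, int maxLocs) throws IOException;
    }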

+ 3 - 0
CHANGES.txt

@@ -222,6 +222,9 @@ Trunk (unreleased changes)
 67. HADOOP-1154.  Fail a streaming task if the threads reading from or 
     writing to the streaming process fail.  (Koji Noguchi via tomwhite)
 
+68. HADOOP-968.  Move shuffle and sort to run in reduce's child JVM,
+    rather than in TaskTracker.  (Devaraj Das via cutting)
+
 
 Release 0.12.3 - 2007-04-06
 

+ 4 - 0
src/java/org/apache/hadoop/mapred/IsolationRunner.java

@@ -80,6 +80,10 @@ public class IsolationRunner {
       LOG.info("Task " + taskid + " has problem " + trace);
     }
     
+    public TaskCompletionEvent[] getMapCompletionEvents(String jobId, 
+        int fromEventId, int maxLocs) throws IOException {
+      return TaskCompletionEvent.EMPTY_ARRAY;
+    }
   }
   
   private static ClassLoader makeClassLoader(JobConf conf, 

+ 5 - 0
src/java/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -228,6 +228,11 @@ class LocalJobRunner implements JobSubmissionProtocol {
       LOG.fatal("FSError: "+ message);
     }
 
+    public TaskCompletionEvent[] getMapCompletionEvents(
+        String jobId, int fromEventId, int maxLocs) throws IOException {
+      return TaskCompletionEvent.EMPTY_ARRAY;
+    }
+    
   }
 
   public LocalJobRunner(Configuration conf) throws IOException {
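
Both local implementations can safely return TaskCompletionEvent.EMPTY_ARRAY: ReduceTask (next file) only constructs a ReduceCopier when mapred.job.tracker is not "local", so in local and isolated runs the shuffle is skipped and these stubs are never asked for real events. The guard, as it appears in the ReduceTask.run() hunk below:

    // From ReduceTask.run(): the copier (and hence real completion
    // events) is only needed against a real cluster.
    if (!job.get("mapred.job.tracker", "local").equals("local")) {
      reduceCopier = new ReduceCopier(umbilical, job);
      if (!reduceCopier.fetchOutputs()) {
        throw new IOException(getTaskId() + ": The reduce copier failed");
      }
    }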

+ 823 - 0
src/java/org/apache/hadoop/mapred/ReduceTask.java

@@ -20,19 +20,36 @@ package org.apache.hadoop.mapred;
 
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.text.DecimalFormat;
 import java.text.NumberFormat;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.InMemoryFileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -62,6 +79,7 @@ class ReduceTask extends Task {
   private static final Log LOG = LogFactory.getLog(ReduceTask.class.getName());
   private int numMaps;
   private boolean sortComplete;
+  private ReduceCopier reduceCopier;
 
   { 
     getProgress().setStatus("reduce"); 
@@ -235,6 +253,12 @@ class ReduceTask extends Task {
                                                            job.getReducerClass(), job);
     FileSystem lfs = FileSystem.getLocal(job);
 
+    if (!job.get("mapred.job.tracker", "local").equals("local")) {
+      reduceCopier = new ReduceCopier(umbilical, job);
+      if (!reduceCopier.fetchOutputs()) {
+      throw new IOException(getTaskId() + ": The reduce copier failed");
+      }
+    }
     copyPhase.complete();                         // copy is already complete
     
 
@@ -380,5 +404,804 @@ class ReduceTask extends Task {
   public Configuration getConf() {
     return this.conf;
   }
+  
+  private class ReduceCopier implements MRConstants {
 
+    /** Reference to the umbilical object */
+    private TaskUmbilicalProtocol umbilical;
+    
+    /** Number of ms before timing out a copy */
+    private static final int STALLED_COPY_TIMEOUT = 3 * 60 * 1000;
+    
+    /**
+     * our reduce task instance
+     */
+    private ReduceTask reduceTask;
+    
+    /**
+     * the list of map outputs currently being copied
+     */
+    private List scheduledCopies;
+    
+    /**
+     *  the results of dispatched copy attempts
+     */
+    private List copyResults;
+    
+    /**
+     *  the number of outputs to copy in parallel
+     */
+    private int numCopiers;
+    
+    /**
+     * the maximum backoff (in seconds), on top of a fixed 1 minute, before
+     * recontacting a host after a copy from it fails. We wait for (1 min +
+     * Random.nextInt(maxBackoff)) seconds.
+     */
+    private int maxBackoff;
+    
+    /**
+     * busy hosts from which copies are being backed off
+     * Map of host -> next contact time
+     */
+    private Map penaltyBox;
+    
+    /**
+     * the set of unique hosts from which we are copying
+     */
+    private Set uniqueHosts;
+    
+    /**
+     * the last time we polled the task tracker
+     */
+    private long lastPollTime;
+    
+    /**
+     * A reference to the in memory file system for writing the map outputs to.
+     */
+    private InMemoryFileSystem inMemFileSys;
+    
+    /**
+     * A reference to the local file system for writing the map outputs to.
+     */
+    private FileSystem localFileSys;
+    
+    /**
+     * An instance of the sorter used for doing merge
+     */
+    private SequenceFile.Sorter sorter;
+    
+    /**
+     * A reference to the throwable object (if merge throws an exception)
+     */
+    private volatile Throwable mergeThrowable;
+    
+    /** 
+     * A flag to indicate that merge is in progress
+     */
+    private volatile boolean mergeInProgress = false;
+    
+    /**
+     * When we accumulate mergeThreshold number of files in ram, we merge/spill
+     */
+    private int mergeThreshold = 500;
+    
+    /**
+     * The threads for fetching the files.
+     */
+    private MapOutputCopier[] copiers = null;
+    
+    /**
+     * The metrics record for reporting shuffle input.
+     */
+    private MetricsRecord shuffleMetrics = null;
+    
+    /**
+     * the minimum interval between tasktracker polls
+     */
+    private static final long MIN_POLL_INTERVAL = 1000;
+    
+    /**
+     * the number of map output locations to poll for at one time
+     */  
+    private int probe_sample_size = 100;
+    
+    /**
+     * a hashmap from mapId to MapOutputLocation for retrials
+     */
+    private Map<Integer, MapOutputLocation> retryFetches = 
+      new HashMap<Integer, MapOutputLocation>();
+    
+    /** 
+     * a TreeSet for needed map outputs
+     */
+    private Set <Integer> neededOutputs = 
+      Collections.synchronizedSet(new TreeSet<Integer>());
+    
+    /** Represents the result of an attempt to copy a map output */
+    private class CopyResult {
+      
+      // the map output location against which a copy attempt was made
+      private final MapOutputLocation loc;
+      
+      // the size of the file copied, -1 if the transfer failed
+      private final long size;
+      
+      //a flag signifying whether a copy result is obsolete
+      private static final int OBSOLETE = -2;
+      
+      CopyResult(MapOutputLocation loc, long size) {
+        this.loc = loc;
+        this.size = size;
+      }
+      
+      public int getMapId() { return loc.getMapId(); }
+      public boolean getSuccess() { return size >= 0; }
+      public boolean isObsolete() { 
+        return size == OBSOLETE;
+      }
+      public long getSize() { return size; }
+      public String getHost() { return loc.getHost(); }
+      public MapOutputLocation getLocation() { return loc; }
+    }
+    
+    private Thread createProgressThread(final TaskUmbilicalProtocol umbilical) {
+      //spawn a thread to give copy progress heartbeats
+      Thread copyProgress = new Thread() {
+        public void run() {
+          LOG.debug("Started thread: " + getName());
+          while (true) {
+            try {
+              reportProgress(umbilical);
+              Thread.sleep(PROGRESS_INTERVAL);
+            } catch (InterruptedException e) {
+              return;
+            } catch (Throwable e) {
+              LOG.info("Thread Exception in " +
+                       "reporting copy progress\n" +
+                       StringUtils.stringifyException(e));
+              continue;
+            }
+          }
+        }
+      };
+      copyProgress.setName("Copy progress reporter for task "+getTaskId());
+      copyProgress.setDaemon(true);
+      return copyProgress;
+    }
+    
+    private int nextMapOutputCopierId = 0;
+    
+    /** Copies map outputs as they become available */
+    private class MapOutputCopier extends Thread {
+      
+      private MapOutputLocation currentLocation = null;
+      private int id = nextMapOutputCopierId++;
+      
+      public MapOutputCopier() {
+        setName("MapOutputCopier " + reduceTask.getTaskId() + "." + id);
+        LOG.debug(getName() + " created");
+      }
+      
+      /**
+       * Fail the current file that we are fetching
+       * @return were we currently fetching?
+       */
+      public synchronized boolean fail() {
+        if (currentLocation != null) {
+          finish(-1);
+          return true;
+        } else {
+          return false;
+        }
+      }
+      
+      /**
+       * Get the current map output location.
+       */
+      public synchronized MapOutputLocation getLocation() {
+        return currentLocation;
+      }
+      
+      private synchronized void start(MapOutputLocation loc) {
+        currentLocation = loc;
+      }
+      
+      private synchronized void finish(long size) {
+        if (currentLocation != null) {
+          LOG.debug(getName() + " finishing " + currentLocation + " = " + size);
+          synchronized (copyResults) {
+            copyResults.add(new CopyResult(currentLocation, size));
+            copyResults.notify();
+          }
+          currentLocation = null;
+        }
+      }
+      
+      /** Loop forever and fetch map outputs as they become available.
+       * The thread exits when it is interrupted in {@link ReduceCopier#fetchOutputs()}
+       */
+      public void run() {
+        while (true) {        
+          try {
+            MapOutputLocation loc = null;
+            long size = -1;
+            
+            synchronized (scheduledCopies) {
+              while (scheduledCopies.isEmpty()) {
+                scheduledCopies.wait();
+              }
+              loc = (MapOutputLocation)scheduledCopies.remove(0);
+            }
+            
+            try {
+              start(loc);
+              size = copyOutput(loc);
+            } catch (IOException e) {
+              LOG.warn(reduceTask.getTaskId() + " copy failed: " +
+                  loc.getMapTaskId() + " from " + loc.getHost());
+              LOG.warn(StringUtils.stringifyException(e));
+            } finally {
+              finish(size);
+            }
+          } catch (InterruptedException e) { 
+            return; // ALL DONE
+          } catch (Throwable th) {
+            LOG.error("Map output copy failure: " + 
+                StringUtils.stringifyException(th));
+          }
+        }
+      }
+      
+      /** Copies a map output from a remote host, using raw RPC. 
+       * @param loc the map output location to be copied
+       * @return the number of bytes copied, or CopyResult.OBSOLETE if the
+       * output is no longer needed
+       * @throws IOException if there is an error copying the file
+       * @throws InterruptedException if the copier should give up
+       */
+      private long copyOutput(MapOutputLocation loc
+      ) throws IOException, InterruptedException {
+        if (!neededOutputs.contains(loc.getMapId())) {
+          return CopyResult.OBSOLETE;
+        }
+        String reduceId = reduceTask.getTaskId();
+        LOG.info(reduceId + " Copying " + loc.getMapTaskId() +
+            " output from " + loc.getHost() + ".");
+        // the place where the file should end up
+        Path finalFilename = conf.getLocalPath(reduceId + "/map_" +
+            loc.getMapId() + ".out");
+        // a working filename that will be unique to this attempt
+        Path tmpFilename = new Path(finalFilename + "-" + id);
+        // this copies the map output file
+        tmpFilename = loc.getFile(inMemFileSys, localFileSys, shuffleMetrics,
+            tmpFilename, reduceTask.getPartition(),
+            STALLED_COPY_TIMEOUT);
+        if (!neededOutputs.contains(loc.getMapId())) {
+          if (tmpFilename != null) {
+            FileSystem fs = tmpFilename.getFileSystem(conf);
+            fs.delete(tmpFilename);
+          }
+          return CopyResult.OBSOLETE;
+        }
+        if (tmpFilename == null)
+          throw new IOException("File " + finalFilename + "-" + id + 
+          " not created");
+        long bytes = -1;
+        // lock the ReduceTask while we do the rename
+        synchronized (ReduceTask.this) {
+          // This file could have been created in the inmemory
+          // fs or the localfs. So need to get the filesystem owning the path. 
+          FileSystem fs = tmpFilename.getFileSystem(conf);
+          if (!neededOutputs.contains(loc.getMapId())) {
+            fs.delete(tmpFilename);
+            return CopyResult.OBSOLETE;
+          }
+          // if we can't rename the file, something is broken (and IOException
+          // will be thrown). 
+          if (!fs.rename(tmpFilename, finalFilename)) {
+            fs.delete(tmpFilename);
+            throw new IOException("failure to rename map output " + 
+                tmpFilename);
+          }
+          bytes = fs.getLength(finalFilename);
+          LOG.info(reduceId + " done copying " + loc.getMapTaskId() +
+              " output from " + loc.getHost() + ".");
+          //Create a thread to do merges. Synchronize access/update to 
+          //mergeInProgress
+          if (!mergeInProgress && 
+              (inMemFileSys.getPercentUsed() >= MAX_INMEM_FILESYS_USE || 
+                  (mergeThreshold > 0 && 
+                      inMemFileSys.getNumFiles(MAP_OUTPUT_FILTER) >= 
+                        mergeThreshold))&&
+                      mergeThrowable == null) {
+            LOG.info(reduceId + " InMemoryFileSystem " + 
+                inMemFileSys.getUri().toString() +
+                " is " + inMemFileSys.getPercentUsed() + 
+            " full. Triggering merge");
+            InMemFSMergeThread m = new InMemFSMergeThread(inMemFileSys,
+                (LocalFileSystem)localFileSys, sorter);
+            m.setName("Thread for merging in memory files");
+            m.setDaemon(true);
+            mergeInProgress = true;
+            m.start();
+          }
+          neededOutputs.remove(loc.getMapId());
+        }
+        return bytes;
+      }
+      
+    }
+    
+    private void configureClasspath(JobConf conf)
+    throws IOException {
+      
+      // get the task and the current classloader which will become the parent
+      Task task = ReduceTask.this;
+      ClassLoader parent = conf.getClassLoader();   
+      
+      // get the work directory which holds the elements we are dynamically
+      // adding to the classpath
+      File workDir = new File(task.getJobFile()).getParentFile();
+      File jobCacheDir = new File(workDir.getParent(), "work");
+      ArrayList<URL> urllist = new ArrayList<URL>();
+      
+      // add the jars and directories to the classpath
+      String jar = conf.getJar();
+      if (jar != null) {      
+        File[] libs = new File(jobCacheDir, "lib").listFiles();
+        if (libs != null) {
+          for (int i = 0; i < libs.length; i++) {
+            urllist.add(libs[i].toURL());
+          }
+        }
+        urllist.add(new File(jobCacheDir, "classes").toURL());
+        urllist.add(jobCacheDir.toURL());
+        
+      }
+      urllist.add(workDir.toURL());
+      
+      // create a new classloader with the old classloader as its parent
+      // then set that classloader as the one used by the current jobconf
+      URL[] urls = urllist.toArray(new URL[urllist.size()]);
+      URLClassLoader loader = new URLClassLoader(urls, parent);
+      conf.setClassLoader(loader);
+    }
+    
+    public ReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf)
+    throws IOException {
+      
+      configureClasspath(conf);
+      this.umbilical = umbilical;      
+      this.reduceTask = ReduceTask.this;
+      this.scheduledCopies = new ArrayList(100);
+      this.copyResults = new ArrayList(100);    
+      this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
+      this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
+      this.mergeThreshold = conf.getInt("mapred.inmem.merge.threshold", 1000);
+      
+      //we want to distinguish inmem fs instances for different reduces. Hence,
+      //append a unique string in the uri for the inmem fs name
+      URI uri = URI.create("ramfs://mapoutput" + reduceTask.hashCode());
+      inMemFileSys = (InMemoryFileSystem)FileSystem.get(uri, conf);
+      LOG.info(reduceTask.getTaskId() + " Created an InMemoryFileSystem, uri: "
+          + uri);
+      localFileSys = FileSystem.getLocal(conf);
+      //create an instance of the sorter
+      sorter =
+        new SequenceFile.Sorter(inMemFileSys, conf.getOutputKeyComparator(), 
+            conf.getMapOutputValueClass(), conf);
+      
+      // hosts -> next contact time
+      this.penaltyBox = new Hashtable();
+      
+      // hostnames
+      this.uniqueHosts = new HashSet();
+      
+      this.lastPollTime = 0;
+      
+      MetricsContext metricsContext = MetricsUtil.getContext("mapred");
+      this.shuffleMetrics = 
+        MetricsUtil.createRecord(metricsContext, "shuffleInput");
+      this.shuffleMetrics.setTag("user", conf.getUser());
+    }
+    
+    public boolean fetchOutputs() throws IOException {
+      final int      numOutputs = reduceTask.getNumMaps();
+      Map<Integer, MapOutputLocation> knownOutputs = 
+        new HashMap<Integer, MapOutputLocation>();
+      int            numInFlight = 0, numCopied = 0;
+      int            lowThreshold = numCopiers*2;
+      long           bytesTransferred = 0;
+      DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
+      Random         backoff = new Random();
+      final Progress copyPhase = 
+        reduceTask.getProgress().phase();
+      
+      //tweak the probe sample size (make it a function of numCopiers)
+      probe_sample_size = Math.max(numCopiers*5, 50);
+      
+      for (int i = 0; i < numOutputs; i++) {
+        neededOutputs.add(new Integer(i));
+        copyPhase.addPhase();       // add sub-phase per file
+      }
+      
+      copiers = new MapOutputCopier[numCopiers];
+      
+      // start all the copying threads
+      for (int i=0; i < copiers.length; i++) {
+        copiers[i] = new MapOutputCopier();
+        copiers[i].start();
+      }
+      
+      // start the clock for bandwidth measurement
+      long startTime = System.currentTimeMillis();
+      long currentTime = startTime;
+      IntWritable fromEventId = new IntWritable(0);
+      
+      Thread copyProgress = createProgressThread(umbilical);
+      copyProgress.start();
+      try {
+        // loop until we get all required outputs
+        while (numCopied < numOutputs && mergeThrowable == null) {
+          
+          LOG.info(reduceTask.getTaskId() + " Need " + (numOutputs-numCopied) +
+          " map output(s)");
+          
+          if (!neededOutputs.isEmpty()) {
+            LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() +
+            " map output location(s)");
+            try {
+              // Put the hash entries for the failed fetches. Entries here
+              // might be replaced by (mapId) hashkeys from new successful 
+              // Map executions, if the fetch failures were due to lost tasks.
+              // The replacements, if at all, will happen when we query the
+              // tasktracker and put the mapId hashkeys with new 
+              // MapOutputLocations as values
+              knownOutputs.putAll(retryFetches);
+              // The call to getSuccessMapEvents will advance fromEventId to
+              // the value it should have for the next call
+              List <MapOutputLocation> locs = getSuccessMapEvents(fromEventId);
+              
+              // put the discovered outputs on the known list
+              for (int i=0; i < locs.size(); i++) {
+                knownOutputs.put(new Integer(locs.get(i).getMapId()), 
+                    locs.get(i));
+              }
+              LOG.info(reduceTask.getTaskId() +
+                 " Got " + locs.size() + 
+                 " new map outputs from tasktracker and " + retryFetches.size()
+                 + " map outputs from previous failures");
+              // clear the "failed" fetches hashmap
+              retryFetches.clear();
+            }
+            catch (IOException ie) {
+              LOG.warn(reduceTask.getTaskId() +
+                  " Problem locating map outputs: " +
+                  StringUtils.stringifyException(ie));
+            }
+          }
+          
+          // now walk through the cache and schedule what we can
+          int numKnown = knownOutputs.size(), numScheduled = 0;
+          int numSlow = 0, numDups = 0;
+          
+          LOG.info(reduceTask.getTaskId() + " Got " + numKnown + 
+          " known map output location(s); scheduling...");
+          
+          synchronized (scheduledCopies) {
+            Iterator locIt = knownOutputs.values().iterator();
+            
+            currentTime = System.currentTimeMillis();
+            while (locIt.hasNext()) {
+              
+              MapOutputLocation loc = (MapOutputLocation)locIt.next();
+              Long penaltyEnd = (Long)penaltyBox.get(loc.getHost());
+              boolean penalized = false, duplicate = false;
+              
+              if (penaltyEnd != null && currentTime < penaltyEnd.longValue()) {
+                penalized = true; numSlow++;
+              }
+              if (uniqueHosts.contains(loc.getHost())) {
+                duplicate = true; numDups++;
+              }
+              
+              if (!penalized && !duplicate) {
+                uniqueHosts.add(loc.getHost());
+                scheduledCopies.add(loc);
+                locIt.remove();  // remove from knownOutputs
+                numInFlight++; numScheduled++;
+              }
+            }
+            scheduledCopies.notifyAll();
+          }
+          LOG.info(reduceTask.getTaskId() + " Scheduled " + numScheduled +
+              " of " + numKnown + " known outputs (" + numSlow +
+              " slow hosts and " + numDups + " dup hosts)");
+          
+          // if we have no copies in flight and we can't schedule anything
+          // new, just wait for a bit
+          try {
+            if (numInFlight == 0 && numScheduled == 0) {
+              Thread.sleep(5000);
+            }
+          } catch (InterruptedException e) { } // IGNORE
+          
+          while (numInFlight > 0 && mergeThrowable == null) {
+            LOG.debug(reduceTask.getTaskId() + " numInFlight = " + 
+                numInFlight);
+            CopyResult cr = getCopyResult();
+            
+            if (cr != null) {
+              if (cr.getSuccess()) {  // a successful copy
+                numCopied++;
+                bytesTransferred += cr.getSize();
+                
+                long secsSinceStart = 
+                  (System.currentTimeMillis()-startTime)/1000+1;
+                float mbs = ((float)bytesTransferred)/(1024*1024);
+                float transferRate = mbs/secsSinceStart;
+                
+                copyPhase.startNextPhase();
+                copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs 
+                    + " at " +
+                    mbpsFormat.format(transferRate) +  " MB/s)");          
+              } else if (cr.isObsolete()) {
+                //ignore
+                LOG.info(reduceTask.getTaskId() + 
+                    " Ignoring obsolete copy result for Map Task: " + 
+                    cr.getLocation().getMapTaskId() + " from host: " + 
+                    cr.getHost());
+              } else {
+                retryFetches.put(new Integer(cr.getMapId()), cr.getLocation());
+                
+                // wait a random amount of time for next contact
+                currentTime = System.currentTimeMillis();
+                long nextContact = currentTime + 60 * 1000 +
+                backoff.nextInt(maxBackoff*1000);
+                penaltyBox.put(cr.getHost(), new Long(nextContact));          
+                LOG.warn(reduceTask.getTaskId() + " adding host " +
+                    cr.getHost() + " to penalty box, next contact in " +
+                    ((nextContact-currentTime)/1000) + " seconds");
+                
+                // other outputs from the failed host may be present in the
+                // knownOutputs cache, purge them. This is important in case
+                // the failure is due to a lost tasktracker (causes many
+                // unnecessary backoffs). If not, we only take a small hit
+                // polling the tasktracker a few more times
+                Iterator locIt = knownOutputs.values().iterator();
+                while (locIt.hasNext()) {
+                  MapOutputLocation loc = (MapOutputLocation)locIt.next();
+                  if (cr.getHost().equals(loc.getHost())) {
+                    retryFetches.put(new Integer(loc.getMapId()), loc);
+                    locIt.remove();
+                  }
+                }
+              }
+              uniqueHosts.remove(cr.getHost());
+              numInFlight--;
+            }
+            
+            boolean busy = true;
+            // ensure we have enough to keep us busy
+            if (numInFlight < lowThreshold && (numOutputs-numCopied) > 
+              probe_sample_size) {
+              busy = false;
+            }
+            //Check whether we have more CopyResult to check. If there is none,
+            //and we are not busy enough, break
+            synchronized (copyResults) {
+              if (copyResults.size() == 0 && !busy) {
+                break;
+              }
+            }
+          }
+          
+        }
+        
+        // all done, inform the copiers to exit
+        synchronized (copiers) {
+          synchronized (scheduledCopies) {
+            for (int i=0; i < copiers.length; i++) {
+              copiers[i].interrupt();
+              copiers[i] = null;
+            }
+          }
+        }
+        
+        //Do a merge of in-memory files (if there are any)
+        if (mergeThrowable == null) {
+          try {
+            //wait for an ongoing merge (if it is in flight) to complete
+            while (mergeInProgress) {
+              Thread.sleep(200);
+            }
+            LOG.info(reduceTask.getTaskId() + 
+                " Copying of all map outputs complete. " + 
+                "Initiating the last merge on the remaining files in " + 
+                inMemFileSys.getUri());
+            if (mergeThrowable != null) {
+              //this could happen if the merge that
+              //was in progress threw an exception
+              throw mergeThrowable;
+            }
+            //initiate merge
+            Path[] inMemClosedFiles = inMemFileSys.getFiles(MAP_OUTPUT_FILTER);
+            if (inMemClosedFiles.length == 0) {
+              LOG.info(reduceTask.getTaskId() + " Nothing to merge from " + 
+                  inMemFileSys.getUri());
+              return numCopied == numOutputs;
+            }
+            //name this output file same as the name of the first file that is 
+            //there in the current list of inmem files (this is guaranteed to be
+            //absent on the disk currently. So we don't overwrite a prev. 
+            //created spill). Also we need to create the output file now since
+            //it is not guaranteed that this file will be present after merge
+            //is called (we delete empty sequence files as soon as we see them
+            //in the merge method)
+            SequenceFile.Writer writer = sorter.cloneFileAttributes(
+                inMemFileSys.makeQualified(inMemClosedFiles[0]), 
+                localFileSys.makeQualified(inMemClosedFiles[0]), null);
+            
+            SequenceFile.Sorter.RawKeyValueIterator rIter = null;
+            try {
+              rIter = sorter.merge(inMemClosedFiles, true, 
+                  inMemClosedFiles.length, 
+                  new Path(reduceTask.getTaskId()));
+            } catch (Exception e) { 
+              //make sure that we delete the ondisk file that we created earlier
+              //when we invoked cloneFileAttributes
+              writer.close();
+              localFileSys.delete(inMemClosedFiles[0]);
+              throw new IOException (StringUtils.stringifyException(e));
+            }
+            sorter.writeFile(rIter, writer);
+            writer.close();
+            LOG.info(reduceTask.getTaskId() +
+                " Merge of the " +inMemClosedFiles.length +
+                " files in InMemoryFileSystem complete." +
+                " Local file is " + inMemClosedFiles[0]);
+          } catch (Throwable t) {
+            LOG.warn(reduceTask.getTaskId() +
+                " Final merge of the inmemory files threw an exception: " + 
+                StringUtils.stringifyException(t));
+            return false;
+          }
+        }
+        return mergeThrowable == null && numCopied == numOutputs;
+      } finally {
+        inMemFileSys.close();
+        copyProgress.interrupt();
+      }
+    }
+    
+    
+    private CopyResult getCopyResult() {  
+      synchronized (copyResults) {
+        while (copyResults.isEmpty()) {
+          try {
+            copyResults.wait();
+          } catch (InterruptedException e) { }
+        }
+        if (copyResults.isEmpty()) {
+          return null;
+        } else {
+          return (CopyResult) copyResults.remove(0);
+        }
+      }    
+    }
+    
+    /** Queries the task tracker for a set of outputs ready to be copied
+     * @param fromEventId the first event ID we want to start from, this is
+     * modified by the call to this method
+     * @return a set of locations to copy outputs from
+     * @throws IOException
+     */  
+    private List <MapOutputLocation> getSuccessMapEvents(IntWritable fromEventId)
+        throws IOException {
+      
+      long currentTime = System.currentTimeMillis();    
+      long pollTime = lastPollTime + MIN_POLL_INTERVAL;
+      while (currentTime < pollTime) {
+        try {
+          Thread.sleep(pollTime-currentTime);
+        } catch (InterruptedException ie) { } // IGNORE
+        currentTime = System.currentTimeMillis();
+      }
+      lastPollTime = currentTime;
+      
+      TaskCompletionEvent t[] = umbilical.getMapCompletionEvents(
+          reduceTask.getJobId().toString(),
+          fromEventId.get(),
+          probe_sample_size);
+      
+      List <MapOutputLocation> mapOutputsList = 
+        new ArrayList<MapOutputLocation>();
+      for (int i = 0; i < t.length; i++) {
+        if (t[i].getTaskStatus() == TaskCompletionEvent.Status.SUCCEEDED) {
+          URI u = URI.create(t[i].getTaskTrackerHttp());
+          String host = u.getHost();
+          int port = u.getPort();
+          String taskId = t[i].getTaskId();
+          int mId = t[i].idWithinJob();
+          mapOutputsList.add(new MapOutputLocation(taskId, mId, host, port));
+        }
+      }
+    
+      fromEventId.set(fromEventId.get() + t.length);
+      return mapOutputsList;
+    }
+    
+    
+    private class InMemFSMergeThread extends Thread {
+      private InMemoryFileSystem inMemFileSys;
+      private LocalFileSystem localFileSys;
+      private SequenceFile.Sorter sorter;
+      
+      public InMemFSMergeThread(InMemoryFileSystem inMemFileSys, 
+          LocalFileSystem localFileSys, SequenceFile.Sorter sorter) {
+        this.inMemFileSys = inMemFileSys;
+        this.localFileSys = localFileSys;
+        this.sorter = sorter;
+      }
+      public void run() {
+        LOG.info(reduceTask.getTaskId() + " Thread started: " + getName());
+        try {
+          //initiate merge
+          Path[] inMemClosedFiles = inMemFileSys.getFiles(MAP_OUTPUT_FILTER);
+          //Note that the above Path[] could be of length 0 if all copies are 
+          //in flight. So we make sure that we have some 'closed' map
+          //output files to merge to get the benefit of in-memory merge
+          if (inMemClosedFiles.length >= 
+            (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) {
+            //name this output file same as the name of the first file that is 
+            //there in the current list of inmem files (this is guaranteed to
+            //be absent on the disk currently. So we don't overwrite a prev. 
+            //created spill). Also we need to create the output file now since
+            //it is not guaranteed that this file will be present after merge
+            //is called (we delete empty sequence files as soon as we see them
+            //in the merge method)
+            SequenceFile.Writer writer = sorter.cloneFileAttributes(
+                inMemFileSys.makeQualified(inMemClosedFiles[0]), 
+                localFileSys.makeQualified(inMemClosedFiles[0]), null);
+            SequenceFile.Sorter.RawKeyValueIterator rIter;
+            try {
+              rIter = sorter.merge(inMemClosedFiles, true, 
+                  inMemClosedFiles.length, new Path(reduceTask.getTaskId()));
+            } catch (Exception e) { 
+              //make sure that we delete the ondisk file that we created 
+              //earlier when we invoked cloneFileAttributes
+              writer.close();
+              localFileSys.delete(inMemClosedFiles[0]);
+              throw new IOException (StringUtils.stringifyException(e));
+            }
+            sorter.writeFile(rIter, writer);
+            writer.close();
+            LOG.info(reduceTask.getTaskId() + 
+                " Merge of the " +inMemClosedFiles.length +
+                " files in InMemoryFileSystem complete." +
+                " Local file is " + inMemClosedFiles[0]);
+          }
+          else {
+            LOG.info(reduceTask.getTaskId() + " Nothing to merge from " + 
+                inMemFileSys.getUri());
+          }
+        } catch (Throwable t) {
+          LOG.warn(reduceTask.getTaskId() +
+              " Intermediate Merge of the inmemory files threw an exception: "
+              + StringUtils.stringifyException(t));
+          ReduceCopier.this.mergeThrowable = t;
+        }
+        finally {
+          mergeInProgress = false;
+        }
+      }
+    }
+    final private PathFilter MAP_OUTPUT_FILTER = new PathFilter() {
+      public boolean accept(Path file) {
+        return file.toString().endsWith(".out");
+      }     
+    };
+  }
 }
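
Two of the policies above are easier to read pulled out of the surrounding code. First, the penalty-box timing in fetchOutputs(): with the default mapred.reduce.copy.backoff of 300 read in the ReduceCopier constructor, a host that fails a fetch is recontacted no sooner than 60 and no later than 360 seconds. An illustrative, self-contained restatement (not part of the patch):

    import java.util.Random;

    class PenaltyBoxMath {
      // penalty = a fixed minute + a random slice of maxBackoff seconds;
      // nextInt(maxBackoff * 1000) yields up to maxBackoff seconds in ms.
      static long nextContact(Random backoff, int maxBackoff) {
        return System.currentTimeMillis() + 60 * 1000
            + backoff.nextInt(maxBackoff * 1000);
      }
    }

Second, the in-memory merge trigger in copyOutput(), condensed under the same names (MAX_INMEM_FILESYS_USE comes from MRConstants and is not shown in this diff): spill the ramfs to disk when it is nearly full, or when enough closed files accumulate (mapred.inmem.merge.threshold, read as 1000 in the constructor), provided no merge is already running or has failed.

    // Condensed restatement, as it would read inside ReduceCopier.
    boolean shouldTriggerInMemMerge() {
      return !mergeInProgress && mergeThrowable == null
          && (inMemFileSys.getPercentUsed() >= MAX_INMEM_FILESYS_USE
              || (mergeThreshold > 0
                  && inMemFileSys.getNumFiles(MAP_OUTPUT_FILTER) >= mergeThreshold));
    }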

+ 5 - 813
src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java

@@ -17,428 +17,23 @@
  */
 package org.apache.hadoop.mapred;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.InMemoryFileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
-import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.util.*;
-import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.conf.*;
 
 import java.io.*;
-import java.util.*;
-import java.net.*;
-import java.text.DecimalFormat;
-import org.apache.hadoop.util.Progressable;
 
 /** Runs a reduce task. */
-class ReduceTaskRunner extends TaskRunner implements MRConstants {
-  /** Number of ms before timing out a copy */
-  private static final int STALLED_COPY_TIMEOUT = 3 * 60 * 1000;
-  
+class ReduceTaskRunner extends TaskRunner {
   /** 
    * for cleaning up old map outputs
    */
   private MapOutputFile mapOutputFile;
   
-  /**
-   * our reduce task instance
-   */
-  private ReduceTask reduceTask;
-    
-  /**
-   * the list of map outputs currently being copied
-   */
-  private List scheduledCopies;
-  
-  /**
-   *  the results of dispatched copy attempts
-   */
-  private List copyResults;
-  
-  /**
-   *  the number of outputs to copy in parallel
-   */
-  private int numCopiers;
-
-  /**
-   * the maximum amount of time (less 1 minute) to wait to 
-   * contact a host after a copy from it fails. We wait for (1 min +
-   * Random.nextInt(maxBackoff)) seconds.
-   */
-  private int maxBackoff;
-  
-  /**
-   * busy hosts from which copies are being backed off
-   * Map of host -> next contact time
-   */
-  private Map penaltyBox;
-
-  /**
-   * the set of unique hosts from which we are copying
-   */
-  private Set uniqueHosts;
-  
-  /**
-   * the last time we polled the job tracker
-   */
-  private long lastPollTime;
-  
-  /**
-   * A reference to the in memory file system for writing the map outputs to.
-   */
-  private InMemoryFileSystem inMemFileSys;
-  
-  /**
-   * A reference to the local file system for writing the map outputs to.
-   */
-  private FileSystem localFileSys;
-
-  /**
-   * An instance of the sorter used for doing merge
-   */
-  private SequenceFile.Sorter sorter;
-  
-  /**
-   * A reference to the throwable object (if merge throws an exception)
-   */
-  private volatile Throwable mergeThrowable;
-  
-  /** 
-   * A flag to indicate that merge is in progress
-   */
-  private volatile boolean mergeInProgress = false;
-
-  /**
-   * When we accumulate merge_threshold number of files in ram, we merge/spill
-   */
-  private int mergeThreshold = 500;
-  
-  /**
-   * The threads for fetching the files.
-   */
-  private MapOutputCopier[] copiers = null;
-  
-  /**
-   * The threads for fetching the files.
-   */
-  private MetricsRecord shuffleMetrics = null;
-  
-  /**
-   * the minimum interval between jobtracker polls
-   */
-  private static final long MIN_POLL_INTERVAL = 5000;
-  
-  /**
-   * the number of map output locations to poll for at one time
-   */  
-  private int probe_sample_size = 50;
-  
-  /**
-   * a hashmap from mapId to MapOutputLocation for retrials
-   */
-  private Map<Integer, MapOutputLocation> retryFetches = new HashMap();
-  
-  /** 
-   * a TreeSet for needed map outputs
-   */
-  private Set <Integer> neededOutputs = 
-    Collections.synchronizedSet(new TreeSet<Integer>());
-  
-  /** Represents the result of an attempt to copy a map output */
-  private class CopyResult {
-    
-    // the map output location against which a copy attempt was made
-    private final MapOutputLocation loc;
-    
-    // the size of the file copied, -1 if the transfer failed
-    private final long size;
-    
-    //a flag signifying whether a copy result is obsolete
-    private static final int OBSOLETE = -2;
-    
-    CopyResult(MapOutputLocation loc, long size) {
-      this.loc = loc;
-      this.size = size;
-    }
-    
-    public int getMapId() { return loc.getMapId(); }
-    public boolean getSuccess() { return size >= 0; }
-    public boolean isObsolete() { 
-      return size == OBSOLETE;
-    }
-    public long getSize() { return size; }
-    public String getHost() { return loc.getHost(); }
-    public MapOutputLocation getLocation() { return loc; }
-  }
-
-  private class PingTimer extends Thread implements Progressable {
-    Task task = getTask();
-    TaskTracker tracker = getTracker();
-
-    public void run() {
-      LOG.info(task.getTaskId() + " Started thread: " + getName());
-      while (true) {
-        try {
-          progress();
-          Thread.sleep(Task.PROGRESS_INTERVAL);
-        }
-        catch (InterruptedException i) {
-          return;
-        }
-        catch (Throwable e) {
-          LOG.info(task.getTaskId() + " Thread Exception in " +
-                   "reporting sort progress\n" +
-                   StringUtils.stringifyException(e));
-          continue;
-        }
-      }
-    }
-    
-    public void progress() {
-      task.reportProgress(tracker);
-    }
-  }
-
-  private static int nextMapOutputCopierId = 0;
-
-  /** Copies map outputs as they become available */
-  private class MapOutputCopier extends Thread {
-
-    private MapOutputLocation currentLocation = null;
-    private int id = nextMapOutputCopierId++;
-    
-    public MapOutputCopier() {
-      setName("MapOutputCopier " + reduceTask.getTaskId() + "." + id);
-      LOG.debug(getName() + " created");
-    }
-    
-    /**
-     * Fail the current file that we are fetching
-     * @return were we currently fetching?
-     */
-    public synchronized boolean fail() {
-      if (currentLocation != null) {
-        finish(-1);
-        return true;
-      } else {
-        return false;
-      }
-    }
-    
-    /**
-     * Get the current map output location.
-     */
-    public synchronized MapOutputLocation getLocation() {
-      return currentLocation;
-    }
-    
-    private synchronized void start(MapOutputLocation loc) {
-      currentLocation = loc;
-    }
-    
-    private synchronized void finish(long size) {
-      if (currentLocation != null) {
-        LOG.debug(getName() + " finishing " + currentLocation + " = " + size);
-        synchronized (copyResults) {
-          copyResults.add(new CopyResult(currentLocation, size));
-          copyResults.notify();
-        }
-        currentLocation = null;
-      }
-    }
-    
-    /** Loop forever and fetch map outputs as they become available.
-     * The thread exits when it is interrupted by the {@link ReduceTaskRunner}
-     */
-    public void run() {
-      while (true) {        
-        try {
-          MapOutputLocation loc = null;
-          long size = -1;
-          
-          synchronized (scheduledCopies) {
-            while (scheduledCopies.isEmpty()) {
-              scheduledCopies.wait();
-            }
-            loc = (MapOutputLocation)scheduledCopies.remove(0);
-          }
-
-          try {
-            start(loc);
-            size = copyOutput(loc);
-          } catch (IOException e) {
-            LOG.warn(reduceTask.getTaskId() + " copy failed: " +
-                     loc.getMapTaskId() + " from " + loc.getHost());
-            LOG.warn(StringUtils.stringifyException(e));
-          } finally {
-            finish(size);
-          }
-        } catch (InterruptedException e) { 
-          return; // ALL DONE
-        } catch (Throwable th) {
-          LOG.error("Map output copy failure: " + 
-                    StringUtils.stringifyException(th));
-        }
-      }
-    }
-
-    /** Copies a a map output from a remote host, using raw RPC. 
-     * @param currentLocation the map output location to be copied
-     * @return the path (fully qualified) of the copied file
-     * @throws IOException if there is an error copying the file
-     * @throws InterruptedException if the copier should give up
-     */
-    private long copyOutput(MapOutputLocation loc
-                            ) throws IOException, InterruptedException {
-      if (!neededOutputs.contains(loc.getMapId())) {
-        return CopyResult.OBSOLETE;
-      }
-      String reduceId = reduceTask.getTaskId();
-      LOG.info(reduceId + " Copying " + loc.getMapTaskId() +
-               " output from " + loc.getHost() + ".");
-      // the place where the file should end up
-      Path finalFilename = conf.getLocalPath(reduceId + "/map_" +
-                                             loc.getMapId() + ".out");
-      // a working filename that will be unique to this attempt
-      Path tmpFilename = new Path(finalFilename + "-" + id);
-      // this copies the map output file
-      tmpFilename = loc.getFile(inMemFileSys, localFileSys, shuffleMetrics,
-                                tmpFilename, reduceTask.getPartition(),
-                                STALLED_COPY_TIMEOUT);
-      if (!neededOutputs.contains(loc.getMapId())) {
-        if (tmpFilename != null) {
-          FileSystem fs = tmpFilename.getFileSystem(conf);
-          fs.delete(tmpFilename);
-        }
-        return CopyResult.OBSOLETE;
-      }
-      if (tmpFilename == null)
-        throw new IOException("File " + finalFilename + "-" + id + 
-                              " not created");
-      long bytes = -1;
-      // lock the ReduceTaskRunner while we do the rename
-      synchronized (ReduceTaskRunner.this) {
-        // This file could have been created in the inmemory
-        // fs or the localfs. So need to get the filesystem owning the path. 
-        FileSystem fs = tmpFilename.getFileSystem(conf);
-        if (!neededOutputs.contains(loc.getMapId())) {
-          fs.delete(tmpFilename);
-          return CopyResult.OBSOLETE;
-        }
-        // if we can't rename the file, something is broken (and IOException
-        // will be thrown). 
-        if (!fs.rename(tmpFilename, finalFilename)) {
-          fs.delete(tmpFilename);
-          throw new IOException("failure to rename map output " + tmpFilename);
-        }
-        bytes = fs.getLength(finalFilename);
-        LOG.info(reduceId + " done copying " + loc.getMapTaskId() +
-                 " output from " + loc.getHost() + ".");
-        //Create a thread to do merges. Synchronize access/update to 
-        //mergeInProgress
-        if (!mergeInProgress && 
-            (inMemFileSys.getPercentUsed() >= MAX_INMEM_FILESYS_USE || 
-             (mergeThreshold > 0 && 
-              inMemFileSys.getNumFiles(MAP_OUTPUT_FILTER) >= mergeThreshold))&&
-            mergeThrowable == null) {
-          LOG.info(reduceId + " InMemoryFileSystem " + 
-                   inMemFileSys.getUri().toString() +
-                   " is " + inMemFileSys.getPercentUsed() + 
-                   " full. Triggering merge");
-          InMemFSMergeThread m = new InMemFSMergeThread(inMemFileSys,
-                                                        (LocalFileSystem)localFileSys, sorter);
-          m.setName("Thread for merging in memory files");
-          m.setDaemon(true);
-          mergeInProgress = true;
-          m.start();
-        }
-        neededOutputs.remove(loc.getMapId());
-      }
-      return bytes;
-    }
-
-  }
-  
-  private void configureClasspath(JobConf conf)
-    throws IOException {
-    
-    // get the task and the current classloader which will become the parent
-    Task task = getTask();
-    ClassLoader parent = conf.getClassLoader();   
-    
-    // get the work directory which holds the elements we are dynamically
-    // adding to the classpath
-    File workDir = new File(task.getJobFile()).getParentFile();
-    File jobCacheDir = new File(workDir.getParent(), "work");
-    ArrayList<URL> urllist = new ArrayList<URL>();
-    
-    // add the jars and directories to the classpath
-    String jar = conf.getJar();
-    if (jar != null) {      
-      File[] libs = new File(jobCacheDir, "lib").listFiles();
-      if (libs != null) {
-        for (int i = 0; i < libs.length; i++) {
-          urllist.add(libs[i].toURL());
-        }
-      }
-      urllist.add(new File(jobCacheDir, "classes").toURL());
-      urllist.add(jobCacheDir.toURL());
-     
-    }
-    urllist.add(workDir.toURL());
-    
-    // create a new classloader with the old classloader as its parent
-    // then set that classloader as the one used by the current jobconf
-    URL[] urls = urllist.toArray(new URL[urllist.size()]);
-    URLClassLoader loader = new URLClassLoader(urls, parent);
-    conf.setClassLoader(loader);
-  }
-
   public ReduceTaskRunner(Task task, TaskTracker tracker, 
                           JobConf conf) throws IOException {
     
     super(task, tracker, conf);
-    configureClasspath(conf);
     this.mapOutputFile = new MapOutputFile();
     this.mapOutputFile.setConf(conf);
-
-    this.reduceTask = (ReduceTask)getTask();
-    this.scheduledCopies = new ArrayList(100);
-    this.copyResults = new ArrayList(100);    
-    this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
-    this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
-    this.mergeThreshold = conf.getInt("mapred.inmem.merge.threshold", 1000);
-
-    //we want to distinguish inmem fs instances for different reduces. Hence,
-    //append a unique string in the uri for the inmem fs name
-    URI uri = URI.create("ramfs://mapoutput" + reduceTask.hashCode());
-    inMemFileSys = (InMemoryFileSystem)FileSystem.get(uri, conf);
-    LOG.info(reduceTask.getTaskId() + " Created an InMemoryFileSystem, uri: " +
-             uri);
-    localFileSys = FileSystem.getLocal(conf);
-    //create an instance of the sorter
-    sorter =
-      new SequenceFile.Sorter(inMemFileSys, conf.getOutputKeyComparator(), 
-                              conf.getMapOutputValueClass(), conf);
-    
-    // hosts -> next contact time
-    this.penaltyBox = new Hashtable();
-    
-    // hostnames
-    this.uniqueHosts = new HashSet();
-    
-    this.lastPollTime = 0;
-
-    MetricsContext metricsContext = MetricsUtil.getContext("mapred");
-    this.shuffleMetrics = 
-      MetricsUtil.createRecord(metricsContext, "shuffleInput");
-    this.shuffleMetrics.setTag("user", conf.getUser());
   }
 
   /** Assemble all of the map output files */
@@ -448,418 +43,15 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
     }
     
     // cleanup from failures
-    this.mapOutputFile.removeAll(reduceTask.getTaskId());
-    
-    final int      numOutputs = reduceTask.getNumMaps();
-    Map<Integer, MapOutputLocation> knownOutputs = 
-      new HashMap<Integer, MapOutputLocation>();
-    int            numInFlight = 0, numCopied = 0;
-    int            lowThreshold = numCopiers*2;
-    long           bytesTransferred = 0;
-    DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
-    Random         backoff = new Random();
-    final Progress copyPhase = getTask().getProgress().phase();
-    
-    //tweak the probe sample size (make it a function of numCopiers)
-    probe_sample_size = Math.max(numCopiers*5, 50);
-    
-    for (int i = 0; i < numOutputs; i++) {
-      neededOutputs.add(new Integer(i));
-      copyPhase.addPhase();       // add sub-phase per file
-    }
-
-    InterTrackerProtocol jobClient = getTracker().getJobClient();
-    copiers = new MapOutputCopier[numCopiers];
-
-    // start all the copying threads
-    for (int i=0; i < copiers.length; i++) {
-      copiers[i] = new MapOutputCopier();
-      copiers[i].start();
-    }
-    
-    // start the clock for bandwidth measurement
-    long startTime = System.currentTimeMillis();
-    long currentTime = startTime;
-    IntWritable fromEventId = new IntWritable(0);
-    
-    PingTimer pingTimer = new PingTimer();
-    pingTimer.setName("Map output copy reporter for task " + 
-                      reduceTask.getTaskId());
-    pingTimer.setDaemon(true);
-    pingTimer.start();
-    try {
-      // loop until we get all required outputs or are killed
-      while (!killed && numCopied < numOutputs && mergeThrowable == null) {
-
-        LOG.info(reduceTask.getTaskId() + " Need " + (numOutputs-numCopied) +
-                 " map output(s)");
-
-        if (!neededOutputs.isEmpty()) {
-          LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() +
-                   " map output location(s)");
-          try {
-            // Put the hash entries for the failed fetches. Entries here
-            // might be replaced by (mapId) hashkeys from new successful 
-            // Map executions, if the fetch failures were due to lost tasks.
-            // The replacements, if at all, will happen when we query the
-            // JobTracker and put the mapId hashkeys with new MapOutputLocations
-            // as values
-            knownOutputs.putAll(retryFetches);
-            // the call to queryJobTracker will modify fromEventId to a value
-            // that it should be for the next call to queryJobTracker
-            List <MapOutputLocation> locs = queryJobTracker(fromEventId, 
-                                                            jobClient);
-          
-            // put discovered them on the known list
-            for (int i=0; i < locs.size(); i++) {
-              knownOutputs.put(new Integer(locs.get(i).getMapId()), locs.get(i));
-            }
-            LOG.info(reduceTask.getTaskId() +
-                     " Got " + locs.size() + 
-                     " new map outputs from jobtracker and " + retryFetches.size() +
-                     " map outputs from previous failures");
-            // clear the "failed" fetches hashmap
-            retryFetches.clear();
-          }
-          catch (IOException ie) {
-            LOG.warn(reduceTask.getTaskId() +
-                     " Problem locating map outputs: " +
-                     StringUtils.stringifyException(ie));
-          }
-        }
-      
-        // now walk through the cache and schedule what we can
-        int numKnown = knownOutputs.size(), numScheduled = 0;
-        int numSlow = 0, numDups = 0;
-
-        LOG.info(reduceTask.getTaskId() + " Got " + numKnown + 
-                 " known map output location(s); scheduling...");
-
-        synchronized (scheduledCopies) {
-          Iterator locIt = knownOutputs.values().iterator();
-
-          currentTime = System.currentTimeMillis();
-          while (locIt.hasNext()) {
-
-            MapOutputLocation loc = (MapOutputLocation)locIt.next();
-            Long penaltyEnd = (Long)penaltyBox.get(loc.getHost());
-            boolean penalized = false, duplicate = false;
- 
-            if (penaltyEnd != null && currentTime < penaltyEnd.longValue()) {
-              penalized = true; numSlow++;
-            }
-            if (uniqueHosts.contains(loc.getHost())) {
-              duplicate = true; numDups++;
-            }
- 
-            if (!penalized && !duplicate) {
-              uniqueHosts.add(loc.getHost());
-              scheduledCopies.add(loc);
-              locIt.remove();  // remove from knownOutputs
-              numInFlight++; numScheduled++;
-            }
-          }
-          scheduledCopies.notifyAll();
-        }
-        LOG.info(reduceTask.getTaskId() + " Scheduled " + numScheduled +
-                 " of " + numKnown + " known outputs (" + numSlow +
-                 " slow hosts and " + numDups + " dup hosts)");
-
-        // if we have no copies in flight and we can't schedule anything
-        // new, just wait for a bit
-        try {
-          if (numInFlight == 0 && numScheduled == 0) {
-            Thread.sleep(5000);
-          }
-        } catch (InterruptedException e) { } // IGNORE
-
-        while (!killed && numInFlight > 0 && mergeThrowable == null) {
-          LOG.debug(reduceTask.getTaskId() + " numInFlight = " + numInFlight);
-          CopyResult cr = getCopyResult();
-        
-          if (cr != null) {
-            if (cr.getSuccess()) {  // a successful copy
-              numCopied++;
-              bytesTransferred += cr.getSize();
-          
-              long secsSinceStart = (System.currentTimeMillis()-startTime)/1000+1;
-              float mbs = ((float)bytesTransferred)/(1024*1024);
-              float transferRate = mbs/secsSinceStart;
-          
-              copyPhase.startNextPhase();
-              copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs + 
-                                  " at " +
-                                  mbpsFormat.format(transferRate) +  " MB/s)");          
-            } else if (cr.isObsolete()) {
-              //ignore
-              LOG.info(reduceTask.getTaskId() + 
-                       " Ignoring obsolete copy result for Map Task: " + 
-                       cr.getLocation().getMapTaskId() + " from host: " + 
-                       cr.getHost());
-            } else {
-              retryFetches.put(new Integer(cr.getMapId()), cr.getLocation());
-          
-              // wait a random amount of time for next contact
-              currentTime = System.currentTimeMillis();
-              long nextContact = currentTime + 60 * 1000 +
-                backoff.nextInt(maxBackoff*1000);
-              penaltyBox.put(cr.getHost(), new Long(nextContact));          
-              LOG.warn(reduceTask.getTaskId() + " adding host " +
-                       cr.getHost() + " to penalty box, next contact in " +
-                       ((nextContact-currentTime)/1000) + " seconds");
-
-              // other outputs from the failed host may be present in the
-              // knownOutputs cache, purge them. This is important in case
-              // the failure is due to a lost tasktracker (causes many
-              // unnecessary backoffs). If not, we only take a small hit
-              // polling the jobtracker a few more times
-              Iterator locIt = knownOutputs.values().iterator();
-              while (locIt.hasNext()) {
-                MapOutputLocation loc = (MapOutputLocation)locIt.next();
-                if (cr.getHost().equals(loc.getHost())) {
-                  retryFetches.put(new Integer(loc.getMapId()), loc);
-                  locIt.remove();
-                }
-              }
-            }
-            uniqueHosts.remove(cr.getHost());
-            numInFlight--;
-          }
-        
-          boolean busy = true;
-          // ensure we have enough to keep us busy
-          if (numInFlight < lowThreshold && (numOutputs-numCopied) > probe_sample_size) {
-            busy = false;
-          }
-          //Check whether we have more CopyResult to check. If there is none, and
-          //we are not busy enough, break
-          synchronized (copyResults) {
-            if (copyResults.size() == 0 && !busy) {
-              break;
-            }
-          }
-        }
-      
-      }
-
-      // all done, inform the copiers to exit
-      synchronized (copiers) {
-        synchronized (scheduledCopies) {
-          for (int i=0; i < copiers.length; i++) {
-            copiers[i].interrupt();
-            copiers[i] = null;
-          }
-        }
-      }
-    
-      //Do a merge of in-memory files (if there are any)
-      if (!killed && mergeThrowable == null) {
-        try {
-          //wait for an ongoing merge (if it is in flight) to complete
-          while (mergeInProgress) {
-            Thread.sleep(200);
-          }
-          LOG.info(reduceTask.getTaskId() + 
-                   " Copying of all map outputs complete. " + 
-                   "Initiating the last merge on the remaining files in " + 
-                   inMemFileSys.getUri());
-          if (mergeThrowable != null) {
-            //this could happen if the merge that
-            //was in progress threw an exception
-            throw mergeThrowable;
-          }
-          //initiate merge
-          Path[] inMemClosedFiles = inMemFileSys.getFiles(MAP_OUTPUT_FILTER);
-          if (inMemClosedFiles.length == 0) {
-            LOG.info(reduceTask.getTaskId() + "Nothing to merge from " + 
-                     inMemFileSys.getUri());
-            return numCopied == numOutputs;
-          }
-          //name this output file same as the name of the first file that is 
-          //there in the current list of inmem files (this is guaranteed to be
-          //absent on the disk currently. So we don't overwrite a prev. 
-          //created spill). Also we need to create the output file now since
-          //it is not guaranteed that this file will be present after merge
-          //is called (we delete empty sequence files as soon as we see them
-          //in the merge method)
-          SequenceFile.Writer writer = sorter.cloneFileAttributes(
-                                                                  inMemFileSys.makeQualified(inMemClosedFiles[0]), 
-                                                                  localFileSys.makeQualified(inMemClosedFiles[0]), null);
-        
-          RawKeyValueIterator rIter = null;
-          try {
-            rIter = sorter.merge(inMemClosedFiles, true, inMemClosedFiles.length, 
-                                 new Path(reduceTask.getTaskId()));
-          } catch (Exception e) { 
-            //make sure that we delete the ondisk file that we created earlier
-            //when we invoked cloneFileAttributes
-            writer.close();
-            localFileSys.delete(inMemClosedFiles[0]);
-            throw new IOException (StringUtils.stringifyException(e));
-          }
-          sorter.writeFile(rIter, writer);
-          writer.close();
-          LOG.info(reduceTask.getTaskId() +
-                   " Merge of the " +inMemClosedFiles.length +
-                   " files in InMemoryFileSystem complete." +
-                   " Local file is " + inMemClosedFiles[0]);
-        } catch (Throwable t) {
-          LOG.warn(reduceTask.getTaskId() +
-                   " Final merge of the inmemory files threw an exception: " + 
-                   StringUtils.stringifyException(t));
-          return false;
-        }
-      }
-      return mergeThrowable == null && numCopied == numOutputs && !killed;
-    } finally {
-      inMemFileSys.close();
-      pingTimer.interrupt();
-    }
-  }
-  
-  
-  private CopyResult getCopyResult() {  
-    synchronized (copyResults) {
-      while (!killed && copyResults.isEmpty()) {
-        try {
-          copyResults.wait();
-        } catch (InterruptedException e) { }
-      }
-      if (copyResults.isEmpty()) {
-        return null;
-      } else {
-        return (CopyResult) copyResults.remove(0);
-      }
-    }    
+    this.mapOutputFile.removeAll(getTask().getTaskId());
+    return true;
   }
   
-  /** Queries the job tracker for a set of outputs ready to be copied
-   * @param fromEventId the first event ID we want to start from, this will be
-   * modified by the call to this method
-   * @param jobClient the job tracker
-   * @return a set of locations to copy outputs from
-   * @throws IOException
-   */  
-  private List <MapOutputLocation> queryJobTracker(IntWritable fromEventId, 
-                                                   InterTrackerProtocol jobClient)
-    throws IOException {
-    
-    long currentTime = System.currentTimeMillis();    
-    long pollTime = lastPollTime + MIN_POLL_INTERVAL;
-    while (currentTime < pollTime) {
-      try {
-        Thread.sleep(pollTime-currentTime);
-      } catch (InterruptedException ie) { } // IGNORE
-      currentTime = System.currentTimeMillis();
-    }
-    lastPollTime = currentTime;
-
-    TaskCompletionEvent t[] = jobClient.getTaskCompletionEvents(
-                                                                reduceTask.getJobId().toString(),
-                                                                fromEventId.get(),
-                                                                probe_sample_size);
-    
-    List <MapOutputLocation> mapOutputsList = new ArrayList();
-    for (int i = 0; i < t.length; i++) {
-      if (t[i].isMap && 
-          t[i].getTaskStatus() == TaskCompletionEvent.Status.SUCCEEDED) {
-        URI u = URI.create(t[i].getTaskTrackerHttp());
-        String host = u.getHost();
-        int port = u.getPort();
-        String taskId = t[i].getTaskId();
-        int mId = t[i].idWithinJob();
-        mapOutputsList.add(new MapOutputLocation(taskId, mId, host, port));
-      }
-    }
-    fromEventId.set(fromEventId.get() + t.length);
-    return mapOutputsList;
-  }
-
   
   /** Delete all of the temporary map output files. */
   public void close() throws IOException {
+    LOG.info(getTask()+" done; removing files.");
     getTask().getProgress().setStatus("closed");
     this.mapOutputFile.removeAll(getTask().getTaskId());
   }
-
-  /**
-   * Kill the child process, but also kick getCopyResult so that it checks
-   * the kill flag.
-   */
-  public void kill() {
-    synchronized (copyResults) {
-      super.kill();
-      copyResults.notify();
-    }
-  }
-
-  private class InMemFSMergeThread extends Thread {
-    private InMemoryFileSystem inMemFileSys;
-    private LocalFileSystem localFileSys;
-    private SequenceFile.Sorter sorter;
-    
-    public InMemFSMergeThread(InMemoryFileSystem inMemFileSys, 
-                              LocalFileSystem localFileSys, SequenceFile.Sorter sorter) {
-      this.inMemFileSys = inMemFileSys;
-      this.localFileSys = localFileSys;
-      this.sorter = sorter;
-    }
-    public void run() {
-      LOG.info(reduceTask.getTaskId() + " Thread started: " + getName());
-      try {
-        //initiate merge
-        Path[] inMemClosedFiles = inMemFileSys.getFiles(MAP_OUTPUT_FILTER);
-        //Note that the above Path[] could be of length 0 if all copies are 
-        //in flight. So we make sure that we have some 'closed' map
-        //output files to merge to get the benefit of in-memory merge
-        if (inMemClosedFiles.length >= 
-            (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) {
-          //name this output file same as the name of the first file that is 
-          //there in the current list of inmem files (this is guaranteed to be
-          //absent on the disk currently. So we don't overwrite a prev. 
-          //created spill). Also we need to create the output file now since
-          //it is not guaranteed that this file will be present after merge
-          //is called (we delete empty sequence files as soon as we see them
-          //in the merge method)
-          SequenceFile.Writer writer = sorter.cloneFileAttributes(
-                                                                  inMemFileSys.makeQualified(inMemClosedFiles[0]), 
-                                                                  localFileSys.makeQualified(inMemClosedFiles[0]), null);
-          RawKeyValueIterator rIter;
-          try {
-            rIter = sorter.merge(inMemClosedFiles, true, 
-                                 inMemClosedFiles.length, new Path(reduceTask.getTaskId()));
-          } catch (Exception e) { 
-            //make sure that we delete the ondisk file that we created earlier
-            //when we invoked cloneFileAttributes
-            writer.close();
-            localFileSys.delete(inMemClosedFiles[0]);
-            throw new IOException (StringUtils.stringifyException(e));
-          }
-          sorter.writeFile(rIter, writer);
-          writer.close();
-          LOG.info(reduceTask.getTaskId() + 
-                   " Merge of the " +inMemClosedFiles.length +
-                   " files in InMemoryFileSystem complete." +
-                   " Local file is " + inMemClosedFiles[0]);
-        }
-        else {
-          LOG.info(reduceTask.getTaskId() + " Nothing to merge from " + 
-                   inMemFileSys.getUri());
-        }
-      } catch (Throwable t) {
-        LOG.warn(reduceTask.getTaskId() +
-                 " Intermediate Merge of the inmemory files threw an exception: " + 
-                 StringUtils.stringifyException(t));
-        ReduceTaskRunner.this.mergeThrowable = t;
-      }
-      finally {
-        mergeInProgress = false;
-      }
-    }
-  }
-  final private static PathFilter MAP_OUTPUT_FILTER = new PathFilter() {
-      public boolean accept(Path file) {
-        return file.toString().endsWith(".out");
-      }     
-    };
 }
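
For orientation: the scheduling loop deleted above (whose logic moves into ReduceTask's new ReduceCopier) boils down to three per-host rules: skip hosts whose penalty-box entry has not expired, skip hosts that already have a copy in flight, and on a failed copy penalize the host for 60 seconds plus a random backoff. Below is a minimal sketch of that policy; CopySchedulerSketch and MAX_BACKOFF_SECS are hypothetical names standing in for state not shown in this hunk.

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Random;
    import java.util.Set;

    // Sketch of the removed scheduling policy; illustrative, not from the patch.
    class CopySchedulerSketch {
      private final Map<String, Long> penaltyBox = new HashMap<String, Long>();
      private final Set<String> uniqueHosts = new HashSet<String>();
      private final Random backoff = new Random();
      private static final int MAX_BACKOFF_SECS = 300;  // assumed default

      /** May we schedule a fetch from this host right now? */
      boolean maySchedule(String host, long now) {
        Long penaltyEnd = penaltyBox.get(host);
        if (penaltyEnd != null && now < penaltyEnd.longValue()) {
          return false;                // host is penalized after a failed copy
        }
        return uniqueHosts.add(host);  // false if a copy from it is in flight
      }

      /** On a failed copy: 60 seconds plus a random backoff, as above. */
      void penalize(String host, long now) {
        long nextContact = now + 60 * 1000 + backoff.nextInt(MAX_BACKOFF_SECS * 1000);
        penaltyBox.put(host, new Long(nextContact));
        uniqueHosts.remove(host);
      }
    }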

+ 237 - 0
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -24,6 +24,7 @@ import java.io.OutputStream;
 import java.io.PrintStream;
 import java.net.BindException;
 import java.net.InetSocketAddress;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -36,6 +37,8 @@ import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.regex.Pattern;
+import java.util.Collections;
+import java.util.Collection;
 
 import javax.servlet.ServletContext;
 import javax.servlet.ServletException;
@@ -50,6 +53,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
@@ -143,6 +147,15 @@ public class TaskTracker
   private int maxCurrentTasks;
   private int failures;
   private int finishedCount[] = new int[1];
+    private MapEventsFetcherThread mapEventsFetcher;
+    /**
+     * the minimum interval between jobtracker polls
+     */
+    private static final long MIN_POLL_INTERVAL = 5000;
+    /**
+     * Number of maptask completion events locations to poll for at one time
+     */  
+    private int probe_sample_size = 50;
     
   private class TaskTrackerMetrics {
     private MetricsRecord metricsRecord = null;
@@ -227,6 +240,7 @@ public class TaskTracker
       synchronized (rJob) {
         rJob.tasks.add(tip);
       }
+        runningJobs.notify(); //notify the fetcher thread
       return rJob;
     }
   }
@@ -292,6 +306,9 @@ public class TaskTracker
         
     this.minSpaceStart = this.fConf.getLong("mapred.local.dir.minspacestart", 0L);
     this.minSpaceKill = this.fConf.getLong("mapred.local.dir.minspacekill", 0L);
+        int numCopiers = this.fConf.getInt("mapred.reduce.parallel.copies", 5);
+        //tweak the probe sample size (make it a function of numCopiers)
+        probe_sample_size = Math.max(numCopiers*5, 50);
         
         
     this.myMetrics = new TaskTrackerMetrics();
@@ -333,8 +350,166 @@ public class TaskTracker
                        jobTrackAddr, this.fConf);
         
     this.running = true;
+        // start the thread that will fetch map task completion events
+        this.mapEventsFetcher = new MapEventsFetcherThread();
+        mapEventsFetcher.setDaemon(true);
+        mapEventsFetcher.setName(
+            "Map-events fetcher for all reduce tasks " + "on " + 
+            taskTrackerName);
+        mapEventsFetcher.start();
   }
+    
+    private class MapEventsFetcherThread extends Thread {
+
+      private List <FetchStatus> reducesInShuffle() {
+        List <FetchStatus> fList = new ArrayList<FetchStatus>();
+        for (Map.Entry <String, RunningJob> item : runningJobs.entrySet()) {
+          RunningJob rjob = item.getValue();
+          String jobId = item.getKey();
+          FetchStatus f;
+          synchronized (rjob) {
+            f = rjob.getFetchStatus();
+            for (TaskInProgress tip : rjob.tasks) {
+              Task task = tip.getTask();
+              if (!task.isMapTask()) {
+                if (((ReduceTask)task).getPhase() == 
+                  TaskStatus.Phase.SHUFFLE) {
+                  if (rjob.getFetchStatus() == null) {
+                    //this is a new job; we start fetching its map events
+                    f = new FetchStatus(jobId, 
+                        ((ReduceTask)task).getNumMaps());
+                    rjob.setFetchStatus(f);
+                  }
+                  f = rjob.getFetchStatus();
+                  fList.add(f);
+                  break; //no need to check any more tasks belonging to this job
+                }
+              }
+            }
+          }
+        }
+        //at this point, we know which of the running jobs need a
+        //jobtracker query for map outputs (in the form of map
+        //completion events).
+        return fList;
+      }
+      
+      public void run() {
+        LOG.info("Starting thread: " + getName());
+        
+        while (true) {
+          try {
+            List <FetchStatus> fList = null;
+            synchronized (runningJobs) {
+              while (((fList = reducesInShuffle()).size()) == 0) {
+                try {
+                  runningJobs.wait();
+                } catch (InterruptedException e) {
+                  LOG.info("Shutting down: " + getName());
+                  return;
+                }
+              }
+            }
+            // now fetch all the map task events for all the reduce tasks
+            // possibly belonging to different jobs
+            for (FetchStatus f : fList) {
+              try {
+                
+                f.fetchMapCompletionEvents();
+                
+                try {
+                  Thread.sleep(MIN_POLL_INTERVAL);
+                } catch (InterruptedException ie) {
+                  LOG.info("Shutting down: " + getName());
+                  return;
+                }
+              } catch (Exception e) {
+                LOG.warn(
+                    "Ignoring exception that the fetch of map completion" +
+                    " events for " + f.jobId + " threw: " +
+                    StringUtils.stringifyException(e)); 
+              }
+            }
+          } catch (Exception e) {
+            LOG.info("Ignoring exception "  + e.getMessage());
+          }
+        }
+      } 
+    }
+
+    private class FetchStatus {
+      /** The next event ID that we will start querying the JobTracker from*/
+      private IntWritable fromEventId;
+      /** This is the cache of map events for a given job */ 
+      private List<TaskCompletionEvent> allMapEvents;
+      /** This array stores indexes into allMapEvents for "SUCCEEDED" map
+       * events, indexed by mapId. Storing the indexes lets us quickly
+       * reset SUCCEEDED events to OBSOLETE, so that ReduceTasks also get
+       * to know about OBSOLETE events and avoid fetching map outputs from
+       * the corresponding locations.
+       */ 
+      private int indexToEventsCache[];
+      /** What jobid this fetchstatus object is for*/
+      private String jobId;
+     
+      public FetchStatus(String jobId, int numMaps) {
+        this.fromEventId = new IntWritable(0);
+        this.jobId = jobId;
+        this.allMapEvents = new ArrayList<TaskCompletionEvent>(numMaps);
+        this.indexToEventsCache = new int[numMaps];
+      }
+      
+      public TaskCompletionEvent[] getMapEvents(int fromId, int max) {
         
+        TaskCompletionEvent[] mapEvents = 
+                              TaskCompletionEvent.EMPTY_ARRAY;
+        synchronized (allMapEvents) {
+          if (allMapEvents.size() > fromId) {
+            int actualMax = Math.min(max, (allMapEvents.size() - fromId));
+            List <TaskCompletionEvent> eventSublist = 
+              allMapEvents.subList(fromId, actualMax + fromId);
+            mapEvents = 
+              (TaskCompletionEvent[])eventSublist.toArray(mapEvents);
+          }
+        }
+        return mapEvents;
+      }
+      
+      public void fetchMapCompletionEvents() throws IOException {
+        List <TaskCompletionEvent> recentMapEvents = 
+                              queryJobTracker(fromEventId, jobId, jobClient);
+        synchronized (allMapEvents) {
+          for (TaskCompletionEvent t : recentMapEvents) {
+            TaskCompletionEvent.Status status = t.getTaskStatus();
+            allMapEvents.add(t);
+            
+            if (status == TaskCompletionEvent.Status.SUCCEEDED) {
+              //store the index of the events cache for this success event.
+              indexToEventsCache[t.idWithinJob()] = allMapEvents.size();
+            }
+            else if (status == TaskCompletionEvent.Status.FAILED || 
+                status == TaskCompletionEvent.Status.OBSOLETE) {
+              int idx = indexToEventsCache[t.idWithinJob()];
+              //if this map task was declared a success earlier, we will have
+              //idx > 0
+              if (idx > 0) {
+                //Mark the event as OBSOLETE and reset the index to 0. Note 
+                //we access the 'idx - 1' entry. This is because while storing
+                //the idx in indexToEventsCache, we store the 'actual idx + 1'
+                //Helps us to eliminate the index array elements initialization
+                //to something like '-1'
+                TaskCompletionEvent obsoleteEvent = allMapEvents.get(idx - 1);
+                obsoleteEvent.setTaskStatus(
+                              TaskCompletionEvent.Status.OBSOLETE);
+                indexToEventsCache[t.idWithinJob()] = 0;
+              }
+            }
+          }
+        }
+      }
+    }
+
  // initialize the job directory
   private void localizeJob(TaskInProgress tip) throws IOException {
     Path localJarFile = null;
@@ -460,6 +635,9 @@ public class TaskTracker
         
     // Clear local storage
     this.mapOutputFile.cleanupStorage();
+        
+        // Shutdown the fetcher thread
+        this.mapEventsFetcher.interrupt();
   }
 
   /**
@@ -503,6 +681,35 @@ public class TaskTracker
     return fs;
   }
     
+    /** Queries the job tracker for the map task completion events of a job
+     * @param fromEventId the first event ID we want to start from; this is
+     * modified by the call to this method
+     * @param jobId the id of the job whose map events we want to fetch
+     * @param jobClient the job tracker
+     * @return a list of map task completion events
+     * @throws IOException
+     */  
+    private List<TaskCompletionEvent> queryJobTracker(IntWritable fromEventId,
+        String jobId,
+        InterTrackerProtocol jobClient)
+        throws IOException {
+
+      TaskCompletionEvent t[] = jobClient.getTaskCompletionEvents(
+          jobId,
+          fromEventId.get(),
+          probe_sample_size);
+      //we are interested in map task completion events only. So store
+      //only those
+      List <TaskCompletionEvent> recentMapEvents = 
+                                 new ArrayList<TaskCompletionEvent>();
+      for (int i = 0; i < t.length; i++) {
+        if (t[i].isMap) {
+          recentMapEvents.add(t[i]);
+        }
+      }
+      fromEventId.set(fromEventId.get() + t.length);
+      return recentMapEvents;
+    }
+
   /**
    * Main service loop.  Will stay in this loop forever.
    */
@@ -1373,6 +1580,25 @@ public class TaskTracker
     running = false;
   }
 
+    public TaskCompletionEvent[] getMapCompletionEvents(
+      String jobId, int fromEventId, int maxLocs) throws IOException {
+      
+      TaskCompletionEvent[] mapEvents = TaskCompletionEvent.EMPTY_ARRAY;
+      RunningJob rjob;
+      synchronized (runningJobs) {
+        rjob = runningJobs.get(jobId);          
+        if (rjob != null) {
+          synchronized (rjob) {
+            FetchStatus f = rjob.getFetchStatus();
+            if (f != null) {
+              mapEvents = f.getMapEvents(fromEventId, maxLocs);
+            }
+          }
+        }
+      }
+      return mapEvents;
+    }
+    
   /////////////////////////////////////////////////////
   //  Called by TaskTracker thread after task process ends
   /////////////////////////////////////////////////////
@@ -1418,6 +1644,7 @@ public class TaskTracker
     Set<TaskInProgress> tasks;
     boolean localized;
     boolean keepJobFiles;
+      FetchStatus f;
     RunningJob(String jobid, Path jobFile) {
       this.jobid = jobid;
       localized = false;
@@ -1433,6 +1660,14 @@ public class TaskTracker
     String getJobId() {
       return jobid;
     }
+      
+      void setFetchStatus(FetchStatus f) {
+        this.f = f;
+      }
+      
+      FetchStatus getFetchStatus() {
+        return f;
+      }
   }
 
   /** 
@@ -1474,6 +1709,8 @@ public class TaskTracker
         throwable.printStackTrace(new PrintStream(baos));
         umbilical.reportDiagnosticInfo(taskid, baos.toString());
       } finally {
+        MetricsContext metricsContext = MetricsUtil.getContext("mapred");
+        metricsContext.close();
         // Shutting down log4j of the child-vm... 
         // This assumes that on return from Task.run() 
         // there is no more logging done.
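
The subtlest piece of the TaskTracker changes above is FetchStatus's "actual idx + 1" convention for indexToEventsCache. Below is a stripped-down sketch of just that bookkeeping, with a hypothetical Event type standing in for TaskCompletionEvent; it shows how the zero-initialized array doubles as a "no success seen yet" marker.

    import java.util.ArrayList;
    import java.util.List;

    // Stripped-down sketch of FetchStatus's indexToEventsCache bookkeeping.
    class EventIndexSketch {
      enum Status { SUCCEEDED, FAILED, OBSOLETE }
      static class Event {
        int mapId; Status status;
        Event(int mapId, Status status) { this.mapId = mapId; this.status = status; }
      }

      private final List<Event> allMapEvents = new ArrayList<Event>();
      // mapId -> (index of its SUCCEEDED event in allMapEvents) + 1; 0 = none
      private final int[] indexToEventsCache;

      EventIndexSketch(int numMaps) { indexToEventsCache = new int[numMaps]; }

      void add(Event e) {
        allMapEvents.add(e);
        if (e.status == Status.SUCCEEDED) {
          // size() is already "index + 1", so the zero-filled array needs no
          // initialization to -1 to mean "no success seen yet"
          indexToEventsCache[e.mapId] = allMapEvents.size();
        } else if (e.status == Status.FAILED || e.status == Status.OBSOLETE) {
          int idx = indexToEventsCache[e.mapId];
          if (idx > 0) {
            // retract the earlier success so reduces stop fetching from it
            allMapEvents.get(idx - 1).status = Status.OBSOLETE;
            indexToEventsCache[e.mapId] = 0;
          }
        }
      }
    }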

+ 13 - 1
src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java

@@ -29,7 +29,8 @@ import org.apache.hadoop.ipc.VersionedProtocol;
  * and parent is via this protocol. */ 
 interface TaskUmbilicalProtocol extends VersionedProtocol {
 
-  public static final long versionID = 1L;
+  /** Changed the version to 2, since we have a new method getMapCompletionEvents */
+  public static final long versionID = 2L;
   
   /** Called when a child task process starts, to get its task.*/
   Task getTask(String taskid) throws IOException;
@@ -64,4 +65,15 @@ interface TaskUmbilicalProtocol extends VersionedProtocol {
  /** Report that the task encountered a local filesystem error.*/
   void fsError(String message) throws IOException;
 
+  /** Called by a reduce task to get the map output locations for finished maps.
+  *
+  * @param jobId the id of the job
+  * @param fromIndex the index starting from which the locations should be 
+  * fetched
+  * @param maxLocs the max number of locations to fetch
+  * @return an array of TaskCompletionEvent
+  */
+  TaskCompletionEvent[] getMapCompletionEvents(String jobId, 
+      int fromIndex, int maxLocs) throws IOException;
+
 }
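
For context, a reduce task would drive the new umbilical call roughly as below. This is a hedged sketch: the ReduceCopier code that actually does this is elided from the hunks shown, so the class name (MapEventsPollerSketch), MAX_EVENTS, and the cursor handling are assumptions; TaskCompletionEvent and TaskUmbilicalProtocol are the types from the patch.

    import java.io.IOException;

    // Hypothetical reduce-side polling of the new umbilical method.
    class MapEventsPollerSketch {
      private int fromEventId = 0;              // cursor into the tracker's cache
      private static final int MAX_EVENTS = 50; // mirrors probe_sample_size

      TaskCompletionEvent[] poll(TaskUmbilicalProtocol umbilical, String jobId)
          throws IOException {
        TaskCompletionEvent[] events =
            umbilical.getMapCompletionEvents(jobId, fromEventId, MAX_EVENTS);
        fromEventId += events.length;           // resume after what we have seen
        return events;                          // SUCCEEDED entries are fetchable
      }
    }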

+ 2 - 1
src/java/org/apache/hadoop/metrics/spi/AbstractMetricsContext.java

@@ -226,7 +226,8 @@ public abstract class AbstractMetricsContext implements MetricsContext {
    */
   private synchronized void startTimer() {
     if (timer == null) {
-      timer = new Timer();
+      timer = new Timer("Timer thread for monitoring " + getContextName(), 
+                         true);
       TimerTask task = new TimerTask() {
           public void run() {
             try {
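
This final hunk matters for the new child-JVM shuffle: a Timer created with the no-argument constructor runs a non-daemon thread, which would keep the reduce's child JVM alive after Task.run() returns (the metricsContext.close() added to the child's finally block above addresses the same problem). Below is a minimal demonstration of the daemon variant; the context name used is only an example.

    import java.util.Timer;
    import java.util.TimerTask;

    public class DaemonTimerDemo {
      public static void main(String[] args) throws InterruptedException {
        // The second argument marks the timer thread as a daemon, so it cannot
        // keep the JVM alive on its own; the first names it, which makes
        // thread dumps of a stuck process much easier to read.
        Timer timer = new Timer("Timer thread for monitoring mapred", true);
        timer.schedule(new TimerTask() {
          public void run() { System.out.println("tick"); }
        }, 0, 1000);
        Thread.sleep(2500);  // without this, the JVM exits before any tick
        // main returns here; a non-daemon Timer would have hung the JVM
      }
    }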