
HADOOP-1148. More re-indentation.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@529763 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent
commit
ff1a25733f
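
The hunks below re-align wrapped method signatures, call arguments, and concatenated string literals under the opening parenthesis or first operand, and normalize block nesting to two spaces per level. The following is a minimal sketch of that continuation style only; the class, method, and argument names are hypothetical and do not appear in the patch.

    // Sketch: hypothetical names, illustrating the continuation alignment
    // applied throughout this patch.
    public class IndentExample {

      // Wrapped parameter lists align under the first parameter.
      static String describe(String taskId,
                             String host,
                             long bytes) {
        // Wrapped string concatenations align under the first operand.
        return taskId + " copied " + bytes +
               " bytes from " + host + ".";
      }

      public static void main(String[] args) {
        System.out.println(describe("task_0001_r_000000",
                                    "node-1.example.com",
                                    1048576L));
      }
    }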

+ 6 - 6
src/java/org/apache/hadoop/dfs/FSNamesystem.java

@@ -4043,9 +4043,9 @@ class FSNamesystem implements FSConstants {
    */
   public static class FsckServlet extends HttpServlet {
     @SuppressWarnings("unchecked")
-    public void doGet(HttpServletRequest request,
-                      HttpServletResponse response
-                      ) throws ServletException, IOException {
+      public void doGet(HttpServletRequest request,
+                        HttpServletResponse response
+                        ) throws ServletException, IOException {
       Map<String,String[]> pmap = request.getParameterMap();
       try {
         ServletContext context = getServletContext();
@@ -4071,9 +4071,9 @@ class FSNamesystem implements FSConstants {
    */
   public static class GetImageServlet extends HttpServlet {
     @SuppressWarnings("unchecked")
-    public void doGet(HttpServletRequest request,
-                      HttpServletResponse response
-                      ) throws ServletException, IOException {
+      public void doGet(HttpServletRequest request,
+                        HttpServletResponse response
+                        ) throws ServletException, IOException {
       Map<String,String[]> pmap = request.getParameterMap();
       try {
         ServletContext context = getServletContext();

+ 3 - 3
src/java/org/apache/hadoop/dfs/SecondaryNameNode.java

@@ -426,9 +426,9 @@ public class SecondaryNameNode implements FSConstants, Runnable {
    */
   public static class GetImageServlet extends HttpServlet {
     @SuppressWarnings("unchecked")
-    public void doGet(HttpServletRequest request,
-                      HttpServletResponse response
-                      ) throws ServletException, IOException {
+      public void doGet(HttpServletRequest request,
+                        HttpServletResponse response
+                        ) throws ServletException, IOException {
       Map<String,String[]> pmap = request.getParameterMap();
       try {
         ServletContext context = getServletContext();

+ 1 - 1
src/java/org/apache/hadoop/mapred/IsolationRunner.java

@@ -81,7 +81,7 @@ public class IsolationRunner {
     }
     
     public TaskCompletionEvent[] getMapCompletionEvents(String jobId, 
-        int fromEventId, int maxLocs) throws IOException {
+                                                        int fromEventId, int maxLocs) throws IOException {
       return TaskCompletionEvent.EMPTY_ARRAY;
     }
   }

+ 1 - 1
src/java/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -229,7 +229,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
     }
 
     public TaskCompletionEvent[] getMapCompletionEvents(
-        String jobId, int fromEventId, int maxLocs) throws IOException {
+                                                        String jobId, int fromEventId, int maxLocs) throws IOException {
       return TaskCompletionEvent.EMPTY_ARRAY;
     }
     

+ 90 - 90
src/java/org/apache/hadoop/mapred/ReduceTask.java

@@ -549,23 +549,23 @@ class ReduceTask extends Task {
     private Thread createProgressThread(final TaskUmbilicalProtocol umbilical) {
       //spawn a thread to give copy progress heartbeats
       Thread copyProgress = new Thread() {
-        public void run() {
-          LOG.debug("Started thread: " + getName());
-          while (true) {
-            try {
-              reportProgress(umbilical);
-              Thread.sleep(PROGRESS_INTERVAL);
-            } catch (InterruptedException e) {
-              return;
-            } catch (Throwable e) {
-              LOG.info("Thread Exception in " +
-                       "reporting copy progress\n" +
-                       StringUtils.stringifyException(e));
-              continue;
+          public void run() {
+            LOG.debug("Started thread: " + getName());
+            while (true) {
+              try {
+                reportProgress(umbilical);
+                Thread.sleep(PROGRESS_INTERVAL);
+              } catch (InterruptedException e) {
+                return;
+              } catch (Throwable e) {
+                LOG.info("Thread Exception in " +
+                         "reporting copy progress\n" +
+                         StringUtils.stringifyException(e));
+                continue;
+              }
             }
           }
-        }
-      };
+        };
       copyProgress.setName("Copy progress reporter for task "+getTaskId());
       copyProgress.setDaemon(true);
       return copyProgress;
@@ -640,7 +640,7 @@ class ReduceTask extends Task {
               size = copyOutput(loc);
             } catch (IOException e) {
               LOG.warn(reduceTask.getTaskId() + " copy failed: " +
-                  loc.getMapTaskId() + " from " + loc.getHost());
+                       loc.getMapTaskId() + " from " + loc.getHost());
               LOG.warn(StringUtils.stringifyException(e));
             } finally {
               finish(size);
@@ -649,7 +649,7 @@ class ReduceTask extends Task {
             return; // ALL DONE
           } catch (Throwable th) {
             LOG.error("Map output copy failure: " + 
-                StringUtils.stringifyException(th));
+                      StringUtils.stringifyException(th));
           }
         }
       }
@@ -661,22 +661,22 @@ class ReduceTask extends Task {
        * @throws InterruptedException if the copier should give up
        */
       private long copyOutput(MapOutputLocation loc
-      ) throws IOException, InterruptedException {
+                              ) throws IOException, InterruptedException {
         if (!neededOutputs.contains(loc.getMapId())) {
           return CopyResult.OBSOLETE;
         }
         String reduceId = reduceTask.getTaskId();
         LOG.info(reduceId + " Copying " + loc.getMapTaskId() +
-            " output from " + loc.getHost() + ".");
+                 " output from " + loc.getHost() + ".");
         // the place where the file should end up
         Path finalFilename = conf.getLocalPath(reduceId + "/map_" +
-            loc.getMapId() + ".out");
+                                               loc.getMapId() + ".out");
         // a working filename that will be unique to this attempt
         Path tmpFilename = new Path(finalFilename + "-" + id);
         // this copies the map output file
         tmpFilename = loc.getFile(inMemFileSys, localFileSys, shuffleMetrics,
-            tmpFilename, reduceTask.getPartition(),
-            STALLED_COPY_TIMEOUT);
+                                  tmpFilename, reduceTask.getPartition(),
+                                  STALLED_COPY_TIMEOUT);
         if (!neededOutputs.contains(loc.getMapId())) {
           if (tmpFilename != null) {
             FileSystem fs = tmpFilename.getFileSystem(conf);
@@ -686,7 +686,7 @@ class ReduceTask extends Task {
         }
         if (tmpFilename == null)
           throw new IOException("File " + finalFilename + "-" + id + 
-          " not created");
+                                " not created");
         long bytes = -1;
         // lock the ReduceTask while we do the rename
         synchronized (ReduceTask.this) {
@@ -702,25 +702,25 @@ class ReduceTask extends Task {
           if (!fs.rename(tmpFilename, finalFilename)) {
             fs.delete(tmpFilename);
             throw new IOException("failure to rename map output " + 
-                tmpFilename);
+                                  tmpFilename);
           }
           bytes = fs.getLength(finalFilename);
           LOG.info(reduceId + " done copying " + loc.getMapTaskId() +
-              " output from " + loc.getHost() + ".");
+                   " output from " + loc.getHost() + ".");
           //Create a thread to do merges. Synchronize access/update to 
           //mergeInProgress
           if (!mergeInProgress && 
               (inMemFileSys.getPercentUsed() >= MAX_INMEM_FILESYS_USE || 
-                  (mergeThreshold > 0 && 
-                      inMemFileSys.getNumFiles(MAP_OUTPUT_FILTER) >= 
-                        mergeThreshold))&&
-                      mergeThrowable == null) {
+               (mergeThreshold > 0 && 
+                inMemFileSys.getNumFiles(MAP_OUTPUT_FILTER) >= 
+                mergeThreshold))&&
+              mergeThrowable == null) {
             LOG.info(reduceId + " InMemoryFileSystem " + 
-                inMemFileSys.getUri().toString() +
-                " is " + inMemFileSys.getPercentUsed() + 
-            " full. Triggering merge");
+                     inMemFileSys.getUri().toString() +
+                     " is " + inMemFileSys.getPercentUsed() + 
+                     " full. Triggering merge");
             InMemFSMergeThread m = new InMemFSMergeThread(inMemFileSys,
-                (LocalFileSystem)localFileSys, sorter);
+                                                          (LocalFileSystem)localFileSys, sorter);
             m.setName("Thread for merging in memory files");
             m.setDaemon(true);
             mergeInProgress = true;
@@ -734,7 +734,7 @@ class ReduceTask extends Task {
     }
     
     private void configureClasspath(JobConf conf)
-    throws IOException {
+      throws IOException {
       
       // get the task and the current classloader which will become the parent
       Task task = ReduceTask.this;
@@ -769,7 +769,7 @@ class ReduceTask extends Task {
     }
     
     public ReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf)
-    throws IOException {
+      throws IOException {
       
       configureClasspath(conf);
       this.umbilical = umbilical;      
@@ -785,12 +785,12 @@ class ReduceTask extends Task {
       URI uri = URI.create("ramfs://mapoutput" + reduceTask.hashCode());
       inMemFileSys = (InMemoryFileSystem)FileSystem.get(uri, conf);
       LOG.info(reduceTask.getTaskId() + " Created an InMemoryFileSystem, uri: "
-          + uri);
+               + uri);
       localFileSys = FileSystem.getLocal(conf);
       //create an instance of the sorter
       sorter =
         new SequenceFile.Sorter(inMemFileSys, conf.getOutputKeyComparator(), 
-            conf.getMapOutputValueClass(), conf);
+                                conf.getMapOutputValueClass(), conf);
       
       // hosts -> next contact time
       this.penaltyBox = new Hashtable();
@@ -846,11 +846,11 @@ class ReduceTask extends Task {
         while (numCopied < numOutputs && mergeThrowable == null) {
           
           LOG.info(reduceTask.getTaskId() + " Need " + (numOutputs-numCopied) +
-          " map output(s)");
+                   " map output(s)");
           
           if (!neededOutputs.isEmpty()) {
             LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() +
-            " map output location(s)");
+                     " map output location(s)");
             try {
               // Put the hash entries for the failed fetches. Entries here
               // might be replaced by (mapId) hashkeys from new successful 
@@ -866,19 +866,19 @@ class ReduceTask extends Task {
               // put discovered them on the known list
               for (int i=0; i < locs.size(); i++) {
                 knownOutputs.put(new Integer(locs.get(i).getMapId()), 
-                    locs.get(i));
+                                 locs.get(i));
               }
               LOG.info(reduceTask.getTaskId() +
-                 " Got " + locs.size() + 
-                 " new map outputs from tasktracker and " + retryFetches.size()
-                 + " map outputs from previous failures");
+                       " Got " + locs.size() + 
+                       " new map outputs from tasktracker and " + retryFetches.size()
+                       + " map outputs from previous failures");
               // clear the "failed" fetches hashmap
               retryFetches.clear();
             }
             catch (IOException ie) {
               LOG.warn(reduceTask.getTaskId() +
-                  " Problem locating map outputs: " +
-                  StringUtils.stringifyException(ie));
+                       " Problem locating map outputs: " +
+                       StringUtils.stringifyException(ie));
             }
           }
           
@@ -887,7 +887,7 @@ class ReduceTask extends Task {
           int numSlow = 0, numDups = 0;
           
           LOG.info(reduceTask.getTaskId() + " Got " + numKnown + 
-          " known map output location(s); scheduling...");
+                   " known map output location(s); scheduling...");
           
           synchronized (scheduledCopies) {
             Iterator locIt = knownOutputs.values().iterator();
@@ -916,8 +916,8 @@ class ReduceTask extends Task {
             scheduledCopies.notifyAll();
           }
           LOG.info(reduceTask.getTaskId() + " Scheduled " + numScheduled +
-              " of " + numKnown + " known outputs (" + numSlow +
-              " slow hosts and " + numDups + " dup hosts)");
+                   " of " + numKnown + " known outputs (" + numSlow +
+                   " slow hosts and " + numDups + " dup hosts)");
           
           // if we have no copies in flight and we can't schedule anything
           // new, just wait for a bit
@@ -929,7 +929,7 @@ class ReduceTask extends Task {
           
           while (numInFlight > 0 && mergeThrowable == null) {
             LOG.debug(reduceTask.getTaskId() + " numInFlight = " + 
-                numInFlight);
+                      numInFlight);
             CopyResult cr = getCopyResult();
             
             if (cr != null) {
@@ -944,25 +944,25 @@ class ReduceTask extends Task {
                 
                 copyPhase.startNextPhase();
                 copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs 
-                    + " at " +
-                    mbpsFormat.format(transferRate) +  " MB/s)");          
+                                    + " at " +
+                                    mbpsFormat.format(transferRate) +  " MB/s)");          
               } else if (cr.isObsolete()) {
                 //ignore
                 LOG.info(reduceTask.getTaskId() + 
-                    " Ignoring obsolete copy result for Map Task: " + 
-                    cr.getLocation().getMapTaskId() + " from host: " + 
-                    cr.getHost());
+                         " Ignoring obsolete copy result for Map Task: " + 
+                         cr.getLocation().getMapTaskId() + " from host: " + 
+                         cr.getHost());
               } else {
                 retryFetches.put(new Integer(cr.getMapId()), cr.getLocation());
                 
                 // wait a random amount of time for next contact
                 currentTime = System.currentTimeMillis();
                 long nextContact = currentTime + 60 * 1000 +
-                backoff.nextInt(maxBackoff*1000);
+                  backoff.nextInt(maxBackoff*1000);
                 penaltyBox.put(cr.getHost(), new Long(nextContact));          
                 LOG.warn(reduceTask.getTaskId() + " adding host " +
-                    cr.getHost() + " to penalty box, next contact in " +
-                    ((nextContact-currentTime)/1000) + " seconds");
+                         cr.getHost() + " to penalty box, next contact in " +
+                         ((nextContact-currentTime)/1000) + " seconds");
                 
                 // other outputs from the failed host may be present in the
                 // knownOutputs cache, purge them. This is important in case
@@ -985,7 +985,7 @@ class ReduceTask extends Task {
             boolean busy = true;
             // ensure we have enough to keep us busy
             if (numInFlight < lowThreshold && (numOutputs-numCopied) > 
-              probe_sample_size) {
+                probe_sample_size) {
               busy = false;
             }
             //Check whether we have more CopyResult to check. If there is none,
@@ -1017,9 +1017,9 @@ class ReduceTask extends Task {
               Thread.sleep(200);
             }
             LOG.info(reduceTask.getTaskId() + 
-                " Copying of all map outputs complete. " + 
-                "Initiating the last merge on the remaining files in " + 
-                inMemFileSys.getUri());
+                     " Copying of all map outputs complete. " + 
+                     "Initiating the last merge on the remaining files in " + 
+                     inMemFileSys.getUri());
             if (mergeThrowable != null) {
               //this could happen if the merge that
               //was in progress threw an exception
@@ -1029,7 +1029,7 @@ class ReduceTask extends Task {
             Path[] inMemClosedFiles = inMemFileSys.getFiles(MAP_OUTPUT_FILTER);
             if (inMemClosedFiles.length == 0) {
               LOG.info(reduceTask.getTaskId() + "Nothing to merge from " + 
-                  inMemFileSys.getUri());
+                       inMemFileSys.getUri());
               return numCopied == numOutputs;
             }
             //name this output file same as the name of the first file that is 
@@ -1040,14 +1040,14 @@ class ReduceTask extends Task {
             //is called (we delete empty sequence files as soon as we see them
             //in the merge method)
             SequenceFile.Writer writer = sorter.cloneFileAttributes(
-                inMemFileSys.makeQualified(inMemClosedFiles[0]), 
-                localFileSys.makeQualified(inMemClosedFiles[0]), null);
+                                                                    inMemFileSys.makeQualified(inMemClosedFiles[0]), 
+                                                                    localFileSys.makeQualified(inMemClosedFiles[0]), null);
             
             SequenceFile.Sorter.RawKeyValueIterator rIter = null;
             try {
               rIter = sorter.merge(inMemClosedFiles, true, 
-                  inMemClosedFiles.length, 
-                  new Path(reduceTask.getTaskId()));
+                                   inMemClosedFiles.length, 
+                                   new Path(reduceTask.getTaskId()));
             } catch (Exception e) { 
               //make sure that we delete the ondisk file that we created earlier
               //when we invoked cloneFileAttributes
@@ -1058,13 +1058,13 @@ class ReduceTask extends Task {
             sorter.writeFile(rIter, writer);
             writer.close();
             LOG.info(reduceTask.getTaskId() +
-                " Merge of the " +inMemClosedFiles.length +
-                " files in InMemoryFileSystem complete." +
-                " Local file is " + inMemClosedFiles[0]);
+                     " Merge of the " +inMemClosedFiles.length +
+                     " files in InMemoryFileSystem complete." +
+                     " Local file is " + inMemClosedFiles[0]);
           } catch (Throwable t) {
             LOG.warn(reduceTask.getTaskId() +
-                " Final merge of the inmemory files threw an exception: " + 
-                StringUtils.stringifyException(t));
+                     " Final merge of the inmemory files threw an exception: " + 
+                     StringUtils.stringifyException(t));
             return false;
           }
         }
@@ -1099,7 +1099,7 @@ class ReduceTask extends Task {
      * @throws IOException
      */  
     private List <MapOutputLocation> getSuccessMapEvents(IntWritable fromEventId)
-        throws IOException {
+      throws IOException {
       
       long currentTime = System.currentTimeMillis();    
       long pollTime = lastPollTime + MIN_POLL_INTERVAL;
@@ -1112,9 +1112,9 @@ class ReduceTask extends Task {
       lastPollTime = currentTime;
       
       TaskCompletionEvent t[] = umbilical.getMapCompletionEvents(
-          reduceTask.getJobId().toString(),
-          fromEventId.get(),
-          probe_sample_size);
+                                                                 reduceTask.getJobId().toString(),
+                                                                 fromEventId.get(),
+                                                                 probe_sample_size);
       
       List <MapOutputLocation> mapOutputsList = 
         new ArrayList<MapOutputLocation>();
@@ -1140,7 +1140,7 @@ class ReduceTask extends Task {
       private SequenceFile.Sorter sorter;
       
       public InMemFSMergeThread(InMemoryFileSystem inMemFileSys, 
-          LocalFileSystem localFileSys, SequenceFile.Sorter sorter) {
+                                LocalFileSystem localFileSys, SequenceFile.Sorter sorter) {
         this.inMemFileSys = inMemFileSys;
         this.localFileSys = localFileSys;
         this.sorter = sorter;
@@ -1154,7 +1154,7 @@ class ReduceTask extends Task {
           //in flight. So we make sure that we have some 'closed' map
           //output files to merge to get the benefit of in-memory merge
           if (inMemClosedFiles.length >= 
-            (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) {
+              (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) {
             //name this output file same as the name of the first file that is 
             //there in the current list of inmem files (this is guaranteed to
             //be absent on the disk currently. So we don't overwrite a prev. 
@@ -1163,12 +1163,12 @@ class ReduceTask extends Task {
             //is called (we delete empty sequence files as soon as we see them
             //in the merge method)
             SequenceFile.Writer writer = sorter.cloneFileAttributes(
-                inMemFileSys.makeQualified(inMemClosedFiles[0]), 
-                localFileSys.makeQualified(inMemClosedFiles[0]), null);
+                                                                    inMemFileSys.makeQualified(inMemClosedFiles[0]), 
+                                                                    localFileSys.makeQualified(inMemClosedFiles[0]), null);
             SequenceFile.Sorter.RawKeyValueIterator rIter;
             try {
               rIter = sorter.merge(inMemClosedFiles, true, 
-                  inMemClosedFiles.length, new Path(reduceTask.getTaskId()));
+                                   inMemClosedFiles.length, new Path(reduceTask.getTaskId()));
             } catch (Exception e) { 
               //make sure that we delete the ondisk file that we created 
               //earlier when we invoked cloneFileAttributes
@@ -1179,18 +1179,18 @@ class ReduceTask extends Task {
             sorter.writeFile(rIter, writer);
             writer.close();
             LOG.info(reduceTask.getTaskId() + 
-                " Merge of the " +inMemClosedFiles.length +
-                " files in InMemoryFileSystem complete." +
-                " Local file is " + inMemClosedFiles[0]);
+                     " Merge of the " +inMemClosedFiles.length +
+                     " files in InMemoryFileSystem complete." +
+                     " Local file is " + inMemClosedFiles[0]);
           }
           else {
             LOG.info(reduceTask.getTaskId() + " Nothing to merge from " + 
-                inMemFileSys.getUri());
+                     inMemFileSys.getUri());
           }
         } catch (Throwable t) {
           LOG.warn(reduceTask.getTaskId() +
-              " Intermediate Merge of the inmemory files threw an exception: "
-              + StringUtils.stringifyException(t));
+                   " Intermediate Merge of the inmemory files threw an exception: "
+                   + StringUtils.stringifyException(t));
           ReduceCopier.this.mergeThrowable = t;
         }
         finally {
@@ -1199,9 +1199,9 @@ class ReduceTask extends Task {
       }
     }
     final private PathFilter MAP_OUTPUT_FILTER = new PathFilter() {
-      public boolean accept(Path file) {
-        return file.toString().endsWith(".out");
-      }     
-    };
+        public boolean accept(Path file) {
+          return file.toString().endsWith(".out");
+        }     
+      };
   }
 }

+ 189 - 189
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -147,15 +147,15 @@ public class TaskTracker
   private int maxCurrentTasks;
   private int failures;
   private int finishedCount[] = new int[1];
-    private MapEventsFetcherThread mapEventsFetcher;
-    /**
-     * the minimum interval between jobtracker polls
-     */
-    private static final long MIN_POLL_INTERVAL = 5000;
-    /**
-     * Number of maptask completion events locations to poll for at one time
-     */  
-    private int probe_sample_size = 50;
+  private MapEventsFetcherThread mapEventsFetcher;
+  /**
+   * the minimum interval between jobtracker polls
+   */
+  private static final long MIN_POLL_INTERVAL = 5000;
+  /**
+   * Number of maptask completion events locations to poll for at one time
+   */  
+  private int probe_sample_size = 50;
     
   private class TaskTrackerMetrics {
     private MetricsRecord metricsRecord = null;
@@ -240,7 +240,7 @@ public class TaskTracker
       synchronized (rJob) {
         rJob.tasks.add(tip);
       }
-        runningJobs.notify(); //notify the fetcher thread
+      runningJobs.notify(); //notify the fetcher thread
       return rJob;
     }
   }
@@ -306,9 +306,9 @@ public class TaskTracker
         
     this.minSpaceStart = this.fConf.getLong("mapred.local.dir.minspacestart", 0L);
     this.minSpaceKill = this.fConf.getLong("mapred.local.dir.minspacekill", 0L);
-        int numCopiers = this.fConf.getInt("mapred.reduce.parallel.copies", 5);
-        //tweak the probe sample size (make it a function of numCopiers)
-        probe_sample_size = Math.max(numCopiers*5, 50);
+    int numCopiers = this.fConf.getInt("mapred.reduce.parallel.copies", 5);
+    //tweak the probe sample size (make it a function of numCopiers)
+    probe_sample_size = Math.max(numCopiers*5, 50);
         
         
     this.myMetrics = new TaskTrackerMetrics();
@@ -350,165 +350,165 @@ public class TaskTracker
                        jobTrackAddr, this.fConf);
         
     this.running = true;
-        // start the thread that will fetch map task completion events
-        this.mapEventsFetcher = new MapEventsFetcherThread();
-        mapEventsFetcher.setDaemon(true);
-        mapEventsFetcher.setName(
-            "Map-events fetcher for all reduce tasks " + "on " + 
-            taskTrackerName);
-        mapEventsFetcher.start();
+    // start the thread that will fetch map task completion events
+    this.mapEventsFetcher = new MapEventsFetcherThread();
+    mapEventsFetcher.setDaemon(true);
+    mapEventsFetcher.setName(
+                             "Map-events fetcher for all reduce tasks " + "on " + 
+                             taskTrackerName);
+    mapEventsFetcher.start();
   }
     
-    private class MapEventsFetcherThread extends Thread {
-
-      private List <FetchStatus> reducesInShuffle() {
-        List <FetchStatus> fList = new ArrayList<FetchStatus>();
-        for (Map.Entry <String, RunningJob> item : runningJobs.entrySet()) {
-          RunningJob rjob = item.getValue();
-          String jobId = item.getKey();
-          FetchStatus f;
-          synchronized (rjob) {
-            f = rjob.getFetchStatus();
-            for (TaskInProgress tip : rjob.tasks) {
-              Task task = tip.getTask();
-              if (!task.isMapTask()) {
-                if (((ReduceTask)task).getPhase() == 
+  private class MapEventsFetcherThread extends Thread {
+
+    private List <FetchStatus> reducesInShuffle() {
+      List <FetchStatus> fList = new ArrayList<FetchStatus>();
+      for (Map.Entry <String, RunningJob> item : runningJobs.entrySet()) {
+        RunningJob rjob = item.getValue();
+        String jobId = item.getKey();
+        FetchStatus f;
+        synchronized (rjob) {
+          f = rjob.getFetchStatus();
+          for (TaskInProgress tip : rjob.tasks) {
+            Task task = tip.getTask();
+            if (!task.isMapTask()) {
+              if (((ReduceTask)task).getPhase() == 
                   TaskStatus.Phase.SHUFFLE) {
-                  if (rjob.getFetchStatus() == null) {
-                    //this is a new job; we start fetching its map events
-                    f = new FetchStatus(jobId, 
-                        ((ReduceTask)task).getNumMaps());
-                    rjob.setFetchStatus(f);
-                  }
-                  f = rjob.getFetchStatus();
-                  fList.add(f);
-                  break; //no need to check any more tasks belonging to this
+                if (rjob.getFetchStatus() == null) {
+                  //this is a new job; we start fetching its map events
+                  f = new FetchStatus(jobId, 
+                                      ((ReduceTask)task).getNumMaps());
+                  rjob.setFetchStatus(f);
                 }
+                f = rjob.getFetchStatus();
+                fList.add(f);
+                break; //no need to check any more tasks belonging to this
               }
             }
           }
         }
-        //at this point, we have information about for which of
-        //the running jobs do we need to query the jobtracker for map 
-        //outputs (actually map events).
-        return fList;
       }
+      //at this point, we have information about for which of
+      //the running jobs do we need to query the jobtracker for map 
+      //outputs (actually map events).
+      return fList;
+    }
       
-      public void run() {
-        LOG.info("Starting thread: " + getName());
+    public void run() {
+      LOG.info("Starting thread: " + getName());
         
-        while (true) {
-          try {
-            List <FetchStatus> fList = null;
-            synchronized (runningJobs) {
-              while (((fList = reducesInShuffle()).size()) == 0) {
-                try {
-                  runningJobs.wait();
-                } catch (InterruptedException e) {
-                  LOG.info("Shutting down: " + getName());
-                  return;
-                }
+      while (true) {
+        try {
+          List <FetchStatus> fList = null;
+          synchronized (runningJobs) {
+            while (((fList = reducesInShuffle()).size()) == 0) {
+              try {
+                runningJobs.wait();
+              } catch (InterruptedException e) {
+                LOG.info("Shutting down: " + getName());
+                return;
               }
             }
-            // now fetch all the map task events for all the reduce tasks
-            // possibly belonging to different jobs
-            for (FetchStatus f : fList) {
-              try {
+          }
+          // now fetch all the map task events for all the reduce tasks
+          // possibly belonging to different jobs
+          for (FetchStatus f : fList) {
+            try {
                 
-                f.fetchMapCompletionEvents();
+              f.fetchMapCompletionEvents();
                 
-                try {
-                  Thread.sleep(MIN_POLL_INTERVAL);
-                } catch (InterruptedException ie) {
-                  LOG.info("Shutting down: " + getName());
-                  return;
-                }
-              } catch (Exception e) {
-                LOG.warn(
-                    "Ignoring exception that fetch for map completion" +
-                    " events threw for " + f.jobId + " threw: " +
-                    StringUtils.stringifyException(e)); 
+              try {
+                Thread.sleep(MIN_POLL_INTERVAL);
+              } catch (InterruptedException ie) {
+                LOG.info("Shutting down: " + getName());
+                return;
               }
+            } catch (Exception e) {
+              LOG.warn(
+                       "Ignoring exception that fetch for map completion" +
+                       " events threw for " + f.jobId + " threw: " +
+                       StringUtils.stringifyException(e)); 
             }
-          } catch (Exception e) {
-            LOG.info("Ignoring exception "  + e.getMessage());
           }
+        } catch (Exception e) {
+          LOG.info("Ignoring exception "  + e.getMessage());
         }
-      } 
-    }
-
-    private class FetchStatus {
-      /** The next event ID that we will start querying the JobTracker from*/
-      private IntWritable fromEventId;
-      /** This is the cache of map events for a given job */ 
-      private List<TaskCompletionEvent> allMapEvents;
-      /** This array will store indexes to "SUCCEEDED" map events from
-       * allMapEvents. The array is indexed by the mapId. 
-       * The reason why we store the indexes is to quickly reset SUCCEEDED 
-       * events to OBSOLETE. Thus ReduceTasks might also get to know about 
-       * OBSOLETE events and avoid fetching map outputs from the corresponding 
-       * locations.
-       */ 
-      private int indexToEventsCache[];
-      /** What jobid this fetchstatus object is for*/
-      private String jobId;
-     
-      public FetchStatus(String jobId, int numMaps) {
-        this.fromEventId = new IntWritable(0);
-        this.jobId = jobId;
-        this.allMapEvents = new ArrayList<TaskCompletionEvent>(numMaps);
-        this.indexToEventsCache = new int[numMaps];
       }
+    } 
+  }
+
+  private class FetchStatus {
+    /** The next event ID that we will start querying the JobTracker from*/
+    private IntWritable fromEventId;
+    /** This is the cache of map events for a given job */ 
+    private List<TaskCompletionEvent> allMapEvents;
+    /** This array will store indexes to "SUCCEEDED" map events from
+     * allMapEvents. The array is indexed by the mapId. 
+     * The reason why we store the indexes is to quickly reset SUCCEEDED 
+     * events to OBSOLETE. Thus ReduceTasks might also get to know about 
+     * OBSOLETE events and avoid fetching map outputs from the corresponding 
+     * locations.
+     */ 
+    private int indexToEventsCache[];
+    /** What jobid this fetchstatus object is for*/
+    private String jobId;
+     
+    public FetchStatus(String jobId, int numMaps) {
+      this.fromEventId = new IntWritable(0);
+      this.jobId = jobId;
+      this.allMapEvents = new ArrayList<TaskCompletionEvent>(numMaps);
+      this.indexToEventsCache = new int[numMaps];
+    }
       
-      public TaskCompletionEvent[] getMapEvents(int fromId, int max) {
+    public TaskCompletionEvent[] getMapEvents(int fromId, int max) {
         
-        TaskCompletionEvent[] mapEvents = 
-                              TaskCompletionEvent.EMPTY_ARRAY;
-        synchronized (allMapEvents) {
-          if (allMapEvents.size() > fromId) {
-            int actualMax = Math.min(max, (allMapEvents.size() - fromId));
-            List <TaskCompletionEvent> eventSublist = 
-              allMapEvents.subList(fromId, actualMax + fromId);
-            mapEvents = 
-              (TaskCompletionEvent[])eventSublist.toArray(mapEvents);
-          }
+      TaskCompletionEvent[] mapEvents = 
+        TaskCompletionEvent.EMPTY_ARRAY;
+      synchronized (allMapEvents) {
+        if (allMapEvents.size() > fromId) {
+          int actualMax = Math.min(max, (allMapEvents.size() - fromId));
+          List <TaskCompletionEvent> eventSublist = 
+            allMapEvents.subList(fromId, actualMax + fromId);
+          mapEvents = 
+            (TaskCompletionEvent[])eventSublist.toArray(mapEvents);
         }
-        return mapEvents;
       }
+      return mapEvents;
+    }
       
-      public void fetchMapCompletionEvents() throws IOException {
-        List <TaskCompletionEvent> recentMapEvents = 
-                              queryJobTracker(fromEventId, jobId, jobClient);
-        synchronized (allMapEvents) {
-          for (TaskCompletionEvent t : recentMapEvents) {
-            TaskCompletionEvent.Status status = t.getTaskStatus();
-            allMapEvents.add(t);
+    public void fetchMapCompletionEvents() throws IOException {
+      List <TaskCompletionEvent> recentMapEvents = 
+        queryJobTracker(fromEventId, jobId, jobClient);
+      synchronized (allMapEvents) {
+        for (TaskCompletionEvent t : recentMapEvents) {
+          TaskCompletionEvent.Status status = t.getTaskStatus();
+          allMapEvents.add(t);
             
-            if (status == TaskCompletionEvent.Status.SUCCEEDED) {
-              //store the index of the events cache for this success event.
-              indexToEventsCache[t.idWithinJob()] = allMapEvents.size();
-            }
-            else if (status == TaskCompletionEvent.Status.FAILED || 
-                status == TaskCompletionEvent.Status.OBSOLETE) {
-              int idx = indexToEventsCache[t.idWithinJob()];
-              //if this map task was declared a success earlier, we will have
-              //idx > 0
-              if (idx > 0) {
-                //Mark the event as OBSOLETE and reset the index to 0. Note 
-                //we access the 'idx - 1' entry. This is because while storing
-                //the idx in indexToEventsCache, we store the 'actual idx + 1'
-                //Helps us to eliminate the index array elements initialization
-                //to something like '-1'
-                TaskCompletionEvent obsoleteEvent = allMapEvents.get(idx - 1);
-                obsoleteEvent.setTaskStatus(
-                              TaskCompletionEvent.Status.OBSOLETE);
-                indexToEventsCache[t.idWithinJob()] = 0;
-              }
+          if (status == TaskCompletionEvent.Status.SUCCEEDED) {
+            //store the index of the events cache for this success event.
+            indexToEventsCache[t.idWithinJob()] = allMapEvents.size();
+          }
+          else if (status == TaskCompletionEvent.Status.FAILED || 
+                   status == TaskCompletionEvent.Status.OBSOLETE) {
+            int idx = indexToEventsCache[t.idWithinJob()];
+            //if this map task was declared a success earlier, we will have
+            //idx > 0
+            if (idx > 0) {
+              //Mark the event as OBSOLETE and reset the index to 0. Note 
+              //we access the 'idx - 1' entry. This is because while storing
+              //the idx in indexToEventsCache, we store the 'actual idx + 1'
+              //Helps us to eliminate the index array elements initialization
+              //to something like '-1'
+              TaskCompletionEvent obsoleteEvent = allMapEvents.get(idx - 1);
+              obsoleteEvent.setTaskStatus(
+                                          TaskCompletionEvent.Status.OBSOLETE);
+              indexToEventsCache[t.idWithinJob()] = 0;
             }
           }
         }
       }
     }
+  }
 
   // intialize the job directory
   private void localizeJob(TaskInProgress tip) throws IOException {
@@ -636,8 +636,8 @@ public class TaskTracker
     // Clear local storage
     this.mapOutputFile.cleanupStorage();
         
-        // Shutdown the fetcher thread
-        this.mapEventsFetcher.interrupt();
+    // Shutdown the fetcher thread
+    this.mapEventsFetcher.interrupt();
   }
 
   /**
@@ -681,34 +681,34 @@ public class TaskTracker
     return fs;
   }
     
-    /** Queries the job tracker for a set of outputs ready to be copied
-     * @param fromEventId the first event ID we want to start from, this is
-     * modified by the call to this method
-     * @param jobClient the job tracker
-     * @return a set of locations to copy outputs from
-     * @throws IOException
-     */  
-    private List<TaskCompletionEvent> queryJobTracker(IntWritable fromEventId,
-        String jobId,
-        InterTrackerProtocol jobClient)
-        throws IOException {
-
-      TaskCompletionEvent t[] = jobClient.getTaskCompletionEvents(
-          jobId,
-          fromEventId.get(),
-          probe_sample_size);
-      //we are interested in map task completion events only. So store
-      //only those
-      List <TaskCompletionEvent> recentMapEvents = 
-                                 new ArrayList<TaskCompletionEvent>();
-      for (int i = 0; i < t.length; i++) {
-        if (t[i].isMap) {
-          recentMapEvents.add(t[i]);
-        }
+  /** Queries the job tracker for a set of outputs ready to be copied
+   * @param fromEventId the first event ID we want to start from, this is
+   * modified by the call to this method
+   * @param jobClient the job tracker
+   * @return a set of locations to copy outputs from
+   * @throws IOException
+   */  
+  private List<TaskCompletionEvent> queryJobTracker(IntWritable fromEventId,
+                                                    String jobId,
+                                                    InterTrackerProtocol jobClient)
+    throws IOException {
+
+    TaskCompletionEvent t[] = jobClient.getTaskCompletionEvents(
+                                                                jobId,
+                                                                fromEventId.get(),
+                                                                probe_sample_size);
+    //we are interested in map task completion events only. So store
+    //only those
+    List <TaskCompletionEvent> recentMapEvents = 
+      new ArrayList<TaskCompletionEvent>();
+    for (int i = 0; i < t.length; i++) {
+      if (t[i].isMap) {
+        recentMapEvents.add(t[i]);
       }
-      fromEventId.set(fromEventId.get() + t.length);
-      return recentMapEvents;
     }
+    fromEventId.set(fromEventId.get() + t.length);
+    return recentMapEvents;
+  }
 
   /**
    * Main service loop.  Will stay in this loop forever.
@@ -1580,24 +1580,24 @@ public class TaskTracker
     running = false;
   }
 
-    public TaskCompletionEvent[] getMapCompletionEvents(
-      String jobId, int fromEventId, int maxLocs) throws IOException {
+  public TaskCompletionEvent[] getMapCompletionEvents(
+                                                      String jobId, int fromEventId, int maxLocs) throws IOException {
       
-      TaskCompletionEvent[]mapEvents = TaskCompletionEvent.EMPTY_ARRAY;
-      RunningJob rjob;
-      synchronized (runningJobs) {
-        rjob = runningJobs.get(jobId);          
-        if (rjob != null) {
-          synchronized (rjob) {
-            FetchStatus f = rjob.getFetchStatus();
-            if (f != null) {
-              mapEvents = f.getMapEvents(fromEventId, maxLocs);
-            }
+    TaskCompletionEvent[]mapEvents = TaskCompletionEvent.EMPTY_ARRAY;
+    RunningJob rjob;
+    synchronized (runningJobs) {
+      rjob = runningJobs.get(jobId);          
+      if (rjob != null) {
+        synchronized (rjob) {
+          FetchStatus f = rjob.getFetchStatus();
+          if (f != null) {
+            mapEvents = f.getMapEvents(fromEventId, maxLocs);
           }
         }
       }
-      return mapEvents;
     }
+    return mapEvents;
+  }
     
   /////////////////////////////////////////////////////
   //  Called by TaskTracker thread after task process ends
@@ -1644,7 +1644,7 @@ public class TaskTracker
     Set<TaskInProgress> tasks;
     boolean localized;
     boolean keepJobFiles;
-      FetchStatus f;
+    FetchStatus f;
     RunningJob(String jobid, Path jobFile) {
       this.jobid = jobid;
       localized = false;
@@ -1661,13 +1661,13 @@ public class TaskTracker
       return jobid;
     }
       
-      void setFetchStatus(FetchStatus f) {
-        this.f = f;
-      }
+    void setFetchStatus(FetchStatus f) {
+      this.f = f;
+    }
       
-      FetchStatus getFetchStatus() {
-        return f;
-      }
+    FetchStatus getFetchStatus() {
+      return f;
+    }
   }
 
   /** 

+ 8 - 8
src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java

@@ -66,14 +66,14 @@ interface TaskUmbilicalProtocol extends VersionedProtocol {
   void fsError(String message) throws IOException;
 
   /** Called by a reduce task to get the map output locations for finished maps.
-  *
-  * @param taskId the reduce task id
-  * @param fromIndex the index starting from which the locations should be 
-  * fetched
-  * @param maxLocs the max number of locations to fetch
-  * @return an array of TaskCompletionEvent
-  */
+   *
+   * @param taskId the reduce task id
+   * @param fromIndex the index starting from which the locations should be 
+   * fetched
+   * @param maxLocs the max number of locations to fetch
+   * @return an array of TaskCompletionEvent
+   */
   TaskCompletionEvent[] getMapCompletionEvents(String jobId, 
-      int fromIndex, int maxLocs) throws IOException;
+                                               int fromIndex, int maxLocs) throws IOException;
 
 }

+ 1 - 1
src/java/org/apache/hadoop/metrics/spi/AbstractMetricsContext.java

@@ -227,7 +227,7 @@ public abstract class AbstractMetricsContext implements MetricsContext {
   private synchronized void startTimer() {
     if (timer == null) {
       timer = new Timer("Timer thread for monitoring " + getContextName(), 
-                         true);
+                        true);
       TimerTask task = new TimerTask() {
           public void run() {
             try {