
HADOOP-3332. Reduces the amount of logging in Reducer's shuffle phase. Contributed by Devaraj Das.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@656260 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das, 17 years ago
commit ee21cec863
2 changed files with 61 additions and 20 deletions
  1. CHANGES.txt (+3 -0)
  2. src/java/org/apache/hadoop/mapred/ReduceTask.java (+58 -20)
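The heart of this patch is a simple time-based throttle: the copier loop records when it last emitted its periodic status lines and logs them again only after MIN_LOG_TIME (60 seconds) has elapsed. A minimal, self-contained sketch of that pattern, reusing the names from the patch (the wrapper class and main method are hypothetical):

    // Sketch of the log-throttling pattern this commit introduces in
    // ReduceTask's shuffle loop: periodic status lines fire at most
    // once per MIN_LOG_TIME milliseconds.
    public class LogThrottleSketch {
      private static final int MIN_LOG_TIME = 60000; // as in the patch
      private long lastOutputTime = 0;

      // Mirrors the logNow computation at the top of the fetch loop.
      boolean logNow(long currentTime) {
        if (currentTime - lastOutputTime > MIN_LOG_TIME) {
          lastOutputTime = currentTime;
          return true;
        }
        return false;
      }

      public static void main(String[] args) throws InterruptedException {
        LogThrottleSketch t = new LogThrottleSketch();
        for (int i = 0; i < 3; i++) {
          if (t.logNow(System.currentTimeMillis())) {
            System.out.println("periodic status line");
          }
          Thread.sleep(100); // subsequent iterations stay quiet
        }
      }
    }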

CHANGES.txt (+3 -0)

@@ -120,6 +120,9 @@ Trunk (unreleased changes)
     HADOOP-3334. Move lease handling from FSNamesystem into a seperate class.
     (Tsz Wo (Nicholas), SZE via rangadi)
 
+    HADOOP-3332. Reduces the amount of logging in Reducer's shuffle phase.
+    (Devaraj Das)
+
   OPTIMIZATIONS
 
     HADOOP-3274. The default constructor of BytesWritable creates empty 

src/java/org/apache/hadoop/mapred/ReduceTask.java (+58 -20)

@@ -32,6 +32,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Hashtable;
+import java.util.LinkedHashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -613,6 +614,11 @@ class ReduceTask extends Task {
      * Initial backoff interval (milliseconds)
      */
     private static final int BACKOFF_INIT = 4000; 
+    
+    /**
+     * The interval for logging in the shuffle
+     */
+    private static final int MIN_LOG_TIME = 60000;
 
     /**
      * This class contains the methods that should be used for metrics-reporting
@@ -989,7 +995,7 @@ class ReduceTask extends Task {
       sorter.setProgressable(getReporter(umbilical));
       
       // hosts -> next contact time
-      this.penaltyBox = new Hashtable<String, Long>();
+      this.penaltyBox = new LinkedHashMap<String, Long>();
       
       // hostnames
       this.uniqueHosts = new HashSet<String>();
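Note the container change above: the penalty box moves from Hashtable to LinkedHashMap. Iteration now follows insertion order, which keeps the new "Penalized(slow) Hosts" report (added further down) stable, and Hashtable's per-call synchronization is unnecessary since the map is only touched from the fetch-scheduling loop. A small sketch of the ordering guarantee, with hypothetical host names:

    import java.util.LinkedHashMap;
    import java.util.Map;

    // LinkedHashMap iterates in insertion order, so the penalty report
    // lists hosts in the order they were penalized.
    public class PenaltyBoxOrder {
      public static void main(String[] args) {
        Map<String, Long> penaltyBox = new LinkedHashMap<String, Long>();
        long now = System.currentTimeMillis();
        penaltyBox.put("tt-host-2", now + 8000); // hypothetical hosts
        penaltyBox.put("tt-host-1", now + 4000);
        // Prints tt-host-2 first, then tt-host-1, regardless of key order.
        for (String host : penaltyBox.keySet()) {
          System.out.println(host + " -> " + penaltyBox.get(host));
        }
      }
    }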
@@ -1041,16 +1047,25 @@ class ReduceTask extends Task {
       // start the clock for bandwidth measurement
       long startTime = System.currentTimeMillis();
       long currentTime = startTime;
-      long lastProgressTime = System.currentTimeMillis();
+      long lastProgressTime = startTime;
+      long lastOutputTime = 0;
       IntWritable fromEventId = new IntWritable(0);
       
       try {
         // loop until we get all required outputs
         while (!neededOutputs.isEmpty() && mergeThrowable == null) {
           
-          LOG.info(reduceTask.getTaskID() + " Need another " 
+          currentTime = System.currentTimeMillis();
+          boolean logNow = false;
+          if (currentTime - lastOutputTime > MIN_LOG_TIME) {
+            lastOutputTime = currentTime;
+            logNow = true;
+          }
+          if (logNow) {
+            LOG.info(reduceTask.getTaskID() + " Need another " 
                    + neededOutputs.size() + " map output(s) where " 
                    + numInFlight + " is already in progress");
+          }
           
           try {
             // Put the hash entries for the failed fetches. Entries here
@@ -1065,16 +1080,29 @@ class ReduceTask extends Task {
             // used for the next call to getMapCompletionEvents
             int currentNumKnownMaps = knownOutputs.size();
             int currentNumObsoleteMapIds = obsoleteMapIds.size();
-              getMapCompletionEvents(fromEventId, knownOutputs);
+            getMapCompletionEvents(fromEventId, knownOutputs);
 
 
-            LOG.info(reduceTask.getTaskID() + ": " +  
-                "Got " + (knownOutputs.size()-currentNumKnownMaps) + 
-                " new map-outputs & " + 
-                (obsoleteMapIds.size()-currentNumObsoleteMapIds) + 
-                " obsolete map-outputs from tasktracker and " + 
-                retryFetches.size() + " map-outputs from previous failures"
-            );
+            int numNewOutputs = knownOutputs.size()-currentNumKnownMaps;
+            if (numNewOutputs > 0 || logNow) {
+              LOG.info(reduceTask.getTaskID() + ": " +  
+                  "Got " + numNewOutputs + 
+                  " new map-outputs & number of known map outputs is " + 
+                  knownOutputs.size());
+            }
+            
+            int numNewObsoleteMaps = obsoleteMapIds.size()-currentNumObsoleteMapIds;
+            if (numNewObsoleteMaps > 0) {
+              LOG.info(reduceTask.getTaskID() + ": " +  
+                  "Got " + numNewObsoleteMaps + 
+                  " obsolete map-outputs from tasktracker ");  
+            }
+            
+            if (retryFetches.size() > 0) {
+              LOG.info(reduceTask.getTaskID() + ": " +  
+                  "Got " + retryFetches.size() +
+                  " map-outputs from previous failures");
+            }
             // clear the "failed" fetches hashmap
             retryFetches.clear();
           }
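This hunk replaces one unconditional summary line with three conditional ones: each count is logged only when it is non-zero, and the "new map-outputs" line also fires on the periodic timer. A hedged sketch of that "log on change or on timer" rule (the helper method and task id are hypothetical):

    // Sketch of the conditional-logging rule used above: stay quiet in
    // steady state, surface non-zero counts immediately, and repeat the
    // headline count whenever the periodic timer (logNow) fires.
    public class ChangeOrTimerLog {
      static void report(String taskId, int numNewOutputs, boolean logNow) {
        if (numNewOutputs > 0 || logNow) {
          System.out.println(taskId + ": Got " + numNewOutputs + " new map-outputs");
        }
      }

      public static void main(String[] args) {
        report("task_0001_r_000000", 0, false); // silent: nothing new, no timer
        report("task_0001_r_000000", 2, false); // logs: new outputs arrived
        report("task_0001_r_000000", 0, true);  // logs: periodic heartbeat
      }
    }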
@@ -1086,10 +1114,7 @@ class ReduceTask extends Task {
           
           // now walk through the cache and schedule what we can
           int numKnown = knownOutputs.size(), numScheduled = 0;
-          int numSlow = 0, numDups = 0;
-          
-          LOG.info(reduceTask.getTaskID() + " Got " + numKnown + 
-                   " known map output location(s); scheduling...");
+          int numDups = 0;
           
           synchronized (scheduledCopies) {
             // Randomize the map output locations to prevent 
@@ -1098,7 +1123,7 @@ class ReduceTask extends Task {
             
             Iterator<MapOutputLocation> locIt = knownOutputs.iterator();
             
-            currentTime = System.currentTimeMillis();
+            
             while (locIt.hasNext()) {
               
               MapOutputLocation loc = locIt.next();
@@ -1112,8 +1137,12 @@ class ReduceTask extends Task {
               Long penaltyEnd = penaltyBox.get(loc.getHost());
               boolean penalized = false, duplicate = false;
               
-              if (penaltyEnd != null && currentTime < penaltyEnd.longValue()) {
-                penalized = true; numSlow++;
+              if (penaltyEnd != null) {
+                if (currentTime < penaltyEnd.longValue()) {
+                  penalized = true;
+                } else {
+                  penaltyBox.remove(loc.getHost());
+                }
               }
               if (uniqueHosts.contains(loc.getHost())) {
                 duplicate = true; numDups++;
@@ -1128,9 +1157,18 @@ class ReduceTask extends Task {
             }
             scheduledCopies.notifyAll();
           }
-          LOG.info(reduceTask.getTaskID() + " Scheduled " + numScheduled +
-                   " of " + numKnown + " known outputs (" + numSlow +
+          if (numScheduled > 0 || logNow) {
+            LOG.info(reduceTask.getTaskID() + " Scheduled " + numScheduled +
+                   " of " + numKnown + " known outputs (" + penaltyBox.size() +
                    " slow hosts and " + numDups + " dup hosts)");
+          }
+          if (penaltyBox.size() > 0 && logNow) {
+            LOG.info("Penalized(slow) Hosts: ");
+            for (String host : penaltyBox.keySet()) {
+              LOG.info(host + " Will be considered after: " + 
+                  ((penaltyBox.get(host) - currentTime)/1000) + " seconds.");
+            }
+          }
           
           // Check if a on-disk merge can be done. This will help if there
           // are no copies to be fetched but sufficient copies to be merged.
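Two smaller behavioral tweaks round out the patch: expired penalty-box entries are now removed during the scheduling walk (previously they lingered and were merely counted as slow), and when the timer fires the remaining entries are reported with the seconds left before each host is retried. A combined sketch of both, with hypothetical hosts and times:

    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.Map;

    // Sketch of the penalty-box handling after this patch: drop entries
    // whose penalty has expired, then report the survivors with the
    // seconds remaining until each host will be considered again.
    public class PenaltyExpirySketch {
      public static void main(String[] args) {
        Map<String, Long> penaltyBox = new LinkedHashMap<String, Long>();
        long currentTime = System.currentTimeMillis();
        penaltyBox.put("tt-host-1", currentTime - 1000); // already expired
        penaltyBox.put("tt-host-2", currentTime + 4000); // still penalized

        Iterator<Map.Entry<String, Long>> it = penaltyBox.entrySet().iterator();
        while (it.hasNext()) {
          if (currentTime >= it.next().getValue()) {
            it.remove(); // expired: host may be contacted again
          }
        }
        if (!penaltyBox.isEmpty()) {
          System.out.println("Penalized(slow) Hosts: ");
          for (String host : penaltyBox.keySet()) {
            System.out.println(host + " Will be considered after: "
                + ((penaltyBox.get(host) - currentTime) / 1000) + " seconds.");
          }
        }
      }
    }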