Browse Source

HADOOP-1485. Add metrics for monitoring shuffle. Contributed by Devaraj.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@551245 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 năm trước cách đây
mục cha
commit
1ac5ebefb5

+ 3 - 0
CHANGES.txt

@@ -264,6 +264,9 @@ Trunk (unreleased changes)
  80. HADOOP-1028.  Add log messages for server startup and shutdown.
      (Tsz Wo Sze via cutting)
 
+ 81. HADOOP-1485.  Add metrics for monitoring shuffle.
+     (Devaraj Das via cutting)
+
 
 Release 0.13.0 - 2007-06-08
 

+ 3 - 4
src/java/org/apache/hadoop/mapred/MapOutputLocation.java

@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.io.*;
-import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.mapred.ReduceTask.ReduceCopier.ShuffleClientMetrics;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.conf.*;
 
@@ -187,7 +187,7 @@ class MapOutputLocation implements Writable, MRConstants {
    */
   public Path getFile(InMemoryFileSystem inMemFileSys,
                       FileSystem localFileSys,
-                      MetricsRecord shuffleMetrics,
+                      ShuffleClientMetrics shuffleMetrics,
                       Path localFilename, 
                       LocalDirAllocator lDirAlloc,
                       Configuration conf, int reduce,
@@ -240,8 +240,7 @@ class MapOutputLocation implements Writable, MRConstants {
           int len = input.read(buffer);
           while (len > 0) {
             totalBytes += len;
-            shuffleMetrics.incrMetric("shuffle_input_bytes", len);
-            shuffleMetrics.update();
+            shuffleMetrics.inputBytes(len);
             output.write(buffer, 0 , len);
             if (currentThread.isInterrupted()) {
               throw new InterruptedException();

+ 69 - 9
src/java/org/apache/hadoop/mapred/ReduceTask.java

@@ -58,6 +58,7 @@ import org.apache.hadoop.io.WritableFactory;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -342,7 +343,7 @@ class ReduceTask extends Task {
     done(umbilical);
   }
 
-  private class ReduceCopier implements MRConstants {
+  class ReduceCopier implements MRConstants {
 
     /** Reference to the umbilical object */
     private TaskUmbilicalProtocol umbilical;
@@ -431,9 +432,9 @@ class ReduceTask extends Task {
     private MapOutputCopier[] copiers = null;
     
     /**
-     * The threads for fetching the files.
+     * The object for metrics reporting.
      */
-    private MetricsRecord shuffleMetrics = null;
+    private ShuffleClientMetrics shuffleClientMetrics = null;
     
     /**
      * the minimum interval between tasktracker polls
@@ -464,6 +465,65 @@ class ReduceTask extends Task {
      */
     private long ramfsMergeOutputSize;
     
+    /**
+     * This class contains the methods that should be used for metrics-reporting
+     * the specific metrics for shuffle. This class actually reports the
+     * metrics for the shuffle client (the ReduceTask), and hence the name
+     * ShuffleClientMetrics.
+     */
+    class ShuffleClientMetrics implements Updater {
+      private MetricsRecord shuffleMetrics = null;
+      private int numFailedFetches = 0;
+      private int numSuccessFetches = 0;
+      private long numBytes = 0;
+      private int numThreadsBusy = 0;
+      ShuffleClientMetrics(JobConf conf) {
+        MetricsContext metricsContext = MetricsUtil.getContext("mapred");
+        this.shuffleMetrics = 
+          MetricsUtil.createRecord(metricsContext, "shuffleInput");
+        this.shuffleMetrics.setTag("user", conf.getUser());
+        this.shuffleMetrics.setTag("jobName", conf.getJobName());
+        this.shuffleMetrics.setTag("jobId", ReduceTask.this.getJobId());
+        this.shuffleMetrics.setTag("taskId", getTaskId());
+        this.shuffleMetrics.setTag("sessionId", conf.getSessionId());
+        metricsContext.registerUpdater(this);
+      }
+      public synchronized void inputBytes(long numBytes) {
+        this.numBytes += numBytes;
+      }
+      public synchronized void failedFetch() {
+        ++numFailedFetches;
+      }
+      public synchronized void successFetch() {
+        ++numSuccessFetches;
+      }
+      public synchronized void threadBusy() {
+        ++numThreadsBusy;
+      }
+      public synchronized void threadFree() {
+        --numThreadsBusy;
+      }
+      public void doUpdates(MetricsContext unused) {
+        synchronized (this) {
+          shuffleMetrics.incrMetric("shuffle_input_bytes", numBytes);
+          shuffleMetrics.incrMetric("shuffle_failed_fetches", 
+                                    numFailedFetches);
+          shuffleMetrics.incrMetric("shuffle_success_fetches", 
+                                    numSuccessFetches);
+          if (numCopiers != 0) {
+            shuffleMetrics.setMetric("shuffle_fetchers_busy_percent",
+                100*((float)numThreadsBusy/numCopiers));
+          } else {
+            shuffleMetrics.setMetric("shuffle_fetchers_busy_percent", 0);
+          }
+          numBytes = 0;
+          numSuccessFetches = 0;
+          numFailedFetches = 0;
+        }
+        shuffleMetrics.update();
+      }
+    }
+
     /** Represents the result of an attempt to copy a map output */
     private class CopyResult {
       
@@ -567,13 +627,17 @@ class ReduceTask extends Task {
             }
             
             try {
+              shuffleClientMetrics.threadBusy();
               start(loc);
               size = copyOutput(loc);
+              shuffleClientMetrics.successFetch();
             } catch (IOException e) {
               LOG.warn(reduceTask.getTaskId() + " copy failed: " +
                        loc.getMapTaskId() + " from " + loc.getHost());
               LOG.warn(StringUtils.stringifyException(e));
+              shuffleClientMetrics.failedFetch();
             } finally {
+              shuffleClientMetrics.threadFree();
               finish(size);
             }
           } catch (InterruptedException e) { 
@@ -607,7 +671,7 @@ class ReduceTask extends Task {
         // a working filename that will be unique to this attempt
         Path tmpFilename = new Path(filename + "-" + id);
         // this copies the map output file
-        tmpFilename = loc.getFile(inMemFileSys, localFileSys, shuffleMetrics,
+        tmpFilename = loc.getFile(inMemFileSys, localFileSys, shuffleClientMetrics,
                                   tmpFilename, lDirAlloc, 
                                   conf, reduceTask.getPartition(), 
                                   STALLED_COPY_TIMEOUT, reporter);
@@ -712,6 +776,7 @@ class ReduceTask extends Task {
       throws IOException {
       
       configureClasspath(conf);
+      this.shuffleClientMetrics = new ShuffleClientMetrics(conf);
       this.umbilical = umbilical;      
       this.reduceTask = ReduceTask.this;
       this.scheduledCopies = new ArrayList<MapOutputLocation>(100);
@@ -743,11 +808,6 @@ class ReduceTask extends Task {
       
       this.lastPollTime = 0;
       
-      MetricsContext metricsContext = MetricsUtil.getContext("mapred");
-      this.shuffleMetrics = 
-        MetricsUtil.createRecord(metricsContext, "shuffleInput");
-      this.shuffleMetrics.setTag("user", conf.getUser());
-      this.shuffleMetrics.setTag("sessionId", conf.getSessionId());
 
       // Seed the random number generator with a reasonably globally unique seed
       long randomSeed = System.nanoTime() + 

+ 84 - 19
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -148,6 +148,7 @@ public class TaskTracker
   private int failures;
   private int finishedCount[] = new int[1];
   private MapEventsFetcherThread mapEventsFetcher;
+  int workerThreads;
   /**
    * the minimum interval between jobtracker polls
    */
@@ -157,6 +158,60 @@ public class TaskTracker
    */  
   private int probe_sample_size = 50;
     
+  private ShuffleServerMetrics shuffleServerMetrics;
+  /** This class contains the methods that should be used for metrics-reporting
+   * the specific metrics for shuffle. The TaskTracker is actually a server for
+   * the shuffle and hence the name ShuffleServerMetrics.
+   */
+  private class ShuffleServerMetrics implements Updater {
+    private MetricsRecord shuffleMetricsRecord = null;
+    private int serverHandlerBusy = 0;
+    private long outputBytes = 0;
+    private int failedOutputs = 0;
+    private int successOutputs = 0;
+    ShuffleServerMetrics(JobConf conf) {
+      MetricsContext context = MetricsUtil.getContext("mapred");
+      shuffleMetricsRecord = 
+                           MetricsUtil.createRecord(context, "shuffleOutput");
+      this.shuffleMetricsRecord.setTag("sessionId", conf.getSessionId());
+      context.registerUpdater(this);
+    }
+    synchronized void serverHandlerBusy() {
+      ++serverHandlerBusy;
+    }
+    synchronized void serverHandlerFree() {
+      --serverHandlerBusy;
+    }
+    synchronized void outputBytes(long bytes) {
+      outputBytes += bytes;
+    }
+    synchronized void failedOutput() {
+      ++failedOutputs;
+    }
+    synchronized void successOutput() {
+      ++successOutputs;
+    }
+    public void doUpdates(MetricsContext unused) {
+      synchronized (this) {
+        if (workerThreads != 0) {
+          shuffleMetricsRecord.setMetric("shuffle_handler_busy_percent", 
+              100*((float)serverHandlerBusy/workerThreads));
+        } else {
+          shuffleMetricsRecord.setMetric("shuffle_handler_busy_percent", 0);
+        }
+        shuffleMetricsRecord.incrMetric("shuffle_output_bytes", 
+                                        outputBytes);
+        shuffleMetricsRecord.incrMetric("shuffle_failed_outputs", 
+                                        failedOutputs);
+        shuffleMetricsRecord.incrMetric("shuffle_success_outputs", 
+                                        successOutputs);
+        outputBytes = 0;
+        failedOutputs = 0;
+        successOutputs = 0;
+      }
+      shuffleMetricsRecord.update();
+    }
+  }
   private class TaskTrackerMetrics implements Updater {
     private MetricsRecord metricsRecord = null;
     private int numCompletedTasks = 0;
@@ -663,7 +718,8 @@ public class TaskTracker
     int httpPort = conf.getInt("tasktracker.http.port", 50060);
     String httpBindAddress = conf.get("tasktracker.http.bindAddress", "0.0.0.0");
     this.server = new StatusHttpServer("task", httpBindAddress, httpPort, true);
-    int workerThreads = conf.getInt("tasktracker.http.threads", 40);
+    workerThreads = conf.getInt("tasktracker.http.threads", 40);
+    this.shuffleServerMetrics = new ShuffleServerMetrics(fConf);
     server.setThreads(1, workerThreads);
     // let the jsp pages get to the task tracker, config, and other relevant
     // objects
@@ -674,6 +730,7 @@ public class TaskTracker
     server.setAttribute("conf", conf);
     server.setAttribute("log", LOG);
     server.setAttribute("localDirAllocator", localDirAllocator);
+    server.setAttribute("shuffleServerMetrics", shuffleServerMetrics);
     server.addServlet("mapOutput", "/mapOutput", MapOutputServlet.class);
     server.start();
     this.httpPort = server.getPort();
@@ -1839,27 +1896,31 @@ public class TaskTracker
       ServletContext context = getServletContext();
       int reduce = Integer.parseInt(reduceId);
       byte[] buffer = new byte[MAX_BYTES_TO_READ];
-      OutputStream outStream = response.getOutputStream();
-      JobConf conf = (JobConf) context.getAttribute("conf");
-      LocalDirAllocator lDirAlloc = 
-        (LocalDirAllocator)context.getAttribute("localDirAllocator");
-      FileSystem fileSys = 
-        (FileSystem) context.getAttribute("local.file.system");
-
-      // Index file
-      Path indexFileName = lDirAlloc.getLocalPathToRead(
-                                            mapId+"/file.out.index", conf);
-      FSDataInputStream indexIn = null;
-         
-      // Map-output file
-      Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
-                                            mapId+"/file.out", conf);
-      FSDataInputStream mapOutputIn = null;
-        
       // true iff IOException was caused by attempt to access input
       boolean isInputException = true;
-        
+      OutputStream outStream = null;
+      FSDataInputStream indexIn = null;
+      FSDataInputStream mapOutputIn = null;
+      
+      ShuffleServerMetrics shuffleMetrics = (ShuffleServerMetrics)
+                                      context.getAttribute("shuffleServerMetrics");
       try {
+        shuffleMetrics.serverHandlerBusy();
+        outStream = response.getOutputStream();
+        JobConf conf = (JobConf) context.getAttribute("conf");
+        LocalDirAllocator lDirAlloc = 
+          (LocalDirAllocator)context.getAttribute("localDirAllocator");
+        FileSystem fileSys = 
+          (FileSystem) context.getAttribute("local.file.system");
+
+        // Index file
+        Path indexFileName = lDirAlloc.getLocalPathToRead(
+            mapId+"/file.out.index", conf);
+        
+        // Map-output file
+        Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
+            mapId+"/file.out", conf);
+
         /**
          * Read the index file to get the information about where
          * the map-output for the given reducer is available. 
@@ -1899,6 +1960,7 @@ public class TaskTracker
                                    ? (int)partLength : MAX_BYTES_TO_READ);
         while (len > 0) {
           try {
+            shuffleMetrics.outputBytes(len);
             outStream.write(buffer, 0, len);
             outStream.flush();
           } catch (IOException ie) {
@@ -1923,6 +1985,7 @@ public class TaskTracker
           tracker.mapOutputLost(mapId, errorMsg);
         }
         response.sendError(HttpServletResponse.SC_GONE, errorMsg);
+        shuffleMetrics.failedOutput();
         throw ie;
       } finally {
         if (indexIn != null) {
@@ -1931,8 +1994,10 @@ public class TaskTracker
         if (mapOutputIn != null) {
           mapOutputIn.close();
         }
+        shuffleMetrics.serverHandlerFree();
       }
       outStream.close();
+      shuffleMetrics.successOutput();
     }
   }
 }