HADOOP-954. Change use of metrics to use callback mechanism. Contributed by David & Nigel.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@505421 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting · 18 years ago · commit 022fdcab7b
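
In outline: the old static helper Metrics.report() pushed a value into the metrics record on every event, so each increment paid for a report. After this change, each daemon keeps a small metrics class that implements Updater, registers itself with its MetricsContext, accumulates counts in plain fields, and flushes them only when the context calls doUpdates() on its timer. A minimal sketch of the new pattern, condensed from the DataNodeMetrics changes below (the class and metric names here are illustrative only, not part of the commit):

    import org.apache.hadoop.metrics.MetricsContext;
    import org.apache.hadoop.metrics.MetricsRecord;
    import org.apache.hadoop.metrics.MetricsUtil;
    import org.apache.hadoop.metrics.Updater;

    class ExampleMetrics implements Updater {
      private final MetricsRecord metricsRecord;
      private int bytesRead = 0;           // accumulated between callbacks

      ExampleMetrics() {
        MetricsContext context = MetricsUtil.getContext("dfs");
        metricsRecord = MetricsUtil.createRecord(context, "example");
        context.registerUpdater(this);     // request periodic callbacks
      }

      // Called by the context on its update interval (e.g. every 5 seconds).
      public void doUpdates(MetricsContext unused) {
        synchronized (this) {
          metricsRecord.incrMetric("bytes_read", bytesRead);
          bytesRead = 0;                   // counts are deltas; reset after flush
        }
        metricsRecord.update();
      }

      // The hot path is now just a field increment, with no reporting work.
      synchronized void readBytes(int nbytes) {
        bytesRead += nbytes;
      }
    }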

+ 4 - 0
CHANGES.txt

@@ -31,6 +31,10 @@ Branch 0.11 - unreleased
  5. HADOOP-992.  Fix MiniMR unit tests to use MiniDFS when specified,
     rather than the local FS.  (omalley via cutting)
 
+ 6. HADOOP-954.  Change use of metrics to use callback mechanism.
+    Also rename utility class Metrics to MetricsUtil.
+    (David Bowen & Nigel Daley via cutting)
+
 
 Release 0.11.0 - 2007-02-02
 

+ 39 - 20
src/java/org/apache/hadoop/dfs/DataNode.java

@@ -22,7 +22,7 @@ import org.apache.commons.logging.*;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.conf.*;
-import org.apache.hadoop.metrics.Metrics;
+import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.util.*;
@@ -34,7 +34,9 @@ import org.apache.hadoop.net.NetworkTopology;
 import java.io.*;
 import java.net.*;
 import java.util.*;
+import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.Updater;
 
 /**********************************************************
  * DataNode is a class (and program) that stores a set of
@@ -116,55 +118,72 @@ public class DataNode implements FSConstants, Runnable {
     long heartBeatInterval;
     private DataStorage storage = null;
     private StatusHttpServer infoServer;
+    private DataNodeMetrics myMetrics = new DataNodeMetrics();
     private static InetSocketAddress nameNodeAddr;
     private static DataNode datanodeObject = null;
-    private class DataNodeMetrics {
-      private MetricsRecord metricsRecord = null;
-      
-      
-      private long bytesWritten = 0L;
-      private long bytesRead = 0L;
-      private long blocksWritten = 0L;
-      private long blocksRead = 0L;
-      private long blocksReplicated = 0L;
-      private long blocksRemoved = 0L;
+
+    private class DataNodeMetrics implements Updater {
+      private final MetricsRecord metricsRecord;
+      private int bytesWritten = 0;
+      private int bytesRead = 0;
+      private int blocksWritten = 0;
+      private int blocksRead = 0;
+      private int blocksReplicated = 0;
+      private int blocksRemoved = 0;
       
       DataNodeMetrics() {
-        metricsRecord = Metrics.createRecord("dfs", "datanode");
+        MetricsContext context = MetricsUtil.getContext("dfs");
+        metricsRecord = MetricsUtil.createRecord(context, "datanode");
+        context.registerUpdater(this);
       }
       
+      /**
+       * Since this object is a registered updater, this method will be called
+       * periodically, e.g. every 5 seconds.
+       */
+      public void doUpdates(MetricsContext unused) {
+        synchronized (this) {
+          metricsRecord.incrMetric("bytes_read", bytesRead);
+          metricsRecord.incrMetric("bytes_written", bytesWritten);
+          metricsRecord.incrMetric("blocks_read", blocksRead);
+          metricsRecord.incrMetric("blocks_written", blocksWritten);
+          metricsRecord.incrMetric("blocks_replicated", blocksReplicated);
+          metricsRecord.incrMetric("blocks_removed", blocksRemoved);
+              
+          bytesWritten = 0;
+          bytesRead = 0;
+          blocksWritten = 0;
+          blocksRead = 0;
+          blocksReplicated = 0;
+          blocksRemoved = 0;
+        }
+        metricsRecord.update();
+      }
+
       synchronized void readBytes(int nbytes) {
         bytesRead += nbytes;
-        Metrics.report(metricsRecord, "bytes_read", bytesRead);
       }
       
       synchronized void wroteBytes(int nbytes) {
         bytesWritten += nbytes;
-        Metrics.report(metricsRecord, "bytes_written", bytesWritten);
       }
       
       synchronized void readBlocks(int nblocks) {
         blocksRead += nblocks;
-        Metrics.report(metricsRecord, "blocks_read", blocksRead);
       }
       
       synchronized void wroteBlocks(int nblocks) {
         blocksWritten += nblocks;
-        Metrics.report(metricsRecord, "blocks_written", blocksWritten);
       }
       
       synchronized void replicatedBlocks(int nblocks) {
         blocksReplicated += nblocks;
-        Metrics.report(metricsRecord, "blocks_replicated", blocksReplicated);
       }
       
       synchronized void removedBlocks(int nblocks) {
         blocksRemoved += nblocks;
-        Metrics.report(metricsRecord, "blocks_removed", blocksRemoved);
       }
     }
-    
-    DataNodeMetrics myMetrics = new DataNodeMetrics();
 
     /**
      * Create the DataNode given a configuration and an array of dataDirs.

+ 17 - 6
src/java/org/apache/hadoop/dfs/FSDirectory.java

@@ -25,7 +25,8 @@ import java.util.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.Metrics;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.MetricsContext;
 
 /*************************************************
  * FSDirectory stores the filesystem directory state.
@@ -223,7 +224,7 @@ class FSDirectory implements FSConstants {
                     v.add(blocks[i]);
                 }
             }
-            Metrics.report(metricsRecord, "files_deleted", ++numFilesDeleted);
+            incrDeletedFileCount();
             for (Iterator it = children.values().iterator(); it.hasNext(); ) {
                 INode child = (INode) it.next();
                 child.collectSubtreeBlocks(v);
@@ -307,19 +308,25 @@ class FSDirectory implements FSConstants {
     FSImage fsImage;  
     boolean ready = false;
     int namespaceID = 0;    // TODO: move to FSImage class, it belongs there
-    // Metrics members
-    private MetricsRecord metricsRecord = null;
-    private int numFilesDeleted = 0;
+    // Metrics record
+    private MetricsRecord directoryMetrics = null;
     
     /** Access an existing dfs name directory. */
     public FSDirectory(File[] dirs) throws IOException {
       this.fsImage = new FSImage( dirs );
+      initialize();
     }
 
     public FSDirectory(FSImage fsImage) throws IOException {
       this.fsImage = fsImage;
+      initialize();
     }
     
+    private void initialize() {
+      MetricsContext metricsContext = MetricsUtil.getContext("dfs");
+      directoryMetrics = MetricsUtil.createRecord(metricsContext, "FSDirectory");
+    }
+
     void loadFSImage( Configuration conf ) throws IOException {
       fsImage.loadFSImage( conf );
       synchronized (this) {
@@ -327,9 +334,13 @@ class FSDirectory implements FSConstants {
         this.notifyAll();
         fsImage.getEditLog().create();
       }
-      metricsRecord = Metrics.createRecord("dfs", "namenode");
     }
 
+    private void incrDeletedFileCount() {
+        directoryMetrics.incrMetric("files_deleted", 1);
+        directoryMetrics.update();
+    }
+    
     /**
      * Shutdown the filestore
      */

+ 35 - 13
src/java/org/apache/hadoop/dfs/NameNode.java

@@ -27,9 +27,12 @@ import org.apache.hadoop.util.StringUtils;
 
 import java.io.*;
 import java.net.*;
+import org.apache.hadoop.dfs.DatanodeProtocol.DataNodeAction;
 
 import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.Metrics;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.Updater;
 
 /**********************************************************
  * NameNode serves as both directory namespace manager and
@@ -110,33 +113,52 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
       fsimage.getEditLog().close();
     }
 
-    private class NameNodeMetrics {
-      private MetricsRecord metricsRecord = null;
-      
-      private long numFilesCreated = 0L;
-      private long numFilesOpened = 0L;
-      private long numFilesRenamed = 0L;
-      private long numFilesListed = 0L;
+    private class NameNodeMetrics implements Updater {
+      private final MetricsRecord metricsRecord;
+      private int numFilesCreated = 0;
+      private int numFilesOpened = 0;
+      private int numFilesRenamed = 0;
+      private int numFilesListed = 0;
       
       NameNodeMetrics() {
-        metricsRecord = Metrics.createRecord("dfs", "namenode");
+        MetricsContext metricsContext = MetricsUtil.getContext("dfs");
+        metricsRecord = MetricsUtil.createRecord(metricsContext, "namenode");
+        metricsContext.registerUpdater(this);
+      }
+      
+      /**
+       * Since this object is a registered updater, this method will be called
+       * periodically, e.g. every 5 seconds.
+       */
+      public void doUpdates(MetricsContext unused) {
+        synchronized (this) {
+          metricsRecord.incrMetric("files_created", numFilesCreated);
+          metricsRecord.incrMetric("files_opened", numFilesOpened);
+          metricsRecord.incrMetric("files_renamed", numFilesRenamed);
+          metricsRecord.incrMetric("files_listed", numFilesListed);
+              
+          numFilesCreated = 0;
+          numFilesOpened = 0;
+          numFilesRenamed = 0;
+          numFilesListed = 0;
+        }
+        metricsRecord.update();
       }
       
       synchronized void createFile() {
-        Metrics.report(metricsRecord, "files_created", ++numFilesCreated);
+        ++numFilesCreated;
       }
       
       synchronized void openFile() {
-        Metrics.report(metricsRecord, "files_opened", ++numFilesOpened);
+        ++numFilesOpened;
       }
       
       synchronized void renameFile() {
-        Metrics.report(metricsRecord, "files_renamed", ++numFilesRenamed);
+        ++numFilesRenamed;
       }
       
       synchronized void listFile(int nfiles) {
         numFilesListed += nfiles;
-        Metrics.report(metricsRecord, "files_listed", numFilesListed);
       }
     }
     

+ 42 - 22
src/java/org/apache/hadoop/mapred/JobTracker.java

@@ -31,7 +31,9 @@ import java.text.NumberFormat;
 import java.util.*;
 
 import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.Metrics;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.Updater;
 
 /*******************************************************
  * JobTracker is the central location for submitting and 
@@ -372,48 +374,66 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
         }
     }
 
-    static class JobTrackerMetrics {
+    static class JobTrackerMetrics implements Updater {
       private MetricsRecord metricsRecord = null;
-      
-      private long numMapTasksLaunched = 0L;
-      private long numMapTasksCompleted = 0L;
-      private long numReduceTasksLaunched = 0L;
-      private long numReduceTasksCompleted = 0L;
-      private long numJobsSubmitted = 0L;
-      private long numJobsCompleted = 0L;
+      private int numMapTasksLaunched = 0;
+      private int numMapTasksCompleted = 0;
+      private int numReduceTasksLaunched = 0;
+      private int numReduceTasksCompleted = 0;
+      private int numJobsSubmitted = 0;
+      private int numJobsCompleted = 0;
       
       JobTrackerMetrics() {
-        metricsRecord = Metrics.createRecord("mapred", "jobtracker");
+          MetricsContext context = MetricsUtil.getContext("mapred");
+          metricsRecord = MetricsUtil.createRecord(context, "jobtracker");
+          context.registerUpdater(this);
+      }
+      
+      /**
+       * Since this object is a registered updater, this method will be called
+       * periodically, e.g. every 5 seconds.
+       */
+      public void doUpdates(MetricsContext unused) {
+        synchronized (this) {
+          metricsRecord.incrMetric("maps_launched", numMapTasksLaunched);
+          metricsRecord.incrMetric("maps_completed", numMapTasksCompleted);
+          metricsRecord.incrMetric("reduces_launched", numReduceTasksLaunched);
+          metricsRecord.incrMetric("reduces_completed", numReduceTasksCompleted);
+          metricsRecord.incrMetric("jobs_submitted", numJobsSubmitted);
+          metricsRecord.incrMetric("jobs_completed", numJobsCompleted);
+              
+          numMapTasksLaunched = 0;
+          numMapTasksCompleted = 0;
+          numReduceTasksLaunched = 0;
+          numReduceTasksCompleted = 0;
+          numJobsSubmitted = 0;
+          numJobsCompleted = 0;
+        }
+        metricsRecord.update();
       }
       
       synchronized void launchMap() {
-        Metrics.report(metricsRecord, "maps_launched",
-            ++numMapTasksLaunched);
+        ++numMapTasksLaunched;
       }
       
       synchronized void completeMap() {
-        Metrics.report(metricsRecord, "maps_completed",
-            ++numMapTasksCompleted);
+        ++numMapTasksCompleted;
       }
       
       synchronized void launchReduce() {
-        Metrics.report(metricsRecord, "reduces_launched",
-            ++numReduceTasksLaunched);
+        ++numReduceTasksLaunched;
       }
       
       synchronized void completeReduce() {
-        Metrics.report(metricsRecord, "reduces_completed",
-            ++numReduceTasksCompleted);
+        ++numReduceTasksCompleted;
       }
       
       synchronized void submitJob() {
-        Metrics.report(metricsRecord, "jobs_submitted",
-            ++numJobsSubmitted);
+        ++numJobsSubmitted;
      }
       
       synchronized void completeJob() {
-        Metrics.report(metricsRecord, "jobs_completed",
-            ++numJobsCompleted);
+        ++numJobsCompleted;
       }
     }
 

+ 6 - 0
src/java/org/apache/hadoop/mapred/MapOutputLocation.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.InMemoryFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.util.Progressable;
 
 /** The location of a map output file, as passed to a reduce task via the
@@ -104,7 +105,9 @@ class MapOutputLocation implements Writable, MRConstants {
    * @param pingee a status object that wants to know when we make progress
    * @param timeout number of ms for connection and read timeout
    * @throws IOException when something goes wrong
+   * @deprecated
    */
+  @Deprecated
   public long getFile(FileSystem fileSys, 
                       Path localFilename, 
                       int reduce,
@@ -179,6 +182,7 @@ class MapOutputLocation implements Writable, MRConstants {
    */
   public Path getFile(InMemoryFileSystem inMemFileSys,
                       FileSystem localFileSys,
+                      MetricsRecord shuffleMetrics,
                       Path localFilename, 
                       int reduce,
                       int timeout) throws IOException, InterruptedException {
@@ -224,6 +228,8 @@ class MapOutputLocation implements Writable, MRConstants {
           int len = input.read(buffer);
           while (len > 0) {
             totalBytes += len;
+            shuffleMetrics.incrMetric("input_bytes", len);
+            shuffleMetrics.update();
             output.write(buffer, 0 ,len);
             if (currentThread.isInterrupted()) {
               throw new InterruptedException();

+ 21 - 21
src/java/org/apache/hadoop/mapred/MapTask.java

@@ -33,11 +33,13 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.metrics.MetricsRecord;
 
 import org.apache.commons.logging.*;
-import org.apache.hadoop.metrics.Metrics;
+import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.mapred.ReduceTask.ValuesIterator;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.Updater;
 
 
 /** A Map task. */
@@ -59,27 +61,25 @@ class MapTask extends Task {
   }
   
   private class MapTaskMetrics {
-    private MetricsRecord metricsRecord = null;
+    private MetricsRecord mapInputMetrics = null;
+    private MetricsRecord mapOutputMetrics = null;
     
-    private long numInputRecords = 0L;
-    private long numInputBytes = 0L;
-    private long numOutputRecords = 0L;
-    private long numOutputBytes = 0L;
-    
-    MapTaskMetrics(String taskId) {
-      metricsRecord = Metrics.createRecord("mapred", "map", "taskid", taskId);
+    MapTaskMetrics(String user) {
+        MetricsContext context = MetricsUtil.getContext("mapred");
+        mapInputMetrics = MetricsUtil.createRecord(context, "mapInput", "user", user);
+        mapOutputMetrics = MetricsUtil.createRecord(context, "mapOutput", "user", user);
     }
     
-    synchronized void mapInput(long numBytes) {
-      Metrics.report(metricsRecord, "input_records", ++numInputRecords);
-      numInputBytes += numBytes;
-      Metrics.report(metricsRecord, "input_bytes", numInputBytes);
+    synchronized void mapInput(int numBytes) {
+        mapInputMetrics.incrMetric("input_records", 1);
+        mapInputMetrics.incrMetric("input_bytes", numBytes);
+        mapInputMetrics.update();
     }
     
-    synchronized void mapOutput(long numBytes) {
-      Metrics.report(metricsRecord, "output_records", ++numOutputRecords);
-      numOutputBytes += numBytes;
-      Metrics.report(metricsRecord, "output_bytes", numOutputBytes);
+    synchronized void mapOutput(int numBytes) {
+        mapOutputMetrics.incrMetric("output_records", 1);
+        mapOutputMetrics.incrMetric("output_bytes", numBytes);
+        mapOutputMetrics.update();
     }
     
   }
@@ -96,7 +96,6 @@ class MapTask extends Task {
                  int partition, InputSplit split) {
     super(jobId, jobFile, tipId, taskId, partition);
     this.split = split;
-    myMetrics = new MapTaskMetrics(taskId);
   }
 
   public boolean isMapTask() {
@@ -134,13 +133,14 @@ class MapTask extends Task {
     split = new FileSplit();
     split.readFields(in);
     if (myMetrics == null) {
-        myMetrics = new MapTaskMetrics(getTaskId());
+        myMetrics = new MapTaskMetrics("unknown");
     }
   }
 
   public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException {
 
+    myMetrics = new MapTaskMetrics(job.getUser());
     Reporter reporter = getReporter(umbilical, getProgress());
 
     MapOutputBuffer collector = new MapOutputBuffer(umbilical, job, reporter);
@@ -164,7 +164,7 @@ class MapTask extends Task {
           setProgress(getProgress());
           long beforePos = getPos();
           boolean ret = rawIn.next(key, value);
-          myMetrics.mapInput(getPos() - beforePos);
+          myMetrics.mapInput((int)(getPos() - beforePos));
           return ret;
         }
         public long getPos() throws IOException { return rawIn.getPos(); }
@@ -326,7 +326,7 @@ class MapTask extends Task {
         int partNumber = partitioner.getPartition(key, value, partitions);
         sortImpl[partNumber].addKeyValue(keyOffset, keyLength, valLength);
 
-        myMetrics.mapOutput(keyValBuffer.getLength() - keyOffset);
+        myMetrics.mapOutput((int)(keyValBuffer.getLength() - keyOffset));
 
         //now check whether we need to spill to disk
         long totalMem = 0;

+ 12 - 11
src/java/org/apache/hadoop/mapred/ReduceTask.java

@@ -21,7 +21,7 @@ package org.apache.hadoop.mapred;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
-import org.apache.hadoop.metrics.Metrics;
+import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.util.*;
 
 import java.io.*;
@@ -44,21 +44,22 @@ class ReduceTask extends Task {
   }
 
   private class ReduceTaskMetrics {
-    private MetricsRecord metricsRecord = null;
+    private final MetricsRecord inputMetrics, outputMetrics;
     
-    private long numInputRecords = 0L;
-    private long numOutputRecords = 0L;
-    
-    ReduceTaskMetrics(String taskId) {
-      metricsRecord = Metrics.createRecord("mapred", "reduce", "taskid", taskId);
+    ReduceTaskMetrics(String user) {
+        MetricsContext context = MetricsUtil.getContext("mapred");
+        inputMetrics = MetricsUtil.createRecord(context, "reduceInput", "user", user);
+        outputMetrics = MetricsUtil.createRecord(context, "reduceOutput", "user", user);
     }
     
     synchronized void reduceInput() {
-      Metrics.report(metricsRecord, "input_records", ++numInputRecords);
+        inputMetrics.incrMetric("input_records", 1);
+        inputMetrics.update();
     }
     
     synchronized void reduceOutput() {
-      Metrics.report(metricsRecord, "output_records", ++numOutputRecords);
+        outputMetrics.incrMetric("output_records", 1);
+        outputMetrics.update();
     }
   }
   
@@ -84,7 +85,6 @@ class ReduceTask extends Task {
                     int partition, int numMaps) {
     super(jobId, jobFile, tipId, taskId, partition);
     this.numMaps = numMaps;
-    myMetrics = new ReduceTaskMetrics(taskId);
   }
 
   public TaskRunner createRunner(TaskTracker tracker) throws IOException {
@@ -116,7 +116,7 @@
 
     numMaps = in.readInt();
     if (myMetrics == null) {
-        myMetrics = new ReduceTaskMetrics(getTaskId());
+        myMetrics = new ReduceTaskMetrics("unknown");
     }
   }
 
@@ -224,6 +224,7 @@
 
   public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException {
+    myMetrics = new ReduceTaskMetrics(job.getUser());
     Class valueClass = job.getMapOutputValueClass();
     Reducer reducer = (Reducer)ReflectionUtils.newInstance(
                                   job.getReducerClass(), job);

+ 14 - 2
src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java

@@ -24,6 +24,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.util.*;
 
 import java.io.*;
@@ -115,6 +118,11 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
    */
   private MapOutputCopier[] copiers = null;
   
+  /**
+   * The metrics record for reporting shuffle input bytes.
+   */
+  private MetricsRecord shuffleMetrics = null;
+  
   /**
    * the minimum interval between jobtracker polls
    */
@@ -275,8 +283,8 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
       // a working filename that will be unique to this attempt
       Path tmpFilename = new Path(finalFilename + "-" + id);
       // this copies the map output file
-      tmpFilename = loc.getFile(inMemFileSys, localFileSys, tmpFilename,
-                               reduceTask.getPartition(),
+      tmpFilename = loc.getFile(inMemFileSys, localFileSys, shuffleMetrics,
+                               tmpFilename, reduceTask.getPartition(),
                                STALLED_COPY_TIMEOUT);
       if (tmpFilename == null)
         throw new IOException("File " + finalFilename + "-" + id + 
@@ -384,6 +392,10 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
     this.uniqueHosts = new HashSet();
     
     this.lastPollTime = 0;
+
+    MetricsContext metricsContext = MetricsUtil.getContext("mapred");
+    this.shuffleMetrics = MetricsUtil.createRecord(
+      metricsContext, "shuffleInput", "user", conf.getUser());
   }
 
   /** Assemble all of the map output files */

+ 6 - 6
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -21,7 +21,7 @@ import org.apache.commons.logging.*;
 
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.ipc.*;
-import org.apache.hadoop.metrics.Metrics;
+import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.MetricsException;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -38,6 +38,7 @@ import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+import org.apache.hadoop.metrics.MetricsContext;
 
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.net.DNS;
@@ -125,17 +126,16 @@ public class TaskTracker
     private class TaskTrackerMetrics {
       private MetricsRecord metricsRecord = null;
       
-      private long totalTasksCompleted = 0L;
-      
       TaskTrackerMetrics() {
-        metricsRecord = Metrics.createRecord("mapred", "tasktracker");
+          MetricsContext context = MetricsUtil.getContext("mapred");
+          metricsRecord = MetricsUtil.createRecord(context, "tasktracker");
       }
       
       synchronized void completeTask() {
         if (metricsRecord != null) {
-          metricsRecord.setMetric("tasks_completed", ++totalTasksCompleted);
+          metricsRecord.incrMetric("tasks_completed", 1);
           metricsRecord.setMetric("maps_running", mapTotal);
-          metricsRecord.setMetric("reduce_running", reduceTotal);
+          metricsRecord.setMetric("reduces_running", reduceTotal);
           metricsRecord.update();
         }
       }

+ 32 - 13
src/java/org/apache/hadoop/metrics/ContextFactory.java

@@ -27,6 +27,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
 import org.apache.hadoop.metrics.spi.AbstractMetricsContext;
+import org.apache.hadoop.metrics.spi.NullContext;
 
 /**
  * Factory class for creating MetricsContext objects.  To obtain an instance
@@ -43,9 +44,14 @@ public class ContextFactory {
     
     private static ContextFactory theFactory = null;
     
-    // private Map<String,Object> attributeMap = new HashMap<String,Object>();
-    private Map attributeMap = new HashMap();
-    private Map<String,MetricsContext> contextMap = new HashMap<String,MetricsContext>();
+    private Map<String,Object> attributeMap = new HashMap<String,Object>();
+    private Map<String,AbstractMetricsContext> contextMap = 
+            new HashMap<String,AbstractMetricsContext>();
+    
+    // Used only when contexts, or the ContextFactory itself, cannot be
+    // created.
+    private static Map<String,MetricsContext> nullContextMap = 
+            new HashMap<String,MetricsContext>();
     
     /** Creates a new instance of ContextFactory */
     protected ContextFactory() {
@@ -116,20 +122,33 @@ public class ContextFactory {
     public synchronized MetricsContext getContext(String contextName) 
         throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException
     {
-        if (contextMap.containsKey(contextName)) return contextMap.get(contextName);
-        String classNameAttribute = contextName + CONTEXT_CLASS_SUFFIX;
-        String className = (String) getAttribute(classNameAttribute);
-        if (className == null) {
-            className = DEFAULT_CONTEXT_CLASSNAME;
+        AbstractMetricsContext metricsContext = contextMap.get(contextName);
+        if (metricsContext == null) {
+            String classNameAttribute = contextName + CONTEXT_CLASS_SUFFIX;
+            String className = (String) getAttribute(classNameAttribute);
+            if (className == null) {
+                className = DEFAULT_CONTEXT_CLASSNAME;
+            }
+            Class contextClass = Class.forName(className);
+            metricsContext = (AbstractMetricsContext) contextClass.newInstance();
+            metricsContext.init(contextName, this);
+            contextMap.put(contextName, metricsContext);
         }
-        Class contextClass = Class.forName(className);
-        AbstractMetricsContext metricsContext = 
-                (AbstractMetricsContext) contextClass.newInstance();
-        metricsContext.init(contextName, this);
-        contextMap.put(contextName, metricsContext);
         return metricsContext;
     }
     
+    /**
+     * Returns a "null" context - one which does nothing.
+     */
+    public static synchronized MetricsContext getNullContext(String contextName) {
+        MetricsContext nullContext = nullContextMap.get(contextName);
+        if (nullContext == null) {
+            nullContext = new NullContext();
+            nullContextMap.put(contextName, nullContext);
+        }
+        return nullContext;
+    }
+    
     /**
      * Returns the singleton ContextFactory instance, constructing it if 
      * necessary. <p/>

+ 0 - 106
src/java/org/apache/hadoop/metrics/Metrics.java

@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.metrics;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * Utility class to simplify creation and reporting of hadoop metrics.
- * For examples of usage, see {@link org.apache.hadoop.dfs.DataNode}.
- * @see org.apache.hadoop.metrics.MetricsRecord
- * @see org.apache.hadoop.metrics.MetricsContext
- * @see org.apache.hadoop.metrics.ContextFactory
- * @author Milind Bhandarkar
- */
-public class Metrics {
-  private static final Log LOG =
-      LogFactory.getLog("org.apache.hadoop.util.MetricsUtil");
-  
-  /**
-   * Don't allow creation of a new instance of Metrics
-   */
-  private Metrics() {}
-  
-  /**
-   * Utility method to create and return
-   * a new tagged metrics record instance within the
-   * given <code>contextName</code>.
-   * If exception is thrown while creating the record for any reason, it is
-   * logged, and a null record is returned.
-   * @param contextName name of the context
-   * @param recordName the name of the record
-   * @param tagName name of the tag field of metrics record
-   * @param tagValue value of the tag field
-   * @return newly created metrics record
-   */
-  public static MetricsRecord createRecord(String contextName, String recordName,
-      String tagName, String tagValue) {
-    try {
-      MetricsContext metricsContext =
-          ContextFactory.getFactory().getContext(contextName);
-      if (!metricsContext.isMonitoring()) {metricsContext.startMonitoring();}
-      MetricsRecord metricsRecord = metricsContext.createRecord(recordName);
-      metricsRecord.setTag(tagName, tagValue);
-      return metricsRecord;
-    } catch (Throwable ex) {
-      LOG.warn("Could not create metrics record with context:"+contextName, ex);
-      return null;
-    }
-  }
-  
-  /**
-   * Utility method to create and return new metrics record instance within the
-   * given <code>contextName</code>. This record is tagged with hostname.
-   * If exception is thrown while creating the record due to any reason, it is
-   * logged, and a null record is returned.
-   * @param contextName name of the context
-   * @param recordName name of the record
-   * @return newly created metrics record
-   */
-  public static MetricsRecord createRecord(String contextName,
-      String recordName) {
-    String hostname = null;
-    try {
-      hostname = InetAddress.getLocalHost().getHostName();
-    } catch (UnknownHostException ex) {
-      LOG.info("Cannot get hostname", ex);
-      hostname = "unknown";
-    }
-    return createRecord(contextName, recordName, "hostname", hostname);
-  }
-  
-  /**
-   * Sets the named metric to the specified value in the given metrics record.
-   * Updates the table of buffered data which is to be sent periodically.
-   *
-   * @param record record for which the metric is updated
-   * @param metricName name of the metric
-   * @param metricValue new value of the metric
-   */
-  public static void report(MetricsRecord record, String metricName,
-      long metricValue) {
-    if (record != null) {
-      record.setMetric(metricName, metricValue);
-      record.update();
-    }
-  }
-}

+ 116 - 0
src/java/org/apache/hadoop/metrics/MetricsUtil.java

@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Utility class to simplify creation and reporting of hadoop metrics.
+ * This class makes the simplifying assumption that each metrics record has
+ * exactly one tag, which defaults to being the hostName.
+ *
+ * For examples of usage, see {@link org.apache.hadoop.dfs.DataNode}.
+ * @see org.apache.hadoop.metrics.MetricsRecord
+ * @see org.apache.hadoop.metrics.MetricsContext
+ * @see org.apache.hadoop.metrics.ContextFactory
+ * @author Milind Bhandarkar
+ */
+public class MetricsUtil {
+    
+    private static final Log LOG =
+        LogFactory.getLog("org.apache.hadoop.util.MetricsUtil");
+
+    /**
+     * Don't allow creation of a new instance of MetricsUtil
+     */
+    private MetricsUtil() {}
+    
+    /**
+     * Utility method to return the named context.
+     * If the desired context cannot be created for any reason, the exception
+     * is logged, and a null context is returned.
+     */
+    public static MetricsContext getContext(String contextName) {
+        MetricsContext metricsContext;
+        try {
+            metricsContext = ContextFactory.getFactory().getContext(contextName);
+            if (!metricsContext.isMonitoring()) {
+                metricsContext.startMonitoring();
+            }
+        } catch (Exception ex) {
+            LOG.error("Unable to create metrics context " + contextName, ex);
+            metricsContext = ContextFactory.getNullContext(contextName);
+        }
+        return metricsContext;
+    }
+    
+    /**
+     * Utility method to create and return a new metrics record instance 
+     * within the given context, with the specified tag.
+     *
+     * @param context the context
+     * @param recordName the name of the record
+     * @param tagName name of the tag field of metrics record
+     * @param tagValue value of the tag field
+     * @return newly created metrics record
+     */
+    public static MetricsRecord createRecord(MetricsContext context, 
+                                             String recordName,
+                                             String tagName, 
+                                             String tagValue) 
+    {
+        MetricsRecord metricsRecord = context.createRecord(recordName);
+        metricsRecord.setTag(tagName, tagValue);
+        return metricsRecord;
+    }
+
+    /**
+     * Utility method to create and return new metrics record instance within the
+     * given context. This record is tagged with the host name.
+     *
+     * @param context the context
+     * @param recordName name of the record
+     * @return newly created metrics record
+     */
+    public static MetricsRecord createRecord(MetricsContext context, 
+                                            String recordName) 
+    {
+        return createRecord(context, recordName, "hostName", getHostName());
+    }
+    
+    /**
+     * Returns the host name.  If the host name is unobtainable, logs the
+     * exception and returns "unknown".
+     */
+    private static String getHostName() {
+        String hostName = null;
+        try {
+            hostName = InetAddress.getLocalHost().getHostName();
+        } 
+        catch (UnknownHostException ex) {
+            LOG.info("Unable to obtain hostName", ex);
+            hostName = "unknown";
+        }
+        return hostName;
+    }
+
+}
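
For records that are not tied to a periodic updater, the new class also supports the simpler on-demand style used by FSDirectory above: create the record once, then call incrMetric() and update() at the point of the event. A minimal caller sketch, assuming the "dfs" context is configured (the class name here is illustrative only):

    import org.apache.hadoop.metrics.MetricsContext;
    import org.apache.hadoop.metrics.MetricsRecord;
    import org.apache.hadoop.metrics.MetricsUtil;

    class DeletionCounter {
      private final MetricsRecord record;

      DeletionCounter() {
        // getContext() starts monitoring, and on failure falls back to the
        // do-nothing context from ContextFactory.getNullContext(), so the
        // caller never sees an exception here.
        MetricsContext context = MetricsUtil.getContext("dfs");
        record = MetricsUtil.createRecord(context, "FSDirectory");
      }

      void incrDeletedFileCount() {
        record.incrMetric("files_deleted", 1);
        record.update();   // buffers the increment for the next emission
      }
    }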