HADOOP-237. Add metric reporting to DFS and MapReduce.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@423872 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 19 years ago
parent
commit
4c8207118d

+ 6 - 0
CHANGES.txt

@@ -58,6 +58,12 @@ Trunk (unreleased changes)
 16. HADOOP-351.  Make Hadoop IPC kernel independent of Jetty.
     (Devaraj Das via cutting)
 
+17. HADOOP-237.  Add metric reporting to DFS and MapReduce.  With only
+    minor configuration changes, one can now monitor many Hadoop
+    system statistics using Ganglia or other monitoring systems.
+    (Milind Bhandarkar via cutting)
+
+
 Release 0.4.0 - 2006-06-28
 
  1. HADOOP-298.  Improved progress reports for CopyFiles utility, the

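As an aside on the "minor configuration changes": per the new conf/hadoop-metrics.properties below, pointing the dfs context at Ganglia amounts to uncommenting three properties (localhost:8649 is the stock gmond address used in that file's comments):

    dfs.class=org.apache.hadoop.metrics.ganglia.GangliaContext
    dfs.period=10
    dfs.servers=localhost:8649
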
+ 1 - 0
build.xml

@@ -215,6 +215,7 @@
       <fileset file="${conf.dir}/mapred-default.xml"/>
       <fileset file="${conf.dir}/commons-logging.properties"/>
       <fileset file="${conf.dir}/log4j.properties"/>
+      <fileset file="${conf.dir}/hadoop-metrics.properties"/>
       <zipfileset dir="${build.webapps}" prefix="webapps"/>
     </jar>
   </target>

+ 25 - 0
conf/hadoop-metrics.properties

@@ -0,0 +1,25 @@
+# Configuration of the "dfs" context for null
+dfs.class=org.apache.hadoop.metrics.spi.NullContext
+
+# Configuration of the "mapred" context for null
+mapred.class=org.apache.hadoop.metrics.spi.NullContext
+
+# Configuration of the "dfs" context for file
+#dfs.class=org.apache.hadoop.metrics.file.FileContext
+#dfs.period=10
+#dfs.fileName=/tmp/dfsmetrics.log
+
+# Configuration of the "mapred" context for file
+#mapred.class=org.apache.hadoop.metrics.file.FileContext
+#mapred.period=10
+#mapred.fileName=/tmp/mrmetrics.log
+
+# Configuration of the "dfs" context for ganglia
+# dfs.class=org.apache.hadoop.metrics.ganglia.GangliaContext
+# dfs.period=10
+# dfs.servers=localhost:8649
+
+# Configuration of the "mapred" context for ganglia
+# mapred.class=org.apache.hadoop.metrics.ganglia.GangliaContext
+# mapred.period=10
+# mapred.servers=localhost:8649

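For orientation, a sketch (not part of this commit) of how these properties are consumed, using only the API that Metrics.java below exposes; the "dfs" lookup key matches the dfs.class/dfs.period property prefix above, and the tag value "node1" is a hypothetical hostname:

    import org.apache.hadoop.metrics.ContextFactory;
    import org.apache.hadoop.metrics.MetricsContext;
    import org.apache.hadoop.metrics.MetricsRecord;

    public class MetricsConfigSketch {
      public static void main(String[] args) throws Exception {
        // ContextFactory reads hadoop-metrics.properties and instantiates the
        // class named by dfs.class for the "dfs" context; the default
        // NullContext silently drops all reports.
        MetricsContext ctx = ContextFactory.getFactory().getContext("dfs");
        if (!ctx.isMonitoring()) {
          ctx.startMonitoring();
        }
        MetricsRecord rec = ctx.createRecord("datanode");
        rec.setTag("hostname", "node1");     // hypothetical tag value
        rec.setMetric("bytes-read", 1024L);
        rec.update();  // buffered; flushed every dfs.period seconds by timed contexts
      }
    }
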
+ 59 - 1
src/java/org/apache/hadoop/dfs/DataNode.java

@@ -19,6 +19,7 @@ import org.apache.commons.logging.*;
 
 import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.conf.*;
+import org.apache.hadoop.metrics.Metrics;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.mapred.StatusHttpServer;
@@ -26,6 +27,7 @@ import org.apache.hadoop.mapred.StatusHttpServer;
 import java.io.*;
 import java.net.*;
 import java.util.*;
+import org.apache.hadoop.metrics.MetricsRecord;
 
 /**********************************************************
  * DataNode is a class (and program) that stores a set of
@@ -100,6 +102,54 @@ public class DataNode implements FSConstants, Runnable {
     private static InetSocketAddress nameNodeAddr;
     private static DataNode datanodeObject = null;
     static Date startTime = new Date(System.currentTimeMillis());
+    private class DataNodeMetrics {
+      private MetricsRecord metricsRecord = null;
+      
+      
+      private long bytesWritten = 0L;
+      private long bytesRead = 0L;
+      private long blocksWritten = 0L;
+      private long blocksRead = 0L;
+      private long blocksReplicated = 0L;
+      private long blocksRemoved = 0L;
+      
+      DataNodeMetrics() {
+        metricsRecord = Metrics.createRecord("dfs", "datanode");
+      }
+      
+      synchronized void readBytes(int nbytes) {
+        bytesRead += nbytes;
+        Metrics.report(metricsRecord, "bytes-read", bytesRead);
+      }
+      
+      synchronized void wroteBytes(int nbytes) {
+        bytesWritten += nbytes;
+        Metrics.report(metricsRecord, "bytes-written", bytesWritten);
+      }
+      
+      synchronized void readBlocks(int nblocks) {
+        blocksRead += nblocks;
+        Metrics.report(metricsRecord, "blocks-read", blocksRead);
+      }
+      
+      synchronized void wroteBlocks(int nblocks) {
+        blocksWritten += nblocks;
+        Metrics.report(metricsRecord, "blocks-written", blocksWritten);
+      }
+      
+      synchronized void replicatedBlocks(int nblocks) {
+        blocksReplicated += nblocks;
+        Metrics.report(metricsRecord, "blocks-replicated", blocksReplicated);
+      }
+      
+      synchronized void removedBlocks(int nblocks) {
+        blocksRemoved += nblocks;
+        Metrics.report(metricsRecord, "blocks-removed", blocksRemoved);
+      }
+    }
+    
+    DataNodeMetrics myMetrics = new DataNodeMetrics();
+
     /**
      * Create the DataNode given a configuration and a dataDir.
      * 'dataDir' is where the blocks are stored.
@@ -344,7 +394,9 @@ public class DataNode implements FSConstants, Runnable {
                 // Some local block(s) are obsolete and can be 
                 // safely garbage-collected.
                 //
-                data.invalidate(cmd.getBlocks());
+                Block toDelete[] = cmd.getBlocks();
+                data.invalidate(toDelete);
+                myMetrics.removedBlocks(toDelete.length);
               } else if( cmd.shutdownNode()) {
                 // shut down the data node
                 this.shutdown();
@@ -556,6 +608,7 @@ public class DataNode implements FSConstants, Runnable {
                       int bytesRead = 0;
                       try {
                           bytesRead = in2.read(buf);
+                          myMetrics.readBytes(bytesRead);
                       } catch (IOException iex) {
                           shutdown();
                           throw iex;
@@ -565,6 +618,7 @@ public class DataNode implements FSConstants, Runnable {
                           len -= bytesRead;
                           try {
                               bytesRead = in2.read(buf);
+                              myMetrics.readBytes(bytesRead);
                           } catch (IOException iex) {
                               shutdown();
                               throw iex;
@@ -582,6 +636,7 @@ public class DataNode implements FSConstants, Runnable {
                       }
                   }
               }
+              myMetrics.readBlocks(1);
               LOG.info("Served block " + b + " to " + s.getInetAddress());
           } finally {
               out.close();
@@ -660,6 +715,7 @@ public class DataNode implements FSConstants, Runnable {
                   }
                   out2.write(encodingType);
                   out2.writeLong(len);
+                  myMetrics.replicatedBlocks(1);
                 } catch (IOException ie) {
                   if (out2 != null) {
                     LOG.info("Exception connecting to mirror " + mirrorNode 
@@ -692,6 +748,7 @@ public class DataNode implements FSConstants, Runnable {
                   if (bytesRead > 0) {
                     try {
                       out.write(buf, 0, bytesRead);
+                      myMetrics.wroteBytes(bytesRead);
                     } catch (IOException iex) {
                       shutdown();
                       throw iex;
@@ -794,6 +851,7 @@ public class DataNode implements FSConstants, Runnable {
               }
             }
             data.finalizeBlock(b);
+            myMetrics.wroteBlocks(1);
             
             // 
             // Tell the namenode that we've received this block 

+ 11 - 2
src/java/org/apache/hadoop/dfs/FSDirectory.java

@@ -23,6 +23,8 @@ import java.util.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.Metrics;
 
 /*************************************************
  * FSDirectory stores the filesystem directory state.
@@ -45,6 +47,8 @@ class FSDirectory implements FSConstants {
     private static final byte OP_MKDIR = 3;
     private static final byte OP_SET_REPLICATION = 4;
 
+    private int numFilesDeleted = 0;
+    
     /******************************************************
      * We keep an in-memory representation of the file/block
      * hierarchy.
@@ -190,6 +194,7 @@ class FSDirectory implements FSConstants {
                     v.add(blocks[i]);
                 }
             }
+            Metrics.report(metricsRecord, "files-deleted", ++numFilesDeleted);
             for (Iterator it = children.values().iterator(); it.hasNext(); ) {
                 INode child = (INode) it.next();
                 child.collectSubtreeBlocks(v);
@@ -297,6 +302,8 @@ class FSDirectory implements FSConstants {
     boolean ready = false;
     int namespaceID = 0;  /// a persistent attribute of the namespace
 
+    private MetricsRecord metricsRecord = null;
+    
     /** Access an existing dfs name directory. */
     public FSDirectory(File dir, Configuration conf) throws IOException {
         File fullimage = new File(dir, "image");
@@ -314,6 +321,8 @@ class FSDirectory implements FSConstants {
             this.editlog = new DataOutputStream(new FileOutputStream(edits));
             editlog.writeInt( DFS_CURRENT_VERSION );
         }
+     
+        metricsRecord = Metrics.createRecord("dfs", "namenode");
     }
 
     /** Create a new dfs name directory.  Caution: this destroys all files
@@ -651,7 +660,7 @@ class FSDirectory implements FSConstants {
             }
         }
     }
-
+    
     /**
      * Add the given filename to the fs.
      */
@@ -668,7 +677,7 @@ class FSDirectory implements FSConstants {
                     +blocks.length+" blocks to the file system" );
            return false;
         }
-        // add create file record to log
+        // add createRecord file record to log
         UTF8 nameReplicationPair[] = new UTF8[] { 
                               path, 
                               toLogReplication( replication )};

+ 50 - 3
src/java/org/apache/hadoop/dfs/NameNode.java

@@ -23,6 +23,9 @@ import org.apache.hadoop.conf.*;
 
 import java.io.*;
 
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.Metrics;
+
 /**********************************************************
  * NameNode serves as both directory namespace manager and
  * "inode table" for the Hadoop DFS.  There is a single NameNode
@@ -81,6 +84,38 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
       FSDirectory.format(getDir(conf), conf);
     }
 
+    private class NameNodeMetrics {
+      private MetricsRecord metricsRecord = null;
+      
+      private long numFilesCreated = 0L;
+      private long numFilesOpened = 0L;
+      private long numFilesRenamed = 0L;
+      private long numFilesListed = 0L;
+      
+      NameNodeMetrics() {
+        metricsRecord = Metrics.createRecord("dfs", "namenode");
+      }
+      
+      synchronized void createFile() {
+        Metrics.report(metricsRecord, "files-created", ++numFilesCreated);
+      }
+      
+      synchronized void openFile() {
+        Metrics.report(metricsRecord, "files-opened", ++numFilesOpened);
+      }
+      
+      synchronized void renameFile() {
+        Metrics.report(metricsRecord, "files-renamed", ++numFilesRenamed);
+      }
+      
+      synchronized void listFile(int nfiles) {
+        numFilesListed += nfiles;
+        Metrics.report(metricsRecord, "files-listed", numFilesListed);
+      }
+    }
+    
+    private NameNodeMetrics myMetrics = null;
+    
     /**
      * Create a NameNode at the default location
      */
@@ -93,13 +128,14 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
     /**
      * Create a NameNode at the specified location and start it.
      */
-    public NameNode(File dir, int port, Configuration conf) throws IOException {     
+    public NameNode(File dir, int port, Configuration conf) throws IOException {
         this.namesystem = new FSNamesystem(dir, conf);
         this.handlerCount = conf.getInt("dfs.namenode.handler.count", 10);
         this.server = RPC.getServer(this, port, handlerCount, false, conf);
         this.datanodeStartupPeriod =
             conf.getLong("dfs.datanode.startupMsec", DATANODE_STARTUP_PERIOD);
         this.server.start();
+        myMetrics = new NameNodeMetrics();
     }
 
     /** Return the configured directory where name data is stored. */
@@ -133,6 +169,7 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
     /////////////////////////////////////////////////////
     // ClientProtocol
     /////////////////////////////////////////////////////
+    
     /**
      */
     public LocatedBlock[] open(String src) throws IOException {
@@ -140,6 +177,7 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
         if (openResults == null) {
             throw new IOException("Cannot open filename " + src);
         } else {
+            myMetrics.openFile();
             Block blocks[] = (Block[]) openResults[0];
             DatanodeInfo sets[][] = (DatanodeInfo[][]) openResults[1];
             LocatedBlock results[] = new LocatedBlock[blocks.length];
@@ -167,6 +205,7 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
                                                 overwrite,
                                                 replication,
                                                 blockSize);
+       myMetrics.createFile();
         Block b = (Block) results[0];
         DatanodeInfo targets[] = (DatanodeInfo[]) results[1];
         return new LocatedBlock(b, targets);
@@ -265,7 +304,11 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
      */
     public boolean rename(String src, String dst) throws IOException {
         stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst );
-        return namesystem.renameTo(new UTF8(src), new UTF8(dst));
+        boolean ret = namesystem.renameTo(new UTF8(src), new UTF8(dst));
+        if (ret) {
+            myMetrics.renameFile();
+        }
+        return ret;
     }
 
     /**
@@ -329,7 +372,11 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
     /**
      */
     public DFSFileInfo[] getListing(String src) throws IOException {
-        return namesystem.getListing(new UTF8(src));
+        DFSFileInfo[] files = namesystem.getListing(new UTF8(src));
+        if (files != null) {
+            myMetrics.listFile(files.length);
+        }
+        return files;
     }
 
     /**

+ 61 - 0
src/java/org/apache/hadoop/mapred/JobTracker.java

@@ -27,6 +27,9 @@ import java.net.*;
 import java.text.NumberFormat;
 import java.util.*;
 
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.Metrics;
+
 /*******************************************************
  * JobTracker is the central location for submitting and 
  * tracking MR jobs in a network environment.
@@ -320,7 +323,53 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
         }
     }
 
+    static class JobTrackerMetrics {
+      private MetricsRecord metricsRecord = null;
+      
+      private long numMapTasksLaunched = 0L;
+      private long numMapTasksCompleted = 0L;
+      private long numReduceTasksLaunched = 0L;
+      private long numReduceTasksCompleted = 0L;
+      private long numJobsSubmitted = 0L;
+      private long numJobsCompleted = 0L;
+      
+      JobTrackerMetrics() {
+        metricsRecord = Metrics.createRecord("mapred", "jobtracker");
+      }
+      
+      synchronized void launchMap() {
+        Metrics.report(metricsRecord, "maps-launched",
+            ++numMapTasksLaunched);
+      }
+      
+      synchronized void completeMap() {
+        Metrics.report(metricsRecord, "maps-completed",
+            ++numMapTasksCompleted);
+      }
+      
+      synchronized void launchReduce() {
+        Metrics.report(metricsRecord, "reduces-launched",
+            ++numReduceTasksLaunched);
+      }
+      
+      synchronized void completeReduce() {
+        Metrics.report(metricsRecord, "reduces-completed",
+            ++numReduceTasksCompleted);
+      }
+      
+      synchronized void submitJob() {
+        Metrics.report(metricsRecord, "jobs-submitted",
+            ++numJobsSubmitted);
+      }
+      
+      synchronized void completeJob() {
+        Metrics.report(metricsRecord, "jobs-completed",
+            ++numJobsCompleted);
+      }
+    }
 
+    private JobTrackerMetrics myMetrics = null;
+    
     /////////////////////////////////////////////////////////////////
     // The real JobTracker
     ////////////////////////////////////////////////////////////////
@@ -459,6 +508,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
 
         this.startTime = System.currentTimeMillis();
 
+        myMetrics = new JobTrackerMetrics();
         this.expireTrackersThread = new Thread(this.expireTrackers);
         this.expireTrackersThread.start();
         this.retireJobsThread = new Thread(this.retireJobs);
@@ -795,6 +845,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
                     Task t = job.obtainNewMapTask(taskTracker, tts);
                     if (t != null) {
                       expireLaunchingTasks.addNewTask(t.getTaskId());
+                      myMetrics.launchMap();
                       return t;
                     }
 
@@ -831,6 +882,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
                     Task t = job.obtainNewReduceTask(taskTracker, tts);
                     if (t != null) {
                       expireLaunchingTasks.addNewTask(t.getTaskId());
+                      myMetrics.launchReduce();
                       return t;
                     }
 
@@ -950,6 +1002,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
                 }
             }
         }
+        myMetrics.submitJob();
         return job.getStatus();
     }
 
@@ -1096,6 +1149,14 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
 
                 if (report.getRunState() == TaskStatus.SUCCEEDED) {
                     job.completedTask(tip, report);
+                    if (tip.isMapTask()) {
+                        myMetrics.completeMap();
+                    } else {
+                        myMetrics.completeReduce();
+                    }
+                    if (job.getStatus().getRunState() == JobStatus.SUCCEEDED) {
+                        myMetrics.completeJob();
+                    }
                 } else if (report.getRunState() == TaskStatus.FAILED) {
                     // Tell the job to fail the relevant task
                     job.failedTask(tip, report.getTaskId(), report, 

+ 8 - 1
src/java/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -23,6 +23,7 @@ import org.apache.commons.logging.*;
 
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.JobTracker.JobTrackerMetrics;
 
 /** Implements MapReduce locally, in-process, for debugging. */ 
 class LocalJobRunner implements JobSubmissionProtocol {
@@ -35,6 +36,8 @@ class LocalJobRunner implements JobSubmissionProtocol {
   private int map_tasks = 0;
   private int reduce_tasks = 0;
 
+  private JobTrackerMetrics myMetrics = null;
+
   public long getProtocolVersion(String protocol, long clientVersion) {
     return JobSubmissionProtocol.versionID;
   }
@@ -98,7 +101,9 @@ class LocalJobRunner implements JobSubmissionProtocol {
           map.localizeConfiguration(localConf);
           map.setConf(localConf);
           map_tasks += 1;
+          myMetrics.launchMap();
           map.run(localConf, this);
+          myMetrics.completeMap();
           map_tasks -= 1;
         }
 
@@ -114,7 +119,6 @@ class LocalJobRunner implements JobSubmissionProtocol {
           this.mapoutputFile.removeAll(mapId);
         }
 
-        // run a single reduce task
         {
           ReduceTask reduce = new ReduceTask(jobId, file, 
                                              reduceId, 0, mapIds.size());
@@ -122,7 +126,9 @@ class LocalJobRunner implements JobSubmissionProtocol {
           reduce.localizeConfiguration(localConf);
           reduce.setConf(localConf);
           reduce_tasks += 1;
+          myMetrics.launchReduce();
           reduce.run(localConf, this);
+          myMetrics.completeReduce();
           reduce_tasks -= 1;
         }
         this.mapoutputFile.removeAll(reduceId);
@@ -188,6 +194,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
   public LocalJobRunner(Configuration conf) throws IOException {
     this.fs = FileSystem.get(conf);
     this.conf = conf;
+    myMetrics = new JobTrackerMetrics();
   }
 
   // JobSubmissionProtocol methods

+ 56 - 3
src/java/org/apache/hadoop/mapred/MapTask.java

@@ -21,10 +21,17 @@ import java.io.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.metrics.MetricsRecord;
+
+import org.apache.commons.logging.*;
+import org.apache.hadoop.metrics.Metrics;
 
 /** A Map task. */
 class MapTask extends Task {
 
+    public static final Log LOG =
+        LogFactory.getLog("org.apache.hadoop.mapred.MapTask");
+
   static {                                        // register a ctor
     WritableFactories.setFactory
       (MapTask.class,
@@ -33,6 +40,42 @@ class MapTask extends Task {
        });
   }
 
+  
+  private class MapTaskMetrics {
+    private MetricsRecord metricsRecord = null;
+    
+    private long numInputRecords = 0L;
+    private long numInputBytes = 0L;
+    private long numOutputRecords = 0L;
+    private long numOutputBytes = 0L;
+    
+    MapTaskMetrics(String taskId) {
+      metricsRecord = Metrics.createRecord("mapred", "map", "taskid", taskId);
+    }
+    
+    private void reportMetric(String name, long value) {
+      if (metricsRecord != null) {
+        metricsRecord.setMetric(name, value);
+        metricsRecord.update();
+      }
+    }
+    
+    synchronized void mapInput(long numBytes) {
+      Metrics.report(metricsRecord, "input-records", ++numInputRecords);
+      numInputBytes += numBytes;
+      Metrics.report(metricsRecord, "input-bytes", numInputBytes);
+    }
+    
+    synchronized void mapOutput(long numBytes) {
+      Metrics.report(metricsRecord, "output-records", ++numOutputRecords);
+      numOutputBytes += numBytes;
+      Metrics.report(metricsRecord, "output-bytes", numOutputBytes);
+    }
+    
+  }
+  
+  private MapTaskMetrics myMetrics = null;
+
   private FileSplit split;
   private MapOutputFile mapOutputFile = new MapOutputFile();
   private JobConf conf;
@@ -43,6 +86,7 @@ class MapTask extends Task {
                  int partition, FileSplit split) {
     super(jobId, jobFile, taskId, partition);
     this.split = split;
+    myMetrics = new MapTaskMetrics(taskId);
   }
 
   public boolean isMapTask() {
@@ -72,6 +116,9 @@ class MapTask extends Task {
 
     split = new FileSplit();
     split.readFields(in);
+    if (myMetrics == null) {
+        myMetrics = new MapTaskMetrics(getTaskId());
+    }
   }
 
   public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
@@ -91,6 +138,7 @@ class MapTask extends Task {
                                   job.getMapOutputKeyClass(),
                                   job.getMapOutputValueClass(),
                                   compressTemps);
+        LOG.info("opened "+this.mapOutputFile.getOutputFile(getTaskId(), i).getName());
       }
 
       final Partitioner partitioner =
@@ -100,9 +148,11 @@ class MapTask extends Task {
           public synchronized void collect(WritableComparable key,
                                            Writable value)
             throws IOException {
-            outs[partitioner.getPartition(key, value, partitions)]
-              .append(key, value);
+            SequenceFile.Writer out = outs[partitioner.getPartition(key, value, partitions)];
+            long beforePos = out.getLength();
+            out.append(key, value);
             reportProgress(umbilical);
+            myMetrics.mapOutput(out.getLength() - beforePos);
           }
         };
 
@@ -128,7 +178,10 @@ class MapTask extends Task {
               (float)Math.min((rawIn.getPos()-split.getStart())*perByte, 1.0f);
             reportProgress(umbilical, progress);
 
-            return rawIn.next(key, value);
+            long beforePos = getPos();
+            boolean ret = rawIn.next(key, value);
+            myMetrics.mapInput(getPos() - beforePos);
+            return ret;
           }
           public long getPos() throws IOException { return rawIn.getPos(); }
           public void close() throws IOException { rawIn.close(); }

+ 34 - 0
src/java/org/apache/hadoop/mapred/ReduceTask.java

@@ -19,12 +19,17 @@ package org.apache.hadoop.mapred;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.metrics.Metrics;
 import org.apache.hadoop.util.*;
 
 import java.io.*;
 import java.util.*;
 import java.text.*;
 
+import org.apache.hadoop.metrics.ContextFactory;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+
 /** A Reduce task. */
 class ReduceTask extends Task {
 
@@ -36,6 +41,29 @@ class ReduceTask extends Task {
        });
   }
 
+  private class ReduceTaskMetrics {
+    private MetricsRecord metricsRecord = null;
+    
+    private long numInputRecords = 0L;
+    private long numOutputRecords = 0L;
+    
+    ReduceTaskMetrics(String taskId) {
+      metricsRecord = Metrics.createRecord("mapred", "reduce", "taskid", taskId);
+    }
+    
+    synchronized void reduceInput() {
+      Metrics.report(metricsRecord, "input-records", ++numInputRecords);
+    }
+    
+    synchronized void reduceOutput() {
+      Metrics.report(metricsRecord, "output-records", ++numOutputRecords);
+    }
+  }
+  
+  private ReduceTaskMetrics myMetrics = null;
+  
+  private UTF8 jobId = new UTF8();
+
   private int numMaps;
   private boolean sortComplete;
 
@@ -53,6 +81,7 @@ class ReduceTask extends Task {
                     int partition, int numMaps) {
     super(jobId, jobFile, taskId, partition);
     this.numMaps = numMaps;
+    myMetrics = new ReduceTaskMetrics(taskId);
   }
 
   public TaskRunner createRunner(TaskTracker tracker) throws IOException {
@@ -83,6 +112,9 @@ class ReduceTask extends Task {
     super.readFields(in);
 
     numMaps = in.readInt();
+    if (myMetrics == null) {
+        myMetrics = new ReduceTaskMetrics(getTaskId());
+    }
   }
 
   /** Iterates values while keys match in sorted input. */
@@ -224,6 +256,7 @@ class ReduceTask extends Task {
         public void collect(WritableComparable key, Writable value)
           throws IOException {
           out.write(key, value);
+          myMetrics.reduceOutput();
           reportProgress(umbilical);
         }
       };
@@ -235,6 +268,7 @@ class ReduceTask extends Task {
       ValuesIterator values = new ValuesIterator(in, length, comparator,
                                                  umbilical);
       while (values.more()) {
+        myMetrics.reduceInput();
         reducer.reduce(values.getKey(), values, collector, reporter);
         values.nextKey();
       }

+ 31 - 1
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -19,6 +19,7 @@ import org.apache.commons.logging.*;
 
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.ipc.*;
+import org.apache.hadoop.metrics.Metrics;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
@@ -26,6 +27,10 @@ import java.io.*;
 import java.net.*;
 import java.util.*;
 
+import org.apache.hadoop.metrics.ContextFactory;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+
 /*******************************************************
  * TaskTracker is a process that starts and tracks MR Tasks
  * in a networked environment.  It contacts the JobTracker
@@ -91,6 +96,28 @@ public class TaskTracker
     private int maxCurrentTasks;
     private int failures;
     private int finishedCount[] = new int[1];
+    
+    private class TaskTrackerMetrics {
+      private MetricsRecord metricsRecord = null;
+      
+      private long totalTasksCompleted = 0L;
+      
+      TaskTrackerMetrics() {
+        metricsRecord = Metrics.createRecord("mapred", "tasktracker");
+      }
+      
+      synchronized void completeTask() {
+        if (metricsRecord != null) {
+          metricsRecord.setMetric("tasks-completed", ++totalTasksCompleted);
+          metricsRecord.setMetric("maps-running", mapTotal);
+          metricsRecord.setMetric("reduce-running", reduceTotal);
+          metricsRecord.update();
+        }
+      }
+    }
+    
+    private TaskTrackerMetrics myMetrics = null;
+
     /**
      * A list of tips that should be cleaned up.
      */
@@ -143,6 +170,8 @@ public class TaskTracker
         this.minSpaceKill = this.fConf.getLong("mapred.local.dir.minspacekill", 0L);
         
         
+        this.myMetrics = new TaskTrackerMetrics();
+        
         // port numbers
         this.taskReportPort = this.fConf.getInt("mapred.task.tracker.report.port", 50050);
 
@@ -333,6 +362,7 @@ public class TaskTracker
                         } else {
                             reduceTotal--;
                         }
+                        myMetrics.completeTask();
                         it.remove();
                     }
                 }
@@ -354,7 +384,7 @@ public class TaskTracker
             }
 
             //
-            // Check if we should create a new Task
+            // Check if we should createRecord a new Task
             //
             try {
               if ((mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) && acceptNewTasks) {

+ 104 - 0
src/java/org/apache/hadoop/metrics/Metrics.java

@@ -0,0 +1,104 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Utility class to simplify creation and reporting of hadoop metrics.
+ * For examples of usage, see {@link org.apache.hadoop.dfs.DataNode}.
+ * @see org.apache.hadoop.metrics.MetricsRecord
+ * @see org.apache.hadoop.metrics.MetricsContext
+ * @see org.apache.hadoop.metrics.ContextFactory
+ * @author Milind Bhandarkar
+ */
+public class Metrics {
+  private static final Log LOG =
+      LogFactory.getLog("org.apache.hadoop.util.MetricsUtil");
+  
+  /**
+   * Don't allow creation of a new instance of Metrics
+   */
+  private Metrics() {}
+  
+  /**
+   * Utility method to create and return
+   * a new tagged metrics record instance within the
+   * given <code>contextName</code>.
+   * If exception is thrown while creating the record for any reason, it is
+   * logged, and a null record is returned.
+   * @param contextName name of the context
+   * @param recordName the name of the record
+   * @param tagName name of the tag field of metrics record
+   * @param tagValue value of the tag field
+   * @return newly created metrics record
+   */
+  public static MetricsRecord createRecord(String contextName, String recordName,
+      String tagName, String tagValue) {
+    try {
+      MetricsContext metricsContext =
+          ContextFactory.getFactory().getContext(contextName);
+      if (!metricsContext.isMonitoring()) {metricsContext.startMonitoring();}
+      MetricsRecord metricsRecord = metricsContext.createRecord(recordName);
+      metricsRecord.setTag(tagName, tagValue);
+      return metricsRecord;
+    } catch (Throwable ex) {
+      LOG.warn("Could not create metrics record with context:"+contextName, ex);
+      return null;
+    }
+  }
+  
+  /**
+   * Utility method to create and return new metrics record instance within the
+   * given <code>contextName</code>. This record is tagged with hostname.
+   * If exception is thrown while creating the record due to any reason, it is
+   * logged, and a null record is returned.
+   * @param contextName name of the context
+   * @param recordName name of the record
+   * @return newly created metrics record
+   */
+  public static MetricsRecord createRecord(String contextName,
+      String recordName) {
+    String hostname = null;
+    try {
+      hostname = InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException ex) {
+      LOG.info("Cannot get hostname", ex);
+      hostname = "unknown";
+    }
+    return createRecord(contextName, recordName, "hostname", hostname);
+  }
+  
+  /**
+   * Sets the named metric to the specified value in the given metrics record.
+   * Updates the table of buffered data which is to be sent periodically.
+   *
+   * @param record record for which the metric is updated
+   * @param metricName name of the metric
+   * @param metricValue new value of the metric
+   */
+  public static void report(MetricsRecord record, String metricName,
+      long metricValue) {
+    if (record != null) {
+      record.setMetric(metricName, metricValue);
+      record.update();
+    }
+  }
+}
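
For completeness, a minimal usage sketch of this helper, mirroring the counter pattern the commit adds to DataNode, NameNode, JobTracker and TaskTracker (the class and metric names here are illustrative only):

    import org.apache.hadoop.metrics.Metrics;
    import org.apache.hadoop.metrics.MetricsRecord;

    class RequestMetricsSketch {
      // createRecord may return null on failure; Metrics.report tolerates a
      // null record, so callers need no null checks.
      private MetricsRecord record = Metrics.createRecord("dfs", "example");
      private long numRequests = 0L;

      // Same pattern as DataNodeMetrics above: bump a running total and
      // report the cumulative value under a stable metric name.
      synchronized void request() {
        Metrics.report(record, "requests", ++numRequests);
      }
    }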