浏览代码

HADOOP-1480. Add counters to the C++ Pipes API. (acmurthy via omalley)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@686893 13f79535-47bb-0310-9956-ffa450edef68
Owen O'Malley 17 年之前
父节点
当前提交
edbf4de3fd

+ 2 - 0
CHANGES.txt

@@ -92,6 +92,8 @@ Trunk (unreleased changes)
     HADOOP-3585. FailMon package for hardware failure monitoring and 
     analysis of anomalies. (Ioannis Koltsidas via dhruba)
 
+    HADOOP-1480. Add counters to the C++ Pipes API. (acmurthy via omalley)
+
   IMPROVEMENTS
 
     HADOOP-3732. Delay initialization of datanode block verification till

+ 24 - 0
src/c++/pipes/api/hadoop/Pipes.hh

@@ -56,6 +56,19 @@ public:
  */
 class TaskContext {
 public:
+  /**
+   * Handle for a counter registered through getCounter(). It carries
+   * only the integer id it was constructed with; the protocol code in
+   * this change sends that id upstream with each increment.
+   */
+  class Counter {
+  public:
+    /** Wrap the given counter id. */
+    Counter(int counterId) : id(counterId) {}
+
+    /** Copy another handle; the copy refers to the same id. */
+    Counter(const Counter& counter) : id(counter.id) {}
+
+    /** Id this handle was constructed with. */
+    int getId() const {
+      return id;
+    }
+
+  private:
+    int id;
+  };
+  
   /**
    * Get the JobConf for the current task.
    */
@@ -89,6 +102,17 @@ public:
    */
   virtual void setStatus(const std::string& status) = 0;
 
+  /**
+   * Register a counter with the given group and name.
+   *
+   * Returns a Counter handle to pass to incrementCounter().
+   * NOTE(review): the implementation added to HadoopPipes.cc in this
+   * change allocates the handle with new and does not free it, so
+   * ownership presumably passes to the caller - confirm.
+   */
+  virtual Counter* 
+    getCounter(const std::string& group, const std::string& name) = 0;
+
+  /**
+   * Increment the value of the counter by the given amount.
+   *
+   * The counter argument is expected to be a handle obtained from
+   * getCounter(); only its id is used by the implementations in this
+   * change.
+   */
+  virtual void incrementCounter(const Counter* counter, uint64_t amount) = 0;
+  
   virtual ~TaskContext() {}
 };
 

+ 62 - 2
src/c++/pipes/impl/HadoopPipes.cc

@@ -99,6 +99,10 @@ namespace HadoopPipes {
     virtual void status(const string& message) = 0;
     virtual void progress(float progress) = 0;
     virtual void done() = 0;
+    virtual void registerCounter(int id, const string& group, 
+                                 const string& name) = 0; // implementations in this change emit a registration message carrying id, group and name
+    virtual void 
+      incrementCounter(const TaskContext::Counter* counter, uint64_t amount) = 0; // implementations in this change emit an increment message carrying counter->getId() and amount
     virtual ~UpwardProtocol() {}
   };
 
@@ -150,6 +154,19 @@ namespace HadoopPipes {
               lineSeparator);
     }
 
+    /**
+     * Write a counter registration as one text record: the word
+     * "registerCounter", then id, group and name, each preceded by
+     * fieldSeparator, terminated by lineSeparator. Group and name are
+     * written verbatim.
+     */
+    virtual void registerCounter(int id, const string& group, 
+                                 const string& name) {
+      const char* groupText = group.c_str();
+      const char* nameText = name.c_str();
+      fprintf(stream, "registerCounter%c%d%c%s%c%s%c",
+              fieldSeparator, id,
+              fieldSeparator, groupText,
+              fieldSeparator, nameText,
+              lineSeparator);
+    }
+
+    /**
+     * Write a counter increment as one text record: the word
+     * "incrCounter", then the counter's id and the amount, each preceded
+     * by fieldSeparator, terminated by lineSeparator.
+     *
+     * The amount is formatted as a long long rather than a long, so the
+     * 64-bit value is not truncated on platforms where long is 32 bits.
+     * It is still printed as a signed number, as before.
+     */
+    virtual void incrementCounter(const TaskContext::Counter* counter, 
+                                  uint64_t amount) {
+      fprintf(stream, "incrCounter%c%d%c%lld%c", fieldSeparator,
+              counter->getId(), fieldSeparator,
+              static_cast<long long>(amount), lineSeparator);
+    }
+    
     virtual void done() {
       fprintf(stream, "done%c", lineSeparator);
     }
@@ -272,8 +289,9 @@ namespace HadoopPipes {
 
   enum MESSAGE_TYPE {START_MESSAGE, SET_JOB_CONF, SET_INPUT_TYPES, RUN_MAP, 
                      MAP_ITEM, RUN_REDUCE, REDUCE_KEY, REDUCE_VALUE, 
-                     CLOSE, ABORT,
-                     OUTPUT=50, PARTITIONED_OUTPUT, STATUS, PROGRESS, DONE};
+                     CLOSE, ABORT, 
+                     OUTPUT=50, PARTITIONED_OUTPUT, STATUS, PROGRESS, DONE,
+                     REGISTER_COUNTER, INCREMENT_COUNTER}; // implicitly 55 and 56; BinaryProtocol.java declares REGISTER_COUNTER(55) and INCREMENT_COUNTER(56), so the two lists must stay in step
 
   class BinaryUpwardProtocol: public UpwardProtocol {
   private:
@@ -313,6 +331,21 @@ namespace HadoopPipes {
       serializeInt(DONE, *stream);
     }
 
+    virtual void registerCounter(int id, const string& group, 
+                                 const string& name) {
+      serializeInt(REGISTER_COUNTER, *stream); // message type code goes first
+      serializeInt(id, *stream); // then the counter id
+      serializeString(group, *stream); // then group followed by name; BinaryProtocol.java reads id, group, name in this order
+      serializeString(name, *stream);
+    }
+
+    virtual void incrementCounter(const TaskContext::Counter* counter, 
+                                  uint64_t amount) {
+      serializeInt(INCREMENT_COUNTER, *stream); // message type code goes first
+      serializeInt(counter->getId(), *stream); // then the id held by the handle
+      serializeLong(amount, *stream); // NOTE(review): amount is uint64_t; the parameter type of serializeLong is not visible here - confirm large values survive
+    }
+    
     ~BinaryUpwardProtocol() {
       delete stream;
     }
@@ -505,6 +538,14 @@ namespace HadoopPipes {
       return valueItr != endValueItr;
     }
     
+    /**
+     * Forward counter registration to baseContext and hand back
+     * whatever handle it returns.
+     */
+    virtual Counter* getCounter(const std::string& group, 
+                               const std::string& name) {
+      Counter* const handle = baseContext->getCounter(group, name);
+      return handle;
+    }
+
+    /**
+     * Forward a counter increment to baseContext unchanged.
+     */
+    virtual void incrementCounter(const Counter* counter, uint64_t amount) {
+      baseContext->incrementCounter(counter, amount);
+    }
   };
 
   /**
@@ -586,6 +627,7 @@ namespace HadoopPipes {
     int numReduces;
     const Factory* factory;
     pthread_mutex_t mutexDone;
+    std::vector<int> registeredCounterIds;
 
   public:
 
@@ -838,6 +880,24 @@ namespace HadoopPipes {
       }
     }
 
+    /**
+     * Register a counter with the given group and name.
+     *
+     * Every call takes the next sequential id (the current size of
+     * registeredCounterIds), records it, announces it on the uplink and
+     * returns a newly allocated Counter for it. Repeated calls with the
+     * same group/name are not merged here, and this method does not
+     * keep the returned pointer.
+     */
+    virtual Counter* getCounter(const std::string& group, 
+                               const std::string& name) {
+      const int nextId = static_cast<int>(registeredCounterIds.size());
+      registeredCounterIds.push_back(nextId);
+      uplink->registerCounter(nextId, group, name);
+      return new Counter(nextId);
+    }
+
+    /**
+     * Increment the value of the counter by the given amount. The
+     * request is passed straight to the uplink.
+     */
+    virtual void incrementCounter(const Counter* counter, uint64_t amount) {
+      uplink->incrementCounter(counter, amount); 
+    }
+
     void closeAll() {
       if (reader) {
         reader->close();

+ 18 - 2
src/examples/pipes/impl/wordcount-nopipe.cc

@@ -24,27 +24,43 @@
 #include <sys/types.h>
 #include <sys/stat.h>
 
+const std::string WORDCOUNT = "WORDCOUNT";
+const std::string INPUT_WORDS = "INPUT_WORDS";
+const std::string OUTPUT_WORDS = "OUTPUT_WORDS";
+
 class WordCountMap: public HadoopPipes::Mapper {
 public:
-  WordCountMap(HadoopPipes::MapContext& context){}
+  HadoopPipes::TaskContext::Counter* inputWords; // handle for the WORDCOUNT/INPUT_WORDS counter; not deleted anywhere in this class
+  
+  WordCountMap(HadoopPipes::TaskContext& context) {
+    inputWords = context.getCounter(WORDCOUNT, INPUT_WORDS); // registered once, when the mapper is constructed
+  }
+  
   void map(HadoopPipes::MapContext& context) {
     std::vector<std::string> words = 
       HadoopUtils::splitString(context.getInputValue(), " ");
     for(unsigned int i=0; i < words.size(); ++i) {
       context.emit(words[i], "1");
     }
+    context.incrementCounter(inputWords, words.size()); // one increment per input value, by the number of words split from it
   }
 };
 
 class WordCountReduce: public HadoopPipes::Reducer {
 public:
-  WordCountReduce(HadoopPipes::ReduceContext& context){}
+  HadoopPipes::TaskContext::Counter* outputWords; // handle for the WORDCOUNT/OUTPUT_WORDS counter; not deleted anywhere in this class
+
+  WordCountReduce(HadoopPipes::TaskContext& context) {
+    outputWords = context.getCounter(WORDCOUNT, OUTPUT_WORDS); // registered once, when the reducer is constructed
+  }
+
   void reduce(HadoopPipes::ReduceContext& context) {
     int sum = 0;
     while (context.nextValue()) {
       sum += HadoopUtils::toInt(context.getInputValue());
     }
     context.emit(context.getInputKey(), HadoopUtils::toString(sum));
+    context.incrementCounter(outputWords, 1); // one increment per reduce() call, i.e. per emitted key
   }
 };
 

+ 18 - 2
src/examples/pipes/impl/wordcount-part.cc

@@ -20,27 +20,43 @@
 #include "hadoop/TemplateFactory.hh"
 #include "hadoop/StringUtils.hh"
 
+const std::string WORDCOUNT = "WORDCOUNT";
+const std::string INPUT_WORDS = "INPUT_WORDS";
+const std::string OUTPUT_WORDS = "OUTPUT_WORDS";
+
 class WordCountMap: public HadoopPipes::Mapper {
 public:
-  WordCountMap(HadoopPipes::TaskContext& context){}
+  HadoopPipes::TaskContext::Counter* inputWords; // handle for the WORDCOUNT/INPUT_WORDS counter; not deleted anywhere in this class
+  
+  WordCountMap(HadoopPipes::TaskContext& context) {
+    inputWords = context.getCounter(WORDCOUNT, INPUT_WORDS); // registered once, when the mapper is constructed
+  }
+  
   void map(HadoopPipes::MapContext& context) {
     std::vector<std::string> words = 
       HadoopUtils::splitString(context.getInputValue(), " ");
     for(unsigned int i=0; i < words.size(); ++i) {
       context.emit(words[i], "1");
     }
+    context.incrementCounter(inputWords, words.size()); // one increment per input value, by the number of words split from it
   }
 };
 
 class WordCountReduce: public HadoopPipes::Reducer {
 public:
-  WordCountReduce(HadoopPipes::TaskContext& context){}
+  HadoopPipes::TaskContext::Counter* outputWords; // handle for the WORDCOUNT/OUTPUT_WORDS counter; not deleted anywhere in this class
+
+  WordCountReduce(HadoopPipes::TaskContext& context) {
+    outputWords = context.getCounter(WORDCOUNT, OUTPUT_WORDS); // registered once, when the reducer is constructed
+  }
+
   void reduce(HadoopPipes::ReduceContext& context) {
     int sum = 0;
     while (context.nextValue()) {
       sum += HadoopUtils::toInt(context.getInputValue());
     }
     context.emit(context.getInputKey(), HadoopUtils::toString(sum));
+    context.incrementCounter(outputWords, 1); // one increment per reduce() call, i.e. per emitted key
   }
 };
 

+ 18 - 2
src/examples/pipes/impl/wordcount-simple.cc

@@ -20,27 +20,43 @@
 #include "hadoop/TemplateFactory.hh"
 #include "hadoop/StringUtils.hh"
 
+const std::string WORDCOUNT = "WORDCOUNT";
+const std::string INPUT_WORDS = "INPUT_WORDS";
+const std::string OUTPUT_WORDS = "OUTPUT_WORDS";
+
 class WordCountMap: public HadoopPipes::Mapper {
 public:
-  WordCountMap(HadoopPipes::TaskContext& context){}
+  HadoopPipes::TaskContext::Counter* inputWords; // handle for the WORDCOUNT/INPUT_WORDS counter; not deleted anywhere in this class
+  
+  WordCountMap(HadoopPipes::TaskContext& context) {
+    inputWords = context.getCounter(WORDCOUNT, INPUT_WORDS); // registered once, when the mapper is constructed
+  }
+  
   void map(HadoopPipes::MapContext& context) {
     std::vector<std::string> words = 
       HadoopUtils::splitString(context.getInputValue(), " ");
     for(unsigned int i=0; i < words.size(); ++i) {
       context.emit(words[i], "1");
     }
+    context.incrementCounter(inputWords, words.size()); // one increment per input value, by the number of words split from it
   }
 };
 
 class WordCountReduce: public HadoopPipes::Reducer {
 public:
-  WordCountReduce(HadoopPipes::TaskContext& context){}
+  HadoopPipes::TaskContext::Counter* outputWords; // handle for the WORDCOUNT/OUTPUT_WORDS counter; not deleted anywhere in this class
+
+  WordCountReduce(HadoopPipes::TaskContext& context) {
+    outputWords = context.getCounter(WORDCOUNT, OUTPUT_WORDS); // registered once, when the reducer is constructed
+  }
+
   void reduce(HadoopPipes::ReduceContext& context) {
     int sum = 0;
     while (context.nextValue()) {
       sum += HadoopUtils::toInt(context.getInputValue());
     }
     context.emit(context.getInputKey(), HadoopUtils::toString(sum));
+    context.incrementCounter(outputWords, 1); // one increment per reduce() call, i.e. per emitted key
   }
 };
 

+ 10 - 0
src/mapred/org/apache/hadoop/mapred/Counters.java

@@ -318,6 +318,16 @@ public class Counters implements Writable, Iterable<Counters.Group> {
     return counter;    
   }
 
+  /**
+   * Look up a counter by the name of its group and its own internal name.
+   * @param group the name of the group
+   * @param name the internal name of the counter
+   * @return whatever the group's <code>getCounterForName</code> yields for
+   *         that name
+   */
+  public synchronized Counter findCounter(String group, String name) {
+    Group counterGroup = getGroup(group);
+    return counterGroup.getCounterForName(name);
+  }
+
   /**
    * Find a counter by using strings
    * @param group the name of the group

+ 13 - 0
src/mapred/org/apache/hadoop/mapred/Reporter.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapred;
 
+import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.util.Progressable;
 
 /** 
@@ -46,6 +47,9 @@ public interface Reporter extends Progressable {
       }
       public void progress() {
       }
+      public Counter getCounter(String group, String name) {
+        return null; // this reporter has no counter to hand out, so callers receive null
+      }
       public void incrCounter(Enum key, long amount) {
       }
       public void incrCounter(String group, String counter, long amount) {
@@ -62,6 +66,15 @@ public interface Reporter extends Progressable {
    */
   public abstract void setStatus(String status);
   
+  /**
+   * Get the {@link Counter} of the given group with the given name.
+   * 
+   * @param group counter group
+   * @param name counter name
+   * @return the <code>Counter</code> of the given group/name; may be
+   *         <code>null</code> (the do-nothing reporter defined in this
+   *         interface always returns <code>null</code>).
+   */
+  public abstract Counter getCounter(String group, String name);
+  
   /**
    * Increments the counter identified by the key, which can be of
    * any {@link Enum} type, by the specified amount.

+ 8 - 0
src/mapred/org/apache/hadoop/mapred/Task.java

@@ -48,6 +48,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Progress;
@@ -408,6 +409,13 @@ abstract class Task implements Writable, Configurable {
           // indicate that progress update needs to be sent
           setProgressFlag();
         }
+        /**
+         * Look the counter up through <code>counters.findCounter</code>,
+         * or return <code>null</code> when <code>counters</code> is null.
+         */
+        public Counters.Counter getCounter(String group, String name) {
+          if (counters == null) {
+            return null;
+          }
+          return counters.findCounter(group, name);
+        }
         public void incrCounter(Enum key, long amount) {
           if (counters != null) {
             counters.incrCounter(key, amount);

+ 12 - 1
src/mapred/org/apache/hadoop/mapred/pipes/BinaryProtocol.java

@@ -73,7 +73,9 @@ class BinaryProtocol<K1 extends WritableComparable, V1 extends Writable,
                                     PARTITIONED_OUTPUT(51),
                                     STATUS(52),
                                     PROGRESS(53),
-                                    DONE(54);
+                                    DONE(54),
+                                    REGISTER_COUNTER(55),
+                                    INCREMENT_COUNTER(56);
     final int code;
     MessageType(int code) {
       this.code = code;
@@ -124,6 +126,15 @@ class BinaryProtocol<K1 extends WritableComparable, V1 extends Writable,
             handler.status(Text.readString(inStream));
           } else if (cmd == MessageType.PROGRESS.code) {
             handler.progress(inStream.readFloat());
+          } else if (cmd == MessageType.REGISTER_COUNTER.code) {
+            int id = WritableUtils.readVInt(inStream);
+            String group = Text.readString(inStream);
+            String name = Text.readString(inStream);
+            handler.registerCounter(id, group, name);
+          } else if (cmd == MessageType.INCREMENT_COUNTER.code) {
+            int id = WritableUtils.readVInt(inStream);
+            long amount = WritableUtils.readVLong(inStream);
+            handler.incrementCounter(id, amount);
           } else if (cmd == MessageType.DONE.code) {
             LOG.debug("Pipe child done");
             handler.done();

+ 23 - 1
src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java

@@ -19,9 +19,14 @@
 package org.apache.hadoop.mapred.pipes;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
@@ -37,7 +42,9 @@ class OutputHandler<K extends WritableComparable,
   private float progressValue = 0.0f;
   private boolean done = false;
   private Throwable exception = null;
-  
+  private Map<Integer, Counters.Counter> registeredCounters = 
+    new HashMap<Integer, Counters.Counter>();
+
   /**
    * Create a handler that will handle any records output from the application.
    * @param collector the "real" collector that takes the output
@@ -121,4 +128,19 @@ class OutputHandler<K extends WritableComparable,
     }
     return done;
   }
+
+  /**
+   * Remember the counter that the reporter returns for group/name under
+   * the given id, replacing any earlier entry stored for that id.
+   * @param id id the counter will be referred to by
+   * @param group counter group
+   * @param name counter name
+   */
+  public void registerCounter(int id, String group, String name) throws IOException {
+    registeredCounters.put(id, reporter.getCounter(group, name));
+  }
+
+  /**
+   * Add the given amount to the counter registered under the given id.
+   *
+   * The id is validated by looking it up in the map rather than by
+   * comparing it with the map's size: ids are map keys, not indexes, so
+   * a size comparison lets negative or never-registered ids through to a
+   * NullPointerException and can reject a registered id. A null entry
+   * (stored when the reporter returned no counter at registration) is
+   * reported the same way as an unknown id.
+   * @param id id of a previously registered counter
+   * @param amount increment for the counter value
+   * @throws IOException if no counter is available for the id
+   */
+  public void incrementCounter(int id, long amount) throws IOException {
+    Counters.Counter counter = registeredCounters.get(id);
+    if (counter == null) {
+      throw new IOException("Invalid counter with id: " + id);
+    }
+    counter.increment(amount);
+  }
+
 }

+ 16 - 0
src/mapred/org/apache/hadoop/mapred/pipes/UpwardProtocol.java

@@ -72,4 +72,20 @@ interface UpwardProtocol<K extends WritableComparable, V extends Writable> {
    * @param e
    */
   void failed(Throwable e);
+  
+  /**
+   * Register a counter with the given id and group/name.
+   * @param id id that later increments will use to refer to the counter
+   * @param group counter group
+   * @param name counter name
+   * @throws IOException
+   */
+  void registerCounter(int id, String group, String name) throws IOException;
+  
+  /**
+   * Increment the value of a registered counter.
+   * @param id counter id of the registered counter
+   * @param amount increment for the counter value
+   * @throws IOException
+   */
+  void incrementCounter(int id, long amount) throws IOException;
 }

+ 11 - 0
src/test/org/apache/hadoop/mapred/pipes/TestPipes.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
@@ -37,6 +38,7 @@ import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.OutputLogFilter;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TestMiniMRWithDFS;
+import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.util.StringUtils;
 
 import junit.framework.TestCase;
@@ -164,6 +166,15 @@ public class TestPipes extends TestCase {
         rJob = Submitter.runJob(job);
       }
       assertTrue("pipes job failed", rJob.isSuccessful());
+      
+      Counters counters = rJob.getCounters();
+      Counters.Group wordCountCounters = counters.getGroup("WORDCOUNT");
+      int numCounters = 0;
+      for (Counter c : wordCountCounters) {
+        System.out.println(c);
+        ++numCounters;
+      }
+      assertTrue("No counters found!", (numCounters > 0));
     }
 
     List<String> results = new ArrayList<String>();