Browse Source

HADOOP-3747. Adds counter support for MultipleOutputs. Contributed by Alejandro Abdelnur.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@679859 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 17 years ago
parent
commit
2b691973e8

+ 3 - 0
CHANGES.txt

@@ -107,6 +107,9 @@ Trunk (unreleased changes)
     shuffle and the backoff logic is dependent on the type of timeout.
     (Jothi Padmanabhan via ddas)
 
+    HADOOP-3747. Adds counter support for MultipleOutputs. 
+    (Alejandro Abdelnur via ddas)
+
   OPTIMIZATIONS
 
     HADOOP-3556. Removed lock contention in MD5Hash by changing the 

+ 92 - 1
src/mapred/org/apache/hadoop/mapred/lib/MultipleOutputs.java

@@ -45,6 +45,13 @@ import java.util.*;
  * key/values written to the job <code>OutputCollector</code> are part of the
  * reduce phase.
  * <p/>
+ * MultipleOutputs supports counters; by default they are disabled. The
+ * counters group is the {@link MultipleOutputs} class name.
+ * <p/>
+ * The names of the counters are the same as the named outputs. For multi
+ * named outputs the name of the counter is the concatenation of the named
+ * output, an underscore '_' and the multiname.
+ * <p/>
  * Job configuration usage pattern is:
  * <pre>
  *
@@ -115,6 +122,13 @@ public class MultipleOutputs {
   private static final String VALUE = ".value";
   private static final String MULTI = ".multi";
 
+  private static final String COUNTERS_ENABLED = "mo.counters";
+
+  /**
+   * Counters group used by the counters of MultipleOutputs.
+   */
+  private static final String COUNTERS_GROUP = MultipleOutputs.class.getName();
+
   /**
    * Checks if a named output is alreadyDefined or not.
    *
@@ -319,12 +333,52 @@ public class MultipleOutputs {
     conf.setBoolean(MO_PREFIX + namedOutput + MULTI, multi);
   }
 
+  /**
+   * Enables or disables counters for the named outputs.
+   * <p/>
+   * By default these counters are disabled.
+   * <p/>
+   * MultipleOutputs supports counters; by default they are disabled.
+   * The counters group is the {@link MultipleOutputs} class name.
+   * <p/>
+   * The names of the counters are the same as the named outputs. For multi
+   * named outputs the name of the counter is the concatenation of the named
+   * output, an underscore '_' and the multiname.
+   *
+   * @param conf    job conf in which to enable or disable the counters.
+   * @param enabled indicates if the counters will be enabled or not.
+   */
+  public static void setCountersEnabled(JobConf conf, boolean enabled) {
+    conf.setBoolean(COUNTERS_ENABLED, enabled);
+  }
+
+  /**
+   * Returns if the counters for the named outputs are enabled or not.
+   * <p/>
+   * By default these counters are disabled.
+   * <p/>
+   * MultipleOutputs supports counters; by default they are disabled.
+   * The counters group is the {@link MultipleOutputs} class name.
+   * <p/>
+   * The names of the counters are the same as the named outputs. For multi
+   * named outputs the name of the counter is the concatenation of the named
+   * output, an underscore '_' and the multiname.
+   *
+   *
+   * @param conf    job conf to read the counters setting from.
+   * @return TRUE if the counters are enabled, FALSE if they are disabled.
+   */
+  public static boolean getCountersEnabled(JobConf conf) {
+    return conf.getBoolean(COUNTERS_ENABLED, false);
+  }
+
   // instance code, to be used from Mapper/Reducer code
 
   private JobConf conf;
   private OutputFormat outputFormat;
   private Set<String> namedOutputs;
   private Map<String, RecordWriter> recordWriters;
+  private boolean countersEnabled;
 
   /**
    * Creates and initializes multiple named outputs support, it should be
@@ -338,6 +392,7 @@ public class MultipleOutputs {
     namedOutputs = Collections.unmodifiableSet(
       new HashSet<String>(MultipleOutputs.getNamedOutputsList(job)));
     recordWriters = new HashMap<String, RecordWriter>();
+    countersEnabled = getCountersEnabled(job);
   }
 
   /**
@@ -354,20 +409,56 @@ public class MultipleOutputs {
   // MultithreadedMapRunner.
   private synchronized RecordWriter getRecordWriter(String namedOutput,
                                                     String baseFileName,
-                                                    Reporter reporter)
+                                                    final Reporter reporter)
     throws IOException {
     RecordWriter writer = recordWriters.get(baseFileName);
     if (writer == null) {
+      if (countersEnabled && reporter == null) {
+        throw new IllegalArgumentException(
+          "Counters are enabled, Reporter cannot be NULL");
+      }
       JobConf jobConf = new JobConf(conf);
       jobConf.set(InternalFileOutputFormat.CONFIG_NAMED_OUTPUT, namedOutput);
       FileSystem fs = FileSystem.get(conf);
       writer =
         outputFormat.getRecordWriter(fs, jobConf, baseFileName, reporter);
+
+      if (countersEnabled) {
+        if (reporter == null) {
+          throw new IllegalArgumentException(
+            "Counters are enabled, Reporter cannot be NULL");
+        }
+        writer = new RecordWriterWithCounter(writer, baseFileName, reporter);
+      }
+
       recordWriters.put(baseFileName, writer);
     }
     return writer;
   }
 
+  private static class RecordWriterWithCounter implements RecordWriter {
+    private RecordWriter writer;
+    private String counterName;
+    private Reporter reporter;
+
+    public RecordWriterWithCounter(RecordWriter writer, String counterName,
+                                   Reporter reporter) {
+      this.writer = writer;
+      this.counterName = counterName;
+      this.reporter = reporter;
+    }
+
+    @SuppressWarnings({"unchecked"})
+    public void write(Object key, Object value) throws IOException {
+      reporter.incrCounter(COUNTERS_GROUP, counterName, 1);
+      writer.write(key, value);
+    }
+
+    public void close(Reporter reporter) throws IOException {
+      writer.close(reporter);
+    }
+  }
+
   /**
    * Gets the output collector for a named output.
    * <p/>

+ 25 - 1
src/test/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java

@@ -37,8 +37,16 @@ public class TestMultipleOutputs extends HadoopTestCase {
     super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
   }
 
+  public void testWithoutCounters() throws Exception {
+    _testMultipleOutputs(false);
+  }
+
+  public void testWithCounters() throws Exception {
+    _testMultipleOutputs(true);
+  }
+
   @SuppressWarnings({"unchecked"})
-  public void testMultipleOutputs() throws Exception {
+  protected void _testMultipleOutputs(boolean withCounters) throws Exception {
     Path inDir = new Path("testing/mo/input");
     Path outDir = new Path("testing/mo/output");
 
@@ -85,6 +93,8 @@ public class TestMultipleOutputs extends HadoopTestCase {
     MultipleOutputs.addMultiNamedOutput(conf, "sequence",
       SequenceFileOutputFormat.class, LongWritable.class, Text.class);
 
+    MultipleOutputs.setCountersEnabled(conf, withCounters);
+
     conf.setMapperClass(MOMap.class);
     conf.setReducerClass(MOReduce.class);
 
@@ -147,6 +157,20 @@ public class TestMultipleOutputs extends HadoopTestCase {
     reader.close();
     assertFalse(count == 0);
 
+    Counters.Group counters =
+      job.getCounters().getGroup(MultipleOutputs.class.getName());
+    if (!withCounters) {
+      assertEquals(0, counters.size());
+    }
+    else {
+      assertEquals(4, counters.size());
+      assertEquals(4, counters.getCounter("text"));
+      assertEquals(2, counters.getCounter("sequence_A"));
+      assertEquals(4, counters.getCounter("sequence_B"));
+      assertEquals(2, counters.getCounter("sequence_C"));
+
+    }
+
   }
 
   @SuppressWarnings({"unchecked"})