
HADOOP-1251. Add a method to Reporter to get the map InputSplit. Contributed by Owen.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@529432 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent
commit
5f380925ac

+ 3 - 0
CHANGES.txt

@@ -213,6 +213,9 @@ Trunk (unreleased changes)
 64. HADOOP-1148.  Re-indent all Java source code to consistently use
     two spaces per indent level.  (cutting)
 
+65. HADOOP-1251.  Add a method to Reporter to get the map InputSplit.
+    (omalley via cutting)
+
 
 Release 0.12.3 - 2007-04-06
 

+ 15 - 9
src/java/org/apache/hadoop/mapred/MapTask.java

@@ -59,6 +59,7 @@ class MapTask extends Task {
   private String splitClass;
   private MapOutputFile mapOutputFile = new MapOutputFile();
   private JobConf conf;
+  private InputSplit instantiatedSplit = null;
 
   private static final Log LOG = LogFactory.getLog(MapTask.class.getName());
 
@@ -84,6 +85,7 @@ class MapTask extends Task {
     super.localizeConfiguration(conf);
     Path localSplit = new Path(new Path(getJobFile()).getParent(), 
                                "split.dta");
+    LOG.debug("Writing local split to " + localSplit);
     DataOutputStream out = FileSystem.getLocal(conf).create(localSplit);
     Text.writeString(out, splitClass);
     split.write(out);
@@ -107,6 +109,10 @@ class MapTask extends Task {
     split.readFields(in);
   }
 
+  InputSplit getInputSplit() throws UnsupportedOperationException {
+    return instantiatedSplit;
+  }
+
   public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException {
 
@@ -115,9 +121,8 @@ class MapTask extends Task {
     MapOutputBuffer collector = new MapOutputBuffer(umbilical, job, reporter);
     
     // reinstantiate the split
-    InputSplit split;
     try {
-      split = (InputSplit) 
+      instantiatedSplit = (InputSplit) 
         ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
     } catch (ClassNotFoundException exp) {
       IOException wrap = new IOException("Split class " + splitClass + 
@@ -126,18 +131,19 @@ class MapTask extends Task {
       throw wrap;
     }
     DataInputBuffer splitBuffer = new DataInputBuffer();
-    splitBuffer.reset(this.split.get(), 0, this.split.getSize());
-    split.readFields(splitBuffer);
+    splitBuffer.reset(split.get(), 0, split.getSize());
+    instantiatedSplit.readFields(splitBuffer);
     
     // if it is a file split, we can give more details
-    if (split instanceof FileSplit) {
-      job.set("map.input.file", ((FileSplit) split).getPath().toString());
-      job.setLong("map.input.start", ((FileSplit) split).getStart());
-      job.setLong("map.input.length", ((FileSplit) split).getLength());
+    if (instantiatedSplit instanceof FileSplit) {
+      FileSplit fileSplit = (FileSplit) instantiatedSplit;
+      job.set("map.input.file", fileSplit.getPath().toString());
+      job.setLong("map.input.start", fileSplit.getStart());
+      job.setLong("map.input.length", fileSplit.getLength());
     }
       
     final RecordReader rawIn =                  // open input
-      job.getInputFormat().getRecordReader(split, job, reporter);
+      job.getInputFormat().getRecordReader(instantiatedSplit, job, reporter);
 
     RecordReader in = new RecordReader() {      // wrap in progress reporter
 

+ 11 - 0
src/java/org/apache/hadoop/mapred/Reporter.java

@@ -35,6 +35,9 @@ public interface Reporter extends Progressable {
       }
       public void incrCounter(Enum key, long amount) {
       }
+      public InputSplit getInputSplit() throws UnsupportedOperationException {
+        throw new UnsupportedOperationException("NULL reporter has no input");
+      }
     };
 
   /**
@@ -53,4 +56,12 @@ public interface Reporter extends Progressable {
    * be incremented
    */
   public abstract void incrCounter(Enum key, long amount);
+  
+  /**
+   * Get the InputSplit object for a map.
+   * @return the input split that the map is reading from
+   * @throws UnsupportedOperationException if called outside a mapper
+   */
+  public abstract InputSplit getInputSplit() 
+    throws UnsupportedOperationException;
 }
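
With the new Reporter method, a mapper can inspect the live InputSplit object directly instead of going through the configuration; outside of a map task (for example in a reducer, or on Reporter.NULL) the call throws UnsupportedOperationException. A hedged usage sketch, with a hypothetical class name and status message:

import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class SplitReportingMapper extends MapReduceBase implements Mapper {
  public void map(WritableComparable key, Writable value,
                  OutputCollector output, Reporter reporter) throws IOException {
    InputSplit split = reporter.getInputSplit();      // new in HADOOP-1251
    if (split instanceof FileSplit) {
      // File-based input formats expose the path and byte range of the split.
      reporter.setStatus("processing " + ((FileSplit) split).getPath());
    }
    output.collect(key, value);
  }
}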

+ 7 - 0
src/java/org/apache/hadoop/mapred/Task.java

@@ -173,6 +173,10 @@ abstract class Task implements Writable, Configurable {
 
   public Progress getProgress() { return taskProgress; }
 
+  InputSplit getInputSplit() throws UnsupportedOperationException {
+    throw new UnsupportedOperationException("Input only available on map");
+  }
+
   protected Reporter getReporter(final TaskUmbilicalProtocol umbilical) 
     throws IOException 
   {
@@ -192,6 +196,9 @@ abstract class Task implements Writable, Configurable {
             counters.incrCounter(key, amount);
           }
         }
+        public InputSplit getInputSplit() throws UnsupportedOperationException {
+          return Task.this.getInputSplit();
+        }
       };
   }
 

+ 11 - 1
src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java

@@ -21,7 +21,6 @@ package org.apache.hadoop.mapred;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.util.Progressable;
 
 import java.io.DataInput;
@@ -215,6 +214,11 @@ public class TestMiniMRLocalFS extends TestCase {
                     OutputCollector out, Reporter reporter) throws IOException {
       System.out.println("map: " + key + ", " + value);
       out.collect((WritableComparable) value, key);
+      InputSplit split = reporter.getInputSplit();
+      if (split.getClass() != MyInputFormat.MySplit.class) {
+        throw new IOException("Got wrong split in MyMapper! " + 
+                              split.getClass().getName());
+      }
     }
   }
 
@@ -222,6 +226,12 @@ public class TestMiniMRLocalFS extends TestCase {
     public void reduce(WritableComparable key, Iterator values, 
                        OutputCollector output, Reporter reporter
                        ) throws IOException {
+      try {
+        InputSplit split = reporter.getInputSplit();
+        throw new IOException("Got an input split of " + split);
+      } catch (UnsupportedOperationException e) {
+        // expected result
+      }
       while (values.hasNext()) {
         Writable value = (Writable) values.next();
         System.out.println("reduce: " + key + ", " + value);

+ 1 - 7
src/test/org/apache/hadoop/record/TestRecordWritable.java

@@ -46,12 +46,6 @@ public class TestRecordWritable extends TestCase {
     Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
     Path file = new Path(dir, "test.seq");
     
-    Reporter reporter = new Reporter() {
-        public void setStatus(String status) throws IOException {}
-        public void progress() throws IOException {}
-        public void incrCounter(Enum key, long amount) {}
-      };
-    
     int seed = new Random().nextInt();
     //LOG.info("seed = "+seed);
     Random random = new Random(seed);
@@ -95,7 +89,7 @@ public class TestRecordWritable extends TestCase {
         BitSet bits = new BitSet(length);
         for (int j = 0; j < splits.length; j++) {
           RecordReader reader =
-            format.getRecordReader(splits[j], job, reporter);
+            format.getRecordReader(splits[j], job, Reporter.NULL);
           try {
             int count = 0;
             while (reader.next(key, value)) {