Просмотр исходного кода

HADOOP-3853. Move multiple input format (HADOOP-372) extension to library package. (tomwhite via johan)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@685227 13f79535-47bb-0310-9956-ffa450edef68
Johan Oskarsson 17 лет назад
Родитель
Сommit
f0af55ed47

+ 3 - 0
CHANGES.txt

@@ -154,6 +154,9 @@ Trunk (unreleased changes)
     the end of the heartbeat rpc, rather than the start. This causes better
     the end of the heartbeat rpc, rather than the start. This causes better
     behavior if the JobTracker is overloaded. (acmurthy via omalley)
     behavior if the JobTracker is overloaded. (acmurthy via omalley)
 
 
+		HADOOP-3853. Move multiple input format (HADOOP-372) extension to 
+		library package. (tomwhite via johan)
+
   OPTIMIZATIONS
   OPTIMIZATIONS
 
 
     HADOOP-3556. Removed lock contention in MD5Hash by changing the 
     HADOOP-3556. Removed lock contention in MD5Hash by changing the 

+ 2 - 102
src/mapred/org/apache/hadoop/mapred/FileInputFormat.java

@@ -21,19 +21,16 @@ package org.apache.hadoop.mapred;
 import java.io.IOException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.List;
-import java.util.Map;
 
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.StringUtils;
 
 
@@ -401,103 +398,6 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
     conf.set("mapred.input.dir", dirs == null ? dirStr :
     conf.set("mapred.input.dir", dirs == null ? dirStr :
       dirs + StringUtils.COMMA_STR + dirStr);
       dirs + StringUtils.COMMA_STR + dirStr);
   }
   }
-  
-  /**
-   * Add a {@link Path} with a custom {@link InputFormat} to the list of
-   * inputs for the map-reduce job.
-   * 
-   * @param conf The configuration of the job
-   * @param path {@link Path} to be added to the list of inputs for the job
-   * @param inputFormatClass {@link InputFormat} class to use for this path
-   */
-  public static void addInputPath(JobConf conf, Path path,
-      Class<? extends InputFormat> inputFormatClass) {
-
-    String inputFormatMapping = path.toString() + ";"
-       + inputFormatClass.getName();
-    String inputFormats = conf.get("mapred.input.dir.formats");
-    conf.set("mapred.input.dir.formats",
-       inputFormats == null ? inputFormatMapping : inputFormats + ","
-           + inputFormatMapping);
-
-    conf.setInputFormat(DelegatingInputFormat.class);
-  }
-
-  /**
-   * Add a {@link Path} with a custom {@link InputFormat} and
-   * {@link Mapper} to the list of inputs for the map-reduce job.
-   * 
-   * @param conf The configuration of the job
-   * @param path {@link Path} to be added to the list of inputs for the job
-   * @param inputFormatClass {@link InputFormat} class to use for this path
-   * @param mapperClass {@link Mapper} class to use for this path
-   */
-  public static void addInputPath(JobConf conf, Path path,
-      Class<? extends InputFormat> inputFormatClass,
-      Class<? extends Mapper> mapperClass) {
-
-    addInputPath(conf, path, inputFormatClass);
-
-    String mapperMapping = path.toString() + ";" + mapperClass.getName();
-    String mappers = conf.get("mapred.input.dir.mappers");
-    conf.set("mapred.input.dir.mappers", mappers == null ? mapperMapping
-       : mappers + "," + mapperMapping);
-
-    conf.setMapperClass(DelegatingMapper.class);
-  }
-
-  /**
-   * Retrieves a map of {@link Path}s to the {@link InputFormat} class
-   * that should be used for them.
-   * 
-   * @param conf The confuration of the job
-   * @see #addInputPath(JobConf, Path, Class)
-   * @return A map of paths to inputformats for the job
-   */
-  static Map<Path, InputFormat> getInputFormatMap(JobConf conf) {
-    Map<Path, InputFormat> m = new HashMap<Path, InputFormat>();
-    String[] pathMappings = conf.get("mapred.input.dir.formats").split(",");
-    for (String pathMapping : pathMappings) {
-      String[] split = pathMapping.split(";");
-      InputFormat inputFormat;
-      try {
-       inputFormat = (InputFormat) ReflectionUtils.newInstance(conf
-           .getClassByName(split[1]), conf);
-      } catch (ClassNotFoundException e) {
-       throw new RuntimeException(e);
-      }
-      m.put(new Path(split[0]), inputFormat);
-    }
-    return m;
-  }
-
-  /**
-   * Retrieves a map of {@link Path}s to the {@link Mapper} class that
-   * should be used for them.
-   * 
-   * @param conf The confuration of the job
-   * @see #addInputPath(JobConf, Path, Class, Class)
-   * @return A map of paths to mappers for the job
-   */
-  @SuppressWarnings("unchecked")
-  static Map<Path, Class<? extends Mapper>> getMapperTypeMap(JobConf conf) {
-    if (conf.get("mapred.input.dir.mappers") == null) {
-      return Collections.emptyMap();
-    }
-    Map<Path, Class<? extends Mapper>> m = new HashMap<Path, Class<? extends Mapper>>();
-    String[] pathMappings = conf.get("mapred.input.dir.mappers").split(",");
-    for (String pathMapping : pathMappings) {
-      String[] split = pathMapping.split(";");
-      Class<? extends Mapper> mapClass;
-      try {
-       mapClass = (Class<? extends Mapper>) conf.getClassByName(split[1]);
-      } catch (ClassNotFoundException e) {
-       throw new RuntimeException(e);
-      }
-      m.put(new Path(split[0]), mapClass);
-    }
-    return m;
-  }
          
          
   // This method escapes commas in the glob pattern of the given paths.
   // This method escapes commas in the glob pattern of the given paths.
   private static String[] getPathStrings(String commaSeparatedPaths) {
   private static String[] getPathStrings(String commaSeparatedPaths) {

+ 12 - 5
src/mapred/org/apache/hadoop/mapred/DelegatingInputFormat.java → src/mapred/org/apache/hadoop/mapred/lib/DelegatingInputFormat.java

@@ -16,7 +16,7 @@
  * limitations under the License.
  * limitations under the License.
  */
  */
 
 
-package org.apache.hadoop.mapred;
+package org.apache.hadoop.mapred.lib;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.ArrayList;
@@ -27,20 +27,27 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Map.Entry;
 
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ReflectionUtils;
 
 
 /**
 /**
  * An {@link InputFormat} that delegates behaviour of paths to multiple other
  * An {@link InputFormat} that delegates behaviour of paths to multiple other
  * InputFormats.
  * InputFormats.
  * 
  * 
- * @see FileInputFormat#addInputPath(JobConf, Path, Class, Class)
+ * @see MultipleInputs#addInputPath(JobConf, Path, Class, Class)
  */
  */
 public class DelegatingInputFormat<K, V> implements InputFormat<K, V> {
 public class DelegatingInputFormat<K, V> implements InputFormat<K, V> {
 
 
   @Deprecated
   @Deprecated
   public void validateInput(JobConf conf) throws IOException {
   public void validateInput(JobConf conf) throws IOException {
     JobConf confCopy = new JobConf(conf);
     JobConf confCopy = new JobConf(conf);
-    Map<Path, InputFormat> formatMap = FileInputFormat.getInputFormatMap(conf);
+    Map<Path, InputFormat> formatMap = MultipleInputs.getInputFormatMap(conf);
     for (Entry<Path, InputFormat> entry : formatMap.entrySet()) {
     for (Entry<Path, InputFormat> entry : formatMap.entrySet()) {
       Path path = entry.getKey();
       Path path = entry.getKey();
       InputFormat format = entry.getValue();
       InputFormat format = entry.getValue();
@@ -53,8 +60,8 @@ public class DelegatingInputFormat<K, V> implements InputFormat<K, V> {
 
 
     JobConf confCopy = new JobConf(conf);
     JobConf confCopy = new JobConf(conf);
     List<InputSplit> splits = new ArrayList<InputSplit>();
     List<InputSplit> splits = new ArrayList<InputSplit>();
-    Map<Path, InputFormat> formatMap = FileInputFormat.getInputFormatMap(conf);
-    Map<Path, Class<? extends Mapper>> mapperMap = FileInputFormat
+    Map<Path, InputFormat> formatMap = MultipleInputs.getInputFormatMap(conf);
+    Map<Path, Class<? extends Mapper>> mapperMap = MultipleInputs
        .getMapperTypeMap(conf);
        .getMapperTypeMap(conf);
     Map<Class<? extends InputFormat>, List<Path>> formatPaths
     Map<Class<? extends InputFormat>, List<Path>> formatPaths
         = new HashMap<Class<? extends InputFormat>, List<Path>>();
         = new HashMap<Class<? extends InputFormat>, List<Path>>();

+ 7 - 2
src/mapred/org/apache/hadoop/mapred/DelegatingMapper.java → src/mapred/org/apache/hadoop/mapred/lib/DelegatingMapper.java

@@ -16,17 +16,22 @@
  * limitations under the License.
  * limitations under the License.
  */
  */
 
 
-package org.apache.hadoop.mapred;
+package org.apache.hadoop.mapred.lib;
 
 
 import java.io.IOException;
 import java.io.IOException;
 
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ReflectionUtils;
 
 
 /**
 /**
  * An {@link Mapper} that delegates behaviour of paths to multiple other
  * An {@link Mapper} that delegates behaviour of paths to multiple other
  * mappers.
  * mappers.
  * 
  * 
- * @see FileInputFormat#addInputPath(JobConf, Path, Class, Class)
+ * @see MultipleInputs#addInputPath(JobConf, Path, Class, Class)
  */
  */
 public class DelegatingMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2> {
 public class DelegatingMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2> {
 
 

+ 131 - 0
src/mapred/org/apache/hadoop/mapred/lib/MultipleInputs.java

@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.lib;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * This class supports MapReduce jobs that have multiple input paths with
+ * a different {@link InputFormat} and {@link Mapper} for each path 
+ */
+public class MultipleInputs {
+  /**
+   * Add a {@link Path} with a custom {@link InputFormat} to the list of
+   * inputs for the map-reduce job.
+   * 
+   * @param conf The configuration of the job
+   * @param path {@link Path} to be added to the list of inputs for the job
+   * @param inputFormatClass {@link InputFormat} class to use for this path
+   */
+  public static void addInputPath(JobConf conf, Path path,
+      Class<? extends InputFormat> inputFormatClass) {
+
+    String inputFormatMapping = path.toString() + ";"
+       + inputFormatClass.getName();
+    String inputFormats = conf.get("mapred.input.dir.formats");
+    conf.set("mapred.input.dir.formats",
+       inputFormats == null ? inputFormatMapping : inputFormats + ","
+           + inputFormatMapping);
+
+    conf.setInputFormat(DelegatingInputFormat.class);
+  }
+
+  /**
+   * Add a {@link Path} with a custom {@link InputFormat} and
+   * {@link Mapper} to the list of inputs for the map-reduce job.
+   * 
+   * @param conf The configuration of the job
+   * @param path {@link Path} to be added to the list of inputs for the job
+   * @param inputFormatClass {@link InputFormat} class to use for this path
+   * @param mapperClass {@link Mapper} class to use for this path
+   */
+  public static void addInputPath(JobConf conf, Path path,
+      Class<? extends InputFormat> inputFormatClass,
+      Class<? extends Mapper> mapperClass) {
+
+    addInputPath(conf, path, inputFormatClass);
+
+    String mapperMapping = path.toString() + ";" + mapperClass.getName();
+    String mappers = conf.get("mapred.input.dir.mappers");
+    conf.set("mapred.input.dir.mappers", mappers == null ? mapperMapping
+       : mappers + "," + mapperMapping);
+
+    conf.setMapperClass(DelegatingMapper.class);
+  }
+
+  /**
+   * Retrieves a map of {@link Path}s to the {@link InputFormat} class
+   * that should be used for them.
+   * 
+   * @param conf The confuration of the job
+   * @see #addInputPath(JobConf, Path, Class)
+   * @return A map of paths to inputformats for the job
+   */
+  static Map<Path, InputFormat> getInputFormatMap(JobConf conf) {
+    Map<Path, InputFormat> m = new HashMap<Path, InputFormat>();
+    String[] pathMappings = conf.get("mapred.input.dir.formats").split(",");
+    for (String pathMapping : pathMappings) {
+      String[] split = pathMapping.split(";");
+      InputFormat inputFormat;
+      try {
+       inputFormat = (InputFormat) ReflectionUtils.newInstance(conf
+           .getClassByName(split[1]), conf);
+      } catch (ClassNotFoundException e) {
+       throw new RuntimeException(e);
+      }
+      m.put(new Path(split[0]), inputFormat);
+    }
+    return m;
+  }
+
+  /**
+   * Retrieves a map of {@link Path}s to the {@link Mapper} class that
+   * should be used for them.
+   * 
+   * @param conf The confuration of the job
+   * @see #addInputPath(JobConf, Path, Class, Class)
+   * @return A map of paths to mappers for the job
+   */
+  @SuppressWarnings("unchecked")
+  static Map<Path, Class<? extends Mapper>> getMapperTypeMap(JobConf conf) {
+    if (conf.get("mapred.input.dir.mappers") == null) {
+      return Collections.emptyMap();
+    }
+    Map<Path, Class<? extends Mapper>> m = new HashMap<Path, Class<? extends Mapper>>();
+    String[] pathMappings = conf.get("mapred.input.dir.mappers").split(",");
+    for (String pathMapping : pathMappings) {
+      String[] split = pathMapping.split(";");
+      Class<? extends Mapper> mapClass;
+      try {
+       mapClass = (Class<? extends Mapper>) conf.getClassByName(split[1]);
+      } catch (ClassNotFoundException e) {
+       throw new RuntimeException(e);
+      }
+      m.put(new Path(split[0]), mapClass);
+    }
+    return m;
+  }
+}

+ 5 - 2
src/mapred/org/apache/hadoop/mapred/TaggedInputSplit.java → src/mapred/org/apache/hadoop/mapred/lib/TaggedInputSplit.java

@@ -16,7 +16,7 @@
  * limitations under the License.
  * limitations under the License.
  */
  */
 
 
-package org.apache.hadoop.mapred;
+package org.apache.hadoop.mapred.lib;
 
 
 import java.io.DataInput;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.DataOutput;
@@ -25,13 +25,16 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ReflectionUtils;
 
 
 /**
 /**
  * An {@link InputSplit} that tags another InputSplit with extra data for use by
  * An {@link InputSplit} that tags another InputSplit with extra data for use by
  * {@link DelegatingInputFormat}s and {@link DelegatingMapper}s.
  * {@link DelegatingInputFormat}s and {@link DelegatingMapper}s.
  */
  */
-public class TaggedInputSplit implements Configurable, InputSplit {
+class TaggedInputSplit implements Configurable, InputSplit {
 
 
   private Class<? extends InputSplit> inputSplitClass;
   private Class<? extends InputSplit> inputSplitClass;
 
 

+ 0 - 49
src/test/org/apache/hadoop/mapred/TestFileInputFormat.java

@@ -18,8 +18,6 @@
 package org.apache.hadoop.mapred;
 package org.apache.hadoop.mapred;
 
 
 import java.io.DataOutputStream;
 import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Map;
 
 
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.BlockLocation;
@@ -85,52 +83,5 @@ public class TestFileInputFormat extends TestCase {
       }
       }
     }
     }
   }
   }
-  
-  public void testAddInputPathWithFormat() {
-    final JobConf conf = new JobConf();
-    FileInputFormat.addInputPath(conf, new Path("/foo"), TextInputFormat.class);
-    FileInputFormat.addInputPath(conf, new Path("/bar"),
-        KeyValueTextInputFormat.class);
-    final Map<Path, InputFormat> inputs = FileInputFormat
-       .getInputFormatMap(conf);
-    assertEquals(TextInputFormat.class, inputs.get(new Path("/foo")).getClass());
-    assertEquals(KeyValueTextInputFormat.class, inputs.get(new Path("/bar"))
-       .getClass());
-  }
-
-  public void testAddInputPathWithMapper() {
-    final JobConf conf = new JobConf();
-    FileInputFormat.addInputPath(conf, new Path("/foo"), TextInputFormat.class,
-       MapClass.class);
-    FileInputFormat.addInputPath(conf, new Path("/bar"),
-       KeyValueTextInputFormat.class, MapClass2.class);
-    final Map<Path, InputFormat> inputs = FileInputFormat
-       .getInputFormatMap(conf);
-    final Map<Path, Class<? extends Mapper>> maps = FileInputFormat
-       .getMapperTypeMap(conf);
-
-    assertEquals(TextInputFormat.class, inputs.get(new Path("/foo")).getClass());
-    assertEquals(KeyValueTextInputFormat.class, inputs.get(new Path("/bar"))
-       .getClass());
-    assertEquals(MapClass.class, maps.get(new Path("/foo")));
-    assertEquals(MapClass2.class, maps.get(new Path("/bar")));
-  }
-
-  static class MapClass implements Mapper<String, String, String, String> {
-
-    public void map(String key, String value,
-       OutputCollector<String, String> output, Reporter reporter)
-       throws IOException {
-    }
-
-    public void configure(JobConf job) {
-    }
-
-    public void close() throws IOException {
-    }
-  }
-
-  static class MapClass2 extends MapClass {
-  }
 
 
 }
 }

+ 15 - 8
src/test/org/apache/hadoop/mapred/TestDelegatingInputFormat.java → src/test/org/apache/hadoop/mapred/lib/TestDelegatingInputFormat.java

@@ -15,16 +15,23 @@
  * See the License for the specific language governing permissions and
  * See the License for the specific language governing permissions and
  * limitations under the License.
  * limitations under the License.
  */
  */
-package org.apache.hadoop.mapred;
+package org.apache.hadoop.mapred.lib;
 
 
 import java.io.DataOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.IOException;
 
 
-import org.apache.hadoop.hdfs.MiniDFSCluster;
+import junit.framework.TestCase;
+
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
-
-import junit.framework.TestCase;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.KeyValueTextInputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
 
 
 public class TestDelegatingInputFormat extends TestCase {
 public class TestDelegatingInputFormat extends TestCase {
 
 
@@ -46,13 +53,13 @@ public class TestDelegatingInputFormat extends TestCase {
 
 
       final int numSplits = 100;
       final int numSplits = 100;
 
 
-      FileInputFormat.addInputPath(conf, path, TextInputFormat.class,
+      MultipleInputs.addInputPath(conf, path, TextInputFormat.class,
          MapClass.class);
          MapClass.class);
-      FileInputFormat.addInputPath(conf, path2, TextInputFormat.class,
+      MultipleInputs.addInputPath(conf, path2, TextInputFormat.class,
          MapClass2.class);
          MapClass2.class);
-      FileInputFormat.addInputPath(conf, path3, KeyValueTextInputFormat.class,
+      MultipleInputs.addInputPath(conf, path3, KeyValueTextInputFormat.class,
          MapClass.class);
          MapClass.class);
-      FileInputFormat.addInputPath(conf, path4, TextInputFormat.class,
+      MultipleInputs.addInputPath(conf, path4, TextInputFormat.class,
          MapClass2.class);
          MapClass2.class);
       DelegatingInputFormat inFormat = new DelegatingInputFormat();
       DelegatingInputFormat inFormat = new DelegatingInputFormat();
       InputSplit[] splits = inFormat.getSplits(conf, numSplits);
       InputSplit[] splits = inFormat.getSplits(conf, numSplits);

+ 85 - 0
src/test/org/apache/hadoop/mapred/lib/TestMultipleInputs.java

@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.lib;
+
+import java.io.IOException;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.KeyValueTextInputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+/**
+ * @see TestDelegatingInputFormat
+ */
+public class TestMultipleInputs extends TestCase {
+  
+  public void testAddInputPathWithFormat() {
+    final JobConf conf = new JobConf();
+    MultipleInputs.addInputPath(conf, new Path("/foo"), TextInputFormat.class);
+    MultipleInputs.addInputPath(conf, new Path("/bar"),
+        KeyValueTextInputFormat.class);
+    final Map<Path, InputFormat> inputs = MultipleInputs
+       .getInputFormatMap(conf);
+    assertEquals(TextInputFormat.class, inputs.get(new Path("/foo")).getClass());
+    assertEquals(KeyValueTextInputFormat.class, inputs.get(new Path("/bar"))
+       .getClass());
+  }
+
+  public void testAddInputPathWithMapper() {
+    final JobConf conf = new JobConf();
+    MultipleInputs.addInputPath(conf, new Path("/foo"), TextInputFormat.class,
+       MapClass.class);
+    MultipleInputs.addInputPath(conf, new Path("/bar"),
+       KeyValueTextInputFormat.class, MapClass2.class);
+    final Map<Path, InputFormat> inputs = MultipleInputs
+       .getInputFormatMap(conf);
+    final Map<Path, Class<? extends Mapper>> maps = MultipleInputs
+       .getMapperTypeMap(conf);
+
+    assertEquals(TextInputFormat.class, inputs.get(new Path("/foo")).getClass());
+    assertEquals(KeyValueTextInputFormat.class, inputs.get(new Path("/bar"))
+       .getClass());
+    assertEquals(MapClass.class, maps.get(new Path("/foo")));
+    assertEquals(MapClass2.class, maps.get(new Path("/bar")));
+  }
+
+  static class MapClass implements Mapper<String, String, String, String> {
+
+    public void map(String key, String value,
+       OutputCollector<String, String> output, Reporter reporter)
+       throws IOException {
+    }
+
+    public void configure(JobConf job) {
+    }
+
+    public void close() throws IOException {
+    }
+  }
+
+  static class MapClass2 extends MapClass {
+  }
+}