
HADOOP-3149. Adds a way in which map/reduce tasks can create multiple outputs. Contributed by Alejandro Abdelnur.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@675546 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 17 years ago
Parent
Commit
48083284d1

+ 3 - 0
CHANGES.txt

@@ -45,6 +45,9 @@ Trunk (unreleased changes)
     naming convention, such as hadoop.rm.queue.queue-name.property-name.
     (Hemanth Yamijala via ddas)
 
+    HADOOP-3149. Adds a way in which map/reduce tasks can create multiple
+    outputs. (Alejandro Abdelnur via ddas)
+
   IMPROVEMENTS
 
     HADOOP-3577. Tools to inject blocks into name node and simulated

+ 52 - 0
src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.text.NumberFormat;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -230,5 +231,56 @@ public abstract class FileOutputFormat<K, V> implements OutputFormat<K, V> {
     // ${mapred.out.dir}/_temporary/_${taskid}/${name}
     return new Path(taskTmpDir, name);
   } 
+
+  /**
+   * Helper function to generate a name that is unique for the task.
+   *
+   * <p>The generated name can be used to create custom files from within the
+   * different tasks for the job; the names for different tasks will not
+   * collide with each other.</p>
+   *
+   * <p>The given name is suffixed with the task type, 'm' for maps, 'r' for
+   * reduces, and the task partition number. For example, given the name
+   * 'test' running on the first map of the job, the generated name will be
+   * 'test-m-00000'.</p>
+   *
+   * @param conf the configuration for the job.
+   * @param name the name to make unique.
+   * @return a unique name across all tasks of the job.
+   */
+  public static String getUniqueName(JobConf conf, String name) {
+    int partition = conf.getInt("mapred.task.partition", -1);
+    if (partition == -1) {
+      throw new IllegalArgumentException(
+        "This method can only be called from within a Job");
+    }
+
+    String taskType = (conf.getBoolean("mapred.task.is.map", true)) ? "m" : "r";
+
+    NumberFormat numberFormat = NumberFormat.getInstance();
+    numberFormat.setMinimumIntegerDigits(5);
+    numberFormat.setGroupingUsed(false);
+
+    return name + "-" + taskType + "-" + numberFormat.format(partition);
+  }
+
+  /**
+   * Helper function to generate a {@link Path} for a file that is unique for
+   * the task within the job output directory.
+   *
+   * <p>The path can be used to create custom files from within the map and
+   * reduce tasks. The path name will be unique for each task. The path parent
+   * will be the job output directory.</p>
+   *
+   * <p>This method uses the {@link #getUniqueName} method to make the file name
+   * unique for the task.</p>
+   *
+   * @param conf the configuration for the job.
+   * @param name the name for the file.
+   * @return a unique path across all tasks of the job.
+   */
+  public static Path getPathForCustomFile(JobConf conf, String name) {
+    return new Path(getWorkOutputPath(conf), getUniqueName(conf, name));
+  }
 }
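A minimal usage sketch of the two new helpers (the mapper class and the 'stats' name below are hypothetical, not part of this patch). Per the javadoc above, a name such as 'stats' written from the first map task resolves to 'stats-m-00000' under the task's temporary work directory, which the framework promotes to the job output directory on success:

// Hypothetical mapper showing FileOutputFormat.getPathForCustomFile();
// only the two static helpers are introduced by this patch.
import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class SideFileMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, LongWritable, Text> {

  private JobConf conf;

  public void configure(JobConf conf) {
    this.conf = conf;
  }

  public void map(LongWritable key, Text value,
                  OutputCollector<LongWritable, Text> output,
                  Reporter reporter) throws IOException {
    output.collect(key, value);
  }

  public void close() throws IOException {
    // In the first map task this resolves to
    // ${mapred.output.dir}/_temporary/_${taskid}/stats-m-00000 and ends up
    // as stats-m-00000 in the job output directory.
    Path side = FileOutputFormat.getPathForCustomFile(conf, "stats");
    OutputStream os = FileSystem.get(conf).create(side);
    os.write("done\n".getBytes());
    os.close();
  }
}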
 

+ 470 - 0
src/mapred/org/apache/hadoop/mapred/lib/MultipleOutputs.java

@@ -0,0 +1,470 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.lib;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * The MultipleOutputs class simplifies writing to additional outputs other
+ * than the job default output via the <code>OutputCollector</code> passed to
+ * the <code>map()</code> and <code>reduce()</code> methods of the
+ * <code>Mapper</code> and <code>Reducer</code> implementations.
+ * <p/>
+ * Each additional output, or named output, may be configured with its own
+ * <code>OutputFormat</code>, with its own key class and with its own value
+ * class.
+ * <p/>
+ * A named output can be a single file or a multi file. The latter is
+ * referred to as a multi named output.
+ * <p/>
+ * A multi named output is an unbounded set of files all sharing the same
+ * <code>OutputFormat</code>, key class and value class configuration.
+ * <p/>
+ * When named outputs are used within a <code>Mapper</code> implementation,
+ * key/values written to a named output are not part of the reduce phase; only
+ * key/values written to the job <code>OutputCollector</code> are part of the
+ * reduce phase.
+ * <p/>
+ * Job configuration usage pattern is:
+ * <pre>
+ *
+ * JobConf conf = new JobConf();
+ *
+ * conf.setInputPath(inDir);
+ * FileOutputFormat.setOutputPath(conf, outDir);
+ *
+ * conf.setMapperClass(MOMap.class);
+ * conf.setReducerClass(MOReduce.class);
+ * ...
+ *
+ * // Defines additional single text based output 'text' for the job
+ * MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class,
+ *   LongWritable.class, Text.class);
+ *
+ * // Defines additional multi sequencefile based output 'seq' for the job
+ * MultipleOutputs.addMultiNamedOutput(conf, "seq",
+ *   SequenceFileOutputFormat.class,
+ *   LongWritable.class, Text.class);
+ * ...
+ *
+ * JobClient jc = new JobClient();
+ * RunningJob job = jc.submitJob(conf);
+ *
+ * ...
+ * </pre>
+ * <p/>
+ * Usage pattern in a Reducer implementation is:
+ * <pre>
+ *
+ * public class MOReduce implements
+ *   Reducer&lt;WritableComparable, Writable,
+ *           WritableComparable, Writable&gt; {
+ *
+ *   private MultipleOutputs mos;
+ *
+ *   public void configure(JobConf conf) {
+ *     ...
+ *     mos = new MultipleOutputs(conf);
+ *   }
+ *
+ *   public void reduce(WritableComparable key, Iterator&lt;Writable&gt; values,
+ *                      OutputCollector output, Reporter reporter)
+ *       throws IOException {
+ *     ...
+ *     mos.getCollector("text", reporter).collect(key, new Text("Hello"));
+ *     mos.getCollector("seq", "A", reporter).collect(key, new Text("Bye"));
+ *     mos.getCollector("seq", "B", reporter).collect(key, new Text("Chau"));
+ *     ...
+ *   }
+ *
+ *   public void close() throws IOException {
+ *     mos.close();
+ *     ...
+ *   }
+ * }
+ * </pre>
+ */
+public class MultipleOutputs {
+
+  private static final String NAMED_OUTPUTS = "mo.namedOutputs";
+
+  private static final String MO_PREFIX = "mo.namedOutput.";
+
+  private static final String FORMAT = ".format";
+  private static final String KEY = ".key";
+  private static final String VALUE = ".value";
+  private static final String MULTI = ".multi";
+
+  /**
+   * Checks the definition state of a named output.
+   *
+   * @param conf           job conf
+   * @param namedOutput    named output name
+   * @param alreadyDefined if <code>true</code>, fail when the named output
+   *                       is already defined; if <code>false</code>, fail
+   *                       when it is not defined
+   * @throws IllegalArgumentException if the check expressed by the
+   *                                  'alreadyDefined' parameter fails
+   */
+  private static void checkNamedOutput(JobConf conf, String namedOutput,
+                                       boolean alreadyDefined) {
+    List<String> definedChannels = getNamedOutputsList(conf);
+    if (alreadyDefined && definedChannels.contains(namedOutput)) {
+      throw new IllegalArgumentException("Named output '" + namedOutput +
+        "' already alreadyDefined");
+    } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) {
+      throw new IllegalArgumentException("Named output '" + namedOutput +
+        "' not defined");
+    }
+  }
+
+  /**
+   * Checks if a named output name is a valid token.
+   *
+   * @param namedOutput named output Name
+   * @throws IllegalArgumentException if the output name is not valid.
+   */
+  private static void checkTokenName(String namedOutput) {
+    if (namedOutput == null || namedOutput.length() == 0) {
+      throw new IllegalArgumentException(
+        "Name cannot be NULL or emtpy");
+    }
+    for (char ch : namedOutput.toCharArray()) {
+      if ((ch >= 'A') && (ch <= 'Z')) {
+        continue;
+      }
+      if ((ch >= 'a') && (ch <= 'z')) {
+        continue;
+      }
+      if ((ch >= '0') && (ch <= '9')) {
+        continue;
+      }
+      throw new IllegalArgumentException(
+        "Name cannot be have a '" + ch + "' char");
+    }
+  }
+
+  /**
+   * Checks if a named output name is valid.
+   *
+   * @param namedOutput named output Name
+   * @throws IllegalArgumentException if the output name is not valid.
+   */
+  private static void checkNamedOutputName(String namedOutput) {
+    checkTokenName(namedOutput);
+    // name cannot be the name used for the default output
+    if (namedOutput.equals("part")) {
+      throw new IllegalArgumentException(
+        "Named output name cannot be 'part'");
+    }
+  }
+
+  /**
+   * Returns list of channel names.
+   *
+   * @param conf job conf
+   * @return List of channel Names
+   */
+  public static List<String> getNamedOutputsList(JobConf conf) {
+    List<String> names = new ArrayList<String>();
+    StringTokenizer st = new StringTokenizer(conf.get(NAMED_OUTPUTS, ""), " ");
+    while (st.hasMoreTokens()) {
+      names.add(st.nextToken());
+    }
+    return names;
+  }
+
+
+  /**
+   * Returns whether a named output is multi.
+   *
+   * @param conf        job conf
+   * @param namedOutput named output
+   * @return <code>true</code> if the named output is multi, <code>false</code>
+   *         if it is single
+   * @throws IllegalArgumentException if the named output is not defined
+   */
+  public static boolean isMultiNamedOutput(JobConf conf, String namedOutput) {
+    checkNamedOutput(conf, namedOutput, false);
+    return conf.getBoolean(MO_PREFIX + namedOutput + MULTI, false);
+  }
+
+  /**
+   * Returns the named output OutputFormat.
+   *
+   * @param conf        job conf
+   * @param namedOutput named output
+   * @return namedOutput OutputFormat
+   */
+  public static Class<? extends OutputFormat> getNamedOutputFormatClass(
+    JobConf conf, String namedOutput) {
+    checkNamedOutput(conf, namedOutput, false);
+    return conf.getClass(MO_PREFIX + namedOutput + FORMAT, null,
+      OutputFormat.class);
+  }
+
+  /**
+   * Returns the key class for a named output.
+   *
+   * @param conf        job conf
+   * @param namedOutput named output
+   * @return class for the named output key
+   */
+  public static Class<?> getNamedOutputKeyClass(JobConf conf,
+                                                String namedOutput) {
+    checkNamedOutput(conf, namedOutput, false);
+    return conf.getClass(MO_PREFIX + namedOutput + KEY, null,
+      Object.class);
+  }
+
+  /**
+   * Returns the value class for a named output.
+   *
+   * @param conf        job conf
+   * @param namedOutput named output
+   * @return class of named output value
+   */
+  public static Class<?> getNamedOutputValueClass(JobConf conf,
+                                                  String namedOutput) {
+    checkNamedOutput(conf, namedOutput, false);
+    return conf.getClass(MO_PREFIX + namedOutput + VALUE, null,
+      Object.class);
+  }
+
+  /**
+   * Adds a named output for the job.
+   * <p/>
+   *
+   * @param conf              job conf to add the named output
+   * @param namedOutput       named output name; it has to be a word, with
+   *                          letters and numbers only, and cannot be the word
+   *                          'part', which is reserved for the default output.
+   * @param outputFormatClass OutputFormat class.
+   * @param keyClass          key class
+   * @param valueClass        value class
+   */
+  public static void addNamedOutput(JobConf conf, String namedOutput,
+                                Class<? extends OutputFormat> outputFormatClass,
+                                Class<?> keyClass, Class<?> valueClass) {
+    addNamedOutput(conf, namedOutput, false, outputFormatClass, keyClass,
+      valueClass);
+  }
+
+  /**
+   * Adds a multi named output for the job.
+   * <p/>
+   *
+   * @param conf              job conf to add the named output
+   * @param namedOutput       named output name; it has to be a word, with
+   *                          letters and numbers only, and cannot be the word
+   *                          'part', which is reserved for the default output.
+   * @param outputFormatClass OutputFormat class.
+   * @param keyClass          key class
+   * @param valueClass        value class
+   */
+  public static void addMultiNamedOutput(JobConf conf, String namedOutput,
+                               Class<? extends OutputFormat> outputFormatClass,
+                               Class<?> keyClass, Class<?> valueClass) {
+    addNamedOutput(conf, namedOutput, true, outputFormatClass, keyClass,
+      valueClass);
+  }
+
+  /**
+   * Adds a named output for the job.
+   * <p/>
+   *
+   * @param conf              job conf to add the named output
+   * @param namedOutput       named output name; it has to be a word, with
+   *                          letters and numbers only, and cannot be the word
+   *                          'part', which is reserved for the default output.
+   * @param multi             indicates if the named output is multi
+   * @param outputFormatClass OutputFormat class.
+   * @param keyClass          key class
+   * @param valueClass        value class
+   */
+  private static void addNamedOutput(JobConf conf, String namedOutput,
+                               boolean multi,
+                               Class<? extends OutputFormat> outputFormatClass,
+                               Class<?> keyClass, Class<?> valueClass) {
+    checkNamedOutputName(namedOutput);
+    checkNamedOutput(conf, namedOutput, true);
+    conf.set(NAMED_OUTPUTS, conf.get(NAMED_OUTPUTS, "") + " " + namedOutput);
+    conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
+      OutputFormat.class);
+    conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class);
+    conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class);
+    conf.setBoolean(MO_PREFIX + namedOutput + MULTI, multi);
+  }
+
+  // instance code, to be used from Mapper/Reducer code
+
+  private JobConf conf;
+  private OutputFormat outputFormat;
+  private Set<String> namedOutputs;
+  private Map<String, RecordWriter> recordWriters;
+
+  /**
+   * Creates and initializes multiple named outputs support; it should be
+   * instantiated in the Mapper/Reducer configure method.
+   *
+   * @param job the job configuration object
+   */
+  public MultipleOutputs(JobConf job) {
+    this.conf = job;
+    outputFormat = new InternalFileOutputFormat();
+    namedOutputs = Collections.unmodifiableSet(
+      new HashSet<String>(MultipleOutputs.getNamedOutputsList(job)));
+    recordWriters = new HashMap<String, RecordWriter>();
+  }
+
+  /**
+   * Returns an iterator with the defined named outputs.
+   *
+   * @return an iterator with the defined named outputs
+   */
+  public Iterator<String> getNamedOutputs() {
+    return namedOutputs.iterator();
+  }
+
+
+  // By being synchronized, MultipleOutputs can be used with a
+  // MultithreadedMapRunner.
+  private synchronized RecordWriter getRecordWriter(String namedOutput,
+                                                    String baseFileName,
+                                                    Reporter reporter)
+    throws IOException {
+    RecordWriter writer = recordWriters.get(baseFileName);
+    if (writer == null) {
+      JobConf jobConf = new JobConf(conf);
+      jobConf.set(InternalFileOutputFormat.CONFIG_NAMED_OUTPUT, namedOutput);
+      FileSystem fs = FileSystem.get(conf);
+      writer =
+        outputFormat.getRecordWriter(fs, jobConf, baseFileName, reporter);
+      recordWriters.put(baseFileName, writer);
+    }
+    return writer;
+  }
+
+  /**
+   * Gets the output collector for a named output.
+   * <p/>
+   *
+   * @param namedOutput the named output name
+   * @param reporter    the reporter
+   * @return the output collector for the given named output
+   * @throws IOException thrown if output collector could not be created
+   */
+  @SuppressWarnings({"unchecked"})
+  public OutputCollector getCollector(String namedOutput, Reporter reporter)
+    throws IOException {
+    return getCollector(namedOutput, null, reporter);
+  }
+
+  /**
+   * Gets the output collector for a multi named output.
+   * <p/>
+   *
+   * @param namedOutput the named output name
+   * @param multiName   the multi name part
+   * @param reporter    the reporter
+   * @return the output collector for the given named output
+   * @throws IOException thrown if output collector could not be created
+   */
+  @SuppressWarnings({"unchecked"})
+  public OutputCollector getCollector(String namedOutput, String multiName,
+                                      Reporter reporter)
+    throws IOException {
+
+    checkNamedOutputName(namedOutput);
+    if (!namedOutputs.contains(namedOutput)) {
+      throw new IllegalArgumentException("Undefined named output '" +
+        namedOutput + "'");
+    }
+    boolean multi = isMultiNamedOutput(conf, namedOutput);
+
+    if (!multi && multiName != null) {
+      throw new IllegalArgumentException("Name output '" + namedOutput +
+        "' has not been defined as multi");
+    }
+    if (multi) {
+      checkTokenName(multiName);
+    }
+
+    String baseFileName = (multi) ? namedOutput + "_" + multiName : namedOutput;
+
+    final RecordWriter writer =
+      getRecordWriter(namedOutput, baseFileName, reporter);
+
+    return new OutputCollector() {
+
+      @SuppressWarnings({"unchecked"})
+      public void collect(Object key, Object value) throws IOException {
+        writer.write(key, value);
+      }
+
+    };
+  }
+
+  /**
+   * Closes all the opened named outputs.
+   * <p/>
+   * If overridden, subclasses must invoke <code>super.close()</code> at the
+   * end of their <code>close()</code> method.
+   *
+   * @throws java.io.IOException thrown if any of the MultipleOutput files
+   *                             could not be closed properly.
+   */
+  public void close() throws IOException {
+    for (RecordWriter writer : recordWriters.values()) {
+      writer.close(null);
+    }
+  }
+
+  private static class InternalFileOutputFormat extends
+    FileOutputFormat<Object, Object> {
+
+    public static final String CONFIG_NAMED_OUTPUT = "mo.config.namedOutput";
+
+    @SuppressWarnings({"unchecked"})
+    public RecordWriter<Object, Object> getRecordWriter(
+      FileSystem fs, JobConf job, String baseFileName, Progressable progress)
+      throws IOException {
+
+      String nameOutput = job.get(CONFIG_NAMED_OUTPUT, null);
+      String fileName = getUniqueName(job, baseFileName);
+
+      // The following trick leverages the instantiation of a record writer
+      // via the job conf, thus supporting arbitrary output formats.
+      JobConf outputConf = new JobConf(job);
+      outputConf.setOutputFormat(getNamedOutputFormatClass(job, nameOutput));
+      outputConf.setOutputKeyClass(getNamedOutputKeyClass(job, nameOutput));
+      outputConf.setOutputValueClass(getNamedOutputValueClass(job, nameOutput));
+      OutputFormat outputFormat = outputConf.getOutputFormat();
+      return outputFormat.getRecordWriter(fs, outputConf, fileName, progress);
+    }
+  }
+
+}
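For reference, a sketch of the JobConf properties that addNamedOutput()/addMultiNamedOutput() lay down, derived from the NAMED_OUTPUTS/MO_PREFIX constants in the class above (the 'text' and 'seq' names are just the javadoc's examples):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleOutputs;

public class NamedOutputConfDemo {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class,
      LongWritable.class, Text.class);
    MultipleOutputs.addMultiNamedOutput(conf, "seq",
      SequenceFileOutputFormat.class, LongWritable.class, Text.class);

    // The two calls above set (class names abbreviated):
    //   mo.namedOutputs            = " text seq"   (space-separated list)
    //   mo.namedOutput.text.format = TextOutputFormat
    //   mo.namedOutput.text.key    = LongWritable
    //   mo.namedOutput.text.value  = Text
    //   mo.namedOutput.text.multi  = false
    //   mo.namedOutput.seq.*       = likewise, with .multi = true
    System.out.println(conf.get("mo.namedOutputs"));
  }
}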

+ 163 - 0
src/test/org/apache/hadoop/mapred/TestFileOutputFormat.java

@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Iterator;
+
+public class TestFileOutputFormat extends HadoopTestCase {
+
+  public TestFileOutputFormat() throws IOException {
+    super(HadoopTestCase.CLUSTER_MR, HadoopTestCase.LOCAL_FS, 1, 1);
+  }
+
+  public void testCustomFile() throws Exception {
+    Path inDir = new Path("testing/fileoutputformat/input");
+    Path outDir = new Path("testing/fileoutputformat/output");
+
+    // Hack for local FS that does not have the concept of a 'mounting point'
+    if (isLocalFS()) {
+      String localPathRoot = System.getProperty("test.build.data", "/tmp")
+        .replace(' ', '+');
+      inDir = new Path(localPathRoot, inDir);
+      outDir = new Path(localPathRoot, outDir);
+    }
+
+
+    JobConf conf = createJobConf();
+    FileSystem fs = FileSystem.get(conf);
+
+    fs.delete(outDir, true);
+    if (!fs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+
+    DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+    file.writeBytes("a\nb\n\nc\nd\ne");
+    file.close();
+
+    file = fs.create(new Path(inDir, "part-1"));
+    file.writeBytes("a\nb\n\nc\nd\ne");
+    file.close();
+
+    conf.setJobName("fof");
+    conf.setInputFormat(TextInputFormat.class);
+
+    conf.setOutputKeyClass(LongWritable.class);
+    conf.setOutputValueClass(Text.class);
+
+    conf.setMapOutputKeyClass(LongWritable.class);
+    conf.setMapOutputValueClass(Text.class);
+
+    conf.setOutputFormat(TextOutputFormat.class);
+    conf.setOutputKeyClass(LongWritable.class);
+    conf.setOutputValueClass(Text.class);
+
+    conf.setMapperClass(TestMap.class);
+    conf.setReducerClass(TestReduce.class);
+
+    FileInputFormat.setInputPaths(conf, inDir);
+    FileOutputFormat.setOutputPath(conf, outDir);
+
+    JobClient jc = new JobClient(conf);
+    RunningJob job = jc.submitJob(conf);
+    while (!job.isComplete()) {
+      Thread.sleep(100);
+    }
+    assertTrue(job.isSuccessful());
+
+    boolean map0 = false;
+    boolean map1 = false;
+    boolean reduce = false;
+    FileStatus[] statuses = fs.listStatus(outDir);
+    for (FileStatus status : statuses) {
+      map0 = map0 || status.getPath().getName().equals("test-m-00000");
+      map1 = map1 || status.getPath().getName().equals("test-m-00001");
+      reduce = reduce || status.getPath().getName().equals("test-r-00000");
+    }
+
+    assertTrue(map0);
+    assertTrue(map1);
+    assertTrue(reduce);
+  }
+
+  public static class TestMap implements Mapper<LongWritable, Text,
+    LongWritable, Text> {
+
+    public void configure(JobConf conf) {
+      try {
+        FileSystem fs = FileSystem.get(conf);
+        OutputStream os =
+          fs.create(FileOutputFormat.getPathForCustomFile(conf, "test"));
+        os.write(1);
+        os.close();
+      }
+      catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+
+    public void map(LongWritable key, Text value,
+                    OutputCollector<LongWritable, Text> output,
+                    Reporter reporter) throws IOException {
+      output.collect(key, value);
+    }
+
+    public void close() throws IOException {
+    }
+  }
+
+  public static class TestReduce implements Reducer<LongWritable, Text,
+    LongWritable, Text> {
+
+    public void configure(JobConf conf) {
+      try {
+        FileSystem fs = FileSystem.get(conf);
+        OutputStream os =
+          fs.create(FileOutputFormat.getPathForCustomFile(conf, "test"));
+        os.write(1);
+        os.close();
+      }
+      catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+
+    public void reduce(LongWritable key, Iterator<Text> values,
+                       OutputCollector<LongWritable, Text> output,
+                       Reporter reporter) throws IOException {
+      while (values.hasNext()) {
+        Text value = values.next();
+        output.collect(key, value);
+      }
+    }
+
+    public void close() throws IOException {
+    }
+  }
+
+}
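As an aside on the test style: the submit-and-poll loop above can also be written with the blocking JobClient.runJob() helper, which waits for completion and fails on an unsuccessful job (a sketch, not part of this patch):

import java.io.IOException;

import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;

public class RunJobDemo {
  // Submits the job and blocks until it finishes, replacing the manual
  // isComplete() polling loop used in the tests above.
  public static RunningJob run(JobConf conf) throws IOException {
    return JobClient.runJob(conf);
  }
}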

+ 215 - 0
src/test/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java

@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.lib;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.*;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+
+public class TestMultipleOutputs extends HadoopTestCase {
+
+  public TestMultipleOutputs() throws IOException {
+    super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
+  }
+
+  @SuppressWarnings({"unchecked"})
+  public void testMultipleOutputs() throws Exception {
+    Path inDir = new Path("testing/mo/input");
+    Path outDir = new Path("testing/mo/output");
+
+    // Hack for local FS that does not have the concept of a 'mounting point'
+    if (isLocalFS()) {
+      String localPathRoot = System.getProperty("test.build.data", "/tmp")
+        .replace(' ', '+');
+      inDir = new Path(localPathRoot, inDir);
+      outDir = new Path(localPathRoot, outDir);
+    }
+
+
+    JobConf conf = createJobConf();
+    FileSystem fs = FileSystem.get(conf);
+
+    fs.delete(outDir, true);
+    if (!fs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+
+    DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+    file.writeBytes("a\nb\n\nc\nd\ne");
+    file.close();
+
+    file = fs.create(new Path(inDir, "part-1"));
+    file.writeBytes("a\nb\n\nc\nd\ne");
+    file.close();
+
+    conf.setJobName("mo");
+    conf.setInputFormat(TextInputFormat.class);
+
+    conf.setOutputKeyClass(LongWritable.class);
+    conf.setOutputValueClass(Text.class);
+
+    conf.setMapOutputKeyClass(LongWritable.class);
+    conf.setMapOutputValueClass(Text.class);
+
+    conf.setOutputFormat(TextOutputFormat.class);
+    conf.setOutputKeyClass(LongWritable.class);
+    conf.setOutputValueClass(Text.class);
+
+    MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class,
+      LongWritable.class, Text.class);
+    MultipleOutputs.addMultiNamedOutput(conf, "sequence",
+      SequenceFileOutputFormat.class, LongWritable.class, Text.class);
+
+    conf.setMapperClass(MOMap.class);
+    conf.setReducerClass(MOReduce.class);
+
+    FileInputFormat.setInputPaths(conf, inDir);
+    FileOutputFormat.setOutputPath(conf, outDir);
+
+    JobClient jc = new JobClient(conf);
+    RunningJob job = jc.submitJob(conf);
+    while (!job.isComplete()) {
+      Thread.sleep(100);
+    }
+
+    // assert number of named output part files
+    int namedOutputCount = 0;
+    FileStatus[] statuses = fs.listStatus(outDir);
+    for (FileStatus status : statuses) {
+      if (status.getPath().getName().equals("text-m-00000") ||
+        status.getPath().getName().equals("text-m-00001") ||
+        status.getPath().getName().equals("text-r-00000") ||
+        status.getPath().getName().equals("sequence_A-m-00000") ||
+        status.getPath().getName().equals("sequence_A-m-00001") ||
+        status.getPath().getName().equals("sequence_B-m-00000") ||
+        status.getPath().getName().equals("sequence_B-m-00001") ||
+        status.getPath().getName().equals("sequence_B-r-00000") ||
+        status.getPath().getName().equals("sequence_C-r-00000")) {
+        namedOutputCount++;
+      }
+    }
+    assertEquals(9, namedOutputCount);
+
+    // assert TextOutputFormat files correctness
+    BufferedReader reader = new BufferedReader(
+      new InputStreamReader(fs.open(
+        new Path(FileOutputFormat.getOutputPath(conf), "text-r-00000"))));
+    int count = 0;
+    String line = reader.readLine();
+    while (line != null) {
+      assertTrue(line.endsWith("text"));
+      line = reader.readLine();
+      count++;
+    }
+    reader.close();
+    assertFalse(count == 0);
+
+    // assert SequenceOutputFormat files correctness
+    SequenceFile.Reader seqReader =
+      new SequenceFile.Reader(fs, new Path(FileOutputFormat.getOutputPath(conf),
+        "sequence_B-r-00000"), conf);
+
+    assertEquals(LongWritable.class, seqReader.getKeyClass());
+    assertEquals(Text.class, seqReader.getValueClass());
+
+    count = 0;
+    LongWritable key = new LongWritable();
+    Text value = new Text();
+    while (seqReader.next(key, value)) {
+      assertEquals("sequence", value.toString());
+      count++;
+    }
+    seqReader.close();
+    assertFalse(count == 0);
+
+  }
+
+  @SuppressWarnings({"unchecked"})
+  public static class MOMap implements Mapper<LongWritable, Text, LongWritable,
+    Text> {
+
+    private MultipleOutputs mos;
+
+    public void configure(JobConf conf) {
+      mos = new MultipleOutputs(conf);
+    }
+
+    public void map(LongWritable key, Text value,
+                    OutputCollector<LongWritable, Text> output,
+                    Reporter reporter)
+      throws IOException {
+      if (!value.toString().equals("a")) {
+        output.collect(key, value);
+      } else {
+        mos.getCollector("text", reporter).collect(key, new Text("text"));
+        mos.getCollector("sequence", "A", reporter).collect(key,
+          new Text("sequence"));
+        mos.getCollector("sequence", "B", reporter).collect(key,
+          new Text("sequence"));
+      }
+    }
+
+    public void close() throws IOException {
+      mos.close();
+    }
+  }
+
+  @SuppressWarnings({"unchecked"})
+  public static class MOReduce implements Reducer<LongWritable, Text,
+    LongWritable, Text> {
+
+    private MultipleOutputs mos;
+
+    public void configure(JobConf conf) {
+      mos = new MultipleOutputs(conf);
+    }
+
+    public void reduce(LongWritable key, Iterator<Text> values,
+                       OutputCollector<LongWritable, Text> output,
+                       Reporter reporter)
+      throws IOException {
+      while (values.hasNext()) {
+        Text value = values.next();
+        if (!value.toString().equals("b")) {
+          output.collect(key, value);
+        } else {
+          mos.getCollector("text", reporter).collect(key, new Text("text"));
+          mos.getCollector("sequence", "B", reporter).collect(key,
+            new Text("sequence"));
+          mos.getCollector("sequence", "C", reporter).collect(key,
+            new Text("sequence"));
+        }
+      }
+    }
+
+    public void close() throws IOException {
+      mos.close();
+    }
+  }
+
+}
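To summarize the naming scheme the assertions above rely on: MultipleOutputs uses the named output as the base file name, appends '_<multiName>' for multi named outputs, and FileOutputFormat.getUniqueName() then adds '-m'/'-r' and the zero-padded partition number. A small sketch (hypothetical helper, not part of the patch) reproducing that composition:

public class NamingRuleDemo {
  // Hypothetical helper mirroring MultipleOutputs.getRecordWriter() plus
  // FileOutputFormat.getUniqueName(); not part of the patch.
  static String expectedFileName(String namedOutput, String multiName,
                                 boolean isMap, int partition) {
    String base = (multiName == null)
      ? namedOutput : namedOutput + "_" + multiName;
    return String.format("%s-%s-%05d", base, isMap ? "m" : "r", partition);
  }

  public static void main(String[] args) {
    // Matches the names asserted in testMultipleOutputs():
    System.out.println(expectedFileName("text", null, false, 0));    // text-r-00000
    System.out.println(expectedFileName("sequence", "B", false, 0)); // sequence_B-r-00000
  }
}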