Browse Source

MAPREDUCE-7060. Cherry Pick PathOutputCommitter class/factory to branch-3.0.
Contributed by Steve Loughran.

Steve Loughran 7 năm trước cách đây
mục cha
commit
c960ae0b9a
11 tập tin đã thay đổi với 1080 bổ sung21 xóa
  1. 21 6
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java
  2. 184 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/BindingPathOutputCommitter.java
  3. 3 9
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
  4. 38 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitterFactory.java
  5. 6 4
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
  6. 79 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NamedCommitterFactory.java
  7. 17 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java
  8. 204 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitterFactory.java
  9. 11 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
  10. 22 2
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitter.java
  11. 495 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitterFactory.java

+ 21 - 6
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
 
 /**
@@ -53,14 +54,23 @@ public class TestMapreduceConfigFields extends TestConfigurationFieldsBase {
   @SuppressWarnings("deprecation")
   @Override
   public void initializeMemberVariables() {
-    xmlFilename = new String("mapred-default.xml");
-    configurationClasses = new Class[] { MRJobConfig.class, MRConfig.class,
-        JHAdminConfig.class, ShuffleHandler.class, FileOutputFormat.class,
-	FileInputFormat.class, Job.class, NLineInputFormat.class,
-	JobConf.class, FileOutputCommitter.class };
+    xmlFilename = "mapred-default.xml";
+    configurationClasses = new Class[] {
+        MRJobConfig.class,
+        MRConfig.class,
+        JHAdminConfig.class,
+        ShuffleHandler.class,
+        FileOutputFormat.class,
+        FileInputFormat.class,
+        Job.class,
+        NLineInputFormat.class,
+        JobConf.class,
+        FileOutputCommitter.class,
+        PathOutputCommitterFactory.class
+    };
 
     // Initialize used variables
-    configurationPropsToSkipCompare = new HashSet<String>();
+    configurationPropsToSkipCompare = new HashSet<>();
 
     // Set error modes
     errorIfMissingConfigProps = true;
@@ -82,6 +92,11 @@ public class TestMapreduceConfigFields extends TestConfigurationFieldsBase {
         MRJobConfig.MAP_RESOURCE_TYPE_PREFIX);
     configurationPropsToSkipCompare.add(
         MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX);
+
+    // PathOutputCommitterFactory values
+    xmlPrefixToSkipCompare = new HashSet<>();
+    xmlPrefixToSkipCompare.add(
+        PathOutputCommitterFactory.COMMITTER_FACTORY_SCHEME);
   }
 
 }

+ 184 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/BindingPathOutputCommitter.java

@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * This is a special committer which creates the factory for the committer and
+ * runs off that. Why does it exist? So that you can explicitly instantiate
+ * a committer by classname and yet still have the actual implementation
+ * driven dynamically by the factory options and destination filesystem.
+ * This simplifies integration
+ * with existing code which takes the classname of a committer.
+ * There's no factory for this, as that would lead to a loop.
+ *
+ * All commit protocol methods and accessors are delegated to the
+ * wrapped committer.
+ *
+ * How to use:
+ *
+ * <ol>
+ *   <li>
+ *     In applications which take a classname of committer in
+ *     a configuration option, set it to the canonical name of this class
+ *     (see {@link #NAME}). When this class is instantiated, it will
+ *     use the factory mechanism to locate the configured committer for the
+ *     destination.
+ *   </li>
+ *   <li>
+ *     In code, explicitly create an instance of this committer through
+ *     its constructor, then invoke commit lifecycle operations on it.
+ *     The dynamically configured committer will be created in the constructor
+ *     and have the lifecycle operations relayed to it.
+ *   </li>
+ * </ol>
+ *
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class BindingPathOutputCommitter extends PathOutputCommitter {
+
+  /**
+   * The classname for use in configurations.
+   */
+  public static final String NAME
+      = BindingPathOutputCommitter.class.getCanonicalName();
+
+  /**
+   * The bound committer.
+   */
+  private final PathOutputCommitter committer;
+
+  /**
+   * Instantiate.
+   * @param outputPath output path (may be null)
+   * @param context task context
+   * @throws IOException on any failure.
+   */
+  public BindingPathOutputCommitter(Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    super(outputPath, context);
+    committer = PathOutputCommitterFactory.getCommitterFactory(outputPath,
+        context.getConfiguration())
+        .createOutputCommitter(outputPath, context);
+  }
+
+  @Override
+  public Path getOutputPath() {
+    return committer.getOutputPath();
+  }
+
+  @Override
+  public Path getWorkPath() throws IOException {
+    return committer.getWorkPath();
+  }
+
+  @Override
+  public void setupJob(JobContext jobContext) throws IOException {
+    committer.setupJob(jobContext);
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext taskContext) throws IOException {
+    committer.setupTask(taskContext);
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext taskContext)
+      throws IOException {
+    return committer.needsTaskCommit(taskContext);
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext taskContext) throws IOException {
+    committer.commitTask(taskContext);
+  }
+
+  @Override
+  public void abortTask(TaskAttemptContext taskContext) throws IOException {
+    committer.abortTask(taskContext);
+  }
+
+  @Override
+  @SuppressWarnings("deprecation")
+  public void cleanupJob(JobContext jobContext) throws IOException {
+    super.cleanupJob(jobContext);
+  }
+
+  @Override
+  public void commitJob(JobContext jobContext) throws IOException {
+    committer.commitJob(jobContext);
+  }
+
+  @Override
+  public void abortJob(JobContext jobContext, JobStatus.State state)
+      throws IOException {
+    committer.abortJob(jobContext, state);
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public boolean isRecoverySupported() {
+    return committer.isRecoverySupported();
+  }
+
+  @Override
+  public boolean isCommitJobRepeatable(JobContext jobContext)
+      throws IOException {
+    return committer.isCommitJobRepeatable(jobContext);
+  }
+
+  @Override
+  public boolean isRecoverySupported(JobContext jobContext) throws IOException {
+    return committer.isRecoverySupported(jobContext);
+  }
+
+  @Override
+  public void recoverTask(TaskAttemptContext taskContext) throws IOException {
+    committer.recoverTask(taskContext);
+  }
+
+  @Override
+  public boolean hasOutputPath() {
+    return committer.hasOutputPath();
+  }
+
+  @Override
+  public String toString() {
+    return "BindingPathOutputCommitter{"
+        + "committer=" + committer +
+        '}';
+  }
+
+  /**
+   * Get the inner committer.
+   * @return the bonded committer.
+   */
+  public PathOutputCommitter getCommitter() {
+    return committer;
+  }
+}

+ 3 - 9
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java

@@ -155,17 +155,11 @@ public class FileOutputCommitter extends PathOutputCommitter {
    * @return the path where final output of the job should be placed.  This
    * could also be considered the committed application attempt path.
    */
-  private Path getOutputPath() {
+  @Override
+  public Path getOutputPath() {
     return this.outputPath;
   }
-  
-  /**
-   * @return true if we have an output path set, else false.
-   */
-  private boolean hasOutputPath() {
-    return this.outputPath != null;
-  }
-  
+
   /**
    * @return the path where the output of pending job attempts are
    * stored.

+ 38 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitterFactory.java

@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Creates a {@link FileOutputCommitter}, always.
+ */
+public final class FileOutputCommitterFactory
+    extends PathOutputCommitterFactory {
+
+  @Override
+  public PathOutputCommitter createOutputCommitter(Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    return createFileOutputCommitter(outputPath, context);
+  }
+
+}

+ 6 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java

@@ -328,12 +328,14 @@ public abstract class FileOutputFormat<K, V> extends OutputFormat<K, V> {
     job.getConfiguration().set(BASE_OUTPUT_NAME, name);
   }
 
-  public synchronized 
-     OutputCommitter getOutputCommitter(TaskAttemptContext context
-                                        ) throws IOException {
+  public synchronized
+      OutputCommitter getOutputCommitter(TaskAttemptContext context)
+      throws IOException {
     if (committer == null) {
       Path output = getOutputPath(context);
-      committer = new FileOutputCommitter(output, context);
+      committer = PathOutputCommitterFactory.getCommitterFactory(
+          output,
+          context.getConfiguration()).createOutputCommitter(output, context);
     }
     return committer;
   }

+ 79 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NamedCommitterFactory.java

@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * A factory which creates any named committer identified
+ * in the option {@link PathOutputCommitterFactory#NAMED_COMMITTER_CLASS}.
+ */
+public final class NamedCommitterFactory extends
+    PathOutputCommitterFactory {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(NamedCommitterFactory.class);
+
+  @SuppressWarnings("JavaReflectionMemberAccess")
+  @Override
+  public PathOutputCommitter createOutputCommitter(Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    Class<? extends PathOutputCommitter> clazz = loadCommitterClass(context);
+    LOG.debug("Using PathOutputCommitter implementation {}", clazz);
+    try {
+      Constructor<? extends PathOutputCommitter> ctor
+          = clazz.getConstructor(Path.class, TaskAttemptContext.class);
+      return ctor.newInstance(outputPath, context);
+    } catch (NoSuchMethodException
+        | InstantiationException
+        | IllegalAccessException
+        | InvocationTargetException e) {
+      throw new IOException("Failed to create " + clazz
+          + ":" + e, e);
+    }
+  }
+
+  /**
+   * Load the class named in {@link #NAMED_COMMITTER_CLASS}.
+   * @param context job or task context
+   * @return the committer class
+   * @throws IOException if no committer was defined.
+   */
+  private Class<? extends PathOutputCommitter> loadCommitterClass(
+      JobContext context) throws IOException {
+    Preconditions.checkNotNull(context, "null context");
+    Configuration conf = context.getConfiguration();
+    String value = conf.get(NAMED_COMMITTER_CLASS, "");
+    if (value.isEmpty()) {
+      throw new IOException("No committer defined in " + NAMED_COMMITTER_CLASS);
+    }
+    return conf.getClass(NAMED_COMMITTER_CLASS,
+        FileOutputCommitter.class, PathOutputCommitter.class);
+  }
+}

+ 17 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java

@@ -75,10 +75,27 @@ public abstract class PathOutputCommitter extends OutputCommitter {
         + " {}", outputPath, context);
   }
 
+  /**
+   * Get the final directory where work will be placed once the job
+   * is committed. This may be null, in which case, there is no output
+   * path to write data to.
+   * @return the path where final output of the job should be placed.
+   */
+  public abstract Path getOutputPath();
+
+  /**
+   * Predicate: is there an output path?
+   * @return true if we have an output path set, else false.
+   */
+  public boolean hasOutputPath() {
+    return getOutputPath() != null;
+  }
+
   /**
    * Get the directory that the task should write results into.
    * Warning: there's no guarantee that this work path is on the same
    * FS as the final output, or that it's visible across machines.
+   * May be null.
    * @return the work directory
    * @throws IOException IO problem
    */

+ 204 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitterFactory.java

@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A factory for committers implementing the {@link PathOutputCommitter}
+ * methods, and so can be used from {@link FileOutputFormat}.
+ * The base implementation returns {@link FileOutputCommitter} instances.
+ *
+ * Algorithm:
+ * <ol>
+ *   <li>If an explicit committer factory is named, it is used.</li>
+ *   <li>The output path is examined.
+ *   If is non null and there is an explicit schema for that filesystem,
+ *   its factory is instantiated.</li>
+ *   <li>Otherwise, an instance of {@link FileOutputCommitter} is
+ *   created.</li>
+ * </ol>
+ *
+ * In {@link FileOutputFormat}, the created factory has its method
+ * {@link #createOutputCommitter(Path, TaskAttemptContext)} with a task
+ * attempt context and a possibly null path.
+ *
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class PathOutputCommitterFactory extends Configured {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PathOutputCommitterFactory.class);
+
+  /**
+   * Name of the configuration option used to configure the
+   * output committer factory to use unless there is a specific
+   * one for a schema.
+   */
+  public static final String COMMITTER_FACTORY_CLASS =
+      "mapreduce.outputcommitter.factory.class";
+
+  /**
+   * Scheme prefix for per-filesystem scheme committers.
+   */
+  public static final String COMMITTER_FACTORY_SCHEME =
+      "mapreduce.outputcommitter.factory.scheme";
+
+  /**
+   * String format pattern for per-filesystem scheme committers.
+   */
+  public static final String COMMITTER_FACTORY_SCHEME_PATTERN =
+      COMMITTER_FACTORY_SCHEME + ".%s";
+
+
+  /**
+   * The {@link FileOutputCommitter} factory.
+   */
+  public static final String FILE_COMMITTER_FACTORY  =
+      "org.apache.hadoop.mapreduce.lib.output.FileOutputCommitterFactory";
+
+  /**
+   * The {@link FileOutputCommitter} factory.
+   */
+  public static final String NAMED_COMMITTER_FACTORY  =
+      "org.apache.hadoop.mapreduce.lib.output.NamedCommitterFactory";
+
+  /**
+   * The named output committer.
+   * Creates any committer listed in
+   */
+  public static final String NAMED_COMMITTER_CLASS =
+      "mapreduce.outputcommitter.named.classname";
+
+  /**
+   * Default committer factory name: {@value}.
+   */
+  public static final String COMMITTER_FACTORY_DEFAULT =
+      FILE_COMMITTER_FACTORY;
+
+  /**
+   * Create an output committer for a task attempt.
+   * @param outputPath output path. This may be null.
+   * @param context context
+   * @return a new committer
+   * @throws IOException problems instantiating the committer
+   */
+  public PathOutputCommitter createOutputCommitter(
+      Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    return createFileOutputCommitter(outputPath, context);
+  }
+
+  /**
+   * Create an instance of the default committer, a {@link FileOutputCommitter}
+   * for a task.
+   * @param outputPath the task's output path, or or null if no output path
+   * has been defined.
+   * @param context the task attempt context
+   * @return the committer to use
+   * @throws IOException problems instantiating the committer
+   */
+  protected final PathOutputCommitter createFileOutputCommitter(
+      Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    LOG.debug("Creating FileOutputCommitter for path {} and context {}",
+        outputPath, context);
+    return new FileOutputCommitter(outputPath, context);
+  }
+
+  /**
+   * Get the committer factory for a configuration.
+   * @param outputPath the job's output path. If null, it means that the
+   * schema is unknown and a per-schema factory cannot be determined.
+   * @param conf configuration
+   * @return an instantiated committer factory
+   */
+  public static PathOutputCommitterFactory getCommitterFactory(
+      Path outputPath,
+      Configuration conf) {
+    // determine which key to look up the overall one or a schema-specific
+    // key
+    LOG.debug("Looking for committer factory for path {}", outputPath);
+    String key = COMMITTER_FACTORY_CLASS;
+    if (StringUtils.isEmpty(conf.getTrimmed(key)) && outputPath != null) {
+      // there is no explicit factory and there's an output path
+      // Get the scheme of the destination
+      String scheme = outputPath.toUri().getScheme();
+
+      // and see if it has a key
+      String schemeKey = String.format(COMMITTER_FACTORY_SCHEME_PATTERN,
+          scheme);
+      if (StringUtils.isNotEmpty(conf.getTrimmed(schemeKey))) {
+        // it does, so use that key in the classname lookup
+        LOG.debug("Using schema-specific factory for {}", outputPath);
+        key = schemeKey;
+      } else {
+        LOG.debug("No scheme-specific factory defined in {}", schemeKey);
+      }
+    }
+
+    // create the factory. Before using Configuration.getClass, check
+    // for an empty configuration value, as that raises ClassNotFoundException.
+    Class<? extends PathOutputCommitterFactory> factory;
+    String trimmedValue = conf.getTrimmed(key, "");
+    if (StringUtils.isEmpty(trimmedValue)) {
+      // empty/null value, use default
+      LOG.debug("No output committer factory defined,"
+          + " defaulting to FileOutputCommitterFactory");
+      factory = FileOutputCommitterFactory.class;
+    } else {
+      // key is set, get the class
+      factory = conf.getClass(key,
+          FileOutputCommitterFactory.class,
+          PathOutputCommitterFactory.class);
+      LOG.debug("Using OutputCommitter factory class {} from key {}",
+          factory, key);
+    }
+    return ReflectionUtils.newInstance(factory, conf);
+  }
+
+  /**
+   * Create the committer factory for a task attempt and destination, then
+   * create the committer from it.
+   * @param outputPath the task's output path, or or null if no output path
+   * has been defined.
+   * @param context the task attempt context
+   * @return the committer to use
+   * @throws IOException problems instantiating the committer
+   */
+  public static PathOutputCommitter createCommitter(Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    return getCommitterFactory(outputPath,
+        context.getConfiguration())
+        .createOutputCommitter(outputPath, context);
+  }
+
+}

+ 11 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

@@ -2043,4 +2043,15 @@
   <name>mapreduce.job.send-token-conf</name>
   <value></value>
 </property>
+
+<property>
+  <description>
+    The name of an output committer factory for MRv2 FileOutputFormat to use
+    for committing work. If set, overrides any per-filesystem committer
+    defined for the destination filesystem.
+  </description>
+  <name>mapreduce.outputcommitter.factory.class</name>
+  <value></value>
+</property>
+
 </configuration>

+ 22 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitter.java

@@ -109,14 +109,34 @@ public class TestPathOutputCommitter extends Assert {
     public void abortTask(TaskAttemptContext taskContext) throws IOException {
 
     }
+
+    @Override
+    public Path getOutputPath() {
+      return null;
+    }
   }
 
   /**
    * Stub task context.
+   * The {@link #getConfiguration()} method returns the configuration supplied
+   * in the constructor; while {@link #setOutputCommitter(OutputCommitter)}
+   * sets the committer returned in {@link #getOutputCommitter()}.
+   * Otherwise, the methods are all no-ops.
    */
-  public class TaskContext
+  public static class TaskContext
       implements TaskInputOutputContext<String, String, String, String> {
 
+    private final Configuration configuration;
+
+    public TaskContext() {
+      this(new Configuration());
+    }
+
+    public TaskContext(Configuration conf) {
+      this.configuration = conf;
+    }
+
+
     private OutputCommitter outputCommitter;
 
     public void setOutputCommitter(OutputCommitter outputCommitter) {
@@ -180,7 +200,7 @@ public class TestPathOutputCommitter extends Assert {
 
     @Override
     public Configuration getConfiguration() {
-      return null;
+      return configuration;
     }
 
     @Override

+ 495 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitterFactory.java

@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+import static org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory.*;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Test the committer factory logic, looking at the override
+ * and fallback behavior.
+ */
+@SuppressWarnings("unchecked")
+public class TestPathOutputCommitterFactory extends Assert {
+
+  private static final String HTTP_COMMITTER_FACTORY = String.format(
+      COMMITTER_FACTORY_SCHEME_PATTERN, "http");
+
+  private static final Path HTTP_PATH = new Path("http://hadoop.apache.org/");
+  private static final Path HDFS_PATH = new Path("hdfs://localhost:8081/");
+
+  private TaskAttemptID taskAttemptID =
+      new TaskAttemptID("local", 0, TaskType.MAP, 1, 2);
+
+  /**
+   * Set a factory for a schema, verify it works.
+   * @throws Throwable failure
+   */
+  @Test
+  public void testCommitterFactoryForSchema() throws Throwable {
+    createCommitterFactory(SimpleCommitterFactory.class,
+        HTTP_PATH,
+        newBondedConfiguration());
+  }
+
+  /**
+   * A schema factory only affects that filesystem.
+   * @throws Throwable failure
+   */
+  @Test
+  public void testCommitterFactoryFallbackDefault() throws Throwable {
+    createCommitterFactory(FileOutputCommitterFactory.class,
+        HDFS_PATH,
+        newBondedConfiguration());
+  }
+
+  /**
+   * A schema factory only affects that filesystem; test through
+   * {@link PathOutputCommitterFactory#createCommitter(Path, TaskAttemptContext)}.
+   * @throws Throwable failure
+   */
+  @Test
+  public void testCommitterFallbackDefault() throws Throwable {
+    createCommitter(FileOutputCommitter.class,
+        HDFS_PATH,
+        taskAttempt(newBondedConfiguration()));
+  }
+
+  /**
+   * Verify that you can override any schema with an explicit name.
+   */
+  @Test
+  public void testCommitterFactoryOverride() throws Throwable {
+    Configuration conf = newBondedConfiguration();
+    // set up for the schema factory
+    // and then set a global one which overrides the others.
+    conf.set(COMMITTER_FACTORY_CLASS, OtherFactory.class.getName());
+    createCommitterFactory(OtherFactory.class, HDFS_PATH, conf);
+    createCommitterFactory(OtherFactory.class, HTTP_PATH, conf);
+  }
+
+  /**
+   * Verify that if the factory class option is "", schema factory
+   * resolution still works.
+   */
+  @Test
+  public void testCommitterFactoryEmptyOption() throws Throwable {
+    Configuration conf = newBondedConfiguration();
+    // set up for the schema factory
+    // and then set a global one which overrides the others.
+    conf.set(COMMITTER_FACTORY_CLASS, "");
+    createCommitterFactory(SimpleCommitterFactory.class, HTTP_PATH, conf);
+
+    // and HDFS, with no schema, falls back to the default
+    createCommitterFactory(FileOutputCommitterFactory.class, HDFS_PATH, conf);
+  }
+
+  /**
+   * Verify that if the committer factory class is unknown, you cannot
+   * create committers.
+   */
+  @Test
+  public void testCommitterFactoryUnknown() throws Throwable {
+    Configuration conf = new Configuration();
+    // set the factory to an unknown class
+    conf.set(COMMITTER_FACTORY_CLASS, "unknown");
+    intercept(RuntimeException.class,
+        () -> getCommitterFactory(HDFS_PATH, conf));
+  }
+
+  /**
+   * Verify that if the committer output path is null, you get back
+   * a FileOutputCommitter with null output & work paths.
+   */
+  @Test
+  public void testCommitterNullOutputPath() throws Throwable {
+    // bind http to schema
+    Configuration conf = newBondedConfiguration();
+    // then ask committers for a null path
+    FileOutputCommitter committer = createCommitter(
+        FileOutputCommitterFactory.class,
+        FileOutputCommitter.class,
+        null, conf);
+    assertNull(committer.getOutputPath());
+    assertNull(committer.getWorkPath());
+  }
+
+  /**
+   * Verify that if you explicitly name a committer, that takes priority
+   * over any filesystem committer.
+   */
+  @Test
+  public void testNamedCommitterFactory() throws Throwable {
+    Configuration conf = new Configuration();
+    // set up for the schema factory
+    conf.set(COMMITTER_FACTORY_CLASS, NAMED_COMMITTER_FACTORY);
+    conf.set(NAMED_COMMITTER_CLASS, SimpleCommitter.class.getName());
+    SimpleCommitter sc = createCommitter(
+        NamedCommitterFactory.class,
+        SimpleCommitter.class, HDFS_PATH, conf);
+    assertEquals("Wrong output path from " + sc,
+        HDFS_PATH,
+        sc.getOutputPath());
+  }
+
+  /**
+   * Verify that if you explicitly name a committer and there's no
+   * path, the committer is picked up.
+   */
+  @Test
+  public void testNamedCommitterFactoryNullPath() throws Throwable {
+    Configuration conf = new Configuration();
+    // set up for the schema factory
+    conf.set(COMMITTER_FACTORY_CLASS, NAMED_COMMITTER_FACTORY);
+    conf.set(NAMED_COMMITTER_CLASS, SimpleCommitter.class.getName());
+    SimpleCommitter sc = createCommitter(
+        NamedCommitterFactory.class,
+        SimpleCommitter.class,
+        null, conf);
+    assertNull(sc.getOutputPath());
+  }
+
+  /**
+   * Verify that if you explicitly name a committer and there's no
+   * path, the committer is picked up.
+   */
+  @Test
+  public void testNamedCommitterNullPath() throws Throwable {
+    Configuration conf = new Configuration();
+    // set up for the schema factory
+    conf.set(COMMITTER_FACTORY_CLASS, NAMED_COMMITTER_FACTORY);
+    conf.set(NAMED_COMMITTER_CLASS, SimpleCommitter.class.getName());
+
+    SimpleCommitter sc = createCommitter(
+        SimpleCommitter.class,
+        null, taskAttempt(conf));
+    assertNull(sc.getOutputPath());
+  }
+
+  /**
+   * Create a factory then a committer, validating the type of both.
+   * @param <T> type of factory
+   * @param <U> type of committer
+   * @param factoryClass expected factory class
+   * @param committerClass expected committer class
+   * @param path output path (may be null)
+   * @param conf configuration
+   * @return the committer
+   * @throws IOException failure to create
+   */
+  private <T extends PathOutputCommitterFactory, U extends PathOutputCommitter>
+      U createCommitter(Class<T> factoryClass,
+      Class<U> committerClass,
+      Path path,
+      Configuration conf) throws IOException {
+    T f = createCommitterFactory(factoryClass, path, conf);
+    PathOutputCommitter committer = f.createOutputCommitter(path,
+        taskAttempt(conf));
+    assertEquals(" Wrong committer for path " + path + " from factory " + f,
+        committerClass, committer.getClass());
+    return (U) committer;
+  }
+
+  /**
+   * Create a committer from a task context, via
+   * {@link PathOutputCommitterFactory#createCommitter(Path, TaskAttemptContext)}.
+   * @param <U> type of committer
+   * @param committerClass expected committer class
+   * @param path output path (may be null)
+   * @param context task attempt context
+   * @return the committer
+   * @throws IOException failure to create
+   */
+  private <U extends PathOutputCommitter> U createCommitter(
+      Class<U> committerClass,
+      Path path,
+      TaskAttemptContext context) throws IOException {
+    PathOutputCommitter committer = PathOutputCommitterFactory
+        .createCommitter(path, context);
+    assertEquals(" Wrong committer for path " + path,
+        committerClass, committer.getClass());
+    return (U) committer;
+  }
+
+  /**
+   * Create a factory then a committer, validating its type.
+   * @param factoryClass expected factory class
+   * @param path output path (may be null)
+   * @param conf configuration
+   * @param <T> type of factory
+   * @return the factory
+   */
+  private <T extends PathOutputCommitterFactory> T createCommitterFactory(
+      Class<T> factoryClass,
+      Path path,
+      Configuration conf) {
+    PathOutputCommitterFactory factory = getCommitterFactory(path, conf);
+    assertEquals(" Wrong factory for path " + path,
+        factoryClass, factory.getClass());
+    return (T)factory;
+  }
+
+  /**
+   * Create a new task attempt context.
+   * @param conf config
+   * @return a new context
+   */
+  private TaskAttemptContext taskAttempt(Configuration conf) {
+    return new TaskAttemptContextImpl(conf, taskAttemptID);
+  }
+
+  /**
+   * Verify that if you explicitly name a committer, that takes priority
+   * over any filesystem committer.
+   */
+  @Test
+  public void testFileOutputCommitterFactory() throws Throwable {
+    Configuration conf = new Configuration();
+    // set up for the schema factory
+    conf.set(COMMITTER_FACTORY_CLASS, FILE_COMMITTER_FACTORY);
+    conf.set(NAMED_COMMITTER_CLASS, SimpleCommitter.class.getName());
+    getCommitterFactory(HDFS_PATH, conf);
+    createCommitter(
+        FileOutputCommitterFactory.class,
+        FileOutputCommitter.class, null, conf);
+  }
+
+  /**
+   * Follow the entire committer chain down and create a new committer from
+   * the output format.
+   * @throws Throwable on a failure.
+   */
+  @Test
+  public void testFileOutputFormatBinding() throws Throwable {
+    Configuration conf = newBondedConfiguration();
+    conf.set(FileOutputFormat.OUTDIR, HTTP_PATH.toUri().toString());
+    TextOutputFormat<String, String> off = new TextOutputFormat<>();
+    SimpleCommitter committer = (SimpleCommitter)
+        off.getOutputCommitter(taskAttempt(conf));
+    assertEquals("Wrong output path from "+ committer,
+        HTTP_PATH,
+        committer.getOutputPath());
+  }
+
+  /**
+   * Follow the entire committer chain down and create a new committer from
+   * the output format.
+   * @throws Throwable on a failure.
+   */
+  @Test
+  public void testFileOutputFormatBindingNoPath() throws Throwable {
+    Configuration conf = new Configuration();
+    conf.unset(FileOutputFormat.OUTDIR);
+    // set up for the schema factory
+    conf.set(COMMITTER_FACTORY_CLASS, NAMED_COMMITTER_FACTORY);
+    conf.set(NAMED_COMMITTER_CLASS, SimpleCommitter.class.getName());
+    httpToSimpleFactory(conf);
+    TextOutputFormat<String, String> off = new TextOutputFormat<>();
+    SimpleCommitter committer = (SimpleCommitter)
+        off.getOutputCommitter(taskAttempt(conf));
+    assertNull("Output path from "+ committer,
+        committer.getOutputPath());
+  }
+
+  /**
+   * Bind the http schema CommitterFactory to {@link SimpleCommitterFactory}.
+   * @param conf config to patch
+   */
+  private Configuration httpToSimpleFactory(Configuration conf) {
+    conf.set(HTTP_COMMITTER_FACTORY, SimpleCommitterFactory.class.getName());
+    return conf;
+  }
+
+
+  /**
+   * Create a configuration with the http schema bonded to the simple factory.
+   * @return a new, patched configuration
+   */
+  private Configuration newBondedConfiguration() {
+    return httpToSimpleFactory(new Configuration());
+  }
+
+  /**
+   * Extract the (mandatory) cause of an exception.
+   * @param ex exception
+   * @param clazz expected class
+   * @return the cause, which will be of the expected type
+   * @throws AssertionError if there is a problem
+   */
+  private <E extends Throwable> E verifyCauseClass(Throwable ex,
+      Class<E> clazz) throws AssertionError {
+    Throwable cause = ex.getCause();
+    if (cause == null) {
+      throw new AssertionError("No cause", ex);
+    }
+    if (!cause.getClass().equals(clazz)) {
+      throw new AssertionError("Wrong cause class", cause);
+    }
+    return (E)cause;
+  }
+
+  @Test
+  public void testBadCommitterFactory() throws Throwable {
+    expectFactoryConstructionFailure(HTTP_COMMITTER_FACTORY);
+  }
+
+  @Test
+  public void testBoundCommitterWithSchema() throws Throwable {
+    // this verifies that a bound committer relays to the underlying committer
+    Configuration conf = newBondedConfiguration();
+    TestPathOutputCommitter.TaskContext tac
+        = new TestPathOutputCommitter.TaskContext(conf);
+    BindingPathOutputCommitter committer
+        = new BindingPathOutputCommitter(HTTP_PATH, tac);
+    intercept(IOException.class, "setupJob",
+        () -> committer.setupJob(tac));
+  }
+
+  @Test
+  public void testBoundCommitterWithDefault() throws Throwable {
+    // this verifies that a bound committer relays to the underlying committer
+    Configuration conf = newBondedConfiguration();
+    TestPathOutputCommitter.TaskContext tac
+        = new TestPathOutputCommitter.TaskContext(conf);
+    BindingPathOutputCommitter committer
+        = new BindingPathOutputCommitter(HDFS_PATH, tac);
+    assertEquals(FileOutputCommitter.class,
+        committer.getCommitter().getClass());
+  }
+
+  /**
+   * Set the specific key to a string which is not a factory class; expect
+   * a failure.
+   * @param key key to set
+   * @throws Throwable on a failure
+   */
+  @SuppressWarnings("ThrowableNotThrown")
+  protected void expectFactoryConstructionFailure(String key) throws Throwable {
+    Configuration conf = new Configuration();
+    conf.set(key, "Not a factory");
+    RuntimeException ex = intercept(RuntimeException.class,
+        () -> getCommitterFactory(HTTP_PATH, conf));
+    verifyCauseClass(
+        verifyCauseClass(ex, RuntimeException.class),
+        ClassNotFoundException.class);
+  }
+
+  /**
+   * A simple committer.
+   */
+  public static final class SimpleCommitter extends PathOutputCommitter {
+
+    private final Path outputPath;
+
+    public SimpleCommitter(Path outputPath,
+        TaskAttemptContext context) throws IOException {
+      super(outputPath, context);
+      this.outputPath = outputPath;
+    }
+
+    @Override
+    public Path getWorkPath() throws IOException {
+      return null;
+    }
+
+    /**
+     * Job setup throws an exception.
+     * @param jobContext Context of the job
+     * @throws IOException always
+     */
+    @Override
+    public void setupJob(JobContext jobContext) throws IOException {
+      throw new IOException("setupJob");
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskContext) throws IOException {
+
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskContext)
+        throws IOException {
+      return false;
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext taskContext) throws IOException {
+
+    }
+
+    @Override
+    public void abortTask(TaskAttemptContext taskContext) throws IOException {
+
+    }
+
+    @Override
+    public Path getOutputPath() {
+      return outputPath;
+    }
+  }
+
+  /**
+   * The simple committer factory.
+   */
+  private static class SimpleCommitterFactory
+      extends PathOutputCommitterFactory {
+
+    @Override
+    public PathOutputCommitter createOutputCommitter(Path outputPath,
+        TaskAttemptContext context) throws IOException {
+      return new SimpleCommitter(outputPath, context);
+    }
+
+  }
+
+  /**
+   * Some other factory.
+   */
+  private static class OtherFactory extends PathOutputCommitterFactory {
+
+    /**
+     * {@inheritDoc}
+     * @param outputPath output path. This may be null.
+     * @param context context
+     * @return
+     * @throws IOException
+     */
+    @Override
+    public PathOutputCommitter createOutputCommitter(Path outputPath,
+        TaskAttemptContext context) throws IOException {
+      return new SimpleCommitter(outputPath, context);
+    }
+
+  }
+
+}