Browse files

MAPREDUCE-4813. AM timing out during job commit (jlowe via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1426536 13f79535-47bb-0310-9956-ffa450edef68
Robert Joseph Evans 12 years ago
parent
commit
402eb18513
36 changed files with 1468 additions and 517 deletions
  1. +2 -0  hadoop-mapreduce-project/CHANGES.txt
  2. +2 -3  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapTaskAttemptImpl.java
  3. +2 -3  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ReduceTaskAttemptImpl.java
  4. +11 -9  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
  5. +28 -0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEvent.java
  6. +252 -0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
  7. +26 -0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventType.java
  8. +50 -0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterJobAbortEvent.java
  9. +42 -0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterJobCommitEvent.java
  10. +42 -0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterJobSetupEvent.java
  11. +43 -0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterTaskAbortEvent.java
  12. +1 -1  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/package-info.java
  13. +4 -0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java
  14. +36 -0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobAbortCompletedEvent.java
  15. +27 -0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobCommitCompletedEvent.java
  16. +34 -0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobCommitFailedEvent.java
  17. +7 -0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
  18. +28 -0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobSetupCompletedEvent.java
  19. +35 -0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobSetupFailedEvent.java
  20. +286 -90  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
  21. +3 -4  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java
  22. +3 -4  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java
  23. +4 -10  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
  24. +1 -4  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
  25. +4 -4  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
  26. +0 -28  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleaner.java
  27. +0 -124  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleanerImpl.java
  28. +0 -56  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleanupEvent.java
  29. +55 -14  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
  30. +1 -2  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
  31. +408 -139  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
  32. +7 -9  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
  33. +1 -2  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java
  34. +7 -11  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
  35. +9 -0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
  36. +7 -0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

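This change moves every OutputCommitter call (setupJob, commitJob, abortJob,
abortTask) out of the JobImpl state machine and the deleted TaskCleaner into a
new asynchronous CommitterEventHandler service, so a slow or hung commitJob()
no longer blocks the AM's central dispatcher (the timeout named in the title).
A minimal sketch of that pattern, detached from Hadoop's classes (all names
below are illustrative, not from this patch):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class AsyncCommitSketch {
      enum CommitOutcome { COMMIT_COMPLETED, COMMIT_FAILED }

      public static void main(String[] args) throws Exception {
        BlockingQueue<CommitOutcome> dispatcher = new LinkedBlockingQueue<>();
        ExecutorService committerPool = Executors.newSingleThreadExecutor();

        // run the (potentially slow) commit off the main loop's thread
        committerPool.execute(() -> {
          try {
            Thread.sleep(2000);  // stands in for OutputCommitter.commitJob()
            dispatcher.put(CommitOutcome.COMMIT_COMPLETED);
          } catch (Exception e) {
            try {
              dispatcher.put(CommitOutcome.COMMIT_FAILED);
            } catch (InterruptedException ignored) {
            }
          }
        });

        // The "AM" stays responsive: it keeps servicing heartbeats while
        // polling for the commit outcome instead of blocking in a transition.
        CommitOutcome outcome;
        while ((outcome = dispatcher.poll(500, TimeUnit.MILLISECONDS)) == null) {
          System.out.println("heartbeat...");  // RM liveness ping goes here
        }
        System.out.println("commit outcome: " + outcome);
        committerPool.shutdown();
      }
    }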
+ 2 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -651,6 +651,8 @@ Release 0.23.6 - UNRELEASED
     MAPREDUCE-4902. Fix typo "receievd" should be "received" in log output
     (Albert Chu via jlowe)
 
+    MAPREDUCE-4813. AM timing out during job commit (jlowe via bobby)
+
 Release 0.23.5 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 2 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapTaskAttemptImpl.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.mapred;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@@ -42,12 +41,12 @@ public class MapTaskAttemptImpl extends TaskAttemptImpl {
       EventHandler eventHandler, Path jobFile, 
       int partition, TaskSplitMetaInfo splitInfo, JobConf conf,
       TaskAttemptListener taskAttemptListener, 
-      OutputCommitter committer, Token<JobTokenIdentifier> jobToken,
+      Token<JobTokenIdentifier> jobToken,
       Credentials credentials, Clock clock,
       AppContext appContext) {
     super(taskId, attempt, eventHandler, 
         taskAttemptListener, jobFile, partition, conf, splitInfo.getLocations(),
-        committer, jobToken, credentials, clock, appContext);
+        jobToken, credentials, clock, appContext);
     this.splitInfo = splitInfo;
   }
 

+ 2 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ReduceTaskAttemptImpl.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.mapred;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
@@ -40,12 +39,12 @@ public class ReduceTaskAttemptImpl extends TaskAttemptImpl {
   public ReduceTaskAttemptImpl(TaskId id, int attempt,
       EventHandler eventHandler, Path jobFile, int partition,
       int numMapTasks, JobConf conf,
-      TaskAttemptListener taskAttemptListener, OutputCommitter committer,
+      TaskAttemptListener taskAttemptListener,
       Token<JobTokenIdentifier> jobToken,
       Credentials credentials, Clock clock,
       AppContext appContext) {
     super(id, attempt, eventHandler, taskAttemptListener, jobFile, partition,
-        conf, new String[] {}, committer, jobToken, credentials, clock,
+        conf, new String[] {}, jobToken, credentials, clock,
         appContext);
     this.numMapTasks = numMapTasks;
   }

+ 11 - 9
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

@@ -62,6 +62,9 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
+import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent;
+import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler;
+import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
@@ -87,8 +90,6 @@ import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
-import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
-import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanerImpl;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.Credentials;
@@ -162,7 +163,7 @@ public class MRAppMaster extends CompositeService {
   private Recovery recoveryServ;
   private ContainerAllocator containerAllocator;
   private ContainerLauncher containerLauncher;
-  private TaskCleaner taskCleaner;
+  private EventHandler<CommitterEvent> committerEventHandler;
   private Speculator speculator;
   private TaskAttemptListener taskAttemptListener;
   private JobTokenSecretManager jobTokenSecretManager =
@@ -268,8 +269,8 @@ public class MRAppMaster extends CompositeService {
     addIfService(taskAttemptListener);
 
     //service to do the task cleanup
-    taskCleaner = createTaskCleaner(context);
-    addIfService(taskCleaner);
+    committerEventHandler = createCommitterEventHandler(context, committer);
+    addIfService(committerEventHandler);
 
     //service to handle requests from JobClient
     clientService = createClientService(context);
@@ -288,7 +289,7 @@ public class MRAppMaster extends CompositeService {
     dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
     dispatcher.register(TaskAttemptEventType.class, 
         new TaskAttemptEventDispatcher());
-    dispatcher.register(TaskCleaner.EventType.class, taskCleaner);
+    dispatcher.register(CommitterEventType.class, committerEventHandler);
    
     if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
         || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
@@ -493,7 +494,7 @@ public class MRAppMaster extends CompositeService {
     Job newJob =
         new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
             taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
-            completedTasksFromPreviousRun, metrics, committer, newApiCommitter,
+            completedTasksFromPreviousRun, metrics, newApiCommitter,
             currentUser.getUserName(), appSubmitTime, amInfos, context);
     ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
 
@@ -585,8 +586,9 @@ public class MRAppMaster extends CompositeService {
     return lis;
   }
 
-  protected TaskCleaner createTaskCleaner(AppContext context) {
-    return new TaskCleanerImpl(context);
+  protected EventHandler<CommitterEvent> createCommitterEventHandler(
+      AppContext context, OutputCommitter committer) {
+    return new CommitterEventHandler(context, committer);
   }
 
   protected ContainerAllocator createContainerAllocator(

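The dispatcher registration above keys handlers by the event-type enum class:
CommitterEventType events now route to the new CommitterEventHandler exactly
where TaskCleaner.EventType used to route to TaskCleanerImpl. A toy model of
that register/dispatch shape (an illustration only; YARN's real AsyncDispatcher
additionally queues events on its own thread):

    import java.util.HashMap;
    import java.util.Map;

    class MiniDispatcher {
      interface EventHandler { void handle(Object event); }

      // one handler per event-type enum class
      private final Map<Class<?>, EventHandler> router = new HashMap<>();

      void register(Class<? extends Enum<?>> eventTypeClass, EventHandler handler) {
        router.put(eventTypeClass, handler);
      }

      void dispatch(Enum<?> eventType, Object event) {
        // Enum.getDeclaringClass() recovers the class used at registration
        router.get(eventType.getDeclaringClass()).handle(event);
      }
    }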
+ 28 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEvent.java

@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.commit;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class CommitterEvent extends AbstractEvent<CommitterEventType> {
+
+  public CommitterEvent(CommitterEventType type) {
+    super(type);
+  }
+}

+ 252 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java

@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.commit;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobAbortCompletedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCommitCompletedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCommitFailedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupCompletedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupFailedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class CommitterEventHandler extends AbstractService
+    implements EventHandler<CommitterEvent> {
+
+  private static final Log LOG =
+      LogFactory.getLog(CommitterEventHandler.class);
+
+  private final AppContext context;
+  private final OutputCommitter committer;
+  private ThreadPoolExecutor launcherPool;
+  private Thread eventHandlingThread;
+  private BlockingQueue<CommitterEvent> eventQueue =
+      new LinkedBlockingQueue<CommitterEvent>();
+  private final AtomicBoolean stopped;
+  private Thread jobCommitThread = null;
+  private int commitThreadCancelTimeoutMs;
+
+  public CommitterEventHandler(AppContext context, OutputCommitter committer) {
+    super("CommitterEventHandler");
+    this.context = context;
+    this.committer = committer;
+    this.stopped = new AtomicBoolean(false);
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    commitThreadCancelTimeoutMs = conf.getInt(
+        MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS,
+        MRJobConfig.DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS);
+  }
+
+  @Override
+  public void start() {
+    ThreadFactory tf = new ThreadFactoryBuilder()
+      .setNameFormat("CommitterEvent Processor #%d")
+      .build();
+    launcherPool = new ThreadPoolExecutor(5, 5, 1,
+        TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
+    eventHandlingThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        CommitterEvent event = null;
+        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+          try {
+            event = eventQueue.take();
+          } catch (InterruptedException e) {
+            if (!stopped.get()) {
+              LOG.error("Returning, interrupted : " + e);
+            }
+            return;
+          }
+          // the events from the queue are handled in parallel
+          // using a thread pool
+          launcherPool.execute(new EventProcessor(event));
+        }
+      }
+    });
+    eventHandlingThread.setName("CommitterEvent Handler");
+    eventHandlingThread.start();
+    super.start();
+  }
+
+
+  @Override
+  public void handle(CommitterEvent event) {
+    try {
+      eventQueue.put(event);
+    } catch (InterruptedException e) {
+      throw new YarnException(e);
+    }
+  }
+
+  @Override
+  public void stop() {
+    if (stopped.getAndSet(true)) {
+      // return if already stopped
+      return;
+    }
+    eventHandlingThread.interrupt();
+    launcherPool.shutdown();
+    super.stop();
+  }
+
+  private synchronized void jobCommitStarted() throws IOException {
+    if (jobCommitThread != null) {
+      throw new IOException("Commit while another commit thread active: "
+          + jobCommitThread.toString());
+    }
+
+    jobCommitThread = Thread.currentThread();
+  }
+
+  private synchronized void jobCommitEnded() {
+    if (jobCommitThread == Thread.currentThread()) {
+      jobCommitThread = null;
+      notifyAll();
+    }
+  }
+
+  private synchronized void cancelJobCommit() {
+    Thread threadCommitting = jobCommitThread;
+    if (threadCommitting != null && threadCommitting.isAlive()) {
+      LOG.info("Canceling commit");
+      threadCommitting.interrupt();
+
+      // wait up to configured timeout for commit thread to finish
+      long now = context.getClock().getTime();
+      long timeoutTimestamp = now + commitThreadCancelTimeoutMs;
+      try {
+        while (jobCommitThread == threadCommitting
+            && now < timeoutTimestamp) {
+          wait(timeoutTimestamp - now);
+          now = context.getClock().getTime();
+        }
+      } catch (InterruptedException e) {
+      }
+    }
+  }
+
+  private class EventProcessor implements Runnable {
+    private CommitterEvent event;
+
+    EventProcessor(CommitterEvent event) {
+      this.event = event;
+    }
+
+    @Override
+    public void run() {
+      LOG.info("Processing the event " + event.toString());
+      switch (event.getType()) {
+      case JOB_SETUP:
+        handleJobSetup((CommitterJobSetupEvent) event);
+        break;
+      case JOB_COMMIT:
+        handleJobCommit((CommitterJobCommitEvent) event);
+        break;
+      case JOB_ABORT:
+        handleJobAbort((CommitterJobAbortEvent) event);
+        break;
+      case TASK_ABORT:
+        handleTaskAbort((CommitterTaskAbortEvent) event);
+        break;
+      default:
+        throw new YarnException("Unexpected committer event "
+            + event.toString());
+      }
+    }
+
+    @SuppressWarnings("unchecked")
+    protected void handleJobSetup(CommitterJobSetupEvent event) {
+      try {
+        committer.setupJob(event.getJobContext());
+        context.getEventHandler().handle(
+            new JobSetupCompletedEvent(event.getJobID()));
+      } catch (Exception e) {
+        LOG.warn("Job setup failed", e);
+        context.getEventHandler().handle(new JobSetupFailedEvent(
+            event.getJobID(), StringUtils.stringifyException(e)));
+      }
+    }
+
+    @SuppressWarnings("unchecked")
+    protected void handleJobCommit(CommitterJobCommitEvent event) {
+      try {
+        jobCommitStarted();
+        committer.commitJob(event.getJobContext());
+        context.getEventHandler().handle(
+            new JobCommitCompletedEvent(event.getJobID()));
+      } catch (Exception e) {
+        LOG.error("Could not commit job", e);
+        context.getEventHandler().handle(
+            new JobCommitFailedEvent(event.getJobID(),
+                StringUtils.stringifyException(e)));
+      } finally {
+        jobCommitEnded();
+      }
+    }
+
+    @SuppressWarnings("unchecked")
+    protected void handleJobAbort(CommitterJobAbortEvent event) {
+      cancelJobCommit();
+
+      try {
+        committer.abortJob(event.getJobContext(), event.getFinalState());
+      } catch (Exception e) {
+        LOG.warn("Could not abort job", e);
+      }
+
+      context.getEventHandler().handle(new JobAbortCompletedEvent(
+          event.getJobID(), event.getFinalState()));
+    }
+
+    @SuppressWarnings("unchecked")
+    protected void handleTaskAbort(CommitterTaskAbortEvent event) {
+      try {
+        committer.abortTask(event.getAttemptContext());
+      } catch (Exception e) {
+        LOG.warn("Task cleanup failed for attempt " + event.getAttemptID(), e);
+      }
+      context.getEventHandler().handle(
+          new TaskAttemptEvent(event.getAttemptID(),
+              TaskAttemptEventType.TA_CLEANUP_DONE));
+    }
+  }
+}

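Two details above carry the fix. First, committer calls run on a small thread
pool, so an OutputCommitter operation can take arbitrarily long without
starving the event loop. Second, handleJobAbort() cancels an in-flight commit
by interrupting the commit thread and waiting a bounded time
(MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS) for it to clear. A compact
restatement of that cancellation idiom (a self-contained sketch with
illustrative names, not the class above):

    public class CancellableCommit {
      private Thread commitThread;

      private synchronized void commitStarted() {
        if (commitThread != null) {
          throw new IllegalStateException("commit already in progress");
        }
        commitThread = Thread.currentThread();
      }

      private synchronized void commitEnded() {
        if (commitThread == Thread.currentThread()) {
          commitThread = null;
          notifyAll();  // wake a canceller blocked in cancelCommit()
        }
      }

      public synchronized void cancelCommit(long timeoutMs) {
        Thread committing = commitThread;
        if (committing == null || !committing.isAlive()) {
          return;
        }
        committing.interrupt();  // ask the commit to stop
        long deadline = System.currentTimeMillis() + timeoutMs;
        try {
          // wait until the commit thread clears itself or the deadline passes
          while (commitThread == committing) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
              break;
            }
            wait(remaining);
          }
        } catch (InterruptedException ignored) {
        }
      }

      public void commit(Runnable body) {
        commitStarted();
        try {
          body.run();  // stands in for OutputCommitter.commitJob()
        } finally {
          commitEnded();
        }
      }
    }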
+ 26 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventType.java

@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.commit;
+
+public enum CommitterEventType {
+  JOB_SETUP,
+  JOB_COMMIT,
+  JOB_ABORT,
+  TASK_ABORT
+}

+ 50 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterJobAbortEvent.java

@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.commit;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+
+public class CommitterJobAbortEvent extends CommitterEvent {
+
+  private JobId jobID;
+  private JobContext jobContext;
+  private JobStatus.State finalState;
+
+  public CommitterJobAbortEvent(JobId jobID, JobContext jobContext,
+      JobStatus.State finalState) {
+    super(CommitterEventType.JOB_ABORT);
+    this.jobID = jobID;
+    this.jobContext = jobContext;
+    this.finalState = finalState;
+  }
+
+  public JobId getJobID() {
+    return jobID;
+  }
+
+  public JobContext getJobContext() {
+    return jobContext;
+  }
+
+  public JobStatus.State getFinalState() {
+    return finalState;
+  }
+}

+ 42 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterJobCommitEvent.java

@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.commit;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+
+public class CommitterJobCommitEvent extends CommitterEvent {
+
+  private JobId jobID;
+  private JobContext jobContext;
+
+  public CommitterJobCommitEvent(JobId jobID, JobContext jobContext) {
+    super(CommitterEventType.JOB_COMMIT);
+    this.jobID = jobID;
+    this.jobContext = jobContext;
+  }
+
+  public JobId getJobID() {
+    return jobID;
+  }
+
+  public JobContext getJobContext() {
+    return jobContext;
+  }
+}

+ 42 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterJobSetupEvent.java

@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.commit;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+
+public class CommitterJobSetupEvent extends CommitterEvent {
+
+  private JobId jobID;
+  private JobContext jobContext;
+
+  public CommitterJobSetupEvent(JobId jobID, JobContext jobContext) {
+    super(CommitterEventType.JOB_SETUP);
+    this.jobID = jobID;
+    this.jobContext = jobContext;
+  }
+
+  public JobId getJobID() {
+    return jobID;
+  }
+
+  public JobContext getJobContext() {
+    return jobContext;
+  }
+}

+ 43 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterTaskAbortEvent.java

@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.commit;
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+
+public class CommitterTaskAbortEvent extends CommitterEvent {
+
+  private final TaskAttemptId attemptID;
+  private final TaskAttemptContext attemptContext;
+
+  public CommitterTaskAbortEvent(TaskAttemptId attemptID,
+      TaskAttemptContext attemptContext) {
+    super(CommitterEventType.TASK_ABORT);
+    this.attemptID = attemptID;
+    this.attemptContext = attemptContext;
+  }
+
+  public TaskAttemptId getAttemptID() {
+    return attemptID;
+  }
+
+  public TaskAttemptContext getAttemptContext() {
+    return attemptContext;
+  }
+}

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/package-info.java → hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/package-info.java

@@ -16,5 +16,5 @@
  * limitations under the License.
  */
 @InterfaceAudience.Private
-package org.apache.hadoop.mapreduce.v2.app.taskclean;
+package org.apache.hadoop.mapreduce.v2.app.commit;
 import org.apache.hadoop.classification.InterfaceAudience;

+ 4 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java

@@ -20,11 +20,15 @@ package org.apache.hadoop.mapreduce.v2.app.job;
 
 public enum JobStateInternal {
   NEW,
+  SETUP,
   INITED,
   RUNNING,
+  COMMITTING,
   SUCCEEDED,
+  FAIL_ABORT,
   FAILED,
   KILL_WAIT,
+  KILL_ABORT,
   KILLED,
   ERROR
 }

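The four new values give the state machine explicit phases for the
asynchronous committer work. Pieced together from the transition table added
to JobImpl below, the job lifecycle now reads (clients still see only RUNNING,
FAILED, or KILLED for the new states via getExternalState()):

    normal:        NEW → INITED → SETUP → RUNNING → COMMITTING → SUCCEEDED
    setup fails:   SETUP → (JOB_SETUP_FAILED) → FAIL_ABORT → (JOB_ABORT_COMPLETED) → FAILED
    commit fails:  COMMITTING → (JOB_COMMIT_FAILED) → FAIL_ABORT → (JOB_ABORT_COMPLETED) → FAILED
    job killed:    SETUP or COMMITTING → (JOB_KILL) → KILL_ABORT → (JOB_ABORT_COMPLETED) → KILLED
                   RUNNING → (JOB_KILL) → KILL_WAIT → (tasks done) → KILL_ABORT → KILLED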
+ 36 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobAbortCompletedEvent.java

@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.job.event;
+
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+
+public class JobAbortCompletedEvent extends JobEvent {
+
+  private JobStatus.State finalState;
+
+  public JobAbortCompletedEvent(JobId jobID, JobStatus.State finalState) {
+    super(jobID, JobEventType.JOB_ABORT_COMPLETED);
+    this.finalState = finalState;
+  }
+
+  public JobStatus.State getFinalState() {
+    return finalState;
+  }
+}

+ 27 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobCommitCompletedEvent.java

@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.app.job.event;
+
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+
+public class JobCommitCompletedEvent extends JobEvent {
+
+  public JobCommitCompletedEvent(JobId jobID) {
+    super(jobID, JobEventType.JOB_COMMIT_COMPLETED);
+  }
+}

+ 34 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobCommitFailedEvent.java

@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.job.event;
+
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+
+public class JobCommitFailedEvent extends JobEvent {
+  private String message;
+
+  public JobCommitFailedEvent(JobId jobID, String message) {
+    super(jobID, JobEventType.JOB_COMMIT_FAILED);
+    this.message = message;
+  }
+
+  public String getMessage() {
+    return this.message;
+  }
+}

+ 7 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java

@@ -35,6 +35,13 @@ public enum JobEventType {
   JOB_MAP_TASK_RESCHEDULED,
   JOB_TASK_ATTEMPT_COMPLETED,
 
+  //Producer:CommitterEventHandler
+  JOB_SETUP_COMPLETED,
+  JOB_SETUP_FAILED,
+  JOB_COMMIT_COMPLETED,
+  JOB_COMMIT_FAILED,
+  JOB_ABORT_COMPLETED,
+
   //Producer:Job
   JOB_COMPLETED,
 

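Each of these closes a request/response loop with the committer service:
JobImpl emits a Committer*Event, CommitterEventHandler performs the
OutputCommitter call on its pool, and then answers with the matching
completion or failure event:

    CommitterJobSetupEvent  → committer.setupJob()  → JOB_SETUP_COMPLETED or JOB_SETUP_FAILED
    CommitterJobCommitEvent → committer.commitJob() → JOB_COMMIT_COMPLETED or JOB_COMMIT_FAILED
    CommitterJobAbortEvent  → committer.abortJob()  → JOB_ABORT_COMPLETED
    CommitterTaskAbortEvent → committer.abortTask() → TA_CLEANUP_DONE (a TaskAttemptEvent)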
+ 28 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobSetupCompletedEvent.java

@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.job.event;
+
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+
+public class JobSetupCompletedEvent extends JobEvent {
+
+  public JobSetupCompletedEvent(JobId jobID) {
+    super(jobID, JobEventType.JOB_SETUP_COMPLETED);
+  }
+}

+ 35 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobSetupFailedEvent.java

@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.job.event;
+
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+
+public class JobSetupFailedEvent extends JobEvent {
+
+  private String message;
+
+  public JobSetupFailedEvent(JobId jobID, String message) {
+    super(jobID, JobEventType.JOB_SETUP_FAILED);
+    this.message = message;
+  }
+
+  public String getMessage() {
+    return message;
+  }
+}

+ 286 - 90
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java

@@ -47,7 +47,6 @@ import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@@ -77,14 +76,20 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app.commit.CommitterJobAbortEvent;
+import org.apache.hadoop.mapreduce.v2.app.commit.CommitterJobCommitEvent;
+import org.apache.hadoop.mapreduce.v2.app.commit.CommitterJobSetupEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobAbortCompletedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCommitFailedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupFailedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
@@ -138,7 +143,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   private final Clock clock;
   private final JobACLsManager aclsManager;
   private final String username;
-  private final OutputCommitter committer;
   private final Map<JobACL, AccessControlList> jobACLs;
   private float setupWeight = 0.05f;
   private float cleanupWeight = 0.05f;
@@ -176,6 +180,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   private Counters fullCounters = null;
   private Counters finalMapCounters = null;
   private Counters finalReduceCounters = null;
+
     // FIXME:  
     //
     // Can then replace task-level uber counters (MR-2424) with job-level ones
@@ -245,7 +250,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
               DIAGNOSTIC_UPDATE_TRANSITION)
           .addTransition(JobStateInternal.INITED, JobStateInternal.INITED,
               JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
-          .addTransition(JobStateInternal.INITED, JobStateInternal.RUNNING,
+          .addTransition(JobStateInternal.INITED, JobStateInternal.SETUP,
               JobEventType.JOB_START,
               new StartTransition())
           .addTransition(JobStateInternal.INITED, JobStateInternal.KILLED,
@@ -257,19 +262,43 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
           // Ignore-able events
           .addTransition(JobStateInternal.INITED, JobStateInternal.INITED,
               JobEventType.JOB_UPDATED_NODES)
-              
+
+          // Transitions from SETUP state
+          .addTransition(JobStateInternal.SETUP, JobStateInternal.SETUP,
+              JobEventType.JOB_DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(JobStateInternal.SETUP, JobStateInternal.SETUP,
+              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
+          .addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING,
+              JobEventType.JOB_SETUP_COMPLETED,
+              new SetupCompletedTransition())
+          .addTransition(JobStateInternal.SETUP, JobStateInternal.FAIL_ABORT,
+              JobEventType.JOB_SETUP_FAILED,
+              new SetupFailedTransition())
+          .addTransition(JobStateInternal.SETUP, JobStateInternal.KILL_ABORT,
+              JobEventType.JOB_KILL,
+              new KilledDuringSetupTransition())
+          .addTransition(JobStateInternal.SETUP, JobStateInternal.ERROR,
+              JobEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+          // Ignore-able events
+          .addTransition(JobStateInternal.SETUP, JobStateInternal.SETUP,
+              JobEventType.JOB_UPDATED_NODES)
+
           // Transitions from RUNNING state
           .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
               JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
               TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
           .addTransition
               (JobStateInternal.RUNNING,
-              EnumSet.of(JobStateInternal.RUNNING, JobStateInternal.SUCCEEDED, JobStateInternal.FAILED),
+              EnumSet.of(JobStateInternal.RUNNING,
+                  JobStateInternal.COMMITTING, JobStateInternal.FAIL_ABORT),
               JobEventType.JOB_TASK_COMPLETED,
               new TaskCompletedTransition())
           .addTransition
               (JobStateInternal.RUNNING,
-              EnumSet.of(JobStateInternal.RUNNING, JobStateInternal.SUCCEEDED, JobStateInternal.FAILED),
+              EnumSet.of(JobStateInternal.RUNNING,
+                  JobStateInternal.COMMITTING),
               JobEventType.JOB_COMPLETED,
               new JobNoTasksCompletedTransition())
           .addTransition(JobStateInternal.RUNNING, JobStateInternal.KILL_WAIT,
@@ -296,7 +325,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
           // Transitions from KILL_WAIT state.
           .addTransition
               (JobStateInternal.KILL_WAIT,
-              EnumSet.of(JobStateInternal.KILL_WAIT, JobStateInternal.KILLED),
+              EnumSet.of(JobStateInternal.KILL_WAIT,
+                  JobStateInternal.KILL_ABORT),
               JobEventType.JOB_TASK_COMPLETED,
               new KillWaitTaskCompletedTransition())
           .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT,
@@ -318,6 +348,35 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
                   JobEventType.JOB_MAP_TASK_RESCHEDULED,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
 
+          // Transitions from COMMITTING state
+          .addTransition(JobStateInternal.COMMITTING,
+              JobStateInternal.SUCCEEDED,
+              JobEventType.JOB_COMMIT_COMPLETED,
+              new CommitSucceededTransition())
+          .addTransition(JobStateInternal.COMMITTING,
+              JobStateInternal.FAIL_ABORT,
+              JobEventType.JOB_COMMIT_FAILED,
+              new CommitFailedTransition())
+          .addTransition(JobStateInternal.COMMITTING,
+              JobStateInternal.KILL_ABORT,
+              JobEventType.JOB_KILL,
+              new KilledDuringCommitTransition())
+          .addTransition(JobStateInternal.COMMITTING,
+              JobStateInternal.COMMITTING,
+              JobEventType.JOB_DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(JobStateInternal.COMMITTING,
+              JobStateInternal.COMMITTING,
+              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
+          .addTransition(JobStateInternal.COMMITTING,
+              JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+          // Ignore-able events
+          .addTransition(JobStateInternal.COMMITTING,
+              JobStateInternal.COMMITTING,
+              EnumSet.of(JobEventType.JOB_UPDATED_NODES,
+                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
+
           // Transitions from SUCCEEDED state
           .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
@@ -334,6 +393,61 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
                   JobEventType.JOB_UPDATED_NODES,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
 
+          // Transitions from FAIL_ABORT state
+          .addTransition(JobStateInternal.FAIL_ABORT,
+              JobStateInternal.FAIL_ABORT,
+              JobEventType.JOB_DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(JobStateInternal.FAIL_ABORT,
+              JobStateInternal.FAIL_ABORT,
+              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
+          .addTransition(JobStateInternal.FAIL_ABORT, JobStateInternal.FAILED,
+              JobEventType.JOB_ABORT_COMPLETED,
+              new JobAbortCompletedTransition())
+          .addTransition(JobStateInternal.FAIL_ABORT, JobStateInternal.KILLED,
+              JobEventType.JOB_KILL,
+              new KilledDuringAbortTransition())
+          .addTransition(JobStateInternal.FAIL_ABORT,
+              JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+          // Ignore-able events
+          .addTransition(JobStateInternal.FAIL_ABORT,
+              JobStateInternal.FAIL_ABORT,
+              EnumSet.of(JobEventType.JOB_UPDATED_NODES,
+                  JobEventType.JOB_TASK_COMPLETED,
+                  JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
+                  JobEventType.JOB_MAP_TASK_RESCHEDULED,
+                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
+                  JobEventType.JOB_COMMIT_COMPLETED,
+                  JobEventType.JOB_COMMIT_FAILED))
+
+          // Transitions from KILL_ABORT state
+          .addTransition(JobStateInternal.KILL_ABORT,
+              JobStateInternal.KILL_ABORT,
+              JobEventType.JOB_DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(JobStateInternal.KILL_ABORT,
+              JobStateInternal.KILL_ABORT,
+              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
+          .addTransition(JobStateInternal.KILL_ABORT, JobStateInternal.KILLED,
+              JobEventType.JOB_ABORT_COMPLETED,
+              new JobAbortCompletedTransition())
+          .addTransition(JobStateInternal.KILL_ABORT, JobStateInternal.KILLED,
+              JobEventType.JOB_KILL,
+              new KilledDuringAbortTransition())
+          .addTransition(JobStateInternal.KILL_ABORT,
+              JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+          // Ignore-able events
+          .addTransition(JobStateInternal.KILL_ABORT,
+              JobStateInternal.KILL_ABORT,
+              EnumSet.of(JobEventType.JOB_UPDATED_NODES,
+                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
+                  JobEventType.JOB_SETUP_COMPLETED,
+                  JobEventType.JOB_SETUP_FAILED,
+                  JobEventType.JOB_COMMIT_COMPLETED,
+                  JobEventType.JOB_COMMIT_FAILED))
+
           // Transitions from FAILED state
           .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
@@ -351,7 +465,12 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
                   JobEventType.JOB_TASK_COMPLETED,
                   JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
                   JobEventType.JOB_MAP_TASK_RESCHEDULED,
-                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
+                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
+                  JobEventType.JOB_SETUP_COMPLETED,
+                  JobEventType.JOB_SETUP_FAILED,
+                  JobEventType.JOB_COMMIT_COMPLETED,
+                  JobEventType.JOB_COMMIT_FAILED,
+                  JobEventType.JOB_ABORT_COMPLETED))
 
           // Transitions from KILLED state
           .addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED,
@@ -366,8 +485,14 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
           // Ignore-able events
           .addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED,
               EnumSet.of(JobEventType.JOB_KILL, 
+                  JobEventType.JOB_START,
                   JobEventType.JOB_UPDATED_NODES,
-                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
+                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
+                  JobEventType.JOB_SETUP_COMPLETED,
+                  JobEventType.JOB_SETUP_FAILED,
+                  JobEventType.JOB_COMMIT_COMPLETED,
+                  JobEventType.JOB_COMMIT_FAILED,
+                  JobEventType.JOB_ABORT_COMPLETED))
 
           // No transitions from INTERNAL_ERROR state. Ignore all.
           .addTransition(
@@ -381,6 +506,11 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
                   JobEventType.JOB_DIAGNOSTIC_UPDATE,
                   JobEventType.JOB_UPDATED_NODES,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
+                  JobEventType.JOB_SETUP_COMPLETED,
+                  JobEventType.JOB_SETUP_FAILED,
+                  JobEventType.JOB_COMMIT_COMPLETED,
+                  JobEventType.JOB_COMMIT_FAILED,
+                  JobEventType.JOB_ABORT_COMPLETED,
                   JobEventType.INTERNAL_ERROR))
           .addTransition(JobStateInternal.ERROR, JobStateInternal.ERROR,
               JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
@@ -417,7 +547,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
       JobTokenSecretManager jobTokenSecretManager,
       Credentials fsTokenCredentials, Clock clock,
       Map<TaskId, TaskInfo> completedTasksFromPreviousRun, MRAppMetrics metrics,
-      OutputCommitter committer, boolean newApiCommitter, String userName,
+      boolean newApiCommitter, String userName,
       long appSubmitTime, List<AMInfo> amInfos, AppContext appContext) {
     this.applicationAttemptId = applicationAttemptId;
     this.jobId = jobId;
@@ -442,7 +572,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
 
     this.fsTokens = fsTokenCredentials;
     this.jobTokenSecretManager = jobTokenSecretManager;
-    this.committer = committer;
 
     this.aclsManager = new JobACLsManager(conf);
     this.username = System.getProperty("user.name");
@@ -461,11 +590,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     return jobId;
   }
 
-  // Getter methods that make unit testing easier (package-scoped)
-  OutputCommitter getCommitter() {
-    return this.committer;
-  }
-
   EventHandler getEventHandler() {
     return this.eventHandler;
   }
@@ -751,9 +875,16 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   }
   
   private static JobState getExternalState(JobStateInternal smState) {
-    if (smState == JobStateInternal.KILL_WAIT) {
+    switch (smState) {
+    case KILL_WAIT:
+    case KILL_ABORT:
       return JobState.KILLED;
-    } else {
+    case SETUP:
+    case COMMITTING:
+      return JobState.RUNNING;
+    case FAIL_ABORT:
+      return JobState.FAILED;
+    default:
       return JobState.valueOf(smState.name());
     }
   }
@@ -799,22 +930,15 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     return FileSystem.get(conf);
   }
   
-  static JobStateInternal checkJobCompleteSuccess(JobImpl job) {
-    // check for Job success
-    if (job.completedTaskCount == job.tasks.size()) {
-      try {
-        // Commit job & do cleanup
-        job.getCommitter().commitJob(job.getJobContext());
-      } catch (IOException e) {
-        LOG.error("Could not do commit for Job", e);
-        job.addDiagnostic("Job commit failed: " + e.getMessage());
-        job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
-        return job.finished(JobStateInternal.FAILED);
-      }
-      job.logJobHistoryFinishedEvent();
-      return job.finished(JobStateInternal.SUCCEEDED);
+  protected JobStateInternal checkReadyForCommit() {
+    JobStateInternal currentState = getInternalState();
+    if (completedTaskCount == tasks.size()
+        && currentState == JobStateInternal.RUNNING) {
+      eventHandler.handle(new CommitterJobCommitEvent(jobId, getJobContext()));
+      return JobStateInternal.COMMITTING;
     }
-    return null;
+    // return the current state as job not ready to commit yet
+    return getInternalState();
   }
 
   JobStateInternal finished(JobStateInternal finalState) {
@@ -1104,25 +1228,21 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
         job.allowedReduceFailuresPercent =
             job.conf.getInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 0);
 
-        // do the setup
-        job.committer.setupJob(job.jobContext);
-        job.setupProgress = 1.0f;
-
         // create the Tasks but don't start them yet
         createMapTasks(job, inputLength, taskSplitMetaInfo);
         createReduceTasks(job);
 
         job.metrics.endPreparingJob(job);
         return JobStateInternal.INITED;
-        //TODO XXX Should JobInitedEvent be generated here (instead of in StartTransition)
-
       } catch (IOException e) {
         LOG.warn("Job init failed", e);
+        job.metrics.endPreparingJob(job);
         job.addDiagnostic("Job init failed : "
             + StringUtils.stringifyException(e));
-        job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
-        job.metrics.endPreparingJob(job);
-        return job.finished(JobStateInternal.FAILED);
+        job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
+            job.jobContext,
+            org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
+        return JobStateInternal.FAILED;
       }
     }
 
@@ -1174,7 +1294,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
                 job.remoteJobConfFile, 
                 job.conf, splits[i], 
                 job.taskAttemptListener, 
-                job.committer, job.jobToken, job.fsTokens,
+                job.jobToken, job.fsTokens,
                 job.clock, job.completedTasksFromPreviousRun, 
                 job.applicationAttemptId.getAttemptId(),
                 job.metrics, job.appContext);
@@ -1191,7 +1311,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
                 job.eventHandler, 
                 job.remoteJobConfFile, 
                 job.conf, job.numMapTasks, 
-                job.taskAttemptListener, job.committer, job.jobToken,
+                job.taskAttemptListener, job.jobToken,
                 job.fsTokens, job.clock,
                 job.completedTasksFromPreviousRun, 
                 job.applicationAttemptId.getAttemptId(),
@@ -1224,6 +1344,35 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     }
   } // end of InitTransition
 
+  private static class SetupCompletedTransition
+      implements SingleArcTransition<JobImpl, JobEvent> {
+    @Override
+    public void transition(JobImpl job, JobEvent event) {
+      job.setupProgress = 1.0f;
+      job.scheduleTasks(job.mapTasks);  // schedule (i.e., start) the maps
+      job.scheduleTasks(job.reduceTasks);
+
+      // If we have no tasks, just transition to job completed
+      if (job.numReduceTasks == 0 && job.numMapTasks == 0) {
+        job.eventHandler.handle(new JobEvent(job.jobId,
+            JobEventType.JOB_COMPLETED));
+      }
+    }
+  }
+
+  private static class SetupFailedTransition
+      implements SingleArcTransition<JobImpl, JobEvent> {
+    @Override
+    public void transition(JobImpl job, JobEvent event) {
+      job.metrics.endRunningJob(job);
+      job.addDiagnostic("Job setup failed : "
+          + ((JobSetupFailedEvent) event).getMessage());
+      job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
+          job.jobContext,
+          org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
+    }
+  }
+
   public static class StartTransition
   implements SingleArcTransition<JobImpl, JobEvent> {
     /**
@@ -1233,43 +1382,45 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     @Override
     public void transition(JobImpl job, JobEvent event) {
       job.startTime = job.clock.getTime();
-      job.scheduleTasks(job.mapTasks);  // schedule (i.e., start) the maps
-      job.scheduleTasks(job.reduceTasks);
       JobInitedEvent jie =
         new JobInitedEvent(job.oldJobId,
              job.startTime,
              job.numMapTasks, job.numReduceTasks,
              job.getState().toString(),
-             job.isUber()); //Will transition to state running. Currently in INITED
+             job.isUber());
       job.eventHandler.handle(new JobHistoryEvent(job.jobId, jie));
       JobInfoChangeEvent jice = new JobInfoChangeEvent(job.oldJobId,
           job.appSubmitTime, job.startTime);
       job.eventHandler.handle(new JobHistoryEvent(job.jobId, jice));
       job.metrics.runningJob(job);
 
-			// If we have no tasks, just transition to job completed
-      if (job.numReduceTasks == 0 && job.numMapTasks == 0) {
-        job.eventHandler.handle(new JobEvent(job.jobId, JobEventType.JOB_COMPLETED));
-      }
+      job.eventHandler.handle(new CommitterJobSetupEvent(
+              job.jobId, job.jobContext));
     }
   }
 
-  protected void abortJob(
-      org.apache.hadoop.mapreduce.JobStatus.State finalState) {
-    try {
-      committer.abortJob(jobContext, finalState);
-    } catch (IOException e) {
-      LOG.warn("Could not abortJob", e);
+  private void unsuccessfulFinish(JobStateInternal finalState) {
+      if (finishTime == 0) setFinishTime();
+      cleanupProgress = 1.0f;
+      JobUnsuccessfulCompletionEvent unsuccessfulJobEvent =
+          new JobUnsuccessfulCompletionEvent(oldJobId,
+              finishTime,
+              succeededMapTaskCount,
+              succeededReduceTaskCount,
+              finalState.toString());
+      eventHandler.handle(new JobHistoryEvent(jobId,
+          unsuccessfulJobEvent));
+      finished(finalState);
+  }
+
+  private static class JobAbortCompletedTransition
+  implements SingleArcTransition<JobImpl, JobEvent> {
+    @Override
+    public void transition(JobImpl job, JobEvent event) {
+      JobStateInternal finalState = JobStateInternal.valueOf(
+          ((JobAbortCompletedEvent) event).getFinalState().name());
+      job.unsuccessfulFinish(finalState);
     }
-    if (finishTime == 0) setFinishTime();
-    cleanupProgress = 1.0f;
-    JobUnsuccessfulCompletionEvent unsuccessfulJobEvent =
-      new JobUnsuccessfulCompletionEvent(oldJobId,
-          finishTime,
-          succeededMapTaskCount,
-          succeededReduceTaskCount,
-          finalState.toString());
-    eventHandler.handle(new JobHistoryEvent(jobId, unsuccessfulJobEvent));
   }
     
   // JobFinishedEvent triggers the move of the history file out of the staging
@@ -1343,9 +1494,22 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   implements SingleArcTransition<JobImpl, JobEvent> {
     @Override
     public void transition(JobImpl job, JobEvent event) {
-      job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
       job.addDiagnostic("Job received Kill in INITED state.");
-      job.finished(JobStateInternal.KILLED);
+      job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
+          job.jobContext,
+          org.apache.hadoop.mapreduce.JobStatus.State.KILLED));
+    }
+  }
+
+  private static class KilledDuringSetupTransition
+  implements SingleArcTransition<JobImpl, JobEvent> {
+    @Override
+    public void transition(JobImpl job, JobEvent event) {
+      job.metrics.endRunningJob(job);
+      job.addDiagnostic("Job received kill in SETUP state.");
+      job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
+          job.jobContext,
+          org.apache.hadoop.mapreduce.JobStatus.State.KILLED));
     }
   }
 
@@ -1470,10 +1634,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
         taskKilled(job, task);
       }
 
-      return checkJobForCompletion(job);
+      return checkJobAfterTaskCompletion(job);
     }
 
-    protected JobStateInternal checkJobForCompletion(JobImpl job) {
+    protected JobStateInternal checkJobAfterTaskCompletion(JobImpl job) {
       //check for Job failure
       if (job.failedMapTaskCount*100 > 
         job.allowedMapFailuresPercent*job.numMapTasks ||
@@ -1486,17 +1650,13 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
             " failedReduces:" + job.failedReduceTaskCount;
         LOG.info(diagnosticMsg);
         job.addDiagnostic(diagnosticMsg);
-        job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
-        return job.finished(JobStateInternal.FAILED);
-      }
-      
-      JobStateInternal jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job);
-      if (jobCompleteSuccess != null) {
-        return jobCompleteSuccess;
+        job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
+            job.jobContext,
+            org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
+        return JobStateInternal.FAIL_ABORT;
       }
       
-      //return the current state, Job not finished yet
-      return job.getInternalState();
+      return job.checkReadyForCommit();
     }
 
     private void taskSucceeded(JobImpl job, Task task) {
@@ -1529,18 +1689,52 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   }
 
   // Transition class for handling jobs with no tasks
-  static class JobNoTasksCompletedTransition implements
+  private static class JobNoTasksCompletedTransition implements
   MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
 
     @Override
     public JobStateInternal transition(JobImpl job, JobEvent event) {
-      JobStateInternal jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job);
-      if (jobCompleteSuccess != null) {
-        return jobCompleteSuccess;
-      }
-      
-      // Return the current state, Job not finished yet
-      return job.getInternalState();
+      return job.checkReadyForCommit();
+    }
+  }
+
+  private static class CommitSucceededTransition implements
+      SingleArcTransition<JobImpl, JobEvent> {
+    @Override
+    public void transition(JobImpl job, JobEvent event) {
+      job.logJobHistoryFinishedEvent();
+      job.finished(JobStateInternal.SUCCEEDED);
+    }
+  }
+
+  private static class CommitFailedTransition implements
+      SingleArcTransition<JobImpl, JobEvent> {
+    @Override
+    public void transition(JobImpl job, JobEvent event) {
+      JobCommitFailedEvent jcfe = (JobCommitFailedEvent)event;
+      job.addDiagnostic("Job commit failed: " + jcfe.getMessage());
+      job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
+          job.jobContext,
+          org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
+    }
+  }
+
+  private static class KilledDuringCommitTransition implements
+      SingleArcTransition<JobImpl, JobEvent> {
+    @Override
+    public void transition(JobImpl job, JobEvent event) {
+      job.setFinishTime();
+      job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
+          job.jobContext,
+          org.apache.hadoop.mapreduce.JobStatus.State.KILLED));
+    }
+  }
+
+  private static class KilledDuringAbortTransition implements
+      SingleArcTransition<JobImpl, JobEvent> {
+    @Override
+    public void transition(JobImpl job, JobEvent event) {
+      job.unsuccessfulFinish(JobStateInternal.KILLED);
     }
   }
 
@@ -1557,11 +1751,13 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   private static class KillWaitTaskCompletedTransition extends  
       TaskCompletedTransition {
     @Override
-    protected JobStateInternal checkJobForCompletion(JobImpl job) {
+    protected JobStateInternal checkJobAfterTaskCompletion(JobImpl job) {
       if (job.completedTaskCount == job.tasks.size()) {
         job.setFinishTime();
-        job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
-        return job.finished(JobStateInternal.KILLED);
+        job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
+            job.jobContext,
+            org.apache.hadoop.mapreduce.JobStatus.State.KILLED));
+        return JobStateInternal.KILL_ABORT;
       }
       //return the current state, Job not finished yet
       return job.getInternalState();
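
Taken together, the JobImpl hunks above are the core of the change: the old `checkJobCompleteSuccess` called `commitJob()` synchronously from inside a state transition, so a slow or hung commit stalled the state machine itself. The new flow posts a `CommitterJobCommitEvent`, parks the job in the COMMITTING state, and only finishes once the committer answers, via `CommitSucceededTransition` or `CommitFailedTransition`. A minimal sketch of that request/response pattern, using illustrative types rather than the actual Hadoop classes:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model of the hand-off: the state machine never blocks on the
// committer; it exchanges events with a dedicated worker thread.
public class AsyncCommitSketch {
  enum State { RUNNING, COMMITTING, SUCCEEDED, FAILED }
  enum Event { JOB_COMMIT, COMMIT_COMPLETED, COMMIT_FAILED }

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<Event> toCommitter = new LinkedBlockingQueue<>();
    BlockingQueue<Event> toJob = new LinkedBlockingQueue<>();

    // Committer worker: runs the (possibly very slow) commit off-thread.
    new Thread(() -> {
      try {
        toCommitter.take();                 // wait for JOB_COMMIT
        Thread.sleep(500);                  // stands in for commitJob()
        toJob.put(Event.COMMIT_COMPLETED);  // report back with an event
      } catch (InterruptedException ignored) { }
    }).start();

    State state = State.RUNNING;
    toCommitter.put(Event.JOB_COMMIT);      // all tasks done: request commit
    state = State.COMMITTING;               // dispatcher stays responsive here

    Event result = toJob.take();            // completion event arrives
    state = result == Event.COMMIT_COMPLETED ? State.SUCCEEDED : State.FAILED;
    System.out.println("job finished in state " + state);
  }
}
```

The same indirection explains the `getExternalState` hunk: COMMITTING is internal bookkeeping and is still reported to clients as RUNNING.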

+ 3 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java

@@ -24,7 +24,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapTaskAttemptImpl;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@@ -47,13 +46,13 @@ public class MapTaskImpl extends TaskImpl {
   public MapTaskImpl(JobId jobId, int partition, EventHandler eventHandler,
       Path remoteJobConfFile, JobConf conf,
       TaskSplitMetaInfo taskSplitMetaInfo,
-      TaskAttemptListener taskAttemptListener, OutputCommitter committer,
+      TaskAttemptListener taskAttemptListener,
       Token<JobTokenIdentifier> jobToken,
       Credentials credentials, Clock clock,
       Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
       MRAppMetrics metrics, AppContext appContext) {
     super(jobId, TaskType.MAP, partition, eventHandler, remoteJobConfFile,
-        conf, taskAttemptListener, committer, jobToken, credentials, clock,
+        conf, taskAttemptListener, jobToken, credentials, clock,
         completedTasksFromPreviousRun, startCount, metrics, appContext);
     this.taskSplitMetaInfo = taskSplitMetaInfo;
   }
@@ -68,7 +67,7 @@ public class MapTaskImpl extends TaskImpl {
     return new MapTaskAttemptImpl(getID(), nextAttemptNumber,
         eventHandler, jobFile,
         partition, taskSplitMetaInfo, conf, taskAttemptListener,
-        committer, jobToken, credentials, clock, appContext);
+        jobToken, credentials, clock, appContext);
   }
 
   @Override

+ 3 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java

@@ -24,7 +24,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.ReduceTaskAttemptImpl;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@@ -46,12 +45,12 @@ public class ReduceTaskImpl extends TaskImpl {
   public ReduceTaskImpl(JobId jobId, int partition,
       EventHandler eventHandler, Path jobFile, JobConf conf,
       int numMapTasks, TaskAttemptListener taskAttemptListener,
-      OutputCommitter committer, Token<JobTokenIdentifier> jobToken,
+      Token<JobTokenIdentifier> jobToken,
       Credentials credentials, Clock clock,
       Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
       MRAppMetrics metrics, AppContext appContext) {
     super(jobId, TaskType.REDUCE, partition, eventHandler, jobFile, conf,
-        taskAttemptListener, committer, jobToken, credentials, clock,
+        taskAttemptListener, jobToken, credentials, clock,
         completedTasksFromPreviousRun, startCount, metrics, appContext);
     this.numMapTasks = numMapTasks;
   }
@@ -66,7 +65,7 @@ public class ReduceTaskImpl extends TaskImpl {
     return new ReduceTaskAttemptImpl(getID(), nextAttemptNumber,
         eventHandler, jobFile,
         partition, numMapTasks, conf, taskAttemptListener,
-        committer, jobToken, credentials, clock, appContext);
+        jobToken, credentials, clock, appContext);
   }
 
   @Override

+ 4 - 10
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -39,7 +39,6 @@ import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -57,7 +56,6 @@ import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.TypeConverter;
@@ -76,6 +74,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app.commit.CommitterTaskAbortEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
@@ -99,7 +98,6 @@ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
 import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
-import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
@@ -157,7 +155,6 @@ public abstract class TaskAttemptImpl implements
   private final Clock clock;
   private final org.apache.hadoop.mapred.JobID oldJobId;
   private final TaskAttemptListener taskAttemptListener;
-  private final OutputCommitter committer;
   private final Resource resourceCapability;
   private final String[] dataLocalHosts;
   private final List<String> diagnostics = new ArrayList<String>();
@@ -501,7 +498,7 @@ public abstract class TaskAttemptImpl implements
   public TaskAttemptImpl(TaskId taskId, int i, 
       EventHandler eventHandler,
       TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
-      JobConf conf, String[] dataLocalHosts, OutputCommitter committer,
+      JobConf conf, String[] dataLocalHosts,
       Token<JobTokenIdentifier> jobToken,
       Credentials credentials, Clock clock,
       AppContext appContext) {
@@ -525,7 +522,6 @@ public abstract class TaskAttemptImpl implements
     this.credentials = credentials;
     this.jobToken = jobToken;
     this.eventHandler = eventHandler;
-    this.committer = committer;
     this.jobFile = jobFile;
     this.partition = partition;
 
@@ -1436,10 +1432,8 @@ public abstract class TaskAttemptImpl implements
       TaskAttemptContext taskContext =
         new TaskAttemptContextImpl(taskAttempt.conf,
             TypeConverter.fromYarn(taskAttempt.attemptId));
-      taskAttempt.eventHandler.handle(new TaskCleanupEvent(
-          taskAttempt.attemptId,
-          taskAttempt.committer,
-          taskContext));
+      taskAttempt.eventHandler.handle(new CommitterTaskAbortEvent(
+          taskAttempt.attemptId, taskContext));
     }
   }
 

+ 1 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java

@@ -37,7 +37,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@@ -100,7 +99,6 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   protected final JobConf conf;
   protected final Path jobFile;
-  protected final OutputCommitter committer;
   protected final int partition;
   protected final TaskAttemptListener taskAttemptListener;
   protected final EventHandler eventHandler;
@@ -278,7 +276,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   public TaskImpl(JobId jobId, TaskType taskType, int partition,
       EventHandler eventHandler, Path remoteJobConfFile, JobConf conf,
-      TaskAttemptListener taskAttemptListener, OutputCommitter committer,
+      TaskAttemptListener taskAttemptListener,
       Token<JobTokenIdentifier> jobToken,
       Credentials credentials, Clock clock,
       Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
@@ -301,7 +299,6 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     this.partition = partition;
     this.taskAttemptListener = taskAttemptListener;
     this.eventHandler = eventHandler;
-    this.committer = committer;
     this.credentials = credentials;
     this.jobToken = jobToken;
     this.metrics = metrics;
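
MapTaskImpl, ReduceTaskImpl, TaskAttemptImpl, and TaskImpl all lose the `OutputCommitter` constructor parameter and field in the same mechanical way. The effect is a compile-time guarantee: nothing below the job level can reach the committer directly any more, so every task-side cleanup has to travel through the event path. A hedged illustration of what that buys (simplified names, not the Hadoop classes):

```java
// With no committer field, task code cannot call the committer directly;
// the only route left is posting an event to whoever owns it.
public class NoDirectCommitterSketch {
  interface EventHandler<E> { void handle(E event); }

  static final class TaskAbortRequested {
    final String attemptId;
    TaskAbortRequested(String attemptId) { this.attemptId = attemptId; }
  }

  static final class Task {
    private final EventHandler<TaskAbortRequested> events;  // no Committer here
    Task(EventHandler<TaskAbortRequested> events) { this.events = events; }
    void abortAttempt(String attemptId) {
      events.handle(new TaskAbortRequested(attemptId));     // indirection only
    }
  }

  public static void main(String[] args) {
    Task task = new Task(e -> System.out.println("abort queued: " + e.attemptId));
    task.abortAttempt("attempt_00001_m_000000_0");
  }
}
```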

+ 4 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java

@@ -48,6 +48,8 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
+import org.apache.hadoop.mapreduce.v2.app.commit.CommitterTaskAbortEvent;
+import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
@@ -65,8 +67,6 @@ import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
-import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
-import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.yarn.Clock;
@@ -339,8 +339,8 @@ public class RecoveryService extends CompositeService implements Recovery {
         return;
       }
 
-      else if (event.getType() == TaskCleaner.EventType.TASK_CLEAN) {
-        TaskAttemptId aId = ((TaskCleanupEvent) event).getAttemptID();
+      else if (event.getType() == CommitterEventType.TASK_ABORT) {
+        TaskAttemptId aId = ((CommitterTaskAbortEvent) event).getAttemptID();
         LOG.debug("TASK_CLEAN");
         actualHandler.handle(new TaskAttemptEvent(aId,
             TaskAttemptEventType.TA_CLEANUP_DONE));
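
Recovery keeps its existing short-circuit: abort requests for attempts recovered from a previous AM run are answered with `TA_CLEANUP_DONE` instead of reaching the committer; only the event type consulted changes, from `TaskCleaner.EventType.TASK_CLEAN` to `CommitterEventType.TASK_ABORT`. A sketch of that interception pattern (illustrative types only):

```java
// A wrapper that answers abort events itself while history is replayed,
// instead of forwarding them to the committer.
public class RecoveryInterceptSketch {
  enum EventType { TASK_ABORT, OTHER }
  interface Handler { void handle(EventType type, String attemptId); }

  static Handler recoveryWrapper(Handler actual) {
    return (type, attemptId) -> {
      if (type == EventType.TASK_ABORT) {
        // recovered attempts were cleaned up by the previous AM already
        System.out.println("TA_CLEANUP_DONE for " + attemptId);
        return;
      }
      actual.handle(type, attemptId);     // everything else passes through
    };
  }

  public static void main(String[] args) {
    Handler h = recoveryWrapper(
        (type, id) -> System.out.println("forwarded " + type + " for " + id));
    h.handle(EventType.TASK_ABORT, "attempt_00001_r_000000_0");
    h.handle(EventType.OTHER, "attempt_00001_r_000001_0");
  }
}
```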

+ 0 - 28
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleaner.java

@@ -1,28 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.hadoop.mapreduce.v2.app.taskclean;
-
-import org.apache.hadoop.yarn.event.EventHandler;
-
-public interface TaskCleaner extends EventHandler<TaskCleanupEvent> {
-
-  enum EventType {
-    TASK_CLEAN
-  }
-}

+ 0 - 124
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleanerImpl.java

@@ -1,124 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.hadoop.mapreduce.v2.app.taskclean;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapreduce.v2.app.AppContext;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
-import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.service.AbstractService;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-public class TaskCleanerImpl extends AbstractService implements TaskCleaner {
-
-  private static final Log LOG = LogFactory.getLog(TaskCleanerImpl.class);
-
-  private final AppContext context;
-  private ThreadPoolExecutor launcherPool;
-  private Thread eventHandlingThread;
-  private BlockingQueue<TaskCleanupEvent> eventQueue =
-      new LinkedBlockingQueue<TaskCleanupEvent>();
-  private final AtomicBoolean stopped;
-
-  public TaskCleanerImpl(AppContext context) {
-    super("TaskCleaner");
-    this.context = context;
-    this.stopped = new AtomicBoolean(false);
-  }
-
-  public void start() {
-    ThreadFactory tf = new ThreadFactoryBuilder()
-      .setNameFormat("TaskCleaner #%d")
-      .build();
-    launcherPool = new ThreadPoolExecutor(5, 5, 1, 
-        TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
-    eventHandlingThread = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        TaskCleanupEvent event = null;
-        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
-          try {
-            event = eventQueue.take();
-          } catch (InterruptedException e) {
-            if (!stopped.get()) {
-              LOG.error("Returning, interrupted : " + e);
-            }
-            return;
-          }
-          // the events from the queue are handled in parallel
-          // using a thread pool
-            launcherPool.execute(new EventProcessor(event));
-        }
-      }
-    });
-    eventHandlingThread.setName("TaskCleaner Event Handler");
-    eventHandlingThread.start();
-    super.start();
-  }
-
-  public void stop() {
-    if (stopped.getAndSet(true)) {
-      // return if already stopped
-      return;
-    }
-    eventHandlingThread.interrupt();
-    launcherPool.shutdown();
-    super.stop();
-  }
-
-  private class EventProcessor implements Runnable {
-    private TaskCleanupEvent event;
-
-    EventProcessor(TaskCleanupEvent event) {
-      this.event = event;
-    }
-
-    @Override
-    public void run() {
-      LOG.info("Processing the event " + event.toString());
-      try {
-        event.getCommitter().abortTask(event.getAttemptContext());
-      } catch (Exception e) {
-        LOG.warn("Task cleanup failed for attempt " + event.getAttemptID(), e);
-      }
-      context.getEventHandler().handle(
-          new TaskAttemptEvent(event.getAttemptID(), 
-              TaskAttemptEventType.TA_CLEANUP_DONE));
-    }
-  }
-
-  @Override
-  public void handle(TaskCleanupEvent event) {
-    try {
-      eventQueue.put(event);
-    } catch (InterruptedException e) {
-      throw new YarnException(e);
-    }
-  }
-
-}

+ 0 - 56
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleanupEvent.java

@@ -1,56 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.hadoop.mapreduce.v2.app.taskclean;
-
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
-import org.apache.hadoop.yarn.event.AbstractEvent;
-
-/**
- * This class encapsulates task cleanup event.
- *
- */
-public class TaskCleanupEvent extends AbstractEvent<TaskCleaner.EventType> {
-
-  private final TaskAttemptId attemptID;
-  private final OutputCommitter committer;
-  private final TaskAttemptContext attemptContext;
-
-  public TaskCleanupEvent(TaskAttemptId attemptID, OutputCommitter committer, 
-      TaskAttemptContext attemptContext) {
-    super(TaskCleaner.EventType.TASK_CLEAN);
-    this.attemptID = attemptID;
-    this.committer = committer;
-    this.attemptContext = attemptContext;
-  }
-
-  public TaskAttemptId getAttemptID() {
-    return attemptID;
-  }
-
-  public OutputCommitter getCommitter() {
-    return committer;
-  }
-
-  public TaskAttemptContext getAttemptContext() {
-    return attemptContext;
-  }
-
-}
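
The three deleted taskclean classes amounted to a five-thread pool that only ever called `abortTask()`; job-level setup, commit, and abort still ran inline on the dispatcher, which is precisely what allowed a long `commitJob()` to stall the AM. Their replacement funnels every committer call through one event-driven service. A stripped-down sketch of such a service loop, modeled on the deleted `TaskCleanerImpl` but not the actual `CommitterEventHandler`:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// One service owns all committer work: callers enqueue events, a single
// worker thread drains the queue and talks to the committer.
public class CommitterServiceSketch {
  enum Op { JOB_SETUP, JOB_COMMIT, JOB_ABORT, TASK_ABORT }

  private final BlockingQueue<Op> queue = new LinkedBlockingQueue<>();
  private final AtomicBoolean stopped = new AtomicBoolean(false);
  private Thread worker;

  public void start() {
    worker = new Thread(() -> {
      while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
        Op op;
        try {
          op = queue.take();
        } catch (InterruptedException e) {
          return;                                       // stop() interrupts take()
        }
        System.out.println("committer handles " + op);  // setupJob() etc. here
      }
    }, "CommitterServiceSketch");
    worker.start();
  }

  public void handle(Op op) throws InterruptedException {
    queue.put(op);                        // callers never block on committer work
  }

  public void stop() throws InterruptedException {
    if (stopped.getAndSet(true)) return;  // idempotent, as before
    worker.interrupt();
    worker.join();
  }

  public static void main(String[] args) throws InterruptedException {
    CommitterServiceSketch svc = new CommitterServiceSketch();
    svc.start();
    svc.handle(Op.JOB_SETUP);
    svc.handle(Op.JOB_COMMIT);
    Thread.sleep(200);                    // demo only: let the worker drain
    svc.stop();
  }
}
```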

+ 55 - 14
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java

@@ -32,9 +32,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.WrappedJvmID;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;
@@ -49,6 +52,8 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent;
+import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
@@ -69,8 +74,6 @@ import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
-import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
-import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
@@ -394,8 +397,7 @@ public class MRApp extends MRAppMaster {
     Job newJob = new TestJob(getJobId(), getAttemptID(), conf, 
     		getDispatcher().getEventHandler(),
             getTaskAttemptListener(), getContext().getClock(),
-            getCommitter(), isNewApiCommitter(),
-            currentUser.getUserName(), getContext());
+            isNewApiCommitter(), currentUser.getUserName(), getContext());
     ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
 
     getDispatcher().register(JobFinishEvent.Type.class,
@@ -515,16 +517,56 @@ public class MRApp extends MRAppMaster {
   }
 
   @Override
-  protected TaskCleaner createTaskCleaner(AppContext context) {
-    return new TaskCleaner() {
+  protected EventHandler<CommitterEvent> createCommitterEventHandler(
+      AppContext context, final OutputCommitter committer) {
+    // create an output committer with the task methods stubbed out
+    OutputCommitter stubbedCommitter = new OutputCommitter() {
       @Override
-      public void handle(TaskCleanupEvent event) {
-        //send the cleanup done event
-        getContext().getEventHandler().handle(
-            new TaskAttemptEvent(event.getAttemptID(),
-                TaskAttemptEventType.TA_CLEANUP_DONE));
+      public void setupJob(JobContext jobContext) throws IOException {
+        committer.setupJob(jobContext);
+      }
+      @SuppressWarnings("deprecation")
+      @Override
+      public void cleanupJob(JobContext jobContext) throws IOException {
+        committer.cleanupJob(jobContext);
+      }
+      @Override
+      public void commitJob(JobContext jobContext) throws IOException {
+        committer.commitJob(jobContext);
+      }
+      @Override
+      public void abortJob(JobContext jobContext, State state)
+          throws IOException {
+        committer.abortJob(jobContext, state);
+      }
+      @Override
+      public boolean isRecoverySupported() {
+        return committer.isRecoverySupported();
+      }
+      @Override
+      public void setupTask(TaskAttemptContext taskContext)
+          throws IOException {
+      }
+      @Override
+      public boolean needsTaskCommit(TaskAttemptContext taskContext)
+          throws IOException {
+        return false;
+      }
+      @Override
+      public void commitTask(TaskAttemptContext taskContext)
+          throws IOException {
+      }
+      @Override
+      public void abortTask(TaskAttemptContext taskContext)
+          throws IOException {
+      }
+      @Override
+      public void recoverTask(TaskAttemptContext taskContext)
+          throws IOException {
       }
     };
+
+    return new CommitterEventHandler(context, stubbedCommitter);
   }
 
   @Override
@@ -576,12 +618,11 @@ public class MRApp extends MRAppMaster {
     public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
         Configuration conf, EventHandler eventHandler,
         TaskAttemptListener taskAttemptListener, Clock clock,
-        OutputCommitter committer, boolean newApiCommitter, String user, 
-        AppContext appContext) {
+        boolean newApiCommitter, String user, AppContext appContext) {
       super(jobId, getApplicationAttemptId(applicationId, getStartCount()),
           conf, eventHandler, taskAttemptListener,
           new JobTokenSecretManager(), new Credentials(), clock,
-          getCompletedTaskFromPreviousRun(), metrics, committer,
+          getCompletedTaskFromPreviousRun(), metrics,
           newApiCommitter, user, System.currentTimeMillis(), getAllAMInfos(),
           appContext);
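
For the test harness, MRApp keeps job-level committer behaviour observable while neutralizing task-level calls by wrapping the real committer in a delegating stub, as the hunk above shows. The same decorator shape in miniature (illustrative interface, not Hadoop's `OutputCommitter`):

```java
// Forward the operations a test cares about; turn the rest into no-ops.
public class StubbingCommitterSketch {
  interface Committer {
    void commitJob();    // job-level: forwarded to the real committer
    void commitTask();   // task-level: deliberately stubbed out
  }

  static Committer stubTaskOps(Committer real) {
    return new Committer() {
      @Override public void commitJob() { real.commitJob(); }
      @Override public void commitTask() { /* no-op under test */ }
    };
  }

  public static void main(String[] args) {
    Committer real = new Committer() {
      @Override public void commitJob() { System.out.println("commitJob ran"); }
      @Override public void commitTask() { System.out.println("commitTask ran"); }
    };
    Committer stubbed = stubTaskOps(real);
    stubbed.commitJob();     // prints "commitJob ran"
    stubbed.commitTask();    // prints nothing
  }
}
```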
 

+ 1 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java

@@ -201,8 +201,7 @@ import org.junit.Test;
       Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
           getDispatcher().getEventHandler(),
           getTaskAttemptListener(), getContext().getClock(),
-          getCommitter(), isNewApiCommitter(),
-          currentUser.getUserName(), getContext());
+          isNewApiCommitter(), currentUser.getUserName(), getContext());
       ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
 
       getDispatcher().register(JobFinishEvent.Type.class,

+ 408 - 139
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java

@@ -19,46 +19,51 @@
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler;
+import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
-import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
-import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.SystemClock;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Records;
@@ -69,121 +74,223 @@ import org.junit.Test;
 /**
  * Tests various functions of the JobImpl class
  */
-@SuppressWarnings({"unchecked", "rawtypes"})
+@SuppressWarnings({"rawtypes"})
 public class TestJobImpl {
   
   @Test
-  public void testJobNoTasksTransition() { 
-    JobNoTasksCompletedTransition trans = new JobNoTasksCompletedTransition();
-    JobImpl mockJob = mock(JobImpl.class);
+  public void testJobNoTasks() {
+    Configuration conf = new Configuration();
+    conf.setInt(MRJobConfig.NUM_REDUCES, 0);
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    OutputCommitter committer = mock(OutputCommitter.class);
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+
+    JobImpl job = createStubbedJob(conf, dispatcher, 0);
+    job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
+    assertJobState(job, JobStateInternal.INITED);
+    job.handle(new JobEvent(job.getID(), JobEventType.JOB_START));
+    assertJobState(job, JobStateInternal.SUCCEEDED);
+    dispatcher.stop();
+    commitHandler.stop();
+  }
 
-    // Force checkJobCompleteSuccess to return null
-    Task mockTask = mock(Task.class);
-    Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
-    tasks.put(mockTask.getID(), mockTask);
-    mockJob.tasks = tasks;
+  @Test(timeout=20000)
+  public void testCommitJobFailsJob() throws Exception {
+    Configuration conf = new Configuration();
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    CyclicBarrier syncBarrier = new CyclicBarrier(2);
+    OutputCommitter committer = new TestingOutputCommitter(syncBarrier, false);
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+
+    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
+    completeJobTasks(job);
+    assertJobState(job, JobStateInternal.COMMITTING);
+
+    // let the committer fail and verify the job fails
+    syncBarrier.await();
+    assertJobState(job, JobStateInternal.FAILED);
+    dispatcher.stop();
+    commitHandler.stop();
+  }
 
-    when(mockJob.getInternalState()).thenReturn(JobStateInternal.ERROR);
-    JobEvent mockJobEvent = mock(JobEvent.class);
-    JobStateInternal state = trans.transition(mockJob, mockJobEvent);
-    Assert.assertEquals("Incorrect state returned from JobNoTasksCompletedTransition",
-        JobStateInternal.ERROR, state);
+  @Test(timeout=20000)
+  public void testCheckJobCompleteSuccess() throws Exception {
+    Configuration conf = new Configuration();
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    CyclicBarrier syncBarrier = new CyclicBarrier(2);
+    OutputCommitter committer = new TestingOutputCommitter(syncBarrier, true);
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+
+    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
+    completeJobTasks(job);
+    assertJobState(job, JobStateInternal.COMMITTING);
+
+    // let the committer complete and verify the job succeeds
+    syncBarrier.await();
+    assertJobState(job, JobStateInternal.SUCCEEDED);
+    dispatcher.stop();
+    commitHandler.stop();
   }
 
-  @Test
-  public void testCommitJobFailsJob() {
-
-    JobImpl mockJob = mock(JobImpl.class);
-    mockJob.tasks = new HashMap<TaskId, Task>();
-    OutputCommitter mockCommitter = mock(OutputCommitter.class);
-    EventHandler mockEventHandler = mock(EventHandler.class);
-    JobContext mockJobContext = mock(JobContext.class);
-
-    when(mockJob.getCommitter()).thenReturn(mockCommitter);
-    when(mockJob.getEventHandler()).thenReturn(mockEventHandler);
-    when(mockJob.getJobContext()).thenReturn(mockJobContext);
-    when(mockJob.finished(JobStateInternal.KILLED)).thenReturn(
-        JobStateInternal.KILLED);
-    when(mockJob.finished(JobStateInternal.FAILED)).thenReturn(
-        JobStateInternal.FAILED);
-    when(mockJob.finished(JobStateInternal.SUCCEEDED)).thenReturn(
-        JobStateInternal.SUCCEEDED);
-
-    try {
-      doThrow(new IOException()).when(mockCommitter).commitJob(any(JobContext.class));
-    } catch (IOException e) {
-      // commitJob stubbed out, so this can't happen
-    }
-    doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class));
-    JobStateInternal jobState = JobImpl.checkJobCompleteSuccess(mockJob);
-    Assert.assertNotNull("checkJobCompleteSuccess incorrectly returns null " +
-      "for successful job", jobState);
-    Assert.assertEquals("checkJobCompleteSuccess returns incorrect state",
-        JobStateInternal.FAILED, jobState);
-    verify(mockJob).abortJob(
-        eq(org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
+  @Test(timeout=20000)
+  public void testKilledDuringSetup() throws Exception {
+    Configuration conf = new Configuration();
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    OutputCommitter committer = new StubbedOutputCommitter() {
+      @Override
+      public synchronized void setupJob(JobContext jobContext)
+          throws IOException {
+        while (!Thread.interrupted()) {
+          try {
+            wait();
+          } catch (InterruptedException e) {
+          }
+        }
+      }
+    };
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+
+    JobImpl job = createStubbedJob(conf, dispatcher, 2);
+    JobId jobId = job.getID();
+    job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
+    assertJobState(job, JobStateInternal.INITED);
+    job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+    assertJobState(job, JobStateInternal.SETUP);
+
+    job.handle(new JobEvent(job.getID(), JobEventType.JOB_KILL));
+    assertJobState(job, JobStateInternal.KILLED);
+    dispatcher.stop();
+    commitHandler.stop();
   }
 
-  @Test
-  public void testCheckJobCompleteSuccess() {
-    
-    JobImpl mockJob = mock(JobImpl.class);
-    mockJob.tasks = new HashMap<TaskId, Task>();
-    OutputCommitter mockCommitter = mock(OutputCommitter.class);
-    EventHandler mockEventHandler = mock(EventHandler.class);
-    JobContext mockJobContext = mock(JobContext.class);
-    
-    when(mockJob.getCommitter()).thenReturn(mockCommitter);
-    when(mockJob.getEventHandler()).thenReturn(mockEventHandler);
-    when(mockJob.getJobContext()).thenReturn(mockJobContext);
-    doNothing().when(mockJob).setFinishTime();
-    doNothing().when(mockJob).logJobHistoryFinishedEvent();
-    when(mockJob.finished(any(JobStateInternal.class))).thenReturn(
-        JobStateInternal.SUCCEEDED);
-
-    try {
-      doNothing().when(mockCommitter).commitJob(any(JobContext.class));
-    } catch (IOException e) {
-      // commitJob stubbed out, so this can't happen
-    }
-    doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class));
-    Assert.assertNotNull("checkJobCompleteSuccess incorrectly returns null " +
-      "for successful job",
-      JobImpl.checkJobCompleteSuccess(mockJob));
-    Assert.assertEquals("checkJobCompleteSuccess returns incorrect state",
-        JobStateInternal.SUCCEEDED, JobImpl.checkJobCompleteSuccess(mockJob));
+  @Test(timeout=20000)
+  public void testKilledDuringCommit() throws Exception {
+    Configuration conf = new Configuration();
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    CyclicBarrier syncBarrier = new CyclicBarrier(2);
+    OutputCommitter committer = new WaitingOutputCommitter(syncBarrier, true);
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+
+    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
+    completeJobTasks(job);
+    assertJobState(job, JobStateInternal.COMMITTING);
+
+    syncBarrier.await();
+    job.handle(new JobEvent(job.getID(), JobEventType.JOB_KILL));
+    assertJobState(job, JobStateInternal.KILLED);
+    dispatcher.stop();
+    commitHandler.stop();
   }
 
-  @Test
-  public void testCheckJobCompleteSuccessFailed() {
-    JobImpl mockJob = mock(JobImpl.class);
-
-    // Make the completedTasks not equal the getTasks()
-    Task mockTask = mock(Task.class);
-    Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
-    tasks.put(mockTask.getID(), mockTask);
-    mockJob.tasks = tasks;
-    
-    try {
-      // Just in case the code breaks and reaches these calls
-      OutputCommitter mockCommitter = mock(OutputCommitter.class);
-      EventHandler mockEventHandler = mock(EventHandler.class);
-      doNothing().when(mockCommitter).commitJob(any(JobContext.class));
-      doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class));
-    } catch (IOException e) {
-      e.printStackTrace();    
-    }
-    Assert.assertNull("checkJobCompleteSuccess incorrectly returns not-null " +
-      "for unsuccessful job",
-      JobImpl.checkJobCompleteSuccess(mockJob));
+  @Test(timeout=20000)
+  public void testKilledDuringFailAbort() throws Exception {
+    Configuration conf = new Configuration();
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    OutputCommitter committer = new StubbedOutputCommitter() {
+      @Override
+      public void setupJob(JobContext jobContext) throws IOException {
+        throw new IOException("forced failure");
+      }
+
+      @Override
+      public synchronized void abortJob(JobContext jobContext, State state)
+          throws IOException {
+        while (!Thread.interrupted()) {
+          try {
+            wait();
+          } catch (InterruptedException e) {
+          }
+        }
+      }
+    };
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+
+    JobImpl job = createStubbedJob(conf, dispatcher, 2);
+    JobId jobId = job.getID();
+    job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
+    assertJobState(job, JobStateInternal.INITED);
+    job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+    assertJobState(job, JobStateInternal.FAIL_ABORT);
+
+    job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+    assertJobState(job, JobStateInternal.KILLED);
+    dispatcher.stop();
+    commitHandler.stop();
   }
 
+  @Test(timeout=20000)
+  public void testKilledDuringKillAbort() throws Exception {
+    Configuration conf = new Configuration();
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    OutputCommitter committer = new StubbedOutputCommitter() {
+      @Override
+      public synchronized void abortJob(JobContext jobContext, State state)
+          throws IOException {
+        while (!Thread.interrupted()) {
+          try {
+            wait();
+          } catch (InterruptedException e) {
+          }
+        }
+      }
+    };
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+
+    JobImpl job = createStubbedJob(conf, dispatcher, 2);
+    JobId jobId = job.getID();
+    job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
+    assertJobState(job, JobStateInternal.INITED);
+    job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+    assertJobState(job, JobStateInternal.SETUP);
+
+    job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+    assertJobState(job, JobStateInternal.KILL_ABORT);
+
+    job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+    assertJobState(job, JobStateInternal.KILLED);
+    dispatcher.stop();
+    commitHandler.stop();
+  }
 
   public static void main(String[] args) throws Exception {
     TestJobImpl t = new TestJobImpl();
-    t.testJobNoTasksTransition();
+    t.testJobNoTasks();
     t.testCheckJobCompleteSuccess();
-    t.testCheckJobCompleteSuccessFailed();
     t.testCheckAccess();
     t.testReportDiagnostics();
     t.testUberDecision();
@@ -208,7 +315,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null,
-        null, null, null, true, null, 0, null, null);
+        null, null, true, null, 0, null, null);
     Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -219,7 +326,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null,
-        null, null, null, true, null, 0, null, null);
+        null, null, true, null, 0, null, null);
     Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -230,7 +337,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null,
-        null, null, null, true, null, 0, null, null);
+        null, null, true, null, 0, null, null);
     Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -241,7 +348,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null,
-        null, null, null, true, null, 0, null, null);
+        null, null, true, null, 0, null, null);
     Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -252,7 +359,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null,
-        null, null, null, true, null, 0, null, null);
+        null, null, true, null, 0, null, null);
     Assert.assertTrue(job5.checkAccess(ugi1, null));
     Assert.assertTrue(job5.checkAccess(ugi2, null));
   }
@@ -270,8 +377,7 @@ public class TestJobImpl {
         mock(EventHandler.class),
         null, mock(JobTokenSecretManager.class), null,
         new SystemClock(), null,
-        mrAppMetrics, mock(OutputCommitter.class),
-        true, null, 0, null, null);
+        mrAppMetrics, true, null, 0, null, null);
     job.handle(diagUpdateEvent);
     String diagnostics = job.getReport().getDiagnostics();
     Assert.assertNotNull(diagnostics);
@@ -282,8 +388,7 @@ public class TestJobImpl {
         mock(EventHandler.class),
         null, mock(JobTokenSecretManager.class), null,
         new SystemClock(), null,
-        mrAppMetrics, mock(OutputCommitter.class),
-        true, null, 0, null, null);
+        mrAppMetrics, true, null, 0, null, null);
     job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
     job.handle(diagUpdateEvent);
     diagnostics = job.getReport().getDiagnostics();
@@ -338,20 +443,23 @@ public class TestJobImpl {
     JobImpl job = new JobImpl(jobId, Records
         .newRecord(ApplicationAttemptId.class), conf, mock(EventHandler.class),
         null, mock(JobTokenSecretManager.class), null, null, null,
-        mrAppMetrics, mock(OutputCommitter.class), true, null, 0, null, null);
-    InitTransition initTransition = getInitTransition();
+        mrAppMetrics, true, null, 0, null, null);
+    InitTransition initTransition = getInitTransition(2);
     JobEvent mockJobEvent = mock(JobEvent.class);
     initTransition.transition(job, mockJobEvent);
     boolean isUber = job.isUber();
     return isUber;
   }
 
-  private static InitTransition getInitTransition() {
+  private static InitTransition getInitTransition(final int numSplits) {
     InitTransition initTransition = new InitTransition() {
       @Override
       protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
-        return new TaskSplitMetaInfo[] { new TaskSplitMetaInfo(),
-            new TaskSplitMetaInfo() };
+        TaskSplitMetaInfo[] splits = new TaskSplitMetaInfo[numSplits];
+        for (int i = 0; i < numSplits; ++i) {
+          splits[i] = new TaskSplitMetaInfo();
+        }
+        return splits;
       }
     };
     return initTransition;
@@ -360,19 +468,24 @@ public class TestJobImpl {
   @Test
   public void testTransitionsAtFailed() throws IOException {
     Configuration conf = new Configuration();
-    JobID jobID = JobID.forName("job_1234567890000_0001");
-    JobId jobId = TypeConverter.toYarn(jobID);
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+
     OutputCommitter committer = mock(OutputCommitter.class);
     doThrow(new IOException("forcefail"))
       .when(committer).setupJob(any(JobContext.class));
-    InlineDispatcher dispatcher = new InlineDispatcher();
-    JobImpl job = new StubbedJob(jobId, Records
-        .newRecord(ApplicationAttemptId.class), conf,
-        dispatcher.getEventHandler(), committer, true, null);
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
 
-    dispatcher.register(JobEventType.class, job);
+    JobImpl job = createStubbedJob(conf, dispatcher, 2);
+    JobId jobId = job.getID();
     job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
-    Assert.assertEquals(JobState.FAILED, job.getState());
+    assertJobState(job, JobStateInternal.INITED);
+    job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+    assertJobState(job, JobStateInternal.FAILED);
 
     job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED));
     Assert.assertEquals(JobState.FAILED, job.getState());
@@ -382,17 +495,86 @@ public class TestJobImpl {
     Assert.assertEquals(JobState.FAILED, job.getState());
     job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE));
     Assert.assertEquals(JobState.FAILED, job.getState());
+
+    dispatcher.stop();
+    commitHandler.stop();
+  }
+
+  private static CommitterEventHandler createCommitterEventHandler(
+      Dispatcher dispatcher, OutputCommitter committer) {
+    SystemClock clock = new SystemClock();
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getEventHandler()).thenReturn(
+        dispatcher.getEventHandler());
+    when(appContext.getClock()).thenReturn(clock);
+    CommitterEventHandler handler =
+        new CommitterEventHandler(appContext, committer);
+    dispatcher.register(CommitterEventType.class, handler);
+    return handler;
+  }
+
+  private static StubbedJob createStubbedJob(Configuration conf,
+      Dispatcher dispatcher, int numSplits) {
+    JobID jobID = JobID.forName("job_1234567890000_0001");
+    JobId jobId = TypeConverter.toYarn(jobID);
+    StubbedJob job = new StubbedJob(jobId,
+        Records.newRecord(ApplicationAttemptId.class), conf,
+        dispatcher.getEventHandler(), true, "somebody", numSplits);
+    dispatcher.register(JobEventType.class, job);
+    EventHandler mockHandler = mock(EventHandler.class);
+    dispatcher.register(TaskEventType.class, mockHandler);
+    dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
+        mockHandler);
+    dispatcher.register(JobFinishEvent.Type.class, mockHandler);
+    return job;
+  }
+
+  private static StubbedJob createRunningStubbedJob(Configuration conf,
+      Dispatcher dispatcher, int numSplits) {
+    StubbedJob job = createStubbedJob(conf, dispatcher, numSplits);
+    job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
+    assertJobState(job, JobStateInternal.INITED);
+    job.handle(new JobEvent(job.getID(), JobEventType.JOB_START));
+    assertJobState(job, JobStateInternal.RUNNING);
+    return job;
+  }
+
+  private static void completeJobTasks(JobImpl job) {
+    // complete the map tasks and the reduce tasks so we start committing
+    int numMaps = job.getTotalMaps();
+    for (int i = 0; i < numMaps; ++i) {
+      job.handle(new JobTaskEvent(
+          MRBuilderUtils.newTaskId(job.getID(), 1, TaskType.MAP),
+          TaskState.SUCCEEDED));
+      Assert.assertEquals(JobState.RUNNING, job.getState());
+    }
+    int numReduces = job.getTotalReduces();
+    for (int i = 0; i < numReduces; ++i) {
+      job.handle(new JobTaskEvent(
+          MRBuilderUtils.newTaskId(job.getID(), 1, TaskType.REDUCE),
+          TaskState.SUCCEEDED));
+      Assert.assertEquals(JobState.RUNNING, job.getState());
+    }
+  }
+
+  private static void assertJobState(JobImpl job, JobStateInternal state) {
+    int timeToWaitMsec = 5 * 1000;
+    while (timeToWaitMsec > 0 && job.getInternalState() != state) {
+      try {
+        Thread.sleep(10);
+        timeToWaitMsec -= 10;
+      } catch (InterruptedException e) {
+        break;
+      }
+    }
+    Assert.assertEquals(state, job.getInternalState());
   }
 
   private static class StubbedJob extends JobImpl {
     //override the init transition
-    private final InitTransition initTransition = getInitTransition();
-    StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> localFactory
-        = stateMachineFactory.addTransition(JobStateInternal.NEW,
-            EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
-            JobEventType.JOB_INIT,
-            // This is abusive.
-            initTransition);
+    private final InitTransition initTransition;
+    StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
+        localFactory;
 
     private final StateMachine<JobStateInternal, JobEventType, JobEvent>
         localStateMachine;
@@ -404,15 +586,102 @@ public class TestJobImpl {
 
     public StubbedJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
         Configuration conf, EventHandler eventHandler,
-        OutputCommitter committer, boolean newApiCommitter, String user) {
+        boolean newApiCommitter, String user, int numSplits) {
       super(jobId, applicationAttemptId, conf, eventHandler,
           null, new JobTokenSecretManager(), new Credentials(),
-          new SystemClock(), null, MRAppMetrics.create(), committer,
+          new SystemClock(), null, MRAppMetrics.create(),
           newApiCommitter, user, System.currentTimeMillis(), null, null);
 
+      initTransition = getInitTransition(numSplits);
+      localFactory = stateMachineFactory.addTransition(JobStateInternal.NEW,
+            EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
+            JobEventType.JOB_INIT,
+            // This is abusive.
+            initTransition);
+
       // This "this leak" is okay because the retained pointer is in an
       //  instance variable.
       localStateMachine = localFactory.make(this);
     }
   }
+
+  private static class StubbedOutputCommitter extends OutputCommitter {
+
+    public StubbedOutputCommitter() {
+      super();
+    }
+
+    @Override
+    public void setupJob(JobContext jobContext) throws IOException {
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskContext) throws IOException {
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskContext)
+        throws IOException {
+      return false;
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext taskContext) throws IOException {
+    }
+
+    @Override
+    public void abortTask(TaskAttemptContext taskContext) throws IOException {
+    }
+  }
+
+  private static class TestingOutputCommitter extends StubbedOutputCommitter {
+    CyclicBarrier syncBarrier;
+    boolean shouldSucceed;
+
+    public TestingOutputCommitter(CyclicBarrier syncBarrier,
+        boolean shouldSucceed) {
+      super();
+      this.syncBarrier = syncBarrier;
+      this.shouldSucceed = shouldSucceed;
+    }
+
+    @Override
+    public void commitJob(JobContext jobContext) throws IOException {
+      try {
+        syncBarrier.await();
+      } catch (BrokenBarrierException e) {
+        // ignore; proceed to report the commit result below
+      } catch (InterruptedException e) {
+        // ignore; proceed to report the commit result below
+      }
+
+      if (!shouldSucceed) {
+        throw new IOException("forced failure");
+      }
+    }
+  }
+
+  private static class WaitingOutputCommitter extends TestingOutputCommitter {
+    public WaitingOutputCommitter(CyclicBarrier syncBarrier,
+        boolean shouldSucceed) {
+      super(syncBarrier, shouldSucceed);
+    }
+
+    @Override
+    public void commitJob(JobContext jobContext) throws IOException {
+      try {
+        syncBarrier.await();
+      } catch (BrokenBarrierException e) {
+        // ignore; fall through and simulate the hung commit below
+      } catch (InterruptedException e) {
+        // ignore; fall through and simulate the hung commit below
+      }
+
+      // block until the committer-cancel logic interrupts this thread
+      while (!Thread.interrupted()) {
+        try {
+          synchronized (this) {
+            wait();
+          }
+        } catch (InterruptedException e) {
+          break;
+        }
+      }
+    }
+  }
 }

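Taken together, the helpers above give an end-to-end way to drive a job through commit. A minimal sketch of such a test, assuming the helper signatures in this patch and the JobStateInternal.COMMITTING state it introduces (the test name and timeout are illustrative; the barrier of 2 pairs the test thread with the committer thread):

    @Test(timeout=20000)
    public void testCommitThenSuccess() throws Exception {
      Configuration conf = new Configuration();
      AsyncDispatcher dispatcher = new AsyncDispatcher();
      dispatcher.init(conf);
      dispatcher.start();

      // the committer blocks in commitJob() until the test releases the barrier
      CyclicBarrier syncBarrier = new CyclicBarrier(2);
      OutputCommitter committer = new TestingOutputCommitter(syncBarrier, true);
      CommitterEventHandler commitHandler =
          createCommitterEventHandler(dispatcher, committer);
      commitHandler.init(conf);
      commitHandler.start();

      JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
      completeJobTasks(job);
      // internally COMMITTING, though getState() still reports RUNNING
      assertJobState(job, JobStateInternal.COMMITTING);

      // release the committer; the job should then succeed
      syncBarrier.await();
      assertJobState(job, JobStateInternal.SUCCEEDED);

      dispatcher.stop();
      commitHandler.stop();
    }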
+ 7 - 9
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java

@@ -43,7 +43,6 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapTaskAttemptImpl;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@@ -253,10 +252,9 @@ public class TestTaskAttempt{
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
     Path jobFile = mock(Path.class);
     JobConf jobConf = new JobConf();
-    OutputCommitter outputCommitter = mock(OutputCommitter.class);
     TaskAttemptImpl taImpl =
         new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
-            taskSplitMetaInfo, jobConf, taListener, outputCommitter, null,
+            taskSplitMetaInfo, jobConf, taListener, null,
             null, clock, null);
     return taImpl;
   }
@@ -342,7 +340,7 @@ public class TestTaskAttempt{
     TaskAttemptImpl taImpl =
       new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
           splits, jobConf, taListener,
-          mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+          mock(Token.class), new Credentials(),
           new SystemClock(), null);
 
     NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
@@ -397,7 +395,7 @@ public class TestTaskAttempt{
     TaskAttemptImpl taImpl =
       new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
           splits, jobConf, taListener,
-          mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+          mock(Token.class), new Credentials(),
           new SystemClock(), appCtx);
 
     NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
@@ -453,7 +451,7 @@ public class TestTaskAttempt{
     TaskAttemptImpl taImpl =
       new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
           splits, jobConf, taListener,
-          mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+          mock(Token.class), new Credentials(),
           new SystemClock(), appCtx);
 
     NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
@@ -512,7 +510,7 @@ public class TestTaskAttempt{
     TaskAttemptImpl taImpl =
       new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
           splits, jobConf, taListener,
-          mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+          mock(Token.class), new Credentials(),
           new SystemClock(), appCtx);
 
     NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
@@ -578,7 +576,7 @@ public class TestTaskAttempt{
     when(resource.getMemory()).thenReturn(1024);
 
     TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
-        jobFile, 1, splits, jobConf, taListener, mock(OutputCommitter.class),
+        jobFile, 1, splits, jobConf, taListener,
         mock(Token.class), new Credentials(), new SystemClock(), appCtx);
 
     NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
@@ -628,7 +626,7 @@ public class TestTaskAttempt{
     when(resource.getMemory()).thenReturn(1024);
 
     TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
-        jobFile, 1, splits, jobConf, taListener, mock(OutputCommitter.class),
+        jobFile, 1, splits, jobConf, taListener,
         mock(Token.class), new Credentials(), new SystemClock(), appCtx);
 
     NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);

+ 1 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java

@@ -40,7 +40,6 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapTaskAttemptImpl;
 import org.apache.hadoop.mapred.WrappedJvmID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@@ -107,7 +106,7 @@ public class TestTaskAttemptContainerRequest {
     TaskAttemptImpl taImpl =
         new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
             mock(TaskSplitMetaInfo.class), jobConf, taListener,
-            mock(OutputCommitter.class), jobToken, credentials,
+            jobToken, credentials,
             new SystemClock(), null);
 
     jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString());

+ 7 - 11
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java

@@ -35,7 +35,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Task;
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
-import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@@ -71,7 +70,6 @@ public class TestTaskImpl {
   
   private JobConf conf;
   private TaskAttemptListener taskAttemptListener;
-  private OutputCommitter committer;
   private Token<JobTokenIdentifier> jobToken;
   private JobId jobId;
   private Path remoteJobConfFile;
@@ -99,13 +97,13 @@ public class TestTaskImpl {
 
     public MockTaskImpl(JobId jobId, int partition,
         EventHandler eventHandler, Path remoteJobConfFile, JobConf conf,
-        TaskAttemptListener taskAttemptListener, OutputCommitter committer,
+        TaskAttemptListener taskAttemptListener,
         Token<JobTokenIdentifier> jobToken,
         Credentials credentials, Clock clock,
         Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
         MRAppMetrics metrics, AppContext appContext, TaskType taskType) {
       super(jobId, taskType , partition, eventHandler,
-          remoteJobConfFile, conf, taskAttemptListener, committer, 
+          remoteJobConfFile, conf, taskAttemptListener,
           jobToken, credentials, clock,
           completedTasksFromPreviousRun, startCount, metrics, appContext);
       this.taskType = taskType;
@@ -120,7 +118,7 @@ public class TestTaskImpl {
     protected TaskAttemptImpl createAttempt() {
       MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getID(), ++taskAttemptCounter, 
           eventHandler, taskAttemptListener, remoteJobConfFile, partition,
-          conf, committer, jobToken, credentials, clock, appContext, taskType);
+          conf, jobToken, credentials, clock, appContext, taskType);
       taskAttempts.add(attempt);
       return attempt;
     }
@@ -145,12 +143,11 @@ public class TestTaskImpl {
 
     public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler,
         TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
-        JobConf conf, OutputCommitter committer,
-        Token<JobTokenIdentifier> jobToken,
+        JobConf conf, Token<JobTokenIdentifier> jobToken,
         Credentials credentials, Clock clock,
         AppContext appContext, TaskType taskType) {
       super(taskId, id, eventHandler, taskAttemptListener, jobFile, partition, conf,
-          dataLocations, committer, jobToken, credentials, clock, appContext);
+          dataLocations, jobToken, credentials, clock, appContext);
       this.taskType = taskType;
     }
 
@@ -210,7 +207,6 @@ public class TestTaskImpl {
     
     conf = new JobConf();
     taskAttemptListener = mock(TaskAttemptListener.class);
-    committer = mock(OutputCommitter.class);
     jobToken = (Token<JobTokenIdentifier>) mock(Token.class);
     remoteJobConfFile = mock(Path.class);
     credentials = null;
@@ -235,7 +231,7 @@ public class TestTaskImpl {
   
   private MockTaskImpl createMockTask(TaskType taskType) {
     return new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
-        remoteJobConfFile, conf, taskAttemptListener, committer, jobToken,
+        remoteJobConfFile, conf, taskAttemptListener, jobToken,
         credentials, clock,
         completedTasksFromPreviousRun, startCount,
         metrics, appContext, taskType);
@@ -606,7 +602,7 @@ public class TestTaskImpl {
   @Test
   public void testFailedTransitions() {
     mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
-        remoteJobConfFile, conf, taskAttemptListener, committer, jobToken,
+        remoteJobConfFile, conf, taskAttemptListener, jobToken,
         credentials, clock,
         completedTasksFromPreviousRun, startCount,
         metrics, appContext, TaskType.MAP) {

+ 9 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

@@ -464,6 +464,15 @@ public interface MRJobConfig {
     MR_AM_PREFIX + "scheduler.connection.wait.interval-ms";
   public static final int DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS = 360000;
 
+  /**
+   * How long to wait in milliseconds for the output committer to cancel
+   * an operation when the job is being killed.
+   */
+  public static final String MR_AM_COMMITTER_CANCEL_TIMEOUT_MS =
+      MR_AM_PREFIX + "job.committer.cancel-timeout";
+  public static final int DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS =
+      60 * 1000;
+
   /**
    * Boolean. Create the base dirs in the JobHistoryEventHandler
    * Set to false for multi-user clusters.  This is an internal config that

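For context, the constants above bound how long the AM waits for a cancelled commit to actually stop. A hedged sketch of that use, where only the MRJobConfig names come from the hunk above and everything else (class, method, thread handling) is an illustrative assumption:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    class CommitCancelSketch {
      // Interrupt an in-flight commit and wait a bounded time for it to
      // stop; returns whether it actually stopped before the timeout.
      static boolean cancelJobCommit(Thread commitThread, Configuration conf)
          throws InterruptedException {
        int timeoutMs = conf.getInt(
            MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS,
            MRJobConfig.DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS);
        commitThread.interrupt();     // ask the committer thread to stop
        commitThread.join(timeoutMs); // but wait only the configured time
        return !commitThread.isAlive();
      }
    }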
+ 7 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

@@ -873,6 +873,13 @@
     For example 50000-50050,50100-50200</description>
 </property>
 
+<property>
+  <name>yarn.app.mapreduce.am.job.committer.cancel-timeout</name>
+  <value>60000</value>
+  <description>The amount of time in milliseconds to wait for the output
+    committer to cancel an operation if the job is killed.</description>
+</property>
+
 <property>
   <name>yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms</name>
   <value>1000</value>
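Jobs that need a tighter or looser bound can override this per submission; a minimal sketch (the 30-second value is arbitrary):

    // assumes org.apache.hadoop.conf.Configuration and MRJobConfig imports
    Configuration conf = new Configuration();
    // same key as the property above; 30s instead of the 60s default
    conf.setInt(MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS, 30 * 1000);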