MAPREDUCE-4819. AM can rerun job after reporting final job status to the client (bobby and Bikas Saha via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1429114 13f79535-47bb-0310-9956-ffa450edef68
Robert Joseph Evans committed 12 years ago
parent commit 64e4fb983e
24 files changed, with 1064 additions and 208 deletions
  1. + 3 - 0  hadoop-mapreduce-project/CHANGES.txt
  2. + 125 - 0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryCopyService.java
  3. + 7 - 6  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
  4. + 215 - 105  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
  5. + 40 - 6  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
  6. + 12 - 2  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
  7. + 3 - 3  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
  8. + 9 - 4  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
  9. + 7 - 11  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
  10. + 18 - 4  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
  11. + 4 - 2  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
  12. + 219 - 10  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
  13. + 18 - 2  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
  14. + 175 - 0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java
  15. + 43 - 9  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
  16. + 5 - 3  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
  17. + 20 - 1  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
  18. + 42 - 11  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java
  19. + 25 - 8  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java
  20. + 27 - 0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEventHandler.java
  21. + 32 - 17  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
  22. + 3 - 0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
  23. + 6 - 1  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
  24. + 6 - 3  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxyServlet.java

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -664,6 +664,9 @@ Release 0.23.6 - UNRELEASED
     MAPREDUCE-4894. Renewal / cancellation of JobHistory tokens (Siddharth
     Seth via tgraves)
 
+    MAPREDUCE-4819. AM can rerun job after reporting final job status to the
+    client (bobby and Bikas Saha via bobby)
+
 Release 0.23.5 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 125 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryCopyService.java

@@ -0,0 +1,125 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.CompositeService;
+
+/**
+ * Reads in history events from the JobHistoryFile and sends them out again
+ * to be recorded.
+ */
+public class JobHistoryCopyService extends CompositeService implements HistoryEventHandler {
+
+  private static final Log LOG = LogFactory.getLog(JobHistoryCopyService.class);
+
+  private final ApplicationAttemptId applicationAttemptId;
+  private final EventHandler handler;
+  private final JobId jobId;
+
+
+  public JobHistoryCopyService(ApplicationAttemptId applicationAttemptId, 
+      EventHandler handler) {
+    super("JobHistoryCopyService");
+    this.applicationAttemptId = applicationAttemptId;
+    this.jobId =  TypeConverter.toYarn(
+        TypeConverter.fromYarn(applicationAttemptId.getApplicationId()));
+    this.handler = handler;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+  }
+  
+  @Override
+  public void handleEvent(HistoryEvent event) throws IOException {
+    //Skip over the AM events; these are handled elsewhere
+    if (!(event instanceof AMStartedEvent)) {
+      handler.handle(new JobHistoryEvent(jobId, event));
+    }
+  }
+  
+  @Override
+  public void start() {
+    try {
+      //TODO should we parse on a background thread???
+      parse();
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+    super.start();
+  }
+  
+  private void parse() throws IOException {
+    FSDataInputStream in = null;
+    try {
+      in =  getPreviousJobHistoryFileStream(getConfig(), applicationAttemptId);
+    } catch (IOException e) {
+      LOG.warn("error trying to open previous history file. No history data " +
+      		"will be copied over.", e);
+      return;
+    }
+    JobHistoryParser parser = new JobHistoryParser(in);
+    parser.parse(this);
+    Exception parseException = parser.getParseException();
+    if (parseException != null) {
+      LOG.info("Got an error parsing job-history file" + 
+          ", ignoring incomplete events.", parseException);
+    }
+  }
+
+  public static FSDataInputStream getPreviousJobHistoryFileStream(
+      Configuration conf, ApplicationAttemptId applicationAttemptId)
+      throws IOException {
+    FSDataInputStream in = null;
+    Path historyFile = null;
+    String jobId =
+        TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
+          .toString();
+    String jobhistoryDir =
+        JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
+    Path histDirPath =
+        FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir));
+    FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
+    // read the previous history file
+    historyFile =
+        fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath,
+          jobId, (applicationAttemptId.getAttemptId() - 1)));
+    LOG.info("History file is at " + historyFile);
+    in = fc.open(historyFile);
+    return in;
+  }
+  
+  
+}
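
Taken together, JobHistoryCopyService is a replay filter over the previous
attempt's history file: every event except AMStartedEvent is re-emitted so
the new attempt's history log ends up complete. A minimal standalone sketch
of the same replay loop, using the parse(HistoryEventHandler) callback this
patch introduces; the history-file path here is hypothetical:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEventHandler;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;

public class ReplayHistorySketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Hypothetical path to the previous attempt's staged history file.
    Path prev = new Path("/tmp/staging/job_1317529182569_0004_1.jhist");
    FSDataInputStream in = FileSystem.get(conf).open(prev);
    JobHistoryParser parser = new JobHistoryParser(in);
    parser.parse(new HistoryEventHandler() {
      @Override
      public void handleEvent(HistoryEvent event) throws IOException {
        // Skip the AM start record, exactly as handleEvent() above does.
        if (!(event instanceof AMStartedEvent)) {
          System.out.println("replaying " + event.getEventType());
        }
      }
    });
    // A trailing parse error means the previous AM died mid-write; the
    // events read so far are still usable, mirroring parse() above.
    if (parser.getParseException() != null) {
      System.err.println("ignoring incomplete trailing events");
    }
  }
}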

+ 7 - 6
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java

@@ -116,12 +116,15 @@ public class JobHistoryEventHandler extends AbstractService
    */
   @Override
   public void init(Configuration conf) {
-
+    String jobId =
+      TypeConverter.fromYarn(context.getApplicationID()).toString();
+    
     String stagingDirStr = null;
     String doneDirStr = null;
     String userDoneDirStr = null;
     try {
-      stagingDirStr = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf);
+      stagingDirStr = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf,
+          jobId);
       doneDirStr =
           JobHistoryUtils.getConfiguredHistoryIntermediateDoneDirPrefix(conf);
       userDoneDirStr =
@@ -881,7 +884,7 @@ public class JobHistoryEventHandler extends AbstractService
   private void moveToDoneNow(Path fromPath, Path toPath) throws IOException {
     // check if path exists, in case of retries it may not exist
     if (stagingDirFS.exists(fromPath)) {
-      LOG.info("Moving " + fromPath.toString() + " to " + toPath.toString());
+      LOG.info("Copying " + fromPath.toString() + " to " + toPath.toString());
       // TODO temporarily removing the existing dst
       if (doneDirFS.exists(toPath)) {
         doneDirFS.delete(toPath, true);
@@ -892,11 +895,9 @@ public class JobHistoryEventHandler extends AbstractService
       if (copied)
         LOG.info("Copied to done location: " + toPath);
       else 
-          LOG.info("copy failed");
+        LOG.info("copy failed");
       doneDirFS.setPermission(toPath, new FsPermission(
           JobHistoryUtils.HISTORY_INTERMEDIATE_FILE_PERMISSIONS));
-      
-      stagingDirFS.delete(fromPath, false);
     }
   }
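
One behavioral change in this hunk is easy to miss: moveToDoneNow no longer
deletes the staging copy after the transfer (note the removed
stagingDirFS.delete call), so a later AM attempt can still replay the file;
cleanup is deferred to the staging-directory cleaner. A hedged sketch of the
copy-without-delete step, assuming FileUtil.copy with deleteSource=false;
the method and its parameters are illustrative, not the patch's code:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class CopyNotMoveSketch {
  // Copy the staged history file to the done location while leaving the
  // source in place: deleteSource=false keeps the staging copy alive so a
  // later attempt can replay it through JobHistoryCopyService.
  static boolean copyToDone(Configuration conf, FileSystem stagingFS,
      Path from, FileSystem doneFS, Path to) throws IOException {
    return FileUtil.copy(stagingFS, from, doneFS, to,
        false /* deleteSource */, conf);
  }
}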
 

+ 215 - 105
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

@@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.EventReader;
 import org.apache.hadoop.mapreduce.jobhistory.EventType;
 import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryCopyService;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
@@ -66,6 +67,7 @@ import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent;
 import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler;
 import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
@@ -91,6 +93,7 @@ import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.Credentials;
@@ -110,6 +113,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.CompositeService;
@@ -179,9 +183,13 @@ public class MRAppMaster extends CompositeService {
 
   private Job job;
   private Credentials fsTokens = new Credentials(); // Filled during init
-  private UserGroupInformation currentUser; // Will be setup during init
+  protected UserGroupInformation currentUser; // Will be setup during init
 
   private volatile boolean isLastAMRetry = false;
+  //Something happened and we should shut down right after we start up.
+  boolean errorHappenedShutDown = false;
+  private String shutDownMessage = null;
+  JobStateInternal forcedState = null;
 
   public MRAppMaster(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
@@ -242,94 +250,175 @@ public class MRAppMaster extends CompositeService {
       newApiCommitter = true;
       LOG.info("Using mapred newApiCommitter.");
     }
-
-    committer = createOutputCommitter(conf);
-    boolean recoveryEnabled = conf.getBoolean(
-        MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
-    boolean recoverySupportedByCommitter = committer.isRecoverySupported();
-    if (recoveryEnabled && recoverySupportedByCommitter
-        && appAttemptID.getAttemptId() > 1) {
-      LOG.info("Recovery is enabled. "
-          + "Will try to recover from previous life on best effort basis.");
-      recoveryServ = createRecoveryService(context);
-      addIfService(recoveryServ);
-      dispatcher = recoveryServ.getDispatcher();
-      clock = recoveryServ.getClock();
-      inRecovery = true;
-    } else {
-      LOG.info("Not starting RecoveryService: recoveryEnabled: "
-          + recoveryEnabled + " recoverySupportedByCommitter: "
-          + recoverySupportedByCommitter + " ApplicationAttemptID: "
-          + appAttemptID.getAttemptId());
+    
+    boolean copyHistory = false;
+    try {
+      String user = UserGroupInformation.getCurrentUser().getShortUserName();
+      Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+      FileSystem fs = getFileSystem(conf);
+      boolean stagingExists = fs.exists(stagingDir);
+      Path startCommitFile = MRApps.getStartJobCommitFile(conf, user, jobId);
+      boolean commitStarted = fs.exists(startCommitFile);
+      Path endCommitSuccessFile = MRApps.getEndJobCommitSuccessFile(conf, user, jobId);
+      boolean commitSuccess = fs.exists(endCommitSuccessFile);
+      Path endCommitFailureFile = MRApps.getEndJobCommitFailureFile(conf, user, jobId);
+      boolean commitFailure = fs.exists(endCommitFailureFile);
+      if(!stagingExists) {
+        isLastAMRetry = true;
+        errorHappenedShutDown = true;
+        forcedState = JobStateInternal.ERROR;
+        shutDownMessage = "Staging dir does not exist " + stagingDir;
+        LOG.fatal(shutDownMessage);
+      } else if (commitStarted) {
+        //A commit was started so this is the last time, we just need to know
+        // what result we will use to notify, and how we will unregister
+        errorHappenedShutDown = true;
+        isLastAMRetry = true;
+        copyHistory = true;
+        if (commitSuccess) {
+          shutDownMessage = "We crashed after successfully committing. Recovering.";
+          forcedState = JobStateInternal.SUCCEEDED;
+        } else if (commitFailure) {
+          shutDownMessage = "We crashed after a commit failure.";
+          forcedState = JobStateInternal.FAILED;
+        } else {
+          //The commit is still pending, commit error
+          shutDownMessage = "We crashed durring a commit";
+          forcedState = JobStateInternal.ERROR;
+        }
+      }
+    } catch (IOException e) {
+      throw new YarnException("Error while initializing", e);
+    }
+    
+    if (errorHappenedShutDown) {
       dispatcher = createDispatcher();
       addIfService(dispatcher);
-    }
-
-    //service to handle requests from JobClient
-    clientService = createClientService(context);
-    addIfService(clientService);
+      
+      EventHandler<JobHistoryEvent> historyService = null;
+      if (copyHistory) {
+        historyService = 
+          createJobHistoryHandler(context);
+        dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
+            historyService);
+      }
+      NoopEventHandler eater = new NoopEventHandler();
+      //We do not have a JobEventDispatcher in this path
+      dispatcher.register(JobEventType.class, eater);
+      
+      // service to allocate containers from RM (if non-uber) or to fake it (uber)
+      containerAllocator = createContainerAllocator(null, context);
+      addIfService(containerAllocator);
+      dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
+
+      if (copyHistory) {
+        // Add the staging directory cleaner before the history server but after
+        // the container allocator so the staging directory is cleaned after
+        // the history has been flushed but before unregistering with the RM.
+        addService(createStagingDirCleaningService());
+
+        // Add the JobHistoryEventHandler last so that it is properly stopped first.
+        // This will guarantee that all history-events are flushed before AM goes
+        // ahead with shutdown.
+        // Note: Even though JobHistoryEventHandler is started last, if any
+        // component creates a JobHistoryEvent in the meanwhile, it will just be
+        // queued inside the JobHistoryEventHandler 
+        addIfService(historyService);
+        
 
-    containerAllocator = createContainerAllocator(clientService, context);
+        JobHistoryCopyService cpHist = new JobHistoryCopyService(appAttemptID,
+            dispatcher.getEventHandler());
+        addIfService(cpHist);
+      }
+    } else {
+      committer = createOutputCommitter(conf);
+      boolean recoveryEnabled = conf.getBoolean(
+          MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+      boolean recoverySupportedByCommitter = committer.isRecoverySupported();
+      if (recoveryEnabled && recoverySupportedByCommitter
+          && appAttemptID.getAttemptId() > 1) {
+        LOG.info("Recovery is enabled. "
+            + "Will try to recover from previous life on best effort basis.");
+        recoveryServ = createRecoveryService(context);
+        addIfService(recoveryServ);
+        dispatcher = recoveryServ.getDispatcher();
+        clock = recoveryServ.getClock();
+        inRecovery = true;
+      } else {
+        LOG.info("Not starting RecoveryService: recoveryEnabled: "
+            + recoveryEnabled + " recoverySupportedByCommitter: "
+            + recoverySupportedByCommitter + " ApplicationAttemptID: "
+            + appAttemptID.getAttemptId());
+        dispatcher = createDispatcher();
+        addIfService(dispatcher);
+      }
 
-    //service to handle requests to TaskUmbilicalProtocol
-    taskAttemptListener = createTaskAttemptListener(context);
-    addIfService(taskAttemptListener);
+      //service to handle requests from JobClient
+      clientService = createClientService(context);
+      addIfService(clientService);
+      
+      containerAllocator = createContainerAllocator(clientService, context);
+      
+      //service to handle the output committer
+      committerEventHandler = createCommitterEventHandler(context, committer);
+      addIfService(committerEventHandler);
 
-    //service to handle the output committer
-    committerEventHandler = createCommitterEventHandler(context, committer);
-    addIfService(committerEventHandler);
+      //service to handle requests to TaskUmbilicalProtocol
+      taskAttemptListener = createTaskAttemptListener(context);
+      addIfService(taskAttemptListener);
 
-    //service to log job history events
-    EventHandler<JobHistoryEvent> historyService = 
+      //service to log job history events
+      EventHandler<JobHistoryEvent> historyService = 
         createJobHistoryHandler(context);
-    dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
-        historyService);
-
-    this.jobEventDispatcher = new JobEventDispatcher();
-
-    //register the event dispatchers
-    dispatcher.register(JobEventType.class, jobEventDispatcher);
-    dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
-    dispatcher.register(TaskAttemptEventType.class, 
-        new TaskAttemptEventDispatcher());
-    dispatcher.register(CommitterEventType.class, committerEventHandler);
-   
-    if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
-        || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
-      //optional service to speculate on task attempts' progress
-      speculator = createSpeculator(conf, context);
-      addIfService(speculator);
-    }
-
-    speculatorEventDispatcher = new SpeculatorEventDispatcher(conf);
-    dispatcher.register(Speculator.EventType.class,
-        speculatorEventDispatcher);
-
-    // service to allocate containers from RM (if non-uber) or to fake it (uber)
-    addIfService(containerAllocator);
-    dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
-
-    // corresponding service to launch allocated containers via NodeManager
-    containerLauncher = createContainerLauncher(context);
-    addIfService(containerLauncher);
-    dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
-
-    // Add the staging directory cleaner before the history server but after
-    // the container allocator so the staging directory is cleaned after
-    // the history has been flushed but before unregistering with the RM.
-    addService(createStagingDirCleaningService());
-
-    // Add the JobHistoryEventHandler last so that it is properly stopped first.
-    // This will guarantee that all history-events are flushed before AM goes
-    // ahead with shutdown.
-    // Note: Even though JobHistoryEventHandler is started last, if any
-    // component creates a JobHistoryEvent in the meanwhile, it will be just be
-    // queued inside the JobHistoryEventHandler 
-    addIfService(historyService);
+      dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
+          historyService);
+
+      this.jobEventDispatcher = new JobEventDispatcher();
+
+      //register the event dispatchers
+      dispatcher.register(JobEventType.class, jobEventDispatcher);
+      dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
+      dispatcher.register(TaskAttemptEventType.class, 
+          new TaskAttemptEventDispatcher());
+      dispatcher.register(CommitterEventType.class, committerEventHandler);
+
+      if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
+          || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
+        //optional service to speculate on task attempts' progress
+        speculator = createSpeculator(conf, context);
+        addIfService(speculator);
+      }
 
+      speculatorEventDispatcher = new SpeculatorEventDispatcher(conf);
+      dispatcher.register(Speculator.EventType.class,
+          speculatorEventDispatcher);
+
+      // service to allocate containers from RM (if non-uber) or to fake it (uber)
+      addIfService(containerAllocator);
+      dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
+
+      // corresponding service to launch allocated containers via NodeManager
+      containerLauncher = createContainerLauncher(context);
+      addIfService(containerLauncher);
+      dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
+
+      // Add the staging directory cleaner before the history server but after
+      // the container allocator so the staging directory is cleaned after
+      // the history has been flushed but before unregistering with the RM.
+      addService(createStagingDirCleaningService());
+
+      // Add the JobHistoryEventHandler last so that it is properly stopped first.
+      // This will guarantee that all history-events are flushed before AM goes
+      // ahead with shutdown.
+      // Note: Even though JobHistoryEventHandler is started last, if any
+      // component creates a JobHistoryEvent in the meanwhile, it will just be
+      // queued inside the JobHistoryEventHandler 
+      addIfService(historyService);
+    }
+    
     super.init(conf);
   } // end of init()
-
+  
   protected Dispatcher createDispatcher() {
     return new AsyncDispatcher();
   }
@@ -489,15 +578,20 @@ public class MRAppMaster extends CompositeService {
         appContext.getClock(), getCommitter());
   }
 
-  /** Create and initialize (but don't start) a single job. */
-  protected Job createJob(Configuration conf) {
+  /** Create and initialize (but don't start) a single job. 
+   * @param forcedState a state to force the job into or null for normal operation. 
+   * @param diagnostic a diagnostic message to include with the job.
+   */
+  protected Job createJob(Configuration conf, JobStateInternal forcedState, 
+      String diagnostic) {
 
     // create single job
     Job newJob =
         new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
             taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
             completedTasksFromPreviousRun, metrics, newApiCommitter,
-            currentUser.getUserName(), appSubmitTime, amInfos, context);
+            currentUser.getUserName(), appSubmitTime, amInfos, context, 
+            forcedState, diagnostic);
     ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
 
     dispatcher.register(JobFinishEvent.Type.class,
@@ -874,7 +968,7 @@ public class MRAppMaster extends CompositeService {
     amInfos.add(amInfo);
 
     // /////////////////// Create the job itself.
-    job = createJob(getConfig());
+    job = createJob(getConfig(), forcedState, shutDownMessage);
 
     // End of creating the job.
 
@@ -891,31 +985,33 @@ public class MRAppMaster extends CompositeService {
     // It's more test friendly to put it here.
     DefaultMetricsSystem.initialize("MRAppMaster");
 
-    // create a job event for job intialization
-    JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
-    // Send init to the job (this does NOT trigger job execution)
-    // This is a synchronous call, not an event through dispatcher. We want
-    // job-init to be done completely here.
-    jobEventDispatcher.handle(initJobEvent);
+    if (!errorHappenedShutDown) {
+      // create a job event for job initialization
+      JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
+      // Send init to the job (this does NOT trigger job execution)
+      // This is a synchronous call, not an event through dispatcher. We want
+      // job-init to be done completely here.
+      jobEventDispatcher.handle(initJobEvent);
 
 
-    // JobImpl's InitTransition is done (call above is synchronous), so the
-    // "uber-decision" (MR-1220) has been made.  Query job and switch to
-    // ubermode if appropriate (by registering different container-allocator
-    // and container-launcher services/event-handlers).
+      // JobImpl's InitTransition is done (call above is synchronous), so the
+      // "uber-decision" (MR-1220) has been made.  Query job and switch to
+      // ubermode if appropriate (by registering different container-allocator
+      // and container-launcher services/event-handlers).
 
-    if (job.isUber()) {
-      speculatorEventDispatcher.disableSpeculation();
-      LOG.info("MRAppMaster uberizing job " + job.getID()
-               + " in local container (\"uber-AM\") on node "
-               + nmHost + ":" + nmPort + ".");
-    } else {
-      // send init to speculator only for non-uber jobs. 
-      // This won't yet start as dispatcher isn't started yet.
-      dispatcher.getEventHandler().handle(
-          new SpeculatorEvent(job.getID(), clock.getTime()));
-      LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
-               + "job " + job.getID() + ".");
+      if (job.isUber()) {
+        speculatorEventDispatcher.disableSpeculation();
+        LOG.info("MRAppMaster uberizing job " + job.getID()
+            + " in local container (\"uber-AM\") on node "
+            + nmHost + ":" + nmPort + ".");
+      } else {
+        // send init to speculator only for non-uber jobs. 
+        // This won't yet start as dispatcher isn't started yet.
+        dispatcher.getEventHandler().handle(
+            new SpeculatorEvent(job.getID(), clock.getTime()));
+        LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
+            + "job " + job.getID() + ".");
+      }
     }
 
     //start all the components
@@ -1062,6 +1158,17 @@ public class MRAppMaster extends CompositeService {
 
   }
 
+  /**
+   * Eats events that are not needed in some error cases.
+   */
+  private static class NoopEventHandler implements EventHandler<Event> {
+
+    @Override
+    public void handle(Event event) {
+      //Empty
+    }
+  }
+  
   private static void validateInputParam(String value, String param)
       throws IOException {
     if (value == null) {
@@ -1158,6 +1265,9 @@ public class MRAppMaster extends CompositeService {
       public Object run() throws Exception {
         appMaster.init(conf);
         appMaster.start();
+        if(appMaster.errorHappenedShutDown) {
+          throw new IOException("Was asked to shut down.");
+        }
         return null;
       }
     });
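
The rewritten init() above reduces to a four-way decision over three marker
files in the staging directory. A minimal sketch of that decision, assuming
the same MRApps helper files the patch reads (getStartJobCommitFile and
friends):

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CommitStateCheckSketch {
  enum Outcome { RUN_NORMALLY, SUCCEEDED, FAILED, ERROR }

  // start/success/failure are the files returned by
  // MRApps.getStartJobCommitFile, getEndJobCommitSuccessFile and
  // getEndJobCommitFailureFile respectively.
  static Outcome check(FileSystem fs, Path start, Path success, Path failure)
      throws IOException {
    if (!fs.exists(start)) {
      return Outcome.RUN_NORMALLY; // no commit was ever attempted
    }
    if (fs.exists(success)) {
      return Outcome.SUCCEEDED;    // crashed after a successful commit
    }
    if (fs.exists(failure)) {
      return Outcome.FAILED;       // crashed after a failed commit
    }
    return Outcome.ERROR;          // crashed mid-commit: outcome unknown
  }
}

In the last three cases the AM shuts down with that forced state instead of
rerunning the job, which is precisely the double-run hole MAPREDUCE-4819
closes.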

+ 40 - 6
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java

@@ -29,8 +29,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobAbortCompletedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobCommitCompletedEvent;
@@ -40,6 +45,8 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupFailedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -64,6 +71,11 @@ public class CommitterEventHandler extends AbstractService
   private Thread jobCommitThread = null;
   private int commitThreadCancelTimeoutMs;
   private long commitWindowMs;
+  private FileSystem fs;
+  private Path startCommitFile;
+  private Path endCommitSuccessFile;
+  private Path endCommitFailureFile;
+  
 
   public CommitterEventHandler(AppContext context, OutputCommitter committer,
       RMHeartbeatHandler rmHeartbeatHandler) {
@@ -82,10 +94,21 @@ public class CommitterEventHandler extends AbstractService
         MRJobConfig.DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS);
     commitWindowMs = conf.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS,
         MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS);
+    try {
+      fs = FileSystem.get(conf);
+      JobID id = TypeConverter.fromYarn(context.getApplicationID());
+      JobId jobId = TypeConverter.toYarn(id);
+      String user = UserGroupInformation.getCurrentUser().getShortUserName();
+      startCommitFile = MRApps.getStartJobCommitFile(conf, user, jobId);
+      endCommitSuccessFile = MRApps.getEndJobCommitSuccessFile(conf, user, jobId);
+      endCommitFailureFile = MRApps.getEndJobCommitFailureFile(conf, user, jobId);
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
   }
 
   @Override
-  public void start() {
+  public void start() {    
     ThreadFactory tf = new ThreadFactoryBuilder()
       .setNameFormat("CommitterEvent Processor #%d")
       .build();
@@ -199,7 +222,7 @@ public class CommitterEventHandler extends AbstractService
             + event.toString());
       }
     }
-
+    
     @SuppressWarnings("unchecked")
     protected void handleJobSetup(CommitterJobSetupEvent event) {
       try {
@@ -213,19 +236,30 @@ public class CommitterEventHandler extends AbstractService
       }
     }
 
+    private void touchz(Path p) throws IOException {
+      fs.create(p, false).close();
+    }
+    
     @SuppressWarnings("unchecked")
     protected void handleJobCommit(CommitterJobCommitEvent event) {
       try {
+        touchz(startCommitFile);
         jobCommitStarted();
         waitForValidCommitWindow();
         committer.commitJob(event.getJobContext());
+        touchz(endCommitSuccessFile);
         context.getEventHandler().handle(
             new JobCommitCompletedEvent(event.getJobID()));
       } catch (Exception e) {
-          LOG.error("Could not commit job", e);
-          context.getEventHandler().handle(
-              new JobCommitFailedEvent(event.getJobID(),
-                  StringUtils.stringifyException(e)));
+        try {
+          touchz(endCommitFailureFile);
+        } catch (Exception e2) {
+          LOG.error("could not create failure file.", e2);
+        }
+        LOG.error("Could not commit job", e);
+        context.getEventHandler().handle(
+            new JobCommitFailedEvent(event.getJobID(),
+                StringUtils.stringifyException(e)));
       } finally {
         jobCommitEnded();
       }
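
The touchz helper relies on FileSystem.create(path, false): with overwrite
set to false, creation fails if the file already exists, and closing the
stream immediately leaves a zero-length marker. A sketch of the resulting
protocol around the commit (names here are illustrative, not the patch's
API):

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CommitMarkerSketch {
  // overwrite=false makes creation fail if the marker already exists, so
  // at most one attempt can plant each marker file.
  static void touchz(FileSystem fs, Path p) throws IOException {
    fs.create(p, false).close();
  }

  static void commitWithMarkers(FileSystem fs, Path start, Path success,
      Path failure, Runnable commitJob) throws IOException {
    touchz(fs, start);        // durable record: a commit attempt began
    try {
      commitJob.run();        // stands in for OutputCommitter.commitJob
      touchz(fs, success);    // durable record: the commit succeeded
    } catch (RuntimeException e) {
      touchz(fs, failure);    // durable record: the commit failed
      throw e;
    }
  }
}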

+ 12 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java

@@ -540,6 +540,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   private Credentials fsTokens;
   private Token<JobTokenIdentifier> jobToken;
   private JobTokenSecretManager jobTokenSecretManager;
+  
+  private JobStateInternal forcedState = null;
 
   public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId,
       Configuration conf, EventHandler eventHandler,
@@ -548,7 +550,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
       Credentials fsTokenCredentials, Clock clock,
       Map<TaskId, TaskInfo> completedTasksFromPreviousRun, MRAppMetrics metrics,
       boolean newApiCommitter, String userName,
-      long appSubmitTime, List<AMInfo> amInfos, AppContext appContext) {
+      long appSubmitTime, List<AMInfo> amInfos, AppContext appContext,
+      JobStateInternal forcedState, String forcedDiagnostic) {
     this.applicationAttemptId = applicationAttemptId;
     this.jobId = jobId;
     this.jobName = conf.get(JobContext.JOB_NAME, "<missing job name>");
@@ -579,6 +582,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.
     stateMachine = stateMachineFactory.make(this);
+    this.forcedState  = forcedState;
+    if(forcedDiagnostic != null) {
+      this.diagnostics.add(forcedDiagnostic);
+    }
   }
 
   protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
@@ -818,7 +825,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   public JobState getState() {
     readLock.lock();
     try {
-      return getExternalState(getStateMachine().getCurrentState());
+      return getExternalState(getInternalState());
     } finally {
       readLock.unlock();
     }
@@ -868,6 +875,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   public JobStateInternal getInternalState() {
     readLock.lock();
     try {
+      if(forcedState != null) {
+        return forcedState;
+      }
      return getStateMachine().getCurrentState();
     } finally {
       readLock.unlock();
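
With forcedState in place, getInternalState() short-circuits to the injected
value and getState() now routes through it, so a recovered AM reports the
prior outcome without the state machine ever advancing. A tiny illustration
of the override pattern (a hypothetical class, not JobImpl itself):

// Hypothetical illustration of the forced-state override used above.
class ForcedState<S> {
  interface Source<T> { T current(); }

  private final S forced;       // null means "no override"
  private final Source<S> machine;

  ForcedState(S forced, Source<S> machine) {
    this.forced = forced;
    this.machine = machine;
  }

  S state() {
    // The override wins: report the recovered outcome, not whatever
    // state a freshly built state machine would be in.
    return forced != null ? forced : machine.current();
  }
}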

+ 3 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java

@@ -205,18 +205,18 @@ public class RecoveryService extends CompositeService implements Recovery {
       throws IOException {
     FSDataInputStream in = null;
     Path historyFile = null;
-    String jobName =
+    String jobId =
         TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
           .toString();
     String jobhistoryDir =
-        JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf);
+        JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
     Path histDirPath =
         FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir));
     FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
     // read the previous history file
     historyFile =
         fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath,
-          jobName, (applicationAttemptId.getAttemptId() - 1)));
+          jobId, (applicationAttemptId.getAttemptId() - 1)));
     LOG.info("History file is at " + historyFile);
     in = fc.open(historyFile);
     return in;

+ 9 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java

@@ -141,14 +141,19 @@ public abstract class RMCommunicator extends AbstractService
 
   protected void register() {
     //Register
-    InetSocketAddress serviceAddr = clientService.getBindAddress();
+    InetSocketAddress serviceAddr = null;
+    if (clientService != null ) {
+      serviceAddr = clientService.getBindAddress();
+    }
     try {
       RegisterApplicationMasterRequest request =
         recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);
       request.setApplicationAttemptId(applicationAttemptId);
-      request.setHost(serviceAddr.getHostName());
-      request.setRpcPort(serviceAddr.getPort());
-      request.setTrackingUrl(serviceAddr.getHostName() + ":" + clientService.getHttpPort());
+      if (serviceAddr != null) {
+        request.setHost(serviceAddr.getHostName());
+        request.setRpcPort(serviceAddr.getPort());
+        request.setTrackingUrl(serviceAddr.getHostName() + ":" + clientService.getHttpPort());
+      }
       RegisterApplicationMasterResponse response =
         scheduler.registerApplicationMaster(request);
       minContainerCapability = response.getMinimumResourceCapability();

+ 7 - 11
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java

@@ -18,14 +18,9 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
-import static junit.framework.Assert.assertFalse;
-import static junit.framework.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static junit.framework.Assert.*;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
 
 import java.io.File;
 import java.io.IOException;
@@ -51,7 +46,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.Test;
 import org.mockito.Mockito;
-import org.mockito.verification.VerificationMode;
 
 public class TestJobHistoryEventHandler {
 
@@ -260,13 +254,15 @@ public class TestJobHistoryEventHandler {
     }
   }
 
-  private AppContext mockAppContext(JobId jobId) {
+  private AppContext mockAppContext(ApplicationId appId) {
+    JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appId));
     AppContext mockContext = mock(AppContext.class);
     Job mockJob = mock(Job.class);
     when(mockJob.getTotalMaps()).thenReturn(10);
     when(mockJob.getTotalReduces()).thenReturn(10);
     when(mockJob.getName()).thenReturn("mockjob");
     when(mockContext.getJob(jobId)).thenReturn(mockJob);
+    when(mockContext.getApplicationID()).thenReturn(appId);
     return mockContext;
   }
   
@@ -279,7 +275,7 @@ public class TestJobHistoryEventHandler {
     ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
     TaskID taskID = TaskID.forName("task_200707121733_0003_m_000005");
     JobId jobId = MRBuilderUtils.newJobId(appId, 1);
-    AppContext mockAppContext = mockAppContext(jobId);
+    AppContext mockAppContext = mockAppContext(appId);
   }
 
   private JobHistoryEvent getEventToEnqueue(JobId jobId) {

+ 18 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java

@@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.WrappedJvmID;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -208,6 +209,16 @@ public class MRApp extends MRAppMaster {
 
   @Override
   public void init(Configuration conf) {
+    try {
+      //Create the staging directory if it does not exist
+      String user = UserGroupInformation.getCurrentUser().getShortUserName();
+      Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+      FileSystem fs = getFileSystem(conf);
+      fs.mkdirs(stagingDir);
+    } catch (Exception e) {
+      throw new YarnException("Error creating staging dir", e);
+    }
+    
     super.init(conf);
     if (this.clusterInfo != null) {
       getContext().getClusterInfo().setMinContainerCapability(
@@ -388,7 +399,8 @@ public class MRApp extends MRAppMaster {
   }
 
   @Override
-  protected Job createJob(Configuration conf) {
+  protected Job createJob(Configuration conf, JobStateInternal forcedState, 
+      String diagnostic) {
     UserGroupInformation currentUser = null;
     try {
       currentUser = UserGroupInformation.getCurrentUser();
@@ -398,7 +410,8 @@ public class MRApp extends MRAppMaster {
     Job newJob = new TestJob(getJobId(), getAttemptID(), conf, 
     		getDispatcher().getEventHandler(),
             getTaskAttemptListener(), getContext().getClock(),
-            isNewApiCommitter(), currentUser.getUserName(), getContext());
+            isNewApiCommitter(), currentUser.getUserName(), getContext(),
+            forcedState, diagnostic);
     ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
 
     getDispatcher().register(JobFinishEvent.Type.class,
@@ -631,13 +644,14 @@ public class MRApp extends MRAppMaster {
     public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
         Configuration conf, EventHandler eventHandler,
         TaskAttemptListener taskAttemptListener, Clock clock,
-        boolean newApiCommitter, String user, AppContext appContext) {
+        boolean newApiCommitter, String user, AppContext appContext, 
+        JobStateInternal forcedState, String diagnostic) {
       super(jobId, getApplicationAttemptId(applicationId, getStartCount()),
           conf, eventHandler, taskAttemptListener,
           new JobTokenSecretManager(), new Credentials(), clock,
           getCompletedTaskFromPreviousRun(), metrics,
           newApiCommitter, user, System.currentTimeMillis(), getAllAMInfos(),
-          appContext);
+          appContext, forcedState, diagnostic);
 
       // This "this leak" is okay because the retained pointer is in an
       //  instance variable.

+ 4 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
@@ -370,8 +371,9 @@ public class TestMRApp {
     }
 
     @Override
-    protected Job createJob(Configuration conf) {
-      spiedJob = spy((JobImpl) super.createJob(conf));
+    protected Job createJob(Configuration conf, JobStateInternal forcedState, 
+        String diagnostic) {
+      spiedJob = spy((JobImpl) super.createJob(conf, forcedState, diagnostic));
       ((AppContext) getContext()).getAllJobs().put(spiedJob.getID(), spiedJob);
       return spiedJob;
     }

+ 219 - 10
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java

@@ -17,28 +17,63 @@
  */
 package org.apache.hadoop.mapreduce.v2.app;
 
-import java.io.IOException;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
-import junit.framework.Assert;
+import java.io.File;
+import java.io.IOException;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestMRAppMaster {
+  private static final Log LOG = LogFactory.getLog(TestMRAppMaster.class);
+  static String stagingDir = "staging/";
+  
+  @BeforeClass
+  public static void setup() {
+    //Do not error out if metrics are inited multiple times
+    DefaultMetricsSystem.setMiniClusterMode(true);
+    File dir = new File(stagingDir);
+    stagingDir = dir.getAbsolutePath();
+  }
+  
+  @Before
+  public void cleanup() throws IOException {
+    File dir = new File(stagingDir);
+    if(dir.exists()) {
+      FileUtils.deleteDirectory(dir);
+    }
+    dir.mkdirs();
+  }
+  
   @Test
   public void testMRAppMasterForDifferentUser() throws IOException,
       InterruptedException {
     String applicationAttemptIdStr = "appattempt_1317529182569_0004_000001";
     String containerIdStr = "container_1317529182569_0004_000001_1";
-    String stagingDir = "/tmp/staging";
+    
     String userName = "TestAppMasterUser";
     ApplicationAttemptId applicationAttemptId = ConverterUtils
         .toApplicationAttemptId(applicationAttemptIdStr);
@@ -49,34 +84,208 @@ public class TestMRAppMaster {
     YarnConfiguration conf = new YarnConfiguration();
     conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
-    Assert.assertEquals(stagingDir + Path.SEPARATOR + userName + Path.SEPARATOR
+    assertEquals(stagingDir + Path.SEPARATOR + userName + Path.SEPARATOR
         + ".staging", appMaster.stagingDirPath.toString());
   }
+  
+  @Test
+  public void testMRAppMasterMidLock() throws IOException,
+      InterruptedException {
+    String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
+    String containerIdStr = "container_1317529182569_0004_000002_1";
+    String userName = "TestAppMasterUser";
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    ApplicationAttemptId applicationAttemptId = ConverterUtils
+        .toApplicationAttemptId(applicationAttemptIdStr);
+    JobId jobId =  TypeConverter.toYarn(
+        TypeConverter.fromYarn(applicationAttemptId.getApplicationId()));
+    Path start = MRApps.getStartJobCommitFile(conf, userName, jobId);
+    FileSystem fs = FileSystem.get(conf);
+    //Create the file, but no end file so we should unregister with an error.
+    fs.create(start).close();
+    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+    MRAppMaster appMaster =
+        new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
+            System.currentTimeMillis(), false);
+    boolean caught = false;
+    try {
+      MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
+    } catch (IOException e) {
+      //The IO Exception is expected
+      LOG.info("Caught expected Exception", e);
+      caught = true;
+    }
+    assertTrue(caught);
+    assertTrue(appMaster.errorHappenedShutDown);
+    assertEquals(JobStateInternal.ERROR, appMaster.forcedState);
+    appMaster.stop();
+  }
+  
+  @Test
+  public void testMRAppMasterSuccessLock() throws IOException,
+      InterruptedException {
+    String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
+    String containerIdStr = "container_1317529182569_0004_000002_1";
+    String userName = "TestAppMasterUser";
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    ApplicationAttemptId applicationAttemptId = ConverterUtils
+        .toApplicationAttemptId(applicationAttemptIdStr);
+    JobId jobId =  TypeConverter.toYarn(
+        TypeConverter.fromYarn(applicationAttemptId.getApplicationId()));
+    Path start = MRApps.getStartJobCommitFile(conf, userName, jobId);
+    Path end = MRApps.getEndJobCommitSuccessFile(conf, userName, jobId);
+    FileSystem fs = FileSystem.get(conf);
+    fs.create(start).close();
+    fs.create(end).close();
+    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+    MRAppMaster appMaster =
+        new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
+            System.currentTimeMillis(), false);
+    boolean caught = false;
+    try {
+      MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
+    } catch (IOException e) {
+      //The IO Exception is expected
+      LOG.info("Caught expected Exception", e);
+      caught = true;
+    }
+    assertTrue(caught);
+    assertTrue(appMaster.errorHappenedShutDown);
+    assertEquals(JobStateInternal.SUCCEEDED, appMaster.forcedState);
+    appMaster.stop();
+  }
+  
+  @Test
+  public void testMRAppMasterFailLock() throws IOException,
+      InterruptedException {
+    String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
+    String containerIdStr = "container_1317529182569_0004_000002_1";
+    String userName = "TestAppMasterUser";
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    ApplicationAttemptId applicationAttemptId = ConverterUtils
+        .toApplicationAttemptId(applicationAttemptIdStr);
+    JobId jobId =  TypeConverter.toYarn(
+        TypeConverter.fromYarn(applicationAttemptId.getApplicationId()));
+    Path start = MRApps.getStartJobCommitFile(conf, userName, jobId);
+    Path end = MRApps.getEndJobCommitFailureFile(conf, userName, jobId);
+    FileSystem fs = FileSystem.get(conf);
+    fs.create(start).close();
+    fs.create(end).close();
+    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+    MRAppMaster appMaster =
+        new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
+            System.currentTimeMillis(), false);
+    boolean caught = false;
+    try {
+      MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
+    } catch (IOException e) {
+      //The IO Exception is expected
+      LOG.info("Caught expected Exception", e);
+      caught = true;
+    }
+    assertTrue(caught);
+    assertTrue(appMaster.errorHappenedShutDown);
+    assertEquals(JobStateInternal.FAILED, appMaster.forcedState);
+    appMaster.stop();
+  }
+  
+  @Test
+  public void testMRAppMasterMissingStaging() throws IOException,
+      InterruptedException {
+    String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
+    String containerIdStr = "container_1317529182569_0004_000002_1";
+    String userName = "TestAppMasterUser";
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    ApplicationAttemptId applicationAttemptId = ConverterUtils
+        .toApplicationAttemptId(applicationAttemptIdStr);
+
+    //Delete the staging directory
+    File dir = new File(stagingDir);
+    if(dir.exists()) {
+      FileUtils.deleteDirectory(dir);
+    }
+    
+    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+    MRAppMaster appMaster =
+        new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
+            System.currentTimeMillis(), false);
+    boolean caught = false;
+    try {
+      MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
+    } catch (IOException e) {
+      //The IO Exception is expected
+      LOG.info("Caught expected Exception", e);
+      caught = true;
+    }
+    assertTrue(caught);
+    assertTrue(appMaster.errorHappenedShutDown);
+    //Copying the history file is disabled, but it is not really visible from 
+    //here
+    assertEquals(JobStateInternal.ERROR, appMaster.forcedState);
+    appMaster.stop();
+  }
 }
 
 class MRAppMasterTest extends MRAppMaster {
 
   Path stagingDirPath;
   private Configuration conf;
+  private boolean overrideInitAndStart;
+  ContainerAllocator mockContainerAllocator;
 
   public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String host, int port, int httpPort,
       long submitTime) {
+    this(applicationAttemptId, containerId, host, port, httpPort, submitTime, 
+        true);
+  }
+  public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
+      ContainerId containerId, String host, int port, int httpPort,
+      long submitTime, boolean overrideInitAndStart) {
     super(applicationAttemptId, containerId, host, port, httpPort, submitTime);
+    this.overrideInitAndStart = overrideInitAndStart;
+    mockContainerAllocator = mock(ContainerAllocator.class);
   }
 
   @Override
   public void init(Configuration conf) {
-    this.conf = conf;
+    if (overrideInitAndStart) {
+      this.conf = conf; 
+    } else {
+      super.init(conf);
+    }
+  }
+  
+  @Override 
+  protected void downloadTokensAndSetupUGI(Configuration conf) {
+    try {
+      this.currentUser = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+  }
+  
+  @Override
+  protected ContainerAllocator createContainerAllocator(
+      final ClientService clientService, final AppContext context) {
+    return mockContainerAllocator;
   }
 
   @Override
   public void start() {
-    try {
-      String user = UserGroupInformation.getCurrentUser().getShortUserName();
-      stagingDirPath = MRApps.getStagingAreaDir(conf, user);
-    } catch (Exception e) {
-      Assert.fail(e.getMessage());
+    if (overrideInitAndStart) {
+      try {
+        String user = UserGroupInformation.getCurrentUser().getShortUserName();
+        stagingDirPath = MRApps.getStagingAreaDir(conf, user);
+      } catch (Exception e) {
+        fail(e.getMessage());
+      }
+    } else {
+      super.start();
     }
   }
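
These two tests pin down the recovery contract the patch adds: a leftover COMMIT_FAIL marker forces the new attempt into FAILED, and a vanished staging directory forces it into ERROR, instead of silently rerunning a job whose final status may already have been reported to the client. A condensed sketch of that decision, assuming the marker-file helpers this patch adds to MRApps (this is not the actual MRAppMaster code, which lives in its init path):

    // Sketch only; condenses the decision the tests above assert on.
    JobStateInternal inferForcedState(FileSystem fs, Configuration conf,
        String user, JobId jobId) throws IOException {
      Path started = MRApps.getStartJobCommitFile(conf, user, jobId);
      Path success = MRApps.getEndJobCommitSuccessFile(conf, user, jobId);
      Path failure = MRApps.getEndJobCommitFailureFile(conf, user, jobId);
      if (fs.exists(success)) {
        return JobStateInternal.SUCCEEDED; // commit finished; just report it
      } else if (fs.exists(failure)) {
        return JobStateInternal.FAILED;    // commit failed; do not rerun it
      } else if (fs.exists(started)) {
        return JobStateInternal.ERROR;     // outcome unknown; refuse to guess
      }
      return null; // no commit was in flight; recover and run normally
    }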
 

+ 18 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java

@@ -38,11 +38,13 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -72,6 +74,10 @@ import org.junit.Test;
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
      fs = mock(FileSystem.class);
      when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+     //Staging Dir exists
+     String user = UserGroupInformation.getCurrentUser().getShortUserName();
+     Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+     when(fs.exists(stagingDir)).thenReturn(true);
      ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
          ApplicationAttemptId.class);
      attemptId.setAttemptId(0);
@@ -93,6 +99,10 @@ import org.junit.Test;
      conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 4);
      fs = mock(FileSystem.class);
      when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+     //Staging Dir exists
+     String user = UserGroupInformation.getCurrentUser().getShortUserName();
+     Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+     when(fs.exists(stagingDir)).thenReturn(true);
      ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
          ApplicationAttemptId.class);
      attemptId.setAttemptId(0);
@@ -118,6 +128,10 @@ import org.junit.Test;
      conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 1);
      fs = mock(FileSystem.class);
      when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+     //Staging Dir exists
+     String user = UserGroupInformation.getCurrentUser().getShortUserName();
+     Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+     when(fs.exists(stagingDir)).thenReturn(true);
      ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
          ApplicationAttemptId.class);
      attemptId.setAttemptId(1);
@@ -198,7 +212,8 @@ import org.junit.Test;
     }
 
     @Override
-    protected Job createJob(Configuration conf) {
+    protected Job createJob(Configuration conf, JobStateInternal forcedState, 
+        String diagnostic) {
       UserGroupInformation currentUser = null;
       try {
         currentUser = UserGroupInformation.getCurrentUser();
@@ -208,7 +223,8 @@ import org.junit.Test;
       Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
           getDispatcher().getEventHandler(),
           getTaskAttemptListener(), getContext().getClock(),
-          isNewApiCommitter(), currentUser.getUserName(), getContext());
+          isNewApiCommitter(), currentUser.getUserName(), getContext(),
+          forcedState, diagnostic);
       ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
 
       getDispatcher().register(JobFinishEvent.Type.class,

+ 175 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java

@@ -36,16 +36,85 @@ import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.SystemClock;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCommitCompletedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCommitFailedEvent;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestCommitterEventHandler {
+  public static class WaitForItHandler implements EventHandler {
+
+    private Event event = null;
+    
+    @Override
+    public synchronized void handle(Event event) {
+      this.event = event;
+      notifyAll();
+    }
+    
+    public synchronized Event getAndClearEvent() throws InterruptedException {
+      if (event == null) {
+        //Wait for at most 100 ms
+        wait(100);
+      }
+      Event e = event;
+      event = null;
+      return e;
+    }
+    
+  }
+  
+  static String stagingDir = "target/test-staging/";
+
+  @BeforeClass
+  public static void setup() {    
+    File dir = new File(stagingDir);
+    stagingDir = dir.getAbsolutePath();
+  }
 
+  @Before
+  public void cleanup() throws IOException {
+    File dir = new File(stagingDir);
+    if(dir.exists()) {
+      FileUtils.deleteDirectory(dir);
+    }
+    dir.mkdirs();
+  }
+  
   @Test
   public void testCommitWindow() throws Exception {
     Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     AsyncDispatcher dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();
@@ -55,6 +124,10 @@ public class TestCommitterEventHandler {
 
     SystemClock clock = new SystemClock();
     AppContext appContext = mock(AppContext.class);
+    ApplicationAttemptId attemptid = 
+      ConverterUtils.toApplicationAttemptId("appattempt_1234567890000_0001_0");
+    when(appContext.getApplicationID()).thenReturn(attemptid.getApplicationId());
+    when(appContext.getApplicationAttemptId()).thenReturn(attemptid);
     when(appContext.getEventHandler()).thenReturn(
         dispatcher.getEventHandler());
     when(appContext.getClock()).thenReturn(clock);
@@ -91,6 +164,9 @@ public class TestCommitterEventHandler {
         1, jeh.numCommitCompletedEvents);
     verify(committer, times(1)).commitJob(any(JobContext.class));
 
+    //Clean up so we can try to commit again (Don't do this at home)
+    cleanup();
+    
     // try to commit again and verify it goes through since the heartbeat
     // is still fresh
     ceh.handle(new CommitterJobCommitEvent(null, null));
@@ -147,4 +223,103 @@ public class TestCommitterEventHandler {
       }
     }
   }
+
+  @Test
+  public void testBasic() throws Exception {
+    AppContext mockContext = mock(AppContext.class);
+    OutputCommitter mockCommitter = mock(OutputCommitter.class);
+    Clock mockClock = mock(Clock.class);
+    
+    CommitterEventHandler handler = new CommitterEventHandler(mockContext, 
+        mockCommitter, new TestingRMHeartbeatHandler());
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    JobContext mockJobContext = mock(JobContext.class);
+    ApplicationAttemptId attemptid = 
+      ConverterUtils.toApplicationAttemptId("appattempt_1234567890000_0001_0");
+    JobId jobId =  TypeConverter.toYarn(
+        TypeConverter.fromYarn(attemptid.getApplicationId()));
+    
+    WaitForItHandler waitForItHandler = new WaitForItHandler();
+    
+    when(mockContext.getApplicationID()).thenReturn(attemptid.getApplicationId());
+    when(mockContext.getApplicationAttemptId()).thenReturn(attemptid);
+    when(mockContext.getEventHandler()).thenReturn(waitForItHandler);
+    when(mockContext.getClock()).thenReturn(mockClock);
+    
+    handler.init(conf);
+    handler.start();
+    try {
+      handler.handle(new CommitterJobCommitEvent(jobId, mockJobContext));
+
+      String user = UserGroupInformation.getCurrentUser().getShortUserName();
+      Path startCommitFile = MRApps.getStartJobCommitFile(conf, user, jobId);
+      Path endCommitSuccessFile = MRApps.getEndJobCommitSuccessFile(conf, user, 
+          jobId);
+      Path endCommitFailureFile = MRApps.getEndJobCommitFailureFile(conf, user, 
+          jobId);
+
+      Event e = waitForItHandler.getAndClearEvent();
+      assertNotNull(e);
+      assertTrue(e instanceof JobCommitCompletedEvent);
+      FileSystem fs = FileSystem.get(conf);
+      assertTrue(startCommitFile.toString(), fs.exists(startCommitFile));
+      assertTrue(endCommitSuccessFile.toString(), fs.exists(endCommitSuccessFile));
+      assertFalse(endCommitFailureFile.toString(), fs.exists(endCommitFailureFile));
+      verify(mockCommitter).commitJob(any(JobContext.class));
+    } finally {
+      handler.stop();
+    }
+  }
+
+  @Test
+  public void testFailure() throws Exception {
+    AppContext mockContext = mock(AppContext.class);
+    OutputCommitter mockCommitter = mock(OutputCommitter.class);
+    Clock mockClock = mock(Clock.class);
+    
+    CommitterEventHandler handler = new CommitterEventHandler(mockContext, 
+        mockCommitter, new TestingRMHeartbeatHandler());
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    JobContext mockJobContext = mock(JobContext.class);
+    ApplicationAttemptId attemptid = 
+      ConverterUtils.toApplicationAttemptId("appattempt_1234567890000_0001_0");
+    JobId jobId =  TypeConverter.toYarn(
+        TypeConverter.fromYarn(attemptid.getApplicationId()));
+    
+    WaitForItHandler waitForItHandler = new WaitForItHandler();
+    
+    when(mockContext.getApplicationID()).thenReturn(attemptid.getApplicationId());
+    when(mockContext.getApplicationAttemptId()).thenReturn(attemptid);
+    when(mockContext.getEventHandler()).thenReturn(waitForItHandler);
+    when(mockContext.getClock()).thenReturn(mockClock);
+    
+    doThrow(new YarnException("Intentional Failure")).when(mockCommitter)
+      .commitJob(any(JobContext.class));
+    
+    handler.init(conf);
+    handler.start();
+    try {
+      handler.handle(new CommitterJobCommitEvent(jobId, mockJobContext));
+
+      String user = UserGroupInformation.getCurrentUser().getShortUserName();
+      Path startCommitFile = MRApps.getStartJobCommitFile(conf, user, jobId);
+      Path endCommitSuccessFile = MRApps.getEndJobCommitSuccessFile(conf, user, 
+          jobId);
+      Path endCommitFailureFile = MRApps.getEndJobCommitFailureFile(conf, user, 
+          jobId);
+
+      Event e = waitForItHandler.getAndClearEvent();
+      assertNotNull(e);
+      assertTrue(e instanceof JobCommitFailedEvent);
+      FileSystem fs = FileSystem.get(conf);
+      assertTrue(fs.exists(startCommitFile));
+      assertFalse(fs.exists(endCommitSuccessFile));
+      assertTrue(fs.exists(endCommitFailureFile));
+      verify(mockCommitter).commitJob(any(JobContext.class));
+    } finally {
+      handler.stop();
+    }
+  }
 }
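
One caveat on WaitForItHandler: getAndClearEvent checks the event field once and calls wait a single time, so a spurious wakeup can return null before the timeout has genuinely elapsed. A more defensive variant of the method, offered purely as a sketch:

    // Loops against a deadline so spurious wakeups cannot cut the wait short.
    public synchronized Event getAndClearEvent() throws InterruptedException {
      long deadline = System.currentTimeMillis() + 100; // wait at most 100 ms
      while (event == null) {
        long remaining = deadline - System.currentTimeMillis();
        if (remaining <= 0) {
          break;
        }
        wait(remaining);
      }
      Event e = event;
      event = null;
      return e;
    }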

+ 43 - 9
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java

@@ -23,11 +23,13 @@ import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.EnumSet;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -67,8 +69,11 @@ import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 
@@ -78,10 +83,28 @@ import org.junit.Test;
 @SuppressWarnings({"rawtypes"})
 public class TestJobImpl {
   
+  static String stagingDir = "target/test-staging/";
+
+  @BeforeClass
+  public static void setup() {    
+    File dir = new File(stagingDir);
+    stagingDir = dir.getAbsolutePath();
+  }
+
+  @Before
+  public void cleanup() throws IOException {
+    File dir = new File(stagingDir);
+    if(dir.exists()) {
+      FileUtils.deleteDirectory(dir);
+    }
+    dir.mkdirs();
+  }
+  
   @Test
   public void testJobNoTasks() {
     Configuration conf = new Configuration();
     conf.setInt(MRJobConfig.NUM_REDUCES, 0);
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     AsyncDispatcher dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();
@@ -103,6 +126,7 @@ public class TestJobImpl {
   @Test(timeout=20000)
   public void testCommitJobFailsJob() throws Exception {
     Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     AsyncDispatcher dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();
@@ -127,6 +151,7 @@ public class TestJobImpl {
   @Test(timeout=20000)
   public void testCheckJobCompleteSuccess() throws Exception {
     Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     AsyncDispatcher dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();
@@ -151,6 +176,7 @@ public class TestJobImpl {
   @Test(timeout=20000)
   public void testKilledDuringSetup() throws Exception {
     Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     AsyncDispatcher dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();
@@ -187,6 +213,7 @@ public class TestJobImpl {
   @Test(timeout=20000)
   public void testKilledDuringCommit() throws Exception {
     Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     AsyncDispatcher dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();
@@ -211,6 +238,7 @@ public class TestJobImpl {
   @Test(timeout=20000)
   public void testKilledDuringFailAbort() throws Exception {
     Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     AsyncDispatcher dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();
@@ -252,6 +280,7 @@ public class TestJobImpl {
   @Test(timeout=20000)
   public void testKilledDuringKillAbort() throws Exception {
     Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     AsyncDispatcher dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();
@@ -316,7 +345,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null,
-        null, null, true, null, 0, null, null);
+        null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -327,7 +356,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null,
-        null, null, true, null, 0, null, null);
+        null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -338,7 +367,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null,
-        null, null, true, null, 0, null, null);
+        null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -349,7 +378,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null,
-        null, null, true, null, 0, null, null);
+        null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -360,7 +389,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null,
-        null, null, true, null, 0, null, null);
+        null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job5.checkAccess(ugi1, null));
     Assert.assertTrue(job5.checkAccess(ugi2, null));
   }
@@ -378,7 +407,7 @@ public class TestJobImpl {
         mock(EventHandler.class),
         null, mock(JobTokenSecretManager.class), null,
         new SystemClock(), null,
-        mrAppMetrics, true, null, 0, null, null);
+        mrAppMetrics, true, null, 0, null, null, null, null);
     job.handle(diagUpdateEvent);
     String diagnostics = job.getReport().getDiagnostics();
     Assert.assertNotNull(diagnostics);
@@ -389,7 +418,7 @@ public class TestJobImpl {
         mock(EventHandler.class),
         null, mock(JobTokenSecretManager.class), null,
         new SystemClock(), null,
-        mrAppMetrics, true, null, 0, null, null);
+        mrAppMetrics, true, null, 0, null, null, null, null);
     job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
     job.handle(diagUpdateEvent);
     diagnostics = job.getReport().getDiagnostics();
@@ -444,7 +473,7 @@ public class TestJobImpl {
     JobImpl job = new JobImpl(jobId, Records
         .newRecord(ApplicationAttemptId.class), conf, mock(EventHandler.class),
         null, mock(JobTokenSecretManager.class), null, null, null,
-        mrAppMetrics, true, null, 0, null, null);
+        mrAppMetrics, true, null, 0, null, null, null, null);
     InitTransition initTransition = getInitTransition(2);
     JobEvent mockJobEvent = mock(JobEvent.class);
     initTransition.transition(job, mockJobEvent);
@@ -518,6 +547,10 @@ public class TestJobImpl {
         callback.run();
       }
     };
+    ApplicationAttemptId id = 
+      ConverterUtils.toApplicationAttemptId("appattempt_1234567890000_0001_0");
+    when(appContext.getApplicationID()).thenReturn(id.getApplicationId());
+    when(appContext.getApplicationAttemptId()).thenReturn(id);
     CommitterEventHandler handler =
         new CommitterEventHandler(appContext, committer, heartbeatHandler);
     dispatcher.register(CommitterEventType.class, handler);
@@ -601,7 +634,8 @@ public class TestJobImpl {
       super(jobId, applicationAttemptId, conf, eventHandler,
           null, new JobTokenSecretManager(), new Credentials(),
           new SystemClock(), null, MRAppMetrics.create(),
-          newApiCommitter, user, System.currentTimeMillis(), null, null);
+          newApiCommitter, user, System.currentTimeMillis(), null, null, null,
+          null);
 
       initTransition = getInitTransition(numSplits);
       localFactory = stateMachineFactory.addTransition(JobStateInternal.NEW,

+ 5 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java

@@ -182,14 +182,16 @@ public class JobHistoryUtils {
 
   /**
    * Gets the configured directory prefix for In Progress history files.
-   * @param conf
+   * @param conf the configuration for the job
+   * @param jobId the id of the job the history file is for.
    * @return A string representation of the prefix.
    */
   public static String
-      getConfiguredHistoryStagingDirPrefix(Configuration conf)
+      getConfiguredHistoryStagingDirPrefix(Configuration conf, String jobId)
           throws IOException {
     String user = UserGroupInformation.getCurrentUser().getShortUserName();
-    Path path = MRApps.getStagingAreaDir(conf, user);
+    Path stagingPath = MRApps.getStagingAreaDir(conf, user);
+    Path path = new Path(stagingPath, jobId);
     String logDir = path.toString();
     return logDir;
   }

+ 20 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java

@@ -253,7 +253,26 @@ public class MRApps extends Apps {
     return jobFile.toString();
   }
   
-
+  public static Path getEndJobCommitSuccessFile(Configuration conf, String user,
+      JobId jobId) {
+    Path endCommitFile = new Path(MRApps.getStagingAreaDir(conf, user),
+        jobId.toString() + Path.SEPARATOR + "COMMIT_SUCCESS");
+    return endCommitFile;
+  }
+  
+  public static Path getEndJobCommitFailureFile(Configuration conf, String user,
+      JobId jobId) {
+    Path endCommitFile = new Path(MRApps.getStagingAreaDir(conf, user),
+        jobId.toString() + Path.SEPARATOR + "COMMIT_FAIL");
+    return endCommitFile;
+  }
+  
+  public static Path getStartJobCommitFile(Configuration conf, String user,
+      JobId jobId) {
+    Path startCommitFile = new Path(MRApps.getStagingAreaDir(conf, user),
+        jobId.toString() + Path.SEPARATOR + "COMMIT_STARTED");
+    return startCommitFile;
+  }
 
   private static long[] parseTimeStamps(String[] strs) {
     if (null == strs) {
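
All three helpers resolve to sibling marker files inside the job's directory under the staging area, so the COMMIT_STARTED/COMMIT_SUCCESS/COMMIT_FAIL trio is cleaned up along with the rest of the job's staging data. An indicative usage sketch (the exact prefix comes from getStagingAreaDir and the configured staging dir):

    // With a staging dir of /tmp/staging these resolve to paths like
    //   /tmp/staging/<user>/.staging/job_1234567890000_0001/COMMIT_STARTED
    Path started = MRApps.getStartJobCommitFile(conf, user, jobId);
    Path success = MRApps.getEndJobCommitSuccessFile(conf, user, jobId);
    Path failure = MRApps.getEndJobCommitFailureFile(conf, user, jobId);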

+ 42 - 11
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java

@@ -52,6 +52,14 @@ import org.apache.hadoop.classification.InterfaceStability;
  *   Discard the task commit.
  *   </li>
  * </ol>
+ * The methods in this class can be called from several different processes and
+ * from several different contexts.  It is important to know which process and
+ * which context each is called from.  Each method should be marked accordingly
+ * in its documentation.  It is also important to note that not all methods are
+ * guaranteed to be called once and only once.  If a method is not guaranteed to
+ * have this property the output committer needs to handle this appropriately. 
+ * Also note it will only be in rare situations where they may be called 
+ * multiple times for the same task.
  * 
  * @see FileOutputCommitter 
  * @see JobContext
@@ -62,7 +70,9 @@ import org.apache.hadoop.classification.InterfaceStability;
 public abstract class OutputCommitter 
                 extends org.apache.hadoop.mapreduce.OutputCommitter {
   /**
-   * For the framework to setup the job output during initialization
+   * For the framework to setup the job output during initialization.  This is
+   * called from the application master process for the entire job. This will be
+   * called multiple times, once per job attempt.
    * 
    * @param jobContext Context of the job whose output is being written.
    * @throws IOException if temporary output could not be created
@@ -70,7 +80,9 @@ public abstract class OutputCommitter
   public abstract void setupJob(JobContext jobContext) throws IOException;
 
   /**
-   * For cleaning up the job's output after job completion
+   * For cleaning up the job's output after job completion.  This is called
+   * from the application master process for the entire job. This may be called
+   * multiple times.
    * 
    * @param jobContext Context of the job whose output is being written.
    * @throws IOException
@@ -82,7 +94,10 @@ public abstract class OutputCommitter
 
   /**
    * For committing job's output after successful job completion. Note that this
-   * is invoked for jobs with final runstate as SUCCESSFUL.	
+   * is invoked for jobs with final runstate as SUCCESSFUL.  This is called
+   * from the application master process for the entire job. This is guaranteed
+   * to be called only once.  If it throws an exception the entire job will
+   * fail.
    * 
    * @param jobContext Context of the job whose output is being written.
    * @throws IOException 
@@ -94,7 +109,8 @@ public abstract class OutputCommitter
   /**
    * For aborting an unsuccessful job's output. Note that this is invoked for 
    * jobs with final runstate as {@link JobStatus#FAILED} or 
-   * {@link JobStatus#KILLED}
+   * {@link JobStatus#KILLED}. This is called from the application
+   * master process for the entire job. This may be called multiple times.
    * 
    * @param jobContext Context of the job whose output is being written.
    * @param status final runstate of the job
@@ -106,7 +122,10 @@ public abstract class OutputCommitter
   }
   
   /**
-   * Sets up output for the task.
+   * Sets up output for the task. This is called from each individual task's
+   * process that will output to HDFS, and it is called just for that task. This
+   * may be called multiple times for the same task, but for different task
+   * attempts.
    * 
    * @param taskContext Context of the task whose output is being written.
    * @throws IOException
@@ -115,7 +134,9 @@ public abstract class OutputCommitter
   throws IOException;
   
   /**
-   * Check whether task needs a commit
+   * Check whether task needs a commit.  This is called from each individual
+   * task's process that will output to HDFS, and it is called just for that
+   * task.
    * 
    * @param taskContext
    * @return true/false
@@ -125,9 +146,16 @@ public abstract class OutputCommitter
   throws IOException;
 
   /**
-   * To promote the task's temporary output to final output location
-   * 
-   * The task's output is moved to the job's output directory.
+   * To promote the task's temporary output to final output location.
+   * If {@link #needsTaskCommit(TaskAttemptContext)} returns true and this
+   * task is the task that the AM determines finished first, this method
+   * is called to commit an individual task's output.  This is to mark
+   * that task's output as complete, as {@link #commitJob(JobContext)} will
+   * also be called later on if the entire job finished successfully. This
+   * is called from a task's process. This may be called multiple times for the
+   * same task, but for different task attempts.  It should be very rare for this
+   * to be called multiple times, and it requires odd networking failures to make
+   * that happen. In the future the Hadoop framework may eliminate this race.
    * 
    * @param taskContext Context of the task whose output is being written.
    * @throws IOException if commit is not 
@@ -136,7 +164,9 @@ public abstract class OutputCommitter
   throws IOException;
   
   /**
-   * Discard the task output
+   * Discard the task output. This is called from a task's process to clean 
+   * up a single task's output that cannot yet be committed. This may be
+   * called multiple times for the same task, but for different task attempts.
    * 
    * @param taskContext
    * @throws IOException
@@ -160,7 +190,8 @@ public abstract class OutputCommitter
    * The retry-count for the job will be passed via the 
    * {@link MRConstants#APPLICATION_ATTEMPT_ID} key in  
    * {@link TaskAttemptContext#getConfiguration()} for the 
-   * <code>OutputCommitter</code>.
+   * <code>OutputCommitter</code>. This is called from the application master
+   * process, but it is called individually for each task.
    * 
    * If an exception is thrown the task will be attempted again. 
    * 

+ 25 - 8
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java

@@ -54,7 +54,11 @@ import org.apache.hadoop.classification.InterfaceStability;
  * The methods in this class can be called from several different processes and
  * from several different contexts.  It is important to know which process and
  * which context each is called from.  Each method should be marked accordingly
- * in its documentation.
+ * in its documentation.  It is also important to note that not all methods are
+ * guaranteed to be called once and only once.  If a method is not guaranteed to
+ * have this property the output committer needs to handle this appropriately. 
+ * Also note that it is only in rare situations that they may be called
+ * multiple times for the same task.
  * 
  * @see org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter 
  * @see JobContext
@@ -65,7 +69,8 @@ import org.apache.hadoop.classification.InterfaceStability;
 public abstract class OutputCommitter {
   /**
    * For the framework to setup the job output during initialization.  This is
-   * called from the application master process for the entire job.
+   * called from the application master process for the entire job. This will be
+   * called multiple times, once per job attempt.
    * 
    * @param jobContext Context of the job whose output is being written.
    * @throws IOException if temporary output could not be created
@@ -74,7 +79,8 @@ public abstract class OutputCommitter {
 
   /**
    * For cleaning up the job's output after job completion.  This is called
-   * from the application master process for the entire job.
+   * from the application master process for the entire job. This may be called
+   * multiple times.
    * 
    * @param jobContext Context of the job whose output is being written.
    * @throws IOException
@@ -87,7 +93,9 @@ public abstract class OutputCommitter {
   /**
    * For committing job's output after successful job completion. Note that this
    * is invoked for jobs with final runstate as SUCCESSFUL.  This is called
-   * from the application master process for the entire job.	
+   * from the application master process for the entire job. This is guaranteed
+   * to be called only once.  If it throws an exception the entire job will
+   * fail.
    * 
    * @param jobContext Context of the job whose output is being written.
    * @throws IOException
@@ -101,7 +109,7 @@ public abstract class OutputCommitter {
    * For aborting an unsuccessful job's output. Note that this is invoked for 
    * jobs with final runstate as {@link JobStatus.State#FAILED} or 
    * {@link JobStatus.State#KILLED}.  This is called from the application
-   * master process for the entire job.
+   * master process for the entire job. This may be called multiple times.
    *
    * @param jobContext Context of the job whose output is being written.
    * @param state final runstate of the job
@@ -114,7 +122,9 @@ public abstract class OutputCommitter {
   
   /**
    * Sets up output for the task.  This is called from each individual task's
-   * process that will output to HDFS, and it is called just for that task.
+   * process that will output to HDFS, and it is called just for that task. This
+   * may be called multiple times for the same task, but for different task
+   * attempts.
    * 
    * @param taskContext Context of the task whose output is being written.
    * @throws IOException
@@ -141,7 +151,10 @@ public abstract class OutputCommitter {
    * is called to commit an individual task's output.  This is to mark
   * that task's output as complete, as {@link #commitJob(JobContext)} will
    * also be called later on if the entire job finished successfully. This
-   * is called from a task's process.
+   * is called from a task's process. This may be called multiple times for the
+   * same task, but for different task attempts.  It should be very rare for this
+   * to be called multiple times, and it requires odd networking failures to make
+   * that happen. In the future the Hadoop framework may eliminate this race.
    * 
    * @param taskContext Context of the task whose output is being written.
    * @throws IOException if commit is not successful. 
@@ -151,7 +164,8 @@ public abstract class OutputCommitter {
   
   /**
    * Discard the task output. This is called from a task's process to clean 
-   * up a single task's output that can not yet been committed.
+   * up a single task's output that cannot yet be committed. This may be
+   * called multiple times for the same task, but for different task attempts.
    * 
    * @param taskContext
    * @throws IOException
@@ -184,6 +198,9 @@ public abstract class OutputCommitter {
    * 
    * If an exception is thrown the task will be attempted again. 
    * 
+   * This may be called multiple times for the same task, but from different
+   * application attempts.
+   * 
    * @param taskContext Context of the task whose output is being recovered
    * @throws IOException
    */
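
The contract spelled out in these javadoc changes (setupJob, cleanupJob, and abortJob may repeat across job attempts; commitJob runs at most once) means a committer should be written idempotently everywhere except commitJob. A minimal sketch of such a committer against the new API; the class name and paths are made up for illustration:

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.JobStatus;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    // Sketch only: the cleanup/abort paths perform deletes, which repeat
    // harmlessly if a second AM attempt runs them again; commitJob is the one
    // step the framework now guarantees to call at most once.
    public class IdempotentCommitter extends OutputCommitter {
      private final Path tempDir = new Path("out/_temporary");

      @Override
      public void setupJob(JobContext ctx) throws IOException {
        FileSystem.get(ctx.getConfiguration()).mkdirs(tempDir); // safe to repeat
      }

      @Override
      public void commitJob(JobContext ctx) throws IOException {
        // Promote output here; the AM's COMMIT_STARTED/COMMIT_SUCCESS markers
        // ensure no second attempt re-enters this method.
        FileSystem.get(ctx.getConfiguration()).delete(tempDir, true);
      }

      @Override
      public void abortJob(JobContext ctx, JobStatus.State state)
          throws IOException {
        FileSystem.get(ctx.getConfiguration()).delete(tempDir, true); // repeatable
      }

      @Override
      public void setupTask(TaskAttemptContext ctx) { /* nothing to do */ }

      @Override
      public boolean needsTaskCommit(TaskAttemptContext ctx) { return false; }

      @Override
      public void commitTask(TaskAttemptContext ctx) { /* no per-task output */ }

      @Override
      public void abortTask(TaskAttemptContext ctx) { /* no per-task output */ }
    }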

+ 27 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEventHandler.java

@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import java.io.IOException;
+
+public interface HistoryEventHandler {
+
+  void handleEvent(HistoryEvent event) throws IOException;
+
+}
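
The new interface separates walking a history file from what JobHistoryParser does with each event, so any consumer can plug in its own logic. A trivial counting handler (hypothetical, not part of the patch) might look like:

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
    import org.apache.hadoop.mapreduce.jobhistory.HistoryEventHandler;

    // Hypothetical implementation, purely for illustration.
    public class CountingHistoryEventHandler implements HistoryEventHandler {
      private int count = 0;

      @Override
      public void handleEvent(HistoryEvent event) throws IOException {
        count++; // a real handler would switch on event.getEventType()
      }

      public int getCount() { return count; }
    }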

+ 32 - 17
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java

@@ -54,7 +54,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class JobHistoryParser {
+public class JobHistoryParser implements HistoryEventHandler {
 
   private static final Log LOG = LogFactory.getLog(JobHistoryParser.class);
   
@@ -94,6 +94,34 @@ public class JobHistoryParser {
     this.in = in;
   }
   
+  public synchronized void parse(HistoryEventHandler handler) 
+    throws IOException {
+    parse(new EventReader(in), handler);
+  }
+  
+  /**
+   * Only used for unit tests.
+   */
+  @Private
+  public synchronized void parse(EventReader reader, HistoryEventHandler handler)
+    throws IOException {
+    int eventCtr = 0;
+    HistoryEvent event;
+    try {
+      while ((event = reader.getNextEvent()) != null) {
+        handler.handleEvent(event);
+        ++eventCtr;
+      } 
+    } catch (IOException ioe) {
+      LOG.info("Caught exception parsing history file after " + eventCtr + 
+          " events", ioe);
+      parseException = ioe;
+    } finally {
+      in.close();
+    }
+  }
+  
+  
   /**
    * Parse the entire history file and populate the JobInfo object
    * The first invocation will populate the object, subsequent calls
@@ -122,21 +150,7 @@ public class JobHistoryParser {
     }
 
     info = new JobInfo();
-
-    int eventCtr = 0;
-    HistoryEvent event;
-    try {
-      while ((event = reader.getNextEvent()) != null) {
-        handleEvent(event);
-        ++eventCtr;
-      } 
-    } catch (IOException ioe) {
-      LOG.info("Caught exception parsing history file after " + eventCtr + 
-          " events", ioe);
-      parseException = ioe;
-    } finally {
-      in.close();
-    }
+    parse(reader, this);
     return info;
   }
   
@@ -150,7 +164,8 @@ public class JobHistoryParser {
     return parseException;
   }
   
-  private void handleEvent(HistoryEvent event)  { 
+  @Override
+  public void handleEvent(HistoryEvent event)  { 
     EventType type = event.getEventType();
 
     switch (type) {
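
With this refactoring, JobHistoryParser can either populate its own JobInfo via parse() or stream raw events to any HistoryEventHandler. A usage sketch, reusing the hypothetical counting handler shown after HistoryEventHandler above; the file name, fs, and LOG are illustrative:

    FSDataInputStream in = fs.open(new Path("job_1234567890000_0001.jhist"));
    JobHistoryParser parser = new JobHistoryParser(in);
    CountingHistoryEventHandler counter = new CountingHistoryEventHandler();
    parser.parse(counter); // streams each event to the handler
    if (parser.getParseException() != null) {
      // a truncated file is recorded here rather than thrown mid-stream
      LOG.info("history file ended early", parser.getParseException());
    }
    System.out.println("saw " + counter.getCount() + " events");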

+ 3 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java

@@ -667,6 +667,9 @@ public class HistoryFileManager extends AbstractService {
             }
           });
         }
+      } else if (old != null && !old.isMovePending()) {
+        //This is a duplicate so just delete it
+        fileInfo.delete();
       }
     }
   }

+ 6 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java

@@ -125,7 +125,12 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
     Class<? extends Enum> type = event.getType().getDeclaringClass();
 
     try{
-      eventDispatchers.get(type).handle(event);
+      EventHandler handler = eventDispatchers.get(type);
+      if(handler != null) {
+        handler.handle(event);
+      } else {
+        throw new Exception("No handler for registered for " + type);
+      }
     }
     catch (Throwable t) {
       //TODO Maybe log the state of the queue
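
Previously a missing registration surfaced as a NullPointerException deep in the dispatch thread; now it fails with an explicit message. The practical rule is that every event type must be registered before any event of that type is dispatched, roughly (handler, jobId, and jobContext are illustrative):

    // Register handlers up front, or dispatch now fails fast with
    // "No handler registered for <type>".
    AsyncDispatcher dispatcher = new AsyncDispatcher();
    dispatcher.init(conf);
    dispatcher.register(CommitterEventType.class, committerEventHandler);
    dispatcher.start();
    dispatcher.getEventHandler().handle(
        new CommitterJobCommitEvent(jobId, jobContext));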

+ 6 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxyServlet.java

@@ -286,9 +286,12 @@ public class WebAppProxyServlet extends HttpServlet {
         		"please try the history server");
         return;
       }
-      URI trackingUri = ProxyUriUtils.getUriFromAMUrl(
-          applicationReport.getOriginalTrackingUrl());
-      if(applicationReport.getOriginalTrackingUrl().equals("N/A")) {
+      String original = applicationReport.getOriginalTrackingUrl();
+      URI trackingUri = null;
+      if (original != null) {
+        trackingUri = ProxyUriUtils.getUriFromAMUrl(original);
+      }
+      if(original == null || original.equals("N/A")) {
         String message;
         switch(applicationReport.getFinalApplicationStatus()) {
           case FAILED: