浏览代码

Merge r1605263 from trunk. YARN-1365. Changed ApplicationMasterService to allow an app to re-register after RM restart. Contributed by Anubhav Dhoot

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1605264 13f79535-47bb-0310-9956-ffa450edef68
Jian He 11 年之前
父节点
当前提交
63940d6e13
共有 16 个文件被更改,包括 191 次插入和 87 次删除
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 47 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ApplicationMasterNotRegisteredException.java
  3. 2 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidApplicationMasterRequestException.java
  4. 12 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
  5. 5 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
  6. 13 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
  7. 13 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
  8. 14 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
  9. 13 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
  10. 22 20
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
  11. 8 31
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
  12. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
  13. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
  14. 30 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
  15. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
  16. 5 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -27,6 +27,9 @@ Release 2.5.0 - UNRELEASED
     YARN-1339. Recover DeletionService state upon nodemanager restart. (Jason Lowe
     YARN-1339. Recover DeletionService state upon nodemanager restart. (Jason Lowe
     via junping_du)
     via junping_du)
 
 
+    YARN-1365. Changed ApplicationMasterService to allow an app to re-register
+    after RM restart. (Anubhav Dhoot via jianhe)
+
   IMPROVEMENTS
   IMPROVEMENTS
 
 
     YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via
     YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via

+ 47 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ApplicationMasterNotRegisteredException.java

@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.exceptions;
+
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+
+/**
+ * This exception is thrown when an ApplicationMaster tries to unregister by calling
+ * {@link ApplicationMasterProtocol#finishApplicationMaster(FinishApplicationMasterRequest)}
+ * without first registering through
+ * {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)},
+ * or when it tries to unregister after an RM restart without registering again.
+ * In either case the ApplicationMaster is expected to call
+ * {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)}
+ * and then retry the unregister call.
+ */
+
+public class ApplicationMasterNotRegisteredException extends YarnException {
+
+  private static final long serialVersionUID = 13498238L;
+
+  public ApplicationMasterNotRegisteredException(Throwable cause) { super(cause);}
+
+  public ApplicationMasterNotRegisteredException(String message) { super(message); }
+
+  public ApplicationMasterNotRegisteredException(String message, Throwable
+      cause) {
+    super(message, cause);
+  }
+}

+ 2 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidApplicationMasterRequestException.java

@@ -24,10 +24,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque
 
 
 /**
 /**
  * This exception is thrown when an ApplicationMaster asks for resources by
  * This exception is thrown when an ApplicationMaster asks for resources by
- * calling {@link ApplicationMasterProtocol#allocate(AllocateRequest)} or tries
- * to unregister by calling
- * {@link ApplicationMasterProtocol#finishApplicationMaster(FinishApplicationMasterRequest)}
- * API without first registering by calling
+ * calling {@link ApplicationMasterProtocol#allocate(AllocateRequest)}
+ * without first registering by calling
  * {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)}
  * {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)}
  * or if it tries to register more than once.
  * or if it tries to register more than once.
  */
  */

+ 12 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java

@@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
 import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
 import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
 import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
 import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
 import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
 import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
@@ -107,12 +108,15 @@ public class ApplicationMasterService extends AbstractService implements
       new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();
       new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();
   private final AllocateResponse resync =
   private final AllocateResponse resync =
       recordFactory.newRecordInstance(AllocateResponse.class);
       recordFactory.newRecordInstance(AllocateResponse.class);
+  private final AllocateResponse shutdown =
+      recordFactory.newRecordInstance(AllocateResponse.class);
   private final RMContext rmContext;
   private final RMContext rmContext;
 
 
   public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
   public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
     super(ApplicationMasterService.class.getName());
     super(ApplicationMasterService.class.getName());
     this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
     this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
     this.rScheduler = scheduler;
     this.rScheduler = scheduler;
+    this.shutdown.setAMCommand(AMCommand.AM_SHUTDOWN);
     this.resync.setAMCommand(AMCommand.AM_RESYNC);
     this.resync.setAMCommand(AMCommand.AM_RESYNC);
     this.rmContext = rmContext;
     this.rmContext = rmContext;
   }
   }
@@ -346,9 +350,9 @@ public class ApplicationMasterService extends AbstractService implements
             AuditConstants.UNREGISTER_AM, "", "ApplicationMasterService",
             AuditConstants.UNREGISTER_AM, "", "ApplicationMasterService",
             message, applicationAttemptId.getApplicationId(),
             message, applicationAttemptId.getApplicationId(),
             applicationAttemptId);
             applicationAttemptId);
-        throw new InvalidApplicationMasterRequestException(message);
+        throw new ApplicationMasterNotRegisteredException(message);
       }
       }
-      
+
       this.amLivelinessMonitor.receivedPing(applicationAttemptId);
       this.amLivelinessMonitor.receivedPing(applicationAttemptId);
 
 
       RMApp rmApp =
       RMApp rmApp =
@@ -409,22 +413,23 @@ public class ApplicationMasterService extends AbstractService implements
     AllocateResponseLock lock = responseMap.get(appAttemptId);
     AllocateResponseLock lock = responseMap.get(appAttemptId);
     if (lock == null) {
     if (lock == null) {
       LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId);
       LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId);
-      return resync;
+      return shutdown;
     }
     }
     synchronized (lock) {
     synchronized (lock) {
       AllocateResponse lastResponse = lock.getAllocateResponse();
       AllocateResponse lastResponse = lock.getAllocateResponse();
       if (!hasApplicationMasterRegistered(appAttemptId)) {
       if (!hasApplicationMasterRegistered(appAttemptId)) {
         String message =
         String message =
-            "Application Master is trying to allocate before registering for: "
-                + appAttemptId.getApplicationId();
-        LOG.error(message);
+            "Application Master is not registered for known application: "
+                + appAttemptId.getApplicationId()
+                + ". Let AM resync.";
+        LOG.info(message);
         RMAuditLogger.logFailure(
         RMAuditLogger.logFailure(
             this.rmContext.getRMApps().get(appAttemptId.getApplicationId())
             this.rmContext.getRMApps().get(appAttemptId.getApplicationId())
                 .getUser(), AuditConstants.REGISTER_AM, "",
                 .getUser(), AuditConstants.REGISTER_AM, "",
             "ApplicationMasterService", message,
             "ApplicationMasterService", message,
             appAttemptId.getApplicationId(),
             appAttemptId.getApplicationId(),
             appAttemptId);
             appAttemptId);
-        throw new InvalidApplicationMasterRequestException(message);
+        return resync;
       }
       }
 
 
       if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
       if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {

+ 5 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java

@@ -899,8 +899,12 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
       } else {
       } else {
         // Add the current attempt to the scheduler.
         // Add the current attempt to the scheduler.
         if (appAttempt.rmContext.isWorkPreservingRecoveryEnabled()) {
         if (appAttempt.rmContext.isWorkPreservingRecoveryEnabled()) {
+          // Need to register an app attempt before AM can register
+          appAttempt.masterService
+              .registerAppAttempt(appAttempt.applicationAttemptId);
+
           appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
           appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
-            appAttempt.getAppAttemptId(), false));
+            appAttempt.getAppAttemptId(), false, false));
         }
         }
 
 
         /*
         /*

+ 13 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -557,7 +557,8 @@ public class CapacityScheduler extends
 
 
   private synchronized void addApplicationAttempt(
   private synchronized void addApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
       ApplicationAttemptId applicationAttemptId,
-      boolean transferStateFromPreviousAttempt) {
+      boolean transferStateFromPreviousAttempt,
+      boolean shouldNotifyAttemptAdded) {
     SchedulerApplication<FiCaSchedulerApp> application =
     SchedulerApplication<FiCaSchedulerApp> application =
         applications.get(applicationAttemptId.getApplicationId());
         applications.get(applicationAttemptId.getApplicationId());
     CSQueue queue = (CSQueue) application.getQueue();
     CSQueue queue = (CSQueue) application.getQueue();
@@ -575,9 +576,15 @@ public class CapacityScheduler extends
     LOG.info("Added Application Attempt " + applicationAttemptId
     LOG.info("Added Application Attempt " + applicationAttemptId
         + " to scheduler from user " + application.getUser() + " in queue "
         + " to scheduler from user " + application.getUser() + " in queue "
         + queue.getQueueName());
         + queue.getQueueName());
-    rmContext.getDispatcher().getEventHandler() .handle(
-        new RMAppAttemptEvent(applicationAttemptId,
-          RMAppAttemptEventType.ATTEMPT_ADDED));
+    if (shouldNotifyAttemptAdded) {
+      rmContext.getDispatcher().getEventHandler().handle(
+          new RMAppAttemptEvent(applicationAttemptId,
+              RMAppAttemptEventType.ATTEMPT_ADDED));
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skipping notifying ATTEMPT_ADDED");
+      }
+    }
   }
   }
 
 
   private synchronized void doneApplication(ApplicationId applicationId,
   private synchronized void doneApplication(ApplicationId applicationId,
@@ -911,7 +918,8 @@ public class CapacityScheduler extends
       AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
       AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
           (AppAttemptAddedSchedulerEvent) event;
           (AppAttemptAddedSchedulerEvent) event;
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
-        appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
+        appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
+        appAttemptAddedEvent.getShouldNotifyAttemptAdded());
     }
     }
     break;
     break;
     case APP_ATTEMPT_REMOVED:
     case APP_ATTEMPT_REMOVED:

+ 13 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java

@@ -24,13 +24,22 @@ public class AppAttemptAddedSchedulerEvent extends SchedulerEvent {
 
 
   private final ApplicationAttemptId applicationAttemptId;
   private final ApplicationAttemptId applicationAttemptId;
   private final boolean transferStateFromPreviousAttempt;
   private final boolean transferStateFromPreviousAttempt;
+  private final boolean shouldNotifyAttemptAdded;
 
 
   public AppAttemptAddedSchedulerEvent(
   public AppAttemptAddedSchedulerEvent(
       ApplicationAttemptId applicationAttemptId,
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt) {
       boolean transferStateFromPreviousAttempt) {
+    this(applicationAttemptId, transferStateFromPreviousAttempt, true);
+  }
+
+  public AppAttemptAddedSchedulerEvent(
+      ApplicationAttemptId applicationAttemptId,
+      boolean transferStateFromPreviousAttempt,
+      boolean shouldNotifyAttemptAdded) {
     super(SchedulerEventType.APP_ATTEMPT_ADDED);
     super(SchedulerEventType.APP_ATTEMPT_ADDED);
     this.applicationAttemptId = applicationAttemptId;
     this.applicationAttemptId = applicationAttemptId;
     this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
     this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
+    this.shouldNotifyAttemptAdded = shouldNotifyAttemptAdded;
   }
   }
 
 
   public ApplicationAttemptId getApplicationAttemptId() {
   public ApplicationAttemptId getApplicationAttemptId() {
@@ -40,4 +49,8 @@ public class AppAttemptAddedSchedulerEvent extends SchedulerEvent {
   public boolean getTransferStateFromPreviousAttempt() {
   public boolean getTransferStateFromPreviousAttempt() {
     return transferStateFromPreviousAttempt;
     return transferStateFromPreviousAttempt;
   }
   }
+
+  public boolean getShouldNotifyAttemptAdded() {
+    return shouldNotifyAttemptAdded;
+  }
 }
 }

+ 14 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -597,7 +597,8 @@ public class FairScheduler extends
    */
    */
   protected synchronized void addApplicationAttempt(
   protected synchronized void addApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
       ApplicationAttemptId applicationAttemptId,
-      boolean transferStateFromPreviousAttempt) {
+      boolean transferStateFromPreviousAttempt,
+      boolean shouldNotifyAttemptAdded) {
     SchedulerApplication<FSSchedulerApp> application =
     SchedulerApplication<FSSchedulerApp> application =
         applications.get(applicationAttemptId.getApplicationId());
         applications.get(applicationAttemptId.getApplicationId());
     String user = application.getUser();
     String user = application.getUser();
@@ -625,9 +626,16 @@ public class FairScheduler extends
 
 
     LOG.info("Added Application Attempt " + applicationAttemptId
     LOG.info("Added Application Attempt " + applicationAttemptId
         + " to scheduler from user: " + user);
         + " to scheduler from user: " + user);
-    rmContext.getDispatcher().getEventHandler().handle(
-        new RMAppAttemptEvent(applicationAttemptId,
-            RMAppAttemptEventType.ATTEMPT_ADDED));
+
+    if (shouldNotifyAttemptAdded) {
+      rmContext.getDispatcher().getEventHandler().handle(
+          new RMAppAttemptEvent(applicationAttemptId,
+              RMAppAttemptEventType.ATTEMPT_ADDED));
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skipping notifying ATTEMPT_ADDED");
+      }
+    }
   }
   }
 
 
   /**
   /**
@@ -1130,7 +1138,8 @@ public class FairScheduler extends
       AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
       AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
           (AppAttemptAddedSchedulerEvent) event;
           (AppAttemptAddedSchedulerEvent) event;
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
-        appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
+        appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
+        appAttemptAddedEvent.getShouldNotifyAttemptAdded());
       break;
       break;
     case APP_ATTEMPT_REMOVED:
     case APP_ATTEMPT_REMOVED:
       if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {
       if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {

+ 13 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

@@ -370,7 +370,8 @@ public class FifoScheduler extends
   @VisibleForTesting
   @VisibleForTesting
   public synchronized void
   public synchronized void
       addApplicationAttempt(ApplicationAttemptId appAttemptId,
       addApplicationAttempt(ApplicationAttemptId appAttemptId,
-          boolean transferStateFromPreviousAttempt) {
+          boolean transferStateFromPreviousAttempt,
+          boolean shouldNotifyAttemptAdded) {
     SchedulerApplication<FiCaSchedulerApp> application =
     SchedulerApplication<FiCaSchedulerApp> application =
         applications.get(appAttemptId.getApplicationId());
         applications.get(appAttemptId.getApplicationId());
     String user = application.getUser();
     String user = application.getUser();
@@ -388,9 +389,15 @@ public class FifoScheduler extends
     metrics.submitAppAttempt(user);
     metrics.submitAppAttempt(user);
     LOG.info("Added Application Attempt " + appAttemptId
     LOG.info("Added Application Attempt " + appAttemptId
         + " to scheduler from user " + application.getUser());
         + " to scheduler from user " + application.getUser());
-    rmContext.getDispatcher().getEventHandler().handle(
-        new RMAppAttemptEvent(appAttemptId,
-            RMAppAttemptEventType.ATTEMPT_ADDED));
+    if (shouldNotifyAttemptAdded) {
+      rmContext.getDispatcher().getEventHandler().handle(
+          new RMAppAttemptEvent(appAttemptId,
+              RMAppAttemptEventType.ATTEMPT_ADDED));
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skipping notifying ATTEMPT_ADDED");
+      }
+    }
   }
   }
 
 
   private synchronized void doneApplication(ApplicationId applicationId,
   private synchronized void doneApplication(ApplicationId applicationId,
@@ -780,7 +787,8 @@ public class FifoScheduler extends
       AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
       AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
           (AppAttemptAddedSchedulerEvent) event;
           (AppAttemptAddedSchedulerEvent) event;
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
-        appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
+        appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
+        appAttemptAddedEvent.getShouldNotifyAttemptAdded());
     }
     }
     break;
     break;
     case APP_ATTEMPT_REMOVED:
     case APP_ATTEMPT_REMOVED:

+ 22 - 20
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
+import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -194,28 +195,17 @@ public class TestApplicationMasterLauncher {
 
 
     // request for containers
     // request for containers
     int request = 2;
     int request = 2;
-    try {
-      AllocateResponse ar =
-          am.allocate("h1", 1000, request, new ArrayList<ContainerId>());
-    } catch (Exception e) {
-      Assert.assertEquals("Application Master is trying to allocate before "
-          + "registering for: " + attempt.getAppAttemptId().getApplicationId(),
-        e.getMessage());
-      thrown = true;
-    }
+    AllocateResponse ar =
+        am.allocate("h1", 1000, request, new ArrayList<ContainerId>());
+    Assert.assertTrue(ar.getAMCommand() == AMCommand.AM_RESYNC);
+
     // kick the scheduler
     // kick the scheduler
     nm1.nodeHeartbeat(true);
     nm1.nodeHeartbeat(true);
-    try {
-      AllocateResponse amrs =
-          am.allocate(new ArrayList<ResourceRequest>(),
-            new ArrayList<ContainerId>());
-    } catch (Exception e) {
-      Assert.assertEquals("Application Master is trying to allocate before "
-          + "registering for: " + attempt.getAppAttemptId().getApplicationId(),
-        e.getMessage());
-      thrown = true;
-    }
-    Assert.assertTrue(thrown);
+    AllocateResponse amrs =
+        am.allocate(new ArrayList<ResourceRequest>(),
+          new ArrayList<ContainerId>());
+    Assert.assertTrue(ar.getAMCommand() == AMCommand.AM_RESYNC);
+
     am.registerAppAttempt();
     am.registerAppAttempt();
     thrown = false;
     thrown = false;
     try {
     try {
@@ -228,5 +218,17 @@ public class TestApplicationMasterLauncher {
       thrown = true;
       thrown = true;
     }
     }
     Assert.assertTrue(thrown);
     Assert.assertTrue(thrown);
+
+    // Simulate an AM that was disconnected and app attempt was removed
+    // (responseMap does not contain attemptid)
+    am.unregisterAppAttempt();
+    nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1,
+        ContainerState.COMPLETE);
+    am.waitForState(RMAppAttemptState.FINISHED);
+
+    AllocateResponse amrs2 =
+        am.allocate(new ArrayList<ResourceRequest>(),
+            new ArrayList<ContainerId>());
+    Assert.assertTrue(amrs2.getAMCommand() == AMCommand.AM_SHUTDOWN);
   }
   }
 }
 }

+ 8 - 31
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java

@@ -18,60 +18,33 @@
 
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 
-import com.google.common.collect.Maps;
 import org.junit.Assert;
 import org.junit.Assert;
 
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
 import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.event.InlineDispatcher;
-import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
 import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.*;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.*;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
-
-import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.BeforeClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.Test;
-import org.mockito.ArgumentCaptor;
 
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.List;
-import java.util.concurrent.ConcurrentMap;
 
 
 import static java.lang.Thread.sleep;
 import static java.lang.Thread.sleep;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyList;
-import static org.mockito.Mockito.*;
 
 
 public class TestApplicationMasterService {
 public class TestApplicationMasterService {
   private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
   private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
@@ -270,13 +243,17 @@ public class TestApplicationMasterService {
       }
       }
       Assert.assertNotNull(cause);
       Assert.assertNotNull(cause);
       Assert
       Assert
-          .assertTrue(cause instanceof InvalidApplicationMasterRequestException);
+          .assertTrue(cause instanceof ApplicationMasterNotRegisteredException);
       Assert.assertNotNull(cause.getMessage());
       Assert.assertNotNull(cause.getMessage());
       Assert
       Assert
           .assertTrue(cause
           .assertTrue(cause
               .getMessage()
               .getMessage()
               .contains(
               .contains(
                   "Application Master is trying to unregister before registering for:"));
                   "Application Master is trying to unregister before registering for:"));
+
+      am1.registerAppAttempt();
+
+      am1.unregisterAppAttempt(req, false);
     } finally {
     } finally {
       if (rm != null) {
       if (rm != null) {
         rm.stop();
         rm.stop();

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java

@@ -238,7 +238,7 @@ public class TestFifoScheduler {
     }
     }
 
 
     ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 1);
     ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 1);
-    scheduler.addApplicationAttempt(attId, false);
+    scheduler.addApplicationAttempt(attId, false, true);
 
 
     rm.stop();
     rm.stop();
   }
   }

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java

@@ -293,7 +293,7 @@ public class TestRMRestart {
     AllocateResponse allocResponse = am1.allocate(
     AllocateResponse allocResponse = am1.allocate(
         new ArrayList<ResourceRequest>(),
         new ArrayList<ResourceRequest>(),
         new ArrayList<ContainerId>());
         new ArrayList<ContainerId>());
-    Assert.assertTrue(allocResponse.getAMCommand() == AMCommand.AM_RESYNC);
+    Assert.assertEquals(AMCommand.AM_SHUTDOWN, allocResponse.getAMCommand());
     
     
     // NM should be rebooted on heartbeat, even first heartbeat for nm2
     // NM should be rebooted on heartbeat, even first heartbeat for nm2
     NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
     NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
@@ -1648,7 +1648,7 @@ public class TestRMRestart {
     rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
     rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
     MockAM am1 = rm1.sendAMLaunched(attempt1.getAppAttemptId());
     MockAM am1 = rm1.sendAMLaunched(attempt1.getAppAttemptId());
     am1.registerAppAttempt();
     am1.registerAppAttempt();
-    am1.allocate("127.0.0.1" , 1000, 1, new ArrayList<ContainerId>()); 
+    am1.allocate("127.0.0.1" , 1000, 1, new ArrayList<ContainerId>());
     nm1.nodeHeartbeat(true);
     nm1.nodeHeartbeat(true);
     List<Container> conts = am1.allocate(new ArrayList<ResourceRequest>(),
     List<Container> conts = am1.allocate(new ArrayList<ResourceRequest>(),
         new ArrayList<ContainerId>()).getAllocatedContainers();
         new ArrayList<ContainerId>()).getAllocatedContainers();

+ 30 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java

@@ -535,6 +535,36 @@ public class TestWorkPreservingRMRestart {
     assertNull(scheduler.getRMContainer(completedContainer.getContainerId()));
     assertNull(scheduler.getRMContainer(completedContainer.getContainerId()));
   }
   }
 
 
+  @Test (timeout = 600000)
+  public void testAppReregisterOnRMWorkPreservingRestart() throws Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // start RM
+    rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    // create app and launch the AM
+    RMApp app0 = rm1.submitApp(200);
+    MockAM am0 = MockRM.launchAM(app0, rm1, nm1);
+
+    // start new RM
+    rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
+    rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED);
+
+    am0.setAMRMProtocol(rm2.getApplicationMasterService());
+    am0.registerAppAttempt(false);
+
+    rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
+    rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
+  }
+
   private void asserteMetrics(QueueMetrics qm, int appsSubmitted,
   private void asserteMetrics(QueueMetrics qm, int appsSubmitted,
       int appsPending, int appsRunning, int appsCompleted,
       int appsPending, int appsRunning, int appsCompleted,
       int allocatedContainers, int availableMB, int availableVirtualCores,
       int allocatedContainers, int availableMB, int availableVirtualCores,

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java

@@ -146,7 +146,7 @@ public class FairSchedulerTestBase {
     // This conditional is for testAclSubmitApplication where app is rejected
     // This conditional is for testAclSubmitApplication where app is rejected
     // and no app is added.
     // and no app is added.
     if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
     if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
-      scheduler.addApplicationAttempt(id, false);
+      scheduler.addApplicationAttempt(id, false, true);
     }
     }
     List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
     List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
     ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
     ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,

+ 5 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

@@ -787,13 +787,13 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
 
     ApplicationAttemptId id11 = createAppAttemptId(1, 1);
     ApplicationAttemptId id11 = createAppAttemptId(1, 1);
     scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1");
     scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1");
-    scheduler.addApplicationAttempt(id11, false);
+    scheduler.addApplicationAttempt(id11, false, true);
     ApplicationAttemptId id21 = createAppAttemptId(2, 1);
     ApplicationAttemptId id21 = createAppAttemptId(2, 1);
     scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1");
     scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1");
-    scheduler.addApplicationAttempt(id21, false);
+    scheduler.addApplicationAttempt(id21, false, true);
     ApplicationAttemptId id22 = createAppAttemptId(2, 2);
     ApplicationAttemptId id22 = createAppAttemptId(2, 2);
     scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1");
     scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1");
-    scheduler.addApplicationAttempt(id22, false);
+    scheduler.addApplicationAttempt(id22, false, true);
 
 
     int minReqSize = 
     int minReqSize = 
         FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB;
         FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB;
@@ -1555,7 +1555,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     
     
     ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
     ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
     scheduler.addApplication(appId.getApplicationId(), "queue1", "user1");
     scheduler.addApplication(appId.getApplicationId(), "queue1", "user1");
-    scheduler.addApplicationAttempt(appId, false);
+    scheduler.addApplicationAttempt(appId, false, true);
     
     
     // 1 request with 2 nodes on the same rack. another request with 1 node on
     // 1 request with 2 nodes on the same rack. another request with 1 node on
     // a different rack
     // a different rack
@@ -2714,7 +2714,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     ApplicationAttemptId appAttemptId =
     ApplicationAttemptId appAttemptId =
             createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
             createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
     fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11");
     fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11");
-    fs.addApplicationAttempt(appAttemptId, false);
+    fs.addApplicationAttempt(appAttemptId, false, true);
     List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
     List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
     ResourceRequest request =
     ResourceRequest request =
             createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);
             createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);