Prechádzať zdrojové kódy

YARN-1366. Changed AMRMClient to re-register with RM and send outstanding requests back to RM on work-preserving RM restart. Contributed by Rohith

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1609254 13f79535-47bb-0310-9956-ffa450edef68
Jian He 11 rokov pred
rodič
commit
d751a61e5a

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -75,6 +75,9 @@ Release 2.5.0 - UNRELEASED
     YARN-1367. Changed NM to not kill containers on NM resync if RM work-preserving
     restart is enabled. (Anubhav Dhoot via jianhe)
 
+    YARN-1366. Changed AMRMClient to re-register with RM and send outstanding requests
+    back to RM on work-preserving RM restart. (Rohith via jianhe)
+
   IMPROVEMENTS
 
     YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml

@@ -105,6 +105,12 @@
   		<groupId>org.apache.hadoop</groupId>
   		<artifactId>hadoop-yarn-common</artifactId>
   	</dependency>
+    <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-yarn-common</artifactId>
+        <type>test-jar</type>
+        <scope>test</scope>
+    </dependency>
     <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
     <dependency>
       <groupId>org.apache.hadoop</groupId>

+ 15 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java

@@ -207,14 +207,21 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
   
   /**
    * Request additional containers and receive new container allocations.
-   * Requests made via <code>addContainerRequest</code> are sent to the 
-   * <code>ResourceManager</code>. New containers assigned to the master are 
-   * retrieved. Status of completed containers and node health updates are 
-   * also retrieved.
-   * This also doubles up as a heartbeat to the ResourceManager and must be 
-   * made periodically.
-   * The call may not always return any new allocations of containers.
-   * App should not make concurrent allocate requests. May cause request loss.
+   * Requests made via <code>addContainerRequest</code> are sent to the
+   * <code>ResourceManager</code>. New containers assigned to the master are
+   * retrieved. Status of completed containers and node health updates are also
+   * retrieved. This also doubles up as a heartbeat to the ResourceManager and
+   * must be made periodically. The call may not always return any new
+   * allocations of containers. App should not make concurrent allocate
+   * requests. May cause request loss.
+   * 
+   * <p>
+   * Note : If the user has not removed container requests that have already
+   * been satisfied, then the re-register may end up sending the entire
+   * container requests to the RM (including matched requests). Which would mean
+   * the RM could end up giving it a lot of new allocated containers.
+   * </p>
+   * 
    * @param progressIndicator Indicates progress made by the master
    * @return the response of the allocate request
    * @throws YarnException

+ 1 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java

@@ -234,8 +234,7 @@ extends AMRMClientAsync<T> {
           while (true) {
             try {
               responseQueue.put(response);
-              if (response.getAMCommand() == AMCommand.AM_RESYNC
-                  || response.getAMCommand() == AMCommand.AM_SHUTDOWN) {
+              if (response.getAMCommand() == AMCommand.AM_SHUTDOWN) {
                 return;
               }
               break;
@@ -280,7 +279,6 @@ extends AMRMClientAsync<T> {
 
           if (response.getAMCommand() != null) {
             switch(response.getAMCommand()) {
-            case AM_RESYNC:
             case AM_SHUTDOWN:
               handler.onShutdownRequest();
               LOG.info("Shutdown requested. Stopping callback.");

+ 73 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java

@@ -47,7 +47,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -59,6 +61,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.util.RackResolver;
@@ -77,10 +80,18 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   
   private int lastResponseId = 0;
 
+  protected String appHostName;
+  protected int appHostPort;
+  protected String appTrackingUrl;
+
   protected ApplicationMasterProtocol rmClient;
   protected Resource clusterAvailableResources;
   protected int clusterNodeCount;
   
+  // blacklistedNodes is required for keeping history of blacklisted nodes that
+  // are sent to RM. On RESYNC command from RM, blacklistedNodes are used to get
+  // current blacklisted nodes and send back to RM.
+  protected final Set<String> blacklistedNodes = new HashSet<String>();
   protected final Set<String> blacklistAdditions = new HashSet<String>();
   protected final Set<String> blacklistRemovals = new HashSet<String>();
   
@@ -150,6 +161,10 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
       new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator());
   protected final Set<ContainerId> release = new TreeSet<ContainerId>();
+  // pendingRelease holds history or release requests.request is removed only if
+  // RM sends completedContainer.
+  // How it different from release? --> release is for per allocate() request.
+  protected Set<ContainerId> pendingRelease = new TreeSet<ContainerId>();
   
   public AMRMClientImpl() {
     super(AMRMClientImpl.class.getName());
@@ -185,19 +200,27 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   public RegisterApplicationMasterResponse registerApplicationMaster(
       String appHostName, int appHostPort, String appTrackingUrl)
       throws YarnException, IOException {
+    this.appHostName = appHostName;
+    this.appHostPort = appHostPort;
+    this.appTrackingUrl = appTrackingUrl;
     Preconditions.checkArgument(appHostName != null,
         "The host name should not be null");
     Preconditions.checkArgument(appHostPort >= -1, "Port number of the host"
         + " should be any integers larger than or equal to -1");
-    // do this only once ???
+
+    return registerApplicationMaster();
+  }
+
+  private RegisterApplicationMasterResponse registerApplicationMaster()
+      throws YarnException, IOException {
     RegisterApplicationMasterRequest request =
-        RegisterApplicationMasterRequest.newInstance(appHostName, appHostPort,
-          appTrackingUrl);
+        RegisterApplicationMasterRequest.newInstance(this.appHostName,
+            this.appHostPort, this.appTrackingUrl);
     RegisterApplicationMasterResponse response =
         rmClient.registerApplicationMaster(request);
-
     synchronized (this) {
-      if(!response.getNMTokensFromPreviousAttempts().isEmpty()) {
+      lastResponseId = 0;
+      if (!response.getNMTokensFromPreviousAttempts().isEmpty()) {
         populateNMTokens(response.getNMTokensFromPreviousAttempts());
       }
     }
@@ -249,6 +272,25 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
       }
 
       allocateResponse = rmClient.allocate(allocateRequest);
+      if (isResyncCommand(allocateResponse)) {
+        LOG.warn("ApplicationMaster is out of sync with ResourceManager,"
+            + " hence resyncing.");
+        synchronized (this) {
+          release.addAll(this.pendingRelease);
+          blacklistAdditions.addAll(this.blacklistedNodes);
+          for (Map<String, TreeMap<Resource, ResourceRequestInfo>> rr : remoteRequestsTable
+              .values()) {
+            for (Map<Resource, ResourceRequestInfo> capabalities : rr.values()) {
+              for (ResourceRequestInfo request : capabalities.values()) {
+                addResourceRequestToAsk(request.remoteRequest);
+              }
+            }
+          }
+        }
+        // re register with RM
+        registerApplicationMaster();
+        return allocate(progressIndicator);
+      }
 
       synchronized (this) {
         // update these on successful RPC
@@ -258,6 +300,11 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
         if (!allocateResponse.getNMTokens().isEmpty()) {
           populateNMTokens(allocateResponse.getNMTokens());
         }
+        if (!pendingRelease.isEmpty()
+            && !allocateResponse.getCompletedContainersStatuses().isEmpty()) {
+          removePendingReleaseRequests(allocateResponse
+              .getCompletedContainersStatuses());
+        }
       }
     } finally {
       // TODO how to differentiate remote yarn exception vs error in rpc
@@ -288,6 +335,18 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     return allocateResponse;
   }
 
+  protected void removePendingReleaseRequests(
+      List<ContainerStatus> completedContainersStatuses) {
+    for (ContainerStatus containerStatus : completedContainersStatuses) {
+      pendingRelease.remove(containerStatus.getContainerId());
+    }
+  }
+
+  private boolean isResyncCommand(AllocateResponse allocateResponse) {
+    return allocateResponse.getAMCommand() != null
+        && allocateResponse.getAMCommand() == AMCommand.AM_RESYNC;
+  }
+
   @Private
   @VisibleForTesting
   protected void populateNMTokens(List<NMToken> nmTokens) {
@@ -324,6 +383,12 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     } catch (InterruptedException e) {
       LOG.info("Interrupted while waiting for application"
           + " to be removed from RMStateStore");
+    } catch (ApplicationMasterNotRegisteredException e) {
+      LOG.warn("ApplicationMaster is out of sync with ResourceManager,"
+          + " hence resyncing.");
+      // re register with RM
+      registerApplicationMaster();
+      unregisterApplicationMaster(appStatus, appMessage, appTrackingUrl);
     }
   }
   
@@ -414,6 +479,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   public synchronized void releaseAssignedContainer(ContainerId containerId) {
     Preconditions.checkArgument(containerId != null,
         "ContainerId can not be null.");
+    pendingRelease.add(containerId);
     release.add(containerId);
   }
   
@@ -655,6 +721,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     
     if (blacklistAdditions != null) {
       this.blacklistAdditions.addAll(blacklistAdditions);
+      this.blacklistedNodes.addAll(blacklistAdditions);
       // if some resources are also in blacklistRemovals updated before, we 
       // should remove them here.
       this.blacklistRemovals.removeAll(blacklistAdditions);
@@ -662,6 +729,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     
     if (blacklistRemovals != null) {
       this.blacklistRemovals.addAll(blacklistRemovals);
+      this.blacklistedNodes.removeAll(blacklistRemovals);
       // if some resources are in blacklistAdditions before, we should remove
       // them here.
       this.blacklistAdditions.removeAll(blacklistRemovals);

+ 0 - 33
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java

@@ -203,39 +203,6 @@ public class TestAMRMClientAsync {
     Assert.assertTrue(callbackHandler.callbackCount == 0);
   }
 
-  @Test//(timeout=10000)
-  public void testAMRMClientAsyncReboot() throws Exception {
-    Configuration conf = new Configuration();
-    TestCallbackHandler callbackHandler = new TestCallbackHandler();
-    @SuppressWarnings("unchecked")
-    AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
-    
-    final AllocateResponse rebootResponse = createAllocateResponse(
-        new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
-    rebootResponse.setAMCommand(AMCommand.AM_RESYNC);
-    when(client.allocate(anyFloat())).thenReturn(rebootResponse);
-    
-    AMRMClientAsync<ContainerRequest> asyncClient = 
-        AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
-    asyncClient.init(conf);
-    asyncClient.start();
-    
-    synchronized (callbackHandler.notifier) {
-      asyncClient.registerApplicationMaster("localhost", 1234, null);
-      while(callbackHandler.reboot == false) {
-        try {
-          callbackHandler.notifier.wait();
-        } catch (InterruptedException e) {
-          e.printStackTrace();
-        }
-      }
-    }
-    
-    asyncClient.stop();
-    // stopping should have joined all threads and completed all callbacks
-    Assert.assertTrue(callbackHandler.callbackCount == 0);
-  }
-  
   @Test (timeout = 10000)
   public void testAMRMClientAsyncShutDown() throws Exception {
     Configuration conf = new Configuration();

+ 499 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java

@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestAMRMClientOnRMRestart {
+  static Configuration conf = null;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    conf = new Configuration();
+    conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
+    conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
+  }
+
+  // Test does major 6 steps verification.
+  // Step-1 : AMRMClient send allocate request for 2 container requests
+  // Step-2 : 2 containers are allocated by RM.
+  // Step-3 : AM Send 1 containerRequest(cRequest3) and 1 releaseRequests to
+  // RM
+  // Step-4 : On RM restart, AM(does not know RM is restarted) sends additional
+  // containerRequest(cRequest4) and blacklisted nodes.
+  // Intern RM send resync command
+  // Step-5 : Allocater after resync command & new containerRequest(cRequest5)
+  // Step-6 : RM allocates containers i.e cRequest3,cRequest4 and cRequest5
+  @Test(timeout = 60000)
+  public void testAMRMClientResendsRequestsOnRMRestart() throws Exception {
+
+    UserGroupInformation.setLoginUser(null);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // Phase-1 Start 1st RM
+    MyResourceManager rm1 = new MyResourceManager(conf, memStore);
+    rm1.start();
+    DrainDispatcher dispatcher =
+        (DrainDispatcher) rm1.getRMContext().getDispatcher();
+
+    // Submit the application
+    RMApp app = rm1.submitApp(1024);
+    dispatcher.await();
+
+    MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    nm1.nodeHeartbeat(true); // Node heartbeat
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId =
+        app.getCurrentAppAttempt().getAppAttemptId();
+    rm1.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> token =
+        rm1.getRMContext().getRMApps().get(appAttemptId.getApplicationId())
+            .getRMAppAttempt(appAttemptId).getAMRMToken();
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    ugi.addTokenIdentifier(token.decodeIdentifier());
+
+    // Step-1 : AMRMClient send allocate request for 2 ContainerRequest
+    // cRequest1 = h1 and cRequest2 = h1,h2
+    // blacklisted nodes = h2
+    AMRMClient<ContainerRequest> amClient = new MyAMRMClientImpl(rm1);
+    amClient.init(conf);
+    amClient.start();
+
+    amClient.registerApplicationMaster("Host", 10000, "");
+
+    ContainerRequest cRequest1 = createReq(1, 1024, new String[] { "h1" });
+    amClient.addContainerRequest(cRequest1);
+
+    ContainerRequest cRequest2 =
+        createReq(1, 1024, new String[] { "h1", "h2" });
+    amClient.addContainerRequest(cRequest2);
+
+    List<String> blacklistAdditions = new ArrayList<String>();
+    List<String> blacklistRemoval = new ArrayList<String>();
+    blacklistAdditions.add("h2");
+    blacklistRemoval.add("h10");
+    amClient.updateBlacklist(blacklistAdditions, blacklistRemoval);
+    blacklistAdditions.remove("h2");// remove from local list
+
+    AllocateResponse allocateResponse = amClient.allocate(0.1f);
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0, allocateResponse
+        .getAllocatedContainers().size());
+
+    // Why 4 ask, why not 3 ask even h2 is blacklisted?
+    // On blacklisting host,applicationmaster has to remove ask request from
+    // remoterequest table.Here,test does not remove explicitely
+    assertAsksAndReleases(4, 0, rm1);
+    assertBlacklistAdditionsAndRemovals(1, 1, rm1);
+
+    // Step-2 : NM heart beat is sent.
+    // On 2nd AM allocate request, RM allocates 2 containers to AM
+    nm1.nodeHeartbeat(true); // Node heartbeat
+    dispatcher.await();
+
+    allocateResponse = amClient.allocate(0.2f);
+    dispatcher.await();
+    // 2 containers are allocated i.e for cRequest1 and cRequest2.
+    Assert.assertEquals("No of assignments must be 0", 2, allocateResponse
+        .getAllocatedContainers().size());
+    assertAsksAndReleases(0, 0, rm1);
+    assertBlacklistAdditionsAndRemovals(0, 0, rm1);
+
+    List<Container> allocatedContainers =
+        allocateResponse.getAllocatedContainers();
+    // removed allocated container requests
+    amClient.removeContainerRequest(cRequest1);
+    amClient.removeContainerRequest(cRequest2);
+
+    allocateResponse = amClient.allocate(0.2f);
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0, allocateResponse
+        .getAllocatedContainers().size());
+    assertAsksAndReleases(4, 0, rm1);
+    assertBlacklistAdditionsAndRemovals(0, 0, rm1);
+
+    // Step-3 : Send 1 containerRequest and 1 releaseRequests to RM
+    ContainerRequest cRequest3 = createReq(1, 1024, new String[] { "h1" });
+    amClient.addContainerRequest(cRequest3);
+
+    int pendingRelease = 0;
+    Iterator<Container> it = allocatedContainers.iterator();
+    while (it.hasNext()) {
+      amClient.releaseAssignedContainer(it.next().getId());
+      pendingRelease++;
+      it.remove();
+      break;// remove one container
+    }
+
+    allocateResponse = amClient.allocate(0.3f);
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0, allocateResponse
+        .getAllocatedContainers().size());
+    assertAsksAndReleases(3, pendingRelease, rm1);
+    assertBlacklistAdditionsAndRemovals(0, 0, rm1);
+    int completedContainer =
+        allocateResponse.getCompletedContainersStatuses().size();
+    pendingRelease -= completedContainer;
+
+    // Phase-2 start 2nd RM is up
+    MyResourceManager rm2 = new MyResourceManager(conf, memStore);
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    ((MyAMRMClientImpl) amClient).updateRMProxy(rm2);
+    dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher();
+
+    // NM should be rebooted on heartbeat, even first heartbeat for nm2
+    NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
+    Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction());
+
+    // new NM to represent NM re-register
+    nm1 = new MockNM("h1:1234", 10240, rm2.getResourceTrackerService());
+    nm1.registerNode();
+    nm1.nodeHeartbeat(true);
+    dispatcher.await();
+
+    blacklistAdditions.add("h3");
+    amClient.updateBlacklist(blacklistAdditions, null);
+    blacklistAdditions.remove("h3");
+
+    it = allocatedContainers.iterator();
+    while (it.hasNext()) {
+      amClient.releaseAssignedContainer(it.next().getId());
+      pendingRelease++;
+      it.remove();
+    }
+
+    ContainerRequest cRequest4 =
+        createReq(1, 1024, new String[] { "h1", "h2" });
+    amClient.addContainerRequest(cRequest4);
+
+    // Step-4 : On RM restart, AM(does not know RM is restarted) sends
+    // additional
+    // containerRequest and blacklisted nodes.
+    // Intern RM send resync command,AMRMClient resend allocate request
+    allocateResponse = amClient.allocate(0.3f);
+    dispatcher.await();
+
+    completedContainer =
+        allocateResponse.getCompletedContainersStatuses().size();
+    pendingRelease -= completedContainer;
+
+    assertAsksAndReleases(4, pendingRelease, rm2);
+    assertBlacklistAdditionsAndRemovals(2, 0, rm2);
+
+    ContainerRequest cRequest5 =
+        createReq(1, 1024, new String[] { "h1", "h2", "h3" });
+    amClient.addContainerRequest(cRequest5);
+
+    // Step-5 : Allocater after resync command
+    allocateResponse = amClient.allocate(0.5f);
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0, allocateResponse
+        .getAllocatedContainers().size());
+
+    assertAsksAndReleases(5, 0, rm2);
+    assertBlacklistAdditionsAndRemovals(0, 0, rm2);
+
+    int noAssignedContainer = 0;
+    int count = 5;
+    while (count-- > 0) {
+      nm1.nodeHeartbeat(true);
+      dispatcher.await();
+
+      allocateResponse = amClient.allocate(0.5f);
+      dispatcher.await();
+      noAssignedContainer += allocateResponse.getAllocatedContainers().size();
+      if (noAssignedContainer == 3) {
+        break;
+      }
+      Thread.sleep(1000);
+    }
+
+    // Step-6 : RM allocates containers i.e cRequest3,cRequest4 and cRequest5
+    Assert.assertEquals("Number of container should be 3", 3,
+        noAssignedContainer);
+
+    amClient.stop();
+    rm1.stop();
+    rm2.stop();
+  }
+
+  // Test verify for
+  // 1. AM try to unregister without registering
+  // 2. AM register to RM, and try to unregister immediately after RM restart
+  @Test(timeout = 60000)
+  public void testAMRMClientForUnregisterAMOnRMRestart() throws Exception {
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // Phase-1 Start 1st RM
+    MyResourceManager rm1 = new MyResourceManager(conf, memStore);
+    rm1.start();
+    DrainDispatcher dispatcher =
+        (DrainDispatcher) rm1.getRMContext().getDispatcher();
+
+    // Submit the application
+    RMApp app = rm1.submitApp(1024);
+    dispatcher.await();
+
+    MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    nm1.nodeHeartbeat(true); // Node heartbeat
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId =
+        app.getCurrentAppAttempt().getAppAttemptId();
+    rm1.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> token =
+        rm1.getRMContext().getRMApps().get(appAttemptId.getApplicationId())
+            .getRMAppAttempt(appAttemptId).getAMRMToken();
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    ugi.addTokenIdentifier(token.decodeIdentifier());
+
+    AMRMClient<ContainerRequest> amClient = new MyAMRMClientImpl(rm1);
+    amClient.init(conf);
+    amClient.start();
+
+    amClient.registerApplicationMaster("h1", 10000, "");
+    amClient.allocate(0.1f);
+
+    // Phase-2 start 2nd RM is up
+    MyResourceManager rm2 = new MyResourceManager(conf, memStore);
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    ((MyAMRMClientImpl) amClient).updateRMProxy(rm2);
+    dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher();
+
+    // NM should be rebooted on heartbeat, even first heartbeat for nm2
+    NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
+    Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction());
+
+    // new NM to represent NM re-register
+    nm1 = new MockNM("h1:1234", 10240, rm2.getResourceTrackerService());
+
+    ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
+    NMContainerStatus containerReport =
+        NMContainerStatus.newInstance(containerId, ContainerState.RUNNING,
+            Resource.newInstance(1024, 1), "recover container", 0,
+            Priority.newInstance(0), 0);
+    nm1.registerNode(Arrays.asList(containerReport), null);
+    nm1.nodeHeartbeat(true);
+    dispatcher.await();
+
+    amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+        null, null);
+    rm2.waitForState(appAttemptId, RMAppAttemptState.FINISHING);
+    nm1.nodeHeartbeat(appAttemptId, 1, ContainerState.COMPLETE);
+    rm2.waitForState(appAttemptId, RMAppAttemptState.FINISHED);
+    rm2.waitForState(app.getApplicationId(), RMAppState.FINISHED);
+
+    amClient.stop();
+    rm1.stop();
+    rm2.stop();
+
+  }
+
+  private static class MyFifoScheduler extends FifoScheduler {
+
+    public MyFifoScheduler(RMContext rmContext) {
+      super();
+      try {
+        Configuration conf = new Configuration();
+        reinitialize(conf, rmContext);
+      } catch (IOException ie) {
+        assert (false);
+      }
+    }
+
+    List<ResourceRequest> lastAsk = null;
+    List<ContainerId> lastRelease = null;
+    List<String> lastBlacklistAdditions;
+    List<String> lastBlacklistRemovals;
+
+    // override this to copy the objects otherwise FifoScheduler updates the
+    // numContainers in same objects as kept by RMContainerAllocator
+    @Override
+    public synchronized Allocation allocate(
+        ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
+        List<ContainerId> release, List<String> blacklistAdditions,
+        List<String> blacklistRemovals) {
+      List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
+      for (ResourceRequest req : ask) {
+        ResourceRequest reqCopy =
+            ResourceRequest.newInstance(req.getPriority(),
+                req.getResourceName(), req.getCapability(),
+                req.getNumContainers(), req.getRelaxLocality());
+        askCopy.add(reqCopy);
+      }
+      lastAsk = ask;
+      lastRelease = release;
+      lastBlacklistAdditions = blacklistAdditions;
+      lastBlacklistRemovals = blacklistRemovals;
+      return super.allocate(applicationAttemptId, askCopy, release,
+          blacklistAdditions, blacklistRemovals);
+    }
+  }
+
+  private static class MyResourceManager extends MockRM {
+
+    private static long fakeClusterTimeStamp = System.currentTimeMillis();
+
+    public MyResourceManager(Configuration conf, RMStateStore store) {
+      super(conf, store);
+    }
+
+    @Override
+    public void serviceStart() throws Exception {
+      super.serviceStart();
+      // Ensure that the application attempt IDs for all the tests are the same
+      // The application attempt IDs will be used as the login user names
+      MyResourceManager.setClusterTimeStamp(fakeClusterTimeStamp);
+    }
+
+    @Override
+    protected Dispatcher createDispatcher() {
+      return new DrainDispatcher();
+    }
+
+    @Override
+    protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
+      // Dispatch inline for test sanity
+      return new EventHandler<SchedulerEvent>() {
+        @Override
+        public void handle(SchedulerEvent event) {
+          scheduler.handle(event);
+        }
+      };
+    }
+
+    @Override
+    protected ResourceScheduler createScheduler() {
+      return new MyFifoScheduler(this.getRMContext());
+    }
+
+    MyFifoScheduler getMyFifoScheduler() {
+      return (MyFifoScheduler) scheduler;
+    }
+  }
+
+  private static class MyAMRMClientImpl extends
+      AMRMClientImpl<ContainerRequest> {
+    private MyResourceManager rm;
+
+    public MyAMRMClientImpl(MyResourceManager rm) {
+      this.rm = rm;
+    }
+
+    @Override
+    protected void serviceInit(Configuration conf) throws Exception {
+      super.serviceInit(conf);
+    }
+
+    @Override
+    protected void serviceStart() throws Exception {
+      this.rmClient = this.rm.getApplicationMasterService();
+    }
+
+    @Override
+    protected void serviceStop() throws Exception {
+      rmClient = null;
+      super.serviceStop();
+    }
+
+    public void updateRMProxy(MyResourceManager rm) {
+      rmClient = rm.getApplicationMasterService();
+    }
+  }
+
+  private static void assertBlacklistAdditionsAndRemovals(
+      int expectedAdditions, int expectedRemovals, MyResourceManager rm) {
+    Assert.assertEquals(expectedAdditions,
+        rm.getMyFifoScheduler().lastBlacklistAdditions.size());
+    Assert.assertEquals(expectedRemovals,
+        rm.getMyFifoScheduler().lastBlacklistRemovals.size());
+  }
+
+  private static void assertAsksAndReleases(int expectedAsk,
+      int expectedRelease, MyResourceManager rm) {
+    Assert.assertEquals(expectedAsk, rm.getMyFifoScheduler().lastAsk.size());
+    Assert.assertEquals(expectedRelease,
+        rm.getMyFifoScheduler().lastRelease.size());
+  }
+
+  private ContainerRequest createReq(int priority, int memory, String[] hosts) {
+    Resource capability = Resource.newInstance(memory, 1);
+    Priority priorityOfContainer = Priority.newInstance(priority);
+    return new ContainerRequest(capability, hosts,
+        new String[] { NetworkTopology.DEFAULT_RACK }, priorityOfContainer);
+  }
+
+}

+ 25 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/core-site.xml

@@ -0,0 +1,25 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+  <property>
+    <name>hadoop.security.token.service.use_ip</name>
+    <value>false</value>
+  </property>
+
+</configuration>