Browse Source

Merge r1581679 from branch-2. YARN-1521. Mark Idempotent/AtMostOnce annotations to the APIs in ApplicationClientProtocol, ResourceManagerAdministrationProtocol and ResourceTrackerProtocol so that they work in HA scenario. Contributed by Xuan Gong

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.4@1581682 13f79535-47bb-0310-9956-ffa450edef68
Jian He 11 years ago
parent
commit
c8b650ef57
12 changed files with 1162 additions and 21 deletions
  1. 5 0
      hadoop-yarn-project/CHANGES.txt
  2. 20 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java
  3. 8 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java
  4. 721 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
  5. 211 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationClientProtocolOnHA.java
  6. 91 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
  7. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceTracker.java
  8. 4 9
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
  9. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
  10. 1 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
  11. 85 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java
  12. 11 8
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java

+ 5 - 0
hadoop-yarn-project/CHANGES.txt

@@ -526,6 +526,11 @@ Release 2.4.0 - UNRELEASED
     YARN-1866. Fixed an issue with renewal of RM-delegation tokens on restart or
     fail-over. (Jian He via vinodkv)
 
+    YARN-1521. Mark Idempotent/AtMostOnce annotations to the APIs in
+    ApplicationClientProtocol, ResourceManagerAdministrationProtocol and
+    ResourceTrackerProtocol so that they work in HA scenario. (Xuan Gong
+    via jianhe)
+
 Release 2.3.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 20 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java

@@ -104,6 +104,7 @@ public interface ApplicationClientProtocol {
    */
   @Public
   @Stable
+  @Idempotent
   public GetNewApplicationResponse getNewApplication(
       GetNewApplicationRequest request)
   throws YarnException, IOException;
@@ -133,6 +134,10 @@ public interface ApplicationClientProtocol {
    * it encounters the {@link ApplicationNotFoundException} on the
    * {@link #getApplicationReport(GetApplicationReportRequest)} call.</p>
    * 
+   * <p>During the submission process, it checks whether the application
+   * already exists. If the application already exists, it simply returns a
+   * SubmitApplicationResponse.</p>
+   *
    * <p> In secure mode,the <code>ResourceManager</code> verifies access to
    * queues etc. before accepting the application submission.</p>
    * 
@@ -147,6 +152,7 @@ public interface ApplicationClientProtocol {
    */
   @Public
   @Stable
+  @Idempotent
   public SubmitApplicationResponse submitApplication(
       SubmitApplicationRequest request) 
   throws YarnException, IOException;
@@ -173,6 +179,7 @@ public interface ApplicationClientProtocol {
    */
   @Public
   @Stable
+  @Idempotent
   public KillApplicationResponse forceKillApplication(
       KillApplicationRequest request) 
   throws YarnException, IOException;
@@ -231,6 +238,7 @@ public interface ApplicationClientProtocol {
    */
   @Public
   @Stable
+  @Idempotent
   public GetClusterMetricsResponse getClusterMetrics(
       GetClusterMetricsRequest request) 
   throws YarnException, IOException;
@@ -258,6 +266,7 @@ public interface ApplicationClientProtocol {
    */
   @Public
   @Stable
+  @Idempotent
   public GetApplicationsResponse getApplications(
       GetApplicationsRequest request)
   throws YarnException, IOException;
@@ -277,6 +286,7 @@ public interface ApplicationClientProtocol {
    */
   @Public
   @Stable
+  @Idempotent
   public GetClusterNodesResponse getClusterNodes(
       GetClusterNodesRequest request) 
   throws YarnException, IOException;
@@ -298,6 +308,7 @@ public interface ApplicationClientProtocol {
    */
   @Public
   @Stable
+  @Idempotent
   public GetQueueInfoResponse getQueueInfo(
       GetQueueInfoRequest request) 
   throws YarnException, IOException;
@@ -317,6 +328,7 @@ public interface ApplicationClientProtocol {
    */
   @Public
   @Stable
+ @Idempotent
   public GetQueueUserAclsInfoResponse getQueueUserAcls(
       GetQueueUserAclsInfoRequest request) 
   throws YarnException, IOException;
@@ -335,6 +347,7 @@ public interface ApplicationClientProtocol {
    */
   @Public
   @Stable
+  @Idempotent
   public GetDelegationTokenResponse getDelegationToken(
       GetDelegationTokenRequest request) 
   throws YarnException, IOException;
@@ -349,6 +362,7 @@ public interface ApplicationClientProtocol {
    */
   @Private
   @Unstable
+  @Idempotent
   public RenewDelegationTokenResponse renewDelegationToken(
       RenewDelegationTokenRequest request) throws YarnException,
       IOException;
@@ -363,6 +377,7 @@ public interface ApplicationClientProtocol {
    */
   @Private
   @Unstable
+  @Idempotent
   public CancelDelegationTokenResponse cancelDelegationToken(
       CancelDelegationTokenRequest request) throws YarnException,
       IOException;
@@ -377,6 +392,7 @@ public interface ApplicationClientProtocol {
    */
   @Public
   @Unstable
+  @Idempotent
   public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
       MoveApplicationAcrossQueuesRequest request) throws YarnException, IOException;
 
@@ -422,6 +438,7 @@ public interface ApplicationClientProtocol {
    */
   @Public
   @Unstable
+  @Idempotent
   public GetApplicationAttemptReportResponse getApplicationAttemptReport(
       GetApplicationAttemptReportRequest request) throws YarnException,
       IOException;
@@ -453,6 +470,7 @@ public interface ApplicationClientProtocol {
    */
   @Public
   @Unstable
+  @Idempotent
   public GetApplicationAttemptsResponse getApplicationAttempts(
       GetApplicationAttemptsRequest request) throws YarnException, IOException;
 
@@ -486,6 +504,7 @@ public interface ApplicationClientProtocol {
    */
   @Public
   @Unstable
+  @Idempotent
   public GetContainerReportResponse getContainerReport(
       GetContainerReportRequest request) throws YarnException, IOException;
 
@@ -520,6 +539,7 @@ public interface ApplicationClientProtocol {
    */
   @Public
   @Unstable
+  @Idempotent
   public GetContainersResponse getContainers(GetContainersRequest request)
       throws YarnException, IOException;
 

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.io.retry.Idempotent;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.tools.GetUserMappingsProtocol;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -50,16 +51,19 @@ public interface ResourceManagerAdministrationProtocol extends GetUserMappingsPr
 
   @Public
   @Stable
+  @Idempotent
   public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) 
   throws StandbyException, YarnException, IOException;
 
   @Public
   @Stable
+  @Idempotent
   public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
   throws StandbyException, YarnException, IOException;
 
   @Public
   @Stable
+  @Idempotent
   public RefreshSuperUserGroupsConfigurationResponse 
   refreshSuperUserGroupsConfiguration(
       RefreshSuperUserGroupsConfigurationRequest request)
@@ -67,18 +71,21 @@ public interface ResourceManagerAdministrationProtocol extends GetUserMappingsPr
 
   @Public
   @Stable
+  @Idempotent
   public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
       RefreshUserToGroupsMappingsRequest request)
   throws StandbyException, YarnException, IOException;
 
   @Public
   @Stable
+  @Idempotent
   public RefreshAdminAclsResponse refreshAdminAcls(
       RefreshAdminAclsRequest request)
   throws YarnException, IOException;
 
   @Public
   @Stable
+  @Idempotent
   public RefreshServiceAclsResponse refreshServiceAcls(
       RefreshServiceAclsRequest request)
   throws YarnException, IOException;
@@ -99,6 +106,7 @@ public interface ResourceManagerAdministrationProtocol extends GetUserMappingsPr
    */
   @Public
   @Evolving
+  @Idempotent
   public UpdateNodeResourceResponse updateNodeResource(
       UpdateNodeResourceRequest request) 
   throws YarnException, IOException;

+ 721 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java

@@ -0,0 +1,721 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.ClientBaseWithFixes;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueState;
+import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
+import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
+import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.After;
+import org.junit.Before;
+
+
+public abstract class ProtocolHATestBase extends ClientBaseWithFixes{
+  protected static final HAServiceProtocol.StateChangeRequestInfo req =
+      new HAServiceProtocol.StateChangeRequestInfo(
+          HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+
+  protected static final String RM1_NODE_ID = "rm1";
+  protected static final int RM1_PORT_BASE = 10000;
+  protected static final String RM2_NODE_ID = "rm2";
+  protected static final int RM2_PORT_BASE = 20000;
+
+  protected Configuration conf;
+  protected MiniYARNClusterForHATesting cluster;
+
+  protected Thread failoverThread = null;
+  private volatile boolean keepRunning;
+
+  /**
+   * Stores <code>value</code> in the test configuration under the key
+   * produced by suffixing <code>prefix</code> with the given RM id.
+   */
+  private void setConfForRM(String rmId, String prefix, String value) {
+    String rmSpecificKey = HAUtil.addSuffix(prefix, rmId);
+    conf.set(rmSpecificKey, value);
+  }
+
+  /**
+   * Points every RPC and web address of one RM at <code>0.0.0.0</code>,
+   * using each service's default port shifted by <code>base</code>.
+   */
+  private void setRpcAddressForRM(String rmId, int base) {
+    int clientPort = base + YarnConfiguration.DEFAULT_RM_PORT;
+    int schedulerPort = base + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT;
+    int adminPort = base + YarnConfiguration.DEFAULT_RM_ADMIN_PORT;
+    int trackerPort =
+        base + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT;
+    int webappPort = base + YarnConfiguration.DEFAULT_RM_WEBAPP_PORT;
+    int webappHttpsPort =
+        base + YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT;
+
+    setConfForRM(rmId, YarnConfiguration.RM_ADDRESS,
+        "0.0.0.0:" + clientPort);
+    setConfForRM(rmId, YarnConfiguration.RM_SCHEDULER_ADDRESS,
+        "0.0.0.0:" + schedulerPort);
+    setConfForRM(rmId, YarnConfiguration.RM_ADMIN_ADDRESS,
+        "0.0.0.0:" + adminPort);
+    setConfForRM(rmId, YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+        "0.0.0.0:" + trackerPort);
+    setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_ADDRESS,
+        "0.0.0.0:" + webappPort);
+    setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS,
+        "0.0.0.0:" + webappHttpsPort);
+  }
+
+  /**
+   * Resets the failover-thread state and builds a fresh configuration with
+   * RM HA enabled for two RMs (rm1 and rm2) on fixed ports using real RPC.
+   */
+  @Before
+  public void setup() throws IOException {
+    // No failover thread exists until a test starts the HA cluster.
+    failoverThread = null;
+    keepRunning = true;
+
+    conf = new YarnConfiguration();
+
+    // HA topology: two RMs, each with its own block of ports.
+    conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+    conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
+    setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE);
+    setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE);
+
+    // Client-side failover tuning.
+    conf.setInt(YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, 5);
+    conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L);
+
+    // Mini-cluster behaviour.
+    conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
+    conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true);
+  }
+
+  /**
+   * Stops the failover thread (if one was started) and then the mini
+   * cluster (if one was created).
+   *
+   * @throws Exception if stopping the cluster fails
+   */
+  @After
+  public void teardown() throws Exception {
+    keepRunning = false;
+    if (failoverThread != null) {
+      failoverThread.interrupt();
+      try {
+        failoverThread.join();
+      } catch (InterruptedException ex) {
+        LOG.error("Error joining with failover thread", ex);
+      }
+    }
+    // The cluster is only assigned in startHACluster(); guard against a test
+    // that failed before reaching it so the original failure is not masked
+    // by a NullPointerException thrown from teardown.
+    if (cluster != null) {
+      cluster.stop();
+    }
+  }
+
+  /**
+   * Returns the admin service registered in the RM context of the RM at the
+   * given index in the mini cluster.
+   */
+  protected AdminService getAdminService(int index) {
+    ResourceManager rm = cluster.getResourceManager(index);
+    RMContext context = rm.getRMContext();
+    return context.getRMAdminService();
+  }
+
+  /**
+   * Manually fails over: moves the currently active RM to standby, makes the
+   * other RM active, and asserts that the cluster reports the new active RM.
+   */
+  protected void explicitFailover() throws IOException {
+    int previousActive = cluster.getActiveRMIndex();
+    int expectedActive = (previousActive + 1) % 2;
+
+    getAdminService(previousActive).transitionToStandby(req);
+    getAdminService(expectedActive).transitionToActive(req);
+
+    int actualActive = cluster.getActiveRMIndex();
+    assertEquals("Failover failed", expectedActive, actualActive);
+  }
+
+  /**
+   * Creates a {@link YarnClient}, initialises it with a copy of the given
+   * configuration, starts it and returns it. The caller stops the client.
+   */
+  protected YarnClient createAndStartYarnClient(Configuration conf) {
+    YarnClient yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(new YarnConfiguration(conf));
+    yarnClient.start();
+    return yarnClient;
+  }
+
+  /**
+   * Asserts that the node managers connect to the RM within 20000 (the value
+   * passed to the cluster's wait call) and that a client can reach the RM.
+   */
+  protected void verifyConnections() throws InterruptedException,
+      YarnException {
+    boolean nodeManagersConnected =
+        cluster.waitForNodeManagersToConnect(20000);
+    assertTrue("NMs failed to connect to the RM", nodeManagersConnected);
+    verifyClientConnection();
+  }
+
+  protected void verifyClientConnection() {
+    int numRetries = 3;
+    while(numRetries-- > 0) {
+      Configuration conf = new YarnConfiguration(this.conf);
+      YarnClient client = createAndStartYarnClient(conf);
+      try {
+        Thread.sleep(100);
+        client.getApplications();
+        return;
+      } catch (Exception e) {
+        LOG.error(e.getMessage());
+      } finally {
+        client.stop();
+      }
+    }
+    fail("Client couldn't connect to the Active RM");
+  }
+
+  /**
+   * Starts a background thread that polls the cluster's start-failover flag
+   * every 50 ms. Once the flag is set it performs one explicit failover,
+   * marks the failover as triggered on success, and then stops polling.
+   *
+   * @return the started thread
+   */
+  protected Thread createAndStartFailoverThread() {
+    Thread failoverThread = new Thread() {
+      @Override
+      public void run() {
+        while (keepRunning) {
+          if (cluster.getStartFailoverFlag()) {
+            try {
+              explicitFailover();
+              cluster.resetFailoverTriggeredFlag(true);
+            } catch (Exception e) {
+              // Surface the reason instead of silently dropping it; the
+              // waiting RPC handler will simply never see the triggered flag.
+              LOG.error("Explicit failover failed", e);
+            } finally {
+              // Only one failover attempt is made per thread.
+              keepRunning = false;
+            }
+          }
+          try {
+            Thread.sleep(50);
+          } catch (InterruptedException e) {
+            // teardown() interrupts this thread to stop it: restore the
+            // interrupt status and exit instead of continuing to poll.
+            Thread.currentThread().interrupt();
+            return;
+          }
+        }
+      }
+    };
+    // Arm the loop before the thread starts rather than from inside run(),
+    // so a concurrent teardown() that clears the flag cannot be overwritten.
+    keepRunning = true;
+    failoverThread.start();
+    return failoverThread;
+  }
+
+  protected void startHACluster(int numOfNMs, boolean overrideClientRMService,
+      boolean overrideRTS) throws Exception {
+    conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
+    cluster =
+        new MiniYARNClusterForHATesting(TestRMFailover.class.getName(), 2,
+            numOfNMs, 1, 1, false, overrideClientRMService, overrideRTS);
+    cluster.resetStartFailoverFlag(false);
+    cluster.init(conf);
+    cluster.start();
+    getAdminService(0).transitionToActive(req);
+    assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
+    verifyConnections();
+
+    // Do the failover
+    explicitFailover();
+    verifyConnections();
+
+    failoverThread = createAndStartFailoverThread();
+
+  }
+
+  /** Returns the RM at the index the cluster currently reports as active. */
+  protected ResourceManager getActiveRM() {
+    int activeIndex = cluster.getActiveRMIndex();
+    return cluster.getResourceManager(activeIndex);
+  }
+
+  public class MiniYARNClusterForHATesting extends MiniYARNCluster {
+
+    // Whether createResourceManager() substitutes the customised services.
+    private final boolean overrideClientRMService;
+    private final boolean overrideRTS;
+    // Set by an RPC handler to ask the failover thread to fail over.
+    private final AtomicBoolean startFailover = new AtomicBoolean(false);
+    // Set once the requested failover has been carried out.
+    private final AtomicBoolean failoverTriggered = new AtomicBoolean(false);
+
+    /**
+     * Passes the sizing arguments through to {@link MiniYARNCluster} and
+     * records which RM services should be replaced by customised versions.
+     */
+    public MiniYARNClusterForHATesting(String testName,
+        int numResourceManagers, int numNodeManagers, int numLocalDirs,
+        int numLogDirs, boolean enableAHS, boolean overrideClientRMService,
+        boolean overrideRTS) {
+      super(testName, numResourceManagers, numNodeManagers, numLocalDirs,
+          numLogDirs, enableAHS);
+      this.overrideRTS = overrideRTS;
+      this.overrideClientRMService = overrideClientRMService;
+    }
+
+    public boolean getStartFailoverFlag() {
+      return startFailover.get();
+    }
+
+    /** Sets the start-failover flag to the given value. */
+    public void resetStartFailoverFlag(boolean flag) {
+      this.startFailover.set(flag);
+    }
+
+    /** Sets the failover-triggered flag to the given value. */
+    public void resetFailoverTriggeredFlag(boolean flag) {
+      this.failoverTriggered.set(flag);
+    }
+
+    private boolean waittingForFailOver() {
+      int maximumWaittingTime = 50;
+      int count = 0;
+      while (!failoverTriggered.get() && count >= maximumWaittingTime) {
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+          // DO NOTHING
+        }
+        count++;
+      }
+      if (count >= maximumWaittingTime) {
+        return false;
+      }
+      return true;
+    }
+
+    /**
+     * Builds a {@link ResourceManager} that skips the keytab login and, when
+     * the corresponding override flag is set, uses the customised client or
+     * resource-tracker service instead of the default one.
+     */
+    @Override
+    protected ResourceManager createResourceManager() {
+      return new ResourceManager() {
+        @Override
+        protected void doSecureLogin() throws IOException {
+          // Don't try to login using keytab in the testcases.
+        }
+
+        @Override
+        protected ClientRMService createClientRMService() {
+          if (!overrideClientRMService) {
+            return super.createClientRMService();
+          }
+          return new CustomedClientRMService(this.rmContext, this.scheduler,
+              this.rmAppManager, this.applicationACLsManager,
+              this.queueACLsManager,
+              this.rmContext.getRMDelegationTokenSecretManager());
+        }
+
+        @Override
+        protected ResourceTrackerService createResourceTrackerService() {
+          if (!overrideRTS) {
+            return super.createResourceTrackerService();
+          }
+          return new CustomedResourceTrackerService(this.rmContext,
+              this.nodesListManager, this.nmLivelinessMonitor,
+              this.rmContext.getContainerTokenSecretManager(),
+              this.rmContext.getNMTokenSecretManager());
+        }
+      };
+    }
+
+    /**
+     * {@link ClientRMService} whose overridden RPCs first raise the
+     * start-failover flag and assert on the result of waiting for failover.
+     * Apart from <code>submitApplication</code>, which then delegates to the
+     * real implementation, they answer with canned data.
+     */
+    private class CustomedClientRMService extends ClientRMService {
+      public CustomedClientRMService(RMContext rmContext,
+          YarnScheduler scheduler, RMAppManager rmAppManager,
+          ApplicationACLsManager applicationACLsManager,
+          QueueACLsManager queueACLsManager,
+          RMDelegationTokenSecretManager rmDTSecretManager) {
+        super(rmContext, scheduler, rmAppManager, applicationACLsManager,
+            queueACLsManager, rmDTSecretManager);
+      }
+
+      /**
+       * Shared preamble of every overridden RPC: request a failover and
+       * make sure failover has been triggered.
+       */
+      private void requestFailoverAndWait() {
+        resetStartFailoverFlag(true);
+        Assert.assertTrue(waittingForFailOver());
+      }
+
+      /** Answers with a fake application id. */
+      @Override
+      public GetNewApplicationResponse getNewApplication(
+          GetNewApplicationRequest request) throws YarnException {
+        requestFailoverAndWait();
+        return GetNewApplicationResponse.newInstance(
+            createFakeAppId(), null, null);
+      }
+
+      /** Answers with a fake application report. */
+      @Override
+      public GetApplicationReportResponse getApplicationReport(
+          GetApplicationReportRequest request) throws YarnException {
+        requestFailoverAndWait();
+        ApplicationReport fakeReport = createFakeAppReport();
+        return GetApplicationReportResponse.newInstance(fakeReport);
+      }
+
+      /** Answers with fake cluster metrics. */
+      @Override
+      public GetClusterMetricsResponse getClusterMetrics(
+          GetClusterMetricsRequest request) throws YarnException {
+        requestFailoverAndWait();
+        return GetClusterMetricsResponse.newInstance(
+            createFakeYarnClusterMetrics());
+      }
+
+      /** Answers with a fake application list. */
+      @Override
+      public GetApplicationsResponse getApplications(
+          GetApplicationsRequest request) throws YarnException {
+        requestFailoverAndWait();
+        return GetApplicationsResponse.newInstance(createFakeAppReports());
+      }
+
+      /** Answers with fake node reports. */
+      @Override
+      public GetClusterNodesResponse getClusterNodes(
+          GetClusterNodesRequest request)
+          throws YarnException {
+        requestFailoverAndWait();
+        return GetClusterNodesResponse.newInstance(createFakeNodeReports());
+      }
+
+      /** Answers with fake queue info. */
+      @Override
+      public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
+          throws YarnException {
+        requestFailoverAndWait();
+        return GetQueueInfoResponse.newInstance(createFakeQueueInfo());
+      }
+
+      /** Answers with a fake queue user ACL list. */
+      @Override
+      public GetQueueUserAclsInfoResponse getQueueUserAcls(
+          GetQueueUserAclsInfoRequest request) throws YarnException {
+        requestFailoverAndWait();
+        return GetQueueUserAclsInfoResponse
+            .newInstance(createFakeQueueUserACLInfoList());
+      }
+
+      /** Answers with a fake application attempt report. */
+      @Override
+      public GetApplicationAttemptReportResponse getApplicationAttemptReport(
+          GetApplicationAttemptReportRequest request) throws YarnException,
+          IOException {
+        requestFailoverAndWait();
+        return GetApplicationAttemptReportResponse
+            .newInstance(createFakeApplicationAttemptReport());
+      }
+
+      /** Answers with fake application attempt reports. */
+      @Override
+      public GetApplicationAttemptsResponse getApplicationAttempts(
+          GetApplicationAttemptsRequest request) throws YarnException,
+          IOException {
+        requestFailoverAndWait();
+        return GetApplicationAttemptsResponse
+            .newInstance(createFakeApplicationAttemptReports());
+      }
+
+      /** Answers with a fake container report. */
+      @Override
+      public GetContainerReportResponse getContainerReport(
+          GetContainerReportRequest request) throws YarnException,
+              IOException {
+        requestFailoverAndWait();
+        return GetContainerReportResponse
+            .newInstance(createFakeContainerReport());
+      }
+
+      /** Answers with fake container reports. */
+      @Override
+      public GetContainersResponse getContainers(GetContainersRequest request)
+          throws YarnException, IOException {
+        requestFailoverAndWait();
+        return GetContainersResponse.newInstance(createFakeContainerReports());
+      }
+
+      /** Delegates to the real submission logic after the failover wait. */
+      @Override
+      public SubmitApplicationResponse submitApplication(
+          SubmitApplicationRequest request) throws YarnException {
+        requestFailoverAndWait();
+        return super.submitApplication(request);
+      }
+
+      /** Answers with a response built from <code>true</code>. */
+      @Override
+      public KillApplicationResponse forceKillApplication(
+          KillApplicationRequest request) throws YarnException {
+        requestFailoverAndWait();
+        return KillApplicationResponse.newInstance(true);
+      }
+
+      /** Answers with an empty move response record. */
+      @Override
+      public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
+          MoveApplicationAcrossQueuesRequest request) throws YarnException {
+        requestFailoverAndWait();
+        return Records.newRecord(MoveApplicationAcrossQueuesResponse.class);
+      }
+
+      /** Answers with a fake delegation token. */
+      @Override
+      public GetDelegationTokenResponse getDelegationToken(
+          GetDelegationTokenRequest request) throws YarnException {
+        requestFailoverAndWait();
+        return GetDelegationTokenResponse.newInstance(createFakeToken());
+      }
+
+      /** Answers with a fake next expiration time. */
+      @Override
+      public RenewDelegationTokenResponse renewDelegationToken(
+          RenewDelegationTokenRequest request) throws YarnException {
+        requestFailoverAndWait();
+        return RenewDelegationTokenResponse
+            .newInstance(createNextExpirationTime());
+      }
+
+      /** Answers with an empty cancel response. */
+      @Override
+      public CancelDelegationTokenResponse cancelDelegationToken(
+          CancelDelegationTokenRequest request) throws YarnException {
+        requestFailoverAndWait();
+        return CancelDelegationTokenResponse.newInstance();
+      }
+    }
+
+    /**
+     * Builds a canned application report: state FAILED, final status FAILED,
+     * user "fakeUser", queue "fakeQueue", with an application id created from
+     * (1000, 1) and an attempt id created from that application id and 1.
+     */
+    public ApplicationReport createFakeAppReport() {
+      ApplicationId fakeAppId = ApplicationId.newInstance(1000L, 1);
+      ApplicationAttemptId fakeAttemptId =
+          ApplicationAttemptId.newInstance(fakeAppId, 1);
+      return ApplicationReport.newInstance(fakeAppId, fakeAttemptId,
+          "fakeUser", "fakeQueue", "fakeApplicationName", "localhost", 0, null,
+          YarnApplicationState.FAILED, "fake an application report", "",
+          1000L, 1200L, FinalApplicationStatus.FAILED, null, "", 50f,
+          "fakeApplicationType", null);
+    }
+
+    public List<ApplicationReport> createFakeAppReports() {
+      List<ApplicationReport> reports = new ArrayList<ApplicationReport>();
+      reports.add(createFakeAppReport());
+      return reports;
+    }
+
+    public ApplicationId createFakeAppId() {
+      return ApplicationId.newInstance(1000l, 1);
+    }
+
+    public ApplicationAttemptId createFakeApplicationAttemptId() {
+      return ApplicationAttemptId.newInstance(createFakeAppId(), 0);
+    }
+
+    public ContainerId createFakeContainerId() {
+      return ContainerId.newInstance(createFakeApplicationAttemptId(), 0);
+    }
+
+    public YarnClusterMetrics createFakeYarnClusterMetrics() {
+      return YarnClusterMetrics.newInstance(1);
+    }
+
+    /**
+     * Returns a new mutable list with a single fake report for a RUNNING
+     * node at "localhost":0 on "rack1".
+     */
+    public List<NodeReport> createFakeNodeReports() {
+      List<NodeReport> singleReport = new ArrayList<NodeReport>();
+      NodeId localNode = NodeId.newInstance("localhost", 0);
+      singleReport.add(NodeReport.newInstance(localNode, NodeState.RUNNING,
+          "localhost", "rack1", null, null, 4, null, 1000L));
+      return singleReport;
+    }
+
+    /**
+     * Returns fake info for a RUNNING queue named "root" that carries the
+     * fake application reports.
+     */
+    public QueueInfo createFakeQueueInfo() {
+      List<ApplicationReport> fakeApps = createFakeAppReports();
+      return QueueInfo.newInstance("root", 100f, 100f, 50f, null, fakeApps,
+          QueueState.RUNNING);
+    }
+
+    /**
+     * Returns a new mutable list with one ACL entry: queue "root" with the
+     * single ACL SUBMIT_APPLICATIONS.
+     */
+    public List<QueueUserACLInfo> createFakeQueueUserACLInfoList() {
+      List<QueueACL> rootAcls = new ArrayList<QueueACL>();
+      rootAcls.add(QueueACL.SUBMIT_APPLICATIONS);
+
+      List<QueueUserACLInfo> aclInfos = new ArrayList<QueueUserACLInfo>();
+      aclInfos.add(QueueUserACLInfo.newInstance("root", rootAcls));
+      return aclInfos;
+    }
+
+    public ApplicationAttemptReport createFakeApplicationAttemptReport() {
+      return ApplicationAttemptReport.newInstance(
+          createFakeApplicationAttemptId(), "localhost", 0, "", "",
+          YarnApplicationAttemptState.RUNNING, createFakeContainerId());
+    }
+
+    /**
+     * Returns a new mutable list holding exactly one fake attempt report.
+     */
+    public List<ApplicationAttemptReport>
+        createFakeApplicationAttemptReports() {
+      List<ApplicationAttemptReport> singleReport =
+          new ArrayList<ApplicationAttemptReport>();
+      ApplicationAttemptReport fakeReport =
+          createFakeApplicationAttemptReport();
+      singleReport.add(fakeReport);
+      return singleReport;
+    }
+
+    public ContainerReport createFakeContainerReport() {
+      return ContainerReport.newInstance(createFakeContainerId(), null,
+          NodeId.newInstance("localhost", 0), null, 1000l, 1200l, "", "", 0,
+          ContainerState.COMPLETE);
+    }
+
+    public List<ContainerReport> createFakeContainerReports() {
+      List<ContainerReport> reports =
+          new ArrayList<ContainerReport>();
+      reports.add(createFakeContainerReport());
+      return reports;
+    }
+
+    public Token createFakeToken() {
+      String identifier = "fake Token";
+      String password = "fake token passwd";
+      Token token = Token.newInstance(
+          identifier.getBytes(), " ", password.getBytes(), " ");
+      return token;
+    }
+
+    public long createNextExpirationTime() {
+      return "fake Token".getBytes().length;
+    }
+
+    private class CustomedResourceTrackerService extends
+        ResourceTrackerService {
+      public CustomedResourceTrackerService(RMContext rmContext,
+          NodesListManager nodesListManager,
+          NMLivelinessMonitor nmLivelinessMonitor,
+          RMContainerTokenSecretManager containerTokenSecretManager,
+          NMTokenSecretManagerInRM nmTokenSecretManager) {
+        super(rmContext, nodesListManager, nmLivelinessMonitor,
+            containerTokenSecretManager, nmTokenSecretManager);
+      }
+
+      @Override
+      public RegisterNodeManagerResponse registerNodeManager(
+          RegisterNodeManagerRequest request) throws YarnException,
+          IOException {
+        resetStartFailoverFlag(true);
+        // make sure failover has been triggered
+        Assert.assertTrue(waittingForFailOver());
+        return super.registerNodeManager(request);
+      }
+
+      @Override
+      public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+          throws YarnException, IOException {
+        resetStartFailoverFlag(true);
+        // make sure failover has been triggered
+        Assert.assertTrue(waittingForFailOver());
+        return super.nodeHeartbeat(request);
+      }
+    }
+  }
+}

+ 211 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationClientProtocolOnHA.java

@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Issues {@link YarnClient} and {@link ApplicationClientProtocol} calls
+ * against the HA cluster started through {@link ProtocolHATestBase}. Where a
+ * call returns a value, it is compared with the value produced by the
+ * matching {@code cluster.createFake*()} factory.
+ *
+ * All JUnit equality assertions use the (expected, actual) argument order so
+ * that failure messages report the right side as "expected".
+ */
+public class TestApplicationClientProtocolOnHA extends ProtocolHATestBase {
+  private YarnClient client = null;
+
+  @Before
+  public void initiate() throws Exception {
+    startHACluster(1, true, false);
+    // hand the client its own YarnConfiguration built from the shared conf
+    Configuration clientConf = new YarnConfiguration(this.conf);
+    client = createAndStartYarnClient(clientConf);
+  }
+
+  @After
+  public void shutDown() {
+    // initiate() may have failed before the client was created
+    if (client != null) {
+      client.stop();
+    }
+  }
+
+  @Test(timeout = 15000)
+  public void testGetApplicationReportOnHA() throws Exception {
+    ApplicationReport report =
+        client.getApplicationReport(cluster.createFakeAppId());
+    Assert.assertNotNull(report);
+    Assert.assertEquals(cluster.createFakeAppReport(), report);
+  }
+
+  @Test(timeout = 15000)
+  public void testGetNewApplicationOnHA() throws Exception {
+    ApplicationId appId =
+        client.createApplication().getApplicationSubmissionContext()
+            .getApplicationId();
+    Assert.assertNotNull(appId);
+    Assert.assertEquals(cluster.createFakeAppId(), appId);
+  }
+
+  @Test(timeout = 15000)
+  public void testGetClusterMetricsOnHA() throws Exception {
+    YarnClusterMetrics clusterMetrics =
+        client.getYarnClusterMetrics();
+    Assert.assertNotNull(clusterMetrics);
+    Assert.assertEquals(cluster.createFakeYarnClusterMetrics(),
+        clusterMetrics);
+  }
+
+  @Test(timeout = 15000)
+  public void testGetApplicationsOnHA() throws Exception {
+    List<ApplicationReport> reports =
+        client.getApplications();
+    Assert.assertNotNull(reports);
+    Assert.assertFalse(reports.isEmpty());
+    Assert.assertEquals(cluster.createFakeAppReports(),
+        reports);
+  }
+
+  @Test(timeout = 15000)
+  public void testGetClusterNodesOnHA() throws Exception {
+    List<NodeReport> reports = client.getNodeReports(NodeState.RUNNING);
+    Assert.assertNotNull(reports);
+    Assert.assertFalse(reports.isEmpty());
+    Assert.assertEquals(cluster.createFakeNodeReports(),
+        reports);
+  }
+
+  @Test(timeout = 15000)
+  public void testGetQueueInfoOnHA() throws Exception {
+    QueueInfo queueInfo = client.getQueueInfo("root");
+    Assert.assertNotNull(queueInfo);
+    Assert.assertEquals(cluster.createFakeQueueInfo(),
+        queueInfo);
+  }
+
+  @Test(timeout = 15000)
+  public void testGetQueueUserAclsOnHA() throws Exception {
+    List<QueueUserACLInfo> queueUserAclsList = client.getQueueAclsInfo();
+    Assert.assertNotNull(queueUserAclsList);
+    Assert.assertFalse(queueUserAclsList.isEmpty());
+    Assert.assertEquals(cluster.createFakeQueueUserACLInfoList(),
+        queueUserAclsList);
+  }
+
+  @Test(timeout = 15000)
+  public void testGetApplicationAttemptReportOnHA() throws Exception {
+    ApplicationAttemptReport report =
+        client.getApplicationAttemptReport(cluster
+            .createFakeApplicationAttemptId());
+    Assert.assertNotNull(report);
+    Assert.assertEquals(cluster.createFakeApplicationAttemptReport(), report);
+  }
+
+  @Test(timeout = 15000)
+  public void testGetApplicationAttemptsOnHA() throws Exception {
+    List<ApplicationAttemptReport> reports =
+        client.getApplicationAttempts(cluster.createFakeAppId());
+    Assert.assertNotNull(reports);
+    Assert.assertFalse(reports.isEmpty());
+    Assert.assertEquals(cluster.createFakeApplicationAttemptReports(),
+        reports);
+  }
+
+  @Test(timeout = 15000)
+  public void testGetContainerReportOnHA() throws Exception {
+    ContainerReport report =
+        client.getContainerReport(cluster.createFakeContainerId());
+    Assert.assertNotNull(report);
+    Assert.assertEquals(cluster.createFakeContainerReport(), report);
+  }
+
+  @Test(timeout = 15000)
+  public void testGetContainersOnHA() throws Exception {
+    List<ContainerReport> reports =
+        client.getContainers(cluster.createFakeApplicationAttemptId());
+    Assert.assertNotNull(reports);
+    Assert.assertFalse(reports.isEmpty());
+    Assert.assertEquals(cluster.createFakeContainerReports(),
+        reports);
+  }
+
+  @Test(timeout = 15000)
+  public void testSubmitApplicationOnHA() throws Exception {
+    ApplicationSubmissionContext appContext =
+        Records.newRecord(ApplicationSubmissionContext.class);
+    appContext.setApplicationId(cluster.createFakeAppId());
+    ContainerLaunchContext amContainer =
+        Records.newRecord(ContainerLaunchContext.class);
+    appContext.setAMContainerSpec(amContainer);
+    Resource capability = Records.newRecord(Resource.class);
+    capability.setMemory(10);
+    capability.setVirtualCores(1);
+    appContext.setResource(capability);
+    ApplicationId appId = client.submitApplication(appContext);
+    // the submitted application must be known to the currently active RM
+    Assert.assertTrue(getActiveRM().getRMContext().getRMApps()
+        .containsKey(appId));
+  }
+
+  // The two tests below only require that the call completes without throwing.
+  @Test(timeout = 15000)
+  public void testMoveApplicationAcrossQueuesOnHA() throws Exception{
+    client.moveApplicationAcrossQueues(cluster.createFakeAppId(), "root");
+  }
+
+  @Test(timeout = 15000)
+  public void testForceKillApplicationOnHA() throws Exception {
+    client.killApplication(cluster.createFakeAppId());
+  }
+
+  @Test(timeout = 15000)
+  public void testGetDelegationTokenOnHA() throws Exception {
+    Token token = client.getRMDelegationToken(new Text(" "));
+    Assert.assertEquals(cluster.createFakeToken(), token);
+  }
+
+  @Test(timeout = 15000)
+  public void testRenewDelegationTokenOnHA() throws Exception {
+    RenewDelegationTokenRequest request =
+        RenewDelegationTokenRequest.newInstance(cluster.createFakeToken());
+    long newExpirationTime =
+        ClientRMProxy.createRMProxy(this.conf, ApplicationClientProtocol.class)
+            .renewDelegationToken(request).getNextExpirationTime();
+    Assert.assertEquals(cluster.createNextExpirationTime(), newExpirationTime);
+  }
+
+  // Only requires that the call completes without throwing.
+  @Test(timeout = 15000)
+  public void testCancelDelegationTokenOnHA() throws Exception {
+    CancelDelegationTokenRequest request =
+        CancelDelegationTokenRequest.newInstance(cluster.createFakeToken());
+    ClientRMProxy.createRMProxy(this.conf, ApplicationClientProtocol.class)
+        .cancelDelegationToken(request);
+  }
+}

+ 91 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java

@@ -0,0 +1,91 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.ServerRMProxy;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.util.YarnVersionInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestResourceTrackerOnHA extends ProtocolHATestBase{
+
+  private ResourceTracker resourceTracker = null;
+
+  @Before
+  public void initiate() throws Exception {
+    startHACluster(0, false, true);
+    this.resourceTracker = getRMClient();
+  }
+
+  @After
+  public void shutDown() {
+    if(this.resourceTracker != null) {
+      RPC.stopProxy(this.resourceTracker);
+    }
+  }
+
+  @Test(timeout = 15000)
+  public void testResourceTrackerOnHA() throws Exception {
+    NodeId nodeId = NodeId.newInstance("localhost", 0);
+    Resource resource = Resource.newInstance(2048, 4);
+
+    // make sure registerNodeManager works when failover happens
+    RegisterNodeManagerRequest request =
+        RegisterNodeManagerRequest.newInstance(nodeId, 0, resource,
+            YarnVersionInfo.getVersion(), null);
+    resourceTracker.registerNodeManager(request);
+    Assert.assertTrue(waitForNodeManagerToConnect(10000, nodeId));
+
+    // restart the failover thread, and make sure nodeHeartbeat works
+    failoverThread = createAndStartFailoverThread();
+    NodeStatus status =
+        NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null,
+            null, null);
+    NodeHeartbeatRequest request2 =
+        NodeHeartbeatRequest.newInstance(status, null, null);
+    resourceTracker.nodeHeartbeat(request2);
+  }
+
+  private ResourceTracker getRMClient() throws IOException {
+    return ServerRMProxy.createRMProxy(this.conf, ResourceTracker.class);
+  }
+
+  private boolean waitForNodeManagerToConnect(int timeout, NodeId nodeId)
+      throws Exception {
+    for (int i = 0; i < timeout / 100; i++) {
+      if (getActiveRM().getRMContext().getRMNodes().containsKey(nodeId)) {
+        return true;
+      }
+      Thread.sleep(100);
+    }
+    return false;
+  }
+}

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceTracker.java

@@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.api;
 
 import java.io.IOException;
 
+import org.apache.hadoop.io.retry.AtMostOnce;
+import org.apache.hadoop.io.retry.Idempotent;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -27,10 +29,12 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResp
 
 public interface ResourceTracker {
   
+  @Idempotent
   public RegisterNodeManagerResponse registerNodeManager(
       RegisterNodeManagerRequest request) throws YarnException,
       IOException;
 
+  @AtMostOnce
   public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
       throws YarnException, IOException;
 

+ 4 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java

@@ -504,16 +504,11 @@ public class ClientRMService extends AbstractService implements
       throw RPCUtil.getRemoteException(ie);
     }
 
-    // Though duplication will checked again when app is put into rmContext,
-    // but it is good to fail the invalid submission as early as possible.
+    // Check whether app has already been put into rmContext,
+    // If it is, simply return the response
     if (rmContext.getRMApps().get(applicationId) != null) {
-      String message = "Application with id " + applicationId +
-          " is already present! Cannot add a duplicate!";
-      LOG.warn(message);
-      RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
-          message, "ClientRMService", "Exception in submitting application",
-          applicationId);
-      throw RPCUtil.getRemoteException(message);
+      LOG.info("This is an earlier submitted application: " + applicationId);
+      return SubmitApplicationResponse.newInstance();
     }
 
     if (submissionContext.getQueue() == null) {

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

@@ -718,7 +718,7 @@ public class RMAppImpl implements RMApp, Recoverable {
       }
       
       // TODO: Write out change to state store (YARN-1558)
-      
+      // Also take care of RM failover
       moveEvent.getResult().set(null);
     }
   }

+ 1 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java

@@ -589,10 +589,8 @@ public class TestClientRMService {
     // duplicate appId
     try {
       rmService.submitApplication(submitRequest2);
-      Assert.fail("Exception is expected.");
     } catch (YarnException e) {
-      Assert.assertTrue("The thrown exception is not expected.",
-          e.getMessage().contains("Cannot add a duplicate!"));
+      Assert.fail("Exception is not expected.");
     }
 
     GetApplicationsRequest getAllAppsRequest =

+ 85 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java

@@ -219,4 +219,89 @@ public class TestSubmitApplicationWithRMHA extends RMHATestBase{
     Assert.assertEquals(appReport3.getYarnApplicationState(),
         appReport4.getYarnApplicationState());
   }
+
+  // There are two scenarios when RM failover happens
+  // during SubmitApplication Call:
+  // 1) RMStateStore already saved the ApplicationState when failover happens
+  // 2) RMStateStore did not save the ApplicationState when failover happens
+  @Test (timeout = 5000)
+  public void
+      testHandleRMHADuringSubmitApplicationCallWithSavedApplicationState()
+          throws Exception {
+    // Test scenario 1 when RM failover happens
+    // during SubmitApplication Call:
+    // RMStateStore already saved the ApplicationState when failover happens
+    startRMs();
+
+    // Submit Application
+    // After submission, the applicationState will be saved in RMStateStore.
+    RMApp app0 = rm1.submitApp(200);
+
+    // Do the failover
+    explicitFailover();
+
+    // Since the applicationState has already been saved in RMStateStore
+    // before failover happens, the current active rm can load the previous
+    // applicationState.
+    // This RMApp should exist in the RMContext of current active RM
+    Assert.assertTrue(rm2.getRMContext().getRMApps()
+        .containsKey(app0.getApplicationId()));
+
+    // When we re-submit the application with the same applicationId, the RM
+    // checks whether this application already exists. If yes, it simply
+    // returns the submitApplicationResponse.
+    RMApp app1 =
+        rm2.submitApp(200, "", UserGroupInformation
+            .getCurrentUser().getShortUserName(), null, false, null,
+            configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+                YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
+            false, false, true, app0.getApplicationId());
+
+    // expected value first: the id of the originally submitted application
+    Assert.assertEquals(app0.getApplicationId(), app1.getApplicationId());
+  }
+
+  @Test (timeout = 5000)
+  public void
+      testHandleRMHADuringSubmitApplicationCallWithoutSavedApplicationState()
+          throws Exception {
+    // Scenario 2: failover happens during the SubmitApplication call before
+    // RMStateStore saved the ApplicationState.
+    // A customized RMAppManager is used for this.
+    startRMsWithCustomizedRMAppManager();
+
+    // First submission: the applicationState will not be saved
+    // in RMStateStore
+    String firstUser =
+        UserGroupInformation.getCurrentUser().getShortUserName();
+    int firstMaxAttempts =
+        configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+            YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
+    RMApp app0 =
+        rm1.submitApp(200, "", firstUser, null, false, null,
+            firstMaxAttempts, null, null, false, false);
+
+    // Do the failover
+    explicitFailover();
+
+    // The applicationState of this RMApp was never saved, so it is lost and
+    // the RMApp must not show up in the RMContext of the current active rm.
+    boolean knownAfterFailover = rm2.getRMContext().getRMApps()
+        .containsKey(app0.getApplicationId());
+    Assert.assertFalse(knownAfterFailover);
+
+    // Submit again with the previous ApplicationId to the current active RM.
+    // This mimics ApplicationClientProtocol#submitApplication() when failover
+    // happens during the submission process, because the submitApplication
+    // api is marked as idempotent
+    String secondUser =
+        UserGroupInformation.getCurrentUser().getShortUserName();
+    int secondMaxAttempts =
+        configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+            YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
+    RMApp app1 =
+        rm2.submitApp(200, "", secondUser, null, false, null,
+            secondMaxAttempts, null, null, false, false, true,
+            app0.getApplicationId());
+
+    verifySubmitApp(rm2, app1, app0.getApplicationId());
+    boolean knownAfterResubmit = rm2.getRMContext().getRMApps()
+        .containsKey(app0.getApplicationId());
+    Assert.assertTrue(knownAfterResubmit);
+  }
 }

+ 11 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java

@@ -25,7 +25,6 @@ import java.net.UnknownHostException;
 import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -243,12 +242,7 @@ public class MiniYARNCluster extends CompositeService {
     }
 
     for (int i = 0; i < resourceManagers.length; i++) {
-      resourceManagers[i] = new ResourceManager() {
-        @Override
-        protected void doSecureLogin() throws IOException {
-          // Don't try to login using keytab in the testcases.
-        }
-      };
+      resourceManagers[i] = createResourceManager();
       if (!useFixedPorts) {
         if (HAUtil.isHAEnabled(conf)) {
           setHARMConfiguration(i, conf);
@@ -676,7 +670,7 @@ public class MiniYARNCluster extends CompositeService {
     }
     return false;
   }
-  
+
   private class ApplicationHistoryServerWrapper extends AbstractService {
     public ApplicationHistoryServerWrapper() {
       super(ApplicationHistoryServerWrapper.class.getName());
@@ -736,4 +730,13 @@ public class MiniYARNCluster extends CompositeService {
   public ApplicationHistoryServer getApplicationHistoryServer() {
     return this.appHistoryServer;
   }
+
+  /**
+   * Creates one ResourceManager for this mini cluster. The returned instance
+   * overrides doSecureLogin() with an empty body. The method is protected,
+   * so a subclass may override it to supply a different ResourceManager.
+   */
+  protected ResourceManager createResourceManager() {
+    ResourceManager testResourceManager = new ResourceManager() {
+      @Override
+      protected void doSecureLogin() throws IOException {
+        // Don't try to login using keytab in the testcases.
+      }
+    };
+    return testResourceManager;
+  }
 }