浏览代码

YARN-1764. Modified YarnClient to correctly handle failover of ResourceManager after the submitApplication call goes through. Contributed by Xuan Gong.
svn merge --ignore-ancestry -c 1576160 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.4@1576163 13f79535-47bb-0310-9956-ffa450edef68

Vinod Kumar Vavilapalli 11 年之前
父节点
当前提交
bc32cc6e8a

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -234,6 +234,9 @@ Release 2.4.0 - UNRELEASED
     after getting an application-ID but before submission and can still submit to
     after getting an application-ID but before submission and can still submit to
     the newly active RM with no issues. (Xuan Gong via vinodkv)
     the newly active RM with no issues. (Xuan Gong via vinodkv)
 
 
+    YARN-1764. Modified YarnClient to correctly handle failover of ResourceManager
+    after the submitApplication call goes through. (Xuan Gong via vinodkv)
+
   OPTIMIZATIONS
   OPTIMIZATIONS
 
 
   BUG FIXES
   BUG FIXES

+ 14 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.io.retry.Idempotent;
 import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
@@ -58,7 +59,9 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 
 
 /**
 /**
  * <p>The protocol between clients and the <code>ResourceManager</code>
  * <p>The protocol between clients and the <code>ResourceManager</code>
@@ -107,7 +110,16 @@ public interface ApplicationClientProtocol {
    * {@link SubmitApplicationResponse} on accepting the submission and throws 
    * {@link SubmitApplicationResponse} on accepting the submission and throws 
    * an exception if it rejects the submission. However, this call needs to be
    * an exception if it rejects the submission. However, this call needs to be
    * followed by {@link #getApplicationReport(GetApplicationReportRequest)}
    * followed by {@link #getApplicationReport(GetApplicationReportRequest)}
-   * to make sure that the application gets properly submitted.</p>
+   * to make sure that the application gets properly submitted - obtaining a
+   * {@link SubmitApplicationResponse} from ResourceManager doesn't guarantee
+   * that RM 'remembers' this application beyond failover or restart. If RM
+   * failover or RM restart happens before ResourceManager saves the
+   * application's state successfully, the subsequent
+   * {@link #getApplicationReport(GetApplicationReportRequest)} will throw
+   * a {@link ApplicationNotFoundException}. The Clients need to re-submit
+   * the application with the same {@link ApplicationSubmissionContext} when
+   * it encounters the {@link ApplicationNotFoundException} on the
+   * {@link #getApplicationReport(GetApplicationReportRequest)} call.</p>
    * 
    * 
    * <p> In secure mode,the <code>ResourceManager</code> verifies access to
    * <p> In secure mode,the <code>ResourceManager</code> verifies access to
    * queues etc. before accepting the application submission.</p>
    * queues etc. before accepting the application submission.</p>
@@ -186,6 +198,7 @@ public interface ApplicationClientProtocol {
    */
    */
   @Public
   @Public
   @Stable
   @Stable
+  @Idempotent
   public GetApplicationReportResponse getApplicationReport(
   public GetApplicationReportResponse getApplicationReport(
       GetApplicationReportRequest request) 
       GetApplicationReportRequest request) 
   throws YarnException, IOException;
   throws YarnException, IOException;

+ 25 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java

@@ -29,6 +29,9 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -45,7 +48,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
 import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
 import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
 import org.apache.hadoop.yarn.exceptions.ApplicationIdNotProvidedException;
 import org.apache.hadoop.yarn.exceptions.ApplicationIdNotProvidedException;
-import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 
 
@@ -84,16 +87,29 @@ public abstract class YarnClient extends AbstractService {
 
 
   /**
   /**
    * <p>
    * <p>
-   * Submit a new application to <code>YARN.</code> It is a blocking call, such
-   * that it will not return {@link ApplicationId} until the submitted
-   * application has been submitted and accepted by the ResourceManager.
+   * Submit a new application to <code>YARN.</code> It is a blocking call - it
+   * will not return {@link ApplicationId} until the submitted application is
+   * submitted successfully and accepted by the ResourceManager.
    * </p>
    * </p>
    * 
    * 
    * <p>
    * <p>
-   * Should provide an {@link ApplicationId} when submits a new application,
-   * otherwise, it will throw the {@link ApplicationIdNotProvidedException}
+   * Users should provide an {@link ApplicationId} as part of the parameter
+   * {@link ApplicationSubmissionContext} when submitting a new application,
+   * otherwise it will throw the {@link ApplicationIdNotProvidedException}.
    * </p>
    * </p>
    *
    *
+   * <p>This internally calls {@link ApplicationClientProtocol#submitApplication
+   * (SubmitApplicationRequest)}, and after that, it internally invokes
+   * {@link ApplicationClientProtocol#getApplicationReport
+   * (GetApplicationReportRequest)} and waits till it can make sure that the
+   * application gets properly submitted. If RM fails over or RM restart
+   * happens before ResourceManager saves the application's state,
+   * {@link ApplicationClientProtocol
+   * #getApplicationReport(GetApplicationReportRequest)} will throw
+   * the {@link ApplicationNotFoundException}. This API automatically resubmits
+   * the application with the same {@link ApplicationSubmissionContext} when it
+   * catches the {@link ApplicationNotFoundException}</p>
+   *
    * @param appContext
    * @param appContext
    *          {@link ApplicationSubmissionContext} containing all the details
    *          {@link ApplicationSubmissionContext} containing all the details
    *          needed to submit a new application
    *          needed to submit a new application
@@ -102,8 +118,9 @@ public abstract class YarnClient extends AbstractService {
    * @throws IOException
    * @throws IOException
    * @see #createApplication()
    * @see #createApplication()
    */
    */
-  public abstract ApplicationId submitApplication(ApplicationSubmissionContext appContext)
-      throws YarnException, IOException;
+  public abstract ApplicationId submitApplication(
+      ApplicationSubmissionContext appContext) throws YarnException,
+      IOException;
 
 
   /**
   /**
    * <p>
    * <p>

+ 34 - 26
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java

@@ -187,35 +187,43 @@ public class YarnClientImpl extends YarnClient {
     int pollCount = 0;
     int pollCount = 0;
     long startTime = System.currentTimeMillis();
     long startTime = System.currentTimeMillis();
 
 
-    //TODO: YARN-1764:Handle RM fail overs after the submitApplication call.
     while (true) {
     while (true) {
-      YarnApplicationState state =
-          getApplicationReport(applicationId).getYarnApplicationState();
-      if (!state.equals(YarnApplicationState.NEW) &&
-          !state.equals(YarnApplicationState.NEW_SAVING)) {
-        LOG.info("Submitted application " + applicationId);
-        break;
-      }
+      try {
+        YarnApplicationState state =
+            getApplicationReport(applicationId).getYarnApplicationState();
+        if (!state.equals(YarnApplicationState.NEW) &&
+            !state.equals(YarnApplicationState.NEW_SAVING)) {
+          LOG.info("Submitted application " + applicationId);
+          break;
+        }
 
 
-      long elapsedMillis = System.currentTimeMillis() - startTime;
-      if (enforceAsyncAPITimeout() &&
-          elapsedMillis >= asyncApiPollTimeoutMillis) {
-        throw new YarnException("Timed out while waiting for application " +
-          applicationId + " to be submitted successfully");
-      }
+        long elapsedMillis = System.currentTimeMillis() - startTime;
+        if (enforceAsyncAPITimeout() &&
+            elapsedMillis >= asyncApiPollTimeoutMillis) {
+          throw new YarnException("Timed out while waiting for application " +
+              applicationId + " to be submitted successfully");
+        }
 
 
-      // Notify the client through the log every 10 poll, in case the client
-      // is blocked here too long.
-      if (++pollCount % 10 == 0) {
-        LOG.info("Application submission is not finished, " +
-            "submitted application " + applicationId +
-            " is still in " + state);
-      }
-      try {
-        Thread.sleep(submitPollIntervalMillis);
-      } catch (InterruptedException ie) {
-        LOG.error("Interrupted while waiting for application " + applicationId
-            + " to be successfully submitted.");
+        // Notify the client through the log every 10 poll, in case the client
+        // is blocked here too long.
+        if (++pollCount % 10 == 0) {
+          LOG.info("Application submission is not finished, " +
+              "submitted application " + applicationId +
+              " is still in " + state);
+        }
+        try {
+          Thread.sleep(submitPollIntervalMillis);
+        } catch (InterruptedException ie) {
+          LOG.error("Interrupted while waiting for application "
+              + applicationId
+              + " to be successfully submitted.");
+        }
+      } catch (ApplicationNotFoundException ex) {
+        // FailOver or RM restart happens before RMStateStore saves
+        // ApplicationState
+        LOG.info("Re-submit application " + applicationId + "with the " +
+            "same ApplicationSubmissionContext");
+        rmClient.submitApplication(request);
       }
       }
     }
     }
 
 

+ 137 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java

@@ -24,8 +24,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.junit.Test;
 import org.junit.Test;
 
 
@@ -80,10 +82,141 @@ public class TestSubmitApplicationWithRMHA extends RMHATestBase{
       count++;
       count++;
     }
     }
     // Verify submittion is successful
     // Verify submittion is successful
-    Assert.assertFalse(rm.getApplicationReport(app.getApplicationId())
-        .getYarnApplicationState() == YarnApplicationState.NEW);
-    Assert.assertFalse(rm.getApplicationReport(app.getApplicationId())
-        .getYarnApplicationState() == YarnApplicationState.NEW_SAVING);
+    YarnApplicationState state =
+        rm.getApplicationReport(app.getApplicationId())
+            .getYarnApplicationState();
+    Assert.assertTrue(state == YarnApplicationState.ACCEPTED
+        || state == YarnApplicationState.SUBMITTED);
     Assert.assertEquals(expectedAppId, app.getApplicationId());
     Assert.assertEquals(expectedAppId, app.getApplicationId());
   }
   }
+
+  // There are two scenarios when RM failover happens
+  // after SubmitApplication Call:
+  // 1) RMStateStore already saved the ApplicationState when failover happens
+  // 2) RMStateStore did not save the ApplicationState when failover happens
+
+  @Test
+  public void
+      testHandleRMHAafterSubmitApplicationCallWithSavedApplicationState()
+          throws Exception {
+    // Test scenario 1 when RM failover happens
+    // after SubmitApplication Call:
+    // RMStateStore already saved the ApplicationState when failover happens
+    startRMs();
+
+    // Submit Application
+    // After submission, the applicationState will be saved in RMStateStore.
+    RMApp app0 = rm1.submitApp(200);
+
+    // Do the failover
+    explicitFailover();
+
+    // Since the applicationState has already been saved in RMStateStore
+    // before failover happens, the current active rm can load the previous
+    // applicationState.
+    ApplicationReport appReport =
+        rm2.getApplicationReport(app0.getApplicationId());
+
+    // verify previous submission is successful.
+    Assert.assertTrue(appReport.getYarnApplicationState()
+        == YarnApplicationState.ACCEPTED ||
+        appReport.getYarnApplicationState()
+        == YarnApplicationState.SUBMITTED);
+  }
+
+  @Test
+  public void
+      testHandleRMHAafterSubmitApplicationCallWithoutSavedApplicationState()
+          throws Exception {
+    // Test scenario 2 when RM failover happens
+    // after SubmitApplication Call:
+    // RMStateStore did not save the ApplicationState when failover happens.
+    // Using customized RMAppManager.
+    startRMsWithCustomizedRMAppManager();
+
+    // Submit Application
+    // After submission, the applicationState will
+    // not be saved in RMStateStore
+    RMApp app0 =
+        rm1.submitApp(200, "", UserGroupInformation
+            .getCurrentUser().getShortUserName(), null, false, null,
+            configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+                YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
+            false, false);
+
+    // Do the failover
+    explicitFailover();
+
+    // Since the applicationState is not saved in RMStateStore
+    // when failover happens. The current active RM can not load
+    // previous applicationState.
+    // Expect ApplicationNotFoundException by calling getApplicationReport().
+    try {
+      rm2.getApplicationReport(app0.getApplicationId());
+      Assert.fail("Should get ApplicationNotFoundException here");
+    } catch (ApplicationNotFoundException ex) {
+      // expected ApplicationNotFoundException
+    }
+
+    // Submit the application with previous ApplicationId to current active RM
+    // This will mimic the similar behavior of YarnClient which will re-submit
+    // Application with previous applicationId
+    // when catches the ApplicationNotFoundException
+    RMApp app1 =
+        rm2.submitApp(200, "", UserGroupInformation
+            .getCurrentUser().getShortUserName(), null, false, null,
+            configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+                YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
+            false, false, true, app0.getApplicationId());
+
+    verifySubmitApp(rm2, app1, app0.getApplicationId());
+  }
+
+  /**
+   * Test multiple calls of getApplicationReport, to make sure
+   * it is idempotent
+   */
+  @Test
+  public void testGetApplicationReportIdempotent() throws Exception{
+    // start two RMs, and transit rm1 to active, rm2 to standby
+    startRMs();
+
+    // Submit Application
+    // After submission, the applicationState will be saved in RMStateStore.
+    RMApp app = rm1.submitApp(200);
+
+    ApplicationReport appReport1 =
+        rm1.getApplicationReport(app.getApplicationId());
+    Assert.assertTrue(appReport1.getYarnApplicationState() ==
+        YarnApplicationState.ACCEPTED ||
+        appReport1.getYarnApplicationState() ==
+        YarnApplicationState.SUBMITTED);
+
+    // call getApplicationReport again
+    ApplicationReport appReport2 =
+        rm1.getApplicationReport(app.getApplicationId());
+    Assert.assertEquals(appReport1.getApplicationId(),
+        appReport2.getApplicationId());
+    Assert.assertEquals(appReport1.getYarnApplicationState(),
+        appReport2.getYarnApplicationState());
+
+    // Do the failover
+    explicitFailover();
+
+    // call getApplicationReport
+    ApplicationReport appReport3 =
+        rm2.getApplicationReport(app.getApplicationId());
+    Assert.assertEquals(appReport1.getApplicationId(),
+        appReport3.getApplicationId());
+    Assert.assertEquals(appReport1.getYarnApplicationState(),
+        appReport3.getYarnApplicationState());
+
+    // call getApplicationReport again
+    ApplicationReport appReport4 =
+        rm2.getApplicationReport(app.getApplicationId());
+    Assert.assertEquals(appReport3.getApplicationId(),
+        appReport4.getApplicationId());
+    Assert.assertEquals(appReport3.getYarnApplicationState(),
+        appReport4.getYarnApplicationState());
+  }
 }
 }