浏览代码

YARN-2037. Add work preserving restart support for Unmanaged AMs. (Botong Huang via Subru)

Subru Krishnan 7 年之前
父节点
当前提交
7836a6b59a

+ 13 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java

@@ -55,27 +55,32 @@ public interface ApplicationMasterProtocol {
    * The interface used by a new <code>ApplicationMaster</code> to register with
    * the <code>ResourceManager</code>.
    * </p>
-   * 
+   *
    * <p>
    * The <code>ApplicationMaster</code> needs to provide details such as RPC
    * Port, HTTP tracking url etc. as specified in
    * {@link RegisterApplicationMasterRequest}.
    * </p>
-   * 
+   *
    * <p>
    * The <code>ResourceManager</code> responds with critical details such as
    * maximum resource capabilities in the cluster as specified in
    * {@link RegisterApplicationMasterResponse}.
    * </p>
-   * 
-   * @param request
-   *          registration request
+   *
+   * <p>
+   * Re-register is only allowed for <code>Unmanaged Application Master</code>
+   * (UAM) HA, with
+   * {@link org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext#getKeepContainersAcrossApplicationAttempts()}
+   * set to true.
+   * </p>
+   *
+   * @param request registration request
    * @return registration respose
    * @throws YarnException
    * @throws IOException
-   * @throws InvalidApplicationMasterRequestException
-   *           The exception is thrown when an ApplicationMaster tries to
-   *           register more then once.
+   * @throws InvalidApplicationMasterRequestException The exception is thrown
+   *           when an ApplicationMaster tries to register more then once.
    * @see RegisterApplicationMasterRequest
    * @see RegisterApplicationMasterResponse
    */

+ 10 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java

@@ -395,15 +395,18 @@ public abstract class ApplicationSubmissionContext {
    * Set the flag which indicates whether to keep containers across application
    * attempts.
    * <p>
-   * If the flag is true, running containers will not be killed when application
-   * attempt fails and these containers will be retrieved by the new application
-   * attempt on registration via
+   * For managed AM, if the flag is true, running containers will not be killed
+   * when application attempt fails and these containers will be retrieved by
+   * the new application attempt on registration via
    * {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)}.
    * </p>
-   * 
-   * @param keepContainers
-   *          the flag which indicates whether to keep containers across
-   *          application attempts.
+   * <p>
+   * For unmanaged AM, if the flag is true, RM allows re-register and returns
+   * the running containers in the same attempt back to the UAM for HA.
+   * </p>
+   *
+   * @param keepContainers the flag which indicates whether to keep containers
+   *          across application attempts.
    */
   @Public
   @Stable

+ 16 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java

@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
@@ -66,8 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.security
-    .AMRMTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
 import org.apache.hadoop.yarn.server.security.MasterKeyData;
 import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
@@ -214,14 +214,20 @@ public class ApplicationMasterService extends AbstractService implements
     synchronized (lock) {
       AllocateResponse lastResponse = lock.getAllocateResponse();
       if (hasApplicationMasterRegistered(applicationAttemptId)) {
-        String message = AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE + appID;
-        LOG.warn(message);
-        RMAuditLogger.logFailure(
-          this.rmContext.getRMApps()
-            .get(appID).getUser(),
-          AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message,
-          appID, applicationAttemptId);
-        throw new InvalidApplicationMasterRequestException(message);
+        // allow UAM re-register if work preservation is enabled
+        ApplicationSubmissionContext appContext =
+            rmContext.getRMApps().get(appID).getApplicationSubmissionContext();
+        if (!(appContext.getUnmanagedAM()
+            && appContext.getKeepContainersAcrossApplicationAttempts())) {
+          String message =
+              AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE + appID;
+          LOG.warn(message);
+          RMAuditLogger.logFailure(
+              this.rmContext.getRMApps().get(appID).getUser(),
+              AuditConstants.REGISTER_AM, "", "ApplicationMasterService",
+              message, appID, applicationAttemptId);
+          throw new InvalidApplicationMasterRequestException(message);
+        }
       }
 
       this.amLivelinessMonitor.receivedPing(applicationAttemptId);

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java

@@ -143,6 +143,11 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
           .getTransferredContainers(applicationAttemptId);
       if (!transferredContainers.isEmpty()) {
         response.setContainersFromPreviousAttempts(transferredContainers);
+        // Clear the node set remembered by the secret manager. Necessary
+        // for UAM restart because we use the same attemptId.
+        rmContext.getNMTokenSecretManager()
+            .clearNodeSetForAttempt(applicationAttemptId);
+
         List<NMToken> nmTokens = new ArrayList<NMToken>();
         for (Container container : transferredContainers) {
           try {

+ 4 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java

@@ -363,7 +363,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
 
        // Transitions from RUNNING State
       .addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
-          RMAppAttemptEventType.LAUNCHED)
+          EnumSet.of(
+              RMAppAttemptEventType.LAUNCHED,
+              // Valid only for UAM restart
+              RMAppAttemptEventType.REGISTERED))
       .addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.UNREGISTERED, new AMUnregisteredTransition())
       .addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
@@ -1242,7 +1245,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
 
         if (appAttempt.submissionContext
             .getKeepContainersAcrossApplicationAttempts()
-            && !appAttempt.submissionContext.getUnmanagedAM()
             && rmApp.getCurrentAppAttempt() != appAttempt) {
           appAttempt.transferStateFromAttempt(rmApp.getCurrentAppAttempt());
         }

+ 7 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java

@@ -202,17 +202,18 @@ public abstract class AbstractYarnScheduler
     ApplicationId appId = currentAttempt.getApplicationId();
     SchedulerApplication<T> app = applications.get(appId);
     List<Container> containerList = new ArrayList<Container>();
-    RMApp appImpl = this.rmContext.getRMApps().get(appId);
-    if (appImpl.getApplicationSubmissionContext().getUnmanagedAM()) {
-      return containerList;
-    }
     if (app == null) {
       return containerList;
     }
     Collection<RMContainer> liveContainers =
         app.getCurrentAppAttempt().getLiveContainers();
-    ContainerId amContainerId = rmContext.getRMApps().get(appId)
-        .getCurrentAppAttempt().getMasterContainer().getId();
+    ContainerId amContainerId = null;
+    // For UAM, amContainer would be null
+    if (rmContext.getRMApps().get(appId).getCurrentAppAttempt()
+        .getMasterContainer() != null) {
+      amContainerId = rmContext.getRMApps().get(appId).getCurrentAppAttempt()
+          .getMasterContainer().getId();
+    }
     for (RMContainer rmContainer : liveContainers) {
       if (!rmContainer.getContainerId().equals(amContainerId)) {
         containerList.add(rmContainer.getContainer());

+ 159 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingUnmanagedAM.java

@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test UAM handling in RM.
+ */
+public class TestWorkPreservingUnmanagedAM
+    extends ParameterizedSchedulerTestBase {
+
+  private YarnConfiguration conf;
+
+  public TestWorkPreservingUnmanagedAM() throws IOException {
+    super();
+  }
+
+  @Before
+  public void setup() {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+    conf = getConf();
+    UserGroupInformation.setConfiguration(conf);
+    DefaultMetricsSystem.setMiniClusterMode(true);
+  }
+
+  /**
+   * Test UAM work preserving restart. When the keepContainersAcrossAttempt flag
+   * is on, we allow UAM to directly register again and move on without getting
+   * the applicationAlreadyRegistered exception.
+   */
+  protected void testUAMRestart(boolean keepContainers) throws Exception {
+    // start RM
+    MockRM rm = new MockRM();
+    rm.start();
+    MockNM nm =
+        new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
+    nm.registerNode();
+
+    // create app and launch the UAM
+    boolean unamanged = true;
+    int maxAttempts = 1;
+    boolean waitForAccepted = true;
+    RMApp app = rm.submitApp(200, "",
+        UserGroupInformation.getCurrentUser().getShortUserName(), null,
+        unamanged, null, maxAttempts, null, null, waitForAccepted,
+        keepContainers);
+
+    MockAM am = MockRM.launchUAM(app, rm, nm);
+
+    // Register for the first time
+    am.registerAppAttempt();
+
+    // Allocate two containers to UAM
+    int numContainers = 3;
+    List<Container> conts = am.allocate("127.0.0.1", 1000, numContainers,
+        new ArrayList<ContainerId>()).getAllocatedContainers();
+    while (conts.size() < numContainers) {
+      nm.nodeHeartbeat(true);
+      conts.addAll(am.allocate(new ArrayList<ResourceRequest>(),
+          new ArrayList<ContainerId>()).getAllocatedContainers());
+      Thread.sleep(100);
+    }
+
+    // Release one container
+    List<ContainerId> releaseList =
+        Collections.singletonList(conts.get(0).getId());
+    List<ContainerStatus> finishedConts =
+        am.allocate(new ArrayList<ResourceRequest>(), releaseList)
+            .getCompletedContainersStatuses();
+    while (finishedConts.size() < releaseList.size()) {
+      nm.nodeHeartbeat(true);
+      finishedConts
+          .addAll(am
+              .allocate(new ArrayList<ResourceRequest>(),
+                  new ArrayList<ContainerId>())
+              .getCompletedContainersStatuses());
+      Thread.sleep(100);
+    }
+
+    // Register for the second time
+    RegisterApplicationMasterResponse response = null;
+    try {
+      response = am.registerAppAttempt(false);
+    } catch (InvalidApplicationMasterRequestException e) {
+      Assert.assertEquals(false, keepContainers);
+      return;
+    }
+    Assert.assertEquals("RM should not allow second register"
+        + " for UAM without keep container flag ", true, keepContainers);
+
+    // Expecting the two running containers previously
+    Assert.assertEquals(2, response.getContainersFromPreviousAttempts().size());
+    Assert.assertEquals(1, response.getNMTokensFromPreviousAttempts().size());
+
+    // Allocate one more containers to UAM, just to be safe
+    numContainers = 1;
+    am.allocate("127.0.0.1", 1000, numContainers, new ArrayList<ContainerId>());
+    nm.nodeHeartbeat(true);
+    conts = am.allocate(new ArrayList<ResourceRequest>(),
+        new ArrayList<ContainerId>()).getAllocatedContainers();
+    while (conts.size() < numContainers) {
+      nm.nodeHeartbeat(true);
+      conts.addAll(am.allocate(new ArrayList<ResourceRequest>(),
+          new ArrayList<ContainerId>()).getAllocatedContainers());
+      Thread.sleep(100);
+    }
+
+    rm.stop();
+  }
+
+  @Test(timeout = 600000)
+  public void testUAMRestartKeepContainers() throws Exception {
+    testUAMRestart(true);
+  }
+
+  @Test(timeout = 600000)
+  public void testUAMRestartNoKeepContainers() throws Exception {
+    testUAMRestart(false);
+  }
+
+}