Переглянути джерело

YARN-8142. Improve SIGTERM handling for YARN Service Application Master.
Contributed by Billie Rinaldi

(cherry picked from commit 9031a76d447f0c5eaa392144fd17c5b9812e1b20)

Eric Yang 7 роки тому
батько
коміт
148322ca72

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java

@@ -125,6 +125,7 @@ public class ClientAMService extends AbstractService
     LOG.info("Stop the service by {}", UserGroupInformation.getCurrentUser());
     context.scheduler.getDiagnostics()
         .append("Stopped by user " + UserGroupInformation.getCurrentUser());
+    context.scheduler.setGracefulStop();
 
     // Stop the service in 2 seconds delay to make sure this rpc call is completed.
     // shutdown hook will be executed which will stop AM gracefully.

+ 27 - 14
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java

@@ -156,6 +156,8 @@ public class ServiceScheduler extends CompositeService {
   // requests for a single service is not recommended.
   private boolean hasAtLeastOnePlacementConstraint;
 
+  private boolean gracefulStop = false;
+
   public ServiceScheduler(ServiceContext context) {
     super(context.service.getName());
     this.context = context;
@@ -199,6 +201,7 @@ public class ServiceScheduler extends CompositeService {
     addIfService(amRMClient);
 
     nmClient = createNMClient();
+    nmClient.getClient().cleanupRunningContainersOnStop(false);
     addIfService(nmClient);
 
     dispatcher = new AsyncDispatcher("Component  dispatcher");
@@ -252,6 +255,11 @@ public class ServiceScheduler extends CompositeService {
         .createAMRMClientAsync(1000, new AMRMClientCallback());
   }
 
+  protected void setGracefulStop() {
+    this.gracefulStop = true;
+    nmClient.getClient().cleanupRunningContainersOnStop(true);
+  }
+
   @Override
   public void serviceInit(Configuration conf) throws Exception {
     try {
@@ -266,26 +274,31 @@ public class ServiceScheduler extends CompositeService {
   public void serviceStop() throws Exception {
     LOG.info("Stopping service scheduler");
 
-    // Mark component-instances/containers as STOPPED
-    if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
-      for (ContainerId containerId : getLiveInstances().keySet()) {
-        serviceTimelinePublisher.componentInstanceFinished(containerId,
-            KILLED_AFTER_APP_COMPLETION, diagnostics.toString());
-      }
-    }
     if (executorService != null) {
       executorService.shutdownNow();
     }
 
     DefaultMetricsSystem.shutdown();
-    if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
-      serviceTimelinePublisher
-          .serviceAttemptUnregistered(context, diagnostics.toString());
+
+    // only stop the entire service when a graceful stop has been initiated
+    // (e.g. via client RPC, not through the AM receiving a SIGTERM)
+    if (gracefulStop) {
+      if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
+        // mark component-instances/containers as STOPPED
+        for (ContainerId containerId : getLiveInstances().keySet()) {
+          serviceTimelinePublisher.componentInstanceFinished(containerId,
+              KILLED_AFTER_APP_COMPLETION, diagnostics.toString());
+        }
+        // mark attempt as unregistered
+        serviceTimelinePublisher
+            .serviceAttemptUnregistered(context, diagnostics.toString());
+      }
+      // unregister AM
+      amRMClient.unregisterApplicationMaster(FinalApplicationStatus.ENDED,
+          diagnostics.toString(), "");
+      LOG.info("Service {} unregistered with RM, with attemptId = {} " +
+          ", diagnostics = {} ", app.getName(), context.attemptId, diagnostics);
     }
-    amRMClient.unregisterApplicationMaster(FinalApplicationStatus.ENDED,
-        diagnostics.toString(), "");
-    LOG.info("Service {} unregistered with RM, with attemptId = {} " +
-        ", diagnostics = {} ", app.getName(), context.attemptId, diagnostics);
     super.serviceStop();
   }
 

+ 11 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.http.HttpServer2;
 import org.apache.hadoop.registry.client.impl.zk.CuratorService;
 import org.apache.hadoop.service.ServiceOperations;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.service.api.records.Component;
@@ -305,6 +306,16 @@ public class ServiceTestUtils {
     return client;
   }
 
+  /**
+   * Creates a YarnClient for test purposes.
+   */
+  public static YarnClient createYarnClient(Configuration conf) {
+    YarnClient client = YarnClient.createYarnClient();
+    client.init(conf);
+    client.start();
+    return client;
+  }
+
   protected CuratorService getCuratorService() throws IOException {
     return curatorService;
   }

+ 71 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
 import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
@@ -489,6 +490,76 @@ public class TestYarnNativeServices extends ServiceTestUtils {
     client.actionDestroy(exampleApp.getName());
   }
 
+  @Test(timeout = 200000)
+  public void testAMSigtermDoesNotKillApplication() throws Exception {
+    runAMSignalTest(SignalContainerCommand.GRACEFUL_SHUTDOWN);
+  }
+
+  @Test(timeout = 200000)
+  public void testAMSigkillDoesNotKillApplication() throws Exception {
+    runAMSignalTest(SignalContainerCommand.FORCEFUL_SHUTDOWN);
+  }
+
+  public void runAMSignalTest(SignalContainerCommand signal) throws Exception {
+    setupInternal(NUM_NMS);
+    ServiceClient client = createClient(getConf());
+    Service exampleApp = createExampleApplication();
+    client.actionCreate(exampleApp);
+    waitForServiceToBeStable(client, exampleApp);
+    Service appStatus1 = client.getStatus(exampleApp.getName());
+    ApplicationId exampleAppId = ApplicationId.fromString(appStatus1.getId());
+
+    YarnClient yarnClient = createYarnClient(getConf());
+    ApplicationReport applicationReport = yarnClient.getApplicationReport(
+        exampleAppId);
+
+    ApplicationAttemptId firstAttemptId = applicationReport
+        .getCurrentApplicationAttemptId();
+    ApplicationAttemptReport attemptReport = yarnClient
+        .getApplicationAttemptReport(firstAttemptId);
+
+    // the AM should not perform a graceful shutdown since the operation was not
+    // initiated through the service client
+    yarnClient.signalToContainer(attemptReport.getAMContainerId(), signal);
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        ApplicationReport ar = client.getYarnClient()
+            .getApplicationReport(exampleAppId);
+        YarnApplicationState state = ar.getYarnApplicationState();
+        Assert.assertTrue(state == YarnApplicationState.RUNNING ||
+            state == YarnApplicationState.ACCEPTED);
+        if (state != YarnApplicationState.RUNNING) {
+          return false;
+        }
+        if (ar.getCurrentApplicationAttemptId() == null ||
+            ar.getCurrentApplicationAttemptId().equals(firstAttemptId)) {
+          return false;
+        }
+        Service appStatus2 = client.getStatus(exampleApp.getName());
+        if (appStatus2.getState() != ServiceState.STABLE) {
+          return false;
+        }
+        Assert.assertEquals(getSortedContainerIds(appStatus1).toString(),
+            getSortedContainerIds(appStatus2).toString());
+        return true;
+      } catch (YarnException | IOException e) {
+        throw new RuntimeException("while waiting", e);
+      }
+    }, 2000, 200000);
+  }
+
+  private static List<String> getSortedContainerIds(Service s) {
+    List<String> containerIds = new ArrayList<>();
+    for (Component component : s.getComponents()) {
+      for (Container container : component.getContainers()) {
+        containerIds.add(container.getId());
+      }
+    }
+    Collections.sort(containerIds);
+    return containerIds;
+  }
+
   // Check containers launched are in dependency order
   // Get all containers into a list and sort based on container launch time e.g.
   // compa-c1, compa-c2, compb-c1, compb-c2;