
YARN-10125. [Federation] Kill application from client does not kill Unmanaged AM's and containers launched by Unmanaged AM. (#6363) Contributed by Shilun Fan.

Reviewed-by: Inigo Goiri <inigoiri@apache.org>
Signed-off-by: Shilun Fan <slfan1989@apache.org>
slfan1989 1 year ago
parent
commit
bc159b5a87

+ 21 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java

@@ -664,6 +664,13 @@ public class FederationClientInterceptor
     try {
       LOG.info("forceKillApplication {} on SubCluster {}.", applicationId, subClusterId);
       response = clientRMProxy.forceKillApplication(request);
+      // If killing the application in the home sub-cluster succeeds,
+      // we also try to kill the same application in the other sub-clusters.
+      if (response != null) {
+        ClientMethod remoteMethod = new ClientMethod("forceKillApplication",
+            new Class[]{KillApplicationRequest.class}, new Object[]{request});
+        invokeConcurrent(remoteMethod, KillApplicationResponse.class, subClusterId);
+      }
     } catch (Exception e) {
       routerMetrics.incrAppsFailedKilled();
       String msg = "Unable to kill the application report.";
@@ -834,12 +841,24 @@ public class FederationClientInterceptor
     return RouterYarnClientUtils.merge(clusterMetrics);
   }
 
-  <R> Collection<R> invokeConcurrent(ClientMethod request, Class<R> clazz)
-      throws YarnException {
+  <R> Collection<R> invokeConcurrent(ClientMethod request, Class<R> clazz) throws YarnException {
+    // Get Active SubClusters
+    Map<SubClusterId, SubClusterInfo> subClusterInfo = federationFacade.getSubClusters(true);
+    Collection<SubClusterId> subClusterIds = subClusterInfo.keySet();
+    return invokeConcurrent(request, clazz, subClusterIds);
+  }
 
+  <R> Collection<R> invokeConcurrent(ClientMethod request, Class<R> clazz,
+      SubClusterId homeSubclusterId) throws YarnException {
     // Get Active SubClusters
     Map<SubClusterId, SubClusterInfo> subClusterInfo = federationFacade.getSubClusters(true);
     Collection<SubClusterId> subClusterIds = subClusterInfo.keySet();
+    subClusterIds.remove(homeSubclusterId);
+    return invokeConcurrent(request, clazz, subClusterIds);
+  }
+
+  <R> Collection<R> invokeConcurrent(ClientMethod request, Class<R> clazz,
+      Collection<SubClusterId> subClusterIds) throws YarnException {
 
     List<Callable<Pair<SubClusterId, Object>>> callables = new ArrayList<>();
     List<Future<Pair<SubClusterId, Object>>> futures = new ArrayList<>();
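
A condensed sketch of the flow the two hunks above add, assuming the surrounding FederationClientInterceptor members from the diff (clientRMProxy, ClientMethod, and the new invokeConcurrent overloads); the killOnAllSubClusters wrapper name is illustrative only and does not exist in the patch:

    private void killOnAllSubClusters(KillApplicationRequest request,
        SubClusterId homeSubClusterId) throws YarnException, IOException {
      // Kill the application on the home sub-cluster first.
      KillApplicationResponse response = clientRMProxy.forceKillApplication(request);
      // Only if the home kill succeeds, fan the same RPC out to the other
      // active sub-clusters. The SubClusterId overload of invokeConcurrent
      // removes the home sub-cluster from the active set before dispatching,
      // so it is not contacted twice.
      if (response != null) {
        ClientMethod remoteMethod = new ClientMethod("forceKillApplication",
            new Class[] {KillApplicationRequest.class}, new Object[] {request});
        invokeConcurrent(remoteMethod, KillApplicationResponse.class, homeSubClusterId);
      }
    }

The new test in TestFederationClientInterceptor below exercises exactly this fan-out: it submits the same application to every sub-cluster, kills it through the interceptor, and verifies the application reaches the KILLED state everywhere.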

+ 65 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java

@@ -29,15 +29,19 @@ import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.Set;
+import java.util.HashSet;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import java.util.Arrays;
 import java.util.Collection;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.MockApps;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@@ -132,6 +136,7 @@ import org.apache.hadoop.yarn.api.records.ReservationDefinition;
 import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
 import org.apache.hadoop.yarn.api.records.ReservationRequests;
 import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
@@ -387,6 +392,66 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
     Assert.assertNotNull(responseKill);
   }
 
+  @Test
+  public void testForceKillApplicationAllSubClusters()
+      throws IOException, YarnException, InterruptedException, TimeoutException {
+
+    // In this unit test, we submit the same application to all sub-clusters.
+    // Then we kill the application through the interceptor;
+    // it should be cleared from all sub-clusters.
+
+    Set<SubClusterId> subClusterSet = new HashSet<>();
+    for (SubClusterId subCluster : subClusters) {
+      subClusterSet.add(subCluster);
+    }
+
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 2);
+    SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
+
+    // Submit the application we are going to kill later
+    SubmitApplicationResponse response = interceptor.submitApplication(request);
+
+    Assert.assertNotNull(response);
+    SubClusterId subClusterId = stateStoreUtil.queryApplicationHomeSC(appId);
+    Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
+
+    subClusterSet.remove(subClusterId);
+
+    for (SubClusterId subCluster : subClusterSet) {
+      LOG.info("SubCluster : {}.", subCluster);
+      ApplicationClientProtocol clientRMProxyForSubCluster =
+          interceptor.getClientRMProxyForSubCluster(subCluster);
+      clientRMProxyForSubCluster.submitApplication(request);
+    }
+
+    KillApplicationRequest requestKill = KillApplicationRequest.newInstance(appId);
+    GenericTestUtils.waitFor(() -> {
+      KillApplicationResponse responseKill;
+      try {
+        responseKill = interceptor.forceKillApplication(requestKill);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      return (responseKill.getIsKillCompleted());
+    }, 100, 2000);
+
+    for (SubClusterId subCluster : subClusters) {
+      ApplicationClientProtocol clientRMProxyForSubCluster =
+          interceptor.getClientRMProxyForSubCluster(subCluster);
+      GetApplicationReportRequest requestGet = GetApplicationReportRequest.newInstance(appId);
+      GetApplicationReportResponse responseGet =
+          clientRMProxyForSubCluster.getApplicationReport(requestGet);
+      Assert.assertNotNull(responseGet);
+      ApplicationReport applicationReport = responseGet.getApplicationReport();
+      Assert.assertNotNull(applicationReport);
+      YarnApplicationState yarnApplicationState = applicationReport.getYarnApplicationState();
+      Assert.assertNotNull(yarnApplicationState);
+      Assert.assertEquals(YarnApplicationState.KILLED, yarnApplicationState);
+    }
+  }
+
   /**
    * This test validates the correctness of ForceKillApplication in case of
    * application does not exist in StateStore.