Преглед изворног кода

YARN-11238. Optimizing FederationClientInterceptor Call with Parallelism. (#4904)

slfan1989 пре 2 године
родитељ
комит
22bd5e3b53

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java

@@ -497,4 +497,14 @@ public class MemoryFederationStateStore implements FederationStateStore {
   public RouterRMDTSecretManagerState getRouterRMSecretManagerState() {
     return routerRMSecretManagerState;
   }
+
+  @VisibleForTesting
+  public Map<SubClusterId, SubClusterInfo> getMembership() {
+    return membership;
+  }
+
+  @VisibleForTesting
+  public void setMembership(Map<SubClusterId, SubClusterInfo> membership) {
+    this.membership = membership;
+  }
 }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java

@@ -727,7 +727,7 @@ public final class FederationStateStoreFacade {
     return stateStore;
   }
 
-  /**
+  /*
    * The Router Supports Store NewMasterKey (RouterMasterKey{@link RouterMasterKey}).
    *
    * @param newKey Key used for generating and verifying delegation tokens

+ 76 - 117
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java

@@ -18,10 +18,10 @@
 
 package org.apache.hadoop.yarn.server.router.clientrm;
 
-import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.TreeMap;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
@@ -39,7 +40,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -661,14 +661,11 @@ public class FederationClientInterceptor
       RouterServerUtil.logAndThrowException("Missing getApplications request.", null);
     }
     long startTime = clock.getTime();
-    Map<SubClusterId, SubClusterInfo> subclusters =
-        federationFacade.getSubClusters(true);
     ClientMethod remoteMethod = new ClientMethod("getApplications",
         new Class[] {GetApplicationsRequest.class}, new Object[] {request});
-    Map<SubClusterId, GetApplicationsResponse> applications = null;
+    Collection<GetApplicationsResponse> applications = null;
     try {
-      applications = invokeConcurrent(subclusters.keySet(), remoteMethod,
-          GetApplicationsResponse.class);
+      applications = invokeConcurrent(remoteMethod, GetApplicationsResponse.class);
     } catch (Exception ex) {
       routerMetrics.incrMultipleAppsFailedRetrieved();
       RouterServerUtil.logAndThrowException("Unable to get applications due to exception.", ex);
@@ -676,7 +673,7 @@ public class FederationClientInterceptor
     long stopTime = clock.getTime();
     routerMetrics.succeededMultipleAppsRetrieved(stopTime - startTime);
     // Merge the Application Reports
-    return RouterYarnClientUtils.mergeApplications(applications.values(), returnPartialReport);
+    return RouterYarnClientUtils.mergeApplications(applications, returnPartialReport);
   }
 
   @Override
@@ -691,8 +688,7 @@ public class FederationClientInterceptor
         new Class[] {GetClusterMetricsRequest.class}, new Object[] {request});
     Collection<GetClusterMetricsResponse> clusterMetrics = null;
     try {
-      clusterMetrics = invokeAppClientProtocolMethod(
-          true, remoteMethod, GetClusterMetricsResponse.class);
+      clusterMetrics = invokeConcurrent(remoteMethod, GetClusterMetricsResponse.class);
     } catch (Exception ex) {
       routerMetrics.incrGetClusterMetricsFailedRetrieved();
       RouterServerUtil.logAndThrowException("Unable to get cluster metrics due to exception.", ex);
@@ -702,67 +698,62 @@ public class FederationClientInterceptor
     return RouterYarnClientUtils.merge(clusterMetrics);
   }
 
-  <R> Map<SubClusterId, R> invokeConcurrent(ArrayList<SubClusterId> clusterIds,
-      ClientMethod request, Class<R> clazz) throws YarnException, IOException {
-    List<Callable<Object>> callables = new ArrayList<>();
-    List<Future<Object>> futures = new ArrayList<>();
-    Map<SubClusterId, IOException> exceptions = new TreeMap<>();
-    for (SubClusterId subClusterId : clusterIds) {
-      callables.add(new Callable<Object>() {
-        @Override
-        public Object call() throws Exception {
-          ApplicationClientProtocol protocol =
-              getClientRMProxyForSubCluster(subClusterId);
-          Method method = ApplicationClientProtocol.class
-              .getMethod(request.getMethodName(), request.getTypes());
-          return method.invoke(protocol, request.getParams());
-        }
+  <R> Collection<R> invokeConcurrent(ClientMethod request, Class<R> clazz)
+      throws YarnException {
+
+    // Get Active SubClusters
+    Map<SubClusterId, SubClusterInfo> subClusterInfo = federationFacade.getSubClusters(true);
+    Collection<SubClusterId> subClusterIds = subClusterInfo.keySet();
+
+    List<Callable<Pair<SubClusterId, Object>>> callables = new ArrayList<>();
+    List<Future<Pair<SubClusterId, Object>>> futures = new ArrayList<>();
+    Map<SubClusterId, Exception> exceptions = new TreeMap<>();
+
+    // Generate parallel Callable tasks
+    for (SubClusterId subClusterId : subClusterIds) {
+      callables.add(() -> {
+        ApplicationClientProtocol protocol = getClientRMProxyForSubCluster(subClusterId);
+        String methodName = request.getMethodName();
+        Class<?>[] types = request.getTypes();
+        Object[] params = request.getParams();
+        Method method = ApplicationClientProtocol.class.getMethod(methodName, types);
+        Object result = method.invoke(protocol, params);
+        return Pair.of(subClusterId, result);
       });
     }
+
+    // Get results from multiple threads
     Map<SubClusterId, R> results = new TreeMap<>();
     try {
       futures.addAll(executorService.invokeAll(callables));
-      for (int i = 0; i < futures.size(); i++) {
-        SubClusterId subClusterId = clusterIds.get(i);
+      futures.stream().forEach(future -> {
+        SubClusterId subClusterId = null;
         try {
-          Future<Object> future = futures.get(i);
-          Object result = future.get();
+          Pair<SubClusterId, Object> pair = future.get();
+          subClusterId = pair.getKey();
+          Object result = pair.getValue();
           results.put(subClusterId, clazz.cast(result));
-        } catch (ExecutionException ex) {
-          Throwable cause = ex.getCause();
-          LOG.debug("Cannot execute {} on {}: {}", request.getMethodName(),
+        } catch (InterruptedException | ExecutionException e) {
+          Throwable cause = e.getCause();
+          LOG.error("Cannot execute {} on {}: {}", request.getMethodName(),
               subClusterId.getId(), cause.getMessage());
-          IOException ioe;
-          if (cause instanceof IOException) {
-            ioe = (IOException) cause;
-          } else if (cause instanceof YarnException) {
-            throw (YarnException) cause;
-          } else {
-            ioe = new IOException(
-                "Unhandled exception while calling " + request.getMethodName()
-                    + ": " + cause.getMessage(), cause);
-          }
-          // Store the exceptions
-          exceptions.put(subClusterId, ioe);
-        }
-      }
-      if (results.isEmpty() && !clusterIds.isEmpty()) {
-        SubClusterId subClusterId = clusterIds.get(0);
-        IOException ioe = exceptions.get(subClusterId);
-        if (ioe != null) {
-          throw ioe;
+          exceptions.put(subClusterId, e);
         }
-      }
+      });
     } catch (InterruptedException e) {
-      throw new YarnException(e);
+      throw new YarnException("invokeConcurrent Failed.", e);
+    }
+
+    // All sub-clusters return results to be considered successful,
+    // otherwise an exception will be thrown.
+    if (exceptions != null && !exceptions.isEmpty()) {
+      Set<SubClusterId> subClusterIdSets = exceptions.keySet();
+      throw new YarnException("invokeConcurrent Failed, An exception occurred in subClusterIds = " +
+          StringUtils.join(subClusterIdSets, ","));
     }
-    return results;
-  }
 
-  <R> Map<SubClusterId, R> invokeConcurrent(Collection<SubClusterId> clusterIds,
-      ClientMethod request, Class<R> clazz) throws YarnException, IOException {
-    ArrayList<SubClusterId> clusterIdList = new ArrayList<>(clusterIds);
-    return invokeConcurrent(clusterIdList, request, clazz);
+    // return result
+    return results.values();
   }
 
   @Override
@@ -773,24 +764,19 @@ public class FederationClientInterceptor
       RouterServerUtil.logAndThrowException("Missing getClusterNodes request.", null);
     }
     long startTime = clock.getTime();
-    Map<SubClusterId, SubClusterInfo> subClusters =
-        federationFacade.getSubClusters(true);
-    Map<SubClusterId, GetClusterNodesResponse> clusterNodes = Maps.newHashMap();
-    for (SubClusterId subClusterId : subClusters.keySet()) {
-      ApplicationClientProtocol client;
-      try {
-        client = getClientRMProxyForSubCluster(subClusterId);
-        GetClusterNodesResponse response = client.getClusterNodes(request);
-        clusterNodes.put(subClusterId, response);
-      } catch (Exception ex) {
-        routerMetrics.incrClusterNodesFailedRetrieved();
-        RouterServerUtil.logAndThrowException("Unable to get cluster nodes due to exception.", ex);
-      }
+    ClientMethod remoteMethod = new ClientMethod("getClusterNodes",
+        new Class[]{GetClusterNodesRequest.class}, new Object[]{request});
+    try {
+      Collection<GetClusterNodesResponse> clusterNodes =
+          invokeConcurrent(remoteMethod, GetClusterNodesResponse.class);
+      long stopTime = clock.getTime();
+      routerMetrics.succeededGetClusterNodesRetrieved(stopTime - startTime);
+      return RouterYarnClientUtils.mergeClusterNodesResponse(clusterNodes);
+    } catch (Exception ex) {
+      routerMetrics.incrClusterNodesFailedRetrieved();
+      RouterServerUtil.logAndThrowException("Unable to get cluster nodes due to exception.", ex);
     }
-    long stopTime = clock.getTime();
-    routerMetrics.succeededGetClusterNodesRetrieved(stopTime - startTime);
-    // Merge the NodesResponse
-    return RouterYarnClientUtils.mergeClusterNodesResponse(clusterNodes.values());
+    throw new YarnException("Unable to get cluster nodes.");
   }
 
   @Override
@@ -806,8 +792,7 @@ public class FederationClientInterceptor
         new Class[]{GetQueueInfoRequest.class}, new Object[]{request});
     Collection<GetQueueInfoResponse> queues = null;
     try {
-      queues = invokeAppClientProtocolMethod(true, remoteMethod,
-          GetQueueInfoResponse.class);
+      queues = invokeConcurrent(remoteMethod, GetQueueInfoResponse.class);
     } catch (Exception ex) {
       routerMetrics.incrGetQueueInfoFailedRetrieved();
       RouterServerUtil.logAndThrowException("Unable to get queue [" +
@@ -831,8 +816,7 @@ public class FederationClientInterceptor
         new Class[] {GetQueueUserAclsInfoRequest.class}, new Object[] {request});
     Collection<GetQueueUserAclsInfoResponse> queueUserAcls = null;
     try {
-      queueUserAcls = invokeAppClientProtocolMethod(true, remoteMethod,
-          GetQueueUserAclsInfoResponse.class);
+      queueUserAcls = invokeConcurrent(remoteMethod, GetQueueUserAclsInfoResponse.class);
     } catch (Exception ex) {
       routerMetrics.incrQueueUserAclsFailedRetrieved();
       RouterServerUtil.logAndThrowException("Unable to get queue user Acls due to exception.", ex);
@@ -992,8 +976,7 @@ public class FederationClientInterceptor
         new Class[] {ReservationListRequest.class}, new Object[] {request});
     Collection<ReservationListResponse> listResponses = null;
     try {
-      listResponses = invokeAppClientProtocolMethod(true, remoteMethod,
-          ReservationListResponse.class);
+      listResponses = invokeConcurrent(remoteMethod, ReservationListResponse.class);
     } catch (Exception ex) {
       routerMetrics.incrListReservationsFailedRetrieved();
       RouterServerUtil.logAndThrowException(
@@ -1072,24 +1055,6 @@ public class FederationClientInterceptor
     throw new YarnException(msg);
   }
 
-  private <R> Collection<R> invokeAppClientProtocolMethod(
-      Boolean filterInactiveSubClusters, ClientMethod request, Class<R> clazz)
-          throws YarnException, RuntimeException {
-    Map<SubClusterId, SubClusterInfo> subClusters =
-        federationFacade.getSubClusters(filterInactiveSubClusters);
-    return subClusters.keySet().stream().map(subClusterId -> {
-      try {
-        ApplicationClientProtocol protocol = getClientRMProxyForSubCluster(subClusterId);
-        Method method = ApplicationClientProtocol.class.
-            getMethod(request.getMethodName(), request.getTypes());
-        return clazz.cast(method.invoke(protocol, request.getParams()));
-      } catch (YarnException | NoSuchMethodException |
-               IllegalAccessException | InvocationTargetException ex) {
-        throw new RuntimeException(ex);
-      }
-    }).collect(Collectors.toList());
-  }
-
   @Override
   public GetNodesToLabelsResponse getNodeToLabels(
       GetNodesToLabelsRequest request) throws YarnException, IOException {
@@ -1102,8 +1067,7 @@ public class FederationClientInterceptor
          new Class[] {GetNodesToLabelsRequest.class}, new Object[] {request});
     Collection<GetNodesToLabelsResponse> clusterNodes = null;
     try {
-      clusterNodes = invokeAppClientProtocolMethod(true, remoteMethod,
-          GetNodesToLabelsResponse.class);
+      clusterNodes = invokeConcurrent(remoteMethod, GetNodesToLabelsResponse.class);
     } catch (Exception ex) {
       routerMetrics.incrNodeToLabelsFailedRetrieved();
       RouterServerUtil.logAndThrowException("Unable to get node label due to exception.", ex);
@@ -1126,8 +1090,7 @@ public class FederationClientInterceptor
          new Class[] {GetLabelsToNodesRequest.class}, new Object[] {request});
     Collection<GetLabelsToNodesResponse> labelNodes = null;
     try {
-      labelNodes = invokeAppClientProtocolMethod(true, remoteMethod,
-          GetLabelsToNodesResponse.class);
+      labelNodes = invokeConcurrent(remoteMethod, GetLabelsToNodesResponse.class);
     } catch (Exception ex) {
       routerMetrics.incrLabelsToNodesFailedRetrieved();
       RouterServerUtil.logAndThrowException("Unable to get label node due to exception.", ex);
@@ -1150,8 +1113,7 @@ public class FederationClientInterceptor
          new Class[] {GetClusterNodeLabelsRequest.class}, new Object[] {request});
     Collection<GetClusterNodeLabelsResponse> nodeLabels = null;
     try {
-      nodeLabels = invokeAppClientProtocolMethod(true, remoteMethod,
-          GetClusterNodeLabelsResponse.class);
+      nodeLabels = invokeConcurrent(remoteMethod, GetClusterNodeLabelsResponse.class);
     } catch (Exception ex) {
       routerMetrics.incrClusterNodeLabelsFailedRetrieved();
       RouterServerUtil.logAndThrowException("Unable to get cluster nodeLabels due to exception.",
@@ -1563,8 +1525,7 @@ public class FederationClientInterceptor
         new Class[] {GetAllResourceProfilesRequest.class}, new Object[] {request});
     Collection<GetAllResourceProfilesResponse> resourceProfiles = null;
     try {
-      resourceProfiles = invokeAppClientProtocolMethod(true, remoteMethod,
-          GetAllResourceProfilesResponse.class);
+      resourceProfiles = invokeConcurrent(remoteMethod, GetAllResourceProfilesResponse.class);
     } catch (Exception ex) {
       routerMetrics.incrGetResourceProfilesFailedRetrieved();
       RouterServerUtil.logAndThrowException("Unable to get resource profiles due to exception.",
@@ -1588,8 +1549,7 @@ public class FederationClientInterceptor
         new Class[] {GetResourceProfileRequest.class}, new Object[] {request});
     Collection<GetResourceProfileResponse> resourceProfile = null;
     try {
-      resourceProfile = invokeAppClientProtocolMethod(true, remoteMethod,
-          GetResourceProfileResponse.class);
+      resourceProfile = invokeConcurrent(remoteMethod, GetResourceProfileResponse.class);
     } catch (Exception ex) {
       routerMetrics.incrGetResourceProfileFailedRetrieved();
       RouterServerUtil.logAndThrowException("Unable to get resource profile due to exception.",
@@ -1612,8 +1572,7 @@ public class FederationClientInterceptor
         new Class[] {GetAllResourceTypeInfoRequest.class}, new Object[] {request});
     Collection<GetAllResourceTypeInfoResponse> listResourceTypeInfo;
     try {
-      listResourceTypeInfo = invokeAppClientProtocolMethod(true, remoteMethod,
-          GetAllResourceTypeInfoResponse.class);
+      listResourceTypeInfo = invokeConcurrent(remoteMethod, GetAllResourceTypeInfoResponse.class);
     } catch (Exception ex) {
       routerMetrics.incrResourceTypeInfoFailedRetrieved();
       LOG.error("Unable to get all resource type info node due to exception.", ex);
@@ -1644,8 +1603,8 @@ public class FederationClientInterceptor
         new Class[] {GetAttributesToNodesRequest.class}, new Object[] {request});
     Collection<GetAttributesToNodesResponse> attributesToNodesResponses = null;
     try {
-      attributesToNodesResponses = invokeAppClientProtocolMethod(true, remoteMethod,
-          GetAttributesToNodesResponse.class);
+      attributesToNodesResponses =
+          invokeConcurrent(remoteMethod, GetAttributesToNodesResponse.class);
     } catch (Exception ex) {
       routerMetrics.incrGetAttributesToNodesFailedRetrieved();
       RouterServerUtil.logAndThrowException("Unable to get attributes to nodes due to exception.",
@@ -1668,7 +1627,7 @@ public class FederationClientInterceptor
         new Class[] {GetClusterNodeAttributesRequest.class}, new Object[] {request});
     Collection<GetClusterNodeAttributesResponse> clusterNodeAttributesResponses = null;
     try {
-      clusterNodeAttributesResponses = invokeAppClientProtocolMethod(true, remoteMethod,
+      clusterNodeAttributesResponses = invokeConcurrent(remoteMethod,
           GetClusterNodeAttributesResponse.class);
     } catch (Exception ex) {
       routerMetrics.incrGetClusterNodeAttributesFailedRetrieved();
@@ -1693,7 +1652,7 @@ public class FederationClientInterceptor
         new Class[] {GetNodesToAttributesRequest.class}, new Object[] {request});
     Collection<GetNodesToAttributesResponse> nodesToAttributesResponses = null;
     try {
-      nodesToAttributesResponses = invokeAppClientProtocolMethod(true, remoteMethod,
+      nodesToAttributesResponses = invokeConcurrent(remoteMethod,
           GetNodesToAttributesResponse.class);
     } catch (Exception ex) {
       routerMetrics.incrGetNodesToAttributesFailedRetrieved();

+ 12 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java

@@ -30,6 +30,7 @@ import java.util.HashMap;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.Arrays;
+import java.util.Collection;
 
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.test.LambdaTestUtils;
@@ -127,6 +128,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
 import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@@ -579,12 +581,20 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
     Assert.assertEquals(subClusters.size(),
         response.getClusterMetrics().getNumNodeManagers());
 
+    // Clear Membership
+    Map<SubClusterId, SubClusterInfo> membership = new HashMap<>();
+    membership.putAll(stateStore.getMembership());
+    stateStore.getMembership().clear();
+
     ClientMethod remoteMethod = new ClientMethod("getClusterMetrics",
         new Class[] {GetClusterMetricsRequest.class},
         new Object[] {GetClusterMetricsRequest.newInstance()});
-    Map<SubClusterId, GetClusterMetricsResponse> clusterMetrics = interceptor.
-        invokeConcurrent(new ArrayList<>(), remoteMethod, GetClusterMetricsResponse.class);
+    Collection<GetClusterMetricsResponse> clusterMetrics = interceptor.invokeConcurrent(
+        remoteMethod, GetClusterMetricsResponse.class);
     Assert.assertTrue(clusterMetrics.isEmpty());
+
+    // Restore membership
+    stateStore.setMembership(membership);
   }
 
   /**