Kaynağa Gözat

YARN-8900. [Follow Up] Fix FederationInterceptorREST#invokeConcurrent Inaccurate Order of Subclusters. (#5260)

slfan1989 2 yıl önce
ebeveyn
işleme
3f767a61b1

+ 31 - 14
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java

@@ -45,7 +45,6 @@ import javax.ws.rs.core.Response.Status;
 
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -123,6 +122,7 @@ import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
 import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.server.router.webapp.cache.RouterAppInfoCacheKey;
 import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo;
+import org.apache.hadoop.yarn.server.router.webapp.dao.SubClusterResult;
 import org.apache.hadoop.yarn.server.router.webapp.dao.FederationSchedulerTypeInfo;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
@@ -2532,7 +2532,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
     // If there is a sub-cluster access error,
     // we should choose whether to throw exception information according to user configuration.
     // Send the requests in parallel.
-    CompletionService<Pair<R, Exception>> compSvc = new ExecutorCompletionService<>(threadpool);
+    CompletionService<SubClusterResult<R>> compSvc = new ExecutorCompletionService<>(threadpool);
 
     // This part of the code should be able to expose the accessed Exception information.
     // We use Pair to store related information. The left value of the Pair is the response,
@@ -2548,36 +2548,41 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
               getMethod(request.getMethodName(), request.getTypes());
           Object retObj = method.invoke(interceptor, request.getParams());
           R ret = clazz.cast(retObj);
-          return Pair.of(ret, null);
+          return new SubClusterResult<>(info, ret, null);
         } catch (Exception e) {
           LOG.error("SubCluster {} failed to call {} method.",
               info.getSubClusterId(), request.getMethodName(), e);
-          return Pair.of(null, e);
+          return new SubClusterResult<>(info, null, e);
         }
       });
     }
 
-    clusterIds.stream().forEach(clusterId -> {
+    for (int i = 0; i < clusterIds.size(); i++) {
+      SubClusterInfo subClusterInfo = null;
       try {
-        Future<Pair<R, Exception>> future = compSvc.take();
-        Pair<R, Exception> pair = future.get();
-        R response = pair.getKey();
+        Future<SubClusterResult<R>> future = compSvc.take();
+        SubClusterResult<R> result = future.get();
+        subClusterInfo = result.getSubClusterInfo();
+
+        R response = result.getResponse();
         if (response != null) {
-          results.put(clusterId, response);
+          results.put(subClusterInfo, response);
         }
-        Exception exception = pair.getValue();
+
+        Exception exception = result.getException();
+
         // If allowPartialResult=false, it means that if an exception occurs in a subCluster,
         // an exception will be thrown directly.
         if (!allowPartialResult && exception != null) {
           throw exception;
         }
       } catch (Throwable e) {
-        String msg = String.format("SubCluster %s failed to %s report.",
-            clusterId.getSubClusterId(), request.getMethodName());
-        LOG.error(msg, e);
+        String subClusterId = subClusterInfo != null ?
+            subClusterInfo.getSubClusterId().getId() : "UNKNOWN";
+        LOG.error("SubCluster {} failed to {} report.", subClusterId, request.getMethodName(), e);
         throw new YarnRuntimeException(e.getCause().getMessage(), e);
       }
-    });
+    }
 
     return results;
   }
@@ -2648,4 +2653,16 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
   public void setAllowPartialResult(boolean allowPartialResult) {
     this.allowPartialResult = allowPartialResult;
   }
+
+  /**
+   * Test hook that sends a {@code getNodes} request to every active sub-cluster
+   * through {@code invokeConcurrent} and hands back the collected responses.
+   *
+   * <p>Note that, despite the "NodeLabel" suffix in the method name, the remote
+   * method invoked is {@code getNodes}, called with a single {@code null}
+   * String argument.
+   *
+   * @return the map returned by {@code invokeConcurrent}, keyed by sub-cluster.
+   * @throws IOException propagated from the calls made in this method.
+   * @throws YarnException propagated from the calls made in this method.
+   */
+  @VisibleForTesting
+  public Map<SubClusterInfo, NodesInfo> invokeConcurrentGetNodeLabel()
+      throws IOException, YarnException {
+    Map<SubClusterId, SubClusterInfo> activeSubClusters = getActiveSubclusters();
+
+    // Describe the call getNodes(String) with its only argument set to null.
+    Class[] paramTypes = new Class[]{String.class};
+    Object[] paramValues = new Object[]{null};
+    ClientMethod getNodesCall = new ClientMethod("getNodes", paramTypes, paramValues);
+
+    return invokeConcurrent(activeSubClusters.values(), getNodesCall, NodesInfo.class);
+  }
 }

+ 59 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/dao/SubClusterResult.java

@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.router.webapp.dao;
+
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+
+/**
+ * Holder that ties the outcome of a single call to the sub-cluster it was
+ * issued against: the sub-cluster, the response (if any) and the exception
+ * (if any).
+ *
+ * <p>All three values are optional and may be {@code null}; the class performs
+ * no validation on them.
+ *
+ * @param <R> type of the response object.
+ */
+public class SubClusterResult<R> {
+  private SubClusterInfo subClusterInfo;
+  private R response;
+  private Exception exception;
+
+  /**
+   * Creates an empty result whose sub-cluster, response and exception are all
+   * {@code null}; use the setters to populate it.
+   */
+  public SubClusterResult() {
+    this(null, null, null);
+  }
+
+  /**
+   * Creates a fully populated result.
+   *
+   * @param subCluster the sub-cluster this result belongs to.
+   * @param res the response object, may be {@code null}.
+   * @param ex the exception, may be {@code null}.
+   */
+  public SubClusterResult(SubClusterInfo subCluster, R res, Exception ex) {
+    subClusterInfo = subCluster;
+    response = res;
+    exception = ex;
+  }
+
+  /**
+   * @return the sub-cluster this result belongs to.
+   */
+  public SubClusterInfo getSubClusterInfo() {
+    return this.subClusterInfo;
+  }
+
+  /**
+   * @param subClusterInfo the sub-cluster this result belongs to.
+   */
+  public void setSubClusterInfo(SubClusterInfo subClusterInfo) {
+    this.subClusterInfo = subClusterInfo;
+  }
+
+  /**
+   * @return the stored response, possibly {@code null}.
+   */
+  public R getResponse() {
+    return this.response;
+  }
+
+  /**
+   * @param response the response to store.
+   */
+  public void setResponse(R response) {
+    this.response = response;
+  }
+
+  /**
+   * @return the stored exception, possibly {@code null}.
+   */
+  public Exception getException() {
+    return this.exception;
+  }
+
+  /**
+   * @param exception the exception to store.
+   */
+  public void setException(Exception exception) {
+    this.exception = exception;
+  }
+}

+ 28 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java

@@ -1534,6 +1534,34 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
     Assert.assertEquals(webAppAddress, interceptorREST.getWebAppAddress());
   }
 
+  @Test
+  public void testInvokeConcurrent() throws IOException, YarnException {
+
+    // This test calls the interceptor's getNodes interface on every sub-cluster.
+    // The expected test data is one node per sub-cluster, named after it:
+    //   subCluster0 -> Node 0
+    //   subCluster1 -> Node 1
+    //   subCluster2 -> Node 2
+    //   subCluster3 -> Node 3
+    // The returned map lets us verify that, for the multi-threaded call, each
+    // sub-cluster key is paired with the node data of that same sub-cluster.
+    Map<SubClusterInfo, NodesInfo> nodesBySubCluster =
+        interceptor.invokeConcurrentGetNodeLabel();
+    Assert.assertNotNull(nodesBySubCluster);
+    Assert.assertEquals(4, nodesBySubCluster.size());
+
+    for (Map.Entry<SubClusterInfo, NodesInfo> entry : nodesBySubCluster.entrySet()) {
+      String subClusterId = entry.getKey().getSubClusterId().getId();
+
+      // Exactly one node is expected per sub-cluster.
+      List<NodeInfo> nodes = entry.getValue().getNodes();
+      Assert.assertNotNull(nodes);
+      Assert.assertEquals(1, nodes.size());
+
+      // The node id must carry the id of the sub-cluster it is keyed under.
+      Assert.assertEquals("Node " + subClusterId, nodes.get(0).getNodeId());
+    }
+  }
+
   @Test
   public void testGetSchedulerInfo() {
     // In this test case, we will get the return results of 4 sub-clusters.