|
@@ -282,7 +282,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
|
.entity(e.getLocalizedMessage()).build();
|
|
|
}
|
|
|
|
|
|
- List<SubClusterId> blacklist = new ArrayList<SubClusterId>();
|
|
|
+ List<SubClusterId> blacklist = new ArrayList<>();
|
|
|
|
|
|
for (int i = 0; i < numSubmitRetries; ++i) {
|
|
|
|
|
@@ -295,7 +295,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
|
.entity(e.getLocalizedMessage()).build();
|
|
|
}
|
|
|
|
|
|
- LOG.debug("getNewApplication try #{} on SubCluster {}", i, subClusterId);
|
|
|
+ LOG.debug("getNewApplication try #{} on SubCluster {}.", i, subClusterId);
|
|
|
|
|
|
DefaultRequestInterceptorREST interceptor =
|
|
|
getOrCreateInterceptorForSubCluster(subClusterId,
|
|
@@ -304,7 +304,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
|
try {
|
|
|
response = interceptor.createNewApplication(hsr);
|
|
|
} catch (Exception e) {
|
|
|
- LOG.warn("Unable to create a new ApplicationId in SubCluster {}",
|
|
|
+ LOG.warn("Unable to create a new ApplicationId in SubCluster {}.",
|
|
|
subClusterId.getId(), e);
|
|
|
}
|
|
|
|
|
@@ -424,7 +424,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
|
.build();
|
|
|
}
|
|
|
|
|
|
- List<SubClusterId> blacklist = new ArrayList<SubClusterId>();
|
|
|
+ List<SubClusterId> blacklist = new ArrayList<>();
|
|
|
|
|
|
for (int i = 0; i < numSubmitRetries; ++i) {
|
|
|
|
|
@@ -441,7 +441,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
|
.entity(e.getLocalizedMessage())
|
|
|
.build();
|
|
|
}
|
|
|
- LOG.info("submitApplication appId {} try #{} on SubCluster {}",
|
|
|
+ LOG.info("submitApplication appId {} try #{} on SubCluster {}.",
|
|
|
applicationId, i, subClusterId);
|
|
|
|
|
|
ApplicationHomeSubCluster appHomeSubCluster =
|
|
@@ -482,7 +482,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
|
.build();
|
|
|
}
|
|
|
if (subClusterId == subClusterIdInStateStore) {
|
|
|
- LOG.info("Application {} already submitted on SubCluster {}",
|
|
|
+ LOG.info("Application {} already submitted on SubCluster {}.",
|
|
|
applicationId, subClusterId);
|
|
|
} else {
|
|
|
routerMetrics.incrAppsFailedSubmitted();
|
|
@@ -712,8 +712,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
|
|
|
|
if (rmApps == null) {
|
|
|
routerMetrics.incrMultipleAppsFailedRetrieved();
|
|
|
- LOG.error("Subcluster {} failed to return appReport.",
|
|
|
- info.getSubClusterId());
|
|
|
+ LOG.error("Subcluster {} failed to return appReport.", info.getSubClusterId());
|
|
|
return null;
|
|
|
}
|
|
|
return rmApps;
|
|
@@ -873,8 +872,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
|
subclusterId, subcluster.getRMWebServiceAddress());
|
|
|
return interceptor.getNode(nodeId);
|
|
|
} catch (Exception e) {
|
|
|
- LOG.error("Subcluster {} failed to return nodeInfo.",
|
|
|
- subclusterId);
|
|
|
+ LOG.error("Subcluster {} failed to return nodeInfo.", subclusterId, e);
|
|
|
return null;
|
|
|
}
|
|
|
});
|
|
@@ -953,58 +951,28 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
|
public NodesInfo getNodes(String states) {
|
|
|
|
|
|
NodesInfo nodes = new NodesInfo();
|
|
|
-
|
|
|
- final Map<SubClusterId, SubClusterInfo> subClustersActive;
|
|
|
try {
|
|
|
- subClustersActive = getActiveSubclusters();
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.error("Cannot get nodes: {}", e.getMessage());
|
|
|
- return new NodesInfo();
|
|
|
- }
|
|
|
-
|
|
|
- // Send the requests in parallel
|
|
|
- CompletionService<NodesInfo> compSvc =
|
|
|
- new ExecutorCompletionService<NodesInfo>(this.threadpool);
|
|
|
-
|
|
|
- for (final SubClusterInfo info : subClustersActive.values()) {
|
|
|
- compSvc.submit(new Callable<NodesInfo>() {
|
|
|
- @Override
|
|
|
- public NodesInfo call() {
|
|
|
- DefaultRequestInterceptorREST interceptor =
|
|
|
- getOrCreateInterceptorForSubCluster(
|
|
|
- info.getSubClusterId(), info.getRMWebServiceAddress());
|
|
|
- try {
|
|
|
- NodesInfo nodesInfo = interceptor.getNodes(states);
|
|
|
- return nodesInfo;
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.error("Subcluster {} failed to return nodesInfo.",
|
|
|
- info.getSubClusterId());
|
|
|
- return null;
|
|
|
- }
|
|
|
- }
|
|
|
+ Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();
|
|
|
+ Class[] argsClasses = new Class[]{String.class};
|
|
|
+ Object[] args = new Object[]{states};
|
|
|
+ ClientMethod remoteMethod = new ClientMethod("getNodes", argsClasses, args);
|
|
|
+ Map<SubClusterInfo, NodesInfo> nodesMap =
|
|
|
+ invokeConcurrent(subClustersActive.values(), remoteMethod, NodesInfo.class);
|
|
|
+ nodesMap.values().stream().forEach(nodesInfo -> {
|
|
|
+ nodes.addAll(nodesInfo.getNodes());
|
|
|
});
|
|
|
- }
|
|
|
-
|
|
|
- // Collect all the responses in parallel
|
|
|
-
|
|
|
- for (int i = 0; i < subClustersActive.size(); i++) {
|
|
|
- try {
|
|
|
- Future<NodesInfo> future = compSvc.take();
|
|
|
- NodesInfo nodesResponse = future.get();
|
|
|
-
|
|
|
- if (nodesResponse != null) {
|
|
|
- nodes.addAll(nodesResponse.getNodes());
|
|
|
- }
|
|
|
- } catch (Throwable e) {
|
|
|
- LOG.warn("Failed to get nodes report ", e);
|
|
|
- }
|
|
|
+ } catch (NotFoundException e) {
|
|
|
+ LOG.error("Get all active sub cluster(s) error.", e);
|
|
|
+ } catch (YarnException e) {
|
|
|
+ LOG.error("getNodes error.", e);
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.error("getNodes error with io error.", e);
|
|
|
}
|
|
|
|
|
|
// Delete duplicate from all the node reports got from all the available
|
|
|
// YARN RMs. Nodes can be moved from one subclusters to another. In this
|
|
|
// operation they result LOST/RUNNING in the previous SubCluster and
|
|
|
// NEW/RUNNING in the new one.
|
|
|
-
|
|
|
return RouterWebServiceUtil.deleteDuplicateNodesInfo(nodes.getNodes());
|
|
|
}
|
|
|
|
|
@@ -1172,7 +1140,22 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
|
@Override
|
|
|
public NodeToLabelsInfo getNodeToLabels(HttpServletRequest hsr)
|
|
|
throws IOException {
|
|
|
- throw new NotImplementedException("Code is not implemented");
|
|
|
+ try {
|
|
|
+ Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();
|
|
|
+ final HttpServletRequest hsrCopy = clone(hsr);
|
|
|
+ Class[] argsClasses = new Class[]{HttpServletRequest.class};
|
|
|
+ Object[] args = new Object[]{hsrCopy};
|
|
|
+ ClientMethod remoteMethod = new ClientMethod("getNodeToLabels", argsClasses, args);
|
|
|
+ Map<SubClusterInfo, NodeToLabelsInfo> nodeToLabelsInfoMap =
|
|
|
+ invokeConcurrent(subClustersActive.values(), remoteMethod, NodeToLabelsInfo.class);
|
|
|
+ return RouterWebServiceUtil.mergeNodeToLabels(nodeToLabelsInfoMap);
|
|
|
+ } catch (NotFoundException e) {
|
|
|
+ LOG.error("Get all active sub cluster(s) error.", e);
|
|
|
+ throw new IOException("Get all active sub cluster(s) error.", e);
|
|
|
+ } catch (YarnException e) {
|
|
|
+ LOG.error("getNodeToLabels error.", e);
|
|
|
+ throw new IOException("getNodeToLabels error.", e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -1395,7 +1378,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
|
}
|
|
|
|
|
|
private <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo> clusterIds,
|
|
|
- ClientMethod request, Class<R> clazz) {
|
|
|
+ ClientMethod request, Class<R> clazz) throws YarnException {
|
|
|
|
|
|
Map<SubClusterInfo, R> results = new HashMap<>();
|
|
|
|
|
@@ -1413,8 +1396,8 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
|
R ret = clazz.cast(retObj);
|
|
|
return ret;
|
|
|
} catch (Exception e) {
|
|
|
- LOG.error("SubCluster {} failed to call {} method.", info.getSubClusterId(),
|
|
|
- request.getMethodName(), e);
|
|
|
+ LOG.error("SubCluster {} failed to call {} method.",
|
|
|
+ info.getSubClusterId(), request.getMethodName(), e);
|
|
|
return null;
|
|
|
}
|
|
|
});
|
|
@@ -1428,7 +1411,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
|
results.put(clusterId, response);
|
|
|
}
|
|
|
} catch (Throwable e) {
|
|
|
- LOG.warn("SubCluster {} failed to {} report.", clusterId, request.getMethodName(), e);
|
|
|
+ String msg = String.format("SubCluster %s failed to %s report.",
|
|
|
+ clusterId, request.getMethodName());
|
|
|
+ LOG.warn(msg, e);
|
|
|
+ throw new YarnRuntimeException(msg, e);
|
|
|
}
|
|
|
});
|
|
|
return results;
|