|
@@ -46,9 +46,11 @@ import org.apache.commons.lang3.NotImplementedException;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.security.authorize.AuthorizationException;
|
|
import org.apache.hadoop.security.authorize.AuthorizationException;
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
|
|
+import org.apache.hadoop.util.Sets;
|
|
import org.apache.hadoop.util.concurrent.HadoopExecutors;
|
|
import org.apache.hadoop.util.concurrent.HadoopExecutors;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|
|
|
+import org.apache.hadoop.yarn.api.records.NodeLabel;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
@@ -60,6 +62,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSub
|
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
|
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
|
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
|
|
@@ -91,6 +94,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
|
|
import org.apache.hadoop.yarn.server.router.RouterMetrics;
|
|
import org.apache.hadoop.yarn.server.router.RouterMetrics;
|
|
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
|
|
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
|
|
import org.apache.hadoop.yarn.server.router.clientrm.ClientMethod;
|
|
import org.apache.hadoop.yarn.server.router.clientrm.ClientMethod;
|
|
@@ -1161,7 +1165,32 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
@Override
|
|
@Override
|
|
public LabelsToNodesInfo getLabelsToNodes(Set<String> labels)
|
|
public LabelsToNodesInfo getLabelsToNodes(Set<String> labels)
|
|
throws IOException {
|
|
throws IOException {
|
|
- throw new NotImplementedException("Code is not implemented");
|
|
|
|
|
|
+ try {
|
|
|
|
+ Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();
|
|
|
|
+ Class[] argsClasses = new Class[]{Set.class};
|
|
|
|
+ Object[] args = new Object[]{labels};
|
|
|
|
+ ClientMethod remoteMethod = new ClientMethod("getLabelsToNodes", argsClasses, args);
|
|
|
|
+ Map<SubClusterInfo, LabelsToNodesInfo> labelsToNodesInfoMap =
|
|
|
|
+ invokeConcurrent(subClustersActive.values(), remoteMethod, LabelsToNodesInfo.class);
|
|
|
|
+
|
|
|
|
+ Map<NodeLabelInfo, NodeIDsInfo> labelToNodesMap = new HashMap<>();
|
|
|
|
+ labelsToNodesInfoMap.values().forEach(labelsToNode -> {
|
|
|
|
+ Map<NodeLabelInfo, NodeIDsInfo> values = labelsToNode.getLabelsToNodes();
|
|
|
|
+ for (Map.Entry<NodeLabelInfo, NodeIDsInfo> item : values.entrySet()) {
|
|
|
|
+ NodeLabelInfo key = item.getKey();
|
|
|
|
+ NodeIDsInfo leftValue = item.getValue();
|
|
|
|
+ NodeIDsInfo rightValue = labelToNodesMap.getOrDefault(key, null);
|
|
|
|
+ NodeIDsInfo newValue = NodeIDsInfo.add(leftValue, rightValue);
|
|
|
|
+ labelToNodesMap.put(key, newValue);
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ return new LabelsToNodesInfo(labelToNodesMap);
|
|
|
|
+ } catch (NotFoundException e) {
|
|
|
|
+ RouterServerUtil.logAndThrowIOException("Get all active sub cluster(s) error.", e);
|
|
|
|
+ } catch (YarnException e) {
|
|
|
|
+ RouterServerUtil.logAndThrowIOException("getLabelsToNodes error.", e);
|
|
|
|
+ }
|
|
|
|
+ return null;
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -1179,7 +1208,23 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
@Override
|
|
@Override
|
|
public NodeLabelsInfo getClusterNodeLabels(HttpServletRequest hsr)
|
|
public NodeLabelsInfo getClusterNodeLabels(HttpServletRequest hsr)
|
|
throws IOException {
|
|
throws IOException {
|
|
- throw new NotImplementedException("Code is not implemented");
|
|
|
|
|
|
+ try {
|
|
|
|
+ Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();
|
|
|
|
+ final HttpServletRequest hsrCopy = clone(hsr);
|
|
|
|
+ Class[] argsClasses = new Class[]{HttpServletRequest.class};
|
|
|
|
+ Object[] args = new Object[]{hsrCopy};
|
|
|
|
+ ClientMethod remoteMethod = new ClientMethod("getClusterNodeLabels", argsClasses, args);
|
|
|
|
+ Map<SubClusterInfo, NodeLabelsInfo> nodeToLabelsInfoMap =
|
|
|
|
+ invokeConcurrent(subClustersActive.values(), remoteMethod, NodeLabelsInfo.class);
|
|
|
|
+ Set<NodeLabel> hashSets = Sets.newHashSet();
|
|
|
|
+ nodeToLabelsInfoMap.values().forEach(item -> hashSets.addAll(item.getNodeLabels()));
|
|
|
|
+ return new NodeLabelsInfo(hashSets);
|
|
|
|
+ } catch (NotFoundException e) {
|
|
|
|
+ RouterServerUtil.logAndThrowIOException("Get all active sub cluster(s) error.", e);
|
|
|
|
+ } catch (YarnException e) {
|
|
|
|
+ RouterServerUtil.logAndThrowIOException("getClusterNodeLabels error.", e);
|
|
|
|
+ }
|
|
|
|
+ return null;
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -1197,7 +1242,23 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
@Override
|
|
@Override
|
|
public NodeLabelsInfo getLabelsOnNode(HttpServletRequest hsr, String nodeId)
|
|
public NodeLabelsInfo getLabelsOnNode(HttpServletRequest hsr, String nodeId)
|
|
throws IOException {
|
|
throws IOException {
|
|
- throw new NotImplementedException("Code is not implemented");
|
|
|
|
|
|
+ try {
|
|
|
|
+ Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();
|
|
|
|
+ final HttpServletRequest hsrCopy = clone(hsr);
|
|
|
|
+ Class[] argsClasses = new Class[]{HttpServletRequest.class, String.class};
|
|
|
|
+ Object[] args = new Object[]{hsrCopy, nodeId};
|
|
|
|
+ ClientMethod remoteMethod = new ClientMethod("getLabelsOnNode", argsClasses, args);
|
|
|
|
+ Map<SubClusterInfo, NodeLabelsInfo> nodeToLabelsInfoMap =
|
|
|
|
+ invokeConcurrent(subClustersActive.values(), remoteMethod, NodeLabelsInfo.class);
|
|
|
|
+ Set<NodeLabel> hashSets = Sets.newHashSet();
|
|
|
|
+ nodeToLabelsInfoMap.values().forEach(item -> hashSets.addAll(item.getNodeLabels()));
|
|
|
|
+ return new NodeLabelsInfo(hashSets);
|
|
|
|
+ } catch (NotFoundException e) {
|
|
|
|
+ RouterServerUtil.logAndThrowIOException("Get all active sub cluster(s) error.", e);
|
|
|
|
+ } catch (YarnException e) {
|
|
|
|
+ RouterServerUtil.logAndThrowIOException("getClusterNodeLabels error.", e);
|
|
|
|
+ }
|
|
|
|
+ return null;
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -1396,7 +1457,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
R ret = clazz.cast(retObj);
|
|
R ret = clazz.cast(retObj);
|
|
return ret;
|
|
return ret;
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
- LOG.error("SubCluster %s failed to call %s method.",
|
|
|
|
|
|
+ LOG.error("SubCluster {} failed to call {} method.",
|
|
info.getSubClusterId(), request.getMethodName(), e);
|
|
info.getSubClusterId(), request.getMethodName(), e);
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|