|
@@ -118,6 +118,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesIn
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsEntry;
|
|
|
import org.apache.hadoop.yarn.server.router.RouterMetrics;
|
|
|
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
|
|
|
import org.apache.hadoop.yarn.server.router.clientrm.ClientMethod;
|
|
@@ -1539,16 +1540,130 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
|
"getLabelsToNodes by labels = %s Failed.", StringUtils.join(labels, ","));
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * This method replaces all the node labels for specific nodes, and it is
|
|
|
+ * reachable by using {@link RMWSConsts#REPLACE_NODE_TO_LABELS}.
|
|
|
+ *
|
|
|
+ * @see ResourceManagerAdministrationProtocol#replaceLabelsOnNode
|
|
|
+ * @param newNodeToLabels the list of new labels. It is a content param.
|
|
|
+ * @param hsr the servlet request
|
|
|
+ * @return Response containing the status code
|
|
|
+ * @throws IOException if an exception happened
|
|
|
+ */
|
|
|
@Override
|
|
|
public Response replaceLabelsOnNodes(NodeToLabelsEntryList newNodeToLabels,
|
|
|
HttpServletRequest hsr) throws IOException {
|
|
|
- throw new NotImplementedException("Code is not implemented");
|
|
|
+
|
|
|
+ // Step1. Check the parameters to ensure that the parameters are not empty.
|
|
|
+ if (newNodeToLabels == null) {
|
|
|
+ routerMetrics.incrReplaceLabelsOnNodesFailedRetrieved();
|
|
|
+ throw new IllegalArgumentException("Parameter error, newNodeToLabels must not be empty.");
|
|
|
+ }
|
|
|
+ List<NodeToLabelsEntry> nodeToLabelsEntries = newNodeToLabels.getNodeToLabels();
|
|
|
+ if (CollectionUtils.isEmpty(nodeToLabelsEntries)) {
|
|
|
+ routerMetrics.incrReplaceLabelsOnNodesFailedRetrieved();
|
|
|
+ throw new IllegalArgumentException("Parameter error, " +
|
|
|
+ "nodeToLabelsEntries must not be empty.");
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+
|
|
|
+ // Step2. We map the NodeId and NodeToLabelsEntry in the request.
|
|
|
+ Map<String, NodeToLabelsEntry> nodeIdToLabels = new HashMap<>();
|
|
|
+ newNodeToLabels.getNodeToLabels().stream().forEach(nodeIdToLabel -> {
|
|
|
+ String nodeId = nodeIdToLabel.getNodeId();
|
|
|
+ nodeIdToLabels.put(nodeId, nodeIdToLabel);
|
|
|
+ });
|
|
|
+
|
|
|
+ // Step3. We map SubCluster with NodeToLabelsEntryList
|
|
|
+ Map<SubClusterInfo, NodeToLabelsEntryList> subClusterToNodeToLabelsEntryList =
|
|
|
+ new HashMap<>();
|
|
|
+ nodeIdToLabels.forEach((nodeId, nodeToLabelsEntry) -> {
|
|
|
+ SubClusterInfo subClusterInfo = getNodeSubcluster(nodeId);
|
|
|
+ NodeToLabelsEntryList nodeToLabelsEntryList = subClusterToNodeToLabelsEntryList.
|
|
|
+ getOrDefault(subClusterInfo, new NodeToLabelsEntryList());
|
|
|
+ nodeToLabelsEntryList.getNodeToLabels().add(nodeToLabelsEntry);
|
|
|
+ subClusterToNodeToLabelsEntryList.put(subClusterInfo, nodeToLabelsEntryList);
|
|
|
+ });
|
|
|
+
|
|
|
+ // Step4. Traverse the subCluster and call the replaceLabelsOnNodes interface.
|
|
|
+ long startTime = clock.getTime();
|
|
|
+ final HttpServletRequest hsrCopy = clone(hsr);
|
|
|
+ StringBuilder builder = new StringBuilder();
|
|
|
+ subClusterToNodeToLabelsEntryList.forEach((subCluster, nodeToLabelsEntryList) -> {
|
|
|
+ SubClusterId subClusterId = subCluster.getSubClusterId();
|
|
|
+ try {
|
|
|
+ DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
|
|
|
+ subCluster.getSubClusterId(), subCluster.getRMWebServiceAddress());
|
|
|
+ interceptor.replaceLabelsOnNodes(nodeToLabelsEntryList, hsrCopy);
|
|
|
+ builder.append("subCluster-").append(subClusterId.getId()).append(":Success,");
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("replaceLabelsOnNodes Failed. subClusterId = {}.", subClusterId, e);
|
|
|
+ builder.append("subCluster-").append(subClusterId.getId()).append(":Failed,");
|
|
|
+ }
|
|
|
+ });
|
|
|
+ long stopTime = clock.getTime();
|
|
|
+ routerMetrics.succeededReplaceLabelsOnNodesRetrieved(stopTime - startTime);
|
|
|
+
|
|
|
+ // Step5. return call result.
|
|
|
+ return Response.status(Status.OK).entity(builder.toString()).build();
|
|
|
+ } catch (NotFoundException e) {
|
|
|
+ routerMetrics.incrReplaceLabelsOnNodesFailedRetrieved();
|
|
|
+ throw e;
|
|
|
+ } catch (Exception e) {
|
|
|
+ routerMetrics.incrReplaceLabelsOnNodesFailedRetrieved();
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * This method replaces all the node labels for specific node, and it is
|
|
|
+ * reachable by using {@link RMWSConsts#NODES_NODEID_REPLACE_LABELS}.
|
|
|
+ *
|
|
|
+ * @see ResourceManagerAdministrationProtocol#replaceLabelsOnNode
|
|
|
+ * @param newNodeLabelsName the list of new labels. It is a QueryParam.
|
|
|
+ * @param hsr the servlet request
|
|
|
+ * @param nodeId the node we want to replace the node labels. It is a
|
|
|
+ * PathParam.
|
|
|
+ * @return Response containing the status code
|
|
|
+ * @throws Exception if an exception happened
|
|
|
+ */
|
|
|
@Override
|
|
|
public Response replaceLabelsOnNode(Set<String> newNodeLabelsName,
|
|
|
HttpServletRequest hsr, String nodeId) throws Exception {
|
|
|
- throw new NotImplementedException("Code is not implemented");
|
|
|
+
|
|
|
+ // Step1. Check the parameters to ensure that the parameters are not empty.
|
|
|
+ if (StringUtils.isBlank(nodeId)) {
|
|
|
+ routerMetrics.incrReplaceLabelsOnNodeFailedRetrieved();
|
|
|
+ throw new IllegalArgumentException("Parameter error, nodeId must not be null or empty.");
|
|
|
+ }
|
|
|
+ if (CollectionUtils.isEmpty(newNodeLabelsName)) {
|
|
|
+ routerMetrics.incrReplaceLabelsOnNodeFailedRetrieved();
|
|
|
+ throw new IllegalArgumentException("Parameter error, newNodeLabelsName must not be empty.");
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ // Step2. We find the subCluster according to the nodeId,
|
|
|
+ // and then call the replaceLabelsOnNode of the subCluster.
|
|
|
+ long startTime = clock.getTime();
|
|
|
+ SubClusterInfo subClusterInfo = getNodeSubcluster(nodeId);
|
|
|
+ DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
|
|
|
+ subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
|
|
|
+ final HttpServletRequest hsrCopy = clone(hsr);
|
|
|
+ interceptor.replaceLabelsOnNode(newNodeLabelsName, hsrCopy, nodeId);
|
|
|
+
|
|
|
+ // Step3. Return the response result.
|
|
|
+ long stopTime = clock.getTime();
|
|
|
+ routerMetrics.succeededReplaceLabelsOnNodeRetrieved(stopTime - startTime);
|
|
|
+ String msg = "subCluster#" + subClusterInfo.getSubClusterId().getId() + ":Success;";
|
|
|
+ return Response.status(Status.OK).entity(msg).build();
|
|
|
+ } catch (NotFoundException e) {
|
|
|
+ routerMetrics.incrReplaceLabelsOnNodeFailedRetrieved();
|
|
|
+ throw e;
|
|
|
+ } catch (Exception e){
|
|
|
+ routerMetrics.incrReplaceLabelsOnNodeFailedRetrieved();
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|