|
@@ -43,6 +43,7 @@ import javax.ws.rs.core.HttpHeaders;
|
|
import javax.ws.rs.core.Response;
|
|
import javax.ws.rs.core.Response;
|
|
import javax.ws.rs.core.Response.Status;
|
|
import javax.ws.rs.core.Response.Status;
|
|
|
|
|
|
|
|
+import org.apache.commons.collections.CollectionUtils;
|
|
import org.apache.commons.lang3.NotImplementedException;
|
|
import org.apache.commons.lang3.NotImplementedException;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
@@ -72,6 +73,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
|
|
|
+import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
|
|
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
|
|
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
|
|
import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade;
|
|
import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade;
|
|
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
|
|
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
|
|
@@ -1580,16 +1582,126 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
throw new RuntimeException("getClusterNodeLabels Failed.");
|
|
throw new RuntimeException("getClusterNodeLabels Failed.");
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * This method adds specific node labels for specific nodes, and it is
|
|
|
|
+ * reachable by using {@link RMWSConsts#ADD_NODE_LABELS}.
|
|
|
|
+ *
|
|
|
|
+ * @see ResourceManagerAdministrationProtocol#addToClusterNodeLabels
|
|
|
|
+ * @param newNodeLabels the node labels to add. It is a content param.
|
|
|
|
+ * @param hsr the servlet request
|
|
|
|
+ * @return Response containing the status code
|
|
|
|
+ * @throws Exception in case of bad request
|
|
|
|
+ */
|
|
@Override
|
|
@Override
|
|
public Response addToClusterNodeLabels(NodeLabelsInfo newNodeLabels,
|
|
public Response addToClusterNodeLabels(NodeLabelsInfo newNodeLabels,
|
|
HttpServletRequest hsr) throws Exception {
|
|
HttpServletRequest hsr) throws Exception {
|
|
- throw new NotImplementedException("Code is not implemented");
|
|
|
|
|
|
+
|
|
|
|
+ if (newNodeLabels == null) {
|
|
|
|
+ routerMetrics.incrAddToClusterNodeLabelsFailedRetrieved();
|
|
|
|
+ throw new IllegalArgumentException("Parameter error, the newNodeLabels is null.");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ List<NodeLabelInfo> nodeLabelInfos = newNodeLabels.getNodeLabelsInfo();
|
|
|
|
+ if (CollectionUtils.isEmpty(nodeLabelInfos)) {
|
|
|
|
+ routerMetrics.incrAddToClusterNodeLabelsFailedRetrieved();
|
|
|
|
+ throw new IllegalArgumentException("Parameter error, the nodeLabelsInfo is null or empty.");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ long startTime = clock.getTime();
|
|
|
|
+ Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();
|
|
|
|
+ final HttpServletRequest hsrCopy = clone(hsr);
|
|
|
|
+ Class[] argsClasses = new Class[]{NodeLabelsInfo.class, HttpServletRequest.class};
|
|
|
|
+ Object[] args = new Object[]{newNodeLabels, hsrCopy};
|
|
|
|
+ ClientMethod remoteMethod = new ClientMethod("addToClusterNodeLabels", argsClasses, args);
|
|
|
|
+ Map<SubClusterInfo, Response> responseInfoMap =
|
|
|
|
+ invokeConcurrent(subClustersActive.values(), remoteMethod, Response.class);
|
|
|
|
+ StringBuffer buffer = new StringBuffer();
|
|
|
|
+ // SubCluster-0:SUCCESS,SubCluster-1:SUCCESS
|
|
|
|
+ responseInfoMap.forEach((subClusterInfo, response) -> {
|
|
|
|
+ buildAppendMsg(subClusterInfo, buffer, response);
|
|
|
|
+ });
|
|
|
|
+ long stopTime = clock.getTime();
|
|
|
|
+ routerMetrics.succeededAddToClusterNodeLabelsRetrieved((stopTime - startTime));
|
|
|
|
+ return Response.status(Status.OK).entity(buffer.toString()).build();
|
|
|
|
+ } catch (NotFoundException e) {
|
|
|
|
+ routerMetrics.incrAddToClusterNodeLabelsFailedRetrieved();
|
|
|
|
+ RouterServerUtil.logAndThrowIOException("get all active sub cluster(s) error.", e);
|
|
|
|
+ } catch (YarnException e) {
|
|
|
|
+ routerMetrics.incrAddToClusterNodeLabelsFailedRetrieved();
|
|
|
|
+ RouterServerUtil.logAndThrowIOException("addToClusterNodeLabels with yarn error.", e);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ routerMetrics.incrAddToClusterNodeLabelsFailedRetrieved();
|
|
|
|
+ throw new RuntimeException("addToClusterNodeLabels Failed.");
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * This method removes all the node labels for specific nodes, and it is
|
|
|
|
+ * reachable by using {@link RMWSConsts#REMOVE_NODE_LABELS}.
|
|
|
|
+ *
|
|
|
|
+ * @see ResourceManagerAdministrationProtocol#removeFromClusterNodeLabels
|
|
|
|
+ * @param oldNodeLabels the node labels to remove. It is a QueryParam.
|
|
|
|
+ * @param hsr the servlet request
|
|
|
|
+ * @return Response containing the status code
|
|
|
|
+ * @throws Exception in case of bad request
|
|
|
|
+ */
|
|
@Override
|
|
@Override
|
|
public Response removeFromClusterNodeLabels(Set<String> oldNodeLabels,
|
|
public Response removeFromClusterNodeLabels(Set<String> oldNodeLabels,
|
|
HttpServletRequest hsr) throws Exception {
|
|
HttpServletRequest hsr) throws Exception {
|
|
- throw new NotImplementedException("Code is not implemented");
|
|
|
|
|
|
+
|
|
|
|
+ if (CollectionUtils.isEmpty(oldNodeLabels)) {
|
|
|
|
+ routerMetrics.incrRemoveFromClusterNodeLabelsFailedRetrieved();
|
|
|
|
+ throw new IllegalArgumentException("Parameter error, the oldNodeLabels is null or empty.");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ long startTime = clock.getTime();
|
|
|
|
+ Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();
|
|
|
|
+ final HttpServletRequest hsrCopy = clone(hsr);
|
|
|
|
+ Class[] argsClasses = new Class[]{Set.class, HttpServletRequest.class};
|
|
|
|
+ Object[] args = new Object[]{oldNodeLabels, hsrCopy};
|
|
|
|
+ ClientMethod remoteMethod =
|
|
|
|
+ new ClientMethod("removeFromClusterNodeLabels", argsClasses, args);
|
|
|
|
+ Map<SubClusterInfo, Response> responseInfoMap =
|
|
|
|
+ invokeConcurrent(subClustersActive.values(), remoteMethod, Response.class);
|
|
|
|
+ StringBuffer buffer = new StringBuffer();
|
|
|
|
+ // SubCluster-0:SUCCESS,SubCluster-1:SUCCESS
|
|
|
|
+ responseInfoMap.forEach((subClusterInfo, response) -> {
|
|
|
|
+ buildAppendMsg(subClusterInfo, buffer, response);
|
|
|
|
+ });
|
|
|
|
+ long stopTime = clock.getTime();
|
|
|
|
+ routerMetrics.succeededRemoveFromClusterNodeLabelsRetrieved(stopTime - startTime);
|
|
|
|
+ return Response.status(Status.OK).entity(buffer.toString()).build();
|
|
|
|
+ } catch (NotFoundException e) {
|
|
|
|
+ routerMetrics.incrRemoveFromClusterNodeLabelsFailedRetrieved();
|
|
|
|
+ RouterServerUtil.logAndThrowIOException("get all active sub cluster(s) error.", e);
|
|
|
|
+ } catch (YarnException e) {
|
|
|
|
+ routerMetrics.incrRemoveFromClusterNodeLabelsFailedRetrieved();
|
|
|
|
+ RouterServerUtil.logAndThrowIOException("removeFromClusterNodeLabels with yarn error.", e);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ routerMetrics.incrRemoveFromClusterNodeLabelsFailedRetrieved();
|
|
|
|
+ throw new RuntimeException("removeFromClusterNodeLabels Failed.");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Bbulid Append information.
|
|
|
|
+ *
|
|
|
|
+ * @param subClusterInfo subCluster information.
|
|
|
|
+ * @param buffer StringBuffer.
|
|
|
|
+ * @param response response message.
|
|
|
|
+ */
|
|
|
|
+ private void buildAppendMsg(SubClusterInfo subClusterInfo, StringBuffer buffer,
|
|
|
|
+ Response response) {
|
|
|
|
+ SubClusterId subClusterId = subClusterInfo.getSubClusterId();
|
|
|
|
+ String state = response != null &&
|
|
|
|
+ (response.getStatus() == Status.OK.getStatusCode()) ? "SUCCESS" : "FAILED";
|
|
|
|
+ buffer.append("SubCluster-")
|
|
|
|
+ .append(subClusterId.getId())
|
|
|
|
+ .append(":")
|
|
|
|
+ .append(state)
|
|
|
|
+ .append(",");
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|