|
@@ -44,7 +44,6 @@ import javax.ws.rs.core.Response;
|
|
import javax.ws.rs.core.Response.Status;
|
|
import javax.ws.rs.core.Response.Status;
|
|
|
|
|
|
import org.apache.commons.collections.CollectionUtils;
|
|
import org.apache.commons.collections.CollectionUtils;
|
|
-import org.apache.commons.lang3.NotImplementedException;
|
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.fs.impl.prefetch.Validate;
|
|
import org.apache.hadoop.fs.impl.prefetch.Validate;
|
|
@@ -129,6 +128,7 @@ import org.apache.hadoop.yarn.server.router.webapp.dao.FederationBulkActivitiesI
|
|
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo;
|
|
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo;
|
|
import org.apache.hadoop.yarn.server.router.webapp.dao.SubClusterResult;
|
|
import org.apache.hadoop.yarn.server.router.webapp.dao.SubClusterResult;
|
|
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationSchedulerTypeInfo;
|
|
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationSchedulerTypeInfo;
|
|
|
|
+import org.apache.hadoop.yarn.server.router.webapp.dao.FederationConfInfo;
|
|
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationClusterUserInfo;
|
|
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationClusterUserInfo;
|
|
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationClusterInfo;
|
|
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationClusterInfo;
|
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
@@ -136,6 +136,7 @@ import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
|
|
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
|
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
|
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
|
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
|
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
|
|
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
|
|
|
|
+import org.apache.hadoop.yarn.webapp.dao.ConfInfo;
|
|
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
|
|
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
|
|
import org.apache.hadoop.yarn.util.Clock;
|
|
import org.apache.hadoop.yarn.util.Clock;
|
|
import org.apache.hadoop.yarn.util.MonotonicClock;
|
|
import org.apache.hadoop.yarn.util.MonotonicClock;
|
|
@@ -848,6 +849,29 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Get the active subcluster in the federation.
|
|
|
|
+ *
|
|
|
|
+ * @param subClusterId subClusterId.
|
|
|
|
+ * @return subClusterInfo.
|
|
|
|
+ * @throws NotFoundException If the subclusters cannot be found.
|
|
|
|
+ */
|
|
|
|
+ private SubClusterInfo getActiveSubCluster(String subClusterId)
|
|
|
|
+ throws NotFoundException {
|
|
|
|
+ try {
|
|
|
|
+ SubClusterId pSubClusterId = SubClusterId.newInstance(subClusterId);
|
|
|
|
+ Map<SubClusterId, SubClusterInfo> subClusterInfoMap =
|
|
|
|
+ federationFacade.getSubClusters(true);
|
|
|
|
+ SubClusterInfo subClusterInfo = subClusterInfoMap.get(pSubClusterId);
|
|
|
|
+ if (subClusterInfo == null) {
|
|
|
|
+ throw new NotFoundException(subClusterId + " not found.");
|
|
|
|
+ }
|
|
|
|
+ return subClusterInfo;
|
|
|
|
+ } catch (YarnException e) {
|
|
|
|
+ throw new NotFoundException(e.getMessage());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* The YARN Router will forward to the request to all the SubClusters to find
|
|
* The YARN Router will forward to the request to all the SubClusters to find
|
|
* where the node is running.
|
|
* where the node is running.
|
|
@@ -2906,17 +2930,117 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
throw new RuntimeException("getContainer Failed.");
|
|
throw new RuntimeException("getContainer Failed.");
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * This method updates the Scheduler configuration, and it is reachable by
|
|
|
|
+ * using {@link RMWSConsts#SCHEDULER_CONF}.
|
|
|
|
+ *
|
|
|
|
+ * @param mutationInfo th information for making scheduler configuration
|
|
|
|
+ * changes (supports adding, removing, or updating a queue, as well
|
|
|
|
+ * as global scheduler conf changes)
|
|
|
|
+ * @param hsr the servlet request
|
|
|
|
+ * @return Response containing the status code
|
|
|
|
+ * @throws AuthorizationException if the user is not authorized to invoke this
|
|
|
|
+ * method
|
|
|
|
+ * @throws InterruptedException if interrupted
|
|
|
|
+ */
|
|
@Override
|
|
@Override
|
|
public Response updateSchedulerConfiguration(SchedConfUpdateInfo mutationInfo,
|
|
public Response updateSchedulerConfiguration(SchedConfUpdateInfo mutationInfo,
|
|
- HttpServletRequest hsr)
|
|
|
|
- throws AuthorizationException, InterruptedException {
|
|
|
|
- throw new NotImplementedException("Code is not implemented");
|
|
|
|
|
|
+ HttpServletRequest hsr) throws AuthorizationException, InterruptedException {
|
|
|
|
+
|
|
|
|
+ // Make Sure mutationInfo is not null.
|
|
|
|
+ if (mutationInfo == null) {
|
|
|
|
+ routerMetrics.incrUpdateSchedulerConfigurationFailedRetrieved();
|
|
|
|
+ throw new IllegalArgumentException(
|
|
|
|
+ "Parameter error, the schedConfUpdateInfo is empty or null.");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // In federated mode, we may have a mix of multiple schedulers.
|
|
|
|
+ // In order to ensure accurate update scheduler configuration,
|
|
|
|
+ // we need users to explicitly set subClusterId.
|
|
|
|
+ String pSubClusterId = mutationInfo.getSubClusterId();
|
|
|
|
+ if (StringUtils.isBlank(pSubClusterId)) {
|
|
|
|
+ routerMetrics.incrUpdateSchedulerConfigurationFailedRetrieved();
|
|
|
|
+ throw new IllegalArgumentException("Parameter error, " +
|
|
|
|
+ "the subClusterId is empty or null.");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Get the subClusterInfo , then update the scheduler configuration.
|
|
|
|
+ try {
|
|
|
|
+ long startTime = clock.getTime();
|
|
|
|
+ SubClusterInfo subClusterInfo = getActiveSubCluster(pSubClusterId);
|
|
|
|
+ DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
|
|
|
|
+ subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
|
|
|
|
+ Response response = interceptor.updateSchedulerConfiguration(mutationInfo, hsr);
|
|
|
|
+ if (response != null) {
|
|
|
|
+ long endTime = clock.getTime();
|
|
|
|
+ routerMetrics.succeededUpdateSchedulerConfigurationRetrieved(endTime - startTime);
|
|
|
|
+ return Response.status(response.getStatus()).entity(response.getEntity()).build();
|
|
|
|
+ }
|
|
|
|
+ } catch (NotFoundException e) {
|
|
|
|
+ routerMetrics.incrUpdateSchedulerConfigurationFailedRetrieved();
|
|
|
|
+ RouterServerUtil.logAndThrowRunTimeException(e,
|
|
|
|
+ "Get subCluster error. subClusterId = %s", pSubClusterId);
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ routerMetrics.incrUpdateSchedulerConfigurationFailedRetrieved();
|
|
|
|
+ RouterServerUtil.logAndThrowRunTimeException(e,
|
|
|
|
+ "UpdateSchedulerConfiguration error. subClusterId = %s", pSubClusterId);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ routerMetrics.incrUpdateSchedulerConfigurationFailedRetrieved();
|
|
|
|
+ throw new RuntimeException("UpdateSchedulerConfiguration error. subClusterId = "
|
|
|
|
+ + pSubClusterId);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * This method retrieves all the Scheduler configuration, and it is reachable
|
|
|
|
+ * by using {@link RMWSConsts#SCHEDULER_CONF}.
|
|
|
|
+ *
|
|
|
|
+ * @param hsr the servlet request
|
|
|
|
+ * @return Response containing the status code
|
|
|
|
+ * @throws AuthorizationException if the user is not authorized to invoke this
|
|
|
|
+ * method.
|
|
|
|
+ */
|
|
@Override
|
|
@Override
|
|
public Response getSchedulerConfiguration(HttpServletRequest hsr)
|
|
public Response getSchedulerConfiguration(HttpServletRequest hsr)
|
|
throws AuthorizationException {
|
|
throws AuthorizationException {
|
|
- throw new NotImplementedException("Code is not implemented");
|
|
|
|
|
|
+ try {
|
|
|
|
+ long startTime = clock.getTime();
|
|
|
|
+ FederationConfInfo federationConfInfo = new FederationConfInfo();
|
|
|
|
+ Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();
|
|
|
|
+ final HttpServletRequest hsrCopy = clone(hsr);
|
|
|
|
+ Class[] argsClasses = new Class[]{HttpServletRequest.class};
|
|
|
|
+ Object[] args = new Object[]{hsrCopy};
|
|
|
|
+ ClientMethod remoteMethod = new ClientMethod("getSchedulerConfiguration", argsClasses, args);
|
|
|
|
+ Map<SubClusterInfo, Response> responseMap =
|
|
|
|
+ invokeConcurrent(subClustersActive.values(), remoteMethod, Response.class);
|
|
|
|
+ responseMap.forEach((subClusterInfo, response) -> {
|
|
|
|
+ SubClusterId subClusterId = subClusterInfo.getSubClusterId();
|
|
|
|
+ if (response == null) {
|
|
|
|
+ String errorMsg = subClusterId + " Can't getSchedulerConfiguration.";
|
|
|
|
+ federationConfInfo.getErrorMsgs().add(errorMsg);
|
|
|
|
+ } else if (response.getStatus() == Status.BAD_REQUEST.getStatusCode()) {
|
|
|
|
+ String errorMsg = String.valueOf(response.getEntity());
|
|
|
|
+ federationConfInfo.getErrorMsgs().add(errorMsg);
|
|
|
|
+ } else if (response.getStatus() == Status.OK.getStatusCode()) {
|
|
|
|
+ ConfInfo fedConfInfo = ConfInfo.class.cast(response.getEntity());
|
|
|
|
+ fedConfInfo.setSubClusterId(subClusterId.getId());
|
|
|
|
+ federationConfInfo.getList().add(fedConfInfo);
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ long endTime = clock.getTime();
|
|
|
|
+ routerMetrics.succeededGetSchedulerConfigurationRetrieved(endTime - startTime);
|
|
|
|
+ return Response.status(Status.OK).entity(federationConfInfo).build();
|
|
|
|
+ } catch (NotFoundException e) {
|
|
|
|
+ RouterServerUtil.logAndThrowRunTimeException("get all active sub cluster(s) error.", e);
|
|
|
|
+ routerMetrics.incrGetSchedulerConfigurationFailedRetrieved();
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ routerMetrics.incrGetSchedulerConfigurationFailedRetrieved();
|
|
|
|
+ RouterServerUtil.logAndThrowRunTimeException("getSchedulerConfiguration error.", e);
|
|
|
|
+ return Response.status(Status.BAD_REQUEST).entity("getSchedulerConfiguration error.").build();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ routerMetrics.incrGetSchedulerConfigurationFailedRetrieved();
|
|
|
|
+ throw new RuntimeException("getSchedulerConfiguration error.");
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|