|
@@ -18,7 +18,19 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.server.router.webapp;
|
|
|
|
|
|
-import com.google.common.annotations.VisibleForTesting;
|
|
|
+import java.io.IOException;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Random;
|
|
|
+import java.util.Set;
|
|
|
+
|
|
|
+import javax.servlet.http.HttpServletRequest;
|
|
|
+import javax.servlet.http.HttpServletResponse;
|
|
|
+import javax.ws.rs.core.Response;
|
|
|
+import javax.ws.rs.core.Response.Status;
|
|
|
+
|
|
|
import org.apache.commons.lang.NotImplementedException;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.security.authorize.AuthorizationException;
|
|
@@ -36,20 +48,42 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
|
|
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.*;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.DelegationToken;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsEntryList;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
|
|
|
+import org.apache.hadoop.yarn.server.router.RouterMetrics;
|
|
|
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
|
|
|
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
|
|
|
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
|
|
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
|
|
+import org.apache.hadoop.yarn.util.Clock;
|
|
|
+import org.apache.hadoop.yarn.util.MonotonicClock;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
-import javax.servlet.http.HttpServletRequest;
|
|
|
-import javax.servlet.http.HttpServletResponse;
|
|
|
-import javax.ws.rs.core.Response;
|
|
|
-import javax.ws.rs.core.Response.Status;
|
|
|
-import java.io.IOException;
|
|
|
-import java.util.*;
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
|
|
|
/**
|
|
|
* Extends the {@code AbstractRESTRequestInterceptor} class and provides an
|
|
@@ -66,6 +100,8 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
|
private FederationStateStoreFacade federationFacade;
|
|
|
private Random rand;
|
|
|
private RouterPolicyFacade policyFacade;
|
|
|
+ private RouterMetrics routerMetrics;
|
|
|
+ private final Clock clock = new MonotonicClock();
|
|
|
|
|
|
private Map<SubClusterId, DefaultRequestInterceptorREST> interceptors;
|
|
|
|
|
@@ -88,6 +124,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
|
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY);
|
|
|
|
|
|
interceptors = new HashMap<SubClusterId, DefaultRequestInterceptorREST>();
|
|
|
+ routerMetrics = RouterMetrics.getMetrics();
|
|
|
}
|
|
|
|
|
|
private SubClusterId getRandomActiveSubCluster(
|
|
@@ -191,10 +228,14 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
|
@Override
|
|
|
public Response createNewApplication(HttpServletRequest hsr)
|
|
|
throws AuthorizationException, IOException, InterruptedException {
|
|
|
+
|
|
|
+ long startTime = clock.getTime();
|
|
|
+
|
|
|
Map<SubClusterId, SubClusterInfo> subClustersActive;
|
|
|
try {
|
|
|
subClustersActive = federationFacade.getSubClusters(true);
|
|
|
} catch (YarnException e) {
|
|
|
+ routerMetrics.incrAppsFailedCreated();
|
|
|
return Response.status(Status.INTERNAL_SERVER_ERROR)
|
|
|
.entity(e.getLocalizedMessage()).build();
|
|
|
}
|
|
@@ -207,6 +248,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
|
try {
|
|
|
subClusterId = getRandomActiveSubCluster(subClustersActive, blacklist);
|
|
|
} catch (YarnException e) {
|
|
|
+ routerMetrics.incrAppsFailedCreated();
|
|
|
return Response.status(Status.SERVICE_UNAVAILABLE)
|
|
|
.entity(e.getLocalizedMessage()).build();
|
|
|
}
|
|
@@ -226,6 +268,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
|
}
|
|
|
|
|
|
if (response != null && response.getStatus() == 200) {
|
|
|
+
|
|
|
+ long stopTime = clock.getTime();
|
|
|
+ routerMetrics.succeededAppsCreated(stopTime - startTime);
|
|
|
+
|
|
|
return response;
|
|
|
} else {
|
|
|
// Empty response from the ResourceManager.
|
|
@@ -236,6 +282,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
|
|
|
|
String errMsg = "Fail to create a new application.";
|
|
|
LOG.error(errMsg);
|
|
|
+ routerMetrics.incrAppsFailedCreated();
|
|
|
return Response.status(Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
|
|
|
}
|
|
|
|
|
@@ -308,7 +355,11 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
|
public Response submitApplication(ApplicationSubmissionContextInfo newApp,
|
|
|
HttpServletRequest hsr)
|
|
|
throws AuthorizationException, IOException, InterruptedException {
|
|
|
+
|
|
|
+ long startTime = clock.getTime();
|
|
|
+
|
|
|
if (newApp == null || newApp.getApplicationId() == null) {
|
|
|
+ routerMetrics.incrAppsFailedSubmitted();
|
|
|
String errMsg = "Missing ApplicationSubmissionContextInfo or "
|
|
|
+ "applicationSubmissionContex information.";
|
|
|
return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
|
|
@@ -318,6 +369,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
|
try {
|
|
|
applicationId = ApplicationId.fromString(newApp.getApplicationId());
|
|
|
} catch (IllegalArgumentException e) {
|
|
|
+ routerMetrics.incrAppsFailedSubmitted();
|
|
|
return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage())
|
|
|
.build();
|
|
|
}
|
|
@@ -333,6 +385,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
|
try {
|
|
|
subClusterId = policyFacade.getHomeSubcluster(context, blacklist);
|
|
|
} catch (YarnException e) {
|
|
|
+ routerMetrics.incrAppsFailedSubmitted();
|
|
|
return Response.status(Status.SERVICE_UNAVAILABLE)
|
|
|
.entity(e.getLocalizedMessage()).build();
|
|
|
}
|
|
@@ -349,6 +402,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
|
subClusterId =
|
|
|
federationFacade.addApplicationHomeSubCluster(appHomeSubCluster);
|
|
|
} catch (YarnException e) {
|
|
|
+ routerMetrics.incrAppsFailedSubmitted();
|
|
|
String errMsg = "Unable to insert the ApplicationId " + applicationId
|
|
|
+ " into the FederationStateStore";
|
|
|
return Response.status(Status.SERVICE_UNAVAILABLE)
|
|
@@ -367,6 +421,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
|
subClusterIdInStateStore =
|
|
|
federationFacade.getApplicationHomeSubCluster(applicationId);
|
|
|
} catch (YarnException e1) {
|
|
|
+ routerMetrics.incrAppsFailedSubmitted();
|
|
|
return Response.status(Status.SERVICE_UNAVAILABLE)
|
|
|
.entity(e1.getLocalizedMessage()).build();
|
|
|
}
|
|
@@ -374,6 +429,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
|
LOG.info("Application " + applicationId
|
|
|
+ " already submitted on SubCluster " + subClusterId);
|
|
|
} else {
|
|
|
+ routerMetrics.incrAppsFailedSubmitted();
|
|
|
return Response.status(Status.SERVICE_UNAVAILABLE).entity(errMsg)
|
|
|
.build();
|
|
|
}
|
|
@@ -384,6 +440,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
|
try {
|
|
|
subClusterInfo = federationFacade.getSubCluster(subClusterId);
|
|
|
} catch (YarnException e) {
|
|
|
+ routerMetrics.incrAppsFailedSubmitted();
|
|
|
return Response.status(Status.SERVICE_UNAVAILABLE)
|
|
|
.entity(e.getLocalizedMessage()).build();
|
|
|
}
|
|
@@ -401,6 +458,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
|
if (response != null && response.getStatus() == 202) {
|
|
|
LOG.info("Application " + context.getApplicationName() + " with appId "
|
|
|
+ applicationId + " submitted on " + subClusterId);
|
|
|
+
|
|
|
+ long stopTime = clock.getTime();
|
|
|
+ routerMetrics.succeededAppsSubmitted(stopTime - startTime);
|
|
|
+
|
|
|
return response;
|
|
|
} else {
|
|
|
// Empty response from the ResourceManager.
|
|
@@ -409,6 +470,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ routerMetrics.incrAppsFailedSubmitted();
|
|
|
String errMsg = "Application " + newApp.getApplicationName()
|
|
|
+ " with appId " + applicationId + " failed to be submitted.";
|
|
|
LOG.error(errMsg);
|
|
@@ -435,10 +497,13 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
|
public AppInfo getApp(HttpServletRequest hsr, String appId,
|
|
|
Set<String> unselectedFields) {
|
|
|
|
|
|
+ long startTime = clock.getTime();
|
|
|
+
|
|
|
ApplicationId applicationId = null;
|
|
|
try {
|
|
|
applicationId = ApplicationId.fromString(appId);
|
|
|
} catch (IllegalArgumentException e) {
|
|
|
+ routerMetrics.incrAppsFailedRetrieved();
|
|
|
return null;
|
|
|
}
|
|
|
|
|
@@ -448,16 +513,23 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
|
subClusterId =
|
|
|
federationFacade.getApplicationHomeSubCluster(applicationId);
|
|
|
if (subClusterId == null) {
|
|
|
+ routerMetrics.incrAppsFailedRetrieved();
|
|
|
return null;
|
|
|
}
|
|
|
subClusterInfo = federationFacade.getSubCluster(subClusterId);
|
|
|
} catch (YarnException e) {
|
|
|
+ routerMetrics.incrAppsFailedRetrieved();
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
- return getOrCreateInterceptorForSubCluster(subClusterId,
|
|
|
+ AppInfo response = getOrCreateInterceptorForSubCluster(subClusterId,
|
|
|
subClusterInfo.getRMWebServiceAddress()).getApp(hsr, appId,
|
|
|
unselectedFields);
|
|
|
+
|
|
|
+ long stopTime = clock.getTime();
|
|
|
+ routerMetrics.succeededAppsRetrieved(stopTime - startTime);
|
|
|
+
|
|
|
+ return response;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -481,23 +553,37 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|
|
String appId) throws AuthorizationException, YarnException,
|
|
|
InterruptedException, IOException {
|
|
|
|
|
|
+ long startTime = clock.getTime();
|
|
|
+
|
|
|
ApplicationId applicationId = null;
|
|
|
try {
|
|
|
applicationId = ApplicationId.fromString(appId);
|
|
|
} catch (IllegalArgumentException e) {
|
|
|
+ routerMetrics.incrAppsFailedKilled();
|
|
|
return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage())
|
|
|
.build();
|
|
|
}
|
|
|
|
|
|
- SubClusterId subClusterId =
|
|
|
- federationFacade.getApplicationHomeSubCluster(applicationId);
|
|
|
-
|
|
|
- SubClusterInfo subClusterInfo =
|
|
|
- federationFacade.getSubCluster(subClusterId);
|
|
|
+ SubClusterInfo subClusterInfo = null;
|
|
|
+ SubClusterId subClusterId = null;
|
|
|
+ try {
|
|
|
+ subClusterId =
|
|
|
+ federationFacade.getApplicationHomeSubCluster(applicationId);
|
|
|
+ subClusterInfo = federationFacade.getSubCluster(subClusterId);
|
|
|
+ } catch (YarnException e) {
|
|
|
+ routerMetrics.incrAppsFailedKilled();
|
|
|
+ return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage())
|
|
|
+ .build();
|
|
|
+ }
|
|
|
|
|
|
- return getOrCreateInterceptorForSubCluster(subClusterId,
|
|
|
+ Response response = getOrCreateInterceptorForSubCluster(subClusterId,
|
|
|
subClusterInfo.getRMWebServiceAddress()).updateAppState(targetState,
|
|
|
hsr, appId);
|
|
|
+
|
|
|
+ long stopTime = clock.getTime();
|
|
|
+ // Record success on the "killed" metrics so it pairs with the
|
|
|
+ // incrAppsFailedKilled() calls on this method's failure paths.
|
|
|
+ routerMetrics.succeededAppsKilled(stopTime - startTime);
|
|
|
+
|
|
|
+ return response;
|
|
|
}
|
|
|
|
|
|
@Override
|