|
@@ -25,6 +25,7 @@ import java.util.Calendar;
|
|
|
import java.util.List;
|
|
|
import java.util.TimeZone;
|
|
|
|
|
|
+import org.apache.hadoop.classification.VisibleForTesting;
|
|
|
import org.apache.commons.lang3.NotImplementedException;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.util.curator.ZKCuratorManager;
|
|
@@ -84,6 +85,8 @@ import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembership
|
|
|
import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
|
|
|
import org.apache.hadoop.yarn.server.records.Version;
|
|
|
+import org.apache.hadoop.yarn.util.Clock;
|
|
|
+import org.apache.hadoop.yarn.util.SystemClock;
|
|
|
import org.apache.zookeeper.data.ACL;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
@@ -124,6 +127,12 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
|
|
|
private String membershipZNode;
|
|
|
private String policiesZNode;
|
|
|
|
|
|
+ private volatile Clock clock = SystemClock.getInstance();
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ private ZKFederationStateStoreOpDurations opDurations =
|
|
|
+ ZKFederationStateStoreOpDurations.getInstance();
|
|
|
+
|
|
|
@Override
|
|
|
public void init(Configuration conf) throws YarnException {
|
|
|
LOG.info("Initializing ZooKeeper connection");
|
|
@@ -153,7 +162,6 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
|
|
|
String errMsg = "Cannot create base directories: " + e.getMessage();
|
|
|
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -167,6 +175,7 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
|
|
|
public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
|
|
|
AddApplicationHomeSubClusterRequest request) throws YarnException {
|
|
|
|
|
|
+ long start = clock.getTime();
|
|
|
FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
|
|
|
ApplicationHomeSubCluster app = request.getApplicationHomeSubCluster();
|
|
|
ApplicationId appId = app.getApplicationId();
|
|
@@ -187,7 +196,8 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
|
|
|
String errMsg = "Cannot check app home subcluster for " + appId;
|
|
|
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
|
|
|
}
|
|
|
-
|
|
|
+ long end = clock.getTime();
|
|
|
+ opDurations.addAppHomeSubClusterDuration(start, end);
|
|
|
return AddApplicationHomeSubClusterResponse
|
|
|
.newInstance(homeSubCluster);
|
|
|
}
|
|
@@ -198,6 +208,7 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
|
|
|
UpdateApplicationHomeSubClusterRequest request)
|
|
|
throws YarnException {
|
|
|
|
|
|
+ long start = clock.getTime();
|
|
|
FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
|
|
|
ApplicationHomeSubCluster app = request.getApplicationHomeSubCluster();
|
|
|
ApplicationId appId = app.getApplicationId();
|
|
@@ -209,6 +220,9 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
|
|
|
SubClusterId newSubClusterId =
|
|
|
request.getApplicationHomeSubCluster().getHomeSubCluster();
|
|
|
putApp(appId, newSubClusterId, true);
|
|
|
+
|
|
|
+ long end = clock.getTime();
|
|
|
+ opDurations.addUpdateAppHomeSubClusterDuration(start, end);
|
|
|
return UpdateApplicationHomeSubClusterResponse.newInstance();
|
|
|
}
|
|
|
|
|
@@ -216,6 +230,7 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
|
|
|
public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
|
|
|
GetApplicationHomeSubClusterRequest request) throws YarnException {
|
|
|
|
|
|
+ long start = clock.getTime();
|
|
|
FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
|
|
|
ApplicationId appId = request.getApplicationId();
|
|
|
SubClusterId homeSubCluster = getApp(appId);
|
|
@@ -223,13 +238,15 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
|
|
|
String errMsg = "Application " + appId + " does not exist";
|
|
|
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
|
|
|
}
|
|
|
- return GetApplicationHomeSubClusterResponse.newInstance(
|
|
|
- ApplicationHomeSubCluster.newInstance(appId, homeSubCluster));
|
|
|
+ long end = clock.getTime();
|
|
|
+ opDurations.addGetAppHomeSubClusterDuration(start, end);
|
|
|
+ return GetApplicationHomeSubClusterResponse.newInstance(appId, homeSubCluster);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
|
|
|
GetApplicationsHomeSubClusterRequest request) throws YarnException {
|
|
|
+ long start = clock.getTime();
|
|
|
List<ApplicationHomeSubCluster> result = new ArrayList<>();
|
|
|
|
|
|
try {
|
|
@@ -244,7 +261,8 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
|
|
|
String errMsg = "Cannot get apps: " + e.getMessage();
|
|
|
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
|
|
|
}
|
|
|
-
|
|
|
+ long end = clock.getTime();
|
|
|
+ opDurations.addGetAppsHomeSubClusterDuration(start, end);
|
|
|
return GetApplicationsHomeSubClusterResponse.newInstance(result);
|
|
|
}
|
|
|
|
|
@@ -253,7 +271,7 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
|
|
|
deleteApplicationHomeSubCluster(
|
|
|
DeleteApplicationHomeSubClusterRequest request)
|
|
|
throws YarnException {
|
|
|
-
|
|
|
+ long start = clock.getTime();
|
|
|
FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
|
|
|
ApplicationId appId = request.getApplicationId();
|
|
|
String appZNode = getNodePath(appsZNode, appId.toString());
|
|
@@ -276,13 +294,15 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
|
|
|
String errMsg = "Cannot delete app: " + e.getMessage();
|
|
|
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
|
|
|
}
|
|
|
-
|
|
|
+ long end = clock.getTime();
|
|
|
+ opDurations.addDeleteAppHomeSubClusterDuration(start, end);
|
|
|
return DeleteApplicationHomeSubClusterResponse.newInstance();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public SubClusterRegisterResponse registerSubCluster(
|
|
|
SubClusterRegisterRequest request) throws YarnException {
|
|
|
+ long start = clock.getTime();
|
|
|
FederationMembershipStateStoreInputValidator.validate(request);
|
|
|
SubClusterInfo subClusterInfo = request.getSubClusterInfo();
|
|
|
SubClusterId subclusterId = subClusterInfo.getSubClusterId();
|
|
@@ -297,12 +317,15 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
|
|
|
String errMsg = "Cannot register subcluster: " + e.getMessage();
|
|
|
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
|
|
|
}
|
|
|
+ long end = clock.getTime();
|
|
|
+ opDurations.addRegisterSubClusterDuration(start, end);
|
|
|
return SubClusterRegisterResponse.newInstance();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public SubClusterDeregisterResponse deregisterSubCluster(
|
|
|
SubClusterDeregisterRequest request) throws YarnException {
|
|
|
+ long start = clock.getTime();
|
|
|
FederationMembershipStateStoreInputValidator.validate(request);
|
|
|
SubClusterId subClusterId = request.getSubClusterId();
|
|
|
SubClusterState state = request.getState();
|
|
@@ -316,14 +339,15 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
|
|
|
subClusterInfo.setState(state);
|
|
|
putSubclusterInfo(subClusterId, subClusterInfo, true);
|
|
|
}
|
|
|
-
|
|
|
+ long end = clock.getTime();
|
|
|
+ opDurations.addDeregisterSubClusterDuration(start, end);
|
|
|
return SubClusterDeregisterResponse.newInstance();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public SubClusterHeartbeatResponse subClusterHeartbeat(
|
|
|
SubClusterHeartbeatRequest request) throws YarnException {
|
|
|
-
|
|
|
+ long start = clock.getTime();
|
|
|
FederationMembershipStateStoreInputValidator.validate(request);
|
|
|
SubClusterId subClusterId = request.getSubClusterId();
|
|
|
|
|
@@ -340,14 +364,15 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
|
|
|
subClusterInfo.setCapability(request.getCapability());
|
|
|
|
|
|
putSubclusterInfo(subClusterId, subClusterInfo, true);
|
|
|
-
|
|
|
+ long end = clock.getTime();
|
|
|
+ opDurations.addSubClusterHeartbeatDuration(start, end);
|
|
|
return SubClusterHeartbeatResponse.newInstance();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public GetSubClusterInfoResponse getSubCluster(
|
|
|
GetSubClusterInfoRequest request) throws YarnException {
|
|
|
-
|
|
|
+ long start = clock.getTime();
|
|
|
FederationMembershipStateStoreInputValidator.validate(request);
|
|
|
SubClusterId subClusterId = request.getSubClusterId();
|
|
|
SubClusterInfo subClusterInfo = null;
|
|
@@ -361,12 +386,15 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
|
|
|
String errMsg = "Cannot get subcluster: " + e.getMessage();
|
|
|
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
|
|
|
}
|
|
|
+ long end = clock.getTime();
|
|
|
+ opDurations.addGetSubClusterDuration(start, end);
|
|
|
return GetSubClusterInfoResponse.newInstance(subClusterInfo);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public GetSubClustersInfoResponse getSubClusters(
|
|
|
GetSubClustersInfoRequest request) throws YarnException {
|
|
|
+ long start = clock.getTime();
|
|
|
List<SubClusterInfo> result = new ArrayList<>();
|
|
|
|
|
|
try {
|
|
@@ -382,6 +410,8 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
|
|
|
String errMsg = "Cannot get subclusters: " + e.getMessage();
|
|
|
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
|
|
|
}
|
|
|
+ long end = clock.getTime();
|
|
|
+ opDurations.addGetSubClustersDuration(start, end);
|
|
|
return GetSubClustersInfoResponse.newInstance(result);
|
|
|
}
|
|
|
|
|
@@ -389,7 +419,7 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
|
|
|
@Override
|
|
|
public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
|
|
|
GetSubClusterPolicyConfigurationRequest request) throws YarnException {
|
|
|
-
|
|
|
+ long start = clock.getTime();
|
|
|
FederationPolicyStoreInputValidator.validate(request);
|
|
|
String queue = request.getQueue();
|
|
|
SubClusterPolicyConfiguration policy = null;
|
|
@@ -404,6 +434,8 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
|
|
|
LOG.warn("Policy for queue: {} does not exist.", queue);
|
|
|
return null;
|
|
|
}
|
|
|
+ long end = clock.getTime();
|
|
|
+ opDurations.addGetPolicyConfigurationDuration(start, end);
|
|
|
return GetSubClusterPolicyConfigurationResponse
|
|
|
.newInstance(policy);
|
|
|
}
|
|
@@ -411,7 +443,7 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
|
|
|
@Override
|
|
|
public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(
|
|
|
SetSubClusterPolicyConfigurationRequest request) throws YarnException {
|
|
|
-
|
|
|
+ long start = clock.getTime();
|
|
|
FederationPolicyStoreInputValidator.validate(request);
|
|
|
SubClusterPolicyConfiguration policy =
|
|
|
request.getPolicyConfiguration();
|
|
@@ -422,12 +454,15 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
|
|
|
String errMsg = "Cannot set policy: " + e.getMessage();
|
|
|
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
|
|
|
}
|
|
|
+ long end = clock.getTime();
|
|
|
+ opDurations.addSetPolicyConfigurationDuration(start, end);
|
|
|
return SetSubClusterPolicyConfigurationResponse.newInstance();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
|
|
|
GetSubClusterPoliciesConfigurationsRequest request) throws YarnException {
|
|
|
+ long start = clock.getTime();
|
|
|
List<SubClusterPolicyConfiguration> result = new ArrayList<>();
|
|
|
|
|
|
try {
|
|
@@ -443,6 +478,8 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
|
|
|
String errMsg = "Cannot get policies: " + e.getMessage();
|
|
|
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
|
|
|
}
|
|
|
+ long end = clock.getTime();
|
|
|
+ opDurations.addGetPoliciesConfigurationsDuration(start, end);
|
|
|
return GetSubClusterPoliciesConfigurationsResponse.newInstance(result);
|
|
|
}
|
|
|
|
|
@@ -649,6 +686,11 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
|
|
|
return cal.getTimeInMillis();
|
|
|
}
|
|
|
|
|
|
+ @VisibleForTesting
|
|
|
+ public ZKFederationStateStoreOpDurations getOpDurations() {
|
|
|
+ return opDurations;
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public AddReservationHomeSubClusterResponse addReservationHomeSubCluster(
|
|
|
AddReservationHomeSubClusterRequest request) throws YarnException {
|
|
@@ -678,4 +720,4 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
|
|
|
UpdateReservationHomeSubClusterRequest request) throws YarnException {
|
|
|
throw new NotImplementedException("Code is not implemented");
|
|
|
}
|
|
|
-}
|
|
|
+}
|