|
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
|
|
@@ -60,8 +61,11 @@ import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationH
|
|
|
import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
|
|
|
import org.apache.hadoop.yarn.server.records.Version;
|
|
|
import org.apache.hadoop.yarn.util.MonotonicClock;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
|
|
/**
|
|
|
* In-memory implementation of {@link FederationStateStore}.
|
|
@@ -74,6 +78,9 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|
|
|
|
|
private final MonotonicClock clock = new MonotonicClock();
|
|
|
|
|
|
+ public static final Logger LOG =
|
|
|
+ LoggerFactory.getLogger(MemoryFederationStateStore.class);
|
|
|
+
|
|
|
@Override
|
|
|
public void init(Configuration conf) {
|
|
|
membership = new ConcurrentHashMap<SubClusterId, SubClusterInfo>();
|
|
@@ -94,7 +101,17 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|
|
FederationMembershipStateStoreInputValidator
|
|
|
.validateSubClusterRegisterRequest(request);
|
|
|
SubClusterInfo subClusterInfo = request.getSubClusterInfo();
|
|
|
- membership.put(subClusterInfo.getSubClusterId(), subClusterInfo);
|
|
|
+
|
|
|
+ SubClusterInfo subClusterInfoToSave =
|
|
|
+ SubClusterInfo.newInstance(subClusterInfo.getSubClusterId(),
|
|
|
+ subClusterInfo.getAMRMServiceAddress(),
|
|
|
+ subClusterInfo.getClientRMServiceAddress(),
|
|
|
+ subClusterInfo.getRMAdminServiceAddress(),
|
|
|
+ subClusterInfo.getRMWebServiceAddress(), clock.getTime(),
|
|
|
+ subClusterInfo.getState(), subClusterInfo.getLastStartTime(),
|
|
|
+ subClusterInfo.getCapability());
|
|
|
+
|
|
|
+ membership.put(subClusterInfo.getSubClusterId(), subClusterInfoToSave);
|
|
|
return SubClusterRegisterResponse.newInstance();
|
|
|
}
|
|
|
|
|
@@ -105,8 +122,11 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|
|
.validateSubClusterDeregisterRequest(request);
|
|
|
SubClusterInfo subClusterInfo = membership.get(request.getSubClusterId());
|
|
|
if (subClusterInfo == null) {
|
|
|
- throw new YarnException(
|
|
|
- "SubCluster " + request.getSubClusterId().toString() + " not found");
|
|
|
+ String errMsg =
|
|
|
+ "SubCluster " + request.getSubClusterId().toString() + " not found";
|
|
|
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
|
|
|
+ FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_DEREGISTER_FAIL,
|
|
|
+ errMsg);
|
|
|
} else {
|
|
|
subClusterInfo.setState(request.getState());
|
|
|
}
|
|
@@ -124,8 +144,11 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|
|
SubClusterInfo subClusterInfo = membership.get(subClusterId);
|
|
|
|
|
|
if (subClusterInfo == null) {
|
|
|
- throw new YarnException("Subcluster " + subClusterId.toString()
|
|
|
- + " does not exist; cannot heartbeat");
|
|
|
+ String errMsg = "Subcluster " + subClusterId.toString()
|
|
|
+ + " does not exist; cannot heartbeat";
|
|
|
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
|
|
|
+ FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_HEARTBEAT_FAIL,
|
|
|
+ errMsg);
|
|
|
}
|
|
|
|
|
|
subClusterInfo.setLastHeartBeat(clock.getTime());
|
|
@@ -143,8 +166,10 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|
|
.validateGetSubClusterInfoRequest(request);
|
|
|
SubClusterId subClusterId = request.getSubClusterId();
|
|
|
if (!membership.containsKey(subClusterId)) {
|
|
|
- throw new YarnException(
|
|
|
- "Subcluster " + subClusterId.toString() + " does not exist");
|
|
|
+ String errMsg =
|
|
|
+ "Subcluster " + subClusterId.toString() + " does not exist";
|
|
|
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
|
|
|
+ FederationStateStoreErrorCode.MEMBERSHIP_SINGLE_SELECT_FAIL, errMsg);
|
|
|
}
|
|
|
|
|
|
return GetSubClusterInfoResponse.newInstance(membership.get(subClusterId));
|
|
@@ -193,7 +218,9 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|
|
ApplicationId appId =
|
|
|
request.getApplicationHomeSubCluster().getApplicationId();
|
|
|
if (!applications.containsKey(appId)) {
|
|
|
- throw new YarnException("Application " + appId + " does not exist");
|
|
|
+ String errMsg = "Application " + appId + " does not exist";
|
|
|
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
|
|
|
+ FederationStateStoreErrorCode.APPLICATIONS_UPDATE_FAIL, errMsg);
|
|
|
}
|
|
|
|
|
|
applications.put(appId,
|
|
@@ -209,7 +236,10 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|
|
.validateGetApplicationHomeSubClusterRequest(request);
|
|
|
ApplicationId appId = request.getApplicationId();
|
|
|
if (!applications.containsKey(appId)) {
|
|
|
- throw new YarnException("Application " + appId + " does not exist");
|
|
|
+ String errMsg = "Application " + appId + " does not exist";
|
|
|
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
|
|
|
+ FederationStateStoreErrorCode.APPLICATIONS_SINGLE_SELECT_FAIL,
|
|
|
+ errMsg);
|
|
|
}
|
|
|
|
|
|
return GetApplicationHomeSubClusterResponse.newInstance(
|
|
@@ -238,7 +268,9 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|
|
.validateDeleteApplicationHomeSubClusterRequest(request);
|
|
|
ApplicationId appId = request.getApplicationId();
|
|
|
if (!applications.containsKey(appId)) {
|
|
|
- throw new YarnException("Application " + appId + " does not exist");
|
|
|
+ String errMsg = "Application " + appId + " does not exist";
|
|
|
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
|
|
|
+ FederationStateStoreErrorCode.APPLICATIONS_DELETE_FAIL, errMsg);
|
|
|
}
|
|
|
|
|
|
applications.remove(appId);
|
|
@@ -253,7 +285,9 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|
|
.validateGetSubClusterPolicyConfigurationRequest(request);
|
|
|
String queue = request.getQueue();
|
|
|
if (!policies.containsKey(queue)) {
|
|
|
- throw new YarnException("Policy for queue " + queue + " does not exist");
|
|
|
+ String errMsg = "Policy for queue " + queue + " does not exist";
|
|
|
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
|
|
|
+ FederationStateStoreErrorCode.POLICY_SINGLE_SELECT_FAIL, errMsg);
|
|
|
}
|
|
|
|
|
|
return GetSubClusterPolicyConfigurationResponse
|