|
@@ -26,7 +26,6 @@ import java.util.List;
|
|
|
import java.util.TimeZone;
|
|
|
|
|
|
import org.apache.hadoop.classification.VisibleForTesting;
|
|
|
-import org.apache.commons.lang3.NotImplementedException;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.util.curator.ZKCuratorManager;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
@@ -65,6 +64,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyCo
|
|
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterRequest;
|
|
@@ -84,7 +84,9 @@ import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicatio
|
|
|
import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationReservationHomeSubClusterStoreInputValidator;
|
|
|
import org.apache.hadoop.yarn.server.records.Version;
|
|
|
+import org.apache.hadoop.yarn.api.records.ReservationId;
|
|
|
import org.apache.hadoop.yarn.util.Clock;
|
|
|
import org.apache.hadoop.yarn.util.SystemClock;
|
|
|
import org.apache.zookeeper.data.ACL;
|
|
@@ -105,8 +107,11 @@ import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
|
|
|
* | |----- APP1
|
|
|
* | |----- APP2
|
|
|
* |--- POLICY
|
|
|
- * |----- QUEUE1
|
|
|
- * |----- QUEUE1
|
|
|
+ * | |----- QUEUE1
|
|
|
+ * | |----- QUEUE1
|
|
|
+ * |--- RESERVATION
|
|
|
+ * | |----- RESERVATION1
|
|
|
+ * | |----- RESERVATION2
|
|
|
*/
|
|
|
public class ZookeeperFederationStateStore implements FederationStateStore {
|
|
|
|
|
@@ -116,6 +121,7 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
|
|
|
private final static String ROOT_ZNODE_NAME_MEMBERSHIP = "memberships";
|
|
|
private final static String ROOT_ZNODE_NAME_APPLICATION = "applications";
|
|
|
private final static String ROOT_ZNODE_NAME_POLICY = "policies";
|
|
|
+ private final static String ROOT_ZNODE_NAME_RESERVATION = "reservation";
|
|
|
|
|
|
/** Interface to Zookeeper. */
|
|
|
private ZKCuratorManager zkManager;
|
|
@@ -126,6 +132,7 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
|
|
|
private String appsZNode;
|
|
|
private String membershipZNode;
|
|
|
private String policiesZNode;
|
|
|
+ private String reservationsZNode;
|
|
|
|
|
|
private volatile Clock clock = SystemClock.getInstance();
|
|
|
|
|
@@ -151,6 +158,7 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
|
|
|
membershipZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_MEMBERSHIP);
|
|
|
appsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_APPLICATION);
|
|
|
policiesZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_POLICY);
|
|
|
+ reservationsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_RESERVATION);
|
|
|
|
|
|
// Create base znode for each entity
|
|
|
try {
|
|
@@ -158,6 +166,7 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
|
|
|
zkManager.createRootDirRecursively(membershipZNode, zkAcl);
|
|
|
zkManager.createRootDirRecursively(appsZNode, zkAcl);
|
|
|
zkManager.createRootDirRecursively(policiesZNode, zkAcl);
|
|
|
+ zkManager.createRootDirRecursively(reservationsZNode, zkAcl);
|
|
|
} catch (Exception e) {
|
|
|
String errMsg = "Cannot create base directories: " + e.getMessage();
|
|
|
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
|
|
@@ -686,6 +695,30 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
|
|
|
return cal.getTimeInMillis();
|
|
|
}
|
|
|
|
|
|
+ private void putReservation(final ReservationId reservationId,
|
|
|
+ final SubClusterId subClusterId, boolean update) throws YarnException {
|
|
|
+ String reservationZNode = getNodePath(reservationsZNode, reservationId.toString());
|
|
|
+ SubClusterIdProto proto = ((SubClusterIdPBImpl)subClusterId).getProto();
|
|
|
+ byte[] data = proto.toByteArray();
|
|
|
+ put(reservationZNode, data, update);
|
|
|
+ }
|
|
|
+
|
|
|
+ private SubClusterId getReservation(final ReservationId reservationId)
|
|
|
+ throws YarnException {
|
|
|
+ String reservationIdZNode = getNodePath(reservationsZNode, reservationId.toString());
|
|
|
+ SubClusterId subClusterId = null;
|
|
|
+ byte[] data = get(reservationIdZNode);
|
|
|
+ if (data != null) {
|
|
|
+ try {
|
|
|
+ subClusterId = new SubClusterIdPBImpl(SubClusterIdProto.parseFrom(data));
|
|
|
+ } catch (InvalidProtocolBufferException e) {
|
|
|
+ String errMsg = "Cannot parse reservation at " + reservationId;
|
|
|
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return subClusterId;
|
|
|
+ }
|
|
|
+
|
|
|
@VisibleForTesting
|
|
|
public ZKFederationStateStoreOpDurations getOpDurations() {
|
|
|
return opDurations;
|
|
@@ -694,30 +727,128 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
|
|
|
@Override
|
|
|
public AddReservationHomeSubClusterResponse addReservationHomeSubCluster(
|
|
|
AddReservationHomeSubClusterRequest request) throws YarnException {
|
|
|
- throw new NotImplementedException("Code is not implemented");
|
|
|
+
|
|
|
+ long start = clock.getTime();
|
|
|
+ FederationReservationHomeSubClusterStoreInputValidator.validate(request);
|
|
|
+ ReservationHomeSubCluster reservationHomeSubCluster = request.getReservationHomeSubCluster();
|
|
|
+ ReservationId reservationId = reservationHomeSubCluster.getReservationId();
|
|
|
+
|
|
|
+ // Try to write the subcluster
|
|
|
+ SubClusterId homeSubCluster = reservationHomeSubCluster.getHomeSubCluster();
|
|
|
+ try {
|
|
|
+ putReservation(reservationId, homeSubCluster, false);
|
|
|
+ } catch (Exception e) {
|
|
|
+ String errMsg = "Cannot add reservation home subcluster for " + reservationId;
|
|
|
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Check for the actual subcluster
|
|
|
+ try {
|
|
|
+ homeSubCluster = getReservation(reservationId);
|
|
|
+ } catch (Exception e) {
|
|
|
+ String errMsg = "Cannot check app home subcluster for " + reservationId;
|
|
|
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
|
|
|
+ }
|
|
|
+ long end = clock.getTime();
|
|
|
+ opDurations.addReservationHomeSubClusterDuration(start, end);
|
|
|
+ return AddReservationHomeSubClusterResponse.newInstance(homeSubCluster);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public GetReservationHomeSubClusterResponse getReservationHomeSubCluster(
|
|
|
GetReservationHomeSubClusterRequest request) throws YarnException {
|
|
|
- throw new NotImplementedException("Code is not implemented");
|
|
|
+
|
|
|
+ long start = clock.getTime();
|
|
|
+ FederationReservationHomeSubClusterStoreInputValidator.validate(request);
|
|
|
+ ReservationId reservationId = request.getReservationId();
|
|
|
+ SubClusterId homeSubCluster = getReservation(reservationId);
|
|
|
+
|
|
|
+ if (homeSubCluster == null) {
|
|
|
+ String errMsg = "Reservation " + reservationId + " does not exist";
|
|
|
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
|
|
|
+ }
|
|
|
+
|
|
|
+ ReservationHomeSubCluster reservationHomeSubCluster =
|
|
|
+ ReservationHomeSubCluster.newInstance(reservationId, homeSubCluster);
|
|
|
+ long end = clock.getTime();
|
|
|
+ opDurations.addGetReservationHomeSubClusterDuration(start, end);
|
|
|
+ return GetReservationHomeSubClusterResponse.newInstance(reservationHomeSubCluster);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public GetReservationsHomeSubClusterResponse getReservationsHomeSubCluster(
|
|
|
GetReservationsHomeSubClusterRequest request) throws YarnException {
|
|
|
- throw new NotImplementedException("Code is not implemented");
|
|
|
+ long start = clock.getTime();
|
|
|
+ List<ReservationHomeSubCluster> result = new ArrayList<>();
|
|
|
+
|
|
|
+ try {
|
|
|
+ for (String child : zkManager.getChildren(reservationsZNode)) {
|
|
|
+ ReservationId reservationId = ReservationId.parseReservationId(child);
|
|
|
+ SubClusterId homeSubCluster = getReservation(reservationId);
|
|
|
+ ReservationHomeSubCluster app =
|
|
|
+ ReservationHomeSubCluster.newInstance(reservationId, homeSubCluster);
|
|
|
+ result.add(app);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ String errMsg = "Cannot get apps: " + e.getMessage();
|
|
|
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
|
|
|
+ }
|
|
|
+ long end = clock.getTime();
|
|
|
+ opDurations.addGetReservationsHomeSubClusterDuration(start, end);
|
|
|
+ return GetReservationsHomeSubClusterResponse.newInstance(result);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public DeleteReservationHomeSubClusterResponse deleteReservationHomeSubCluster(
|
|
|
DeleteReservationHomeSubClusterRequest request) throws YarnException {
|
|
|
- throw new NotImplementedException("Code is not implemented");
|
|
|
+ long start = clock.getTime();
|
|
|
+ FederationReservationHomeSubClusterStoreInputValidator.validate(request);
|
|
|
+ ReservationId reservationId = request.getReservationId();
|
|
|
+ String reservationZNode = getNodePath(reservationsZNode, reservationId.toString());
|
|
|
+
|
|
|
+ boolean exists = false;
|
|
|
+ try {
|
|
|
+ exists = zkManager.exists(reservationZNode);
|
|
|
+ } catch (Exception e) {
|
|
|
+ String errMsg = "Cannot check reservation: " + e.getMessage();
|
|
|
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!exists) {
|
|
|
+ String errMsg = "Reservation " + reservationId + " does not exist";
|
|
|
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ zkManager.delete(reservationZNode);
|
|
|
+ } catch (Exception e) {
|
|
|
+ String errMsg = "Cannot delete reservation: " + e.getMessage();
|
|
|
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
|
|
|
+ }
|
|
|
+ long end = clock.getTime();
|
|
|
+ opDurations.addDeleteReservationHomeSubClusterDuration(start, end);
|
|
|
+ return DeleteReservationHomeSubClusterResponse.newInstance();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public UpdateReservationHomeSubClusterResponse updateReservationHomeSubCluster(
|
|
|
UpdateReservationHomeSubClusterRequest request) throws YarnException {
|
|
|
- throw new NotImplementedException("Code is not implemented");
|
|
|
+
|
|
|
+ long start = clock.getTime();
|
|
|
+ FederationReservationHomeSubClusterStoreInputValidator.validate(request);
|
|
|
+ ReservationHomeSubCluster reservationHomeSubCluster = request.getReservationHomeSubCluster();
|
|
|
+ ReservationId reservationId = reservationHomeSubCluster.getReservationId();
|
|
|
+ SubClusterId homeSubCluster = getReservation(reservationId);
|
|
|
+
|
|
|
+ if (homeSubCluster == null) {
|
|
|
+ String errMsg = "Reservation " + reservationId + " does not exist";
|
|
|
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
|
|
|
+ }
|
|
|
+
|
|
|
+ SubClusterId newSubClusterId = reservationHomeSubCluster.getHomeSubCluster();
|
|
|
+ putReservation(reservationId, newSubClusterId, true);
|
|
|
+ long end = clock.getTime();
|
|
|
+ opDurations.addUpdateReservationHomeSubClusterDuration(start, end);
|
|
|
+ return UpdateReservationHomeSubClusterResponse.newInstance();
|
|
|
}
|
|
|
}
|