|
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
|
+import org.apache.hadoop.yarn.api.records.ReservationId;
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
|
|
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
|
|
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
|
|
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
|
|
@@ -59,7 +60,15 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegister
|
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
|
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
|
|
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
|
|
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
|
|
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
|
|
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
|
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterRequest;
|
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterResponse;
|
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterResponse;
|
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterRequest;
|
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterResponse;
|
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterRequest;
|
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
|
|
import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
|
|
import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
|
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationReservationHomeSubClusterStoreInputValidator;
|
|
import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
|
|
import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
|
|
import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
|
|
import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
|
|
import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
|
|
import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
|
|
@@ -75,6 +84,7 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|
|
|
|
|
private Map<SubClusterId, SubClusterInfo> membership;
|
|
private Map<SubClusterId, SubClusterInfo> membership;
|
|
private Map<ApplicationId, SubClusterId> applications;
|
|
private Map<ApplicationId, SubClusterId> applications;
|
|
|
|
+ private Map<ReservationId, SubClusterId> reservations;
|
|
private Map<String, SubClusterPolicyConfiguration> policies;
|
|
private Map<String, SubClusterPolicyConfiguration> policies;
|
|
|
|
|
|
private final MonotonicClock clock = new MonotonicClock();
|
|
private final MonotonicClock clock = new MonotonicClock();
|
|
@@ -86,6 +96,7 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|
public void init(Configuration conf) {
|
|
public void init(Configuration conf) {
|
|
membership = new ConcurrentHashMap<SubClusterId, SubClusterInfo>();
|
|
membership = new ConcurrentHashMap<SubClusterId, SubClusterInfo>();
|
|
applications = new ConcurrentHashMap<ApplicationId, SubClusterId>();
|
|
applications = new ConcurrentHashMap<ApplicationId, SubClusterId>();
|
|
|
|
+ reservations = new ConcurrentHashMap<ReservationId, SubClusterId>();
|
|
policies = new ConcurrentHashMap<String, SubClusterPolicyConfiguration>();
|
|
policies = new ConcurrentHashMap<String, SubClusterPolicyConfiguration>();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -93,6 +104,7 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|
public void close() {
|
|
public void close() {
|
|
membership = null;
|
|
membership = null;
|
|
applications = null;
|
|
applications = null;
|
|
|
|
+ reservations = null;
|
|
policies = null;
|
|
policies = null;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -312,4 +324,45 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Override
|
|
|
|
+ public AddReservationHomeSubClusterResponse addReservationHomeSubCluster(
|
|
|
|
+ AddReservationHomeSubClusterRequest request) throws YarnException {
|
|
|
|
+ FederationReservationHomeSubClusterStoreInputValidator.validate(request);
|
|
|
|
+ ReservationHomeSubCluster homeSubCluster = request.getReservationHomeSubCluster();
|
|
|
|
+ ReservationId reservationId = homeSubCluster.getReservationId();
|
|
|
|
+ if (!reservations.containsKey(reservationId)) {
|
|
|
|
+ reservations.put(reservationId, homeSubCluster.getHomeSubCluster());
|
|
|
|
+ }
|
|
|
|
+ return AddReservationHomeSubClusterResponse.newInstance(reservations.get(reservationId));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public GetReservationHomeSubClusterResponse getReservationHomeSubCluster(
|
|
|
|
+ GetReservationHomeSubClusterRequest request) throws YarnException {
|
|
|
|
+ FederationReservationHomeSubClusterStoreInputValidator.validate(request);
|
|
|
|
+ ReservationId reservationId = request.getReservationId();
|
|
|
|
+ if (!reservations.containsKey(reservationId)) {
|
|
|
|
+ throw new YarnException("Reservation " + reservationId + " does not exist");
|
|
|
|
+ }
|
|
|
|
+ SubClusterId subClusterId = reservations.get(reservationId);
|
|
|
|
+ ReservationHomeSubCluster homeSubCluster =
|
|
|
|
+ ReservationHomeSubCluster.newInstance(reservationId, subClusterId);
|
|
|
|
+ return GetReservationHomeSubClusterResponse.newInstance(homeSubCluster);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public GetReservationsHomeSubClusterResponse getReservationsHomeSubCluster(
|
|
|
|
+ GetReservationsHomeSubClusterRequest request) throws YarnException {
|
|
|
|
+ List<ReservationHomeSubCluster> result = new ArrayList<>();
|
|
|
|
+
|
|
|
|
+ for (Entry<ReservationId, SubClusterId> entry : reservations.entrySet()) {
|
|
|
|
+ ReservationId reservationId = entry.getKey();
|
|
|
|
+ SubClusterId subClusterId = entry.getValue();
|
|
|
|
+ ReservationHomeSubCluster homeSubCluster =
|
|
|
|
+ ReservationHomeSubCluster.newInstance(reservationId, subClusterId);
|
|
|
|
+ result.add(homeSubCluster);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return GetReservationsHomeSubClusterResponse.newInstance(result);
|
|
|
|
+ }
|
|
}
|
|
}
|