|
@@ -17,15 +17,20 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.server.federation.store.impl;
|
|
|
|
|
|
+import java.io.IOException;
|
|
|
+import java.nio.ByteBuffer;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Calendar;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Map.Entry;
|
|
|
+import java.util.Set;
|
|
|
import java.util.TimeZone;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
|
+import org.apache.hadoop.classification.VisibleForTesting;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.security.token.delegation.DelegationKey;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.ReservationId;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
@@ -71,6 +76,10 @@ import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationH
|
|
|
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterRequest;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterResponse;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.RouterRMDTSecretManagerState;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.utils.FederationReservationHomeSubClusterStoreInputValidator;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
|
|
@@ -90,6 +99,7 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|
|
private Map<ApplicationId, SubClusterId> applications;
|
|
|
private Map<ReservationId, SubClusterId> reservations;
|
|
|
private Map<String, SubClusterPolicyConfiguration> policies;
|
|
|
+ private RouterRMDTSecretManagerState routerRMSecretManagerState;
|
|
|
|
|
|
private final MonotonicClock clock = new MonotonicClock();
|
|
|
|
|
@@ -102,6 +112,7 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|
|
applications = new ConcurrentHashMap<ApplicationId, SubClusterId>();
|
|
|
reservations = new ConcurrentHashMap<ReservationId, SubClusterId>();
|
|
|
policies = new ConcurrentHashMap<String, SubClusterPolicyConfiguration>();
|
|
|
+ routerRMSecretManagerState = new RouterRMDTSecretManagerState();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -395,4 +406,74 @@ public class MemoryFederationStateStore implements FederationStateStore {
|
|
|
reservations.remove(reservationId);
|
|
|
return DeleteReservationHomeSubClusterResponse.newInstance();
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest request)
|
|
|
+ throws YarnException, IOException {
|
|
|
+ // Restore the DelegationKey from the request
|
|
|
+ RouterMasterKey masterKey = request.getRouterMasterKey();
|
|
|
+ DelegationKey delegationKey = getDelegationKeyByMasterKey(masterKey);
|
|
|
+
|
|
|
+ Set<DelegationKey> rmDTMasterKeyState = routerRMSecretManagerState.getMasterKeyState();
|
|
|
+ if (rmDTMasterKeyState.contains(delegationKey)) {
|
|
|
+ LOG.info("Error storing info for RMDTMasterKey with keyID: {}.", delegationKey.getKeyId());
|
|
|
+ throw new IOException("RMDTMasterKey with keyID: " + delegationKey.getKeyId() +
|
|
|
+ " is already stored");
|
|
|
+ }
|
|
|
+
|
|
|
+ routerRMSecretManagerState.getMasterKeyState().add(delegationKey);
|
|
|
+ LOG.info("Store Router-RMDT master key with key id: {}. Currently rmDTMasterKeyState size: {}",
|
|
|
+ delegationKey.getKeyId(), rmDTMasterKeyState.size());
|
|
|
+
|
|
|
+ return RouterMasterKeyResponse.newInstance(masterKey);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public RouterMasterKeyResponse removeStoredMasterKey(RouterMasterKeyRequest request)
|
|
|
+ throws YarnException, IOException {
|
|
|
+ // Restore the DelegationKey from the request
|
|
|
+ RouterMasterKey masterKey = request.getRouterMasterKey();
|
|
|
+ DelegationKey delegationKey = getDelegationKeyByMasterKey(masterKey);
|
|
|
+
|
|
|
+ LOG.info("Remove Router-RMDT master key with key id: {}.", delegationKey.getKeyId());
|
|
|
+ Set<DelegationKey> rmDTMasterKeyState = routerRMSecretManagerState.getMasterKeyState();
|
|
|
+ rmDTMasterKeyState.remove(delegationKey);
|
|
|
+
|
|
|
+ return RouterMasterKeyResponse.newInstance(masterKey);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public RouterMasterKeyResponse getMasterKeyByDelegationKey(RouterMasterKeyRequest request)
|
|
|
+ throws YarnException, IOException {
|
|
|
+ // Restore the DelegationKey from the request
|
|
|
+ RouterMasterKey masterKey = request.getRouterMasterKey();
|
|
|
+ DelegationKey delegationKey = getDelegationKeyByMasterKey(masterKey);
|
|
|
+
|
|
|
+ Set<DelegationKey> rmDTMasterKeyState = routerRMSecretManagerState.getMasterKeyState();
|
|
|
+ if (!rmDTMasterKeyState.contains(delegationKey)) {
|
|
|
+ throw new IOException("GetMasterKey with keyID: " + masterKey.getKeyId() +
|
|
|
+ " does not exist.");
|
|
|
+ }
|
|
|
+ RouterMasterKey resultRouterMasterKey = RouterMasterKey.newInstance(delegationKey.getKeyId(),
|
|
|
+ ByteBuffer.wrap(delegationKey.getEncodedKey()), delegationKey.getExpiryDate());
|
|
|
+ return RouterMasterKeyResponse.newInstance(resultRouterMasterKey);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get DelegationKey By based on MasterKey.
|
|
|
+ *
|
|
|
+ * @param masterKey masterKey
|
|
|
+ * @return DelegationKey
|
|
|
+ */
|
|
|
+ private static DelegationKey getDelegationKeyByMasterKey(RouterMasterKey masterKey) {
|
|
|
+ ByteBuffer keyByteBuf = masterKey.getKeyBytes();
|
|
|
+ byte[] keyBytes = new byte[keyByteBuf.remaining()];
|
|
|
+ keyByteBuf.get(keyBytes);
|
|
|
+ return new DelegationKey(masterKey.getKeyId(), masterKey.getExpiryDate(), keyBytes);
|
|
|
+ }
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ public RouterRMDTSecretManagerState getRouterRMSecretManagerState() {
|
|
|
+ return routerRMSecretManagerState;
|
|
|
+ }
|
|
|
}
|