|
@@ -24,6 +24,9 @@ import java.util.HashSet;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import java.util.TreeMap;
|
|
|
+import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
|
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
|
|
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
|
|
|
|
|
import javax.crypto.SecretKey;
|
|
|
|
|
@@ -86,6 +89,8 @@ public abstract class RMStateStore extends AbstractService {
|
|
|
protected static final String VERSION_NODE = "RMVersionNode";
|
|
|
protected static final String EPOCH_NODE = "EpochNode";
|
|
|
private ResourceManager resourceManager;
|
|
|
+ private final ReadLock readLock;
|
|
|
+ private final WriteLock writeLock;
|
|
|
|
|
|
public static final Log LOG = LogFactory.getLog(RMStateStore.class);
|
|
|
|
|
@@ -113,6 +118,24 @@ public abstract class RMStateStore extends AbstractService {
|
|
|
RMStateStoreEventType.STORE_APP_ATTEMPT, new StoreAppAttemptTransition())
|
|
|
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
|
|
|
RMStateStoreEventType.UPDATE_APP_ATTEMPT, new UpdateAppAttemptTransition())
|
|
|
+ .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
|
|
|
+ RMStateStoreEventType.STORE_MASTERKEY,
|
|
|
+ new StoreRMDTMasterKeyTransition())
|
|
|
+ .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
|
|
|
+ RMStateStoreEventType.REMOVE_MASTERKEY,
|
|
|
+ new RemoveRMDTMasterKeyTransition())
|
|
|
+ .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
|
|
|
+ RMStateStoreEventType.STORE_DELEGATION_TOKEN,
|
|
|
+ new StoreRMDTTransition())
|
|
|
+ .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
|
|
|
+ RMStateStoreEventType.REMOVE_DELEGATION_TOKEN,
|
|
|
+ new RemoveRMDTTransition())
|
|
|
+ .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
|
|
|
+ RMStateStoreEventType.UPDATE_DELEGATION_TOKEN,
|
|
|
+ new UpdateRMDTTransition())
|
|
|
+ .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
|
|
|
+ RMStateStoreEventType.UPDATE_AMRM_TOKEN,
|
|
|
+ new StoreOrUpdateAMRMTokenTransition())
|
|
|
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED,
|
|
|
RMStateStoreEventType.FENCED)
|
|
|
.addTransition(RMStateStoreState.FENCED, RMStateStoreState.FENCED,
|
|
@@ -122,7 +145,13 @@ public abstract class RMStateStore extends AbstractService {
|
|
|
RMStateStoreEventType.REMOVE_APP,
|
|
|
RMStateStoreEventType.STORE_APP_ATTEMPT,
|
|
|
RMStateStoreEventType.UPDATE_APP_ATTEMPT,
|
|
|
- RMStateStoreEventType.FENCED));
|
|
|
+ RMStateStoreEventType.FENCED,
|
|
|
+ RMStateStoreEventType.STORE_MASTERKEY,
|
|
|
+ RMStateStoreEventType.REMOVE_MASTERKEY,
|
|
|
+ RMStateStoreEventType.STORE_DELEGATION_TOKEN,
|
|
|
+ RMStateStoreEventType.REMOVE_DELEGATION_TOKEN,
|
|
|
+ RMStateStoreEventType.UPDATE_DELEGATION_TOKEN,
|
|
|
+ RMStateStoreEventType.UPDATE_AMRM_TOKEN));
|
|
|
|
|
|
private final StateMachine<RMStateStoreState,
|
|
|
RMStateStoreEventType,
|
|
@@ -255,8 +284,143 @@ public abstract class RMStateStore extends AbstractService {
|
|
|
};
|
|
|
}
|
|
|
|
|
|
+ private static class StoreRMDTTransition implements
|
|
|
+ SingleArcTransition<RMStateStore, RMStateStoreEvent> {
|
|
|
+ @Override
|
|
|
+ public void transition(RMStateStore store, RMStateStoreEvent event) {
|
|
|
+ if (!(event instanceof RMStateStoreRMDTEvent)) {
|
|
|
+ // should never happen
|
|
|
+ LOG.error("Illegal event type: " + event.getClass());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event;
|
|
|
+ try {
|
|
|
+ LOG.info("Storing RMDelegationToken and SequenceNumber");
|
|
|
+ store.storeRMDelegationTokenAndSequenceNumberState(
|
|
|
+ dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate(),
|
|
|
+ dtEvent.getLatestSequenceNumber());
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("Error While Storing RMDelegationToken and SequenceNumber ",
|
|
|
+ e);
|
|
|
+ store.notifyStoreOperationFailed(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static class RemoveRMDTTransition implements
|
|
|
+ SingleArcTransition<RMStateStore, RMStateStoreEvent> {
|
|
|
+ @Override
|
|
|
+ public void transition(RMStateStore store, RMStateStoreEvent event) {
|
|
|
+ if (!(event instanceof RMStateStoreRMDTEvent)) {
|
|
|
+ // should never happen
|
|
|
+ LOG.error("Illegal event type: " + event.getClass());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event;
|
|
|
+ try {
|
|
|
+ LOG.info("Removing RMDelegationToken and SequenceNumber");
|
|
|
+ store.removeRMDelegationTokenState(dtEvent.getRmDTIdentifier());
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("Error While Removing RMDelegationToken and SequenceNumber ",
|
|
|
+ e);
|
|
|
+ store.notifyStoreOperationFailed(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static class UpdateRMDTTransition implements
|
|
|
+ SingleArcTransition<RMStateStore, RMStateStoreEvent> {
|
|
|
+ @Override
|
|
|
+ public void transition(RMStateStore store, RMStateStoreEvent event) {
|
|
|
+ if (!(event instanceof RMStateStoreRMDTEvent)) {
|
|
|
+ // should never happen
|
|
|
+ LOG.error("Illegal event type: " + event.getClass());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event;
|
|
|
+ try {
|
|
|
+ LOG.info("Updating RMDelegationToken and SequenceNumber");
|
|
|
+ store.updateRMDelegationTokenAndSequenceNumberInternal(
|
|
|
+ dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate(),
|
|
|
+ dtEvent.getLatestSequenceNumber());
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("Error While Updating RMDelegationToken and SequenceNumber ",
|
|
|
+ e);
|
|
|
+ store.notifyStoreOperationFailed(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static class StoreRMDTMasterKeyTransition implements
|
|
|
+ SingleArcTransition<RMStateStore, RMStateStoreEvent> {
|
|
|
+ @Override
|
|
|
+ public void transition(RMStateStore store, RMStateStoreEvent event) {
|
|
|
+ if (!(event instanceof RMStateStoreRMDTMasterKeyEvent)) {
|
|
|
+ // should never happen
|
|
|
+ LOG.error("Illegal event type: " + event.getClass());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ RMStateStoreRMDTMasterKeyEvent dtEvent =
|
|
|
+ (RMStateStoreRMDTMasterKeyEvent) event;
|
|
|
+ try {
|
|
|
+ LOG.info("Storing RMDTMasterKey.");
|
|
|
+ store.storeRMDTMasterKeyState(dtEvent.getDelegationKey());
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("Error While Storing RMDTMasterKey.", e);
|
|
|
+ store.notifyStoreOperationFailed(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static class RemoveRMDTMasterKeyTransition implements
|
|
|
+ SingleArcTransition<RMStateStore, RMStateStoreEvent> {
|
|
|
+ @Override
|
|
|
+ public void transition(RMStateStore store, RMStateStoreEvent event) {
|
|
|
+ if (!(event instanceof RMStateStoreRMDTMasterKeyEvent)) {
|
|
|
+ // should never happen
|
|
|
+ LOG.error("Illegal event type: " + event.getClass());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ RMStateStoreRMDTMasterKeyEvent dtEvent =
|
|
|
+ (RMStateStoreRMDTMasterKeyEvent) event;
|
|
|
+ try {
|
|
|
+ LOG.info("Removing RMDTMasterKey.");
|
|
|
+ store.removeRMDTMasterKeyState(dtEvent.getDelegationKey());
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("Error While Removing RMDTMasterKey.", e);
|
|
|
+ store.notifyStoreOperationFailed(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static class StoreOrUpdateAMRMTokenTransition implements
|
|
|
+ SingleArcTransition<RMStateStore, RMStateStoreEvent> {
|
|
|
+ @Override
|
|
|
+ public void transition(RMStateStore store, RMStateStoreEvent event) {
|
|
|
+ if (!(event instanceof RMStateStoreAMRMTokenEvent)) {
|
|
|
+ // should never happen
|
|
|
+ LOG.error("Illegal event type: " + event.getClass());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ RMStateStoreAMRMTokenEvent amrmEvent = (RMStateStoreAMRMTokenEvent) event;
|
|
|
+
|
|
|
+ try {
|
|
|
+ LOG.info("Updating AMRMToken");
|
|
|
+ store.storeOrUpdateAMRMTokenSecretManagerState(
|
|
|
+ amrmEvent.getAmrmTokenSecretManagerState(), amrmEvent.isUpdate());
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("Error storing info for AMRMTokenSecretManager", e);
|
|
|
+ store.notifyStoreOperationFailed(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
public RMStateStore() {
|
|
|
super(RMStateStore.class.getName());
|
|
|
+ ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
|
|
+ this.readLock = lock.readLock();
|
|
|
+ this.writeLock = lock.writeLock();
|
|
|
stateMachine = stateMachineFactory.make(this);
|
|
|
}
|
|
|
|
|
@@ -445,9 +609,8 @@ public abstract class RMStateStore extends AbstractService {
|
|
|
dispatcher.getEventHandler().handle(new RMStateUpdateAppEvent(appState));
|
|
|
}
|
|
|
|
|
|
- public synchronized void updateFencedState() {
|
|
|
- this.stateMachine.doTransition(RMStateStoreEventType.FENCED,
|
|
|
- new RMStateStoreEvent(RMStateStoreEventType.FENCED));
|
|
|
+ public void updateFencedState() {
|
|
|
+ handleStoreEvent(new RMStateStoreEvent(RMStateStoreEventType.FENCED));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -509,19 +672,11 @@ public abstract class RMStateStore extends AbstractService {
|
|
|
* RMDTSecretManager call this to store the state of a delegation token
|
|
|
* and sequence number
|
|
|
*/
|
|
|
- public synchronized void storeRMDelegationTokenAndSequenceNumber(
|
|
|
+ public void storeRMDelegationTokenAndSequenceNumber(
|
|
|
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
|
|
|
int latestSequenceNumber) {
|
|
|
- if(isFencedState()) {
|
|
|
- LOG.info("State store is in Fenced state. Can't store RM Delegation Token.");
|
|
|
- return;
|
|
|
- }
|
|
|
- try {
|
|
|
- storeRMDelegationTokenAndSequenceNumberState(rmDTIdentifier, renewDate,
|
|
|
- latestSequenceNumber);
|
|
|
- } catch (Exception e) {
|
|
|
- notifyStoreOperationFailed(e);
|
|
|
- }
|
|
|
+ handleStoreEvent(new RMStateStoreRMDTEvent(rmDTIdentifier, renewDate,
|
|
|
+ latestSequenceNumber, RMStateStoreEventType.STORE_DELEGATION_TOKEN));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -536,17 +691,10 @@ public abstract class RMStateStore extends AbstractService {
|
|
|
/**
|
|
|
* RMDTSecretManager call this to remove the state of a delegation token
|
|
|
*/
|
|
|
- public synchronized void removeRMDelegationToken(
|
|
|
+ public void removeRMDelegationToken(
|
|
|
RMDelegationTokenIdentifier rmDTIdentifier, int sequenceNumber) {
|
|
|
- if(isFencedState()) {
|
|
|
- LOG.info("State store is in Fenced state. Can't remove RM Delegation Token.");
|
|
|
- return;
|
|
|
- }
|
|
|
- try {
|
|
|
- removeRMDelegationTokenState(rmDTIdentifier);
|
|
|
- } catch (Exception e) {
|
|
|
- notifyStoreOperationFailed(e);
|
|
|
- }
|
|
|
+ handleStoreEvent(new RMStateStoreRMDTEvent(rmDTIdentifier, null,
|
|
|
+ sequenceNumber, RMStateStoreEventType.REMOVE_DELEGATION_TOKEN));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -560,19 +708,11 @@ public abstract class RMStateStore extends AbstractService {
|
|
|
* RMDTSecretManager call this to update the state of a delegation token
|
|
|
* and sequence number
|
|
|
*/
|
|
|
- public synchronized void updateRMDelegationTokenAndSequenceNumber(
|
|
|
+ public void updateRMDelegationTokenAndSequenceNumber(
|
|
|
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
|
|
|
int latestSequenceNumber) {
|
|
|
- if(isFencedState()) {
|
|
|
- LOG.info("State store is in Fenced state. Can't update RM Delegation Token.");
|
|
|
- return;
|
|
|
- }
|
|
|
- try {
|
|
|
- updateRMDelegationTokenAndSequenceNumberInternal(rmDTIdentifier, renewDate,
|
|
|
- latestSequenceNumber);
|
|
|
- } catch (Exception e) {
|
|
|
- notifyStoreOperationFailed(e);
|
|
|
- }
|
|
|
+ handleStoreEvent(new RMStateStoreRMDTEvent(rmDTIdentifier, renewDate,
|
|
|
+ latestSequenceNumber, RMStateStoreEventType.UPDATE_DELEGATION_TOKEN));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -587,17 +727,9 @@ public abstract class RMStateStore extends AbstractService {
|
|
|
/**
|
|
|
* RMDTSecretManager call this to store the state of a master key
|
|
|
*/
|
|
|
- public synchronized void storeRMDTMasterKey(DelegationKey delegationKey) {
|
|
|
- if(isFencedState()) {
|
|
|
- LOG.info("State store is in Fenced state. Can't store RM Delegation " +
|
|
|
- "Token Master key.");
|
|
|
- return;
|
|
|
- }
|
|
|
- try {
|
|
|
- storeRMDTMasterKeyState(delegationKey);
|
|
|
- } catch (Exception e) {
|
|
|
- notifyStoreOperationFailed(e);
|
|
|
- }
|
|
|
+ public void storeRMDTMasterKey(DelegationKey delegationKey) {
|
|
|
+ handleStoreEvent(new RMStateStoreRMDTMasterKeyEvent(delegationKey,
|
|
|
+ RMStateStoreEventType.STORE_MASTERKEY));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -611,17 +743,9 @@ public abstract class RMStateStore extends AbstractService {
|
|
|
/**
|
|
|
* RMDTSecretManager call this to remove the state of a master key
|
|
|
*/
|
|
|
- public synchronized void removeRMDTMasterKey(DelegationKey delegationKey) {
|
|
|
- if(isFencedState()) {
|
|
|
- LOG.info("State store is in Fenced state. Can't remove RM Delegation " +
|
|
|
- "Token Master key.");
|
|
|
- return;
|
|
|
- }
|
|
|
- try {
|
|
|
- removeRMDTMasterKeyState(delegationKey);
|
|
|
- } catch (Exception e) {
|
|
|
- notifyStoreOperationFailed(e);
|
|
|
- }
|
|
|
+ public void removeRMDTMasterKey(DelegationKey delegationKey) {
|
|
|
+ handleStoreEvent(new RMStateStoreRMDTMasterKeyEvent(delegationKey,
|
|
|
+ RMStateStoreEventType.REMOVE_MASTERKEY));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -636,9 +760,19 @@ public abstract class RMStateStore extends AbstractService {
|
|
|
* Blocking API Derived classes must implement this method to store or update
|
|
|
* the state of AMRMToken Master Key
|
|
|
*/
|
|
|
- public abstract void storeOrUpdateAMRMTokenSecretManagerState(
|
|
|
- AMRMTokenSecretManagerState amrmTokenSecretManagerState,
|
|
|
- boolean isUpdate);
|
|
|
+ protected abstract void storeOrUpdateAMRMTokenSecretManagerState(
|
|
|
+ AMRMTokenSecretManagerState amrmTokenSecretManagerState, boolean isUpdate)
|
|
|
+ throws Exception;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Store or Update state of AMRMToken Master Key
|
|
|
+ */
|
|
|
+ public void storeOrUpdateAMRMTokenSecretManager(
|
|
|
+ AMRMTokenSecretManagerState amrmTokenSecretManagerState, boolean isUpdate) {
|
|
|
+ handleStoreEvent(new RMStateStoreAMRMTokenEvent(
|
|
|
+ amrmTokenSecretManagerState, isUpdate,
|
|
|
+ RMStateStoreEventType.UPDATE_AMRM_TOKEN));
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* Non-blocking API
|
|
@@ -689,16 +823,32 @@ public abstract class RMStateStore extends AbstractService {
|
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|
|
|
- synchronized boolean isFencedState() {
|
|
|
- return (RMStateStoreState.FENCED == this.stateMachine.getCurrentState());
|
|
|
+ protected boolean isFencedState() {
|
|
|
+ return (RMStateStoreState.FENCED == getRMStateStoreState());
|
|
|
}
|
|
|
|
|
|
// Dispatcher related code
|
|
|
protected void handleStoreEvent(RMStateStoreEvent event) {
|
|
|
+ this.writeLock.lock();
|
|
|
try {
|
|
|
+
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Processing event of type " + event.getType());
|
|
|
+ }
|
|
|
+
|
|
|
+ final RMStateStoreState oldState = getRMStateStoreState();
|
|
|
+
|
|
|
this.stateMachine.doTransition(event.getType(), event);
|
|
|
+
|
|
|
+ if (oldState != getRMStateStoreState()) {
|
|
|
+ LOG.info("RMStateStore state change from " + oldState + " to "
|
|
|
+ + getRMStateStoreState());
|
|
|
+ }
|
|
|
+
|
|
|
} catch (InvalidStateTransitonException e) {
|
|
|
LOG.error("Can't handle this event at current state", e);
|
|
|
+ } finally {
|
|
|
+ this.writeLock.unlock();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -772,4 +922,13 @@ public abstract class RMStateStore extends AbstractService {
|
|
|
resourceManager.handleTransitionToStandBy();
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ public RMStateStoreState getRMStateStoreState() {
|
|
|
+ this.readLock.lock();
|
|
|
+ try {
|
|
|
+ return this.stateMachine.getCurrentState();
|
|
|
+ } finally {
|
|
|
+ this.readLock.unlock();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|