|
@@ -18,6 +18,15 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
|
|
|
|
|
|
+import java.io.ByteArrayInputStream;
|
|
|
+import java.security.KeyFactory;
|
|
|
+import java.security.NoSuchAlgorithmException;
|
|
|
+import java.security.PrivateKey;
|
|
|
+import java.security.cert.CertificateException;
|
|
|
+import java.security.cert.CertificateFactory;
|
|
|
+import java.security.cert.X509Certificate;
|
|
|
+import java.security.spec.InvalidKeySpecException;
|
|
|
+import java.security.spec.PKCS8EncodedKeySpec;
|
|
|
import java.util.EnumSet;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
@@ -101,6 +110,9 @@ public abstract class RMStateStore extends AbstractService {
|
|
|
"AMRMTokenSecretManagerRoot";
|
|
|
protected static final String RESERVATION_SYSTEM_ROOT =
|
|
|
"ReservationSystemRoot";
|
|
|
+ protected static final String PROXY_CA_ROOT = "ProxyCARoot";
|
|
|
+ protected static final String PROXY_CA_CERT_NODE = "caCert";
|
|
|
+ protected static final String PROXY_CA_PRIVATE_KEY_NODE = "caPrivateKey";
|
|
|
protected static final String VERSION_NODE = "RMVersionNode";
|
|
|
protected static final String EPOCH_NODE = "EpochNode";
|
|
|
protected long baseEpoch;
|
|
@@ -183,6 +195,10 @@ public abstract class RMStateStore extends AbstractService {
|
|
|
new RemoveReservationAllocationTransition())
|
|
|
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED,
|
|
|
RMStateStoreEventType.FENCED)
|
|
|
+ .addTransition(RMStateStoreState.ACTIVE,
|
|
|
+ EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
|
|
|
+ RMStateStoreEventType.STORE_PROXY_CA_CERT,
|
|
|
+ new StoreProxyCACertTransition())
|
|
|
.addTransition(RMStateStoreState.FENCED, RMStateStoreState.FENCED,
|
|
|
EnumSet.of(
|
|
|
RMStateStoreEventType.STORE_APP,
|
|
@@ -198,7 +214,8 @@ public abstract class RMStateStore extends AbstractService {
|
|
|
RMStateStoreEventType.UPDATE_DELEGATION_TOKEN,
|
|
|
RMStateStoreEventType.UPDATE_AMRM_TOKEN,
|
|
|
RMStateStoreEventType.STORE_RESERVATION,
|
|
|
- RMStateStoreEventType.REMOVE_RESERVATION));
|
|
|
+ RMStateStoreEventType.REMOVE_RESERVATION,
|
|
|
+ RMStateStoreEventType.STORE_PROXY_CA_CERT));
|
|
|
|
|
|
private final StateMachine<RMStateStoreState,
|
|
|
RMStateStoreEventType,
|
|
@@ -615,6 +632,31 @@ public abstract class RMStateStore extends AbstractService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private static class StoreProxyCACertTransition implements
|
|
|
+ MultipleArcTransition<RMStateStore, RMStateStoreEvent,
|
|
|
+ RMStateStoreState> {
|
|
|
+ @Override
|
|
|
+ public RMStateStoreState transition(RMStateStore store,
|
|
|
+ RMStateStoreEvent event) {
|
|
|
+ if (!(event instanceof RMStateStoreProxyCAEvent)) {
|
|
|
+ // should never happen
|
|
|
+ LOG.error("Illegal event type: " + event.getClass());
|
|
|
+ return RMStateStoreState.ACTIVE;
|
|
|
+ }
|
|
|
+ boolean isFenced = false;
|
|
|
+ RMStateStoreProxyCAEvent caEvent = (RMStateStoreProxyCAEvent) event;
|
|
|
+ try {
|
|
|
+ LOG.info("Storing CA Certificate and Private Key");
|
|
|
+ store.storeProxyCACertState(
|
|
|
+ caEvent.getCaCert(), caEvent.getCaPrivateKey());
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("Error While Storing CA Certificate and Private Key", e);
|
|
|
+ isFenced = store.notifyStoreOperationFailedInternal(e);
|
|
|
+ }
|
|
|
+ return finalState(isFenced);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private static RMStateStoreState finalState(boolean isFenced) {
|
|
|
return isFenced ? RMStateStoreState.FENCED : RMStateStoreState.ACTIVE;
|
|
|
}
|
|
@@ -676,6 +718,39 @@ public abstract class RMStateStore extends AbstractService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ public static class ProxyCAState {
|
|
|
+ private X509Certificate caCert;
|
|
|
+ private PrivateKey caPrivateKey;
|
|
|
+
|
|
|
+ public X509Certificate getCaCert() {
|
|
|
+ return caCert;
|
|
|
+ }
|
|
|
+
|
|
|
+ public PrivateKey getCaPrivateKey() {
|
|
|
+ return caPrivateKey;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setCaCert(X509Certificate caCert) {
|
|
|
+ this.caCert = caCert;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setCaPrivateKey(PrivateKey caPrivateKey) {
|
|
|
+ this.caPrivateKey = caPrivateKey;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setCaCert(byte[] caCertData) throws CertificateException {
|
|
|
+ ByteArrayInputStream bais = new ByteArrayInputStream(caCertData);
|
|
|
+ caCert = (X509Certificate)
|
|
|
+ CertificateFactory.getInstance("X.509").generateCertificate(bais);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setCaPrivateKey(byte[] caPrivateKeyData)
|
|
|
+ throws NoSuchAlgorithmException, InvalidKeySpecException {
|
|
|
+ caPrivateKey = KeyFactory.getInstance("RSA").generatePrivate(
|
|
|
+ new PKCS8EncodedKeySpec(caPrivateKeyData));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* State of the ResourceManager
|
|
|
*/
|
|
@@ -690,6 +765,8 @@ public abstract class RMStateStore extends AbstractService {
|
|
|
private Map<String, Map<ReservationId, ReservationAllocationStateProto>>
|
|
|
reservationState = new TreeMap<>();
|
|
|
|
|
|
+ ProxyCAState proxyCAState = new ProxyCAState();
|
|
|
+
|
|
|
public Map<ApplicationId, ApplicationStateData> getApplicationState() {
|
|
|
return appState;
|
|
|
}
|
|
@@ -706,6 +783,10 @@ public abstract class RMStateStore extends AbstractService {
|
|
|
getReservationState() {
|
|
|
return reservationState;
|
|
|
}
|
|
|
+
|
|
|
+ public ProxyCAState getProxyCAState() {
|
|
|
+ return proxyCAState;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private Dispatcher rmDispatcher;
|
|
@@ -1273,4 +1354,21 @@ public abstract class RMStateStore extends AbstractService {
|
|
|
protected EventHandler getRMStateStoreEventHandler() {
|
|
|
return dispatcher.getEventHandler();
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * ProxyCAManager calls this to store the CA Certificate and Private Key.
|
|
|
+ */
|
|
|
+ public void storeProxyCACert(X509Certificate caCert,
|
|
|
+ PrivateKey caPrivateKey) {
|
|
|
+ handleStoreEvent(new RMStateStoreProxyCAEvent(caCert, caPrivateKey,
|
|
|
+ RMStateStoreEventType.STORE_PROXY_CA_CERT));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Blocking API
|
|
|
+ * Derived classes must implement this method to store the CA Certificate
|
|
|
+ * and Private Key
|
|
|
+ */
|
|
|
+ protected abstract void storeProxyCACertState(
|
|
|
+ X509Certificate caCert, PrivateKey caPrivateKey) throws Exception;
|
|
|
}
|