|
@@ -44,16 +44,21 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.conf.HAUtil;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
|
|
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
|
|
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
|
|
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
|
|
|
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto;
|
|
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
|
|
+
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
|
|
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
|
import org.apache.zookeeper.CreateMode;
|
|
@@ -81,7 +86,7 @@ public class ZKRMStateStore extends RMStateStore {
|
|
|
|
|
|
protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
|
|
|
protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion
|
|
|
- .newInstance(1, 0);
|
|
|
+ .newInstance(1, 1);
|
|
|
private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
|
|
|
"RMDelegationTokensRoot";
|
|
|
private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME =
|
|
@@ -102,6 +107,7 @@ public class ZKRMStateStore extends RMStateStore {
|
|
|
*
|
|
|
* ROOT_DIR_PATH
|
|
|
* |--- VERSION_INFO
|
|
|
+ * |--- EPOCH_NODE
|
|
|
* |--- RM_ZK_FENCING_LOCK
|
|
|
* |--- RM_APP_ROOT
|
|
|
* | |----- (#ApplicationId1)
|
|
@@ -391,6 +397,28 @@ public class ZKRMStateStore extends RMStateStore {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public synchronized int getAndIncrementEpoch() throws Exception {
|
|
|
+ String epochNodePath = getNodePath(zkRootNodePath, EPOCH_NODE);
|
|
|
+ int currentEpoch = 0;
|
|
|
+ if (existsWithRetries(epochNodePath, true) != null) {
|
|
|
+ // load current epoch
|
|
|
+ byte[] data = getDataWithRetries(epochNodePath, true);
|
|
|
+ Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
|
|
|
+ currentEpoch = epoch.getEpoch();
|
|
|
+ // increment epoch and store it
|
|
|
+ byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
|
|
|
+ .toByteArray();
|
|
|
+ setDataWithRetries(epochNodePath, storeData, -1);
|
|
|
+ } else {
|
|
|
+ // initialize epoch node with 1 for the next time.
|
|
|
+ byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
|
|
|
+ .toByteArray();
|
|
|
+ createWithRetries(epochNodePath, storeData, zkAcl, CreateMode.PERSISTENT);
|
|
|
+ }
|
|
|
+ return currentEpoch;
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public synchronized RMState loadState() throws Exception {
|
|
|
RMState rmState = new RMState();
|