|
@@ -62,11 +62,13 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
|
|
|
private static final String LOGS_PATH = "LOGS";
|
|
|
private static final String CONF_STORE_PATH = "CONF_STORE";
|
|
|
private static final String FENCING_PATH = "FENCING";
|
|
|
+ private static final String CONF_VERSION_PATH = "CONF_VERSION";
|
|
|
|
|
|
private String zkVersionPath;
|
|
|
private String logsPath;
|
|
|
private String confStorePath;
|
|
|
private String fencingNodePath;
|
|
|
+ private String confVersionPath;
|
|
|
|
|
|
@VisibleForTesting
|
|
|
protected ZKCuratorManager zkManager;
|
|
@@ -89,6 +91,7 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
|
|
|
this.logsPath = getNodePath(znodeParentPath, LOGS_PATH);
|
|
|
this.confStorePath = getNodePath(znodeParentPath, CONF_STORE_PATH);
|
|
|
this.fencingNodePath = getNodePath(znodeParentPath, FENCING_PATH);
|
|
|
+ this.confVersionPath = getNodePath(znodeParentPath, CONF_VERSION_PATH);
|
|
|
|
|
|
zkManager.createRootDirRecursively(znodeParentPath, zkAcl);
|
|
|
zkManager.delete(fencingNodePath);
|
|
@@ -99,6 +102,12 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
|
|
|
serializeObject(new LinkedList<LogMutation>()), -1);
|
|
|
}
|
|
|
|
|
|
+ if (!zkManager.exists(confVersionPath)) {
|
|
|
+ zkManager.create(confVersionPath);
|
|
|
+ zkManager.setData(confVersionPath,
|
|
|
+ String.valueOf(System.currentTimeMillis()), -1);
|
|
|
+ }
|
|
|
+
|
|
|
if (!zkManager.exists(confStorePath)) {
|
|
|
zkManager.create(confStorePath);
|
|
|
HashMap<String, String> mapSchedConf = new HashMap<>();
|
|
@@ -106,6 +115,8 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
|
|
|
mapSchedConf.put(entry.getKey(), entry.getValue());
|
|
|
}
|
|
|
zkManager.setData(confStorePath, serializeObject(mapSchedConf), -1);
|
|
|
+ zkManager.setData(confVersionPath,
|
|
|
+ String.valueOf(System.currentTimeMillis()), -1);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -185,6 +196,9 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
|
|
|
}
|
|
|
zkManager.safeSetData(confStorePath, serializeObject(mapConf), -1,
|
|
|
zkAcl, fencingNodePath);
|
|
|
+ zkManager.setData(confVersionPath,
|
|
|
+ String.valueOf(System.currentTimeMillis()), -1);
|
|
|
+
|
|
|
}
|
|
|
pendingMutation = null;
|
|
|
}
|
|
@@ -213,6 +227,11 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public long getConfigVersion() throws Exception {
|
|
|
+ return Long.parseLong(zkManager.getStringData(confVersionPath));
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public List<LogMutation> getConfirmedConfHistory(long fromId) {
|
|
|
return null; // unimplemented
|