|
@@ -68,8 +68,11 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
|
|
private static final String DB_NAME = "yarn-conf-store";
|
|
|
private static final String LOG_KEY = "log";
|
|
|
private static final String VERSION_KEY = "version";
|
|
|
+ private static final String CONF_VERSION_NAME = "conf-version-store";
|
|
|
+ private static final String CONF_VERSION_KEY = "conf-version";
|
|
|
|
|
|
private DB db;
|
|
|
+ private DB versiondb;
|
|
|
private long maxLogs;
|
|
|
private Configuration conf;
|
|
|
private LogMutation pendingMutation;
|
|
@@ -102,11 +105,11 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
|
|
public void format() throws Exception {
|
|
|
close();
|
|
|
FileSystem fs = FileSystem.getLocal(conf);
|
|
|
- fs.delete(getStorageDir(), true);
|
|
|
+ fs.delete(getStorageDir(DB_NAME), true);
|
|
|
}
|
|
|
|
|
|
private void initDatabase(Configuration config) throws Exception {
|
|
|
- Path storeRoot = createStorageDir();
|
|
|
+ Path storeRoot = createStorageDir(DB_NAME);
|
|
|
Options options = new Options();
|
|
|
options.createIfMissing(false);
|
|
|
options.comparator(new DBComparator() {
|
|
@@ -142,6 +145,29 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
|
|
}
|
|
|
});
|
|
|
|
|
|
+ Path confVersion = createStorageDir(CONF_VERSION_NAME);
|
|
|
+ Options confOptions = new Options();
|
|
|
+ confOptions.createIfMissing(false);
|
|
|
+ LOG.info("Using conf version at " + confVersion);
|
|
|
+ File confVersionFile = new File(confVersion.toString());
|
|
|
+ try {
|
|
|
+ versiondb = JniDBFactory.factory.open(confVersionFile, confOptions);
|
|
|
+ } catch (NativeDB.DBException e) {
|
|
|
+ if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
|
|
|
+ LOG.info("Creating conf version at " + confVersionFile);
|
|
|
+ confOptions.createIfMissing(true);
|
|
|
+ try {
|
|
|
+ versiondb = JniDBFactory.factory.open(confVersionFile, confOptions);
|
|
|
+ versiondb.put(bytes(CONF_VERSION_KEY), bytes(String.valueOf(0)));
|
|
|
+ } catch (DBException dbErr) {
|
|
|
+ throw new IOException(dbErr.getMessage(), dbErr);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
LOG.info("Using conf database at " + storeRoot);
|
|
|
File dbfile = new File(storeRoot.toString());
|
|
|
try {
|
|
@@ -158,6 +184,9 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
|
|
initBatch.put(bytes(kv.getKey()), bytes(kv.getValue()));
|
|
|
}
|
|
|
db.write(initBatch);
|
|
|
+ long configVersion = getConfigVersion() + 1L;
|
|
|
+ versiondb.put(bytes(CONF_VERSION_KEY),
|
|
|
+ bytes(String.valueOf(configVersion)));
|
|
|
} catch (DBException dbErr) {
|
|
|
throw new IOException(dbErr.getMessage(), dbErr);
|
|
|
}
|
|
@@ -167,20 +196,20 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private Path createStorageDir() throws IOException {
|
|
|
- Path root = getStorageDir();
|
|
|
+ private Path createStorageDir(String storageName) throws IOException {
|
|
|
+ Path root = getStorageDir(storageName);
|
|
|
FileSystem fs = FileSystem.getLocal(conf);
|
|
|
fs.mkdirs(root, new FsPermission((short) 0700));
|
|
|
return root;
|
|
|
}
|
|
|
|
|
|
- private Path getStorageDir() throws IOException {
|
|
|
+ private Path getStorageDir(String storageName) throws IOException {
|
|
|
String storePath = conf.get(YarnConfiguration.RM_SCHEDCONF_STORE_PATH);
|
|
|
if (storePath == null) {
|
|
|
throw new IOException("No store location directory configured in " +
|
|
|
YarnConfiguration.RM_SCHEDCONF_STORE_PATH);
|
|
|
}
|
|
|
- return new Path(storePath, DB_NAME);
|
|
|
+ return new Path(storePath, storageName);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -188,6 +217,9 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
|
|
if (db != null) {
|
|
|
db.close();
|
|
|
}
|
|
|
+ if (versiondb != null) {
|
|
|
+ versiondb.close();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -213,6 +245,9 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
|
|
updateBatch.put(bytes(changes.getKey()), bytes(changes.getValue()));
|
|
|
}
|
|
|
}
|
|
|
+ long configVersion = getConfigVersion() + 1L;
|
|
|
+ versiondb.put(bytes(CONF_VERSION_KEY),
|
|
|
+ bytes(String.valueOf(configVersion)));
|
|
|
}
|
|
|
db.write(updateBatch);
|
|
|
pendingMutation = null;
|
|
@@ -258,6 +293,13 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
|
|
return config;
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public long getConfigVersion() {
|
|
|
+ String version = new String(versiondb.get(bytes(CONF_VERSION_KEY)),
|
|
|
+ StandardCharsets.UTF_8);
|
|
|
+ return Long.parseLong(version);
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public List<LogMutation> getConfirmedConfHistory(long fromId) {
|
|
|
return null; // unimplemented
|