|
@@ -54,6 +54,7 @@ import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Timer;
|
|
|
import java.util.TimerTask;
|
|
|
+import java.util.function.Consumer;
|
|
|
|
|
|
import static org.fusesource.leveldbjni.JniDBFactory.bytes;
|
|
|
|
|
@@ -72,22 +73,23 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
|
|
private static final String CONF_VERSION_KEY = "conf-version";
|
|
|
|
|
|
private DB db;
|
|
|
- private DB versiondb;
|
|
|
+ private DB versionDb;
|
|
|
private long maxLogs;
|
|
|
private Configuration conf;
|
|
|
private LogMutation pendingMutation;
|
|
|
+ private Configuration initSchedConf;
|
|
|
@VisibleForTesting
|
|
|
protected static final Version CURRENT_VERSION_INFO = Version
|
|
|
.newInstance(0, 1);
|
|
|
- private Timer compactionTimer;
|
|
|
private long compactionIntervalMsec;
|
|
|
|
|
|
@Override
|
|
|
public void initialize(Configuration config, Configuration schedConf,
|
|
|
RMContext rmContext) throws IOException {
|
|
|
this.conf = config;
|
|
|
+ this.initSchedConf = schedConf;
|
|
|
try {
|
|
|
- initDatabase(schedConf);
|
|
|
+ initDatabase();
|
|
|
this.maxLogs = config.getLong(
|
|
|
YarnConfiguration.RM_SCHEDCONF_MAX_LOGS,
|
|
|
YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS);
|
|
@@ -108,7 +110,15 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
|
|
fs.delete(getStorageDir(DB_NAME), true);
|
|
|
}
|
|
|
|
|
|
- private void initDatabase(Configuration config) throws Exception {
|
|
|
+ private void initDatabase() throws Exception {
|
|
|
+ Path confVersion = createStorageDir(CONF_VERSION_NAME);
|
|
|
+ Options confOptions = new Options();
|
|
|
+ confOptions.createIfMissing(false);
|
|
|
+ File confVersionFile = new File(confVersion.toString());
|
|
|
+
|
|
|
+ versionDb = initDatabaseHelper(confVersionFile, confOptions,
|
|
|
+ this::initVersionDb);
|
|
|
+
|
|
|
Path storeRoot = createStorageDir(DB_NAME);
|
|
|
Options options = new Options();
|
|
|
options.createIfMissing(false);
|
|
@@ -144,49 +154,37 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
|
|
return key;
|
|
|
}
|
|
|
});
|
|
|
+ LOG.info("Using conf database at {}", storeRoot);
|
|
|
+ File dbFile = new File(storeRoot.toString());
|
|
|
+ db = initDatabaseHelper(dbFile, options, this::initDb);
|
|
|
+ }
|
|
|
|
|
|
- Path confVersion = createStorageDir(CONF_VERSION_NAME);
|
|
|
- Options confOptions = new Options();
|
|
|
- confOptions.createIfMissing(false);
|
|
|
- LOG.info("Using conf version at " + confVersion);
|
|
|
- File confVersionFile = new File(confVersion.toString());
|
|
|
- try {
|
|
|
- versiondb = JniDBFactory.factory.open(confVersionFile, confOptions);
|
|
|
- } catch (NativeDB.DBException e) {
|
|
|
- if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
|
|
|
- LOG.info("Creating conf version at " + confVersionFile);
|
|
|
- confOptions.createIfMissing(true);
|
|
|
- try {
|
|
|
- versiondb = JniDBFactory.factory.open(confVersionFile, confOptions);
|
|
|
- versiondb.put(bytes(CONF_VERSION_KEY), bytes(String.valueOf(0)));
|
|
|
- } catch (DBException dbErr) {
|
|
|
- throw new IOException(dbErr.getMessage(), dbErr);
|
|
|
- }
|
|
|
- } else {
|
|
|
- throw e;
|
|
|
- }
|
|
|
- }
|
|
|
+ private void initVersionDb(DB database) {
|
|
|
+ database.put(bytes(CONF_VERSION_KEY), bytes(String.valueOf(0)));
|
|
|
+ }
|
|
|
|
|
|
+ private void initDb(DB database) {
|
|
|
+ WriteBatch initBatch = database.createWriteBatch();
|
|
|
+ for (Map.Entry<String, String> kv : initSchedConf) {
|
|
|
+ initBatch.put(bytes(kv.getKey()), bytes(kv.getValue()));
|
|
|
+ }
|
|
|
+ database.write(initBatch);
|
|
|
+ increaseConfigVersion();
|
|
|
+ }
|
|
|
|
|
|
- LOG.info("Using conf database at " + storeRoot);
|
|
|
- File dbfile = new File(storeRoot.toString());
|
|
|
+ private DB initDatabaseHelper(File configurationFile, Options options,
|
|
|
+ Consumer<DB> initMethod) throws Exception {
|
|
|
+ DB database;
|
|
|
try {
|
|
|
- db = JniDBFactory.factory.open(dbfile, options);
|
|
|
+ database = JniDBFactory.factory.open(configurationFile, options);
|
|
|
} catch (NativeDB.DBException e) {
|
|
|
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
|
|
|
- LOG.info("Creating conf database at " + dbfile);
|
|
|
+ LOG.info("Creating configuration version/database at {}",
|
|
|
+ configurationFile);
|
|
|
options.createIfMissing(true);
|
|
|
try {
|
|
|
- db = JniDBFactory.factory.open(dbfile, options);
|
|
|
- // Write the initial scheduler configuration
|
|
|
- WriteBatch initBatch = db.createWriteBatch();
|
|
|
- for (Map.Entry<String, String> kv : config) {
|
|
|
- initBatch.put(bytes(kv.getKey()), bytes(kv.getValue()));
|
|
|
- }
|
|
|
- db.write(initBatch);
|
|
|
- long configVersion = getConfigVersion() + 1L;
|
|
|
- versiondb.put(bytes(CONF_VERSION_KEY),
|
|
|
- bytes(String.valueOf(configVersion)));
|
|
|
+ database = JniDBFactory.factory.open(configurationFile, options);
|
|
|
+ initMethod.accept(database);
|
|
|
} catch (DBException dbErr) {
|
|
|
throw new IOException(dbErr.getMessage(), dbErr);
|
|
|
}
|
|
@@ -194,6 +192,8 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
|
|
throw e;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ return database;
|
|
|
}
|
|
|
|
|
|
private Path createStorageDir(String storageName) throws IOException {
|
|
@@ -217,8 +217,8 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
|
|
if (db != null) {
|
|
|
db.close();
|
|
|
}
|
|
|
- if (versiondb != null) {
|
|
|
- versiondb.close();
|
|
|
+ if (versionDb != null) {
|
|
|
+ versionDb.close();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -245,9 +245,7 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
|
|
updateBatch.put(bytes(changes.getKey()), bytes(changes.getValue()));
|
|
|
}
|
|
|
}
|
|
|
- long configVersion = getConfigVersion() + 1L;
|
|
|
- versiondb.put(bytes(CONF_VERSION_KEY),
|
|
|
- bytes(String.valueOf(configVersion)));
|
|
|
+ increaseConfigVersion();
|
|
|
}
|
|
|
db.write(updateBatch);
|
|
|
pendingMutation = null;
|
|
@@ -263,6 +261,10 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ // Because of type erasure casting to LinkedList<LogMutation> will be
|
|
|
+ // unchecked. A way around that would be to iterate over the logMutations
|
|
|
+ // which is overkill in this case.
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
private LinkedList<LogMutation> deserLogMutations(byte[] mutations) throws
|
|
|
IOException {
|
|
|
if (mutations == null) {
|
|
@@ -293,9 +295,15 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
|
|
return config;
|
|
|
}
|
|
|
|
|
|
+ private void increaseConfigVersion() {
|
|
|
+ long configVersion = getConfigVersion() + 1L;
|
|
|
+ versionDb.put(bytes(CONF_VERSION_KEY),
|
|
|
+ bytes(String.valueOf(configVersion)));
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public long getConfigVersion() {
|
|
|
- String version = new String(versiondb.get(bytes(CONF_VERSION_KEY)),
|
|
|
+ String version = new String(versionDb.get(bytes(CONF_VERSION_KEY)),
|
|
|
StandardCharsets.UTF_8);
|
|
|
return Long.parseLong(version);
|
|
|
}
|
|
@@ -305,18 +313,15 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
|
|
return null; // unimplemented
|
|
|
}
|
|
|
|
|
|
- // TODO below was taken from LeveldbRMStateStore, it can probably be
|
|
|
- // refactored
|
|
|
private void startCompactionTimer() {
|
|
|
if (compactionIntervalMsec > 0) {
|
|
|
- compactionTimer = new Timer(
|
|
|
+ Timer compactionTimer = new Timer(
|
|
|
this.getClass().getSimpleName() + " compaction timer", true);
|
|
|
compactionTimer.schedule(new CompactionTimerTask(),
|
|
|
compactionIntervalMsec, compactionIntervalMsec);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // TODO: following is taken from LeveldbRMStateStore
|
|
|
@Override
|
|
|
public Version getConfStoreVersion() throws Exception {
|
|
|
Version version = null;
|
|
@@ -370,7 +375,7 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
|
|
LOG.error("Error compacting database", e);
|
|
|
}
|
|
|
long duration = Time.monotonicNow() - start;
|
|
|
- LOG.info("Full compaction cycle completed in " + duration + " msec");
|
|
|
+ LOG.info("Full compaction cycle completed in {} msec", duration);
|
|
|
}
|
|
|
}
|
|
|
}
|