|
@@ -28,8 +28,6 @@ import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Map.Entry;
|
|
|
-import java.util.Timer;
|
|
|
-import java.util.TimerTask;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -37,7 +35,6 @@ import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
-import org.apache.hadoop.util.Time;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestPBImpl;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
@@ -126,7 +123,6 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
|
|
|
|
private DB db;
|
|
|
private boolean isNewlyCreated;
|
|
|
- private Timer compactionTimer;
|
|
|
|
|
|
public NMLeveldbStateStoreService() {
|
|
|
super(NMLeveldbStateStoreService.class.getName());
|
|
@@ -138,10 +134,6 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
|
|
|
|
@Override
|
|
|
protected void closeStorage() throws IOException {
|
|
|
- if (compactionTimer != null) {
|
|
|
- compactionTimer.cancel();
|
|
|
- compactionTimer = null;
|
|
|
- }
|
|
|
if (db != null) {
|
|
|
db.close();
|
|
|
}
|
|
@@ -950,12 +942,6 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
|
@Override
|
|
|
protected void initStorage(Configuration conf)
|
|
|
throws IOException {
|
|
|
- db = openDatabase(conf);
|
|
|
- checkVersion();
|
|
|
- startCompactionTimer(conf);
|
|
|
- }
|
|
|
-
|
|
|
- protected DB openDatabase(Configuration conf) throws IOException {
|
|
|
Path storeRoot = createStorageDir(conf);
|
|
|
Options options = new Options();
|
|
|
options.createIfMissing(false);
|
|
@@ -980,7 +966,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
|
throw e;
|
|
|
}
|
|
|
}
|
|
|
- return db;
|
|
|
+ checkVersion();
|
|
|
}
|
|
|
|
|
|
private Path createStorageDir(Configuration conf) throws IOException {
|
|
@@ -996,33 +982,6 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
|
return root;
|
|
|
}
|
|
|
|
|
|
- private void startCompactionTimer(Configuration conf) {
|
|
|
- long intervalMsec = conf.getLong(
|
|
|
- YarnConfiguration.NM_RECOVERY_COMPACTION_INTERVAL_SECS,
|
|
|
- YarnConfiguration.DEFAULT_NM_RECOVERY_COMPACTION_INTERVAL_SECS) * 1000;
|
|
|
- if (intervalMsec > 0) {
|
|
|
- compactionTimer = new Timer(
|
|
|
- this.getClass().getSimpleName() + " compaction timer", true);
|
|
|
- compactionTimer.schedule(new CompactionTimerTask(),
|
|
|
- intervalMsec, intervalMsec);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- private class CompactionTimerTask extends TimerTask {
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
- long start = Time.monotonicNow();
|
|
|
- LOG.info("Starting full compaction cycle");
|
|
|
- try {
|
|
|
- db.compactRange(null, null);
|
|
|
- } catch (DBException e) {
|
|
|
- LOG.error("Error compacting database", e);
|
|
|
- }
|
|
|
- long duration = Time.monotonicNow() - start;
|
|
|
- LOG.info("Full compaction cycle completed in " + duration + " msec");
|
|
|
- }
|
|
|
- }
|
|
|
|
|
|
private static class LeveldbLogger implements Logger {
|
|
|
private static final Log LOG = LogFactory.getLog(LeveldbLogger.class);
|
|
@@ -1080,7 +1039,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
|
* throw exception and indicate user to use a separate upgrade tool to
|
|
|
* upgrade NM state or remove incompatible old state.
|
|
|
*/
|
|
|
- protected void checkVersion() throws IOException {
|
|
|
+ private void checkVersion() throws IOException {
|
|
|
Version loadedVersion = loadVersion();
|
|
|
LOG.info("Loaded NM state version info " + loadedVersion);
|
|
|
if (loadedVersion.equals(getCurrentVersion())) {
|