|
@@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.timeline;
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
import com.google.common.base.Preconditions;
|
|
|
|
|
|
-import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
@@ -62,6 +61,7 @@ import org.apache.hadoop.yarn.server.records.Version;
|
|
|
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
|
|
|
import org.apache.hadoop.yarn.server.timeline.RollingLevelDB.RollingWriteBatch;
|
|
|
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
|
|
|
+import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils;
|
|
|
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyBuilder;
|
|
|
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyParser;
|
|
|
|
|
@@ -199,6 +199,11 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
|
|
|
static final String STARTTIME = "starttime-ldb";
|
|
|
static final String OWNER = "owner-ldb";
|
|
|
|
|
|
+ @VisibleForTesting
|
|
|
+ //Extension to FILENAME where backup will be stored in case we need to
|
|
|
+ //call LevelDb recovery
|
|
|
+ static final String BACKUP_EXT = ".backup-";
|
|
|
+
|
|
|
private static final byte[] DOMAIN_ID_COLUMN = "d".getBytes(UTF_8);
|
|
|
private static final byte[] EVENTS_COLUMN = "e".getBytes(UTF_8);
|
|
|
private static final byte[] PRIMARY_FILTERS_COLUMN = "f".getBytes(UTF_8);
|
|
@@ -240,6 +245,12 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
|
|
|
super(RollingLevelDBTimelineStore.class.getName());
|
|
|
}
|
|
|
|
|
|
+ private JniDBFactory factory;
|
|
|
+ @VisibleForTesting
|
|
|
+ void setFactory(JniDBFactory fact) {
|
|
|
+ this.factory = fact;
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
@SuppressWarnings("unchecked")
|
|
|
protected void serviceInit(Configuration conf) throws Exception {
|
|
@@ -284,7 +295,9 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
|
|
|
options.cacheSize(conf.getLong(
|
|
|
TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
|
|
|
DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE));
|
|
|
- JniDBFactory factory = new JniDBFactory();
|
|
|
+ if(factory == null) {
|
|
|
+ factory = new JniDBFactory();
|
|
|
+ }
|
|
|
Path dbPath = new Path(
|
|
|
conf.get(TIMELINE_SERVICE_LEVELDB_PATH), FILENAME);
|
|
|
Path domainDBPath = new Path(dbPath, DOMAIN);
|
|
@@ -327,13 +340,13 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
|
|
|
TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE,
|
|
|
DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE));
|
|
|
LOG.info("Using leveldb path " + dbPath);
|
|
|
- domaindb = factory.open(new File(domainDBPath.toString()), options);
|
|
|
+ domaindb = LeveldbUtils.loadOrRepairLevelDb(factory, domainDBPath, options);
|
|
|
entitydb = new RollingLevelDB(ENTITY);
|
|
|
entitydb.init(conf);
|
|
|
indexdb = new RollingLevelDB(INDEX);
|
|
|
indexdb.init(conf);
|
|
|
- starttimedb = factory.open(new File(starttimeDBPath.toString()), options);
|
|
|
- ownerdb = factory.open(new File(ownerDBPath.toString()), options);
|
|
|
+ starttimedb = LeveldbUtils.loadOrRepairLevelDb(factory, starttimeDBPath, options);
|
|
|
+ ownerdb = LeveldbUtils.loadOrRepairLevelDb(factory, ownerDBPath, options);
|
|
|
checkVersion();
|
|
|
startTimeWriteCache = Collections.synchronizedMap(new LRUMap(
|
|
|
getStartTimeWriteCacheSize(conf)));
|
|
@@ -346,7 +359,7 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
|
|
|
|
|
|
super.serviceInit(conf);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Override
|
|
|
protected void serviceStart() throws Exception {
|
|
|
if (getConfig().getBoolean(TIMELINE_SERVICE_TTL_ENABLE, true)) {
|
|
@@ -1834,4 +1847,4 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
|
|
|
return domain;
|
|
|
}
|
|
|
}
|
|
|
-}
|
|
|
+}
|