|
@@ -18,6 +18,9 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
|
|
|
|
|
|
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper.readReverseOrderedLong;
|
|
|
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper.writeReverseOrderedLong;
|
|
|
+
|
|
|
import java.io.ByteArrayOutputStream;
|
|
|
import java.io.File;
|
|
|
import java.io.IOException;
|
|
@@ -36,13 +39,16 @@ import java.util.TreeMap;
|
|
|
import java.util.concurrent.locks.ReentrantLock;
|
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
|
|
|
|
-import com.google.common.annotations.VisibleForTesting;
|
|
|
import org.apache.commons.collections.map.LRUMap;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
+import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
|
import org.apache.hadoop.io.WritableComparator;
|
|
|
import org.apache.hadoop.service.AbstractService;
|
|
@@ -50,8 +56,8 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
|
|
|
-import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
|
|
|
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.fusesource.leveldbjni.JniDBFactory;
|
|
@@ -62,8 +68,7 @@ import org.iq80.leveldb.ReadOptions;
|
|
|
import org.iq80.leveldb.WriteBatch;
|
|
|
import org.iq80.leveldb.WriteOptions;
|
|
|
|
|
|
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper.readReverseOrderedLong;
|
|
|
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper.writeReverseOrderedLong;
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
|
|
|
/**
|
|
|
* <p>An implementation of an application timeline store backed by leveldb.</p>
|
|
@@ -120,7 +125,9 @@ public class LeveldbTimelineStore extends AbstractService
|
|
|
private static final Log LOG = LogFactory
|
|
|
.getLog(LeveldbTimelineStore.class);
|
|
|
|
|
|
- private static final String FILENAME = "leveldb-timeline-store.ldb";
|
|
|
+ @Private
|
|
|
+ @VisibleForTesting
|
|
|
+ static final String FILENAME = "leveldb-timeline-store.ldb";
|
|
|
|
|
|
private static final byte[] START_TIME_LOOKUP_PREFIX = "k".getBytes();
|
|
|
private static final byte[] ENTITY_ENTRY_PREFIX = "e".getBytes();
|
|
@@ -135,6 +142,11 @@ public class LeveldbTimelineStore extends AbstractService
|
|
|
|
|
|
private static final byte[] EMPTY_BYTES = new byte[0];
|
|
|
|
|
|
+ @Private
|
|
|
+ @VisibleForTesting
|
|
|
+ static final FsPermission LEVELDB_DIR_UMASK = FsPermission
|
|
|
+ .createImmutable((short) 0700);
|
|
|
+
|
|
|
private Map<EntityIdentifier, StartAndInsertTime> startTimeWriteCache;
|
|
|
private Map<EntityIdentifier, Long> startTimeReadCache;
|
|
|
|
|
@@ -164,16 +176,23 @@ public class LeveldbTimelineStore extends AbstractService
|
|
|
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
|
|
|
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE));
|
|
|
JniDBFactory factory = new JniDBFactory();
|
|
|
- String path = conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH);
|
|
|
- File p = new File(path);
|
|
|
- if (!p.exists()) {
|
|
|
- if (!p.mkdirs()) {
|
|
|
- throw new IOException("Couldn't create directory for leveldb " +
|
|
|
- "timeline store " + path);
|
|
|
+ Path dbPath = new Path(
|
|
|
+ conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH), FILENAME);
|
|
|
+ FileSystem localFS = null;
|
|
|
+ try {
|
|
|
+ localFS = FileSystem.getLocal(conf);
|
|
|
+ if (!localFS.exists(dbPath)) {
|
|
|
+ if (!localFS.mkdirs(dbPath)) {
|
|
|
+ throw new IOException("Couldn't create directory for leveldb " +
|
|
|
+ "timeline store " + dbPath);
|
|
|
+ }
|
|
|
+ localFS.setPermission(dbPath, LEVELDB_DIR_UMASK);
|
|
|
}
|
|
|
+ } finally {
|
|
|
+ IOUtils.cleanup(LOG, localFS);
|
|
|
}
|
|
|
- LOG.info("Using leveldb path " + path);
|
|
|
- db = factory.open(new File(path, FILENAME), options);
|
|
|
+ LOG.info("Using leveldb path " + dbPath);
|
|
|
+ db = factory.open(new File(dbPath.toString()), options);
|
|
|
startTimeWriteCache =
|
|
|
Collections.synchronizedMap(new LRUMap(getStartTimeWriteCacheSize(
|
|
|
conf)));
|