|
@@ -59,6 +59,7 @@ public class InMemoryAliasMap implements InMemoryAliasMapProtocol,
|
|
|
|
|
|
private final DB levelDb;
|
|
|
private Configuration conf;
|
|
|
+ private String blockPoolID;
|
|
|
|
|
|
@Override
|
|
|
public void setConf(Configuration conf) {
|
|
@@ -79,32 +80,38 @@ public class InMemoryAliasMap implements InMemoryAliasMapProtocol,
|
|
|
.toString();
|
|
|
}
|
|
|
|
|
|
- public static @Nonnull InMemoryAliasMap init(Configuration conf)
|
|
|
- throws IOException {
|
|
|
+ public static @Nonnull InMemoryAliasMap init(Configuration conf,
|
|
|
+ String blockPoolID) throws IOException {
|
|
|
Options options = new Options();
|
|
|
options.createIfMissing(true);
|
|
|
String directory =
|
|
|
conf.get(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR);
|
|
|
LOG.info("Attempting to load InMemoryAliasMap from \"{}\"", directory);
|
|
|
- File path = new File(directory);
|
|
|
- if (!path.exists()) {
|
|
|
+ File levelDBpath;
|
|
|
+ if (blockPoolID != null) {
|
|
|
+ levelDBpath = new File(directory, blockPoolID);
|
|
|
+ } else {
|
|
|
+ levelDBpath = new File(directory);
|
|
|
+ }
|
|
|
+ if (!levelDBpath.exists()) {
|
|
|
String error = createPathErrorMessage(directory);
|
|
|
throw new IOException(error);
|
|
|
}
|
|
|
- DB levelDb = JniDBFactory.factory.open(path, options);
|
|
|
- InMemoryAliasMap aliasMap = new InMemoryAliasMap(levelDb);
|
|
|
+ DB levelDb = JniDBFactory.factory.open(levelDBpath, options);
|
|
|
+ InMemoryAliasMap aliasMap = new InMemoryAliasMap(levelDb, blockPoolID);
|
|
|
aliasMap.setConf(conf);
|
|
|
return aliasMap;
|
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|
|
|
- InMemoryAliasMap(DB levelDb) {
|
|
|
+ InMemoryAliasMap(DB levelDb, String blockPoolID) {
|
|
|
this.levelDb = levelDb;
|
|
|
+ this.blockPoolID = blockPoolID;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public IterationResult list(Optional<Block> marker) throws IOException {
|
|
|
- return withIterator((DBIterator iterator) -> {
|
|
|
+ try (DBIterator iterator = levelDb.iterator()) {
|
|
|
Integer batchSize =
|
|
|
conf.getInt(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_BATCH_SIZE,
|
|
|
DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_BATCH_SIZE_DEFAULT);
|
|
@@ -130,8 +137,7 @@ public class InMemoryAliasMap implements InMemoryAliasMapProtocol,
|
|
|
} else {
|
|
|
return new IterationResult(batch, Optional.empty());
|
|
|
}
|
|
|
-
|
|
|
- });
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public @Nonnull Optional<ProvidedStorageLocation> read(@Nonnull Block block)
|
|
@@ -159,7 +165,7 @@ public class InMemoryAliasMap implements InMemoryAliasMapProtocol,
|
|
|
|
|
|
@Override
|
|
|
public String getBlockPoolId() {
|
|
|
- return null;
|
|
|
+ return blockPoolID;
|
|
|
}
|
|
|
|
|
|
public void close() throws IOException {
|
|
@@ -202,21 +208,15 @@ public class InMemoryAliasMap implements InMemoryAliasMapProtocol,
|
|
|
return blockOutputStream.toByteArray();
|
|
|
}
|
|
|
|
|
|
- private IterationResult withIterator(
|
|
|
- CheckedFunction<DBIterator, IterationResult> func) throws IOException {
|
|
|
- try (DBIterator iterator = levelDb.iterator()) {
|
|
|
- return func.apply(iterator);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* CheckedFunction is akin to {@link java.util.function.Function} but
|
|
|
* specifies an IOException.
|
|
|
- * @param <T> Argument type.
|
|
|
+ * @param <T1> First argument type.
|
|
|
+ * @param <T2> Second argument type.
|
|
|
* @param <R> Return type.
|
|
|
*/
|
|
|
@FunctionalInterface
|
|
|
- public interface CheckedFunction<T, R> {
|
|
|
- R apply(T t) throws IOException;
|
|
|
+ public interface CheckedFunction2<T1, T2, R> {
|
|
|
+ R apply(T1 t1, T2 t2) throws IOException;
|
|
|
}
|
|
|
}
|