|
@@ -18,10 +18,13 @@
|
|
|
package org.apache.hadoop.hdfs.server.federation.resolver;
|
|
|
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE;
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE;
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE_DEFAULT;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
+import java.util.Iterator;
|
|
|
import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
@@ -30,9 +33,10 @@ import java.util.Set;
|
|
|
import java.util.SortedMap;
|
|
|
import java.util.TreeMap;
|
|
|
import java.util.TreeSet;
|
|
|
+import java.util.concurrent.Callable;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
-import java.util.concurrent.ConcurrentNavigableMap;
|
|
|
-import java.util.concurrent.ConcurrentSkipListMap;
|
|
|
+import java.util.concurrent.ConcurrentMap;
|
|
|
+import java.util.concurrent.ExecutionException;
|
|
|
import java.util.concurrent.locks.Lock;
|
|
|
import java.util.concurrent.locks.ReadWriteLock;
|
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
@@ -55,6 +59,8 @@ import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
+import com.google.common.cache.Cache;
|
|
|
+import com.google.common.cache.CacheBuilder;
|
|
|
|
|
|
/**
|
|
|
* Mount table to map between global paths and remote locations. This allows the
|
|
@@ -81,8 +87,7 @@ public class MountTableResolver
|
|
|
/** Path -> Remote HDFS location. */
|
|
|
private final TreeMap<String, MountTable> tree = new TreeMap<>();
|
|
|
/** Path -> Remote location. */
|
|
|
- private final ConcurrentNavigableMap<String, PathLocation> locationCache =
|
|
|
- new ConcurrentSkipListMap<>();
|
|
|
+ private final Cache<String, PathLocation> locationCache;
|
|
|
|
|
|
/** Default nameservice when no mount matches the math. */
|
|
|
private String defaultNameService = "";
|
|
@@ -99,20 +104,30 @@ public class MountTableResolver
|
|
|
}
|
|
|
|
|
|
public MountTableResolver(Configuration conf, Router routerService) {
|
|
|
+ this(conf, routerService, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ public MountTableResolver(Configuration conf, StateStoreService store) {
|
|
|
+ this(conf, null, store);
|
|
|
+ }
|
|
|
+
|
|
|
+ public MountTableResolver(Configuration conf, Router routerService,
|
|
|
+ StateStoreService store) {
|
|
|
this.router = routerService;
|
|
|
- if (this.router != null) {
|
|
|
+ if (store != null) {
|
|
|
+ this.stateStore = store;
|
|
|
+ } else if (this.router != null) {
|
|
|
this.stateStore = this.router.getStateStore();
|
|
|
} else {
|
|
|
this.stateStore = null;
|
|
|
}
|
|
|
|
|
|
- registerCacheExternal();
|
|
|
- initDefaultNameService(conf);
|
|
|
- }
|
|
|
-
|
|
|
- public MountTableResolver(Configuration conf, StateStoreService store) {
|
|
|
- this.router = null;
|
|
|
- this.stateStore = store;
|
|
|
+ int maxCacheSize = conf.getInt(
|
|
|
+ FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE,
|
|
|
+ FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE_DEFAULT);
|
|
|
+ this.locationCache = CacheBuilder.newBuilder()
|
|
|
+ .maximumSize(maxCacheSize)
|
|
|
+ .build();
|
|
|
|
|
|
registerCacheExternal();
|
|
|
initDefaultNameService(conf);
|
|
@@ -210,16 +225,26 @@ public class MountTableResolver
|
|
|
* @param path Source path.
|
|
|
*/
|
|
|
private void invalidateLocationCache(final String path) {
|
|
|
- if (locationCache.isEmpty()) {
|
|
|
+ LOG.debug("Invalidating {} from {}", path, locationCache);
|
|
|
+ if (locationCache.size() == 0) {
|
|
|
return;
|
|
|
}
|
|
|
- // Determine next lexicographic entry after source path
|
|
|
- String nextSrc = path + Character.MAX_VALUE;
|
|
|
- ConcurrentNavigableMap<String, PathLocation> subMap =
|
|
|
- locationCache.subMap(path, nextSrc);
|
|
|
- for (final String key : subMap.keySet()) {
|
|
|
- locationCache.remove(key);
|
|
|
+
|
|
|
+ // Go through the entries and remove the ones from the path to invalidate
|
|
|
+ ConcurrentMap<String, PathLocation> map = locationCache.asMap();
|
|
|
+ Set<Entry<String, PathLocation>> entries = map.entrySet();
|
|
|
+ Iterator<Entry<String, PathLocation>> it = entries.iterator();
|
|
|
+ while (it.hasNext()) {
|
|
|
+ Entry<String, PathLocation> entry = it.next();
|
|
|
+ PathLocation loc = entry.getValue();
|
|
|
+ String src = loc.getSourcePath();
|
|
|
+ if (src.startsWith(path)) {
|
|
|
+ LOG.debug("Removing {}", src);
|
|
|
+ it.remove();
|
|
|
+ }
|
|
|
}
|
|
|
+
|
|
|
+ LOG.debug("Location cache after invalidation: {}", locationCache);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -312,7 +337,7 @@ public class MountTableResolver
|
|
|
LOG.info("Clearing all mount location caches");
|
|
|
writeLock.lock();
|
|
|
try {
|
|
|
- this.locationCache.clear();
|
|
|
+ this.locationCache.invalidateAll();
|
|
|
this.tree.clear();
|
|
|
} finally {
|
|
|
writeLock.unlock();
|
|
@@ -325,8 +350,15 @@ public class MountTableResolver
|
|
|
verifyMountTable();
|
|
|
readLock.lock();
|
|
|
try {
|
|
|
- return this.locationCache.computeIfAbsent(
|
|
|
- path, this::lookupLocation);
|
|
|
+ Callable<? extends PathLocation> meh = new Callable<PathLocation>() {
|
|
|
+ @Override
|
|
|
+ public PathLocation call() throws Exception {
|
|
|
+ return lookupLocation(path);
|
|
|
+ }
|
|
|
+ };
|
|
|
+ return this.locationCache.get(path, meh);
|
|
|
+ } catch (ExecutionException e) {
|
|
|
+ throw new IOException(e);
|
|
|
} finally {
|
|
|
readLock.unlock();
|
|
|
}
|
|
@@ -544,4 +576,12 @@ public class MountTableResolver
|
|
|
}
|
|
|
return ret;
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get the size of the cache.
|
|
|
+ * @return Size of the cache.
|
|
|
+ */
|
|
|
+ protected long getCacheSize() {
|
|
|
+ return this.locationCache.size();
|
|
|
+ }
|
|
|
}
|