|
@@ -21,6 +21,7 @@ import javax.annotation.Nonnull;
|
|
|
import java.io.Closeable;
|
|
|
import java.io.FileNotFoundException;
|
|
|
import java.io.IOException;
|
|
|
+import java.io.InterruptedIOException;
|
|
|
import java.lang.ref.WeakReference;
|
|
|
import java.lang.ref.ReferenceQueue;
|
|
|
import java.net.URI;
|
|
@@ -44,6 +45,7 @@ import java.util.Set;
|
|
|
import java.util.Stack;
|
|
|
import java.util.TreeSet;
|
|
|
import java.util.concurrent.CompletableFuture;
|
|
|
+import java.util.concurrent.Semaphore;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
@@ -75,6 +77,7 @@ import org.apache.hadoop.security.token.Token;
|
|
|
import org.apache.hadoop.security.token.DelegationTokenIssuer;
|
|
|
import org.apache.hadoop.util.ClassUtil;
|
|
|
import org.apache.hadoop.util.DataChecksum;
|
|
|
+import org.apache.hadoop.util.DurationInfo;
|
|
|
import org.apache.hadoop.util.LambdaUtils;
|
|
|
import org.apache.hadoop.util.Progressable;
|
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
@@ -200,7 +203,7 @@ public abstract class FileSystem extends Configured
|
|
|
public static final String USER_HOME_PREFIX = "/user";
|
|
|
|
|
|
/** FileSystem cache. */
|
|
|
- static final Cache CACHE = new Cache();
|
|
|
+ static final Cache CACHE = new Cache(new Configuration());
|
|
|
|
|
|
/** The key this instance is stored under in the cache. */
|
|
|
private Cache.Key key;
|
|
@@ -2590,8 +2593,11 @@ public abstract class FileSystem extends Configured
|
|
|
+ "; Object Identity Hash: "
|
|
|
+ Integer.toHexString(System.identityHashCode(this)));
|
|
|
// delete all files that were marked as delete-on-exit.
|
|
|
- processDeleteOnExit();
|
|
|
- CACHE.remove(this.key, this);
|
|
|
+ try {
|
|
|
+ processDeleteOnExit();
|
|
|
+ } finally {
|
|
|
+ CACHE.remove(this.key, this);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -3452,7 +3458,9 @@ public abstract class FileSystem extends Configured
|
|
|
private static FileSystem createFileSystem(URI uri, Configuration conf)
|
|
|
throws IOException {
|
|
|
Tracer tracer = FsTracer.get(conf);
|
|
|
- try(TraceScope scope = tracer.newScope("FileSystem#createFileSystem")) {
|
|
|
+ try(TraceScope scope = tracer.newScope("FileSystem#createFileSystem");
|
|
|
+ DurationInfo ignored =
|
|
|
+ new DurationInfo(LOGGER, false, "Creating FS %s", uri)) {
|
|
|
scope.addKVAnnotation("scheme", uri.getScheme());
|
|
|
Class<? extends FileSystem> clazz =
|
|
|
getFileSystemClass(uri.getScheme(), conf);
|
|
@@ -3475,15 +3483,39 @@ public abstract class FileSystem extends Configured
|
|
|
}
|
|
|
|
|
|
/** Caching FileSystem objects. */
|
|
|
- static class Cache {
|
|
|
+ static final class Cache {
|
|
|
private final ClientFinalizer clientFinalizer = new ClientFinalizer();
|
|
|
|
|
|
private final Map<Key, FileSystem> map = new HashMap<>();
|
|
|
private final Set<Key> toAutoClose = new HashSet<>();
|
|
|
|
|
|
+ /** Semaphore used to serialize creation of new FS instances. */
|
|
|
+ private final Semaphore creatorPermits;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Counter of the number of discarded filesystem instances
|
|
|
+ * in this cache. Primarily for testing, but it could possibly
|
|
|
+ * be made visible as some kind of metric.
|
|
|
+ */
|
|
|
+ private final AtomicLong discardedInstances = new AtomicLong(0);
|
|
|
+
|
|
|
/** A variable that makes all objects in the cache unique. */
|
|
|
private static AtomicLong unique = new AtomicLong(1);
|
|
|
|
|
|
+ /**
|
|
|
+ * Instantiate. The configuration is used to read the
|
|
|
+ * count of permits issued for concurrent creation
|
|
|
+ * of filesystem instances.
|
|
|
+ * @param conf configuration
|
|
|
+ */
|
|
|
+ Cache(final Configuration conf) {
|
|
|
+ int permits = conf.getInt(FS_CREATION_PARALLEL_COUNT,
|
|
|
+ FS_CREATION_PARALLEL_COUNT_DEFAULT);
|
|
|
+ checkArgument(permits > 0, "Invalid value of %s: %s",
|
|
|
+ FS_CREATION_PARALLEL_COUNT, permits);
|
|
|
+ creatorPermits = new Semaphore(permits);
|
|
|
+ }
|
|
|
+
|
|
|
FileSystem get(URI uri, Configuration conf) throws IOException{
|
|
|
Key key = new Key(uri, conf);
|
|
|
return getInternal(uri, conf, key);
|
|
@@ -3517,33 +3549,86 @@ public abstract class FileSystem extends Configured
|
|
|
if (fs != null) {
|
|
|
return fs;
|
|
|
}
|
|
|
-
|
|
|
- fs = createFileSystem(uri, conf);
|
|
|
- final long timeout = conf.getTimeDuration(SERVICE_SHUTDOWN_TIMEOUT,
|
|
|
- SERVICE_SHUTDOWN_TIMEOUT_DEFAULT,
|
|
|
- ShutdownHookManager.TIME_UNIT_DEFAULT);
|
|
|
- synchronized (this) { // refetch the lock again
|
|
|
- FileSystem oldfs = map.get(key);
|
|
|
- if (oldfs != null) { // a file system is created while lock is releasing
|
|
|
- fs.close(); // close the new file system
|
|
|
- return oldfs; // return the old file system
|
|
|
- }
|
|
|
-
|
|
|
- // now insert the new file system into the map
|
|
|
- if (map.isEmpty()
|
|
|
- && !ShutdownHookManager.get().isShutdownInProgress()) {
|
|
|
- ShutdownHookManager.get().addShutdownHook(clientFinalizer,
|
|
|
- SHUTDOWN_HOOK_PRIORITY, timeout,
|
|
|
- ShutdownHookManager.TIME_UNIT_DEFAULT);
|
|
|
+ // fs not yet created, acquire lock
|
|
|
+ // to construct an instance.
|
|
|
+ try (DurationInfo d = new DurationInfo(LOGGER, false,
|
|
|
+ "Acquiring creator semaphore for %s", uri)) {
|
|
|
+ creatorPermits.acquire();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ // acquisition was interrupted; convert to an IOE.
|
|
|
+ throw (IOException)new InterruptedIOException(e.toString())
|
|
|
+ .initCause(e);
|
|
|
+ }
|
|
|
+ FileSystem fsToClose = null;
|
|
|
+ try {
|
|
|
+ // See if FS was instantiated by another thread while waiting
|
|
|
+ // for the permit.
|
|
|
+ synchronized (this) {
|
|
|
+ fs = map.get(key);
|
|
|
}
|
|
|
- fs.key = key;
|
|
|
- map.put(key, fs);
|
|
|
- if (conf.getBoolean(
|
|
|
- FS_AUTOMATIC_CLOSE_KEY, FS_AUTOMATIC_CLOSE_DEFAULT)) {
|
|
|
- toAutoClose.add(key);
|
|
|
+ if (fs != null) {
|
|
|
+ LOGGER.debug("Filesystem {} created while awaiting semaphore", uri);
|
|
|
+ return fs;
|
|
|
}
|
|
|
- return fs;
|
|
|
+ // create the filesystem
|
|
|
+ fs = createFileSystem(uri, conf);
|
|
|
+ final long timeout = conf.getTimeDuration(SERVICE_SHUTDOWN_TIMEOUT,
|
|
|
+ SERVICE_SHUTDOWN_TIMEOUT_DEFAULT,
|
|
|
+ ShutdownHookManager.TIME_UNIT_DEFAULT);
|
|
|
+ // any FS to close outside of the synchronized section
|
|
|
+ synchronized (this) { // lock on the Cache object
|
|
|
+
|
|
|
+ // see if there is now an entry for the FS, which happens
|
|
|
+ // if another thread's creation overlapped with this one.
|
|
|
+ FileSystem oldfs = map.get(key);
|
|
|
+ if (oldfs != null) {
|
|
|
+ // a file system was created in a separate thread.
|
|
|
+ // save the FS reference to close outside all locks,
|
|
|
+ // and switch to returning the oldFS
|
|
|
+ fsToClose = fs;
|
|
|
+ fs = oldfs;
|
|
|
+ } else {
|
|
|
+ // register the clientFinalizer if needed and shutdown isn't
|
|
|
+ // already active
|
|
|
+ if (map.isEmpty()
|
|
|
+ && !ShutdownHookManager.get().isShutdownInProgress()) {
|
|
|
+ ShutdownHookManager.get().addShutdownHook(clientFinalizer,
|
|
|
+ SHUTDOWN_HOOK_PRIORITY, timeout,
|
|
|
+ ShutdownHookManager.TIME_UNIT_DEFAULT);
|
|
|
+ }
|
|
|
+ // insert the new file system into the map
|
|
|
+ fs.key = key;
|
|
|
+ map.put(key, fs);
|
|
|
+ if (conf.getBoolean(
|
|
|
+ FS_AUTOMATIC_CLOSE_KEY, FS_AUTOMATIC_CLOSE_DEFAULT)) {
|
|
|
+ toAutoClose.add(key);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } // end of synchronized block
|
|
|
+ } finally {
|
|
|
+ // release the creator permit.
|
|
|
+ creatorPermits.release();
|
|
|
}
|
|
|
+ if (fsToClose != null) {
|
|
|
+ LOGGER.debug("Duplicate FS created for {}; discarding {}",
|
|
|
+ uri, fs);
|
|
|
+ discardedInstances.incrementAndGet();
|
|
|
+ // close the new file system
|
|
|
+ // note this will briefly remove and reinstate "fsToClose" from
|
|
|
+ // the map. It is done in a synchronized block so will not be
|
|
|
+ // visible to others.
|
|
|
+ IOUtils.cleanupWithLogger(LOGGER, fsToClose);
|
|
|
+ }
|
|
|
+ return fs;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get the count of discarded instances.
|
|
|
+ * @return the new instance.
|
|
|
+ */
|
|
|
+ @VisibleForTesting
|
|
|
+ long getDiscardedInstances() {
|
|
|
+ return discardedInstances.get();
|
|
|
}
|
|
|
|
|
|
synchronized void remove(Key key, FileSystem fs) {
|