|
@@ -25,6 +25,7 @@ import java.util.ArrayList;
|
|
import java.util.Arrays;
|
|
import java.util.Arrays;
|
|
import java.util.Collection;
|
|
import java.util.Collection;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
|
|
+import java.util.HashSet;
|
|
import java.util.IdentityHashMap;
|
|
import java.util.IdentityHashMap;
|
|
import java.util.Iterator;
|
|
import java.util.Iterator;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
@@ -70,7 +71,7 @@ public abstract class FileSystem extends Configured implements Closeable {
|
|
public static final Log LOG = LogFactory.getLog(FileSystem.class);
|
|
public static final Log LOG = LogFactory.getLog(FileSystem.class);
|
|
|
|
|
|
/** FileSystem cache */
|
|
/** FileSystem cache */
|
|
- private static final Cache CACHE = new Cache();
|
|
|
|
|
|
+ static final Cache CACHE = new Cache();
|
|
|
|
|
|
/** The key this instance is stored under in the cache. */
|
|
/** The key this instance is stored under in the cache. */
|
|
private Cache.Key key;
|
|
private Cache.Key key;
|
|
@@ -224,17 +225,6 @@ public abstract class FileSystem extends Configured implements Closeable {
|
|
return (LocalFileSystem)newInstance(LocalFileSystem.NAME, conf);
|
|
return (LocalFileSystem)newInstance(LocalFileSystem.NAME, conf);
|
|
}
|
|
}
|
|
|
|
|
|
- private static class ClientFinalizer extends Thread {
|
|
|
|
- public synchronized void run() {
|
|
|
|
- try {
|
|
|
|
- FileSystem.closeAll();
|
|
|
|
- } catch (IOException e) {
|
|
|
|
- LOG.info("FileSystem.closeAll() threw an exception:\n" + e);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- private static final ClientFinalizer clientFinalizer = new ClientFinalizer();
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* Close all cached filesystems. Be sure those filesystems are not
|
|
* Close all cached filesystems. Be sure those filesystems are not
|
|
* used anymore.
|
|
* used anymore.
|
|
@@ -1409,7 +1399,10 @@ public abstract class FileSystem extends Configured implements Closeable {
|
|
|
|
|
|
/** Caching FileSystem objects */
|
|
/** Caching FileSystem objects */
|
|
static class Cache {
|
|
static class Cache {
|
|
|
|
+ private final ClientFinalizer clientFinalizer = new ClientFinalizer();
|
|
|
|
+
|
|
private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
|
|
private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
|
|
|
|
+ private final Set<Key> toAutoClose = new HashSet<Key>();
|
|
|
|
|
|
/** A variable that makes all objects in the cache unique */
|
|
/** A variable that makes all objects in the cache unique */
|
|
private static AtomicLong unique = new AtomicLong(1);
|
|
private static AtomicLong unique = new AtomicLong(1);
|
|
@@ -1434,6 +1427,10 @@ public abstract class FileSystem extends Configured implements Closeable {
|
|
}
|
|
}
|
|
fs.key = key;
|
|
fs.key = key;
|
|
map.put(key, fs);
|
|
map.put(key, fs);
|
|
|
|
+
|
|
|
|
+ if (conf.getBoolean("fs.automatic.close", true)) {
|
|
|
|
+ toAutoClose.add(key);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
return fs;
|
|
return fs;
|
|
}
|
|
}
|
|
@@ -1441,6 +1438,7 @@ public abstract class FileSystem extends Configured implements Closeable {
|
|
synchronized void remove(Key key, FileSystem fs) {
|
|
synchronized void remove(Key key, FileSystem fs) {
|
|
if (map.containsKey(key) && fs == map.get(key)) {
|
|
if (map.containsKey(key) && fs == map.get(key)) {
|
|
map.remove(key);
|
|
map.remove(key);
|
|
|
|
+ toAutoClose.remove(key);
|
|
if (map.isEmpty() && !clientFinalizer.isAlive()) {
|
|
if (map.isEmpty() && !clientFinalizer.isAlive()) {
|
|
if (!Runtime.getRuntime().removeShutdownHook(clientFinalizer)) {
|
|
if (!Runtime.getRuntime().removeShutdownHook(clientFinalizer)) {
|
|
LOG.info("Could not cancel cleanup thread, though no " +
|
|
LOG.info("Could not cancel cleanup thread, though no " +
|
|
@@ -1451,11 +1449,27 @@ public abstract class FileSystem extends Configured implements Closeable {
|
|
}
|
|
}
|
|
|
|
|
|
synchronized void closeAll() throws IOException {
|
|
synchronized void closeAll() throws IOException {
|
|
|
|
+ closeAll(false);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Close all FileSystem instances in the Cache.
|
|
|
|
+ * @param onlyAutomatic only close those that are marked for automatic closing
|
|
|
|
+ */
|
|
|
|
+ synchronized void closeAll(boolean onlyAutomatic) throws IOException {
|
|
List<IOException> exceptions = new ArrayList<IOException>();
|
|
List<IOException> exceptions = new ArrayList<IOException>();
|
|
- for(; !map.isEmpty(); ) {
|
|
|
|
- Map.Entry<Key, FileSystem> e = map.entrySet().iterator().next();
|
|
|
|
- final Key key = e.getKey();
|
|
|
|
- final FileSystem fs = e.getValue();
|
|
|
|
|
|
+
|
|
|
|
+ // Make a copy of the keys in the map since we'll be modifying
|
|
|
|
+ // the map while iterating over it, which isn't safe.
|
|
|
|
+ List<Key> keys = new ArrayList<Key>();
|
|
|
|
+ keys.addAll(map.keySet());
|
|
|
|
+
|
|
|
|
+ for (Key key : keys) {
|
|
|
|
+ final FileSystem fs = map.get(key);
|
|
|
|
+
|
|
|
|
+ if (onlyAutomatic && !toAutoClose.contains(key)) {
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
|
|
//remove from cache
|
|
//remove from cache
|
|
remove(key, fs);
|
|
remove(key, fs);
|
|
@@ -1475,6 +1489,16 @@ public abstract class FileSystem extends Configured implements Closeable {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private class ClientFinalizer extends Thread {
|
|
|
|
+ public synchronized void run() {
|
|
|
|
+ try {
|
|
|
|
+ closeAll(true);
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ LOG.info("FileSystem.Cache.closeAll() threw an exception:\n" + e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
/** FileSystem.Cache.Key */
|
|
/** FileSystem.Cache.Key */
|
|
static class Key {
|
|
static class Key {
|
|
final String scheme;
|
|
final String scheme;
|