|
@@ -28,15 +28,15 @@ import org.apache.hadoop.dfs.*;
|
|
|
import org.apache.hadoop.conf.*;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
import org.apache.hadoop.util.*;
|
|
|
-import org.apache.hadoop.fs.permission.AccessControlException;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
-import org.apache.hadoop.ipc.RemoteException;
|
|
|
+import org.apache.hadoop.io.MultipleIOException;
|
|
|
+import org.apache.hadoop.security.UserGroupInformation;
|
|
|
|
|
|
/****************************************************************
|
|
|
* An abstract base class for a fairly generic filesystem. It
|
|
|
* may be implemented as a distributed filesystem, or as a "local"
|
|
|
* one that reflects the locally-connected disk. The local version
|
|
|
- * exists for small Hadopp instances and for testing.
|
|
|
+ * exists for small Hadoop instances and for testing.
|
|
|
*
|
|
|
* <p>
|
|
|
*
|
|
@@ -50,12 +50,12 @@ import org.apache.hadoop.ipc.RemoteException;
|
|
|
* The local implementation is {@link LocalFileSystem} and distributed
|
|
|
* implementation is {@link DistributedFileSystem}.
|
|
|
*****************************************************************/
|
|
|
-public abstract class FileSystem extends Configured {
|
|
|
+public abstract class FileSystem extends Configured implements Closeable {
|
|
|
public static final Log LOG = LogFactory.getLog("org.apache.hadoop.fs.FileSystem");
|
|
|
|
|
|
- // cache indexed by URI scheme and authority
|
|
|
- private static final Map<String,Map<String,FileSystem>> CACHE
|
|
|
- = new HashMap<String,Map<String,FileSystem>>();
|
|
|
+ /** FileSystem cache */
|
|
|
+ private static final Cache CACHE = new Cache();
|
|
|
+
|
|
|
/**
|
|
|
* Parse the cmd-line args, starting at i. Remove consumed args
|
|
|
* from array. We expect param in the form:
|
|
@@ -137,9 +137,7 @@ public abstract class FileSystem extends Configured {
|
|
|
* <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
|
|
|
* The entire URI is passed to the FileSystem instance's initialize method.
|
|
|
*/
|
|
|
- public static synchronized FileSystem get(URI uri, Configuration conf)
|
|
|
- throws IOException {
|
|
|
-
|
|
|
+ public static FileSystem get(URI uri, Configuration conf) throws IOException {
|
|
|
String scheme = uri.getScheme();
|
|
|
String authority = uri.getAuthority();
|
|
|
|
|
@@ -147,27 +145,7 @@ public abstract class FileSystem extends Configured {
|
|
|
return get(conf);
|
|
|
}
|
|
|
|
|
|
- Map<String,FileSystem> authorityToFs = CACHE.get(scheme);
|
|
|
- if (authorityToFs == null) {
|
|
|
- if (CACHE.isEmpty()) {
|
|
|
- Runtime.getRuntime().addShutdownHook(clientFinalizer);
|
|
|
- }
|
|
|
- authorityToFs = new HashMap<String,FileSystem>();
|
|
|
- CACHE.put(scheme, authorityToFs);
|
|
|
- }
|
|
|
-
|
|
|
- FileSystem fs = authorityToFs.get(authority);
|
|
|
- if (fs == null) {
|
|
|
- Class fsClass = conf.getClass("fs."+scheme+".impl", null);
|
|
|
- if (fsClass == null) {
|
|
|
- throw new IOException("No FileSystem for scheme: " + scheme);
|
|
|
- }
|
|
|
- fs = (FileSystem)ReflectionUtils.newInstance(fsClass, conf);
|
|
|
- fs.initialize(uri, conf);
|
|
|
- authorityToFs.put(authority, fs);
|
|
|
- }
|
|
|
-
|
|
|
- return fs;
|
|
|
+ return CACHE.get(uri, conf);
|
|
|
}
|
|
|
|
|
|
private static class ClientFinalizer extends Thread {
|
|
@@ -187,14 +165,8 @@ public abstract class FileSystem extends Configured {
|
|
|
*
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- public static synchronized void closeAll() throws IOException {
|
|
|
- Set<String> scheme = new HashSet<String>(CACHE.keySet());
|
|
|
- for (String s : scheme) {
|
|
|
- Set<String> authority = new HashSet<String>(CACHE.get(s).keySet());
|
|
|
- for (String a : authority) {
|
|
|
- CACHE.get(s).get(a).close();
|
|
|
- }
|
|
|
- }
|
|
|
+ public static void closeAll() throws IOException {
|
|
|
+ CACHE.closeAll();
|
|
|
}
|
|
|
|
|
|
/** Make sure that a path specifies a FileSystem. */
|
|
@@ -1110,22 +1082,7 @@ public abstract class FileSystem extends Configured {
|
|
|
* release any held locks.
|
|
|
*/
|
|
|
public void close() throws IOException {
|
|
|
- URI uri = getUri();
|
|
|
- synchronized (FileSystem.class) {
|
|
|
- Map<String,FileSystem> authorityToFs = CACHE.get(uri.getScheme());
|
|
|
- if (authorityToFs != null) {
|
|
|
- authorityToFs.remove(uri.getAuthority());
|
|
|
- if (authorityToFs.isEmpty()) {
|
|
|
- CACHE.remove(uri.getScheme());
|
|
|
- if (CACHE.isEmpty() && !clientFinalizer.isAlive()) {
|
|
|
- if (!Runtime.getRuntime().removeShutdownHook(clientFinalizer)) {
|
|
|
- LOG.info("Could not cancel cleanup thread, though no " +
|
|
|
- "FileSystems are open");
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ CACHE.remove(new Cache.Key(getUri(), getConf()));
|
|
|
}
|
|
|
|
|
|
/** Return the total size of all files in the filesystem.*/
|
|
@@ -1212,4 +1169,118 @@ public abstract class FileSystem extends Configured {
|
|
|
public void setOwner(Path p, String username, String groupname
|
|
|
) throws IOException {
|
|
|
}
|
|
|
+
|
|
|
+ private static FileSystem createFileSystem(URI uri, Configuration conf
|
|
|
+ ) throws IOException {
|
|
|
+ Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
|
|
|
+ if (clazz == null) {
|
|
|
+ throw new IOException("No FileSystem for scheme: " + uri.getScheme());
|
|
|
+ }
|
|
|
+ FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
|
|
|
+ fs.initialize(uri, conf);
|
|
|
+ return fs;
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Caching FileSystem objects */
|
|
|
+ private static class Cache {
|
|
|
+ final Map<Key, FsRef> map = new HashMap<Key, FsRef>();
|
|
|
+
|
|
|
+ synchronized FileSystem get(URI uri, Configuration conf) throws IOException{
|
|
|
+ Key key = new Key(uri, conf);
|
|
|
+ FsRef ref = map.get(key);
|
|
|
+ FileSystem fs = ref == null? null: ref.get();
|
|
|
+ if (fs == null) {
|
|
|
+ if (map.isEmpty() && !clientFinalizer.isAlive()) {
|
|
|
+ Runtime.getRuntime().addShutdownHook(clientFinalizer);
|
|
|
+ }
|
|
|
+
|
|
|
+ fs = createFileSystem(uri, conf);
|
|
|
+ map.put(key, new FsRef(fs, key));
|
|
|
+ }
|
|
|
+ return fs;
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized FsRef remove(Key key) {
|
|
|
+ FsRef ref = map.remove(key);
|
|
|
+ if (map.isEmpty() && !clientFinalizer.isAlive()) {
|
|
|
+ if (!Runtime.getRuntime().removeShutdownHook(clientFinalizer)) {
|
|
|
+ LOG.info("Could not cancel cleanup thread, though no " +
|
|
|
+ "FileSystems are open");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return ref;
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized void closeAll() throws IOException {
|
|
|
+ List<IOException> exceptions = new ArrayList<IOException>();
|
|
|
+ for(FsRef ref : new ArrayList<FsRef>(map.values())) {
|
|
|
+ FileSystem fs = ref.get();
|
|
|
+ if (fs != null) {
|
|
|
+ try {
|
|
|
+ fs.close();
|
|
|
+ }
|
|
|
+ catch(IOException ioe) {
|
|
|
+ exceptions.add(ioe);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ remove(ref.key);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!exceptions.isEmpty()) {
|
|
|
+ throw MultipleIOException.createIOException(exceptions);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Reference of FileSystem which contains the cache key */
|
|
|
+ private static class FsRef {
|
|
|
+ final FileSystem fs;
|
|
|
+ final Key key;
|
|
|
+
|
|
|
+ FsRef(FileSystem fs, Key key) {
|
|
|
+ this.fs = fs;
|
|
|
+ this.key = key;
|
|
|
+ }
|
|
|
+
|
|
|
+ FileSystem get() {return fs;}
|
|
|
+ }
|
|
|
+
|
|
|
+ /** FileSystem.Cache.Key */
|
|
|
+ private static class Key {
|
|
|
+ final String scheme;
|
|
|
+ final String authority;
|
|
|
+ final String username;
|
|
|
+
|
|
|
+ Key(URI uri, Configuration conf) throws IOException {
|
|
|
+ scheme = uri.getScheme();
|
|
|
+ authority = uri.getAuthority();
|
|
|
+ UserGroupInformation ugi = UserGroupInformation.readFrom(conf);
|
|
|
+ username = ugi == null? null: ugi.getUserName();
|
|
|
+ }
|
|
|
+
|
|
|
+ /** {@inheritDoc} */
|
|
|
+ public int hashCode() {
|
|
|
+ return (scheme + authority + username).hashCode();
|
|
|
+ }
|
|
|
+
|
|
|
+ static boolean isEqual(Object a, Object b) {
|
|
|
+ return a == b || (a != null && a.equals(b));
|
|
|
+ }
|
|
|
+
|
|
|
+ /** {@inheritDoc} */
|
|
|
+ public boolean equals(Object obj) {
|
|
|
+ if (obj == this) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ if (obj != null && obj instanceof Key) {
|
|
|
+ Key that = (Key)obj;
|
|
|
+ return isEqual(this.scheme, that.scheme)
|
|
|
+ && isEqual(this.authority, that.authority)
|
|
|
+ && isEqual(this.username, that.username);
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|