瀏覽代碼

HADOOP-6888. Add a new FileSystem API closeAllForUGI(..) for closing all file systems associated with a particular UGI. Contributed by Kan Zhang

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@980523 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 15 年之前
父節點
當前提交
ff8b7bc253

+ 3 - 0
CHANGES.txt

@@ -26,6 +26,9 @@ Trunk (unreleased changes)
     input path if recursive is true. Block locations are returned together
     input path if recursive is true. Block locations are returned together
     with each file's status. (hairong)
     with each file's status. (hairong)
 
 
+    HADOOP-6888. Add a new FileSystem API closeAllForUGI(..) for closing all
+    file systems associated with a particular UGI.  (Kan Zhang via szetszwo)
+
   IMPROVEMENTS
   IMPROVEMENTS
 
 
     HADOOP-6644. util.Shell getGROUPS_FOR_USER_COMMAND method name 
     HADOOP-6644. util.Shell getGROUPS_FOR_USER_COMMAND method name 

+ 37 - 0
src/java/org/apache/hadoop/fs/FileSystem.java

@@ -311,6 +311,17 @@ public abstract class FileSystem extends Configured implements Closeable {
     CACHE.closeAll();
     CACHE.closeAll();
   }
   }
 
 
+  /**
+   * Close all cached filesystems for a given UGI. Be sure those filesystems 
+   * are not used anymore.
+   * @param ugi
+   * @throws IOException
+   */
+  public static void closeAllForUGI(UserGroupInformation ugi) 
+  throws IOException {
+    CACHE.closeAll(ugi);
+  }
+
   /** Make sure that a path specifies a FileSystem. */
   /** Make sure that a path specifies a FileSystem. */
   public Path makeQualified(Path path) {
   public Path makeQualified(Path path) {
     checkPath(path);
     checkPath(path);
@@ -1816,6 +1827,32 @@ public abstract class FileSystem extends Configured implements Closeable {
       }
       }
     }
     }
 
 
+    synchronized void closeAll(UserGroupInformation ugi) throws IOException {
+      List<FileSystem> targetFSList = new ArrayList<FileSystem>();
+      //Make a pass over the list and collect the filesystems to close
+      //we cannot close inline since close() removes the entry from the Map
+      for (Map.Entry<Key, FileSystem> entry : map.entrySet()) {
+        final Key key = entry.getKey();
+        final FileSystem fs = entry.getValue();
+        if (ugi.equals(key.ugi) && fs != null) {
+          targetFSList.add(fs);   
+        }
+      }
+      List<IOException> exceptions = new ArrayList<IOException>();
+      //now make a pass over the target list and close each
+      for (FileSystem fs : targetFSList) {
+        try {
+          fs.close();
+        }
+        catch(IOException ioe) {
+          exceptions.add(ioe);
+        }
+      }
+      if (!exceptions.isEmpty()) {
+        throw MultipleIOException.createIOException(exceptions);
+      }
+    }
+
     /** FileSystem.Cache.Key */
     /** FileSystem.Cache.Key */
     static class Key {
     static class Key {
       final String scheme;
       final String scheme;

+ 29 - 0
src/test/core/org/apache/hadoop/fs/TestFileSystemCaching.java

@@ -179,4 +179,33 @@ public class TestFileSystemCaching {
     fs1.close();
     fs1.close();
     fs2.close();
     fs2.close();
   }
   }
+  
+  @Test
+  public void testCloseAllForUGI() throws Exception {
+    final Configuration conf = new Configuration();
+    conf.set("fs.cachedfile.impl", conf.get("fs.file.impl"));
+    UserGroupInformation ugiA = UserGroupInformation.createRemoteUser("foo");
+    FileSystem fsA = ugiA.doAs(new PrivilegedExceptionAction<FileSystem>() {
+      public FileSystem run() throws Exception {
+        return FileSystem.get(new URI("cachedfile://a"), conf);
+      }
+    });
+    //Now we should get the cached filesystem
+    FileSystem fsA1 = ugiA.doAs(new PrivilegedExceptionAction<FileSystem>() {
+      public FileSystem run() throws Exception {
+        return FileSystem.get(new URI("cachedfile://a"), conf);
+      }
+    });
+    assertSame(fsA, fsA1);
+    
+    FileSystem.closeAllForUGI(ugiA);
+    
+    //Now we should get a different (newly created) filesystem
+    fsA1 = ugiA.doAs(new PrivilegedExceptionAction<FileSystem>() {
+      public FileSystem run() throws Exception {
+        return FileSystem.get(new URI("cachedfile://a"), conf);
+      }
+    });
+    assertNotSame(fsA, fsA1);
+  }
 }
 }