Browse Source

HADOOP-1130. The FileSystem.closeAll() method closes all existing
DFSClients. (Chris Douglas via dhruba)



git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@594452 13f79535-47bb-0310-9956-ffa450edef68

Dhruba Borthakur 17 years ago
parent
commit
e47bdc3b1a

+ 3 - 0
CHANGES.txt

@@ -94,6 +94,9 @@ Trunk (unreleased changes)
     HADOOP-2121. Cleanup DFSOutputStream when the stream encountered errors
     HADOOP-2121. Cleanup DFSOutputStream when the stream encountered errors
     when Datanodes became full.  (Raghu Angadi via dhruba)
     when Datanodes became full.  (Raghu Angadi via dhruba)
 
 
+    HADOOP-1130. The FileSystem.closeAll() method closes all existing
+    DFSClients.  (Chris Douglas via dhruba)
+
 Release 0.15.1 -
 Release 0.15.1 -
 
 
   BUG FIXES
   BUG FIXES

+ 0 - 32
src/java/org/apache/hadoop/dfs/DFSClient.java

@@ -72,38 +72,6 @@ class DFSClient implements FSConstants {
   private TreeMap<String, OutputStream> pendingCreates =
   private TreeMap<String, OutputStream> pendingCreates =
     new TreeMap<String, OutputStream>();
     new TreeMap<String, OutputStream>();
     
     
-  /**
-   * A class to track the list of DFS clients, so that they can be closed
-   * on exit.
-   */
-  private static class ClientFinalizer extends Thread {
-    private List<DFSClient> clients = new ArrayList<DFSClient>();
-
-    public synchronized void addClient(DFSClient client) {
-      clients.add(client);
-    }
-
-    @Override
-    public synchronized void run() {
-      for (DFSClient client : clients) {
-        if (client.running) {
-          try {
-            client.close();
-          } catch (IOException ie) {
-            System.err.println("Error closing client");
-            ie.printStackTrace();
-          }
-        }
-      }
-    }
-  }
-
-  // add a cleanup thread
-  private static ClientFinalizer clientFinalizer = new ClientFinalizer();
-  static {
-    Runtime.getRuntime().addShutdownHook(clientFinalizer);
-  }
-
   private static ClientProtocol createNamenode(
   private static ClientProtocol createNamenode(
       InetSocketAddress nameNodeAddr, Configuration conf)
       InetSocketAddress nameNodeAddr, Configuration conf)
     throws IOException {
     throws IOException {

+ 29 - 4
src/java/org/apache/hadoop/fs/FileSystem.java

@@ -145,6 +145,9 @@ public abstract class FileSystem extends Configured {
 
 
     Map<String,FileSystem> authorityToFs = CACHE.get(scheme);
     Map<String,FileSystem> authorityToFs = CACHE.get(scheme);
     if (authorityToFs == null) {
     if (authorityToFs == null) {
+      if (CACHE.isEmpty()) {
+        Runtime.getRuntime().addShutdownHook(clientFinalizer);
+      }
       authorityToFs = new HashMap<String,FileSystem>();
       authorityToFs = new HashMap<String,FileSystem>();
       CACHE.put(scheme, authorityToFs);
       CACHE.put(scheme, authorityToFs);
     }
     }
@@ -163,16 +166,29 @@ public abstract class FileSystem extends Configured {
     return fs;
     return fs;
   }
   }
 
 
+  private static class ClientFinalizer extends Thread {
+    public synchronized void run() {
+      try {
+        FileSystem.closeAll();
+      } catch (IOException e) {
+        LOG.info("FileSystem.closeAll() threw an exception:\n" + e);
+      }
+    }
+  }
+  private static final ClientFinalizer clientFinalizer = new ClientFinalizer();
+
   /**
   /**
    * Close all cached filesystems. Be sure those filesystems are not
    * Close all cached filesystems. Be sure those filesystems are not
    * used anymore.
    * used anymore.
    * 
    * 
    * @throws IOException
    * @throws IOException
    */
    */
-  public static synchronized void closeAll() throws IOException{
-    for(Map<String, FileSystem>  fss : CACHE.values()){
-      for(FileSystem fs : fss.values()){
-        fs.close();
+  public static synchronized void closeAll() throws IOException {
+    Set<String> scheme = new HashSet<String>(CACHE.keySet());
+    for (String s : scheme) {
+      Set<String> authority = new HashSet<String>(CACHE.get(s).keySet());
+      for (String a : authority) {
+        CACHE.get(s).get(a).close();
       }
       }
     }
     }
   }
   }
@@ -895,6 +911,15 @@ public abstract class FileSystem extends Configured {
       Map<String,FileSystem> authorityToFs = CACHE.get(uri.getScheme());
       Map<String,FileSystem> authorityToFs = CACHE.get(uri.getScheme());
       if (authorityToFs != null) {
       if (authorityToFs != null) {
         authorityToFs.remove(uri.getAuthority());
         authorityToFs.remove(uri.getAuthority());
+        if (authorityToFs.isEmpty()) {
+          CACHE.remove(uri.getScheme());
+          if (CACHE.isEmpty() && !clientFinalizer.isAlive()) {
+            if (!Runtime.getRuntime().removeShutdownHook(clientFinalizer)) {
+              LOG.info("Could not cancel cleanup thread, though no " +
+                       "FileSystems are open");
+            }
+          }
+        }
       }
       }
     }
     }
   }
   }