Forráskód Böngészése

HADOOP-6640. FileSystem.get() does RPC retries within a static synchronized block. Contributed by Hairong Kuang.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@930096 13f79535-47bb-0310-9956-ffa450edef68
Hairong Kuang 15 éve
szülő
commit
7b1ac5ac13

+ 3 - 0
CHANGES.txt

@@ -306,6 +306,9 @@ Trunk (unreleased changes)
     HADOOP-6654. Fix code example in WritableComparable javadoc.  (Tom White
     via szetszwo)
 
+    HADOOP-6640. FileSystem.get() does RPC retries within a static
+    synchronized block. (hairong)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

+ 20 - 7
src/java/org/apache/hadoop/fs/FileSystem.java

@@ -1758,32 +1758,45 @@ public abstract class FileSystem extends Configured implements Closeable {
     /** A variable that makes all objects in the cache unique */
     private static AtomicLong unique = new AtomicLong(1);
 
-    synchronized FileSystem get(URI uri, Configuration conf) throws IOException{
+    FileSystem get(URI uri, Configuration conf) throws IOException{
       Key key = new Key(uri, conf);
       return getInternal(uri, conf, key);
     }
 
     /** The objects inserted into the cache using this method are all unique */
-    synchronized FileSystem getUnique(URI uri, Configuration conf) throws IOException{
+    FileSystem getUnique(URI uri, Configuration conf) throws IOException{
       Key key = new Key(uri, conf, unique.getAndIncrement());
       return getInternal(uri, conf, key);
     }
 
     private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
-      FileSystem fs = map.get(key);
-      if (fs == null) {
-        fs = createFileSystem(uri, conf);
+      FileSystem fs;
+      synchronized (this) {
+        fs = map.get(key);
+      }
+      if (fs != null) {
+        return fs;
+      }
+
+      fs = createFileSystem(uri, conf);
+      synchronized (this) { // refetch the lock again
+        FileSystem oldfs = map.get(key);
+        if (oldfs != null) { // a file system is created while lock is releasing
+          fs.close(); // close the new file system
+          return oldfs;  // return the old file system
+        }
+        
+        // now insert the new file system into the map
         if (map.isEmpty() && !clientFinalizer.isAlive()) {
           Runtime.getRuntime().addShutdownHook(clientFinalizer);
         }
         fs.key = key;
         map.put(key, fs);
-
         if (conf.getBoolean("fs.automatic.close", true)) {
           toAutoClose.add(key);
         }
+        return fs;
       }
-      return fs;
     }
 
     synchronized void remove(Key key, FileSystem fs) {

+ 45 - 0
src/test/core/org/apache/hadoop/fs/TestFileSystemCaching.java

@@ -21,7 +21,9 @@ package org.apache.hadoop.fs;
 import static junit.framework.Assert.assertSame;
 import static junit.framework.Assert.assertNotSame;
 
+import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -46,6 +48,49 @@ public class TestFileSystemCaching {
     assertSame(fs1, fs2);
   }
 
+  public static class InitializeForeverFileSystem extends LocalFileSystem {
+    public void initialize(URI uri, Configuration conf) throws IOException {
+      // notify that InitializeForeverFileSystem started initialization
+      synchronized (conf) {
+        conf.notify();
+      }
+      try {
+        while (true) {
+          Thread.sleep(1000);
+        }
+      } catch (InterruptedException e) {
+        return;
+      }
+    }
+  }
+  
+  @Test
+  public void testCacheEnabledWithInitializeForeverFS() throws Exception {
+    final Configuration conf = new Configuration();
+    Thread t = new Thread() {
+      public void run() {
+        conf.set("fs.localfs1.impl", "org.apache.hadoop.fs." +
+         "TestFileSystemCaching$InitializeForeverFileSystem");
+        try {
+          FileSystem.get(new URI("localfs1://a"), conf);
+        } catch (IOException e) {
+          e.printStackTrace();
+        } catch (URISyntaxException e) {
+          e.printStackTrace();
+        }
+      }
+    };
+    t.start();
+    // wait for InitializeForeverFileSystem to start initialization
+    synchronized (conf) {
+      conf.wait();
+    }
+    conf.set("fs.cachedfile.impl", conf.get("fs.file.impl"));
+    FileSystem.get(new URI("cachedfile://a"), conf);
+    t.interrupt();
+    t.join();
+  }
+
   @Test
   public void testCacheDisabled() throws Exception {
     Configuration conf = new Configuration();