Browse Source

MAPREDUCE-5351. Addedndum patch to ensure we don't incorrectly close wrong filesystems. Contributed by Sandy Ryza.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1@1499904 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 12 years ago
parent
commit
bd4697fa86

+ 4 - 2
src/mapred/org/apache/hadoop/mapred/CleanupQueue.java

@@ -112,8 +112,10 @@ public class CleanupQueue {
                 return null;
               } finally {
                 // So that we don't leave an entry in the FileSystem cache for
-                // every job.
-                fs.close();
+                // every UGI that a job is submitted with.
+                if (ugi != null) {
+                  fs.close();
+                }
               }
             }
           });

+ 18 - 3
src/test/org/apache/hadoop/mapred/TestCleanupQueue.java

@@ -33,20 +33,35 @@ public class TestCleanupQueue {
   @Test (timeout = 2000)
   public void testCleanupQueueClosesFilesystem() throws IOException,
       InterruptedException {
+    Configuration conf = new Configuration();
     File file = new File("afile.txt");
     file.createNewFile();
     Path path = new Path(file.getAbsoluteFile().toURI());
     
-    FileSystem.get(new Configuration());
+    FileSystem.get(conf);
     Assert.assertEquals(1, FileSystem.getCacheSize());
     
+    // With UGI, should close FileSystem
     CleanupQueue cleanupQueue = new CleanupQueue();
-    PathDeletionContext context = new PathDeletionContext(path,
-        new Configuration(), UserGroupInformation.getLoginUser());
+    PathDeletionContext context = new PathDeletionContext(path, conf,
+        UserGroupInformation.getLoginUser());
     cleanupQueue.addToQueue(context);
     
     while (FileSystem.getCacheSize() > 0) {
       Thread.sleep(100);
     }
+    
+    file.createNewFile();
+    FileSystem.get(conf);
+    Assert.assertEquals(1, FileSystem.getCacheSize());
+    
+    // Without UGI, should not close FileSystem
+    context = new PathDeletionContext(path, conf);
+    cleanupQueue.addToQueue(context);
+    
+    while (file.exists()) {
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(1, FileSystem.getCacheSize());
   }
 }