Ver Fonte

Merge -r 1497961:1497963 from branch-1 to branch-1.2 to fix MAPREDUCE-5351. Fixed a memory leak in JobTracker due to stale FS objects in FSCache. Contributed by Sandy Ryza.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.2@1497964 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy há 12 anos atrás
pai
commit
4218d36db9

+ 3 - 0
CHANGES.txt

@@ -40,6 +40,9 @@ Release 1.2.1 - Unreleased
     HADOOP-9665. Fixed BlockDecompressorStream#decompress to return -1 rather
     than throw EOF at end of file. (Zhijie Shen via acmurthy)
 
+    MAPREDUCE-5351. Fixed a memory leak in JobTracker due to stale FS objects in
+    FSCache. (Sandy Ryza via acmurthy)
+
 Release 1.2.0 - 2013.05.05
 
   INCOMPATIBLE CHANGES

+ 9 - 0
src/core/org/apache/hadoop/fs/FileSystem.java

@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.*;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -146,6 +147,14 @@ public abstract class FileSystem extends Configured implements Closeable {
   public static void setDefaultUri(Configuration conf, String uri) {
     setDefaultUri(conf, URI.create(fixName(uri)));
   }
+  
+  /** Get the number of entries currently held in the filesystem cache.
+   * @return the current size of the filesystem cache's entry map
+   */
+  @Private
+  public static int getCacheSize() {
+    return CACHE.map.size();
+  }
 
   /** Called after a new FileSystem instance is constructed.
    * @param name a uri whose authority section names the host, port, etc.

+ 10 - 2
src/mapred/org/apache/hadoop/mapred/CleanupQueue.java

@@ -26,6 +26,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -105,8 +106,15 @@ public class CleanupQueue {
       (ugi == null ? UserGroupInformation.getLoginUser() : ugi).doAs(
           new PrivilegedExceptionAction<Object>() {
             public Object run() throws IOException {
-             p.getFileSystem(conf).delete(p, true);
-             return null;
+              FileSystem fs = p.getFileSystem(conf);
+              try {
+                fs.delete(p, true);
+                return null;
+              } finally {
+                // So that we don't leave an entry in the FileSystem cache for
+                // every job.
+                fs.close();
+              }
             }
           });
       

+ 11 - 6
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -3307,6 +3307,7 @@ public class JobInProgress {
    * removing all delegation token etc.
    */
   void cleanupJob() {
+    FileSystem tempDirFs = null;
     synchronized (this) {
       try {
         // Definitely remove the local-disk copy of the job file
@@ -3324,6 +3325,7 @@ public class JobInProgress {
         if (jobTempDir != null && conf.getKeepTaskFilesPattern() == null &&
             !conf.getKeepFailedTaskFiles()) {
           Path jobTempDirPath = new Path(jobTempDir);
+          tempDirFs = jobTempDirPath.getFileSystem(conf);
           CleanupQueue.getInstance().addToQueue(
               new PathDeletionContext(jobTempDirPath, conf, userUGI, jobId));
         }
@@ -3341,12 +3343,15 @@ public class JobInProgress {
       this.runningReduces = null;
     }
     
-    //close the user's FS
-    try {
-      fs.close();
-    } catch (IOException ie) {
-      LOG.warn("Ignoring exception " + StringUtils.stringifyException(ie) + 
-          " while closing FileSystem for " + userUGI);
+    // Close the user's FS, unless it is the same instance as the temp
+    // directory's FS, which the CleanupQueue closes after deleting the path.
+    if (tempDirFs != fs) {
+      try {
+        fs.close();
+      } catch (IOException ie) {
+        LOG.warn("Ignoring exception " + StringUtils.stringifyException(ie) + 
+            " while closing FileSystem for " + userUGI);
+      }
     }
   }
 

+ 52 - 0
src/test/org/apache/hadoop/mapred/TestCleanupQueue.java

@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Test;
+
+public class TestCleanupQueue {
+  @Test (timeout = 2000)
+  public void testCleanupQueueClosesFilesystem() throws IOException,
+      InterruptedException {
+    File file = new File("afile.txt");
+    file.createNewFile();
+    Path path = new Path(file.getAbsoluteFile().toURI());
+    // Populate the FileSystem cache; exactly one entry is expected afterwards.
+    FileSystem.get(new Configuration());
+    Assert.assertEquals(1, FileSystem.getCacheSize());
+    // Hand the path to a CleanupQueue together with the login user's UGI.
+    CleanupQueue cleanupQueue = new CleanupQueue();
+    PathDeletionContext context = new PathDeletionContext(path,
+        new Configuration(), UserGroupInformation.getLoginUser());
+    cleanupQueue.addToQueue(context);
+    // Poll until the cache empties; the @Test timeout bounds the wait.
+    while (FileSystem.getCacheSize() > 0) {
+      Thread.sleep(100);
+    }
+  }
+}