Pārlūkot izejas kodu

HADOOP-5351. Fixed a memory leak in JobTracker due to stable FS objects in FSCache. Contributed by Sandy Ryza.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1@1497962 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 12 gadi atpakaļ
vecāks
revīzija
b81482626f

+ 3 - 0
CHANGES.txt

@@ -105,6 +105,9 @@ Release 1.2.1 - Unreleased
     HADOOP-9665. Fixed BlockDecompressorStream#decompress to return -1 rather
     than throw EOF at end of file. (Zhijie Shen via acmurthy)
 
+    HADOOP-5351. Fixed a memory leak in JobTracker due to stable FS objects in
+    FSCache. (Sandy Ryza via acmurthy)
+
 Release 1.2.0 - 2013.05.05
 
   INCOMPATIBLE CHANGES

+ 9 - 0
src/core/org/apache/hadoop/fs/FileSystem.java

@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.*;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -146,6 +147,14 @@ public abstract class FileSystem extends Configured implements Closeable {
   public static void setDefaultUri(Configuration conf, String uri) {
     setDefaultUri(conf, URI.create(fixName(uri)));
   }
+  
+  /** Get the number of entries in the filesystem cache
+   * @return the number of entries in the filesystem cache
+   */
+  @Private
+  public static int getCacheSize() {
+    return CACHE.map.size();
+  }
 
   /** Called after a new FileSystem instance is constructed.
    * @param name a uri whose authority section names the host, port, etc.

+ 10 - 2
src/mapred/org/apache/hadoop/mapred/CleanupQueue.java

@@ -26,6 +26,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -105,8 +106,15 @@ public class CleanupQueue {
       (ugi == null ? UserGroupInformation.getLoginUser() : ugi).doAs(
           new PrivilegedExceptionAction<Object>() {
             public Object run() throws IOException {
-             p.getFileSystem(conf).delete(p, true);
-             return null;
+              FileSystem fs = p.getFileSystem(conf);
+              try {
+                fs.delete(p, true);
+                return null;
+              } finally {
+                // So that we don't leave an entry in the FileSystem cache for
+                // every job.
+                fs.close();
+              }
             }
           });
       

+ 11 - 6
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -3307,6 +3307,7 @@ public class JobInProgress {
    * removing all delegation token etc.
    */
   void cleanupJob() {
+    FileSystem tempDirFs = null;
     synchronized (this) {
       try {
         // Definitely remove the local-disk copy of the job file
@@ -3324,6 +3325,7 @@ public class JobInProgress {
         if (jobTempDir != null && conf.getKeepTaskFilesPattern() == null &&
             !conf.getKeepFailedTaskFiles()) {
           Path jobTempDirPath = new Path(jobTempDir);
+          tempDirFs = jobTempDirPath.getFileSystem(conf);
           CleanupQueue.getInstance().addToQueue(
               new PathDeletionContext(jobTempDirPath, conf, userUGI, jobId));
         }
@@ -3341,12 +3343,15 @@ public class JobInProgress {
       this.runningReduces = null;
     }
     
-    //close the user's FS
-    try {
-      fs.close();
-    } catch (IOException ie) {
-      LOG.warn("Ignoring exception " + StringUtils.stringifyException(ie) + 
-          " while closing FileSystem for " + userUGI);
+    // Close the user's FS.  Or don't, in the common case of FS being the same
+    // FS as the temp directory FS, as it will be closed by the CleanupQueue.
+    if (tempDirFs != fs) {
+      try {
+        fs.close();
+      } catch (IOException ie) {
+        LOG.warn("Ignoring exception " + StringUtils.stringifyException(ie) + 
+            " while closing FileSystem for " + userUGI);
+      }
     }
   }