Browse Source

merged MAPREDUCE-3343 to branch-1.0.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.0@1243233 13f79535-47bb-0310-9956-ffa450edef68
Matthew Foley 13 years ago
parent
commit
20165758da

+ 3 - 0
CHANGES.txt

@@ -43,6 +43,9 @@ Release 1.0.1 - 2012.02.12
     Double.MAX_VALUE) to avoid making Ganglia's gmetad core. (Varun Kapoor
     via mattf)
 
+    MAPREDUCE-3343. TaskTracker Out of Memory because of distributed cache.
+    (Zhao Yunjiong).
+
 Release 1.0.0 - 2011.12.15
 
   NEW FEATURES

+ 12 - 0
src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java

@@ -664,6 +664,18 @@ public class TrackerDistributedCacheManager {
     }
   }
 
+  public void removeTaskDistributedCacheManager(JobID jobId) {
+    jobArchives.remove(jobId);
+  }
+
+  /*
+   * This method is called from unit tests.
+   */
+  protected TaskDistributedCacheManager getTaskDistributedCacheManager(
+      JobID jobId) {
+    return jobArchives.get(jobId);
+  }
+
   /**
    * Determines timestamps of files to be cached, and stores those
    * in the configuration.  This is intended to be used internally by JobClient

+ 3 - 2
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -2074,8 +2074,9 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
       runningJobs.remove(jobId);
     }
     getJobTokenSecretManager().removeTokenForJob(jobId.toString());  
-  }      
-    
+    distributedCacheManager.removeTaskDistributedCacheManager(jobId);
+  }
+
   /**
    * This job's files are no longer needed on this TT, remove them.
    *

+ 19 - 0
src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java

@@ -1137,4 +1137,23 @@ public class TestTrackerDistributedCacheManager extends TestCase {
     }
   }
 
+  public void testRemoveTaskDistributedCacheManager() throws Exception {
+    if (!canRun()) {
+      return;
+    }
+    TrackerDistributedCacheManager manager = new TrackerDistributedCacheManager(
+        conf, taskController);
+    JobID jobId = new JobID("jobtracker", 1);
+    manager.newTaskDistributedCacheManager(jobId, conf);
+
+    TaskDistributedCacheManager taskDistributedCacheManager = manager
+        .getTaskDistributedCacheManager(jobId);
+    assertNotNull(taskDistributedCacheManager);
+
+    manager.removeTaskDistributedCacheManager(jobId);
+
+    taskDistributedCacheManager = manager.getTaskDistributedCacheManager(jobId);
+    assertNull(taskDistributedCacheManager);
+  }
+
 }