浏览代码

commit 01d660ded0a45eb2fdc9f5a574e090eb7dcf54e2
Author: Devaraj Das <ddas@yahoo-inc.com>
Date: Thu Sep 30 23:14:43 2010 -0700

Improved handling of cache-file sizes in the DistributedCache

+++ b/YAHOO-CHANGES.txt
+Release 0.20.201.1 - unreleased
+
+ Improved handling of cache-file size updates in
+ DistributedCache; adds a testcase. (ddas)
+


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077728 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 14 年之前
父节点
当前提交
869b8c4c2a

+ 10 - 0
src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java

@@ -187,6 +187,16 @@ public class TrackerDistributedCacheManager {
                                                       isArchive);
           } else {
             localizedPath = localPath;
+            if (!isArchive) {
+              //for private archives, the lengths come over RPC from the 
+              //JobLocalizer since the JobLocalizer is the one who expands
+              //archives and gets the total length
+              lcacheStatus.size = fileStatus.getLen();
+
+              // Increase the size and sub directory count of the cache
+              // from baseDirSize and baseDirNumberSubDir.
+              addCacheInfoUpdate(lcacheStatus);
+            }
           }
           lcacheStatus.initComplete();
         } else {

+ 25 - 8
src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java

@@ -68,7 +68,7 @@ public class TestTrackerDistributedCacheManager extends TestCase {
 
   private static final int TEST_FILE_SIZE = 4 * 1024; // 4K
   private static final int LOCAL_CACHE_LIMIT = 5 * 1024; //5K
-  private static final int LOCAL_CACHE_SUBDIR_LIMIT = 2;
+  private static final int LOCAL_CACHE_SUBDIR_LIMIT = 1;
   private static final int LOCAL_CACHE_SUBDIR = 2;
   protected Configuration conf;
   protected Path firstCacheFile;
@@ -517,8 +517,7 @@ public class TestTrackerDistributedCacheManager extends TestCase {
     Configuration conf2 = new Configuration(conf);
     conf2.set("mapred.local.dir", ROOT_MAPRED_LOCAL_DIR.toString());
     conf2.setLong("local.cache.size", LOCAL_CACHE_LIMIT);
-    conf2.setLong("mapreduce.tasktracker.local.cache.numberdirectories",
-                   LOCAL_CACHE_SUBDIR_LIMIT);
+    
     refreshConf(conf2);
     TrackerDistributedCacheManager manager = 
         new TrackerDistributedCacheManager(conf2, taskController);
@@ -548,6 +547,8 @@ public class TestTrackerDistributedCacheManager extends TestCase {
     		                         CacheFile.FileType.REGULAR, true, 
     		                         stat.getModificationTime(),
     		                         true); 
+    assertTrue("DistributedCache currently doesn't have cached file",
+        localfs.exists(firstLocalCache));
     Path secondLocalCache = manager.getLocalCache(secondCacheFilePublic.toUri(), conf2, 
         TaskTracker.getPrivateDistributedCacheDir(userName),
         fs.getFileStatus(secondCacheFilePublic), false, 
@@ -582,15 +583,22 @@ public class TestTrackerDistributedCacheManager extends TestCase {
            + "directory names when it deleted the files they contained "
            + "because they collectively exceeded the size limit.",
        localfs.listStatus(cachesBase).length > 1);
-    
+    conf2.setLong("local.cache.size", LOCAL_CACHE_LIMIT * 10);
+    conf2.setLong("mapreduce.tasktracker.local.cache.numberdirectories",
+        LOCAL_CACHE_SUBDIR_LIMIT);
+    manager = 
+      new TrackerDistributedCacheManager(conf2, taskController);
     // Now we test the number of sub directories limit
     // Create the temporary cache files to be used in the tests.
     Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
     Path fourthCacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile");
     // Adding two more small files, so it triggers the number of sub directory
     // limit but does not trigger the file size limit.
-    createPublicTempFile(thirdCacheFile);
-    createPublicTempFile(fourthCacheFile);
+    createPrivateTempFile(thirdCacheFile);
+    createPrivateTempFile(fourthCacheFile);
+    DistributedCache.setCacheFiles(new URI[]{thirdCacheFile.toUri()}, conf2);
+    TrackerDistributedCacheManager.determineCacheVisibilities(conf2);
+    TrackerDistributedCacheManager.determineTimestamps(conf2);
     stat = fs.getFileStatus(thirdCacheFile);
     CacheFile cfile3 = new CacheFile(thirdCacheFile.toUri(), 
             CacheFile.FileType.REGULAR, false, 
@@ -600,7 +608,9 @@ public class TestTrackerDistributedCacheManager extends TestCase {
         TaskTracker.getPrivateDistributedCacheDir(userName),
         fs.getFileStatus(thirdCacheFile), false,
         fs.getFileStatus(thirdCacheFile).getModificationTime(), 
-        true, cfile3);
+        false, cfile3);
+    DistributedCache.setLocalFiles(conf2, thirdLocalCache.toString());
+    JobLocalizer.downloadPrivateCache(conf2);
     // Release the third cache so that it can be deleted while sweeping
     manager.releaseCache(cfile3.getStatus());
     // Getting the fourth cache will make the number of sub directories becomes
@@ -610,10 +620,17 @@ public class TestTrackerDistributedCacheManager extends TestCase {
             CacheFile.FileType.REGULAR, false, 
             stat.getModificationTime(),
             true); 
+    assertTrue("DistributedCache currently doesn't have cached file",
+        localfs.exists(thirdLocalCache));
+    
+    DistributedCache.setCacheFiles(new URI[]{fourthCacheFile.toUri()}, conf2);
+    DistributedCache.setLocalFiles(conf2, thirdCacheFile.toUri().toString());
+    TrackerDistributedCacheManager.determineCacheVisibilities(conf2);
+    TrackerDistributedCacheManager.determineTimestamps(conf2);
     Path fourthLocalCache = manager.getLocalCache(fourthCacheFile.toUri(), conf2, 
         TaskTracker.getPrivateDistributedCacheDir(userName),
         fs.getFileStatus(fourthCacheFile), false, 
-        fs.getFileStatus(fourthCacheFile).getModificationTime(), true, cfile4);
+        fs.getFileStatus(fourthCacheFile).getModificationTime(), false, cfile4);
     assertFalse("DistributedCache failed deleting old" + 
         " cache when the cache exceeds the number of sub directories limit.",
         localfs.exists(thirdLocalCache));