
MAPREDUCE-3824. Distributed caches are not removed properly. Contributed by Thomas Graves.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1@1291093 13f79535-47bb-0310-9956-ffa450edef68
Matthew Foley 13 years ago
parent
commit
04829a40e6

+ 2 - 0
CHANGES.txt

@@ -200,6 +200,8 @@ Release 1.0.1 - 2012.02.19
 
     HADOOP-8050. Deadlock in metrics. (Kihwal Lee via mattf)
 
+    MAPREDUCE-3824. Distributed caches are not removed properly. (Thomas Graves
+    via mattf)
 
 Release 1.0.0 - 2011.12.15
 

+ 3 - 3
src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java

@@ -259,10 +259,10 @@ public class TaskDistributedCacheManager {
   public void setSizes(long[] sizes) throws IOException {
     int i = 0;
     for (CacheFile c: cacheFiles) {
-      if (!c.isPublic && c.type == CacheFile.FileType.ARCHIVE && 
-    	  c.status != null) {
-        distributedCacheManager.setSize(c.status, sizes[i++]);
+      if (!c.isPublic && c.status != null) {
+        distributedCacheManager.setSize(c.status, sizes[i]);
       }
+      i++;
     }
   }
 

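A minimal standalone sketch (hypothetical Entry class, not the patched TaskDistributedCacheManager) of the indexing contract the fixed setSizes() relies on: the sizes array has one slot per cache entry in order, so the loop index must advance for every entry, and only the size lookup is skipped for entries that record nothing.

    import java.util.Arrays;
    import java.util.List;

    public class SetSizesSketch {
      // Hypothetical stand-in for CacheFile, with only the fields the sketch needs.
      static class Entry {
        final boolean isPublic;
        Long recordedSize;               // null plays the role of "status == null"
        Entry(boolean isPublic) { this.isPublic = isPublic; }
      }

      // Mirrors the fixed loop: i advances unconditionally, so sizes[i]
      // stays aligned with the i-th entry whether or not a size is recorded.
      static void setSizes(List<Entry> entries, long[] sizes) {
        int i = 0;
        for (Entry e : entries) {
          if (!e.isPublic) {
            e.recordedSize = sizes[i];
          }
          i++;
        }
      }

      public static void main(String[] args) {
        List<Entry> entries = Arrays.asList(
            new Entry(true), new Entry(false), new Entry(false));
        setSizes(entries, new long[] {10L, 20L, 30L});
        System.out.println(entries.get(1).recordedSize);  // 20
        System.out.println(entries.get(2).recordedSize);  // 30
      }
    }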
+ 30 - 10
src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java

@@ -34,6 +34,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -557,7 +558,7 @@ public class TrackerDistributedCacheManager {
     //
     // This field should be accessed under global cachedArchives lock.
     //
-    private int refcount;    // number of instances using this cache
+    private AtomicInteger refcount;    // number of instances using this cache
 
     //
     // The following two fields should be accessed under
@@ -588,7 +589,7 @@ public class TrackerDistributedCacheManager {
                        String uniqueString, String user, String key) {
       super();
       this.localizedLoadPath = localLoadPath;
-      this.refcount = 0;
+      this.refcount = new AtomicInteger();
       this.localizedBaseDir = baseDir;
       this.size = 0;
       this.subDir = subDir;
@@ -598,14 +599,16 @@ public class TrackerDistributedCacheManager {
     }
     
     public synchronized void incRefCount() {
-      refcount += 1;
+      refcount.incrementAndGet();
+      LOG.debug(localizedLoadPath + ": refcount=" + refcount.get());
     }
 
     public void decRefCount() {
       synchronized (cachedArchives) {
         synchronized (this) {
-          refcount -= 1;
-          if(refcount <= 0) {
+          refcount.decrementAndGet();
+          LOG.debug(localizedLoadPath + ": refcount=" + refcount.get());
+          if (refcount.get() <= 0) {
             String key = this.key;
             cachedArchives.remove(key);
             cachedArchives.put(key, this);
@@ -615,11 +618,12 @@ public class TrackerDistributedCacheManager {
     }
 
     public int getRefCount() {
-      return refcount;
+      return refcount.get();
     }
 
     public synchronized boolean isUsed() {
-      return refcount > 0;
+      LOG.debug(localizedLoadPath + ": refcount=" + refcount.get());
+      return refcount.get() > 0;
     }
 
     Path getBaseDir(){
@@ -652,7 +656,8 @@ public class TrackerDistributedCacheManager {
         try {
           localFs.delete(f.getValue().localizedLoadPath, true);
         } catch (IOException ie) {
-          LOG.debug("Error cleaning up cache", ie);
+          LOG.debug("Error cleaning up cache (" + 
+              f.getValue().localizedLoadPath + ")", ie);
         }
       }
       cachedArchives.clear();
@@ -668,6 +673,10 @@ public class TrackerDistributedCacheManager {
     return result;
   }
 
+  /**
+   * Set the sizes for any archives, files, or directories in the private
+   * distributed cache.
+   */
   public void setArchiveSizes(JobID jobId, long[] sizes) throws IOException {
     TaskDistributedCacheManager mgr = jobArchives.get(jobId);
     if (mgr != null) {
@@ -989,8 +998,13 @@ public class TrackerDistributedCacheManager {
       HashMap<Path, CacheDir> toBeCleanedBaseDir = 
         new HashMap<Path, CacheDir>();
       synchronized (properties) {
+        LOG.debug("checkAndCleanup: Allowed Cache Size test");
         for (Map.Entry<Path, CacheDir> baseDir : properties.entrySet()) {
           CacheDir baseDirCounts = baseDir.getValue();
+          LOG.debug(baseDir.getKey() + ": allowedCacheSize=" + allowedCacheSize +
+              ",baseDirCounts.size=" + baseDirCounts.size +
+              ",allowedCacheSubdirs=" + allowedCacheSubdirs + 
+              ",baseDirCounts.subdirs=" + baseDirCounts.subdirs);
           if (allowedCacheSize < baseDirCounts.size ||
               allowedCacheSubdirs < baseDirCounts.subdirs) {
             CacheDir tcc = new CacheDir();
@@ -1002,6 +1016,7 @@ public class TrackerDistributedCacheManager {
       }
       // try deleting cache Status with refcount of zero
       synchronized (cachedArchives) {
+        LOG.debug("checkAndCleanup: Global Cache Size Check");
         for(
             Iterator<Map.Entry<String, CacheStatus>> it 
             = cachedArchives.entrySet().iterator();
@@ -1010,11 +1025,16 @@ public class TrackerDistributedCacheManager {
           String cacheId = entry.getKey();
           CacheStatus cacheStatus = cachedArchives.get(cacheId);
           CacheDir leftToClean = toBeCleanedBaseDir.get(cacheStatus.getBaseDir());
+
           if (leftToClean != null && (leftToClean.size > 0 || leftToClean.subdirs > 0)) {
             synchronized (cacheStatus) {
               // if reference count is zero mark the cache for deletion
-              if (!cacheStatus.isUsed()) {
-                leftToClean.size -= cacheStatus.size;
+              boolean isUsed = cacheStatus.isUsed();
+              long cacheSize = cacheStatus.size; 
+              LOG.debug(cacheStatus.getLocalizedUniqueDir() + ": isUsed=" + isUsed + 
+                  " size=" + cacheSize + " leftToClean.size=" + leftToClean.size);
+              if (!isUsed) {
+                leftToClean.size -= cacheSize;
                 leftToClean.subdirs--;
                 // delete this cache entry from the global list 
                 // and mark the localized file for deletion

+ 8 - 3
src/mapred/org/apache/hadoop/mapred/JobLocalizer.java

@@ -28,6 +28,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -339,21 +340,25 @@ public class JobLocalizer {
    * @return the size of the archive objects
    */
   public static long[] downloadPrivateCache(Configuration conf) throws IOException {
-    downloadPrivateCacheObjects(conf,
+    long[] fileSizes = downloadPrivateCacheObjects(conf,
                                 DistributedCache.getCacheFiles(conf),
                                 DistributedCache.getLocalCacheFiles(conf),
                                 DistributedCache.getFileTimestamps(conf),
                                 TrackerDistributedCacheManager.
                                   getFileVisibilities(conf),
                                 false);
-    return 
-      downloadPrivateCacheObjects(conf,
+
+    long[] archiveSizes = downloadPrivateCacheObjects(conf,
                                   DistributedCache.getCacheArchives(conf),
                                   DistributedCache.getLocalCacheArchives(conf),
                                   DistributedCache.getArchiveTimestamps(conf),
                                   TrackerDistributedCacheManager.
                                     getArchiveVisibilities(conf),
                                   true);
+
+    // The order here matters - it has to match order of cache files
+    // in TaskDistributedCacheManager.
+    return ArrayUtils.addAll(fileSizes, archiveSizes);
   }
 
   public void localizeJobFiles(JobID jobid, JobConf jConf,

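For reference, a minimal sketch (not part of the patch) of the ordering that the comment above depends on: org.apache.commons.lang.ArrayUtils.addAll returns a new array holding the elements of its first argument followed by those of its second, so the combined array lists file sizes before archive sizes, matching the order in which TaskDistributedCacheManager.setSizes walks its cache files.

    import java.util.Arrays;
    import org.apache.commons.lang.ArrayUtils;

    public class AddAllOrderSketch {
      public static void main(String[] args) {
        long[] fileSizes = {100L, 200L};   // hypothetical private file sizes
        long[] archiveSizes = {300L};      // hypothetical private archive sizes
        // addAll preserves argument order: files first, then archives.
        long[] combined = ArrayUtils.addAll(fileSizes, archiveSizes);
        System.out.println(Arrays.toString(combined));    // [100, 200, 300]
      }
    }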
+ 1 - 1
src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java

@@ -173,7 +173,7 @@ public interface TaskUmbilicalProtocol extends VersionedProtocol {
 
   /**
    * The job initializer needs to report the sizes of the archive
-   * objects in the private distributed cache.
+   * objects and directories in the private distributed cache.
    * @param jobId the job to update
    * @param sizes the array of sizes that were computed
    * @throws IOException

+ 90 - 14
src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java

@@ -77,6 +77,10 @@ public class TestTrackerDistributedCacheManager extends TestCase {
   protected Path firstCacheFilePublic;
   protected Path secondCacheFile;
   protected Path secondCacheFilePublic;
+  protected Path firstCacheDirPublic;
+  protected Path firstCacheDirPrivate;
+  protected Path firstCacheFileInDirPublic;
+  protected Path firstCacheFileInDirPrivate;
   private FileSystem fs;
 
   protected LocalDirAllocator localDirAllocator = 
@@ -136,6 +140,15 @@ public class TestTrackerDistributedCacheManager extends TestCase {
     createPublicTempFile(secondCacheFilePublic);
     createPrivateTempFile(firstCacheFile);
     createPrivateTempFile(secondCacheFile);
+
+    firstCacheDirPublic = new Path(TEST_ROOT_DIR, "firstcachedirPublic");
+    firstCacheDirPrivate = new Path(TEST_ROOT_DIR, "firstcachedirPrivate");
+    firstCacheFileInDirPublic = new Path(firstCacheDirPublic, "firstcacheFileinDirPublic.txt");
+    firstCacheFileInDirPrivate = new Path(firstCacheDirPrivate, "firstcacheFileinDirPrivate.txt");
+    createPublicTempDir(firstCacheDirPublic);
+    createPrivateTempDir(firstCacheDirPrivate);
+    createPublicTempFile(firstCacheFileInDirPublic);
+    createPrivateTempFile(firstCacheFileInDirPrivate);
   }
   
   protected void refreshConf(Configuration conf) throws IOException {
@@ -263,41 +276,79 @@ public class TestTrackerDistributedCacheManager extends TestCase {
     TrackerDistributedCacheManager.determineCacheVisibilities(conf1);
 
     // Task localizing for first job
+    JobID jobId = new JobID("jt", 1);
     TaskDistributedCacheManager handle = manager
-        .newTaskDistributedCacheManager(new JobID("jt", 1), conf1);
+        .newTaskDistributedCacheManager(jobId, conf1);
     handle.setupCache(conf1, TaskTracker.getPublicDistributedCacheDir(), 
         TaskTracker.getPrivateDistributedCacheDir(userName));
-    JobLocalizer.downloadPrivateCache(conf1);
+    long[] sizes = JobLocalizer.downloadPrivateCache(conf1);
+    if (sizes != null) {
+      manager.setArchiveSizes(jobId, sizes);
+    }
+    handle.release();
+    for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
+      assertEquals(0, manager.getReferenceCount(c.getStatus()));
+      long filesize = FileUtil.getDU(new File(c.getStatus().localizedLoadPath.getParent().toString()));
+      assertTrue("filesize is not greater than 0", filesize > 0);
+      assertEquals(filesize, c.getStatus().size);
+    }
+
+    // Test specifying directories to go into distributed cache and make
+    // sure their sizes are calculated properly.
+    Job job2 = new Job(conf);
+    Configuration conf2 = job2.getConfiguration();
+    conf2.set("user.name", userName);
+    DistributedCache.addCacheFile(firstCacheDirPublic.toUri(), conf2);
+    DistributedCache.addCacheFile(firstCacheDirPrivate.toUri(), conf2);
+
+    TrackerDistributedCacheManager.determineTimestamps(conf2);
+    TrackerDistributedCacheManager.determineCacheVisibilities(conf2);
+
+    // Task localizing for second job
+    JobID job2Id = new JobID("jt", 2);
+    handle = manager.newTaskDistributedCacheManager(job2Id, conf2);
+    handle.setupCache(conf2, TaskTracker.getPublicDistributedCacheDir(),
+        TaskTracker.getPrivateDistributedCacheDir(userName));
+    long[] sizes2 = JobLocalizer.downloadPrivateCache(conf2);
+    for (int j = 0; j < sizes2.length; j++) {
+      LOG.info("size is: " + sizes2[j]);
+    }
+    if (sizes2 != null) {
+      manager.setArchiveSizes(job2Id, sizes2);
+    }
     handle.release();
     for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
       assertEquals(0, manager.getReferenceCount(c.getStatus()));
+      long filesize = FileUtil.getDU(new File(c.getStatus().localizedLoadPath.getParent().toString()));
+      assertTrue("filesize is not greater than 0", filesize > 0);
+      assertEquals(filesize, c.getStatus().size);
     }
     
     Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
     createPrivateTempFile(thirdCacheFile);
     
     // Configures another job with three regular files.
-    Job job2 = new Job(conf);
-    Configuration conf2 = job2.getConfiguration();
-    conf2.set("user.name", userName);
+    Job job3 = new Job(conf);
+    Configuration conf3 = job3.getConfiguration();
+    conf3.set("user.name", userName);
     // add a file that would get failed to localize
-    DistributedCache.addCacheFile(firstCacheFilePublic.toUri(), conf2);
+    DistributedCache.addCacheFile(firstCacheFilePublic.toUri(), conf3);
     // add a file that is already localized by different job
-    DistributedCache.addCacheFile(secondCacheFile.toUri(), conf2);
+    DistributedCache.addCacheFile(secondCacheFile.toUri(), conf3);
     // add a file that is never localized
-    DistributedCache.addCacheFile(thirdCacheFile.toUri(), conf2);
+    DistributedCache.addCacheFile(thirdCacheFile.toUri(), conf3);
     
-    TrackerDistributedCacheManager.determineTimestamps(conf2);
-    TrackerDistributedCacheManager.determineCacheVisibilities(conf2);
+    TrackerDistributedCacheManager.determineTimestamps(conf3);
+    TrackerDistributedCacheManager.determineCacheVisibilities(conf3);
 
-    // Task localizing for second job
+    // Task localizing for third job
     // localization for the "firstCacheFile" will fail.
-    handle = manager.newTaskDistributedCacheManager(new JobID("jt", 2), conf2);
+    handle = manager.newTaskDistributedCacheManager(new JobID("jt", 3), conf3);
     Throwable th = null;
     try {
-      handle.setupCache(conf2, TaskTracker.getPublicDistributedCacheDir(),
+      handle.setupCache(conf3, TaskTracker.getPublicDistributedCacheDir(),
           TaskTracker.getPrivateDistributedCacheDir(userName));
-      JobLocalizer.downloadPrivateCache(conf2);
+      JobLocalizer.downloadPrivateCache(conf3);
     } catch (IOException e) {
       th = e;
       LOG.info("Exception during setup", e);
@@ -949,6 +1000,13 @@ public class TestTrackerDistributedCacheManager extends TestCase {
     createTempFile(p, TEST_FILE_SIZE);
   }
   
+  static void createTempDir(Path p) throws IOException {
+    File dir = new File(p.toString());
+    dir.mkdirs();
+    FileSystem.LOG.info("created temp directory: " + p);
+
+  }
+
   static void createTempFile(Path p, int size) throws IOException {
     File f = new File(p.toString());
     FileOutputStream os = new FileOutputStream(f);
@@ -971,12 +1029,30 @@ public class TestTrackerDistributedCacheManager extends TestCase {
     FileUtil.chmod(p.toString(), "0770",true);
   }
 
+  static void createPublicTempDir(Path p)
+  throws IOException, InterruptedException {
+    createTempDir(p);
+    FileUtil.chmod(p.toString(), "0777",true);
+  }
+
+  static void createPrivateTempDir(Path p)
+  throws IOException, InterruptedException {
+    createTempDir(p);
+    FileUtil.chmod(p.toString(), "0770",true);
+  }
+
   @Override
   protected void tearDown() throws IOException {
     new File(firstCacheFile.toString()).delete();
     new File(secondCacheFile.toString()).delete();
     new File(firstCacheFilePublic.toString()).delete();
     new File(secondCacheFilePublic.toString()).delete();
+
+    new File(firstCacheFileInDirPublic.toString()).delete();
+    new File(firstCacheFileInDirPrivate.toString()).delete();
+    new File(firstCacheDirPrivate.toString()).delete();
+    new File(firstCacheDirPublic.toString()).delete();
+
     FileUtil.fullyDelete(new File(TEST_ROOT_DIR));
   }