浏览代码

MAPREDUCE-2479. Move distributed cache cleanup to a background task,
backporting MAPREDUCE-1568. Contributed by Robert Joseph Evans


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security@1103940 13f79535-47bb-0310-9956-ffa450edef68

Christopher Douglas 14 年之前
父节点
当前提交
bdc3142334

+ 3 - 0
CHANGES.txt

@@ -21,6 +21,9 @@ Release 0.20.205.0 - unreleased
 
     HADOOP-7274. Fix typos in IOUtils. (Jonathan Eagles via cdouglas)
 
+    MAPREDUCE-2479. Move distributed cache cleanup to a background task,
+    backporting MAPREDUCE-1568. (Robert Joseph Evans via cdouglas)
+
 Release 0.20.204.0 - unreleased
 
   BUG FIXES

+ 163 - 107
src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java

@@ -22,12 +22,12 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.text.DateFormat;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Random;
@@ -76,14 +76,6 @@ public class TrackerDistributedCacheManager {
   private static final FsPermission PUBLIC_CACHE_OBJECT_PERM =
     FsPermission.createImmutable((short) 0755);
 
-  // For holding the properties of each cache directory
-  static class CacheDir {
-    long size;
-    long subdirs;
-  }
-  private TreeMap<Path, CacheDir> baseDirProperties =
-    new TreeMap<Path, CacheDir>();
-
   // default total cache size (10GB)
   private static final long DEFAULT_CACHE_SIZE = 10737418240L;
   private static final long DEFAULT_CACHE_SUBDIR_LIMIT = 10000;
@@ -100,6 +92,9 @@ public class TrackerDistributedCacheManager {
   private Configuration trackerConf;
   
   private static final Random random = new Random();
+  
+  BaseDirManager baseDirManager = new BaseDirManager();
+  CleanupThread cleanupThread;
 
   public TrackerDistributedCacheManager(Configuration conf,
                                         TaskController controller
@@ -116,6 +111,7 @@ public class TrackerDistributedCacheManager {
       ("mapreduce.tasktracker.local.cache.numberdirectories",
        DEFAULT_CACHE_SUBDIR_LIMIT);
     this.taskController = controller;
+    this.cleanupThread = new CleanupThread(conf);
   }
 
   /**
@@ -197,7 +193,7 @@ public class TrackerDistributedCacheManager {
 
               // Increase the size and sub directory count of the cache
               // from baseDirSize and baseDirNumberSubDir.
-              addCacheInfoUpdate(lcacheStatus);
+              baseDirManager.addCacheInfoUpdate(lcacheStatus);
             }
           }
           lcacheStatus.initComplete();
@@ -206,27 +202,6 @@ public class TrackerDistributedCacheManager {
                                                    lcacheStatus, fileStatus, isArchive);            
         }
       }
-
-      // try deleting stuff if you can
-      long size = 0;
-      long numberSubdirs = 0;
-      synchronized (lcacheStatus) {
-        synchronized (baseDirProperties) {
-          CacheDir cacheDir = baseDirProperties.get(lcacheStatus.getBaseDir());
-          if (cacheDir != null) {
-            size = cacheDir.size;
-            numberSubdirs = cacheDir.subdirs;
-          } else {
-            LOG.warn("Cannot find size and number of subdirectories of" +
-                     " baseDir: " + lcacheStatus.getBaseDir());
-          }
-        }
-      }
-
-      if (allowedCacheSize < size || allowedCacheSubdirs < numberSubdirs) {
-        // try some cache deletions
-        compactCache(conf);
-      }
     } catch (IOException ie) {
       synchronized (lcacheStatus) {
         // release this cache
@@ -257,7 +232,7 @@ public class TrackerDistributedCacheManager {
     if (size != 0) {
       synchronized (status) {
         status.size = size;
-        addCacheInfoUpdate(status);
+        baseDirManager.addCacheInfoUpdate(status);
       }
     }
   }
@@ -289,54 +264,6 @@ public class TrackerDistributedCacheManager {
     return user;
   }
 
-
-  // To delete the caches which have a refcount of zero
-
-  private void compactCache(Configuration conf) throws IOException {
-    List<CacheStatus> deleteList = new LinkedList<CacheStatus>();
-    // try deleting cache Status with refcount of zero
-    synchronized (cachedArchives) {
-      for (Iterator<String> it = cachedArchives.keySet().iterator(); 
-          it.hasNext();) {
-        String cacheId = it.next();
-        CacheStatus lcacheStatus = cachedArchives.get(cacheId);
-        // if reference count is zero 
-        // mark the cache for deletion
-        if (lcacheStatus.refcount == 0) {
-          // delete this cache entry from the global list 
-          // and mark the localized file for deletion
-          deleteList.add(lcacheStatus);
-          it.remove();
-        }
-      }
-    }
-    
-    // do the deletion, after releasing the global lock
-    for (CacheStatus lcacheStatus : deleteList) {
-      synchronized (lcacheStatus) {
-        Path potentialDeletee = lcacheStatus.localizedLoadPath;
-        Path localizedDir = lcacheStatus.getLocalizedUniqueDir();
-        if (lcacheStatus.user == null) {
-          LOG.info("Deleted path " + localizedDir);
-          try {
-            localFs.delete(localizedDir, true);
-          } catch (IOException e) {
-            LOG.warn("Could not delete distributed cache empty directory "
-                     + localizedDir, e);
-          }
-        } else {         
-          LOG.info("Deleted path " + localizedDir + " as " + lcacheStatus.user);
-          String base = lcacheStatus.getBaseDir().toString();
-          String userDir = TaskTracker.getUserDir(lcacheStatus.user);
-          int skip = base.length() + 1 + userDir.length() + 1;
-          String relative = localizedDir.toString().substring(skip);
-          taskController.deleteAsUser(lcacheStatus.user, relative);
-        }
-        deleteCacheInfoUpdate(lcacheStatus);
-      }
-    }
-  }
-
   /*
    * Returns the relative path of the dir this cache will be localized in
    * relative path that this cache will be localized in. For
@@ -538,7 +465,7 @@ public class TrackerDistributedCacheManager {
     
     // Increase the size and sub directory count of the cache
     // from baseDirSize and baseDirNumberSubDir.
-    addCacheInfoUpdate(cacheStatus);
+    baseDirManager.addCacheInfoUpdate(cacheStatus);
 
     LOG.info(String.format("Cached %s as %s",
              cache.toString(), cacheStatus.localizedLoadPath));
@@ -614,28 +541,31 @@ public class TrackerDistributedCacheManager {
   }
 
   static class CacheStatus {
-    // the local load path of this cache
-    Path localizedLoadPath;
-
-    //the base dir where the cache lies
-    Path localizedBaseDir;
-
-    //the size of this cache
-    long size;
-
-    // number of instances using this cache
-    int refcount;
-
-    // is it initialized ?
-    boolean inited = false;
+    //
+    // This field should be accessed under global cachedArchives lock.
+    //
+    int refcount;    // number of instances using this cache
 
+    //
+    // The following two fields should be accessed under
+    // individual cacheStatus lock.
+    //
+    long size;              //the size of this cache.
+    boolean inited = false; // is it initialized ?
+    
+    //
+    // The following five fields are Immutable.
+    //
+    
     // The sub directory (tasktracker/archive or tasktracker/user/archive),
     // under which the file will be localized
     Path subDir;
-    
     // unique string used in the construction of local load path
     String uniqueString;
-    
+    // the local load path of this cache
+    Path localizedLoadPath;
+    //the base dir where the cache lies
+    Path localizedBaseDir;
     // The user that owns the cache entry or null if it is public
     final String user;
 
@@ -940,49 +870,175 @@ public class TrackerDistributedCacheManager {
     return path;
   }
   
+  
+  /**
+   * A thread to check and cleanup the unused files periodically
+   */
+  private class CleanupThread extends Thread {
+    // How often do we check if we need to clean up cache files?
+    private long cleanUpCheckPeriod = 60000L; // 1 minute
+    public CleanupThread(Configuration conf) {
+      cleanUpCheckPeriod =
+        conf.getLong("mapreduce.tasktracker.distributedcache.checkperiod",
+            cleanUpCheckPeriod);
+    }
+    private volatile boolean running = true;
+    
+    public void stopRunning() {
+      running = false;
+    }
+    
+    @Override
+    public void run() {
+      while (running) {
+        try {
+          Thread.sleep(cleanUpCheckPeriod);
+          baseDirManager.checkAndCleanup();
+        } catch (Exception e) {
+          LOG.error("Exception in DistributedCache CleanupThread.", e);
+          // This thread should keep running and never crash.
+        }
+      }
+    }
+  }
+
+  /**
+   * This class holds properties of each base directories and is responsible
+   * for clean up unused cache files in base directories.
+   */
+  private class BaseDirManager {
+
+    // For holding the properties of each cache directory
+    private class CacheDir {
+      long size;
+      long subdirs;
+    }
+
+    private TreeMap<Path, BaseDirManager.CacheDir> properties =
+    new TreeMap<Path, BaseDirManager.CacheDir>();
+
+    private long getDirSize(Path p) {
+      return properties.get(p).size;
+    }
+    private long getDirSubdirs(Path p) {
+      return properties.get(p).subdirs;
+    }
+    
+    void checkAndCleanup() throws IOException {
+      Collection<CacheStatus> toBeDeletedCache = new LinkedList<CacheStatus>();
+      Set<Path> toBeCleanedBaseDir = new HashSet<Path>();
+      synchronized (properties) {
+        for (Path baseDir : properties.keySet()) {
+          if (allowedCacheSize < getDirSize(baseDir) ||
+              allowedCacheSubdirs < getDirSubdirs(baseDir)) {
+            toBeCleanedBaseDir.add(baseDir);
+          }
+        }
+      }
+      // try deleting cache Status with refcount of zero
+      synchronized (cachedArchives) {
+        for (Iterator<String> it = cachedArchives.keySet().iterator(); 
+            it.hasNext();) {
+          String cacheId = it.next();
+          CacheStatus cacheStatus = cachedArchives.get(cacheId);
+          if (toBeCleanedBaseDir.contains(cacheStatus.getBaseDir())) {
+            synchronized (cacheStatus) {
+              // if reference count is zero mark the cache for deletion
+              if (cacheStatus.refcount == 0) {
+                // delete this cache entry from the global list 
+                // and mark the localized file for deletion
+                toBeDeletedCache.add(cacheStatus);
+                it.remove();
+              }
+            }
+          }
+        }
+      }
+      
+      // do the deletion, after releasing the global lock
+      for (CacheStatus cacheStatus : toBeDeletedCache) {
+        synchronized (cacheStatus) {
+          Path localizedDir = cacheStatus.getLocalizedUniqueDir();
+          if (cacheStatus.user == null) {
+            TrackerDistributedCacheManager.LOG.info("Deleted path " + localizedDir);
+            try {
+              localFs.delete(localizedDir, true);
+            } catch (IOException e) {
+              TrackerDistributedCacheManager.LOG.warn("Could not delete distributed cache empty directory "
+                       + localizedDir, e);
+            }
+          } else {         
+            TrackerDistributedCacheManager.LOG.info("Deleted path " + localizedDir + " as " + cacheStatus.user);
+            String base = cacheStatus.getBaseDir().toString();
+            String userDir = TaskTracker.getUserDir(cacheStatus.user);
+            int skip = base.length() + 1 + userDir.length() + 1;
+            String relative = localizedDir.toString().substring(skip);
+            taskController.deleteAsUser(cacheStatus.user, relative);
+          }
+          deleteCacheInfoUpdate(cacheStatus);
+        }
+      }
+    }
+
   /**
    * Decrement the size and sub directory count of the cache from baseDirSize
    * and baseDirNumberSubDir. Have to lock lcacheStatus before calling this.
    * @param cacheStatus cache status of the cache is deleted
    */
-  private void deleteCacheInfoUpdate(CacheStatus cacheStatus) {
+  public void deleteCacheInfoUpdate(CacheStatus cacheStatus) {
     if (!cacheStatus.inited) {
       // if it is not created yet, do nothing.
       return;
     }
     // decrement the size of the cache from baseDirSize
-    synchronized (baseDirProperties) {
-      CacheDir cacheDir = baseDirProperties.get(cacheStatus.getBaseDir());
+    synchronized (baseDirManager.properties) {
+      BaseDirManager.CacheDir cacheDir = properties.get(cacheStatus.getBaseDir());
       if (cacheDir != null) {
         cacheDir.size -= cacheStatus.size;
         cacheDir.subdirs--;
       } else {
         LOG.warn("Cannot find size and number of subdirectories of" +
-                 " baseDir: " + cacheStatus.getBaseDir());
+            " baseDir: " + cacheStatus.getBaseDir());
       }
     }
   }
-  
+
   /**
    * Update the maps baseDirSize and baseDirNumberSubDir when adding cache.
    * Increase the size and sub directory count of the cache from baseDirSize
    * and baseDirNumberSubDir. Have to lock lcacheStatus before calling this.
    * @param cacheStatus cache status of the cache is added
    */
-  private void addCacheInfoUpdate(CacheStatus cacheStatus) {
+  public void addCacheInfoUpdate(CacheStatus cacheStatus) {
     long cacheSize = cacheStatus.size;
     // decrement the size of the cache from baseDirSize
-    synchronized (baseDirProperties) {
-      CacheDir cacheDir = baseDirProperties.get(cacheStatus.getBaseDir());
+    synchronized (baseDirManager.properties) {
+      BaseDirManager.CacheDir cacheDir = properties.get(cacheStatus.getBaseDir());
       if (cacheDir != null) {
         cacheDir.size += cacheSize;
         cacheDir.subdirs++;
       } else {
-        cacheDir = new CacheDir();
+        cacheDir = new BaseDirManager.CacheDir();
         cacheDir.size = cacheSize;
         cacheDir.subdirs = 1;
-        baseDirProperties.put(cacheStatus.getBaseDir(), cacheDir);
+        properties.put(cacheStatus.getBaseDir(), cacheDir);
       }
     }
   }
+  }
+  
+  /**
+   * Start the background thread
+   */
+  public void startCleanupThread() {
+    this.cleanupThread.start();
+  }
+
+  /**
+   * Stop the background thread
+   */
+  public void stopCleanupThread() {
+    cleanupThread.stopRunning();
+    cleanupThread.interrupt();
+  }
 }

+ 3 - 1
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -812,7 +812,8 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
     // Initialize DistributedCache
     this.distributedCacheManager = new TrackerDistributedCacheManager(
         this.fConf, taskController);
-
+    this.distributedCacheManager.startCleanupThread();
+    
     this.jobClient = (InterTrackerProtocol) 
     UserGroupInformation.getLoginUser().doAs(
         new PrivilegedExceptionAction<Object>() {
@@ -1365,6 +1366,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
     this.mapLauncher.interrupt();
     this.reduceLauncher.interrupt();
 
+    this.distributedCacheManager.stopCleanupThread();
     jvmManager.stop();
     
     // shutdown RPC connections

+ 36 - 6
src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java

@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Random;
+import java.util.concurrent.TimeUnit;
 
 import javax.security.auth.login.LoginException;
 
@@ -564,10 +565,13 @@ public class TestTrackerDistributedCacheManager extends TestCase {
     Configuration conf2 = new Configuration(conf);
     conf2.set("mapred.local.dir", ROOT_MAPRED_LOCAL_DIR.toString());
     conf2.setLong("local.cache.size", LOCAL_CACHE_LIMIT);
+    conf2.setLong("mapreduce.tasktracker.distributedcache.checkperiod", 200); // 200 ms
     
     refreshConf(conf2);
     TrackerDistributedCacheManager manager = 
         new TrackerDistributedCacheManager(conf2, taskController);
+    manager.startCleanupThread();
+    try {
     FileSystem localfs = FileSystem.getLocal(conf2);
     long now = System.currentTimeMillis();
     String userName = getJobOwnerName();
@@ -601,9 +605,9 @@ public class TestTrackerDistributedCacheManager extends TestCase {
         fs.getFileStatus(secondCacheFilePublic), false, 
         fs.getFileStatus(secondCacheFilePublic).getModificationTime(), true,
         cfile2);
-    assertFalse("DistributedCache failed deleting old" + 
-        " cache when the cache store is full.",
-        localfs.exists(firstLocalCache));
+    checkCacheDeletion(localfs, firstLocalCache,
+        "DistributedCache failed deleting old" +
+        " cache when the cache store is full");
 
     // find the root directory of distributed caches
     Path firstCursor = firstLocalCache;
@@ -633,8 +637,12 @@ public class TestTrackerDistributedCacheManager extends TestCase {
     conf2.setLong("local.cache.size", LOCAL_CACHE_LIMIT * 10);
     conf2.setLong("mapreduce.tasktracker.local.cache.numberdirectories",
         LOCAL_CACHE_SUBDIR_LIMIT);
+    manager.stopCleanupThread();
+    
     manager = 
       new TrackerDistributedCacheManager(conf2, taskController);
+    manager.startCleanupThread();
+    
     // Now we test the number of sub directories limit
     // Create the temporary cache files to be used in the tests.
     Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
@@ -678,9 +686,9 @@ public class TestTrackerDistributedCacheManager extends TestCase {
         TaskTracker.getPrivateDistributedCacheDir(userName),
         fs.getFileStatus(fourthCacheFile), false, 
         fs.getFileStatus(fourthCacheFile).getModificationTime(), false, cfile4);
-    assertFalse("DistributedCache failed deleting old" + 
-        " cache when the cache exceeds the number of sub directories limit.",
-        localfs.exists(thirdLocalCache));
+    checkCacheDeletion(localfs, thirdLocalCache,
+        "DistributedCache failed deleting old" +
+        " cache when the cache exceeds the number of sub directories limit.");
 
     assertFalse
       ("DistributedCache did not delete the gensym'ed distcache "
@@ -691,8 +699,30 @@ public class TestTrackerDistributedCacheManager extends TestCase {
     // Clean up the files created in this test
     new File(thirdCacheFile.toString()).delete();
     new File(fourthCacheFile.toString()).delete();
+    } finally {
+      manager.stopCleanupThread();
+    }
   }
   
+  /**
+   * Periodically checks if a file is there, return if the file is no longer
+   * there. Fails the test if a files is there for 30 seconds.
+   */
+  private void checkCacheDeletion(FileSystem fs, Path cache, String msg)
+  throws Exception {
+    // Check every 100ms to see if the cache is deleted
+    boolean cacheExists = true;
+    for (int i = 0; i < 300; i++) {
+      if (!fs.exists(cache)) {
+        cacheExists = false;
+        break;
+      }
+      TimeUnit.MILLISECONDS.sleep(100L);
+    }
+    // If the cache is still there after 5 minutes, test fails.
+    assertFalse(msg, cacheExists);
+  }
+
   public void testFileSystemOtherThanDefault() throws Exception {
     if (!canRun()) {
       return;