HADOOP-4780. Cache the size of directories in DistributedCache, avoiding
long delays in recalculating it. Contributed by He Yongqiang


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.19@758898 13f79535-47bb-0310-9956-ffa450edef68

Christopher Douglas
parent commit ffe0abc12c
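
The core idea of the patch, in outline: rather than re-running FileUtil.getDU() over the whole cache base directory on every getLocalCache() call, DistributedCache keeps a running per-directory total in a TreeMap<Path, Long> (baseDirSize) and adjusts it incrementally as archives are localized and deleted. The sketch below illustrates that accounting pattern in isolation; the class and method names (DirSizeTracker, add, subtract, size) are made up for illustration and are not part of the patch.

    import java.util.Map;
    import java.util.TreeMap;

    // Minimal sketch of the size-accounting pattern introduced by this patch.
    // Instead of scanning the base directory with du before every eviction
    // check, a running total per base directory is kept and adjusted whenever
    // a cache entry is added or removed. Names here are illustrative only.
    public class DirSizeTracker {
      // maps a cache base directory (String here, Path in the real patch)
      // to the total bytes currently localized under it
      private final Map<String, Long> baseDirSize = new TreeMap<String, Long>();

      // called after an archive has been localized and its size measured once
      public void add(String baseDir, long bytes) {
        synchronized (baseDirSize) {
          Long current = baseDirSize.get(baseDir);
          baseDirSize.put(baseDir, current == null ? bytes : current + bytes);
        }
      }

      // called when a localized archive is deleted during eviction
      public void subtract(String baseDir, long bytes) {
        synchronized (baseDirSize) {
          Long current = baseDirSize.get(baseDir);
          if (current != null) {
            baseDirSize.put(baseDir, current - bytes);
          }
        }
      }

      // cheap map lookup that replaces the recursive directory scan
      public long size(String baseDir) {
        synchronized (baseDirSize) {
          Long current = baseDirSize.get(baseDir);
          return current == null ? 0L : current;
        }
      }
    }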

+ 3 - 0
CHANGES.txt

@@ -95,6 +95,9 @@ Release 0.19.2 - Unreleased
     HADOOP-5374. Fixes a NPE problem in getTasksToSave method.
     (Amareshwari Sriramadasu via ddas)
 
+    HADOOP-4780. Cache the size of directories in DistributedCache, avoiding
+    long delays in recalculating it. (He Yongqiang via cdouglas)
+
 Release 0.19.1 - 2009-02-23 
 
     HADOOP-5225. Workaround for tmp file handling in HDFS. sync() is 
+ 45 - 3
src/core/org/apache/hadoop/filecache/DistributedCache.java

@@ -116,6 +116,8 @@ public class DistributedCache {
   // cacheID to cacheStatus mapping
   private static TreeMap<String, CacheStatus> cachedArchives = new TreeMap<String, CacheStatus>();
   
+  private static TreeMap<Path, Long> baseDirSize = new TreeMap<Path, Long>();
+  
   // default total cache size
   private static final long DEFAULT_CACHE_SIZE = 1048576L;
 
@@ -195,7 +197,7 @@ public class DistributedCache {
       lcacheStatus = cachedArchives.get(cacheId);
       if (lcacheStatus == null) {
         // was never localized
-        lcacheStatus = new CacheStatus(new Path(baseDir, new Path(cacheId)));
+        lcacheStatus = new CacheStatus(baseDir, new Path(baseDir, new Path(cacheId)));
         cachedArchives.put(cacheId, lcacheStatus);
       }
 
@@ -207,7 +209,13 @@ public class DistributedCache {
     }
 
     // try deleting stuff if you can
-    long size = FileUtil.getDU(new File(baseDir.toString()));
+    long size = 0;
+    synchronized (baseDirSize) {
+      Long get = baseDirSize.get(baseDir);
+      if (get != null) {
+        size = get.longValue();
+      }
+    }
     // setting the cache size to a default of 1MB
     long allowedSize = conf.getLong("local.cache.size", DEFAULT_CACHE_SIZE);
     if (allowedSize < size) {
@@ -285,6 +293,13 @@ public class DistributedCache {
           if (lcacheStatus.refcount == 0) {
             // delete this cache entry
             FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
+            synchronized (baseDirSize) {
+              Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
+              if (dirSize != null) {
+                dirSize -= lcacheStatus.size;
+                baseDirSize.put(lcacheStatus.baseDir, dirSize);
+              }
+            }
             it.remove();
           }
         }
@@ -367,6 +382,13 @@ public class DistributedCache {
       
       FileSystem localFs = FileSystem.getLocal(conf);
       localFs.delete(cacheStatus.localLoadPath, true);
+      synchronized (baseDirSize) {
+        Long dirSize = baseDirSize.get(cacheStatus.baseDir);
+        if (dirSize != null) {
+          dirSize -= cacheStatus.size;
+          baseDirSize.put(cacheStatus.baseDir, dirSize);
+        }
+      }
       Path parchive = new Path(cacheStatus.localLoadPath,
                                new Path(cacheStatus.localLoadPath.getName()));
       
@@ -392,6 +414,18 @@ public class DistributedCache {
         // and copy the file into the dir as it is
       }
       
+      long cacheSize = FileUtil.getDU(new File(parchive.getParent().toString()));
+      cacheStatus.size = cacheSize;
+      synchronized (baseDirSize) {
+        Long dirSize = baseDirSize.get(cacheStatus.baseDir);
+        if (dirSize == null) {
+          dirSize = Long.valueOf(cacheSize);
+        } else {
+          dirSize += cacheSize;
+        }
+        baseDirSize.put(cacheStatus.baseDir, dirSize);
+      }
+      
       // do chmod here 
       try {
         FileUtil.chmod(parchive.toString(), "+x");
@@ -811,6 +845,12 @@ public class DistributedCache {
 
     // the local load path of this cache
     Path localLoadPath;
+    
+    // the base dir where the cache lies
+    Path baseDir;
+    
+    // the size of this cache
+    long size;
 
     // number of instances using this cache
     int refcount;
@@ -818,12 +858,14 @@ public class DistributedCache {
     // the cache-file modification time
     long mtime;
 
-    public CacheStatus(Path localLoadPath) {
+    public CacheStatus(Path baseDir, Path localLoadPath) {
       super();
       this.currentStatus = false;
       this.localLoadPath = localLoadPath;
       this.refcount = 0;
       this.mtime = -1;
+      this.baseDir = baseDir;
+      this.size = 0;
     }
   }
 

+ 77 - 0
src/test/org/apache/hadoop/filecache/TestDistributedCache.java

@@ -0,0 +1,77 @@
+package org.apache.hadoop.filecache;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import junit.framework.TestCase;
+
+public class TestDistributedCache extends TestCase {
+  
+  static final URI LOCAL_FS = URI.create("file:///");
+  private static String TEST_CACHE_BASE_DIR =
+    new Path(System.getProperty("test.build.data","/tmp/cachebasedir"))
+    .toString().replace(' ', '+');
+  private static String TEST_ROOT_DIR =
+    System.getProperty("test.build.data", "/tmp/distributedcache");
+  private static final int TEST_FILE_SIZE = 4 * 1024; // 4K
+  private static final int LOCAL_CACHE_LIMIT = 5 * 1024; //5K
+  private Configuration conf;
+  private Path firstCacheFile;
+  private Path secondCacheFile;
+  private FileSystem localfs;
+  
+  /**
+   * @see TestCase#setUp()
+   */
+  @Override
+  protected void setUp() throws IOException {
+    conf = new Configuration();
+    conf.setLong("local.cache.size", LOCAL_CACHE_LIMIT);
+    localfs = FileSystem.get(LOCAL_FS, conf);
+    firstCacheFile = new Path(TEST_ROOT_DIR+"/firstcachefile");
+    secondCacheFile = new Path(TEST_ROOT_DIR+"/secondcachefile");
+    createTempFile(localfs, firstCacheFile);
+    createTempFile(localfs, secondCacheFile);
+  }
+  
+  /** test delete cache */
+  public void testDeleteCache() throws Exception {
+    DistributedCache.getLocalCache(firstCacheFile.toUri(), conf, new Path(TEST_CACHE_BASE_DIR), 
+        false, System.currentTimeMillis(), new Path(TEST_ROOT_DIR));
+    DistributedCache.releaseCache(firstCacheFile.toUri(), conf);
+    // The code above localized a file of size 4K and then released the cache, which allows
+    // the entry to be deleted once the cache size limit is exceeded. The code below localizes
+    // another cache file, which is designed to sweep away the first cache.
+    DistributedCache.getLocalCache(secondCacheFile.toUri(), conf, new Path(TEST_CACHE_BASE_DIR), 
+        false, System.currentTimeMillis(), new Path(TEST_ROOT_DIR));
+    FileStatus[] dirStatuses = localfs.listStatus(new Path(TEST_CACHE_BASE_DIR));
+    assertTrue("DistributedCache failed deleting old cache when the cache store is full.",
+        dirStatuses.length > 1);
+  }
+
+  private void createTempFile(FileSystem fs, Path p) throws IOException {
+    FSDataOutputStream out = fs.create(p);
+    byte[] toWrite = new byte[TEST_FILE_SIZE];
+    new Random().nextBytes(toWrite);
+    out.write(toWrite);
+    out.close();
+    FileSystem.LOG.info("created: " + p + ", size=" + TEST_FILE_SIZE);
+  }
+  
+  /**
+   * @see TestCase#tearDown()
+   */
+  @Override
+  protected void tearDown() throws IOException {
+    localfs.delete(firstCacheFile, true);
+    localfs.delete(secondCacheFile, true);
+    localfs.close();
+  }
+}
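
For reference, the arithmetic the test depends on, sketched as standalone Java (the class name CacheEvictionMath is illustrative only, not part of the patch): each cache file is TEST_FILE_SIZE = 4K while local.cache.size is capped at LOCAL_CACHE_LIMIT = 5K, so once the second file is localized the cached total for the base directory reaches roughly 8K, the allowedSize < size check in the patched code fires, and the released first entry is eligible for deletion.

    // Sketch of the eviction arithmetic the test relies on; values mirror the
    // constants in TestDistributedCache above.
    public class CacheEvictionMath {
      public static void main(String[] args) {
        long testFileSize = 4 * 1024;      // TEST_FILE_SIZE
        long localCacheLimit = 5 * 1024;   // LOCAL_CACHE_LIMIT ("local.cache.size")

        // after the second getLocalCache() the cached total for the base dir is ~8K
        long size = testFileSize + testFileSize;

        // same check as in the patched code: if the limit is exceeded, deleteCache()
        // removes entries whose refcount is 0 -- here, the released first cache file
        boolean evictionTriggered = localCacheLimit < size;
        System.out.println("eviction triggered: " + evictionTriggered); // prints true
      }
    }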