浏览代码

commit eec554140a1d7fcd0efe58007e8e228e70eef645
Author: Hemanth Yamijala <yhemanth@yahoo-inc.com>
Date: Sun Oct 25 16:31:08 2009 +0530

MAPREDUCE:1098 from http://issues.apache.org/jira/secure/attachment/12423137/patch-1098-ydist.txt

+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1098. Fixed the distributed-cache to not do i/o while
+ holding a global lock. (Amareshwari Sriramadasu via acmurthy)
+


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077036 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 14 年之前
父节点
当前提交
74e34b8853

+ 187 - 162
src/core/org/apache/hadoop/filecache/DistributedCache.java

@@ -124,6 +124,8 @@ public class DistributedCache {
   private static final Log LOG =
   private static final Log LOG =
     LogFactory.getLog(DistributedCache.class);
     LogFactory.getLog(DistributedCache.class);
   
   
+  private static Random random = new Random();
+  
   /**
   /**
    * Get the locally cached file or archive; it could either be 
    * Get the locally cached file or archive; it could either be 
    * previously cached (and valid) or copy it from the {@link FileSystem} now.
    * previously cached (and valid) or copy it from the {@link FileSystem} now.
@@ -155,7 +157,8 @@ public class DistributedCache {
                                    Path currentWorkDir) 
                                    Path currentWorkDir) 
   throws IOException {
   throws IOException {
     return getLocalCache(cache, conf, baseDir, fileStatus, isArchive, 
     return getLocalCache(cache, conf, baseDir, fileStatus, isArchive, 
-        confFileStamp, currentWorkDir, true);
+        confFileStamp, currentWorkDir, true,
+        new LocalDirAllocator("mapred.local.dir"));
   }
   }
   /**
   /**
    * Get the locally cached file or archive; it could either be 
    * Get the locally cached file or archive; it could either be 
@@ -166,7 +169,7 @@ public class DistributedCache {
    * or hostname:port is provided the file is assumed to be in the filesystem
    * or hostname:port is provided the file is assumed to be in the filesystem
    * being used in the Configuration
    * being used in the Configuration
    * @param conf The Confguration file which contains the filesystem
    * @param conf The Confguration file which contains the filesystem
-   * @param baseDir The base cache Dir where you wnat to localize the files/archives
+   * @param subDir The sub cache Dir where you want to localize the files/archives
    * @param fileStatus The file status on the dfs.
    * @param fileStatus The file status on the dfs.
    * @param isArchive if the cache is an archive or a file. In case it is an
    * @param isArchive if the cache is an archive or a file. In case it is an
    *  archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
    *  archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
@@ -181,39 +184,59 @@ public class DistributedCache {
    * @param honorSymLinkConf if this is false, then the symlinks are not
    * @param honorSymLinkConf if this is false, then the symlinks are not
    * created even if conf says so (this is required for an optimization in task
    * created even if conf says so (this is required for an optimization in task
    * launches
    * launches
+   * @param lDirAllocator LocalDirAllocator of the tracker
    * @return the path to directory where the archives are unjarred in case of archives,
    * @return the path to directory where the archives are unjarred in case of archives,
    * the path to the file where the file is copied locally 
    * the path to the file where the file is copied locally 
    * @throws IOException
    * @throws IOException
    */
    */
   public static Path getLocalCache(URI cache, Configuration conf, 
   public static Path getLocalCache(URI cache, Configuration conf, 
-      Path baseDir, FileStatus fileStatus,
+      Path subDir, FileStatus fileStatus,
       boolean isArchive, long confFileStamp,
       boolean isArchive, long confFileStamp,
-      Path currentWorkDir, boolean honorSymLinkConf) 
+      Path currentWorkDir, boolean honorSymLinkConf,
+      LocalDirAllocator lDirAllocator) 
   throws IOException {
   throws IOException {
-    String cacheId = makeRelative(cache, conf);
+    String key = getKey(cache, conf, confFileStamp);
     CacheStatus lcacheStatus;
     CacheStatus lcacheStatus;
     Path localizedPath;
     Path localizedPath;
     synchronized (cachedArchives) {
     synchronized (cachedArchives) {
-      lcacheStatus = cachedArchives.get(cacheId);
+      lcacheStatus = cachedArchives.get(key);
       if (lcacheStatus == null) {
       if (lcacheStatus == null) {
         // was never localized
         // was never localized
-        lcacheStatus = new CacheStatus(baseDir, new Path(baseDir, new Path(cacheId)));
-        cachedArchives.put(cacheId, lcacheStatus);
+        String cachePath = new Path (subDir, 
+          new Path(String.valueOf(random.nextLong()),
+            makeRelative(cache, conf))).toString();
+        Path localPath = lDirAllocator.getLocalPathForWrite(cachePath,
+          fileStatus.getLen(), conf);
+        lcacheStatus = new CacheStatus(
+          new Path(localPath.toString().replace(cachePath, "")), localPath); 
+        cachedArchives.put(key, lcacheStatus);
       }
       }
-
-      synchronized (lcacheStatus) {
-        localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus, 
-            fileStatus, isArchive, currentWorkDir, honorSymLinkConf);
-        lcacheStatus.refcount++;
+      lcacheStatus.refcount++;
+    }
+    
+    synchronized (lcacheStatus) {
+      if (!lcacheStatus.isInited()) {
+        localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus,
+          fileStatus, isArchive);
+        lcacheStatus.initComplete();
+      } else {
+        localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
+          lcacheStatus, fileStatus, isArchive);
       }
       }
+      createSymlink(conf, cache, lcacheStatus, isArchive,
+        currentWorkDir, honorSymLinkConf);
     }
     }
-
+ 
     // try deleting stuff if you can
     // try deleting stuff if you can
     long size = 0;
     long size = 0;
-    synchronized (baseDirSize) {
-      Long get = baseDirSize.get(baseDir);
-      if ( get != null ) {
-    	size = get.longValue();
+    synchronized (lcacheStatus) {
+      synchronized (baseDirSize) {
+        Long get = baseDirSize.get(lcacheStatus.getBaseDir());
+        if ( get != null ) {
+    	    size = get.longValue();
+        } else {
+          LOG.warn("Cannot find size of baseDir: " + lcacheStatus.getBaseDir());
+        }
       }
       }
     }
     }
     // setting the cache size to a default of 10GB
     // setting the cache size to a default of 10GB
@@ -268,39 +291,52 @@ public class DistributedCache {
    * is contained in.
    * is contained in.
    * @throws IOException
    * @throws IOException
    */
    */
-  public static void releaseCache(URI cache, Configuration conf)
+  public static void releaseCache(URI cache, Configuration conf, long timeStamp)
     throws IOException {
     throws IOException {
-    String cacheId = makeRelative(cache, conf);
+    String cacheId = getKey(cache, conf, timeStamp);
     synchronized (cachedArchives) {
     synchronized (cachedArchives) {
       CacheStatus lcacheStatus = cachedArchives.get(cacheId);
       CacheStatus lcacheStatus = cachedArchives.get(cacheId);
-      if (lcacheStatus == null)
+      if (lcacheStatus == null) {
+        LOG.warn("Cannot find localized cache: " + cache + 
+                 " (key: " + cacheId + ") in releaseCache!");
         return;
         return;
-      synchronized (lcacheStatus) {
-        lcacheStatus.refcount--;
       }
       }
+      lcacheStatus.refcount--;
     }
     }
   }
   }
   
   
   // To delete the caches which have a refcount of zero
   // To delete the caches which have a refcount of zero
   
   
   private static void deleteCache(Configuration conf) throws IOException {
   private static void deleteCache(Configuration conf) throws IOException {
+    Set<CacheStatus> deleteSet = new HashSet<CacheStatus>();
     // try deleting cache Status with refcount of zero
     // try deleting cache Status with refcount of zero
     synchronized (cachedArchives) {
     synchronized (cachedArchives) {
       for (Iterator it = cachedArchives.keySet().iterator(); it.hasNext();) {
       for (Iterator it = cachedArchives.keySet().iterator(); it.hasNext();) {
         String cacheId = (String) it.next();
         String cacheId = (String) it.next();
         CacheStatus lcacheStatus = cachedArchives.get(cacheId);
         CacheStatus lcacheStatus = cachedArchives.get(cacheId);
-        synchronized (lcacheStatus) {
-          if (lcacheStatus.refcount == 0) {
-            // delete this cache entry
-            FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
-            synchronized (baseDirSize) {
-              Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
-              if ( dirSize != null ) {
-            	dirSize -= lcacheStatus.size;
-            	baseDirSize.put(lcacheStatus.baseDir, dirSize);
-              }
-            }
-            it.remove();
+        if (lcacheStatus.refcount == 0) {
+          // delete this cache entry from the global list 
+          // and mark the localized file for deletion
+          deleteSet.add(lcacheStatus);
+          it.remove();
+        }
+      }
+    }
+    
+    // do the deletion, after releasing the global lock
+    for (CacheStatus lcacheStatus : deleteSet) {
+      synchronized (lcacheStatus) {
+        FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
+        LOG.info("Deleted path " + lcacheStatus.localLoadPath);
+        // decrement the size of the cache from baseDirSize
+        synchronized (baseDirSize) {
+          Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
+          if ( dirSize != null ) {
+            dirSize -= lcacheStatus.size;
+            baseDirSize.put(lcacheStatus.baseDir, dirSize);
+          } else {
+            LOG.warn("Cannot find record of the baseDir: " + 
+                     lcacheStatus.baseDir + " during delete!");
           }
           }
         }
         }
       }
       }
@@ -333,128 +369,108 @@ public class DistributedCache {
     return path;
     return path;
   }
   }
 
 
-  private static Path cacheFilePath(Path p) {
-    return new Path(p, p.getName());
+  static String getKey(URI cache, Configuration conf, long timeStamp) 
+      throws IOException {
+    return makeRelative(cache, conf) + String.valueOf(timeStamp);
   }
   }
 
 
+  private static Path checkCacheStatusValidity(Configuration conf,
+      URI cache, long confFileStamp,
+      CacheStatus cacheStatus,
+      FileStatus fileStatus,
+      boolean isArchive
+      ) throws IOException {
+    FileSystem fs = FileSystem.get(cache, conf);
+    // Has to be 
+    if (!ifExistsAndFresh(conf, fs, cache, confFileStamp,
+                          cacheStatus, fileStatus)) {
+      throw new IOException("Stale cache file: " + cacheStatus.localLoadPath + 
+                            " for cache-file: " + cache);
+    }
+    LOG.info(String.format("Using existing cache of %s->%s",
+        cache.toString(), cacheStatus.localLoadPath));
+    return cacheStatus.localLoadPath;
+  }
+
+  private static void createSymlink(Configuration conf, URI cache,
+      CacheStatus cacheStatus, boolean isArchive,
+      Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
+    boolean doSymlink = honorSymLinkConf && DistributedCache.getSymlink(conf);
+    if(cache.getFragment() == null) {
+      doSymlink = false;
+    }
+    String link = 
+      currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
+    File flink = new File(link);
+    if (doSymlink){
+      if (!flink.exists()) {
+        FileUtil.symLink(cacheStatus.localLoadPath.toString(), link);
+      }
+    }
+  }
+  
   // the method which actually copies the caches locally and unjars/unzips them
   // the method which actually copies the caches locally and unjars/unzips them
   // and does chmod for the files
   // and does chmod for the files
   private static Path localizeCache(Configuration conf, 
   private static Path localizeCache(Configuration conf, 
                                     URI cache, long confFileStamp,
                                     URI cache, long confFileStamp,
                                     CacheStatus cacheStatus,
                                     CacheStatus cacheStatus,
                                     FileStatus fileStatus,
                                     FileStatus fileStatus,
-                                    boolean isArchive, 
-                                    Path currentWorkDir,boolean honorSymLinkConf) 
+                                    boolean isArchive) 
   throws IOException {
   throws IOException {
-    boolean doSymlink = honorSymLinkConf && getSymlink(conf);
-    if(cache.getFragment() == null) {
-    	doSymlink = false;
-    }
     FileSystem fs = getFileSystem(cache, conf);
     FileSystem fs = getFileSystem(cache, conf);
-    String link = currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
-    File flink = new File(link);
-    if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
-                           cacheStatus, fileStatus)) {
-      if (isArchive) {
-        if (doSymlink){
-          if (!flink.exists())
-            FileUtil.symLink(cacheStatus.localLoadPath.toString(), 
-                             link);
-        }
-        return cacheStatus.localLoadPath;
-      }
-      else {
-        if (doSymlink){
-          if (!flink.exists())
-            FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(), 
-                             link);
-        }
-        return cacheFilePath(cacheStatus.localLoadPath);
-      }
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Path parchive = null; 
+    if (isArchive) {
+      parchive = new Path(cacheStatus.localLoadPath,
+        new Path(cacheStatus.localLoadPath.getName()));
     } else {
     } else {
-      // remove the old archive
-      // if the old archive cannot be removed since it is being used by another
-      // job
-      // return null
-      if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true))
-        throw new IOException("Cache " + cacheStatus.localLoadPath.toString()
-                              + " is in use and cannot be refreshed");
-      
-      FileSystem localFs = FileSystem.getLocal(conf);
-      localFs.delete(cacheStatus.localLoadPath, true);
-      synchronized (baseDirSize) {
-    	Long dirSize = baseDirSize.get(cacheStatus.baseDir);
-    	if ( dirSize != null ) {
-    	  dirSize -= cacheStatus.size;
-    	  baseDirSize.put(cacheStatus.baseDir, dirSize);
-    	}
-      }
-      Path parchive = new Path(cacheStatus.localLoadPath,
-                               new Path(cacheStatus.localLoadPath.getName()));
-      
-      if (!localFs.mkdirs(cacheStatus.localLoadPath)) {
-        throw new IOException("Mkdirs failed to create directory " + 
-                              cacheStatus.localLoadPath.toString());
-      }
+      parchive = cacheStatus.localLoadPath;
+    }
 
 
-      String cacheId = cache.getPath();
-      fs.copyToLocalFile(new Path(cacheId), parchive);
-      if (isArchive) {
-        String tmpArchive = parchive.toString().toLowerCase();
-        File srcFile = new File(parchive.toString());
-        File destDir = new File(parchive.getParent().toString());
-        if (tmpArchive.endsWith(".jar")) {
-          RunJar.unJar(srcFile, destDir);
-        } else if (tmpArchive.endsWith(".zip")) {
-          FileUtil.unZip(srcFile, destDir);
-        } else if (isTarFile(tmpArchive)) {
-          FileUtil.unTar(srcFile, destDir);
-        }
-        // else will not do anyhting
-        // and copy the file into the dir as it is
+    if (!localFs.mkdirs(parchive.getParent())) {
+      throw new IOException("Mkdirs failed to create directory " +
+          cacheStatus.localLoadPath.toString());
+    }
+    String cacheId = cache.getPath();
+    fs.copyToLocalFile(new Path(cacheId), parchive);
+    if (isArchive) {
+      String tmpArchive = parchive.toString().toLowerCase();
+      File srcFile = new File(parchive.toString());
+      File destDir = new File(parchive.getParent().toString());
+      if (tmpArchive.endsWith(".jar")) {
+        RunJar.unJar(srcFile, destDir);
+      } else if (tmpArchive.endsWith(".zip")) {
+        FileUtil.unZip(srcFile, destDir);
+      } else if (isTarFile(tmpArchive)) {
+        FileUtil.unTar(srcFile, destDir);
       }
       }
+      // else will not do anyhting
+      // and copy the file into the dir as it is
+    }
       
       
-      long cacheSize = FileUtil.getDU(new File(parchive.getParent().toString()));
-      cacheStatus.size = cacheSize;
-      synchronized (baseDirSize) {
-      	Long dirSize = baseDirSize.get(cacheStatus.baseDir);
-      	if( dirSize == null ) {
-      	  dirSize = Long.valueOf(cacheSize);
-      	} else {
-      	  dirSize += cacheSize;
-      	}
-      	baseDirSize.put(cacheStatus.baseDir, dirSize);
-      }
+    long cacheSize = FileUtil.getDU(new File(parchive.getParent().toString()));
+    cacheStatus.size = cacheSize;
+    synchronized (baseDirSize) {
+    	Long dirSize = baseDirSize.get(cacheStatus.baseDir);
+    	if (dirSize == null) {
+     	  dirSize = Long.valueOf(cacheSize);
+      } else {
+     	  dirSize += cacheSize;
+     	}
+     	baseDirSize.put(cacheStatus.baseDir, dirSize);
+    }
       
       
-      // do chmod here 
-      try {
-        //Setting recursive permission to grant everyone read and execute
-        FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
-      } catch(InterruptedException e) {
+    // do chmod here 
+    try {
+      //Setting recursive permission to grant everyone read and execute
+      FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
+    } catch(InterruptedException e) {
     	LOG.warn("Exception in chmod" + e.toString());
     	LOG.warn("Exception in chmod" + e.toString());
-      }
-
-      // update cacheStatus to reflect the newly cached file
-      cacheStatus.currentStatus = true;
-      cacheStatus.mtime = getTimestamp(conf, cache);
-    }
-    
-    if (isArchive){
-      if (doSymlink){
-        if (!flink.exists())
-          FileUtil.symLink(cacheStatus.localLoadPath.toString(), 
-                           link);
-      }
-      return cacheStatus.localLoadPath;
-    }
-    else {
-      if (doSymlink){
-        if (!flink.exists())
-          FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(), 
-                           link);
-      }
-      return cacheFilePath(cacheStatus.localLoadPath);
     }
     }
+
+    // update cacheStatus to reflect the newly cached file
+    cacheStatus.mtime = getTimestamp(conf, cache);
+    return cacheStatus.localLoadPath;
   }
   }
 
 
   private static boolean isTarFile(String filename) {
   private static boolean isTarFile(String filename) {
@@ -469,27 +485,22 @@ public class DistributedCache {
                                           FileStatus fileStatus) 
                                           FileStatus fileStatus) 
   throws IOException {
   throws IOException {
     // check for existence of the cache
     // check for existence of the cache
-    if (lcacheStatus.currentStatus == false) {
-      return false;
+    long dfsFileStamp;
+    if (fileStatus != null) {
+      dfsFileStamp = fileStatus.getModificationTime();
     } else {
     } else {
-      long dfsFileStamp;
-      if (fileStatus != null) {
-        dfsFileStamp = fileStatus.getModificationTime();
-      } else {
-        dfsFileStamp = getTimestamp(conf, cache);
-      }
+      dfsFileStamp = getTimestamp(conf, cache);
+    }
 
 
-      // ensure that the file on hdfs hasn't been modified since the job started 
-      if (dfsFileStamp != confFileStamp) {
-        LOG.fatal("File: " + cache + " has changed on HDFS since job started");
-        throw new IOException("File: " + cache + 
-                              " has changed on HDFS since job started");
-      }
+    // ensure that the file on hdfs hasn't been modified since the job started 
+    if (dfsFileStamp != confFileStamp) {
+      LOG.fatal("File: " + cache + " has changed on HDFS since job started");
+      throw new IOException("File: " + cache + 
+          " has changed on HDFS since job started");
+    }
       
       
-      if (dfsFileStamp != lcacheStatus.mtime) {
-        // needs refreshing
-        return false;
-      }
+    if (dfsFileStamp != lcacheStatus.mtime) {
+      return false;
     }
     }
     
     
     return true;
     return true;
@@ -841,9 +852,6 @@ public class DistributedCache {
   }
   }
 
 
   private static class CacheStatus {
   private static class CacheStatus {
-    // false, not loaded yet, true is loaded
-    boolean currentStatus;
-
     // the local load path of this cache
     // the local load path of this cache
     Path localLoadPath;
     Path localLoadPath;
     
     
@@ -858,16 +866,33 @@ public class DistributedCache {
 
 
     // the cache-file modification time
     // the cache-file modification time
     long mtime;
     long mtime;
+    
+    // is it initialized?
+    boolean inited = false;
 
 
     public CacheStatus(Path baseDir, Path localLoadPath) {
     public CacheStatus(Path baseDir, Path localLoadPath) {
       super();
       super();
-      this.currentStatus = false;
       this.localLoadPath = localLoadPath;
       this.localLoadPath = localLoadPath;
       this.refcount = 0;
       this.refcount = 0;
       this.mtime = -1;
       this.mtime = -1;
       this.baseDir = baseDir;
       this.baseDir = baseDir;
       this.size = 0;
       this.size = 0;
     }
     }
+    
+    // get the base dir for the cache
+    Path getBaseDir() {
+      return baseDir;
+    }
+    
+    // Is it initialized?
+    boolean isInited() {
+      return inited;
+    }
+    
+    // mark it as initalized
+    void initComplete() {
+      inited = true;
+    }
   }
   }
 
 
   /**
   /**

+ 13 - 20
src/mapred/org/apache/hadoop/mapred/TaskRunner.java

@@ -178,21 +178,15 @@ abstract class TaskRunner extends Thread {
             fileSystem = FileSystem.get(archives[i], conf);
             fileSystem = FileSystem.get(archives[i], conf);
             fileStatus = fileSystem.getFileStatus(
             fileStatus = fileSystem.getFileStatus(
                                       new Path(archives[i].getPath()));
                                       new Path(archives[i].getPath()));
-            String cacheId = DistributedCache.makeRelative(archives[i],conf);
-            String cachePath = TaskTracker.getCacheSubdir() + 
-                                 Path.SEPARATOR + cacheId;
-            
-            localPath = lDirAlloc.getLocalPathForWrite(cachePath,
-                                      fileStatus.getLen(), conf);
-            baseDir = localPath.toString().replace(cacheId, "");
             p[i] = DistributedCache.getLocalCache(archives[i], conf, 
             p[i] = DistributedCache.getLocalCache(archives[i], conf, 
-                                                  new Path(baseDir),
+                                        new Path(TaskTracker.getCacheSubdir()),
                                                   fileStatus,
                                                   fileStatus,
                                                   true, Long.parseLong(
                                                   true, Long.parseLong(
                                                         archivesTimestamps[i]),
                                                         archivesTimestamps[i]),
                                                   new Path(workDir.
                                                   new Path(workDir.
                                                         getAbsolutePath()), 
                                                         getAbsolutePath()), 
-                                                  false);
+                                                  false,
+                                                  lDirAlloc);
             
             
           }
           }
           DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
           DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
@@ -204,21 +198,15 @@ abstract class TaskRunner extends Thread {
             fileSystem = FileSystem.get(files[i], conf);
             fileSystem = FileSystem.get(files[i], conf);
             fileStatus = fileSystem.getFileStatus(
             fileStatus = fileSystem.getFileStatus(
                                       new Path(files[i].getPath()));
                                       new Path(files[i].getPath()));
-            String cacheId = DistributedCache.makeRelative(files[i], conf);
-            String cachePath = TaskTracker.getCacheSubdir() +
-                                 Path.SEPARATOR + cacheId;
-            
-            localPath = lDirAlloc.getLocalPathForWrite(cachePath,
-                                      fileStatus.getLen(), conf);
-            baseDir = localPath.toString().replace(cacheId, "");
             p[i] = DistributedCache.getLocalCache(files[i], conf, 
             p[i] = DistributedCache.getLocalCache(files[i], conf, 
-                                                  new Path(baseDir),
+                                        new Path(TaskTracker.getCacheSubdir()),
                                                   fileStatus,
                                                   fileStatus,
                                                   false, Long.parseLong(
                                                   false, Long.parseLong(
                                                            fileTimestamps[i]),
                                                            fileTimestamps[i]),
                                                   new Path(workDir.
                                                   new Path(workDir.
                                                         getAbsolutePath()), 
                                                         getAbsolutePath()), 
-                                                  false);
+                                                  false,
+                                                  lDirAlloc);
           }
           }
           DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
           DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
         }
         }
@@ -537,14 +525,19 @@ abstract class TaskRunner extends Thread {
       try{
       try{
         URI[] archives = DistributedCache.getCacheArchives(conf);
         URI[] archives = DistributedCache.getCacheArchives(conf);
         URI[] files = DistributedCache.getCacheFiles(conf);
         URI[] files = DistributedCache.getCacheFiles(conf);
+        String[] archivesTimestamps = 
+          DistributedCache.getArchiveTimestamps(conf);
+        String[] fileTimestamps = DistributedCache.getFileTimestamps(conf);
         if (archives != null){
         if (archives != null){
           for (int i = 0; i < archives.length; i++){
           for (int i = 0; i < archives.length; i++){
-            DistributedCache.releaseCache(archives[i], conf);
+            DistributedCache.releaseCache(archives[i], conf,
+              Long.parseLong(archivesTimestamps[i]));
           }
           }
         }
         }
         if (files != null){
         if (files != null){
           for(int i = 0; i < files.length; i++){
           for(int i = 0; i < files.length; i++){
-            DistributedCache.releaseCache(files[i], conf);
+            DistributedCache.releaseCache(files[i], conf,
+              Long.parseLong(fileTimestamps[i]));
           }
           }
         }
         }
       }catch(IOException ie){
       }catch(IOException ie){

+ 14 - 9
src/test/org/apache/hadoop/filecache/TestDistributedCache.java

@@ -8,6 +8,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
 
 
 import junit.framework.TestCase;
 import junit.framework.TestCase;
@@ -15,11 +16,10 @@ import junit.framework.TestCase;
 public class TestDistributedCache extends TestCase {
 public class TestDistributedCache extends TestCase {
   
   
   static final URI LOCAL_FS = URI.create("file:///");
   static final URI LOCAL_FS = URI.create("file:///");
-  private static String TEST_CACHE_BASE_DIR =
-    new Path(System.getProperty("test.build.data","/tmp/cachebasedir"))
-    .toString().replace(' ', '+');
+  private static String TEST_CACHE_BASE_DIR = "cachebasedir";
   private static String TEST_ROOT_DIR =
   private static String TEST_ROOT_DIR =
     System.getProperty("test.build.data", "/tmp/distributedcache");
     System.getProperty("test.build.data", "/tmp/distributedcache");
+  private static String MAPRED_LOCAL_DIR = TEST_ROOT_DIR + "/mapred/local";
   private static final int TEST_FILE_SIZE = 4 * 1024; // 4K
   private static final int TEST_FILE_SIZE = 4 * 1024; // 4K
   private static final int LOCAL_CACHE_LIMIT = 5 * 1024; //5K
   private static final int LOCAL_CACHE_LIMIT = 5 * 1024; //5K
   private Configuration conf;
   private Configuration conf;
@@ -34,6 +34,7 @@ public class TestDistributedCache extends TestCase {
   protected void setUp() throws IOException {
   protected void setUp() throws IOException {
     conf = new Configuration();
     conf = new Configuration();
     conf.setLong("local.cache.size", LOCAL_CACHE_LIMIT);
     conf.setLong("local.cache.size", LOCAL_CACHE_LIMIT);
+    conf.set("mapred.local.dir", MAPRED_LOCAL_DIR);
     localfs = FileSystem.get(LOCAL_FS, conf);
     localfs = FileSystem.get(LOCAL_FS, conf);
     firstCacheFile = new Path(TEST_ROOT_DIR+"/firstcachefile");
     firstCacheFile = new Path(TEST_ROOT_DIR+"/firstcachefile");
     secondCacheFile = new Path(TEST_ROOT_DIR+"/secondcachefile");
     secondCacheFile = new Path(TEST_ROOT_DIR+"/secondcachefile");
@@ -43,15 +44,19 @@ public class TestDistributedCache extends TestCase {
   
   
   /** test delete cache */
   /** test delete cache */
   public void testDeleteCache() throws Exception {
   public void testDeleteCache() throws Exception {
-    DistributedCache.getLocalCache(firstCacheFile.toUri(), conf, new Path(TEST_CACHE_BASE_DIR), 
-        false, System.currentTimeMillis(), new Path(TEST_ROOT_DIR));
-    DistributedCache.releaseCache(firstCacheFile.toUri(), conf);
+    long now = System.currentTimeMillis();
+    DistributedCache.getLocalCache(firstCacheFile.toUri(), conf,
+        new Path(TEST_CACHE_BASE_DIR), localfs.getFileStatus(firstCacheFile),
+        false, now, new Path(TEST_ROOT_DIR));
+    DistributedCache.releaseCache(firstCacheFile.toUri(), conf, now);
     //in above code,localized a file of size 4K and then release the cache which will cause the cache 
     //in above code,localized a file of size 4K and then release the cache which will cause the cache 
     //be deleted when the limit goes out. The below code localize another cache which's designed to 
     //be deleted when the limit goes out. The below code localize another cache which's designed to 
     //sweep away the first cache.
     //sweep away the first cache.
-    DistributedCache.getLocalCache(secondCacheFile.toUri(), conf, new Path(TEST_CACHE_BASE_DIR), 
-        false, System.currentTimeMillis(), new Path(TEST_ROOT_DIR));
-    FileStatus[] dirStatuses = localfs.listStatus(new Path(TEST_CACHE_BASE_DIR));
+    DistributedCache.getLocalCache(secondCacheFile.toUri(), conf,
+        new Path(TEST_CACHE_BASE_DIR), localfs.getFileStatus(firstCacheFile),
+        false, now, new Path(TEST_ROOT_DIR));
+    FileStatus[] dirStatuses = localfs.listStatus(
+      new Path(MAPRED_LOCAL_DIR, TEST_CACHE_BASE_DIR));
     assertTrue("DistributedCache failed deleting old cache when the cache store is full.",
     assertTrue("DistributedCache failed deleting old cache when the cache store is full.",
         dirStatuses.length > 1);
         dirStatuses.length > 1);
   }
   }