
commit 3b9be6aaf466f7e10f87e2ee5f254caed27ef6bf
Author: Hemanth Yamijala <yhemanth@yahoo-inc.com>
Date: Wed Jan 20 15:28:40 2010 +0530

MAPREDUCE:476 from https://issues.apache.org/jira/secure/attachment/12430866/476.20S-2.patch

+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-476. Extend DistributedCache to work locally (LocalJobRunner).
+ (Philip Zeyliger via tomwhite)
+


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077114 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 14 years ago
parent
revision
8d898e3a3f
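A rough sketch of what this change enables (the class name, paths, and job
setup below are illustrative, not part of the patch): with DistributedCache
extended to work under LocalJobRunner, cached files are localized even when
mapred.job.tracker is set to "local".

    import java.net.URI;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;

    public class LocalCacheDemo {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf();
        conf.set("mapred.job.tracker", "local");  // run under LocalJobRunner
        // After this patch, LocalJobRunner localizes the cached file just as
        // a TaskTracker would.
        DistributedCache.addCacheFile(new URI("/tmp/lookup.txt#lookup"), conf);
        DistributedCache.createSymlink(conf);
        // ... set input/output paths and the mapper class as usual, then:
        JobClient.runJob(conf);
      }
    }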

+ 1 - 1
src/core/org/apache/hadoop/fs/FileSystem.java

@@ -64,7 +64,7 @@ import org.apache.hadoop.security.UserGroupInformation;
  * implementation is DistributedFileSystem.
  *****************************************************************/
 public abstract class FileSystem extends Configured implements Closeable {
-  private static final String FS_DEFAULT_NAME_KEY = "fs.default.name";
+  public static final String FS_DEFAULT_NAME_KEY = "fs.default.name";
 
   public static final Log LOG = LogFactory.getLog(FileSystem.class);
 

+ 81 - 339
src/mapred/org/apache/hadoop/filecache/DistributedCache.java

@@ -18,12 +18,12 @@
 
 package org.apache.hadoop.filecache;
 
-import org.apache.commons.logging.*;
 import java.io.*;
 import java.util.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
 
 import java.net.URI;
 
@@ -108,22 +108,23 @@ import java.net.URI;
  *     }
  *     
  * </pre></blockquote></p>
- * 
+ * It is also very common to use the DistributedCache by using
+ * {@link org.apache.hadoop.util.GenericOptionsParser}.
+ *
+ * This class includes methods that should be used by users
+ * (specifically those mentioned in the example above, as well
+ * as {@link DistributedCache#addArchiveToClassPath(Path, Configuration)}),
+ * as well as methods intended for use by the MapReduce framework
+ * (e.g., {@link org.apache.hadoop.mapred.JobClient}).  For implementation
+ * details, see {@link TrackerDistributedCacheManager} and
+ * {@link TaskDistributedCacheManager}.
+ *
+ * @see TrackerDistributedCacheManager
+ * @see TaskDistributedCacheManager
  * @see org.apache.hadoop.mapred.JobConf
  * @see org.apache.hadoop.mapred.JobClient
  */
 public class DistributedCache {
-  // cacheID to cacheStatus mapping
-  private static TreeMap<String, CacheStatus> cachedArchives = new TreeMap<String, CacheStatus>();
-  
-  private static TreeMap<Path, Long> baseDirSize = new TreeMap<Path, Long>();
-  
-  // default total cache size
-  private static final long DEFAULT_CACHE_SIZE = 10737418240L;
-
-  private static final Log LOG =
-    LogFactory.getLog(DistributedCache.class);
-  
   /**
    * Get the locally cached file or archive; it could either be 
    * previously cached (and valid) or copy it from the {@link FileSystem} now.
@@ -148,15 +149,18 @@ public class DistributedCache {
    * @return the path to directory where the archives are unjarred in case of archives,
    * the path to the file where the file is copied locally 
    * @throws IOException
+   * @deprecated Internal to MapReduce framework.  Use
+   * TrackerDistributedCacheManager instead.
    */
   public static Path getLocalCache(URI cache, Configuration conf, 
                                    Path baseDir, FileStatus fileStatus,
                                    boolean isArchive, long confFileStamp,
                                    Path currentWorkDir) 
-  throws IOException {
+      throws IOException {
     return getLocalCache(cache, conf, baseDir, fileStatus, isArchive, 
         confFileStamp, currentWorkDir, true);
   }
+
   /**
    * Get the locally cached file or archive; it could either be 
    * previously cached (and valid) or copy it from the {@link FileSystem} now.
@@ -184,48 +188,19 @@ public class DistributedCache {
    * @return the path to directory where the archives are unjarred in case of archives,
    * the path to the file where the file is copied locally 
    * @throws IOException
+   * @deprecated Internal to MapReduce framework.  Use
+   * TrackerDistributedCacheManager instead.
    */
   public static Path getLocalCache(URI cache, Configuration conf, 
       Path baseDir, FileStatus fileStatus,
       boolean isArchive, long confFileStamp,
-      Path currentWorkDir, boolean honorSymLinkConf) 
-  throws IOException {
-    String cacheId = makeRelative(cache, conf);
-    CacheStatus lcacheStatus;
-    Path localizedPath;
-    synchronized (cachedArchives) {
-      lcacheStatus = cachedArchives.get(cacheId);
-      if (lcacheStatus == null) {
-        // was never localized
-        lcacheStatus = new CacheStatus(baseDir, new Path(baseDir, new Path(cacheId)));
-        cachedArchives.put(cacheId, lcacheStatus);
-      }
-
-      synchronized (lcacheStatus) {
-        localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus, 
-            fileStatus, isArchive, currentWorkDir, honorSymLinkConf);
-        lcacheStatus.refcount++;
-      }
-    }
+      Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
 
-    // try deleting stuff if you can
-    long size = 0;
-    synchronized (baseDirSize) {
-      Long get = baseDirSize.get(baseDir);
-      if ( get != null ) {
-    	size = get.longValue();
-      }
-    }
-    // setting the cache size to a default of 10GB
-    long allowedSize = conf.getLong("local.cache.size", DEFAULT_CACHE_SIZE);
-    if (allowedSize < size) {
-      // try some cache deletions
-      deleteCache(conf);
-    }
-    return localizedPath;
+    return new TrackerDistributedCacheManager(conf).getLocalCache(cache, conf,
+        baseDir, fileStatus, isArchive, confFileStamp, currentWorkDir,
+        honorSymLinkConf);
   }
 
-  
   /**
    * Get the locally cached file or archive; it could either be 
    * previously cached (and valid) or copy it from the {@link FileSystem} now.
@@ -249,17 +224,18 @@ public class DistributedCache {
    * @return the path to directory where the archives are unjarred in case of archives,
    * the path to the file where the file is copied locally 
    * @throws IOException
-
+   * @deprecated Internal to MapReduce framework.  Use
+   * TrackerDistributedCacheManager instead.
    */
   public static Path getLocalCache(URI cache, Configuration conf, 
                                    Path baseDir, boolean isArchive,
                                    long confFileStamp, Path currentWorkDir) 
-  throws IOException {
+      throws IOException {
     return getLocalCache(cache, conf, 
                          baseDir, null, isArchive,
                          confFileStamp, currentWorkDir);
   }
-  
+
   /**
   * This is the opposite of getLocalCache. When you are done with
   * using the cache, you need to release it.
@@ -267,232 +243,28 @@ public class DistributedCache {
    * @param conf configuration which contains the filesystem the cache 
    * is contained in.
    * @throws IOException
+   * @deprecated Internal to MapReduce framework.  Use
+   * TrackerDistributedCacheManager instead.
    */
   public static void releaseCache(URI cache, Configuration conf)
-    throws IOException {
-    String cacheId = makeRelative(cache, conf);
-    synchronized (cachedArchives) {
-      CacheStatus lcacheStatus = cachedArchives.get(cacheId);
-      if (lcacheStatus == null)
-        return;
-      synchronized (lcacheStatus) {
-        lcacheStatus.refcount--;
-      }
-    }
+      throws IOException {
+    new TrackerDistributedCacheManager(conf).releaseCache(cache, conf);
   }
   
-  // To delete the caches which have a refcount of zero
-  
-  private static void deleteCache(Configuration conf) throws IOException {
-    // try deleting cache Status with refcount of zero
-    synchronized (cachedArchives) {
-      for (Iterator it = cachedArchives.keySet().iterator(); it.hasNext();) {
-        String cacheId = (String) it.next();
-        CacheStatus lcacheStatus = cachedArchives.get(cacheId);
-        synchronized (lcacheStatus) {
-          if (lcacheStatus.refcount == 0) {
-            // delete this cache entry
-            FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
-            synchronized (baseDirSize) {
-              Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
-              if ( dirSize != null ) {
-            	dirSize -= lcacheStatus.size;
-            	baseDirSize.put(lcacheStatus.baseDir, dirSize);
-              }
-            }
-            it.remove();
-          }
-        }
-      }
-    }
-  }
-
-  /*
+  /**
   * Returns the relative path of the dir this cache will be localized in.
   * For hdfs://hostname:port/absolute_path, the relative path is
   * hostname/absolute_path; if the URI is just /absolute_path, the relative
   * path is the hostname of the DFS this mapred cluster is running on,
   * followed by /absolute_path.
+   * @deprecated Internal to MapReduce framework.  Use
+   * TrackerDistributedCacheManager instead.
    */
   public static String makeRelative(URI cache, Configuration conf)
-    throws IOException {
-    String host = cache.getHost();
-    if (host == null) {
-      host = cache.getScheme();
-    }
-    if (host == null) {
-      URI defaultUri = FileSystem.get(conf).getUri();
-      host = defaultUri.getHost();
-      if (host == null) {
-        host = defaultUri.getScheme();
-      }
-    }
-    String path = host + cache.getPath();
-    path = path.replace(":/","/");                // remove windows device colon
-    return path;
-  }
+      throws IOException {
+    return new TrackerDistributedCacheManager(conf).makeRelative(cache, conf);
 
-  private static Path cacheFilePath(Path p) {
-    return new Path(p, p.getName());
-  }
-
-  // the method which actually copies the caches locally and unjars/unzips them
-  // and does chmod for the files
-  private static Path localizeCache(Configuration conf, 
-                                    URI cache, long confFileStamp,
-                                    CacheStatus cacheStatus,
-                                    FileStatus fileStatus,
-                                    boolean isArchive, 
-                                    Path currentWorkDir,boolean honorSymLinkConf) 
-  throws IOException {
-    boolean doSymlink = honorSymLinkConf && getSymlink(conf);
-    if(cache.getFragment() == null) {
-    	doSymlink = false;
-    }
-    FileSystem fs = getFileSystem(cache, conf);
-    String link = currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
-    File flink = new File(link);
-    if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
-                           cacheStatus, fileStatus)) {
-      if (isArchive) {
-        if (doSymlink){
-          if (!flink.exists())
-            FileUtil.symLink(cacheStatus.localLoadPath.toString(), 
-                             link);
-        }
-        return cacheStatus.localLoadPath;
-      }
-      else {
-        if (doSymlink){
-          if (!flink.exists())
-            FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(), 
-                             link);
-        }
-        return cacheFilePath(cacheStatus.localLoadPath);
-      }
-    } else {
-      // remove the old archive
-      // if the old archive cannot be removed since it is being used by another
-      // job
-      // return null
-      if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true))
-        throw new IOException("Cache " + cacheStatus.localLoadPath.toString()
-                              + " is in use and cannot be refreshed");
-      
-      FileSystem localFs = FileSystem.getLocal(conf);
-      localFs.delete(cacheStatus.localLoadPath, true);
-      synchronized (baseDirSize) {
-    	Long dirSize = baseDirSize.get(cacheStatus.baseDir);
-    	if ( dirSize != null ) {
-    	  dirSize -= cacheStatus.size;
-    	  baseDirSize.put(cacheStatus.baseDir, dirSize);
-    	}
-      }
-      Path parchive = new Path(cacheStatus.localLoadPath,
-                               new Path(cacheStatus.localLoadPath.getName()));
-      
-      if (!localFs.mkdirs(cacheStatus.localLoadPath)) {
-        throw new IOException("Mkdirs failed to create directory " + 
-                              cacheStatus.localLoadPath.toString());
-      }
-
-      String cacheId = cache.getPath();
-      fs.copyToLocalFile(new Path(cacheId), parchive);
-      if (isArchive) {
-        String tmpArchive = parchive.toString().toLowerCase();
-        File srcFile = new File(parchive.toString());
-        File destDir = new File(parchive.getParent().toString());
-        if (tmpArchive.endsWith(".jar")) {
-          RunJar.unJar(srcFile, destDir);
-        } else if (tmpArchive.endsWith(".zip")) {
-          FileUtil.unZip(srcFile, destDir);
-        } else if (isTarFile(tmpArchive)) {
-          FileUtil.unTar(srcFile, destDir);
-        }
-        // else will not do anyhting
-        // and copy the file into the dir as it is
-      }
-      
-      long cacheSize = FileUtil.getDU(new File(parchive.getParent().toString()));
-      cacheStatus.size = cacheSize;
-      synchronized (baseDirSize) {
-      	Long dirSize = baseDirSize.get(cacheStatus.baseDir);
-      	if( dirSize == null ) {
-      	  dirSize = Long.valueOf(cacheSize);
-      	} else {
-      	  dirSize += cacheSize;
-      	}
-      	baseDirSize.put(cacheStatus.baseDir, dirSize);
-      }
-      
-      // do chmod here 
-      try {
-        //Setting recursive permission to grant everyone read and execute
-        FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
-      } catch(InterruptedException e) {
-    	LOG.warn("Exception in chmod" + e.toString());
-      }
-
-      // update cacheStatus to reflect the newly cached file
-      cacheStatus.currentStatus = true;
-      cacheStatus.mtime = getTimestamp(conf, cache);
-    }
-    
-    if (isArchive){
-      if (doSymlink){
-        if (!flink.exists())
-          FileUtil.symLink(cacheStatus.localLoadPath.toString(), 
-                           link);
-      }
-      return cacheStatus.localLoadPath;
-    }
-    else {
-      if (doSymlink){
-        if (!flink.exists())
-          FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(), 
-                           link);
-      }
-      return cacheFilePath(cacheStatus.localLoadPath);
-    }
-  }
-
-  private static boolean isTarFile(String filename) {
-    return (filename.endsWith(".tgz") || filename.endsWith(".tar.gz") ||
-           filename.endsWith(".tar"));
-  }
-  
-  // Checks if the cache has already been localized and is fresh
-  private static boolean ifExistsAndFresh(Configuration conf, FileSystem fs, 
-                                          URI cache, long confFileStamp, 
-                                          CacheStatus lcacheStatus,
-                                          FileStatus fileStatus) 
-  throws IOException {
-    // check for existence of the cache
-    if (lcacheStatus.currentStatus == false) {
-      return false;
-    } else {
-      long dfsFileStamp;
-      if (fileStatus != null) {
-        dfsFileStamp = fileStatus.getModificationTime();
-      } else {
-        dfsFileStamp = getTimestamp(conf, cache);
-      }
-
-      // ensure that the file on hdfs hasn't been modified since the job started 
-      if (dfsFileStamp != confFileStamp) {
-        LOG.fatal("File: " + cache + " has changed on HDFS since job started");
-        throw new IOException("File: " + cache + 
-                              " has changed on HDFS since job started");
-      }
-      
-      if (dfsFileStamp != lcacheStatus.mtime) {
-        // needs refreshing
-        return false;
-      }
-    }
-    
-    return true;
   }
 
   /**
@@ -516,21 +288,12 @@ public class DistributedCache {
    * @param jobCacheDir the target directory for creating symlinks
    * @param workDir the directory in which the symlinks are created
    * @throws IOException
+   * @deprecated Internal to MapReduce framework.  Use
+   * TrackerDistributedCacheManager instead.
    */
   public static void createAllSymlink(Configuration conf, File jobCacheDir, File workDir)
     throws IOException{
-    if ((jobCacheDir == null || !jobCacheDir.isDirectory()) ||
-           workDir == null || (!workDir.isDirectory())) {
-      return;
-    }
-    boolean createSymlink = getSymlink(conf);
-    if (createSymlink){
-      File[] list = jobCacheDir.listFiles();
-      for (int i=0; i < list.length; i++){
-        FileUtil.symLink(list[i].getAbsolutePath(),
-                         new File(workDir, list[i].getName()).toString());
-      }
-    }  
+    TrackerDistributedCacheManager.createAllSymlink(conf, jobCacheDir, workDir);
   }
   
   private static String getFileSysName(URI url) {
@@ -554,7 +317,8 @@ public class DistributedCache {
   }
 
   /**
-   * Set the configuration with the given set of archives
+   * Set the configuration with the given set of archives. Intended
+   * to be used by user code.
    * @param archives The list of archives that need to be localized
    * @param conf Configuration which will be changed
    */
@@ -564,7 +328,8 @@ public class DistributedCache {
   }
 
   /**
-   * Set the configuration with the given set of files
+   * Set the configuration with the given set of files.  Intended to be
+   * used by user code.
    * @param files The list of files that need to be localized
    * @param conf Configuration which will be changed
    */
@@ -574,7 +339,8 @@ public class DistributedCache {
   }
 
   /**
-   * Get cache archives set in the Configuration
+   * Get cache archives set in the Configuration.  Used by
+   * internal DistributedCache and MapReduce code.
    * @param conf The configuration which contains the archives
    * @return A URI array of the caches set in the Configuration
    * @throws IOException
@@ -584,18 +350,19 @@ public class DistributedCache {
   }
 
   /**
-   * Get cache files set in the Configuration
+   * Get cache files set in the Configuration.  Used by internal
+   * DistributedCache and MapReduce code.
    * @param conf The configuration which contains the files
    * @return A URI array of the files set in the Configuration
    * @throws IOException
    */
-
   public static URI[] getCacheFiles(Configuration conf) throws IOException {
     return StringUtils.stringToURI(conf.getStrings("mapred.cache.files"));
   }
 
   /**
-   * Return the path array of the localized caches
+   * Return the path array of the localized caches.  Intended to be used
+   * by user code.
    * @param conf Configuration that contains the localized archives
    * @return A path array of localized caches
    * @throws IOException
@@ -607,7 +374,8 @@ public class DistributedCache {
   }
 
   /**
-   * Return the path array of the localized files
+   * Return the path array of the localized files.  Intended to be used
+   * by user code.
    * @param conf Configuration that contains the localized files
    * @return A path array of localized files
    * @throws IOException
@@ -618,7 +386,8 @@ public class DistributedCache {
   }
 
   /**
-   * Get the timestamps of the archives
+   * Get the timestamps of the archives.  Used by internal
+   * DistributedCache and MapReduce code.
    * @param conf The configuration which stored the timestamps
    * @return a string array of timestamps 
    * @throws IOException
@@ -629,7 +398,8 @@ public class DistributedCache {
 
 
   /**
-   * Get the timestamps of the files
+   * Get the timestamps of the files.  Used by internal
+   * DistributedCache and MapReduce code.
    * @param conf The configuration which stored the timestamps
    * @return a string array of timestamps 
    * @throws IOException
@@ -639,7 +409,8 @@ public class DistributedCache {
   }
 
   /**
-   * This is to check the timestamp of the archives to be localized
+   * This is to check the timestamp of the archives to be localized.
+   * Used by internal MapReduce code.
   * @param conf Configuration which stores the timestamps
    * @param timestamps comma separated list of timestamps of archives.
    * The order should be the same as the order in which the archives are added.
@@ -649,7 +420,8 @@ public class DistributedCache {
   }
 
   /**
-   * This is to check the timestamp of the files to be localized
+   * This is to check the timestamp of the files to be localized.
+   * Used by internal MapReduce code.
   * @param conf Configuration which stores the timestamps
    * @param timestamps comma separated list of timestamps of files.
    * The order should be the same as the order in which the files are added.
@@ -659,7 +431,8 @@ public class DistributedCache {
   }
   
   /**
-   * Set the conf to contain the location for localized archives 
+   * Set the conf to contain the location for localized archives.  Used
+   * by internal DistributedCache code.
    * @param conf The conf to modify to contain the localized caches
    * @param str a comma separated list of local archives
    */
@@ -668,7 +441,8 @@ public class DistributedCache {
   }
 
   /**
-   * Set the conf to contain the location for localized files 
+   * Set the conf to contain the location for localized files.  Used
+   * by internal DistributedCache code.
    * @param conf The conf to modify to contain the localized caches
    * @param str a comma separated list of local files
    */
@@ -677,7 +451,8 @@ public class DistributedCache {
   }
 
   /**
-   * Add a archives to be localized to the conf
+   * Add an archive to be localized to the conf.  Intended to
+   * be used by user code.
    * @param uri The uri of the cache to be localized
    * @param conf Configuration to add the cache to
    */
@@ -688,7 +463,8 @@ public class DistributedCache {
   }
   
   /**
-   * Add a file to be localized to the conf
+   * Add a file to be localized to the conf.  Intended
+   * to be used by user code.
    * @param uri The uri of the cache to be localized
    * @param conf Configuration to add the cache to
    */
@@ -700,7 +476,7 @@ public class DistributedCache {
 
   /**
   * Add a file path to the current set of classpath entries. It adds the file
-   * to cache as well.
+   * to cache as well.  Intended to be used by user code.
    * 
    * @param file Path of the file to be added
    * @param conf Configuration that contains the classpath setting
@@ -717,7 +493,8 @@ public class DistributedCache {
   }
 
   /**
-   * Get the file entries in classpath as an array of Path
+   * Get the file entries in classpath as an array of Path.
+   * Used by internal DistributedCache code.
    * 
    * @param conf Configuration that contains the classpath setting
    */
@@ -736,7 +513,7 @@ public class DistributedCache {
 
   /**
    * Add an archive path to the current set of classpath entries. It adds the
-   * archive to cache as well.
+   * archive to cache as well.  Intended to be used by user code.
    * 
    * @param archive Path of the archive to be added
    * @param conf Configuration that contains the classpath setting
@@ -754,7 +531,8 @@ public class DistributedCache {
   }
 
   /**
-   * Get the archive entries in classpath as an array of Path
+   * Get the archive entries in classpath as an array of Path.
+   * Used by internal DistributedCache code.
    * 
    * @param conf Configuration that contains the classpath setting
    */
@@ -773,7 +551,8 @@ public class DistributedCache {
 
   /**
    * This method allows you to create symlinks in the current working directory
-   * of the task to all the cache files/archives
+   * of the task to all the cache files/archives.
+   * Intended to be used by user code.
    * @param conf the jobconf 
    */
   public static void createSymlink(Configuration conf){
@@ -783,6 +562,7 @@ public class DistributedCache {
   /**
   * This method checks to see if symlinks are to be created for the
   * localized cache files in the current working directory.
+   * Used by internal DistributedCache code.
    * @param conf the jobconf
    * @return true if symlinks are to be created- else return false
    */
@@ -798,7 +578,7 @@ public class DistributedCache {
    * This method checks if there is a conflict in the fragment names 
    * of the uris. Also makes sure that each uri has a fragment. It 
    * is only to be called if you want to create symlinks for 
-   * the various archives and files.
+   * the various archives and files.  May be used by user code.
    * @param uriFiles The uri array of urifiles
    * @param uriArchives the uri array of uri archives
    */
@@ -840,52 +620,14 @@ public class DistributedCache {
     return true;
   }
 
-  private static class CacheStatus {
-    // false, not loaded yet, true is loaded
-    boolean currentStatus;
-
-    // the local load path of this cache
-    Path localLoadPath;
-    
-    //the base dir where the cache lies
-    Path baseDir;
-    
-    //the size of this cache
-    long size;
-
-    // number of instances using this cache
-    int refcount;
-
-    // the cache-file modification time
-    long mtime;
-
-    public CacheStatus(Path baseDir, Path localLoadPath) {
-      super();
-      this.currentStatus = false;
-      this.localLoadPath = localLoadPath;
-      this.refcount = 0;
-      this.mtime = -1;
-      this.baseDir = baseDir;
-      this.size = 0;
-    }
-  }
-
   /**
    * Clear the entire contents of the cache and delete the backing files. This
    * should only be used when the server is reinitializing, because the users
    * are going to lose their files.
+   * @deprecated Internal to MapReduce framework.  Use
+   * TrackerDistributedCacheManager instead.
    */
   public static void purgeCache(Configuration conf) throws IOException {
-    synchronized (cachedArchives) {
-      FileSystem localFs = FileSystem.getLocal(conf);
-      for (Map.Entry<String,CacheStatus> f: cachedArchives.entrySet()) {
-        try {
-          localFs.delete(f.getValue().localLoadPath, true);
-        } catch (IOException ie) {
-          LOG.debug("Error cleaning up cache", ie);
-        }
-      }
-      cachedArchives.clear();
-    }
+    new TrackerDistributedCacheManager(conf).purgeCache();
   }
 }
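The reworked class javadoc above sorts the API into user-facing methods and
framework-internal ones. A minimal sketch of the user-facing half, assuming a
hypothetical name-node URI and jar path:

    import java.net.URI;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    public class UserFacingCacheSketch {
      public static void configure(JobConf job) throws Exception {
        // Ship a side file and put a jar on every task's classpath.
        DistributedCache.addCacheFile(
            new URI("hdfs://nn:8020/data/terms.txt"), job);
        DistributedCache.addArchiveToClassPath(new Path("/libs/deps.jar"), job);
      }

      public static Path[] inTask(JobConf job) throws Exception {
        // Inside a task, look up the localized copies.
        return DistributedCache.getLocalCacheFiles(job);
      }
    }

The deprecated entry points (getLocalCache, releaseCache, makeRelative,
purgeCache) now simply delegate to a freshly constructed
TrackerDistributedCacheManager, defined below.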

+ 237 - 0
src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java

@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.filecache;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Helper class of {@link TrackerDistributedCacheManager} that represents
+ * the cached files of a single task.  This class is used
+ * by TaskRunner/LocalJobRunner to parse out the job configuration
+ * and set up the local caches.
+ * 
+ * <b>This class is internal to Hadoop, and should not be treated as a public
+ * interface.</b>
+ */
+public class TaskDistributedCacheManager {
+  private final TrackerDistributedCacheManager distributedCacheManager;
+  private final Configuration taskConf;
+  private final List<CacheFile> cacheFiles = new ArrayList<CacheFile>();
+  private final List<String> classPaths = new ArrayList<String>();
+ 
+  private boolean setupCalled = false;
+
+  /**
+   * Struct representing a single cached file.
+   * There are four permutations (archive, file) and
+   * (don't put in classpath, do put in classpath).
+   */
+  static class CacheFile {
+    /** URI as in the configuration */
+    final URI uri;
+    enum FileType {
+      REGULAR,
+      ARCHIVE
+    }
+    /** Whether to decompress */
+    final FileType type;
+    final long timestamp;
+    /** Whether this is to be added to the classpath */
+    final boolean shouldBeAddedToClassPath;
+
+    private CacheFile(URI uri, FileType type, long timestamp, 
+        boolean classPath) {
+      this.uri = uri;
+      this.type = type;
+      this.timestamp = timestamp;
+      this.shouldBeAddedToClassPath = classPath;
+    }
+
+    /**
+     * Converts the scheme used by DistributedCache to serialize what files to
+     * cache in the configuration into CacheFile objects that represent those 
+     * files.
+     */
+    private static List<CacheFile> makeCacheFiles(URI[] uris, 
+        String[] timestamps, Path[] paths, FileType type) {
+      List<CacheFile> ret = new ArrayList<CacheFile>();
+      if (uris != null) {
+        if (uris.length != timestamps.length) {
+          throw new IllegalArgumentException("Mismatched uris and timestamps.");
+        }
+        Map<String, Path> classPaths = new HashMap<String, Path>();
+        if (paths != null) {
+          for (Path p : paths) {
+            classPaths.put(p.toString(), p);
+          }
+        }
+        for (int i = 0; i < uris.length; ++i) {
+          URI u = uris[i];
+          boolean isClassPath = (null != classPaths.get(u.getPath()));
+          long t = Long.parseLong(timestamps[i]);
+          ret.add(new CacheFile(u, type, t, isClassPath));
+        }
+      }
+      return ret;
+    }
+  }
+
+  TaskDistributedCacheManager(
+      TrackerDistributedCacheManager distributedCacheManager,
+      Configuration taskConf) throws IOException {
+    this.distributedCacheManager = distributedCacheManager;
+    this.taskConf = taskConf;
+    
+    this.cacheFiles.addAll(
+        CacheFile.makeCacheFiles(DistributedCache.getCacheFiles(taskConf),
+            DistributedCache.getFileTimestamps(taskConf),
+            DistributedCache.getFileClassPaths(taskConf),
+            CacheFile.FileType.REGULAR));
+    this.cacheFiles.addAll(
+        CacheFile.makeCacheFiles(DistributedCache.getCacheArchives(taskConf),
+          DistributedCache.getArchiveTimestamps(taskConf),
+          DistributedCache.getArchiveClassPaths(taskConf), 
+          CacheFile.FileType.ARCHIVE));
+  }
+
+  /**
+   * Retrieve files into the local cache and updates the task configuration 
+   * (which has been passed in via the constructor).
+   * 
+   * It is the caller's responsibility to re-write the task configuration XML
+   * file, if necessary.
+   */
+  public void setup(LocalDirAllocator lDirAlloc, File workDir, 
+      String cacheSubdir) throws IOException {
+    setupCalled = true;
+    
+    if (cacheFiles.isEmpty()) {
+      return;
+    }
+
+    ArrayList<Path> localArchives = new ArrayList<Path>();
+    ArrayList<Path> localFiles = new ArrayList<Path>();
+    Path workdirPath = new Path(workDir.getAbsolutePath());
+
+    for (CacheFile cacheFile : cacheFiles) {
+      URI uri = cacheFile.uri;
+      FileSystem fileSystem = FileSystem.get(uri, taskConf);
+      FileStatus fileStatus = fileSystem.getFileStatus(new Path(uri.getPath()));
+      String cacheId = this.distributedCacheManager.makeRelative(uri, taskConf);
+      String cachePath = cacheSubdir + Path.SEPARATOR + cacheId;
+      Path localPath = lDirAlloc.getLocalPathForWrite(cachePath,
+                                fileStatus.getLen(), taskConf);
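+      // Strip the cacheId suffix to recover the allocator-chosen base dir.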
+      String baseDir = localPath.toString().replace(cacheId, "");
+      Path p = distributedCacheManager.getLocalCache(uri, taskConf,
+          new Path(baseDir), fileStatus, 
+          cacheFile.type == CacheFile.FileType.ARCHIVE,
+          cacheFile.timestamp, workdirPath, false);
+
+      if (cacheFile.type == CacheFile.FileType.ARCHIVE) {
+        localArchives.add(p);
+      } else {
+        localFiles.add(p);
+      }
+      if (cacheFile.shouldBeAddedToClassPath) {
+        classPaths.add(p.toString());
+      }
+    }
+
+    // Update the configuration object with localized data.
+    if (!localArchives.isEmpty()) {
+      DistributedCache.setLocalArchives(taskConf, 
+        stringifyPathList(localArchives));
+    }
+    if (!localFiles.isEmpty()) {
+      DistributedCache.setLocalFiles(taskConf, stringifyPathList(localFiles));
+    }
+
+  }
+
+  private static String stringifyPathList(List<Path> p){
+    if (p == null || p.isEmpty()) {
+      return null;
+    }
+    StringBuilder str = new StringBuilder(p.get(0).toString());
+    for (int i = 1; i < p.size(); i++){
+      str.append(",");
+      str.append(p.get(i).toString());
+    }
+    return str.toString();
+  }
+
+  /** 
+   * Retrieves class paths (as local references) to add. 
+   * Should be called after setup().
+   * 
+   */
+  public List<String> getClassPaths() throws IOException {
+    if (!setupCalled) {
+      throw new IllegalStateException(
+          "getClassPaths() should be called after setup()");
+    }
+    return classPaths;
+  }
+
+  /**
+   * Releases the cached files/archives, so that space
+   * can be reclaimed by the {@link TrackerDistributedCacheManager}.
+   */
+  public void release() throws IOException {
+    for (CacheFile c : cacheFiles) {
+      distributedCacheManager.releaseCache(c.uri, taskConf);
+    }
+  }
+
+  /**
+   * Creates a class loader that includes the designated
+   * files and archives.
+   */
+  public ClassLoader makeClassLoader(final ClassLoader parent) 
+      throws MalformedURLException {
+    final URL[] urls = new URL[classPaths.size()];
+    for (int i = 0; i < classPaths.size(); ++i) {
+      urls[i] = new File(classPaths.get(i)).toURI().toURL();
+    }
+    return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
+      @Override
+      public ClassLoader run() {
+        return new URLClassLoader(urls, parent);
+      }     
+    });
+  }
+}
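Pieced together from the methods above (and TrackerDistributedCacheManager in
the next file), the lifecycle that TaskRunner or LocalJobRunner drives looks
roughly like this; conf, taskConf, workDir, and the "archive" subdirectory
name are assumptions of the sketch:

    import java.io.File;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.TaskDistributedCacheManager;
    import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
    import org.apache.hadoop.fs.LocalDirAllocator;

    public class TaskCacheLifecycleSketch {
      public static void run(Configuration conf, Configuration taskConf,
          File workDir) throws Exception {
        TrackerDistributedCacheManager tracker =
            new TrackerDistributedCacheManager(conf);
        TaskDistributedCacheManager taskCache =
            tracker.newTaskDistributedCacheManager(taskConf);

        // Localize the job's files/archives; may update taskConf.
        taskCache.setup(new LocalDirAllocator("mapred.local.dir"), workDir,
            "archive");
        // Build a class loader over the entries flagged for the classpath.
        ClassLoader loader = taskCache.makeClassLoader(
            Thread.currentThread().getContextClassLoader());
        // ... run the task's code through 'loader' ...
        taskCache.release();  // drop refcounts so space can be reclaimed
      }
    }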

+ 510 - 0
src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java

@@ -0,0 +1,510 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.filecache;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.RunJar;
+
+/**
+ * Manages a single machine's instance of a cross-job
+ * cache.  This class would typically be instantiated
+ * by a TaskTracker (or something that emulates it,
+ * like LocalJobRunner).
+ * 
+ * <b>This class is internal to Hadoop, and should not be treated as a public
+ * interface.</b>
+ */
+public class TrackerDistributedCacheManager {
+  // cacheID to cacheStatus mapping
+  private TreeMap<String, CacheStatus> cachedArchives = 
+    new TreeMap<String, CacheStatus>();
+
+  private TreeMap<Path, Long> baseDirSize = new TreeMap<Path, Long>();
+
+  // default total cache size (10GB)
+  private static final long DEFAULT_CACHE_SIZE = 10737418240L;
+
+  private static final Log LOG =
+    LogFactory.getLog(TrackerDistributedCacheManager.class);
+
+  private final LocalFileSystem localFs;
+
+  public TrackerDistributedCacheManager(Configuration conf) throws IOException {
+    this.localFs = FileSystem.getLocal(conf);
+  }
+
+  /**
+   * Get the locally cached file or archive; it could either be
+   * previously cached (and valid) or copy it from the {@link FileSystem} now.
+   *
+   * @param cache the cache to be localized, this should be specified as
+   * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
+   * @param conf The Configuration file which contains the filesystem
+   * @param baseDir The base cache dir where you want to localize the
+   *  files/archives
+   * @param fileStatus The file status on the dfs.
+   * @param isArchive if the cache is an archive or a file. In case it is an
+   *  archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
+   *  be unzipped/unjarred/untarred automatically
+   *  and the directory where the archive is unzipped/unjarred/untarred is
+   *  returned as the Path.
+   *  In case of a file, the path to the file is returned
+   * @param confFileStamp this is the hdfs file modification timestamp to verify
+   * that the file to be cached hasn't changed since the job started
+   * @param currentWorkDir this is the directory where you would want to create
+   * symlinks for the locally cached files/archives
+   * @param honorSymLinkConf if this is false, then the symlinks are not
+   * created even if conf says so (this is required for an optimization in task
+   * launches).
+   * NOTE: This is effectively always on since r696957, since there is no code
+   * path that does not use this.
+   * @return the path to directory where the archives are unjarred in case of
+   * archives, the path to the file where the file is copied locally
+   * @throws IOException
+   */
+  Path getLocalCache(URI cache, Configuration conf,
+      Path baseDir, FileStatus fileStatus,
+      boolean isArchive, long confFileStamp,
+      Path currentWorkDir, boolean honorSymLinkConf)
+      throws IOException {
+    String cacheId = makeRelative(cache, conf);
+    CacheStatus lcacheStatus;
+    Path localizedPath;
+    synchronized (cachedArchives) {
+      lcacheStatus = cachedArchives.get(cacheId);
+      if (lcacheStatus == null) {
+        // was never localized
+        lcacheStatus = new CacheStatus(baseDir, 
+          new Path(baseDir, new Path(cacheId)));
+        cachedArchives.put(cacheId, lcacheStatus);
+      }
+
+      synchronized (lcacheStatus) {
+        localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus,
+            fileStatus, isArchive, currentWorkDir, honorSymLinkConf);
+        lcacheStatus.refcount++;
+      }
+    }
+
+    // try deleting stuff if you can
+    long size = 0;
+    synchronized (baseDirSize) {
+      Long get = baseDirSize.get(baseDir);
+      if ( get != null ) {
+        size = get.longValue();
+      }
+    }
+    // setting the cache size to a default of 10GB
+    long allowedSize = conf.getLong("local.cache.size", DEFAULT_CACHE_SIZE);
+    if (allowedSize < size) {
+      // try some cache deletions
+      deleteCache(conf);
+    }
+    return localizedPath;
+  }
+
+  /**
+   * This is the opposite of getLocalCache. When you are done with
+   * using the cache, you need to release it.
+   * @param cache The cache URI to be released
+   * @param conf configuration which contains the filesystem the cache
+   * is contained in.
+   * @throws IOException
+   */
+  void releaseCache(URI cache, Configuration conf)
+    throws IOException {
+    String cacheId = makeRelative(cache, conf);
+    synchronized (cachedArchives) {
+      CacheStatus lcacheStatus = cachedArchives.get(cacheId);
+      if (lcacheStatus == null)
+        return;
+      synchronized (lcacheStatus) {
+        lcacheStatus.refcount--;
+      }
+    }
+  }
+
+  // To delete the caches which have a refcount of zero
+
+  private void deleteCache(Configuration conf) throws IOException {
+    // try deleting cache Status with refcount of zero
+    synchronized (cachedArchives) {
+      for (Iterator<String> it = cachedArchives.keySet().iterator(); 
+          it.hasNext();) {
+        String cacheId = it.next();
+        CacheStatus lcacheStatus = cachedArchives.get(cacheId);
+        synchronized (lcacheStatus) {
+          if (lcacheStatus.refcount == 0) {
+            // delete this cache entry
+            FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
+            synchronized (baseDirSize) {
+              Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
+              if ( dirSize != null ) {
+                dirSize -= lcacheStatus.size;
+                baseDirSize.put(lcacheStatus.baseDir, dirSize);
+              }
+            }
+            it.remove();
+          }
+        }
+      }
+    }
+  }
+
+  /*
+   * Returns the relative path of the dir this cache will be localized in.
+   * For hdfs://hostname:port/absolute_path, the relative path is
+   * hostname/absolute_path; if the URI is just /absolute_path, the relative
+   * path is the hostname of the DFS this mapred cluster is running on,
+   * followed by /absolute_path.
+   */
+  String makeRelative(URI cache, Configuration conf)
+    throws IOException {
+    String host = cache.getHost();
+    if (host == null) {
+      host = cache.getScheme();
+    }
+    if (host == null) {
+      URI defaultUri = FileSystem.get(conf).getUri();
+      host = defaultUri.getHost();
+      if (host == null) {
+        host = defaultUri.getScheme();
+      }
+    }
+    String path = host + cache.getPath();
+    path = path.replace(":/","/");                // remove windows device colon
+    return path;
+  }
+
+  private Path cacheFilePath(Path p) {
+    return new Path(p, p.getName());
+  }
+
+  // the method which actually copies the caches locally and unjars/unzips them
+  // and does chmod for the files
+  private Path localizeCache(Configuration conf,
+                                    URI cache, long confFileStamp,
+                                    CacheStatus cacheStatus,
+                                    FileStatus fileStatus,
+                                    boolean isArchive,
+                                    Path currentWorkDir, 
+                                    boolean honorSymLinkConf)
+  throws IOException {
+    boolean doSymlink = honorSymLinkConf && DistributedCache.getSymlink(conf);
+    if(cache.getFragment() == null) {
+      doSymlink = false;
+    }
+    FileSystem fs = FileSystem.get(cache, conf);
+    String link = 
+      currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
+    File flink = new File(link);
+    if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
+                           cacheStatus, fileStatus)) {
+      LOG.info(String.format("Using existing cache of %s->%s",
+          cache.toString(), cacheStatus.localLoadPath));
+      if (isArchive) {
+        if (doSymlink){
+          if (!flink.exists())
+            FileUtil.symLink(cacheStatus.localLoadPath.toString(),
+                             link);
+        }
+
+        return cacheStatus.localLoadPath;
+      }
+      else {
+        if (doSymlink){
+          if (!flink.exists())
+            FileUtil.symLink(
+              cacheFilePath(cacheStatus.localLoadPath).toString(), link);
+        }
+        return cacheFilePath(cacheStatus.localLoadPath);
+      }
+    } else {
+
+      // remove the old archive
+      // if the old archive cannot be removed since it is being used by another
+      // job
+      // return null
+      if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true))
+        throw new IOException("Cache " + cacheStatus.localLoadPath.toString()
+                              + " is in use and cannot be refreshed");
+
+      FileSystem localFs = FileSystem.getLocal(conf);
+      localFs.delete(cacheStatus.localLoadPath, true);
+      synchronized (baseDirSize) {
+        Long dirSize = baseDirSize.get(cacheStatus.baseDir);
+        if ( dirSize != null ) {
+          dirSize -= cacheStatus.size;
+          baseDirSize.put(cacheStatus.baseDir, dirSize);
+        }
+      }
+      Path parchive = new Path(cacheStatus.localLoadPath,
+                               new Path(cacheStatus.localLoadPath.getName()));
+
+      if (!localFs.mkdirs(cacheStatus.localLoadPath)) {
+        throw new IOException("Mkdirs failed to create directory " +
+                              cacheStatus.localLoadPath.toString());
+      }
+
+      String cacheId = cache.getPath();
+      fs.copyToLocalFile(new Path(cacheId), parchive);
+      if (isArchive) {
+        String tmpArchive = parchive.toString().toLowerCase();
+        File srcFile = new File(parchive.toString());
+        File destDir = new File(parchive.getParent().toString());
+        LOG.info(String.format("Extracting %s to %s",
+            srcFile.toString(), destDir.toString()));
+        if (tmpArchive.endsWith(".jar")) {
+          RunJar.unJar(srcFile, destDir);
+        } else if (tmpArchive.endsWith(".zip")) {
+          FileUtil.unZip(srcFile, destDir);
+        } else if (isTarFile(tmpArchive)) {
+          FileUtil.unTar(srcFile, destDir);
+        } else {
+          LOG.warn(String.format(
+            "Cache file %s specified as archive, but not valid extension.", 
+            srcFile.toString()));
+          // otherwise do nothing; the file stays in the dir as a plain copy
+        }
+      }
+
+      long cacheSize = 
+        FileUtil.getDU(new File(parchive.getParent().toString()));
+      cacheStatus.size = cacheSize;
+      synchronized (baseDirSize) {
+        Long dirSize = baseDirSize.get(cacheStatus.baseDir);
+        if( dirSize == null ) {
+          dirSize = Long.valueOf(cacheSize);
+        } else {
+          dirSize += cacheSize;
+        }
+        baseDirSize.put(cacheStatus.baseDir, dirSize);
+      }
+
+      // do chmod here
+      try {
+        //Setting recursive permission to grant everyone read and execute
+        FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
+      } catch(InterruptedException e) {
+        LOG.warn("Exception in chmod " + e.toString());
+      }
+
+      // update cacheStatus to reflect the newly cached file
+      cacheStatus.currentStatus = true;
+      cacheStatus.mtime = DistributedCache.getTimestamp(conf, cache);
+
+      LOG.info(String.format("Cached %s as %s",
+          cache.toString(), cacheStatus.localLoadPath));
+    }
+
+    if (isArchive){
+      if (doSymlink){
+        if (!flink.exists())
+          FileUtil.symLink(cacheStatus.localLoadPath.toString(),
+                           link);
+      }
+      return cacheStatus.localLoadPath;
+    }
+    else {
+      if (doSymlink){
+        if (!flink.exists())
+          FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(),
+                           link);
+      }
+      return cacheFilePath(cacheStatus.localLoadPath);
+    }
+  }
+
+  private static boolean isTarFile(String filename) {
+    return (filename.endsWith(".tgz") || filename.endsWith(".tar.gz") ||
+           filename.endsWith(".tar"));
+  }
+
+  // Checks if the cache has already been localized and is fresh
+  private boolean ifExistsAndFresh(Configuration conf, FileSystem fs,
+                                          URI cache, long confFileStamp,
+                                          CacheStatus lcacheStatus,
+                                          FileStatus fileStatus)
+  throws IOException {
+    // check for existence of the cache
+    if (lcacheStatus.currentStatus == false) {
+      return false;
+    } else {
+      long dfsFileStamp;
+      if (fileStatus != null) {
+        dfsFileStamp = fileStatus.getModificationTime();
+      } else {
+        dfsFileStamp = DistributedCache.getTimestamp(conf, cache);
+      }
+
+      // ensure that the file on hdfs hasn't been modified since the job started
+      if (dfsFileStamp != confFileStamp) {
+        LOG.fatal("File: " + cache + " has changed on HDFS since job started");
+        throw new IOException("File: " + cache +
+                              " has changed on HDFS since job started");
+      }
+
+      if (dfsFileStamp != lcacheStatus.mtime) {
+        // needs refreshing
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * This method creates symlinks for all files in a given dir in another
+   * directory.
+   * 
+   * Should not be used outside of DistributedCache code.
+   * 
+   * @param conf the configuration
+   * @param jobCacheDir the target directory for creating symlinks
+   * @param workDir the directory in which the symlinks are created
+   * @throws IOException
+   */
+  public static void createAllSymlink(Configuration conf, File jobCacheDir, 
+      File workDir)
+    throws IOException{
+    if ((jobCacheDir == null || !jobCacheDir.isDirectory()) ||
+           workDir == null || (!workDir.isDirectory())) {
+      return;
+    }
+    boolean createSymlink = DistributedCache.getSymlink(conf);
+    if (createSymlink){
+      File[] list = jobCacheDir.listFiles();
+      for (int i=0; i < list.length; i++){
+        String target = list[i].getAbsolutePath();
+        String link = new File(workDir, list[i].getName()).toString();
+        LOG.info(String.format("Creating symlink: %s <- %s", target, link));
+        int ret = FileUtil.symLink(target, link);
+        if (ret != 0) {
+          LOG.warn(String.format("Failed to create symlink: %s <- %s", target,
+              link));
+        }
+      }
+    }
+  }
+
+  private static class CacheStatus {
+    // false, not loaded yet, true is loaded
+    boolean currentStatus;
+
+    // the local load path of this cache
+    Path localLoadPath;
+
+    //the base dir where the cache lies
+    Path baseDir;
+
+    //the size of this cache
+    long size;
+
+    // number of instances using this cache
+    int refcount;
+
+    // the cache-file modification time
+    long mtime;
+
+    public CacheStatus(Path baseDir, Path localLoadPath) {
+      super();
+      this.currentStatus = false;
+      this.localLoadPath = localLoadPath;
+      this.refcount = 0;
+      this.mtime = -1;
+      this.baseDir = baseDir;
+      this.size = 0;
+    }
+  }
+
+  /**
+   * Clear the entire contents of the cache and delete the backing files. This
+   * should only be used when the server is reinitializing, because the users
+   * are going to lose their files.
+   */
+  public void purgeCache() {
+    synchronized (cachedArchives) {
+      for (Map.Entry<String,CacheStatus> f: cachedArchives.entrySet()) {
+        try {
+          localFs.delete(f.getValue().localLoadPath, true);
+        } catch (IOException ie) {
+          LOG.debug("Error cleaning up cache", ie);
+        }
+      }
+      cachedArchives.clear();
+    }
+  }
+
+  public TaskDistributedCacheManager newTaskDistributedCacheManager(
+      Configuration taskConf) throws IOException {
+    return new TaskDistributedCacheManager(this, taskConf);
+  }
+
+  /**
+   * Determines timestamps of files to be cached, and stores those
+   * in the configuration.  This is intended to be used internally by JobClient
+   * after all cache files have been added.
+   * 
+   * This is an internal method!
+   * 
+   * @param job Configuration of a job.
+   * @throws IOException
+   */
+  public static void determineTimestamps(Configuration job) throws IOException {
+    URI[] tarchives = DistributedCache.getCacheArchives(job);
+    if (tarchives != null) {
+      StringBuffer archiveTimestamps = 
+        new StringBuffer(String.valueOf(
+            DistributedCache.getTimestamp(job, tarchives[0])));
+      for (int i = 1; i < tarchives.length; i++) {
+        archiveTimestamps.append(",");
+        archiveTimestamps.append(String.valueOf(
+            DistributedCache.getTimestamp(job, tarchives[i])));
+      }
+      DistributedCache.setArchiveTimestamps(job, archiveTimestamps.toString());
+    }
+  
+    URI[] tfiles = DistributedCache.getCacheFiles(job);
+    if (tfiles != null) {
+      StringBuffer fileTimestamps = new StringBuffer(String.valueOf(
+          DistributedCache.getTimestamp(job, tfiles[0])));
+      for (int i = 1; i < tfiles.length; i++) {
+        fileTimestamps.append(",");
+        fileTimestamps.append(String.valueOf(
+            DistributedCache.getTimestamp(job, tfiles[i])));
+      }
+      DistributedCache.setFileTimestamps(job, fileTimestamps.toString());
+    }
+  }
+}
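Two of the public entry points above are called from elsewhere in the
framework; a brief sketch of those call sites, with the Configuration objects
assumed:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.TrackerDistributedCacheManager;

    public class TrackerCacheCallSites {
      // At submission time, JobClient records each cache entry's
      // modification time (see the JobClient.java hunk below).
      static void atSubmission(Configuration jobConf) throws Exception {
        TrackerDistributedCacheManager.determineTimestamps(jobConf);
      }

      // When the server reinitializes, wipe all localized files.
      static void atReinit(Configuration trackerConf) throws Exception {
        new TrackerDistributedCacheManager(trackerConf).purgeCache();
      }
    }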

+ 1 - 1
src/mapred/org/apache/hadoop/mapred/Child.java

@@ -161,7 +161,7 @@ class Child {
         //setupWorkDir actually sets up the symlinks for the distributed
         //cache. After a task exits we wipe the workdir clean, and hence
         //the symlinks have to be rebuilt.
-        TaskRunner.setupWorkDir(job);
+        TaskRunner.setupWorkDir(job, new File(".").getAbsoluteFile());
 
         numTasksToExecute = job.getNumTasksToExecutePerJvm();
         assert(numTasksToExecute != 0);

+ 8 - 29
src/mapred/org/apache/hadoop/mapred/JobClient.java

@@ -48,6 +48,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -560,15 +561,12 @@ public class JobClient extends Configured implements MRConstants, Tool  {
                "Applications should implement Tool for the same.");
     }
 
-    // get all the command line arguments into the 
-    // jobconf passed in by the user conf
-    String files = null;
-    String libjars = null;
-    String archives = null;
+    // Retrieve command line arguments placed into the JobConf
+    // by GenericOptionsParser.
+    String files = job.get("tmpfiles");
+    String libjars = job.get("tmpjars");
+    String archives = job.get("tmparchives");
 
-    files = job.get("tmpfiles");
-    libjars = job.get("tmpjars");
-    archives = job.get("tmparchives");
     /*
      * set this user's id in job configuration, so later job files can be
      * accessed using this user's id
@@ -648,27 +646,7 @@ public class JobClient extends Configured implements MRConstants, Tool  {
     }
     
     //  set the timestamps of the archives and files
-    URI[] tarchives = DistributedCache.getCacheArchives(job);
-    if (tarchives != null) {
-      StringBuffer archiveTimestamps = 
-        new StringBuffer(String.valueOf(DistributedCache.getTimestamp(job, tarchives[0])));
-      for (int i = 1; i < tarchives.length; i++) {
-        archiveTimestamps.append(",");
-        archiveTimestamps.append(String.valueOf(DistributedCache.getTimestamp(job, tarchives[i])));
-      }
-      DistributedCache.setArchiveTimestamps(job, archiveTimestamps.toString());
-    }
-
-    URI[] tfiles = DistributedCache.getCacheFiles(job);
-    if (tfiles != null) {
-      StringBuffer fileTimestamps = 
-        new StringBuffer(String.valueOf(DistributedCache.getTimestamp(job, tfiles[0])));
-      for (int i = 1; i < tfiles.length; i++) {
-        fileTimestamps.append(",");
-        fileTimestamps.append(String.valueOf(DistributedCache.getTimestamp(job, tfiles[i])));
-      }
-      DistributedCache.setFileTimestamps(job, fileTimestamps.toString());
-    }
+    TrackerDistributedCacheManager.determineTimestamps(job);
        
     String originalJarPath = job.getJar();
 
@@ -695,6 +673,7 @@ public class JobClient extends Configured implements MRConstants, Tool  {
     }
   }
 
+
   private UnixUserGroupInformation getUGI(Configuration job) throws IOException {
     UnixUserGroupInformation ugi = null;
     try {

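The twenty-odd lines removed above collapse into a single call. Below is a sketch of what determineTimestamps presumably does, reconstructed from the removed JobClient code; the actual implementation ships in TrackerDistributedCacheManager elsewhere in this patch.

    import java.io.IOException;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.DistributedCache;

    // Reconstructed sketch: record a comma-separated modification-time list
    // for every cache archive and cache file in the job configuration.
    class DetermineTimestampsSketch {
      static void determineTimestamps(Configuration job) throws IOException {
        URI[] tarchives = DistributedCache.getCacheArchives(job);
        if (tarchives != null) {
          StringBuilder stamps = new StringBuilder(
              String.valueOf(DistributedCache.getTimestamp(job, tarchives[0])));
          for (int i = 1; i < tarchives.length; i++) {
            stamps.append(",");
            stamps.append(DistributedCache.getTimestamp(job, tarchives[i]));
          }
          DistributedCache.setArchiveTimestamps(job, stamps.toString());
        }
        URI[] tfiles = DistributedCache.getCacheFiles(job);
        if (tfiles != null) {
          StringBuilder stamps = new StringBuilder(
              String.valueOf(DistributedCache.getTimestamp(job, tfiles[0])));
          for (int i = 1; i < tfiles.length; i++) {
            stamps.append(",");
            stamps.append(DistributedCache.getTimestamp(job, tfiles[i]));
          }
          DistributedCache.setFileTimestamps(job, stamps.toString());
        }
      }
    }
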
+ 74 - 17
src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.File;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -27,13 +29,15 @@ import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.filecache.TaskDistributedCacheManager;
+import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.mapred.JobTrackerMetricsInst;
-import org.apache.hadoop.mapred.JvmTask;
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -60,19 +64,28 @@ class LocalJobRunner implements JobSubmissionProtocol {
   
   private class Job extends Thread
     implements TaskUmbilicalProtocol {
-    private Path file;
+    // The job directory on the system: JobClient places job configurations here.
+    // This is analogous to JobTracker's system directory.
+    private Path systemJobDir;
+    private Path systemJobFile;
+
+    // The job directory for the task.  Analogous to a task's job directory.
+    private Path localJobDir;
+    private Path localJobFile;
+
     private JobID id;
     private JobConf job;
-    private Path systemJobDir;
 
     private JobStatus status;
     private ArrayList<TaskAttemptID> mapIds = new ArrayList<TaskAttemptID>();
 
     private JobProfile profile;
-    private Path localFile;
     private FileSystem localFs;
     boolean killed = false;
     
+    private TrackerDistributedCacheManager trackerDistributedCacheManager;
+    private TaskDistributedCacheManager taskDistributedCacheManager;
+    
     // Counters summed over all the map/reduce tasks which
     // have successfully completed
     private Counters completedTaskCounters = new Counters();
@@ -86,15 +99,56 @@ class LocalJobRunner implements JobSubmissionProtocol {
     
     public Job(JobID jobid, String jobSubmitDir) throws IOException {
       this.systemJobDir = new Path(jobSubmitDir);
-      this.file = new Path(systemJobDir, "job.xml");
+      this.systemJobFile = new Path(systemJobDir, "job.xml");
       this.id = jobid;
 
-      this.localFile = new JobConf(conf).getLocalPath(jobDir+id+".xml");
       this.localFs = FileSystem.getLocal(conf);
 
-      fs.copyToLocalFile(file, localFile);
-      this.job = new JobConf(localFile);
-      profile = new JobProfile(job.getUser(), id, file.toString(), 
+      this.localJobDir = localFs.makeQualified(conf.getLocalPath(jobDir));
+      this.localJobFile = new Path(this.localJobDir, id + ".xml");
+
+      // Manage the distributed cache.  If there are files to be copied,
+      // setup() may update the configuration, which is re-written below.
+      this.trackerDistributedCacheManager =
+        new TrackerDistributedCacheManager(conf);
+      this.taskDistributedCacheManager =
+        trackerDistributedCacheManager.newTaskDistributedCacheManager(conf);
+      taskDistributedCacheManager.setup(
+          new LocalDirAllocator("mapred.local.dir"),
+          new File(systemJobDir.toString()),
+      "archive");
+
+      if (DistributedCache.getSymlink(conf)) {
+        // This is not supported, largely because the cwd in
+        // LocalJobRunner, unlike a Child subprocess's, is not a
+        // fresh slate, but rather the user's working directory.
+        // This is further complicated because the logic in
+        // setupWorkDir only creates symlinks if there's a jarfile
+        // in the configuration.
+        LOG.warn("LocalJobRunner does not support " +
+        "symlinking into current working dir.");
+      }
+      // Setup the symlinks for the distributed cache.
+      TaskRunner.setupWorkDir(conf, new File(localJobDir.toUri()).getAbsoluteFile());
+
+      // Write out configuration file.  Instead of copying it from
+      // systemJobFile, we re-write it, since setup(), above, may have
+      // updated it.
+      OutputStream out = localFs.create(localJobFile);
+      try {
+        conf.writeXml(out);
+      } finally {
+        out.close();
+      }
+      this.job = new JobConf(localJobFile);
+
+      // Job (the current object) is a Thread, so we wrap its class loader.
+      if (!taskDistributedCacheManager.getClassPaths().isEmpty()) {
+        setContextClassLoader(taskDistributedCacheManager.makeClassLoader(
+            getContextClassLoader()));
+      }
+
+      profile = new JobProfile(job.getUser(), id, systemJobFile.toString(), 
                                "http://localhost:8080/", job.getJobName());
       status = new JobStatus(id, 0.0f, 0.0f, JobStatus.RUNNING);
 
@@ -130,7 +184,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
           if (!this.isInterrupted()) {
             TaskAttemptID mapId = new TaskAttemptID(new TaskID(jobId, true, i),0);  
             mapIds.add(mapId);
-            MapTask map = new MapTask(file.toString(),  
+            MapTask map = new MapTask(systemJobFile.toString(),  
                                       mapId, i,
                                       taskSplitMetaInfos[i].getSplitIndex(), 1);
             JobConf localConf = new JobConf(job);
@@ -140,7 +194,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
             mapOutput.setConf(localConf);
             mapOutputFiles.put(mapId, mapOutput);
 
-            map.setJobFile(localFile.toString());
+            map.setJobFile(localJobFile.toString());
             map.localizeConfiguration(localConf);
             map.setConf(localConf);
             map_tasks += 1;
@@ -158,7 +212,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
         try {
           if (numReduceTasks > 0) {
             ReduceTask reduce =
-                new ReduceTask(file.toString(), reduceId, 0, mapIds.size(),
+                new ReduceTask(systemJobFile.toString(), reduceId, 0, mapIds.size(),
                     1);
             JobConf localConf = new JobConf(job);
             TaskRunner.setupChildMapredLocalDirs(reduce, localConf);
@@ -183,7 +237,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
               }
             }
             if (!this.isInterrupted()) {
-              reduce.setJobFile(localFile.toString());
+              reduce.setJobFile(localJobFile.toString());
               reduce.localizeConfiguration(localConf);
               reduce.setConf(localConf);
               reduce_tasks += 1;
@@ -231,8 +285,11 @@ class LocalJobRunner implements JobSubmissionProtocol {
 
       } finally {
         try {
-          fs.delete(file.getParent(), true);  // delete submit dir
-          localFs.delete(localFile, true);              // delete local copy
+          fs.delete(systemJobFile.getParent(), true);  // delete submit dir
+          localFs.delete(localJobFile, true);          // delete local copy
+          // Cleanup distributed cache
+          taskDistributedCacheManager.release();
+          trackerDistributedCacheManager.purgeCache();
         } catch (IOException e) {
           LOG.warn("Error cleaning up "+id+": "+e);
         }
@@ -469,5 +526,5 @@ class LocalJobRunner implements JobSubmissionProtocol {
   @Override
   public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException{
     return null;
-}
+  }
 }

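Taken together, the constructor and cleanup changes above give LocalJobRunner the same cache lifecycle a TaskTracker uses. A condensed sketch follows, assuming the manager signatures shown in this patch; error handling, the job-conf rewrite, and the classloader wrapping are elided.

    import java.io.File;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.TaskDistributedCacheManager;
    import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
    import org.apache.hadoop.fs.LocalDirAllocator;

    // Condensed lifecycle sketch: localize, run, release, purge.
    class LocalCacheLifecycleSketch {
      static void runLocally(Configuration conf, File jobDir)
          throws IOException {
        TrackerDistributedCacheManager tracker =
            new TrackerDistributedCacheManager(conf);
        TaskDistributedCacheManager task =
            tracker.newTaskDistributedCacheManager(conf);
        try {
          // Localizes cache files/archives and may update conf.
          task.setup(new LocalDirAllocator("mapred.local.dir"), jobDir,
              "archive");
          // ... run the map and reduce tasks here ...
        } finally {
          task.release();        // drop this task's reference counts
          tracker.purgeCache();  // delete anything no longer referenced
        }
      }
    }
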
+ 65 - 162
src/mapred/org/apache/hadoop/mapred/TaskRunner.java

@@ -33,6 +33,8 @@ import java.util.Vector;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.filecache.TaskDistributedCacheManager;
+import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -64,6 +66,7 @@ abstract class TaskRunner extends Thread {
 
   
   private TaskTracker tracker;
+  private TaskDistributedCacheManager taskDistributedCacheManager;
 
   protected JobConf conf;
   JvmManager jvmManager;
@@ -100,18 +103,6 @@ abstract class TaskRunner extends Thread {
    * not execute user code, only system code.
    */
   public void close() throws IOException {}
-
-  private static String stringifyPathArray(Path[] p){
-    if (p == null){
-      return null;
-    }
-    StringBuffer str = new StringBuffer(p[0].toString());
-    for (int i = 1; i < p.length; i++){
-      str.append(",");
-      str.append(p[i].toString());
-    }
-    return str.toString();
-  }
   
   /**
    * Get the java command line options for the child map/reduce tasks.
@@ -165,11 +156,12 @@ abstract class TaskRunner extends Thread {
       LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
       File workDir = formWorkDir(lDirAlloc, taskid, t.isTaskCleanupTask(), conf);
       
-      URI[] archives = DistributedCache.getCacheArchives(conf);
-      URI[] files = DistributedCache.getCacheFiles(conf);
       // We don't create any symlinks yet, so presence/absence of workDir
       // actually on the file system doesn't matter.
-      setupDistributedCache(lDirAlloc, workDir, archives, files);
+      taskDistributedCacheManager = tracker.getTrackerDistributedCacheManager()
+                                    .newTaskDistributedCacheManager(conf);
+      taskDistributedCacheManager.setup(lDirAlloc, workDir,
+                                        TaskTracker.getDistributedCacheDir());
       
       // Set up the child task's configuration. After this call, no localization
       // of files should happen in the TaskTracker's process space. Any changes to
@@ -180,9 +172,10 @@ abstract class TaskRunner extends Thread {
         return;
       }
       
-      // Build classpath
-      List<String> classPaths = getClassPaths(conf, workDir, archives, files);
-      
+      // Build the classpath for the child JVM.
+      List<String> classPaths = getClassPaths(conf, workDir,
+                                              taskDistributedCacheManager);
+
       long logSize = TaskLog.getTaskLogLength(conf);
       
       //  Build exec child JVM args.
@@ -240,18 +233,8 @@ abstract class TaskRunner extends Thread {
       }
     } finally {
       try{
-        URI[] archives = DistributedCache.getCacheArchives(conf);
-        URI[] files = DistributedCache.getCacheFiles(conf);
-        if (archives != null){
-          for (int i = 0; i < archives.length; i++){
-            DistributedCache.releaseCache(archives[i], conf);
-          }
-        }
-        if (files != null){
-          for(int i = 0; i < files.length; i++){
-            DistributedCache.releaseCache(files[i], conf);
-          }
-        }
+        taskDistributedCacheManager.release();
+
       }catch(IOException ie){
         LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
       }
@@ -466,7 +449,7 @@ abstract class TaskRunner extends Thread {
   /**
    */
   private static List<String> getClassPaths(JobConf conf, File workDir,
-      URI[] archives, URI[] files)
+      TaskDistributedCacheManager taskDistributedCacheManager)
       throws IOException {
     // Accumulates class paths for child.
     List<String> classPaths = new ArrayList<String>();
@@ -477,7 +460,7 @@ abstract class TaskRunner extends Thread {
     appendJobJarClasspaths(conf.getJar(), classPaths);
     
     // Distributed cache paths
-    appendDistributedCacheClasspaths(conf, archives, files, classPaths);
+    classPaths.addAll(taskDistributedCacheManager.getClassPaths());
     
     // Include the working dir too
     classPaths.add(workDir.toString());
@@ -612,105 +595,6 @@ abstract class TaskRunner extends Thread {
     return new File(workDir.toString());
   }
 
-  private void setupDistributedCache(LocalDirAllocator lDirAlloc, File workDir,
-      URI[] archives, URI[] files) throws IOException {
-    FileStatus fileStatus;
-    FileSystem fileSystem;
-    Path localPath;
-    String baseDir;
-    if ((archives != null) || (files != null)) {
-      if (archives != null) {
-        String[] archivesTimestamps = 
-                             DistributedCache.getArchiveTimestamps(conf);
-        Path[] p = new Path[archives.length];
-        for (int i = 0; i < archives.length;i++){
-          fileSystem = FileSystem.get(archives[i], conf);
-          fileStatus = fileSystem.getFileStatus(
-                                    new Path(archives[i].getPath()));
-          String cacheId = DistributedCache.makeRelative(archives[i],conf);
-          String cachePath = TaskTracker.getDistributedCacheDir() + 
-                               Path.SEPARATOR + cacheId;
-          
-          localPath = lDirAlloc.getLocalPathForWrite(cachePath,
-                                    fileStatus.getLen(), conf);
-          baseDir = localPath.toString().replace(cacheId, "");
-          p[i] = DistributedCache.getLocalCache(archives[i], conf, 
-                                                new Path(baseDir),
-                                                fileStatus,
-                                                true, Long.parseLong(
-                                                      archivesTimestamps[i]),
-                                                new Path(workDir.
-                                                      getAbsolutePath()), 
-                                                false);
-          
-        }
-        DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
-      }
-      if ((files != null)) {
-        String[] fileTimestamps = DistributedCache.getFileTimestamps(conf);
-        Path[] p = new Path[files.length];
-        for (int i = 0; i < files.length;i++){
-          fileSystem = FileSystem.get(files[i], conf);
-          fileStatus = fileSystem.getFileStatus(
-                                    new Path(files[i].getPath()));
-          String cacheId = DistributedCache.makeRelative(files[i], conf);
-          String cachePath = TaskTracker.getDistributedCacheDir() +
-                               Path.SEPARATOR + cacheId;
-          
-          localPath = lDirAlloc.getLocalPathForWrite(cachePath,
-                                    fileStatus.getLen(), conf);
-          baseDir = localPath.toString().replace(cacheId, "");
-          p[i] = DistributedCache.getLocalCache(files[i], conf, 
-                                                new Path(baseDir),
-                                                fileStatus,
-                                                false, Long.parseLong(
-                                                         fileTimestamps[i]),
-                                                new Path(workDir.
-                                                      getAbsolutePath()), 
-                                                false);
-        }
-        DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
-      }
-    }
-  }
-
-  private static void appendDistributedCacheClasspaths(JobConf conf,
-      URI[] archives, URI[] files, List<String> classPaths)
-      throws IOException {
-    // Archive paths
-    Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf);
-    if (archiveClasspaths != null && archives != null) {
-      Path[] localArchives = DistributedCache.getLocalCacheArchives(conf);
-      if (localArchives != null){
-        for (int i=0;i<archives.length;i++){
-          for(int j=0;j<archiveClasspaths.length;j++){
-            if (archives[i].getPath().equals(
-                                             archiveClasspaths[j].toString())){
-              classPaths.add(localArchives[i].toString());
-            }
-          }
-        }
-      }
-    }
-    
-    //file paths
-    Path[] fileClasspaths = DistributedCache.getFileClassPaths(conf);
-    if (fileClasspaths!=null && files != null) {
-      Path[] localFiles = DistributedCache
-        .getLocalCacheFiles(conf);
-      if (localFiles != null) {
-        for (int i = 0; i < files.length; i++) {
-          for (int j = 0; j < fileClasspaths.length; j++) {
-            if (files[i].getPath().equals(
-                                          fileClasspaths[j].toString())) {
-              classPaths.add(localFiles[i].toString());
-            }
-          }
-        }
-      }
-    }
-  }
-
   private static void appendSystemClasspaths(List<String> classPaths) {
     for (String c : System.getProperty("java.class.path").split(
         SYSTEM_PATH_SEPARATOR)) {
@@ -743,13 +627,23 @@ abstract class TaskRunner extends Thread {
     classPaths.add(new File(jobCacheDir, "classes").toString());
     classPaths.add(jobCacheDir.toString());
   }
-  
-  //Mostly for setting up the symlinks. Note that when we setup the distributed
-  //cache, we didn't create the symlinks. This is done on a per task basis
-  //by the currently executing task.
-  public static void setupWorkDir(JobConf conf) throws IOException {
-    File workDir = new File(".").getAbsoluteFile();
+   
+  /**
+   * Creates distributed cache symlinks and a tmp directory, as appropriate.
+   * Note that the symlinks are not created when the distributed cache is
+   * first set up; that is done here, on a per-task basis, by the
+   * currently executing task.
+   * 
+   * @param conf The job configuration.
+   * @param workDir Working directory, which is completely deleted.
+   */
+  public static void setupWorkDir(JobConf conf, File workDir) throws IOException {
+    LOG.debug("Fully deleting and re-creating" + workDir);
     FileUtil.fullyDelete(workDir);
+    if (!workDir.mkdir()) {
+      LOG.debug("Did not recreate " + workDir);
+    }
+    
     if (DistributedCache.getSymlink(conf)) {
       URI[] archives = DistributedCache.getCacheArchives(conf);
       URI[] files = DistributedCache.getCacheFiles(conf);
@@ -758,47 +652,56 @@ abstract class TaskRunner extends Thread {
       if (archives != null) {
         for (int i = 0; i < archives.length; i++) {
           String link = archives[i].getFragment();
-          if (link != null) {
-            link = workDir.toString() + Path.SEPARATOR + link;
-            File flink = new File(link);
-            if (!flink.exists()) {
-              FileUtil.symLink(localArchives[i].toString(), link);
-            }
-          }
+          String target = localArchives[i].toString();
+          symlink(workDir, target, link);
         }
       }
       if (files != null) {
         for (int i = 0; i < files.length; i++) {
           String link = files[i].getFragment();
-          if (link != null) {
-            link = workDir.toString() + Path.SEPARATOR + link;
-            File flink = new File(link);
-            if (!flink.exists()) {
-              FileUtil.symLink(localFiles[i].toString(), link);
-            }
-          }
+          String target = localFiles[i].toString();
+          symlink(workDir, target, link);
         }
       }
     }
-    File jobCacheDir = null;
+
     if (conf.getJar() != null) {
-      jobCacheDir = new File(
+      File jobCacheDir = new File(
           new Path(conf.getJar()).getParent().toString());
-    }
 
-    // create symlinks for all the files in job cache dir in current
-    // workingdir for streaming
-    try{
-      DistributedCache.createAllSymlink(conf, jobCacheDir,
-          workDir);
-    } catch(IOException ie){
-      // Do not exit even if symlinks have not been created.
-      LOG.warn(StringUtils.stringifyException(ie));
+      // create symlinks for all the files in job cache dir in current
+      // workingdir for streaming
+      try{
+        TrackerDistributedCacheManager.createAllSymlink(conf, jobCacheDir,
+            workDir);
+      } catch(IOException ie){
+        // Do not exit even if symlinks have not been created.
+        LOG.warn(StringUtils.stringifyException(ie));
+      }
     }
 
     createChildTmpDir(workDir, conf);
   }
 
+  /**
+   * Utility method for creating a symlink and warning on errors.
+   *
+   * If link is null, does nothing.
+   */
+  private static void symlink(File workDir, String target, String link)
+      throws IOException {
+    if (link != null) {
+      link = workDir.toString() + Path.SEPARATOR + link;
+      File flink = new File(link);
+      if (!flink.exists()) {
+        LOG.info(String.format("Creating symlink: %s <- %s", target, link));
+        if (0 != FileUtil.symLink(target, link)) {
+          LOG.warn(String.format("Failed to create symlink: %s <- %s", target, link));
+        }
+      }
+    }
+  }
+
   /**
    * Kill the child process
    */

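With setupDistributedCache and appendDistributedCacheClasspaths gone, the child classpath is assembled from four sources in a fixed order. A sketch of that ordering, using only calls visible in this diff; the systemClasspath and jobJarDir parameters stand in for the appendSystemClasspaths and appendJobJarClasspaths helpers.

    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.filecache.TaskDistributedCacheManager;

    // Sketch of the classpath assembly order after this refactoring.
    class ChildClasspathSketch {
      static List<String> assemble(String systemClasspath, File jobJarDir,
          TaskDistributedCacheManager cacheManager, File workDir) {
        List<String> classPaths = new ArrayList<String>();
        // 1. The TaskTracker's own classpath.
        classPaths.add(systemClasspath);
        // 2. The job jar: its classes/ directory and the unpacked jar dir.
        classPaths.add(new File(jobJarDir, "classes").toString());
        classPaths.add(jobJarDir.toString());
        // 3. Distributed-cache entries flagged for the classpath.
        classPaths.addAll(cacheManager.getClassPaths());
        // 4. The task's working directory.
        classPaths.add(workDir.toString());
        return classPaths;
      }
    }
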
+ 12 - 3
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -53,7 +53,7 @@ import javax.servlet.http.HttpServletResponse;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -161,6 +161,8 @@ public class TaskTracker
 
   Server taskReportServer = null;
   InterTrackerProtocol jobClient;
+  
+  private TrackerDistributedCacheManager distributedCacheManager;
     
   // last heartbeat response recieved
   short heartbeatResponseId = -1;
@@ -633,8 +635,11 @@ public class TaskTracker
     this.taskTrackerName = "tracker_" + localHostname + ":" + taskReportAddress;
     LOG.info("Starting tracker " + taskTrackerName);
 
-    // Clear out temporary files that might be lying around
-    DistributedCache.purgeCache(this.fConf);
+    // Initialize DistributedCache and
+    // clear out temporary files that might be lying around
+    this.distributedCacheManager = 
+        new TrackerDistributedCacheManager(this.fConf);
+    this.distributedCacheManager.purgeCache();
     cleanupStorage();
 
     this.jobClient = (InterTrackerProtocol) 
@@ -3717,6 +3722,10 @@ public class TaskTracker
     healthChecker.start();
   }
   
+  TrackerDistributedCacheManager getTrackerDistributedCacheManager() {
+    return distributedCacheManager;
+  }
+
     /**
      * Download the job-token file from the FS and save on local fs.
      * @param user

+ 0 - 77
src/test/org/apache/hadoop/filecache/TestDistributedCache.java

@@ -1,77 +0,0 @@
-package org.apache.hadoop.filecache;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.Random;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import junit.framework.TestCase;
-
-public class TestDistributedCache extends TestCase {
-  
-  static final URI LOCAL_FS = URI.create("file:///");
-  private static String TEST_CACHE_BASE_DIR =
-    new Path(System.getProperty("test.build.data","/tmp/cachebasedir"))
-    .toString().replace(' ', '+');
-  private static String TEST_ROOT_DIR =
-    System.getProperty("test.build.data", "/tmp/distributedcache");
-  private static final int TEST_FILE_SIZE = 4 * 1024; // 4K
-  private static final int LOCAL_CACHE_LIMIT = 5 * 1024; //5K
-  private Configuration conf;
-  private Path firstCacheFile;
-  private Path secondCacheFile;
-  private FileSystem localfs;
-  
-  /**
-   * @see TestCase#setUp()
-   */
-  @Override
-  protected void setUp() throws IOException {
-    conf = new Configuration();
-    conf.setLong("local.cache.size", LOCAL_CACHE_LIMIT);
-    localfs = FileSystem.get(LOCAL_FS, conf);
-    firstCacheFile = new Path(TEST_ROOT_DIR+"/firstcachefile");
-    secondCacheFile = new Path(TEST_ROOT_DIR+"/secondcachefile");
-    createTempFile(localfs, firstCacheFile);
-    createTempFile(localfs, secondCacheFile);
-  }
-  
-  /** test delete cache */
-  public void testDeleteCache() throws Exception {
-    DistributedCache.getLocalCache(firstCacheFile.toUri(), conf, new Path(TEST_CACHE_BASE_DIR), 
-        false, System.currentTimeMillis(), new Path(TEST_ROOT_DIR));
-    DistributedCache.releaseCache(firstCacheFile.toUri(), conf);
-    //in above code,localized a file of size 4K and then release the cache which will cause the cache 
-    //be deleted when the limit goes out. The below code localize another cache which's designed to 
-    //sweep away the first cache.
-    DistributedCache.getLocalCache(secondCacheFile.toUri(), conf, new Path(TEST_CACHE_BASE_DIR), 
-        false, System.currentTimeMillis(), new Path(TEST_ROOT_DIR));
-    FileStatus[] dirStatuses = localfs.listStatus(new Path(TEST_CACHE_BASE_DIR));
-    assertTrue("DistributedCache failed deleting old cache when the cache store is full.",
-        dirStatuses.length > 1);
-  }
-
-  private void createTempFile(FileSystem fs, Path p) throws IOException {
-    FSDataOutputStream out = fs.create(p);
-    byte[] toWrite = new byte[TEST_FILE_SIZE];
-    new Random().nextBytes(toWrite);
-    out.write(toWrite);
-    out.close();
-    FileSystem.LOG.info("created: " + p + ", size=" + TEST_FILE_SIZE);
-  }
-  
-  /**
-   * @see TestCase#tearDown()
-   */
-  @Override
-  protected void tearDown() throws IOException {
-    localfs.delete(firstCacheFile, true);
-    localfs.delete(secondCacheFile, true);
-    localfs.close();
-  }
-}

+ 195 - 0
src/test/org/apache/hadoop/filecache/TestMRWithDistributedCache.java

@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.filecache;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.jar.JarOutputStream;
+import java.util.zip.ZipEntry;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+
+/**
+ * Tests the use of the
+ * {@link org.apache.hadoop.mapreduce.filecache.DistributedCache} within the
+ * full MR flow as well as the LocalJobRunner. This ought to be part of the
+ * filecache package, but that package is not currently in mapred and so
+ * cannot depend on MR for testing.
+ * 
+ * We use the distributed.* namespace for temporary files.
+ * 
+ * See {@link TestMiniMRLocalFS}, {@link TestMiniMRDFSCaching}, and
+ * {@link MRCaching} for other tests that test the distributed cache.
+ * 
+ * This test is not fast: it uses MiniMRCluster.
+ */
+public class TestMRWithDistributedCache extends TestCase {
+  private static Path TEST_ROOT_DIR =
+    new Path(System.getProperty("test.build.data","/tmp"));
+  private static Configuration conf = new Configuration();
+  private static FileSystem localFs;
+  static {
+    try {
+      localFs = FileSystem.getLocal(conf);
+    } catch (IOException io) {
+      throw new RuntimeException("problem getting local fs", io);
+    }
+  }
+
+  private static final Log LOG =
+    LogFactory.getLog(TestMRWithDistributedCache.class);
+
+  public static class DistributedCacheChecker extends
+      Mapper<LongWritable, Text, NullWritable, NullWritable> {
+
+    @Override
+    public void setup(Context context) throws IOException {
+      Configuration conf = context.getConfiguration();
+      Path[] files = DistributedCache.getLocalCacheFiles(conf);
+      Path[] archives = DistributedCache.getLocalCacheArchives(conf);
+      FileSystem fs = LocalFileSystem.get(conf);
+
+      // Check that 2 files and 2 archives are present
+      TestCase.assertEquals(2, files.length);
+      TestCase.assertEquals(2, archives.length);
+
+      // Check lengths of the files
+      TestCase.assertEquals(1, fs.getFileStatus(files[0]).getLen());
+      TestCase.assertTrue(fs.getFileStatus(files[1]).getLen() > 1);
+
+      // Check extraction of the archive
+      TestCase.assertTrue(fs.exists(new Path(archives[0],
+          "distributed.jar.inside3")));
+      TestCase.assertTrue(fs.exists(new Path(archives[1],
+          "distributed.jar.inside4")));
+
+      // Check the class loaders
+      LOG.info("Java Classpath: " + System.getProperty("java.class.path"));
+      ClassLoader cl = Thread.currentThread().getContextClassLoader();
+      // Both the file and the archive were added to classpath, so both
+      // should be reachable via the class loader.
+      TestCase.assertNotNull(cl.getResource("distributed.jar.inside2"));
+      TestCase.assertNotNull(cl.getResource("distributed.jar.inside3"));
+      TestCase.assertNull(cl.getResource("distributed.jar.inside4"));
+
+
+      // Check that the symlink for the renamed file was created in the cwd;
+      // this only actually happens for non-local jobtrackers.
+      // (The symlinks exist in "localRunner/" for local jobtrackers,
+      // but the user has no way to get at them.)
+      if (!"local".equals(
+          context.getConfiguration().get("mapred.job.tracker"))) {
+        File symlinkFile = new File("distributed.first.symlink");
+        TestCase.assertTrue(symlinkFile.exists());
+        TestCase.assertEquals(1, symlinkFile.length());
+      }
+    }
+  }
+
+  private void testWithConf(JobConf conf) throws IOException,
+      InterruptedException, ClassNotFoundException, URISyntaxException {
+    // Create a temporary file of length 1.
+    Path first = createTempFile("distributed.first", "x");
+    // Create two jars with a single file inside them.
+    Path second =
+        makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2);
+    Path third =
+        makeJar(new Path(TEST_ROOT_DIR, "distributed.third.jar"), 3);
+    Path fourth =
+        makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);
+
+    // Creates the Job Configuration
+    DistributedCache.addCacheFile(
+        new URI(first.toUri().toString() + "#distributed.first.symlink"),
+        conf);
+    DistributedCache.addFileToClassPath(second, conf);
+    DistributedCache.addArchiveToClassPath(third, conf);
+    DistributedCache.addCacheArchive(fourth.toUri(), conf);
+    DistributedCache.createSymlink(conf);
+
+    conf.setMaxMapAttempts(1); // speed up failures
+    Job job = new Job(conf);
+    job.setMapperClass(DistributedCacheChecker.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    FileInputFormat.setInputPaths(job, first);
+
+    job.submit();
+    assertTrue(job.waitForCompletion(false));
+  }
+
+  /** Tests using the local job runner. */
+  public void testLocalJobRunner() throws Exception {
+    JobConf c = new JobConf();
+    c.set("mapred.job.tracker", "local");
+    c.set("fs.default.name", "file:///");
+    testWithConf(c);
+  }
+
+  /** Tests using a full MiniMRCluster. */
+  public void testMiniMRJobRunner() throws Exception {
+    MiniMRCluster m = new MiniMRCluster(1, "file:///", 1);
+    try {
+      testWithConf(m.createJobConf());
+    } finally {
+      m.shutdown();
+    }
+
+  }
+
+  private Path createTempFile(String filename, String contents)
+      throws IOException {
+    Path path = new Path(TEST_ROOT_DIR, filename);
+    FSDataOutputStream os = localFs.create(path);
+    os.writeBytes(contents);
+    os.close();
+    return path;
+  }
+
+  private Path makeJar(Path p, int index) throws FileNotFoundException,
+      IOException {
+    FileOutputStream fos = new FileOutputStream(new File(p.toString()));
+    JarOutputStream jos = new JarOutputStream(fos);
+    ZipEntry ze = new ZipEntry("distributed.jar.inside" + index);
+    jos.putNextEntry(ze);
+    jos.write(("inside the jar!" + index).getBytes());
+    jos.closeEntry();
+    jos.close();
+    return p;
+  }
+}

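For users, a passing testLocalJobRunner means cache files can be exercised without a cluster. A minimal, hypothetical driver sketch follows; the /tmp paths are placeholders, and note that symlinking is deliberately unsupported under the LocalJobRunner.

    import java.net.URI;

    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    // Hypothetical driver: exercises the distributed cache with no cluster.
    public class LocalCacheDriverSketch {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf();
        conf.set("mapred.job.tracker", "local");  // use the LocalJobRunner
        conf.set("fs.default.name", "file:///");
        DistributedCache.addCacheFile(new URI("file:///tmp/lookup.txt"), conf);
        DistributedCache.addFileToClassPath(new Path("/tmp/helper.jar"), conf);
        // DistributedCache.createSymlink(conf) would only log a warning:
        // LocalJobRunner does not symlink into the current working directory.
        // ... set mapper/reducer and input/output paths, then submit ...
      }
    }
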
+ 166 - 0
src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java

@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.filecache;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+
+public class TestTrackerDistributedCacheManager extends TestCase {
+  private static final String TEST_LOCAL_DIR_PROP = "test.local.dir";
+  private static String TEST_CACHE_BASE_DIR =
+    new Path(System.getProperty("test.build.data","/tmp/cachebasedir"))
+    .toString().replace(' ', '+');
+  private static String TEST_ROOT_DIR =
+    System.getProperty("test.build.data", "/tmp/distributedcache");
+  private static final int TEST_FILE_SIZE = 4 * 1024; // 4K
+  private static final int LOCAL_CACHE_LIMIT = 5 * 1024; //5K
+  private Configuration conf;
+  private Path firstCacheFile;
+  private Path secondCacheFile;
+
+  @Override
+  protected void setUp() throws IOException {
+    conf = new Configuration();
+    conf.setLong("local.cache.size", LOCAL_CACHE_LIMIT);
+    conf.set(TEST_LOCAL_DIR_PROP, TEST_ROOT_DIR);
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
+    firstCacheFile = new Path(TEST_ROOT_DIR, "firstcachefile");
+    secondCacheFile = new Path(TEST_ROOT_DIR, "secondcachefile");
+    createTempFile(firstCacheFile);
+    createTempFile(secondCacheFile);
+  }
+
+  /**
+   * This is the typical flow for using the DistributedCache classes.
+   */
+  public void testManagerFlow() throws IOException {
+    TrackerDistributedCacheManager manager = 
+        new TrackerDistributedCacheManager(conf);
+    LocalDirAllocator localDirAllocator = 
+        new LocalDirAllocator(TEST_LOCAL_DIR_PROP);
+
+    // Configures a task/job with both a regular file and a "classpath" file.
+    Configuration subConf = new Configuration(conf);
+    DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
+    DistributedCache.addFileToClassPath(secondCacheFile, subConf);
+    TrackerDistributedCacheManager.determineTimestamps(subConf);
+
+    Path jobFile = new Path(TEST_ROOT_DIR, "job.xml");
+    FileOutputStream os = new FileOutputStream(new File(jobFile.toString()));
+    subConf.writeXml(os);
+    os.close();
+
+    TaskDistributedCacheManager handle =
+      manager.newTaskDistributedCacheManager(subConf);
+    assertNull(DistributedCache.getLocalCacheFiles(subConf));
+    handle.setup(localDirAllocator, 
+        new File(new Path(TEST_ROOT_DIR, "workdir").toString()), "distcache");
+    Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
+    assertNotNull(localCacheFiles);
+    assertEquals(2, localCacheFiles.length);
+    Path cachedFirstFile = localCacheFiles[0];
+    Path cachedSecondFile = localCacheFiles[1];
+    assertFileLengthEquals(firstCacheFile, cachedFirstFile);
+    assertFalse("Paths should be different.", 
+        firstCacheFile.equals(cachedFirstFile));
+
+    assertEquals(1, handle.getClassPaths().size());
+    assertEquals(cachedSecondFile.toString(), handle.getClassPaths().get(0));
+
+    // Cleanup
+    handle.release();
+    manager.purgeCache();
+    assertFalse(pathToFile(cachedFirstFile).exists());
+  }
+
+
+  /** test delete cache */
+  public void testDeleteCache() throws Exception {
+    TrackerDistributedCacheManager manager = 
+        new TrackerDistributedCacheManager(conf);
+    FileSystem localfs = FileSystem.getLocal(conf);
+
+    manager.getLocalCache(firstCacheFile.toUri(), conf, 
+        new Path(TEST_CACHE_BASE_DIR), null, false, 
+        System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false);
+    manager.releaseCache(firstCacheFile.toUri(), conf);
+    // The code above localized a file of size 4K and then released the
+    // cache, which causes the cached copy to be deleted once the size
+    // limit is exceeded.  The code below localizes another cache, which
+    // is designed to sweep away the first cache.
+    manager.getLocalCache(secondCacheFile.toUri(), conf, 
+        new Path(TEST_CACHE_BASE_DIR), null, false, 
+        System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false);
+    FileStatus[] dirStatuses = localfs.listStatus(
+        new Path(TEST_CACHE_BASE_DIR));
+    assertTrue("DistributedCache failed deleting old" + 
+        " cache when the cache store is full.",
+        dirStatuses.length > 1);
+  }
+  
+  public void testFileSystemOtherThanDefault() throws Exception {
+    TrackerDistributedCacheManager manager =
+      new TrackerDistributedCacheManager(conf);
+    conf.set("fs.fakefile.impl", conf.get("fs.file.impl"));
+    Path fileToCache = new Path("fakefile:///"
+        + firstCacheFile.toUri().getPath());
+    Path result = manager.getLocalCache(fileToCache.toUri(), conf,
+        new Path(TEST_CACHE_BASE_DIR), null, false, System.currentTimeMillis(),
+        new Path(TEST_ROOT_DIR), false);
+    assertNotNull("DistributedCache cached file on non-default filesystem.",
+        result);
+  }
+
+  static void createTempFile(Path p) throws IOException {
+    File f = new File(p.toString());
+    FileOutputStream os = new FileOutputStream(f);
+    byte[] toWrite = new byte[TEST_FILE_SIZE];
+    new Random().nextBytes(toWrite);
+    os.write(toWrite);
+    os.close();
+    FileSystem.LOG.info("created: " + p + ", size=" + TEST_FILE_SIZE);
+  }
+
+  @Override
+  protected void tearDown() throws IOException {
+    new File(firstCacheFile.toString()).delete();
+    new File(secondCacheFile.toString()).delete();
+  }
+
+  private void assertFileLengthEquals(Path a, Path b) 
+      throws FileNotFoundException {
+    assertEquals("File sizes mismatch.", 
+       pathToFile(a).length(), pathToFile(b).length());
+  }
+
+  private File pathToFile(Path p) {
+    return new File(p.toString());
+  }
+}