
HADOOP-4041. IsolationRunner does not work as documented. Contributed by Philip Zeyliger.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@785044 13f79535-47bb-0310-9956-ffa450edef68
Thomas White 16 years ago
parent commit 5ee5ce5821

+ 3 - 0
CHANGES.txt

@@ -823,6 +823,9 @@ Trunk (unreleased changes)
     LD_LIBRARY_PATH and other environment variables.
     (Sreekanth Ramakrishnan via yhemanth)
 
+    HADOOP-4041. IsolationRunner does not work as documented.
+    (Philip Zeyliger via tomwhite)
+
 Release 0.20.1 - Unreleased
 
   INCOMPATIBLE CHANGES

+ 11 - 6
src/core/org/apache/hadoop/fs/LocalDirAllocator.java

@@ -33,7 +33,7 @@ import org.apache.hadoop.conf.Configuration;
  * files. The way it works is that it is kept track what disk was last
  * allocated for a file write. For the current request, the next disk from
  * the set of disks would be allocated if the free space on the disk is 
- * sufficient enough to accomodate the file that is being considered for
+ * sufficient enough to accommodate the file that is being considered for
  * creation. If the space requirements cannot be met, the next disk in order
  * would be tried and so on till a disk is found with sufficient capacity.
  * Once a disk with sufficient space is identified, a check is done to make
@@ -69,6 +69,9 @@ public class LocalDirAllocator {
                  new TreeMap<String, AllocatorPerContext>();
   private String contextCfgItemName;
 
+  /** Used when size of file to be allocated is unknown. */
+  public static final int SIZE_UNKNOWN = -1;
+
   /**Create an allocator object
    * @param contextCfgItemName
    */
@@ -105,10 +108,11 @@ public class LocalDirAllocator {
    */
   public Path getLocalPathForWrite(String pathStr, 
       Configuration conf) throws IOException {
-    return getLocalPathForWrite(pathStr, -1, conf);
+    return getLocalPathForWrite(pathStr, SIZE_UNKNOWN, conf);
   }
   
-  /** Get a path from the local FS. Pass size as -1 if not known apriori. We
+  /** Get a path from the local FS. Pass size as 
+   *  SIZE_UNKNOWN if not known a priori. We
    *  round-robin over the set of disks (via the configured dirs) and return
    *  the first complete path which has enough space 
    *  @param pathStr the requested path (this will be created on the first 
@@ -274,7 +278,7 @@ public class LocalDirAllocator {
      */
     public synchronized Path getLocalPathForWrite(String path, 
         Configuration conf) throws IOException {
-      return getLocalPathForWrite(path, -1, conf);
+      return getLocalPathForWrite(path, SIZE_UNKNOWN, conf);
     }
 
     /** Get a path from the local FS. If size is known, we go
@@ -296,7 +300,7 @@ public class LocalDirAllocator {
       }
       Path returnPath = null;
       
-      if(size == -1) {  //do roulette selection: pick dir with probability 
+      if(size == SIZE_UNKNOWN) {  //do roulette selection: pick dir with probability 
                     //proportional to available size
         long[] availableOnDisk = new long[dirDF.length];
         long totalAvailable = 0;
@@ -344,7 +348,8 @@ public class LocalDirAllocator {
           "directory for " + pathStr);
     }
 
-    /** Creates a file on the local FS. Pass size as -1 if not known apriori. We
+    /** Creates a file on the local FS. Pass size as 
+     * {@link LocalDirAllocator#SIZE_UNKNOWN} if not known a priori. We
      *  round-robin over the set of disks (via the configured dirs) and return
      *  a file on the first path which has enough space. The file is guaranteed
      *  to go away when the JVM exits.
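
For orientation, a minimal usage sketch of the renamed constant, assuming a configured "mapred.local.dir" (the directory value below is illustrative only). Callers that do not know the file size up front now pass LocalDirAllocator.SIZE_UNKNOWN rather than a bare -1:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.LocalDirAllocator;
    import org.apache.hadoop.fs.Path;

    public class LocalDirAllocatorDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("mapred.local.dir", "/tmp/mapred/local");  // illustrative local dir
        LocalDirAllocator alloc = new LocalDirAllocator("mapred.local.dir");
        // Size unknown: the allocator does roulette selection weighted by free
        // space. Equivalent to the old getLocalPathForWrite(path, -1, conf).
        Path scratch = alloc.getLocalPathForWrite("scratch/part-00000",
            LocalDirAllocator.SIZE_UNKNOWN, conf);
        System.out.println(scratch);
      }
    }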

+ 20 - 0
src/core/org/apache/hadoop/util/StringUtils.java

@@ -677,4 +677,24 @@ public class StringUtils {
   public static synchronized String limitDecimalTo2(double d) {
     return decimalFormat.format(d);
   }
+  
+  /**
+   * Concatenates strings, using a separator.
+   *
+   * @param separator Separator to join with.
+   * @param strings Strings to join.
+   */
+  public static String join(CharSequence separator, Iterable<String> strings) {
+    StringBuffer sb = new StringBuffer();
+    boolean first = true;
+    for (String s : strings) {
+      if (first) {
+        first = false;
+      } else {
+        sb.append(separator);
+      }
+      sb.append(s);
+    }
+    return sb.toString();
+  }
 }
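
A small illustrative sketch of the new helper's behaviour (the demo class name is made up; the import is the real one):

    import java.util.Arrays;
    import org.apache.hadoop.util.StringUtils;

    public class JoinDemo {
      public static void main(String[] args) {
        // Prints "a:b:c" -- the separator appears only between elements.
        System.out.println(StringUtils.join(":", Arrays.asList("a", "b", "c")));
        // An empty iterable joins to the empty string.
        System.out.println(StringUtils.join(":", Arrays.<String>asList()));
      }
    }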

+ 2 - 2
src/docs/cn/src/documentation/content/xdocs/mapred_tutorial.xml

@@ -1337,8 +1337,8 @@
          IsolationRunner</a> is a utility to help debug Map/Reduce programs.</p>
          
          <p>To use <code>IsolationRunner</code>, first set 
-          <code>keep.failed.tasks.files</code> to <code>true</code> 
-          (also see <code>keep.tasks.files.pattern</code>).</p>
+          <code>keep.failed.task.files</code> to <code>true</code> 
+          (also see <code>keep.task.files.pattern</code>).</p>
          
          <p>
            Next, go to the node on which the failed task ran and go to the 

+ 4 - 2
src/docs/src/documentation/content/xdocs/mapred_tutorial.xml

@@ -1894,8 +1894,8 @@
           IsolationRunner</a> is a utility to help debug Map/Reduce programs.</p>
           
           <p>To use the <code>IsolationRunner</code>, first set 
-          <code>keep.failed.tasks.files</code> to <code>true</code> 
-          (also see <code>keep.tasks.files.pattern</code>).</p>
+          <code>keep.failed.task.files</code> to <code>true</code> 
+          (also see <code>keep.task.files.pattern</code>).</p>
           
           <p>
             Next, go to the node on which the failed task ran and go to the 
@@ -1909,6 +1909,8 @@
           
           <p><code>IsolationRunner</code> will run the failed task in a single 
           jvm, which can be in the debugger, over precisely the same input.</p>
+
+          <p>Note that currently IsolationRunner will only re-run map tasks.</p>
         </section>
 
         <section>
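
For illustration only, a hedged sketch of enabling retention from job-submission code with the corrected property names (the demo class name and the pattern are placeholders):

    import org.apache.hadoop.mapred.JobConf;

    public class KeepTaskFilesExample {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        // keep.failed.task.files=true: retain the failed attempt's job.xml,
        // split data and working directory for later re-runs.
        conf.setKeepFailedTaskFiles(true);
        // Or retain files for every task whose id matches a pattern
        // (keep.task.files.pattern):
        conf.setKeepTaskFilesPattern(".*_m_000000_.*");
        // ... remaining input/output/mapper setup and JobClient.runJob(conf) omitted.
      }
    }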

+ 60 - 69
src/mapred/org/apache/hadoop/mapred/IsolationRunner.java

@@ -36,7 +36,17 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JvmTask;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
+/**
+ * IsolationRunner is intended to facilitate debugging by re-running a specific
+ * task, given left-over task files for a (typically failed) past job.  
+ * Currently, it is limited to re-running map tasks.
+ *
+ * Users may coerce MapReduce to keep task files around by setting 
+ * keep.failed.task.files.  See mapred_tutorial.xml for more documentation.
+ */
 public class IsolationRunner {
   private static final Log LOG = 
     LogFactory.getLog(IsolationRunner.class.getName());
@@ -109,82 +119,57 @@ public class IsolationRunner {
     }
   }
   
-  private static ClassLoader makeClassLoader(JobConf conf, 
+  private ClassLoader makeClassLoader(JobConf conf, 
                                              File workDir) throws IOException {
-    List<URL> cp = new ArrayList<URL>();
-
+    List<String> classPaths = new ArrayList<String>();
+    // Add job jar class files (includes lib/* and classes/*)
     String jar = conf.getJar();
-    if (jar != null) {                      // if jar exists, it into workDir
-      File[] libs = new File(workDir, "lib").listFiles();
-      if (libs != null) {
-        for (int i = 0; i < libs.length; i++) {
-          cp.add(new URL("file:" + libs[i].toString()));
-        }
-      }
-      cp.add(new URL("file:" + new File(workDir, "classes/").toString()));
-      cp.add(new URL("file:" + workDir.toString() + "/"));
+    if (jar != null) {
+      TaskRunner.appendJobJarClasspaths(conf.getJar(), classPaths);
     }
-    return new URLClassLoader(cp.toArray(new URL[cp.size()]));
-  }
-  
-  /**
-   * Create empty sequence files for any of the map outputs that we don't have.
-   * @param fs the filesystem to create the files in
-   * @param dir the directory name to create the files in
-   * @param conf the jobconf
-   * @throws IOException if something goes wrong writing
-   */
-  private static void fillInMissingMapOutputs(FileSystem fs, 
-                                              TaskAttemptID taskId,
-                                              int numMaps,
-                                              JobConf conf) throws IOException {
-    Class<? extends WritableComparable> keyClass
-        = conf.getMapOutputKeyClass().asSubclass(WritableComparable.class);
-    Class<? extends Writable> valueClass
-        = conf.getMapOutputValueClass().asSubclass(Writable.class);
-    MapOutputFile namer = new MapOutputFile(taskId.getJobID());
-    namer.setConf(conf);
-    for(int i=0; i<numMaps; i++) {
-      Path f = namer.getInputFile(i, taskId);
-      if (!fs.exists(f)) {
-        LOG.info("Create missing input: " + f);
-        SequenceFile.Writer out =
-          SequenceFile.createWriter(fs, conf, f, keyClass, valueClass);
-        out.close();
-      }
-    }    
+    // Add the workdir, too.
+    classPaths.add(workDir.toString());
+    // Note: TaskRunner.run() does more, including DistributedCache files.
+    
+    // Convert to URLs
+    URL[] urls = new URL[classPaths.size()];
+    for (int i = 0; i < classPaths.size(); ++i) {
+      urls[i] = new File(classPaths.get(i)).toURL();
+    }
+    return new URLClassLoader(urls);
   }
   
   /**
-   * Run a single task
-   * @param args the first argument is the task directory
+   * Runs the task described by the given job.xml; returns false on invalid arguments.
    */
-  public static void main(String[] args
-                          ) throws ClassNotFoundException, IOException, 
-                                   InterruptedException {
+  boolean run(String[] args) 
+      throws ClassNotFoundException, IOException, InterruptedException {
     if (args.length != 1) {
       System.out.println("Usage: IsolationRunner <path>/job.xml");
-      System.exit(1);
+      return false;
     }
     File jobFilename = new File(args[0]);
     if (!jobFilename.exists() || !jobFilename.isFile()) {
       System.out.println(jobFilename + " is not a valid job file.");
-      System.exit(1);
+      return false;
     }
     JobConf conf = new JobConf(new Path(jobFilename.toString()));
     TaskAttemptID taskId = TaskAttemptID.forName(conf.get("mapred.task.id"));
+    if (taskId == null) {
+      System.out.println("mapred.task.id not found in configuration;" + 
+          " job.xml is not a task config");
+    }
     boolean isMap = conf.getBoolean("mapred.task.is.map", true);
+    if (!isMap) {
+      System.out.println("Only map tasks are supported.");
+      return false;
+    }
     int partition = conf.getInt("mapred.task.partition", 0);
     
     // setup the local and user working directories
     FileSystem local = FileSystem.getLocal(conf);
     LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
-    File workDirName = new File(lDirAlloc.getLocalPathToRead(
-                                  TaskTracker.getLocalTaskDir(
-                                    taskId.getJobID().toString(), 
-                                    taskId.toString())
-                                  + Path.SEPARATOR + "work",
-                                  conf). toString());
+    File workDirName = TaskRunner.formWorkDir(lDirAlloc, taskId, false, conf);
     local.setWorkingDirectory(new Path(workDirName.toString()));
     FileSystem.get(conf).setWorkingDirectory(conf.getWorkingDirectory());
     
@@ -193,23 +178,29 @@ public class IsolationRunner {
     Thread.currentThread().setContextClassLoader(classLoader);
     conf.setClassLoader(classLoader);
     
-    Task task;
-    if (isMap) {
-      Path localSplit = new Path(new Path(jobFilename.toString()).getParent(), 
-                                 "split.dta");
-      DataInputStream splitFile = FileSystem.getLocal(conf).open(localSplit);
-      String splitClass = Text.readString(splitFile);
-      BytesWritable split = new BytesWritable();
-      split.readFields(splitFile);
-      splitFile.close();
-      task = new MapTask(jobFilename.toString(), taskId, partition, splitClass, split);
-    } else {
-      int numMaps = conf.getNumMapTasks();
-      fillInMissingMapOutputs(local, taskId, numMaps, conf);
-      task = new ReduceTask(jobFilename.toString(), taskId, partition, numMaps);
-    }
+    Path localSplit = new Path(new Path(jobFilename.toString()).getParent(), 
+                               "split.dta");
+    DataInputStream splitFile = FileSystem.getLocal(conf).open(localSplit);
+    String splitClass = Text.readString(splitFile);
+    BytesWritable split = new BytesWritable();
+    split.readFields(splitFile);
+    splitFile.close();
+    Task task = new MapTask(jobFilename.toString(), taskId, partition, splitClass, split);
     task.setConf(conf);
     task.run(conf, new FakeUmbilical());
+    return true;
   }
 
+
+  /**
+   * Run a single task.
+   *
+   * @param args the first argument is the path to the task's job.xml
+   */
+  public static void main(String[] args) 
+      throws ClassNotFoundException, IOException, InterruptedException {
+    if (!new IsolationRunner().run(args)) {
+      System.exit(1);
+    }
+  }
 }
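
A hedged sketch of invoking the reworked entry point; the job.xml path below is a placeholder for a retained task attempt's configuration (run() is package-private, so outside callers go through main()):

    public class RerunFailedMapTask {
      public static void main(String[] args) throws Exception {
        // Placeholder path; point at the kept attempt's job.xml under mapred.local.dir.
        String jobXml = "/path/to/kept/task/attempt/job.xml";
        // Exits non-zero if the file is missing or describes a reduce task.
        org.apache.hadoop.mapred.IsolationRunner.main(new String[] { jobXml });
      }
    }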

+ 194 - 154
src/mapred/org/apache/hadoop/mapred/TaskRunner.java

@@ -17,21 +17,30 @@
  */
 package org.apache.hadoop.mapred;
 
-import org.apache.commons.logging.*;
-
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.filecache.*;
-import org.apache.hadoop.util.*;
-
-import java.io.*;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
 import java.net.InetSocketAddress;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Vector;
-import java.net.URI;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
 
 /** Base class that runs a task in a separate process.  Tasks are run in a
  * separate process in order to isolate the map/reduce system code from bugs in
@@ -49,6 +58,9 @@ abstract class TaskRunner extends Thread {
   private int exitCode = -1;
   private boolean exitCodeSet = false;
   
+  private static String SYSTEM_PATH_SEPARATOR = System.getProperty("path.separator");
+
+  
   private TaskTracker tracker;
 
   protected JobConf conf;
@@ -108,163 +120,40 @@ abstract class TaskRunner extends Thread {
       //all the archives
       TaskAttemptID taskid = t.getTaskID();
       LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
-      File jobCacheDir = null;
-      if (conf.getJar() != null) {
-        jobCacheDir = new File(
-                          new Path(conf.getJar()).getParent().toString());
-      }
-      File workDir = new File(lDirAlloc.getLocalPathToRead(
-                                TaskTracker.getLocalTaskDir( 
-                                  t.getJobID().toString(), 
-                                  t.getTaskID().toString(),
-                                  t.isTaskCleanupTask())
-                                + Path.SEPARATOR + MRConstants.WORKDIR,
-                                conf). toString());
-
+      File workDir = formWorkDir(lDirAlloc, taskid, t.isTaskCleanupTask(), conf);
+      
       URI[] archives = DistributedCache.getCacheArchives(conf);
       URI[] files = DistributedCache.getCacheFiles(conf);
-      FileStatus fileStatus;
-      FileSystem fileSystem;
-      Path localPath;
-      String baseDir;
-
-      if ((archives != null) || (files != null)) {
-        if (archives != null) {
-          String[] archivesTimestamps = 
-                               DistributedCache.getArchiveTimestamps(conf);
-          Path[] p = new Path[archives.length];
-          for (int i = 0; i < archives.length;i++){
-            fileSystem = FileSystem.get(archives[i], conf);
-            fileStatus = fileSystem.getFileStatus(
-                                      new Path(archives[i].getPath()));
-            String cacheId = DistributedCache.makeRelative(archives[i],conf);
-            String cachePath = TaskTracker.getCacheSubdir() + 
-                                 Path.SEPARATOR + cacheId;
-            
-            localPath = lDirAlloc.getLocalPathForWrite(cachePath,
-                                      fileStatus.getLen(), conf);
-            baseDir = localPath.toString().replace(cacheId, "");
-            p[i] = DistributedCache.getLocalCache(archives[i], conf, 
-                                                  new Path(baseDir),
-                                                  fileStatus,
-                                                  true, Long.parseLong(
-                                                        archivesTimestamps[i]),
-                                                  new Path(workDir.
-                                                        getAbsolutePath()), 
-                                                  false);
-            
-          }
-          DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
-        }
-        if ((files != null)) {
-          String[] fileTimestamps = DistributedCache.getFileTimestamps(conf);
-          Path[] p = new Path[files.length];
-          for (int i = 0; i < files.length;i++){
-            fileSystem = FileSystem.get(files[i], conf);
-            fileStatus = fileSystem.getFileStatus(
-                                      new Path(files[i].getPath()));
-            String cacheId = DistributedCache.makeRelative(files[i], conf);
-            String cachePath = TaskTracker.getCacheSubdir() +
-                                 Path.SEPARATOR + cacheId;
-            
-            localPath = lDirAlloc.getLocalPathForWrite(cachePath,
-                                      fileStatus.getLen(), conf);
-            baseDir = localPath.toString().replace(cacheId, "");
-            p[i] = DistributedCache.getLocalCache(files[i], conf, 
-                                                  new Path(baseDir),
-                                                  fileStatus,
-                                                  false, Long.parseLong(
-                                                           fileTimestamps[i]),
-                                                  new Path(workDir.
-                                                        getAbsolutePath()), 
-                                                  false);
-          }
-          DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
-        }
-        Path localTaskFile = new Path(t.getJobFile());
-        FileSystem localFs = FileSystem.getLocal(conf);
-        localFs.delete(localTaskFile, true);
-        OutputStream out = localFs.create(localTaskFile);
-        try {
-          conf.writeXml(out);
-        } finally {
-          out.close();
-        }
-      }
+      setupDistributedCache(lDirAlloc, workDir, archives, files);
           
       if (!prepare()) {
         return;
       }
 
-      String sep = System.getProperty("path.separator");
-      StringBuffer classPath = new StringBuffer();
+      // Accumulates class paths for child.
+      List<String> classPaths = new ArrayList<String>();
       // start with same classpath as parent process
-      classPath.append(System.getProperty("java.class.path"));
-      classPath.append(sep);
+      appendSystemClasspaths(classPaths);
+
       if (!workDir.mkdirs()) {
         if (!workDir.isDirectory()) {
           LOG.fatal("Mkdirs failed to create " + workDir.toString());
         }
       }
-	  
-      String jar = conf.getJar();
-      if (jar != null) {       
-        // if jar exists, it into workDir
-        File[] libs = new File(jobCacheDir, "lib").listFiles();
-        if (libs != null) {
-          for (int i = 0; i < libs.length; i++) {
-            classPath.append(sep);            // add libs from jar to classpath
-            classPath.append(libs[i]);
-          }
-        }
-        classPath.append(sep);
-        classPath.append(new File(jobCacheDir, "classes"));
-        classPath.append(sep);
-        classPath.append(jobCacheDir);
-       
-      }
 
       // include the user specified classpath
+      appendJobJarClasspaths(conf.getJar(), classPaths);
   		
-      //archive paths
-      Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf);
-      if (archiveClasspaths != null && archives != null) {
-        Path[] localArchives = DistributedCache
-          .getLocalCacheArchives(conf);
-        if (localArchives != null){
-          for (int i=0;i<archives.length;i++){
-            for(int j=0;j<archiveClasspaths.length;j++){
-              if (archives[i].getPath().equals(
-                                               archiveClasspaths[j].toString())){
-                classPath.append(sep);
-                classPath.append(localArchives[i]
-                                 .toString());
-              }
-            }
-          }
-        }
-      }
-      //file paths
-      Path[] fileClasspaths = DistributedCache.getFileClassPaths(conf);
-      if (fileClasspaths!=null && files != null) {
-        Path[] localFiles = DistributedCache
-          .getLocalCacheFiles(conf);
-        if (localFiles != null) {
-          for (int i = 0; i < files.length; i++) {
-            for (int j = 0; j < fileClasspaths.length; j++) {
-              if (files[i].getPath().equals(
-                                            fileClasspaths[j].toString())) {
-                classPath.append(sep);
-                classPath.append(localFiles[i].toString());
-              }
-            }
-          }
-        }
-      }
-
-      classPath.append(sep);
-      classPath.append(workDir);
-      //  Build exec child jmv args.
+      // Distributed cache paths
+      appendDistributedCacheClasspaths(conf, archives, files, classPaths);
+      
+      // Include the working dir too
+      classPaths.add(workDir.toString());
+      
+      // Build classpath
+      
+      
+      //  Build exec child JVM args.
       Vector<String> vargs = new Vector<String>(8);
       File jvm =                                  // use same jvm as parent
         new File(new File(System.getProperty("java.home"), "bin"), "java");
@@ -308,12 +197,12 @@ abstract class TaskRunner extends Thread {
       if (libraryPath == null) {
         libraryPath = workDir.getAbsolutePath();
       } else {
-        libraryPath += sep + workDir;
+        libraryPath += SYSTEM_PATH_SEPARATOR + workDir;
       }
       boolean hasUserLDPath = false;
       for(int i=0; i<javaOptsSplit.length ;i++) { 
         if(javaOptsSplit[i].startsWith("-Djava.library.path=")) {
-          javaOptsSplit[i] += sep + libraryPath;
+          javaOptsSplit[i] += SYSTEM_PATH_SEPARATOR + libraryPath;
           hasUserLDPath = true;
           break;
         }
@@ -342,7 +231,8 @@ abstract class TaskRunner extends Thread {
 
       // Add classpath.
       vargs.add("-classpath");
-      vargs.add(classPath.toString());
+      String classPath = StringUtils.join(SYSTEM_PATH_SEPARATOR, classPaths);
+      vargs.add(classPath);
 
       // Setup the log4j prop
       long logSize = TaskLog.getTaskLogLength(conf);
@@ -396,7 +286,7 @@ abstract class TaskRunner extends Thread {
       String oldLdLibraryPath = null;
       oldLdLibraryPath = System.getenv("LD_LIBRARY_PATH");
       if (oldLdLibraryPath != null) {
-        ldLibraryPath.append(sep);
+        ldLibraryPath.append(SYSTEM_PATH_SEPARATOR);
         ldLibraryPath.append(oldLdLibraryPath);
       }
       env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
@@ -494,6 +384,156 @@ abstract class TaskRunner extends Thread {
       tip.reportTaskFinished();
     }
   }
+
+  /** Creates the working directory pathname for a task attempt. */ 
+  static File formWorkDir(LocalDirAllocator lDirAlloc, 
+      TaskAttemptID task, boolean isCleanup, JobConf conf) 
+      throws IOException {
+    File workDir = new File(lDirAlloc.getLocalPathToRead(
+        TaskTracker.getLocalTaskDir(task.getJobID().toString(), 
+          task.toString(), isCleanup) 
+        + Path.SEPARATOR + MRConstants.WORKDIR, conf).toString());
+    return workDir;
+  }
+
+  private void setupDistributedCache(LocalDirAllocator lDirAlloc, File workDir,
+      URI[] archives, URI[] files) throws IOException {
+    FileStatus fileStatus;
+    FileSystem fileSystem;
+    Path localPath;
+    String baseDir;
+    if ((archives != null) || (files != null)) {
+      if (archives != null) {
+        String[] archivesTimestamps = 
+                             DistributedCache.getArchiveTimestamps(conf);
+        Path[] p = new Path[archives.length];
+        for (int i = 0; i < archives.length;i++){
+          fileSystem = FileSystem.get(archives[i], conf);
+          fileStatus = fileSystem.getFileStatus(
+                                    new Path(archives[i].getPath()));
+          String cacheId = DistributedCache.makeRelative(archives[i],conf);
+          String cachePath = TaskTracker.getCacheSubdir() + 
+                               Path.SEPARATOR + cacheId;
+          
+          localPath = lDirAlloc.getLocalPathForWrite(cachePath,
+                                    fileStatus.getLen(), conf);
+          baseDir = localPath.toString().replace(cacheId, "");
+          p[i] = DistributedCache.getLocalCache(archives[i], conf, 
+                                                new Path(baseDir),
+                                                fileStatus,
+                                                true, Long.parseLong(
+                                                      archivesTimestamps[i]),
+                                                new Path(workDir.
+                                                      getAbsolutePath()), 
+                                                false);
+          
+        }
+        DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
+      }
+      if ((files != null)) {
+        String[] fileTimestamps = DistributedCache.getFileTimestamps(conf);
+        Path[] p = new Path[files.length];
+        for (int i = 0; i < files.length;i++){
+          fileSystem = FileSystem.get(files[i], conf);
+          fileStatus = fileSystem.getFileStatus(
+                                    new Path(files[i].getPath()));
+          String cacheId = DistributedCache.makeRelative(files[i], conf);
+          String cachePath = TaskTracker.getCacheSubdir() +
+                               Path.SEPARATOR + cacheId;
+          
+          localPath = lDirAlloc.getLocalPathForWrite(cachePath,
+                                    fileStatus.getLen(), conf);
+          baseDir = localPath.toString().replace(cacheId, "");
+          p[i] = DistributedCache.getLocalCache(files[i], conf, 
+                                                new Path(baseDir),
+                                                fileStatus,
+                                                false, Long.parseLong(
+                                                         fileTimestamps[i]),
+                                                new Path(workDir.
+                                                      getAbsolutePath()), 
+                                                false);
+        }
+        DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
+      }
+      Path localTaskFile = new Path(t.getJobFile());
+      FileSystem localFs = FileSystem.getLocal(conf);
+      localFs.delete(localTaskFile, true);
+      OutputStream out = localFs.create(localTaskFile);
+      try {
+        conf.writeXml(out);
+      } finally {
+        out.close();
+      }
+    }
+  }
+
+  private void appendDistributedCacheClasspaths(JobConf conf, URI[] archives, 
+      URI[] files, List<String> classPaths) throws IOException {
+    // Archive paths
+    Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf);
+    if (archiveClasspaths != null && archives != null) {
+      Path[] localArchives = DistributedCache.getLocalCacheArchives(conf);
+      if (localArchives != null){
+        for (int i=0;i<archives.length;i++){
+          for(int j=0;j<archiveClasspaths.length;j++){
+            if (archives[i].getPath().equals(
+                                             archiveClasspaths[j].toString())){
+              classPaths.add(localArchives[i].toString());
+            }
+          }
+        }
+      }
+    }
+    
+    //file paths
+    Path[] fileClasspaths = DistributedCache.getFileClassPaths(conf);
+    if (fileClasspaths!=null && files != null) {
+      Path[] localFiles = DistributedCache
+        .getLocalCacheFiles(conf);
+      if (localFiles != null) {
+        for (int i = 0; i < files.length; i++) {
+          for (int j = 0; j < fileClasspaths.length; j++) {
+            if (files[i].getPath().equals(
+                                          fileClasspaths[j].toString())) {
+              classPaths.add(localFiles[i].toString());
+            }
+          }
+        }
+      }
+    }
+  }
+
+  private void appendSystemClasspaths(List<String> classPaths) {
+    for (String c : System.getProperty("java.class.path").split(SYSTEM_PATH_SEPARATOR)) {
+      classPaths.add(c);
+    }
+  }
+  
+  /**
+   * Given a "jobJar" (typically retrieved via {@link Configuration.getJar()}),
+   * appends classpath entries for it, as well as its lib/ and classes/
+   * subdirectories.
+   * 
+   * @param jobJar Job jar from configuration
+   * @param classPaths Accumulator for class paths
+   */
+  static void appendJobJarClasspaths(String jobJar, List<String> classPaths) {
+    if (jobJar == null) {
+      return;
+      
+    }
+    File jobCacheDir = new File(new Path(jobJar).getParent().toString());
+    
+    // if the job jar's lib/ directory exists, add each entry in it to the classpath
+    File[] libs = new File(jobCacheDir, "lib").listFiles();
+    if (libs != null) {
+      for (File l : libs) {
+        classPaths.add(l.toString());
+      }
+    }
+    classPaths.add(new File(jobCacheDir, "classes").toString());
+    classPaths.add(jobCacheDir.toString());
+  }
   
   //Mostly for setting up the symlinks. Note that when we setup the distributed
   //cache, we didn't create the symlinks. This is done on a per task basis
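
For intuition about the refactor, a standalone hedged sketch of the new classpath-assembly pattern (paths are illustrative): entries accumulate in a List<String> and are joined once with StringUtils.join, replacing the old StringBuffer-and-separator bookkeeping.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.util.StringUtils;

    public class ClasspathAssemblyDemo {
      private static final String SEP = System.getProperty("path.separator");

      public static void main(String[] args) {
        List<String> classPaths = new ArrayList<String>();
        // Start with the parent JVM's classpath, one list entry per element.
        for (String c : System.getProperty("java.class.path").split(SEP)) {
          classPaths.add(c);
        }
        classPaths.add("/tmp/jobcache/job_XXXX/jars/classes");  // illustrative classes/ dir
        classPaths.add("/tmp/jobcache/job_XXXX/work");          // illustrative working dir
        // One join at the end produces the -classpath value for the child JVM.
        System.out.println(StringUtils.join(SEP, classPaths));
      }
    }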

+ 1 - 1
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -778,7 +778,7 @@ public class TaskTracker
         // job-specific shared directory for use as scratch space 
         Path workDir = lDirAlloc.getLocalPathForWrite(
                          (getLocalJobDir(jobId.toString())
-                         + Path.SEPARATOR + "work"), fConf);
+                         + Path.SEPARATOR + MRConstants.WORKDIR), fConf);
         if (!localFs.mkdirs(workDir)) {
           throw new IOException("Mkdirs failed to create " 
                       + workDir.toString());

+ 14 - 0
src/test/core/org/apache/hadoop/util/TestStringUtils.java

@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.util;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import junit.framework.TestCase;
 
 public class TestStringUtils extends TestCase {
@@ -118,4 +121,15 @@ public class TestStringUtils extends TestCase {
     assertEquals(-1259520L, StringUtils.TraditionalBinaryPrefix.string2long("-1230k"));
     assertEquals(956703965184L, StringUtils.TraditionalBinaryPrefix.string2long("891g"));
   }
+
+  public void testJoin() {
+    List<String> s = new ArrayList<String>();
+    s.add("a");
+    s.add("b");
+    s.add("c");
+    assertEquals("", StringUtils.join(":", s.subList(0, 0)));
+    assertEquals("a", StringUtils.join(":", s.subList(0, 1)));
+    assertEquals("a:b", StringUtils.join(":", s.subList(0, 2)));
+    assertEquals("a:b:c", StringUtils.join(":", s.subList(0, 3)));
+  }
 }

+ 180 - 0
src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java

@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.TaskType;
+
+/** 
+ * Re-runs a map task using the IsolationRunner. 
+ *
+ * The task included here is an identity mapper that touches
+ * a file in a side-effect directory.  This is used
+ * to verify that the task in fact ran.
+ */
+public class TestIsolationRunner extends TestCase {
+
+  private static final String SIDE_EFFECT_DIR_PROPERTY =
+    "test.isolationrunner.sideeffectdir";
+  private static String TEST_ROOT_DIR = new File(System.getProperty(
+      "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
+  
+  /** Identity mapper that also creates a side effect file. */
+  static class SideEffectMapper<K, V> extends IdentityMapper<K, V> {
+    private JobConf conf;
+    @Override
+    public void configure(JobConf conf) {
+      this.conf = conf;
+    }
+    @Override
+    public void close() throws IOException {
+      writeSideEffectFile(conf, "map");
+    }
+  }
+
+  static class SideEffectReducer<K, V> extends IdentityReducer<K, V> {
+    private JobConf conf;
+    @Override
+    public void configure(JobConf conf) {
+      this.conf = conf;
+    }
+    @Override
+    public void close() throws IOException {
+      writeSideEffectFile(conf, "reduce");
+    }
+  }
+
+  private static void deleteSideEffectFiles(JobConf conf) throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    localFs.delete(new Path(conf.get(SIDE_EFFECT_DIR_PROPERTY)), true);
+    assertEquals(0, countSideEffectFiles(conf, ""));
+  }
+  
+  private static void writeSideEffectFile(JobConf conf, String prefix)
+      throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Path sideEffectFile = new Path(conf.get(SIDE_EFFECT_DIR_PROPERTY),
+        prefix + "-" + UUID.randomUUID().toString());
+    localFs.create(sideEffectFile).close();
+  }
+  
+  private static int countSideEffectFiles(JobConf conf, final String prefix)
+      throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    FileStatus[] files = localFs.listStatus(
+        new Path(conf.get(SIDE_EFFECT_DIR_PROPERTY)), new PathFilter() {
+      @Override public boolean accept(Path path) {
+        return path.getName().startsWith(prefix + "-");
+      }
+    });
+    return files.length;
+  }
+  
+  private Path getAttemptJobXml(JobConf conf, JobID jobId, TaskType taskType)
+      throws IOException {
+    String[] localDirs = conf.getLocalDirs();
+    assertEquals(1, localDirs.length);
+    Path jobCacheDir = new Path(localDirs[0], "0_0" + Path.SEPARATOR +
+        "taskTracker" + Path.SEPARATOR + "jobcache" + Path.SEPARATOR + jobId);    
+    Path attemptDir = new Path(jobCacheDir,
+        new TaskAttemptID(new TaskID(jobId, taskType, 0), 0).toString());    
+    return new Path(attemptDir, "job.xml");
+  }
+
+  public void testIsolationRunOfMapTask() throws 
+      IOException, InterruptedException, ClassNotFoundException {
+    MiniMRCluster mr = null;
+    try {
+      mr = new MiniMRCluster(1, "file:///", 1);
+
+      // Run a job successfully; keep task files.
+      JobConf conf = mr.createJobConf();
+      conf.setKeepTaskFilesPattern(".*");
+      conf.set(SIDE_EFFECT_DIR_PROPERTY, TEST_ROOT_DIR +
+          "/isolationrunnerjob/sideeffect");
+      // Delete previous runs' data.
+      deleteSideEffectFiles(conf);
+      JobID jobId = runJobNormally(conf);
+      assertEquals(1, countSideEffectFiles(conf, "map"));
+      assertEquals(1, countSideEffectFiles(conf, "reduce"));
+      
+      deleteSideEffectFiles(conf);
+
+      // Retrieve the successful job's configuration and 
+      // run IsolationRunner against the map task.
+      FileSystem localFs = FileSystem.getLocal(conf);
+      Path mapJobXml = getAttemptJobXml(conf, jobId,
+          TaskType.MAP).makeQualified(localFs);
+      assertTrue(localFs.exists(mapJobXml));
+      
+      new IsolationRunner().run(new String[] {
+          new File(mapJobXml.toUri()).getCanonicalPath() });
+      
+      assertEquals(1, countSideEffectFiles(conf, "map"));
+      assertEquals(0, countSideEffectFiles(conf, "reduce"));
+
+      // Clean up
+      deleteSideEffectFiles(conf);
+    } finally {
+      if (mr != null) {
+        mr.shutdown();
+      }
+    }
+  }
+
+  static JobID runJobNormally(JobConf conf) throws IOException {
+    final Path inDir = new Path(TEST_ROOT_DIR + "/isolationrunnerjob/input");
+    final Path outDir = new Path(TEST_ROOT_DIR + "/isolationrunnerjob/output");
+
+    FileSystem fs = FileSystem.get(conf);
+    fs.delete(outDir, true);
+    if (!fs.exists(inDir)) {
+      fs.mkdirs(inDir);
+    }
+    String input = "The quick brown fox jumps over lazy dog\n";
+    DataOutputStream file = fs.create(new Path(inDir, "file"));
+    file.writeBytes(input);
+    file.close();
+
+    conf.setInputFormat(TextInputFormat.class);
+    conf.setMapperClass(SideEffectMapper.class);
+    conf.setReducerClass(SideEffectReducer.class);
+
+    FileInputFormat.setInputPaths(conf, inDir);
+    FileOutputFormat.setOutputPath(conf, outDir);
+    conf.setNumMapTasks(1);
+    conf.setNumReduceTasks(1);
+
+    JobClient jobClient = new JobClient(conf);
+    RunningJob job = jobClient.submitJob(conf);
+    job.waitForCompletion();
+    return job.getID();
+  }
+}