
HADOOP-673. Give each task its own working directory again. Contributed by Mahadev.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@486372 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent
current commit
3c187e06cc

+ 3 - 0
CHANGES.txt

@@ -71,6 +71,9 @@ Trunk (unreleased changes)
 21. HADOOP-792.  Fix 'dfs -mv' to return correct status.
     (Dhruba Borthakur via cutting) 
 
+22. HADOOP-673.  Give each task its own working directory again.
+    (Mahadev Konar via cutting)
+
 
 Release 0.9.1 - 2006-12-06
 

+ 4 - 2
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java

@@ -234,10 +234,12 @@ public abstract class PipeMapRed {
       String[] argvSplit = splitArgs(argv);
       String prog = argvSplit[0];
       String userdir = System.getProperty("user.dir");
+      File currentDir = new File(".").getAbsoluteFile();
+      File jobCacheDir = new File(currentDir.getParentFile().getParent(), "work");
       if (new File(prog).isAbsolute()) {
         // we don't own it. Hope it is executable
       } else {
-        new MustangFile(prog).setExecutable(true, true);
+        new MustangFile(new File(jobCacheDir, prog).toString()).setExecutable(true, true);
       }
 
       if (job_.getInputValueClass().equals(BytesWritable.class)) {
@@ -282,7 +284,7 @@ public abstract class PipeMapRed {
       //
       if (!new File(argvSplit[0]).isAbsolute()) {
           PathFinder finder = new PathFinder("PATH");
-          finder.prependPathComponent(".");
+          finder.prependPathComponent(jobCacheDir.toString());
           File f = finder.getAbsolutePath(argvSplit[0]);
           if (f != null) {
               argvSplit[0] = f.getAbsolutePath();
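
For reference, a minimal sketch of the path arithmetic this streaming change relies on: the shared job cache directory is resolved by walking two levels up from the task's current working directory and appending "work". The directory names below are hypothetical, chosen only to illustrate the File.getParentFile()/getParent() steps, not the actual TaskTracker layout.

import java.io.File;

// Sketch only: hypothetical layout, illustrating how jobCacheDir is derived
// from the task's working directory in the PipeMapRed hunk above.
public class JobCacheDirSketch {
  public static void main(String[] args) {
    File currentDir = new File("/local/jobcache/job_0001/task_0001/attempt_0").getAbsoluteFile();
    File jobCacheDir = new File(currentDir.getParentFile().getParent(), "work");
    System.out.println(jobCacheDir);  // prints /local/jobcache/job_0001/work
  }
}
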

+ 1 - 1
src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java

@@ -620,8 +620,8 @@ public class StreamJob {
       boolean b = DistributedCache.checkURIs(fileURIs, archiveURIs);
       if (!b)
         fail(LINK_URI);
-      DistributedCache.createSymlink(jobConf_);
     }
+    DistributedCache.createSymlink(jobConf_);
     // set the jobconf for the caching parameters
     if (cacheArchives != null)
       DistributedCache.setCacheArchives(archiveURIs, jobConf_);
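
For context, a minimal sketch of the relocated call from the submitter's side: after this change, DistributedCache.createSymlink is requested on the job configuration unconditionally rather than only when the URI check fails. The standalone class below is an assumption-labelled illustration, not code from the patch.

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.mapred.JobConf;

// Sketch only: mirrors the moved call in StreamJob above, asking the
// task side to create symlinks for cached files in each task's working dir.
public class CreateSymlinkSketch {
  public static void main(String[] args) {
    JobConf jobConf = new JobConf();
    DistributedCache.createSymlink(jobConf);
  }
}
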

+ 26 - 1
src/java/org/apache/hadoop/filecache/DistributedCache.java

@@ -24,6 +24,7 @@ import java.util.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.fs.*;
+
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.net.URI;
@@ -108,6 +109,8 @@ public class DistributedCache {
     String cacheId = makeRelative(cache, conf);
     synchronized (cachedArchives) {
       CacheStatus lcacheStatus = (CacheStatus) cachedArchives.get(cacheId);
+      if (lcacheStatus == null)
+        return;
       synchronized (lcacheStatus) {
         lcacheStatus.refcount--;
       }
@@ -320,7 +323,29 @@ public class DistributedCache {
 
     return digest;
   }
-
+  
+  /**
+   * This method creates symlinks in one directory for all the files in another directory.
+   * @param conf the configuration
+   * @param jobCacheDir the directory containing the files to link to (the link targets)
+   * @param workDir the directory in which the symlinks are created
+   * @throws IOException
+   */
+  public static void createAllSymlink(Configuration conf, File jobCacheDir, File workDir)
+  throws IOException{
+    if ((!jobCacheDir.isDirectory()) || (!workDir.isDirectory())){
+      return;
+    }
+    boolean createSymlink = getSymlink(conf);
+    if (createSymlink) {
+      File[] list = jobCacheDir.listFiles();
+      for (int i = 0; i < list.length; i++) {
+        FileUtil.symLink(list[i].getAbsolutePath(),
+            new File(workDir, list[i].getName()).toString());
+      }
+    }
+  }
+  
   private static String getFileSysName(URI url) {
     String fsname = url.getScheme();
     if ("dfs".equals(fsname)) {

+ 10 - 0
src/java/org/apache/hadoop/fs/FileUtil.java

@@ -43,6 +43,16 @@ public class FileUtil {
             return false;
           }
         } else {
+          // try deleting the entry directly;
+          // it might be a symlink or an empty directory
+          boolean b = false;
+          b = contents[i].delete();
+          if (b){
+            // it was indeed a symlink or an empty directory
+            continue;
+          }
+          // if it is not an empty directory or a symlink, let
+          // fullyDelete handle it.
           if (! fullyDelete(contents[i])) {
             return false;
           }
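
The FileUtil change leans on the fact that File.delete() on a symbolic link removes only the link itself, never the directory it points to. A small standalone sketch of that behaviour follows; the paths are hypothetical and the link is created with a shell ln -s purely for illustration.

import java.io.File;
import java.io.IOException;

// Sketch only: shows that File.delete() on a symlink removes the link,
// not the directory it points to, which is what the patch above relies on.
public class SymlinkDeleteSketch {
  public static void main(String[] args) throws IOException, InterruptedException {
    File target = new File("/tmp/real-dir");        // hypothetical target directory
    File link = new File("/tmp/link-to-real-dir");  // hypothetical symlink
    target.mkdirs();
    new File(target, "payload.txt").createNewFile();
    Runtime.getRuntime()
        .exec(new String[] {"ln", "-s", target.getAbsolutePath(), link.getAbsolutePath()})
        .waitFor();
    System.out.println(link.delete());                              // true: only the link is removed
    System.out.println(new File(target, "payload.txt").exists());   // true: target is untouched
  }
}
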

+ 19 - 7
src/java/org/apache/hadoop/mapred/TaskRunner.java

@@ -21,6 +21,7 @@ import org.apache.commons.logging.*;
 
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.filecache.*;
+import org.apache.hadoop.util.*;
 import java.io.*;
 import java.util.Vector;
 import java.net.URI;
@@ -82,7 +83,8 @@ abstract class TaskRunner extends Thread {
       
       //before preparing the job localize 
       //all the archives
-      File workDir = new File(new File(t.getJobFile()).getParentFile().getParent(), "work");
+      File workDir = new File(t.getJobFile()).getParentFile();
+      File jobCacheDir = new File(workDir.getParent(), "work");
       URI[] archives = DistributedCache.getCacheArchives(conf);
       URI[] files = DistributedCache.getCacheFiles(conf);
       if ((archives != null) || (files != null)) {
@@ -104,8 +106,6 @@ abstract class TaskRunner extends Thread {
           }
           DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
         }
-        
-        // sets the paths to local archives and paths
         Path localTaskFile = new Path(t.getJobFile());
         FileSystem localFs = FileSystem.getNamed("local", conf);
         localFs.delete(localTaskFile);
@@ -116,6 +116,16 @@ abstract class TaskRunner extends Thread {
           out.close();
         }
       }
+    
+      // create symlinks in the current working dir for all the files
+      // in the job cache dir (needed for streaming)
+      try{
+        DistributedCache.createAllSymlink(conf, jobCacheDir, 
+            workDir);
+      } catch(IOException ie){
+        // Do not exit even if symlinks have not been created.
+        LOG.warn(StringUtils.stringifyException(ie));
+      }
       
       if (! prepare()) {
         return;
@@ -135,7 +145,7 @@ abstract class TaskRunner extends Thread {
       String jar = conf.getJar();
       if (jar != null) {       
    	  // if a job jar was specified, pick up its contents from jobCacheDir
-        File[] libs = new File(workDir, "lib").listFiles();
+        File[] libs = new File(jobCacheDir, "lib").listFiles();
         if (libs != null) {
           for (int i = 0; i < libs.length; i++) {
             classPath.append(sep);            // add libs from jar to classpath
@@ -143,11 +153,13 @@ abstract class TaskRunner extends Thread {
           }
         }
         classPath.append(sep);
-        classPath.append(new File(workDir, "classes"));
+        classPath.append(new File(jobCacheDir, "classes"));
         classPath.append(sep);
-        classPath.append(workDir);
+        classPath.append(jobCacheDir);
+       
       }
-
+      classPath.append(sep);
+      classPath.append(workDir);
       //  Build exec child jmv args.
       Vector vargs = new Vector(8);
       File jvm =                                  // use same jvm as parent
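
A minimal sketch of the classpath assembly that results from the TaskRunner change: the shared jobCacheDir supplies the jar's lib/ jars, classes/ and its top-level directory, and the per-task workDir is now always appended so each task's own working directory stays on the classpath. All paths below are hypothetical.

import java.io.File;

// Sketch only: hypothetical paths; mirrors the classpath construction in the
// TaskRunner hunk above (jobCacheDir entries first, then the per-task workDir).
public class ClasspathSketch {
  public static void main(String[] args) {
    String sep = System.getProperty("path.separator");
    File jobCacheDir = new File("/local/jobcache/job_0001/work");
    File workDir = new File("/local/jobcache/job_0001/task_0001");
    StringBuffer classPath = new StringBuffer(System.getProperty("java.class.path"));
    File[] libs = new File(jobCacheDir, "lib").listFiles();
    if (libs != null) {
      for (int i = 0; i < libs.length; i++) {
        classPath.append(sep).append(libs[i]);       // jars shipped inside the job jar
      }
    }
    classPath.append(sep).append(new File(jobCacheDir, "classes"));
    classPath.append(sep).append(jobCacheDir);
    classPath.append(sep).append(workDir);           // per-task working directory
    System.out.println(classPath);
  }
}
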