瀏覽代碼

HADOOP-2227. Use the LocalDirAllocator uniformly for handling all of the temporary storage required for a given task. It also implies that mapred.local.dir.minspacestart is handled by checking if there is enough free-space on any one of the available disks. Contributed by Amareshwari Sri Ramadasu.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@605471 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 17 年之前
父節點
當前提交
8883c1f634

+ 6 - 0
CHANGES.txt

@@ -287,6 +287,12 @@ Branch 0.15 (unreleased)
     transaction log, it stops writing new transactions to that one.
     transaction log, it stops writing new transactions to that one.
     (Raghu Angadi via dhruba)
     (Raghu Angadi via dhruba)
 
 
+    HADOOP-2227.  Use the LocalDirAllocator uniformly for handling all of the
+    temporary storage required for a given task. It also implies that
+    mapred.local.dir.minspacestart is handled by checking if there is enough
+    free-space on any one of the available disks. (Amareshwari Sri Ramadasu
+    via acmurthy)
+
   IMPROVEMENTS
   IMPROVEMENTS
 
 
     HADOOP-2160.  Remove project-level, non-user documentation from
     HADOOP-2160.  Remove project-level, non-user documentation from

+ 53 - 8
src/java/org/apache/hadoop/filecache/DistributedCache.java

@@ -128,6 +128,7 @@ public class DistributedCache {
    * being used in the Configuration
    * being used in the Configuration
    * @param conf The Confguration file which contains the filesystem
    * @param conf The Confguration file which contains the filesystem
    * @param baseDir The base cache Dir where you wnat to localize the files/archives
    * @param baseDir The base cache Dir where you wnat to localize the files/archives
+   * @param fileStatus The file status on the dfs.
    * @param isArchive if the cache is an archive or a file. In case it is an archive
    * @param isArchive if the cache is an archive or a file. In case it is an archive
    *  with a .zip or .jar extension it will be unzipped/unjarred automatically 
    *  with a .zip or .jar extension it will be unzipped/unjarred automatically 
    *  and the directory where the archive is unjarred is returned as the Path.
    *  and the directory where the archive is unjarred is returned as the Path.
@@ -140,8 +141,10 @@ public class DistributedCache {
    * the path to the file where the file is copied locally 
    * the path to the file where the file is copied locally 
    * @throws IOException
    * @throws IOException
    */
    */
-  public static Path getLocalCache(URI cache, Configuration conf, Path baseDir,
-                                   boolean isArchive, long confFileStamp, Path currentWorkDir) 
+  public static Path getLocalCache(URI cache, Configuration conf, 
+                                   Path baseDir, FileStatus fileStatus,
+                                   boolean isArchive, long confFileStamp,
+                                   Path currentWorkDir) 
   throws IOException {
   throws IOException {
     String cacheId = makeRelative(cache, conf);
     String cacheId = makeRelative(cache, conf);
     CacheStatus lcacheStatus;
     CacheStatus lcacheStatus;
@@ -156,7 +159,7 @@ public class DistributedCache {
       
       
       synchronized (lcacheStatus) {
       synchronized (lcacheStatus) {
         localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus, 
         localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus, 
-                                      isArchive, currentWorkDir);
+                                      fileStatus, isArchive, currentWorkDir);
         lcacheStatus.refcount++;
         lcacheStatus.refcount++;
       }
       }
     }
     }
@@ -172,6 +175,38 @@ public class DistributedCache {
     return localizedPath;
     return localizedPath;
   }
   }
   
   
+  /**
+   * Get the locally cached file or archive; it could either be 
+   * previously cached (and valid) or copy it from the {@link FileSystem} now.
+   * 
+   * @param cache the cache to be localized, this should be specified as 
+   * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema 
+   * or hostname:port is provided the file is assumed to be in the filesystem
+   * being used in the Configuration
+   * @param conf The Confguration file which contains the filesystem
+   * @param baseDir The base cache Dir where you wnat to localize the files/archives
+   * @param isArchive if the cache is an archive or a file. In case it is an archive
+   *  with a .zip or .jar extension it will be unzipped/unjarred automatically 
+   *  and the directory where the archive is unjarred is returned as the Path.
+   *  In case of a file, the path to the file is returned
+   * @param confFileStamp this is the hdfs file modification timestamp to verify that the 
+   * file to be cached hasn't changed since the job started
+   * @param currentWorkDir this is the directory where you would want to create symlinks 
+   * for the locally cached files/archives
+   * @return the path to directory where the archives are unjarred in case of archives,
+   * the path to the file where the file is copied locally 
+   * @throws IOException
+
+   */
+  public static Path getLocalCache(URI cache, Configuration conf, 
+                                   Path baseDir, boolean isArchive,
+                                   long confFileStamp, Path currentWorkDir) 
+  throws IOException {
+    return getLocalCache(cache, conf, 
+                         baseDir, null, isArchive,
+                         confFileStamp, currentWorkDir);
+  }
+  
   /**
   /**
    * This is the opposite of getlocalcache. When you are done with
    * This is the opposite of getlocalcache. When you are done with
    * using the cache, you need to release the cache
    * using the cache, you need to release the cache
@@ -220,7 +255,7 @@ public class DistributedCache {
    * relative path is hostname of DFS this mapred cluster is running
    * relative path is hostname of DFS this mapred cluster is running
    * on/absolute_path
    * on/absolute_path
    */
    */
-  private static String makeRelative(URI cache, Configuration conf)
+  public static String makeRelative(URI cache, Configuration conf)
     throws IOException {
     throws IOException {
     String fsname = cache.getScheme();
     String fsname = cache.getScheme();
     String path;
     String path;
@@ -243,6 +278,7 @@ public class DistributedCache {
   private static Path localizeCache(Configuration conf, 
   private static Path localizeCache(Configuration conf, 
                                     URI cache, long confFileStamp,
                                     URI cache, long confFileStamp,
                                     CacheStatus cacheStatus,
                                     CacheStatus cacheStatus,
+                                    FileStatus fileStatus,
                                     boolean isArchive, 
                                     boolean isArchive, 
                                     Path currentWorkDir) 
                                     Path currentWorkDir) 
   throws IOException {
   throws IOException {
@@ -250,7 +286,8 @@ public class DistributedCache {
     FileSystem fs = getFileSystem(cache, conf);
     FileSystem fs = getFileSystem(cache, conf);
     String link = currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
     String link = currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
     File flink = new File(link);
     File flink = new File(link);
-    if (ifExistsAndFresh(conf, fs, cache, confFileStamp, cacheStatus)) {
+    if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
+                           cacheStatus, fileStatus)) {
       if (isArchive) {
       if (isArchive) {
         if (doSymlink){
         if (doSymlink){
           if (!flink.exists())
           if (!flink.exists())
@@ -280,6 +317,7 @@ public class DistributedCache {
       localFs.delete(cacheStatus.localLoadPath);
       localFs.delete(cacheStatus.localLoadPath);
       Path parchive = new Path(cacheStatus.localLoadPath,
       Path parchive = new Path(cacheStatus.localLoadPath,
                                new Path(cacheStatus.localLoadPath.getName()));
                                new Path(cacheStatus.localLoadPath.getName()));
+      
       if (!localFs.mkdirs(cacheStatus.localLoadPath)) {
       if (!localFs.mkdirs(cacheStatus.localLoadPath)) {
         throw new IOException("Mkdirs failed to create directory " + 
         throw new IOException("Mkdirs failed to create directory " + 
                               cacheStatus.localLoadPath.toString());
                               cacheStatus.localLoadPath.toString());
@@ -334,13 +372,19 @@ public class DistributedCache {
   // Checks if the cache has already been localized and is fresh
   // Checks if the cache has already been localized and is fresh
   private static boolean ifExistsAndFresh(Configuration conf, FileSystem fs, 
   private static boolean ifExistsAndFresh(Configuration conf, FileSystem fs, 
                                           URI cache, long confFileStamp, 
                                           URI cache, long confFileStamp, 
-                                          CacheStatus lcacheStatus) 
+                                          CacheStatus lcacheStatus,
+                                          FileStatus fileStatus) 
   throws IOException {
   throws IOException {
     // check for existence of the cache
     // check for existence of the cache
     if (lcacheStatus.currentStatus == false) {
     if (lcacheStatus.currentStatus == false) {
       return false;
       return false;
     } else {
     } else {
-      long dfsFileStamp = getTimestamp(conf, cache);
+      long dfsFileStamp;
+      if (fileStatus != null) {
+        dfsFileStamp = fileStatus.getModificationTime();
+      } else {
+        dfsFileStamp = getTimestamp(conf, cache);
+      }
 
 
       // ensure that the file on hdfs hasn't been modified since the job started 
       // ensure that the file on hdfs hasn't been modified since the job started 
       if (dfsFileStamp != confFileStamp) {
       if (dfsFileStamp != confFileStamp) {
@@ -382,7 +426,8 @@ public class DistributedCache {
    */
    */
   public static void createAllSymlink(Configuration conf, File jobCacheDir, File workDir)
   public static void createAllSymlink(Configuration conf, File jobCacheDir, File workDir)
     throws IOException{
     throws IOException{
-    if ((!jobCacheDir.isDirectory()) || (!workDir.isDirectory())){
+    if ((jobCacheDir == null || !jobCacheDir.isDirectory()) ||
+           workDir == null || (!workDir.isDirectory())) {
       return;
       return;
     }
     }
     boolean createSymlink = getSymlink(conf);
     boolean createSymlink = getSymlink(conf);

+ 37 - 0
src/java/org/apache/hadoop/fs/LocalDirAllocator.java

@@ -165,6 +165,18 @@ public class LocalDirAllocator {
     }
     }
   }
   }
     
     
+  /** We search through all the configured dirs for the file's existence
+   *  and return true when we find  
+   *  @param pathStr the requested file (this will be searched)
+   *  @param conf the Configuration object
+   *  @return true if files exist. false otherwise
+   *  @throws IOException
+   */
+  public boolean ifExists(String pathStr,Configuration conf) {
+    AllocatorPerContext context = obtainContext(contextCfgItemName);
+    return context.ifExists(pathStr, conf);
+  }
+
   private static class AllocatorPerContext {
   private static class AllocatorPerContext {
 
 
     private final Log LOG =
     private final Log LOG =
@@ -327,5 +339,30 @@ public class LocalDirAllocator {
       throw new DiskErrorException ("Could not find " + pathStr +" in any of" +
       throw new DiskErrorException ("Could not find " + pathStr +" in any of" +
       " the configured local directories");
       " the configured local directories");
     }
     }
+
+    /** We search through all the configured dirs for the file's existence
+     *  and return true when we find one 
+     */
+    public synchronized boolean ifExists(String pathStr,Configuration conf) {
+      try {
+        int numDirs = localDirs.length;
+        int numDirsSearched = 0;
+        //remove the leading slash from the path (to make sure that the uri
+        //resolution results in a valid path on the dir being checked)
+        if (pathStr.startsWith("/")) {
+          pathStr = pathStr.substring(1);
+        }
+        while (numDirsSearched < numDirs) {
+          Path file = new Path(localDirs[numDirsSearched], pathStr);
+          if (localFS.exists(file)) {
+            return true;
+          }
+          numDirsSearched++;
+        }
+      } catch (IOException e) {
+        // IGNORE and try again
+      }
+      return false;
+    }
   }
   }
 }
 }

+ 9 - 1
src/java/org/apache/hadoop/mapred/ReduceTask.java

@@ -43,6 +43,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.InMemoryFileSystem;
 import org.apache.hadoop.fs.InMemoryFileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.PathFilter;
@@ -796,12 +797,19 @@ class ReduceTask extends Task {
       // get the work directory which holds the elements we are dynamically
       // get the work directory which holds the elements we are dynamically
       // adding to the classpath
       // adding to the classpath
       File workDir = new File(task.getJobFile()).getParentFile();
       File workDir = new File(task.getJobFile()).getParentFile();
-      File jobCacheDir = new File(workDir.getParent(), "work");
       ArrayList<URL> urllist = new ArrayList<URL>();
       ArrayList<URL> urllist = new ArrayList<URL>();
       
       
       // add the jars and directories to the classpath
       // add the jars and directories to the classpath
       String jar = conf.getJar();
       String jar = conf.getJar();
       if (jar != null) {      
       if (jar != null) {      
+        LocalDirAllocator lDirAlloc = 
+                            new LocalDirAllocator("mapred.local.dir");
+        File jobCacheDir = new File(lDirAlloc.getLocalPathToRead(
+                                      TaskTracker.getJobCacheSubdir() 
+                                      + Path.SEPARATOR + getJobId() 
+                                      + Path.SEPARATOR  
+                                      + "work", conf).toString());
+
         File[] libs = new File(jobCacheDir, "lib").listFiles();
         File[] libs = new File(jobCacheDir, "lib").listFiles();
         if (libs != null) {
         if (libs != null) {
           for (int i = 0; i < libs.length; i++) {
           for (int i = 0; i < libs.length; i++) {

+ 59 - 8
src/java/org/apache/hadoop/mapred/TaskRunner.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.mapred;
 import org.apache.commons.logging.*;
 import org.apache.commons.logging.*;
 
 
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.filecache.*;
 import org.apache.hadoop.filecache.*;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.util.*;
 import java.io.*;
 import java.io.*;
@@ -91,18 +92,52 @@ abstract class TaskRunner extends Thread {
       //all the archives
       //all the archives
       File workDir = new File(t.getJobFile()).getParentFile();
       File workDir = new File(t.getJobFile()).getParentFile();
       String taskid = t.getTaskId();
       String taskid = t.getTaskId();
-      File jobCacheDir = new File(workDir.getParent(), "work");
+      LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+      File jobCacheDir = null;
+      try {
+        jobCacheDir = new File(lDirAlloc.getLocalPathToRead(
+                                    TaskTracker.getJobCacheSubdir() 
+                                    + Path.SEPARATOR + t.getJobId() 
+                                    + Path.SEPARATOR  
+                                    + "work", conf).toString());
+      } catch (IOException ioe) {
+        LOG.warn("work directory doesnt exist");
+      }
       URI[] archives = DistributedCache.getCacheArchives(conf);
       URI[] archives = DistributedCache.getCacheArchives(conf);
       URI[] files = DistributedCache.getCacheFiles(conf);
       URI[] files = DistributedCache.getCacheFiles(conf);
+      FileStatus fileStatus;
+      FileSystem fileSystem;
+      Path localPath;
+      String baseDir;
+
       if ((archives != null) || (files != null)) {
       if ((archives != null) || (files != null)) {
         if (archives != null) {
         if (archives != null) {
-          String[] archivesTimestamps = DistributedCache.getArchiveTimestamps(conf);
+          String[] archivesTimestamps = 
+                               DistributedCache.getArchiveTimestamps(conf);
           Path[] p = new Path[archives.length];
           Path[] p = new Path[archives.length];
           for (int i = 0; i < archives.length;i++){
           for (int i = 0; i < archives.length;i++){
+            fileSystem = FileSystem.get(archives[i], conf);
+            fileStatus = fileSystem.getFileStatus(
+                                      new Path(archives[i].getPath()));
+            String cacheId = DistributedCache.makeRelative(archives[i],conf);
+            String cachePath = TaskTracker.getCacheSubdir() + 
+                                 Path.SEPARATOR + cacheId;
+            if (lDirAlloc.ifExists(cachePath, conf)) {
+              localPath =  lDirAlloc.getLocalPathToRead(cachePath, conf);
+            }
+            else {
+              localPath = lDirAlloc.getLocalPathForWrite(cachePath,
+                                      fileStatus.getLen(), conf);
+            }
+            baseDir = localPath.toString().replace(cacheId, "");
             p[i] = DistributedCache.getLocalCache(archives[i], conf, 
             p[i] = DistributedCache.getLocalCache(archives[i], conf, 
-                                                  conf.getLocalPath(TaskTracker.getCacheSubdir()), 
-                                                  true, Long.parseLong(archivesTimestamps[i]), 
-                                                  new Path(workDir.getAbsolutePath()));
+                                                  new Path(baseDir),
+                                                  fileStatus,
+                                                  true, Long.parseLong(
+                                                        archivesTimestamps[i]),
+                                                  new Path(workDir.
+                                                        getAbsolutePath()));
+            
           }
           }
           DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
           DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
         }
         }
@@ -110,10 +145,26 @@ abstract class TaskRunner extends Thread {
           String[] fileTimestamps = DistributedCache.getFileTimestamps(conf);
           String[] fileTimestamps = DistributedCache.getFileTimestamps(conf);
           Path[] p = new Path[files.length];
           Path[] p = new Path[files.length];
           for (int i = 0; i < files.length;i++){
           for (int i = 0; i < files.length;i++){
+            fileSystem = FileSystem.get(files[i], conf);
+            fileStatus = fileSystem.getFileStatus(
+                                      new Path(files[i].getPath()));
+            String cacheId = DistributedCache.makeRelative(files[i], conf);
+            String cachePath = TaskTracker.getCacheSubdir() +
+                                 Path.SEPARATOR + cacheId;
+            if (lDirAlloc.ifExists(cachePath,conf)) {
+              localPath =  lDirAlloc.getLocalPathToRead(cachePath, conf);
+            } else {
+              localPath = lDirAlloc.getLocalPathForWrite(cachePath,
+                                      fileStatus.getLen(), conf);
+            }
+            baseDir = localPath.toString().replace(cacheId, "");
             p[i] = DistributedCache.getLocalCache(files[i], conf, 
             p[i] = DistributedCache.getLocalCache(files[i], conf, 
-                                                  conf.getLocalPath(TaskTracker.getCacheSubdir()), 
-                                                  false, Long.parseLong(fileTimestamps[i]), 
-                                                  new Path(workDir.getAbsolutePath()));
+                                                  new Path(baseDir),
+                                                  fileStatus,
+                                                  false, Long.parseLong(
+                                                           fileTimestamps[i]),
+                                                  new Path(workDir.
+                                                        getAbsolutePath()));
           }
           }
           DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
           DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
         }
         }

+ 38 - 11
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -53,6 +53,7 @@ import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -601,20 +602,44 @@ public class TaskTracker
     }
     }
   }
   }
 
 
+  private LocalDirAllocator lDirAlloc = 
+                              new LocalDirAllocator("mapred.local.dir");
+
   // intialize the job directory
   // intialize the job directory
   private void localizeJob(TaskInProgress tip) throws IOException {
   private void localizeJob(TaskInProgress tip) throws IOException {
     Path localJarFile = null;
     Path localJarFile = null;
     Task t = tip.getTask();
     Task t = tip.getTask();
     String jobId = t.getJobId();
     String jobId = t.getJobId();
-    Path localJobFile = new Path(fConf.getLocalPath(getJobCacheSubdir()), 
-                                 jobId + Path.SEPARATOR + "job.xml");
+    String jobFile = t.getJobFile();
+    // Get sizes of JobFile and JarFile
+    // sizes are -1 if they are not present.
+    FileSystem fileSystem = FileSystem.get(fConf);
+    FileStatus status[] = fileSystem.listStatus(new Path(jobFile).getParent());
+    long jarFileSize = -1;
+    long jobFileSize = -1;
+    for(FileStatus stat : status) {
+      if (stat.getPath().toString().contains("job.xml")) {
+        jobFileSize = stat.getLen();
+      } else {
+        jobFileSize = -1;
+      }
+      if (stat.getPath().toString().contains("job.jar")) {
+        jarFileSize = stat.getLen();
+      } else {
+        jarFileSize = -1;
+      }
+    }
+    // Here we check for double the size of jobfile to accommodate for
+    // localize task file and we check four times the size of jarFileSize to 
+    // accommodate for unjarring the jar file in work directory 
+    Path localJobFile = lDirAlloc.getLocalPathForWrite((getJobCacheSubdir()
+                                    + Path.SEPARATOR + jobId 
+                                    + Path.SEPARATOR + "job.xml"),
+                                    2 * jobFileSize + 5 * jarFileSize, fConf);
     RunningJob rjob = addTaskToJob(jobId, localJobFile, tip);
     RunningJob rjob = addTaskToJob(jobId, localJobFile, tip);
     synchronized (rjob) {
     synchronized (rjob) {
       if (!rjob.localized) {
       if (!rjob.localized) {
-        localJarFile = new Path(fConf.getLocalPath(getJobCacheSubdir()), 
-                                jobId + Path.SEPARATOR + "job.jar");
   
   
-        String jobFile = t.getJobFile();
         FileSystem localFs = FileSystem.getLocal(fConf);
         FileSystem localFs = FileSystem.getLocal(fConf);
         // this will happen on a partial execution of localizeJob.
         // this will happen on a partial execution of localizeJob.
         // Sometimes the job.xml gets copied but copying job.jar
         // Sometimes the job.xml gets copied but copying job.jar
@@ -632,6 +657,7 @@ public class TaskTracker
         JobConf localJobConf = new JobConf(localJobFile);
         JobConf localJobConf = new JobConf(localJobFile);
         String jarFile = localJobConf.getJar();
         String jarFile = localJobConf.getJar();
         if (jarFile != null) {
         if (jarFile != null) {
+          localJarFile = new Path(jobDir,"job.jar");
           fs.copyToLocalFile(new Path(jarFile), localJarFile);
           fs.copyToLocalFile(new Path(jarFile), localJarFile);
           localJobConf.setJar(localJarFile.toString());
           localJobConf.setJar(localJarFile.toString());
           OutputStream out = localFs.create(localJobFile);
           OutputStream out = localFs.create(localJobFile);
@@ -1208,11 +1234,11 @@ public class TaskTracker
         localDirsDf.put(localDirs[i], df);
         localDirsDf.put(localDirs[i], df);
       }
       }
 
 
-      if (df.getAvailable() < minSpace)
-        return false;
+      if (df.getAvailable() > minSpace)
+        return true;
     }
     }
 
 
-    return true;
+    return false;
   }
   }
     
     
   /**
   /**
@@ -1345,9 +1371,10 @@ public class TaskTracker
     }
     }
         
         
     private void localizeTask(Task task) throws IOException{
     private void localizeTask(Task task) throws IOException{
-      Path localTaskDir =
-        new Path(this.defaultJobConf.getLocalPath(TaskTracker.getJobCacheSubdir()), 
-                 (task.getJobId() + Path.SEPARATOR + task.getTaskId()));
+      Path localTaskDir = 
+        lDirAlloc.getLocalPathForWrite((TaskTracker.getJobCacheSubdir() + 
+                    Path.SEPARATOR + task.getJobId() + Path.SEPARATOR +
+                    task.getTaskId()), defaultJobConf );
       FileSystem localFs = FileSystem.getLocal(fConf);
       FileSystem localFs = FileSystem.getLocal(fConf);
       if (!localFs.mkdirs(localTaskDir)) {
       if (!localFs.mkdirs(localTaskDir)) {
         throw new IOException("Mkdirs failed to create " + localTaskDir.toString());
         throw new IOException("Mkdirs failed to create " + localTaskDir.toString());