Browse files

HADOOP-2116. Changes the layout of the task execution directory. Contributed by Amareshwari Sriramadasu.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@639247 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 17 years ago
parent
commit
df68ceb236

+ 3 - 0
CHANGES.txt

@@ -37,6 +37,9 @@ Trunk (unreleased changes)
     HADOOP-2822. Remove deprecated code for classes InputFormatBase and 
     PhasedFileSystem. (Amareshwari Sriramadasu via enis)
 
+    HADOOP-2116. Changes the layout of the task execution directory. 
+    (Amareshwari Sriramadasu via ddas)
+
   NEW FEATURES
 
     HADOOP-1398.  Add HBase in-memory block cache.  (tomwhite)

+ 3 - 1
docs/changes.html

@@ -275,7 +275,7 @@ client side configs.<br />(Vinod Kumar Vavilapalli via ddas)</li>
 </a></h2>
 <ul id="release_0.16.2_-_unreleased_">
   <li><a href="javascript:toggleList('release_0.16.2_-_unreleased_._bug_fixes_')">  BUG FIXES
-</a>&nbsp;&nbsp;&nbsp;(5)
+</a>&nbsp;&nbsp;&nbsp;(6)
     <ol id="release_0.16.2_-_unreleased_._bug_fixes_">
       <li><a href="http://issues.apache.org/jira/browse/HADOOP-3011">HADOOP-3011</a>. Prohibit distcp from overwriting directories on the
 destination filesystem with files.<br />(cdouglas)</li>
@@ -288,6 +288,8 @@ Also makes the _temporary a constant in MRConstants.java.<br />(Amareshwari Srir
       <li><a href="http://issues.apache.org/jira/browse/HADOOP-3003">HADOOP-3003</a>. FileSystem cache key is updated after a
 FileSystem object is created. (Tsz Wo (Nicholas), SZE via dhruba)
 </li>
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-3042">HADOOP-3042</a>. Updates the Javadoc in JobConf.getOutputPath to reflect
+the actual temporary path.<br />(Amareshwari Sriramadasu via ddas)</li>
     </ol>
   </li>
 </ul>

+ 58 - 22
docs/mapred_tutorial.html

@@ -289,7 +289,7 @@ document.write("Last Published: " + document.lastModified);
 <a href="#Example%3A+WordCount+v2.0">Example: WordCount v2.0</a>
 <ul class="minitoc">
 <li>
-<a href="#Source+Code-N10BE0">Source Code</a>
+<a href="#Source+Code-N10C11">Source Code</a>
 </li>
 <li>
 <a href="#Sample+Runs">Sample Runs</a>
@@ -1525,6 +1525,42 @@ document.write("Last Published: " + document.lastModified);
 <span class="codefrag">&lt;/property&gt;</span>
         
 </p>
+<p>When the job starts, the localized job directory
+        <span class="codefrag"> ${mapred.local.dir}/taskTracker/jobcache/$jobid/</span>
+        has the following directories: </p>
+<ul>
+        
+<li> A job-specific shared directory, created at location
+        <span class="codefrag">${mapred.local.dir}/taskTracker/jobcache/$jobid/work/ </span>.
+        This directory is exposed to the users through 
+        <span class="codefrag">job.local.dir </span>. The tasks can use this space as scratch
+        space and share files among them. The directory can be accessed through the
+        api <a href="api/org/apache/hadoop/mapred/JobConf.html#getJobLocalDir()">
+        JobConf.getJobLocalDir()</a>. It is also available as a system property,
+        so users can call <span class="codefrag">System.getProperty("job.local.dir")</span>.
+        </li>
+        
+<li>A jars directory, which has the job jar file and the expanded jar </li>
+        
+<li>A job.xml file, the generic job configuration </li>
+        
+<li>Each task has a directory <span class="codefrag">task-id</span> which again has the 
+        following structure
+        <ul>
+        
+<li>A job.xml file, the task-localized job configuration </li>
+        
+<li>A directory for intermediate output files</li>
+        
+<li>The working directory of the task, 
+        which has a temporary directory 
+        to create temporary files</li>
+        
+</ul>
+        
+</li>
+        
+</ul>
 <p>The <a href="#DistributedCache">DistributedCache</a> can also be used
         as a rudimentary software distribution mechanism for use in the map 
         and/or reduce tasks. It can be used to distribute both jars and 
@@ -1543,7 +1579,7 @@ document.write("Last Published: " + document.lastModified);
         loaded via <a href="http://java.sun.com/j2se/1.5.0/docs/api/java/lang/System.html#loadLibrary(java.lang.String)">
         System.loadLibrary</a> or <a href="http://java.sun.com/j2se/1.5.0/docs/api/java/lang/System.html#load(java.lang.String)">
         System.load</a>.</p>
-<a name="N108B9"></a><a name="Job+Submission+and+Monitoring"></a>
+<a name="N108EA"></a><a name="Job+Submission+and+Monitoring"></a>
 <h3 class="h4">Job Submission and Monitoring</h3>
 <p>
 <a href="api/org/apache/hadoop/mapred/JobClient.html">
@@ -1604,7 +1640,7 @@ document.write("Last Published: " + document.lastModified);
 <p>Normally the user creates the application, describes various facets 
         of the job via <span class="codefrag">JobConf</span>, and then uses the 
         <span class="codefrag">JobClient</span> to submit the job and monitor its progress.</p>
-<a name="N10919"></a><a name="Job+Control"></a>
+<a name="N1094A"></a><a name="Job+Control"></a>
 <h4>Job Control</h4>
 <p>Users may need to chain map-reduce jobs to accomplish complex
           tasks which cannot be done via a single map-reduce job. This is fairly
@@ -1640,7 +1676,7 @@ document.write("Last Published: " + document.lastModified);
             </li>
           
 </ul>
-<a name="N10943"></a><a name="Job+Input"></a>
+<a name="N10974"></a><a name="Job+Input"></a>
 <h3 class="h4">Job Input</h3>
 <p>
 <a href="api/org/apache/hadoop/mapred/InputFormat.html">
@@ -1688,7 +1724,7 @@ document.write("Last Published: " + document.lastModified);
         appropriate <span class="codefrag">CompressionCodec</span>. However, it must be noted that
         compressed files with the above extensions cannot be <em>split</em> and 
         each compressed file is processed in its entirety by a single mapper.</p>
-<a name="N109AD"></a><a name="InputSplit"></a>
+<a name="N109DE"></a><a name="InputSplit"></a>
 <h4>InputSplit</h4>
 <p>
 <a href="api/org/apache/hadoop/mapred/InputSplit.html">
@@ -1702,7 +1738,7 @@ document.write("Last Published: " + document.lastModified);
           FileSplit</a> is the default <span class="codefrag">InputSplit</span>. It sets 
           <span class="codefrag">map.input.file</span> to the path of the input file for the
           logical split.</p>
-<a name="N109D2"></a><a name="RecordReader"></a>
+<a name="N10A03"></a><a name="RecordReader"></a>
 <h4>RecordReader</h4>
 <p>
 <a href="api/org/apache/hadoop/mapred/RecordReader.html">
@@ -1714,7 +1750,7 @@ document.write("Last Published: " + document.lastModified);
           for processing. <span class="codefrag">RecordReader</span> thus assumes the 
           responsibility of processing record boundaries and presents the tasks 
           with keys and values.</p>
-<a name="N109F5"></a><a name="Job+Output"></a>
+<a name="N10A26"></a><a name="Job+Output"></a>
 <h3 class="h4">Job Output</h3>
 <p>
 <a href="api/org/apache/hadoop/mapred/OutputFormat.html">
@@ -1739,7 +1775,7 @@ document.write("Last Published: " + document.lastModified);
 <p>
 <span class="codefrag">TextOutputFormat</span> is the default 
         <span class="codefrag">OutputFormat</span>.</p>
-<a name="N10A1E"></a><a name="Task+Side-Effect+Files"></a>
+<a name="N10A4F"></a><a name="Task+Side-Effect+Files"></a>
 <h4>Task Side-Effect Files</h4>
 <p>In some applications, component tasks need to create and/or write to
           side-files, which differ from the actual job-output files.</p>
@@ -1766,7 +1802,7 @@ document.write("Last Published: " + document.lastModified);
           JobConf.getOutputPath()</a>, and the framework will promote them 
          similarly for successful task-attempts, thus eliminating the need to 
           pick unique paths per task-attempt.</p>
-<a name="N10A53"></a><a name="RecordWriter"></a>
+<a name="N10A84"></a><a name="RecordWriter"></a>
 <h4>RecordWriter</h4>
 <p>
 <a href="api/org/apache/hadoop/mapred/RecordWriter.html">
@@ -1774,9 +1810,9 @@ document.write("Last Published: " + document.lastModified);
           pairs to an output file.</p>
 <p>RecordWriter implementations write the job outputs to the 
           <span class="codefrag">FileSystem</span>.</p>
-<a name="N10A6A"></a><a name="Other+Useful+Features"></a>
+<a name="N10A9B"></a><a name="Other+Useful+Features"></a>
 <h3 class="h4">Other Useful Features</h3>
-<a name="N10A70"></a><a name="Counters"></a>
+<a name="N10AA1"></a><a name="Counters"></a>
 <h4>Counters</h4>
 <p>
 <span class="codefrag">Counters</span> represent global counters, defined either by 
@@ -1790,7 +1826,7 @@ document.write("Last Published: " + document.lastModified);
           Reporter.incrCounter(Enum, long)</a> in the <span class="codefrag">map</span> and/or 
           <span class="codefrag">reduce</span> methods. These counters are then globally 
           aggregated by the framework.</p>
-<a name="N10A9B"></a><a name="DistributedCache"></a>
+<a name="N10ACC"></a><a name="DistributedCache"></a>
 <h4>DistributedCache</h4>
 <p>
 <a href="api/org/apache/hadoop/filecache/DistributedCache.html">
@@ -1823,7 +1859,7 @@ document.write("Last Published: " + document.lastModified);
           <a href="api/org/apache/hadoop/filecache/DistributedCache.html#createSymlink(org.apache.hadoop.conf.Configuration)">
           DistributedCache.createSymlink(Path, Configuration)</a> api. Files 
           have <em>execution permissions</em> set.</p>
-<a name="N10AD9"></a><a name="Tool"></a>
+<a name="N10B0A"></a><a name="Tool"></a>
 <h4>Tool</h4>
 <p>The <a href="api/org/apache/hadoop/util/Tool.html">Tool</a> 
           interface supports the handling of generic Hadoop command-line options.
@@ -1863,7 +1899,7 @@ document.write("Last Published: " + document.lastModified);
             </span>
           
 </p>
-<a name="N10B0B"></a><a name="IsolationRunner"></a>
+<a name="N10B3C"></a><a name="IsolationRunner"></a>
 <h4>IsolationRunner</h4>
 <p>
 <a href="api/org/apache/hadoop/mapred/IsolationRunner.html">
@@ -1887,13 +1923,13 @@ document.write("Last Published: " + document.lastModified);
 <p>
 <span class="codefrag">IsolationRunner</span> will run the failed task in a single 
           jvm, which can be in the debugger, over precisely the same input.</p>
-<a name="N10B3E"></a><a name="JobControl"></a>
+<a name="N10B6F"></a><a name="JobControl"></a>
 <h4>JobControl</h4>
 <p>
 <a href="api/org/apache/hadoop/mapred/jobcontrol/package-summary.html">
           JobControl</a> is a utility which encapsulates a set of Map-Reduce jobs
           and their dependencies.</p>
-<a name="N10B4B"></a><a name="Data+Compression"></a>
+<a name="N10B7C"></a><a name="Data+Compression"></a>
 <h4>Data Compression</h4>
 <p>Hadoop Map-Reduce provides facilities for the application-writer to
           specify compression for both intermediate map-outputs and the
@@ -1907,7 +1943,7 @@ document.write("Last Published: " + document.lastModified);
           codecs for reasons of both performance (zlib) and non-availability of
           Java libraries (lzo). More details on their usage and availability are
           available <a href="native_libraries.html">here</a>.</p>
-<a name="N10B6B"></a><a name="Intermediate+Outputs"></a>
+<a name="N10B9C"></a><a name="Intermediate+Outputs"></a>
 <h5>Intermediate Outputs</h5>
 <p>Applications can control compression of intermediate map-outputs
             via the 
@@ -1928,7 +1964,7 @@ document.write("Last Published: " + document.lastModified);
             <a href="api/org/apache/hadoop/mapred/JobConf.html#setMapOutputCompressionType(org.apache.hadoop.io.SequenceFile.CompressionType)">
             JobConf.setMapOutputCompressionType(SequenceFile.CompressionType)</a> 
             api.</p>
-<a name="N10B97"></a><a name="Job+Outputs"></a>
+<a name="N10BC8"></a><a name="Job+Outputs"></a>
 <h5>Job Outputs</h5>
 <p>Applications can control compression of job-outputs via the
             <a href="api/org/apache/hadoop/mapred/OutputFormatBase.html#setCompressOutput(org.apache.hadoop.mapred.JobConf,%20boolean)">
@@ -1948,7 +1984,7 @@ document.write("Last Published: " + document.lastModified);
 </div>
 
     
-<a name="N10BC6"></a><a name="Example%3A+WordCount+v2.0"></a>
+<a name="N10BF7"></a><a name="Example%3A+WordCount+v2.0"></a>
 <h2 class="h3">Example: WordCount v2.0</h2>
 <div class="section">
 <p>Here is a more complete <span class="codefrag">WordCount</span> which uses many of the
@@ -1958,7 +1994,7 @@ document.write("Last Published: " + document.lastModified);
       <a href="quickstart.html#SingleNodeSetup">pseudo-distributed</a> or
       <a href="quickstart.html#Fully-Distributed+Operation">fully-distributed</a> 
       Hadoop installation.</p>
-<a name="N10BE0"></a><a name="Source+Code-N10BE0"></a>
+<a name="N10C11"></a><a name="Source+Code-N10C11"></a>
 <h3 class="h4">Source Code</h3>
 <table class="ForrestTable" cellspacing="1" cellpadding="4">
           
@@ -3168,7 +3204,7 @@ document.write("Last Published: " + document.lastModified);
 </tr>
         
 </table>
-<a name="N11342"></a><a name="Sample+Runs"></a>
+<a name="N11373"></a><a name="Sample+Runs"></a>
 <h3 class="h4">Sample Runs</h3>
 <p>Sample text-files as input:</p>
 <p>
@@ -3336,7 +3372,7 @@ document.write("Last Published: " + document.lastModified);
 <br>
         
 </p>
-<a name="N11416"></a><a name="Highlights"></a>
+<a name="N11447"></a><a name="Highlights"></a>
 <h3 class="h4">Highlights</h3>
 <p>The second version of <span class="codefrag">WordCount</span> improves upon the 
         previous one by using some features offered by the Map-Reduce framework:

The file diff is limited because there are too many changes
+ 3 - 3
docs/mapred_tutorial.pdf


+ 2 - 3
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java

@@ -137,11 +137,10 @@ public abstract class PipeMapRed {
       String[] argvSplit = splitArgs(argv);
       String prog = argvSplit[0];
       File currentDir = new File(".").getAbsoluteFile();
-      File jobCacheDir = new File(currentDir.getParentFile().getParent(), "work");
       if (new File(prog).isAbsolute()) {
         // we don't own it. Hope it is executable
       } else {
-        FileUtil.chmod(new File(jobCacheDir, prog).toString(), "a+x");
+        FileUtil.chmod(new File(currentDir, prog).toString(), "a+x");
       }
 
       // 
@@ -153,7 +152,7 @@ public abstract class PipeMapRed {
       //
       if (!new File(argvSplit[0]).isAbsolute()) {
         PathFinder finder = new PathFinder("PATH");
-        finder.prependPathComponent(jobCacheDir.toString());
+        finder.prependPathComponent(currentDir.toString());
         File f = finder.getAbsolutePath(argvSplit[0]);
         if (f != null) {
           argvSplit[0] = f.getAbsolutePath();

+ 27 - 0
src/docs/src/documentation/content/xdocs/mapred_tutorial.xml

@@ -1062,6 +1062,33 @@
           <code>&lt;/property&gt;</code>
         </p>
         
+        <p>When the job starts, the localized job directory
+        <code> ${mapred.local.dir}/taskTracker/jobcache/$jobid/</code>
+        has the following directories: </p>
+        <ul>
+        <li> A job-specific shared directory, created at location
+        <code>${mapred.local.dir}/taskTracker/jobcache/$jobid/work/ </code>.
+        This directory is exposed to the users through 
+        <code>job.local.dir </code>. The tasks can use this space as scratch
+        space and share files among them. The directory can be accessed through the
+        api <a href="ext:api/org/apache/hadoop/mapred/jobconf/getjoblocaldir">
+        JobConf.getJobLocalDir()</a>. It is also available as a system property,
+        so users can call <code>System.getProperty("job.local.dir")</code>.
+        </li>
+        <li>A jars directory, which has the job jar file and the expanded jar </li>
+        <li>A job.xml file, the generic job configuration </li>
+        <li>Each task has a directory <code>task-id</code> which again has the 
+        following structure
+        <ul>
+        <li>A job.xml file, the task-localized job configuration </li>
+        <li>A directory for intermediate output files</li>
+        <li>The working directory of the task, 
+        which has a temporary directory 
+        to create temporary files</li>
+        </ul>
+        </li>
+        </ul>
+ 
         <p>The <a href="#DistributedCache">DistributedCache</a> can also be used
         as a rudimentary software distribution mechanism for use in the map 
         and/or reduce tasks. It can be used to distribute both jars and 

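A minimal sketch of how a task might use this job-local scratch space, assuming the layout documented above; the class name, file name, and fallback logic are illustrative, not part of this patch:

    import java.io.File;
    import java.io.IOException;
    import org.apache.hadoop.mapred.JobConf;

    public class ScratchDirExample {
      // Create a file in the job-specific shared directory so that other
      // tasks of the same job on this node can see it.
      public static File createShared(JobConf conf, String name) throws IOException {
        // Both lookups resolve to ${mapred.local.dir}/taskTracker/jobcache/$jobid/work
        String dir = conf.getJobLocalDir();
        if (dir == null) {
          dir = System.getProperty("job.local.dir");
        }
        File f = new File(dir, name);
        f.createNewFile();
        return f;
      }
    }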
+ 1 - 0
src/docs/src/documentation/content/xdocs/site.xml

@@ -154,6 +154,7 @@ See http://forrest.apache.org/docs/linking.html for more info.
                 <setcompressmapoutput href="#setCompressMapOutput(boolean)" />
                 <setmapoutputcompressiontype href="#setMapOutputCompressionType(org.apache.hadoop.io.SequenceFile.CompressionType)" />
                 <setmapoutputcompressorclass href="#setMapOutputCompressorClass(java.lang.Class)" />
+                <getjoblocaldir href="#getJobLocalDir()" />
               </jobconf>
               <jobconfigurable href="JobConfigurable.html">
                 <configure href="#configure(org.apache.hadoop.mapred.JobConf)" />

+ 12 - 5
src/java/org/apache/hadoop/mapred/IsolationRunner.java

@@ -28,6 +28,7 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.SequenceFile;
@@ -115,13 +116,14 @@ public class IsolationRunner {
    * @param conf the jobconf
    * @throws IOException if something goes wrong writing
    */
-  private static void fillInMissingMapOutputs(FileSystem fs, 
+  private static void fillInMissingMapOutputs(FileSystem fs,
+                                              String jobId,
                                               String taskId,
                                               int numMaps,
                                               JobConf conf) throws IOException {
     Class keyClass = conf.getMapOutputKeyClass();
     Class valueClass = conf.getMapOutputValueClass();
-    MapOutputFile namer = new MapOutputFile();
+    MapOutputFile namer = new MapOutputFile(jobId);
     namer.setConf(conf);
     for(int i=0; i<numMaps; i++) {
       Path f = namer.getInputFile(i, taskId);
@@ -156,8 +158,13 @@ public class IsolationRunner {
     
     // setup the local and user working directories
     FileSystem local = FileSystem.getLocal(conf);
-    File taskDir = new File(jobFilename.getParent());
-    File workDirName = new File(taskDir.getParent(), "work");
+    LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+    File workDirName = new File(lDirAlloc.getLocalPathToRead(
+                                  TaskTracker.getJobCacheSubdir() 
+                                  + Path.SEPARATOR + jobId 
+                                  + Path.SEPARATOR + taskId
+                                  + Path.SEPARATOR + "work",
+                                  conf).toString());
     local.setWorkingDirectory(new Path(workDirName.toString()));
     FileSystem.get(conf).setWorkingDirectory(conf.getWorkingDirectory());
     
@@ -179,7 +186,7 @@ public class IsolationRunner {
                          taskId, partition, splitClass, split);
     } else {
       int numMaps = conf.getNumMapTasks();
-      fillInMissingMapOutputs(local, taskId, numMaps, conf);
+      fillInMissingMapOutputs(local, jobId, taskId, numMaps, conf);
       task = new ReduceTask(jobId, jobFilename.toString(), conf.get("mapred.tip.id"), taskId, 
                             partition, numMaps);
     }
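The LocalDirAllocator lookup above is the pattern this patch uses wherever a task directory has to be found among the volumes listed in mapred.local.dir. A minimal standalone sketch, with hypothetical job and task ids:

    import org.apache.hadoop.fs.LocalDirAllocator;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    public class WorkDirLookup {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf();
        LocalDirAllocator alloc = new LocalDirAllocator("mapred.local.dir");
        // Searches each directory listed in mapred.local.dir and returns the
        // first one that already contains the requested relative path.
        Path work = alloc.getLocalPathToRead(
            "taskTracker/jobcache/job_0001/task_0001_m_000000_0/work", conf);
        System.out.println(work);
      }
    }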

+ 19 - 0
src/java/org/apache/hadoop/mapred/JobConf.java

@@ -1334,6 +1334,25 @@ public class JobConf extends Configuration {
   public void setJobEndNotificationURI(String uri) {
     set("job.end.notification.url", uri);
   }
+
+  /**
+   * Get the job-specific shared directory for use as scratch space.
+   * 
+   * <p>
+   * When a job starts, a shared directory is created at location
+   * <code>
+   * ${mapred.local.dir}/taskTracker/jobcache/$jobid/work/ </code>.
+   * This directory is exposed to the users through 
+   * <code>job.local.dir </code>.
+   * So, the tasks can use this space 
+   * as scratch space and share files among them. </p>
+   * This value is also available as a system property.
+   * 
+   * @return The localized job specific shared directory
+   */
+  public String getJobLocalDir() {
+    return get("job.local.dir");
+  }
   
   /** 
    * Find a jar that contains a class of the same name, if any.

+ 1 - 1
src/java/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -78,7 +78,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
     public Job(String jobid, JobConf conf) throws IOException {
       this.file = new Path(conf.getSystemDir(), jobid + "/job.xml");
       this.id = jobid;
-      this.mapoutputFile = new MapOutputFile();
+      this.mapoutputFile = new MapOutputFile(jobid);
       this.mapoutputFile.setConf(conf);
 
       this.localFile = new JobConf(conf).getLocalPath("localRunner/"+id+".xml");

+ 50 - 24
src/java/org/apache/hadoop/mapred/MapOutputFile.java

@@ -30,6 +30,15 @@ import org.apache.hadoop.conf.*;
 class MapOutputFile {
 
   private JobConf conf;
+  private String jobDir;
+  
+  MapOutputFile() {
+  }
+
+  MapOutputFile(String jobId) {
+    this.jobDir = TaskTracker.getJobCacheSubdir() + Path.SEPARATOR + jobId;
+  }
+
   private LocalDirAllocator lDirAlloc = 
                             new LocalDirAllocator("mapred.local.dir");
   
@@ -38,7 +47,9 @@ class MapOutputFile {
    */
   public Path getOutputFile(String mapTaskId)
     throws IOException {
-    return lDirAlloc.getLocalPathToRead(mapTaskId+"/file.out", conf);
+    return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
+                                        mapTaskId + Path.SEPARATOR +
+                                        "output" + "/file.out", conf);
   }
 
   /** Create a local map output file name.
@@ -47,7 +58,9 @@ class MapOutputFile {
    */
   public Path getOutputFileForWrite(String mapTaskId, long size)
     throws IOException {
-    return lDirAlloc.getLocalPathForWrite(mapTaskId+"/file.out", size, conf);
+    return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
+                                          mapTaskId + Path.SEPARATOR +
+                                          "output" + "/file.out", size, conf);
   }
 
   /** Return the path to a local map output index file created earlier
@@ -55,7 +68,9 @@ class MapOutputFile {
    */
   public Path getOutputIndexFile(String mapTaskId)
     throws IOException {
-    return lDirAlloc.getLocalPathToRead(mapTaskId + "/file.out.index", conf);
+    return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
+                                        mapTaskId + Path.SEPARATOR +
+                                        "output" + "/file.out.index", conf);
   }
 
   /** Create a local map output index file name.
@@ -64,7 +79,9 @@ class MapOutputFile {
    */
   public Path getOutputIndexFileForWrite(String mapTaskId, long size)
     throws IOException {
-    return lDirAlloc.getLocalPathForWrite(mapTaskId + "/file.out.index", 
+    return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
+                                          mapTaskId + Path.SEPARATOR +
+                                          "output" + "/file.out.index", 
                                           size, conf);
   }
 
@@ -74,8 +91,10 @@ class MapOutputFile {
    */
   public Path getSpillFile(String mapTaskId, int spillNumber)
     throws IOException {
-    return lDirAlloc.getLocalPathToRead(mapTaskId+"/spill" +spillNumber+".out",
-                                        conf);
+    return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
+                                        mapTaskId + Path.SEPARATOR +
+                                        "output" + "/spill" 
+                                        + spillNumber + ".out", conf);
   }
 
   /** Create a local map spill file name.
@@ -85,9 +104,10 @@ class MapOutputFile {
    */
   public Path getSpillFileForWrite(String mapTaskId, int spillNumber, 
          long size) throws IOException {
-    return lDirAlloc.getLocalPathForWrite(mapTaskId+
-                                                  "/spill" +spillNumber+".out",
-                                                  size, conf);
+    return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
+                                          mapTaskId + Path.SEPARATOR +
+                                          "output" + "/spill" + 
+                                          spillNumber + ".out", size, conf);
   }
 
   /** Return a local map spill index file created earlier
@@ -96,8 +116,10 @@ class MapOutputFile {
    */
   public Path getSpillIndexFile(String mapTaskId, int spillNumber)
     throws IOException {
-    return lDirAlloc.getLocalPathToRead(
-        mapTaskId+"/spill" +spillNumber+".out.index", conf);
+    return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
+                                        mapTaskId + Path.SEPARATOR +
+                                        "output" + "/spill" + 
+                                        spillNumber + ".out.index", conf);
   }
 
   /** Create a local map spill index file name.
@@ -107,8 +129,10 @@ class MapOutputFile {
    */
   public Path getSpillIndexFileForWrite(String mapTaskId, int spillNumber,
          long size) throws IOException {
-    return lDirAlloc.getLocalPathForWrite(
-        mapTaskId+"/spill" +spillNumber+".out.index", size, conf);
+    return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
+                                          mapTaskId + Path.SEPARATOR +
+                                          "output" + "/spill" + spillNumber + 
+                                          ".out.index", size, conf);
   }
 
   /** Return a local reduce input file created earlier
@@ -118,7 +142,9 @@ class MapOutputFile {
   public Path getInputFile(int mapId, String reduceTaskId)
     throws IOException {
     // TODO *oom* should use a format here
-    return lDirAlloc.getLocalPathToRead(reduceTaskId + "/map_"+mapId+".out",
+    return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
+                                        reduceTaskId + Path.SEPARATOR + 
+                                        "output" + "/map_" + mapId + ".out",
                                         conf);
   }
 
@@ -130,21 +156,16 @@ class MapOutputFile {
   public Path getInputFileForWrite(int mapId, String reduceTaskId, long size)
     throws IOException {
     // TODO *oom* should use a format here
-    return lDirAlloc.getLocalPathForWrite(reduceTaskId + "/map_"+mapId+".out",
+    return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
+                                          reduceTaskId + Path.SEPARATOR +
+                                          "output" + "/map_" + mapId + ".out",
                                           size, conf);
   }
 
   /** Removes all of the files related to a task. */
   public void removeAll(String taskId) throws IOException {
-    conf.deleteLocalFiles(taskId);
-  }
-
-  /** 
-   * Removes all contents of temporary storage.  Called upon 
-   * startup, to remove any leftovers from previous run.
-   */
-  public void cleanupStorage() throws IOException {
-    conf.deleteLocalFiles();
+    conf.deleteLocalFiles(jobDir + Path.SEPARATOR +
+                          taskId + Path.SEPARATOR + "output");
   }
 
   public void setConf(Configuration conf) {
@@ -154,4 +175,9 @@ class MapOutputFile {
       this.conf = new JobConf(conf);
     }
   }
+  
+  public void setJobId(String jobId) {
+    this.jobDir = TaskTracker.getJobCacheSubdir() + Path.SEPARATOR + jobId;
+  }
+
 }
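For a concrete picture of the relocation performed by this class, a small sketch of the relative path getOutputFile() now asks the allocator for, next to the old flat layout; the ids are hypothetical:

    public class MapOutputLayoutExample {
      public static void main(String[] args) {
        // New layout: map outputs live under the job directory, inside the
        // task's private "output" subdirectory.
        String jobDir = "taskTracker/jobcache/job_0001";
        String newPath = jobDir + "/task_0001_m_000000_0/output/file.out";
        // Old layout: the same file lived directly under the task id.
        String oldPath = "task_0001_m_000000_0/file.out";
        System.out.println(newPath + "  (previously " + oldPath + ")");
      }
    }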

+ 11 - 7
src/java/org/apache/hadoop/mapred/MapOutputLocation.java

@@ -48,14 +48,16 @@ class MapOutputLocation implements Writable, MRConstants {
   private int mapId;
   private String host;
   private int port;
+  private String jobId;
 
   /** RPC constructor **/
   public MapOutputLocation() {
   }
 
   /** Construct a location. */
-  public MapOutputLocation(String mapTaskId, int mapId, 
+  public MapOutputLocation(String jobId, String mapTaskId, int mapId, 
                            String host, int port) {
+    this.jobId = jobId;
     this.mapTaskId = mapTaskId;
     this.mapId = mapId;
     this.host = host;
@@ -80,22 +82,24 @@ class MapOutputLocation implements Writable, MRConstants {
   public int getPort() { return port; }
 
   public void write(DataOutput out) throws IOException {
-    UTF8.writeString(out, mapTaskId);
+    out.writeUTF(jobId);
+    out.writeUTF(mapTaskId);
     out.writeInt(mapId);
-    UTF8.writeString(out, host);
+    out.writeUTF(host);
     out.writeInt(port);
   }
 
   public void readFields(DataInput in) throws IOException {
-    this.mapTaskId = UTF8.readString(in);
+    this.jobId = in.readUTF();
+    this.mapTaskId = in.readUTF();
     this.mapId = in.readInt();
-    this.host = UTF8.readString(in);
+    this.host = in.readUTF();
     this.port = in.readInt();
   }
 
   public String toString() {
-    return "http://" + host + ":" + port + "/mapOutput?map=" + 
-      mapTaskId;
+    return "http://" + host + ":" + port + "/mapOutput?job=" + jobId +
+           "&map=" + mapTaskId;
   }
   
   /**
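The effect of the new field shows up in the fetch URL built by toString() above; the TaskTracker's /mapOutput servlet (later in this patch) rejects requests without the job parameter. A sketch with hypothetical host, port, and ids; note the reduce parameter is required by the servlet but appended by the shuffle code rather than by toString():

    public class MapOutputUrlExample {
      public static void main(String[] args) {
        String url = "http://tracker-host:50060/mapOutput"
                   + "?job=job_0001"                  // new, now mandatory
                   + "&map=task_0001_m_000000_0"
                   + "&reduce=0";                     // also checked by the servlet
        System.out.println(url);
      }
    }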

+ 7 - 10
src/java/org/apache/hadoop/mapred/ReduceTask.java

@@ -46,7 +46,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.InMemoryFileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
@@ -807,7 +806,10 @@ class ReduceTask extends Task {
         // a temp filename. If this file gets created in ramfs, we're fine,
         // else, we will check the localFS to find a suitable final location
         // for this path
-        Path filename = new Path("/" + reduceId + "/map_" +
+        Path filename = new Path("/" + TaskTracker.getJobCacheSubdir() +
+                                 Path.SEPARATOR + getJobId() +
+                                 Path.SEPARATOR + reduceId +
+                                 Path.SEPARATOR + "output" + "/map_" +
                                  loc.getMapId() + ".out");
         // a working filename that will be unique to this attempt
         Path tmpFilename = new Path(filename + "-" + id);
@@ -903,13 +905,7 @@ class ReduceTask extends Task {
       // add the jars and directories to the classpath
       String jar = conf.getJar();
       if (jar != null) {      
-        LocalDirAllocator lDirAlloc = 
-                            new LocalDirAllocator("mapred.local.dir");
-        File jobCacheDir = new File(lDirAlloc.getLocalPathToRead(
-                                      TaskTracker.getJobCacheSubdir() 
-                                      + Path.SEPARATOR + getJobId() 
-                                      + Path.SEPARATOR  
-                                      + "work", conf).toString());
+        File jobCacheDir = new File(new Path(jar).getParent().toString());
 
         File[] libs = new File(jobCacheDir, "lib").listFiles();
         if (libs != null) {
@@ -1484,7 +1480,8 @@ class ReduceTask extends Task {
               maxFetchRetriesPerMap = 
                   getClosestPowerOf2((maxMapRuntime / BACKOFF_INIT) + 1);
             }
-            knownOutputs.add(new MapOutputLocation(taskId, mId, host, port));
+            knownOutputs.add(new MapOutputLocation(reduceTask.getJobId(),
+                             taskId, mId, host, port));
           }
           break;
           case FAILED:

+ 2 - 0
src/java/org/apache/hadoop/mapred/Task.java

@@ -114,6 +114,7 @@ abstract class Task implements Writable, Configurable {
                                                     TaskStatus.Phase.MAP : 
                                                     TaskStatus.Phase.SHUFFLE, 
                                                   counters);
+    this.mapOutputFile.setJobId(jobId);
   }
 
   ////////////////////////////////////////////
@@ -186,6 +187,7 @@ abstract class Task implements Writable, Configurable {
       taskOutputPath = null;
     }
     taskStatus.readFields(in);
+    this.mapOutputFile.setJobId(jobId); 
   }
 
   public String toString() { return taskId; }

+ 11 - 10
src/java/org/apache/hadoop/mapred/TaskRunner.java

@@ -54,7 +54,7 @@ abstract class TaskRunner extends Thread {
     this.t = t;
     this.tracker = tracker;
     this.conf = conf;
-    this.mapOutputFile = new MapOutputFile();
+    this.mapOutputFile = new MapOutputFile(t.getJobId());
     this.mapOutputFile.setConf(conf);
   }
 
@@ -91,19 +91,20 @@ abstract class TaskRunner extends Thread {
       
       //before preparing the job localize 
       //all the archives
-      File workDir = new File(t.getJobFile()).getParentFile();
       String taskid = t.getTaskId();
       LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
       File jobCacheDir = null;
-      try {
-        jobCacheDir = new File(lDirAlloc.getLocalPathToRead(
-                                    TaskTracker.getJobCacheSubdir() 
-                                    + Path.SEPARATOR + t.getJobId() 
-                                    + Path.SEPARATOR  
-                                    + "work", conf).toString());
-      } catch (IOException ioe) {
-        LOG.warn("work directory doesnt exist");
+      if (conf.getJar() != null) {
+        jobCacheDir = new File(
+                          new Path(conf.getJar()).getParent().toString());
       }
+      File workDir = new File(lDirAlloc.getLocalPathToRead(
+                                TaskTracker.getJobCacheSubdir() 
+                                + Path.SEPARATOR + t.getJobId() 
+                                + Path.SEPARATOR + t.getTaskId()
+                                + Path.SEPARATOR + "work",
+                                conf).toString());
+
       URI[] archives = DistributedCache.getCacheArchives(conf);
       URI[] files = DistributedCache.getCacheFiles(conf);
       FileStatus fileStatus;

+ 76 - 26
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -152,7 +152,6 @@ public class TaskTracker
   private static final String JOBCACHE = "jobcache";
   private JobConf originalConf;
   private JobConf fConf;
-  private MapOutputFile mapOutputFile;
   private int maxCurrentMapTasks;
   private int maxCurrentReduceTasks;
   private int failures;
@@ -448,7 +447,7 @@ public class TaskTracker
 
     // Clear out temporary files that might be lying around
     DistributedCache.purgeCache(this.fConf);
-    this.mapOutputFile.cleanupStorage();
+    cleanupStorage();
     this.justStarted = true;
 
     this.jobClient = (InterTrackerProtocol) 
@@ -465,7 +464,15 @@ public class TaskTracker
                              taskTrackerName);
     mapEventsFetcher.start();
   }
-    
+  
+  /** 
+   * Removes all contents of temporary storage.  Called upon 
+   * startup, to remove any leftovers from previous run.
+   */
+  public void cleanupStorage() throws IOException {
+    this.fConf.deleteLocalFiles();
+  }
+
   // Object on wait which MapEventsFetcherThread is going to wait.
   private Object waitingOn = new Object();
 
@@ -638,13 +645,11 @@ public class TaskTracker
         jarFileSize = -1;
       }
     }
-    // Here we check for double the size of jobfile to accommodate for
-    // localize task file and we check four times the size of jarFileSize to 
-    // accommodate for unjarring the jar file in work directory 
+
     Path localJobFile = lDirAlloc.getLocalPathForWrite((getJobCacheSubdir()
                                     + Path.SEPARATOR + jobId 
                                     + Path.SEPARATOR + "job.xml"),
-                                    2 * jobFileSize + 5 * jarFileSize, fConf);
+                                    jobFileSize, fConf);
     RunningJob rjob = addTaskToJob(jobId, localJobFile, tip);
     synchronized (rjob) {
       if (!rjob.localized) {
@@ -667,18 +672,30 @@ public class TaskTracker
         JobConf localJobConf = new JobConf(localJobFile);
         
         // create the 'work' directory
-        File workDir = new File(new File(localJobFile.toString()).getParent(),
-                                "work");
-        if (!workDir.mkdirs()) {
-          if (!workDir.isDirectory()) {
-            throw new IOException("Mkdirs failed to create " + workDir.toString());
-          }
+        // job-specific shared directory for use as scratch space 
+        Path workDir = lDirAlloc.getLocalPathForWrite((getJobCacheSubdir()
+                       + Path.SEPARATOR + jobId 
+                       + Path.SEPARATOR + "work"), fConf);
+        if (!localFs.mkdirs(workDir)) {
+          throw new IOException("Mkdirs failed to create " 
+                      + workDir.toString());
         }
+        System.setProperty("job.local.dir", workDir.toString());
+        localJobConf.set("job.local.dir", workDir.toString());
         
-        // unjar the job.jar files in workdir
+        // copy Jar file to the local FS and unjar it.
         String jarFile = localJobConf.getJar();
         if (jarFile != null) {
-          localJarFile = new Path(jobDir,"job.jar");
+          // Here we check for five times the size of jarFileSize
+          // to accommodate for unjarring the jar file in the work directory 
+          localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
+                                     getJobCacheSubdir()
+                                     + Path.SEPARATOR + jobId 
+                                     + Path.SEPARATOR + "jars",
+                                     5 * jarFileSize, fConf), "job.jar");
+          if (!localFs.mkdirs(localJarFile.getParent())) {
+            throw new IOException("Mkdirs failed to create jars directory "); 
+          }
           fs.copyToLocalFile(new Path(jarFile), localJarFile);
           localJobConf.setJar(localJarFile.toString());
           OutputStream out = localFs.create(localJobFile);
@@ -687,8 +704,9 @@ public class TaskTracker
           } finally {
             out.close();
           }
-
-          RunJar.unJar(new File(localJarFile.toString()), workDir);
+          // also unjar the job.jar file 
+          RunJar.unJar(new File(localJarFile.toString()),
+                       new File(localJarFile.getParent().toString()));
         }
         rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
                              localJobConf.getKeepFailedTaskFiles());
@@ -763,7 +781,7 @@ public class TaskTracker
     this.running = false;
         
     // Clear local storage
-    this.mapOutputFile.cleanupStorage();
+    cleanupStorage();
         
     // Shutdown the fetcher thread
     this.mapEventsFetcher.interrupt();
@@ -782,8 +800,6 @@ public class TaskTracker
     maxCurrentReduceTasks = conf.getInt(
                   "mapred.tasktracker.reduce.tasks.maximum", 2);
     this.jobTrackAddr = JobTracker.getAddress(conf);
-    this.mapOutputFile = new MapOutputFile();
-    this.mapOutputFile.setConf(conf);
     String infoAddr = 
       NetUtils.getServerAddress(conf,
                                 "tasktracker.http.bindAddress", 
@@ -1370,7 +1386,11 @@ public class TaskTracker
                     Path.SEPARATOR + task.getJobId() + Path.SEPARATOR +
                     task.getTaskId()), defaultJobConf );
       FileSystem localFs = FileSystem.getLocal(fConf);
-      
+      if (!localFs.mkdirs(localTaskDir)) {
+        throw new IOException("Mkdirs failed to create " 
+                    + localTaskDir.toString());
+      }
+
      // create symlink for ../work if it doesn't already exist
       String workDir = lDirAlloc.getLocalPathToRead(
                          TaskTracker.getJobCacheSubdir() 
@@ -1384,9 +1404,17 @@ public class TaskTracker
         FileUtil.symLink(workDir, link);
       
       // create the working-directory of the task 
-      if (!localFs.mkdirs(localTaskDir)) {
-        throw new IOException("Mkdirs failed to create " + localTaskDir.toString());
+      Path cwd = lDirAlloc.getLocalPathForWrite(
+                         TaskTracker.getJobCacheSubdir() 
+                         + Path.SEPARATOR + task.getJobId() 
+                         + Path.SEPARATOR + task.getTaskId()
+                         + Path.SEPARATOR + "work",
+                         defaultJobConf);
+      if (!localFs.mkdirs(cwd)) {
+        throw new IOException("Mkdirs failed to create " 
+                    + cwd.toString());
       }
+
       Path localTaskFile = new Path(localTaskDir, "job.xml");
       task.setJobFile(localTaskFile.toString());
       localJobConf.set("mapred.local.dir",
@@ -1598,7 +1626,19 @@ public class TaskTracker
               } catch(IOException e){
                 LOG.warn("Exception finding task's stdout/err/syslog files");
               }
-              File workDir = new File(task.getJobFile()).getParentFile();
+              File workDir = null;
+              try {
+                workDir = new File(lDirAlloc.getLocalPathToRead(
+                                     TaskTracker.getJobCacheSubdir() 
+                                     + Path.SEPARATOR + task.getJobId() 
+                                     + Path.SEPARATOR + task.getTaskId()
+                                     + Path.SEPARATOR + "work",
+                                     localJobConf).toString());
+              } catch (IOException e) {
+                LOG.warn("Working directory of the task " + task.getTaskId() +
+                         " doesn't exist. Throws exception " +
+                          StringUtils.stringifyException(e));
+              }
               // Build the command  
               File stdout = TaskLog.getTaskLogFile(task.getTaskId(),
                                                    TaskLog.LogName.DEBUGOUT);
@@ -2216,6 +2256,12 @@ public class TaskTracker
                       ) throws ServletException, IOException {
       String mapId = request.getParameter("map");
       String reduceId = request.getParameter("reduce");
+      String jobId = request.getParameter("job");
+
+      if (jobId == null) {
+        throw new IOException("job parameter is required");
+      }
+
       if (mapId == null || reduceId == null) {
         throw new IOException("map and reduce parameters are required");
       }
@@ -2241,11 +2287,15 @@ public class TaskTracker
 
         // Index file
         Path indexFileName = lDirAlloc.getLocalPathToRead(
-            mapId+"/file.out.index", conf);
+            TaskTracker.getJobCacheSubdir() + Path.SEPARATOR + 
+            jobId + Path.SEPARATOR +
+            mapId + "/output" + "/file.out.index", conf);
         
         // Map-output file
         Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
-            mapId+"/file.out", conf);
+            TaskTracker.getJobCacheSubdir() + Path.SEPARATOR + 
+            jobId + Path.SEPARATOR +
+            mapId + "/output" + "/file.out", conf);
 
         /**
          * Read the index file to get the information about where

+ 5 - 1
src/test/org/apache/hadoop/mapred/TestMapRed.java

@@ -256,12 +256,14 @@ public class TestMapRed extends TestCase {
     private JobConf conf;
     private boolean compressInput;
     private String taskId;
+    private String jobId;
     private boolean first = true;
       
     public void configure(JobConf conf) {
       this.conf = conf;
       compressInput = conf.getCompressMapOutput();
       taskId = conf.get("mapred.task.id");
+      jobId = conf.get("mapred.job.id");
     }
       
     public void reduce(WritableComparable key, Iterator values,
@@ -269,7 +271,9 @@ public class TestMapRed extends TestCase {
                        ) throws IOException {
       if (first) {
         first = false;
-        Path input = conf.getLocalPath(taskId+"/map_0.out");
+        MapOutputFile mapOutputFile = new MapOutputFile(jobId);
+        mapOutputFile.setConf(conf);
+        Path input = mapOutputFile.getInputFile(0, taskId);
         FileSystem fs = FileSystem.get(conf);
         assertTrue("reduce input exists " + input, fs.exists(input));
         SequenceFile.Reader rdr = 

+ 7 - 5
src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java

@@ -144,12 +144,14 @@ public class TestMiniMRWithDFS extends TestCase {
         String name = contents[fileIdx];
         if (!("taskTracker".equals(contents[fileIdx]))) {
           LOG.debug("Looking at " + name);
-          int idx = neededDirs.indexOf(name);
           assertTrue("Spurious directory " + name + " found in " +
-                     localDir, idx != -1);
-          assertTrue("Matching output directory not found " + name +
-                     " in " + trackerDir, 
-                     new File(new File(new File(trackerDir, "jobcache"), jobIds[idx]), name).isDirectory());
+                     localDir, false);
+        }
+      }
+      for (int idx = 0; idx < neededDirs.size(); ++idx) {
+        String name = neededDirs.get(idx);
+        if (new File(new File(new File(trackerDir, "jobcache"),
+                              jobIds[idx]), name).isDirectory()) {
           found[idx] = true;
           numNotDel++;
         }  

Some files were not shown because too many files changed in this diff