
HADOOP-2131. Allow finer-grained control over speculative-execution. Now users can set it for maps and reduces independently. Contributed by Amareshwari Sri Ramadasu.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@610910 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy, 17 years ago
commit 5c1fd2f4d6

+ 9 - 0
CHANGES.txt

@@ -191,6 +191,15 @@ Trunk (unreleased changes)
     HADOOP-2406. Add a benchmark for measuring read/write performance through
     the InputFormat interface, particularly with compression. (cdouglas)
 
+    HADOOP-2131. Allow finer-grained control over speculative-execution. Now
+    users can set it for maps and reduces independently.
+    Configuration changes to hadoop-default.xml:
+      deprecated mapred.speculative.execution
+      add mapred.map.tasks.speculative.execution
+      add mapred.reduce.tasks.speculative.execution
+    (Amareshwari Sri Ramadasu via acmurthy) 
+      
+
   OPTIMIZATIONS
 
     HADOOP-1898.  Release the lock protecting the last time of the last stack
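With this change a job can enable speculation for maps and reduces independently. A minimal sketch of the two JobConf setters added by this patch (the class name and job setup are hypothetical, not part of the commit):

import org.apache.hadoop.mapred.JobConf;

public class SpeculationDemo {
  public static void main(String[] args) {
    // Hypothetical job setup; only the two setters below come from this patch.
    JobConf conf = new JobConf(SpeculationDemo.class);
    conf.setJobName("speculation-demo");

    // Speculate slow maps, but never launch duplicate reduces.
    conf.setMapSpeculativeExecution(true);     // mapred.map.tasks.speculative.execution
    conf.setReduceSpeculativeExecution(false); // mapred.reduce.tasks.speculative.execution
  }
}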

+ 9 - 2
conf/hadoop-default.xml

@@ -678,9 +678,16 @@ creations/deletions), or "all".</description>
 </property>
 
 <property>
-  <name>mapred.speculative.execution</name>
+  <name>mapred.map.tasks.speculative.execution</name>
   <value>true</value>
-  <description>If true, then multiple instances of some map and reduce tasks 
+  <description>If true, then multiple instances of some map tasks 
+               may be executed in parallel.</description>
+</property>
+
+<property>
+  <name>mapred.reduce.tasks.speculative.execution</name>
+  <value>true</value>
+  <description>If true, then multiple instances of some reduce tasks 
                may be executed in parallel.</description>
 </property>
 

+ 12 - 2
docs/hadoop-default.html

@@ -163,7 +163,7 @@ creations/deletions), or "all".</td>
       directories, for redundancy. </td>
 </tr>
 <tr>
-<td><a name="dfs.permissions">dfs.permissions</a></td><td>false</td><td>
+<td><a name="dfs.permissions">dfs.permissions</a></td><td>true</td><td>
     If "true", enable permission checking in HDFS.
     If "false", permission checking is turned off,
     but all other behavior is unchanged.
@@ -261,6 +261,12 @@ creations/deletions), or "all".</td>
   excluded.</td>
 </tr>
 <tr>
+<td><a name="dfs.max.objects">dfs.max.objects</a></td><td>0</td><td>The maximum number of files, directories and blocks
+  dfs supports. A value of zero indicates no limit to the number
+  of objects that dfs supports.
+  </td>
+</tr>
+<tr>
 <td><a name="fs.s3.block.size">fs.s3.block.size</a></td><td>67108864</td><td>Block size to use when writing files to S3.</td>
 </tr>
 <tr>
@@ -412,7 +418,11 @@ creations/deletions), or "all".</td>
   </td>
 </tr>
 <tr>
-<td><a name="mapred.speculative.execution">mapred.speculative.execution</a></td><td>true</td><td>If true, then multiple instances of some map and reduce tasks 
+<td><a name="mapred.map.tasks.speculative.execution">mapred.map.tasks.speculative.execution</a></td><td>true</td><td>If true, then multiple instances of some map tasks 
+               may be executed in parallel.</td>
+</tr>
+<tr>
+<td><a name="mapred.reduce.tasks.speculative.execution">mapred.reduce.tasks.speculative.execution</a></td><td>true</td><td>If true, then multiple instances of some reduce tasks 
                may be executed in parallel.</td>
 </tr>
 <tr>

+ 24 - 24
docs/mapred_tutorial.html

@@ -277,7 +277,7 @@ document.write("Last Published: " + document.lastModified);
 <a href="#Example%3A+WordCount+v2.0">Example: WordCount v2.0</a>
 <ul class="minitoc">
 <li>
-<a href="#Source+Code-N10B98">Source Code</a>
+<a href="#Source+Code-N10B9C">Source Code</a>
 </li>
 <li>
 <a href="#Sample+Runs">Sample Runs</a>
@@ -1453,7 +1453,7 @@ document.write("Last Published: " + document.lastModified);
         user-provided scripts
         (<a href="api/org/apache/hadoop/mapred/JobConf.html#setMapDebugScript(java.lang.String)">setMapDebugScript(String)</a>/<a href="api/org/apache/hadoop/mapred/JobConf.html#setReduceDebugScript(java.lang.String)">setReduceDebugScript(String)</a>) 
         , whether job tasks can be executed in a <em>speculative</em> manner 
-        (<a href="api/org/apache/hadoop/mapred/JobConf.html#setSpeculativeExecution(boolean)">setSpeculativeExecution(boolean)</a>)
+        (<a href="api/org/apache/hadoop/mapred/JobConf.html#setMapSpeculativeExecution(boolean)">setMapSpeculativeExecution(boolean)</a>)/(<a href="api/org/apache/hadoop/mapred/JobConf.html#setReduceSpeculativeExecution(boolean)">setReduceSpeculativeExecution(boolean)</a>)
         , maximum number of attempts per task
         (<a href="api/org/apache/hadoop/mapred/JobConf.html#setMaxMapAttempts(int)">setMaxMapAttempts(int)</a>/<a href="api/org/apache/hadoop/mapred/JobConf.html#setMaxReduceAttempts(int)">setMaxReduceAttempts(int)</a>) 
         , percentage of tasks failure which can be tolerated by the job
@@ -1463,7 +1463,7 @@ document.write("Last Published: " + document.lastModified);
         <a href="api/org/apache/hadoop/conf/Configuration.html#set(java.lang.String, java.lang.String)">set(String, String)</a>/<a href="api/org/apache/hadoop/conf/Configuration.html#get(java.lang.String, java.lang.String)">get(String, String)</a>
         to set/get arbitrary parameters needed by applications. However, use the 
         <span class="codefrag">DistributedCache</span> for large amounts of (read-only) data.</p>
-<a name="N1082C"></a><a name="Task+Execution+%26+Environment"></a>
+<a name="N10830"></a><a name="Task+Execution+%26+Environment"></a>
 <h3 class="h4">Task Execution &amp; Environment</h3>
 <p>The <span class="codefrag">TaskTracker</span> executes the <span class="codefrag">Mapper</span>/ 
         <span class="codefrag">Reducer</span>  <em>task</em> as a child process in a separate jvm.
@@ -1523,7 +1523,7 @@ document.write("Last Published: " + document.lastModified);
         loaded via <a href="http://java.sun.com/j2se/1.5.0/docs/api/java/lang/System.html#loadLibrary(java.lang.String)">
         System.loadLibrary</a> or <a href="http://java.sun.com/j2se/1.5.0/docs/api/java/lang/System.html#load(java.lang.String)">
         System.load</a>.</p>
-<a name="N108A1"></a><a name="Job+Submission+and+Monitoring"></a>
+<a name="N108A5"></a><a name="Job+Submission+and+Monitoring"></a>
 <h3 class="h4">Job Submission and Monitoring</h3>
 <p>
 <a href="api/org/apache/hadoop/mapred/JobClient.html">
@@ -1559,7 +1559,7 @@ document.write("Last Published: " + document.lastModified);
 <p>Normally the user creates the application, describes various facets 
         of the job via <span class="codefrag">JobConf</span>, and then uses the 
         <span class="codefrag">JobClient</span> to submit the job and monitor its progress.</p>
-<a name="N108DF"></a><a name="Job+Control"></a>
+<a name="N108E3"></a><a name="Job+Control"></a>
 <h4>Job Control</h4>
 <p>Users may need to chain map-reduce jobs to accomplish complex
           tasks which cannot be done via a single map-reduce job. This is fairly
@@ -1595,7 +1595,7 @@ document.write("Last Published: " + document.lastModified);
             </li>
           
 </ul>
-<a name="N10909"></a><a name="Job+Input"></a>
+<a name="N1090D"></a><a name="Job+Input"></a>
 <h3 class="h4">Job Input</h3>
 <p>
 <a href="api/org/apache/hadoop/mapred/InputFormat.html">
@@ -1643,7 +1643,7 @@ document.write("Last Published: " + document.lastModified);
         appropriate <span class="codefrag">CompressionCodec</span>. However, it must be noted that
         compressed files with the above extensions cannot be <em>split</em> and 
         each compressed file is processed in its entirety by a single mapper.</p>
-<a name="N10973"></a><a name="InputSplit"></a>
+<a name="N10977"></a><a name="InputSplit"></a>
 <h4>InputSplit</h4>
 <p>
 <a href="api/org/apache/hadoop/mapred/InputSplit.html">
@@ -1657,7 +1657,7 @@ document.write("Last Published: " + document.lastModified);
           FileSplit</a> is the default <span class="codefrag">InputSplit</span>. It sets 
           <span class="codefrag">map.input.file</span> to the path of the input file for the
           logical split.</p>
-<a name="N10998"></a><a name="RecordReader"></a>
+<a name="N1099C"></a><a name="RecordReader"></a>
 <h4>RecordReader</h4>
 <p>
 <a href="api/org/apache/hadoop/mapred/RecordReader.html">
@@ -1669,7 +1669,7 @@ document.write("Last Published: " + document.lastModified);
           for processing. <span class="codefrag">RecordReader</span> thus assumes the 
           responsibility of processing record boundaries and presents the tasks 
           with keys and values.</p>
-<a name="N109BB"></a><a name="Job+Output"></a>
+<a name="N109BF"></a><a name="Job+Output"></a>
 <h3 class="h4">Job Output</h3>
 <p>
 <a href="api/org/apache/hadoop/mapred/OutputFormat.html">
@@ -1694,7 +1694,7 @@ document.write("Last Published: " + document.lastModified);
 <p>
 <span class="codefrag">TextOutputFormat</span> is the default 
         <span class="codefrag">OutputFormat</span>.</p>
-<a name="N109E4"></a><a name="Task+Side-Effect+Files"></a>
+<a name="N109E8"></a><a name="Task+Side-Effect+Files"></a>
 <h4>Task Side-Effect Files</h4>
 <p>In some applications, component tasks need to create and/or write to
           side-files, which differ from the actual job-output files.</p>
@@ -1720,7 +1720,7 @@ document.write("Last Published: " + document.lastModified);
           JobConf.getOutputPath()</a>, and the framework will promote them 
           similarly for succesful task-attempts, thus eliminating the need to 
           pick unique paths per task-attempt.</p>
-<a name="N10A19"></a><a name="RecordWriter"></a>
+<a name="N10A1D"></a><a name="RecordWriter"></a>
 <h4>RecordWriter</h4>
 <p>
 <a href="api/org/apache/hadoop/mapred/RecordWriter.html">
@@ -1728,9 +1728,9 @@ document.write("Last Published: " + document.lastModified);
           pairs to an output file.</p>
 <p>RecordWriter implementations write the job outputs to the 
           <span class="codefrag">FileSystem</span>.</p>
-<a name="N10A30"></a><a name="Other+Useful+Features"></a>
+<a name="N10A34"></a><a name="Other+Useful+Features"></a>
 <h3 class="h4">Other Useful Features</h3>
-<a name="N10A36"></a><a name="Counters"></a>
+<a name="N10A3A"></a><a name="Counters"></a>
 <h4>Counters</h4>
 <p>
 <span class="codefrag">Counters</span> represent global counters, defined either by 
@@ -1744,7 +1744,7 @@ document.write("Last Published: " + document.lastModified);
           Reporter.incrCounter(Enum, long)</a> in the <span class="codefrag">map</span> and/or 
           <span class="codefrag">reduce</span> methods. These counters are then globally 
           aggregated by the framework.</p>
-<a name="N10A61"></a><a name="DistributedCache"></a>
+<a name="N10A65"></a><a name="DistributedCache"></a>
 <h4>DistributedCache</h4>
 <p>
 <a href="api/org/apache/hadoop/filecache/DistributedCache.html">
@@ -1777,7 +1777,7 @@ document.write("Last Published: " + document.lastModified);
           <a href="api/org/apache/hadoop/filecache/DistributedCache.html#createSymlink(org.apache.hadoop.conf.Configuration)">
           DistributedCache.createSymlink(Path, Configuration)</a> api. Files 
           have <em>execution permissions</em> set.</p>
-<a name="N10A9F"></a><a name="Tool"></a>
+<a name="N10AA3"></a><a name="Tool"></a>
 <h4>Tool</h4>
 <p>The <a href="api/org/apache/hadoop/util/Tool.html">Tool</a> 
           interface supports the handling of generic Hadoop command-line options.
@@ -1817,7 +1817,7 @@ document.write("Last Published: " + document.lastModified);
             </span>
           
 </p>
-<a name="N10AD1"></a><a name="IsolationRunner"></a>
+<a name="N10AD5"></a><a name="IsolationRunner"></a>
 <h4>IsolationRunner</h4>
 <p>
 <a href="api/org/apache/hadoop/mapred/IsolationRunner.html">
@@ -1841,13 +1841,13 @@ document.write("Last Published: " + document.lastModified);
 <p>
 <span class="codefrag">IsolationRunner</span> will run the failed task in a single 
           jvm, which can be in the debugger, over precisely the same input.</p>
-<a name="N10B04"></a><a name="JobControl"></a>
+<a name="N10B08"></a><a name="JobControl"></a>
 <h4>JobControl</h4>
 <p>
 <a href="api/org/apache/hadoop/mapred/jobcontrol/package-summary.html">
           JobControl</a> is a utility which encapsulates a set of Map-Reduce jobs
           and their dependencies.</p>
-<a name="N10B11"></a><a name="Data+Compression"></a>
+<a name="N10B15"></a><a name="Data+Compression"></a>
 <h4>Data Compression</h4>
 <p>Hadoop Map-Reduce provides facilities for the application-writer to
           specify compression for both intermediate map-outputs and the
@@ -1861,7 +1861,7 @@ document.write("Last Published: " + document.lastModified);
           codecs for reasons of both performance (zlib) and non-availability of
           Java libraries (lzo). More details on their usage and availability are
           available <a href="native_libraries.html">here</a>.</p>
-<a name="N10B31"></a><a name="Intermediate+Outputs"></a>
+<a name="N10B35"></a><a name="Intermediate+Outputs"></a>
 <h5>Intermediate Outputs</h5>
 <p>Applications can control compression of intermediate map-outputs
             via the 
@@ -1882,7 +1882,7 @@ document.write("Last Published: " + document.lastModified);
             <a href="api/org/apache/hadoop/mapred/JobConf.html#setMapOutputCompressionType(org.apache.hadoop.io.SequenceFile.CompressionType)">
             JobConf.setMapOutputCompressionType(SequenceFile.CompressionType)</a> 
             api.</p>
-<a name="N10B5D"></a><a name="Job+Outputs"></a>
+<a name="N10B61"></a><a name="Job+Outputs"></a>
 <h5>Job Outputs</h5>
 <p>Applications can control compression of job-outputs via the
             <a href="api/org/apache/hadoop/mapred/OutputFormatBase.html#setCompressOutput(org.apache.hadoop.mapred.JobConf,%20boolean)">
@@ -1902,12 +1902,12 @@ document.write("Last Published: " + document.lastModified);
 </div>
 
     
-<a name="N10B8C"></a><a name="Example%3A+WordCount+v2.0"></a>
+<a name="N10B90"></a><a name="Example%3A+WordCount+v2.0"></a>
 <h2 class="h3">Example: WordCount v2.0</h2>
 <div class="section">
 <p>Here is a more complete <span class="codefrag">WordCount</span> which uses many of the
       features provided by the Map-Reduce framework we discussed so far:</p>
-<a name="N10B98"></a><a name="Source+Code-N10B98"></a>
+<a name="N10B9C"></a><a name="Source+Code-N10B9C"></a>
 <h3 class="h4">Source Code</h3>
 <table class="ForrestTable" cellspacing="1" cellpadding="4">
           
@@ -3085,7 +3085,7 @@ document.write("Last Published: " + document.lastModified);
 </tr>
         
 </table>
-<a name="N112CA"></a><a name="Sample+Runs"></a>
+<a name="N112CE"></a><a name="Sample+Runs"></a>
 <h3 class="h4">Sample Runs</h3>
 <p>Sample text-files as input:</p>
 <p>
@@ -3250,7 +3250,7 @@ document.write("Last Published: " + document.lastModified);
 <br>
         
 </p>
-<a name="N1139A"></a><a name="Salient+Points"></a>
+<a name="N1139E"></a><a name="Salient+Points"></a>
 <h3 class="h4">Salient Points</h3>
 <p>The second version of <span class="codefrag">WordCount</span> improves upon the 
         previous one by using some features offered by the Map-Reduce framework:

+ 1 - 1
docs/mapred_tutorial.pdf

File diff suppressed because it is too large


+ 3 - 1
src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java

@@ -502,8 +502,10 @@ public class StreamJob {
     System.out.println("  because the input filename and the map input order are preserved");
     System.out.println("  This equivalent -reducer NONE");
     System.out.println();
+    System.out.println("To speed up the last maps:");
+    System.out.println("  -jobconf mapred.map.tasks.speculative.execution=true");
     System.out.println("To speed up the last reduces:");
-    System.out.println("  -jobconf mapred.speculative.execution=true");
+    System.out.println("  -jobconf mapred.reduce.tasks.speculative.execution=true");
     System.out.println("To name the job (appears in the JobTracker Web UI):");
     System.out.println("  -jobconf mapred.job.name='My Job' ");
     System.out.println("To change the local temp directory:");

+ 1 - 1
src/docs/src/documentation/content/xdocs/mapred_tutorial.xml

@@ -989,7 +989,7 @@
         user-provided scripts
         (<a href="ext:api/org/apache/hadoop/mapred/jobconf/setmapdebugscript">setMapDebugScript(String)</a>/<a href="ext:api/org/apache/hadoop/mapred/jobconf/setreducedebugscript">setReduceDebugScript(String)</a>) 
         , whether job tasks can be executed in a <em>speculative</em> manner 
-        (<a href="ext:api/org/apache/hadoop/mapred/jobconf/setspeculativeexecution">setSpeculativeExecution(boolean)</a>)
+        (<a href="ext:api/org/apache/hadoop/mapred/jobconf/setmapspeculativeexecution">setMapSpeculativeExecution(boolean)</a>)/(<a href="ext:api/org/apache/hadoop/mapred/jobconf/setreducespeculativeexecution">setReduceSpeculativeExecution(boolean)</a>)
         , maximum number of attempts per task
         (<a href="ext:api/org/apache/hadoop/mapred/jobconf/setmaxmapattempts">setMaxMapAttempts(int)</a>/<a href="ext:api/org/apache/hadoop/mapred/jobconf/setmaxreduceattempts">setMaxReduceAttempts(int)</a>) 
         , percentage of tasks failure which can be tolerated by the job

+ 2 - 1
src/docs/src/documentation/content/xdocs/site.xml

@@ -110,7 +110,8 @@ See http://forrest.apache.org/docs/linking.html for more info.
                 <setcombinerclass href="#setCombinerClass(java.lang.Class)" />
                 <setmapdebugscript href="#setMapDebugScript(java.lang.String)" />
                 <setreducedebugscript href="#setReduceDebugScript(java.lang.String)" />
-                <setspeculativeexecution href="#setSpeculativeExecution(boolean)" />
+                <setmapspeculativeexecution href="#setMapSpeculativeExecution(boolean)" />
+                <setreducespeculativeexecution href="#setReduceSpeculativeExecution(boolean)" />
                 <setmaxmapattempts href="#setMaxMapAttempts(int)" />
                 <setmaxreduceattempts href="#setMaxReduceAttempts(int)" />
                 <setmaxmaptaskfailurespercent href="#setMaxMapTaskFailuresPercent(int)" />

+ 53 - 0
src/java/org/apache/hadoop/mapred/JobConf.java

@@ -828,6 +828,9 @@ public class JobConf extends Configuration {
   }
   
   /**
+   * @deprecated Use {{@link #getMapSpeculativeExecution()} or
+   *             {@link #getReduceSpeculativeExecution()} instead.
+   * 
    * Should speculative execution be used for this job? 
    * Defaults to <code>true</code>.
    * 
@@ -839,6 +842,9 @@ public class JobConf extends Configuration {
   }
   
   /**
+   * @deprecated Use {@link #setMapSpeculativeExecution(boolean)} or
+   *                 {@link #setReduceSpeculativeExecution(boolean)} instead. 
+   * 
    * Turn speculative execution on or off for this job. 
    * 
    * @param speculativeExecution <code>true</code> if speculative execution 
@@ -847,7 +853,54 @@ public class JobConf extends Configuration {
   public void setSpeculativeExecution(boolean speculativeExecution) {
     setBoolean("mapred.speculative.execution", speculativeExecution);
   }
+
+  /**
+   * Should speculative execution be used for this job for map tasks? 
+   * Defaults to <code>true</code>.
+   * 
+   * @return <code>true</code> if speculative execution be 
+   *                           used for this job for map tasks,
+   *         <code>false</code> otherwise.
+   */
+  public boolean getMapSpeculativeExecution() { 
+    return getBoolean("mapred.map.tasks.speculative.execution", true);
+  }
+  
+  /**
+   * Turn speculative execution on or off for this job for map tasks. 
+   * 
+   * @param speculativeExecution <code>true</code> if speculative execution 
+   *                             should be turned on for map tasks,
+   *                             else <code>false</code>.
+   */
+  public void setMapSpeculativeExecution(boolean speculativeExecution) {
+    setBoolean("mapred.map.tasks.speculative.execution", speculativeExecution);
+  }
+
+  /**
+   * Should speculative execution be used for this job for reduce tasks? 
+   * Defaults to <code>true</code>.
+   * 
+   * @return <code>true</code> if speculative execution be used 
+   *                           for reduce tasks for this job,
+   *         <code>false</code> otherwise.
+   */
+  public boolean getReduceSpeculativeExecution() { 
+    return getBoolean("mapred.reduce.tasks.speculative.execution", true);
+  }
   
+  /**
+   * Turn speculative execution on or off for this job for reduce tasks. 
+   * 
+   * @param speculativeExecution <code>true</code> if speculative execution 
+   *                             should be turned on for reduce tasks,
+   *                             else <code>false</code>.
+   */
+  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
+    setBoolean("mapred.reduce.tasks.speculative.execution", 
+               speculativeExecution);
+  }
+
   /**
    * Get configured the number of reduce tasks for this job.
    * Defaults to <code>1</code>.
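Note that at the JobConf level the deprecated setter and the new getters are decoupled: setSpeculativeExecution(boolean) writes only the old key, while the new getters read only the new keys. The old key regains its effect in TaskInProgress.init() below, which applies it as an override. A small sketch of the resulting behavior (assuming no site-level overrides of these keys):

import org.apache.hadoop.mapred.JobConf;

public class DeprecatedKeyDemo {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    conf.setSpeculativeExecution(false);  // writes mapred.speculative.execution=false

    // The per-type getters still report their defaults (true); the old
    // key is only honored later, when TaskInProgress.init() runs.
    System.out.println(conf.getMapSpeculativeExecution());        // true
    System.out.println(conf.getReduceSpeculativeExecution());     // true
    System.out.println(conf.get("mapred.speculative.execution")); // "false"
  }
}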

+ 8 - 1
src/java/org/apache/hadoop/mapred/TaskInProgress.java

@@ -131,6 +131,7 @@ class TaskInProgress {
     this.conf = conf;
     this.partition = partition;
     setMaxTaskAttempts();
+    this.runSpeculative = conf.getMapSpeculativeExecution();
     init(JobTracker.getJobUniqueString(jobid));
   }
         
@@ -148,8 +149,10 @@ class TaskInProgress {
     this.job = job;
     this.conf = conf;
     setMaxTaskAttempts();
+    this.runSpeculative = conf.getReduceSpeculativeExecution();
     init(JobTracker.getJobUniqueString(jobid));
   }
+  
   /**
    * Set the max number of attempts before we declare a TIP as "failed"
    */
@@ -201,7 +204,11 @@ class TaskInProgress {
    */
   void init(String jobUniqueString) {
     this.startTime = System.currentTimeMillis();
-    this.runSpeculative = conf.getSpeculativeExecution();
+    if ("true".equals(conf.get("mapred.speculative.execution"))) {
+      this.runSpeculative = true;
+    } else if ("false".equals(conf.get("mapred.speculative.execution"))) {
+      this.runSpeculative = false;
+    }
     this.taskIdPrefix = makeUniqueString(jobUniqueString);
     this.id = "tip_" + this.taskIdPrefix;
   }
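To make the precedence concrete: each TaskInProgress starts from the per-type key (both default to true), and an explicitly set mapred.speculative.execution then overrides it in either direction, so the deprecated knob keeps working. A standalone sketch of the same resolution (helper name hypothetical, restating the constructor-plus-init() logic above):

import org.apache.hadoop.conf.Configuration;

public class SpeculationResolution {
  // Hypothetical helper mirroring what the TaskInProgress code above does.
  static boolean resolveSpeculation(Configuration conf, boolean isMap) {
    // 1. Per-type setting, as chosen in the map/reduce constructors.
    boolean speculative = isMap
        ? conf.getBoolean("mapred.map.tasks.speculative.execution", true)
        : conf.getBoolean("mapred.reduce.tasks.speculative.execution", true);
    // 2. init(): an explicitly set deprecated key wins either way;
    //    if it is absent, the per-type value stands.
    String legacy = conf.get("mapred.speculative.execution");
    if ("true".equals(legacy)) {
      speculative = true;
    } else if ("false".equals(legacy)) {
      speculative = false;
    }
    return speculative;
  }
}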

Some files were not shown because too many files changed in this diff