Browse Source

HADOOP-3722. Fixed Hadoop Streaming and Hadoop Pipes to use the Tool interface and GenericOptionsParser. Contributed by Enis Soztutar

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@696551 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 16 years ago
parent
commit
bd1ce96c6f
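
For context, a minimal sketch of the Tool/ToolRunner pattern this commit adopts for StreamJob: ToolRunner hands the arguments to GenericOptionsParser, which strips the generic flags (-D, -fs, -jt, -conf, -files, -archives) into the Configuration before the tool's run() sees the remaining, tool-specific options. The MyStreamingDriver class below is hypothetical; Tool, ToolRunner, Configured, and Configuration are the real Hadoop classes involved.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical driver illustrating the Tool pattern StreamJob now follows.
public class MyStreamingDriver extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    // getConf() already reflects any -D key=value overrides parsed by ToolRunner.
    Configuration conf = getConf();
    System.out.println("mapred.reduce.tasks = "
        + conf.get("mapred.reduce.tasks", "unset"));
    // args now holds only the tool-specific options (-input, -output, -mapper, ...).
    return 0;
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new MyStreamingDriver(), args));
  }
}

With this in place, generic options are handled uniformly across Hadoop tools, which is why the documentation changes in this commit replace -jobconf/-cacheFile/-cacheArchive/-dfs with -D/-files/-archives/-fs.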

+ 3 - 0
CHANGES.txt

@@ -65,6 +65,9 @@ Trunk (unreleased changes)
     HADOOP-4007. REMOVE DFSFileInfo - FileStatus is sufficient. 
     (Sanjay Radia via hairong)
 
+    HADOOP-3722. Fixed Hadoop Streaming and Hadoop Pipes to use the Tool
+    interface and GenericOptionsParser. (Enis Soztutar via acmurthy) 
+
   NEW FEATURES
 
     HADOOP-3341. Allow streaming jobs to specify the field separator for map

+ 49 - 11
docs/changes.html

@@ -36,7 +36,7 @@
     function collapse() {
       for (var i = 0; i < document.getElementsByTagName("ul").length; i++) {
         var list = document.getElementsByTagName("ul")[i];
-        if (list.id != 'trunk_(unreleased_changes)_' && list.id != 'release_0.18.1_-_unreleased_') {
+        if (list.id != 'trunk_(unreleased_changes)_' && list.id != 'release_0.18.1_-_2008-09-17_') {
           list.style.display = "none";
         }
       }
@@ -56,7 +56,7 @@
 </a></h2>
 <ul id="trunk_(unreleased_changes)_">
   <li><a href="javascript:toggleList('trunk_(unreleased_changes)_._incompatible_changes_')">  INCOMPATIBLE CHANGES
-</a>&nbsp;&nbsp;&nbsp;(13)
+</a>&nbsp;&nbsp;&nbsp;(16)
     <ol id="trunk_(unreleased_changes)_._incompatible_changes_">
       <li><a href="http://issues.apache.org/jira/browse/HADOOP-3595">HADOOP-3595</a>. Remove deprecated methods for mapred.combine.once
 functionality, which was necessary to providing backwards
@@ -95,10 +95,18 @@ to a separate task.<br />(Amareshwari Sriramadasu via ddas)</li>
 fs.inmemory.size.mb and replace with properties defining in memory map
 output retention during the shuffle and reduce relative to maximum heap
 usage.<br />(cdouglas)</li>
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-3245">HADOOP-3245</a>. Adds the feature for supporting JobTracker restart. Running
+jobs can be recovered from the history file. The history file format has
+been modified to support recovery. The task attempt ID now has the
+JobTracker start time to disinguish attempts of the same TIP across
+restarts.<br />(Amar Ramesh Kamat via ddas)</li>
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-4007">HADOOP-4007</a>. REMOVE DFSFileInfo - FileStatus is sufficient.<br />(Sanjay Radia via hairong)</li>
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-3722">HADOOP-3722</a>. Fixed Hadoop Streaming and Hadoop Pipes to use the Tool
+interface and GenericOptionsParser.<br />(Enis Soztutar via acmurthy)</li>
     </ol>
   </li>
   <li><a href="javascript:toggleList('trunk_(unreleased_changes)_._new_features_')">  NEW FEATURES
-</a>&nbsp;&nbsp;&nbsp;(30)
+</a>&nbsp;&nbsp;&nbsp;(31)
     <ol id="trunk_(unreleased_changes)_._new_features_">
       <li><a href="http://issues.apache.org/jira/browse/HADOOP-3341">HADOOP-3341</a>. Allow streaming jobs to specify the field separator for map
 and reduce input and output. The new configuration values are:
@@ -163,10 +171,12 @@ MAP+ / REDUCE MAP*.<br />(Alejandro Abdelnur via ddas)</li>
 queues as a percentage of the cluster.<br />(Vivek Ratan via omalley)</li>
       <li><a href="http://issues.apache.org/jira/browse/HADOOP-3992">HADOOP-3992</a>. Add a synthetic load generation facility to the test
 directory.<br />(hairong via szetszwo)</li>
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-3981">HADOOP-3981</a>. Implement a distributed file checksum algorithm in HDFS
+and change DistCp to use file checksum for comparing src and dst files<br />(szetszwo)</li>
     </ol>
   </li>
   <li><a href="javascript:toggleList('trunk_(unreleased_changes)_._improvements_')">  IMPROVEMENTS
-</a>&nbsp;&nbsp;&nbsp;(50)
+</a>&nbsp;&nbsp;&nbsp;(55)
     <ol id="trunk_(unreleased_changes)_._improvements_">
       <li><a href="http://issues.apache.org/jira/browse/HADOOP-3908">HADOOP-3908</a>. Fuse-dfs: better error message if llibhdfs.so doesn't exist.<br />(Pete Wyckoff through zshao)</li>
       <li><a href="http://issues.apache.org/jira/browse/HADOOP-3732">HADOOP-3732</a>. Delay intialization of datanode block verification till
@@ -258,6 +268,16 @@ components.<br />(tomwhite)</li>
 unset.<br />(Al Hoang via tomwhite)</li>
       <li><a href="http://issues.apache.org/jira/browse/HADOOP-4147">HADOOP-4147</a>. Remove unused class JobWithTaskContext from class
 JobInProgress.<br />(Amareshwari Sriramadasu via johan)</li>
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-4151">HADOOP-4151</a>. Add a byte-comparable interface that both Text and
+BytesWritable implement.<br />(cdouglas via omalley)</li>
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-4174">HADOOP-4174</a>. Move fs image/edit log methods from ClientProtocol to
+NamenodeProtocol.<br />(shv via szetszwo)</li>
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-4181">HADOOP-4181</a>. Include a .gitignore and saveVersion.sh change to support
+developing under git.<br />(omalley)</li>
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-4186">HADOOP-4186</a>. Factor LineReader out of LineRecordReader.<br />(tomwhite via
+omalley)</li>
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-4184">HADOOP-4184</a>. Break the module dependencies between core, hdfs, and
+mapred.<br />(tomwhite via omalley)</li>
     </ol>
   </li>
   <li><a href="javascript:toggleList('trunk_(unreleased_changes)_._optimizations_')">  OPTIMIZATIONS
@@ -283,7 +303,7 @@ it from a different .crc file.<br />(Jothi Padmanabhan via ddas)</li>
     </ol>
   </li>
   <li><a href="javascript:toggleList('trunk_(unreleased_changes)_._bug_fixes_')">  BUG FIXES
-</a>&nbsp;&nbsp;&nbsp;(66)
+</a>&nbsp;&nbsp;&nbsp;(79)
     <ol id="trunk_(unreleased_changes)_._bug_fixes_">
       <li><a href="http://issues.apache.org/jira/browse/HADOOP-3563">HADOOP-3563</a>.  Refactor the distributed upgrade code so that it is
 easier to identify datanode and namenode related code.<br />(dhruba)</li>
@@ -414,21 +434,39 @@ build directory.<br />(Prasad Chakka via dhruba)</li>
       <li><a href="http://issues.apache.org/jira/browse/HADOOP-4094">HADOOP-4094</a>. Hive now has hive-default.xml and hive-site.xml similar
 to core hadoop.<br />(Prasad Chakka via dhruba)</li>
       <li><a href="http://issues.apache.org/jira/browse/HADOOP-4112">HADOOP-4112</a>. Handles cleanupTask in JobHistory<br />(Amareshwari Sriramadasu via ddas)</li>
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-3831">HADOOP-3831</a>. Very slow reading clients sometimes failed while reading.<br />(rangadi)</li>
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-4155">HADOOP-4155</a>. Use JobTracker's start time while initializing JobHistory's
+JobTracker Unique String.<br />(lohit)</li>
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-4099">HADOOP-4099</a>. Fix null pointer when using HFTP from an 0.18 server.<br />(dhruba via omalley)</li>
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-3570">HADOOP-3570</a>. Includes user specified libjar files in the client side
+classpath path.<br />(Sharad Agarwal via ddas)</li>
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-4129">HADOOP-4129</a>. Changed memory limits of TaskTracker and Tasks to be in
+KiloBytes rather than bytes.<br />(Vinod Kumar Vavilapalli via acmurthy)</li>
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-4139">HADOOP-4139</a>. Optimize Hive multi group-by.<br />(Namin Jain via dhruba)</li>
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-3911">HADOOP-3911</a>. Add a check to fsck options to make sure -files is not
+the first option to resolve conflicts with GenericOptionsParser<br />(lohit)</li>
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-3623">HADOOP-3623</a>. Refactor LeaseManager.<br />(szetszwo)</li>
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-4125">HADOOP-4125</a>. Handles Reduce cleanup tip on the web ui.<br />(Amareshwari Sriramadasu via ddas)</li>
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-4087">HADOOP-4087</a>. Hive Metastore API for php and python clients.<br />(Prasad Chakka via dhruba)</li>
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-4197">HADOOP-4197</a>. Update DATA_TRANSFER_VERSION for <a href="http://issues.apache.org/jira/browse/HADOOP-3981">HADOOP-3981</a>.<br />(szetszwo)</li>
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-4138">HADOOP-4138</a>. Refactor the Hive SerDe library to better structure
+the interfaces to the serializer and de-serializer.<br />(Zheng Shao via dhruba)</li>
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-4195">HADOOP-4195</a>. Close compressor before returning to codec pool.<br />(acmurthy via omalley)</li>
     </ol>
   </li>
 </ul>
-<h2><a href="javascript:toggleList('release_0.18.1_-_unreleased_')">Release 0.18.1 - Unreleased
+<h2><a href="javascript:toggleList('release_0.18.1_-_2008-09-17_')">Release 0.18.1 - 2008-09-17
 </a></h2>
-<ul id="release_0.18.1_-_unreleased_">
-  <li><a href="javascript:toggleList('release_0.18.1_-_unreleased_._improvements_')">  IMPROVEMENTS
+<ul id="release_0.18.1_-_2008-09-17_">
+  <li><a href="javascript:toggleList('release_0.18.1_-_2008-09-17_._improvements_')">  IMPROVEMENTS
 </a>&nbsp;&nbsp;&nbsp;(1)
-    <ol id="release_0.18.1_-_unreleased_._improvements_">
+    <ol id="release_0.18.1_-_2008-09-17_._improvements_">
       <li><a href="http://issues.apache.org/jira/browse/HADOOP-3934">HADOOP-3934</a>. Upgrade log4j to 1.2.15.<br />(omalley)</li>
     </ol>
   </li>
-  <li><a href="javascript:toggleList('release_0.18.1_-_unreleased_._bug_fixes_')">  BUG FIXES
+  <li><a href="javascript:toggleList('release_0.18.1_-_2008-09-17_._bug_fixes_')">  BUG FIXES
 </a>&nbsp;&nbsp;&nbsp;(5)
-    <ol id="release_0.18.1_-_unreleased_._bug_fixes_">
+    <ol id="release_0.18.1_-_2008-09-17_._bug_fixes_">
       <li><a href="http://issues.apache.org/jira/browse/HADOOP-3995">HADOOP-3995</a>. In case of quota failure on HDFS, rename does not restore
 source filename.<br />(rangadi)</li>
       <li><a href="http://issues.apache.org/jira/browse/HADOOP-3821">HADOOP-3821</a>. Prevent SequenceFile and IFile from duplicating codecs in

+ 46 - 13
docs/hadoop-default.html

@@ -134,9 +134,6 @@ creations/deletions), or "all".</td>
 <td><a name="fs.har.impl">fs.har.impl</a></td><td>org.apache.hadoop.fs.HarFileSystem</td><td>The filesystem for Hadoop archives. </td>
 </tr>
 <tr>
-<td><a name="fs.inmemory.size.mb">fs.inmemory.size.mb</a></td><td>75</td><td>The size of the in-memory filsystem instance in MB</td>
-</tr>
-<tr>
 <td><a name="fs.checkpoint.dir">fs.checkpoint.dir</a></td><td>${hadoop.tmp.dir}/dfs/namesecondary</td><td>Determines where on the local filesystem the DFS secondary
       name node should store the temporary images to merge.
       If this is a comma-delimited list of directories then the image is
@@ -465,6 +462,23 @@ creations/deletions), or "all".</td>
   </td>
 </tr>
 <tr>
+<td><a name="mapred.jobtracker.restart.recover">mapred.jobtracker.restart.recover</a></td><td>false</td><td>"true" to enable (job) recovery upon restart,
+               "false" to start afresh
+  </td>
+</tr>
+<tr>
+<td><a name="mapred.jobtracker.job.history.block.size">mapred.jobtracker.job.history.block.size</a></td><td>0</td><td>The block size of the job history file. Since the job recovery
+               uses job history, its important to dump job history to disk as 
+               soon as possible.
+  </td>
+</tr>
+<tr>
+<td><a name="mapred.jobtracker.job.history.buffer.size">mapred.jobtracker.job.history.buffer.size</a></td><td>4096</td><td>The buffer size for the job history file. Since the job 
+               recovery uses job history, its important to frequently flush the 
+               job history to disk. This will minimize the loss in recovery.
+  </td>
+</tr>
+<tr>
 <td><a name="mapred.jobtracker.taskScheduler">mapred.jobtracker.taskScheduler</a></td><td>org.apache.hadoop.mapred.JobQueueTaskScheduler</td><td>The class responsible for scheduling the tasks.</td>
 </tr>
 <tr>
@@ -559,6 +573,25 @@ creations/deletions), or "all".</td>
   </td>
 </tr>
 <tr>
+<td><a name="mapred.job.shuffle.merge.percent">mapred.job.shuffle.merge.percent</a></td><td>0.66</td><td>The usage threshold at which an in-memory merge will be
+  initiated, expressed as a percentage of the total memory allocated to
+  storing in-memory map outputs, as defined by
+  mapred.job.shuffle.input.buffer.percent.
+  </td>
+</tr>
+<tr>
+<td><a name="mapred.job.shuffle.input.buffer.percent">mapred.job.shuffle.input.buffer.percent</a></td><td>0.70</td><td>The percentage of memory to be allocated from the maximum heap
+  size to storing map outputs during the shuffle.
+  </td>
+</tr>
+<tr>
+<td><a name="mapred.job.reduce.input.buffer.percent">mapred.job.reduce.input.buffer.percent</a></td><td>0.0</td><td>The percentage of memory- relative to the maximum heap size- to
+  retain map outputs during the reduce. When the shuffle is concluded, any
+  remaining map outputs in memory must consume less than this threshold before
+  the reduce can begin.
+  </td>
+</tr>
+<tr>
 <td><a name="mapred.map.tasks.speculative.execution">mapred.map.tasks.speculative.execution</a></td><td>true</td><td>If true, then multiple instances of some map tasks 
                may be executed in parallel.</td>
 </tr>
@@ -846,19 +879,19 @@ creations/deletions), or "all".</td>
   </td>
 </tr>
 <tr>
-<td><a name="mapred.tasktracker.tasks.maxmemory">mapred.tasktracker.tasks.maxmemory</a></td><td>-1</td><td> The maximum amount of virtual memory all tasks running on a
-    tasktracker, including sub-processes they launch, can use. This value is
-    used to compute the amount of free memory available for tasks. Any task
-    scheduled on this tasktracker is guaranteed and constrained to use a 
-    share of this amount. Any task exceeding its share will be killed. 
-    If set to -1, this functionality is disabled, and mapred.task.maxmemory
-    is ignored.
+<td><a name="mapred.tasktracker.tasks.maxmemory">mapred.tasktracker.tasks.maxmemory</a></td><td>-1</td><td> The maximum amount of virtual memory in kilobytes all tasks 
+  	running on a tasktracker, including sub-processes they launch, can use. 
+  	This value is used to compute the amount of free memory available for 
+  	tasks. Any task scheduled on this tasktracker is guaranteed and constrained
+  	 to use a share of this amount. Any task exceeding its share will be 
+  	killed. If set to -1, this functionality is disabled, and 
+  	mapred.task.maxmemory is ignored.
   </td>
 </tr>
 <tr>
-<td><a name="mapred.task.maxmemory">mapred.task.maxmemory</a></td><td>-1</td><td> The maximum amount of memory any task of a job will use.
-    A task of this job will be scheduled on a tasktracker, only if the 
-    amount of free memory on the tasktracker is greater than or 
+<td><a name="mapred.task.maxmemory">mapred.task.maxmemory</a></td><td>-1</td><td> The maximum amount of memory in kilobytes any task of a job 
+    will use. A task of this job will be scheduled on a tasktracker, only if 
+    the amount of free memory on the tasktracker is greater than or 
     equal to this value. If set to -1, tasks are assured a memory limit on
     the tasktracker equal to 
     mapred.tasktracker.tasks.maxmemory/number of slots. If the value of 

+ 91 - 73
docs/streaming.html

@@ -378,7 +378,7 @@ $HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
 Often, you may want to process input data using a map function only. To do this, simply set mapred.reduce.tasks to zero. The Map/Reduce framework will not create any reducer tasks. Rather, the outputs of the mapper tasks will be the final output of the job.
 </p>
 <p>
-To be backward compatible, Hadoop Streaming also supports the "-reduce NONE" option, which is equivalent to "-jobconf mapred.reduce.tasks=0".
+To be backward compatible, Hadoop Streaming also supports the "-reduce NONE" option, which is equivalent to "-D mapred.reduce.tasks=0".
 </p>
 <a name="N10083"></a><a name="Specifying+Other+Plugins+for+Jobs"></a>
 <h3 class="h4">Specifying Other Plugins for Jobs </h3>
@@ -400,31 +400,31 @@ The class you supply for the output format is expected to take key/value pairs o
 <a name="N10096"></a><a name="Large+files+and+archives+in+Hadoop+Streaming"></a>
 <h3 class="h4">Large files and archives in Hadoop Streaming </h3>
 <p>
-The -cacheFile and -cacheArchive options allow you to make files and archives available to the tasks. The argument is a URI to the file or archive that you have already uploaded to HDFS. These files and archives are cached across jobs. You can retrieve the host and fs_port values from the fs.default.name config variable.
+The -files and -archives options allow you to make files and archives available to the tasks. The argument is a URI to the file or archive that you have already uploaded to HDFS. These files and archives are cached across jobs. You can retrieve the host and fs_port values from the fs.default.name config variable.
 </p>
 <p>
-Here are examples of the -cacheFile option:
+Here are examples of the -files option:
 </p>
 <pre class="code">
--cacheFile hdfs://host:fs_port/user/testfile.txt#testlink
+-files hdfs://host:fs_port/user/testfile.txt#testlink
 </pre>
 <p>
 In the above example, the part of the url after # is used as the symlink name that is created in the current working directory of tasks. So the tasks will have a symlink called testlink in the cwd that points to a local copy of testfile.txt. Multiple entries can be specified as: 
 </p>
 <pre class="code">
--cacheFile hdfs://host:fs_port/user/testfile1.txt#testlink1 -cacheFile hdfs://host:fs_port/user/testfile2.txt#testlink2
+-files hdfs://host:fs_port/user/testfile1.txt#testlink1 -files hdfs://host:fs_port/user/testfile2.txt#testlink2
 </pre>
 <p>
-The -cacheArchive option allows you to copy jars locally to the cwd of tasks and automatically unjar the files. For example:
+The -archives option allows you to copy jars locally to the cwd of tasks and automatically unjar the files. For example:
 </p>
 <pre class="code">
--cacheArchive hdfs://host:fs_port/user/testfile.jar#testlink3
+-archives hdfs://host:fs_port/user/testfile.jar#testlink3
 </pre>
 <p>
 In the example above, a symlink testlink3 is created in the current working directory of tasks. This symlink points to the directory that stores the unjarred contents of the uploaded jar file.
 </p>
 <p>
-Here's another example of the -cacheArchive option. Here, the input.txt file has two lines specifying the names of the two files: testlink/cache.txt and testlink/cache2.txt. "testlink" is a symlink to the archived directory, which has the files "cache.txt" and "cache2.txt".
+Here's another example of the -archives option. Here, the input.txt file has two lines specifying the names of the two files: testlink/cache.txt and testlink/cache2.txt. "testlink" is a symlink to the archived directory, which has the files "cache.txt" and "cache2.txt".
 </p>
 <pre class="code">
 $HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
@@ -432,10 +432,10 @@ $HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
                   -mapper "xargs cat"  \
                   -reducer "cat"  \
                   -output "/user/me/samples/cachefile/out" \  
-                  -cacheArchive 'hdfs://hadoop-nn1.example.com/user/me/samples/cachefile/cachedir.jar#testlink' \  
-                  -jobconf mapred.map.tasks=1 \
-                  -jobconf mapred.reduce.tasks=1 \ 
-                  -jobconf mapred.job.name="Experiment"
+                  -archives 'hdfs://hadoop-nn1.example.com/user/me/samples/cachefile/cachedir.jar#testlink' \  
+                  -D mapred.map.tasks=1 \
+                  -D mapred.reduce.tasks=1 \ 
+                  -D mapred.job.name="Experiment"
 
 $ ls test_jar/
 cache.txt  cache2.txt
@@ -469,7 +469,7 @@ This is just the second cache string
 <a name="N100BF"></a><a name="Specifying+Additional+Configuration+Variables+for+Jobs"></a>
 <h3 class="h4">Specifying Additional Configuration Variables for Jobs </h3>
 <p>
-You can specify additional configuration variables by using "-jobconf  &lt;n&gt;=&lt;v&gt;". For example: 
+You can specify additional configuration variables by using "-D  &lt;n&gt;=&lt;v&gt;". For example: 
 </p>
 <pre class="code">
 $HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
@@ -477,10 +477,10 @@ $HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
     -output myOutputDir \
     -mapper org.apache.hadoop.mapred.lib.IdentityMapper\
     -reducer /bin/wc \
-    -jobconf mapred.reduce.tasks=2
+    -D mapred.reduce.tasks=2
 </pre>
 <p>
-The -jobconf mapred.reduce.tasks=2 in the above example specifies to use two reducers for the job.
+The -D mapred.reduce.tasks=2 in the above example specifies to use two reducers for the job.
 </p>
 <p>
 For more details on the jobconf parameters see:
@@ -497,63 +497,81 @@ Other options you may specify for a streaming job are described here:
 <th colspan="1" rowspan="1">Parameter</th><th colspan="1" rowspan="1">Optional/Required </th><th colspan="1" rowspan="1">Description </th>
 </tr>
 
+
 <tr>
-<td colspan="1" rowspan="1"> -cluster name </td><td colspan="1" rowspan="1"> Optional </td><td colspan="1" rowspan="1"> Switch between local Hadoop and one or more remote clusters </td>
+<td colspan="1" rowspan="1"> -cmdenv   name=value </td><td colspan="1" rowspan="1"> Optional </td><td colspan="1" rowspan="1"> Pass env var to streaming commands </td>
 </tr>
 
 
 <tr>
-<td colspan="1" rowspan="1"> -dfs  host:port or local </td><td colspan="1" rowspan="1"> Optional </td><td colspan="1" rowspan="1"> Override the HDFS configuration for the job </td>
+<td colspan="1" rowspan="1"> -inputreader JavaClassName </td><td colspan="1" rowspan="1"> Optional </td><td colspan="1" rowspan="1"> For backwards-compatibility: specifies a record reader class (instead of an input format class) </td>
 </tr>
 
 <tr>
-<td colspan="1" rowspan="1"> -jt host:port or local </td><td colspan="1" rowspan="1"> Optional </td><td colspan="1" rowspan="1"> Override the JobTracker configuration for the job </td>
+<td colspan="1" rowspan="1"> -verbose </td><td colspan="1" rowspan="1"> Optional </td><td colspan="1" rowspan="1"> Verbose output </td>
 </tr>
 
+</table>
+<p>
+Streaming support Hadoop generic command line options. 
+
+Supported parameters are : 
+The general command line syntax is :
+<br>    bin/hadoop command [genericOptions] [commandOptions]
+</p>
+<table class="ForrestTable" cellspacing="1" cellpadding="4">
+
 <tr>
-<td colspan="1" rowspan="1"> -additionalconfspec specfile </td><td colspan="1" rowspan="1"> Optional </td><td colspan="1" rowspan="1"> Specifies a set of configuration variables in an XML file like hadoop-site.xml, instead of using multiple  options of type "-jobconf name=value" </td>
+<th colspan="1" rowspan="1">Parameter</th><th colspan="1" rowspan="1">Optional/Required </th><th colspan="1" rowspan="1">Description </th>
 </tr>
 
 
 <tr>
-<td colspan="1" rowspan="1"> -cmdenv   name=value </td><td colspan="1" rowspan="1"> Optional </td><td colspan="1" rowspan="1"> Pass env var to streaming commands </td>
+<td colspan="1" rowspan="1"> -conf  configuration_file </td><td colspan="1" rowspan="1"> Optional </td><td colspan="1" rowspan="1"> specify an application configuration file </td>
 </tr>
 
 <tr>
-<td colspan="1" rowspan="1"> -cacheFile fileNameURI </td><td colspan="1" rowspan="1"> Optional </td><td colspan="1" rowspan="1"> Specify a file to be uploaded to the HDFS </td>
+<td colspan="1" rowspan="1"> -D  property=value </td><td colspan="1" rowspan="1"> Optional </td><td colspan="1" rowspan="1"> use value for given property </td>
 </tr>
 
 <tr>
-<td colspan="1" rowspan="1"> -cacheArchive fileNameURI </td><td colspan="1" rowspan="1"> Optional </td><td colspan="1" rowspan="1"> Specify a jar file to be uploaded to the HDFS. This jar file is unjarred automatically in the cwd of the task </td>
+<td colspan="1" rowspan="1"> -fs host:port or local </td><td colspan="1" rowspan="1"> Optional </td><td colspan="1" rowspan="1"> specify a namenode </td>
 </tr>
 
+<tr>
+<td colspan="1" rowspan="1"> -jt host:port or local </td><td colspan="1" rowspan="1"> Optional </td><td colspan="1" rowspan="1"> specify a job tracker </td>
+</tr>
 
 <tr>
-<td colspan="1" rowspan="1"> -inputreader JavaClassName </td><td colspan="1" rowspan="1"> Optional </td><td colspan="1" rowspan="1"> For backwards-compatibility: specifies a record reader class (instead of an input format class) </td>
+<td colspan="1" rowspan="1"> -files </td><td colspan="1" rowspan="1"> Optional </td><td colspan="1" rowspan="1"> specify comma separated files to be copied to the map reduce cluster </td>
 </tr>
 
 <tr>
-<td colspan="1" rowspan="1"> -verbose </td><td colspan="1" rowspan="1"> Optional </td><td colspan="1" rowspan="1"> Verbose output </td>
+<td colspan="1" rowspan="1"> -archives </td><td colspan="1" rowspan="1"> Optional </td><td colspan="1" rowspan="1"> specify comma separated archives to be unarchived on the compute machines </td>
+</tr>
+
+<tr>
+<td colspan="1" rowspan="1">  </td><td colspan="1" rowspan="1"> Optional </td><td colspan="1" rowspan="1">  </td>
+</tr>
+
+<tr>
+<td colspan="1" rowspan="1"> -jt host:port or local </td><td colspan="1" rowspan="1"> Optional </td><td colspan="1" rowspan="1">  </td>
 </tr>
 
 </table>
 <p>
-To switch between "local" Hadoop and one or more remote Hadoop clusters use -cluster &lt;name&gt;.
-By default, hadoop-default.xml and hadoop-site.xml are used. The -cluster &lt;name&gt; option will cause $HADOOP_HOME/conf/hadoop-&lt;name&gt;.xml to be used instead.
-</p>
-<p>
 To change the local temp directory use:
 </p>
 <pre class="code">
-  -jobconf dfs.data.dir=/tmp
+  -D dfs.data.dir=/tmp
 </pre>
 <p>
 To specify additional local temp directories use:
 </p>
 <pre class="code">
-   -jobconf mapred.local.dir=/tmp/local
-   -jobconf mapred.system.dir=/tmp/system
-   -jobconf mapred.temp.dir=/tmp/temp
+   -D mapred.local.dir=/tmp/local
+   -D mapred.system.dir=/tmp/system
+   -D mapred.temp.dir=/tmp/temp
 </pre>
 <p>
 For more details on jobconf parameters see:
@@ -568,10 +586,10 @@ To set an environment variable in a streaming command use:
 </div>
 
 
-<a name="N1018E"></a><a name="More+usage+examples"></a>
+<a name="N101BD"></a><a name="More+usage+examples"></a>
 <h2 class="h3">More usage examples </h2>
 <div class="section">
-<a name="N10194"></a><a name="Customizing+the+Way+to+Split+Lines+into+Key%2FValue+Pairs"></a>
+<a name="N101C3"></a><a name="Customizing+the+Way+to+Split+Lines+into+Key%2FValue+Pairs"></a>
 <h3 class="h4">Customizing the Way to Split Lines into Key/Value Pairs </h3>
 <p>
 As noted earlier, when the Map/Reduce framework reads a line from the stdout of the mapper, it splits the line into a key/value pair. By default, the prefix of the line up to the first tab character is the key and the the rest of the line (excluding the tab character) is the value.
@@ -585,16 +603,16 @@ $HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
     -output myOutputDir \
     -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
     -reducer org.apache.hadoop.mapred.lib.IdentityReducer \
-    -jobconf stream.map.output.field.separator=. \
-    -jobconf stream.num.map.output.key.fields=4 
+    -D stream.map.output.field.separator=. \
+    -D stream.num.map.output.key.fields=4 
 </pre>
 <p>
-In the above example, "-jobconf stream.map.output.field.separator=." specifies "." as the field separator for the map outputs, and the prefix up to the fourth "." in a line will be the key and the rest of the line (excluding the fourth ".") will be the value. If a line has less than four "."s, then the whole line will be the key and the value will be an empty Text object (like the one created by new Text("")).
+In the above example, "-D stream.map.output.field.separator=." specifies "." as the field separator for the map outputs, and the prefix up to the fourth "." in a line will be the key and the rest of the line (excluding the fourth ".") will be the value. If a line has less than four "."s, then the whole line will be the key and the value will be an empty Text object (like the one created by new Text("")).
 </p>
 <p>
-Similarly, you can use "-jobconf stream.reduce.output.field.separator=SEP" and "-jobconf stream.num.reduce.output.fields=NUM" to specify the nth field separator in a line of the reduce outputs as the separator between the key and the value.
+Similarly, you can use "-D stream.reduce.output.field.separator=SEP" and "-D stream.num.reduce.output.fields=NUM" to specify the nth field separator in a line of the reduce outputs as the separator between the key and the value.
 </p>
-<a name="N101AA"></a><a name="A+Useful+Partitioner+Class+%28secondary+sort%2C+the+-partitioner+org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner+option%29"></a>
+<a name="N101D9"></a><a name="A+Useful+Partitioner+Class+%28secondary+sort%2C+the+-partitioner+org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner+option%29"></a>
 <h3 class="h4">A Useful Partitioner Class (secondary sort, the -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner option) </h3>
 <p>
 Hadoop has a library class, org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner, that is useful for many applications. This class allows the Map/Reduce framework to partition the map outputs based on prefixes of keys, not the whole keys. For example:
@@ -606,17 +624,17 @@ $HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
     -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
     -reducer org.apache.hadoop.mapred.lib.IdentityReducer \
     -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-    -jobconf stream.map.output.field.separator=. \
-    -jobconf stream.num.map.output.key.fields=4 \
-    -jobconf map.output.key.field.separator=. \
-    -jobconf num.key.fields.for.partition=2 \
-    -jobconf mapred.reduce.tasks=12
+    -D stream.map.output.field.separator=. \
+    -D stream.num.map.output.key.fields=4 \
+    -D map.output.key.field.separator=. \
+    -D num.key.fields.for.partition=2 \
+    -D mapred.reduce.tasks=12
 </pre>
 <p>
-Here, <em>-jobconf stream.map.output.field.separator=.</em> and <em>-jobconf stream.num.map.output.key.fields=4</em> are as explained in previous example. The two variables are used by streaming to identify the key/value pair of mapper. 
+Here, <em>-D stream.map.output.field.separator=.</em> and <em>-D stream.num.map.output.key.fields=4</em> are as explained in previous example. The two variables are used by streaming to identify the key/value pair of mapper. 
 </p>
 <p>
-The map output keys of the above Map/Reduce job normally have four fields separated by ".". However, the Map/Reduce framework will partition the map outputs by the first two fields of the keys using the <em>-jobconf num.key.fields.for.partition=2</em> option. Here, <em>-jobconf map.output.key.field.separator=.</em> specifies the separator for the partition. This guarantees that all the key/value pairs with the same first two fields in the keys will be partitioned into the same reducer.
+The map output keys of the above Map/Reduce job normally have four fields separated by ".". However, the Map/Reduce framework will partition the map outputs by the first two fields of the keys using the <em>-D num.key.fields.for.partition=2</em> option. Here, <em>-D map.output.key.field.separator=.</em> specifies the separator for the partition. This guarantees that all the key/value pairs with the same first two fields in the keys will be partitioned into the same reducer.
 </p>
 <p>
 
@@ -654,7 +672,7 @@ Sorting within each partition for the reducer(all 4 fields used for sorting)</p>
 11.14.2.2
 11.14.2.3
 </pre>
-<a name="N101E0"></a><a name="Working+with+the+Hadoop+Aggregate+Package+%28the+-reduce+aggregate+option%29"></a>
+<a name="N1020F"></a><a name="Working+with+the+Hadoop+Aggregate+Package+%28the+-reduce+aggregate+option%29"></a>
 <h3 class="h4">Working with the Hadoop Aggregate Package (the -reduce aggregate option) </h3>
 <p>
 Hadoop has a library package called "Aggregate" (
@@ -677,7 +695,7 @@ $HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
     -mapper myAggregatorForKeyCount.py \
     -reducer aggregate \
     -file myAggregatorForKeyCount.py \
-    -jobconf mapred.reduce.tasks=12
+    -D mapred.reduce.tasks=12
 </pre>
 <p>
 The python program myAggregatorForKeyCount.py looks like:
@@ -703,7 +721,7 @@ def main(argv):
 if __name__ == "__main__":
      main(sys.argv)
 </pre>
-<a name="N101FB"></a><a name="Field+Selection+%28+similar+to+unix+%27cut%27+command%29"></a>
+<a name="N1022A"></a><a name="Field+Selection+%28+similar+to+unix+%27cut%27+command%29"></a>
 <h3 class="h4">Field Selection ( similar to unix 'cut' command) </h3>
 <p>
 Hadoop has a library class, org.apache.hadoop.mapred.lib.FieldSelectionMapReduce, that effectively allows you to process text data like the unix "cut" utility. The map function defined in the class treats each input key/value pair as a list of fields. You can specify the field separator (the default is the tab character). You can select an arbitrary list of fields as the map output key, and an arbitrary list of fields as the map output value. Similarly, the reduce function defined in the class treats each input key/value pair as a list of fields. You can select an arbitrary list of fields as the reduce output key, and an arbitrary list of fields as the reduce output value. For example:
@@ -715,32 +733,32 @@ $HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
     -mapper org.apache.hadoop.mapred.lib.FieldSelectionMapReduce\
     -reducer org.apache.hadoop.mapred.lib.FieldSelectionMapReduce\
     -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-    -jobconf map.output.key.field.separa=. \
-    -jobconf num.key.fields.for.partition=2 \
-    -jobconf mapred.data.field.separator=. \
-    -jobconf map.output.key.value.fields.spec=6,5,1-3:0- \
-    -jobconf reduce.output.key.value.fields.spec=0-2:5- \
-    -jobconf mapred.reduce.tasks=12
+    -D map.output.key.field.separa=. \
+    -D num.key.fields.for.partition=2 \
+    -D mapred.data.field.separator=. \
+    -D map.output.key.value.fields.spec=6,5,1-3:0- \
+    -D reduce.output.key.value.fields.spec=0-2:5- \
+    -D mapred.reduce.tasks=12
 </pre>
 <p>
-The option "-jobconf map.output.key.value.fields.spec=6,5,1-3:0-" specifies key/value selection for the map outputs. Key selection spec and value selection spec are separated by ":". In this case, the map output key will consist of fields 6, 5, 1, 2, and 3. The map output value will consist of all fields (0- means field 0 and all 
+The option "-D map.output.key.value.fields.spec=6,5,1-3:0-" specifies key/value selection for the map outputs. Key selection spec and value selection spec are separated by ":". In this case, the map output key will consist of fields 6, 5, 1, 2, and 3. The map output value will consist of all fields (0- means field 0 and all 
 the subsequent fields). 
 </p>
 <p>
-The option "-jobconf reduce.output.key.value.fields.spec=0-2:0-" specifies key/value selection for the reduce outputs. In this case, the reduce output key will consist of fields 0, 1, 2 (corresponding to the original fields 6, 5, 1). The reduce output value will consist of all fields starting from field 5 (corresponding to all the original fields).  
+The option "-D reduce.output.key.value.fields.spec=0-2:0-" specifies key/value selection for the reduce outputs. In this case, the reduce output key will consist of fields 0, 1, 2 (corresponding to the original fields 6, 5, 1). The reduce output value will consist of all fields starting from field 5 (corresponding to all the original fields).  
 </p>
 </div>
 
 
-<a name="N1020F"></a><a name="Frequently+Asked+Questions"></a>
+<a name="N1023E"></a><a name="Frequently+Asked+Questions"></a>
 <h2 class="h3">Frequently Asked Questions </h2>
 <div class="section">
-<a name="N10215"></a><a name="How+do+I+use+Hadoop+Streaming+to+run+an+arbitrary+set+of+%28semi-%29independent+tasks%3F"></a>
+<a name="N10244"></a><a name="How+do+I+use+Hadoop+Streaming+to+run+an+arbitrary+set+of+%28semi-%29independent+tasks%3F"></a>
 <h3 class="h4">How do I use Hadoop Streaming to run an arbitrary set of (semi-)independent tasks? </h3>
 <p>
 Often you do not need the full power of Map Reduce, but only need to run multiple instances of the same program - either on different parts of the data, or on the same data, but with different parameters. You can use Hadoop Streaming to do this.
 </p>
-<a name="N1021F"></a><a name="How+do+I+process+files%2C+one+per+map%3F"></a>
+<a name="N1024E"></a><a name="How+do+I+process+files%2C+one+per+map%3F"></a>
 <h3 class="h4">How do I process files, one per map? </h3>
 <p>
 As an example, consider the problem of zipping (compressing) a set of files across the hadoop cluster. You can achieve this using either of these methods:
@@ -784,13 +802,13 @@ As an example, consider the problem of zipping (compressing) a set of files acro
 </li>
 
 </ol>
-<a name="N1024A"></a><a name="How+many+reducers+should+I+use%3F"></a>
+<a name="N10279"></a><a name="How+many+reducers+should+I+use%3F"></a>
 <h3 class="h4">How many reducers should I use? </h3>
 <p>
 See the Hadoop Wiki for details: <a href="mapred_tutorial.html#Reducer">Reducer</a>
 
 </p>
-<a name="N10258"></a><a name="If+I+set+up+an+alias+in+my+shell+script%2C+will+that+work+after+-mapper%2C+i.e.+say+I+do%3A+alias+c1%3D%27cut+-f1%27.+Will+-mapper+%22c1%22+work%3F"></a>
+<a name="N10287"></a><a name="If+I+set+up+an+alias+in+my+shell+script%2C+will+that+work+after+-mapper%2C+i.e.+say+I+do%3A+alias+c1%3D%27cut+-f1%27.+Will+-mapper+%22c1%22+work%3F"></a>
 <h3 class="h4">If I set up an alias in my shell script, will that work after -mapper, i.e. say I do: alias c1='cut -f1'. Will -mapper "c1" work? </h3>
 <p>
 Using an alias will not work, but variable substitution is allowed as shown in this example:
@@ -806,7 +824,7 @@ $ c2='cut -f2'; $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
     -input /user/me/samples/student_marks 
     -mapper \"$c2\" -reducer 'cat'  
     -output /user/me/samples/student_out 
-    -jobconf mapred.job.name='Experiment'
+    -D mapred.job.name='Experiment'
 
 $ hadoop dfs -ls samples/student_out
 Found 1 items/user/me/samples/student_out/part-00000    &lt;r 3&gt;   16
@@ -817,20 +835,20 @@ $ hadoop dfs -cat samples/student_out/part-00000
 75
 80
 </pre>
-<a name="N10266"></a><a name="Can+I+use+UNIX+pipes%3F+For+example%2C+will+-mapper+%22cut+-f1+%7C+sed+s%2Ffoo%2Fbar%2Fg%22+work%3F"></a>
+<a name="N10295"></a><a name="Can+I+use+UNIX+pipes%3F+For+example%2C+will+-mapper+%22cut+-f1+%7C+sed+s%2Ffoo%2Fbar%2Fg%22+work%3F"></a>
 <h3 class="h4">Can I use UNIX pipes? For example, will -mapper "cut -f1 | sed s/foo/bar/g" work?</h3>
 <p>
 Currently this does not work and gives an "java.io.IOException: Broken pipe" error. This is probably a bug that needs to be investigated.
 </p>
-<a name="N10270"></a><a name="When+I+run+a+streaming+job+by"></a>
+<a name="N1029F"></a><a name="When+I+run+a+streaming+job+by"></a>
 <h3 class="h4">When I run a streaming job by distributing large executables (for example, 3.6G) through the -file option, I get a "No space left on device" error. What do I do? </h3>
 <p>
 The jar packaging happens in a directory pointed to by the configuration variable stream.tmpdir. The default value of stream.tmpdir is /tmp. Set the value to a directory with more space:
 </p>
 <pre class="code">
--jobconf stream.tmpdir=/export/bigspace/...
+-D stream.tmpdir=/export/bigspace/...
 </pre>
-<a name="N10281"></a><a name="How+do+I+specify+multiple+input+directories%3F"></a>
+<a name="N102B0"></a><a name="How+do+I+specify+multiple+input+directories%3F"></a>
 <h3 class="h4">How do I specify multiple input directories? </h3>
 <p>
 You can specify multiple input directories with multiple '-input' options:
@@ -838,17 +856,17 @@ You can specify multiple input directories with multiple '-input' options:
 <pre class="code">
  hadoop jar hadoop-streaming.jar -input '/user/foo/dir1' -input '/user/foo/dir2' 
 </pre>
-<a name="N1028E"></a><a name="How+do+I+generate+output+files+with+gzip+format%3F"></a>
+<a name="N102BD"></a><a name="How+do+I+generate+output+files+with+gzip+format%3F"></a>
 <h3 class="h4">How do I generate output files with gzip format? </h3>
 <p>
-Instead of plain text files, you can generate gzip files as your generated output. Pass '-jobconf mapred.output.compress=true -jobconf  mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCode' as option to your streaming job.
+Instead of plain text files, you can generate gzip files as your generated output. Pass '-D mapred.output.compress=true -D  mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCode' as option to your streaming job.
 </p>
-<a name="N10298"></a><a name="How+do+I+provide+my+own+input%2Foutput+format+with+streaming%3F"></a>
+<a name="N102C7"></a><a name="How+do+I+provide+my+own+input%2Foutput+format+with+streaming%3F"></a>
 <h3 class="h4">How do I provide my own input/output format with streaming? </h3>
 <p>
 At least as late as version 0.14, Hadoop does not support multiple jar files. So, when specifying your own custom classes you will have to pack them along with the streaming jar and use the custom jar instead of the default hadoop streaming jar. 
 </p>
-<a name="N102A2"></a><a name="How+do+I+parse+XML+documents+using+streaming%3F"></a>
+<a name="N102D1"></a><a name="How+do+I+parse+XML+documents+using+streaming%3F"></a>
 <h3 class="h4">How do I parse XML documents using streaming? </h3>
 <p>
 You can use the record reader StreamXmlRecordReader to process XML documents. 
@@ -859,14 +877,14 @@ hadoop jar hadoop-streaming.jar -inputreader "StreamXmlRecord,begin=BEGIN_STRING
 <p>
 Anything found between BEGIN_STRING and END_STRING would be treated as one record for map tasks.
 </p>
-<a name="N102B3"></a><a name="How+do+I+update+counters+in+streaming+applications%3F"></a>
+<a name="N102E2"></a><a name="How+do+I+update+counters+in+streaming+applications%3F"></a>
 <h3 class="h4">How do I update counters in streaming applications? </h3>
 <p>
 A streaming process can use the stderr to emit counter information.
 <span class="codefrag">reporter:counter:&lt;group&gt;,&lt;counter&gt;,&lt;amount&gt;</span> 
 should be sent to stderr to update the counter.
 </p>
-<a name="N102C0"></a><a name="How+do+I+update+status+in+streaming+applications%3F"></a>
+<a name="N102EF"></a><a name="How+do+I+update+status+in+streaming+applications%3F"></a>
 <h3 class="h4">How do I update status in streaming applications? </h3>
 <p>
 A streaming process can use the stderr to emit status information.

File diff suppressed because it is too large
+ 1 - 1
docs/streaming.pdf


+ 4 - 5
src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java

@@ -18,7 +18,7 @@
 
 package org.apache.hadoop.streaming;
 
-import java.io.IOException;
+import org.apache.hadoop.util.ToolRunner;
 
 /** The main entrypoint. Usually invoked with the script bin/hadoopStreaming
  * or bin/hadoop har hadoop-streaming.jar args.
@@ -26,11 +26,10 @@ import java.io.IOException;
  */
 public class HadoopStreaming {
 
-  public static void main(String[] args) throws IOException {
-    boolean mayExit = true;
+  public static void main(String[] args) throws Exception {
     int returnStatus = 0;
-    StreamJob job = new StreamJob(args, mayExit);
-    returnStatus = job.go();
+    StreamJob job = new StreamJob();
+    returnStatus = ToolRunner.run(job, args);
     if (returnStatus != 0) {
       System.err.println("Streaming Job Failed!");
       System.exit(returnStatus);
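
A hedged sketch of what ToolRunner.run() does on StreamJob's behalf: GenericOptionsParser consumes the generic flags, loads them into the Configuration, and leaves the remaining arguments for the tool to parse itself. The ParseDemo class is hypothetical; GenericOptionsParser and getRemainingArgs() are the real utilities the patch imports into StreamJob.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.GenericOptionsParser;

// Hypothetical demo: split generic options from the remaining (tool) arguments.
public class ParseDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    GenericOptionsParser parser = new GenericOptionsParser(conf, args);
    // Generic flags such as -D, -fs, -jt, -files, -archives are now in conf.
    System.out.println("fs.default.name = " + conf.get("fs.default.name"));
    // Everything else (-input, -output, -mapper, ...) is left for the tool.
    for (String remaining : parser.getRemainingArgs()) {
      System.out.println("tool argument: " + remaining);
    }
  }
}

This is the mechanism behind the deprecation warnings added below in StreamJob.parseArgv() for -jobconf, -cacheFile, -cacheArchive, and -dfs.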

+ 113 - 138
src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java

@@ -68,12 +68,14 @@ import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
 import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorCombiner;
 import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer;
+import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
 
 /** All the client-side work happens here.
  * (Jar packaging, MapRed job submission and monitoring)
  */
-public class StreamJob {
+public class StreamJob implements Tool {
 
   protected static final Log LOG = LogFactory.getLog(StreamJob.class.getName());
   final static String REDUCE_NONE = "NONE";
@@ -91,11 +93,47 @@ public class StreamJob {
                                                                 "-jobconf", "(n=v) Optional. Add or override a JobConf property.", 'D'); 
   private MultiPropertyOption cmdenv = new MultiPropertyOption(
                                                                "-cmdenv", "(n=v) Pass env.var to streaming commands.", 'E');  
-  
+  /**@deprecated use StreamJob() with ToolRunner or set the 
+   * Configuration using {@link #setConf(Configuration)} and 
+   * run with {@link #run(String[])}.  
+   */
+  @Deprecated
   public StreamJob(String[] argv, boolean mayExit) {
-    setupOptions();
+    this();
     argv_ = argv;
-    mayExit_ = mayExit;
+    this.config_ = new Configuration();
+  }
+  
+  public StreamJob() {
+    setupOptions();
+  }
+  
+  @Override
+  public Configuration getConf() {
+    return config_;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.config_ = conf;
+  }
+  
+  @Override
+  public int run(String[] args) throws Exception {
+    try {
+      this.argv_ = args;
+      init();
+  
+      preProcessArgs();
+      parseArgv();
+      postProcessArgs();
+  
+      setJobConf();
+      return submitAndMonitorJob();
+    }catch (IllegalArgumentException ex) {
+      //ignore, since log will already be printed
+      return 1;
+    }
   }
   
   /**
@@ -103,16 +141,16 @@ public class StreamJob {
    * intializes the job conf and submits the job
    * to the jobtracker
    * @throws IOException
+   * @deprecated use {@link #run(String[])} instead.
    */
+  @Deprecated
   public int go() throws IOException {
-    init();
-
-    preProcessArgs();
-    parseArgv();
-    postProcessArgs();
-
-    setJobConf();
-    return submitAndMonitorJob();
+    try {
+      return run(argv_);
+    }
+    catch (Exception ex) {
+      throw new IOException(ex.getMessage());
+    }
   }
   
   protected void init() {
@@ -129,11 +167,7 @@ public class StreamJob {
   }
 
   void postProcessArgs() throws IOException {
-    if (cluster_ == null) {
-      // hadoop-default.xml is standard, hadoop-local.xml is not.
-      cluster_ = "default";
-    }
-    hadoopAliasConf_ = "hadoop-" + getClusterNick() + ".xml";
+    
     if (inputSpecs_.size() == 0) {
       fail("Required argument: -input <name>");
     }
@@ -193,21 +227,13 @@ public class StreamJob {
     return cmd;
   }
 
-  String getHadoopAliasConfFile() {
-    return new File(getHadoopClientHome() + "/conf", hadoopAliasConf_).getAbsolutePath();
-  }
-
   void parseArgv(){
     CommandLine cmdLine = null; 
     try{
       cmdLine = parser.parse(argv_);
     }catch(Exception oe){
       LOG.error(oe.getMessage());
-      if (detailedUsage_) {
-        exitUsage(true);
-      } else {
-        exitUsage(false);
-      }
+      exitUsage(argv_.length > 0 && "-info".equals(argv_[0]));
     }
     
     if (cmdLine != null){
@@ -222,20 +248,14 @@ public class StreamJob {
       comCmd_ = (String)cmdLine.getValue("-combiner"); 
       redCmd_ = (String)cmdLine.getValue("-reducer"); 
       
-      packageFiles_.addAll(cmdLine.getValues("-file"));
-      
-      cluster_ = (String)cmdLine.getValue("-cluster");
-      
-      configPath_.addAll(cmdLine.getValues("-config"));
-      
+      if(!cmdLine.getValues("-file").isEmpty()) {
+        packageFiles_.addAll(cmdLine.getValues("-file"));
+      }
+         
       String fsName = (String)cmdLine.getValue("-dfs");
       if (null != fsName){
-        userJobConfProps_.put("fs.default.name", fsName);        
-      }
-      
-      String jt = (String)cmdLine.getValue("mapred.job.tracker");
-      if (null != jt){
-        userJobConfProps_.put("fs.default.name", jt);        
+        LOG.warn("-dfs option is deprecated, please use -fs instead.");
+        config_.set("fs.default.name", fsName);
       }
       
       additionalConfSpec_ = (String)cmdLine.getValue("-additionalconfspec"); 
@@ -248,14 +268,16 @@ public class StreamJob {
       reduceDebugSpec_ = (String)cmdLine.getValue("-reducedebug");
       
       List<String> car = cmdLine.getValues("-cacheArchive"); 
-      if (null != car){
+      if (null != car && !car.isEmpty()){
+        LOG.warn("-cacheArchive option is deprecated, please use -archives instead.");
         for(String s : car){
           cacheArchives = (cacheArchives == null)?s :cacheArchives + "," + s;  
         }
       }
 
       List<String> caf = cmdLine.getValues("-cacheFile"); 
-      if (null != caf){
+      if (null != caf && !caf.isEmpty()){
+        LOG.warn("-cacheFile option is deprecated, please use -files instead.");
         for(String s : caf){
           cacheFiles = (cacheFiles == null)?s :cacheFiles + "," + s;  
         }
@@ -264,10 +286,11 @@ public class StreamJob {
       List<String> jobConfArgs = (List<String>)cmdLine.getValue(jobconf); 
       List<String> envArgs = (List<String>)cmdLine.getValue(cmdenv); 
       
-      if (null != jobConfArgs){
+      if (null != jobConfArgs && !jobConfArgs.isEmpty()){
+        LOG.warn("-jobconf option is deprecated, please use -D instead.");
         for(String s : jobConfArgs){
           String []parts = s.split("=", 2); 
-          userJobConfProps_.put(parts[0], parts[1]);
+          config_.set(parts[0], parts[1]);
         }
       }
       if (null != envArgs){
@@ -278,8 +301,8 @@ public class StreamJob {
           addTaskEnvironment_ += s;
         }
       }
-    }else if (detailedUsage_) {
-      exitUsage(true);
+    }else {
+      exitUsage(argv_.length > 0 && "-info".equals(argv_[0]));
     }
   }
 
@@ -454,38 +477,37 @@ public class StreamJob {
   public void exitUsage(boolean detailed) {
     //         1         2         3         4         5         6         7
     //1234567890123456789012345678901234567890123456789012345678901234567890123456789
+    
+    System.out.println("Usage: $HADOOP_HOME/bin/hadoop jar \\");
+    System.out.println("          $HADOOP_HOME/hadoop-streaming.jar [options]");
+    System.out.println("Options:");
+    System.out.println("  -input    <path>     DFS input file(s) for the Map step");
+    System.out.println("  -output   <path>     DFS output directory for the Reduce step");
+    System.out.println("  -mapper   <cmd|JavaClassName>      The streaming command to run");
+    System.out.println("  -combiner <JavaClassName> Combiner has to be a Java class");
+    System.out.println("  -reducer  <cmd|JavaClassName>      The streaming command to run");
+    System.out.println("  -file     <file>     File/dir to be shipped in the Job jar file");
+    System.out.println("  -inputformat TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName Optional.");
+    System.out.println("  -outputformat TextOutputFormat(default)|JavaClassName  Optional.");
+    System.out.println("  -partitioner JavaClassName  Optional.");
+    System.out.println("  -numReduceTasks <num>  Optional.");
+    System.out.println("  -inputreader <spec>  Optional.");
+    System.out.println("  -cmdenv   <n>=<v>    Optional. Pass env.var to streaming commands");
+    System.out.println("  -mapdebug <path>  Optional. " +
+    "To run this script when a map task fails ");
+    System.out.println("  -reducedebug <path>  Optional." +
+    " To run this script when a reduce task fails ");
+    System.out.println("  -verbose");
+    System.out.println();
+    GenericOptionsParser.printGenericCommandUsage(System.out);
+
     if (!detailed) {
-      System.out.println("Usage: $HADOOP_HOME/bin/hadoop [--config dir] jar \\");
-      System.out.println("          $HADOOP_HOME/hadoop-streaming.jar [options]");
-      System.out.println("Options:");
-      System.out.println("  -input    <path>     DFS input file(s) for the Map step");
-      System.out.println("  -output   <path>     DFS output directory for the Reduce step");
-      System.out.println("  -mapper   <cmd|JavaClassName>      The streaming command to run");
-      System.out.println("  -combiner <JavaClassName> Combiner has to be a Java class");
-      System.out.println("  -reducer  <cmd|JavaClassName>      The streaming command to run");
-      System.out.println("  -file     <file>     File/dir to be shipped in the Job jar file");
-      System.out.println("  -dfs    <h:p>|local  Optional. Override DFS configuration");
-      System.out.println("  -jt     <h:p>|local  Optional. Override JobTracker configuration");
-      System.out.println("  -additionalconfspec specfile  Optional.");
-      System.out.println("  -inputformat TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName Optional.");
-      System.out.println("  -outputformat TextOutputFormat(default)|JavaClassName  Optional.");
-      System.out.println("  -partitioner JavaClassName  Optional.");
-      System.out.println("  -numReduceTasks <num>  Optional.");
-      System.out.println("  -inputreader <spec>  Optional.");
-      System.out.println("  -jobconf  <n>=<v>    Optional. Add or override a JobConf property");
-      System.out.println("  -cmdenv   <n>=<v>    Optional. Pass env.var to streaming commands");
-      System.out.println("  -mapdebug <path>  Optional. " +
-                                "To run this script when a map task fails ");
-      System.out.println("  -reducedebug <path>  Optional." +
-                             " To run this script when a reduce task fails ");
-      System.out.println("  -cacheFile fileNameURI");
-      System.out.println("  -cacheArchive fileNameURI");
-      System.out.println("  -verbose");
       System.out.println();      
       System.out.println("For more details about these options:");
       System.out.println("Use $HADOOP_HOME/bin/hadoop jar build/hadoop-streaming.jar -info");
       fail("");
     }
+    System.out.println();
     System.out.println("In -input: globbing on <path> is supported and can have multiple -input");
     System.out.println("Default Map input format: a line is a record in UTF-8");
     System.out.println("  the key part ends at first TAB, the rest of the line is the value");
@@ -498,7 +520,7 @@ public class StreamJob {
     System.out.println("  The location of this working directory is unspecified.");
     System.out.println();
     System.out.println("To set the number of reduce tasks (num. of output files):");
-    System.out.println("  -jobconf mapred.reduce.tasks=10");
+    System.out.println("  -D mapred.reduce.tasks=10");
     System.out.println("To skip the sort/combine/shuffle/sort/reduce step:");
     System.out.println("  Use -numReduceTasks 0");
     System.out
@@ -509,24 +531,24 @@ public class StreamJob {
     System.out.println("  This equivalent -reducer NONE");
     System.out.println();
     System.out.println("To speed up the last maps:");
-    System.out.println("  -jobconf mapred.map.tasks.speculative.execution=true");
+    System.out.println("  -D mapred.map.tasks.speculative.execution=true");
     System.out.println("To speed up the last reduces:");
-    System.out.println("  -jobconf mapred.reduce.tasks.speculative.execution=true");
+    System.out.println("  -D mapred.reduce.tasks.speculative.execution=true");
     System.out.println("To name the job (appears in the JobTracker Web UI):");
-    System.out.println("  -jobconf mapred.job.name='My Job' ");
+    System.out.println("  -D mapred.job.name='My Job' ");
     System.out.println("To change the local temp directory:");
-    System.out.println("  -jobconf dfs.data.dir=/tmp/dfs");
-    System.out.println("  -jobconf stream.tmpdir=/tmp/streaming");
+    System.out.println("  -D dfs.data.dir=/tmp/dfs");
+    System.out.println("  -D stream.tmpdir=/tmp/streaming");
     System.out.println("Additional local temp directories with -cluster local:");
-    System.out.println("  -jobconf mapred.local.dir=/tmp/local");
-    System.out.println("  -jobconf mapred.system.dir=/tmp/system");
-    System.out.println("  -jobconf mapred.temp.dir=/tmp/temp");
+    System.out.println("  -D mapred.local.dir=/tmp/local");
+    System.out.println("  -D mapred.system.dir=/tmp/system");
+    System.out.println("  -D mapred.temp.dir=/tmp/temp");
     System.out.println("To treat tasks with non-zero exit status as SUCCEDED:");    
-    System.out.println("  -jobconf stream.non.zero.exit.is.failure=false");
+    System.out.println("  -D stream.non.zero.exit.is.failure=false");
     System.out.println("Use a custom hadoopStreaming build along a standard hadoop install:");
     System.out.println("  $HADOOP_HOME/bin/hadoop jar /path/my-hadoop-streaming.jar [...]\\");
     System.out
-      .println("    [...] -jobconf stream.shipped.hadoopstreaming=/path/my-hadoop-streaming.jar");
+      .println("    [...] -D stream.shipped.hadoopstreaming=/path/my-hadoop-streaming.jar");
     System.out.println("For more details about jobconf parameters see:");
     System.out.println("  http://wiki.apache.org/hadoop/JobConfFile");
     System.out.println("To set an environement variable in a streaming command:");
@@ -544,13 +566,9 @@ public class StreamJob {
     fail("");
   }
 
-  public void fail(String message) {
-    if (mayExit_) {
-      System.err.println(message);
-      throw new RuntimeException(message);
-    } else {
-      throw new IllegalArgumentException(message);
-    }
+  public void fail(String message) {    
+    System.err.println(message);
+    throw new IllegalArgumentException(message);
   }
 
   // --------------------------------------------
@@ -565,17 +583,12 @@ public class StreamJob {
   }
 
   protected boolean isLocalHadoop() {
-    boolean local;
-    if (jobConf_ == null) {
-      local = getClusterNick().equals("local");
-    } else {
-      local = StreamUtil.isLocalJobTracker(jobConf_);
-    }
-    return local;
+    return StreamUtil.isLocalJobTracker(jobConf_);
   }
 
+  @Deprecated
   protected String getClusterNick() {
-    return cluster_;
+    return "default";
   }
 
   /** @return path to the created Jar file or null if no files are necessary.
@@ -589,8 +602,7 @@ public class StreamJob {
     // First try an explicit spec: it's too hard to find our own location in this case:
     // $HADOOP_HOME/bin/hadoop jar /not/first/on/classpath/custom-hadoop-streaming.jar
     // where findInClasspath() would find the version of hadoop-streaming.jar in $HADOOP_HOME
-    String runtimeClasses = userJobConfProps_.get("stream.shipped.hadoopstreaming"); // jar or class dir
-    System.out.println(runtimeClasses + "=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming");
+    String runtimeClasses = config_.get("stream.shipped.hadoopstreaming"); // jar or class dir
     
     if (runtimeClasses == null) {
       runtimeClasses = StreamUtil.findInClasspath(StreamJob.class.getName());
@@ -632,25 +644,6 @@ public class StreamJob {
     builder.merge(packageFiles_, unjarFiles, jobJarName);
     return jobJarName;
   }
-
-  /**
-   * This method sets the user jobconf variable specified
-   * by user using -jobconf key=value
-   * @param doEarlyProps
-   */
-  protected void setUserJobConfProps(boolean doEarlyProps) {
-    Iterator it = userJobConfProps_.keySet().iterator();
-    while (it.hasNext()) {
-      String key = (String) it.next();
-      String val = userJobConfProps_.get(key);
-      boolean earlyName = key.equals("fs.default.name");
-      earlyName |= key.equals("stream.shipped.hadoopstreaming");
-      if (doEarlyProps == earlyName) {
-        msg("xxxJobConf: set(" + key + ", " + val + ") early=" + doEarlyProps);
-        jobConf_.set(key, val);
-      }
-    }
-  }
   
   /**
    * get the uris of all the files/caches
@@ -663,22 +656,10 @@ public class StreamJob {
   }
   
   protected void setJobConf() throws IOException {
-    msg("hadoopAliasConf_ = " + hadoopAliasConf_);
-    config_ = new Configuration();
-    if (!cluster_.equals("default")) {
-      config_.addResource(new Path(getHadoopAliasConfFile()));
-    } else {
-      // use only defaults: hadoop-default.xml and hadoop-site.xml
-    }
-    System.out.println("additionalConfSpec_:" + additionalConfSpec_);
     if (additionalConfSpec_ != null) {
+      LOG.warn("-additionalconfspec option is deprecated, please use -conf instead.");
       config_.addResource(new Path(additionalConfSpec_));
     }
-    Iterator it = configPath_.iterator();
-    while (it.hasNext()) {
-      String pathName = (String) it.next();
-      config_.addResource(new Path(pathName));
-    }
 
     // general MapRed job properties
     jobConf_ = new JobConf(config_);
@@ -686,8 +667,6 @@ public class StreamJob {
     // All streaming jobs get the task timeout value
     // from the configuration settings.
 
-    setUserJobConfProps(true);
-
     // The correct FS must be set before this is called!
     // (to resolve local vs. dfs drive letter differences) 
     // (mapred.working.dir will be lazily initialized ONCE and depends on FS)
@@ -802,7 +781,6 @@ public class StreamJob {
       }
     }
     
-    setUserJobConfProps(false);
     FileOutputFormat.setOutputPath(jobConf_, new Path(output_));
     fmt = null;
     if (outputFormatSpec_!= null) {
@@ -1027,7 +1005,6 @@ public class StreamJob {
     }
   }
 
-  protected boolean mayExit_;
   protected String[] argv_;
   protected boolean verbose_;
   protected boolean detailedUsage_;
@@ -1047,18 +1024,15 @@ public class StreamJob {
   protected boolean hasSimpleInputSpecs_;
   protected ArrayList packageFiles_ = new ArrayList(); // <String>
   protected ArrayList shippedCanonFiles_ = new ArrayList(); // <String>
-  protected TreeMap<String, String> userJobConfProps_ = new TreeMap<String, String>(); 
+  //protected TreeMap<String, String> userJobConfProps_ = new TreeMap<String, String>(); 
   protected String output_;
   protected String mapCmd_;
   protected String comCmd_;
   protected String redCmd_;
-  protected String cluster_;
   protected String cacheFiles;
   protected String cacheArchives;
   protected URI[] fileURIs;
   protected URI[] archiveURIs;
-  protected ArrayList configPath_ = new ArrayList(); // <String>
-  protected String hadoopAliasConf_;
   protected String inReaderSpec_;
   protected String inputFormatSpec_;
   protected String outputFormatSpec_;
@@ -1079,4 +1053,5 @@ public class StreamJob {
   protected JobID jobId_;
   protected static String LINK_URI = "You need to specify the uris as hdfs://host:port/#linkname," +
     "Please specify a different link name for all of your caching URIs";
+
 }
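
With the -jobconf bookkeeping (userJobConfProps_, setUserJobConfProps) removed above, job properties now reach StreamJob through the Configuration populated by the generic options. A minimal driver sketch, assuming StreamJob implements Tool and keeps a no-argument constructor after this patch (neither signature appears in this hunk, so both are assumptions; the class name StreamJobDriver is made up):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.streaming.StreamJob;
    import org.apache.hadoop.util.ToolRunner;

    public class StreamJobDriver {
      public static void main(String[] args) throws Exception {
        // Generic options such as "-D mapred.reduce.tasks=10" or "-conf my-site.xml"
        // are consumed by ToolRunner/GenericOptionsParser before StreamJob parses
        // its own options (-input, -output, -mapper, ...).
        int exitCode = ToolRunner.run(new Configuration(), new StreamJob(), args);
        System.exit(exitCode);
      }
    }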

+ 7 - 3
src/core/org/apache/hadoop/util/GenericOptionsParser.java

@@ -35,8 +35,8 @@ import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 /**
  * <code>GenericOptionsParser</code> is a utility to parse command line
@@ -165,7 +165,7 @@ public class GenericOptionsParser {
    * Specify properties of each generic option
    */
   @SuppressWarnings("static-access")
-  private Options buildGeneralOptions(Options opts) {
+  private static Options buildGeneralOptions(Options opts) {
     Option fs = OptionBuilder.withArgName("local|namenode:port")
     .hasArg()
     .withDescription("specify a namenode")
@@ -224,7 +224,10 @@ public class GenericOptionsParser {
       conf.set("mapred.job.tracker", line.getOptionValue("jt"));
     }
     if (line.hasOption("conf")) {
-      conf.addResource(new Path(line.getOptionValue("conf")));
+      String[] values = line.getOptionValues("conf");
+      for(String value : values) {
+        conf.addResource(new Path(value));
+      }
     }
     try {
       if (line.hasOption("libjars")) {
@@ -356,6 +359,7 @@ public class GenericOptionsParser {
    * @param out stream to print the usage message to.
    */
   public static void printGenericCommandUsage(PrintStream out) {
+    
     out.println("Generic options supported are");
     out.println("-conf <configuration file>     specify an application configuration file");
     out.println("-D <property=value>            use value for given property");

+ 61 - 51
src/docs/src/documentation/content/xdocs/streaming.xml

@@ -111,7 +111,7 @@ $HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
 <p>
 Often, you may want to process input data using a map function only. To do this, simply set mapred.reduce.tasks to zero. The Map/Reduce framework will not create any reducer tasks. Rather, the outputs of the mapper tasks will be the final output of the job.
 </p><p>
-To be backward compatible, Hadoop Streaming also supports the "-reduce NONE" option, which is equivalent to "-jobconf mapred.reduce.tasks=0".
+To be backward compatible, Hadoop Streaming also supports the "-reduce NONE" option, which is equivalent to "-D mapred.reduce.tasks=0".
 </p>
 </section>
 
@@ -137,31 +137,31 @@ The class you supply for the output format is expected to take key/value pairs o
 <title>Large files and archives in Hadoop Streaming </title>
 
 <p>
-The -cacheFile and -cacheArchive options allow you to make files and archives available to the tasks. The argument is a URI to the file or archive that you have already uploaded to HDFS. These files and archives are cached across jobs. You can retrieve the host and fs_port values from the fs.default.name config variable.
+The -files and -archives options allow you to make files and archives available to the tasks. The argument is a URI to the file or archive that you have already uploaded to HDFS. These files and archives are cached across jobs. You can retrieve the host and fs_port values from the fs.default.name config variable.
 </p>
 <p>
-Here are examples of the -cacheFile option:
+Here are examples of the -files option:
 </p> 
 <source>
--cacheFile hdfs://host:fs_port/user/testfile.txt#testlink
+-files hdfs://host:fs_port/user/testfile.txt#testlink
 </source>
 <p>
 In the above example, the part of the url after # is used as the symlink name that is created in the current working directory of tasks. So the tasks will have a symlink called testlink in the cwd that points to a local copy of testfile.txt. Multiple entries can be specified as: 
 </p>
 <source>
--cacheFile hdfs://host:fs_port/user/testfile1.txt#testlink1 -cacheFile hdfs://host:fs_port/user/testfile2.txt#testlink2
+-files hdfs://host:fs_port/user/testfile1.txt#testlink1 -files hdfs://host:fs_port/user/testfile2.txt#testlink2
 </source>
 <p>
-The -cacheArchive option allows you to copy jars locally to the cwd of tasks and automatically unjar the files. For example:
+The -archives option allows you to copy jars locally to the cwd of tasks and automatically unjar the files. For example:
 </p>
 <source>
--cacheArchive hdfs://host:fs_port/user/testfile.jar#testlink3
+-archives hdfs://host:fs_port/user/testfile.jar#testlink3
 </source>
 <p>
 In the example above, a symlink testlink3 is created in the current working directory of tasks. This symlink points to the directory that stores the unjarred contents of the uploaded jar file.
 </p>
 <p>
-Here's another example of the -cacheArchive option. Here, the input.txt file has two lines specifying the names of the two files: testlink/cache.txt and testlink/cache2.txt. "testlink" is a symlink to the archived directory, which has the files "cache.txt" and "cache2.txt".
+Here's another example of the -archives option. Here, the input.txt file has two lines specifying the names of the two files: testlink/cache.txt and testlink/cache2.txt. "testlink" is a symlink to the archived directory, which has the files "cache.txt" and "cache2.txt".
 </p>
 <source>
 $HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
@@ -169,10 +169,10 @@ $HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
                   -mapper "xargs cat"  \
                   -reducer "cat"  \
                   -output "/user/me/samples/cachefile/out" \  
-                  -cacheArchive 'hdfs://hadoop-nn1.example.com/user/me/samples/cachefile/cachedir.jar#testlink' \  
-                  -jobconf mapred.map.tasks=1 \
-                  -jobconf mapred.reduce.tasks=1 \ 
-                  -jobconf mapred.job.name="Experiment"
+                  -archives 'hdfs://hadoop-nn1.example.com/user/me/samples/cachefile/cachedir.jar#testlink' \  
+                  -D mapred.map.tasks=1 \
+                  -D mapred.reduce.tasks=1 \ 
+                  -D mapred.job.name="Experiment"
 
 $ ls test_jar/
 cache.txt  cache2.txt
@@ -208,7 +208,7 @@ This is just the second cache string
 <section>
 <title>Specifying Additional Configuration Variables for Jobs </title>
 <p>
-You can specify additional configuration variables by using "-jobconf  &lt;n&gt;=&lt;v&gt;". For example: 
+You can specify additional configuration variables by using "-D  &lt;n&gt;=&lt;v&gt;". For example: 
 </p>
 <source>
 $HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
@@ -216,10 +216,10 @@ $HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
     -output myOutputDir \
     -mapper org.apache.hadoop.mapred.lib.IdentityMapper\
     -reducer /bin/wc \
-    -jobconf mapred.reduce.tasks=2
+    -D mapred.reduce.tasks=2
 </source>
 <p>
-The -jobconf mapred.reduce.tasks=2 in the above example specifies to use two reducers for the job.
+The -D mapred.reduce.tasks=2 in the above example specifies to use two reducers for the job.
 </p>
 <p>
 For more details on the jobconf parameters see:
@@ -233,36 +233,46 @@ Other options you may specify for a streaming job are described here:
 </p>
 <table>
 <tr><th>Parameter</th><th>Optional/Required </th><th>Description </th></tr>
-<tr><td> -cluster name </td><td> Optional </td><td> Switch between local Hadoop and one or more remote clusters </td></tr>
-
-<tr><td> -dfs  host:port or local </td><td> Optional </td><td> Override the HDFS configuration for the job </td></tr>
-<tr><td> -jt host:port or local </td><td> Optional </td><td> Override the JobTracker configuration for the job </td></tr>
-<tr><td> -additionalconfspec specfile </td><td> Optional </td><td> Specifies a set of configuration variables in an XML file like hadoop-site.xml, instead of using multiple  options of type "-jobconf name=value" </td></tr>
 
 <tr><td> -cmdenv   name=value </td><td> Optional </td><td> Pass env var to streaming commands </td></tr>
-<tr><td> -cacheFile fileNameURI </td><td> Optional </td><td> Specify a file to be uploaded to the HDFS </td></tr>
-<tr><td> -cacheArchive fileNameURI </td><td> Optional </td><td> Specify a jar file to be uploaded to the HDFS. This jar file is unjarred automatically in the cwd of the task </td></tr>
 
 <tr><td> -inputreader JavaClassName </td><td> Optional </td><td> For backwards-compatibility: specifies a record reader class (instead of an input format class) </td></tr>
 <tr><td> -verbose </td><td> Optional </td><td> Verbose output </td></tr>
 </table>
 <p>
-To switch between "local" Hadoop and one or more remote Hadoop clusters use -cluster &lt;name&gt;.
-By default, hadoop-default.xml and hadoop-site.xml are used. The -cluster &lt;name&gt; option will cause $HADOOP_HOME/conf/hadoop-&lt;name&gt;.xml to be used instead.
+Streaming supports the Hadoop generic command line options.
+
+The general command line syntax is:
+<br/>    bin/hadoop command [genericOptions] [commandOptions]
+<br/>The supported generic options are:
 </p>
+
+<table>
+<tr><th>Parameter</th><th>Optional/Required </th><th>Description </th></tr>
+
+<tr><td> -conf  configuration_file </td><td> Optional </td><td> specify an application configuration file </td></tr>
+<tr><td> -D  property=value </td><td> Optional </td><td> use value for given property </td></tr>
+<tr><td> -fs host:port or local </td><td> Optional </td><td> specify a namenode </td></tr>
+<tr><td> -jt host:port or local </td><td> Optional </td><td> specify a job tracker </td></tr>
+<tr><td> -files </td><td> Optional </td><td> specify comma separated files to be copied to the map reduce cluster </td></tr>
+<tr><td> -archives </td><td> Optional </td><td> specify comma separated archives to be unarchived on the compute machines </td></tr>
+</table>
+
 <p>
 To change the local temp directory use:
 </p>
 <source>
-  -jobconf dfs.data.dir=/tmp
+  -D dfs.data.dir=/tmp
 </source>
 <p>
 To specify additional local temp directories use:
 </p>
 <source>
-   -jobconf mapred.local.dir=/tmp/local
-   -jobconf mapred.system.dir=/tmp/system
-   -jobconf mapred.temp.dir=/tmp/temp
+   -D mapred.local.dir=/tmp/local
+   -D mapred.system.dir=/tmp/system
+   -D mapred.temp.dir=/tmp/temp
 </source>
 <p>
 For more details on jobconf parameters see:
@@ -294,13 +304,13 @@ $HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
     -output myOutputDir \
     -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
     -reducer org.apache.hadoop.mapred.lib.IdentityReducer \
-    -jobconf stream.map.output.field.separator=. \
-    -jobconf stream.num.map.output.key.fields=4 
+    -D stream.map.output.field.separator=. \
+    -D stream.num.map.output.key.fields=4 
 </source>
 <p>
-In the above example, "-jobconf stream.map.output.field.separator=." specifies "." as the field separator for the map outputs, and the prefix up to the fourth "." in a line will be the key and the rest of the line (excluding the fourth ".") will be the value. If a line has less than four "."s, then the whole line will be the key and the value will be an empty Text object (like the one created by new Text("")).
+In the above example, "-D stream.map.output.field.separator=." specifies "." as the field separator for the map outputs, and the prefix up to the fourth "." in a line will be the key and the rest of the line (excluding the fourth ".") will be the value. If a line has less than four "."s, then the whole line will be the key and the value will be an empty Text object (like the one created by new Text("")).
 </p><p>
-Similarly, you can use "-jobconf stream.reduce.output.field.separator=SEP" and "-jobconf stream.num.reduce.output.fields=NUM" to specify the nth field separator in a line of the reduce outputs as the separator between the key and the value.
+Similarly, you can use "-D stream.reduce.output.field.separator=SEP" and "-D stream.num.reduce.output.fields=NUM" to specify the nth field separator in a line of the reduce outputs as the separator between the key and the value.
 </p>
 </section>
 
@@ -317,16 +327,16 @@ $HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
     -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
     -reducer org.apache.hadoop.mapred.lib.IdentityReducer \
     -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-    -jobconf stream.map.output.field.separator=. \
-    -jobconf stream.num.map.output.key.fields=4 \
-    -jobconf map.output.key.field.separator=. \
-    -jobconf num.key.fields.for.partition=2 \
-    -jobconf mapred.reduce.tasks=12
+    -D stream.map.output.field.separator=. \
+    -D stream.num.map.output.key.fields=4 \
+    -D map.output.key.field.separator=. \
+    -D num.key.fields.for.partition=2 \
+    -D mapred.reduce.tasks=12
 </source>
 <p>
-Here, <em>-jobconf stream.map.output.field.separator=.</em> and <em>-jobconf stream.num.map.output.key.fields=4</em> are as explained in previous example. The two variables are used by streaming to identify the key/value pair of mapper. 
+Here, <em>-D stream.map.output.field.separator=.</em> and <em>-D stream.num.map.output.key.fields=4</em> are as explained in the previous example. The two variables are used by streaming to identify the key/value pair of the mapper. 
 </p><p>
-The map output keys of the above Map/Reduce job normally have four fields separated by ".". However, the Map/Reduce framework will partition the map outputs by the first two fields of the keys using the <em>-jobconf num.key.fields.for.partition=2</em> option. Here, <em>-jobconf map.output.key.field.separator=.</em> specifies the separator for the partition. This guarantees that all the key/value pairs with the same first two fields in the keys will be partitioned into the same reducer.
+The map output keys of the above Map/Reduce job normally have four fields separated by ".". However, the Map/Reduce framework will partition the map outputs by the first two fields of the keys using the <em>-D num.key.fields.for.partition=2</em> option. Here, <em>-D map.output.key.field.separator=.</em> specifies the separator for the partition. This guarantees that all the key/value pairs with the same first two fields in the keys will be partitioned into the same reducer.
 </p><p>
 <em>This is effectively equivalent to specifying the first two fields as the primary key and the next two fields as the secondary. The primary key is used for partitioning, and the combination of the primary and secondary keys is used for sorting.</em> A simple illustration is shown here:
 </p>
@@ -383,7 +393,7 @@ $HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
     -mapper myAggregatorForKeyCount.py \
     -reducer aggregate \
     -file myAggregatorForKeyCount.py \
-    -jobconf mapred.reduce.tasks=12
+    -D mapred.reduce.tasks=12
 </source>
 <p>
 The python program myAggregatorForKeyCount.py looks like:
@@ -423,18 +433,18 @@ $HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
     -mapper org.apache.hadoop.mapred.lib.FieldSelectionMapReduce\
     -reducer org.apache.hadoop.mapred.lib.FieldSelectionMapReduce\
     -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-    -jobconf map.output.key.field.separa=. \
-    -jobconf num.key.fields.for.partition=2 \
-    -jobconf mapred.data.field.separator=. \
-    -jobconf map.output.key.value.fields.spec=6,5,1-3:0- \
-    -jobconf reduce.output.key.value.fields.spec=0-2:5- \
-    -jobconf mapred.reduce.tasks=12
+    -D map.output.key.field.separa=. \
+    -D num.key.fields.for.partition=2 \
+    -D mapred.data.field.separator=. \
+    -D map.output.key.value.fields.spec=6,5,1-3:0- \
+    -D reduce.output.key.value.fields.spec=0-2:5- \
+    -D mapred.reduce.tasks=12
 </source>
 <p>
-The option "-jobconf map.output.key.value.fields.spec=6,5,1-3:0-" specifies key/value selection for the map outputs. Key selection spec and value selection spec are separated by ":". In this case, the map output key will consist of fields 6, 5, 1, 2, and 3. The map output value will consist of all fields (0- means field 0 and all 
+The option "-D map.output.key.value.fields.spec=6,5,1-3:0-" specifies key/value selection for the map outputs. Key selection spec and value selection spec are separated by ":". In this case, the map output key will consist of fields 6, 5, 1, 2, and 3. The map output value will consist of all fields (0- means field 0 and all 
 the subsequent fields). 
 </p><p>
-The option "-jobconf reduce.output.key.value.fields.spec=0-2:0-" specifies key/value selection for the reduce outputs. In this case, the reduce output key will consist of fields 0, 1, 2 (corresponding to the original fields 6, 5, 1). The reduce output value will consist of all fields starting from field 5 (corresponding to all the original fields).  
+The option "-D reduce.output.key.value.fields.spec=0-2:0-" specifies key/value selection for the reduce outputs. In this case, the reduce output key will consist of fields 0, 1, 2 (corresponding to the original fields 6, 5, 1). The reduce output value will consist of all fields starting from field 5 (corresponding to all the original fields).  
 </p>
 </section>
 </section>
@@ -504,7 +514,7 @@ $ c2='cut -f2'; $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
     -input /user/me/samples/student_marks 
     -mapper \"$c2\" -reducer 'cat'  
     -output /user/me/samples/student_out 
-    -jobconf mapred.job.name='Experiment'
+    -D mapred.job.name='Experiment'
 
 $ hadoop dfs -ls samples/student_out
 Found 1 items/user/me/samples/student_out/part-00000    &lt;r 3&gt;   16
@@ -530,7 +540,7 @@ Currently this does not work and gives an "java.io.IOException: Broken pipe" err
 The jar packaging happens in a directory pointed to by the configuration variable stream.tmpdir. The default value of stream.tmpdir is /tmp. Set the value to a directory with more space:
 </p>
 <source>
--jobconf stream.tmpdir=/export/bigspace/...
+-D stream.tmpdir=/export/bigspace/...
 </source>
 </section>
 
@@ -546,7 +556,7 @@ You can specify multiple input directories with multiple '-input' options:
 <section>
 <title>How do I generate output files with gzip format? </title>
 <p>
-Instead of plain text files, you can generate gzip files as your generated output. Pass '-jobconf mapred.output.compress=true -jobconf  mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCode' as option to your streaming job.
+Instead of plain text files, you can generate gzip files as your output. Pass '-D mapred.output.compress=true -D mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec' as options to your streaming job.
 </p>
 </section>
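
Every "-D name=value" in the rewritten examples above is an ordinary Configuration property, so the same settings can also be made from the Java API. A sketch using only property names that appear in this document (the class name is made up):

    import org.apache.hadoop.mapred.JobConf;

    public class StreamingJobProperties {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        job.set("mapred.job.name", "Experiment");
        job.setNumReduceTasks(2);                        // same effect as -D mapred.reduce.tasks=2
        job.set("stream.map.output.field.separator", ".");
        job.setInt("stream.num.map.output.key.fields", 4);
        job.setBoolean("mapred.output.compress", true);  // gzip output, as in the FAQ above
        job.set("mapred.output.compression.codec",
                "org.apache.hadoop.io.compress.GzipCodec");
      }
    }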
 

+ 75 - 41
src/mapred/org/apache/hadoop/mapred/pipes/Submitter.java

@@ -23,6 +23,8 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
 import java.util.StringTokenizer;
 
 import org.apache.commons.cli2.CommandLine;
@@ -31,7 +33,10 @@ import org.apache.commons.cli2.builder.ArgumentBuilder;
 import org.apache.commons.cli2.builder.DefaultOptionBuilder;
 import org.apache.commons.cli2.builder.GroupBuilder;
 import org.apache.commons.cli2.commandline.Parser;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -48,13 +53,25 @@ import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.lib.HashPartitioner;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
 
 /**
  * The main entry point and job submitter. It may either be used as a command
  * line-based or API-based method to launch Pipes jobs.
  */
-public class Submitter {
+public class Submitter extends Configured implements Tool {
 
+  protected static final Log LOG = LogFactory.getLog(Submitter.class);
+  
+  public Submitter() {
+    this(new Configuration());
+  }
+  
+  public Submitter(Configuration conf) {
+    setConf(conf);
+  }
+  
   /**
    * Get the URI of the application's executable.
    * @param conf
@@ -334,11 +351,6 @@ public class Submitter {
       // The CLI package should do this for us, but I can't figure out how
       // to make it print something reasonable.
       System.out.println("bin/hadoop pipes");
-      System.out.println("  [-conf <path>]  // Configuration for job");
-      System.out.println("  [-jobconf <key=value>, <key=value>, ...]" +
-                         "  // add/override configuration for job." +
-                         " (Multiple comma delimited key=value pairs" +
-                         " can be passed)");
       System.out.println("  [-input <path>] // Input directory");
       System.out.println("  [-output <path>] // Output directory");
       System.out.println("  [-jar <jar file> // jar filename");
@@ -349,6 +361,8 @@ public class Submitter {
       System.out.println("  [-writer <class>] // Java RecordWriter");
       System.out.println("  [-program <executable>] // executable URI");
       System.out.println("  [-reduces <num>] // number of reduces");
+      System.out.println();
+      GenericOptionsParser.printGenericCommandUsage(System.out);
     }
   }
   
@@ -360,19 +374,16 @@ public class Submitter {
     return conf.getClassByName((String) cl.getValue(key)).asSubclass(cls);
   }
 
-  /**
-   * Submit a pipes job based on the command line arguments.
-   * @param args
-   */
-  public static void main(String[] args) throws Exception {
+  @Override
+  public int run(String[] args) throws Exception {
     CommandLineParser cli = new CommandLineParser();
     if (args.length == 0) {
       cli.printUsage();
-      return;
+      return 1;
     }
     cli.addOption("input", false, "input path to the maps", "path");
     cli.addOption("output", false, "output path from the reduces", "path");
-    cli.addOption("conf", false, "job xml configuration file", "path");
+    
     cli.addOption("jar", false, "job jar file", "path");
     cli.addOption("inputformat", false, "java classname of InputFormat", 
                   "class");
@@ -385,79 +396,102 @@ public class Submitter {
     cli.addOption("program", false, "URI to application executable", "class");
     cli.addOption("reduces", false, "number of reduces", "num");
     cli.addOption("jobconf", false, 
-        "\"n1=v1,n2=v2,..\" Optional. Add or override a JobConf property.",
+        "\"n1=v1,n2=v2,..\" (Deprecated) Optional. Add or override a JobConf property.",
         "key=val");
     Parser parser = cli.createParser();
     try {
-      CommandLine results = parser.parse(args);
-      JobConf conf = new JobConf();
-      if (results.hasOption("-conf")) {
-        conf.addResource(new Path((String) results.getValue("-conf")));
-      }
+      
+      GenericOptionsParser genericParser = new GenericOptionsParser(getConf(), args);
+      CommandLine results = parser.parse(genericParser.getRemainingArgs());
+      
+      JobConf job = new JobConf(getConf());
+      
       if (results.hasOption("-input")) {
-        FileInputFormat.setInputPaths(conf, 
+        FileInputFormat.setInputPaths(job, 
                           (String) results.getValue("-input"));
       }
       if (results.hasOption("-output")) {
-        FileOutputFormat.setOutputPath(conf, 
+        FileOutputFormat.setOutputPath(job, 
           new Path((String) results.getValue("-output")));
       }
       if (results.hasOption("-jar")) {
-        conf.setJar((String) results.getValue("-jar"));
+        job.setJar((String) results.getValue("-jar"));
       }
       if (results.hasOption("-inputformat")) {
-        setIsJavaRecordReader(conf, true);
-        conf.setInputFormat(getClass(results, "-inputformat", conf,
+        setIsJavaRecordReader(job, true);
+        job.setInputFormat(getClass(results, "-inputformat", job,
                                      InputFormat.class));
       }
       if (results.hasOption("-javareader")) {
-        setIsJavaRecordReader(conf, true);
+        setIsJavaRecordReader(job, true);
       }
       if (results.hasOption("-map")) {
-        setIsJavaMapper(conf, true);
-        conf.setMapperClass(getClass(results, "-map", conf, Mapper.class));
+        setIsJavaMapper(job, true);
+        job.setMapperClass(getClass(results, "-map", job, Mapper.class));
       }
       if (results.hasOption("-partitioner")) {
-        conf.setPartitionerClass(getClass(results, "-partitioner", conf,
+        job.setPartitionerClass(getClass(results, "-partitioner", job,
                                           Partitioner.class));
       }
       if (results.hasOption("-reduce")) {
-        setIsJavaReducer(conf, true);
-        conf.setReducerClass(getClass(results, "-reduce", conf, Reducer.class));
+        setIsJavaReducer(job, true);
+        job.setReducerClass(getClass(results, "-reduce", job, Reducer.class));
       }
       if (results.hasOption("-reduces")) {
-        conf.setNumReduceTasks(Integer.parseInt((String) 
+        job.setNumReduceTasks(Integer.parseInt((String) 
                                                 results.getValue("-reduces")));
       }
       if (results.hasOption("-writer")) {
-        setIsJavaRecordWriter(conf, true);
-        conf.setOutputFormat(getClass(results, "-writer", conf, 
+        setIsJavaRecordWriter(job, true);
+        job.setOutputFormat(getClass(results, "-writer", job, 
                                       OutputFormat.class));
       }
       if (results.hasOption("-program")) {
-        setExecutable(conf, (String) results.getValue("-program"));
+        setExecutable(job, (String) results.getValue("-program"));
       }
       if (results.hasOption("-jobconf")) {
+        LOG.warn("-jobconf option is deprecated, please use -D instead.");
         String options = (String)results.getValue("-jobconf");
         StringTokenizer tokenizer = new StringTokenizer(options, ",");
         while (tokenizer.hasMoreTokens()) {
           String keyVal = tokenizer.nextToken().trim();
           String[] keyValSplit = keyVal.split("=");
-          conf.set(keyValSplit[0], keyValSplit[1]);
+          job.set(keyValSplit[0], keyValSplit[1]);
         }
       }
       // if they gave us a jar file, include it into the class path
-      String jarFile = conf.getJar();
+      String jarFile = job.getJar();
       if (jarFile != null) {
+        final URL[] urls = new URL[]{ FileSystem.getLocal(job).
+            pathToFile(new Path(jarFile)).toURL()};
+        //FindBugs complains that creating a URLClassLoader should be
+        //in a doPrivileged() block. 
         ClassLoader loader =
-          new URLClassLoader(new URL[]{ FileSystem.getLocal(conf).
-                                        pathToFile(new Path(jarFile)).toURL()});
-        conf.setClassLoader(loader);
+          AccessController.doPrivileged(
+              new PrivilegedAction<ClassLoader>() {
+                public ClassLoader run() {
+                  return new URLClassLoader(urls);
+                }
+              }
+            );
+        job.setClassLoader(loader);
       }
-      runJob(conf);
+      
+      runJob(job);
+      return 0;
     } catch (OptionException oe) {
       cli.printUsage();
+      return 1;
     }
+    
+  }
+  
+  /**
+   * Submit a pipes job based on the command line arguments.
+   * @param args
+   */
+  public static void main(String[] args) throws Exception {
+    new Submitter().run(args);
   }
 
 }
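
Besides the command line, the static helpers kept in Submitter (setExecutable, runJob, the setIsJava* switches) still allow a Pipes job to be submitted from Java. A sketch only; the input/output directories and the executable URI below are placeholders, as is the class name PipesDriver:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.pipes.Submitter;

    public class PipesDriver {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf();
        FileInputFormat.setInputPaths(job, "in");                       // placeholder input dir
        FileOutputFormat.setOutputPath(job, new Path("out"));           // placeholder output dir
        Submitter.setExecutable(job, "hdfs://host:port/bin/wordcount"); // placeholder C++ binary URI
        job.setNumReduceTasks(1);
        Submitter.runJob(job);  // the same helper the reworked run() calls above
      }
    }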

+ 9 - 2
src/mapred/org/apache/hadoop/mapred/pipes/package.html

@@ -35,7 +35,6 @@ output directory. The cli for the main looks like:
 
 <pre>
 bin/hadoop pipes \
-  [-conf <i>path</i>] \
   [-input <i>inputDir</i>] \
   [-output <i>outputDir</i>] \
   [-jar <i>applicationJarFile</i>] \
@@ -44,9 +43,17 @@ bin/hadoop pipes \
   [-partitioner <i>class</i>] \
   [-reduce <i>class</i>] \
   [-writer <i>class</i>] \
-  [-program <i>program url</i>]
+  [-program <i>program url</i>] \ 
+  [-conf <i>configuration file</i>] \
+  [-D <i>property=value</i>] \
+  [-fs <i>local|namenode:port</i>] \
+  [-jt <i>local|jobtracker:port</i>] \
+  [-files <i>comma separated list of files</i>] \ 
+  [-libjars <i>comma separated list of jars</i>] \
+  [-archives <i>comma separated list of archives</i>] 
 </pre>
 
+
 <p>
 
 The application programs link against a thin C++ wrapper library that

Some files are not shown in this diff because too many files have changed.