@@ -314,10 +314,15 @@ $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-mapper org.apache.hadoop.mapred.lib.IdentityMapper \
-reducer /bin/wc
</pre>
+<p>The user can specify <span class="codefrag">stream.non.zero.exit.is.failure</span> as
+<span class="codefrag">true</span> or <span class="codefrag">false</span> to make a streaming task that exits
+with a non-zero status a <span class="codefrag">Failure</span>
+or a <span class="codefrag">Success</span>, respectively. By default, streaming tasks that exit
+with a non-zero status are considered failed tasks.</p>
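<p>For example, a job that should treat a non-zero exit as a success could pass:</p>
<pre class="code">
-jobconf stream.non.zero.exit.is.failure=false
</pre>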
</div>

-<a name="N10047"></a><a name="Package+Files+With+Job+Submissions"></a>
+<a name="N10059"></a><a name="Package+Files+With+Job+Submissions"></a>
<h2 class="h3">Package Files With Job Submissions</h2>
<div class="section">
<p>
@@ -349,10 +354,10 @@ $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
</div>

-<a name="N1005F"></a><a name="Streaming+Options+and+Usage"></a>
+<a name="N10071"></a><a name="Streaming+Options+and+Usage"></a>
<h2 class="h3">Streaming Options and Usage </h2>
<div class="section">
-<a name="N10065"></a><a name="Mapper-Only+Jobs"></a>
+<a name="N10077"></a><a name="Mapper-Only+Jobs"></a>
<h3 class="h4">Mapper-Only Jobs </h3>
<p>
Often, you may want to process input data using a map function only. To do this, simply set mapred.reduce.tasks to zero. The map/reduce framework will not create any reducer tasks. Rather, the outputs of the mapper tasks will be the final output of the job.
@@ -360,7 +365,7 @@ Often, you may want to process input data using a map function only. To do this,
<p>
To be backward compatible, Hadoop Streaming also supports the "-reduce NONE" option, which is equivalent to "-jobconf mapred.reduce.tasks=0".
</p>
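<p>A minimal sketch of such a mapper-only job (the paths and the choice of /bin/cat as the mapper are only illustrative):</p>
<pre class="code">
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper /bin/cat \
    -jobconf mapred.reduce.tasks=0
</pre>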
-<a name="N10071"></a><a name="Specifying+Other+Plugins+for+Jobs"></a>
+<a name="N10083"></a><a name="Specifying+Other+Plugins+for+Jobs"></a>
<h3 class="h4">Specifying Other Plugins for Jobs </h3>
<p>
Just as with a normal map/reduce job, you can specify other plugins for a streaming job:
@@ -377,7 +382,7 @@ The class you supply for the input format should return key/value pairs of Text
<p>
The class you supply for the output format is expected to take key/value pairs of Text class. If you do not specify an output format class, the TextOutputFormat is used as the default.
</p>
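<p>A sketch of how such plugins might be supplied on the command line; the class names below are placeholders rather than real classes:</p>
<pre class="code">
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper myMapper.sh \
    -reducer myReducer.sh \
    -inputformat MyInputFormatClassName \
    -outputformat MyOutputFormatClassName
</pre>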
-<a name="N10084"></a><a name="Large+files+and+archives+in+Hadoop+Streaming"></a>
+<a name="N10096"></a><a name="Large+files+and+archives+in+Hadoop+Streaming"></a>
<h3 class="h4">Large files and archives in Hadoop Streaming </h3>
<p>
The -cacheFile and -cacheArchive options allow you to make files and archives available to the tasks. The argument is a URI to the file or archive that you have already uploaded to HDFS. These files and archives are cached across jobs. You can retrieve the host and fs_port values from the fs.default.name config variable.
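<p>The -cacheFile argument might look like the line below, where host, fs_port and the path are placeholders and the part after the '#' is the symlink name the tasks see in their working directory:</p>
<pre class="code">
-cacheFile hdfs://host:fs_port/user/testfile.txt#testlink
</pre>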
@@ -446,7 +451,7 @@ This is just the cache string
This is just the second cache string

</pre>
-<a name="N100AD"></a><a name="Specifying+Additional+Configuration+Variables+for+Jobs"></a>
+<a name="N100BF"></a><a name="Specifying+Additional+Configuration+Variables+for+Jobs"></a>
<h3 class="h4">Specifying Additional Configuration Variables for Jobs </h3>
<p>
You can specify additional configuration variables by using "-jobconf &lt;n&gt;=&lt;v&gt;". For example:
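<p>One possible command along these lines (paths and programs are illustrative):</p>
<pre class="code">
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
    -reducer /bin/wc \
    -jobconf mapred.reduce.tasks=2
</pre>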
@@ -465,7 +470,7 @@ The -jobconf mapred.reduce.tasks=2 in the above example specifies to use two red
<p>
For more details on the jobconf parameters see: <a href="http://wiki.apache.org/hadoop/JobConfFile">http://wiki.apache.org/hadoop/JobConfFile</a>
</p>
-<a name="N100C4"></a><a name="Other+Supported+Options"></a>
+<a name="N100D6"></a><a name="Other+Supported+Options"></a>
<h3 class="h4">Other Supported Options </h3>
<p>
Other options you may specify for a streaming job are described here:
@@ -547,10 +552,10 @@ To set an environment variable in a streaming command use:
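<p>One possible form, with an illustrative variable name and value:</p>
<pre class="code">
-cmdenv EXAMPLE_DIR=/home/example/dictionaries/
</pre>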
</div>

-<a name="N1017C"></a><a name="More+usage+examples"></a>
+<a name="N1018E"></a><a name="More+usage+examples"></a>
<h2 class="h3">More usage examples </h2>
<div class="section">
-<a name="N10182"></a><a name="Customizing+the+Way+to+Split+Lines+into+Key%2FValue+Pairs"></a>
+<a name="N10194"></a><a name="Customizing+the+Way+to+Split+Lines+into+Key%2FValue+Pairs"></a>
<h3 class="h4">Customizing the Way to Split Lines into Key/Value Pairs </h3>
<p>
As noted earlier, when the map/reduce framework reads a line from the stdout of the mapper, it splits the line into a key/value pair. By default, the prefix of the line up to the first tab character is the key and the rest of the line (excluding the tab character) is the value.
@@ -573,7 +578,7 @@ In the above example, "-jobconf stream.map.output.field.separator=." specifies "
<p>
Similarly, you can use "-jobconf stream.reduce.output.field.separator=SEP" and "-jobconf stream.num.reduce.output.fields=NUM" to specify the NUMth field separator in a line of the reduce outputs as the separator between the key and the value.
</p>
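<p>For reference, a command using these settings might look like the sketch below (the separator '.' and the field count are illustrative):</p>
<pre class="code">
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
    -reducer org.apache.hadoop.mapred.lib.IdentityReducer \
    -jobconf stream.map.output.field.separator=. \
    -jobconf stream.num.map.output.key.fields=4
</pre>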
-<a name="N10198"></a><a name="A+Useful+Partitioner+Class+%28secondary+sort%2C+the+-partitioner+org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner+option%29"></a>
+<a name="N101AA"></a><a name="A+Useful+Partitioner+Class+%28secondary+sort%2C+the+-partitioner+org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner+option%29"></a>
<h3 class="h4">A Useful Partitioner Class (secondary sort, the -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner option) </h3>
<p>
Hadoop has a library class, org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner, that is useful for many applications. This class allows the map/reduce framework to partition the map outputs based on prefixes of keys, not the whole keys. For example:
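<p>A sketch of such a job; the separator and field counts are illustrative and match the partition-on-the-first-two-fields output shown below:</p>
<pre class="code">
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
    -reducer org.apache.hadoop.mapred.lib.IdentityReducer \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
    -jobconf stream.map.output.field.separator=. \
    -jobconf stream.num.map.output.key.fields=4 \
    -jobconf map.output.key.field.separator=. \
    -jobconf num.key.fields.for.partition=2 \
    -jobconf mapred.reduce.tasks=12
</pre>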
@@ -633,7 +638,7 @@ Sorting within each partition for the reducer(all 4 fields used for sorting)</p>
11.14.2.2
11.14.2.3
</pre>
-<a name="N101CE"></a><a name="Working+with+the+Hadoop+Aggregate+Package+%28the+-reduce+aggregate+option%29"></a>
+<a name="N101E0"></a><a name="Working+with+the+Hadoop+Aggregate+Package+%28the+-reduce+aggregate+option%29"></a>
<h3 class="h4">Working with the Hadoop Aggregate Package (the -reduce aggregate option) </h3>
<p>
Hadoop has a library package called "Aggregate" (<a href="https://svn.apache.org/repos/asf/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate">https://svn.apache.org/repos/asf/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate</a>). Aggregate provides a special reducer class and a special combiner class, and a list of simple aggregators that perform aggregations such as "sum", "max", "min" and so on over a sequence of values. Aggregate allows you to define a mapper plugin class that is expected to generate "aggregatable items" for each input key/value pair of the mappers. The combiner/reducer will aggregate those aggregatable items by invoking the appropriate aggregators.
@@ -674,7 +679,7 @@ def main(argv):
if __name__ == "__main__":
    main(sys.argv)
</pre>
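<p>Assuming the mapper above is saved as myAggregatorForKeyCount.py (the name is illustrative), a job using the aggregate reducer might be run as:</p>
<pre class="code">
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper myAggregatorForKeyCount.py \
    -reducer aggregate \
    -file myAggregatorForKeyCount.py \
    -jobconf mapred.reduce.tasks=12
</pre>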
-<a name="N101E9"></a><a name="Field+Selection+%28+similar+to+unix+%27cut%27+command%29"></a>
+<a name="N101FB"></a><a name="Field+Selection+%28+similar+to+unix+%27cut%27+command%29"></a>
<h3 class="h4">Field Selection ( similar to unix 'cut' command) </h3>
<p>
Hadoop has a library class, org.apache.hadoop.mapred.lib.FieldSelectionMapReduce, that effectively allows you to process text data like the unix "cut" utility. The map function defined in the class treats each input key/value pair as a list of fields. You can specify the field separator (the default is the tab character). You can select an arbitrary list of fields as the map output key, and an arbitrary list of fields as the map output value. Similarly, the reduce function defined in the class treats each input key/value pair as a list of fields. You can select an arbitrary list of fields as the reduce output key, and an arbitrary list of fields as the reduce output value. For example:
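<p>A sketch of such a job; each field spec has the form keyFields:valueFields, and the particular specs below are illustrative (the reduce-side spec is the one discussed next):</p>
<pre class="code">
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper org.apache.hadoop.mapred.lib.FieldSelectionMapReduce \
    -reducer org.apache.hadoop.mapred.lib.FieldSelectionMapReduce \
    -jobconf map.output.key.value.fields.spec=0-1:2- \
    -jobconf reduce.output.key.value.fields.spec=0-2:0- \
    -jobconf mapred.reduce.tasks=12
</pre>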
@@ -703,15 +708,15 @@ The option "-jobconf reduce.output.key.value.fields.spec=0-2:0-" specifies key/v
</div>

-<a name="N101FD"></a><a name="Frequently+Asked+Questions"></a>
+<a name="N1020F"></a><a name="Frequently+Asked+Questions"></a>
<h2 class="h3">Frequently Asked Questions </h2>
<div class="section">
-<a name="N10203"></a><a name="How+do+I+use+Hadoop+Streaming+to+run+an+arbitrary+set+of+%28semi-%29independent+tasks%3F"></a>
+<a name="N10215"></a><a name="How+do+I+use+Hadoop+Streaming+to+run+an+arbitrary+set+of+%28semi-%29independent+tasks%3F"></a>
<h3 class="h4">How do I use Hadoop Streaming to run an arbitrary set of (semi-)independent tasks? </h3>
<p>
Often you do not need the full power of Map Reduce, but only need to run multiple instances of the same program - either on different parts of the data, or on the same data, but with different parameters. You can use Hadoop Streaming to do this.
</p>
-<a name="N1020D"></a><a name="How+do+I+process+files%2C+one+per+map%3F"></a>
+<a name="N1021F"></a><a name="How+do+I+process+files%2C+one+per+map%3F"></a>
<h3 class="h4">How do I process files, one per map? </h3>
<p>
As an example, consider the problem of zipping (compressing) a set of files across the hadoop cluster. You can achieve this using either of these methods:
@@ -755,13 +760,13 @@ As an example, consider the problem of zipping (compressing) a set of files acro
</li>

</ol>
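<p>A rough sketch of one such approach, assuming a file list with one HDFS path per line has already been uploaded as /user/me/file-list.txt and that zipfile.sh is a hypothetical mapper script that fetches, zips and stores back the file named on each input line:</p>
<pre class="code">
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -input /user/me/file-list.txt \
    -output /user/me/zip-logs \
    -mapper zipfile.sh \
    -file zipfile.sh \
    -jobconf mapred.reduce.tasks=0
</pre>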
-<a name="N10238"></a><a name="How+many+reducers+should+I+use%3F"></a>
+<a name="N1024A"></a><a name="How+many+reducers+should+I+use%3F"></a>
<h3 class="h4">How many reducers should I use? </h3>
<p>
See the Hadoop Wiki for details: <a href="http://wiki.apache.org/hadoop/HowManyMapsAndReduces">http://wiki.apache.org/hadoop/HowManyMapsAndReduces</a>
</p>
-<a name="N10246"></a><a name="If+I+set+up+an+alias+in+my+shell+script%2C+will+that+work+after+-mapper%2C+i.e.+say+I+do%3A+alias+c1%3D%27cut+-f1%27.+Will+-mapper+%22c1%22+work%3F"></a>
+<a name="N10258"></a><a name="If+I+set+up+an+alias+in+my+shell+script%2C+will+that+work+after+-mapper%2C+i.e.+say+I+do%3A+alias+c1%3D%27cut+-f1%27.+Will+-mapper+%22c1%22+work%3F"></a>
<h3 class="h4">If I set up an alias in my shell script, will that work after -mapper, i.e. say I do: alias c1='cut -f1'. Will -mapper "c1" work? </h3>
<p>
Using an alias will not work, but variable substitution is allowed as shown in this example:
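<p>A minimal sketch of the idea (the dataset and paths are illustrative):</p>
<pre class="code">
$ c2='cut -f2'; $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -input samples/student_marks \
    -output samples/student_out \
    -mapper "$c2" -reducer 'cat'
</pre>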
@@ -788,12 +793,12 @@ $ hadoop dfs -cat samples/student_out/part-00000
75
80
</pre>
-<a name="N10254"></a><a name="Can+I+use+UNIX+pipes%3F+For+example%2C+will+-mapper+%22cut+-f1+%7C+sed+s%2Ffoo%2Fbar%2Fg%22+work%3F"></a>
+<a name="N10266"></a><a name="Can+I+use+UNIX+pipes%3F+For+example%2C+will+-mapper+%22cut+-f1+%7C+sed+s%2Ffoo%2Fbar%2Fg%22+work%3F"></a>
<h3 class="h4">Can I use UNIX pipes? For example, will -mapper "cut -f1 | sed s/foo/bar/g" work?</h3>
<p>
Currently this does not work and gives a "java.io.IOException: Broken pipe" error. This is probably a bug that needs to be investigated.
</p>
-<a name="N1025E"></a><a name="When+I+run+a+streaming+job+by"></a>
+<a name="N10270"></a><a name="When+I+run+a+streaming+job+by"></a>
<h3 class="h4">When I run a streaming job by distributing large executables (for example, 3.6G) through the -file option, I get a "No space left on device" error. What do I do? </h3>
<p>
The jar packaging happens in a directory pointed to by the configuration variable stream.tmpdir. The default value of stream.tmpdir is /tmp. Set the value to a directory with more space:
@@ -801,7 +806,7 @@ The jar packaging happens in a directory pointed to by the configuration variabl
<pre class="code">
-jobconf stream.tmpdir=/export/bigspace/...
</pre>
-<a name="N1026F"></a><a name="How+do+I+specify+multiple+input+directories%3F"></a>
+<a name="N10281"></a><a name="How+do+I+specify+multiple+input+directories%3F"></a>
<h3 class="h4">How do I specify multiple input directories? </h3>
<p>
You can specify multiple input directories with multiple '-input' options:
@@ -809,17 +814,17 @@ You can specify multiple input directories with multiple '-input' options:
<pre class="code">
hadoop jar hadoop-streaming.jar -input '/user/foo/dir1' -input '/user/foo/dir2'
</pre>
-<a name="N1027C"></a><a name="How+do+I+generate+output+files+with+gzip+format%3F"></a>
+<a name="N1028E"></a><a name="How+do+I+generate+output+files+with+gzip+format%3F"></a>
<h3 class="h4">How do I generate output files with gzip format? </h3>
<p>
Instead of plain text files, you can generate gzip files as your output. Pass '-jobconf mapred.output.compress=true -jobconf mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec' as an option to your streaming job.
</p>
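<p>A full command with these options might look like this (mapper, reducer and paths are illustrative):</p>
<pre class="code">
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper /bin/cat \
    -reducer /bin/cat \
    -jobconf mapred.output.compress=true \
    -jobconf mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec
</pre>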
-<a name="N10286"></a><a name="How+do+I+provide+my+own+input%2Foutput+format+with+streaming%3F"></a>
+<a name="N10298"></a><a name="How+do+I+provide+my+own+input%2Foutput+format+with+streaming%3F"></a>
<h3 class="h4">How do I provide my own input/output format with streaming? </h3>
<p>
At least as late as version 0.14, Hadoop does not support multiple jar files. So, when specifying your own custom classes you will have to pack them along with the streaming jar and use the custom jar instead of the default hadoop streaming jar.
</p>
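<p>One way to do this, sketched with illustrative paths and class names, is to copy the streaming jar, add the custom classes to the copy, and use that jar when submitting the job:</p>
<pre class="code">
cp $HADOOP_HOME/hadoop-streaming.jar my-hadoop-streaming.jar
jar uf my-hadoop-streaming.jar -C /path/to/my/classes .
$HADOOP_HOME/bin/hadoop jar my-hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -inputformat my.package.MyInputFormat \
    -mapper /bin/cat
</pre>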
-<a name="N10290"></a><a name="How+do+I+parse+XML+documents+using+streaming%3F"></a>
+<a name="N102A2"></a><a name="How+do+I+parse+XML+documents+using+streaming%3F"></a>
<h3 class="h4">How do I parse XML documents using streaming? </h3>
<p>
You can use the record reader StreamXmlRecordReader to process XML documents.
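<p>A sketch of the option involved, where BEGIN_STRING and END_STRING stand for the text that delimits one record and the rest of the command is as usual:</p>
<pre class="code">
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -inputreader "StreamXmlRecord,begin=BEGIN_STRING,end=END_STRING" \
    -input myInputDirs \
    -output myOutputDir \
    -mapper /bin/cat
</pre>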
|