HADOOP-709. Fix contrib/streaming to work with commands that contain control characters. Contributed by Dhruba.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@477430 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent
commit
343bec67df
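
The fix URL-encodes each streaming command on the client side (StreamJob) before storing it in the job configuration, and URL-decodes it again in PipeMapper, PipeReducer, PipeCombiner and StreamUtil, so control characters such as tabs and newlines survive the trip through the XML-backed JobConf. A minimal standalone sketch of that round trip, using only java.net.URLEncoder and URLDecoder and no Hadoop classes (the class name below is made up for illustration):

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

public class StreamProcessorRoundTrip {
  public static void main(String[] args) throws UnsupportedEncodingException {
    // A streaming command containing control characters (tab, newline)
    // that would otherwise be mangled in the XML job configuration.
    String mapCmd = "tr '\t' '\n'";

    // Client side (StreamJob): encode before storing in the JobConf.
    String stored = URLEncoder.encode(mapCmd, "UTF-8");

    // Task side (PipeMapper/PipeReducer/PipeCombiner): decode before use.
    String decoded = URLDecoder.decode(stored, "UTF-8");

    System.out.println("stored : " + stored);   // e.g. tr+%27%09%27+%27%0A%27
    System.out.println("round trip ok: " + mapCmd.equals(decoded));
  }
}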

+ 3 - 0
CHANGES.txt

@@ -103,6 +103,9 @@ Trunk (unreleased changes)
 31. HADOOP-733.  Make exit codes in DFShell consistent and add a unit
     test.  (Dhruba Borthakur via cutting)
 
+32. HADOOP-709.  Fix contrib/streaming to work with commands that
+    contain control characters.  (Dhruba Borthakur via cutting)
+
 
 Release 0.8.0 - 2006-11-03
 

+ 13 - 1
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeCombiner.java

@@ -19,7 +19,9 @@
 package org.apache.hadoop.streaming;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.util.Iterator;
+import java.net.URLDecoder;
 
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
@@ -45,7 +47,17 @@ import org.apache.hadoop.io.WritableComparable;
 public class PipeCombiner extends PipeReducer {
 
   String getPipeCommand(JobConf job) {
-    return job.get("stream.combine.streamprocessor");
+    String str = job.get("stream.combine.streamprocessor");
+    if (str == null) {
+      System.err.println("X1003");
+      return str;
+    }
+    try {
+      return URLDecoder.decode(str, "UTF-8");
+    } catch (UnsupportedEncodingException e) {
+        System.err.println("stream.combine.streamprocessor in jobconf not found");
+        return null;
+    }
   }
 
 }

+ 12 - 1
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.streaming;
 
 import java.io.*;
+import java.net.URLDecoder;
 
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
@@ -35,7 +36,17 @@ import org.apache.hadoop.io.Writable;
 public class PipeMapper extends PipeMapRed implements Mapper {
 
   String getPipeCommand(JobConf job) {
-    return job.get("stream.map.streamprocessor");
+    String str = job.get("stream.map.streamprocessor");
+    if (str == null) {
+      return str;
+    }
+    try {
+      return URLDecoder.decode(str, "UTF-8");
+    }
+    catch (UnsupportedEncodingException e) {
+      System.err.println("stream.map.streamprocessor in jobconf not found");
+      return null;
+    }
   }
 
   String getKeyColPropName() {

+ 12 - 1
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java

@@ -19,7 +19,9 @@
 package org.apache.hadoop.streaming;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.util.Iterator;
+import java.net.URLDecoder;
 
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reducer;
@@ -36,7 +38,16 @@ import org.apache.hadoop.io.Writable;
 public class PipeReducer extends PipeMapRed implements Reducer {
 
   String getPipeCommand(JobConf job) {
-    return job.get("stream.reduce.streamprocessor");
+    String str = job.get("stream.reduce.streamprocessor");
+    if (str == null) {
+      return str;
+    }
+    try {
+      return URLDecoder.decode(str, "UTF-8");
+    } catch (UnsupportedEncodingException e) {
+        System.err.println("stream.reduce.streamprocessor in jobconf not found");
+        return null;
+    }
   }
 
   boolean getDoPipe() {

+ 4 - 3
src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java

@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
@@ -549,7 +550,7 @@ public class StreamJob {
       jobConf_.setMapperClass(c);
     } else {
       jobConf_.setMapperClass(PipeMapper.class);
-      jobConf_.set("stream.map.streamprocessor", mapCmd_);
+      jobConf_.set("stream.map.streamprocessor", URLEncoder.encode(mapCmd_, "UTF-8"));
     }
 
     if (comCmd_ != null) {
@@ -558,7 +559,7 @@ public class StreamJob {
         jobConf_.setCombinerClass(c);
       } else {
         jobConf_.setCombinerClass(PipeCombiner.class);
-        jobConf_.set("stream.combine.streamprocessor", comCmd_);
+        jobConf_.set("stream.combine.streamprocessor", URLEncoder.encode(comCmd_, "UTF-8"));
       }
     }
 
@@ -570,7 +571,7 @@ public class StreamJob {
         jobConf_.setReducerClass(c);
       } else {
         jobConf_.setReducerClass(PipeReducer.class);
-        jobConf_.set("stream.reduce.streamprocessor", redCmd_);
+        jobConf_.set("stream.reduce.streamprocessor", URLEncoder.encode(redCmd_, "UTF-8"));
       }
     }
 

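Note that URLEncoder.encode(String, String) throws the checked UnsupportedEncodingException (a subclass of IOException), so the three calls above rely on the enclosing StreamJob method being allowed to throw it. If that were not the case, a small wrapper could hide the exception; a sketch under that assumption, with a hypothetical helper name not present in the patch:

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper, not part of the patch: wraps URLEncoder.encode so the
// checked UnsupportedEncodingException does not leak into every caller.
final class CommandCodec {
  private CommandCodec() {}

  static String encodeCommand(String cmd) {
    try {
      return URLEncoder.encode(cmd, "UTF-8");
    } catch (UnsupportedEncodingException e) {
      // UTF-8 is a mandatory charset on every Java platform, so this is unreachable.
      throw new RuntimeException("UTF-8 unsupported", e);
    }
  }
}

With such a helper the calls would read jobConf_.set("stream.map.streamprocessor", CommandCodec.encodeCommand(mapCmd_)); the patch as committed keeps the direct URLEncoder calls instead.
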
+ 9 - 0
src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java

@@ -455,6 +455,15 @@ public class StreamUtil {
 
   static boolean getUseMapSideEffect(JobConf job) {
     String reduce = job.get("stream.reduce.streamprocessor");
+    if (reduce == null) {
+      return false;
+    }
+    try {
+      reduce = URLDecoder.decode(reduce, "UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      System.err.println("stream.reduce.streamprocessor in jobconf not found");
+      return false;
+    }
     return StreamJob.REDUCE_NONE.equals(reduce);
   }
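
StreamUtil.getUseMapSideEffect now has to decode the stored reducer command before comparing it with StreamJob.REDUCE_NONE, otherwise a map-only job would no longer be detected once the value is URL-encoded. A standalone sketch of the same check, assuming REDUCE_NONE is the literal string "NONE" (its value is not shown in this diff):

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;

class MapSideEffectCheck {
  // Assumption: mirrors StreamJob.REDUCE_NONE; the constant's value is not shown in this diff.
  static final String REDUCE_NONE = "NONE";

  // True when the URL-encoded reducer command says there is no reduce phase.
  static boolean useMapSideEffect(String encodedReduceCmd) {
    if (encodedReduceCmd == null) {
      return false;
    }
    try {
      return REDUCE_NONE.equals(URLDecoder.decode(encodedReduceCmd, "UTF-8"));
    } catch (UnsupportedEncodingException e) {
      return false; // same fallback as the patch: decode failure is treated as "has a reducer"
    }
  }
}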