Browse Source

MAPREDUCE-3018. Fixed -file option for streaming. Contributed by Mahadev Konar.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1173451 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 13 years ago
parent
commit
b8e8b8da75

+ 2 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -1370,6 +1370,8 @@ Release 0.23.0 - Unreleased
     YarnClientProtocolProvider and ensured MiniMRYarnCluster sets JobHistory
     configuration for tests. (acmurthy) 
 
+    MAPREDUCE-3018. Fixed -file option for streaming. (mahadev via acmurthy) 
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

+ 17 - 8
hadoop-mapreduce-project/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java

@@ -22,8 +22,10 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.net.URLEncoder;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
@@ -43,6 +45,7 @@ import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.mapred.FileInputFormat;
@@ -277,19 +280,25 @@ public class StreamJob implements Tool {
       if (values != null && values.length > 0) {
         LOG.warn("-file option is deprecated, please use generic option" +
         		" -files instead.");
-        StringBuilder unpackRegex = new StringBuilder(
-          config_.getPattern(MRJobConfig.JAR_UNPACK_PATTERN,
-                             JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern());
+
+        String fileList = null;
         for (String file : values) {
           packageFiles_.add(file);
-          String fname = new File(file).getName();
-          unpackRegex.append("|(?:").append(Pattern.quote(fname)).append(")");
+          try {
+            URI pathURI = new URI(file);
+            Path path = new Path(pathURI);
+            FileSystem localFs = FileSystem.getLocal(config_);
+            String finalPath = path.makeQualified(localFs).toString();
+            fileList = fileList == null ? finalPath : fileList + "," + finalPath;
+          } catch (Exception e) {
+            throw new IllegalArgumentException(e);
+          }
         }
-        config_.setPattern(MRJobConfig.JAR_UNPACK_PATTERN,
-                           Pattern.compile(unpackRegex.toString()));
+        config_.set("tmpfiles", config_.get("tmpfiles", "") +
+                                  (fileList == null ? "" : fileList));
         validate(packageFiles_);
       }
-         
+
       String fsName = cmdLine.getOptionValue("dfs");
       if (null != fsName){
         LOG.warn("-dfs option is deprecated, please use -fs instead.");