HADOOP-1853. Fix contrib/streaming to accept multiple -cacheFile options. Contributed by Prachi Gupta.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@573744 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting, 18 years ago
Parent commit: b4e36fc858
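
For context, this change lets a single streaming invocation carry more than one -cacheFile option. A minimal sketch of such an invocation, mirroring the argv built in the new test below (the namenode URI and symlink names are placeholders, not from the commit):

    // Sketch only: assumes org.apache.hadoop.streaming.StreamJob is on the classpath.
    String[] argv = new String[] {
      "-input", "/testing-streaming/input.txt",
      "-output", "/testing-streaming/out",
      "-mapper", "xargs cat",
      "-reducer", "cat",
      "-cacheFile", "hdfs://namenode/testing-streaming/cache.txt#testlink",
      "-cacheFile", "hdfs://namenode/testing-streaming/cache2.txt#testlink2"
    };
    new StreamJob(argv, false).go();  // before this fix, only one -cacheFile was accepted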

CHANGES.txt (+3 -0)

@@ -105,6 +105,9 @@ Trunk (unreleased changes)
     length, so that it is not always zero in map tasks.
     (Thomas Friol via cutting)
 
+    HADOOP-1853.  Fix contrib/streaming to accept multiple -cacheFile
+    options.  (Prachi Gupta via cutting)
+
   IMPROVEMENTS
 
     HADOOP-1779. Replace INodeDirectory.getINode() by a getExistingPathINodes()

build.xml (+2 -0)

@@ -492,6 +492,7 @@
 
   <target name="test-contrib" depends="compile-core, compile-core-test">
     <subant target="test">
+       <property name="version" value="${version}"/>
        <fileset file="${contrib.dir}/build.xml"/>
        <fileset file="${contrib.dir}/build.xml"/>
     </subant> 
     </subant> 
   </target>
   </target>
@@ -746,6 +747,7 @@
  <!-- ================================================================== -->
  <target name="deploy-contrib" depends="compile-core">
     <subant target="deploy">
+        <property name="version" value="${version}"/>
         <fileset file="src/contrib/build.xml"/>
         <fileset file="src/contrib/build.xml"/>
      </subant>  	
      </subant>  	
   </target>
   </target>

src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (+1 -1)

@@ -396,7 +396,7 @@ public class StreamJob {
     Option inputreader = createOption("inputreader", 
                                       "Optional.", "spec", 1, false);
     Option cacheFile = createOption("cacheFile", 
-                                    "File name URI", "fileNameURI", 1, false);
+                                    "File name URI", "fileNameURI", Integer.MAX_VALUE, false);
     Option cacheArchive = createOption("cacheArchive", 
                                        "File name URI", "fileNameURI", 1, false);
 

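The one-line fix above raises the cacheFile option's cap from 1 to Integer.MAX_VALUE, so the parser can retain many values for the option rather than a single one. A minimal self-contained sketch of the resulting behavior (the helper below is hypothetical, not part of StreamJob or its option parser):

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical helper illustrating a repeatable option: every value
    // following the flag is collected, rather than only the first.
    public class RepeatedOptionSketch {
      static List<String> valuesOf(String flag, String[] argv) {
        List<String> values = new ArrayList<String>();
        for (int i = 0; i < argv.length - 1; i++) {
          if (flag.equals(argv[i])) {
            values.add(argv[++i]);  // take the value that follows the flag
          }
        }
        return values;
      }

      public static void main(String[] args) {
        String[] argv = { "-cacheFile", "uri1#link1", "-cacheFile", "uri2#link2" };
        System.out.println(valuesOf("-cacheFile", argv));  // prints [uri1#link1, uri2#link2]
      }
    }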
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java (+138 -0)

@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+/**
+ * This test case verifies that streaming accepts multiple -cacheFile
+ * options, using the symlink creation utility provided by the
+ * distributed cache.
+ */
+public class TestMultipleCachefiles extends TestCase
+{
+  String INPUT_FILE = "/testing-streaming/input.txt";
+  String OUTPUT_DIR = "/testing-streaming/out";
+  String CACHE_FILE = "/testing-streaming/cache.txt";
+  String CACHE_FILE_2 = "/testing-streaming/cache2.txt";
+  String input = "check to see if we can read this none reduce";
+  String map = "xargs cat ";
+  String reduce = "cat";
+  String mapString = "testlink";
+  String mapString2 = "testlink2";
+  String cacheString = "This is just the cache string";
+  String cacheString2 = "This is just the second cache string";
+  StreamJob job;
+
+  public TestMultipleCachefiles() throws IOException
+  {
+  }
+
+  public void testMultipleCachefiles()
+  {
+    try {
+      boolean mayExit = false;
+      MiniMRCluster mr = null;
+      MiniDFSCluster dfs = null; 
+      FileSystem fileSys = null;
+      try{
+        Configuration conf = new Configuration();
+        dfs = new MiniDFSCluster(conf, 1, true, null);
+        fileSys = dfs.getFileSystem();
+        String namenode = fileSys.getName();
+        mr  = new MiniMRCluster(1, namenode, 3);
+        // During tests, the default Configuration will use a local mapred
+        // So don't specify -config or -cluster
+        String strJobtracker = "mapred.job.tracker=" + "localhost:" + mr.getJobTrackerPort();
+        String strNamenode = "fs.default.name=" + namenode;
+        String argv[] = new String[] {
+          "-input", INPUT_FILE,
+          "-output", OUTPUT_DIR,
+          "-mapper", map,
+          "-reducer", reduce,
+          //"-verbose",
+          //"-jobconf", "stream.debug=set"
+          "-jobconf", strNamenode,
+          "-jobconf", strJobtracker,
+          "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp"),
+          "-jobconf", "mapred.child.java.opts=-Dcontrib.name=" + System.getProperty("contrib.name") + " " +
+                      "-Dbuild.test=" + System.getProperty("build.test") + " " +
+                      conf.get("mapred.child.java.opts",""),
+          "-cacheFile", "hdfs://"+fileSys.getName()+CACHE_FILE + "#" + mapString,
+          "-cacheFile", "hdfs://"+fileSys.getName()+CACHE_FILE_2 + "#" + mapString2
+        };
+
+        fileSys.delete(new Path(OUTPUT_DIR));
+        
+        DataOutputStream file = fileSys.create(new Path(INPUT_FILE));
+        file.writeBytes(mapString + "\n");
+        file.writeBytes(mapString2 + "\n");
+        file.close();
+        file = fileSys.create(new Path(CACHE_FILE));
+        file.writeBytes(cacheString);
+        file.close();
+        file = fileSys.create(new Path(CACHE_FILE_2));
+        file.writeBytes(cacheString2);
+        file.close();
+          
+        job = new StreamJob(argv, mayExit);     
+        job.go();
+        String line = null;
+        String line2 = null;
+        Path[] fileList = fileSys.listPaths(new Path(OUTPUT_DIR));
+        for (int i = 0; i < fileList.length; i++){
+          System.out.println(fileList[i].toString());
+          BufferedReader bread =
+            new BufferedReader(new InputStreamReader(fileSys.open(fileList[i])));
+          line = bread.readLine();
+          System.out.println(line);
+          line2 = bread.readLine();
+          System.out.println(line2);
+        }
+        assertEquals(cacheString + "\t", line);
+        assertEquals(cacheString2 + "\t", line2);
+      } finally{
+        if (fileSys != null) { fileSys.close(); }
+        if (dfs != null) { dfs.shutdown(); }
+        if (mr != null) { mr.shutdown();}
+      }
+      
+    } catch(Exception e) {
+      failTrace(e);
+    }
+  }
+
+  void failTrace(Exception e)
+  {
+    StringWriter sw = new StringWriter();
+    e.printStackTrace(new PrintWriter(sw));
+    fail(sw.toString());
+  }
+
+  public static void main(String[]args) throws Exception
+  {
+    new TestMultipleCachefiles().testMultipleCachefiles();
+  }
+
+}