
HADOOP-3341. Allow streaming jobs to specify the field separator for map
and reduce input and output. The new configuration values are:
stream.map.input.field.separator
stream.map.output.field.separator
stream.reduce.input.field.separator
stream.reduce.output.field.separator
All of them default to "\t". Contributed by Zheng Shao.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@672807 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 17 years ago
parent
commit
3ad6c01d18
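
As a quick orientation before the per-file diff: a minimal sketch, not part of this commit, of how a job driver might set the new values (org.apache.hadoop.mapred.JobConf import assumed; the separator "|" is an arbitrary example):

    // Illustrative only: override the separators for one streaming job.
    JobConf conf = new JobConf();
    conf.set("stream.map.input.field.separator", "|");
    conf.set("stream.map.output.field.separator", "|");
    conf.set("stream.reduce.input.field.separator", "|");
    conf.set("stream.reduce.output.field.separator", "|");
    // Properties left unset keep the documented default of "\t".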

+ 8 - 0
CHANGES.txt

@@ -10,6 +10,14 @@ Trunk (unreleased changes)
 
   NEW FEATURES
 
+    HADOOP-3341. Allow streaming jobs to specify the field separator for map
+    and reduce input and output. The new configuration values are:
+      stream.map.input.field.separator
+      stream.map.output.field.separator
+      stream.reduce.input.field.separator
+      stream.reduce.output.field.separator
+    All of them default to "\t". (Zheng Shao via omalley)
+
   IMPROVEMENTS
     HADOOP-3577. Tools to inject blocks into name node and simulated
     data nodes for testing. (Sanjay Radia via hairong)

+ 1 - 1
build.xml

@@ -642,7 +642,7 @@
     <fail if="tests.failed">Tests failed!</fail>
   </target>   
 
-  <target name="test-contrib" depends="compile-core, compile-core-test" description="Run contrib unit tests">
+  <target name="test-contrib" depends="compile, compile-core-test" description="Run contrib unit tests">
     <subant target="test">
        <property name="version" value="${version}"/>
        <fileset file="${contrib.dir}/build.xml"/>

+ 1 - 1
src/contrib/build.xml

@@ -46,7 +46,7 @@
   <!-- ====================================================== -->
   <target name="test">
     <subant target="test">
-      <fileset dir="." includes="*/build.xml"/>
+      <fileset dir="." includes="streaming/build.xml"/>
     </subant>
   </target>
   

+ 13 - 19
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java

@@ -53,8 +53,8 @@ public abstract class PipeMapRed {
    */
   abstract String getPipeCommand(JobConf job);
 
-  abstract char getFieldSeparator();
-  
+  abstract byte[] getFieldSeparator();
+
   abstract int getNumOfKeyFields();
 
   abstract boolean getDoPipe();
@@ -120,13 +120,6 @@ public abstract class PipeMapRed {
       job_ = job;
       fs_ = FileSystem.get(job_);
 
-      String mapOutputFieldSeparator = job_.get("stream.map.output.field.separator", "\t");
-      String reduceOutputFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t");
-      this.mapOutputFieldSeparator = mapOutputFieldSeparator.charAt(0);
-      this.reduceOutFieldSeparator = reduceOutputFieldSeparator.charAt(0);
-      this.numOfMapOutputKeyFields = job_.getInt("stream.num.map.output.key.fields", 1);
-      this.numOfReduceOutputKeyFields = job_.getInt("stream.num.reduce.output.key.fields", 1);
-
       nonZeroExitIsFailure_ = job_.getBoolean("stream.non.zero.exit.is.failure", true);
       
       doPipe_ = getDoPipe();
@@ -317,7 +310,7 @@ public abstract class PipeMapRed {
   }
 
   /**
-   * Split a line into key and value. Assume the delimitor is a tab.
+   * Split a line into key and value.
    * @param line: a byte array of line containing UTF-8 bytes
    * @param key: key of a record
    * @param val: value of a record
@@ -325,14 +318,21 @@ public abstract class PipeMapRed {
    */
   void splitKeyVal(byte[] line, int length, Text key, Text val)
   throws IOException {
-    int pos = UTF8ByteArrayUtils.findNthByte(line, 0, length,
-                (byte)this.getFieldSeparator(), this.getNumOfKeyFields());
+    int numKeyFields = getNumOfKeyFields();
+    byte[] separator = getFieldSeparator();
+    
+    // Need to find numKeyFields separators
+    int pos = UTF8ByteArrayUtils.findBytes(line, 0, line.length, separator);
+    for(int k=1; k<numKeyFields && pos!=-1; k++) {
+      pos = UTF8ByteArrayUtils.findBytes(line, pos + separator.length, 
+          line.length, separator);
+    }
     try {
       if (pos == -1) {
         key.set(line, 0, length);
         val.set("");
       } else {
-        UTF8ByteArrayUtils.splitKeyVal(line, 0, length, key, val, pos);
+        UTF8ByteArrayUtils.splitKeyVal(line, 0, length, key, val, pos, separator.length);
       }
     } catch (CharacterCodingException e) {
       LOG.warn(StringUtils.stringifyException(e));
@@ -647,10 +647,4 @@ public abstract class PipeMapRed {
   String LOGNAME;
   PrintStream log_;
 
-  protected char mapOutputFieldSeparator = '\t';
-  protected char reduceOutFieldSeparator = '\t';
-  protected int numOfMapOutputKeyFields = 1;
-  protected int numOfMapOutputPartitionFields = 1;
-  protected int numOfReduceOutputKeyFields = 1;
-  
 }

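A note on the new splitting logic above: instead of findNthByte with a single separator byte, splitKeyVal now scans for the numKeyFields-th occurrence of a multi-byte separator. A self-contained sketch of that loop, with find standing in for UTF8ByteArrayUtils.findBytes and the search bounded by the record's logical length:

    class KeySplitSketch {
      // Offset of the separator that ends the key, or -1 when the line has
      // fewer than numKeyFields separators (the whole line becomes the key).
      static int findKeyEnd(byte[] line, int length, byte[] sep, int numKeyFields) {
        int pos = find(line, 0, length, sep);
        for (int k = 1; k < numKeyFields && pos != -1; k++) {
          pos = find(line, pos + sep.length, length, sep);
        }
        return pos;
      }

      // First occurrence of b in utf[start, end), mirroring findBytes in this patch.
      static int find(byte[] utf, int start, int end, byte[] b) {
        for (int i = start; i <= end - b.length; i++) {
          int j = 0;
          while (j < b.length && utf[i + j] == b[j]) j++;
          if (j == b.length) return i;
        }
        return -1;
      }
    }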
+ 21 - 5
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java

@@ -34,6 +34,10 @@ import org.apache.hadoop.util.StringUtils;
 public class PipeMapper extends PipeMapRed implements Mapper {
 
   private boolean ignoreKey = false;
+
+  private byte[] mapOutputFieldSeparator;
+  private byte[] mapInputFieldSeparator;
+  private int numOfMapOutputKeyFields = 1;
   
   String getPipeCommand(JobConf job) {
     String str = job.get("stream.map.streamprocessor");
@@ -56,7 +60,15 @@ public class PipeMapper extends PipeMapRed implements Mapper {
   public void configure(JobConf job) {
     super.configure(job);
     String inputFormatClassName = job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName();
-    this.ignoreKey = inputFormatClassName.equals(TextInputFormat.class.getCanonicalName());
+    ignoreKey = inputFormatClassName.equals(TextInputFormat.class.getCanonicalName());
+
+    try {
+      mapOutputFieldSeparator = job.get("stream.map.output.field.separator", "\t").getBytes("UTF-8");
+      mapInputFieldSeparator = job.get("stream.map.input.field.separator", "\t").getBytes("UTF-8");
+      numOfMapOutputKeyFields = job.getInt("stream.num.map.output.key.fields", 1);
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException("The current system does not support UTF-8 encoding!", e);
+    }
   }
 
   // Do NOT declare default constructor
@@ -85,7 +97,7 @@ public class PipeMapper extends PipeMapRed implements Mapper {
       if (numExceptions_ == 0) {
         if (!this.ignoreKey) {
           write(key);
-          clientOut_.write('\t');
+          clientOut_.write(getInputSeparator());
         }
         write(value);
         clientOut_.write('\n');
@@ -112,14 +124,18 @@ public class PipeMapper extends PipeMapRed implements Mapper {
     mapRedFinished();
   }
 
+  byte[] getInputSeparator() {
+    return mapInputFieldSeparator;
+  }
+
   @Override
-  char getFieldSeparator() {
-    return super.mapOutputFieldSeparator;
+  byte[] getFieldSeparator() {
+    return mapOutputFieldSeparator;
   }
 
   @Override
   int getNumOfKeyFields() {
-    return super.numOfMapOutputKeyFields;
+    return numOfMapOutputKeyFields;
   }
 
 }

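The mapper-side effect of the change above: each record handed to the child process is now key, then the configured map input separator, then value (PipeReducer below mirrors this with the reduce input separator). A minimal sketch of that framing; the method and parameters are illustrative, not the class's actual fields:

    // Illustrative framing of one record for the streaming child's stdin.
    static void writeRecord(java.io.OutputStream out, byte[] key, byte[] value,
                            byte[] inputSeparator, boolean ignoreKey)
        throws java.io.IOException {
      if (!ignoreKey) {
        out.write(key);
        out.write(inputSeparator); // previously a hard-coded '\t'
      }
      out.write(value);
      out.write('\n');
    }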
+ 25 - 5
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java

@@ -36,6 +36,10 @@ import org.apache.hadoop.io.Writable;
  */
 public class PipeReducer extends PipeMapRed implements Reducer {
 
+  private byte[] reduceOutFieldSeparator;
+  private byte[] reduceInputFieldSeparator;
+  private int numOfReduceOutputKeyFields = 1;
+  
   String getPipeCommand(JobConf job) {
     String str = job.get("stream.reduce.streamprocessor");
     if (str == null) {
@@ -55,6 +59,18 @@ public class PipeReducer extends PipeMapRed implements Reducer {
     return (argv != null) && !StreamJob.REDUCE_NONE.equals(argv);
   }
 
+  public void configure(JobConf job) {
+    super.configure(job);
+
+    try {
+      reduceOutFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t").getBytes("UTF-8");
+      reduceInputFieldSeparator = job_.get("stream.reduce.input.field.separator", "\t").getBytes("UTF-8");
+      this.numOfReduceOutputKeyFields = job_.getInt("stream.num.reduce.output.key.fields", 1);
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException("The current system does not support UTF-8 encoding!", e);
+    }
+  }
+
   public void reduce(Object key, Iterator values, OutputCollector output,
                      Reporter reporter) throws IOException {
 
@@ -75,7 +91,7 @@ public class PipeReducer extends PipeMapRed implements Reducer {
                                                                     outerrThreadsThrowable));
           }
           write(key);
-          clientOut_.write('\t');
+          clientOut_.write(getInputSeparator());
           write(val);
           clientOut_.write('\n');
         } else {
@@ -109,14 +125,18 @@ public class PipeReducer extends PipeMapRed implements Reducer {
     mapRedFinished();
   }
 
-  @Override
-  char getFieldSeparator() {
-    return super.reduceOutFieldSeparator;
+  byte[] getInputSeparator() {
+    return reduceInputFieldSeparator;
   }
 
+  @Override
+  byte[] getFieldSeparator() {
+    return reduceOutFieldSeparator;
+  }
+  
   @Override
   int getNumOfKeyFields() {
-    return super.numOfReduceOutputKeyFields;
+    return numOfReduceOutputKeyFields;
   }
 
 }

+ 66 - 7
src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java

@@ -60,7 +60,32 @@ public class UTF8ByteArrayUtils {
     }
     return -1;      
   }
-  
+
+  /**
+   * Find the first occurrence of the given bytes b in a UTF-8 encoded string
+   * @param utf a byte array containing a UTF-8 encoded string
+   * @param start starting offset
+   * @param end ending position
+   * @param b the bytes to find
+   * @return the position where the bytes first occur, otherwise -1
+   */
+  public static int findBytes(byte [] utf, int start, int end, byte[] b) {
+    int matchEnd = end - b.length;
+    for(int i=start; i<=matchEnd; i++) {
+      boolean matched = true;
+      for(int j=0; j<b.length; j++) {
+        if (utf[i+j] != b[j]) {
+          matched = false;
+          break;
+        }
+      }
+      if (matched) {
+        return i;
+      }
+    }
+    return -1;      
+  }
+    
   /**
    * Find the nth occurrence of the given byte b in a UTF-8 encoded string
    * @param utf a byte array containing a UTF-8 encoded string
@@ -112,23 +137,57 @@ public class UTF8ByteArrayUtils {
    * @param key contains key upon the method is returned
    * @param val contains value upon the method is returned
    * @param splitPos the split pos
+   * @param separatorLength the length of the separator between key and value
    * @throws IOException
    */
   public static void splitKeyVal(byte[] utf, int start, int length, 
-                                 Text key, Text val, int splitPos) throws IOException {
+                                 Text key, Text val, int splitPos,
+                                 int separatorLength) throws IOException {
     if (splitPos<start || splitPos >= (start+length))
       throw new IllegalArgumentException("splitPos must be in the range " +
                                          "[" + start + ", " + (start+length) + "]: " + splitPos);
     int keyLen = (splitPos-start);
     byte [] keyBytes = new byte[keyLen];
     System.arraycopy(utf, start, keyBytes, 0, keyLen);
-    int valLen = (start+length)-splitPos-1;
+    int valLen = (start+length)-splitPos-separatorLength;
     byte [] valBytes = new byte[valLen];
-    System.arraycopy(utf, splitPos+1, valBytes, 0, valLen);
+    System.arraycopy(utf, splitPos+separatorLength, valBytes, 0, valLen);
     key.set(keyBytes);
     val.set(valBytes);
   }
-    
+
+  /**
+   * split a UTF-8 byte array into key and value
+   * assuming that the delimiter is at splitPos.
+   * @param utf utf-8 encoded string
+   * @param start starting offset
+   * @param length no. of bytes
+   * @param key contains key upon the method is returned
+   * @param val contains value upon the method is returned
+   * @param splitPos the split pos
+   * @throws IOException
+   */
+  public static void splitKeyVal(byte[] utf, int start, int length, 
+                                 Text key, Text val, int splitPos) throws IOException {
+    splitKeyVal(utf, start, length, key, val, splitPos, 1);
+  }
+  
+
+  /**
+   * split a UTF-8 byte array into key and value
+   * assuming that the delimiter is at splitPos.
+   * @param utf utf-8 encoded string
+   * @param key contains key upon the method is returned
+   * @param val contains value upon the method is returned
+   * @param splitPos the split pos
+   * @param separatorLength the length of the separator between key and value
+   * @throws IOException
+   */
+  public static void splitKeyVal(byte[] utf, Text key, Text val, int splitPos, 
+                                 int separatorLength) 
+    throws IOException {
+    splitKeyVal(utf, 0, utf.length, key, val, splitPos, separatorLength);
+  }
 
   /**
    * split a UTF-8 byte array into key and value 
@@ -141,9 +200,9 @@ public class UTF8ByteArrayUtils {
    */
   public static void splitKeyVal(byte[] utf, Text key, Text val, int splitPos) 
     throws IOException {
-    splitKeyVal(utf, 0, utf.length, key, val, splitPos);
+    splitKeyVal(utf, 0, utf.length, key, val, splitPos, 1);
   }
-    
+  
   /**
    * Read a utf8 encoded line from a data input stream. 
    * @param lineReader LineReader to read the line from.

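A short usage sketch of the extended helpers; the sample line and the two-byte separator "::" are made up, and Text is org.apache.hadoop.io.Text (checked exceptions omitted):

    // Two key fields split on "::": key becomes "k1::k2", value becomes "v".
    byte[] line = "k1::k2::v".getBytes("UTF-8");
    byte[] sep = "::".getBytes("UTF-8");
    int pos = UTF8ByteArrayUtils.findBytes(line, 0, line.length, sep);            // ends "k1"
    pos = UTF8ByteArrayUtils.findBytes(line, pos + sep.length, line.length, sep); // ends "k2"
    Text key = new Text();
    Text val = new Text();
    UTF8ByteArrayUtils.splitKeyVal(line, 0, line.length, key, val, pos, sep.length);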
+ 131 - 0
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingSeparator.java

@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * This class tests hadoopStreaming with a customized separator in MapReduce local mode.
+ */
+public class TestStreamingSeparator extends TestCase
+{
+
+  // "map" command: grep -E (red|green|blue)
+  // reduce command: uniq
+  protected File INPUT_FILE = new File("TestStreamingSeparator.input.txt");
+  protected File OUTPUT_DIR = new File("TestStreamingSeparator.out");
+  protected String input = "roses1are.red\nviolets1are.blue\nbunnies1are.pink\n";
+  // key.value.separator.in.input.line reads 1 as separator
+  // stream.map.input.field.separator uses 2 as separator
+  // map behaves like "/usr/bin/tr 2 3"; (translate 2 to 3)
+  protected String map = StreamUtil.makeJavaCommand(TrApp.class, new String[]{"2", "3"});
+  // stream.map.output.field.separator recognizes 3 as separator
+  // stream.reduce.input.field.separator recognizes 3 as separator
+  // reduce behaves like "/usr/bin/tr 3 4"; (translate 3 to 4)
+  protected String reduce = StreamUtil.makeJavaCommand(TrAppReduce.class, new String[]{"3", "4"});
+  // stream.reduce.output.field.separator recognizes 4 as separator
+  // mapred.textoutputformat.separator outputs 5 as separator
+  protected String outputExpect = "bunnies5are.pink\nroses5are.red\nviolets5are.blue\n";
+
+  private StreamJob job;
+
+  public TestStreamingSeparator() throws IOException
+  {
+    UtilTest utilTest = new UtilTest(getClass().getName());
+    utilTest.checkUserDir();
+    utilTest.redirectIfAntJunit();
+  }
+
+  protected void createInput() throws IOException
+  {
+    DataOutputStream out = new DataOutputStream(
+                                                new FileOutputStream(INPUT_FILE.getAbsoluteFile()));
+    out.write(input.getBytes("UTF-8"));
+    out.close();
+  }
+
+  protected String[] genArgs() {
+    return new String[] {
+      "-input", INPUT_FILE.getAbsolutePath(),
+      "-output", OUTPUT_DIR.getAbsolutePath(),
+      "-mapper", map,
+      "-reducer", reduce,
+      //"-verbose",
+      //"-jobconf", "stream.debug=set"
+      "-jobconf", "keep.failed.task.files=true",
+      "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp"),
+      "-inputformat", "KeyValueTextInputFormat",
+      "-jobconf", "key.value.separator.in.input.line=1",
+      "-jobconf", "stream.map.input.field.separator=2",
+      "-jobconf", "stream.map.output.field.separator=3",
+      "-jobconf", "stream.reduce.input.field.separator=3",
+      "-jobconf", "stream.reduce.output.field.separator=4",
+      "-jobconf", "mapred.textoutputformat.separator=5",
+    };
+  }
+  
+  public void testCommandLine()
+  {
+    try {
+      try {
+        OUTPUT_DIR.getAbsoluteFile().delete();
+      } catch (Exception e) {
+      }
+
+      createInput();
+      boolean mayExit = false;
+
+      // During tests, the default Configuration will use a local mapred
+      // So don't specify -config or -cluster
+      job = new StreamJob(genArgs(), mayExit);      
+      job.go();
+      File outFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
+      String output = StreamUtil.slurp(outFile);
+      outFile.delete();
+      System.err.println("outEx1=" + outputExpect);
+      System.err.println("  out1=" + output);
+      assertEquals(outputExpect, output);
+    } catch(Exception e) {
+      failTrace(e);
+    } finally {
+      File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
+      INPUT_FILE.delete();
+      outFileCRC.delete();
+      OUTPUT_DIR.getAbsoluteFile().delete();
+    }
+  }
+
+  private void failTrace(Exception e)
+  {
+    StringWriter sw = new StringWriter();
+    e.printStackTrace(new PrintWriter(sw));
+    fail(sw.toString());
+  }
+
+  public static void main(String[]args) throws Exception
+  {
+    new TestStreamingSeparator().testCommandLine();
+  }
+
+}

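Tracing one record through the test's separator chain shows how outputExpect is arrived at:

    roses1are.red    input; KeyValueTextInputFormat splits on "1" -> key "roses", value "are.red"
    roses2are.red    PipeMapper rejoins key and value with "2" for TrApp's stdin
    roses3are.red    TrApp translates 2 -> 3; PipeMapper re-splits its output on "3"
    roses3are.red    PipeReducer rejoins with "3" for TrAppReduce's stdin
    roses4are.red    TrAppReduce translates 3 -> 4; PipeReducer re-splits on "4"
    roses5are.red    TextOutputFormat joins the final key and value with "5"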
+ 0 - 1
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java

@@ -41,7 +41,6 @@ public class TrApp
     // test that some JobConf properties are exposed as expected     
     // Note the dots translated to underscore: 
     // property names have been escaped in PipeMapRed.safeEnvVarName()
-    expect("mapred_input_format_class", "org.apache.hadoop.mapred.TextInputFormat");
     expect("mapred_job_tracker", "local");
     //expect("mapred_local_dir", "build/test/mapred/local");
     expectDefined("mapred_local_dir");

+ 111 - 0
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrAppReduce.java

@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+
+import org.apache.hadoop.streaming.Environment;
+
+/** A minimal Java implementation of /usr/bin/tr.
+    Used to test the usage of external applications without adding
+    platform-specific dependencies.
+ */
+public class TrAppReduce
+{
+
+  public TrAppReduce(char find, char replace)
+  {
+    this.find = find;
+    this.replace = replace;
+  }
+
+  void testParentJobConfToEnvVars() throws IOException
+  {
+    env = new Environment();
+    // test that some JobConf properties are exposed as expected     
+    // Note the dots translated to underscore: 
+    // property names have been escaped in PipeMapRed.safeEnvVarName()
+    expect("mapred_job_tracker", "local");
+    //expect("mapred_local_dir", "build/test/mapred/local");
+    expectDefined("mapred_local_dir");
+    expect("mapred_output_format_class", "org.apache.hadoop.mapred.TextOutputFormat");
+    expect("mapred_output_key_class", "org.apache.hadoop.io.Text");
+    expect("mapred_output_value_class", "org.apache.hadoop.io.Text");
+
+    expect("mapred_task_is_map", "false");
+    expectDefined("mapred_task_id");
+
+    expectDefined("io_sort_factor");
+
+    // the FileSplit context properties are not available in local hadoop..
+    // so can't check them in this test.
+
+  }
+
+  // this runs in a subprocess; won't use JUnit's assertTrue()    
+  void expect(String evName, String evVal) throws IOException
+  {
+    String got = env.getProperty(evName);
+    if (!evVal.equals(got)) {
+      String msg = "FAIL evName=" + evName + " got=" + got + " expect=" + evVal;
+      throw new IOException(msg);
+    }
+  }
+
+  void expectDefined(String evName) throws IOException
+  {
+    String got = env.getProperty(evName);
+    if (got == null) {
+      String msg = "FAIL evName=" + evName + " is undefined. Expect defined.";
+      throw new IOException(msg);
+    }
+  }
+
+  public void go() throws IOException
+  {
+    testParentJobConfToEnvVars();
+    BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
+    String line;
+
+    while ((line = in.readLine()) != null) {
+      String out = line.replace(find, replace);
+      System.out.println(out);
+    }
+  }
+
+  public static void main(String[] args) throws IOException
+  {
+    args[0] = CUnescape(args[0]);
+    args[1] = CUnescape(args[1]);
+    TrAppReduce app = new TrAppReduce(args[0].charAt(0), args[1].charAt(0));
+    app.go();
+  }
+
+  public static String CUnescape(String s)
+  {
+    if (s.equals("\\n")) {
+      return "\n";
+    } else {
+      return s;
+    }
+  }
+  char find;
+  char replace;
+  Environment env;
+}