
HADOOP-1284. In contrib/streaming, permit flexible specification of field delimiter and fields for partitioning and sorting. Contributed by Runping.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@532859 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting, 18 years ago
Commit 3d28f8219a
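
For context, here is a minimal sketch (not part of the commit) of how a job might wire up the new knobs. The property names are taken from the diff below; the JobConf calls are illustrative:

    // Illustrative sketch only -- property names come from this commit's diff.
    JobConf job = new JobConf();
    job.set("stream.map.output.field.separator", ".");  // delimiter in map output lines
    job.setInt("stream.num.map.output.key.fields", 2);  // first two fields form the key
    job.set("map.output.key.field.separator", ".");     // delimiter inside the key itself
    job.setInt("num.key.fields.for.partition", 1);      // partition on the first key field
    job.setPartitionerClass(KeyFieldBasedPartitioner.class);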

+ 4 - 0
CHANGES.txt

@@ -263,6 +263,10 @@ Trunk (unreleased changes)
     completed jobs by order of completion, not submission.
     (Arun C Murthy via cutting)
 
+79. HADOOP-1284.  In contrib/streaming, permit flexible specification
+    of field delimiter and fields for partitioning and sorting.
+    (Runping Qi via cutting)
+
 
 Release 0.12.3 - 2007-04-06
 

+ 22 - 2
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java

@@ -64,6 +64,10 @@ public abstract class PipeMapRed {
    */
   abstract String getKeyColPropName();
 
+  abstract char getFieldSeparator();
+  
+  abstract int getNumOfKeyFields();
+  
   /** Write output as side-effect files rather than as map outputs.
       This is useful to do "Map" tasks rather than "MapReduce" tasks. */
   boolean getUseSideEffect() {
@@ -208,7 +212,13 @@ public abstract class PipeMapRed {
       } else {
         sideFs_ = fs_;
       }
-
+      String mapOutputFieldSeparator = job_.get("stream.map.output.field.separator", "\t");
+      String reduceOutputFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t");
+      this.mapOutputFieldSeparator = mapOutputFieldSeparator.charAt(0);
+      this.reduceOutFieldSeparator = reduceOutputFieldSeparator.charAt(0);
+      this.numOfMapOutputKeyFields = job_.getInt("stream.num.map.output.key.fields", 1);
+      this.numOfReduceOutputKeyFields = job_.getInt("stream.num.reduce.output.key.fields", 1);
+      
       if (debug_) {
         System.out.println("kind   :" + this.getClass());
         System.out.println("split  :" + StreamUtil.getCurrentSplit(job_));
@@ -466,8 +476,12 @@ public abstract class PipeMapRed {
   void splitKeyVal(byte[] line, Text key, Text val) throws IOException {
     int pos = -1;
     if (keyCols_ != ALL_COLS) {
-      pos = UTF8ByteArrayUtils.findTab(line);
+      pos = UTF8ByteArrayUtils.findNthByte(line, (byte)this.getFieldSeparator(), this.getNumOfKeyFields());
     }
+    LOG.info("FieldSeparator: " + this.getFieldSeparator());
+    LOG.info("NumOfKeyFields: " + this.getNumOfKeyFields());
+    LOG.info("Line: " + new String (line));
+    LOG.info("Pos: " + pos);
     try {
       if (pos == -1) {
         key.set(line);
@@ -730,4 +744,10 @@ public abstract class PipeMapRed {
   String LOGNAME;
   PrintStream log_;
 
+  protected char mapOutputFieldSeparator = '\t';
+  protected char reduceOutFieldSeparator = '\t';
+  protected int numOfMapOutputKeyFields = 1;
+  protected int numOfMapOutputPartitionFields = 1;
+  protected int numOfReduceOutputKeyFields = 1;
+  
 }
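
To illustrate the new split semantics (a sketch, not code from the commit): with a "." separator and two key fields, splitKeyVal now breaks a line at the second separator instead of at the first tab.

    // Sketch: separator '.' with getNumOfKeyFields() == 2.
    byte[] line = "roses.smell.good".getBytes("UTF-8");
    int pos = UTF8ByteArrayUtils.findNthByte(line, (byte) '.', 2);          // pos == 11
    String key = new String(line, 0, pos, "UTF-8");                         // "roses.smell"
    String val = new String(line, pos + 1, line.length - pos - 1, "UTF-8"); // "good"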

+ 10 - 0
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java

@@ -116,4 +116,14 @@ public class PipeMapper extends PipeMapRed implements Mapper {
     mapRedFinished();
   }
 
+  @Override
+  char getFieldSeparator() {
+    return super.mapOutputFieldSeparator;
+  }
+
+  @Override
+  int getNumOfKeyFields() {
+    return super.numOfMapOutputKeyFields;
+  }
+
 }

+ 10 - 0
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java

@@ -102,4 +102,14 @@ public class PipeReducer extends PipeMapRed implements Reducer {
     mapRedFinished();
   }
 
+  @Override
+  char getFieldSeparator() {
+    return super.reduceOutFieldSeparator;
+  }
+
+  @Override
+  int getNumOfKeyFields() {
+    return super.numOfReduceOutputKeyFields;
+  }
+
 }

+ 60 - 47
src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java

@@ -18,11 +18,14 @@
 
 package org.apache.hadoop.streaming;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PushbackInputStream;
 
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.LineRecordReader;
 
 /**
  * General utils for byte array containing UTF-8 encoded strings
@@ -45,6 +48,56 @@ public class UTF8ByteArrayUtils {
     }
     return -1;      
   }
+  
+  /**
+   * Find the first occurrence of the given byte b in a UTF-8 encoded string
+   * @param utf a byte array containing a UTF-8 encoded string
+   * @param start starting offset
+   * @param end ending position
+   * @param b the byte to find
+   * @return the position where the byte first occurs; otherwise -1
+   */
+  public static int findByte(byte [] utf, int start, int end, byte b) {
+    for(int i=start; i<end; i++) {
+      if (utf[i]==b) {
+        return i;
+      }
+    }
+    return -1;      
+  }
+  
+  /**
+   * Find the nth occurrence of the given byte b in a UTF-8 encoded string
+   * @param utf a byte array containing a UTF-8 encoded string
+   * @param start starting offset
+   * @param length ending position (exclusive)
+   * @param b the byte to find
+   * @param n the desired occurrence of the given byte
+   * @return the position of the nth occurrence of the given byte if it exists; otherwise -1
+   */
+  private static int findNthByte(byte [] utf, int start, int length, byte b, int n) {
+    int pos = -1;
+    int nextStart = start;  // begin at 'start' itself so a separator at the first position is counted
+    for (int i = 0; i < n; i++) {
+      pos = findByte(utf, nextStart, length, b);
+      if (pos < 0) {
+        return pos;
+      }
+      nextStart = pos + 1;
+    }
+    return pos;      
+  }
+  
+  /**
+   * Find the nth occurrence of the given byte b in a UTF-8 encoded string
+   * @param utf a byte array containing a UTF-8 encoded string
+   * @param b the byte to find
+   * @param n the desired occurrence of the given byte
+   * @return the position of the nth occurrence of the given byte if it exists; otherwise -1
+   */
+  public static int findNthByte(byte [] utf, byte b, int n) {
+    return findNthByte(utf, 0, utf.length, b, n);      
+  }
     
   /**
   * Find the first occurrence of a tab in a UTF-8 encoded string
@@ -52,7 +105,7 @@ public class UTF8ByteArrayUtils {
   * @return the position where the first tab occurs; otherwise -1
    */
   public static int findTab(byte [] utf) {
-    return findTab(utf, 0, utf.length);
+    return findNthByte(utf, 0, utf.length, (byte)'\t', 1);
   }
 
   /**
@@ -102,52 +155,12 @@ public class UTF8ByteArrayUtils {
    * @return a byte array containing the line 
    * @throws IOException
    */
-  public static byte[] readLine(InputStream in) throws IOException {
-    byte [] buf = new byte[128];
-    byte [] lineBuffer = buf;
-    int room = 128;
-    int offset = 0;
-    boolean isEOF = false;
-    while (true) {
-      int b = in.read();
-      if (b == -1) {
-        isEOF = true;
-        break;
-      }
-
-      char c = (char)b;
-      if (c == '\n')
-        break;
-
-      if (c == '\r') {
-        in.mark(1);
-        int c2 = in.read();
-        if (c2 == -1) {
-          isEOF = true;
-          break;
-        }
-        if (c2 != '\n') {
-          // push it back
-          in.reset();
-        }
-        break;
-      }
-        
-      if (--room < 0) {
-        buf = new byte[offset + 128];
-        room = buf.length - offset - 1;
-        System.arraycopy(lineBuffer, 0, buf, 0, offset);
-        lineBuffer = buf;
-      }
-      buf[offset++] = (byte) c;
-    }
-
-    if (isEOF && offset==0) {
+  public static byte [] readLine(InputStream in) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    long bytes = LineRecordReader.readLine(in, baos);
+    baos.close();
+    if (bytes <= 0)
       return null;
-    } else {
-      lineBuffer = new byte[offset];
-      System.arraycopy(buf, 0, lineBuffer, 0, offset);
-      return lineBuffer;
-    }
+    return baos.toByteArray();
   }
 }
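
A quick sanity sketch (not from the commit) of the findNthByte contract: when the line contains fewer than n separators, the method returns -1, and splitKeyVal falls back to treating the whole line as the key.

    byte[] utf = "a.b".getBytes("UTF-8");
    UTF8ByteArrayUtils.findNthByte(utf, (byte) '.', 1);  // returns 1 (the only '.')
    UTF8ByteArrayUtils.findNthByte(utf, (byte) '.', 2);  // returns -1 (no second '.')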

+ 125 - 0
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamDataProtocol.java

@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
+
+/**
+ * This class tests hadoopStreaming in MapReduce local mode.
+ */
+public class TestStreamDataProtocol extends TestCase
+{
+
+  // "map" command: grep -E (red|green|blue)
+  // reduce command: uniq
+  protected File INPUT_FILE = new File("input_for_data_protocol_test.txt");
+  protected File OUTPUT_DIR = new File("out_for_data_protocol_test");
+  protected String input = "roses.smell.good\nroses.look.good\nroses.need.care\nroses.attract.bees\nroses.are.red\nroses.are.not.blue\nbunnies.are.pink\nbunnies.run.fast\nbunnies.have.short.tail\nbunnies.have.long.ears\n";
+  // map behaves like "/usr/bin/cat"; 
+  protected String map = StreamUtil.makeJavaCommand(TrApp.class, new String[]{".", "."});
+  // reduce counts the number of values for each key
+  protected String reduce = "org.apache.hadoop.streaming.ValueCountReduce";
+  protected String outputExpect = "bunnies.are\t1\nbunnies.have\t2\nbunnies.run\t1\nroses.are\t2\nroses.attract\t1\nroses.look\t1\nroses.need\t1\nroses.smell\t1\n";
+
+  private StreamJob job;
+
+  public TestStreamDataProtocol() throws IOException
+  {
+    UtilTest utilTest = new UtilTest(getClass().getName());
+    utilTest.checkUserDir();
+    utilTest.redirectIfAntJunit();
+  }
+
+  protected void createInput() throws IOException
+  {
+    DataOutputStream out = new DataOutputStream(
+                                                new FileOutputStream(INPUT_FILE.getAbsoluteFile()));
+    out.write(input.getBytes("UTF-8"));
+    out.close();
+  }
+
+  protected String[] genArgs() {
+    return new String[] {
+      "-input", INPUT_FILE.getAbsolutePath(),
+      "-output", OUTPUT_DIR.getAbsolutePath(),
+      "-mapper", map,
+      "-reducer", reduce,
+      "-partitioner", KeyFieldBasedPartitioner.class.getCanonicalName(),
+      //"-verbose",
+      "-jobconf", "stream.map.output.field.separator=.",
+      "-jobconf", "stream.num.map.output.key.fields=2",
+      "-jobconf", "map.output.key.field.separator=.",
+      "-jobconf", "num.key.fields.for.partition=1",
+      "-jobconf", "mapred.reduce.tasks=2",
+      "-jobconf", "keep.failed.task.files=true",
+      "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
+    };
+  }
+  
+  public void testCommandLine()
+  {
+    try {
+      try {
+        OUTPUT_DIR.getAbsoluteFile().delete();
+      } catch (Exception e) {
+      }
+
+      createInput();
+      boolean mayExit = false;
+
+      // During tests, the default Configuration will use a local mapred
+      // So don't specify -config or -cluster
+      job = new StreamJob(genArgs(), mayExit);      
+      job.go();
+      File outFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
+      String output = StreamUtil.slurp(outFile);
+      outFile.delete();
+      System.err.println("outEx1=" + outputExpect);
+      System.err.println("  out1=" + output);
+      System.err.println("  equals=" + outputExpect.compareTo(output));
+      assertEquals(outputExpect, output);
+    } catch(Exception e) {
+      failTrace(e);
+    } finally {
+      File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
+      INPUT_FILE.delete();
+      outFileCRC.delete();
+      OUTPUT_DIR.getAbsoluteFile().delete();
+    }
+  }
+
+  private void failTrace(Exception e)
+  {
+    StringWriter sw = new StringWriter();
+    e.printStackTrace(new PrintWriter(sw));
+    fail(sw.toString());
+  }
+
+  public static void main(String[]args) throws Exception
+  {
+    new TestStreamDataProtocol().testCommandLine();
+  }
+
+}
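
To see where outputExpect comes from (an informal trace, not part of the test): the map is an identity pass-through, so a line such as "roses.are.red" reaches splitKeyVal, which, with separator "." and two key fields, yields key "roses.are" and value "red". KeyFieldBasedPartitioner then groups on the first key field, and ValueCountReduce emits each two-field key with its count:

    "roses.are.red", "roses.are.not.blue"  ->  key "roses.are" (twice)  ->  "roses.are\t2"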

+ 64 - 0
src/contrib/streaming/src/test/org/apache/hadoop/streaming/ValueCountReduce.java

@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+import java.util.Date;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+public class ValueCountReduce implements Reducer {
+
+  /**
+   * @param args
+   */
+  public static void main(String[] args) {
+    // TODO Auto-generated method stub
+
+  }
+
+  public void reduce(WritableComparable arg0, Iterator arg1, OutputCollector arg2, Reporter arg3) throws IOException {
+    int count = 0;
+    while (arg1.hasNext()) {
+      count += 1;
+      arg1.next();
+    }
+    arg2.collect(arg0, new Text("" + count));
+  }
+
+  public void configure(JobConf arg0) {
+    // TODO Auto-generated method stub
+    
+  }
+
+  public void close() throws IOException {
+    // TODO Auto-generated method stub
+    
+  }
+
+}
+

+ 336 - 0
src/java/org/apache/hadoop/mapred/lib/FieldSelectionMapReduce.java

@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+/**
+ * This class implements a mapper/reducer class that can be used to perform
+ * field selections in a manner similar to unix cut. The input data is treated
+ * as fields separated by a user specified separator (the default value is
+ * "\t"). The user can specify a list of fields that form the map output keys,
+ * and a list of fields that form the map output values. If the input format is
+ * TextInputFormat, the mapper ignores the key passed to the map function, and
+ * the fields are taken from the value only. Otherwise, the fields are the union
+ * of those from the key and those from the value.
+ * 
+ * The field separator is specified by the attribute "mapred.data.field.separator".
+ * 
+ * The map output field list spec is specified by the attribute "map.output.key.value.fields.spec".
+ * The value is expected to be of the form "keyFieldsSpec:valueFieldsSpec", where
+ * keyFieldsSpec and valueFieldsSpec are comma (,) separated lists of field specs: fieldSpec,fieldSpec,fieldSpec ...
+ * Each field spec can be a simple number (e.g. 5) specifying a specific field, or a range
+ * (like 2-5) specifying a range of fields, or an open range (like 3-) specifying all
+ * the fields starting from field 3. An open range field spec applies to value fields
+ * only; it has no effect on the key fields.
+ * 
+ * Here is an example: "4,3,0,1:6,5,1-3,7-". It specifies to use fields 4,3,0 and 1 for keys,
+ * and use fields 6,5,1,2,3,7 and above for values.
+ * 
+ * The reduce output field list spec is specified by the attribute "reduce.output.key.value.fields.spec".
+ * 
+ * The reducer extracts output key/value pairs in a similar manner, except that
+ * the key is never ignored.
+ * 
+ */
+public class FieldSelectionMapReduce implements Mapper, Reducer {
+
+  private String mapOutputKeyValueSpec;
+
+  private boolean ignoreInputKey;
+
+  private String fieldSeparator = "\t";
+
+  private int[] mapOutputKeyFieldList = null;
+
+  private int[] mapOutputValueFieldList = null;
+
+  private int allMapValueFieldsFrom = -1;
+
+  private String reduceOutputKeyValueSpec;
+
+  private int[] reduceOutputKeyFieldList = null;
+
+  private int[] reduceOutputValueFieldList = null;
+
+  private int allReduceValueFieldsFrom = -1;
+
+  private static Text emptyText = new Text("");
+
+  public static final Log LOG = LogFactory.getLog("FieldSelectionMapReduce");
+
+  private String specToString() {
+    StringBuffer sb = new StringBuffer();
+    sb.append("fieldSeparator: ").append(fieldSeparator).append("\n");
+
+    sb.append("mapOutputKeyValueSpec: ").append(mapOutputKeyValueSpec).append(
+        "\n");
+    sb.append("reduceOutputKeyValueSpec: ").append(reduceOutputKeyValueSpec)
+        .append("\n");
+
+    sb.append("allMapValueFieldsFrom: ").append(allMapValueFieldsFrom).append(
+        "\n");
+
+    sb.append("allReduceValueFieldsFrom: ").append(allReduceValueFieldsFrom)
+        .append("\n");
+
+    int i = 0;
+
+    sb.append("mapOutputKeyFieldList.length: ").append(
+        mapOutputKeyFieldList.length).append("\n");
+    for (i = 0; i < mapOutputKeyFieldList.length; i++) {
+      sb.append("\t").append(mapOutputKeyFieldList[i]).append("\n");
+    }
+    sb.append("mapOutputValueFieldList.length: ").append(
+        mapOutputValueFieldList.length).append("\n");
+    for (i = 0; i < mapOutputValueFieldList.length; i++) {
+      sb.append("\t").append(mapOutputValueFieldList[i]).append("\n");
+    }
+
+    sb.append("reduceOutputKeyFieldList.length: ").append(
+        reduceOutputKeyFieldList.length).append("\n");
+    for (i = 0; i < reduceOutputKeyFieldList.length; i++) {
+      sb.append("\t").append(reduceOutputKeyFieldList[i]).append("\n");
+    }
+    sb.append("reduceOutputValueFieldList.length: ").append(
+        reduceOutputValueFieldList.length).append("\n");
+    for (i = 0; i < reduceOutputValueFieldList.length; i++) {
+      sb.append("\t").append(reduceOutputValueFieldList[i]).append("\n");
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Performs field selection on the input key/value pair and writes the selected fields to the output.
+   */
+  public void map(WritableComparable key, Writable val, OutputCollector output,
+      Reporter reporter) throws IOException {
+    String valStr = val.toString();
+    String[] inputValFields = valStr.split(this.fieldSeparator);
+    String[] inputKeyFields = null;
+    String[] fields = null;
+    if (this.ignoreInputKey) {
+      fields = inputValFields;
+    } else {
+      inputKeyFields = key.toString().split(this.fieldSeparator);
+      fields = new String[inputKeyFields.length + inputValFields.length];
+      int i = 0;
+      for (i = 0; i < inputKeyFields.length; i++) {
+        fields[i] = inputKeyFields[i];
+      }
+      for (i = 0; i < inputValFields.length; i++) {
+        fields[inputKeyFields.length + i] = inputValFields[i];
+      }
+    }
+    String newKey = selectFields(fields, mapOutputKeyFieldList, -1,
+        fieldSeparator);
+    String newVal = selectFields(fields, mapOutputValueFieldList,
+        allMapValueFieldsFrom, fieldSeparator);
+
+    if (newKey == null) {
+      newKey = newVal;
+      newVal = null;
+    }
+    Text newTextKey = emptyText;
+    if (newKey != null) {
+      newTextKey = new Text(newKey);
+    }
+    Text newTextVal = emptyText;
+    if (newVal != null) {
+      newTextVal = new Text(newVal);
+    }
+    output.collect(newTextKey, newTextVal);
+  }
+
+  /**
+   * Extract the actual field numbers from the given field specs.
+   * If a field spec is in the form of "n-" (like 3-), then n will be the 
+   * return value. Otherwise, -1 will be returned.  
+   * @param fieldListSpec an array of field specs
+   * @param fieldList an array of field numbers extracted from the specs.
+   * @return number n if some field spec is in the form of "n-", -1 otherwise.
+   */
+  private int extractFields(String[] fieldListSpec, ArrayList<Integer> fieldList) {
+    int allFieldsFrom = -1;
+    int i = 0;
+    int j = 0;
+    int pos = -1;
+    String fieldSpec = null;
+    for (i = 0; i < fieldListSpec.length; i++) {
+      fieldSpec = fieldListSpec[i];
+      if (fieldSpec.length() == 0) {
+        continue;
+      }
+      pos = fieldSpec.indexOf('-');
+      if (pos < 0) {
+        Integer fn = new Integer(fieldSpec);
+        fieldList.add(fn);
+      } else {
+        String start = fieldSpec.substring(0, pos);
+        String end = fieldSpec.substring(pos + 1);
+        if (start.length() == 0) {
+          start = "0";
+        }
+        if (end.length() == 0) {
+          allFieldsFrom = Integer.parseInt(start);
+          continue;
+        }
+        int startPos = Integer.parseInt(start);
+        int endPos = Integer.parseInt(end);
+        for (j = startPos; j <= endPos; j++) {
+          fieldList.add(new Integer(j));
+        }
+      }
+    }
+    return allFieldsFrom;
+  }
+
+  private void parseOutputKeyValueSpec() {
+    String[] mapKeyValSpecs = mapOutputKeyValueSpec.split(":", -1);
+    String[] mapKeySpec = mapKeyValSpecs[0].split(",");
+    String[] mapValSpec = new String[0];
+    if (mapKeyValSpecs.length > 1) {
+      mapValSpec = mapKeyValSpecs[1].split(",");
+    }
+
+    int i = 0;
+    ArrayList<Integer> fieldList = new ArrayList<Integer>();
+    extractFields(mapKeySpec, fieldList);
+    this.mapOutputKeyFieldList = new int[fieldList.size()];
+    for (i = 0; i < fieldList.size(); i++) {
+      this.mapOutputKeyFieldList[i] = fieldList.get(i).intValue();
+    }
+
+    fieldList = new ArrayList<Integer>();
+    allMapValueFieldsFrom = extractFields(mapValSpec, fieldList);
+    this.mapOutputValueFieldList = new int[fieldList.size()];
+    for (i = 0; i < fieldList.size(); i++) {
+      this.mapOutputValueFieldList[i] = fieldList.get(i).intValue();
+    }
+
+    String[] reduceKeyValSpecs = reduceOutputKeyValueSpec.split(":", -1);
+    String[] reduceKeySpec = reduceKeyValSpecs[0].split(",");
+    String[] reduceValSpec = new String[0];
+    if (reduceKeyValSpecs.length > 1) {
+      reduceValSpec = reduceKeyValSpecs[1].split(",");
+    }
+
+    fieldList = new ArrayList<Integer>();
+    extractFields(reduceKeySpec, fieldList);
+    this.reduceOutputKeyFieldList = new int[fieldList.size()];
+    for (i = 0; i < fieldList.size(); i++) {
+      this.reduceOutputKeyFieldList[i] = fieldList.get(i).intValue();
+    }
+
+    fieldList = new ArrayList<Integer>();
+    allReduceValueFieldsFrom = extractFields(reduceValSpec, fieldList);
+    this.reduceOutputValueFieldList = new int[fieldList.size()];
+    for (i = 0; i < fieldList.size(); i++) {
+      this.reduceOutputValueFieldList[i] = fieldList.get(i).intValue();
+    }
+  }
+
+  public void configure(JobConf job) {
+    this.fieldSeparator = job.get("mapred.data.field.separator", "\t");
+    this.mapOutputKeyValueSpec = job.get("map.output.key.value.fields.spec",
+        "0-:");
+    this.ignoreInputKey = TextInputFormat.class.getCanonicalName().equals(
+        job.getInputFormat().getClass().getCanonicalName());
+    this.reduceOutputKeyValueSpec = job.get(
+        "reduce.output.key.value.fields.spec", "0-:");
+    parseOutputKeyValueSpec();
+    LOG.info(specToString());
+  }
+
+  public void close() throws IOException {
+    // TODO Auto-generated method stub
+
+  }
+
+  private static String selectFields(String[] fields, int[] fieldList,
+      int allFieldsFrom, String separator) {
+    String retv = null;
+    int i = 0;
+    StringBuffer sb = null;
+    if (fieldList != null && fieldList.length > 0) {
+      if (sb == null) {
+        sb = new StringBuffer();
+      }
+      for (i = 0; i < fieldList.length; i++) {
+        if (fieldList[i] < fields.length) {
+          sb.append(fields[fieldList[i]]);
+        }
+        sb.append(separator);
+      }
+    }
+    if (allFieldsFrom >= 0) {
+      if (sb == null) {
+        sb = new StringBuffer();
+      }
+      for (i = allFieldsFrom; i < fields.length; i++) {
+        sb.append(fields[i]).append(separator);
+      }
+    }
+    if (sb != null) {
+      retv = sb.toString();
+      if (retv.length() > 0) {
+        retv = retv.substring(0, retv.length() - 1);
+      }
+    }
+    return retv;
+  }
+
+  public void reduce(WritableComparable key, Iterator values,
+      OutputCollector output, Reporter reporter) throws IOException {
+
+    String keyStr = key.toString() + this.fieldSeparator;
+    while (values.hasNext()) {
+      String valStr = values.next().toString();
+      valStr = keyStr + valStr;
+      String[] fields = valStr.split(this.fieldSeparator);
+      String newKey = selectFields(fields, reduceOutputKeyFieldList, -1,
+          fieldSeparator);
+      String newVal = selectFields(fields, reduceOutputValueFieldList,
+          allReduceValueFieldsFrom, fieldSeparator);
+      Text newTextKey = null;
+      if (newKey != null) {
+        newTextKey = new Text(newKey);
+      }
+      Text newTextVal = null;
+      if (newVal != null) {
+        newTextVal = new Text(newVal);
+      }
+      output.collect(newTextKey, newTextVal);
+    }
+  }
+}
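
A worked example of the spec syntax (mirroring TestFieldSelection below): with mapred.data.field.separator set to "-" and map.output.key.value.fields.spec set to "6,5,1-3:0-", a line whose fields are f0 through f6 maps as follows:

    input value : f0-f1-f2-f3-f4-f5-f6
    map key     : f6-f5-f1-f2-f3          (field specs 6, 5, 1-3)
    map value   : f0-f1-f2-f3-f4-f5-f6    (open range 0- selects every field)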

+ 56 - 0
src/java/org/apache/hadoop/mapred/lib/KeyFieldBasedPartitioner.java

@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+
+public class KeyFieldBasedPartitioner implements Partitioner {
+
+  private int numOfPartitionFields;
+
+  private String keyFieldSeparator;
+
+  public void configure(JobConf job) {
+    this.keyFieldSeparator = job.get("map.output.key.field.separator", "\t");
+    this.numOfPartitionFields = job.getInt("num.key.fields.for.partition", 0);
+  }
+
+  /** Partition by the hash code of the first "num.key.fields.for.partition" fields of the key. */
+  public int getPartition(WritableComparable key, Writable value,
+      int numReduceTasks) {
+    String partitionKeyStr = key.toString();
+    String[] fields = partitionKeyStr.split(this.keyFieldSeparator);
+    if (this.numOfPartitionFields > 0
+        && this.numOfPartitionFields < fields.length) {
+      StringBuffer sb = new StringBuffer();
+      for (int i = 0; i < this.numOfPartitionFields; i++) {
+        sb.append(fields[i]).append(this.keyFieldSeparator);
+      }
+      partitionKeyStr = sb.toString();
+      if (partitionKeyStr.length() > 0) {
+        partitionKeyStr = partitionKeyStr.substring(0,
+            partitionKeyStr.length() - 1);
+      }
+    }
+    return (partitionKeyStr.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
+  }
+}
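
A minimal sketch (not from the commit) of the partition computed for a dotted key with num.key.fields.for.partition=1. Note that String.split interprets the separator as a regular expression, so a literal "." has to be escaped when reproducing the computation by hand:

    String key = "roses.are.red";
    String[] fields = key.split("\\.");        // ["roses", "are", "red"]
    String partitionKey = fields[0];           // first field only -> "roses"
    int partition = (partitionKey.hashCode() & Integer.MAX_VALUE) % 2;  // with 2 reducers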

+ 131 - 0
src/test/org/apache/hadoop/mapred/TestFieldSelection.java

@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.lib.*;
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.*;
+import java.text.NumberFormat;
+
+public class TestFieldSelection extends TestCase {
+
+  private static NumberFormat idFormat = NumberFormat.getInstance();
+  static {
+    idFormat.setMinimumIntegerDigits(4);
+    idFormat.setGroupingUsed(false);
+  }
+
+  public void testFieldSelection() throws Exception {
+    launch();
+  }
+
+  public static void launch() throws Exception {
+    JobConf conf = new JobConf(TestFieldSelection.class);
+    FileSystem fs = FileSystem.get(conf);
+    int numOfInputLines = 10;
+
+    Path OUTPUT_DIR = new Path("build/test/output_for_field_selection_test");
+    Path INPUT_DIR = new Path("build/test/input_for_field_selection_test");
+    String inputFile = "input.txt";
+    fs.delete(INPUT_DIR);
+    fs.mkdirs(INPUT_DIR);
+    fs.delete(OUTPUT_DIR);
+
+    StringBuffer inputData = new StringBuffer();
+    StringBuffer expectedOutput = new StringBuffer();
+
+    FSDataOutputStream fileOut = fs.create(new Path(INPUT_DIR, inputFile));
+    for (int i = 0; i < numOfInputLines; i++) {
+        inputData.append(idFormat.format(i));
+        inputData.append("-").append(idFormat.format(i+1));
+        inputData.append("-").append(idFormat.format(i+2));
+        inputData.append("-").append(idFormat.format(i+3));
+        inputData.append("-").append(idFormat.format(i+4));
+        inputData.append("-").append(idFormat.format(i+5));
+        inputData.append("-").append(idFormat.format(i+6));
+        inputData.append("\n");
+
+
+        expectedOutput.append(idFormat.format(i+3));
+        expectedOutput.append("-" ).append (idFormat.format(i+2));
+        expectedOutput.append("-" ).append (idFormat.format(i+1));
+        expectedOutput.append("-" ).append (idFormat.format(i+5));
+        expectedOutput.append("-" ).append (idFormat.format(i+6));
+
+        expectedOutput.append("-" ).append (idFormat.format(i+6));
+        expectedOutput.append("-" ).append (idFormat.format(i+5));
+        expectedOutput.append("-" ).append (idFormat.format(i+1));
+        expectedOutput.append("-" ).append (idFormat.format(i+2));
+        expectedOutput.append("-" ).append (idFormat.format(i+3));
+
+        expectedOutput.append("-" ).append (idFormat.format(i+0));
+        expectedOutput.append("-" ).append (idFormat.format(i+1));
+        expectedOutput.append("-" ).append (idFormat.format(i+2));
+        expectedOutput.append("-" ).append (idFormat.format(i+3));
+        expectedOutput.append("-" ).append (idFormat.format(i+4));
+        expectedOutput.append("-" ).append (idFormat.format(i+5));
+        expectedOutput.append("-" ).append (idFormat.format(i+6));
+        expectedOutput.append("\n");
+    }
+    fileOut.write(inputData.toString().getBytes("utf-8"));
+    fileOut.close();
+
+    System.out.println("inputData:");
+    System.out.println(inputData.toString());
+    JobConf job = new JobConf(conf, TestFieldSelection.class);
+    job.setInputPath(INPUT_DIR);
+    job.setInputFormat(TextInputFormat.class);
+    job.setMapperClass(FieldSelectionMapReduce.class);
+    job.setReducerClass(FieldSelectionMapReduce.class);
+
+    job.setOutputPath(OUTPUT_DIR);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    job.setOutputFormat(TextOutputFormat.class);
+    job.setNumReduceTasks(1);
+
+    job.set("mapred.data.field.separator", "-");
+    job.set("map.output.key.value.fields.spec", "6,5,1-3:0-");
+    job.set("reduce.output.key.value.fields.spec", ":4,3,2,1,0,0-");
+
+    JobClient.runJob(job);
+
+    //
+    // Finally, compare the job output against the expected output
+    // constructed above.
+    //
+    boolean success = true;
+    Path outPath = new Path(OUTPUT_DIR, "part-00000");
+    String outdata = TestMiniMRWithDFS.readOutput(outPath,job);
+
+    assertEquals(expectedOutput.toString(),outdata);
+    fs.delete(OUTPUT_DIR);
+    fs.delete(INPUT_DIR);
+  }
+
+  /**
+   * Launches all the tasks in order.
+   */
+  public static void main(String[] argv) throws Exception {
+    launch();
+  }
+}
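
For the reduce side of the same test (again an informal trace): the reducer first prepends the key to each value with the separator, so the five key fields come first in the joined field array, and the spec ":4,3,2,1,0,0-" is applied to that array:

    key    : f6-f5-f1-f2-f3                               (from the map stage)
    value  : f0-f1-f2-f3-f4-f5-f6
    joined : f6-f5-f1-f2-f3-f0-f1-f2-f3-f4-f5-f6          (12 fields, indices 0..11)
    output : f3-f2-f1-f5-f6 followed by the whole joined list (specs 4,3,2,1,0 then 0-)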