Browse Source

HADOOP-2302. Provides a comparator for numerical sorting of key fields. Contributed by Devaraj Das.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@683592 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 17 years ago
parent
commit
7e9ca88886

+ 3 - 0
CHANGES.txt

@@ -67,6 +67,9 @@ Trunk (unreleased changes)
     the TaskTracker, refactor Hadoop Metrics as an implementation of the api.
     the TaskTracker, refactor Hadoop Metrics as an implementation of the api.
     (Ari Rabkin via acmurthy) 
     (Ari Rabkin via acmurthy) 
 
 
+    HADOOP-2302. Provides a comparator for numerical sorting of key fields.
+    (ddas)
+
   IMPROVEMENTS
   IMPROVEMENTS
 
 
     HADOOP-3732. Delay intialization of datanode block verification till
     HADOOP-3732. Delay intialization of datanode block verification till

+ 2 - 1
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.LineRecordReader.LineReader;
 import org.apache.hadoop.mapred.LineRecordReader.LineReader;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.UTF8ByteArrayUtils;
 
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.BytesWritable;
@@ -332,7 +333,7 @@ public abstract class PipeMapRed {
         key.set(line, 0, length);
         key.set(line, 0, length);
         val.set("");
         val.set("");
       } else {
       } else {
-        UTF8ByteArrayUtils.splitKeyVal(line, 0, length, key, val, pos, separator.length);
+        StreamKeyValUtil.splitKeyVal(line, 0, length, key, val, pos, separator.length);
       }
       }
     } catch (CharacterCodingException e) {
     } catch (CharacterCodingException e) {
       LOG.warn(StringUtils.stringifyException(e));
       LOG.warn(StringUtils.stringifyException(e));

+ 141 - 0
src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamKeyValUtil.java

@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.LineRecordReader.LineReader;
+
+public class StreamKeyValUtil {
+
+  /**
+   * Find the first occured tab in a UTF-8 encoded string
+   * @param utf a byte array containing a UTF-8 encoded string
+   * @param start starting offset
+   * @param length no. of bytes
+   * @return position that first tab occures otherwise -1
+   */
+  public static int findTab(byte [] utf, int start, int length) {
+    for(int i=start; i<(start+length); i++) {
+      if (utf[i]==(byte)'\t') {
+        return i;
+      }
+    }
+    return -1;      
+  }
+  /**
+   * Find the first occured tab in a UTF-8 encoded string
+   * @param utf a byte array containing a UTF-8 encoded string
+   * @return position that first tab occures otherwise -1
+   */
+  public static int findTab(byte [] utf) {
+    return org.apache.hadoop.util.UTF8ByteArrayUtils.findNthByte(utf, 0, 
+        utf.length, (byte)'\t', 1);
+  }
+
+  /**
+   * split a UTF-8 byte array into key and value 
+   * assuming that the delimilator is at splitpos. 
+   * @param utf utf-8 encoded string
+   * @param start starting offset
+   * @param length no. of bytes
+   * @param key contains key upon the method is returned
+   * @param val contains value upon the method is returned
+   * @param splitPos the split pos
+   * @param separatorLength the length of the separator between key and value
+   * @throws IOException
+   */
+  public static void splitKeyVal(byte[] utf, int start, int length, 
+                                 Text key, Text val, int splitPos,
+                                 int separatorLength) throws IOException {
+    if (splitPos<start || splitPos >= (start+length))
+      throw new IllegalArgumentException("splitPos must be in the range " +
+                                         "[" + start + ", " + (start+length) + "]: " + splitPos);
+    int keyLen = (splitPos-start);
+    byte [] keyBytes = new byte[keyLen];
+    System.arraycopy(utf, start, keyBytes, 0, keyLen);
+    int valLen = (start+length)-splitPos-separatorLength;
+    byte [] valBytes = new byte[valLen];
+    System.arraycopy(utf, splitPos+separatorLength, valBytes, 0, valLen);
+    key.set(keyBytes);
+    val.set(valBytes);
+  }
+
+  /**
+   * split a UTF-8 byte array into key and value 
+   * assuming that the delimilator is at splitpos. 
+   * @param utf utf-8 encoded string
+   * @param start starting offset
+   * @param length no. of bytes
+   * @param key contains key upon the method is returned
+   * @param val contains value upon the method is returned
+   * @param splitPos the split pos
+   * @throws IOException
+   */
+  public static void splitKeyVal(byte[] utf, int start, int length, 
+                                 Text key, Text val, int splitPos) throws IOException {
+    splitKeyVal(utf, start, length, key, val, splitPos, 1);
+  }
+  
+
+  /**
+   * split a UTF-8 byte array into key and value 
+   * assuming that the delimilator is at splitpos. 
+   * @param utf utf-8 encoded string
+   * @param key contains key upon the method is returned
+   * @param val contains value upon the method is returned
+   * @param splitPos the split pos
+   * @param separatorLength the length of the separator between key and value
+   * @throws IOException
+   */
+  public static void splitKeyVal(byte[] utf, Text key, Text val, int splitPos, 
+                                 int separatorLength) 
+    throws IOException {
+    splitKeyVal(utf, 0, utf.length, key, val, splitPos, separatorLength);
+  }
+
+  /**
+   * split a UTF-8 byte array into key and value 
+   * assuming that the delimilator is at splitpos. 
+   * @param utf utf-8 encoded string
+   * @param key contains key upon the method is returned
+   * @param val contains value upon the method is returned
+   * @param splitPos the split pos
+   * @throws IOException
+   */
+  public static void splitKeyVal(byte[] utf, Text key, Text val, int splitPos) 
+    throws IOException {
+    splitKeyVal(utf, 0, utf.length, key, val, splitPos, 1);
+  }
+  
+  /**
+   * Read a utf8 encoded line from a data input stream. 
+   * @param lineReader LineReader to read the line from.
+   * @param out Text to read into
+   * @return number of bytes read 
+   * @throws IOException
+   */
+  public static int readLine(LineReader lineReader, Text out) 
+  throws IOException {
+    out.clear();
+    return lineReader.readLine(out);
+  }
+
+}

+ 53 - 55
src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java

@@ -21,10 +21,13 @@ package org.apache.hadoop.streaming;
 import java.io.IOException;
 import java.io.IOException;
 
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.LineRecordReader;
 import org.apache.hadoop.mapred.LineRecordReader.LineReader;
 import org.apache.hadoop.mapred.LineRecordReader.LineReader;
 
 
 /**
 /**
  * General utils for byte array containing UTF-8 encoded strings
  * General utils for byte array containing UTF-8 encoded strings
+ * @deprecated use {@link org.apache.hadoop.util.UTF8ByteArrayUtils} and
+ * {@link StreamKeyValUtil} instead
  */
  */
 
 
 public class UTF8ByteArrayUtils {
 public class UTF8ByteArrayUtils {
@@ -34,14 +37,11 @@ public class UTF8ByteArrayUtils {
    * @param start starting offset
    * @param start starting offset
    * @param length no. of bytes
    * @param length no. of bytes
    * @return position that first tab occures otherwise -1
    * @return position that first tab occures otherwise -1
+   * @deprecated use {@link StreamKeyValUtil#findTab(byte[], int, int)}
    */
    */
+  @Deprecated
   public static int findTab(byte [] utf, int start, int length) {
   public static int findTab(byte [] utf, int start, int length) {
-    for(int i=start; i<(start+length); i++) {
-      if (utf[i]==(byte)'\t') {
-        return i;
-      }
-    }
-    return -1;      
+    return StreamKeyValUtil.findTab(utf, start, length);      
   }
   }
   
   
   /**
   /**
@@ -51,14 +51,13 @@ public class UTF8ByteArrayUtils {
    * @param end ending position
    * @param end ending position
    * @param b the byte to find
    * @param b the byte to find
    * @return position that first byte occures otherwise -1
    * @return position that first byte occures otherwise -1
+   * @deprecated use 
+   * {@link org.apache.hadoop.util.UTF8ByteArrayUtils#findByte(byte[], int,
+   *  int, byte)}
    */
    */
+  @Deprecated
   public static int findByte(byte [] utf, int start, int end, byte b) {
   public static int findByte(byte [] utf, int start, int end, byte b) {
-    for(int i=start; i<end; i++) {
-      if (utf[i]==b) {
-        return i;
-      }
-    }
-    return -1;      
+    return org.apache.hadoop.util.UTF8ByteArrayUtils.findByte(utf, start, end, b);
   }
   }
 
 
   /**
   /**
@@ -68,22 +67,13 @@ public class UTF8ByteArrayUtils {
    * @param end ending position
    * @param end ending position
    * @param b the bytes to find
    * @param b the bytes to find
    * @return position that first byte occures otherwise -1
    * @return position that first byte occures otherwise -1
+   * @deprecated use 
+   * {@link org.apache.hadoop.util.UTF8ByteArrayUtils#findBytes(byte[], int, 
+   * int, byte[])}
    */
    */
+  @Deprecated
   public static int findBytes(byte [] utf, int start, int end, byte[] b) {
   public static int findBytes(byte [] utf, int start, int end, byte[] b) {
-    int matchEnd = end - b.length;
-    for(int i=start; i<=matchEnd; i++) {
-      boolean matched = true;
-      for(int j=0; j<b.length; j++) {
-        if (utf[i+j] != b[j]) {
-          matched = false;
-          break;
-        }
-      }
-      if (matched) {
-        return i;
-      }
-    }
-    return -1;      
+    return org.apache.hadoop.util.UTF8ByteArrayUtils.findBytes(utf, start, end, b);      
   }
   }
     
     
   /**
   /**
@@ -94,18 +84,14 @@ public class UTF8ByteArrayUtils {
    * @param b the byte to find
    * @param b the byte to find
    * @param n the desired occurrence of the given byte
    * @param n the desired occurrence of the given byte
    * @return position that nth occurrence of the given byte if exists; otherwise -1
    * @return position that nth occurrence of the given byte if exists; otherwise -1
+   * @deprecated use 
+   * {@link org.apache.hadoop.util.UTF8ByteArrayUtils#findNthByte(byte[], int, 
+   * int, byte, int)}
    */
    */
+  @Deprecated
   public static int findNthByte(byte [] utf, int start, int length, byte b, int n) {
   public static int findNthByte(byte [] utf, int start, int length, byte b, int n) {
-    int pos = -1;
-    int nextStart = start;
-    for (int i = 0; i < n; i++) {
-      pos = findByte(utf, nextStart, length, b);
-      if (pos < 0) {
-        return pos;
-      }
-      nextStart = pos + 1;
-    }
-    return pos;      
+    return org.apache.hadoop.util.UTF8ByteArrayUtils.findNthByte(utf, start,
+        length, b, n);
   }
   }
   
   
   /**
   /**
@@ -114,18 +100,24 @@ public class UTF8ByteArrayUtils {
    * @param b the byte to find
    * @param b the byte to find
    * @param n the desired occurrence of the given byte
    * @param n the desired occurrence of the given byte
    * @return position that nth occurrence of the given byte if exists; otherwise -1
    * @return position that nth occurrence of the given byte if exists; otherwise -1
+   * @deprecated use 
+   * {@link org.apache.hadoop.util.UTF8ByteArrayUtils#findNthByte(byte[], 
+   * byte, int)}
    */
    */
+  @Deprecated
   public static int findNthByte(byte [] utf, byte b, int n) {
   public static int findNthByte(byte [] utf, byte b, int n) {
-    return findNthByte(utf, 0, utf.length, b, n);      
+    return org.apache.hadoop.util.UTF8ByteArrayUtils.findNthByte(utf, b, n);      
   }
   }
     
     
   /**
   /**
    * Find the first occured tab in a UTF-8 encoded string
    * Find the first occured tab in a UTF-8 encoded string
    * @param utf a byte array containing a UTF-8 encoded string
    * @param utf a byte array containing a UTF-8 encoded string
    * @return position that first tab occures otherwise -1
    * @return position that first tab occures otherwise -1
+   * @deprecated use {@link StreamKeyValUtil#findTab(byte[])}
    */
    */
+  @Deprecated
   public static int findTab(byte [] utf) {
   public static int findTab(byte [] utf) {
-    return findNthByte(utf, 0, utf.length, (byte)'\t', 1);
+    return StreamKeyValUtil.findTab(utf);
   }
   }
 
 
   /**
   /**
@@ -138,22 +130,17 @@ public class UTF8ByteArrayUtils {
    * @param val contains value upon the method is returned
    * @param val contains value upon the method is returned
    * @param splitPos the split pos
    * @param splitPos the split pos
    * @param separatorLength the length of the separator between key and value
    * @param separatorLength the length of the separator between key and value
+   * @deprecated use 
+   * {@link StreamKeyValUtil#splitKeyVal(byte[], int, int, Text, Text, 
+   * int, int)}
    * @throws IOException
    * @throws IOException
    */
    */
+  @Deprecated
   public static void splitKeyVal(byte[] utf, int start, int length, 
   public static void splitKeyVal(byte[] utf, int start, int length, 
                                  Text key, Text val, int splitPos,
                                  Text key, Text val, int splitPos,
                                  int separatorLength) throws IOException {
                                  int separatorLength) throws IOException {
-    if (splitPos<start || splitPos >= (start+length))
-      throw new IllegalArgumentException("splitPos must be in the range " +
-                                         "[" + start + ", " + (start+length) + "]: " + splitPos);
-    int keyLen = (splitPos-start);
-    byte [] keyBytes = new byte[keyLen];
-    System.arraycopy(utf, start, keyBytes, 0, keyLen);
-    int valLen = (start+length)-splitPos-separatorLength;
-    byte [] valBytes = new byte[valLen];
-    System.arraycopy(utf, splitPos+separatorLength, valBytes, 0, valLen);
-    key.set(keyBytes);
-    val.set(valBytes);
+    StreamKeyValUtil.splitKeyVal(utf, start, 
+        length, key, val, splitPos, separatorLength);
   }
   }
 
 
   /**
   /**
@@ -165,11 +152,14 @@ public class UTF8ByteArrayUtils {
    * @param key contains key upon the method is returned
    * @param key contains key upon the method is returned
    * @param val contains value upon the method is returned
    * @param val contains value upon the method is returned
    * @param splitPos the split pos
    * @param splitPos the split pos
+   * @deprecated use 
+   * {@link StreamKeyValUtil#splitKeyVal(byte[], int, int, Text, Text, int)}
    * @throws IOException
    * @throws IOException
    */
    */
+  @Deprecated
   public static void splitKeyVal(byte[] utf, int start, int length, 
   public static void splitKeyVal(byte[] utf, int start, int length, 
                                  Text key, Text val, int splitPos) throws IOException {
                                  Text key, Text val, int splitPos) throws IOException {
-    splitKeyVal(utf, start, length, key, val, splitPos, 1);
+    StreamKeyValUtil.splitKeyVal(utf, start, length, key, val, splitPos);
   }
   }
   
   
 
 
@@ -181,12 +171,15 @@ public class UTF8ByteArrayUtils {
    * @param val contains value upon the method is returned
    * @param val contains value upon the method is returned
    * @param splitPos the split pos
    * @param splitPos the split pos
    * @param separatorLength the length of the separator between key and value
    * @param separatorLength the length of the separator between key and value
+   * @deprecated use 
+   * {@link StreamKeyValUtil#splitKeyVal(byte[], Text, Text, int, int)}
    * @throws IOException
    * @throws IOException
    */
    */
+  @Deprecated
   public static void splitKeyVal(byte[] utf, Text key, Text val, int splitPos, 
   public static void splitKeyVal(byte[] utf, Text key, Text val, int splitPos, 
                                  int separatorLength) 
                                  int separatorLength) 
     throws IOException {
     throws IOException {
-    splitKeyVal(utf, 0, utf.length, key, val, splitPos, separatorLength);
+    StreamKeyValUtil.splitKeyVal(utf, key, val, splitPos, separatorLength);
   }
   }
 
 
   /**
   /**
@@ -196,23 +189,28 @@ public class UTF8ByteArrayUtils {
    * @param key contains key upon the method is returned
    * @param key contains key upon the method is returned
    * @param val contains value upon the method is returned
    * @param val contains value upon the method is returned
    * @param splitPos the split pos
    * @param splitPos the split pos
+   * @deprecated use 
+   * {@link StreamKeyValUtil#splitKeyVal(byte[], Text, Text, int)}
    * @throws IOException
    * @throws IOException
    */
    */
+  @Deprecated
   public static void splitKeyVal(byte[] utf, Text key, Text val, int splitPos) 
   public static void splitKeyVal(byte[] utf, Text key, Text val, int splitPos) 
     throws IOException {
     throws IOException {
-    splitKeyVal(utf, 0, utf.length, key, val, splitPos, 1);
+    StreamKeyValUtil.splitKeyVal(utf, key, val, splitPos);
   }
   }
   
   
   /**
   /**
    * Read a utf8 encoded line from a data input stream. 
    * Read a utf8 encoded line from a data input stream. 
    * @param lineReader LineReader to read the line from.
    * @param lineReader LineReader to read the line from.
    * @param out Text to read into
    * @param out Text to read into
-   * @return number of bytes read 
+   * @return number of bytes read
+   * @deprecated use 
+   * {@link StreamKeyValUtil#readLine(LineRecordReader.LineReader, Text)} 
    * @throws IOException
    * @throws IOException
    */
    */
+  @Deprecated
   public static int readLine(LineReader lineReader, Text out) 
   public static int readLine(LineReader lineReader, Text out) 
   throws IOException {
   throws IOException {
-    out.clear();
-    return lineReader.readLine(out);
+    return StreamKeyValUtil.readLine(lineReader, out);
   }
   }
 }
 }

+ 98 - 0
src/core/org/apache/hadoop/util/UTF8ByteArrayUtils.java

@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+public class UTF8ByteArrayUtils {
+  /**
+   * Find the first occurrence of the given byte b in a UTF-8 encoded string
+   * @param utf a byte array containing a UTF-8 encoded string
+   * @param start starting offset
+   * @param end ending position
+   * @param b the byte to find
+   * @return position that first byte occures otherwise -1
+   */
+  public static int findByte(byte [] utf, int start, int end, byte b) {
+    for(int i=start; i<end; i++) {
+      if (utf[i]==b) {
+        return i;
+      }
+    }
+    return -1;      
+  }
+
+  /**
+   * Find the first occurrence of the given bytes b in a UTF-8 encoded string
+   * @param utf a byte array containing a UTF-8 encoded string
+   * @param start starting offset
+   * @param end ending position
+   * @param b the bytes to find
+   * @return position that first byte occures otherwise -1
+   */
+  public static int findBytes(byte [] utf, int start, int end, byte[] b) {
+    int matchEnd = end - b.length;
+    for(int i=start; i<=matchEnd; i++) {
+      boolean matched = true;
+      for(int j=0; j<b.length; j++) {
+        if (utf[i+j] != b[j]) {
+          matched = false;
+          break;
+        }
+      }
+      if (matched) {
+        return i;
+      }
+    }
+    return -1;      
+  }
+    
+  /**
+   * Find the nth occurrence of the given byte b in a UTF-8 encoded string
+   * @param utf a byte array containing a UTF-8 encoded string
+   * @param start starting offset
+   * @param length the length of byte array
+   * @param b the byte to find
+   * @param n the desired occurrence of the given byte
+   * @return position that nth occurrence of the given byte if exists; otherwise -1
+   */
+  public static int findNthByte(byte [] utf, int start, int length, byte b, int n) {
+    int pos = -1;
+    int nextStart = start;
+    for (int i = 0; i < n; i++) {
+      pos = findByte(utf, nextStart, length, b);
+      if (pos < 0) {
+        return pos;
+      }
+      nextStart = pos + 1;
+    }
+    return pos;      
+  }
+  
+  /**
+   * Find the nth occurrence of the given byte b in a UTF-8 encoded string
+   * @param utf a byte array containing a UTF-8 encoded string
+   * @param b the byte to find
+   * @param n the desired occurrence of the given byte
+   * @return position that nth occurrence of the given byte if exists; otherwise -1
+   */
+  public static int findNthByte(byte [] utf, byte b, int n) {
+    return findNthByte(utf, 0, utf.length, b, n);      
+  }
+
+}
+

+ 54 - 1
src/mapred/org/apache/hadoop/mapred/JobConf.java

@@ -39,6 +39,8 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.mapred.lib.HashPartitioner;
 import org.apache.hadoop.mapred.lib.HashPartitioner;
+import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
+import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.Tool;
 
 
@@ -510,6 +512,58 @@ public class JobConf extends Configuration {
              theClass, RawComparator.class);
              theClass, RawComparator.class);
   }
   }
 
 
+  /**
+   * Set the {@link KeyFieldBasedComparator} options used to compare keys.
+   * 
+   * @param keySpec the key specification of the form -k pos1[,pos2], where,
+   *  pos is of the form f[.c][opts], where f is the number
+   *  of the key field to use, and c is the number of the first character from
+   *  the beginning of the field. Fields and character posns are numbered 
+   *  starting with 1; a character position of zero in pos2 indicates the
+   *  field's last character. If '.c' is omitted from pos1, it defaults to 1
+   *  (the beginning of the field); if omitted from pos2, it defaults to 0 
+   *  (the end of the field). opts are ordering options. The supported options
+   *  are:
+   *    -n, (Sort numerically)
+   *    -r, (Reverse the result of comparison)                 
+   */
+  public void setKeyFieldComparatorOptions(String keySpec) {
+    setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
+    set("mapred.text.key.comparator.options", keySpec);
+  }
+  
+  /**
+   * Get the {@link KeyFieldBasedComparator} options
+   */
+  public String getKeyFieldComparatorOption() {
+    return get("mapred.text.key.comparator.options");
+  }
+
+  /**
+   * Set the {@link KeyFieldBasedPartitioner} options used for 
+   * {@link Partitioner}
+   * 
+   * @param keySpec the key specification of the form -k pos1[,pos2], where,
+   *  pos is of the form f[.c][opts], where f is the number
+   *  of the key field to use, and c is the number of the first character from
+   *  the beginning of the field. Fields and character posns are numbered 
+   *  starting with 1; a character position of zero in pos2 indicates the
+   *  field's last character. If '.c' is omitted from pos1, it defaults to 1
+   *  (the beginning of the field); if omitted from pos2, it defaults to 0 
+   *  (the end of the field).
+   */
+  public void setKeyFieldPartitionerOptions(String keySpec) {
+    setPartitionerClass(KeyFieldBasedPartitioner.class);
+    set("mapred.text.key.partitioner.options", keySpec);
+  }
+  
+  /**
+   * Get the {@link KeyFieldBasedPartitioner} options
+   */
+  public String getKeyFieldPartitionerOption() {
+    return get("mapred.text.key.partitioner.options");
+  }
+
   /** 
   /** 
    * Get the user defined {@link WritableComparable} comparator for 
    * Get the user defined {@link WritableComparable} comparator for 
    * grouping keys of inputs to the reduce.
    * grouping keys of inputs to the reduce.
@@ -1261,6 +1315,5 @@ public class JobConf extends Configuration {
     }
     }
     return null;
     return null;
   }
   }
-
 }
 }
 
 

+ 328 - 0
src/mapred/org/apache/hadoop/mapred/lib/KeyFieldBasedComparator.java

@@ -0,0 +1,328 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.util.List;
+
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.lib.KeyFieldHelper.KeyDescription;
+import org.apache.hadoop.io.Text;
+
+/**
+ * This comparator implementation provides a subset of the features provided
+ * by the Unix/GNU Sort. In particular, the supported features are:
+ * -n, (Sort numerically)
+ * -r, (Reverse the result of comparison)
+ * -k pos1[,pos2], where pos is of the form f[.c][opts], where f is the number
+ *  of the field to use, and c is the number of the first character from the
+ *  beginning of the field. Fields and character posns are numbered starting
+ *  with 1; a character position of zero in pos2 indicates the field's last
+ *  character. If '.c' is omitted from pos1, it defaults to 1 (the beginning
+ *  of the field); if omitted from pos2, it defaults to 0 (the end of the
+ *  field). opts are ordering options (any of 'nr' as described above). 
+ * We assume that the fields in the key are separated by 
+ * map.output.key.field.separator.
+ */
+
+public class KeyFieldBasedComparator<K, V> extends WritableComparator 
+implements JobConfigurable {
+  private KeyFieldHelper keyFieldHelper = new KeyFieldHelper();
+  private static final byte NEGATIVE = (byte)'-';
+  private static final byte ZERO = (byte)'0';
+  private static final byte DECIMAL = (byte)'.';
+  
+  public void configure(JobConf job) {
+    String option = job.getKeyFieldComparatorOption();
+    String keyFieldSeparator = job.get("map.output.key.field.separator","\t");
+    keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator);
+    keyFieldHelper.parseOption(option);
+  }
+  
+  public KeyFieldBasedComparator() {
+    super(Text.class);
+  }
+    
+
+  public int compare(byte[] b1, int s1, int l1,
+      byte[] b2, int s2, int l2) {
+    int n1 = WritableUtils.decodeVIntSize(b1[s1]);
+    int n2 = WritableUtils.decodeVIntSize(b2[s2]);
+    List <KeyDescription> allKeySpecs = keyFieldHelper.keySpecs();
+    if (allKeySpecs.size() == 0) {
+      return compareBytes(b1, s1+n1, l1-n1, b2, s2+n2, l2-n2);
+    }
+    int []lengthIndicesFirst = keyFieldHelper.getWordLengths(b1, s1+n1, s1+l1);
+    int []lengthIndicesSecond = keyFieldHelper.getWordLengths(b2, s2+n2, s2+l2);
+    for (KeyDescription keySpec : allKeySpecs) {
+      int startCharFirst = keyFieldHelper.getStartOffset(b1, s1+n1, s1+l1, lengthIndicesFirst,
+          keySpec);
+      int endCharFirst = keyFieldHelper.getEndOffset(b1, s1+n1, s1+l1, lengthIndicesFirst,
+          keySpec);
+      int startCharSecond = keyFieldHelper.getStartOffset(b2, s2+n2, s2+l2, lengthIndicesSecond,
+          keySpec);
+      int endCharSecond = keyFieldHelper.getEndOffset(b2, s2+n2, s2+l2, lengthIndicesSecond,
+          keySpec);
+      int result;
+      if ((result = compareByteSequence(b1, startCharFirst, endCharFirst, b2, 
+          startCharSecond, endCharSecond, keySpec)) != 0) {
+        return result;
+      }
+    }
+    return 0;
+  }
+  
+  private int compareByteSequence(byte[] first, int start1, int end1, 
+      byte[] second, int start2, int end2, KeyDescription key) {
+    if (start1 == -1) {
+      if (key.reverse) {
+        return 1;
+      }
+      return -1;
+    }
+    if (start2 == -1) {
+      if (key.reverse) {
+        return -1; 
+      }
+      return 1;
+    }
+    int compareResult = 0;
+    if (!key.numeric) {
+      compareResult = compareBytes(first, start1, end1, second, start2, end2);
+    }
+    if (key.numeric) {
+      compareResult = numericalCompare (first, start1, end1, second, start2, end2);
+    }
+    if (key.reverse) {
+      return -compareResult;
+    }
+    return compareResult;
+  }
+  
+  private int numericalCompare (byte[] a, int start1, int end1, 
+      byte[] b, int start2, int end2) {
+    int i = start1;
+    int j = start2;
+    int mul = 1;
+    byte first_a = a[i];
+    byte first_b = b[j];
+    if (first_a == NEGATIVE) {
+      if (first_b != NEGATIVE) {
+        //check for cases like -0.0 and 0.0 (they should be declared equal)
+        return oneNegativeCompare(a,start1+1,end1,b,start2,end2);
+      }
+      i++;
+    }
+    if (first_b == NEGATIVE) {
+      if (first_a != NEGATIVE) {
+        //check for cases like 0.0 and -0.0 (they should be declared equal)
+        return -oneNegativeCompare(b,start2+1,end2,a,start1,end1);
+      }
+      j++;
+    }
+    if (first_b == NEGATIVE && first_a == NEGATIVE) {
+      mul = -1;
+    }
+
+    //skip over ZEROs
+    while (i <= end1) {
+      if (a[i] != ZERO) {
+        break;
+      }
+      i++;
+    }
+    while (j <= end2) {
+      if (b[j] != ZERO) {
+        break;
+      }
+      j++;
+    }
+    
+    //skip over equal characters and stopping at the first nondigit char
+    //The nondigit character could be '.'
+    while (i <= end1 && j <= end2) {
+      if (!isdigit(a[i]) || a[i] != b[j]) {
+        break;
+      }
+      i++; j++;
+    }
+    if (i <= end1) {
+      first_a = a[i];
+    }
+    if (j <= end2) {
+      first_b = b[j];
+    }
+    //store the result of the difference. This could be final result if the
+    //number of digits in the mantissa is the same in both the numbers 
+    int firstResult = first_a - first_b;
+    
+    //check whether we hit a decimal in the earlier scan
+    if ((first_a == DECIMAL && (!isdigit(first_b) || j > end2)) ||
+            (first_b == DECIMAL && (!isdigit(first_a) || i > end1))) {
+      return ((mul < 0) ? -decimalCompare(a,i,end1,b,j,end2) : 
+        decimalCompare(a,i,end1,b,j,end2));
+    }
+    //check the number of digits in the mantissa of the numbers
+    int numRemainDigits_a = 0;
+    int numRemainDigits_b = 0;
+    while (i <= end1) {
+      //if we encounter a non-digit treat the corresponding number as being 
+      //smaller      
+      if (isdigit(a[i++])) {
+        numRemainDigits_a++;
+      } else break;
+    }
+    while (j <= end2) {
+      //if we encounter a non-digit treat the corresponding number as being 
+      //smaller
+      if (isdigit(b[j++])) {
+        numRemainDigits_b++;
+      } else break;
+    }
+    int ret = numRemainDigits_a - numRemainDigits_b;
+    if (ret == 0) { 
+      return ((mul < 0) ? -firstResult : firstResult);
+    } else {
+      return ((mul < 0) ? -ret : ret);
+    }
+  }
+  private boolean isdigit(byte b) {
+    if ('0' <= b && b <= '9') {
+      return true;
+    }
+    return false;
+  }
+  private int decimalCompare(byte[] a, int i, int end1, 
+                             byte[] b, int j, int end2) {
+    if (i > end1) {
+      //if a[] has nothing remaining
+      return -decimalCompare1(b, ++j, end2);
+    }
+    if (j > end2) {
+      //if b[] has nothing remaining
+      return decimalCompare1(a, ++i, end1);
+    }
+    if (a[i] == DECIMAL && b[j] == DECIMAL) {
+      while (i <= end1 && j <= end2) {
+        if (a[i] != b[j]) {
+          if (isdigit(a[i]) && isdigit(b[j])) {
+            return a[i] - b[j];
+          }
+          if (isdigit(a[i])) {
+            return 1;
+          }
+          if (isdigit(b[j])) {
+            return -1;
+          }
+          return 0;
+        }
+        i++; j++;
+      }
+      if (i > end1 && j > end2) {
+        return 0;
+      }
+        
+      if (i > end1) {
+        //check whether there is a non-ZERO digit after potentially
+        //a number of ZEROs (e.g., a=.4444, b=.444400004)
+        return -decimalCompare1(b, j, end2);
+      }
+      if (j > end2) {
+        //check whether there is a non-ZERO digit after potentially
+        //a number of ZEROs (e.g., b=.4444, a=.444400004)
+        return decimalCompare1(a, i, end1);
+      }
+    }
+    else if (a[i] == DECIMAL) {
+      return decimalCompare1(a, ++i, end1);
+    }
+    else if (b[j] == DECIMAL) {
+      return -decimalCompare1(b, ++j, end2);
+    }
+    return 0;
+  }
+  
+  private int decimalCompare1(byte[] a, int i, int end) {
+    while (i <= end) {
+      if (a[i] == ZERO) {
+        i++;
+        continue;
+      }
+      if (isdigit(a[i])) {
+        return 1;
+      } else {
+        return 0;
+      }
+    }
+    return 0;
+  }
+  
+  private int oneNegativeCompare(byte[] a, int start1, int end1, 
+      byte[] b, int start2, int end2) {
+    //here a[] is negative and b[] is positive
+    //We have to ascertain whether the number contains any digits.
+    //If it does, then it is a smaller number for sure. If not,
+    //then we need to scan b[] to find out whether b[] has a digit
+    //If b[] does contain a digit, then b[] is certainly
+    //greater. If not, that is, both a[] and b[] don't contain
+    //digits then they should be considered equal.
+    if (!isZero(a, start1, end1)) {
+      return -1;
+    }
+    //reached here - this means that a[] is a ZERO
+    if (!isZero(b, start2, end2)) {
+      return -1;
+    }
+    //reached here - both numbers are basically ZEROs and hence
+    //they should compare equal
+    return 0;
+  }
+  
+  private boolean isZero(byte a[], int start, int end) {
+    //check for zeros in the significand part as well as the decimal part
+    //note that we treat the non-digit characters as ZERO
+    int i = start;
+    //we check the significand for being a ZERO
+    while (i <= end) {
+      if (a[i] != ZERO) {
+        if (a[i] != DECIMAL && isdigit(a[i])) {
+          return false;
+        }
+        break;
+      }
+      i++;
+    }
+
+    if (i != (end+1) && a[i++] == DECIMAL) {
+      //we check the decimal part for being a ZERO
+      while (i <= end) {
+        if (a[i] != ZERO) {
+          if (isdigit(a[i])) {
+            return false;
+          }
+          break;
+        }
+        i++;
+      }
+    }
+    return true;
+  }
+}

+ 65 - 19
src/mapred/org/apache/hadoop/mapred/lib/KeyFieldBasedPartitioner.java

@@ -18,37 +18,83 @@
 
 
 package org.apache.hadoop.mapred.lib;
 package org.apache.hadoop.mapred.lib;
 
 
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapred.lib.KeyFieldHelper.KeyDescription;
 
 
+ /**   
+  *  Defines a way to partition keys based on certain key fields (also see
+  *  {@link KeyFieldBasedComparator}.
+  *  The key specification supported is of the form -k pos1[,pos2], where,
+  *  pos is of the form f[.c][opts], where f is the number
+  *  of the key field to use, and c is the number of the first character from
+  *  the beginning of the field. Fields and character posns are numbered 
+  *  starting with 1; a character position of zero in pos2 indicates the
+  *  field's last character. If '.c' is omitted from pos1, it defaults to 1
+  *  (the beginning of the field); if omitted from pos2, it defaults to 0 
+  *  (the end of the field).
+  * 
+  */
 public class KeyFieldBasedPartitioner<K2, V2> implements Partitioner<K2, V2> {
 public class KeyFieldBasedPartitioner<K2, V2> implements Partitioner<K2, V2> {
 
 
+  private static final Log LOG = LogFactory.getLog(KeyFieldBasedPartitioner.class.getName());
   private int numOfPartitionFields;
   private int numOfPartitionFields;
-
-  private String keyFieldSeparator;
+  
+  private KeyFieldHelper keyFieldHelper = new KeyFieldHelper();
 
 
   public void configure(JobConf job) {
   public void configure(JobConf job) {
-    this.keyFieldSeparator = job.get("map.output.key.field.separator", "\t");
-    this.numOfPartitionFields = job.getInt("num.key.fields.for.partition", 0);
+    String keyFieldSeparator = job.get("map.output.key.field.separator", "\t");
+    keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator);
+    if (job.get("num.key.fields.for.partition") != null) {
+      LOG.warn("Using deprecated num.key.fields.for.partition. " +
+      		"Use mapred.text.key.partitioner.options instead");
+      this.numOfPartitionFields = job.getInt("num.key.fields.for.partition",0);
+      keyFieldHelper.setKeyFieldSpec(1,numOfPartitionFields);
+    } else {
+      String option = job.getKeyFieldPartitionerOption();
+      keyFieldHelper.parseOption(option);
+    }
   }
   }
 
 
-  /** Use {@link Object#hashCode()} to partition. */
   public int getPartition(K2 key, V2 value,
   public int getPartition(K2 key, V2 value,
       int numReduceTasks) {
       int numReduceTasks) {
-    String partitionKeyStr = key.toString();
-    String[] fields = partitionKeyStr.split(this.keyFieldSeparator);
-    if (this.numOfPartitionFields > 0
-        && this.numOfPartitionFields < fields.length) {
-      StringBuffer sb = new StringBuffer();
-      for (int i = 0; i < this.numOfPartitionFields; i++) {
-        sb.append(fields[i]).append(this.keyFieldSeparator);
-      }
-      partitionKeyStr = sb.toString();
-      if (partitionKeyStr.length() > 0) {
-        partitionKeyStr = partitionKeyStr.substring(0,
-            partitionKeyStr.length() - 1);
-      }
+    byte[] keyBytes;
+
+    List <KeyDescription> allKeySpecs = keyFieldHelper.keySpecs();
+    if (allKeySpecs.size() == 0) {
+      return (key.toString().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
+    }
+
+    try {
+      keyBytes = key.toString().getBytes("UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException("The current system does not " +
+          "support UTF-8 encoding!", e);
+    }
+    int []lengthIndicesFirst = keyFieldHelper.getWordLengths(keyBytes, 0, 
+        keyBytes.length);
+    int currentHash = 0;
+    for (KeyDescription keySpec : allKeySpecs) {
+      int startChar = keyFieldHelper.getStartOffset(keyBytes, 0, keyBytes.length, 
+          lengthIndicesFirst, keySpec);
+      int endChar = keyFieldHelper.getEndOffset(keyBytes, 0, keyBytes.length, 
+          lengthIndicesFirst, keySpec);
+      currentHash = hashCode(keyBytes, startChar, endChar, 
+          currentHash);
     }
     }
-    return (partitionKeyStr.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
+    return (currentHash & Integer.MAX_VALUE) % numReduceTasks;
   }
   }
+  
+  protected int hashCode(byte[] b, int start, int end, int currentHash) {
+    for (int i = start; i <= end; i++) {
+      currentHash = 31*currentHash + b[i];
+    }
+    return currentHash;
+  }
+
 }
 }

+ 289 - 0
src/mapred/org/apache/hadoop/mapred/lib/KeyFieldHelper.java

@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.util.UTF8ByteArrayUtils;
+
+/**
+ * This is used in {@link KeyFieldBasedComparator} & 
+ * {@link KeyFieldBasedPartitioner}. Defines all the methods
+ * for parsing key specifications. The key specification is of the form:
+ * -k pos1[,pos2], where pos is of the form f[.c][opts], where f is the number
+ *  of the field to use, and c is the number of the first character from the
+ *  beginning of the field. Fields and character posns are numbered starting
+ *  with 1; a character position of zero in pos2 indicates the field's last
+ *  character. If '.c' is omitted from pos1, it defaults to 1 (the beginning
+ *  of the field); if omitted from pos2, it defaults to 0 (the end of the
+ *  field). opts are ordering options (supported options are 'nr'). 
+ */
+
+class KeyFieldHelper {
+  
+  protected static class KeyDescription {
+    int beginFieldIdx = 1;
+    int beginChar = 1;
+    int endFieldIdx = 0;
+    int endChar = 0;
+    boolean numeric;
+    boolean reverse;
+  }
+  
+  private List<KeyDescription> allKeySpecs = new ArrayList<KeyDescription>();
+  private byte[] keyFieldSeparator;
+  private boolean keySpecSeen = false;
+  
+  public void setKeyFieldSeparator(String keyFieldSeparator) {
+    try {
+      this.keyFieldSeparator =
+        keyFieldSeparator.getBytes("UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException("The current system does not " +
+          "support UTF-8 encoding!", e);
+    }    
+  }
+  
+  /** Required for backcompatibility with num.key.fields.for.partition in
+   * {@link KeyFieldBasedPartitioner} */
+  public void setKeyFieldSpec(int start, int end) {
+    if (end >= start) {
+      KeyDescription k = new KeyDescription();
+      k.beginFieldIdx = start;
+      k.endFieldIdx = end;
+      keySpecSeen = true;
+      allKeySpecs.add(k);
+    }
+  }
+  
+  public List<KeyDescription> keySpecs() {
+    return allKeySpecs;
+  }
+    
+  public int[] getWordLengths(byte []b, int start, int end) {
+    //Given a string like "hello how are you", it returns an array
+    //like [4 5, 3, 3, 3], where the first element is the number of
+	//fields
+    if (!keySpecSeen) {
+      //if there were no key specs, then the whole key is one word
+      return new int[] {1};
+    }
+    int[] lengths = new int[10];
+    int currLenLengths = lengths.length;
+    int idx = 1;
+    int pos;
+    while ((pos = UTF8ByteArrayUtils.findBytes(b, start, end, 
+        keyFieldSeparator)) != -1) {
+      if (++idx == currLenLengths) {
+        int[] temp = lengths;
+        lengths = new int[(currLenLengths = currLenLengths*2)];
+        System.arraycopy(temp, 0, lengths, 0, temp.length);
+      }
+      lengths[idx - 1] = pos - start;
+      start = pos + 1;
+    }
+    
+    if (start != end) {
+      lengths[idx] = end - start;
+    }
+    lengths[0] = idx; //number of words is the first element
+    return lengths;
+  }
+  public int getStartOffset(byte[]b, int start, int end, 
+      int []lengthIndices, KeyDescription k) {
+    //if -k2.5,2 is the keyspec, the startChar is lengthIndices[1] + 5
+    //note that the [0]'th element is the number of fields in the key
+    if (lengthIndices[0] >= k.beginFieldIdx) {
+      int position = 0;
+      for (int i = 1; i < k.beginFieldIdx; i++) {
+        position += lengthIndices[i] + keyFieldSeparator.length; 
+      }
+      if (position + k.beginChar <= (end - start)) {
+        return start + position + k.beginChar - 1; 
+      }
+    }
+    return -1;
+  }
+  public int getEndOffset(byte[]b, int start, int end, 
+      int []lengthIndices, KeyDescription k) {
+    //if -k2,2.8 is the keyspec, the endChar is lengthIndices[1] + 8
+    //note that the [0]'th element is the number of fields in the key
+    if (k.endFieldIdx == 0) {
+      //there is no end field specified for this keyspec. So the remaining
+      //part of the key is considered in its entirety.
+      return end; 
+    }
+    if (lengthIndices[0] >= k.endFieldIdx) {
+      int position = 0;
+      int i;
+      for (i = 1; i < k.endFieldIdx; i++) {
+        position += lengthIndices[i] + keyFieldSeparator.length;
+      }
+      if (k.endChar == 0) { 
+        position += lengthIndices[i];
+      }
+      if (position + k.endChar <= (end - start)) {
+        return start + position + k.endChar - 1;
+      }
+      return end;
+    }
+    return end;
+  }
+  public void parseOption(String option) {
+    if (option == null || option.equals("")) {
+      //we will have only default comparison
+      return;
+    }
+    StringTokenizer args = new StringTokenizer(option);
+    KeyDescription global = new KeyDescription();
+    while (args.hasMoreTokens()) {
+      String arg = args.nextToken();
+      if (arg.equals("-n")) {  
+        global.numeric = true;
+      }
+      if (arg.equals("-r")) {
+        global.reverse = true;
+      }
+      if (arg.equals("-nr")) {
+        global.numeric = true;
+        global.reverse = true;
+      }
+      if (arg.startsWith("-k")) {
+        KeyDescription k = parseKey(arg, args);
+        if (k != null) {
+          allKeySpecs.add(k);
+          keySpecSeen = true;
+        }
+      }
+    }
+    for (KeyDescription key : allKeySpecs) {
+      if (!(key.reverse | key.numeric)) {
+        key.reverse = global.reverse;
+        key.numeric = global.numeric;
+      }
+    }
+    if (allKeySpecs.size() == 0) {
+      allKeySpecs.add(global);
+    }
+  }
+  
+  private KeyDescription parseKey(String arg, StringTokenizer args) {
+    //we allow for -k<arg> and -k <arg>
+    String keyArgs = null;
+    if (arg.length() == 2) {
+      if (args.hasMoreTokens()) {
+        keyArgs = args.nextToken();
+      }
+    } else {
+      keyArgs = arg.substring(2);
+    }
+    if (keyArgs == null || keyArgs.length() == 0) {
+      return null;
+    }
+    StringTokenizer st = new StringTokenizer(keyArgs,"nr.,",true);
+       
+    KeyDescription key = new KeyDescription();
+    
+    String token;
+    //the key is of the form 1[.3][nr][,1.5][nr]
+    if (st.hasMoreTokens()) {
+      token = st.nextToken();
+      //the first token must be a number
+      key.beginFieldIdx = Integer.parseInt(token);
+    }
+    if (st.hasMoreTokens()) {
+      token = st.nextToken();
+      if (token.equals(".")) {
+        token = st.nextToken();
+        key.beginChar = Integer.parseInt(token);
+        if (st.hasMoreTokens()) {
+          token = st.nextToken();
+        } else {
+          return key;
+        }
+      } 
+      do {
+        if (token.equals("n")) {
+          key.numeric = true;
+        }
+        else if (token.equals("r")) {
+          key.reverse = true;
+        }
+        else break;
+        if (st.hasMoreTokens()) {
+          token = st.nextToken();
+        } else {
+          return key;
+        }
+      } while (true);
+      if (token.equals(",")) {
+        token = st.nextToken();
+        //the first token must be a number
+        key.endFieldIdx = Integer.parseInt(token);
+        if (st.hasMoreTokens()) {
+          token = st.nextToken();
+          if (token.equals(".")) {
+            token = st.nextToken();
+            key.endChar = Integer.parseInt(token);
+            if (st.hasMoreTokens()) {
+              token = st.nextToken();
+            } else {
+              return key;
+            }
+          }
+          do {
+            if (token.equals("n")) {
+              key.numeric = true;
+            }
+            else if (token.equals("r")) {
+              key.reverse = true;
+            }
+            else { 
+              throw new IllegalArgumentException("Invalid -k argument. " +
+               "Must be of the form -k pos1,[pos2], where pos is of the form " +
+               "f[.c]nr");
+            }
+            if (st.hasMoreTokens()) {
+              token = st.nextToken();
+            } else {
+              break;
+            }
+          } while (true);
+        }
+        return key;
+      }
+      throw new IllegalArgumentException("Invalid -k argument. " +
+          "Must be of the form -k pos1,[pos2], where pos is of the form " +
+          "f[.c]nr");
+    }
+    return key;
+  }
+  private void printKey(KeyDescription key) {
+    System.out.println("key.beginFieldIdx: " + key.beginFieldIdx);
+    System.out.println("key.beginChar: " + key.beginChar);
+    System.out.println("key.endFieldIdx: " + key.endFieldIdx);
+    System.out.println("key.endChar: " + key.endChar);
+    System.out.println("key.numeric: " + key.numeric);
+    System.out.println("key.reverse: " + key.reverse);
+    System.out.println("parseKey over");
+  }  
+}

+ 131 - 0
src/test/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java

@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.io.*;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputLogFilter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+
+
+public class TestKeyFieldBasedComparator extends HadoopTestCase {
+  JobConf conf;
+  String line1 = "123 -123 005120 123.9 0.01 0.18 010 10.1 4444 011 011 234";
+  String line2 = "134 -12 005100 123.10 -1.01 0.19 02 10.0 4444.1";
+
+  public TestKeyFieldBasedComparator() throws IOException {
+    super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
+    conf = createJobConf();
+  }
+  public void configure(String keySpec, int expect) throws Exception {
+    Path testdir = new Path("build/test/test.mapred.spill");
+    Path inDir = new Path(testdir, "in");
+    Path outDir = new Path(testdir, "out");
+    FileSystem fs = getFileSystem();
+    fs.delete(testdir, true);
+    conf.setInputFormat(TextInputFormat.class);
+    FileInputFormat.setInputPaths(conf, inDir);
+    FileOutputFormat.setOutputPath(conf, outDir);
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(LongWritable.class);
+
+    conf.setNumMapTasks(1);
+    conf.setNumReduceTasks(2);
+
+    conf.setOutputFormat(TextOutputFormat.class);
+    conf.setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
+    conf.setKeyFieldComparatorOptions(keySpec);
+    conf.setKeyFieldPartitionerOptions("-k1.1,1.1");
+    conf.set("map.output.key.field.separator", " ");
+    conf.setMapperClass(InverseMapper.class);
+    conf.setReducerClass(IdentityReducer.class);
+    if (!fs.mkdirs(testdir)) {
+      throw new IOException("Mkdirs failed to create " + testdir.toString());
+    }
+    if (!fs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+    // set up input data in 2 files 
+    Path inFile = new Path(inDir, "part0");
+    FileOutputStream fos = new FileOutputStream(inFile.toString());
+    fos.write((line1 + "\n").getBytes());
+    fos.write((line2 + "\n").getBytes());
+    fos.close();
+    JobClient jc = new JobClient(conf);
+    RunningJob r_job = jc.submitJob(conf);
+    while (!r_job.isComplete()) {
+      Thread.sleep(1000);
+    }
+    
+    if (!r_job.isSuccessful()) {
+      fail("Oops! The job broke due to an unexpected error");
+    }
+    Path[] outputFiles = FileUtil.stat2Paths(
+        getFileSystem().listStatus(outDir,
+        new OutputLogFilter()));
+    if (outputFiles.length > 0) {
+      InputStream is = getFileSystem().open(outputFiles[0]);
+      BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+      String line = reader.readLine();
+      //make sure we get what we expect as the first line, and also
+      //that we have two lines (both the lines must end up in the same
+      //reducer since the partitioner takes the same key spec for all
+      //lines
+      if (expect == 1) {
+        assertTrue(line.startsWith(line1));
+      } else if (expect == 2) {
+        assertTrue(line.startsWith(line2));
+      }
+      line = reader.readLine();
+      if (expect == 1) {
+        assertTrue(line.startsWith(line2));
+      } else if (expect == 2) {
+        assertTrue(line.startsWith(line1));
+      }
+      reader.close();
+    }
+  }
+  public void testBasicUnixComparator() throws Exception {
+    configure("-k1,1n", 1);
+    configure("-k2,2n", 1);
+    configure("-k2.2,2n", 2);
+    configure("-k3.4,3n", 2);
+    configure("-k3.2,3.3n -k4,4n", 2);
+    configure("-k3.2,3.3n -k4,4nr", 1);
+    configure("-k2.4,2.4n", 2);
+    configure("-k7,7", 1);
+    configure("-k7,7n", 2);
+    configure("-k8,8n", 2);
+    configure("-k9,9n", 1);
+    configure("-k11,11",2);
+    configure("-k10,10",2);
+  }
+}