浏览代码

HADOOP-2085. A library to support map-side joins of consistently
partitioned and sorted data sets. Contributed by Chris Douglas.


git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@602240 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 17 年之前
父节点
当前提交
44fba3a8db
共有 23 个文件被更改,包括 3233 次插入10 次删除
  1. 4 1
      CHANGES.txt
  2. 1 0
      src/examples/org/apache/hadoop/examples/ExampleDriver.java
  3. 162 0
      src/examples/org/apache/hadoop/examples/Join.java
  4. 20 9
      src/java/org/apache/hadoop/io/WritableUtils.java
  5. 91 0
      src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java
  6. 40 0
      src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java
  7. 65 0
      src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java
  8. 193 0
      src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java
  9. 149 0
      src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java
  10. 472 0
      src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java
  11. 50 0
      src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java
  12. 114 0
      src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java
  13. 153 0
      src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java
  14. 45 0
      src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java
  15. 93 0
      src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java
  16. 495 0
      src/java/org/apache/hadoop/mapred/join/Parser.java
  17. 92 0
      src/java/org/apache/hadoop/mapred/join/ResetableIterator.java
  18. 96 0
      src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java
  19. 226 0
      src/java/org/apache/hadoop/mapred/join/TupleWritable.java
  20. 206 0
      src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java
  21. 88 0
      src/java/org/apache/hadoop/mapred/join/package.html
  22. 245 0
      src/test/org/apache/hadoop/mapred/join/TestDatamerge.java
  23. 133 0
      src/test/org/apache/hadoop/mapred/join/TestTupleWritable.java

+ 4 - 1
CHANGES.txt

@@ -40,7 +40,10 @@ Trunk (unreleased changes)
 
     HADOOP-1652.  A utility to balance data among datanodes in a HDFS cluster.
     (Hairong Kuang via dhruba)
-    
+
+    HADOOP-2085.  A library to support map-side joins of consistently 
+    partitioned and sorted data sets. (Chris Douglas via omalley)
+
   IMPROVEMENTS
 
     HADOOP-2045.  Change committer list on website to a table, so that

+ 1 - 0
src/examples/org/apache/hadoop/examples/ExampleDriver.java

@@ -48,6 +48,7 @@ public class ExampleDriver {
       "A map/reduce tile laying program to find solutions to pentomino problems.");
       pgd.addClass("sudoku", Sudoku.class, "A sudoku solver.");
       pgd.addClass("sleep", SleepJob.class, "A job that sleeps at each map and reduce task.");
+      pgd.addClass("join", Join.class, "A job that effects a join over sorted, equally partitioned datasets");
       pgd.driver(argv);
     }
     catch(Throwable e){

+ 162 - 0
src/examples/org/apache/hadoop/examples/Join.java

@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.examples;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapred.join.*;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * This is the trivial map/reduce program that does absolutely nothing
+ * other than use the framework to fragment and sort the input values.
+ *
+ * To run: bin/hadoop jar build/hadoop-examples.jar join
+ *            [-m <i>maps</i>] [-r <i>reduces</i>]
+ *            [-inFormat <i>input format class</i>] 
+ *            [-outFormat <i>output format class</i>] 
+ *            [-outKey <i>output key class</i>] 
+ *            [-outValue <i>output value class</i>] 
+ *            [-joinOp &lt;inner|outer|override&gt;]
+ *            [<i>in-dir</i>]* <i>in-dir</i> <i>out-dir</i> 
+ */
+public class Join extends Configured implements Tool {
+
+  static int printUsage() {
+    System.out.println("join [-m <maps>] [-r <reduces>] " +
+                       "[-inFormat <input format class>] " +
+                       "[-outFormat <output format class>] " + 
+                       "[-outKey <output key class>] " +
+                       "[-outValue <output value class>] " +
+                       "[-joinOp <inner|outer|override>] " +
+                       "[input]* <input> <output>");
+    ToolRunner.printGenericCommandUsage(System.out);
+    return -1;
+  }
+
+  /**
+   * The main driver for sort program.
+   * Invoke this method to submit the map/reduce job.
+   * @throws IOException When there is communication problems with the 
+   *                     job tracker.
+   */
+  public int run(String[] args) throws Exception {
+    JobConf jobConf = new JobConf(getConf(), Sort.class);
+    jobConf.setJobName("join");
+
+    jobConf.setMapperClass(IdentityMapper.class);        
+    jobConf.setReducerClass(IdentityReducer.class);
+
+    JobClient client = new JobClient(jobConf);
+    ClusterStatus cluster = client.getClusterStatus();
+    int num_maps = cluster.getTaskTrackers() * 
+    jobConf.getInt("test.sort.maps_per_host", 10);
+    int num_reduces = cluster.getTaskTrackers() * 
+    jobConf.getInt("test.sort.reduces_per_host", cluster.getMaxTasks());
+    Class<? extends InputFormat> inputFormatClass = 
+      SequenceFileInputFormat.class;
+    Class<? extends OutputFormat> outputFormatClass = 
+      SequenceFileOutputFormat.class;
+    Class<? extends WritableComparable> outputKeyClass = BytesWritable.class;
+    Class<? extends Writable> outputValueClass = TupleWritable.class;
+    String op = "inner";
+    List<String> otherArgs = new ArrayList<String>();
+    for(int i=0; i < args.length; ++i) {
+      try {
+        if ("-m".equals(args[i])) {
+          num_maps = Integer.parseInt(args[++i]);
+        } else if ("-r".equals(args[i])) {
+          num_reduces = Integer.parseInt(args[++i]);
+        } else if ("-inFormat".equals(args[i])) {
+          inputFormatClass = 
+            Class.forName(args[++i]).asSubclass(InputFormat.class);
+        } else if ("-outFormat".equals(args[i])) {
+          outputFormatClass = 
+            Class.forName(args[++i]).asSubclass(OutputFormat.class);
+        } else if ("-outKey".equals(args[i])) {
+          outputKeyClass = 
+            Class.forName(args[++i]).asSubclass(WritableComparable.class);
+        } else if ("-outValue".equals(args[i])) {
+          outputValueClass = 
+            Class.forName(args[++i]).asSubclass(Writable.class);
+        } else if ("-joinOp".equals(args[i])) {
+          op = args[++i];
+        } else {
+          otherArgs.add(args[i]);
+        }
+      } catch (NumberFormatException except) {
+        System.out.println("ERROR: Integer expected instead of " + args[i]);
+        return printUsage();
+      } catch (ArrayIndexOutOfBoundsException except) {
+        System.out.println("ERROR: Required parameter missing from " +
+            args[i-1]);
+        return printUsage(); // exits
+      }
+    }
+
+    // Set user-supplied (possibly default) job configs
+    jobConf.setNumMapTasks(num_maps);
+    jobConf.setNumReduceTasks(num_reduces);
+
+    if (otherArgs.size() < 2) {
+      System.out.println("ERROR: Wrong number of parameters: ");
+      return printUsage();
+    }
+
+    jobConf.setOutputPath(new Path(otherArgs.remove(otherArgs.size() - 1)));
+    List<Path> plist = new ArrayList<Path>(otherArgs.size());
+    for (String s : otherArgs) {
+      plist.add(new Path(s));
+    }
+
+    jobConf.setInputFormat(CompositeInputFormat.class);
+    jobConf.set("mapred.join.expr", CompositeInputFormat.compose(
+          op, inputFormatClass, plist.toArray(new Path[0])));
+    jobConf.setOutputFormat(outputFormatClass);
+
+    jobConf.setOutputKeyClass(outputKeyClass);
+    jobConf.setOutputValueClass(outputValueClass);
+
+    Date startTime = new Date();
+    System.out.println("Job started: " + startTime);
+    JobClient.runJob(jobConf);
+    Date end_time = new Date();
+    System.out.println("Job ended: " + end_time);
+    System.out.println("The job took " + 
+        (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new Join(), args);
+    System.exit(res);
+  }
+
+}

+ 20 - 9
src/java/org/apache/hadoop/io/WritableUtils.java

@@ -218,7 +218,7 @@ public final class WritableUtils  {
         return new CopyInCopyOutBuffer();
       }
     };
-  
+
   /**
    * Make a copy of a writable object using serialization to a buffer.
    * @param orig The object to copy
@@ -226,19 +226,30 @@ public final class WritableUtils  {
    */
   public static Writable clone(Writable orig, JobConf conf) {
     try {
-      Writable newInst = (Writable)ReflectionUtils.newInstance(orig.getClass(),
-                                                               conf);
-      CopyInCopyOutBuffer buffer = (CopyInCopyOutBuffer)cloneBuffers.get();
-      buffer.outBuffer.reset();
-      orig.write(buffer.outBuffer);
-      buffer.moveData();
-      newInst.readFields(buffer.inBuffer);
+      Writable newInst =
+        (Writable)ReflectionUtils.newInstance(orig.getClass(), conf);
+      cloneInto(newInst, orig);
       return newInst;
     } catch (IOException e) {
       throw new RuntimeException("Error writing/reading clone buffer", e);
     }
   }
- 
+
+  /**
+   * Make a copy of the writable object using serialiation to a buffer
+   * @param dst the object to copy from
+   * @param src the object to copy into, which is destroyed
+   * @throws IOException
+   */
+  public static void cloneInto(Writable dst, Writable src) throws IOException {
+    CopyInCopyOutBuffer buffer = (CopyInCopyOutBuffer)cloneBuffers.get();
+    buffer.outBuffer.reset();
+    src.write(buffer.outBuffer);
+    buffer.moveData();
+    dst.readFields(buffer.inBuffer);
+    return;
+  }
+
   /**
    * Serializes an integer to a binary stream with zero-compressed encoding.
    * For -120 <= i <= 127, only one byte is used with the actual value.

+ 91 - 0
src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java

@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * This class provides an implementation of ResetableIterator. The
+ * implementation uses an {@link java.util.ArrayList} to store elements
+ * added to it, replaying them as requested.
+ * Prefer {@link StreamBackedIterator}.
+ */
+public class ArrayListBackedIterator<X extends Writable>
+    implements ResetableIterator<X> {
+
+  private Iterator<X> iter;
+  private ArrayList<X> data;
+  private X hold = null;
+
+  public ArrayListBackedIterator() {
+    this(new ArrayList<X>());
+  }
+
+  public ArrayListBackedIterator(ArrayList<X> data) {
+    this.data = data;
+    this.iter = this.data.iterator();
+  }
+
+  public boolean hasNext() {
+    return iter.hasNext();
+  }
+
+  @SuppressWarnings("unchecked")
+  public boolean next(X val) throws IOException {
+    if (iter.hasNext()) {
+      WritableUtils.cloneInto(val, iter.next());
+      if (null == hold) {
+        hold = (X) WritableUtils.clone(val, null);
+      } else {
+        WritableUtils.cloneInto(hold, val);
+      }
+      return true;
+    }
+    return false;
+  }
+
+  public void replay(X val) throws IOException {
+    WritableUtils.cloneInto(val, hold);
+  }
+
+  public void reset() {
+    iter = data.iterator();
+  }
+
+  @SuppressWarnings("unchecked")
+  public void add(X item) throws IOException {
+    data.add((X) WritableUtils.clone(item, null));
+  }
+
+  public void close() throws IOException {
+    iter = null;
+    data = null;
+  }
+
+  public void clear() {
+    data.clear();
+    reset();
+  }
+
+}

+ 40 - 0
src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java

@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Refinement of InputFormat requiring implementors to provide
+ * ComposableRecordReader instead of RecordReader.
+ */
+public interface ComposableInputFormat<K extends WritableComparable,
+                                       V extends Writable>
+    extends InputFormat<K,V> {
+
+  ComposableRecordReader<K,V> getRecordReader(InputSplit split,
+      JobConf job, Reporter reporter) throws IOException;
+}

+ 65 - 0
src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java

@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * Additional operations required of a RecordReader to participate in a join.
+ */
+interface ComposableRecordReader<K extends WritableComparable,
+                                 V extends Writable>
+    extends RecordReader<K,V>, Comparable<ComposableRecordReader<K,?>> {
+
+  /**
+   * Return the position in the collector this class occupies.
+   */
+  int id();
+
+  /**
+   * Return the key this RecordReader would supply on a call to next(K,V)
+   */
+  K key();
+
+  /**
+   * Clone the key at the head of this RecordReader into the object provided.
+   */
+  void key(K key) throws IOException;
+
+  /**
+   * Returns true if the stream is not empty, but provides no guarantee that
+   * a call to next(K,V) will succeed.
+   */
+  boolean hasNext();
+
+  /**
+   * Skip key-value pairs with keys less than or equal to the key provided.
+   */
+  void skip(K key) throws IOException;
+
+  /**
+   * While key-value pairs from this RecordReader match the given key, register
+   * them with the JoinCollector provided.
+   */
+  void accept(CompositeRecordReader.JoinCollector jc, K key) throws IOException;
+}

+ 193 - 0
src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java

@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * An InputFormat capable of performing joins over a set of data sources sorted
+ * and partitioned the same way.
+ * @see #setFormat
+ *
+ * A user may define new join types by setting the property
+ * <tt>mapred.join.define.&lt;ident&gt;</tt> to a classname. In the expression
+ * <tt>mapred.join.expr</tt>, the identifier will be assumed to be a
+ * ComposableRecordReader.
+ * <tt>mapred.join.keycomparator</tt> can be a classname used to compare keys
+ * in the join.
+ * @see JoinRecordReader
+ * @see MultiFilterRecordReader
+ */
+public class CompositeInputFormat<K extends WritableComparable>
+      implements ComposableInputFormat<K,TupleWritable> {
+
+  // expression parse tree to which IF requests are proxied
+  private Parser.Node root;
+
+  public CompositeInputFormat() { }
+
+
+  /**
+   * Interpret a given string as a composite expression.
+   * {@code
+   *   func  ::= <ident>([<func>,]*<func>)
+   *   func  ::= tbl(<class>,"<path>")
+   *   class ::= @see java.lang.Class#forName(java.lang.String)
+   *   path  ::= @see org.apache.hadoop.fs.Path#Path(java.lang.String)
+   * }
+   * Reads expression from the <tt>mapred.join.expr</tt> property and
+   * user-supplied join types from <tt>mapred.join.define.&lt;ident&gt;</tt>
+   *  types. Paths supplied to <tt>tbl</tt> are given as input paths to the
+   * InputFormat class listed.
+   * @see #compose(java.lang.String, java.lang.Class, java.lang.String...)
+   */
+  public void setFormat(JobConf job) throws IOException {
+    addDefaults();
+    addUserIdentifiers(job);
+    Class<? extends WritableComparator> cmpcl =
+      job.getClass("mapred.join.keycomparator", null, WritableComparator.class);
+    root = Parser.parse(job.get("mapred.join.expr", null), cmpcl);
+  }
+
+  /**
+   * Adds the default set of identifiers to the parser.
+   */
+  protected void addDefaults() {
+    try {
+      Parser.CNode.addIdentifier("inner", InnerJoinRecordReader.class);
+      Parser.CNode.addIdentifier("outer", OuterJoinRecordReader.class);
+      Parser.CNode.addIdentifier("override", OverrideRecordReader.class);
+      Parser.WNode.addIdentifier("tbl", WrappedRecordReader.class);
+    } catch (NoSuchMethodException e) {
+      throw new RuntimeException("FATAL: Failed to init defaults", e);
+    }
+  }
+
+  /**
+   * Inform the parser of user-defined types.
+   */
+  private void addUserIdentifiers(JobConf job) throws IOException {
+    Pattern x = Pattern.compile("^mapred\\.join\\.define\\.(\\w+)$");
+    for (Map.Entry<String,String> kv : job) {
+      Matcher m = x.matcher(kv.getKey());
+      if (m.matches()) {
+        try {
+          Parser.CNode.addIdentifier(m.group(1),
+              job.getClass(m.group(0), null, ComposableRecordReader.class));
+        } catch (NoSuchMethodException e) {
+          throw (IOException)new IOException(
+              "Invalid define for " + m.group(1)).initCause(e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Verify that this composite has children and that all its children
+   * can validate their input.
+   */
+  public void validateInput(JobConf job) throws IOException {
+    setFormat(job);
+    root.validateInput(job);
+  }
+
+  /**
+   * Build a CompositeInputSplit from the child InputFormats by assigning the
+   * ith split from each child to the ith composite split.
+   */
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    setFormat(job);
+    job.setLong("mapred.min.split.size", Long.MAX_VALUE);
+    return root.getSplits(job, numSplits);
+  }
+
+  /**
+   * Construct a CompositeRecordReader for the children of this InputFormat
+   * as defined in the init expression.
+   * The outermost join need only be composable, not necessarily a composite.
+   * Mandating TupleWritable isn't strictly correct.
+   */
+  @SuppressWarnings("unchecked") // child types unknown
+  public ComposableRecordReader<K,TupleWritable> getRecordReader(
+      InputSplit split, JobConf job, Reporter reporter) throws IOException {
+    setFormat(job);
+    return root.getRecordReader(split, job, reporter);
+  }
+
+  /**
+   * Convenience method for constructing composite formats.
+   * Given InputFormat class (inf), path (p) return:
+   * {@code tbl(<inf>, <p>) }
+   */
+  public static String compose(Class<? extends InputFormat> inf, String path) {
+    return compose(inf.getName().intern(), path, new StringBuffer()).toString();
+  }
+
+  /**
+   * Convenience method for constructing composite formats.
+   * Given operation (op), Object class (inf), set of paths (p) return:
+   * {@code <op>(tbl(<inf>,<p1>),tbl(<inf>,<p2>),...,tbl(<inf>,<pn>)) }
+   */
+  public static String compose(String op, Class<? extends InputFormat> inf,
+      String... path) {
+    final String infname = inf.getName();
+    StringBuffer ret = new StringBuffer(op + '(');
+    for (String p : path) {
+      compose(infname, p, ret);
+      ret.append(',');
+    }
+    ret.setCharAt(ret.length() - 1, ')');
+    return ret.toString();
+  }
+
+  /**
+   * Convenience method for constructing composite formats.
+   * Given operation (op), Object class (inf), set of paths (p) return:
+   * {@code <op>(tbl(<inf>,<p1>),tbl(<inf>,<p2>),...,tbl(<inf>,<pn>)) }
+   */
+  public static String compose(String op, Class<? extends InputFormat> inf,
+      Path... path) {
+    ArrayList<String> tmp = new ArrayList<String>(path.length);
+    for (Path p : path) {
+      tmp.add(p.toString());
+    }
+    return compose(op, inf, tmp.toArray(new String[0]));
+  }
+
+  private static StringBuffer compose(String inf, String path,
+      StringBuffer sb) {
+    sb.append("tbl(" + inf + ",\"");
+    sb.append(path);
+    sb.append("\")");
+    return sb;
+  }
+
+}

+ 149 - 0
src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java

@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashSet;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * This InputSplit contains a set of child InputSplits. Any InputSplit inserted
+ * into this collection must have a public default constructor.
+ */
+public class CompositeInputSplit implements InputSplit {
+
+  private int fill = 0;
+  private long totsize = 0L;
+  private InputSplit[] splits;
+
+  public CompositeInputSplit() { }
+
+  public CompositeInputSplit(int capacity) {
+    splits = new InputSplit[capacity];
+  }
+
+  /**
+   * Add an InputSplit to this collection.
+   * @throws IOException If capacity was not specified during construction
+   *                     or if capacity has been reached.
+   */
+  public void add(InputSplit s) throws IOException {
+    if (null == splits) {
+      throw new IOException("Uninitialized InputSplit");
+    }
+    if (fill == splits.length) {
+      throw new IOException("Too many splits");
+    }
+    splits[fill++] = s;
+    totsize += s.getLength();
+  }
+
+  /**
+   * Get ith child InputSplit.
+   */
+  public InputSplit get(int i) {
+    return splits[i];
+  }
+
+  /**
+   * Return the aggregate length of all child InputSplits currently added.
+   */
+  public long getLength() throws IOException {
+    return totsize;
+  }
+
+  /**
+   * Get the length of ith child InputSplit.
+   */
+  public long getLength(int i) throws IOException {
+    return splits[i].getLength();
+  }
+
+  /**
+   * Collect a set of hosts from all child InputSplits.
+   */
+  public String[] getLocations() throws IOException {
+    HashSet<String> hosts = new HashSet<String>();
+    for (InputSplit s : splits) {
+      String[] hints = s.getLocations();
+      if (hints != null && hints.length > 0) {
+        for (String host : hints) {
+          hosts.add(host);
+        }
+      }
+    }
+    return hosts.toArray(new String[hosts.size()]);
+  }
+
+  /**
+   * getLocations from ith InputSplit.
+   */
+  public String[] getLocation(int i) throws IOException {
+    return splits[i].getLocations();
+  }
+
+  /**
+   * Write splits in the following format.
+   * {@code
+   * <count><class1><class2>...<classn><split1><split2>...<splitn>
+   * }
+   */
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeVInt(out, splits.length);
+    for (InputSplit s : splits) {
+      Text.writeString(out, s.getClass().getName());
+    }
+    for (InputSplit s : splits) {
+      s.write(out);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * @throws IOException If the child InputSplit cannot be read, typically
+   *                     for faliing access checks.
+   */
+  @SuppressWarnings("unchecked")  // Explicit check for split class agreement
+  public void readFields(DataInput in) throws IOException {
+    int card = WritableUtils.readVInt(in);
+    if (splits == null || splits.length != card) {
+      splits = new InputSplit[card];
+    }
+    Class<? extends InputSplit>[] cls = new Class[card];
+    try {
+      for (int i = 0; i < card; ++i) {
+        cls[i] =
+          Class.forName(Text.readString(in)).asSubclass(InputSplit.class);
+      }
+      for (int i = 0; i < card; ++i) {
+        splits[i] = (InputSplit) ReflectionUtils.newInstance(cls[i], null);
+        splits[i].readFields(in);
+      }
+    } catch (ClassNotFoundException e) {
+      throw (IOException)new IOException("Failed split init").initCause(e);
+    }
+  }
+
+}

+ 472 - 0
src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java

@@ -0,0 +1,472 @@
+/** * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A RecordReader that can effect joins of RecordReaders sharing a common key
+ * type and partitioning.
+ */
+public abstract class CompositeRecordReader<
+    K extends WritableComparable, // key type
+    V extends Writable,           // accepts RecordReader<K,V> as children
+    X extends Writable>           // emits Writables of this type
+    implements Configurable {
+
+
+  private int id;
+  private Configuration conf;
+  private final ResetableIterator<X> EMPTY = new ResetableIterator.EMPTY<X>();
+
+  private WritableComparator cmp;
+  private Class<? extends WritableComparable> keyclass;
+  private PriorityQueue<ComposableRecordReader<K,?>> q;
+
+  protected final JoinCollector jc;
+  protected final ComposableRecordReader<K,? extends V>[] kids;
+
+  protected abstract boolean combine(Object[] srcs, TupleWritable value);
+
+  /**
+   * Create a RecordReader with <tt>capacity</tt> children to position
+   * <tt>id</tt> in the parent reader.
+   * The id of a root CompositeRecordReader is -1 by convention, but relying
+   * on this is not recommended.
+   */
+  @SuppressWarnings("unchecked") // Generic array assignment
+  public CompositeRecordReader(int id, int capacity,
+      Class<? extends WritableComparator> cmpcl)
+      throws IOException {
+    assert capacity > 0 : "Invalid capacity";
+    this.id = id;
+    if (null != cmpcl) {
+      cmp = (WritableComparator)ReflectionUtils.newInstance(cmpcl, null);
+      q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
+          new Comparator<ComposableRecordReader<K,?>>() {
+            public int compare(ComposableRecordReader<K,?> o1,
+                               ComposableRecordReader<K,?> o2) {
+              return cmp.compare(o1.key(), o2.key());
+            }
+          });
+    }
+    jc = new JoinCollector(capacity);
+    kids = new ComposableRecordReader[capacity];
+  }
+
+  /**
+   * Return the position in the collector this class occupies.
+   */
+  public int id() {
+    return id;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Return sorted list of RecordReaders for this composite.
+   */
+  protected PriorityQueue<ComposableRecordReader<K,?>> getRecordReaderQueue() {
+    return q;
+  }
+
+  /**
+   * Return comparator defining the ordering for RecordReaders in this
+   * composite.
+   */
+  protected WritableComparator getComparator() {
+    return cmp;
+  }
+
+  /**
+   * Add a RecordReader to this collection.
+   * The id() of a RecordReader determines where in the Tuple its
+   * entry will appear. Adding RecordReaders with the same id has
+   * undefined behavior.
+   */
+  public void add(ComposableRecordReader<K,? extends V> rr) throws IOException {
+    kids[rr.id()] = rr;
+    if (null == q) {
+      cmp = WritableComparator.get(rr.createKey().getClass());
+      q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
+          new Comparator<ComposableRecordReader<K,?>>() {
+            public int compare(ComposableRecordReader<K,?> o1,
+                               ComposableRecordReader<K,?> o2) {
+              return cmp.compare(o1.key(), o2.key());
+            }
+          });
+    }
+    q.add(rr);
+  }
+
+  /**
+   * Collector for join values.
+   * This accumulates values for a given key from the child RecordReaders. If
+   * one or more child RR contain duplicate keys, this will emit the cross
+   * product of the associated values until exhausted.
+   */
+  class JoinCollector {
+    private K key;
+    private ResetableIterator<X>[] iters;
+    private long partial = 0L;
+    private long replaymask = 0L;
+    private int start = 0;
+    private int pos = -1;
+    private int iterpos = -1;
+    private boolean first = true;
+
+    /**
+     * Construct a collector capable of handling the specified number of
+     * children.
+     */
+    @SuppressWarnings("unchecked") // Generic array assignment
+    public JoinCollector(int card) {
+      iters = new ResetableIterator[card];
+      for (int i = 0; i < iters.length; ++i) {
+        iters[i] = EMPTY;
+      }
+    }
+
+    /**
+     * Register a given iterator at position id.
+     */
+    public void add(int id, ResetableIterator<X> i)
+        throws IOException {
+      iters[id] = i;
+    }
+
+    /**
+     * Return the key associated with this collection.
+     */
+    public K key() {
+      return key;
+    }
+
+    /**
+     * Codify the contents of the collector to be iterated over.
+     * When this is called, all RecordReaders registered for this
+     * key should have added ResetableIterators.
+     */
+    public void reset(K key) {
+      this.key = key;
+      start = 0;
+      pos = 0;
+      first = true;
+      partial = 0L;
+      for (int i = 0; i < iters.length; ++i) {
+        iters[i].reset();
+      }
+    }
+
+    /**
+     * Clear all state information.
+     */
+    public void clear() {
+      key = null;
+      pos = -1;
+      first = true;
+      for (int i = 0; i < iters.length; ++i) {
+        iters[i].clear();
+        iters[i] = EMPTY;
+      }
+      partial = 0L;
+    }
+
+    /**
+     * Returns false if exhausted or if reset(K) has not been called.
+     */
+    protected boolean hasNext() {
+      return !(pos < 0);
+    }
+
+    /**
+     * Populate Tuple from iterators.
+     * It should be the case that, given iterators i_1...i_n over values from
+     * sources s_1...s_n sharing key k, repeated calls to next should yield
+     * I x I.
+     */
+    @SuppressWarnings("unchecked") // No static typeinfo on Tuples
+    protected boolean next(TupleWritable val) throws IOException {
+      if (pos < 0) {
+        clear();
+        return false;
+      }
+      int i = start;
+      if (first) { // Find first iterator with elements
+        for (; i < iters.length && !iters[i].hasNext(); ++i);
+        if (iters.length <= i) { // no children had key
+          clear();
+          return false;
+        }
+        start = i;
+        for (int j = i; j < iters.length; ++j) {
+          if (iters[j].hasNext()) {
+            partial |= 1 << j;
+          }
+        }
+        iterpos = pos = iters.length - 1;
+        first = false;
+      } else { // Copy all elements in partial into tuple
+        for (; i < iterpos; ++i) {
+          if ((partial & (1 << i)) != 0) {
+            iters[i].replay((X)val.get(i));
+            val.setWritten(i);
+          }
+        }
+      }
+      long partialwritten = val.mask();
+      if (iters[i].next((X)val.get(i))) {
+        val.setWritten(i);
+      }
+      for (++i; i < iters.length; ++i) {
+        iters[i].reset();
+        if (iters[i].hasNext() && iters[i].next((X)val.get(i))) {
+          val.setWritten(i);
+        }
+      }
+      iterpos = iters.length - 1;
+      for (; iterpos > pos && !iters[iterpos].hasNext(); --iterpos);
+      if (!iters[iterpos].hasNext()) {
+        for (; !(pos < 0 || iters[pos].hasNext()); --pos);
+        iterpos = pos;
+      }
+      replaymask = val.mask();
+      if ((replaymask ^ partialwritten) == 0L) {
+        return next(val);
+      }
+      return true;
+    }
+
+    /**
+     * Replay the last Tuple emitted.
+     */
+    @SuppressWarnings("unchecked") // No static typeinfo on Tuples
+    public void replay(TupleWritable val) throws IOException {
+      // The last emitted tuple might have drawn on an empty source;
+      // it can't be cleared prematurely, b/c there may be more duplicate
+      // keys in iterator positions < pos
+      if (first) {
+        throw new IllegalStateException();
+      }
+      for (int i = 0; i < iters.length; ++i) {
+        if ((replaymask & (1 << i)) != 0) {
+          iters[i].replay((X)val.get(i));
+        }
+      }
+    }
+
+    /**
+     * Close all child iterators.
+     */
+    public void close() throws IOException {
+      for (int i = 0; i < iters.length; ++i) {
+        iters[i].close();
+      }
+    }
+
+    /**
+     * Write the next value into key, value as accepted by the operation
+     * associated with this set of RecordReaders.
+     */
+    public boolean flush(TupleWritable value) throws IOException {
+      while (hasNext()) {
+        value.clearWritten();
+        if (next(value) && combine(kids, value)) {
+          return true;
+        }
+      }
+      return false;
+    }
+  }
+
+  /**
+   * Return the key for the current join or the value at the top of the
+   * RecordReader heap.
+   */
+  public K key() {
+    if (jc.hasNext()) {
+      return jc.key();
+    }
+    if (!q.isEmpty()) {
+      return q.peek().key();
+    }
+    return null;
+  }
+
+  /**
+   * Clone the key at the top of this RR into the given object.
+   */
+  public void key(K key) throws IOException {
+    WritableUtils.cloneInto(key, key());
+  }
+
+  /**
+   * Return true if it is possible that this could emit more values.
+   */
+  public boolean hasNext() {
+    return jc.hasNext() || !q.isEmpty();
+  }
+
+  /**
+   * Pass skip key to child RRs.
+   */
+  public void skip(K key) throws IOException {
+    ArrayList<ComposableRecordReader<K,?>> tmp =
+      new ArrayList<ComposableRecordReader<K,?>>();
+    while (!q.isEmpty() && cmp.compare(q.peek().key(), key) <= 0) {
+      tmp.add(q.poll());
+    }
+    for (ComposableRecordReader<K,?> rr : tmp) {
+      rr.skip(key);
+      q.add(rr);
+    }
+  }
+
+  /**
+   * Obtain an iterator over the child RRs apropos of the value type
+   * ultimately emitted from this join.
+   */
+  protected abstract ResetableIterator<X> getDelegate();
+
+  /**
+   * If key provided matches that of this Composite, give JoinCollector
+   * iterator over values it may emit.
+   */
+  @SuppressWarnings("unchecked") // No values from static EMPTY class
+  public void accept(CompositeRecordReader.JoinCollector jc, K key)
+      throws IOException {
+    if (hasNext() && 0 == cmp.compare(key, key())) {
+      fillJoinCollector(createKey());
+      jc.add(id, getDelegate());
+      return;
+    }
+    jc.add(id, EMPTY);
+  }
+
+  /**
+   * For all child RRs offering the key provided, obtain an iterator
+   * at that position in the JoinCollector.
+   */
+  protected void fillJoinCollector(K iterkey) throws IOException {
+    if (!q.isEmpty()) {
+      q.peek().key(iterkey);
+      while (0 == cmp.compare(q.peek().key(), iterkey)) {
+        ComposableRecordReader<K,?> t = q.poll();
+        t.accept(jc, iterkey);
+        if (t.hasNext()) {
+          q.add(t);
+        } else if (q.isEmpty()) {
+          return;
+        }
+      }
+    }
+  }
+
+  /**
+   * Implement Comparable contract (compare key of join or head of heap
+   * with that of another).
+   */
+  public int compareTo(ComposableRecordReader<K,?> other) {
+    return cmp.compare(key(), other.key());
+  }
+
+  /**
+   * Create a new key value common to all child RRs.
+   * @throws ClassCastException if key classes differ.
+   */
+  @SuppressWarnings("unchecked") // Explicit check for key class agreement
+  public K createKey() {
+    if (null == keyclass) {
+      final Class<?> cls = kids[0].createKey().getClass();
+      for (RecordReader<K,? extends Writable> rr : kids) {
+        if (!cls.equals(rr.createKey().getClass())) {
+          throw new ClassCastException("Child key classes fail to agree");
+        }
+      }
+      keyclass = cls.asSubclass(WritableComparable.class);
+    }
+    return (K) ReflectionUtils.newInstance(keyclass, getConf());
+  }
+
+  /**
+   * Create a value to be used internally for joins.
+   */
+  protected TupleWritable createInternalValue() {
+    Writable[] vals = new Writable[kids.length];
+    for (int i = 0; i < vals.length; ++i) {
+      vals[i] = kids[i].createValue();
+    }
+    return new TupleWritable(vals);
+  }
+
+  /**
+   * Unsupported (returns zero in all cases).
+   */
+  public long getPos() throws IOException {
+    return 0;
+  }
+
+  /**
+   * Close all child RRs.
+   */
+  public void close() throws IOException {
+    if (kids != null) {
+      for (RecordReader<K,? extends Writable> rr : kids) {
+        rr.close();
+      }
+    }
+    if (jc != null) {
+      jc.close();
+    }
+  }
+
+  /**
+   * Report progress as the minimum of all child RR progress.
+   */
+  public float getProgress() throws IOException {
+    float ret = 1.0f;
+    for (RecordReader<K,? extends Writable> rr : kids) {
+      ret = Math.min(ret, rr.getProgress());
+    }
+    return ret;
+  }
+}

+ 50 - 0
src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java

@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Full inner join.
+ */
+public class InnerJoinRecordReader<K extends WritableComparable>
+    extends JoinRecordReader<K> {
+
+  InnerJoinRecordReader(int id, JobConf conf, int capacity,
+      Class<? extends WritableComparator> cmpcl) throws IOException {
+    super(id, conf, capacity, cmpcl);
+  }
+
+  /**
+   * Return true iff the tuple is full (all data sources contain this key).
+   */
+  protected boolean combine(Object[] srcs, TupleWritable dst) {
+    assert srcs.length == dst.size();
+    for (int i = 0; i < srcs.length; ++i) {
+      if (!dst.has(i)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}

+ 114 - 0
src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java

@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Base class for Composite joins returning Tuples of arbitrary Writables.
+ */
+public abstract class JoinRecordReader<K extends WritableComparable>
+    extends CompositeRecordReader<K,Writable,TupleWritable>
+    implements ComposableRecordReader<K,TupleWritable> {
+
+  public JoinRecordReader(int id, JobConf conf, int capacity,
+      Class<? extends WritableComparator> cmpcl) throws IOException {
+    super(id, capacity, cmpcl);
+    setConf(conf);
+  }
+
+  /**
+   * Emit the next set of key, value pairs as defined by the child
+   * RecordReaders and operation associated with this composite RR.
+   */
+  public boolean next(K key, TupleWritable value) throws IOException {
+    if (jc.flush(value)) {
+      WritableUtils.cloneInto(key, jc.key());
+      return true;
+    }
+    jc.clear();
+    K iterkey = createKey();
+    final PriorityQueue<ComposableRecordReader<K,?>> q = getRecordReaderQueue();
+    while (!q.isEmpty()) {
+      fillJoinCollector(iterkey);
+      jc.reset(iterkey);
+      if (jc.flush(value)) {
+        WritableUtils.cloneInto(key, jc.key());
+        return true;
+      }
+      jc.clear();
+    }
+    return false;
+  }
+
+  /** {@inheritDoc} */
+  public TupleWritable createValue() {
+    return createInternalValue();
+  }
+
+  /**
+   * Return an iterator wrapping the JoinCollector.
+   */
+  protected ResetableIterator<TupleWritable> getDelegate() {
+    return new JoinDelegationIterator();
+  }
+
+  /**
+   * Since the JoinCollector is effecting our operation, we need only
+   * provide an iterator proxy wrapping its operation.
+   */
+  protected class JoinDelegationIterator
+      implements ResetableIterator<TupleWritable> {
+
+    public boolean hasNext() {
+      return jc.hasNext();
+    }
+
+    public boolean next(TupleWritable val) throws IOException {
+      return jc.flush(val);
+    }
+
+    public void replay(TupleWritable val) throws IOException {
+      jc.replay(val);
+    }
+
+    public void reset() {
+      jc.reset(jc.key());
+    }
+
+    public void add(TupleWritable item) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    public void close() throws IOException {
+      jc.close();
+    }
+
+    public void clear() {
+      jc.clear();
+    }
+  }
+}

+ 153 - 0
src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java

@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * Base class for Composite join returning values derived from multiple
+ * sources, but generally not tuples.
+ */
+public abstract class MultiFilterRecordReader<K extends WritableComparable,
+                                              V extends Writable>
+    extends CompositeRecordReader<K,V,V>
+    implements ComposableRecordReader<K,V> {
+
+  private Class<? extends Writable> valueclass;
+  private TupleWritable ivalue;
+
+  public MultiFilterRecordReader(int id, JobConf conf, int capacity,
+      Class<? extends WritableComparator> cmpcl) throws IOException {
+    super(id, capacity, cmpcl);
+    setConf(conf);
+  }
+
+  /**
+   * For each tuple emitted, return a value (typically one of the values
+   * in the tuple).
+   * Modifying the Writables in the tuple is permitted and unlikely to affect
+   * join behavior in most cases, but it is not recommended. It's safer to
+   * clone first.
+   */
+  protected abstract V emit(TupleWritable dst) throws IOException;
+
+  /**
+   * Default implementation offers {@link #emit} every Tuple from the
+   * collector (the outer join of child RRs).
+   */
+  protected boolean combine(Object[] srcs, TupleWritable dst) {
+    return true;
+  }
+
+  /** {@inheritDoc} */
+  public boolean next(K key, V value) throws IOException {
+    if (jc.flush(ivalue)) {
+      WritableUtils.cloneInto(key, jc.key());
+      WritableUtils.cloneInto(value, emit(ivalue));
+      return true;
+    }
+    jc.clear();
+    K iterkey = createKey();
+    final PriorityQueue<ComposableRecordReader<K,?>> q = getRecordReaderQueue();
+    while (!q.isEmpty()) {
+      fillJoinCollector(iterkey);
+      jc.reset(iterkey);
+      if (jc.flush(ivalue)) {
+        WritableUtils.cloneInto(key, jc.key());
+        WritableUtils.cloneInto(value, emit(ivalue));
+        return true;
+      }
+      jc.clear();
+    }
+    return false;
+  }
+
+  /** {@inheritDoc} */
+  @SuppressWarnings("unchecked") // Explicit check for value class agreement
+  public V createValue() {
+    if (null == valueclass) {
+      final Class<?> cls = kids[0].createValue().getClass();
+      for (RecordReader<K,? extends V> rr : kids) {
+        if (!cls.equals(rr.createValue().getClass())) {
+          throw new ClassCastException("Child value classes fail to agree");
+        }
+      }
+      valueclass = cls.asSubclass(Writable.class);
+      ivalue = createInternalValue();
+    }
+    return (V) ReflectionUtils.newInstance(valueclass, null);
+  }
+
+  /**
+   * Return an iterator returning a single value from the tuple.
+   * @see MultiFilterDelegationIterator
+   */
+  protected ResetableIterator<V> getDelegate() {
+    return new MultiFilterDelegationIterator();
+  }
+
+  /**
+   * Proxy the JoinCollector, but include callback to emit.
+   */
+  protected class MultiFilterDelegationIterator
+      implements ResetableIterator<V> {
+
+    public boolean hasNext() {
+      return jc.hasNext();
+    }
+
+    public boolean next(V val) throws IOException {
+      boolean ret;
+      if (ret = jc.flush(ivalue)) {
+        WritableUtils.cloneInto(val, emit(ivalue));
+      }
+      return ret;
+    }
+
+    public void replay(V val) throws IOException {
+      WritableUtils.cloneInto(val, emit(ivalue));
+    }
+
+    public void reset() {
+      jc.reset(jc.key());
+    }
+
+    public void add(V item) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    public void close() throws IOException {
+      jc.close();
+    }
+
+    public void clear() {
+      jc.clear();
+    }
+  }
+
+}

+ 45 - 0
src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java

@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Full outer join.
+ */
+public class OuterJoinRecordReader<K extends WritableComparable>
+    extends JoinRecordReader<K> {
+
+  OuterJoinRecordReader(int id, JobConf conf, int capacity,
+      Class<? extends WritableComparator> cmpcl) throws IOException {
+    super(id, conf, capacity, cmpcl);
+  }
+
+  /**
+   * Emit everything from the collector.
+   */
+  protected boolean combine(Object[] srcs, TupleWritable dst) {
+    assert srcs.length == dst.size();
+    return true;
+  }
+}

+ 93 - 0
src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java

@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Prefer the &quot;rightmost&quot; data source for this key.
+ * For example, <tt>override(S1,S2,S3)</tt> will prefer values
+ * from S3 over S2, and values from S2 over S1 for all keys
+ * emitted from all sources.
+ */
+public class OverrideRecordReader<K extends WritableComparable,
+                                  V extends Writable>
+    extends MultiFilterRecordReader<K,V> {
+
+  OverrideRecordReader(int id, JobConf conf, int capacity,
+      Class<? extends WritableComparator> cmpcl) throws IOException {
+    super(id, conf, capacity, cmpcl);
+  }
+
+  /**
+   * Emit the value with the highest position in the tuple.
+   */
+  @SuppressWarnings("unchecked") // No static typeinfo on Tuples
+  protected V emit(TupleWritable dst) {
+    return (V) dst.iterator().next();
+  }
+
+  /**
+   * Instead of filling the JoinCollector with iterators from all
+   * data sources, fill only the rightmost for this key.
+   * This not only saves space by discarding the other sources, but
+   * it also emits the number of key-value pairs in the preferred
+   * RecordReader instead of repeating that stream n times, where
+   * n is the cardinality of the cross product of the discarded
+   * streams for the given key.
+   */
+  protected void fillJoinCollector(K iterkey) throws IOException {
+    final PriorityQueue<ComposableRecordReader<K,?>> q = getRecordReaderQueue();
+    if (!q.isEmpty()) {
+      int highpos = -1;
+      ArrayList<ComposableRecordReader<K,?>> list =
+        new ArrayList<ComposableRecordReader<K,?>>(kids.length);
+      q.peek().key(iterkey);
+      final WritableComparator cmp = getComparator();
+      while (0 == cmp.compare(q.peek().key(), iterkey)) {
+        ComposableRecordReader<K,?> t = q.poll();
+        if (-1 == highpos || list.get(highpos).id() < t.id()) {
+          highpos = list.size();
+        }
+        list.add(t);
+        if (q.isEmpty())
+          break;
+      }
+      ComposableRecordReader<K,?> t = list.remove(highpos);
+      t.accept(jc, iterkey);
+      for (ComposableRecordReader<K,?> rr : list) {
+        rr.skip(iterkey);
+      }
+      list.add(t);
+      for (ComposableRecordReader<K,?> rr : list) {
+        if (rr.hasNext()) {
+          q.add(rr);
+        }
+      }
+    }
+  }
+
+}

+ 495 - 0
src/java/org/apache/hadoop/mapred/join/Parser.java

@@ -0,0 +1,495 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.CharArrayReader;
+import java.io.IOException;
+import java.io.StreamTokenizer;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Very simple shift-reduce parser for join expressions.
+ *
+ * This should be sufficient for the user extension permitted now, but ought to
+ * be replaced with a parser generator if more complex grammars are supported.
+ * In particular, this &quot;shift-reduce&quot; parser has no states. Each set
+ * of formals requires a different internal node type, which is responsible for
+ * interpreting the list of tokens it receives. This is sufficient for the
+ * current grammar, but it has several annoying properties that might inhibit
+ * extension. In particular, parenthesis are always function calls; an
+ * algebraic or filter grammar would not only require a node type, but must
+ * also work around the internals of this parser.
+ *
+ * For most other cases, adding classes to the hierarchy- particularly by
+ * extending JoinRecordReader and MultiFilterRecordReader- is fairly
+ * straightforward. One need only override the relevant method(s) (usually only
+ * {@link CompositeRecordReader#combine}) and include a property to map its
+ * value to an identifier in the parser.
+ */
+public class Parser {
+  public enum TType { CIF, IDENT, COMMA, LPAREN, RPAREN, QUOT, NUM, }
+
+  /**
+   * Tagged-union type for tokens from the join expression.
+   * @see Parser.TType
+   */
+  public static class Token {
+
+    private TType type;
+
+    Token(TType type) {
+      this.type = type;
+    }
+
+    public TType getType() { return type; }
+    public Node getNode() throws IOException {
+      throw new IOException("Expected nodetype");
+    }
+    public double getNum() throws IOException {
+      throw new IOException("Expected numtype");
+    }
+    public String getStr() throws IOException {
+      throw new IOException("Expected strtype");
+    }
+  }
+
+  public static class NumToken extends Token {
+    private double num;
+    public NumToken(double num) {
+      super(TType.NUM);
+      this.num = num;
+    }
+    public double getNum() { return num; }
+  }
+
+  public static class NodeToken extends Token {
+    private Node node;
+    NodeToken(Node node) {
+      super(TType.CIF);
+      this.node = node;
+    }
+    public Node getNode() {
+      return node;
+    }
+  }
+
+  public static class StrToken extends Token {
+    private String str;
+    public StrToken(TType type, String str) {
+      super(type);
+      this.str = str;
+    }
+    public String getStr() {
+      return str;
+    }
+  }
+
+  /**
+   * Simple lexer wrapping a StreamTokenizer.
+   * This encapsulates the creation of tagged-union Tokens and initializes the
+   * SteamTokenizer.
+   */
+  private static class Lexer {
+
+    private StreamTokenizer tok;
+
+    Lexer(String s) {
+      tok = new StreamTokenizer(new CharArrayReader(s.toCharArray()));
+      tok.quoteChar('"');
+      tok.parseNumbers();
+      tok.ordinaryChar(',');
+      tok.ordinaryChar('(');
+      tok.ordinaryChar(')');
+    }
+
+    Token next() throws IOException {
+      int type = tok.nextToken();
+      switch (type) {
+        case StreamTokenizer.TT_EOF:
+        case StreamTokenizer.TT_EOL:
+          return null;
+        case StreamTokenizer.TT_NUMBER:
+          return new NumToken(tok.nval);
+        case StreamTokenizer.TT_WORD:
+          return new StrToken(TType.IDENT, tok.sval);
+        case '"':
+          return new StrToken(TType.QUOT, tok.sval);
+        default:
+          switch (type) {
+            case ',':
+              return new Token(TType.COMMA);
+            case '(':
+              return new Token(TType.LPAREN);
+            case ')':
+              return new Token(TType.RPAREN);
+            default:
+              throw new IOException("Unexpected: " + type);
+          }
+      }
+    }
+  }
+
+  public abstract static class Node implements ComposableInputFormat {
+    /**
+     * Return the node type registered for the particular identifier.
+     * By default, this is a CNode for any composite node and a WNode
+     * for &quot;wrapped&quot; nodes. User nodes will likely be composite
+     * nodes.
+     * @see #addIdentifier(java.lang.String, java.lang.Class[], java.lang.Class, java.lang.Class)
+     * @see CompositeInputFormat#setFormat(org.apache.hadoop.mapred.JobConf)
+     */
+    static Node forIdent(String ident) throws IOException {
+      try {
+        if (!nodeCstrMap.containsKey(ident)) {
+          throw new IOException("No nodetype for " + ident);
+        }
+        return nodeCstrMap.get(ident).newInstance(ident);
+      } catch (IllegalAccessException e) {
+        throw (IOException)new IOException().initCause(e);
+      } catch (InstantiationException e) {
+        throw (IOException)new IOException().initCause(e);
+      } catch (InvocationTargetException e) {
+        throw (IOException)new IOException().initCause(e);
+      }
+    }
+
+    private static final Class<?>[] ncstrSig = { String.class };
+    private static final
+        Map<String,Constructor<? extends Node>> nodeCstrMap =
+        new HashMap<String,Constructor<? extends Node>>();
+    protected static final
+        Map<String,Constructor<? extends ComposableRecordReader>> rrCstrMap =
+        new HashMap<String,Constructor<? extends ComposableRecordReader>>();
+
+    /**
+     * For a given identifier, add a mapping to the nodetype for the parse
+     * tree and to the ComposableRecordReader to be created, including the
+     * formals required to invoke the constructor.
+     * The nodetype and constructor signature should be filled in from the
+     * child node.
+     */
+    protected static void addIdentifier(String ident, Class<?>[] mcstrSig,
+                              Class<? extends Node> nodetype,
+                              Class<? extends ComposableRecordReader> cl)
+        throws NoSuchMethodException {
+      Constructor<? extends Node> ncstr =
+        nodetype.getDeclaredConstructor(ncstrSig);
+      ncstr.setAccessible(true);
+      nodeCstrMap.put(ident, ncstr);
+      Constructor<? extends ComposableRecordReader> mcstr =
+        cl.getDeclaredConstructor(mcstrSig);
+      mcstr.setAccessible(true);
+      rrCstrMap.put(ident, mcstr);
+    }
+
+    // inst
+    protected int id = -1;
+    protected String ident;
+    protected Class<? extends WritableComparator> cmpcl;
+
+    protected Node(String ident) {
+      this.ident = ident;
+    }
+
+    protected void setID(int id) {
+      this.id = id;
+    }
+
+    protected void setKeyComparator(Class<? extends WritableComparator> cmpcl) {
+      this.cmpcl = cmpcl;
+    }
+    abstract void parse(List<Token> args) throws IOException;
+  }
+
+  /**
+   * Nodetype in the parse tree for &quot;wrapped&quot; InputFormats.
+   */
+  static class WNode extends Node {
+    private static final Class<?>[] cstrSig =
+      { Integer.TYPE, RecordReader.class, Class.class };
+
+    static void addIdentifier(String ident,
+                              Class<? extends ComposableRecordReader> cl)
+        throws NoSuchMethodException {
+      Node.addIdentifier(ident, cstrSig, WNode.class, cl);
+    }
+
+    private String indir;
+    private InputFormat inf;
+
+    public WNode(String ident) {
+      super(ident);
+    }
+
+    /**
+     * Let the first actual define the InputFormat and the second define
+     * the <tt>mapred.input.dir</tt> property.
+     */
+    public void parse(List<Token> ll) throws IOException {
+      StringBuilder sb = new StringBuilder();
+      Iterator<Token> i = ll.iterator();
+      while (i.hasNext()) {
+        Token t = i.next();
+        if (TType.COMMA.equals(t.getType())) {
+          try {
+            inf = (InputFormat)ReflectionUtils.newInstance(
+                Class.forName(sb.toString()).asSubclass(InputFormat.class),
+                null);
+          } catch (ClassNotFoundException e) {
+            throw (IOException)new IOException().initCause(e);
+          } catch (IllegalArgumentException e) {
+            throw (IOException)new IOException().initCause(e);
+          }
+          break;
+        }
+        sb.append(t.getStr());
+      }
+      if (!i.hasNext()) {
+        throw new IOException("Parse error");
+      }
+      Token t = i.next();
+      if (!TType.QUOT.equals(t.getType())) {
+        throw new IOException("Expected quoted string");
+      }
+      indir = t.getStr();
+      // no check for ll.isEmpty() to permit extension
+    }
+
+    private JobConf getConf(JobConf job) {
+      JobConf conf = new JobConf(job);
+      conf.setInputPath(new Path(indir));
+      return conf;
+    }
+
+    public void validateInput(JobConf job) throws IOException {
+      inf.validateInput(getConf(job));
+    }
+
+    public InputSplit[] getSplits(JobConf job, int numSplits)
+        throws IOException {
+      return inf.getSplits(getConf(job), numSplits);
+    }
+
+    public ComposableRecordReader getRecordReader(
+        InputSplit split, JobConf job, Reporter reporter) throws IOException {
+      try {
+        if (!rrCstrMap.containsKey(ident)) {
+          throw new IOException("No RecordReader for " + ident);
+        }
+        return rrCstrMap.get(ident).newInstance(id,
+            inf.getRecordReader(split, getConf(job), reporter), cmpcl);
+      } catch (IllegalAccessException e) {
+        throw (IOException)new IOException().initCause(e);
+      } catch (InstantiationException e) {
+        throw (IOException)new IOException().initCause(e);
+      } catch (InvocationTargetException e) {
+        throw (IOException)new IOException().initCause(e);
+      }
+    }
+
+    public String toString() {
+      return ident + "(" + inf.getClass().getName() + ",\"" + indir + "\")";
+    }
+  }
+
+  /**
+   * Internal nodetype for &quot;composite&quot; InputFormats.
+   */
+  static class CNode extends Node {
+
+    private static final Class<?>[] cstrSig =
+      { Integer.TYPE, JobConf.class, Integer.TYPE, Class.class };
+
+    static void addIdentifier(String ident,
+                              Class<? extends ComposableRecordReader> cl)
+        throws NoSuchMethodException {
+      Node.addIdentifier(ident, cstrSig, CNode.class, cl);
+    }
+
+    // inst
+    private ArrayList<Node> kids = new ArrayList<Node>();
+
+    public CNode(String ident) {
+      super(ident);
+    }
+
+    public void setKeyComparator(Class<? extends WritableComparator> cmpcl) {
+      super.setKeyComparator(cmpcl);
+      for (Node n : kids) {
+        n.setKeyComparator(cmpcl);
+      }
+    }
+
+    public void validateInput(JobConf job) throws IOException {
+      if (0 == kids.size()) {
+        throw new IOException("Childless composite");
+      }
+      for (Node n : kids) {
+        n.validateInput(job);
+      }
+    }
+
+    /**
+     * Combine InputSplits from child InputFormats into a
+     * {@link CompositeInputSplit}.
+     */
+    public InputSplit[] getSplits(JobConf job, int numSplits)
+        throws IOException {
+      InputSplit[][] splits = new InputSplit[kids.size()][];
+      for (int i = 0; i < kids.size(); ++i) {
+        final InputSplit[] tmp = kids.get(i).getSplits(job, numSplits);
+        if (null == tmp) {
+          throw new IOException("Error gathering splits from child RReader");
+        }
+        if (i > 0 && splits[i-1].length != tmp.length) {
+          throw new IOException("Inconsistent split cardinality from child");
+        }
+        splits[i] = tmp;
+      }
+      final int size = splits[0].length;
+      CompositeInputSplit[] ret = new CompositeInputSplit[size];
+      for (int i = 0; i < size; ++i) {
+        ret[i] = new CompositeInputSplit(splits.length);
+        for (int j = 0; j < splits.length; ++j) {
+          ret[i].add(splits[j][i]);
+        }
+      }
+      return ret;
+    }
+
+    @SuppressWarnings("unchecked") // child types unknowable
+    public ComposableRecordReader getRecordReader(
+        InputSplit split, JobConf job, Reporter reporter) throws IOException {
+      if (!(split instanceof CompositeInputSplit)) {
+        throw new IOException("Invalid split type:" +
+                              split.getClass().getName());
+      }
+      final CompositeInputSplit spl = (CompositeInputSplit)split;
+      final int capacity = kids.size();
+      CompositeRecordReader ret = null;
+      try {
+        if (!rrCstrMap.containsKey(ident)) {
+          throw new IOException("No RecordReader for " + ident);
+        }
+        ret = (CompositeRecordReader)
+          rrCstrMap.get(ident).newInstance(id, job, capacity, cmpcl);
+      } catch (IllegalAccessException e) {
+        throw (IOException)new IOException().initCause(e);
+      } catch (InstantiationException e) {
+        throw (IOException)new IOException().initCause(e);
+      } catch (InvocationTargetException e) {
+        throw (IOException)new IOException().initCause(e);
+      }
+      for (int i = 0; i < capacity; ++i) {
+        ret.add(kids.get(i).getRecordReader(spl.get(i), job, reporter));
+      }
+      return (ComposableRecordReader)ret;
+    }
+
+    /**
+     * Parse a list of comma-separated nodes.
+     */
+    public void parse(List<Token> args) throws IOException {
+      ListIterator<Token> i = args.listIterator();
+      while (i.hasNext()) {
+        Token t = i.next();
+        t.getNode().setID(i.previousIndex() >> 1);
+        kids.add(t.getNode());
+        if (i.hasNext() && !TType.COMMA.equals(i.next().getType())) {
+          throw new IOException("Expected ','");
+        }
+      }
+    }
+
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append(ident + "(");
+      for (Node n : kids) {
+        sb.append(n.toString() + ",");
+      }
+      sb.setCharAt(sb.length() - 1, ')');
+      return sb.toString();
+    }
+  }
+
+  private static Token reduce(Stack<Token> st) throws IOException {
+    LinkedList<Token> args = new LinkedList<Token>();
+    while (!st.isEmpty() && !TType.LPAREN.equals(st.peek().getType())) {
+      args.addFirst(st.pop());
+    }
+    if (st.isEmpty()) {
+      throw new IOException("Unmatched ')'");
+    }
+    st.pop();
+    if (st.isEmpty() || !TType.IDENT.equals(st.peek().getType())) {
+      throw new IOException("Identifier expected");
+    }
+    Node n = Node.forIdent(st.pop().getStr());
+    n.parse(args);
+    return new NodeToken(n);
+  }
+
+  /**
+   * Given an expression and an optional comparator, build a tree of
+   * InputFormats using the comparator to sort keys.
+   */
+  static Node parse(String expr, Class<? extends WritableComparator> cmpcl)
+      throws IOException {
+    if (null == expr) {
+      throw new IOException("Expression is null");
+    }
+    Lexer lex = new Lexer(expr);
+    Stack<Token> st = new Stack<Token>();
+    Token tok;
+    while ((tok = lex.next()) != null) {
+      if (TType.RPAREN.equals(tok.getType())) {
+        st.push(reduce(st));
+      } else {
+        st.push(tok);
+      }
+    }
+    if (st.size() == 1 && TType.CIF.equals(st.peek().getType())) {
+      Node ret = st.pop().getNode();
+      if (cmpcl != null) {
+        ret.setKeyComparator(cmpcl);
+      }
+      return ret;
+    }
+    throw new IOException("Missing ')'");
+  }
+
+}

+ 92 - 0
src/java/org/apache/hadoop/mapred/join/ResetableIterator.java

@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This defines an interface to a stateful Iterator that can replay elements
+ * added to it directly.
+ * Note that this does not extend {@link java.util.Iterator}.
+ */
+public interface ResetableIterator<T extends Writable> {
+
+  public static class EMPTY<U extends Writable>
+    implements ResetableIterator<U> {
+    public boolean hasNext() { return false; }
+    public void reset() { }
+    public void close() throws IOException { }
+    public void clear() { }
+    public boolean next(U val) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    public void replay(U val) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    public void add(U item) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  /**
+   * True iff a call to next will succeed.
+   */
+  public boolean hasNext();
+
+  /**
+   * Assign next value to actual.
+   * It is required that elements added to a ResetableIterator be returned in
+   * the same order after a call to {@link #reset} (FIFO).
+   *
+   * Note that a call to this may fail for nested joins (i.e. more elements
+   * available, but none satisfying the constraints of the join)
+   */
+  public boolean next(T val) throws IOException;
+
+  /**
+   * Assign last value returned to actual.
+   */
+  public void replay(T val) throws IOException;
+
+  /**
+   * Set iterator to return to the start of its range. Must be called after
+   * calling {@link #add} to avoid a ConcurrentModificationException.
+   */
+  public void reset();
+
+  /**
+   * Add an element to the collection of elements to iterate over.
+   */
+  public void add(T item) throws IOException;
+
+  /**
+   * Close datasources and release resources. Calling methods on the iterator
+   * after calling close has undefined behavior.
+   */
+  // XXX is this necessary?
+  public void close() throws IOException;
+
+  /**
+   * Close datasources, but do not release internal resources. Calling this
+   * method should permit the object to be reused with a different datasource.
+   */
+  public void clear();
+
+}

+ 96 - 0
src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java

@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.join;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This class provides an implementation of ResetableIterator. This
+ * implementation uses a byte array to store elements added to it.
+ */
+public class StreamBackedIterator<X extends Writable>
+    implements ResetableIterator<X> {
+
+  private static class ReplayableByteInputStream extends ByteArrayInputStream {
+    public ReplayableByteInputStream(byte[] arr) {
+      super(arr);
+    }
+    public void resetStream() {
+      mark = 0;
+      reset();
+    }
+  }
+
+  private ByteArrayOutputStream outbuf = new ByteArrayOutputStream();
+  private DataOutputStream outfbuf = new DataOutputStream(outbuf);
+  private ReplayableByteInputStream inbuf;
+  private DataInputStream infbuf;
+
+  public StreamBackedIterator() { }
+
+  public boolean hasNext() {
+    return infbuf != null && inbuf.available() > 0;
+  }
+
+  public boolean next(X val) throws IOException {
+    if (hasNext()) {
+      inbuf.mark(0);
+      val.readFields(infbuf);
+      return true;
+    }
+    return false;
+  }
+
+  public void replay(X val) throws IOException {
+    inbuf.reset();
+    val.readFields(infbuf);
+  }
+
+  public void reset() {
+    if (null != outfbuf) {
+      inbuf = new ReplayableByteInputStream(outbuf.toByteArray());
+      infbuf =  new DataInputStream(inbuf);
+      outfbuf = null;
+    }
+    inbuf.resetStream();
+  }
+
+  public void add(X item) throws IOException {
+    item.write(outfbuf);
+  }
+
+  public void close() throws IOException {
+    if (null != infbuf)
+      infbuf.close();
+    if (null != outfbuf)
+      outfbuf.close();
+  }
+
+  public void clear() {
+    if (null != inbuf)
+      inbuf.resetStream();
+    outbuf.reset();
+    outfbuf = new DataOutputStream(outbuf);
+  }
+}

+ 226 - 0
src/java/org/apache/hadoop/mapred/join/TupleWritable.java

@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.DataOutput;
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Writable type storing multiple {@link org.apache.hadoop.io.Writable}s.
+ */
+public class TupleWritable implements Writable, Iterable<Writable> {
+
+  private long written;
+  private Writable[] values;
+
+  /**
+   * Create an empty tuple with no allocated storage for writables.
+   */
+  public TupleWritable() { }
+
+  /**
+   * Initialize tuple with storage; unknown whether any of them contain
+   * &quot;written&quot; values.
+   */
+  public TupleWritable(Writable[] vals) {
+    written = 0L;
+    values = vals;
+  }
+
+  /**
+   * Return true if tuple has an element at the position provided.
+   */
+  public boolean has(int i) {
+    return 0 != ((1 << i) & written);
+  }
+
+  /**
+   * Get ith Writable from Tuple.
+   */
+  public Writable get(int i) {
+    return values[i];
+  }
+
+  /**
+   * The number of children in this Tuple.
+   */
+  public int size() {
+    return values.length;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public boolean equals(Object other) {
+    if (other instanceof TupleWritable) {
+      TupleWritable that = (TupleWritable)other;
+      if (this.size() != that.size() || this.mask() != that.mask()) {
+        return false;
+      }
+      for (int i = 0; i < values.length; ++i) {
+        if (!has(i)) continue;
+        if (!values[i].equals(that.get(i))) {
+          return false;
+        }
+      }
+      return true;
+    }
+    return false;
+  }
+
+  public int hashCode() {
+    assert false : "hashCode not designed";
+    return (int)written;
+  }
+
+  /**
+   * Return an iterator over the elements in this tuple.
+   * Note that this doesn't flatten the tuple; one may receive tuples
+   * from this iterator.
+   */
+  public Iterator<Writable> iterator() {
+    final TupleWritable t = this;
+    return new Iterator<Writable>() {
+      long i = written;
+      long last = 0L;
+      public boolean hasNext() {
+        return 0L != i;
+      }
+      public Writable next() {
+        last = Long.lowestOneBit(i);
+        if (0 == last)
+          throw new NoSuchElementException();
+        i ^= last;
+        // numberOfTrailingZeros rtn 64 if lsb set
+        return t.get(Long.numberOfTrailingZeros(last) % 64);
+      }
+      public void remove() {
+        t.written ^= last;
+        if (t.has(Long.numberOfTrailingZeros(last))) {
+          throw new IllegalStateException("Attempt to remove non-existent val");
+        }
+      }
+    };
+  }
+
+  /**
+   * Convert Tuple to String as in the following.
+   * <tt>[<child1>,<child2>,...,<childn>]</tt>
+   */
+  public String toString() {
+    StringBuffer buf = new StringBuffer("[");
+    for (int i = 0; i < values.length; ++i) {
+      buf.append(has(i) ? values[i].toString() : "");
+      buf.append(",");
+    }
+    if (values.length != 0)
+      buf.setCharAt(buf.length() - 1, ']');
+    else
+      buf.append(']');
+    return buf.toString();
+  }
+
+  // Writable
+
+  /** Writes each Writable to <code>out</code>.
+   * TupleWritable format:
+   * {@code
+   *  <count><type1><type2>...<typen><obj1><obj2>...<objn>
+   * }
+   */
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeVInt(out, values.length);
+    WritableUtils.writeVLong(out, written);
+    for (int i = 0; i < values.length; ++i) {
+      Text.writeString(out, values[i].getClass().getName());
+    }
+    for (int i = 0; i < values.length; ++i) {
+      if (has(i)) {
+        values[i].write(out);
+      }
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @SuppressWarnings("unchecked") // No static typeinfo on Tuples
+  public void readFields(DataInput in) throws IOException {
+    int card = WritableUtils.readVInt(in);
+    values = new Writable[card];
+    written = WritableUtils.readVLong(in);
+    Class<? extends Writable>[] cls = new Class[card];
+    try {
+      for (int i = 0; i < card; ++i) {
+        cls[i] = Class.forName(Text.readString(in)).asSubclass(Writable.class);
+      }
+      for (int i = 0; i < card; ++i) {
+          values[i] = cls[i].newInstance();
+        if (has(i)) {
+          values[i].readFields(in);
+        }
+      }
+    } catch (ClassNotFoundException e) {
+      throw (IOException)new IOException("Failed tuple init").initCause(e);
+    } catch (IllegalAccessException e) {
+      throw (IOException)new IOException("Failed tuple init").initCause(e);
+    } catch (InstantiationException e) {
+      throw (IOException)new IOException("Failed tuple init").initCause(e);
+    }
+  }
+
+  /**
+   * Record that the tuple contains an element at the position provided.
+   */
+  void setWritten(int i) {
+    written |= 1 << i;
+  }
+
+  /**
+   * Record that the tuple does not contain an element at the position
+   * provided.
+   */
+  void clearWritten(int i) {
+    written &= -1 ^ (1 << i);
+  }
+
+  /**
+   * Clear any record of which writables have been written to, without
+   * releasing storage.
+   */
+  void clearWritten() {
+    written = 0L;
+  }
+
+  /**
+   * Return a bitmap recording which of the writables that have been
+   * written to.
+   */
+  long mask() {
+    return written;
+  }
+
+}

+ 206 - 0
src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java

@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * Proxy class for a RecordReader participating in the join framework.
+ * This class keeps track of the &quot;head&quot; key-value pair for the
+ * provided RecordReader and keeps a store of values matching a key when
+ * this source is participating in a join.
+ */
+class WrappedRecordReader<K extends WritableComparable,
+                          U extends Writable>
+    implements ComposableRecordReader<K,U> {
+
+  private boolean empty = false;
+  private RecordReader<K,U> rr;
+  private int id;  // index at which values will be inserted in collector
+
+  private K khead; // key at the top of this RR
+  private U vhead; // value assoc with khead
+  private WritableComparator cmp;
+
+  private ResetableIterator<U> vjoin;
+
+  /**
+   * For a given RecordReader rr, occupy position id in collector.
+   */
+  WrappedRecordReader(int id, RecordReader<K,U> rr,
+      Class<? extends WritableComparator> cmpcl) throws IOException {
+    this.id = id;
+    this.rr = rr;
+    khead = rr.createKey();
+    vhead = rr.createValue();
+    try {
+      cmp = (null == cmpcl)
+        ? WritableComparator.get(khead.getClass())
+        : cmpcl.newInstance();
+    } catch (InstantiationException e) {
+      throw (IOException)new IOException().initCause(e);
+    } catch (IllegalAccessException e) {
+      throw (IOException)new IOException().initCause(e);
+    }
+    vjoin = new StreamBackedIterator<U>();
+    next();
+  }
+
+  /** {@inheritDoc} */
+  public int id() {
+    return id;
+  }
+
+  /**
+   * Return the key at the head of this RR.
+   */
+  public K key() {
+    return khead;
+  }
+
+  /**
+   * Clone the key at the head of this RR into the object supplied.
+   */
+  public void key(K qkey) throws IOException {
+    WritableUtils.cloneInto(qkey, khead);
+  }
+
+  /**
+   * Return true if the RR- including the k,v pair stored in this object-
+   * is exhausted.
+   */
+  public boolean hasNext() {
+    return !empty;
+  }
+
+  /**
+   * Skip key-value pairs with keys less than or equal to the key provided.
+   */
+  public void skip(K key) throws IOException {
+    if (hasNext()) {
+      while (cmp.compare(khead, key) <= 0 && next());
+    }
+  }
+
+  /**
+   * Read the next k,v pair into the head of this object; return true iff
+   * the RR and this are exhausted.
+   */
+  protected boolean next() throws IOException {
+    empty = !rr.next(khead, vhead);
+    return hasNext();
+  }
+
+  /**
+   * Add an iterator to the collector at the position occupied by this
+   * RecordReader over the values in this stream paired with the key
+   * provided (ie register a stream of values from this source matching K
+   * with a collector).
+   */
+                                 // JoinCollector comes from parent, which has
+  @SuppressWarnings("unchecked") // no static type for the slot this sits in
+  public void accept(CompositeRecordReader.JoinCollector i, K key)
+      throws IOException {
+    vjoin.clear();
+    if (0 == cmp.compare(key, khead)) {
+      do {
+        vjoin.add(vhead);
+      } while (next() && 0 == cmp.compare(key, khead));
+    }
+    i.add(id, vjoin);
+  }
+
+  /**
+   * Write key-value pair at the head of this stream to the objects provided;
+   * get next key-value pair from proxied RR.
+   */
+  public boolean next(K key, U value) throws IOException {
+    if (hasNext()) {
+      WritableUtils.cloneInto(key, khead);
+      WritableUtils.cloneInto(value, vhead);
+      next();
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Request new key from proxied RR.
+   */
+  public K createKey() {
+    return rr.createKey();
+  }
+
+  /**
+   * Request new value from proxied RR.
+   */
+  public U createValue() {
+    return rr.createValue();
+  }
+
+  /**
+   * Request progress from proxied RR.
+   */
+  public float getProgress() throws IOException {
+    return rr.getProgress();
+  }
+
+  /**
+   * Request position from proxied RR.
+   */
+  public long getPos() throws IOException {
+    return rr.getPos();
+  }
+
+  /**
+   * Forward close request to proxied RR.
+   */
+  public void close() throws IOException {
+    rr.close();
+  }
+
+  /**
+   * Implement Comparable contract (compare key at head of proxied RR
+   * with that of another).
+   */
+  public int compareTo(ComposableRecordReader<K,?> other) {
+    return cmp.compare(key(), other.key());
+  }
+
+  /**
+   * Return true iff compareTo(other) retn true.
+   */
+  @SuppressWarnings("unchecked") // Explicit type check prior to cast
+  public boolean equals(Object other) {
+    return other instanceof ComposableRecordReader
+        && 0 == compareTo((ComposableRecordReader)other);
+  }
+
+  public int hashCode() {
+    assert false : "hashCode not designed";
+    return 42;
+  }
+
+}

+ 88 - 0
src/java/org/apache/hadoop/mapred/join/package.html

@@ -0,0 +1,88 @@
+<HTML>
+
+<BODY>
+
+<p>Given a set of sorted datasets keyed with the same class and yielding equal
+partitions, it is possible to effect a join of those datasets prior to the map.
+This could save costs in re-partitioning, sorting, shuffling, and writing out
+data required in the general case.</p>
+
+<h3><a name="Interface"></a>Interface</h3>
+
+<p>The attached code offers the following interface to users of these
+classes.</p>
+
+<table>
+<tr><th>property</th><th>required</th><th>value</th></tr>
+<tr><td>mapred.join.expr</td><td>yes</td>
+    <td>Join expression to effect over input data</td></tr>
+<tr><td>mapred.join.keycomparator</td><td>no</td>
+    <td><tt>WritableComparator</tt> class to use for comparing keys</td></tr>
+<tr><td>mapred.join.define.&lt;ident&gt;</td><td>no</td>
+    <td>Class mapped to identifier in join expression</td></tr>
+</table>
+
+<p>The join expression understands the following grammar:</p>
+
+<pre>func ::= &lt;ident&gt;([&lt;func&gt;,]*&lt;func&gt;)
+func ::= tbl(&lt;class&gt;,"&lt;path&gt;");
+
+</pre>
+
+<p>Operations included in this patch are partitioned into one of two types:
+join operations emitting tuples and "multi-filter" operations emitting a
+single value from (but not necessarily included in) a set of input values.
+For a given key, each operation will consider the cross product of all
+values for all sources at that node.</p>
+
+<p>Identifiers supported by default:</p>
+
+<table>
+<tr><th>identifier</th><th>type</th><th>description</th></tr>
+<tr><td>inner</td><td>Join</td><td>Full inner join</td></tr>
+<tr><td>outer</td><td>Join</td><td>Full outer join</td></tr>
+<tr><td>override</td><td>MultiFilter</td>
+    <td>For a given key, prefer values from the rightmost source</td></tr>
+</table>
+
+<p>A user of this class must set the <tt>InputFormat</tt> for the job to
+<tt>CompositeInputFormat</tt> and define a join expression accepted by the
+preceding grammar. For example, both of the following are acceptable:</p>
+
+<pre>inner(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
+          "hdfs://host:8020/foo/bar"),
+      tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
+          "hdfs://host:8020/foo/baz"))
+
+outer(override(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
+                   "hdfs://host:8020/foo/bar"),
+               tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
+                   "hdfs://host:8020/foo/baz")),
+      tbl(org.apache.hadoop.mapred/SequenceFileInputFormat.class,
+          "hdfs://host:8020/foo/rab"))
+</pre>
+
+<p><tt>CompositeInputFormat</tt> includes a handful of convenience methods to
+aid construction of these verbose statements.</p>
+
+<p>As in the second example, joins may be nested. Users may provide a
+comparator class in the <tt>mapred.join.keycomparator</tt> property to specify
+the ordering of their keys, or accept the default comparator as returned by
+<tt>WritableComparator.get(keyclass)</tt>.</p>
+
+<p>Users can specify their own join operations, typically by overriding
+<tt>JoinRecordReader</tt> or <tt>MultiFilterRecordReader</tt> and mapping that
+class to an identifier in the join expression using the
+<tt>mapred.join.define.<em>ident</em></tt> property, where <em>ident</em> is
+the identifier appearing in the join expression. Users may elect to emit- or
+modify- values passing through their join operation. Consulting the existing
+operations for guidance is recommended. Adding arguments is considerably more
+complex (and only partially supported), as one must also add a <tt>Node</tt>
+type to the parse tree. One is probably better off extending
+<tt>RecordReader</tt> in most cases.</p>
+
+<a href="http://issues.apache.org/jira/browse/HADOOP-2085">JIRA</a>
+
+</BODY>
+
+</HTML>

+ 245 - 0
src/test/org/apache/hadoop/mapred/join/TestDatamerge.java

@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+import junit.extensions.TestSetup;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+
+public class TestDatamerge extends TestCase {
+
+  private static MiniDFSCluster cluster = null;
+  public static Test suite() {
+    TestSetup setup = new TestSetup(new TestSuite(TestDatamerge.class)) {
+      protected void setUp() throws Exception {
+        Configuration conf = new Configuration();
+        cluster = new MiniDFSCluster(conf, 2, true, null);
+      }
+      protected void tearDown() throws Exception {
+        if (cluster != null) {
+          cluster.shutdown();
+        }
+      }
+    };
+    return setup;
+  }
+
+  private static SequenceFile.Writer[] createWriters(Path testdir,
+      Configuration conf, int srcs, Path[] src) throws IOException {
+    for (int i = 0; i < srcs; ++i) {
+      src[i] = new Path(testdir, Integer.toString(i + 10, 36));
+    }
+    SequenceFile.Writer out[] = new SequenceFile.Writer[srcs];
+    for (int i = 0; i < srcs; ++i) {
+      out[i] = new SequenceFile.Writer(testdir.getFileSystem(conf), conf,
+          src[i], IntWritable.class, IntWritable.class);
+    }
+    return out;
+  }
+
+  private static Path[] writeSimpleSrc(Path testdir, Configuration conf,
+      int srcs) throws IOException {
+    SequenceFile.Writer out[] = null;
+    Path[] src = new Path[srcs];
+    try {
+      out = createWriters(testdir, conf, srcs, src);
+      final int capacity = srcs * 2 + 1;
+      IntWritable key = new IntWritable();
+      IntWritable val = new IntWritable();
+      for (int k = 0; k < capacity; ++k) {
+        for (int i = 0; i < srcs; ++i) {
+          key.set(k % srcs == 0 ? k * srcs : k * srcs + i);
+          val.set(10 * k + i);
+          out[i].append(key, val);
+          if (i == k) {
+            // add duplicate key
+            out[i].append(key, val);
+          }
+        }
+      }
+    } finally {
+      if (out != null) {
+        for (int i = 0; i < srcs; ++i) {
+          if (out[i] != null)
+            out[i].close();
+        }
+      }
+    }
+    return src;
+  }
+
+  private static String stringify(IntWritable key, Writable val) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("(" + key);
+    sb.append("," + val + ")");
+    return sb.toString();
+  }
+
+  private static abstract class SimpleCheckerBase<V extends Writable>
+      implements Mapper<IntWritable, V, IntWritable, IntWritable>,
+                 Reducer<IntWritable, IntWritable, Text, Text> {
+    protected final static IntWritable one = new IntWritable(1);
+    int srcs;
+    public void close() { }
+    public void configure(JobConf job) {
+      srcs = job.getInt("testdatamerge.sources", 0);
+      assertTrue("Invalid src count: " + srcs, srcs > 0);
+    }
+    public abstract void map(IntWritable key, V val,
+        OutputCollector<IntWritable, IntWritable> out, Reporter reporter)
+        throws IOException;
+    public void reduce(IntWritable key, Iterator<IntWritable> values,
+                       OutputCollector<Text, Text> output,
+                       Reporter reporter) throws IOException {
+      int seen = 0;
+      while (values.hasNext()) {
+        seen += values.next().get();
+      }
+      assertTrue("Bad count for " + key.get(), verify(key.get(), seen));
+    }
+    public abstract boolean verify(int key, int occ);
+  }
+
+  private static class InnerJoinChecker
+      extends SimpleCheckerBase<TupleWritable> {
+    public void map(IntWritable key, TupleWritable val,
+        OutputCollector<IntWritable, IntWritable> out, Reporter reporter)
+        throws IOException {
+      int k = key.get();
+      final String kvstr = "Unexpected tuple: " + stringify(key, val);
+      assertTrue(kvstr, 0 == k % (srcs * srcs));
+      for (int i = 0; i < val.size(); ++i) {
+        final int vali = ((IntWritable)val.get(i)).get();
+        assertTrue(kvstr, (vali - i) * srcs == 10 * k);
+      }
+      out.collect(key, one);
+    }
+    public boolean verify(int key, int occ) {
+      return (key == 0 && occ == 2) ||
+             (key != 0 && (key % (srcs * srcs) == 0) && occ == 1);
+    }
+  }
+
+  private static class OuterJoinChecker
+      extends SimpleCheckerBase<TupleWritable> {
+    public void map(IntWritable key, TupleWritable val,
+        OutputCollector<IntWritable, IntWritable> out, Reporter reporter)
+        throws IOException {
+      int k = key.get();
+      final String kvstr = "Unexpected tuple: " + stringify(key, val);
+      if (0 == k % (srcs * srcs)) {
+        for (int i = 0; i < val.size(); ++i) {
+          assertTrue(kvstr, val.get(i) instanceof IntWritable);
+          final int vali = ((IntWritable)val.get(i)).get();
+          assertTrue(kvstr, (vali - i) * srcs == 10 * k);
+        }
+      } else {
+        for (int i = 0; i < val.size(); ++i) {
+          if (i == k % srcs) {
+            assertTrue(kvstr, val.get(i) instanceof IntWritable);
+            final int vali = ((IntWritable)val.get(i)).get();
+            assertTrue(kvstr, srcs * (vali - i) == 10 * (k - i));
+          } else {
+            assertTrue(kvstr, !val.has(i));
+          }
+        }
+      }
+      out.collect(key, one);
+    }
+    public boolean verify(int key, int occ) {
+      if (key < srcs * srcs && (key % (srcs + 1)) == 0)
+        return 2 == occ;
+      return 1 == occ;
+    }
+  }
+
+  private static class OverrideChecker
+      extends SimpleCheckerBase<IntWritable> {
+    public void map(IntWritable key, IntWritable val,
+        OutputCollector<IntWritable, IntWritable> out, Reporter reporter)
+        throws IOException {
+      int k = key.get();
+      final int vali = val.get();
+      final String kvstr = "Unexpected tuple: " + stringify(key, val);
+      if (0 == k % (srcs * srcs)) {
+        assertTrue(kvstr, vali == k * 10 / srcs + srcs - 1);
+      } else {
+        final int i = k % srcs;
+        assertTrue(kvstr, srcs * (vali - i) == 10 * (k - i));
+      }
+      out.collect(key, one);
+    }
+    public boolean verify(int key, int occ) {
+      if (key < srcs * srcs && (key % (srcs + 1)) == 0 && key != 0)
+        return 2 == occ;
+      return 1 == occ;
+    }
+  }
+
+  private static void joinAs(String jointype,
+      Class<? extends SimpleCheckerBase> c) throws Exception {
+    final int srcs = 4;
+    Configuration conf = new Configuration();
+    JobConf job = new JobConf(conf, c);
+    Path base = cluster.getFileSystem().makeQualified(new Path("/"+jointype));
+    Path[] src = writeSimpleSrc(base, conf, srcs);
+    job.set("mapred.join.expr", CompositeInputFormat.compose(jointype,
+        SequenceFileInputFormat.class, src));
+    job.setInt("testdatamerge.sources", srcs);
+    job.setInputFormat(CompositeInputFormat.class);
+    job.setOutputPath(new Path(base, "out"));
+
+    job.setMapperClass(c);
+    job.setReducerClass(c);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(IntWritable.class);
+    JobClient.runJob(job);
+    base.getFileSystem(job).delete(base);
+  }
+
+  public void testSimpleInnerJoin() throws Exception {
+    joinAs("inner", InnerJoinChecker.class);
+  }
+
+  public void testSimpleOuterJoin() throws Exception {
+    joinAs("outer", OuterJoinChecker.class);
+  }
+
+  public void testSimpleOverride() throws Exception {
+    joinAs("override", OverrideChecker.class);
+  }
+}

+ 133 - 0
src/test/org/apache/hadoop/mapred/join/TestTupleWritable.java

@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.join;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+public class TestTupleWritable extends TestCase {
+
+  private TupleWritable makeTuple(Writable[] writs) {
+    Writable[] sub1 = { writs[1], writs[2] };
+    Writable[] sub3 = { writs[4], writs[5] };
+    Writable[] sub2 = { writs[3], new TupleWritable(sub3), writs[6] };
+    Writable[] vals = { writs[0], new TupleWritable(sub1),
+                        new TupleWritable(sub2), writs[7], writs[8],
+                        writs[9] };
+    // [v0, [v1, v2], [v3, [v4, v5], v6], v7, v8, v9]
+    TupleWritable ret = new TupleWritable(vals);
+    for (int i = 0; i < 6; ++i) {
+      ret.setWritten(i);
+    }
+    ((TupleWritable)sub2[1]).setWritten(0);
+    ((TupleWritable)sub2[1]).setWritten(1);
+    ((TupleWritable)vals[1]).setWritten(0);
+    ((TupleWritable)vals[1]).setWritten(1);
+    for (int i = 0; i < 3; ++i) {
+      ((TupleWritable)vals[2]).setWritten(i);
+    }
+    return ret;
+  }
+
+  private int verifIter(Writable[] writs, TupleWritable t, int i) {
+    for (Writable w : t) {
+      if (w instanceof TupleWritable) {
+        i = verifIter(writs, ((TupleWritable)w), i);
+        continue;
+      }
+      assertTrue("Bad value", w.equals(writs[i++]));
+    }
+    return i;
+  }
+
+  public void testIterable() throws Exception {
+    Random r = new Random();
+    Writable[] writs = {
+      new BooleanWritable(r.nextBoolean()),
+      new FloatWritable(r.nextFloat()),
+      new FloatWritable(r.nextFloat()),
+      new IntWritable(r.nextInt()),
+      new LongWritable(r.nextLong()),
+      new BytesWritable("dingo".getBytes()),
+      new LongWritable(r.nextLong()),
+      new IntWritable(r.nextInt()),
+      new BytesWritable("yak".getBytes()),
+      new IntWritable(r.nextInt())
+    };
+    TupleWritable t = new TupleWritable(writs);
+    for (int i = 0; i < 6; ++i) {
+      t.setWritten(i);
+    }
+    verifIter(writs, t, 0);
+  }
+
+  public void testNestedIterable() throws Exception {
+    Random r = new Random();
+    Writable[] writs = {
+      new BooleanWritable(r.nextBoolean()),
+      new FloatWritable(r.nextFloat()),
+      new FloatWritable(r.nextFloat()),
+      new IntWritable(r.nextInt()),
+      new LongWritable(r.nextLong()),
+      new BytesWritable("dingo".getBytes()),
+      new LongWritable(r.nextLong()),
+      new IntWritable(r.nextInt()),
+      new BytesWritable("yak".getBytes()),
+      new IntWritable(r.nextInt())
+    };
+    TupleWritable sTuple = makeTuple(writs);
+    assertTrue("Bad count", writs.length == verifIter(writs, sTuple, 0));
+  }
+
+  public void testWritable() throws Exception {
+    Random r = new Random();
+    Writable[] writs = {
+      new BooleanWritable(r.nextBoolean()),
+      new FloatWritable(r.nextFloat()),
+      new FloatWritable(r.nextFloat()),
+      new IntWritable(r.nextInt()),
+      new LongWritable(r.nextLong()),
+      new BytesWritable("dingo".getBytes()),
+      new LongWritable(r.nextLong()),
+      new IntWritable(r.nextInt()),
+      new BytesWritable("yak".getBytes()),
+      new IntWritable(r.nextInt())
+    };
+    TupleWritable sTuple = makeTuple(writs);
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    sTuple.write(new DataOutputStream(out));
+    ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+    TupleWritable dTuple = new TupleWritable();
+    dTuple.readFields(new DataInputStream(in));
+    assertTrue("Failed to write/read tuple", sTuple.equals(dTuple));
+  }
+
+}