HADOOP-3721. Refactor CompositeRecordReader and related mapred.join classes
to make them clearer.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@675806 13f79535-47bb-0310-9956-ffa450edef68

commit 4115d407a7
Author: Christopher Douglas

+ 3 - 0
CHANGES.txt

@@ -100,6 +100,9 @@ Trunk (unreleased changes)
     HADOOP-3726. Throw exceptions from TestCLI setup and teardown instead of
     swallowing them. (Steve Loughran via cdouglas)
 
+    HADOOP-3721. Refactor CompositeRecordReader and related mapred.join classes
+    to make them clearer. (cdouglas)
+
 Release 0.18.0 - Unreleased
 
   INCOMPATIBLE CHANGES

+ 2 - 1
src/mapred/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java

@@ -65,8 +65,9 @@ public class ArrayListBackedIterator<X extends Writable>
     return false;
   }
 
-  public void replay(X val) throws IOException {
+  public boolean replay(X val) throws IOException {
     WritableUtils.cloneInto(val, hold);
+    return true;
   }
 
   public void reset() {

+ 35 - 52
src/mapred/org/apache/hadoop/mapred/join/CompositeRecordReader.java

@@ -149,11 +149,7 @@ public abstract class CompositeRecordReader<
   class JoinCollector {
     private K key;
     private ResetableIterator<X>[] iters;
-    private long partial = 0L;
-    private long replaymask = 0L;
-    private int start = 0;
     private int pos = -1;
-    private int iterpos = -1;
     private boolean first = true;
 
     /**
@@ -190,10 +186,8 @@ public abstract class CompositeRecordReader<
      */
     public void reset(K key) {
       this.key = key;
-      start = 0;
-      pos = 0;
       first = true;
-      partial = 0L;
+      pos = iters.length - 1;
       for (int i = 0; i < iters.length; ++i) {
         iters[i].reset();
       }
@@ -205,12 +199,10 @@ public abstract class CompositeRecordReader<
     public void clear() {
       key = null;
       pos = -1;
-      first = true;
       for (int i = 0; i < iters.length; ++i) {
         iters[i].clear();
         iters[i] = EMPTY;
       }
-      partial = 0L;
     }
 
     /**
@@ -228,52 +220,42 @@ public abstract class CompositeRecordReader<
      */
     @SuppressWarnings("unchecked") // No static typeinfo on Tuples
     protected boolean next(TupleWritable val) throws IOException {
-      if (pos < 0) {
-        clear();
-        return false;
-      }
-      int i = start;
-      if (first) { // Find first iterator with elements
-        for (; i < iters.length && !iters[i].hasNext(); ++i);
-        if (iters.length <= i) { // no children had key
-          clear();
-          return false;
-        }
-        start = i;
-        for (int j = i; j < iters.length; ++j) {
-          if (iters[j].hasNext()) {
-            partial |= 1 << j;
+      if (first) {
+        int i = -1;
+        for (pos = 0; pos < iters.length; ++pos) {
+          if (iters[pos].hasNext() && iters[pos].next((X)val.get(pos))) {
+            i = pos;
+            val.setWritten(i);
           }
         }
-        iterpos = pos = iters.length - 1;
+        pos = i;
         first = false;
-      } else { // Copy all elements in partial into tuple
-        for (; i < iterpos; ++i) {
-          if ((partial & (1 << i)) != 0) {
-            iters[i].replay((X)val.get(i));
-            val.setWritten(i);
-          }
+        if (pos < 0) {
+          clear();
+          return false;
         }
+        return true;
       }
-      long partialwritten = val.mask();
-      if (iters[i].next((X)val.get(i))) {
-        val.setWritten(i);
+      while (0 <= pos && !(iters[pos].hasNext() &&
+                           iters[pos].next((X)val.get(pos)))) {
+        --pos;
       }
-      for (++i; i < iters.length; ++i) {
-        iters[i].reset();
-        if (iters[i].hasNext() && iters[i].next((X)val.get(i))) {
+      if (pos < 0) {
+        clear();
+        return false;
+      }
+      val.setWritten(pos);
+      for (int i = 0; i < pos; ++i) {
+        if (iters[i].replay((X)val.get(i))) {
           val.setWritten(i);
         }
       }
-      iterpos = iters.length - 1;
-      for (; iterpos > pos && !iters[iterpos].hasNext(); --iterpos);
-      if (!iters[iterpos].hasNext()) {
-        for (; !(pos < 0 || iters[pos].hasNext()); --pos);
-        iterpos = pos;
-      }
-      replaymask = val.mask();
-      if ((replaymask ^ partialwritten) == 0L) {
-        return next(val);
+      while (pos + 1 < iters.length) {
+        ++pos;
+        iters[pos].reset();
+        if (iters[pos].hasNext() && iters[pos].next((X)val.get(pos))) {
+          val.setWritten(pos);
+        }
       }
       return true;
     }
@@ -282,18 +264,19 @@ public abstract class CompositeRecordReader<
      * Replay the last Tuple emitted.
      */
     @SuppressWarnings("unchecked") // No static typeinfo on Tuples
-    public void replay(TupleWritable val) throws IOException {
+    public boolean replay(TupleWritable val) throws IOException {
       // The last emitted tuple might have drawn on an empty source;
       // it can't be cleared prematurely, b/c there may be more duplicate
       // keys in iterator positions < pos
-      if (first) {
-        throw new IllegalStateException();
-      }
+      assert !first;
+      boolean ret = false;
       for (int i = 0; i < iters.length; ++i) {
-        if ((replaymask & (1 << i)) != 0) {
-          iters[i].replay((X)val.get(i));
+        if (iters[i].replay((X)val.get(i))) {
+          val.setWritten(i);
+          ret = true;
         }
       }
+      return ret;
     }
 
     /**

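The rewritten JoinCollector.next() above replaces the partial/replaymask bit
bookkeeping with an odometer-style scan: advance the highest child iterator
that can still produce a value, replay the tuple positions below it, and
reset and re-read the positions above it. The following standalone sketch (a
hypothetical OdometerSketch class, not part of this commit) shows the same
pattern over plain lists, assuming non-empty sources; the real collector also
tolerates empty children and a lenient hasNext().

    import java.util.Arrays;
    import java.util.List;

    public class OdometerSketch {
      // Advance to the next tuple of the cross product, odometer-style.
      static boolean next(List<List<String>> lists, int[] idx, String[] out) {
        // Find the highest position whose list can still advance.
        int pos = lists.size() - 1;
        while (pos >= 0 && idx[pos] + 1 >= lists.get(pos).size()) {
          --pos;
        }
        if (pos < 0) {
          return false;                        // every position is exhausted
        }
        ++idx[pos];
        out[pos] = lists.get(pos).get(idx[pos]);
        for (int i = 0; i < pos; ++i) {        // "replay" the lower positions
          out[i] = lists.get(i).get(idx[i]);
        }
        for (int i = pos + 1; i < lists.size(); ++i) {
          idx[i] = 0;                          // reset the higher positions
          out[i] = lists.get(i).get(0);
        }
        return true;
      }

      public static void main(String[] args) {
        List<List<String>> lists = Arrays.asList(
            Arrays.asList("a1", "a2"),
            Arrays.asList("b1"),
            Arrays.asList("c1", "c2"));
        int n = lists.size();
        int[] idx = new int[n];
        String[] out = new String[n];
        for (int i = 0; i < n; ++i) {
          out[i] = lists.get(i).get(0);        // like the "first" branch above
        }
        do {                                   // prints the 2 x 1 x 2 product
          System.out.println(Arrays.toString(out));
        } while (next(lists, idx, out));
      }
    }
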
+ 2 - 2
src/mapred/org/apache/hadoop/mapred/join/JoinRecordReader.java

@@ -91,8 +91,8 @@ public abstract class JoinRecordReader<K extends WritableComparable>
       return jc.flush(val);
     }
 
-    public void replay(TupleWritable val) throws IOException {
-      jc.replay(val);
+    public boolean replay(TupleWritable val) throws IOException {
+      return jc.replay(val);
     }
 
     public void reset() {

+ 2 - 1
src/mapred/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java

@@ -129,8 +129,9 @@ public abstract class MultiFilterRecordReader<K extends WritableComparable,
       return ret;
     }
 
-    public void replay(V val) throws IOException {
+    public boolean replay(V val) throws IOException {
       WritableUtils.cloneInto(val, emit(ivalue));
+      return true;
     }
 
     public void reset() {

+ 2 - 1
src/mapred/org/apache/hadoop/mapred/join/Parser.java

@@ -377,7 +377,8 @@ public class Parser {
           throw new IOException("Error gathering splits from child RReader");
         }
         if (i > 0 && splits[i-1].length != tmp.length) {
-          throw new IOException("Inconsistent split cardinality from child");
+          throw new IOException("Inconsistent split cardinality from child " +
+              i + " (" + splits[i-1].length + "/" + tmp.length + ")");
         }
         splits[i] = tmp;
       }

+ 6 - 5
src/mapred/org/apache/hadoop/mapred/join/ResetableIterator.java

@@ -35,10 +35,10 @@ public interface ResetableIterator<T extends Writable> {
     public void close() throws IOException { }
     public void clear() { }
     public boolean next(U val) throws IOException {
-      throw new UnsupportedOperationException();
+      return false;
     }
-    public void replay(U val) throws IOException {
-      throw new UnsupportedOperationException();
+    public boolean replay(U val) throws IOException {
+      return false;
     }
     public void add(U item) throws IOException {
       throw new UnsupportedOperationException();
@@ -46,7 +46,8 @@ public interface ResetableIterator<T extends Writable> {
   }
 
   /**
-   * True iff a call to next will succeed.
+   * True if a call to next may return a value. False positives are
+   * permitted, but not false negatives.
    */
   public boolean hasNext();
 
@@ -63,7 +64,7 @@ public interface ResetableIterator<T extends Writable> {
   /**
    * Assign last value returned to actual.
    */
-  public void replay(T val) throws IOException;
+  public boolean replay(T val) throws IOException;
 
   /**
    * Set iterator to return to the start of its range. Must be called after

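The interface changes above let EMPTY degrade to harmless no-ops and let
replay() report whether it actually produced a value, so callers can treat
hasNext() as advisory and trust the boolean results instead. A minimal usage
sketch against ArrayListBackedIterator (a hypothetical ReplaySketch driver,
assuming that class's add()/reset() behavior as declared in this package):

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.join.ArrayListBackedIterator;

    public class ReplaySketch {
      public static void main(String[] args) throws IOException {
        ArrayListBackedIterator<Text> it = new ArrayListBackedIterator<Text>();
        it.add(new Text("a"));
        it.add(new Text("b"));
        it.reset();                      // return to the start of the range
        Text v = new Text();
        // hasNext() may be a false positive, so the return value of next()
        // is the authoritative signal; test both.
        while (it.hasNext() && it.next(v)) {
          System.out.println("next:   " + v);
          if (it.replay(v)) {            // replay() now reports success
            System.out.println("replay: " + v);
          }
        }
        it.close();
      }
    }
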
+ 4 - 1
src/mapred/org/apache/hadoop/mapred/join/StreamBackedIterator.java

@@ -62,9 +62,12 @@ public class StreamBackedIterator<X extends Writable>
     return false;
   }
 
-  public void replay(X val) throws IOException {
+  public boolean replay(X val) throws IOException {
     inbuf.reset();
+    if (0 == inbuf.available())
+      return false;
     val.readFields(infbuf);
+    return true;
   }
 
   public void reset() {

+ 33 - 8
src/test/org/apache/hadoop/mapred/join/FakeIF.java

@@ -22,14 +22,18 @@ import java.io.DataInput;
 import java.io.DataOutput;
 
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
 
-public class FakeIF
-    implements InputFormat<IncomparableKey,NullWritable> {
+public class FakeIF<K,V>
+    implements InputFormat<K,V>, JobConfigurable {
 
   public static class FakeSplit implements InputSplit {
     public void write(DataOutput out) throws IOException { }
@@ -38,6 +42,22 @@ public class FakeIF
     public String[] getLocations() { return new String[0]; }
   }
 
+  public static void setKeyClass(JobConf job, Class<?> k) {
+    job.setClass("test.fakeif.keyclass", k, WritableComparable.class);
+  }
+
+  public static void setValClass(JobConf job, Class<?> v) {
+    job.setClass("test.fakeif.valclass", v, Writable.class);
+  }
+
+  private Class<?> keyclass;
+  private Class<?> valclass;
+
+  public void configure(JobConf job) {
+    keyclass = job.getClass("test.fakeif.keyclass", IncomparableKey.class, WritableComparable.class);
+    valclass = job.getClass("test.fakeif.valclass", NullWritable.class, Writable.class);
+  }
+
   public FakeIF() { }
 
   public void validateInput(JobConf conf) { }
@@ -46,13 +66,18 @@ public class FakeIF
     return new InputSplit[] { new FakeSplit() };
   }
 
-  public RecordReader<IncomparableKey,NullWritable> getRecordReader(
+  public RecordReader<K,V> getRecordReader(
       InputSplit ignored, JobConf conf, Reporter reporter) {
-    return new RecordReader<IncomparableKey,NullWritable>() {
-      public boolean next(IncomparableKey key, NullWritable value)
-          throws IOException { return false; }
-      public IncomparableKey createKey() { return new IncomparableKey(); }
-      public NullWritable createValue() { return NullWritable.get(); }
+    return new RecordReader<K,V>() {
+      public boolean next(K key, V value) throws IOException { return false; }
+      @SuppressWarnings("unchecked")
+      public K createKey() {
+        return (K)ReflectionUtils.newInstance(keyclass, null);
+      }
+      @SuppressWarnings("unchecked")
+      public V createValue() {
+        return (V)ReflectionUtils.newInstance(valclass, null);
+      }
       public long getPos() throws IOException { return 0L; }
       public void close() throws IOException { }
       public float getProgress() throws IOException { return 0.0f; }

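FakeIF is now generic and JobConfigurable, so the nested-join test below can
splice typed, always-empty sources into a join expression. A minimal driving
sketch (a hypothetical FakeIFSketch harness, assuming the test classes are on
the classpath):

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.join.FakeIF;

    public class FakeIFSketch {
      public static void main(String[] args) throws IOException {
        JobConf job = new JobConf();
        FakeIF.setKeyClass(job, IntWritable.class);
        FakeIF.setValClass(job, IntWritable.class);
        FakeIF<IntWritable, IntWritable> fake =
            new FakeIF<IntWritable, IntWritable>();
        fake.configure(job);             // reads the classes back from the job
        RecordReader<IntWritable, IntWritable> rr =
            fake.getRecordReader(new FakeIF.FakeSplit(), job, Reporter.NULL);
        IntWritable k = rr.createKey();  // instantiated reflectively
        IntWritable v = rr.createValue();
        System.out.println(rr.next(k, v)); // always false: no records
        rr.close();
      }
    }
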
+ 93 - 0
src/test/org/apache/hadoop/mapred/join/TestDatamerge.java

@@ -27,6 +27,7 @@ import junit.extensions.TestSetup;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -41,6 +42,7 @@ import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 
@@ -247,6 +249,97 @@ public class TestDatamerge extends TestCase {
     joinAs("override", OverrideChecker.class);
   }
 
+  public void testNestedJoin() throws Exception {
+    // outer(inner(S1,...,Sn),outer(S1,...,Sn))
+    final int SOURCES = 3;
+    final int ITEMS = (SOURCES + 1) * (SOURCES + 1);
+    JobConf job = new JobConf();
+    Path base = cluster.getFileSystem().makeQualified(new Path("/nested"));
+    int[][] source = new int[SOURCES][];
+    for (int i = 0; i < SOURCES; ++i) {
+      source[i] = new int[ITEMS];
+      for (int j = 0; j < ITEMS; ++j) {
+        source[i][j] = (i + 2) * (j + 1);
+      }
+    }
+    Path[] src = new Path[SOURCES];
+    SequenceFile.Writer out[] = createWriters(base, job, SOURCES, src);
+    IntWritable k = new IntWritable();
+    for (int i = 0; i < SOURCES; ++i) {
+      IntWritable v = new IntWritable();
+      v.set(i);
+      for (int j = 0; j < ITEMS; ++j) {
+        k.set(source[i][j]);
+        out[i].append(k, v);
+      }
+      out[i].close();
+    }
+    out = null;
+
+    StringBuilder sb = new StringBuilder();
+    sb.append("outer(inner(");
+    for (int i = 0; i < SOURCES; ++i) {
+      sb.append(
+          CompositeInputFormat.compose(SequenceFileInputFormat.class,
+            src[i].toString()));
+      if (i + 1 != SOURCES) sb.append(",");
+    }
+    sb.append("),outer(");
+    sb.append(CompositeInputFormat.compose(FakeIF.class,"foobar"));
+    sb.append(",");
+    for (int i = 0; i < SOURCES; ++i) {
+      sb.append(
+          CompositeInputFormat.compose(SequenceFileInputFormat.class,
+            src[i].toString()));
+      sb.append(",");
+    }
+    sb.append(CompositeInputFormat.compose(FakeIF.class,"raboof") + "))");
+    job.set("mapred.join.expr", sb.toString());
+    job.setInputFormat(CompositeInputFormat.class);
+    Path outf = new Path(base, "out");
+    FileOutputFormat.setOutputPath(job, outf);
+    FakeIF.setKeyClass(job, IntWritable.class);
+    FakeIF.setValClass(job, IntWritable.class);
+
+    job.setMapperClass(IdentityMapper.class);
+    job.setReducerClass(IdentityReducer.class);
+    job.setNumReduceTasks(0);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(TupleWritable.class);
+    job.setOutputFormat(SequenceFileOutputFormat.class);
+    JobClient.runJob(job);
+
+    FileStatus[] outlist = cluster.getFileSystem().listStatus(outf);
+    assertEquals(1, outlist.length);
+    assertTrue(0 < outlist[0].getLen());
+    SequenceFile.Reader r =
+      new SequenceFile.Reader(cluster.getFileSystem(),
+          outlist[0].getPath(), job);
+    TupleWritable v = new TupleWritable();
+    while (r.next(k, v)) {
+      assertFalse(((TupleWritable)v.get(1)).has(0));
+      assertFalse(((TupleWritable)v.get(1)).has(SOURCES + 1));
+      boolean chk = true;
+      int ki = k.get();
+      for (int i = 2; i < SOURCES + 2; ++i) {
+        if ((ki % i) == 0 && ki <= i * ITEMS) {
+          assertEquals(i - 2, ((IntWritable)
+                              ((TupleWritable)v.get(1)).get((i - 1))).get());
+        } else chk = false;
+      }
+      if (chk) { // present in all sources; chk inner
+        assertTrue(v.has(0));
+        for (int i = 0; i < SOURCES; ++i)
+          assertTrue(((TupleWritable)v.get(0)).has(i));
+      } else { // should not be present in inner join
+        assertFalse(v.has(0));
+      }
+    }
+    r.close();
+    base.getFileSystem(job).delete(base, true);
+
+  }
+
   public void testConfiguredInputFormat() throws Exception {
     JobConf conf = new JobConf();
     conf.set("mapred.join.expr", CompositeInputFormat.compose(