
HADOOP-3630. Fix NullPointerException in CompositeRecordReader from empty
sources.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@675073 13f79535-47bb-0310-9956-ffa450edef68

Christopher Douglas 17 years ago
parent
commit
d59d585bcd

+ 3 - 0
CHANGES.txt

@@ -785,6 +785,9 @@ Release 0.18.0 - Unreleased
 
     HADOOP-3691. Fix streaming and tutorial docs. (Jothi Padmanabhan via ddas)
 
+    HADOOP-3630. Fix NullPointerException in CompositeRecordReader from empty
+    sources (cdouglas)
+
 Release 0.17.2 - Unreleased
 
   BUG FIXES

+ 6 - 2
src/mapred/org/apache/hadoop/mapred/join/CompositeRecordReader.java

@@ -135,7 +135,9 @@ public abstract class CompositeRecordReader<
             }
           });
     }
-    q.add(rr);
+    if (rr.hasNext()) {
+      q.add(rr);
+    }
   }
 
   /**
@@ -357,7 +359,9 @@ public abstract class CompositeRecordReader<
     }
     for (ComposableRecordReader<K,?> rr : tmp) {
       rr.skip(key);
-      q.add(rr);
+      if (rr.hasNext()) {
+        q.add(rr);
+      }
     }
   }
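
For context on the fix above: CompositeRecordReader keeps its child readers in a
PriorityQueue ordered by each reader's current key. Enqueueing a reader that has
no records hands the queue's comparator a reader whose key() is null, which is
where the NullPointerException reported in HADOOP-3630 came from; guarding the
add with hasNext() keeps empty or exhausted sources out of the queue entirely.
A minimal sketch of the pattern, using simplified stand-in types rather than the
actual Hadoop classes:

import java.util.Comparator;
import java.util.PriorityQueue;

class EmptySourceSketch {
  // Simplified stand-in for ComposableRecordReader.
  interface Reader {
    boolean hasNext();   // does this source still have a record?
    String key();        // current key; null when the source is empty
  }

  static final PriorityQueue<Reader> q = new PriorityQueue<Reader>(
      11, new Comparator<Reader>() {
        public int compare(Reader a, Reader b) {
          // With an empty reader enqueued, key() returns null here -> NPE.
          return a.key().compareTo(b.key());
        }
      });

  static void add(Reader rr) {
    // The commit's guard: never enqueue a source with nothing to read.
    if (rr.hasNext()) {
      q.add(rr);
    }
  }
}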
 

+ 61 - 0
src/test/org/apache/hadoop/mapred/join/FakeIF.java

@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+public class FakeIF
+    implements InputFormat<IncomparableKey,NullWritable> {
+
+  public static class FakeSplit implements InputSplit {
+    public void write(DataOutput out) throws IOException { }
+    public void readFields(DataInput in) throws IOException { }
+    public long getLength() { return 0L; }
+    public String[] getLocations() { return new String[0]; }
+  }
+
+  public FakeIF() { }
+
+  public void validateInput(JobConf conf) { }
+
+  public InputSplit[] getSplits(JobConf conf, int splits) {
+    return new InputSplit[] { new FakeSplit() };
+  }
+
+  public RecordReader<IncomparableKey,NullWritable> getRecordReader(
+      InputSplit ignored, JobConf conf, Reporter reporter) {
+    return new RecordReader<IncomparableKey,NullWritable>() {
+      public boolean next(IncomparableKey key, NullWritable value)
+          throws IOException { return false; }
+      public IncomparableKey createKey() { return new IncomparableKey(); }
+      public NullWritable createValue() { return NullWritable.get(); }
+      public long getPos() throws IOException { return 0L; }
+      public void close() throws IOException { }
+      public float getProgress() throws IOException { return 0.0f; }
+    };
+  }
+}

+ 31 - 0
src/test/org/apache/hadoop/mapred/join/IncomparableKey.java

@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.join;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+
+import org.apache.hadoop.io.WritableComparable;
+
+public class IncomparableKey implements WritableComparable {
+  public void write(DataOutput out) { }
+  public void readFields(DataInput in) { }
+  public int compareTo(Object o) {
+    throw new RuntimeException("Should never see this.");
+  }
+}
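
A note on why this key type throws from compareTo: in testEmptyJoin every source
is empty, so the join should never produce or compare a key at all.
IncomparableKey turns any accidental comparison into a loud RuntimeException
instead of letting the test pass silently. A hypothetical sketch of that tripwire
(the class EmptyJoinKeySketch and its main method are illustrative, not part of
the commit):

package org.apache.hadoop.mapred.join;

// Illustrative only: in testEmptyJoin this comparison never happens, because
// the empty sources contribute no keys to the join.
public class EmptyJoinKeySketch {
  public static void main(String[] args) {
    IncomparableKey a = new IncomparableKey();
    IncomparableKey b = new IncomparableKey();
    try {
      a.compareTo(b);
    } catch (RuntimeException expected) {
      System.out.println(expected.getMessage()); // "Should never see this."
    }
  }
}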

+ 21 - 0
src/test/org/apache/hadoop/mapred/join/TestDatamerge.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -40,6 +41,8 @@ import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
 
 public class TestDatamerge extends TestCase {
 
@@ -251,4 +254,22 @@ public class TestDatamerge extends TestCase {
     CompositeInputFormat cif = new CompositeInputFormat();
     cif.validateInput(conf);
   }
+
+  public void testEmptyJoin() throws Exception {
+    JobConf job = new JobConf();
+    Path base = cluster.getFileSystem().makeQualified(new Path("/empty"));
+    Path[] src = { new Path(base,"i0"), new Path("i1"), new Path("i2") };
+    job.set("mapred.join.expr", CompositeInputFormat.compose("outer",
+        FakeIF.class, src));
+    job.setInputFormat(CompositeInputFormat.class);
+    FileOutputFormat.setOutputPath(job, new Path(base, "out"));
+
+    job.setMapperClass(IdentityMapper.class);
+    job.setReducerClass(IdentityReducer.class);
+    job.setOutputKeyClass(IncomparableKey.class);
+    job.setOutputValueClass(NullWritable.class);
+
+    JobClient.runJob(job);
+    base.getFileSystem(job).delete(base, true);
+  }
 }
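
For readers unfamiliar with the join package: CompositeInputFormat.compose builds
the expression string that testEmptyJoin stores in mapred.join.expr. A hypothetical
sketch of that call in isolation is below; the commented output shape is an
assumption about the expression grammar, not text taken from this commit.

package org.apache.hadoop.mapred.join;

import org.apache.hadoop.fs.Path;

// Hypothetical sketch: prints the composed join expression for three FakeIF
// sources. Expected shape (assumed; paths may be rendered fully qualified):
//   outer(tbl(org.apache.hadoop.mapred.join.FakeIF,".../empty/i0"),
//         tbl(org.apache.hadoop.mapred.join.FakeIF,"i1"),
//         tbl(org.apache.hadoop.mapred.join.FakeIF,"i2"))
public class JoinExprSketch {
  public static void main(String[] args) {
    Path[] src = { new Path("/empty/i0"), new Path("i1"), new Path("i2") };
    System.out.println(
        CompositeInputFormat.compose("outer", FakeIF.class, src));
  }
}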