@@ -17,6 +17,8 @@
  */
 
 package org.apache.hadoop.mapred.join;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Iterator;
@@ -26,25 +28,31 @@ import junit.framework.TestSuite;
 import junit.extensions.TestSetup;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.util.ReflectionUtils;
 
 public class TestDatamerge extends TestCase {
   private static MiniDFSCluster cluster = null;
@@ -285,7 +293,7 @@ public class TestDatamerge extends TestCase {
       if (i + 1 != SOURCES) sb.append(",");
     }
     sb.append("),outer(");
-    sb.append(CompositeInputFormat.compose(FakeIF.class,"foobar"));
+    sb.append(CompositeInputFormat.compose(Fake_IF.class,"foobar"));
     sb.append(",");
     for (int i = 0; i < SOURCES; ++i) {
       sb.append(
@@ -293,13 +301,13 @@ public class TestDatamerge extends TestCase {
           src[i].toString()));
       sb.append(",");
     }
-    sb.append(CompositeInputFormat.compose(FakeIF.class,"raboof") + "))");
+    sb.append(CompositeInputFormat.compose(Fake_IF.class,"raboof") + "))");
     job.set("mapred.join.expr", sb.toString());
     job.setInputFormat(CompositeInputFormat.class);
     Path outf = new Path(base, "out");
     FileOutputFormat.setOutputPath(job, outf);
-    FakeIF.setKeyClass(job, IntWritable.class);
-    FakeIF.setValClass(job, IntWritable.class);
+    Fake_IF.setKeyClass(job, IntWritable.class);
+    Fake_IF.setValClass(job, IntWritable.class);
 
     job.setMapperClass(IdentityMapper.class);
     job.setReducerClass(IdentityReducer.class);
@@ -345,7 +353,7 @@ public class TestDatamerge extends TestCase {
     Path base = cluster.getFileSystem().makeQualified(new Path("/empty"));
     Path[] src = { new Path(base,"i0"), new Path("i1"), new Path("i2") };
     job.set("mapred.join.expr", CompositeInputFormat.compose("outer",
-        FakeIF.class, src));
+        Fake_IF.class, src));
     job.setInputFormat(CompositeInputFormat.class);
     FileOutputFormat.setOutputPath(job, new Path(base, "out"));
 
@@ -357,4 +365,62 @@
     JobClient.runJob(job);
     base.getFileSystem(job).delete(base, true);
   }
+
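+  // Stub InputFormat: produces a single empty split whose RecordReader
+  // yields no records, giving the join tests an always-empty source.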
+  public static class Fake_IF<K,V>
+      implements InputFormat<K,V>, JobConfigurable {
+
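+    // Zero-length split with no locations and no serialized state.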
+    public static class FakeSplit implements InputSplit {
+      public void write(DataOutput out) throws IOException { }
+      public void readFields(DataInput in) throws IOException { }
+      public long getLength() { return 0L; }
+      public String[] getLocations() { return new String[0]; }
+    }
+
+    public static void setKeyClass(JobConf job, Class<?> k) {
+      job.setClass("test.fakeif.keyclass", k, WritableComparable.class);
+    }
+
+    public static void setValClass(JobConf job, Class<?> v) {
+      job.setClass("test.fakeif.valclass", v, Writable.class);
+    }
+
+    private Class<? extends K> keyclass;
+    private Class<? extends V> valclass;
+
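+    // Read back the key/value classes registered above, defaulting to
+    // IncomparableKey and NullWritable when none were set.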
+    @SuppressWarnings("unchecked")
+    public void configure(JobConf job) {
+      keyclass = (Class<? extends K>) job.getClass("test.fakeif.keyclass",
+          IncomparableKey.class, WritableComparable.class);
+      valclass = (Class<? extends V>) job.getClass("test.fakeif.valclass",
+          NullWritable.class, Writable.class);
+    }
+
+    public Fake_IF() { }
+
+    public InputSplit[] getSplits(JobConf conf, int splits) {
+      return new InputSplit[] { new FakeSplit() };
+    }
+
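+    // The returned reader is exhausted from the start: next() is always false.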
+    public RecordReader<K,V> getRecordReader(
+        InputSplit ignored, JobConf conf, Reporter reporter) {
+      return new RecordReader<K,V>() {
+        public boolean next(K key, V value) throws IOException { return false; }
+        public K createKey() {
+          return ReflectionUtils.newInstance(keyclass, null);
+        }
+        public V createValue() {
+          return ReflectionUtils.newInstance(valclass, null);
+        }
+        public long getPos() throws IOException { return 0L; }
+        public void close() throws IOException { }
+        public float getProgress() throws IOException { return 0.0f; }
+      };
+    }
+  }
 }