|
@@ -67,11 +67,11 @@ public class TestReduceFetch extends TestCase {
|
|
Text key = new Text();
|
|
Text key = new Text();
|
|
Text val = new Text();
|
|
Text val = new Text();
|
|
key.set("KEYKEYKEYKEYKEYKEYKEYKEY");
|
|
key.set("KEYKEYKEYKEYKEYKEYKEYKEY");
|
|
- byte[] b = new byte[1024];
|
|
|
|
|
|
+ byte[] b = new byte[1000];
|
|
Arrays.fill(b, (byte)'V');
|
|
Arrays.fill(b, (byte)'V');
|
|
val.set(b);
|
|
val.set(b);
|
|
b = null;
|
|
b = null;
|
|
- for (int i = 0; i < 1024; ++i) {
|
|
|
|
|
|
+ for (int i = 0; i < 4 * 1024; ++i) {
|
|
output.collect(key, val);
|
|
output.collect(key, val);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -84,7 +84,6 @@ public class TestReduceFetch extends TestCase {
|
|
conf.setReducerClass(IdentityReducer.class);
|
|
conf.setReducerClass(IdentityReducer.class);
|
|
conf.setOutputKeyClass(Text.class);
|
|
conf.setOutputKeyClass(Text.class);
|
|
conf.setOutputValueClass(Text.class);
|
|
conf.setOutputValueClass(Text.class);
|
|
- conf.setNumMapTasks(3);
|
|
|
|
conf.setNumReduceTasks(1);
|
|
conf.setNumReduceTasks(1);
|
|
conf.setInputFormat(FakeIF.class);
|
|
conf.setInputFormat(FakeIF.class);
|
|
FileInputFormat.setInputPaths(conf, new Path("/in"));
|
|
FileInputFormat.setInputPaths(conf, new Path("/in"));
|
|
@@ -106,25 +105,41 @@ public class TestReduceFetch extends TestCase {
|
|
public void testReduceFromDisk() throws Exception {
|
|
public void testReduceFromDisk() throws Exception {
|
|
JobConf job = mrCluster.createJobConf();
|
|
JobConf job = mrCluster.createJobConf();
|
|
job.set("mapred.job.reduce.input.buffer.percent", "0.0");
|
|
job.set("mapred.job.reduce.input.buffer.percent", "0.0");
|
|
|
|
+ job.setNumMapTasks(3);
|
|
Counters c = runJob(job);
|
|
Counters c = runJob(job);
|
|
- assertTrue(c.findCounter(HDFS_WRITE).getCounter() <=
|
|
|
|
- c.findCounter(LOCAL_READ).getCounter());
|
|
|
|
|
|
+ final long hdfsWritten = c.findCounter(HDFS_WRITE).getCounter();
|
|
|
|
+ final long localRead = c.findCounter(LOCAL_READ).getCounter();
|
|
|
|
+ assertTrue("Expected more bytes read from local (" +
|
|
|
|
+ localRead + ") than written to HDFS (" + hdfsWritten + ")",
|
|
|
|
+ hdfsWritten <= localRead);
|
|
}
|
|
}
|
|
|
|
|
|
public void testReduceFromPartialMem() throws Exception {
|
|
public void testReduceFromPartialMem() throws Exception {
|
|
JobConf job = mrCluster.createJobConf();
|
|
JobConf job = mrCluster.createJobConf();
|
|
- job.setInt("mapred.inmem.merge.threshold", 2);
|
|
|
|
|
|
+ job.setNumMapTasks(5);
|
|
|
|
+ job.setInt("mapred.inmem.merge.threshold", 0);
|
|
job.set("mapred.job.reduce.input.buffer.percent", "1.0");
|
|
job.set("mapred.job.reduce.input.buffer.percent", "1.0");
|
|
|
|
+ job.setInt("mapred.reduce.parallel.copies", 1);
|
|
|
|
+ job.setInt("io.sort.mb", 10);
|
|
|
|
+ job.set("mapred.child.java.opts", "-Xmx128m");
|
|
|
|
+ job.set("mapred.job.shuffle.input.buffer.percent", "0.14");
|
|
|
|
+ job.setNumTasksToExecutePerJvm(1);
|
|
|
|
+ job.set("mapred.job.shuffle.merge.percent", "1.0");
|
|
Counters c = runJob(job);
|
|
Counters c = runJob(job);
|
|
- assertTrue(c.findCounter(HDFS_WRITE).getCounter() >=
|
|
|
|
- c.findCounter(LOCAL_READ).getCounter() + 1024 * 1024);
|
|
|
|
|
|
+ final long hdfsWritten = c.findCounter(HDFS_WRITE).getCounter();
|
|
|
|
+ final long localRead = c.findCounter(LOCAL_READ).getCounter();
|
|
|
|
+ assertTrue("Expected at least 1MB fewer bytes read from local (" +
|
|
|
|
+ localRead + ") than written to HDFS (" + hdfsWritten + ")",
|
|
|
|
+ hdfsWritten >= localRead + 1024 * 1024);
|
|
}
|
|
}
|
|
|
|
|
|
public void testReduceFromMem() throws Exception {
|
|
public void testReduceFromMem() throws Exception {
|
|
JobConf job = mrCluster.createJobConf();
|
|
JobConf job = mrCluster.createJobConf();
|
|
job.set("mapred.job.reduce.input.buffer.percent", "1.0");
|
|
job.set("mapred.job.reduce.input.buffer.percent", "1.0");
|
|
|
|
+ job.setNumMapTasks(3);
|
|
Counters c = runJob(job);
|
|
Counters c = runJob(job);
|
|
- assertTrue(c.findCounter(LOCAL_READ).getCounter() == 0);
|
|
|
|
|
|
+ final long localRead = c.findCounter(LOCAL_READ).getCounter();
|
|
|
|
+ assertTrue("Non-zero read from local: " + localRead, localRead == 0);
|
|
}
|
|
}
|
|
|
|
|
|
}
|
|
}
|