@@ -101,48 +101,53 @@ public class TestReduceFetch extends TestCase {
}

public void testReduceFromDisk() throws Exception {
+ final int MAP_TASKS = 8;
JobConf job = mrCluster.createJobConf();
job.set("mapred.job.reduce.input.buffer.percent", "0.0");
- job.setNumMapTasks(3);
+ job.setNumMapTasks(MAP_TASKS);
+ job.setInt("mapred.job.reduce.total.mem.bytes", 128 << 20);
+ job.set("mapred.job.shuffle.input.buffer.percent", "0.05");
+ job.setInt("io.sort.factor", 2);
+ job.setInt("mapred.inmem.merge.threshold", 4);
Counters c = runJob(job);
- final long hdfsWritten = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
- Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
- final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
- Task.getFileSystemCounterNames("file")[0]).getCounter();
- assertTrue("Expected more bytes read from local (" +
- localRead + ") than written to HDFS (" + hdfsWritten + ")",
- hdfsWritten <= localRead);
+ final long spill = c.findCounter(Task.Counter.SPILLED_RECORDS).getCounter();
+ final long out = c.findCounter(Task.Counter.MAP_OUTPUT_RECORDS).getCounter();
+ assertTrue("Expected all records spilled during reduce (" + spill + ")",
+ spill >= 2 * out); // all records spill at map, reduce
+ assertTrue("Expected intermediate merges (" + spill + ")",
+ spill >= 2 * out + (out / MAP_TASKS)); // some records hit twice
}

public void testReduceFromPartialMem() throws Exception {
+ final int MAP_TASKS = 7;
JobConf job = mrCluster.createJobConf();
- job.setNumMapTasks(5);
+ job.setNumMapTasks(MAP_TASKS);
job.setInt("mapred.inmem.merge.threshold", 0);
job.set("mapred.job.reduce.input.buffer.percent", "1.0");
job.setInt("mapred.reduce.parallel.copies", 1);
job.setInt("io.sort.mb", 10);
- job.set("mapred.child.java.opts", "-Xmx128m");
+ job.setInt("mapred.job.reduce.total.mem.bytes", 128 << 20);
job.set("mapred.job.shuffle.input.buffer.percent", "0.14");
job.setNumTasksToExecutePerJvm(1);
job.set("mapred.job.shuffle.merge.percent", "1.0");
Counters c = runJob(job);
- final long hdfsWritten = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
- Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
- final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
- Task.getFileSystemCounterNames("file")[0]).getCounter();
- assertTrue("Expected at least 1MB fewer bytes read from local (" +
- localRead + ") than written to HDFS (" + hdfsWritten + ")",
- hdfsWritten >= localRead + 1024 * 1024);
+ final long out = c.findCounter(Task.Counter.MAP_OUTPUT_RECORDS).getCounter();
+ final long spill = c.findCounter(Task.Counter.SPILLED_RECORDS).getCounter();
+ assertTrue("Expected some records not spilled during reduce" + spill + ")",
+ spill < 2 * out); // spilled map records, some records at the reduce
}

public void testReduceFromMem() throws Exception {
+ final int MAP_TASKS = 3;
JobConf job = mrCluster.createJobConf();
job.set("mapred.job.reduce.input.buffer.percent", "1.0");
- job.setNumMapTasks(3);
+ job.set("mapred.job.shuffle.input.buffer.percent", "1.0");
+ job.setInt("mapred.job.reduce.total.mem.bytes", 128 << 20);
+ job.setNumMapTasks(MAP_TASKS);
Counters c = runJob(job);
- final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
- Task.getFileSystemCounterNames("file")[0]).getCounter();
- assertTrue("Non-zero read from local: " + localRead, localRead == 0);
+ final long spill = c.findCounter(Task.Counter.SPILLED_RECORDS).getCounter();
+ final long out = c.findCounter(Task.Counter.MAP_OUTPUT_RECORDS).getCounter();
+ assertEquals("Spilled records: " + spill, out, spill); // no reduce spill
}

}
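
A short note on the counter arithmetic the new assertions depend on: each test now compares SPILLED_RECORDS against MAP_OUTPUT_RECORDS instead of filesystem byte counters. The standalone sketch below walks through that accounting under stated assumptions; the class name SpillAccountingSketch and the record count are hypothetical illustrations, not part of the patch, and only the inequalities mirror the test expectations.

// Minimal, self-contained sketch (hypothetical numbers) of the spill
// accounting behind the assertions in the patch above.
public class SpillAccountingSketch {
  public static void main(String[] args) {
    final int mapTasks = 8;               // MAP_TASKS in testReduceFromDisk
    final long mapOutputRecords = 80000;  // hypothetical MAP_OUTPUT_RECORDS

    // Map side: every map output record is spilled once by the sort/spill.
    final long mapSideSpills = mapOutputRecords;

    // testReduceFromDisk: with mapred.job.reduce.input.buffer.percent=0.0 and
    // a small shuffle buffer, every fetched record is written back to disk, so
    // each record spills at least once more at the reduce; io.sort.factor=2
    // then forces intermediate merges that rewrite at least ~out/MAP_TASKS
    // records again, which is the bound the test checks.
    final long diskCaseSpills =
        mapSideSpills + mapOutputRecords + mapOutputRecords / mapTasks;
    System.out.println("disk: spill >= 2*out            -> "
        + (diskCaseSpills >= 2 * mapOutputRecords));
    System.out.println("disk: spill >= 2*out + out/maps -> "
        + (diskCaseSpills >= 2 * mapOutputRecords + mapOutputRecords / mapTasks));

    // testReduceFromPartialMem: part of the shuffle is kept in memory, so
    // fewer than all records spill a second time (half is an arbitrary pick).
    final long partialMemCaseSpills = mapSideSpills + mapOutputRecords / 2;
    System.out.println("partial-mem: spill < 2*out      -> "
        + (partialMemCaseSpills < 2 * mapOutputRecords));

    // testReduceFromMem: the reduce keeps everything in memory, leaving only
    // the map-side spill, so SPILLED_RECORDS equals MAP_OUTPUT_RECORDS.
    final long memCaseSpills = mapSideSpills;
    System.out.println("mem: spill == out               -> "
        + (memCaseSpills == mapOutputRecords));
  }
}

In the real tests these totals come from the job's Counters via Task.Counter.SPILLED_RECORDS and Task.Counter.MAP_OUTPUT_RECORDS, as shown in the diff.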