@@ -18,10 +18,18 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.IOException;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
 import junit.framework.TestCase;
+import junit.framework.TestSuite;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.ToolRunner;
@@ -39,9 +47,30 @@ public class TestMiniMRDFSSort extends TestCase {
 
   // Knobs to control randomwriter; and hence sort
   private static final int NUM_HADOOP_SLAVES = 3;
-  private static final int RW_BYTES_PER_MAP = 2 * 1024 * 1024;
+  // make it big enough to cause a spill in the map
+  private static final int RW_BYTES_PER_MAP = 3 * 1024 * 1024;
   private static final int RW_MAPS_PER_HOST = 2;
 
+  private static MiniMRCluster mrCluster = null;
+  private static MiniDFSCluster dfsCluster = null;
+  private static FileSystem dfs = null;
+  public static Test suite() {
+    TestSetup setup = new TestSetup(new TestSuite(TestMiniMRDFSSort.class)) {
+      protected void setUp() throws Exception {
+        Configuration conf = new Configuration();
+        dfsCluster = new MiniDFSCluster(conf, NUM_HADOOP_SLAVES, true, null);
+        dfs = dfsCluster.getFileSystem();
+        mrCluster = new MiniMRCluster(NUM_HADOOP_SLAVES,
+                                      dfs.getUri().toString(), 1);
+      }
+      protected void tearDown() throws Exception {
+        if (dfsCluster != null) { dfsCluster.shutdown(); }
+        if (mrCluster != null) { mrCluster.shutdown(); }
+      }
+    };
+    return setup;
+  }
+
   private static void runRandomWriter(JobConf job, Path sortInput)
   throws Exception {
     // Scale down the default settings for RandomWriter for the test-case
@@ -57,8 +86,10 @@ public class TestMiniMRDFSSort extends TestCase {
   private static void runSort(JobConf job, Path sortInput, Path sortOutput)
   throws Exception {
 
+    job.setInt("mapred.job.reuse.jvm.num.tasks", -1);
     job.setInt("io.sort.mb", 1);
-    job.setLong("mapred.min.split.size", Long.MAX_VALUE);
+    job.setNumMapTasks(12);
+
     // Setup command-line arguments to 'sort'
     String[] sortArgs = {sortInput.toString(), sortOutput.toString()};
 
@@ -75,34 +106,66 @@ public class TestMiniMRDFSSort extends TestCase {
     // Run Sort-Validator
     assertEquals(ToolRunner.run(job, new SortValidator(), svArgs), 0);
   }
-  Configuration conf = new Configuration();
-  public void testMapReduceSort() throws Exception {
-    MiniDFSCluster dfs = null;
-    MiniMRCluster mr = null;
-    FileSystem fileSys = null;
-    try {
-
-      // Start the mini-MR and mini-DFS clusters
-      dfs = new MiniDFSCluster(conf, NUM_HADOOP_SLAVES, true, null);
-      fileSys = dfs.getFileSystem();
-      mr = new MiniMRCluster(NUM_HADOOP_SLAVES, fileSys.getUri().toString(), 1);
-
-      // Run randomwriter to generate input for 'sort'
-      runRandomWriter(mr.createJobConf(), SORT_INPUT_PATH);
-
-      // Run sort
-      runSort(mr.createJobConf(), SORT_INPUT_PATH, SORT_OUTPUT_PATH);
-
-      // Run sort-validator to check if sort worked correctly
-      runSortValidator(mr.createJobConf(), SORT_INPUT_PATH, SORT_OUTPUT_PATH);
-    } finally {
-      if (dfs != null) { dfs.shutdown(); }
-      if (mr != null) { mr.shutdown();
-      }
+
+  private static class ReuseDetector extends MapReduceBase
+      implements Mapper<BytesWritable, BytesWritable, Text, Text> {
+    static int instances = 0;
+    Reporter reporter = null;
+
+    @Override
+    public void map(BytesWritable key, BytesWritable value,
+                    OutputCollector<Text, Text> output,
+                    Reporter reporter) throws IOException {
+      this.reporter = reporter;
+    }
+
+    public void close() throws IOException {
+      reporter.incrCounter("jvm", "use", ++instances);
     }
   }
-  public void testMapReduceSortWithJvmReuse() throws Exception {
-    conf.setInt("mapred.job.reuse.jvm.num.tasks", -1);
-    testMapReduceSort();
+
+  private static void runJvmReuseTest(JobConf job,
+                                      boolean reuse) throws IOException {
+    // setup a map-only job that reads the input and only sets the counters
+    // based on how many times the jvm was reused.
+    job.setInt("mapred.job.reuse.jvm.num.tasks", reuse ? -1 : 1);
+    FileInputFormat.setInputPaths(job, SORT_INPUT_PATH);
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setOutputFormat(NullOutputFormat.class);
+    job.setMapperClass(ReuseDetector.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    job.setNumMapTasks(24);
+    job.setNumReduceTasks(0);
+    RunningJob result = JobClient.runJob(job);
+    long uses = result.getCounters().findCounter("jvm", "use").getValue();
+    System.out.println("maps = " + job.getNumMapTasks());
+    System.out.println(result.getCounters());
+    int maps = job.getNumMapTasks();
+    if (reuse) {
+      assertTrue("maps = " + maps + ", uses = " + uses, maps < uses);
+    } else {
+      assertEquals("uses should be number of maps", job.getNumMapTasks(), uses);
+    }
+  }
+
+  public void testMapReduceSort() throws Exception {
+    // Run randomwriter to generate input for 'sort'
+    runRandomWriter(mrCluster.createJobConf(), SORT_INPUT_PATH);
+
+    // Run sort
+    runSort(mrCluster.createJobConf(), SORT_INPUT_PATH, SORT_OUTPUT_PATH);
+
+    // Run sort-validator to check if sort worked correctly
+    runSortValidator(mrCluster.createJobConf(), SORT_INPUT_PATH,
+                     SORT_OUTPUT_PATH);
+  }
+
+  public void testJvmReuse() throws Exception {
+    runJvmReuseTest(mrCluster.createJobConf(), true);
+  }
+
+  public void testNoJvmReuse() throws Exception {
+    runJvmReuseTest(mrCluster.createJobConf(), false);
   }
 }
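Note on the counting trick, for reviewers: ReuseDetector keeps a static int field, and static state survives between tasks only when the TaskTracker reuses a JVM. Each task's close() adds its per-JVM instance number to the "jvm"/"use" counter, so with reuse disabled the counter sums to exactly the number of maps, while with reuse enabled later tasks report 2, 3, ... and the sum must exceed the map count. A minimal standalone sketch of that arithmetic follows; the ReuseCounterSketch class and the task counts in it are hypothetical illustrations, not part of this patch.

// Hypothetical sketch (not from the patch): models how the static counter
// in ReuseDetector distinguishes reused JVMs from fresh ones.
public class ReuseCounterSketch {
  // Static state persists only within a single JVM.
  static int instances = 0;

  // Models one task's close(): report this JVM's instance number.
  static long closeTask() {
    return ++instances;
  }

  public static void main(String[] args) {
    int maps = 4; // hypothetical number of map tasks

    // No reuse: every task starts a fresh JVM, so the static resets and
    // each close() reports 1; the summed counter equals the map count,
    // which is what testNoJvmReuse asserts.
    long noReuse = 0;
    for (int i = 0; i < maps; i++) {
      instances = 0;           // a new JVM starts with instances == 0
      noReuse += closeTask();  // reports 1 each time
    }
    System.out.println("no reuse:   " + noReuse + " == " + maps);

    // Reuse: all tasks share one JVM, so close() reports 1, 2, 3, 4;
    // the sum (10) is strictly greater than the map count, which is
    // what testJvmReuse asserts via maps < uses.
    instances = 0;
    long withReuse = 0;
    for (int i = 0; i < maps; i++) {
      withReuse += closeTask();
    }
    System.out.println("with reuse: " + withReuse + " > " + maps);
  }
}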