|
@@ -55,11 +55,47 @@ public class TestRackAwareTaskPlacement extends TestCase {
|
|
|
final Path inDir = new Path("/racktesting");
|
|
|
final Path outputPath = new Path("/output");
|
|
|
|
|
|
+ /**
|
|
|
+ * Launches a MR job and tests the job counters against the expected values.
|
|
|
+ * @param testName The name for the job
|
|
|
+ * @param mr The MR cluster
|
|
|
+ * @param fileSys The FileSystem
|
|
|
+ * @param in Input path
|
|
|
+ * @param out Output path
|
|
|
+ * @param numMaps Number of maps
|
|
|
+ * @param otherLocalMaps Expected value of other local maps
|
|
|
+ * @param datalocalMaps Expected value of data(node) local maps
|
|
|
+ * @param racklocalMaps Expected value of rack local maps
|
|
|
+ */
|
|
|
+ static void launchJobAndTestCounters(String jobName, MiniMRCluster mr,
|
|
|
+ FileSystem fileSys, Path in, Path out,
|
|
|
+ int numMaps, int otherLocalMaps,
|
|
|
+ int dataLocalMaps, int rackLocalMaps)
|
|
|
+ throws IOException {
|
|
|
+ JobConf jobConf = mr.createJobConf();
|
|
|
+ if (fileSys.exists(out)) {
|
|
|
+ fileSys.delete(out, true);
|
|
|
+ }
|
|
|
+ RunningJob job = launchJob(jobConf, in, out, numMaps, jobName);
|
|
|
+ Counters counters = job.getCounters();
|
|
|
+ assertEquals("Number of local maps",
|
|
|
+ counters.getCounter(JobInProgress.Counter.OTHER_LOCAL_MAPS), otherLocalMaps);
|
|
|
+ assertEquals("Number of Data-local maps",
|
|
|
+ counters.getCounter(JobInProgress.Counter.DATA_LOCAL_MAPS),
|
|
|
+ dataLocalMaps);
|
|
|
+ assertEquals("Number of Rack-local maps",
|
|
|
+ counters.getCounter(JobInProgress.Counter.RACK_LOCAL_MAPS),
|
|
|
+ rackLocalMaps);
|
|
|
+ mr.waitUntilIdle();
|
|
|
+ mr.shutdown();
|
|
|
+ }
|
|
|
+
|
|
|
public void testTaskPlacement() throws IOException {
|
|
|
String namenode = null;
|
|
|
MiniDFSCluster dfs = null;
|
|
|
MiniMRCluster mr = null;
|
|
|
FileSystem fileSys = null;
|
|
|
+ String testName = "TestForRackAwareness";
|
|
|
try {
|
|
|
final int taskTrackers = 1;
|
|
|
|
|
@@ -95,42 +131,29 @@ public class TestRackAwareTaskPlacement extends TestCase {
|
|
|
* datanodes.
|
|
|
*/
|
|
|
mr = new MiniMRCluster(taskTrackers, namenode, 1, rack2, hosts4);
|
|
|
- JobConf jobConf = mr.createJobConf();
|
|
|
- if (fileSys.exists(outputPath)) {
|
|
|
- fileSys.delete(outputPath, true);
|
|
|
- }
|
|
|
+
|
|
|
/* The job is configured with three maps since there are three
|
|
|
* (non-splittable) files. On rack2, there are two files and both
|
|
|
* have repl of three. The blocks for those files must therefore be
|
|
|
* present on all the datanodes, in particular, the datanodes on rack2.
|
|
|
* The third input file is pulled from rack1.
|
|
|
*/
|
|
|
- RunningJob job = launchJob(jobConf, 3);
|
|
|
- Counters counters = job.getCounters();
|
|
|
- assertEquals("Number of Data-local maps",
|
|
|
- counters.getCounter(JobInProgress.Counter.DATA_LOCAL_MAPS), 2);
|
|
|
- assertEquals("Number of Rack-local maps",
|
|
|
- counters.getCounter(JobInProgress.Counter.RACK_LOCAL_MAPS), 0);
|
|
|
- mr.waitUntilIdle();
|
|
|
+ launchJobAndTestCounters(testName, mr, fileSys, inDir, outputPath, 3, 0,
|
|
|
+ 2, 0);
|
|
|
+ mr.shutdown();
|
|
|
|
|
|
/* Run a job with the (only)tasktracker on rack1.
|
|
|
*/
|
|
|
mr = new MiniMRCluster(taskTrackers, namenode, 1, rack1, hosts3);
|
|
|
- jobConf = mr.createJobConf();
|
|
|
- fileSys = dfs.getFileSystem();
|
|
|
- if (fileSys.exists(outputPath)) {
|
|
|
- fileSys.delete(outputPath, true);
|
|
|
- }
|
|
|
+
|
|
|
/* The job is configured with three maps since there are three
|
|
|
* (non-splittable) files. On rack1, because of the way in which repl
|
|
|
* was setup while creating the files, we will have all the three files.
|
|
|
* Thus, a tasktracker will find all inputs in this rack.
|
|
|
*/
|
|
|
- job = launchJob(jobConf, 3);
|
|
|
- counters = job.getCounters();
|
|
|
- assertEquals("Number of Rack-local maps",
|
|
|
- counters.getCounter(JobInProgress.Counter.RACK_LOCAL_MAPS), 3);
|
|
|
- mr.waitUntilIdle();
|
|
|
+ launchJobAndTestCounters(testName, mr, fileSys, inDir, outputPath, 3, 0,
|
|
|
+ 0, 3);
|
|
|
+ mr.shutdown();
|
|
|
|
|
|
} finally {
|
|
|
if (dfs != null) {
|
|
@@ -141,7 +164,7 @@ public class TestRackAwareTaskPlacement extends TestCase {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- private void writeFile(NameNode namenode, Configuration conf, Path name,
|
|
|
+ static void writeFile(NameNode namenode, Configuration conf, Path name,
|
|
|
short replication) throws IOException {
|
|
|
FileSystem fileSys = FileSystem.get(conf);
|
|
|
SequenceFile.Writer writer =
|
|
@@ -153,7 +176,7 @@ public class TestRackAwareTaskPlacement extends TestCase {
|
|
|
fileSys.setReplication(name, replication);
|
|
|
waitForReplication(fileSys, namenode, name, replication);
|
|
|
}
|
|
|
- private void waitForReplication(FileSystem fileSys, NameNode namenode,
|
|
|
+ static void waitForReplication(FileSystem fileSys, NameNode namenode,
|
|
|
Path name, short replication) throws IOException {
|
|
|
//wait for the replication to happen
|
|
|
boolean isReplicationDone;
|
|
@@ -173,8 +196,10 @@ public class TestRackAwareTaskPlacement extends TestCase {
|
|
|
}
|
|
|
} while(!isReplicationDone);
|
|
|
}
|
|
|
- private RunningJob launchJob(JobConf jobConf, int numMaps) throws IOException {
|
|
|
- jobConf.setJobName("TestForRackAwareness");
|
|
|
+
|
|
|
+ static RunningJob launchJob(JobConf jobConf, Path inDir, Path outputPath,
|
|
|
+ int numMaps, String jobName) throws IOException {
|
|
|
+ jobConf.setJobName(jobName);
|
|
|
jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
|
|
|
jobConf.setOutputFormat(SequenceFileOutputFormat.class);
|
|
|
FileInputFormat.setInputPaths(jobConf, inDir);
|