@@ -10,7 +10,6 @@ import java.util.HashSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.examples.RandomWriter;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -20,17 +19,34 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.UtilsForTests;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.net.StaticMapping;
 
-import junit.framework.TestCase;
+import static org.junit.Assert.*;
+import org.junit.Test;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
 
-public class TestJobInProgress extends TestCase {
+public class TestJobInProgress {
   static final Log LOG = LogFactory.getLog(TestJobInProgress.class);
 
-  private MiniMRCluster mrCluster;
+  private static MiniMRCluster mrCluster;
+
+  private static MiniDFSCluster dfsCluster;
+  private static JobTracker jt;
+
+  static final String trackers[] = new String[] {
+      "tracker_tracker1.r1.com:1000", "tracker_tracker2.r1.com:1000",
+      "tracker_tracker3.r2.com:1000", "tracker_tracker4.r3.com:1000" };
+
+  static final String[] hosts = new String[] { "tracker1.r1.com",
+      "tracker2.r1.com", "tracker3.r2.com", "tracker4.r3.com" };
+
+  static final String[] racks = new String[] { "/r1", "/r1", "/r2", "/r3" };
 
-  private MiniDFSCluster dfsCluster;
-  JobTracker jt;
   private static Path TEST_DIR =
     new Path(System.getProperty("test.build.data","/tmp"), "jip-testing");
   private static int numSlaves = 4;
@@ -72,22 +88,33 @@ public class TestJobInProgress extends TestCase {
 
   }
 
-  @Override
-  protected void setUp() throws Exception {
-    // TODO Auto-generated method stub
-    super.setUp();
+  @BeforeClass
+  public static void setUp() throws Exception {
     Configuration conf = new Configuration();
+    conf.set("mapreduce.jobtracker.address", "localhost:0");
+    conf.set("mapreduce.jobtracker.http.address", "0.0.0.0:0");
+    conf.setClass("topology.node.switch.mapping.impl", StaticMapping.class,
+        DNSToSwitchMapping.class);
     dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null);
     mrCluster = new MiniMRCluster(numSlaves, dfsCluster.getFileSystem()
         .getUri().toString(), 1);
     jt = mrCluster.getJobTrackerRunner().getJobTracker();
+    // Set up the Topology Information
+    for (int i = 0; i < hosts.length; i++) {
+      StaticMapping.addNodeToRack(hosts[i], racks[i]);
+    }
+    for (String s : trackers) {
+      FakeObjectUtilities.establishFirstContact(jt, s);
+    }
   }
 
+  @Test
   public void testPendingMapTaskCount() throws Exception {
     launchTask(FailMapTaskJob.class, IdentityReducer.class);
     checkTaskCounts();
   }
 
+  @Test
   public void testPendingReduceTaskCount() throws Exception {
     launchTask(IdentityMapper.class, FailReduceTaskJob.class);
     checkTaskCounts();
@@ -113,7 +140,7 @@ public class TestJobInProgress extends TestCase {
     job.set(UtilsForTests.getTaskSignalParameter(true), mapSignalFile.toString());
     job.set(UtilsForTests.getTaskSignalParameter(false), redSignalFile.toString());
 
-    // Disable slow-start for reduces since this maps don't complete
+    // Disable slow-start for reduces since the maps don't complete
     // in these test-cases...
     job.setFloat("mapred.reduce.slowstart.completed.maps", 0.0f);
 
@@ -174,6 +201,7 @@ public class TestJobInProgress extends TestCase {
     }
   }
 
+  @Test
   public void testRunningTaskCount() throws Exception {
     // test with spec = false and locality=true
     testRunningTaskCount(false, true);
@@ -188,11 +216,40 @@ public class TestJobInProgress extends TestCase {
     testRunningTaskCount(true, false);
   }
 
-  @Override
-  protected void tearDown() throws Exception {
+  @Test
+  public void testLocality() throws Exception {
+    NetworkTopology nt = new NetworkTopology();
+
+    Node r1n1 = new NodeBase("/default/rack1/node1");
+    nt.add(r1n1);
+    Node r1n2 = new NodeBase("/default/rack1/node2");
+    nt.add(r1n2);
+
+    Node r2n3 = new NodeBase("/default/rack2/node3");
+    nt.add(r2n3);
+
+    Node r2n4 = new NodeBase("/default/rack2/s1/node4");
+    nt.add(r2n4);
+
+    LOG.debug("r1n1 parent: " + r1n1.getParent() + "\n" +
+        "r1n2 parent: " + r1n2.getParent() + "\n" +
+        "r2n3 parent: " + r2n3.getParent() + "\n" +
+        "r2n4 parent: " + r2n4.getParent());
+
+    // Same host
+    assertEquals(0, JobInProgress.getMatchingLevelForNodes(r1n1, r1n1, 3));
+    // Same rack
+    assertEquals(1, JobInProgress.getMatchingLevelForNodes(r1n1, r1n2, 3));
+    // Different rack
+    assertEquals(2, JobInProgress.getMatchingLevelForNodes(r1n1, r2n3, 3));
+    // Different rack at different depth
+    assertEquals(3, JobInProgress.getMatchingLevelForNodes(r1n1, r2n4, 3));
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
     mrCluster.shutdown();
     dfsCluster.shutdown();
-    super.tearDown();
   }
 
 