|
@@ -32,7 +32,7 @@ public class MiniMRCluster {
|
|
|
|
|
|
private int jobTrackerPort = 0;
|
|
|
private int taskTrackerPort = 0;
|
|
|
-
|
|
|
+ private int jobTrackerInfoPort = 0;
|
|
|
private int numTaskTrackers;
|
|
|
|
|
|
private List taskTrackerList = new ArrayList();
|
|
@@ -40,10 +40,17 @@ public class MiniMRCluster {
|
|
|
|
|
|
private String namenode;
|
|
|
|
|
|
+ private int MAX_RETRIES_PER_PORT = 10;
|
|
|
+ private int MAX_RETRIES = 10;
|
|
|
+
|
|
|
/**
|
|
|
* An inner class that runs a job tracker.
|
|
|
*/
|
|
|
class JobTrackerRunner implements Runnable {
|
|
|
+
|
|
|
+ public boolean isUp() {
|
|
|
+ return (JobTracker.getTracker() != null);
|
|
|
+ }
|
|
|
/**
|
|
|
* Create the job tracker and run it.
|
|
|
*/
|
|
@@ -52,6 +59,7 @@ public class MiniMRCluster {
|
|
|
JobConf jc = new JobConf();
|
|
|
jc.set("fs.name.node", namenode);
|
|
|
jc.set("mapred.job.tracker", "localhost:"+jobTrackerPort);
|
|
|
+ jc.set("mapred.job.tracker.info.port", jobTrackerInfoPort);
|
|
|
// this timeout seems to control the minimum time for the test, so
|
|
|
// set it down at 2 seconds.
|
|
|
jc.setInt("ipc.client.timeout", 1000);
|
|
@@ -194,9 +202,18 @@ public class MiniMRCluster {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get the actual rpc port used.
|
|
|
+ */
|
|
|
+ public int getJobTrackerPort() {
|
|
|
+ return jobTrackerPort;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
- * Create the config and start up the servers.
|
|
|
+ * Create the config and start up the servers. The ports supplied by the user are
|
|
|
+ * just used as suggestions. If those ports are already in use, new ports
|
|
|
+ * are tried. The caller should call getJobTrackerPort to get the actual rpc port used.
|
|
|
*/
|
|
|
public MiniMRCluster(int jobTrackerPort,
|
|
|
int taskTrackerPort,
|
|
@@ -211,39 +228,65 @@ public class MiniMRCluster {
|
|
|
int numTaskTrackers,
|
|
|
String namenode,
|
|
|
boolean taskTrackerFirst, int numDir) throws IOException {
|
|
|
+
|
|
|
this.jobTrackerPort = jobTrackerPort;
|
|
|
this.taskTrackerPort = taskTrackerPort;
|
|
|
+ this.jobTrackerInfoPort = 50030;
|
|
|
this.numTaskTrackers = numTaskTrackers;
|
|
|
this.namenode = namenode;
|
|
|
-
|
|
|
- File configDir = new File("build", "minimr");
|
|
|
- configDir.mkdirs();
|
|
|
- File siteFile = new File(configDir, "hadoop-site.xml");
|
|
|
- PrintWriter pw = new PrintWriter(siteFile);
|
|
|
- pw.print("<?xml version=\"1.0\"?>\n"+
|
|
|
- "<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>\n"+
|
|
|
- "<configuration>\n"+
|
|
|
- " <property>\n"+
|
|
|
- " <name>mapred.system.dir</name>\n"+
|
|
|
- " <value>build/test/mapred/system</value>\n"+
|
|
|
- " </property>\n"+
|
|
|
- "</configuration>\n");
|
|
|
- pw.close();
|
|
|
- jobTracker = new JobTrackerRunner();
|
|
|
- jobTrackerThread = new Thread(jobTracker);
|
|
|
- if (!taskTrackerFirst) {
|
|
|
- jobTrackerThread.start();
|
|
|
- }
|
|
|
- for (int idx = 0; idx < numTaskTrackers; idx++) {
|
|
|
+
|
|
|
+ // Loop until we find a set of ports that are all unused or until we
|
|
|
+ // give up because it's taken too many tries.
|
|
|
+ boolean foundPorts = false;
|
|
|
+ int portsTried = 0;
|
|
|
+ while ((!foundPorts) && (portsTried < MAX_RETRIES)) {
|
|
|
+ jobTracker = new JobTrackerRunner();
|
|
|
+ jobTrackerThread = new Thread(jobTracker);
|
|
|
+ if (!taskTrackerFirst) {
|
|
|
+ jobTrackerThread.start();
|
|
|
+ }
|
|
|
+ for (int idx = 0; idx < numTaskTrackers; idx++) {
|
|
|
TaskTrackerRunner taskTracker = new TaskTrackerRunner(numDir);
|
|
|
Thread taskTrackerThread = new Thread(taskTracker);
|
|
|
taskTrackerThread.start();
|
|
|
taskTrackerList.add(taskTracker);
|
|
|
taskTrackerThreadList.add(taskTrackerThread);
|
|
|
+ }
|
|
|
+ if (taskTrackerFirst) {
|
|
|
+ jobTrackerThread.start();
|
|
|
+ }
|
|
|
+ int retry = 0;
|
|
|
+ while (!jobTracker.isUp() && (retry < MAX_RETRIES_PER_PORT)) {
|
|
|
+ try { // let daemons get started
|
|
|
+ System.err.println("waiting for jobtracker to start");
|
|
|
+ Thread.sleep(1000);
|
|
|
+ } catch(InterruptedException e) {
|
|
|
+ }
|
|
|
+ retry++;
|
|
|
+ }
|
|
|
+ if (retry >= MAX_RETRIES_PER_PORT) {
|
|
|
+ // Try new ports.
|
|
|
+ this.jobTrackerPort += 7;
|
|
|
+ this.jobTrackerInfoPort += 3;
|
|
|
+ this.taskTrackerPort++;
|
|
|
+
|
|
|
+ System.err.println("Failed to start MR minicluster in " + retry +
|
|
|
+ " attempts. Retrying with new ports:");
|
|
|
+ System.err.println("\tJobTracker RPC port = " + jobTrackerPort);
|
|
|
+ System.err.println("\tJobTracker info port = " + jobTrackerInfoPort);
|
|
|
+ System.err.println("\tTaskTracker RPC port(s) = " +
|
|
|
+ taskTrackerPort + "-" + (taskTrackerPort+numTaskTrackers-1));
|
|
|
+ shutdown();
|
|
|
+ taskTrackerList.clear();
|
|
|
+ } else {
|
|
|
+ foundPorts = true;
|
|
|
+ }
|
|
|
+ portsTried++;
|
|
|
}
|
|
|
- if (taskTrackerFirst) {
|
|
|
- jobTrackerThread.start();
|
|
|
+ if (portsTried >= MAX_RETRIES) {
|
|
|
+ throw new IOException("Failed to start MR minicluster after trying " + portsTried + " ports.");
|
|
|
}
|
|
|
+
|
|
|
waitUntilIdle();
|
|
|
}
|
|
|
|