|
@@ -79,9 +79,16 @@ public class MiniMRCluster {
|
|
*/
|
|
*/
|
|
class TaskTrackerRunner implements Runnable {
|
|
class TaskTrackerRunner implements Runnable {
|
|
TaskTracker tt;
|
|
TaskTracker tt;
|
|
- String localDir;
|
|
|
|
|
|
+ // the localDirs for this taskTracker
|
|
|
|
+ String[] localDir;
|
|
boolean isInitialized = false;
|
|
boolean isInitialized = false;
|
|
boolean isDead = false;
|
|
boolean isDead = false;
|
|
|
|
+ int numDir;
|
|
|
|
+ TaskTrackerRunner(int numDir) {
|
|
|
|
+ this.numDir = numDir;
|
|
|
|
+ // a maximum of 10 local dirs can be specified in MinMRCluster
|
|
|
|
+ localDir = new String[10];
|
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
/**
|
|
* Create and run the task tracker.
|
|
* Create and run the task tracker.
|
|
@@ -97,10 +104,19 @@ public class MiniMRCluster {
|
|
jc.setInt("mapred.task.tracker.info.port", taskTrackerPort++);
|
|
jc.setInt("mapred.task.tracker.info.port", taskTrackerPort++);
|
|
jc.setInt("mapred.task.tracker.report.port", taskTrackerPort++);
|
|
jc.setInt("mapred.task.tracker.report.port", taskTrackerPort++);
|
|
File localDir = new File(jc.get("mapred.local.dir"));
|
|
File localDir = new File(jc.get("mapred.local.dir"));
|
|
- File ttDir = new File(localDir, Integer.toString(taskTrackerPort));
|
|
|
|
|
|
+ String mapredDir = "";
|
|
|
|
+ File ttDir = new File(localDir, Integer.toString(taskTrackerPort) + "_" + 0);
|
|
ttDir.mkdirs();
|
|
ttDir.mkdirs();
|
|
- this.localDir = ttDir.getAbsolutePath();
|
|
|
|
- jc.set("mapred.local.dir", ttDir.getAbsolutePath());
|
|
|
|
|
|
+ this.localDir[0] = ttDir.getAbsolutePath();
|
|
|
|
+ mapredDir = ttDir.getAbsolutePath();
|
|
|
|
+ for (int i = 1; i < numDir; i++){
|
|
|
|
+ ttDir = new File(localDir, Integer.toString(taskTrackerPort) + "_" + i);
|
|
|
|
+ ttDir.mkdirs();
|
|
|
|
+ this.localDir[i] = ttDir.getAbsolutePath();
|
|
|
|
+ mapredDir = mapredDir + "," + ttDir.getAbsolutePath();
|
|
|
|
+ }
|
|
|
|
+ jc.set("mapred.local.dir", mapredDir);
|
|
|
|
+ System.out.println("mapred.local.dir is " + mapredDir);
|
|
tt = new TaskTracker(jc);
|
|
tt = new TaskTracker(jc);
|
|
isInitialized = true;
|
|
isInitialized = true;
|
|
tt.run();
|
|
tt.run();
|
|
@@ -114,12 +130,17 @@ public class MiniMRCluster {
|
|
|
|
|
|
/**
|
|
/**
|
|
* Get the local dir for this TaskTracker.
|
|
* Get the local dir for this TaskTracker.
|
|
|
|
+ * This is there so that we do not break
|
|
|
|
+ * previous tests.
|
|
* @return the absolute pathname
|
|
* @return the absolute pathname
|
|
*/
|
|
*/
|
|
public String getLocalDir() {
|
|
public String getLocalDir() {
|
|
- return localDir;
|
|
|
|
|
|
+ return localDir[0];
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ public String[] getLocalDirs(){
|
|
|
|
+ return localDir;
|
|
|
|
+ }
|
|
/**
|
|
/**
|
|
* Shut down the server and wait for it to finish.
|
|
* Shut down the server and wait for it to finish.
|
|
*/
|
|
*/
|
|
@@ -175,11 +196,19 @@ public class MiniMRCluster {
|
|
/**
|
|
/**
|
|
* Create the config and start up the servers.
|
|
* Create the config and start up the servers.
|
|
*/
|
|
*/
|
|
|
|
+ public MiniMRCluster(int jobTrackerPort,
|
|
|
|
+ int taskTrackerPort,
|
|
|
|
+ int numTaskTrackers,
|
|
|
|
+ String namenode,
|
|
|
|
+ boolean taskTrackerFirst) throws IOException {
|
|
|
|
+ this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, taskTrackerFirst, 1);
|
|
|
|
+ }
|
|
|
|
+
|
|
public MiniMRCluster(int jobTrackerPort,
|
|
public MiniMRCluster(int jobTrackerPort,
|
|
int taskTrackerPort,
|
|
int taskTrackerPort,
|
|
int numTaskTrackers,
|
|
int numTaskTrackers,
|
|
String namenode,
|
|
String namenode,
|
|
- boolean taskTrackerFirst) throws IOException {
|
|
|
|
|
|
+ boolean taskTrackerFirst, int numDir) throws IOException {
|
|
this.jobTrackerPort = jobTrackerPort;
|
|
this.jobTrackerPort = jobTrackerPort;
|
|
this.taskTrackerPort = taskTrackerPort;
|
|
this.taskTrackerPort = taskTrackerPort;
|
|
this.numTaskTrackers = numTaskTrackers;
|
|
this.numTaskTrackers = numTaskTrackers;
|
|
@@ -204,7 +233,7 @@ public class MiniMRCluster {
|
|
jobTrackerThread.start();
|
|
jobTrackerThread.start();
|
|
}
|
|
}
|
|
for (int idx = 0; idx < numTaskTrackers; idx++) {
|
|
for (int idx = 0; idx < numTaskTrackers; idx++) {
|
|
- TaskTrackerRunner taskTracker = new TaskTrackerRunner();
|
|
|
|
|
|
+ TaskTrackerRunner taskTracker = new TaskTrackerRunner(numDir);
|
|
Thread taskTrackerThread = new Thread(taskTracker);
|
|
Thread taskTrackerThread = new Thread(taskTracker);
|
|
taskTrackerThread.start();
|
|
taskTrackerThread.start();
|
|
taskTrackerList.add(taskTracker);
|
|
taskTrackerList.add(taskTracker);
|