|
@@ -80,6 +80,8 @@ public class MiniMRCluster {
|
|
|
class TaskTrackerRunner implements Runnable {
|
|
|
TaskTracker tt;
|
|
|
String localDir;
|
|
|
+ boolean isInitialized = false;
|
|
|
+ boolean isDead = false;
|
|
|
|
|
|
/**
|
|
|
* Create and run the task tracker.
|
|
@@ -101,8 +103,10 @@ public class MiniMRCluster {
|
|
|
this.localDir = ttDir.getAbsolutePath();
|
|
|
jc.set("mapred.local.dir", ttDir.getAbsolutePath());
|
|
|
tt = new TaskTracker(jc);
|
|
|
+ isInitialized = true;
|
|
|
tt.run();
|
|
|
} catch (Throwable e) {
|
|
|
+ isDead = true;
|
|
|
tt = null;
|
|
|
System.err.println("Task tracker crashed:");
|
|
|
e.printStackTrace();
|
|
@@ -154,10 +158,14 @@ public class MiniMRCluster {
|
|
|
*/
|
|
|
public void waitUntilIdle() {
|
|
|
for(Iterator itr= taskTrackerList.iterator(); itr.hasNext(); ) {
|
|
|
- TaskTracker tracker = ((TaskTrackerRunner) itr.next()).tt;
|
|
|
- while (!tracker.isIdle()) {
|
|
|
- System.out.println("Waiting for task tracker " + tracker.getName() +
|
|
|
- " to finish.");
|
|
|
+ TaskTrackerRunner runner = (TaskTrackerRunner) itr.next();
|
|
|
+ while (!runner.isDead && (!runner.isInitialized || !runner.tt.isIdle())) {
|
|
|
+ if (!runner.isInitialized) {
|
|
|
+ System.out.println("Waiting for task tracker to start.");
|
|
|
+ } else {
|
|
|
+ System.out.println("Waiting for task tracker " + runner.tt.getName() +
|
|
|
+ " to be idle.");
|
|
|
+ }
|
|
|
try {
|
|
|
Thread.sleep(1000);
|
|
|
} catch (InterruptedException ie) {}
|
|
@@ -206,17 +214,15 @@ public class MiniMRCluster {
|
|
|
if (taskTrackerFirst) {
|
|
|
jobTrackerThread.start();
|
|
|
}
|
|
|
- try { // let taskTrackers get started
|
|
|
- Thread.sleep(2000);
|
|
|
- } catch(InterruptedException e) {
|
|
|
- }
|
|
|
+ waitUntilIdle();
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Shut down the servers.
|
|
|
*/
|
|
|
public void shutdown() {
|
|
|
- try {
|
|
|
+ try {
|
|
|
+ waitUntilIdle();
|
|
|
for (int idx = 0; idx < numTaskTrackers; idx++) {
|
|
|
TaskTrackerRunner taskTracker = (TaskTrackerRunner) taskTrackerList.get(idx);
|
|
|
Thread taskTrackerThread = (Thread) taskTrackerThreadList.get(idx);
|