|
@@ -17,17 +17,20 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.mapred;
|
|
|
|
|
|
-import java.io.*;
|
|
|
-import java.util.*;
|
|
|
+import java.io.File;
|
|
|
+import java.io.IOException;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Iterator;
|
|
|
+import java.util.List;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.net.DNSToSwitchMapping;
|
|
|
+import org.apache.hadoop.net.NetUtils;
|
|
|
import org.apache.hadoop.net.NetworkTopology;
|
|
|
import org.apache.hadoop.net.StaticMapping;
|
|
|
-import org.apache.hadoop.net.NetUtils;
|
|
|
import org.apache.hadoop.security.UnixUserGroupInformation;
|
|
|
-import org.apache.hadoop.fs.FileSystem;
|
|
|
|
|
|
/**
|
|
|
* This class creates a single-process Map-Reduce cluster for junit testing.
|
|
@@ -50,6 +53,8 @@ public class MiniMRCluster {
|
|
|
private String namenode;
|
|
|
private UnixUserGroupInformation ugi = null;
|
|
|
|
|
|
+ private JobConf job;
|
|
|
+
|
|
|
/**
|
|
|
* An inner class that runs a job tracker.
|
|
|
*/
|
|
@@ -222,8 +227,7 @@ public class MiniMRCluster {
|
|
|
* @return the absolute pathname of the local dir
|
|
|
*/
|
|
|
public String getTaskTrackerLocalDir(int taskTracker) {
|
|
|
- return ((TaskTrackerRunner)
|
|
|
- taskTrackerList.get(taskTracker)).getLocalDir();
|
|
|
+ return (taskTrackerList.get(taskTracker)).getLocalDir();
|
|
|
}
|
|
|
|
|
|
public JobTrackerRunner getJobTrackerRunner() {
|
|
@@ -241,8 +245,32 @@ public class MiniMRCluster {
|
|
|
* Wait until the system is idle.
|
|
|
*/
|
|
|
public void waitUntilIdle() {
|
|
|
- for(Iterator itr= taskTrackerList.iterator(); itr.hasNext();) {
|
|
|
- TaskTrackerRunner runner = (TaskTrackerRunner) itr.next();
|
|
|
+ waitTaskTrackers();
|
|
|
+
|
|
|
+ JobClient client;
|
|
|
+ try {
|
|
|
+ client = new JobClient(job);
|
|
|
+ while(client.getClusterStatus().getTaskTrackers()<taskTrackerList.size()) {
|
|
|
+ for(TaskTrackerRunner runner : taskTrackerList) {
|
|
|
+ if(runner.isDead) {
|
|
|
+ throw new RuntimeException("TaskTracker is dead");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Thread.sleep(1000);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (IOException ex) {
|
|
|
+ throw new RuntimeException(ex);
|
|
|
+ }
|
|
|
+ catch (InterruptedException ex) {
|
|
|
+ throw new RuntimeException(ex);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ private void waitTaskTrackers() {
|
|
|
+ for(Iterator<TaskTrackerRunner> itr= taskTrackerList.iterator(); itr.hasNext();) {
|
|
|
+ TaskTrackerRunner runner = itr.next();
|
|
|
while (!runner.isDead && (!runner.isInitialized || !runner.tt.isIdle())) {
|
|
|
if (!runner.isInitialized) {
|
|
|
LOG.info("Waiting for task tracker to start.");
|
|
@@ -256,7 +284,7 @@ public class MiniMRCluster {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Get the actual rpc port used.
|
|
|
*/
|
|
@@ -269,6 +297,9 @@ public class MiniMRCluster {
|
|
|
}
|
|
|
|
|
|
public JobConf createJobConf(JobConf conf) {
|
|
|
+ if(conf == null) {
|
|
|
+ conf = new JobConf();
|
|
|
+ }
|
|
|
JobConf result = new JobConf(conf);
|
|
|
FileSystem.setDefaultUri(result, namenode);
|
|
|
result.set("mapred.job.tracker", "localhost:"+jobTrackerPort);
|
|
@@ -399,7 +430,7 @@ public class MiniMRCluster {
|
|
|
this.numTaskTrackers = numTaskTrackers;
|
|
|
this.namenode = namenode;
|
|
|
this.ugi = ugi;
|
|
|
-
|
|
|
+
|
|
|
// Create the JobTracker
|
|
|
jobTracker = new JobTrackerRunner(conf);
|
|
|
jobTrackerThread = new Thread(jobTracker);
|
|
@@ -439,7 +470,8 @@ public class MiniMRCluster {
|
|
|
for (Thread taskTrackerThread : taskTrackerThreadList){
|
|
|
taskTrackerThread.start();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ this.job = createJobConf(conf);
|
|
|
waitUntilIdle();
|
|
|
}
|
|
|
|
|
@@ -448,10 +480,10 @@ public class MiniMRCluster {
|
|
|
*/
|
|
|
public void shutdown() {
|
|
|
try {
|
|
|
- waitUntilIdle();
|
|
|
+ waitTaskTrackers();
|
|
|
for (int idx = 0; idx < numTaskTrackers; idx++) {
|
|
|
- TaskTrackerRunner taskTracker = (TaskTrackerRunner) taskTrackerList.get(idx);
|
|
|
- Thread taskTrackerThread = (Thread) taskTrackerThreadList.get(idx);
|
|
|
+ TaskTrackerRunner taskTracker = taskTrackerList.get(idx);
|
|
|
+ Thread taskTrackerThread = taskTrackerThreadList.get(idx);
|
|
|
taskTracker.shutdown();
|
|
|
taskTrackerThread.interrupt();
|
|
|
try {
|