|
@@ -72,6 +72,7 @@ import org.apache.hadoop.io.WritableUtils;
|
|
|
import org.apache.hadoop.ipc.RPC;
|
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
|
import org.apache.hadoop.ipc.Server;
|
|
|
+import org.apache.hadoop.mapred.JobClient.TaskStatusFilter;
|
|
|
import org.apache.hadoop.mapred.TaskStatus.Phase;
|
|
|
import org.apache.hadoop.mapred.pipes.Submitter;
|
|
|
import org.apache.hadoop.metrics.MetricsContext;
|
|
@@ -1186,7 +1187,6 @@ public class TaskTracker
|
|
|
for (TaskStatus taskStatus : status.getTaskReports()) {
|
|
|
if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
|
|
|
taskStatus.getRunState() != TaskStatus.State.UNASSIGNED &&
|
|
|
- taskStatus.getRunState() != TaskStatus.State.INITIALIZED &&
|
|
|
taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) {
|
|
|
if (taskStatus.getIsMap()) {
|
|
|
mapTotal--;
|
|
@@ -1250,7 +1250,6 @@ public class TaskTracker
|
|
|
// still occupied and hence memory of the task should be
|
|
|
// accounted in used memory.
|
|
|
if ((tip.getRunState() == TaskStatus.State.RUNNING)
|
|
|
- || (tip.getRunState() == TaskStatus.State.INITIALIZED)
|
|
|
|| (tip.getRunState() == TaskStatus.State.COMMIT_PENDING)) {
|
|
|
maxMemoryUsed += getMemoryForTask(tip.getJobConf());
|
|
|
}
|
|
@@ -1304,7 +1303,6 @@ public class TaskTracker
|
|
|
long now = System.currentTimeMillis();
|
|
|
for (TaskInProgress tip: runningTasks.values()) {
|
|
|
if (tip.getRunState() == TaskStatus.State.RUNNING ||
|
|
|
- tip.getRunState() == TaskStatus.State.INITIALIZED ||
|
|
|
tip.getRunState() == TaskStatus.State.COMMIT_PENDING) {
|
|
|
// Check the per-job timeout interval for tasks;
|
|
|
// an interval of '0' implies it is never timed-out
|
|
@@ -1611,7 +1609,15 @@ public class TaskTracker
|
|
|
numFreeSlots.set(numFreeSlots.get() - 1);
|
|
|
assert (numFreeSlots.get() >= 0);
|
|
|
}
|
|
|
-
|
|
|
+ synchronized (tip) {
|
|
|
+ //to make sure that there is no kill task action for this
|
|
|
+ if (tip.getRunState() != TaskStatus.State.UNASSIGNED) {
|
|
|
+ //got killed externally while still in the launcher queue
|
|
|
+ addFreeSlot();
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ tip.slotTaken = true;
|
|
|
+ }
|
|
|
//got a free slot. launch the task
|
|
|
startNewTask(tip);
|
|
|
} catch (InterruptedException e) {
|
|
@@ -1740,6 +1746,7 @@ public class TaskTracker
|
|
|
private TaskStatus taskStatus;
|
|
|
private long taskTimeout;
|
|
|
private String debugCommand;
|
|
|
+ private volatile boolean slotTaken = false;
|
|
|
|
|
|
/**
|
|
|
*/
|
|
@@ -1833,7 +1840,7 @@ public class TaskTracker
|
|
|
alwaysKeepTaskFiles = false;
|
|
|
}
|
|
|
if (debugCommand != null || localJobConf.getProfileEnabled() ||
|
|
|
- alwaysKeepTaskFiles) {
|
|
|
+ alwaysKeepTaskFiles || keepFailedTaskFiles) {
|
|
|
//disable jvm reuse
|
|
|
localJobConf.setNumTasksToExecutePerJvm(1);
|
|
|
}
|
|
@@ -1885,9 +1892,16 @@ public class TaskTracker
|
|
|
* Kick off the task execution
|
|
|
*/
|
|
|
public synchronized void launchTask() throws IOException {
|
|
|
- localizeTask(task);
|
|
|
- this.runner = task.createRunner(TaskTracker.this);
|
|
|
- this.runner.start();
|
|
|
+ if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
|
|
|
+ localizeTask(task);
|
|
|
+ this.taskStatus.setRunState(TaskStatus.State.RUNNING);
|
|
|
+ this.runner = task.createRunner(TaskTracker.this, this);
|
|
|
+ this.runner.start();
|
|
|
+ this.taskStatus.setStartTime(System.currentTimeMillis());
|
|
|
+ } else {
|
|
|
+ LOG.info("Not launching task: " + task.getTaskID() +
|
|
|
+ " since it's state is " + this.taskStatus.getRunState());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2095,13 +2109,6 @@ public class TaskTracker
|
|
|
|
|
|
}
|
|
|
|
|
|
- synchronized void taskInitialized() {
|
|
|
- if (taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
|
|
|
- //one-way state change to INITIALIZED
|
|
|
- this.taskStatus.setRunState(TaskStatus.State.INITIALIZED);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
|
|
|
/**
|
|
|
* Runs the script given in args
|
|
@@ -2193,7 +2200,6 @@ public class TaskTracker
|
|
|
synchronized(this){
|
|
|
if (getRunState() == TaskStatus.State.RUNNING ||
|
|
|
getRunState() == TaskStatus.State.UNASSIGNED ||
|
|
|
- getRunState() == TaskStatus.State.INITIALIZED ||
|
|
|
getRunState() == TaskStatus.State.COMMIT_PENDING) {
|
|
|
kill(wasFailure);
|
|
|
}
|
|
@@ -2209,12 +2215,12 @@ public class TaskTracker
|
|
|
*/
|
|
|
public synchronized void kill(boolean wasFailure) throws IOException {
|
|
|
if (taskStatus.getRunState() == TaskStatus.State.RUNNING ||
|
|
|
- taskStatus.getRunState() == TaskStatus.State.INITIALIZED ||
|
|
|
taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) {
|
|
|
wasKilled = true;
|
|
|
if (wasFailure) {
|
|
|
failures += 1;
|
|
|
}
|
|
|
+ runner.kill();
|
|
|
taskStatus.setRunState((wasFailure) ?
|
|
|
TaskStatus.State.FAILED :
|
|
|
TaskStatus.State.KILLED);
|
|
@@ -2225,15 +2231,12 @@ public class TaskTracker
|
|
|
} else {
|
|
|
taskStatus.setRunState(TaskStatus.State.KILLED);
|
|
|
}
|
|
|
- }
|
|
|
- if (runner != null) {
|
|
|
- runner.kill();
|
|
|
- runner.signalDone();
|
|
|
- } else {
|
|
|
- if (task.isMapTask()) {
|
|
|
- addFreeMapSlot();
|
|
|
- } else {
|
|
|
- addFreeReduceSlot();
|
|
|
+ if (slotTaken) {
|
|
|
+ if (task.isMapTask()) {
|
|
|
+ addFreeMapSlot();
|
|
|
+ } else {
|
|
|
+ addFreeReduceSlot();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -2344,7 +2347,7 @@ public class TaskTracker
|
|
|
/**
|
|
|
* Called upon startup by the child process, to fetch Task data.
|
|
|
*/
|
|
|
- public synchronized JvmTask getTask(JVMId jvmId, TaskAttemptID firstTaskId)
|
|
|
+ public synchronized JvmTask getTask(JVMId jvmId)
|
|
|
throws IOException {
|
|
|
LOG.debug("JVM with ID : " + jvmId + " asked for a task");
|
|
|
if (!jvmManager.isJvmKnown(jvmId)) {
|
|
@@ -2353,41 +2356,24 @@ public class TaskTracker
|
|
|
}
|
|
|
RunningJob rjob = runningJobs.get(jvmId.getJobId());
|
|
|
if (rjob == null) { //kill the JVM since the job is dead
|
|
|
+ LOG.info("Killing JVM " + jvmId + " since job " + jvmId.getJobId() +
|
|
|
+ " is dead");
|
|
|
jvmManager.killJvm(jvmId);
|
|
|
return new JvmTask(null, true);
|
|
|
}
|
|
|
- TaskInProgress t = runningTasks.get(firstTaskId);
|
|
|
- //if we can give the JVM the task it is asking for, well and good;
|
|
|
- //if not, we give it some other task from the same job (note that some
|
|
|
- //other JVM might have run this task while this JVM was init'ing)
|
|
|
- if (t == null || t.getStatus().getRunState() !=
|
|
|
- TaskStatus.State.INITIALIZED) {
|
|
|
- boolean isMap = jvmId.isMapJVM();
|
|
|
- synchronized (rjob) {
|
|
|
- for (TaskInProgress tip : runningTasks.values()) {
|
|
|
- synchronized (tip) {
|
|
|
- if (tip.getTask().getJobID().equals(jvmId.getJobId()) &&
|
|
|
- tip.getRunState() == TaskStatus.State.INITIALIZED
|
|
|
- && ((isMap && tip.getTask().isMapTask()) ||
|
|
|
- (!isMap && !tip.getTask().isMapTask()))) {
|
|
|
- t = tip;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- //now the task could be null or we could have got a task that already
|
|
|
- //ran earlier (the firstTaskId case)
|
|
|
- if (t == null || t.getRunState() != TaskStatus.State.INITIALIZED) {
|
|
|
- jvmManager.setRunningTaskForJvm(jvmId, null);
|
|
|
+ TaskInProgress tip = jvmManager.getTaskForJvm(jvmId);
|
|
|
+ if (tip == null) {
|
|
|
return new JvmTask(null, false);
|
|
|
}
|
|
|
- t.getStatus().setRunState(TaskStatus.State.RUNNING);
|
|
|
- t.getStatus().setStartTime(System.currentTimeMillis());
|
|
|
- jvmManager.setRunningTaskForJvm(jvmId,t.getTaskRunner());
|
|
|
- LOG.info("JVM with ID: " + jvmId + " given task: " +
|
|
|
- t.getTask().getTaskID().toString());
|
|
|
- return new JvmTask(t.getTask(), false);
|
|
|
+ if (tasks.get(tip.getTask().getTaskID()) != null) { //is task still present
|
|
|
+ LOG.info("JVM with ID: " + jvmId + " given task: " +
|
|
|
+ tip.getTask().getTaskID());
|
|
|
+ return new JvmTask(tip.getTask(), false);
|
|
|
+ } else {
|
|
|
+ LOG.info("Killing JVM with ID: " + jvmId + " since scheduled task: " +
|
|
|
+ tip.getTask().getTaskID() + " is " + tip.taskStatus.getRunState());
|
|
|
+ return new JvmTask(null, true);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2543,12 +2529,6 @@ public class TaskTracker
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- synchronized void taskInitialized(TaskAttemptID taskid) {
|
|
|
- TaskInProgress tip = tasks.get(taskid);
|
|
|
- if (tip != null) {
|
|
|
- tip.taskInitialized();
|
|
|
- }
|
|
|
- }
|
|
|
|
|
|
/**
|
|
|
* A completed map task's output has been lost.
|