|
@@ -23,8 +23,10 @@ import java.net.InetSocketAddress;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
+import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -69,12 +71,14 @@ public class TaskAttemptListenerImpl extends CompositeService
|
|
|
|
|
|
private AppContext context;
|
|
|
private Server server;
|
|
|
- private TaskHeartbeatHandler taskHeartbeatHandler;
|
|
|
+ protected TaskHeartbeatHandler taskHeartbeatHandler;
|
|
|
private InetSocketAddress address;
|
|
|
- private Map<WrappedJvmID, org.apache.hadoop.mapred.Task> jvmIDToAttemptMap =
|
|
|
+ private Map<WrappedJvmID, org.apache.hadoop.mapred.Task> jvmIDToActiveAttemptMap =
|
|
|
Collections.synchronizedMap(new HashMap<WrappedJvmID,
|
|
|
org.apache.hadoop.mapred.Task>());
|
|
|
private JobTokenSecretManager jobTokenSecretManager = null;
|
|
|
+ private Set<WrappedJvmID> pendingJvms =
|
|
|
+ Collections.synchronizedSet(new HashSet<WrappedJvmID>());
|
|
|
|
|
|
public TaskAttemptListenerImpl(AppContext context,
|
|
|
JobTokenSecretManager jobTokenSecretManager) {
|
|
@@ -395,35 +399,55 @@ public class TaskAttemptListenerImpl extends CompositeService
|
|
|
|
|
|
JVMId jvmId = context.jvmId;
|
|
|
LOG.info("JVM with ID : " + jvmId + " asked for a task");
|
|
|
-
|
|
|
- // TODO: Is it an authorised container to get a task? Otherwise return null.
|
|
|
-
|
|
|
- // TODO: Is the request for task-launch still valid?
|
|
|
+
|
|
|
+ JvmTask jvmTask = null;
|
|
|
+ // TODO: Is it an authorized container to get a task? Otherwise return null.
|
|
|
|
|
|
// TODO: Child.java's firstTaskID isn't really firstTaskID. Ask for update
|
|
|
// to jobId and task-type.
|
|
|
|
|
|
WrappedJvmID wJvmID = new WrappedJvmID(jvmId.getJobId(), jvmId.isMap,
|
|
|
jvmId.getId());
|
|
|
- org.apache.hadoop.mapred.Task task = jvmIDToAttemptMap.get(wJvmID);
|
|
|
- if (task != null) { //there may be lag in the attempt getting added here
|
|
|
- LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID());
|
|
|
- JvmTask jvmTask = new JvmTask(task, false);
|
|
|
-
|
|
|
- //remove the task as it is no more needed and free up the memory
|
|
|
- jvmIDToAttemptMap.remove(wJvmID);
|
|
|
-
|
|
|
- return jvmTask;
|
|
|
+ synchronized(this) {
|
|
|
+ if(pendingJvms.contains(wJvmID)) {
|
|
|
+ org.apache.hadoop.mapred.Task task = jvmIDToActiveAttemptMap.get(wJvmID);
|
|
|
+ if (task != null) { //there may be lag in the attempt getting added here
|
|
|
+ LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID());
|
|
|
+ jvmTask = new JvmTask(task, false);
|
|
|
+
|
|
|
+ //remove the task as it is no more needed and free up the memory
|
|
|
+ //Also we have already told the JVM to process a task, so it is no
|
|
|
+ //longer pending, and further request should ask it to exit.
|
|
|
+ pendingJvms.remove(wJvmID);
|
|
|
+ jvmIDToActiveAttemptMap.remove(wJvmID);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ LOG.info("JVM with ID: " + jvmId + " is invalid and will be killed.");
|
|
|
+ jvmTask = new JvmTask(null, true);
|
|
|
+ }
|
|
|
}
|
|
|
- return null;
|
|
|
+ return jvmTask;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized void registerPendingTask(WrappedJvmID jvmID) {
|
|
|
+ //Save this JVM away as one that has not been handled yet
|
|
|
+ pendingJvms.add(jvmID);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void register(org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID,
|
|
|
+ public void registerLaunchedTask(
|
|
|
+ org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID,
|
|
|
org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) {
|
|
|
- //create the mapping so that it is easy to look up
|
|
|
- //when it comes back to ask for Task.
|
|
|
- jvmIDToAttemptMap.put(jvmID, task);
|
|
|
+ synchronized(this) {
|
|
|
+ //create the mapping so that it is easy to look up
|
|
|
+ //when it comes back to ask for Task.
|
|
|
+ jvmIDToActiveAttemptMap.put(jvmID, task);
|
|
|
+ //This should not need to happen here, but just to be on the safe side
|
|
|
+ if(!pendingJvms.add(jvmID)) {
|
|
|
+ LOG.warn(jvmID+" launched without first being registered");
|
|
|
+ }
|
|
|
+ }
|
|
|
//register this attempt
|
|
|
taskHeartbeatHandler.register(attemptID);
|
|
|
}
|
|
@@ -432,8 +456,9 @@ public class TaskAttemptListenerImpl extends CompositeService
|
|
|
public void unregister(org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID,
|
|
|
WrappedJvmID jvmID) {
|
|
|
//remove the mapping if not already removed
|
|
|
- jvmIDToAttemptMap.remove(jvmID);
|
|
|
-
|
|
|
+ jvmIDToActiveAttemptMap.remove(jvmID);
|
|
|
+ //remove the pending if not already removed
|
|
|
+ pendingJvms.remove(jvmID);
|
|
|
//unregister this attempt
|
|
|
taskHeartbeatHandler.unregister(attemptID);
|
|
|
}
|