|
@@ -70,12 +70,10 @@ import org.apache.hadoop.yarn.service.CompositeService;
|
|
|
public class TaskAttemptListenerImpl2 extends CompositeService
|
|
|
implements TaskUmbilicalProtocol, TaskAttemptListener {
|
|
|
|
|
|
- // TODO XXX: Ideally containerId registration and unregistration should be taken care of by the Container.
|
|
|
- // .... TaskAttemptId registration and unregistration by the TaskAttempt. Can this be split into a
|
|
|
- // ContainerListener + TaskAttemptListener ?
|
|
|
-
|
|
|
- // TODO XXX. Re-look at big chunks. Possibly redo bits.
|
|
|
- // ..launchedJvm map etc.
|
|
|
+ // TODO: Eventually, split this into a ContainerListener (getTask) and a
|
|
|
+ // TaskAttemptListener (attempt specific requests). After that, AMContainer
|
|
|
+ // registers containers, AMTask registers attempts.
|
|
|
+
|
|
|
// TODO: Re-look at sending back errors for unknown tasks.
|
|
|
|
|
|
private static final JvmTask TASK_FOR_INVALID_JVM = new JvmTask(null, true);
|
|
@@ -90,17 +88,13 @@ public class TaskAttemptListenerImpl2 extends CompositeService
|
|
|
private InetSocketAddress address;
|
|
|
private Server server;
|
|
|
|
|
|
- // TODO XXX: Use this to figure out whether an incoming ping is valid.
|
|
|
+ // TODO: Use this to figure out whether an incoming ping is valid.
|
|
|
private ConcurrentMap<TaskAttemptID, WrappedJvmID> attemptToJvmIdMap =
|
|
|
new ConcurrentHashMap<TaskAttemptID, WrappedJvmID>();
|
|
|
// jvmIDToContainerIdMap also serves to check whether the container is still running.
|
|
|
private ConcurrentMap<WrappedJvmID, ContainerId> jvmIDToContainerIdMap =
|
|
|
- new ConcurrentHashMap<WrappedJvmID, ContainerId>();
|
|
|
-// private Set<WrappedJvmID> launchedJVMs = Collections
|
|
|
-// .newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>());
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
+ new ConcurrentHashMap<WrappedJvmID, ContainerId>();
|
|
|
+
|
|
|
public TaskAttemptListenerImpl2(AppContext context, TaskHeartbeatHandler thh,
|
|
|
ContainerHeartbeatHandler chh, JobTokenSecretManager jobTokenSecretManager) {
|
|
|
super(TaskAttemptListenerImpl2.class.getName());
|
|
@@ -112,7 +106,6 @@ public class TaskAttemptListenerImpl2 extends CompositeService
|
|
|
|
|
|
// Service start hook: brings up the RPC server via startRpcServer() first, then
// delegates to super.start(). The patch removes the leftover "XXX" info log line.
@Override
|
|
|
public void start() {
|
|
|
- LOG.info("XXX: Starting TAL2");
|
|
|
startRpcServer();
|
|
|
super.start();
|
|
|
}
|
|
@@ -160,10 +153,26 @@ public class TaskAttemptListenerImpl2 extends CompositeService
|
|
|
return address;
|
|
|
}
|
|
|
|
|
|
// New overload added by this patch. Looks up the ContainerId registered for jvmId in
// jvmIDToContainerIdMap; when a mapping exists it calls
// containerHeartbeatHandler.pinged(containerId), otherwise it only logs a warning.
+ private void pingContainerHeartbeatHandler(WrappedJvmID jvmId) {
|
|
|
+ ContainerId containerId = jvmIDToContainerIdMap.get(jvmId);
|
|
|
+ if (containerId != null) {
|
|
|
+ containerHeartbeatHandler.pinged(containerId);
|
|
|
+ } else {
|
|
|
+ LOG.warn("Handling communication from JvmId: " + jvmId
|
|
|
+ + ", ContainerId not known for this jvm");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
// Resolves attemptID to its WrappedJvmID through attemptToJvmIdMap and forwards the ping to
// the WrappedJvmID overload. When no JVM is registered for the attempt, only a warning is
// logged. The removed line passed the result of two chained map lookups straight to
// containerHeartbeatHandler.pinged(...) without any null check.
private void pingContainerHeartbeatHandler(TaskAttemptID attemptID) {
|
|
|
- containerHeartbeatHandler.pinged(jvmIDToContainerIdMap.get(attemptToJvmIdMap.get(attemptID)));
|
|
|
+ WrappedJvmID jvmId = attemptToJvmIdMap.get(attemptID);
|
|
|
+ if (jvmId != null) {
|
|
|
+ pingContainerHeartbeatHandler(jvmId);
|
|
|
+ } else {
|
|
|
+ LOG.warn("Handling communication from attempt: " + attemptID
|
|
|
+ + ", JvmID not known for this attempt");
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Child checking whether it can commit.
|
|
|
*
|
|
@@ -418,10 +427,11 @@ public class TaskAttemptListenerImpl2 extends CompositeService
|
|
|
public JvmTask getTask(JvmContext jvmContext) throws IOException {
|
|
|
|
|
|
// A rough imitation of code from TaskTracker.
|
|
|
-
|
|
|
- // TODO XXX: Does ContainerHeartbeatHandler need to be pinged on getTask() ?
|
|
|
JVMId jvmId = jvmContext.jvmId;
|
|
|
- LOG.info("ZZZ: JVM with ID : " + jvmId + " asked for a task");
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("JVM with ID : " + jvmId + " asked for a task");
|
|
|
+ }
|
|
|
+
|
|
|
|
|
|
JvmTask jvmTask = null;
|
|
|
// TODO: Is it an authorized container to get a task? Otherwise return null.
|
|
@@ -431,6 +441,7 @@ public class TaskAttemptListenerImpl2 extends CompositeService
|
|
|
|
|
|
WrappedJvmID wJvmID = new WrappedJvmID(jvmId.getJobId(), jvmId.isMap,
|
|
|
jvmId.getId());
|
|
|
+ pingContainerHeartbeatHandler(wJvmID);
|
|
|
|
|
|
ContainerId containerId = jvmIDToContainerIdMap.get(wJvmID);
|
|
|
if (containerId == null) {
|
|
@@ -453,46 +464,27 @@ public class TaskAttemptListenerImpl2 extends CompositeService
|
|
|
}
|
|
|
}
|
|
|
return jvmTask;
|
|
|
-
|
|
|
-//
|
|
|
-// // Try to look up the task. We remove it directly as we don't give
|
|
|
-// // multiple tasks to a JVM
|
|
|
-// if (!jvmIDToActiveAttemptMap.containsKey(wJvmID)) {
|
|
|
-// LOG.info("JVM with ID: " + jvmId + " is invalid and will be killed.");
|
|
|
-// jvmTask = TASK_FOR_INVALID_JVM;
|
|
|
-// } else {
|
|
|
-// if (!launchedJVMs.contains(wJvmID)) {
|
|
|
-// jvmTask = null;
|
|
|
-// LOG.info("JVM with ID: " + jvmId
|
|
|
-// + " asking for task before AM launch registered. Given null task");
|
|
|
-// } else {
|
|
|
-// // remove the task as it is no more needed and free up the memory.
|
|
|
-// // Also we have already told the JVM to process a task, so it is no
|
|
|
-// // longer pending, and further request should ask it to exit.
|
|
|
-// org.apache.hadoop.mapred.Task task =
|
|
|
-// jvmIDToActiveAttemptMap.remove(wJvmID);
|
|
|
-// launchedJVMs.remove(wJvmID);
|
|
|
-// LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID());
|
|
|
-// jvmTask = new JvmTask(task, false);
|
|
|
-// }
|
|
|
-// }
|
|
|
-// return jvmTask;
|
|
|
}
|
|
|
|
|
|
// Records the jvmID -> containerId association in jvmIDToContainerIdMap. putIfAbsent is used,
// so an already-registered jvmID keeps its existing ContainerId and the new value is dropped
// without any log or error. The patch replaces the "XXX" info log with a debug log guarded by
// LOG.isDebugEnabled().
@Override
|
|
|
public void registerRunningJvm(WrappedJvmID jvmID, ContainerId containerId) {
|
|
|
- LOG.info("XXX: JvmRegistration: " + jvmID + ", ContaienrId: " + containerId);
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("JvmRegistration: " + jvmID + ", ContainerId: " + containerId);
|
|
|
+ }
|
|
|
jvmIDToContainerIdMap.putIfAbsent(jvmID, containerId);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// Removes jvmID from jvmIDToContainerIdMap and logs a warning when no mapping was present.
// The patch replaces the "TOREMOVE" info log with a debug log guarded by LOG.isDebugEnabled().
// NOTE(review): the warning text on the unchanged line below misspells "unknown" ("unknwon");
// worth fixing in the base file.
@Override
|
|
|
public void unregisterRunningJvm(WrappedJvmID jvmID) {
|
|
|
- LOG.info("TOREMOVE: Unregistering jvmId: " + jvmID);
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("JVM Unregister: " + jvmID);
|
|
|
+ }
|
|
|
if (jvmIDToContainerIdMap.remove(jvmID) == null) {
|
|
|
LOG.warn("Attempt to unregister unknwon jvmtoContainerMap: " + jvmID);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// Stores the attempt -> JVM association in attemptToJvmIdMap. The YARN TaskAttemptId is
// converted with TypeConverter.fromYarn(...) before being used as the map key. put() is used,
// so a repeated registration overwrites the previous JVM id. The patch adds @Override.
+ @Override
|
|
|
public void registerTaskAttempt(TaskAttemptId attemptId, WrappedJvmID jvmId) {
|
|
|
attemptToJvmIdMap.put(TypeConverter.fromYarn(attemptId), jvmId);
|
|
|
}
|
|
@@ -505,58 +497,15 @@ public class TaskAttemptListenerImpl2 extends CompositeService
|
|
|
}
|
|
|
|
|
|
// Looks up containerId in context.getAllContainers(), casts the entry to AMContainerImpl and
// returns the result of its pullTaskAttempt(). The patch removes the TODO about making
// pullTaskAttempt part of the interface.
// NOTE(review): the lookup result is neither null-checked nor type-checked before the cast;
// presumably callers only pass containers that are already registered. TODO confirm.
public org.apache.hadoop.mapred.Task pullTaskAttempt(ContainerId containerId) {
|
|
|
- // TODO XXX: pullTaskAttempt as part of the interface.
|
|
|
AMContainerImpl container = (AMContainerImpl) context.getAllContainers()
|
|
|
.get(containerId);
|
|
|
return container.pullTaskAttempt();
|
|
|
}
|
|
|
|
|
|
-// @Override
|
|
|
-// public void registerPendingTask(
|
|
|
-// org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) {
|
|
|
-// // Create the mapping so that it is easy to look up
|
|
|
-// // when the jvm comes back to ask for Task.
|
|
|
-//
|
|
|
-// // A JVM not present in this map is an illegal task/JVM.
|
|
|
-// jvmIDToActiveAttemptMap.put(jvmID, task);
|
|
|
-// }
|
|
|
-//
|
|
|
-// @Override
|
|
|
-// public void registerLaunchedTask(
|
|
|
-// org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID,
|
|
|
-// WrappedJvmID jvmId) {
|
|
|
-// // The AM considers the task to be launched (Has asked the NM to launch it)
|
|
|
-// // The JVM will only be given a task after this registartion.
|
|
|
-// launchedJVMs.add(jvmId);
|
|
|
-//
|
|
|
-// taskHeartbeatHandler.register(attemptID);
|
|
|
-// }
|
|
|
-//
|
|
|
-// @Override
|
|
|
-// public void unregister(
|
|
|
-// org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID,
|
|
|
-// WrappedJvmID jvmID) {
|
|
|
-//
|
|
|
-// // Unregistration also comes from the same TaskAttempt which does the
|
|
|
-// // registration. Events are ordered at TaskAttempt, so unregistration will
|
|
|
-// // always come after registration.
|
|
|
-//
|
|
|
-// // Remove from launchedJVMs before jvmIDToActiveAttemptMap to avoid
|
|
|
-// // synchronization issue with getTask(). getTask should be checking
|
|
|
-// // jvmIDToActiveAttemptMap before it checks launchedJVMs.
|
|
|
-//
|
|
|
-// // remove the mappings if not already removed
|
|
|
-// launchedJVMs.remove(jvmID);
|
|
|
-// jvmIDToActiveAttemptMap.remove(jvmID);
|
|
|
-//
|
|
|
-// //unregister this attempt
|
|
|
-// taskHeartbeatHandler.unregister(attemptID);
|
|
|
-// }
|
|
|
-
|
|
|
// Delegates to the static ProtocolSignature.getProtocolSignature(...), passing this instance
// together with the caller-supplied protocol name, client version and client methods hash.
@Override
|
|
|
public ProtocolSignature getProtocolSignature(String protocol,
|
|
|
long clientVersion, int clientMethodsHash) throws IOException {
|
|
|
return ProtocolSignature.getProtocolSignature(this,
|
|
|
protocol, clientVersion, clientMethodsHash);
|
|
|
}
|
|
|
-}
|
|
|
+}
|