|
@@ -66,6 +66,8 @@ import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
|
|
|
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
|
|
|
import org.apache.hadoop.mapreduce.security.TokenCache;
|
|
|
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.records.Avataar;
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.records.Locality;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
|
|
@@ -156,7 +158,8 @@ public abstract class TaskAttemptImpl implements
|
|
|
private final org.apache.hadoop.mapred.JobID oldJobId;
|
|
|
private final TaskAttemptListener taskAttemptListener;
|
|
|
private final Resource resourceCapability;
|
|
|
- private final String[] dataLocalHosts;
|
|
|
+ protected Set<String> dataLocalHosts;
|
|
|
+ protected Set<String> dataLocalRacks;
|
|
|
private final List<String> diagnostics = new ArrayList<String>();
|
|
|
private final Lock readLock;
|
|
|
private final Lock writeLock;
|
|
@@ -175,6 +178,8 @@ public abstract class TaskAttemptImpl implements
|
|
|
private int shufflePort = -1;
|
|
|
private String trackerName;
|
|
|
private int httpPort;
|
|
|
+ private Locality locality;
|
|
|
+ private Avataar avataar;
|
|
|
|
|
|
private static final CleanupContainerTransition CLEANUP_CONTAINER_TRANSITION =
|
|
|
new CleanupContainerTransition();
|
|
@@ -532,8 +537,16 @@ public abstract class TaskAttemptImpl implements
|
|
|
getMemoryRequired(conf, taskId.getTaskType()));
|
|
|
this.resourceCapability.setVirtualCores(
|
|
|
getCpuRequired(conf, taskId.getTaskType()));
|
|
|
- this.dataLocalHosts = dataLocalHosts;
|
|
|
+
|
|
|
+ this.dataLocalHosts = resolveHosts(dataLocalHosts);
|
|
|
RackResolver.init(conf);
|
|
|
+ this.dataLocalRacks = new HashSet<String>();
|
|
|
+ for (String host : this.dataLocalHosts) {
|
|
|
+ this.dataLocalRacks.add(RackResolver.resolve(host).getNetworkLocation());
|
|
|
+ }
|
|
|
+
|
|
|
+ locality = Locality.OFF_SWITCH;
|
|
|
+ avataar = Avataar.VIRGIN;
|
|
|
|
|
|
// This "this leak" is okay because the retained pointer is in an
|
|
|
// instance variable.
|
|
@@ -1032,6 +1045,22 @@ public abstract class TaskAttemptImpl implements
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ public Locality getLocality() {
|
|
|
+ return locality;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setLocality(Locality locality) {
|
|
|
+ this.locality = locality;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Avataar getAvataar() {
|
|
|
+ return avataar;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setAvataar(Avataar avataar) {
|
|
|
+ this.avataar = avataar;
|
|
|
+ }
|
|
|
+
|
|
|
private static TaskAttemptState getExternalState(
|
|
|
TaskAttemptStateInternal smState) {
|
|
|
switch (smState) {
|
|
@@ -1232,25 +1262,27 @@ public abstract class TaskAttemptImpl implements
|
|
|
taskAttempt.attemptId,
|
|
|
taskAttempt.resourceCapability));
|
|
|
} else {
|
|
|
- Set<String> racks = new HashSet<String>();
|
|
|
- for (String host : taskAttempt.dataLocalHosts) {
|
|
|
- racks.add(RackResolver.resolve(host).getNetworkLocation());
|
|
|
- }
|
|
|
taskAttempt.eventHandler.handle(new ContainerRequestEvent(
|
|
|
- taskAttempt.attemptId, taskAttempt.resourceCapability, taskAttempt
|
|
|
- .resolveHosts(taskAttempt.dataLocalHosts), racks
|
|
|
- .toArray(new String[racks.size()])));
|
|
|
+ taskAttempt.attemptId, taskAttempt.resourceCapability,
|
|
|
+ taskAttempt.dataLocalHosts.toArray(
|
|
|
+ new String[taskAttempt.dataLocalHosts.size()]),
|
|
|
+ taskAttempt.dataLocalRacks.toArray(
|
|
|
+ new String[taskAttempt.dataLocalRacks.size()])));
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- protected String[] resolveHosts(String[] src) {
|
|
|
- String[] result = new String[src.length];
|
|
|
- for (int i = 0; i < src.length; i++) {
|
|
|
- if (isIP(src[i])) {
|
|
|
- result[i] = resolveHost(src[i]);
|
|
|
- } else {
|
|
|
- result[i] = src[i];
|
|
|
+ protected Set<String> resolveHosts(String[] src) {
|
|
|
+ Set<String> result = new HashSet<String>();
|
|
|
+ if (src != null) {
|
|
|
+ for (int i = 0; i < src.length; i++) {
|
|
|
+ if (src[i] == null) {
|
|
|
+ continue;
|
|
|
+ } else if (isIP(src[i])) {
|
|
|
+ result.add(resolveHost(src[i]));
|
|
|
+ } else {
|
|
|
+ result.add(src[i]);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
return result;
|
|
@@ -1300,6 +1332,20 @@ public abstract class TaskAttemptImpl implements
|
|
|
taskAttempt.remoteTask.isMapTask(), taskAttempt.containerID.getId());
|
|
|
taskAttempt.taskAttemptListener.registerPendingTask(
|
|
|
taskAttempt.remoteTask, taskAttempt.jvmID);
|
|
|
+
|
|
|
+ taskAttempt.locality = Locality.OFF_SWITCH;
|
|
|
+ if (taskAttempt.dataLocalHosts.size() > 0) {
|
|
|
+ String cHost = taskAttempt.resolveHost(
|
|
|
+ taskAttempt.containerNodeId.getHost());
|
|
|
+ if (taskAttempt.dataLocalHosts.contains(cHost)) {
|
|
|
+ taskAttempt.locality = Locality.NODE_LOCAL;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (taskAttempt.locality == Locality.OFF_SWITCH) {
|
|
|
+ if (taskAttempt.dataLocalRacks.contains(taskAttempt.nodeRackName)) {
|
|
|
+ taskAttempt.locality = Locality.RACK_LOCAL;
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
//launch the container
|
|
|
//create the container object to be launched for a given Task attempt
|
|
@@ -1376,7 +1422,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
|
|
|
} else {
|
|
|
LOG.debug("Not generating HistoryFinish event since start event not " +
|
|
|
- "generated for taskAttempt: " + taskAttempt.getID());
|
|
|
+ "generated for taskAttempt: " + taskAttempt.getID());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -1421,7 +1467,8 @@ public abstract class TaskAttemptImpl implements
|
|
|
TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
|
|
|
taskAttempt.launchTime,
|
|
|
nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort(),
|
|
|
- taskAttempt.shufflePort, taskAttempt.containerID);
|
|
|
+ taskAttempt.shufflePort, taskAttempt.containerID,
|
|
|
+ taskAttempt.locality.toString(), taskAttempt.avataar.toString());
|
|
|
taskAttempt.eventHandler.handle
|
|
|
(new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tase));
|
|
|
taskAttempt.eventHandler.handle
|
|
@@ -1510,7 +1557,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
// handling failed map/reduce events.
|
|
|
}else {
|
|
|
LOG.debug("Not generating HistoryFinish event since start event not " +
|
|
|
- "generated for taskAttempt: " + taskAttempt.getID());
|
|
|
+ "generated for taskAttempt: " + taskAttempt.getID());
|
|
|
}
|
|
|
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
|
|
|
taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
|
|
@@ -1580,7 +1627,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
|
|
|
}else {
|
|
|
LOG.debug("Not generating HistoryFinish event since start event not " +
|
|
|
- "generated for taskAttempt: " + taskAttempt.getID());
|
|
|
+ "generated for taskAttempt: " + taskAttempt.getID());
|
|
|
}
|
|
|
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
|
|
|
taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
|
|
@@ -1648,7 +1695,7 @@ public abstract class TaskAttemptImpl implements
|
|
|
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
|
|
|
}else {
|
|
|
LOG.debug("Not generating HistoryFinish event since start event not " +
|
|
|
- "generated for taskAttempt: " + taskAttempt.getID());
|
|
|
+ "generated for taskAttempt: " + taskAttempt.getID());
|
|
|
}
|
|
|
// taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.KILLED); Not logging Map/Reduce attempts in case of failure.
|
|
|
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
|