|
@@ -18,13 +18,14 @@
|
|
|
|
|
|
package org.apache.hadoop.mapreduce.v2.app.job.impl;
|
|
|
|
|
|
+import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
+import java.util.Comparator;
|
|
|
import java.util.EnumSet;
|
|
|
import java.util.LinkedHashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
-import java.util.Set;
|
|
|
import java.util.concurrent.locks.Lock;
|
|
|
import java.util.concurrent.locks.ReadWriteLock;
|
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
@@ -35,8 +36,11 @@ import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.mapred.JobConf;
|
|
|
import org.apache.hadoop.mapreduce.Counters;
|
|
|
import org.apache.hadoop.mapreduce.OutputCommitter;
|
|
|
+import org.apache.hadoop.mapreduce.TaskAttemptID;
|
|
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
|
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
|
|
|
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
|
|
|
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
|
|
|
import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
|
|
|
import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
|
|
|
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
|
|
@@ -66,6 +70,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
|
|
|
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
|
|
import org.apache.hadoop.security.token.Token;
|
|
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
|
|
import org.apache.hadoop.yarn.Clock;
|
|
@@ -208,8 +213,23 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|
|
|
|
|
private final StateMachine<TaskState, TaskEventType, TaskEvent>
|
|
|
stateMachine;
|
|
|
-
|
|
|
- protected int nextAttemptNumber;
|
|
|
+
|
|
|
+ // By default, the next TaskAttempt number is zero. Changes during recovery
|
|
|
+ protected int nextAttemptNumber = 0;
|
|
|
+ private List<TaskAttemptInfo> taskAttemptsFromPreviousGeneration =
|
|
|
+ new ArrayList<TaskAttemptInfo>();
|
|
|
+
|
|
|
+ private static final class RecoverdAttemptsComparator implements
|
|
|
+ Comparator<TaskAttemptInfo> {
|
|
|
+ @Override
|
|
|
+ public int compare(TaskAttemptInfo attempt1, TaskAttemptInfo attempt2) {
|
|
|
+ long diff = attempt1.getStartTime() - attempt2.getStartTime();
|
|
|
+ return diff == 0 ? 0 : (diff < 0 ? -1 : 1);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static final RecoverdAttemptsComparator RECOVERED_ATTEMPTS_COMPARATOR =
|
|
|
+ new RecoverdAttemptsComparator();
|
|
|
|
|
|
//should be set to one which comes first
|
|
|
//saying COMMIT_PENDING
|
|
@@ -230,7 +250,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|
|
TaskAttemptListener taskAttemptListener, OutputCommitter committer,
|
|
|
Token<JobTokenIdentifier> jobToken,
|
|
|
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
|
|
|
- Set<TaskId> completedTasksFromPreviousRun, int startCount,
|
|
|
+ Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
|
|
|
MRAppMetrics metrics) {
|
|
|
this.conf = conf;
|
|
|
this.clock = clock;
|
|
@@ -243,10 +263,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|
|
// have a convention that none of the overrides depends on any
|
|
|
// fields that need initialization.
|
|
|
maxAttempts = getMaxAttempts();
|
|
|
- taskId = recordFactory.newRecordInstance(TaskId.class);
|
|
|
- taskId.setJobId(jobId);
|
|
|
- taskId.setId(partition);
|
|
|
- taskId.setTaskType(taskType);
|
|
|
+ taskId = MRBuilderUtils.newTaskId(jobId, partition, taskType);
|
|
|
this.partition = partition;
|
|
|
this.taskAttemptListener = taskAttemptListener;
|
|
|
this.eventHandler = eventHandler;
|
|
@@ -255,18 +272,38 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|
|
this.jobToken = jobToken;
|
|
|
this.metrics = metrics;
|
|
|
|
|
|
+ // See if this is from a previous generation.
|
|
|
if (completedTasksFromPreviousRun != null
|
|
|
- && completedTasksFromPreviousRun.contains(taskId)) {
|
|
|
+ && completedTasksFromPreviousRun.containsKey(taskId)) {
|
|
|
+ // This task has TaskAttempts from previous generation. We have to replay
|
|
|
+ // them.
|
|
|
LOG.info("Task is from previous run " + taskId);
|
|
|
- startCount = startCount - 1;
|
|
|
+ TaskInfo taskInfo = completedTasksFromPreviousRun.get(taskId);
|
|
|
+ Map<TaskAttemptID, TaskAttemptInfo> allAttempts =
|
|
|
+ taskInfo.getAllTaskAttempts();
|
|
|
+ taskAttemptsFromPreviousGeneration = new ArrayList<TaskAttemptInfo>();
|
|
|
+ taskAttemptsFromPreviousGeneration.addAll(allAttempts.values());
|
|
|
+ Collections.sort(taskAttemptsFromPreviousGeneration,
|
|
|
+ RECOVERED_ATTEMPTS_COMPARATOR);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (taskAttemptsFromPreviousGeneration.isEmpty()) {
|
|
|
+ // All the previous attempts are exhausted, now start with a new
|
|
|
+ // generation.
|
|
|
+
|
|
|
+ // All the new TaskAttemptIDs are generated based on MR
|
|
|
+ // ApplicationAttemptID so that attempts from previous lives don't
|
|
|
+ // over-step the current one. This assumes that a task won't have more
|
|
|
+ // than 1000 attempts in its single generation, which is very reasonable.
|
|
|
+ // Someone is nuts if he/she thinks he/she can live with 1000 TaskAttempts
|
|
|
+ // and requires serious medical attention.
|
|
|
+ nextAttemptNumber = (startCount - 1) * 1000;
|
|
|
+ } else {
|
|
|
+ // There are still some TaskAttempts from previous generation, use them
|
|
|
+ nextAttemptNumber =
|
|
|
+ taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
|
|
|
}
|
|
|
|
|
|
- //attempt ids are generated based on MR app startCount so that attempts
|
|
|
- //from previous lives don't overstep the current one.
|
|
|
- //this assumes that a task won't have more than 1000 attempts in its single
|
|
|
- //life
|
|
|
- nextAttemptNumber = (startCount - 1) * 1000;
|
|
|
-
|
|
|
// This "this leak" is okay because the retained pointer is in an
|
|
|
// instance variable.
|
|
|
stateMachine = stateMachineFactory.make(this);
|
|
@@ -390,17 +427,23 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|
|
|
|
|
//this is always called in read/write lock
|
|
|
private long getLaunchTime() {
|
|
|
- long launchTime = 0;
|
|
|
+ long taskLaunchTime = 0;
|
|
|
+ boolean launchTimeSet = false;
|
|
|
for (TaskAttempt at : attempts.values()) {
|
|
|
- //select the least launch time of all attempts
|
|
|
- if (launchTime == 0 || launchTime > at.getLaunchTime()) {
|
|
|
- launchTime = at.getLaunchTime();
|
|
|
+ // select the least launch time of all attempts
|
|
|
+ long attemptLaunchTime = at.getLaunchTime();
|
|
|
+ if (attemptLaunchTime != 0 && !launchTimeSet) {
|
|
|
+ // For the first non-zero launch time
|
|
|
+ launchTimeSet = true;
|
|
|
+ taskLaunchTime = attemptLaunchTime;
|
|
|
+ } else if (attemptLaunchTime != 0 && taskLaunchTime > attemptLaunchTime) {
|
|
|
+ taskLaunchTime = attemptLaunchTime;
|
|
|
}
|
|
|
}
|
|
|
- if (launchTime == 0) {
|
|
|
+ if (!launchTimeSet) {
|
|
|
return this.scheduledTime;
|
|
|
}
|
|
|
- return launchTime;
|
|
|
+ return taskLaunchTime;
|
|
|
}
|
|
|
|
|
|
//this is always called in read/write lock
|
|
@@ -525,7 +568,16 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|
|
attempts.put(attempt.getID(), attempt);
|
|
|
break;
|
|
|
}
|
|
|
- ++nextAttemptNumber;
|
|
|
+
|
|
|
+ // Update nextATtemptNumber
|
|
|
+ if (taskAttemptsFromPreviousGeneration.isEmpty()) {
|
|
|
+ ++nextAttemptNumber;
|
|
|
+ } else {
|
|
|
+ // There are still some TaskAttempts from previous generation, use them
|
|
|
+ nextAttemptNumber =
|
|
|
+ taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
|
|
|
+ }
|
|
|
+
|
|
|
++numberUncompletedAttempts;
|
|
|
//schedule the nextAttemptNumber
|
|
|
if (failedAttempts > 0) {
|