MAPREDUCE-6485. Create a new task attempt with failed map task priority if in-progress attempts are unassigned. (Xianyin Xin via rohithsharmaks)

rohithsharmaks 9 years ago
parent commit 439f43ad3d

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -588,6 +588,9 @@ Release 2.8.0 - UNRELEASED
    MAPREDUCE-6494. Permission issue when running archive-logs tool as
    different users (rkanter)
 
+   MAPREDUCE-6485. Create a new task attempt with failed map task priority
+   if in-progress attempts are unassigned. (Xianyin Xin via rohithsharmaks)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 6 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -1333,7 +1333,7 @@ public abstract class TaskAttemptImpl implements
     return attemptState;
   }
 
-  private static TaskAttemptState getExternalState(
+  protected static TaskAttemptState getExternalState(
       TaskAttemptStateInternal smState) {
     switch (smState) {
     case ASSIGNED:
@@ -1365,6 +1365,11 @@ public abstract class TaskAttemptImpl implements
     }
   }
 
+  // an attempt is considered assigned if it has a non-null container
+  boolean isContainerAssigned() {
+    return container != null;
+  }
+
   //always called in write lock
   private void setFinishTime() {
     //set the finish time only if launch time is set

+ 16 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java

@@ -1057,9 +1057,21 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
             TaskAttemptCompletionEventStatus.FAILED);
         // we don't need a new event if we already have a spare
         task.inProgressAttempts.remove(taskAttemptId);
-        if (task.inProgressAttempts.size() == 0
-            && task.successfulAttempt == null) {
-          task.addAndScheduleAttempt(Avataar.VIRGIN);
+        if (task.successfulAttempt == null) {
+          boolean shouldAddNewAttempt = true;
+          if (task.inProgressAttempts.size() > 0) {
+            // skip the new attempt if any in-progress attempt has a container
+            for (TaskAttemptId attemptId : task.inProgressAttempts) {
+              if (((TaskAttemptImpl) task.getAttempt(attemptId))
+                  .isContainerAssigned()) {
+                shouldAddNewAttempt = false;
+                break;
+              }
+            }
+          }
+          if (shouldAddNewAttempt) {
+            task.addAndScheduleAttempt(Avataar.VIRGIN);
+          }
         }
       } else {
         task.handleTaskAttemptCompletion(
@@ -1080,7 +1092,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
             taskFailedEvent));
         } else {
           LOG.debug("Not generating HistoryFinish event since start event not" +
-          		" generated for task: " + task.getID());
+              " generated for task: " + task.getID());
         }
         task.eventHandler.handle(
             new JobTaskEvent(task.taskId, TaskState.FAILED));
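
For readers skimming the TaskImpl.java hunk above, here is a minimal standalone sketch of the decision it introduces (illustrative names only, not the actual Hadoop classes): after an attempt fails, a fresh VIRGIN attempt is scheduled only when none of the remaining in-progress attempts has been assigned a container, i.e. all of them are still hanging in the scheduler.

    import java.util.Arrays;
    import java.util.List;

    public class AttemptSchedulingSketch {

      // Illustrative stand-in for TaskAttemptImpl#isContainerAssigned().
      interface Attempt {
        boolean isContainerAssigned();
      }

      // Mirrors the new FAILED-transition check: schedule a replacement attempt
      // only when no in-progress attempt has a container assigned yet.
      static boolean shouldAddNewAttempt(List<Attempt> inProgressAttempts) {
        for (Attempt attempt : inProgressAttempts) {
          if (attempt.isContainerAssigned()) {
            return false; // an assigned attempt can still make progress on its own
          }
        }
        return true; // every remaining attempt is stuck waiting for a container
      }

      public static void main(String[] args) {
        Attempt hanging = () -> false;  // e.g. an unlaunched speculative attempt
        Attempt assigned = () -> true;  // an attempt already running in a container

        System.out.println(shouldAddNewAttempt(Arrays.asList(hanging)));           // true
        System.out.println(shouldAddNewAttempt(Arrays.asList(hanging, assigned))); // false
        System.out.println(shouldAddNewAttempt(Arrays.<Attempt>asList()));         // true
      }
    }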

+ 92 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java

@@ -50,6 +50,8 @@ import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
@@ -57,6 +59,7 @@ import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.InlineDispatcher;
 import org.apache.hadoop.yarn.util.Clock;
@@ -140,6 +143,8 @@ public class TestTaskImpl {
 
     private float progress = 0;
     private TaskAttemptState state = TaskAttemptState.NEW;
+    boolean rescheduled = false;
+    boolean containerAssigned = false;
     private TaskType taskType;
     private Counters attemptCounters = TaskAttemptImpl.EMPTY_COUNTERS;
 
@@ -153,6 +158,15 @@ public class TestTaskImpl {
       this.taskType = taskType;
     }
 
+    public void assignContainer() {
+      containerAssigned = true;
+    }
+
+    @Override
+    boolean isContainerAssigned() {
+      return containerAssigned;
+    }
+
     public TaskAttemptId getAttemptId() {
       return getID();
     }
@@ -173,11 +187,20 @@ public class TestTaskImpl {
     public void setState(TaskAttemptState state) {
       this.state = state;
     }
-    
+
+    @Override
     public TaskAttemptState getState() {
       return state;
     }
 
+    public boolean getRescheduled() {
+      return this.rescheduled;
+    }
+
+    public void setRescheduled(boolean rescheduled) {
+      this.rescheduled = rescheduled;
+    }
+
     @Override
     public Counters getCounters() {
       return attemptCounters;
@@ -279,7 +302,9 @@ public class TestTaskImpl {
   private void launchTaskAttempt(TaskAttemptId attemptId) {
     mockTask.handle(new TaskTAttemptEvent(attemptId, 
         TaskEventType.T_ATTEMPT_LAUNCHED));
-    assertTaskRunningState();    
+    ((MockTaskAttemptImpl)(mockTask.getAttempt(attemptId)))
+        .assignContainer();
+    assertTaskRunningState();
   }
   
   private void commitTaskAttempt(TaskAttemptId attemptId) {
@@ -708,6 +733,71 @@ public class TestTaskImpl {
     assertEquals(TaskState.FAILED, mockTask.getState());
   }
 
+  private class PartialAttemptEventHandler implements EventHandler {
+
+    @Override
+    public void handle(Event event) {
+      if (event instanceof TaskAttemptEvent)
+        if (event.getType() == TaskAttemptEventType.TA_RESCHEDULE) {
+          TaskAttempt attempt = mockTask.getAttempt(((TaskAttemptEvent) event).getTaskAttemptID());
+          ((MockTaskAttemptImpl)attempt).setRescheduled(true);
+        }
+    }
+  }
+
+  @Test
+  public void testFailedTransitionWithHangingSpeculativeMap() {
+    mockTask = new MockTaskImpl(jobId, partition, new PartialAttemptEventHandler(),
+        remoteJobConfFile, conf, taskAttemptListener, jobToken,
+        credentials, clock, startCount, metrics, appContext, TaskType.MAP) {
+      @Override
+      protected int getMaxAttempts() {
+        return 4;
+      }
+    };
+
+    // start a new task, schedule and launch a new attempt
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+
+    // add a speculative attempt(#2), but do not launch it
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+        TaskEventType.T_ADD_SPEC_ATTEMPT));
+
+    // have the first attempt(#1) fail, verify task still running since the
+    // max attempts is 4
+    MockTaskAttemptImpl taskAttempt = taskAttempts.get(0);
+    taskAttempt.setState(TaskAttemptState.FAILED);
+    mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
+        TaskEventType.T_ATTEMPT_FAILED));
+    assertEquals(TaskState.RUNNING, mockTask.getState());
+
+    // verify a new attempt(#3) is added because the speculative attempt(#2)
+    // is still waiting for a container
+    assertEquals(3, taskAttempts.size());
+
+    // verify the speculative attempt(#2) is not a rescheduled attempt
+    assertEquals(false, taskAttempts.get(1).getRescheduled());
+
+    // verify the third attempt is a rescheduled attempt
+    assertEquals(true, taskAttempts.get(2).getRescheduled());
+
+    // now launch the latest attempt(#3) and set the internal state to running
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+
+    // have the speculative attempt(#2) fail, verify the task is still
+    // running since it hasn't reached the max attempts (4)
+    MockTaskAttemptImpl taskAttempt1 = taskAttempts.get(1);
+    taskAttempt1.setState(TaskAttemptState.FAILED);
+    mockTask.handle(new TaskTAttemptEvent(taskAttempt1.getAttemptId(),
+        TaskEventType.T_ATTEMPT_FAILED));
+    assertEquals(TaskState.RUNNING, mockTask.getState());
+
+    // verify no new attempt is added because attempt(#3) is still running
+    assertEquals(3, taskAttempts.size());
+  }
+
   @Test
   public void testCountersWithSpeculation() {
     mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),