Browse Source

MAPREDUCE-5817. Mappers get rescheduled on node transition even after all reducers are completed. (Sangjin Lee via kasha)
(cherry picked from commit 27d24f96ab8d17e839a1ef0d7076efc78d28724a)

(cherry picked from commit b826168173c3386738acc12a5d62577f12aa06e9)

Karthik Kambatla 10 năm trước cách đây
mục cha
commit
6ad254b821

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -36,6 +36,9 @@ Release 2.6.5 - UNRELEASED
     MAPREDUCE-6558. multibyte delimiters with compressed input files generate
     duplicate records (Wilfred Spiegelenburg via jlowe)
 
+    MAPREDUCE-5817. Mappers get rescheduled on node transition even after all
+    reducers are completed. (Sangjin Lee via kasha)
+
 Release 2.6.4 - 2016-02-11
 
   INCOMPATIBLE CHANGES

+ 27 - 11
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java

@@ -130,6 +130,7 @@ import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /** Implementation of Job interface. Maintains the state machines of Job.
@@ -1328,15 +1329,21 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   
   private void actOnUnusableNode(NodeId nodeId, NodeState nodeState) {
     // rerun previously successful map tasks
-    List<TaskAttemptId> taskAttemptIdList = nodesToSucceededTaskAttempts.get(nodeId);
-    if(taskAttemptIdList != null) {
-      String mesg = "TaskAttempt killed because it ran on unusable node "
-          + nodeId;
-      for(TaskAttemptId id : taskAttemptIdList) {
-        if(TaskType.MAP == id.getTaskId().getTaskType()) {
-          // reschedule only map tasks because their outputs maybe unusable
-          LOG.info(mesg + ". AttemptId:" + id);
-          eventHandler.handle(new TaskAttemptKillEvent(id, mesg));
+    // do this only if the job is still in the running state and there are
+    // running reducers
+    if (getInternalState() == JobStateInternal.RUNNING &&
+        !allReducersComplete()) {
+      List<TaskAttemptId> taskAttemptIdList =
+          nodesToSucceededTaskAttempts.get(nodeId);
+      if (taskAttemptIdList != null) {
+        String mesg = "TaskAttempt killed because it ran on unusable node "
+            + nodeId;
+        for (TaskAttemptId id : taskAttemptIdList) {
+          if (TaskType.MAP == id.getTaskId().getTaskType()) {
+            // reschedule only map tasks because their outputs maybe unusable
+            LOG.info(mesg + ". AttemptId:" + id);
+            eventHandler.handle(new TaskAttemptKillEvent(id, mesg));
+          }
         }
       }
     }
@@ -1344,6 +1351,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     // RMContainerAllocator
   }
 
+  private boolean allReducersComplete() {
+    return numReduceTasks == 0 || numReduceTasks == getCompletedReduces();
+  }
+
   /*
   private int getBlockSize() {
     String inputClassName = conf.get(MRJobConfig.INPUT_FORMAT_CLASS_ATTR);
@@ -2082,13 +2093,18 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     }
   }
 
+  @VisibleForTesting
+  void decrementSucceededMapperCount() {
+    completedTaskCount--;
+    succeededMapTaskCount--;
+  }
+
   private static class MapTaskRescheduledTransition implements
       SingleArcTransition<JobImpl, JobEvent> {
     @Override
     public void transition(JobImpl job, JobEvent event) {
       //succeeded map task is restarted back
-      job.completedTaskCount--;
-      job.succeededMapTaskCount--;
+      job.decrementSucceededMapperCount();
     }
   }
 

+ 126 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java

@@ -19,15 +19,21 @@
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
 
@@ -35,10 +41,6 @@ import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.jobhistory.EventType;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
-import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.MRConfig;
@@ -46,10 +48,17 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.EventType;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
@@ -64,7 +73,11 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
@@ -76,6 +89,9 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
@@ -508,6 +524,112 @@ public class TestJobImpl {
     commitHandler.stop();
   }
 
+  @Test(timeout=20000)
+  public void testUnusableNodeTransition() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    conf.setInt(MRJobConfig.NUM_REDUCES, 1);
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    CyclicBarrier syncBarrier = new CyclicBarrier(2);
+    OutputCommitter committer = new TestingOutputCommitter(syncBarrier, true);
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+
+    final JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
+    // add a special task event handler to put the task back to running in case
+    // of task rescheduling/killing
+    EventHandler<TaskAttemptEvent> taskAttemptEventHandler =
+        new EventHandler<TaskAttemptEvent>() {
+      @Override
+      public void handle(TaskAttemptEvent event) {
+        if (event.getType() == TaskAttemptEventType.TA_KILL) {
+          job.decrementSucceededMapperCount();
+        }
+      }
+    };
+    dispatcher.register(TaskAttemptEventType.class, taskAttemptEventHandler);
+
+    // replace the tasks with spied versions to return the right attempts
+    Map<TaskId,Task> spiedTasks = new HashMap<TaskId,Task>();
+    List<NodeReport> nodeReports = new ArrayList<NodeReport>();
+    Map<NodeReport,TaskId> nodeReportsToTaskIds =
+        new HashMap<NodeReport,TaskId>();
+    for (Map.Entry<TaskId,Task> e: job.tasks.entrySet()) {
+      TaskId taskId = e.getKey();
+      Task task = e.getValue();
+      if (taskId.getTaskType() == TaskType.MAP) {
+        // add an attempt to the task to simulate nodes
+        NodeId nodeId = mock(NodeId.class);
+        TaskAttempt attempt = mock(TaskAttempt.class);
+        when(attempt.getNodeId()).thenReturn(nodeId);
+        TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+        when(attempt.getID()).thenReturn(attemptId);
+        // create a spied task
+        Task spied = spy(task);
+        doReturn(attempt).when(spied).getAttempt(any(TaskAttemptId.class));
+        spiedTasks.put(taskId, spied);
+
+        // create a NodeReport based on the node id
+        NodeReport report = mock(NodeReport.class);
+        when(report.getNodeState()).thenReturn(NodeState.UNHEALTHY);
+        when(report.getNodeId()).thenReturn(nodeId);
+        nodeReports.add(report);
+        nodeReportsToTaskIds.put(report, taskId);
+      }
+    }
+    // replace the tasks with the spied tasks
+    job.tasks.putAll(spiedTasks);
+
+    // complete all mappers first
+    for (TaskId taskId: job.tasks.keySet()) {
+      if (taskId.getTaskType() == TaskType.MAP) {
+        // generate a task attempt completed event first to populate the
+        // nodes-to-succeeded-attempts map
+        TaskAttemptCompletionEvent tce =
+            Records.newRecord(TaskAttemptCompletionEvent.class);
+        TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+        tce.setAttemptId(attemptId);
+        tce.setStatus(TaskAttemptCompletionEventStatus.SUCCEEDED);
+        job.handle(new JobTaskAttemptCompletedEvent(tce));
+        // complete the task itself
+        job.handle(new JobTaskEvent(taskId, TaskState.SUCCEEDED));
+        Assert.assertEquals(JobState.RUNNING, job.getState());
+      }
+    }
+
+    // add an event for a node transition
+    NodeReport firstMapperNodeReport = nodeReports.get(0);
+    NodeReport secondMapperNodeReport = nodeReports.get(1);
+    job.handle(new JobUpdatedNodesEvent(job.getID(),
+        Collections.singletonList(firstMapperNodeReport)));
+    // complete the reducer
+    for (TaskId taskId: job.tasks.keySet()) {
+      if (taskId.getTaskType() == TaskType.REDUCE) {
+        job.handle(new JobTaskEvent(taskId, TaskState.SUCCEEDED));
+      }
+    }
+    // add another event for a node transition for the other mapper
+    // this should not trigger rescheduling
+    job.handle(new JobUpdatedNodesEvent(job.getID(),
+        Collections.singletonList(secondMapperNodeReport)));
+    // complete the first mapper that was rescheduled
+    TaskId firstMapper = nodeReportsToTaskIds.get(firstMapperNodeReport);
+    job.handle(new JobTaskEvent(firstMapper, TaskState.SUCCEEDED));
+    // verify the state is moving to committing
+    assertJobState(job, JobStateInternal.COMMITTING);
+
+    // let the committer complete and verify the job succeeds
+    syncBarrier.await();
+    assertJobState(job, JobStateInternal.SUCCEEDED);
+
+    dispatcher.stop();
+    commitHandler.stop();
+  }
+
   public static void main(String[] args) throws Exception {
     TestJobImpl t = new TestJobImpl();
     t.testJobNoTasks();