Forráskód Böngészése

MAPREDUCE-7053: Timed out tasks can fail to produce thread dump. Contributed by Jason Lowe.

(cherry picked from commit 904a6bf26311750d09b2d8e662fe97b30cdb95b4)
Eric Payne 7 éve
szülő
commit
c3aa02d856

+ 9 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java

@@ -343,9 +343,15 @@ public class TaskAttemptListenerImpl extends CompositeService
     AtomicReference<TaskAttemptStatus> lastStatusRef =
         attemptIdToStatus.get(yarnAttemptID);
     if (lastStatusRef == null) {
-      LOG.error("Status update was called with illegal TaskAttemptId: "
-          + yarnAttemptID);
-      return false;
+      // The task is not known, but it could be in the process of tearing
+      // down gracefully or receiving a thread dump signal. Tolerate unknown
+      // tasks as long as they have unregistered recently.
+      if (!taskHeartbeatHandler.hasRecentlyUnregistered(yarnAttemptID)) {
+        LOG.error("Status update was called with illegal TaskAttemptId: "
+            + yarnAttemptID);
+        return false;
+      }
+      return true;
     }
 
     taskHeartbeatHandler.progressing(yarnAttemptID);

+ 47 - 20
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java

@@ -70,12 +70,14 @@ public class TaskHeartbeatHandler extends AbstractService {
   private Thread lostTaskCheckerThread;
   private volatile boolean stopped;
   private long taskTimeOut;
+  private long unregisterTimeOut;
   private int taskTimeOutCheckInterval = 30 * 1000; // 30 seconds.
 
   private final EventHandler eventHandler;
   private final Clock clock;
   
   private ConcurrentMap<TaskAttemptId, ReportTime> runningAttempts;
+  private ConcurrentMap<TaskAttemptId, ReportTime> recentlyUnregisteredAttempts;
 
   public TaskHeartbeatHandler(EventHandler eventHandler, Clock clock,
       int numThreads) {
@@ -84,6 +86,8 @@ public class TaskHeartbeatHandler extends AbstractService {
     this.clock = clock;
     runningAttempts =
       new ConcurrentHashMap<TaskAttemptId, ReportTime>(16, 0.75f, numThreads);
+    recentlyUnregisteredAttempts =
+        new ConcurrentHashMap<TaskAttemptId, ReportTime>(16, 0.75f, numThreads);
   }
 
   @Override
@@ -91,6 +95,8 @@ public class TaskHeartbeatHandler extends AbstractService {
     super.serviceInit(conf);
     taskTimeOut = conf.getLong(
         MRJobConfig.TASK_TIMEOUT, MRJobConfig.DEFAULT_TASK_TIMEOUT_MILLIS);
+    unregisterTimeOut = conf.getLong(MRJobConfig.TASK_EXIT_TIMEOUT,
+        MRJobConfig.TASK_EXIT_TIMEOUT_DEFAULT);
 
     // enforce task timeout is at least twice as long as task report interval
     long taskProgressReportIntervalMillis = MRJobConfUtil.
@@ -139,6 +145,12 @@ public class TaskHeartbeatHandler extends AbstractService {
 
   public void unregister(TaskAttemptId attemptID) {
     runningAttempts.remove(attemptID);
+    recentlyUnregisteredAttempts.put(attemptID,
+        new ReportTime(clock.getTime()));
+  }
+
+  public boolean hasRecentlyUnregistered(TaskAttemptId attemptID) {
+    return recentlyUnregisteredAttempts.containsKey(attemptID);
   }
 
   private class PingChecker implements Runnable {
@@ -146,27 +158,9 @@ public class TaskHeartbeatHandler extends AbstractService {
     @Override
     public void run() {
       while (!stopped && !Thread.currentThread().isInterrupted()) {
-        Iterator<Map.Entry<TaskAttemptId, ReportTime>> iterator =
-            runningAttempts.entrySet().iterator();
-
-        // avoid calculating current time everytime in loop
         long currentTime = clock.getTime();
-
-        while (iterator.hasNext()) {
-          Map.Entry<TaskAttemptId, ReportTime> entry = iterator.next();
-          boolean taskTimedOut = (taskTimeOut > 0) &&
-              (currentTime > (entry.getValue().getLastProgress() + taskTimeOut));
-           
-          if(taskTimedOut) {
-            // task is lost, remove from the list and raise lost event
-            iterator.remove();
-            eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry
-                .getKey(), "AttemptID:" + entry.getKey().toString()
-                + " Timed out after " + taskTimeOut / 1000 + " secs"));
-            eventHandler.handle(new TaskAttemptEvent(entry.getKey(),
-                TaskAttemptEventType.TA_TIMED_OUT));
-          }
-        }
+        checkRunning(currentTime);
+        checkRecentlyUnregistered(currentTime);
         try {
           Thread.sleep(taskTimeOutCheckInterval);
         } catch (InterruptedException e) {
@@ -175,6 +169,39 @@ public class TaskHeartbeatHandler extends AbstractService {
         }
       }
     }
+
+    private void checkRunning(long currentTime) {
+      Iterator<Map.Entry<TaskAttemptId, ReportTime>> iterator =
+          runningAttempts.entrySet().iterator();
+
+      while (iterator.hasNext()) {
+        Map.Entry<TaskAttemptId, ReportTime> entry = iterator.next();
+        boolean taskTimedOut = (taskTimeOut > 0) &&
+            (currentTime > (entry.getValue().getLastProgress() + taskTimeOut));
+
+        if(taskTimedOut) {
+          // task is lost, remove from the list and raise lost event
+          iterator.remove();
+          eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry
+              .getKey(), "AttemptID:" + entry.getKey().toString()
+              + " Timed out after " + taskTimeOut / 1000 + " secs"));
+          eventHandler.handle(new TaskAttemptEvent(entry.getKey(),
+              TaskAttemptEventType.TA_TIMED_OUT));
+        }
+      }
+    }
+
+    private void checkRecentlyUnregistered(long currentTime) {
+      Iterator<ReportTime> iterator =
+          recentlyUnregisteredAttempts.values().iterator();
+      while (iterator.hasNext()) {
+        ReportTime unregisteredTime = iterator.next();
+        if (currentTime >
+            unregisteredTime.getLastProgress() + unregisterTimeOut) {
+          iterator.remove();
+        }
+      }
+    }
   }
 
   @VisibleForTesting

+ 48 - 5
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java

@@ -17,12 +17,14 @@
 */
 package org.apache.hadoop.mapred;
 
+import com.google.common.base.Supplier;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
@@ -35,6 +37,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
@@ -51,11 +54,13 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.junit.After;
 import org.junit.Assert;
@@ -398,14 +403,52 @@ public class TestTaskAttemptListenerImpl {
   }
 
   @Test
-  public void testStatusUpdateFromUnregisteredTask()
-      throws IOException, InterruptedException{
+  public void testStatusUpdateFromUnregisteredTask() throws Exception {
     configureMocks();
-    startListener(false);
+    ControlledClock clock = new ControlledClock();
+    clock.setTime(0);
+    doReturn(clock).when(appCtx).getClock();
 
-    boolean taskFound = listener.statusUpdate(attemptID, firstReduceStatus);
+    final TaskAttemptListenerImpl tal = new TaskAttemptListenerImpl(appCtx,
+        secret, rmHeartbeatHandler, null) {
+      @Override
+      protected void startRpcServer() {
+        // Empty
+      }
+      @Override
+      protected void stopRpcServer() {
+        // Empty
+      }
+    };
 
-    assertFalse(taskFound);
+    Configuration conf = new Configuration();
+    conf.setLong(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1);
+    tal.init(conf);
+    tal.start();
+
+    assertFalse(tal.statusUpdate(attemptID, firstReduceStatus));
+    tal.registerPendingTask(task, wid);
+    tal.registerLaunchedTask(attemptId, wid);
+    assertTrue(tal.statusUpdate(attemptID, firstReduceStatus));
+
+    // verify attempt is still reported as found if recently unregistered
+    tal.unregister(attemptId, wid);
+    assertTrue(tal.statusUpdate(attemptID, firstReduceStatus));
+
+    // verify attempt is not found if not recently unregistered
+    long unregisterTimeout = conf.getLong(MRJobConfig.TASK_EXIT_TIMEOUT,
+        MRJobConfig.TASK_EXIT_TIMEOUT_DEFAULT);
+    clock.setTime(unregisterTimeout + 1);
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          return !tal.statusUpdate(attemptID, firstReduceStatus);
+        } catch (Exception e) {
+          throw new RuntimeException("status update failed", e);
+        }
+      }
+    }, 10, 10000);
   }
 
   private void configureMocks() {

+ 38 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java

@@ -23,6 +23,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import com.google.common.base.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@@ -30,10 +31,12 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.junit.Assert;
 import org.junit.Test;
@@ -105,6 +108,41 @@ public class TestTaskHeartbeatHandler {
     verifyTaskTimeoutConfig(conf, expectedTimeout);
   }
 
+  @Test
+  public void testTaskUnregistered() throws Exception {
+    EventHandler mockHandler = mock(EventHandler.class);
+    ControlledClock clock = new ControlledClock();
+    clock.setTime(0);
+    final TaskHeartbeatHandler hb =
+        new TaskHeartbeatHandler(mockHandler, clock, 1);
+    Configuration conf = new Configuration();
+    conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1);
+    hb.init(conf);
+    hb.start();
+    try {
+      ApplicationId appId = ApplicationId.newInstance(0l, 5);
+      JobId jobId = MRBuilderUtils.newJobId(appId, 4);
+      TaskId tid = MRBuilderUtils.newTaskId(jobId, 3, TaskType.MAP);
+      final TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 2);
+      Assert.assertFalse(hb.hasRecentlyUnregistered(taid));
+      hb.register(taid);
+      Assert.assertFalse(hb.hasRecentlyUnregistered(taid));
+      hb.unregister(taid);
+      Assert.assertTrue(hb.hasRecentlyUnregistered(taid));
+      long unregisterTimeout = conf.getLong(MRJobConfig.TASK_EXIT_TIMEOUT,
+          MRJobConfig.TASK_EXIT_TIMEOUT_DEFAULT);
+      clock.setTime(unregisterTimeout + 1);
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return !hb.hasRecentlyUnregistered(taid);
+        }
+      }, 10, 10000);
+    } finally {
+      hb.stop();
+    }
+  }
+
   /**
    * Test if task timeout is set properly in response to the configuration of
    * the task progress report interval.