Browse Source

Revert "MAPREDUCE-7053: Timed out tasks can fail to produce thread dump. Contributed by Jason Lowe."

This reverts commit a881a89f02434d8ada2ea6784cfd90de67fcc7bd.
Eric Payne, 7 years ago
parent
commit
6b23e5dc24

+ 5 - 10
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java

@@ -369,22 +369,17 @@ public class TaskAttemptListenerImpl extends CompositeService
         TypeConverter.toYarn(taskAttemptID);
 
     AMFeedback feedback = new AMFeedback();
-    feedback.setTaskFound(true);
-
     AtomicReference<TaskAttemptStatus> lastStatusRef =
         attemptIdToStatus.get(yarnAttemptID);
     if (lastStatusRef == null) {
-      // The task is not known, but it could be in the process of tearing
-      // down gracefully or receiving a thread dump signal. Tolerate unknown
-      // tasks as long as they have unregistered recently.
-      if (!taskHeartbeatHandler.hasRecentlyUnregistered(yarnAttemptID)) {
-        LOG.error("Status update was called with illegal TaskAttemptId: "
-            + yarnAttemptID);
-        feedback.setTaskFound(false);
-      }
+      LOG.error("Status update was called with illegal TaskAttemptId: "
+          + yarnAttemptID);
+      feedback.setTaskFound(false);
       return feedback;
     }
 
+    feedback.setTaskFound(true);
+
     // Propagating preemption to the task if TASK_PREEMPTION is enabled
     if (getConfig().getBoolean(MRJobConfig.TASK_PREEMPTION, false)
         && preemptionPolicy.isPreempted(yarnAttemptID)) {

+ 20 - 47
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java

@@ -71,14 +71,12 @@ public class TaskHeartbeatHandler extends AbstractService {
   private Thread lostTaskCheckerThread;
   private volatile boolean stopped;
   private long taskTimeOut;
-  private long unregisterTimeOut;
   private int taskTimeOutCheckInterval = 30 * 1000; // 30 seconds.
 
   private final EventHandler eventHandler;
   private final Clock clock;
   
   private ConcurrentMap<TaskAttemptId, ReportTime> runningAttempts;
-  private ConcurrentMap<TaskAttemptId, ReportTime> recentlyUnregisteredAttempts;
 
   public TaskHeartbeatHandler(EventHandler eventHandler, Clock clock,
       int numThreads) {
@@ -87,8 +85,6 @@ public class TaskHeartbeatHandler extends AbstractService {
     this.clock = clock;
     runningAttempts =
       new ConcurrentHashMap<TaskAttemptId, ReportTime>(16, 0.75f, numThreads);
-    recentlyUnregisteredAttempts =
-        new ConcurrentHashMap<TaskAttemptId, ReportTime>(16, 0.75f, numThreads);
   }
 
   @Override
@@ -96,8 +92,6 @@ public class TaskHeartbeatHandler extends AbstractService {
     super.serviceInit(conf);
     taskTimeOut = conf.getLong(
         MRJobConfig.TASK_TIMEOUT, MRJobConfig.DEFAULT_TASK_TIMEOUT_MILLIS);
-    unregisterTimeOut = conf.getLong(MRJobConfig.TASK_EXIT_TIMEOUT,
-        MRJobConfig.TASK_EXIT_TIMEOUT_DEFAULT);
 
     // enforce task timeout is at least twice as long as task report interval
     long taskProgressReportIntervalMillis = MRJobConfUtil.
@@ -146,12 +140,6 @@ public class TaskHeartbeatHandler extends AbstractService {
 
   public void unregister(TaskAttemptId attemptID) {
     runningAttempts.remove(attemptID);
-    recentlyUnregisteredAttempts.put(attemptID,
-        new ReportTime(clock.getTime()));
-  }
-
-  public boolean hasRecentlyUnregistered(TaskAttemptId attemptID) {
-    return recentlyUnregisteredAttempts.containsKey(attemptID);
   }
 
   private class PingChecker implements Runnable {
@@ -159,9 +147,27 @@ public class TaskHeartbeatHandler extends AbstractService {
     @Override
     public void run() {
       while (!stopped && !Thread.currentThread().isInterrupted()) {
+        Iterator<Map.Entry<TaskAttemptId, ReportTime>> iterator =
+            runningAttempts.entrySet().iterator();
+
+        // avoid calculating current time everytime in loop
         long currentTime = clock.getTime();
-        checkRunning(currentTime);
-        checkRecentlyUnregistered(currentTime);
+
+        while (iterator.hasNext()) {
+          Map.Entry<TaskAttemptId, ReportTime> entry = iterator.next();
+          boolean taskTimedOut = (taskTimeOut > 0) &&
+              (currentTime > (entry.getValue().getLastProgress() + taskTimeOut));
+           
+          if(taskTimedOut) {
+            // task is lost, remove from the list and raise lost event
+            iterator.remove();
+            eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry
+                .getKey(), "AttemptID:" + entry.getKey().toString()
+                + " Timed out after " + taskTimeOut / 1000 + " secs"));
+            eventHandler.handle(new TaskAttemptEvent(entry.getKey(),
+                TaskAttemptEventType.TA_TIMED_OUT));
+          }
+        }
         try {
           Thread.sleep(taskTimeOutCheckInterval);
         } catch (InterruptedException e) {
@@ -170,39 +176,6 @@ public class TaskHeartbeatHandler extends AbstractService {
         }
       }
     }
-
-    private void checkRunning(long currentTime) {
-      Iterator<Map.Entry<TaskAttemptId, ReportTime>> iterator =
-          runningAttempts.entrySet().iterator();
-
-      while (iterator.hasNext()) {
-        Map.Entry<TaskAttemptId, ReportTime> entry = iterator.next();
-        boolean taskTimedOut = (taskTimeOut > 0) &&
-            (currentTime > (entry.getValue().getLastProgress() + taskTimeOut));
-
-        if(taskTimedOut) {
-          // task is lost, remove from the list and raise lost event
-          iterator.remove();
-          eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry
-              .getKey(), "AttemptID:" + entry.getKey().toString()
-              + " Timed out after " + taskTimeOut / 1000 + " secs"));
-          eventHandler.handle(new TaskAttemptEvent(entry.getKey(),
-              TaskAttemptEventType.TA_TIMED_OUT));
-        }
-      }
-    }
-
-    private void checkRecentlyUnregistered(long currentTime) {
-      Iterator<ReportTime> iterator =
-          recentlyUnregisteredAttempts.values().iterator();
-      while (iterator.hasNext()) {
-        ReportTime unregisteredTime = iterator.next();
-        if (currentTime >
-            unregisteredTime.getLastProgress() + unregisterTimeOut) {
-          iterator.remove();
-        }
-      }
-    }
   }
 
   @VisibleForTesting

+ 4 - 50
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java

@@ -17,7 +17,6 @@
 */
 package org.apache.hadoop.mapred;
 
-import com.google.common.base.Supplier;
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapreduce.checkpoint.EnumCounter;
 
@@ -52,13 +51,11 @@ import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
 import org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPolicy;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
-import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.junit.After;
 import org.junit.Test;
@@ -491,57 +488,14 @@ public class TestTaskAttemptListenerImpl {
   }
 
   @Test
-  public void testStatusUpdateFromUnregisteredTask() throws Exception {
+  public void testStatusUpdateFromUnregisteredTask()
+      throws IOException, InterruptedException{
     configureMocks();
-    ControlledClock clock = new ControlledClock();
-    clock.setTime(0);
-    doReturn(clock).when(appCtx).getClock();
-
-    final TaskAttemptListenerImpl tal = new TaskAttemptListenerImpl(appCtx,
-        secret, rmHeartbeatHandler, policy) {
-      @Override
-      protected void startRpcServer() {
-        // Empty
-      }
-      @Override
-      protected void stopRpcServer() {
-        // Empty
-      }
-    };
+    startListener(false);
 
-    Configuration conf = new Configuration();
-    conf.setLong(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1);
-    tal.init(conf);
-    tal.start();
+    AMFeedback feedback = listener.statusUpdate(attemptID, firstReduceStatus);
 
-    AMFeedback feedback = tal.statusUpdate(attemptID, firstReduceStatus);
     assertFalse(feedback.getTaskFound());
-    tal.registerPendingTask(task, wid);
-    tal.registerLaunchedTask(attemptId, wid);
-    feedback = tal.statusUpdate(attemptID, firstReduceStatus);
-    assertTrue(feedback.getTaskFound());
-
-    // verify attempt is still reported as found if recently unregistered
-    tal.unregister(attemptId, wid);
-    feedback = tal.statusUpdate(attemptID, firstReduceStatus);
-    assertTrue(feedback.getTaskFound());
-
-    // verify attempt is not found if not recently unregistered
-    long unregisterTimeout = conf.getLong(MRJobConfig.TASK_EXIT_TIMEOUT,
-        MRJobConfig.TASK_EXIT_TIMEOUT_DEFAULT);
-    clock.setTime(unregisterTimeout + 1);
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        try {
-          AMFeedback response =
-              tal.statusUpdate(attemptID, firstReduceStatus);
-          return !response.getTaskFound();
-        } catch (Exception e) {
-          throw new RuntimeException("status update failed", e);
-        }
-      }
-    }, 10, 10000);
   }
 
   private void configureMocks() {

+ 0 - 38
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java

@@ -23,7 +23,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
-import com.google.common.base.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@@ -31,12 +30,10 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
-import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.junit.Assert;
 import org.junit.Test;
@@ -108,41 +105,6 @@ public class TestTaskHeartbeatHandler {
     verifyTaskTimeoutConfig(conf, expectedTimeout);
   }
 
-  @Test
-  public void testTaskUnregistered() throws Exception {
-    EventHandler mockHandler = mock(EventHandler.class);
-    ControlledClock clock = new ControlledClock();
-    clock.setTime(0);
-    final TaskHeartbeatHandler hb =
-        new TaskHeartbeatHandler(mockHandler, clock, 1);
-    Configuration conf = new Configuration();
-    conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1);
-    hb.init(conf);
-    hb.start();
-    try {
-      ApplicationId appId = ApplicationId.newInstance(0l, 5);
-      JobId jobId = MRBuilderUtils.newJobId(appId, 4);
-      TaskId tid = MRBuilderUtils.newTaskId(jobId, 3, TaskType.MAP);
-      final TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 2);
-      Assert.assertFalse(hb.hasRecentlyUnregistered(taid));
-      hb.register(taid);
-      Assert.assertFalse(hb.hasRecentlyUnregistered(taid));
-      hb.unregister(taid);
-      Assert.assertTrue(hb.hasRecentlyUnregistered(taid));
-      long unregisterTimeout = conf.getLong(MRJobConfig.TASK_EXIT_TIMEOUT,
-          MRJobConfig.TASK_EXIT_TIMEOUT_DEFAULT);
-      clock.setTime(unregisterTimeout + 1);
-      GenericTestUtils.waitFor(new Supplier<Boolean>() {
-        @Override
-        public Boolean get() {
-          return !hb.hasRecentlyUnregistered(taid);
-        }
-      }, 10, 10000);
-    } finally {
-      hb.stop();
-    }
-  }
-
   /**
    * Test if task timeout is set properly in response to the configuration of
    * the task progress report interval.