Browse Source

MAPREDUCE-7369. Fixed MapReduce tasks timing out when spends more time on MultipleOutputs#close (#4247)

Contributed by Ravuri Sushma sree.

Co-authored-by: Ashutosh Gupta <ashugpt@amazon.com>
Ashutosh Gupta 2 năm trước cách đây
mục cha
commit
36c4be819f

+ 10 - 5
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java

@@ -28,6 +28,10 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.ipc.ProtocolSignature;
@@ -50,8 +54,8 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdate
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
 import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider;
@@ -61,10 +65,6 @@ import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.classification.VisibleForTesting;
 
 /**
  * This class is responsible for talking to the task umblical.
@@ -409,6 +409,11 @@ public class TaskAttemptListenerImpl extends CompositeService
       if (LOG.isDebugEnabled()) {
         LOG.debug("Ping from " + taskAttemptID.toString());
       }
+      // Consider ping from the tasks for liveliness check
+      if (getConfig().getBoolean(MRJobConfig.MR_TASK_ENABLE_PING_FOR_LIVELINESS_CHECK,
+          MRJobConfig.DEFAULT_MR_TASK_ENABLE_PING_FOR_LIVELINESS_CHECK)) {
+        taskHeartbeatHandler.progressing(yarnAttemptID);
+      }
       return feedback;
     }
 

+ 43 - 14
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java

@@ -17,23 +17,30 @@
 */
 package org.apache.hadoop.mapred;
 
-import java.util.function.Supplier;
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapreduce.checkpoint.EnumCounter;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.checkpoint.CheckpointID;
+import org.apache.hadoop.mapreduce.checkpoint.EnumCounter;
 import org.apache.hadoop.mapreduce.checkpoint.FSCheckpointID;
 import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
@@ -48,9 +55,9 @@ import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
 import org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPolicy;
-import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -60,17 +67,22 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.SystemClock;
-import org.junit.After;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests the behavior of TaskAttemptListenerImpl.
@@ -417,6 +429,23 @@ public class TestTaskAttemptListenerImpl {
     verify(hbHandler).progressing(eq(attemptId));
   }
 
+  @Test
+  public void testPingUpdateProgress() throws IOException, InterruptedException {
+    configureMocks();
+    Configuration conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_TASK_ENABLE_PING_FOR_LIVELINESS_CHECK, true);
+    listener.init(conf);
+    listener.start();
+    listener.registerPendingTask(task, wid);
+    listener.registerLaunchedTask(attemptId, wid);
+    verify(hbHandler).register(attemptId);
+
+    // make sure a ping does report progress
+    AMFeedback feedback = listener.statusUpdate(attemptID, null);
+    assertTrue(feedback.getTaskFound());
+    verify(hbHandler, times(1)).progressing(eq(attemptId));
+  }
+
   @Test
   public void testSingleStatusUpdate()
       throws IOException, InterruptedException {

+ 7 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

@@ -919,6 +919,13 @@ public interface MRJobConfig {
     MR_AM_PREFIX + "scheduler.heartbeat.interval-ms";
   public static final int DEFAULT_MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS = 1000;
 
+  /** Whether to consider ping from tasks in liveliness check. */
+  String MR_TASK_ENABLE_PING_FOR_LIVELINESS_CHECK =
+      "mapreduce.task.ping-for-liveliness-check.enabled";
+  boolean DEFAULT_MR_TASK_ENABLE_PING_FOR_LIVELINESS_CHECK
+      = false;
+
+
   /**
    * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
    * milliseconds before aborting. During this interval, AM will still try

+ 7 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

@@ -286,6 +286,13 @@
   </description>
 </property>
 
+<property>
+  <name>mapreduce.task.ping-for-liveliness-check.enabled</name>
+  <value>false</value>
+  <description>Whether to consider ping from tasks in liveliness check.
+  </description>
+</property>
+
 <property>
   <name>mapreduce.map.memory.mb</name>
   <value>-1</value>