Explorar o código

Configurable timeout between YARNRunner terminating the application and forcefully killing it. Contributed by Eric Payne.
(cherry picked from commit d39bc903a0069a740744bafe10e506e452ed7018)
(cherry picked from commit dbcdcb0d3ccc67db12104137d31cfc01cf6825ce)

Junping Du hai 10 anos
pai
achega
9d1f67f2f2

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -78,6 +78,9 @@ Release 2.7.0 - UNRELEASED
     MAPREDUCE-6267. Refactor JobSubmitter#copyAndConfigureFiles into its own 
     class. (Chris Trezzo via kasha)
 
+    MAPREDUCE-6263. Configurable timeout between YARNRunner terminating the 
+    application and forcefully killing it. (Eric Payne via junping_du)
+
   OPTIMIZATIONS
 
     MAPREDUCE-6169. MergeQueue should release reference to the current item 

+ 5 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

@@ -633,6 +633,11 @@ public interface MRJobConfig {
   public static final int DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD =
       50;
 
+  public static final String MR_AM_HARD_KILL_TIMEOUT_MS =
+      MR_AM_PREFIX + "hard-kill-timeout-ms"; // config key: ms YARNRunner#killJob waits for a terminal job state after issuing the kill
+  public static final long DEFAULT_MR_AM_HARD_KILL_TIMEOUT_MS =
+      10 * 1000L; // 10 seconds in ms; uppercase L so the suffix cannot be misread as the digit 1
+
   /**
    * The threshold in terms of seconds after which an unsatisfied mapper request
    * triggers reducer preemption to free space. Default 0 implies that the reduces

+ 8 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

@@ -1782,6 +1782,14 @@
   </description>
 </property>
 
+<property>
+  <name>yarn.app.mapreduce.am.hard-kill-timeout-ms</name>
+  <value>10000</value>
+  <description>
+     Number of milliseconds the job client waits, after requesting a job kill, for the job to reach a terminal state before it forcefully kills the application.
+  </description>
+</property>
+
 <property>
   <description>CLASSPATH for MR applications. A comma-separated list
   of CLASSPATH entries. If mapreduce.application.framework is set then this

+ 4 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java

@@ -640,7 +640,10 @@ public class YARNRunner implements ClientProtocol {
       clientCache.getClient(arg0).killJob(arg0);
       long currentTimeMillis = System.currentTimeMillis();
       long timeKillIssued = currentTimeMillis;
-      while ((currentTimeMillis < timeKillIssued + 10000L)
+      long killTimeOut =
+          conf.getLong(MRJobConfig.MR_AM_HARD_KILL_TIMEOUT_MS,
+                       MRJobConfig.DEFAULT_MR_AM_HARD_KILL_TIMEOUT_MS);
+      while ((currentTimeMillis < timeKillIssued + killTimeOut)
           && !isJobInTerminalState(status)) {
         try {
           Thread.sleep(1000L);

+ 26 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java

@@ -201,6 +201,32 @@ public class TestYARNRunner extends TestCase {
     verify(clientDelegate).killJob(jobId);
   }
 
+  @Test(timeout=60000) // must stay above the 20 s (10000 + 10 * 1000 ms) wait configured below
+  public void testJobKillTimeout() throws Exception { // checks that killJob waits at least the configured hard-kill timeout
+    long timeToWaitBeforeHardKill =
+        10000 + MRJobConfig.DEFAULT_MR_AM_HARD_KILL_TIMEOUT_MS; // 10 s above the default, so waiting only the default would fail the assertion
+    conf.setLong(MRJobConfig.MR_AM_HARD_KILL_TIMEOUT_MS,
+        timeToWaitBeforeHardKill); // NOTE(review): assumes yarnRunner reads this same conf instance - confirm in setUp
+    clientDelegate = mock(ClientServiceDelegate.class);
+    doAnswer(
+        new Answer<ClientServiceDelegate>() {
+          @Override
+          public ClientServiceDelegate answer(InvocationOnMock invocation)
+              throws Throwable {
+            return clientDelegate;
+          }
+        }
+      ).when(clientCache).getClient(any(JobID.class)); // every getClient lookup yields the mock delegate
+    when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new
+        org.apache.hadoop.mapreduce.JobStatus(jobId, 0f, 0f, 0f, 0f,
+            State.RUNNING, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp")); // status is always RUNNING; presumably never terminal, so killJob waits out the full timeout
+    long startTimeMillis = System.currentTimeMillis();
+    yarnRunner.killJob(jobId);
+    assertTrue("killJob should have waited at least " + timeToWaitBeforeHardKill
+        + " ms.", System.currentTimeMillis() - startTimeMillis
+                  >= timeToWaitBeforeHardKill); // elapsed wall-clock time must reach the configured timeout
+  }
+
   @Test(timeout=20000)
   public void testJobSubmissionFailure() throws Exception {
     when(resourceMgrDelegate.submitApplication(any(ApplicationSubmissionContext.class))).