Selaa lähdekoodia

YARN-2853. Fixed a bug in ResourceManager causing apps to hang when the user kill request races with ApplicationMaster finish. Contributed by Jian He.

(cherry picked from commit 3651fe1b089851b38be351c00a9899817166bf3e)
Vinod Kumar Vavilapalli 10 vuotta sitten
vanhempi
commit
ed3e5cb164

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -60,6 +60,9 @@ Release 2.7.0 - UNRELEASED
     YARN-2603. ApplicationConstants missing HADOOP_MAPRED_HOME (Ray Chiang via
     aw)
 
+    YARN-2853. Fixed a bug in ResourceManager causing apps to hang when the user
+    kill request races with ApplicationMaster finish. (Jian He via vinodkv)
+
 Release 2.6.0 - 2014-11-15
 
   INCOMPATIBLE CHANGES

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java

@@ -348,6 +348,7 @@ public class ApplicationMasterService extends AbstractService implements
     // ApplicationDoesNotExistInCacheException before and after
     // RM work-preserving restart.
     if (rmApp.isAppFinalStateStored()) {
+      LOG.info(rmApp.getApplicationId() + " unregistered successfully. ");
       return FinishApplicationMasterResponse.newInstance(true);
     }
 

+ 21 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

@@ -293,13 +293,23 @@ public class RMAppImpl implements RMApp, Recoverable {
         RMAppEventType.ATTEMPT_KILLED,
         new FinalSavingTransition(
           new AppKilledTransition(), RMAppState.KILLED))
+    .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
+        RMAppEventType.ATTEMPT_UNREGISTERED,
+        new FinalSavingTransition(
+          new AttemptUnregisteredTransition(),
+          RMAppState.FINISHING, RMAppState.FINISHED))
+    .addTransition(RMAppState.KILLING, RMAppState.FINISHED,
+      // UnManagedAM directly jumps to finished
+        RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
+    .addTransition(RMAppState.KILLING,
+        EnumSet.of(RMAppState.FINAL_SAVING),
+        RMAppEventType.ATTEMPT_FAILED,
+        new AttemptFailedTransition(RMAppState.KILLING))
+
     .addTransition(RMAppState.KILLING, RMAppState.KILLING,
         EnumSet.of(
             RMAppEventType.NODE_UPDATE,
             RMAppEventType.ATTEMPT_REGISTERED,
-            RMAppEventType.ATTEMPT_UNREGISTERED,
-            RMAppEventType.ATTEMPT_FINISHED,
-            RMAppEventType.ATTEMPT_FAILED,
             RMAppEventType.APP_UPDATE_SAVED,
             RMAppEventType.KILL, RMAppEventType.MOVE))
 
@@ -1199,6 +1209,14 @@ public class RMAppImpl implements RMApp, Recoverable {
           + app.maxAppAttempts);
       if (!app.submissionContext.getUnmanagedAM()
           && numberOfFailure < app.maxAppAttempts) {
+        if (initialState.equals(RMAppState.KILLING)) {
+          // If this is not last attempt, app should be killed instead of
+          // launching a new attempt
+          app.rememberTargetTransitionsAndStoreState(event,
+            new AppKilledTransition(), RMAppState.KILLED, RMAppState.KILLED);
+          return RMAppState.FINAL_SAVING;
+        }
+
         boolean transferStateFromPreviousAttempt;
         RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
         transferStateFromPreviousAttempt =

+ 111 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 import org.junit.Before;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
 
 import java.util.ArrayList;
@@ -37,16 +38,19 @@ import org.junit.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -57,6 +61,8 @@ import org.apache.hadoop.yarn.event.AbstractEvent;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -73,6 +79,8 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
 public class TestRM extends ParameterizedSchedulerTestBase {
@@ -638,4 +646,107 @@ public class TestRM extends ParameterizedSchedulerTestBase {
     Assert.assertEquals(appsSubmitted + 1, metrics.getAppsSubmitted());
   }
 
+  // Test Kill an app while the app is finishing in the meanwhile.
+  @Test (timeout = 30000)
+  public void testKillFinishingApp() throws Exception{
+
+    // this dispatcher ignores RMAppAttemptEventType.KILL event
+    final Dispatcher dispatcher = new AsyncDispatcher() {
+      @Override
+      public EventHandler getEventHandler() {
+
+        class EventArgMatcher extends ArgumentMatcher<AbstractEvent> {
+          @Override
+          public boolean matches(Object argument) {
+            if (argument instanceof RMAppAttemptEvent) {
+              if (((RMAppAttemptEvent) argument).getType().equals(
+                RMAppAttemptEventType.KILL)) {
+                return true;
+              }
+            }
+            return false;
+          }
+        }
+
+        EventHandler handler = spy(super.getEventHandler());
+        doNothing().when(handler).handle(argThat(new EventArgMatcher()));
+        return handler;
+      }
+    };
+
+    MockRM rm1 = new MockRM(conf){
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    RMApp app1 = rm1.submitApp(200);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    rm1.killApp(app1.getApplicationId());
+
+    FinishApplicationMasterRequest req =
+        FinishApplicationMasterRequest.newInstance(
+          FinalApplicationStatus.SUCCEEDED, "", "");
+    am1.unregisterAppAttempt(req,true);
+
+    rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FINISHING);
+    nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
+    rm1.waitForState(app1.getApplicationId(), RMAppState.FINISHED);
+  }
+
+  // Test Kill an app while the app is failing
+  @Test (timeout = 30000)
+  public void testKillFailingApp() throws Exception{
+
+    // this dispatcher ignores RMAppAttemptEventType.KILL event
+    final Dispatcher dispatcher = new AsyncDispatcher() {
+      @Override
+      public EventHandler getEventHandler() {
+
+        class EventArgMatcher extends ArgumentMatcher<AbstractEvent> {
+          @Override
+          public boolean matches(Object argument) {
+            if (argument instanceof RMAppAttemptEvent) {
+              if (((RMAppAttemptEvent) argument).getType().equals(
+                RMAppAttemptEventType.KILL)) {
+                return true;
+              }
+            }
+            return false;
+          }
+        }
+
+        EventHandler handler = spy(super.getEventHandler());
+        doNothing().when(handler).handle(argThat(new EventArgMatcher()));
+        return handler;
+      }
+    };
+
+    MockRM rm1 = new MockRM(conf){
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    RMApp app1 = rm1.submitApp(200);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    rm1.killApp(app1.getApplicationId());
+
+    // fail the app by sending container_finished event.
+    nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
+    // app is killed, not launching a new attempt
+    rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
+  }
 }

+ 0 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java

@@ -726,12 +726,6 @@ public class TestRMAppTransitions {
     application.handle(event);
     rmDispatcher.await();
 
-    // Ignore Attempt_Finished if we were supposed to go to Finished.
-    assertAppState(RMAppState.KILLING, application);
-    RMAppEvent finishEvent =
-        new RMAppFinishedAttemptEvent(application.getApplicationId(), null);
-    application.handle(finishEvent);
-    assertAppState(RMAppState.KILLING, application);
     sendAttemptUpdateSavedEvent(application);
     sendAppUpdateSavedEvent(application);
     assertKilled(application);