Browse Source

YARN-2853. Fixed a bug in ResourceManager causing apps to hang when the user kill request races with ApplicationMaster finish. Contributed by Jian He.

(cherry picked from commit 3651fe1b089851b38be351c00a9899817166bf3e)

Conflicts:
	hadoop-yarn-project/CHANGES.txt
Vinod Kumar Vavilapalli 10 years ago
parent
commit
bf4d080100

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java

@@ -348,6 +348,7 @@ public class ApplicationMasterService extends AbstractService implements
     // ApplicationDoesNotExistInCacheException before and after
     // RM work-preserving restart.
     if (rmApp.isAppFinalStateStored()) {
+      LOG.info(rmApp.getApplicationId() + " unregistered successfully. ");
       return FinishApplicationMasterResponse.newInstance(true);
     }
 

+ 21 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

@@ -293,13 +293,23 @@ public class RMAppImpl implements RMApp, Recoverable {
         RMAppEventType.ATTEMPT_KILLED,
         new FinalSavingTransition(
           new AppKilledTransition(), RMAppState.KILLED))
+    .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
+        RMAppEventType.ATTEMPT_UNREGISTERED,
+        new FinalSavingTransition(
+          new AttemptUnregisteredTransition(),
+          RMAppState.FINISHING, RMAppState.FINISHED))
+    .addTransition(RMAppState.KILLING, RMAppState.FINISHED,
+      // UnManagedAM directly jumps to finished
+        RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
+    .addTransition(RMAppState.KILLING,
+        EnumSet.of(RMAppState.FINAL_SAVING),
+        RMAppEventType.ATTEMPT_FAILED,
+        new AttemptFailedTransition(RMAppState.KILLING))
+
     .addTransition(RMAppState.KILLING, RMAppState.KILLING,
         EnumSet.of(
             RMAppEventType.NODE_UPDATE,
             RMAppEventType.ATTEMPT_REGISTERED,
-            RMAppEventType.ATTEMPT_UNREGISTERED,
-            RMAppEventType.ATTEMPT_FINISHED,
-            RMAppEventType.ATTEMPT_FAILED,
             RMAppEventType.APP_UPDATE_SAVED,
             RMAppEventType.KILL, RMAppEventType.MOVE))
 
@@ -1199,6 +1209,14 @@ public class RMAppImpl implements RMApp, Recoverable {
           + app.maxAppAttempts);
       if (!app.submissionContext.getUnmanagedAM()
           && numberOfFailure < app.maxAppAttempts) {
+        if (initialState.equals(RMAppState.KILLING)) {
+          // If this is not last attempt, app should be killed instead of
+          // launching a new attempt
+          app.rememberTargetTransitionsAndStoreState(event,
+            new AppKilledTransition(), RMAppState.KILLED, RMAppState.KILLED);
+          return RMAppState.FINAL_SAVING;
+        }
+
         boolean transferStateFromPreviousAttempt;
         RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
         transferStateFromPreviousAttempt =

+ 111 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 import org.junit.Before;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
 
 import java.util.ArrayList;
@@ -37,16 +38,19 @@ import org.junit.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -57,6 +61,8 @@ import org.apache.hadoop.yarn.event.AbstractEvent;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -73,6 +79,8 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
 public class TestRM extends ParameterizedSchedulerTestBase {
@@ -638,4 +646,107 @@ public class TestRM extends ParameterizedSchedulerTestBase {
     Assert.assertEquals(appsSubmitted + 1, metrics.getAppsSubmitted());
   }
 
+  // Test Kill an app while the app is finishing in the meanwhile.
+  @Test (timeout = 30000)
+  public void testKillFinishingApp() throws Exception{
+
+    // this dispatcher ignores RMAppAttemptEventType.KILL event
+    final Dispatcher dispatcher = new AsyncDispatcher() {
+      @Override
+      public EventHandler getEventHandler() {
+
+        class EventArgMatcher extends ArgumentMatcher<AbstractEvent> {
+          @Override
+          public boolean matches(Object argument) {
+            if (argument instanceof RMAppAttemptEvent) {
+              if (((RMAppAttemptEvent) argument).getType().equals(
+                RMAppAttemptEventType.KILL)) {
+                return true;
+              }
+            }
+            return false;
+          }
+        }
+
+        EventHandler handler = spy(super.getEventHandler());
+        doNothing().when(handler).handle(argThat(new EventArgMatcher()));
+        return handler;
+      }
+    };
+
+    MockRM rm1 = new MockRM(conf){
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    RMApp app1 = rm1.submitApp(200);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    rm1.killApp(app1.getApplicationId());
+
+    FinishApplicationMasterRequest req =
+        FinishApplicationMasterRequest.newInstance(
+          FinalApplicationStatus.SUCCEEDED, "", "");
+    am1.unregisterAppAttempt(req,true);
+
+    rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FINISHING);
+    nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
+    rm1.waitForState(app1.getApplicationId(), RMAppState.FINISHED);
+  }
+
+  // Test Kill an app while the app is failing
+  @Test (timeout = 30000)
+  public void testKillFailingApp() throws Exception{
+
+    // this dispatcher ignores RMAppAttemptEventType.KILL event
+    final Dispatcher dispatcher = new AsyncDispatcher() {
+      @Override
+      public EventHandler getEventHandler() {
+
+        class EventArgMatcher extends ArgumentMatcher<AbstractEvent> {
+          @Override
+          public boolean matches(Object argument) {
+            if (argument instanceof RMAppAttemptEvent) {
+              if (((RMAppAttemptEvent) argument).getType().equals(
+                RMAppAttemptEventType.KILL)) {
+                return true;
+              }
+            }
+            return false;
+          }
+        }
+
+        EventHandler handler = spy(super.getEventHandler());
+        doNothing().when(handler).handle(argThat(new EventArgMatcher()));
+        return handler;
+      }
+    };
+
+    MockRM rm1 = new MockRM(conf){
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    RMApp app1 = rm1.submitApp(200);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    rm1.killApp(app1.getApplicationId());
+
+    // fail the app by sending container_finished event.
+    nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
+    // app is killed, not launching a new attempt
+    rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
+  }
 }

+ 0 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java

@@ -726,12 +726,6 @@ public class TestRMAppTransitions {
     application.handle(event);
     rmDispatcher.await();
 
-    // Ignore Attempt_Finished if we were supposed to go to Finished.
-    assertAppState(RMAppState.KILLING, application);
-    RMAppEvent finishEvent =
-        new RMAppFinishedAttemptEvent(application.getApplicationId(), null);
-    application.handle(finishEvent);
-    assertAppState(RMAppState.KILLING, application);
     sendAttemptUpdateSavedEvent(application);
     sendAppUpdateSavedEvent(application);
     assertKilled(application);