Bläddra i källkod

YARN-5476. Non existent application reported as ACCEPTED by YarnClientImpl (Junping Du via Varun Saxena)

Varun Saxena 8 år sedan
förälder
incheckning
ca139a3f87

+ 8 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

@@ -843,8 +843,9 @@ public class RMAppImpl implements RMApp, Recoverable {
     this.startTime = appState.getStartTime();
     this.callerContext = appState.getCallerContext();
 
-    // send the ATS create Event
-    sendATSCreateEvent(this, this.startTime);
+    // send the ATS create Event during RM recovery.
+    // NOTE: it could be duplicated with events sent before RM get restarted.
+    sendATSCreateEvent();
 
     for(int i=0; i<appState.getAttemptCount(); ++i) {
       // create attempt
@@ -1043,6 +1044,8 @@ public class RMAppImpl implements RMApp, Recoverable {
     public void transition(RMAppImpl app, RMAppEvent event) {
       app.handler.handle(new AppAddedSchedulerEvent(app.user,
           app.submissionContext, false));
+      // send the ATS create Event
+      app.sendATSCreateEvent();
     }
   }
 
@@ -1121,9 +1124,6 @@ public class RMAppImpl implements RMApp, Recoverable {
       // communication
       LOG.info("Storing application with id " + app.applicationId);
       app.rmContext.getStateStore().storeNewApplication(app);
-
-      // send the ATS create Event
-      app.sendATSCreateEvent(app, app.startTime);
     }
   }
 
@@ -1796,9 +1796,9 @@ public class RMAppImpl implements RMApp, Recoverable {
     return callerContext;
   }
 
-  private void sendATSCreateEvent(RMApp app, long startTime) {
-    rmContext.getRMApplicationHistoryWriter().applicationStarted(app);
-    rmContext.getSystemMetricsPublisher().appCreated(app, startTime);
+  private void sendATSCreateEvent() {
+    rmContext.getRMApplicationHistoryWriter().applicationStarted(this);
+    rmContext.getSystemMetricsPublisher().appCreated(this, this.startTime);
   }
 
   @VisibleForTesting

+ 14 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
@@ -370,25 +371,32 @@ public class TestRMAppTransitions {
 
   protected RMApp testCreateAppNewSaving(
       ApplicationSubmissionContext submissionContext) throws IOException {
-  RMApp application = createNewTestApp(submissionContext);
+    RMApp application = createNewTestApp(submissionContext);
     // NEW => NEW_SAVING event RMAppEventType.START
     RMAppEvent event = 
         new RMAppEvent(application.getApplicationId(), RMAppEventType.START);
     application.handle(event);
     assertStartTimeSet(application);
     assertAppState(RMAppState.NEW_SAVING, application);
+    // verify sendATSCreateEvent() is not get called during
+    // RMAppNewlySavingTransition.
+    verify(publisher, times(0)).appCreated(eq(application), anyLong());
     return application;
   }
 
   protected RMApp testCreateAppSubmittedNoRecovery(
       ApplicationSubmissionContext submissionContext) throws IOException {
-  RMApp application = testCreateAppNewSaving(submissionContext);
-    // NEW_SAVING => SUBMITTED event RMAppEventType.APP_SAVED
+    RMApp application = testCreateAppNewSaving(submissionContext);
+    // NEW_SAVING => SUBMITTED event RMAppEventType.APP_NEW_SAVED
     RMAppEvent event =
-        new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_NEW_SAVED);
+        new RMAppEvent(application.getApplicationId(),
+        RMAppEventType.APP_NEW_SAVED);
     application.handle(event);
     assertStartTimeSet(application);
     assertAppState(RMAppState.SUBMITTED, application);
+    // verify sendATSCreateEvent() is get called during
+    // AddApplicationToSchedulerTransition.
+    verify(publisher).appCreated(eq(application), anyLong());
     return application;
   }
 
@@ -403,7 +411,6 @@ public class TestRMAppTransitions {
     RMAppEvent event =
         new RMAppRecoverEvent(application.getApplicationId(), state);
 
-
     application.handle(event);
     assertStartTimeSet(application);
     assertAppState(RMAppState.SUBMITTED, application);
@@ -413,7 +420,7 @@ public class TestRMAppTransitions {
   protected RMApp testCreateAppAccepted(
       ApplicationSubmissionContext submissionContext) throws IOException {
     RMApp application = testCreateAppSubmittedNoRecovery(submissionContext);
-  // SUBMITTED => ACCEPTED event RMAppEventType.APP_ACCEPTED
+    // SUBMITTED => ACCEPTED event RMAppEventType.APP_ACCEPTED
     RMAppEvent event = 
         new RMAppEvent(application.getApplicationId(), 
             RMAppEventType.APP_ACCEPTED);
@@ -425,7 +432,7 @@ public class TestRMAppTransitions {
 
   protected RMApp testCreateAppRunning(
       ApplicationSubmissionContext submissionContext) throws IOException {
-  RMApp application = testCreateAppAccepted(submissionContext);
+    RMApp application = testCreateAppAccepted(submissionContext);
     // ACCEPTED => RUNNING event RMAppEventType.ATTEMPT_REGISTERED
     RMAppEvent event = 
         new RMAppEvent(application.getApplicationId(),