Browse source code

YARN-9438. launchTime not written to state store for running applications

Jonathan Hung 5 years ago
parent
commit
8ef46595da

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java

@@ -722,6 +722,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
               app.getStartTime(), app.getApplicationSubmissionContext(),
               app.getUser(), app.getCallerContext());
       appState.setApplicationTimeouts(currentExpireTimeouts);
+      appState.setLaunchTime(app.getLaunchTime());
 
       // update to state store. Though it synchronous call, update via future to
       // know any exception has been set. It is required because in non-HA mode,
@@ -847,6 +848,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
         app.getApplicationSubmissionContext(), app.getUser(),
         app.getCallerContext());
     appState.setApplicationTimeouts(app.getApplicationTimeouts());
+    appState.setLaunchTime(app.getLaunchTime());
     rmContext.getStateStore().updateApplicationStateSynchronously(appState,
         false, future);
 

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

@@ -1032,6 +1032,12 @@ public class RMAppImpl implements RMApp, Recoverable {
                 app.getApplicationId()+", attemptId: "+
                 app.getCurrentAppAttempt().getAppAttemptId()+
                 "launchTime: "+event.getTimestamp());
+        ApplicationStateData appState = ApplicationStateData.newInstance(
+            app.submitTime, app.startTime, app.submissionContext, app.user,
+            app.callerContext);
+        appState.setApplicationTimeouts(app.getApplicationTimeouts());
+        appState.setLaunchTime(event.getTimestamp());
+        app.rmContext.getStateStore().updateApplicationState(appState);
         app.launchTime = event.getTimestamp();
         app.rmContext.getSystemMetricsPublisher().appLaunched(
             app, app.launchTime);

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -2675,6 +2675,7 @@ public class CapacityScheduler extends
           rmApp.getApplicationSubmissionContext(), rmApp.getUser(),
           rmApp.getCallerContext());
       appState.setApplicationTimeouts(rmApp.getApplicationTimeouts());
+      appState.setLaunchTime(rmApp.getLaunchTime());
       rmContext.getStateStore().updateApplicationStateSynchronously(appState,
           false, future);
 

+ 11 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java

@@ -754,13 +754,14 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
       @Override
       public void updateApplicationStateInternal(ApplicationId appId,
           ApplicationStateData appStateData) throws Exception {
-        if (count == 0) {
-          // do nothing; simulate app final state is not saved.
+        if (count == 1) {
+          // Application state is updated on attempt launch.
+          // After that, do nothing; simulate app final state is not saved.
           LOG.info(appId + " final state is not saved.");
-          count++;
         } else {
           super.updateApplicationStateInternal(appId, appStateData);
         }
+        count++;
       }
     };
     memStore.init(conf);
@@ -774,7 +775,6 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
     MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 15120);
     RMApp app0 = rm1.submitApp(200);
     MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
-
     FinishApplicationMasterRequest req =
         FinishApplicationMasterRequest.newInstance(
           FinalApplicationStatus.SUCCEEDED, "", "");
@@ -1798,8 +1798,11 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
 
     rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.KILLED);
     rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
-    Assert.assertEquals(1, ((TestMemoryRMStateStore) memStore).updateAttempt);
-    Assert.assertEquals(2, ((TestMemoryRMStateStore) memStore).updateApp);
+    // count = 1 on storing RMApp launchTime
+    // count = 2 on storing attempt state on kill
+    // count = 3 on storing app state on kill
+    Assert.assertEquals(2, ((TestMemoryRMStateStore) memStore).updateAttempt);
+    Assert.assertEquals(3, ((TestMemoryRMStateStore) memStore).updateApp);
   }
 
   // Test Application that fails on submission is saved in state store.
@@ -2548,8 +2551,6 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
 
     // create an app and finish the app.
     RMApp app0 = rm1.submitApp(200);
-    ApplicationStateData app0State = memStore.getState().getApplicationState()
-        .get(app0.getApplicationId());
 
     MockAM am0 = launchAndFailAM(app0, rm1, nm1);
     MockAM am1 = launchAndFailAM(app0, rm1, nm1);
@@ -2558,6 +2559,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
 
     // am1 is missed from MemoryRMStateStore
     memStore.removeApplicationAttemptInternal(am1.getApplicationAttemptId());
+    ApplicationStateData app0State = memStore.getState().getApplicationState()
+        .get(app0.getApplicationId());
     ApplicationAttemptStateData am2State = app0State.getAttempt(
         am2.getApplicationAttemptId());
     // am2's state is not consistent: MemoryRMStateStore just saved its initial

+ 22 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java

@@ -483,6 +483,13 @@ public class TestRMAppTransitions {
         any(ApplicationStateData.class));
   }
 
+  private void assertAppStateLaunchTimeSaved(long expectedLaunchTime) {
+    ArgumentCaptor<ApplicationStateData> state =
+        ArgumentCaptor.forClass(ApplicationStateData.class);
+    verify(store, times(1)).updateApplicationState(state.capture());
+    assertEquals(expectedLaunchTime, state.getValue().getLaunchTime());
+  }
+
   private void assertKilled(RMApp application) {
     assertTimesAtFinish(application);
     assertAppState(RMAppState.KILLED, application);
@@ -932,6 +939,21 @@ public class TestRMAppTransitions {
     verifyRMAppFieldsForFinalTransitions(application);
   }
 
+  @Test
+  public void testAppAcceptedAccepted() throws IOException {
+    LOG.info("--- START: testAppAcceptedAccepted ---");
+
+    RMApp application = testCreateAppAccepted(null);
+    // ACCEPTED => ACCEPTED event RMAppEventType.ATTEMPT_LAUNCHED
+    RMAppEvent appAttemptLaunched =
+        new RMAppEvent(application.getApplicationId(),
+            RMAppEventType.ATTEMPT_LAUNCHED, 1234L);
+    application.handle(appAttemptLaunched);
+    rmDispatcher.await();
+    assertAppState(RMAppState.ACCEPTED, application);
+    assertAppStateLaunchTimeSaved(1234L);
+  }
+
   @Test
   public void testAppAcceptedAttemptKilled() throws IOException,
       InterruptedException {