Browse Source

YARN-3387. Previous AM's container completed status couldn't pass to current AM if AM and RM restarted during the same time. Contributed by Sandflee

Jian He 10 năm trước cách đây
mục cha
commit
d03dcb9635

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -262,6 +262,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3516. killing ContainerLocalizer action doesn't take effect when
     private localizer receives FETCH_FAILURE status.(zhihai xu via xgong)
 
+    YARN-3387. Previous AM's container completed status couldn't pass to current
+    AM if AM and RM restarted during the same time. (sandflee via jianhe)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

@@ -1273,7 +1273,7 @@ public class RMAppImpl implements RMApp, Recoverable {
         // finished containers so that they can be acked to NM,
         // but when pulling finished container we will check this flag again.
         ((RMAppAttemptImpl) app.currentAttempt)
-          .transferStateFromPreviousAttempt(oldAttempt);
+          .transferStateFromAttempt(oldAttempt);
         return initialState;
       } else {
         if (numberOfFailure >= app.maxAppAttempts) {

+ 8 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java

@@ -845,7 +845,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
         attemptState.getMemorySeconds(),attemptState.getVcoreSeconds());
   }
 
-  public void transferStateFromPreviousAttempt(RMAppAttempt attempt) {
+  public void transferStateFromAttempt(RMAppAttempt attempt) {
     this.justFinishedContainers = attempt.getJustFinishedContainersReference();
     this.finishedContainersSentToAM =
         attempt.getFinishedContainersSentToAMReference();
@@ -1044,6 +1044,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
         appAttempt.progress = 1.0f;
         RMApp rmApp =appAttempt.rmContext.getRMApps().get(
             appAttempt.getAppAttemptId().getApplicationId());
+
+        if (appAttempt.submissionContext
+            .getKeepContainersAcrossApplicationAttempts()
+            && !appAttempt.submissionContext.getUnmanagedAM()
+            && rmApp.getCurrentAppAttempt() != appAttempt) {
+          appAttempt.transferStateFromAttempt(rmApp.getCurrentAppAttempt());
+        }
         // We will replay the final attempt only if last attempt is in final
         // state but application is not in final state.
         if (rmApp.getCurrentAppAttempt() == appAttempt

+ 60 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java

@@ -1037,4 +1037,64 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
     nm1.setResourceTrackerService(rm2.getResourceTrackerService());
     rm2.start();
   }
+
+  @Test(timeout = 20000)
+  public void testContainerCompleteMsgNotLostAfterAMFailedAndRMRestart() throws Exception {
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    rm1 = new MockRM(conf, memStore);
+    rm1.start();
+
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    // submit app with keepContainersAcrossApplicationAttempts true
+    RMApp app0 = rm1.submitApp(200, "", UserGroupInformation.getCurrentUser()
+        .getShortUserName(), null, false, null, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, 
+        null, null, true, true, false, null, 0, null, true);
+    MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
+
+    am0.allocate("127.0.0.1", 1000, 2, new ArrayList<ContainerId>());
+    nm1.nodeHeartbeat(true);
+    List<Container> conts = am0.allocate(new ArrayList<ResourceRequest>(),
+        new ArrayList<ContainerId>()).getAllocatedContainers();
+    while (conts.size() == 0) {
+      nm1.nodeHeartbeat(true);
+      conts.addAll(am0.allocate(new ArrayList<ResourceRequest>(),
+          new ArrayList<ContainerId>()).getAllocatedContainers());
+      Thread.sleep(500);
+    }
+
+    // am failed,and relaunch it
+    nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    rm1.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
+    MockAM am1 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
+
+    // rm failover
+    rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+
+    // container launched by first am completed
+    NMContainerStatus amContainer =
+        TestRMRestart.createNMContainerStatus(am0.getApplicationAttemptId(), 1,
+          ContainerState.RUNNING);
+    NMContainerStatus completedContainer=
+        TestRMRestart.createNMContainerStatus(am0.getApplicationAttemptId(), 2,
+          ContainerState.COMPLETE);
+    NMContainerStatus runningContainer =
+        TestRMRestart.createNMContainerStatus(am0.getApplicationAttemptId(), 3,
+          ContainerState.RUNNING);
+    nm1.registerNode(Arrays.asList(amContainer, runningContainer,
+        completedContainer), null);
+    Thread.sleep(200);
+
+    // check whether current am could get containerCompleteMsg
+    RMApp recoveredApp0 =
+        rm2.getRMContext().getRMApps().get(app0.getApplicationId());
+    RMAppAttempt loadedAttempt1 = recoveredApp0.getCurrentAppAttempt();
+    assertEquals(1,loadedAttempt1.getJustFinishedContainers().size());
+  }
+
 }