浏览代码

YARN-9056. Improved YARN service upgrade state logic for readiness check.
Contributed by Chandni Singh

Eric Yang 6 年之前
父节点
当前提交
fc74a3f803
共有 9 个文件被更改,包括 105 次插入42 次删除
  1. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
  2. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
  3. 41 16
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
  4. 2 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java
  5. 4 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/monitor/ServiceMonitor.java
  6. 16 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java
  7. 2 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
  8. 8 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java
  9. 27 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java

@@ -109,6 +109,7 @@ import static org.apache.hadoop.yarn.api.records.ContainerExitStatus
     .KILLED_AFTER_APP_COMPLETION;
 import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*;
 import static org.apache.hadoop.yarn.service.component.ComponentEventType.*;
+import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.START;
 import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes
     .EXIT_FALSE;
 import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes
@@ -825,9 +826,8 @@ public class ServiceScheduler extends CompositeService {
         LOG.error("No component instance exists for {}", containerId);
         return;
       }
-      ComponentInstanceEvent becomeReadyEvent = new ComponentInstanceEvent(
-          containerId, ComponentInstanceEventType.BECOME_READY);
-      dispatcher.getEventHandler().handle(becomeReadyEvent);
+      dispatcher.getEventHandler().handle(
+          new ComponentInstanceEvent(containerId, START));
     }
 
     @Override

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java

@@ -177,13 +177,13 @@ public class Component implements EventHandler<ComponentEvent> {
               new NeedsUpgradeTransition())
           .addTransition(STABLE, CANCEL_UPGRADING, CANCEL_UPGRADE,
               new NeedsUpgradeTransition())
-          .addTransition(STABLE, EnumSet.of(STABLE), CHECK_STABLE,
+          .addTransition(STABLE, EnumSet.of(STABLE, FLEXING), CHECK_STABLE,
               new CheckStableTransition())
 
           // Cancel upgrade while previous upgrade is still in progress
           .addTransition(UPGRADING, CANCEL_UPGRADING,
               CANCEL_UPGRADE, new NeedsUpgradeTransition())
-          .addTransition(UPGRADING, EnumSet.of(UPGRADING, STABLE),
+          .addTransition(UPGRADING, EnumSet.of(UPGRADING, FLEXING, STABLE),
               CHECK_STABLE, new CheckStableTransition())
           .addTransition(UPGRADING, UPGRADING, CONTAINER_COMPLETED,
               new CompletedAfterUpgradeTransition())

+ 41 - 16
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java

@@ -131,7 +131,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
       .addTransition(STARTED, INIT, STOP,
           new ContainerStoppedTransition())
       .addTransition(STARTED, READY, BECOME_READY,
-          new ContainerBecomeReadyTransition())
+          new ContainerBecomeReadyTransition(false))
 
       // FROM READY
       .addTransition(READY, STARTED, BECOME_NOT_READY,
@@ -144,16 +144,20 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
       // FROM UPGRADING
       .addTransition(UPGRADING, EnumSet.of(READY, CANCEL_UPGRADING),
           CANCEL_UPGRADE, new CancelUpgradeTransition())
-      .addTransition(UPGRADING, EnumSet.of(READY), BECOME_READY,
-          new ReadyAfterUpgradeTransition())
+      .addTransition(UPGRADING, EnumSet.of(REINITIALIZED), START,
+          new StartedAfterUpgradeTransition())
       .addTransition(UPGRADING, UPGRADING, STOP,
           new StoppedAfterUpgradeTransition())
 
       // FROM CANCEL_UPGRADING
-      .addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, READY),
-          BECOME_READY, new ReadyAfterUpgradeTransition())
+      .addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING,
+          REINITIALIZED), START, new StartedAfterUpgradeTransition())
       .addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, INIT),
           STOP, new StoppedAfterCancelUpgradeTransition())
+      .addTransition(REINITIALIZED, CANCEL_UPGRADING, CANCEL_UPGRADE,
+          new CancelledAfterReinitTransition())
+      .addTransition(REINITIALIZED, READY, BECOME_READY,
+           new ContainerBecomeReadyTransition(true))
       .installTopology();
 
   public ComponentInstance(Component component,
@@ -229,16 +233,30 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
   }
 
   private static class ContainerBecomeReadyTransition extends BaseTransition {
+    private final boolean isReinitialized;
+
+    ContainerBecomeReadyTransition(boolean isReinitialized) {
+      this.isReinitialized = isReinitialized;
+    }
+
     @Override
     public void transition(ComponentInstance compInstance,
         ComponentInstanceEvent event) {
       compInstance.setContainerState(ContainerState.READY);
-      compInstance.component.incContainersReady(true);
+      if (!isReinitialized) {
+        compInstance.component.incContainersReady(true);
+      } else {
+        compInstance.component.incContainersReady(false);
+        ComponentEvent checkState = new ComponentEvent(
+            compInstance.component.getName(), ComponentEventType.CHECK_STABLE);
+        compInstance.scheduler.getDispatcher().getEventHandler().handle(
+            checkState);
+      }
       compInstance.postContainerReady();
     }
   }
 
-  private static class ReadyAfterUpgradeTransition implements
+  private static class StartedAfterUpgradeTransition implements
       MultipleArcTransition<ComponentInstance, ComponentInstanceEvent,
           ComponentInstanceState> {
 
@@ -249,7 +267,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
       if (instance.pendingCancelUpgrade) {
         // cancellation of upgrade was triggered before the upgrade was
         // finished.
-        LOG.info("{} received ready but cancellation pending",
+        LOG.info("{} received started but cancellation pending",
             event.getContainerId());
         instance.upgradeInProgress.set(true);
         instance.cancelUpgrade();
@@ -258,8 +276,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
       }
 
       instance.upgradeInProgress.set(false);
-      instance.setContainerState(ContainerState.READY);
-      instance.component.incContainersReady(false);
+      instance.setContainerState(ContainerState.RUNNING_BUT_UNREADY);
 
       Component.UpgradeStatus status = instance.getState().equals(UPGRADING) ?
           instance.component.getUpgradeStatus() :
@@ -267,12 +284,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
       status.decContainersThatNeedUpgrade();
 
       instance.serviceVersion = status.getTargetVersion();
-      ComponentEvent checkState = new ComponentEvent(
-          instance.component.getName(),
-          ComponentEventType.CHECK_STABLE);
-      instance.scheduler.getDispatcher().getEventHandler().handle(checkState);
-      instance.postContainerReady();
-      return ComponentInstanceState.READY;
+      return ComponentInstanceState.REINITIALIZED;
     }
   }
 
@@ -570,6 +582,19 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
     }
   }
 
+  private static class CancelledAfterReinitTransition extends BaseTransition {
+    @Override
+    public void transition(ComponentInstance instance,
+        ComponentInstanceEvent event) {
+      if (instance.upgradeInProgress.compareAndSet(false, true)) {
+        instance.cancelUpgrade();
+      } else {
+        LOG.info("{} pending cancellation", event.getContainerId());
+        instance.pendingCancelUpgrade = true;
+      }
+    }
+  }
+
   private static class CancelUpgradeTransition implements
       MultipleArcTransition<ComponentInstance, ComponentInstanceEvent,
           ComponentInstanceState> {

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java

@@ -23,5 +23,6 @@ public enum ComponentInstanceState {
   STARTED,
   READY,
   UPGRADING,
-  CANCEL_UPGRADING
+  CANCEL_UPGRADING,
+  REINITIALIZED
 }

+ 4 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/monitor/ServiceMonitor.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.service.ServiceContext;
 import org.apache.hadoop.yarn.service.component.Component;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState;
 import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
 import org.apache.hadoop.yarn.service.component.ComponentEvent;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
@@ -37,6 +38,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState.REINITIALIZED;
 import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState.STARTED;
 import static org.apache.hadoop.yarn.service.component.ComponentEventType.FLEX;
 import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.BECOME_NOT_READY;
@@ -108,8 +110,9 @@ public class ServiceMonitor extends AbstractService {
         ComponentInstance instance = entry.getValue();
 
         ProbeStatus status = instance.ping();
+        ComponentInstanceState instanceState = instance.getState();
         if (status.isSuccess()) {
-          if (instance.getState() == STARTED) {
+          if (instanceState == STARTED || instanceState == REINITIALIZED) {
             LOG.info("Readiness check succeeded for {}: {}", instance
                 .getCompInstanceName(), status);
             // synchronously update the state.

+ 16 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java

@@ -306,10 +306,14 @@ public class TestServiceManager {
   private void makeAllInstancesReady(ServiceContext context)
       throws TimeoutException, InterruptedException {
     context.scheduler.getLiveInstances().forEach(((containerId, instance) -> {
-      ComponentInstanceEvent event = new ComponentInstanceEvent(containerId,
-          ComponentInstanceEventType.BECOME_READY);
-
-      context.scheduler.getDispatcher().getEventHandler().handle(event);
+      ComponentInstanceEvent startEvent = new ComponentInstanceEvent(
+          containerId, ComponentInstanceEventType.START);
+      context.scheduler.getDispatcher().getEventHandler().handle(startEvent);
+
+      ComponentInstanceEvent becomeReadyEvent = new ComponentInstanceEvent(
+          containerId, ComponentInstanceEventType.BECOME_READY);
+      context.scheduler.getDispatcher().getEventHandler().handle(
+          becomeReadyEvent);
     }));
     GenericTestUtils.waitFor(()-> {
       for (ComponentInstance instance:
@@ -350,11 +354,17 @@ public class TestServiceManager {
     // instances of comp1 get upgraded and become ready event is triggered
     // become ready
     compInstances.forEach(instance -> {
-      ComponentInstanceEvent event = new ComponentInstanceEvent(
+      ComponentInstanceEvent startEvent = new ComponentInstanceEvent(
+          instance.getContainer().getId(),
+          ComponentInstanceEventType.START);
+      context.scheduler.getDispatcher().getEventHandler().handle(startEvent);
+
+      ComponentInstanceEvent becomeReadyEvent = new ComponentInstanceEvent(
           instance.getContainer().getId(),
           ComponentInstanceEventType.BECOME_READY);
 
-      context.scheduler.getDispatcher().getEventHandler().handle(event);
+      context.scheduler.getDispatcher().getEventHandler().handle(
+          becomeReadyEvent);
     });
 
     GenericTestUtils.waitFor(() -> {

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java

@@ -495,7 +495,8 @@ public class TestYarnNativeServices extends ServiceTestUtils {
   }
 
   // Test to verify ANTI_AFFINITY placement policy
-  // 1. Start mini cluster with 3 NMs and scheduler placement-constraint handler
+  // 1. Start mini cluster
+  // with 3 NMs and scheduler placement-constraint handler
   // 2. Create an example service with 3 containers
   // 3. Verify no more than 1 container comes up in each of the 3 NMs
   // 4. Flex the component to 4 containers

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java

@@ -38,6 +38,7 @@ import org.junit.Test;
 import java.util.Iterator;
 
 import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.BECOME_READY;
+import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.START;
 import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.STOP;
 
 /**
@@ -159,6 +160,8 @@ public class TestComponent {
 
     // reinitialization of a container done
     for(ComponentInstance instance : comp.getAllComponentInstances()) {
+      instance.handle(new ComponentInstanceEvent(
+          instance.getContainer().getId(), START));
       instance.handle(new ComponentInstanceEvent(
           instance.getContainer().getId(), BECOME_READY));
     }
@@ -199,6 +202,8 @@ public class TestComponent {
 
     // second instance finished upgrading
     ComponentInstance instance2 = iter.next();
+    instance2.handle(new ComponentInstanceEvent(
+        instance2.getContainer().getId(), ComponentInstanceEventType.START));
     instance2.handle(new ComponentInstanceEvent(
         instance2.getContainer().getId(),
         ComponentInstanceEventType.BECOME_READY));
@@ -230,6 +235,9 @@ public class TestComponent {
 
     // cancel upgrade successful for both instances
     for(ComponentInstance instance : comp.getAllComponentInstances()) {
+      instance.handle(new ComponentInstanceEvent(
+          instance.getContainer().getId(),
+          ComponentInstanceEventType.START));
       instance.handle(new ComponentInstanceEvent(
           instance.getContainer().getId(),
           ComponentInstanceEventType.BECOME_READY));

+ 27 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java

@@ -98,7 +98,12 @@ public class TestComponentInstance {
     ComponentInstanceEvent instanceEvent = new ComponentInstanceEvent(
         instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE);
     instance.handle(instanceEvent);
-
+    instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(),
+        ComponentInstanceEventType.START));
+    Assert.assertEquals("instance not running",
+        ContainerState.RUNNING_BUT_UNREADY,
+        component.getComponentSpec().getContainer(instance.getContainer()
+            .getId().toString()).getState());
     instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(),
         ComponentInstanceEventType.BECOME_READY));
     Assert.assertEquals("instance not ready", ContainerState.READY,
@@ -246,23 +251,33 @@ public class TestComponentInstance {
     instance.handle(cancelEvent);
 
     // either upgrade failed or successful
-    ComponentInstanceEvent readyOrStopEvent = new ComponentInstanceEvent(
-        instance.getContainer().getId(),
-        upgradeSuccessful ? ComponentInstanceEventType.BECOME_READY :
-            ComponentInstanceEventType.STOP);
+    if (upgradeSuccessful) {
+      instance.handle(new ComponentInstanceEvent(
+          instance.getContainer().getId(), ComponentInstanceEventType.START));
+      instance.handle(new ComponentInstanceEvent(
+          instance.getContainer().getId(),
+          ComponentInstanceEventType.BECOME_READY));
+    } else {
+      instance.handle(new ComponentInstanceEvent(
+          instance.getContainer().getId(),
+          ComponentInstanceEventType.STOP));
+    }
 
-    instance.handle(readyOrStopEvent);
     Assert.assertEquals("instance not upgrading", ContainerState.UPGRADING,
         component.getComponentSpec().getContainer(instance.getContainer()
             .getId().toString()).getState());
 
     // response for cancel received
-    ComponentInstanceEvent readyOrStopCancel = new ComponentInstanceEvent(
-        instance.getContainer().getId(),
-        cancelUpgradeSuccessful ? ComponentInstanceEventType.BECOME_READY :
-            ComponentInstanceEventType.STOP);
-
-    instance.handle(readyOrStopCancel);
+    if (cancelUpgradeSuccessful) {
+      instance.handle(new ComponentInstanceEvent(
+          instance.getContainer().getId(), ComponentInstanceEventType.START));
+      instance.handle(new ComponentInstanceEvent(
+          instance.getContainer().getId(),
+          ComponentInstanceEventType.BECOME_READY));
+    } else {
+      instance.handle(new ComponentInstanceEvent(
+          instance.getContainer().getId(), ComponentInstanceEventType.STOP));
+    }
     if (cancelUpgradeSuccessful) {
       Assert.assertEquals("instance not ready", ContainerState.READY,
           component.getComponentSpec().getContainer(instance.getContainer()