|
@@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.service.component;
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
import com.google.common.base.Preconditions;
|
|
import com.google.common.base.Preconditions;
|
|
import org.apache.hadoop.yarn.api.records.Container;
|
|
import org.apache.hadoop.yarn.api.records.Container;
|
|
-import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
|
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
|
import static org.apache.hadoop.yarn.service.api.records.Component
|
|
import static org.apache.hadoop.yarn.service.api.records.Component
|
|
@@ -44,7 +43,6 @@ import org.apache.hadoop.yarn.service.ServiceEventType;
|
|
import org.apache.hadoop.yarn.service.api.records.ContainerState;
|
|
import org.apache.hadoop.yarn.service.api.records.ContainerState;
|
|
import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
|
|
import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
|
|
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
|
|
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
|
|
-import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
|
|
|
|
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId;
|
|
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId;
|
|
import org.apache.hadoop.yarn.service.ContainerFailureTracker;
|
|
import org.apache.hadoop.yarn.service.ContainerFailureTracker;
|
|
import org.apache.hadoop.yarn.service.ServiceContext;
|
|
import org.apache.hadoop.yarn.service.ServiceContext;
|
|
@@ -89,6 +87,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.*;
|
|
import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.*;
|
|
import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*;
|
|
import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*;
|
|
import static org.apache.hadoop.yarn.service.component.ComponentEventType.*;
|
|
import static org.apache.hadoop.yarn.service.component.ComponentEventType.*;
|
|
|
|
+import static org.apache.hadoop.yarn.service.component.ComponentEventType.CANCEL_UPGRADE;
|
|
|
|
+import static org.apache.hadoop.yarn.service.component.ComponentEventType.UPGRADE;
|
|
import static org.apache.hadoop.yarn.service.component.ComponentState.*;
|
|
import static org.apache.hadoop.yarn.service.component.ComponentState.*;
|
|
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.*;
|
|
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.*;
|
|
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.*;
|
|
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.*;
|
|
@@ -126,9 +126,8 @@ public class Component implements EventHandler<ComponentEvent> {
|
|
new ConcurrentHashMap<>();
|
|
new ConcurrentHashMap<>();
|
|
private boolean healthThresholdMonitorEnabled = false;
|
|
private boolean healthThresholdMonitorEnabled = false;
|
|
|
|
|
|
- private AtomicBoolean upgradeInProgress = new AtomicBoolean(false);
|
|
|
|
- private ComponentEvent upgradeEvent;
|
|
|
|
- private AtomicLong numContainersThatNeedUpgrade = new AtomicLong(0);
|
|
|
|
|
|
+ private UpgradeStatus upgradeStatus = new UpgradeStatus();
|
|
|
|
+ private UpgradeStatus cancelUpgradeStatus = new UpgradeStatus();
|
|
|
|
|
|
private StateMachine<ComponentState, ComponentEventType, ComponentEvent>
|
|
private StateMachine<ComponentState, ComponentEventType, ComponentEvent>
|
|
stateMachine;
|
|
stateMachine;
|
|
@@ -160,6 +159,8 @@ public class Component implements EventHandler<ComponentEvent> {
|
|
// Flex while previous flex is still in progress
|
|
// Flex while previous flex is still in progress
|
|
.addTransition(FLEXING, EnumSet.of(FLEXING, STABLE), FLEX,
|
|
.addTransition(FLEXING, EnumSet.of(FLEXING, STABLE), FLEX,
|
|
new FlexComponentTransition())
|
|
new FlexComponentTransition())
|
|
|
|
+ .addTransition(FLEXING, EnumSet.of(UPGRADING, FLEXING, STABLE),
|
|
|
|
+ CHECK_STABLE, new CheckStableTransition())
|
|
|
|
|
|
// container failed while stable
|
|
// container failed while stable
|
|
.addTransition(STABLE, FLEXING, CONTAINER_COMPLETED,
|
|
.addTransition(STABLE, FLEXING, CONTAINER_COMPLETED,
|
|
@@ -172,19 +173,28 @@ public class Component implements EventHandler<ComponentEvent> {
|
|
// For flex down, go to STABLE state
|
|
// For flex down, go to STABLE state
|
|
.addTransition(STABLE, EnumSet.of(STABLE, FLEXING),
|
|
.addTransition(STABLE, EnumSet.of(STABLE, FLEXING),
|
|
FLEX, new FlexComponentTransition())
|
|
FLEX, new FlexComponentTransition())
|
|
- .addTransition(STABLE, UPGRADING, ComponentEventType.UPGRADE,
|
|
|
|
- new ComponentNeedsUpgradeTransition())
|
|
|
|
- //Upgrade while previous upgrade is still in progress
|
|
|
|
- .addTransition(UPGRADING, UPGRADING, ComponentEventType.UPGRADE,
|
|
|
|
- new ComponentNeedsUpgradeTransition())
|
|
|
|
- .addTransition(UPGRADING, EnumSet.of(UPGRADING, FLEXING, STABLE),
|
|
|
|
- CHECK_STABLE, new CheckStableTransition())
|
|
|
|
- .addTransition(FLEXING, EnumSet.of(UPGRADING, FLEXING, STABLE),
|
|
|
|
- CHECK_STABLE, new CheckStableTransition())
|
|
|
|
|
|
+ .addTransition(STABLE, UPGRADING, UPGRADE,
|
|
|
|
+ new NeedsUpgradeTransition())
|
|
|
|
+ .addTransition(STABLE, CANCEL_UPGRADING, CANCEL_UPGRADE,
|
|
|
|
+ new NeedsUpgradeTransition())
|
|
.addTransition(STABLE, EnumSet.of(STABLE), CHECK_STABLE,
|
|
.addTransition(STABLE, EnumSet.of(STABLE), CHECK_STABLE,
|
|
new CheckStableTransition())
|
|
new CheckStableTransition())
|
|
- .addTransition(UPGRADING, FLEXING, CONTAINER_COMPLETED,
|
|
|
|
- new ContainerCompletedTransition())
|
|
|
|
|
|
+
|
|
|
|
+ // Cancel upgrade while previous upgrade is still in progress
|
|
|
|
+ .addTransition(UPGRADING, CANCEL_UPGRADING,
|
|
|
|
+ CANCEL_UPGRADE, new NeedsUpgradeTransition())
|
|
|
|
+ .addTransition(UPGRADING, EnumSet.of(UPGRADING, STABLE),
|
|
|
|
+ CHECK_STABLE, new CheckStableTransition())
|
|
|
|
+ .addTransition(UPGRADING, UPGRADING, CONTAINER_COMPLETED,
|
|
|
|
+ new CompletedAfterUpgradeTransition())
|
|
|
|
+
|
|
|
|
+ .addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, FLEXING,
|
|
|
|
+ STABLE), CHECK_STABLE, new CheckStableTransition())
|
|
|
|
+ .addTransition(CANCEL_UPGRADING, CANCEL_UPGRADING,
|
|
|
|
+ CONTAINER_COMPLETED, new CompletedAfterUpgradeTransition())
|
|
|
|
+ .addTransition(CANCEL_UPGRADING, FLEXING, CONTAINER_ALLOCATED,
|
|
|
|
+ new ContainerAllocatedTransition())
|
|
|
|
+
|
|
.installTopology();
|
|
.installTopology();
|
|
|
|
|
|
public Component(
|
|
public Component(
|
|
@@ -332,7 +342,7 @@ public class Component implements EventHandler<ComponentEvent> {
|
|
+ before + " to " + event.getDesired());
|
|
+ before + " to " + event.getDesired());
|
|
component.requestContainers(delta);
|
|
component.requestContainers(delta);
|
|
component.createNumCompInstances(delta);
|
|
component.createNumCompInstances(delta);
|
|
- component.componentSpec.setState(
|
|
|
|
|
|
+ component.setComponentState(
|
|
org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
|
|
org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
|
|
component.getScheduler().getApp().setState(ServiceState.STARTED);
|
|
component.getScheduler().getApp().setState(ServiceState.STARTED);
|
|
return FLEXING;
|
|
return FLEXING;
|
|
@@ -430,11 +440,11 @@ public class Component implements EventHandler<ComponentEvent> {
|
|
if (component.getNumRunningInstances() + component
|
|
if (component.getNumRunningInstances() + component
|
|
.getNumSucceededInstances() + component.getNumFailedInstances()
|
|
.getNumSucceededInstances() + component.getNumFailedInstances()
|
|
< component.getComponentSpec().getNumberOfContainers()) {
|
|
< component.getComponentSpec().getNumberOfContainers()) {
|
|
- component.componentSpec.setState(
|
|
|
|
|
|
+ component.setComponentState(
|
|
org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
|
|
org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
|
|
return FLEXING;
|
|
return FLEXING;
|
|
} else{
|
|
} else{
|
|
- component.componentSpec.setState(
|
|
|
|
|
|
+ component.setComponentState(
|
|
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
|
|
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
|
|
return STABLE;
|
|
return STABLE;
|
|
}
|
|
}
|
|
@@ -444,22 +454,22 @@ public class Component implements EventHandler<ComponentEvent> {
|
|
Component component) {
|
|
Component component) {
|
|
// if desired == running
|
|
// if desired == running
|
|
if (component.componentMetrics.containersReady.value() == component
|
|
if (component.componentMetrics.containersReady.value() == component
|
|
- .getComponentSpec().getNumberOfContainers()
|
|
|
|
- && component.numContainersThatNeedUpgrade.get() == 0) {
|
|
|
|
- component.componentSpec.setState(
|
|
|
|
|
|
+ .getComponentSpec().getNumberOfContainers() &&
|
|
|
|
+ !component.doesNeedUpgrade()) {
|
|
|
|
+ component.setComponentState(
|
|
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
|
|
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
|
|
return STABLE;
|
|
return STABLE;
|
|
|
|
+ } else if (component.doesNeedUpgrade()) {
|
|
|
|
+ component.setComponentState(org.apache.hadoop.yarn.service.api.records.
|
|
|
|
+ ComponentState.NEEDS_UPGRADE);
|
|
|
|
+ return component.getState();
|
|
} else if (component.componentMetrics.containersReady.value() != component
|
|
} else if (component.componentMetrics.containersReady.value() != component
|
|
.getComponentSpec().getNumberOfContainers()) {
|
|
.getComponentSpec().getNumberOfContainers()) {
|
|
- component.componentSpec.setState(
|
|
|
|
|
|
+ component.setComponentState(
|
|
org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
|
|
org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
|
|
return FLEXING;
|
|
return FLEXING;
|
|
- } else {
|
|
|
|
- // component.numContainersThatNeedUpgrade.get() > 0
|
|
|
|
- component.componentSpec.setState(org.apache.hadoop.yarn.service.api.
|
|
|
|
- records.ComponentState.NEEDS_UPGRADE);
|
|
|
|
- return UPGRADING;
|
|
|
|
}
|
|
}
|
|
|
|
+ return component.getState();
|
|
}
|
|
}
|
|
|
|
|
|
// This method should be called whenever there is an increment or decrement
|
|
// This method should be called whenever there is an increment or decrement
|
|
@@ -467,22 +477,16 @@ public class Component implements EventHandler<ComponentEvent> {
|
|
//This should not matter for terminating components
|
|
//This should not matter for terminating components
|
|
private static synchronized void checkAndUpdateComponentState(
|
|
private static synchronized void checkAndUpdateComponentState(
|
|
Component component, boolean isIncrement) {
|
|
Component component, boolean isIncrement) {
|
|
- org.apache.hadoop.yarn.service.api.records.ComponentState curState =
|
|
|
|
- component.componentSpec.getState();
|
|
|
|
|
|
|
|
if (component.getRestartPolicyHandler().isLongLived()) {
|
|
if (component.getRestartPolicyHandler().isLongLived()) {
|
|
if (isIncrement) {
|
|
if (isIncrement) {
|
|
// check if all containers are in READY state
|
|
// check if all containers are in READY state
|
|
- if (component.numContainersThatNeedUpgrade.get() == 0
|
|
|
|
- && component.componentMetrics.containersReady.value()
|
|
|
|
- == component.componentMetrics.containersDesired.value()) {
|
|
|
|
- component.componentSpec.setState(
|
|
|
|
|
|
+ if (!component.upgradeStatus.areContainersUpgrading() &&
|
|
|
|
+ !component.cancelUpgradeStatus.areContainersUpgrading() &&
|
|
|
|
+ component.componentMetrics.containersReady.value() ==
|
|
|
|
+ component.componentMetrics.containersDesired.value()) {
|
|
|
|
+ component.setComponentState(
|
|
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
|
|
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
|
|
- if (curState != component.componentSpec.getState()) {
|
|
|
|
- LOG.info("[COMPONENT {}] state changed from {} -> {}",
|
|
|
|
- component.componentSpec.getName(), curState,
|
|
|
|
- component.componentSpec.getState());
|
|
|
|
- }
|
|
|
|
// component state change will trigger re-check of service state
|
|
// component state change will trigger re-check of service state
|
|
component.context.getServiceManager().checkAndUpdateServiceState();
|
|
component.context.getServiceManager().checkAndUpdateServiceState();
|
|
}
|
|
}
|
|
@@ -491,19 +495,14 @@ public class Component implements EventHandler<ComponentEvent> {
|
|
// still need to verify the count before changing the component state
|
|
// still need to verify the count before changing the component state
|
|
if (component.componentMetrics.containersReady.value()
|
|
if (component.componentMetrics.containersReady.value()
|
|
< component.componentMetrics.containersDesired.value()) {
|
|
< component.componentMetrics.containersDesired.value()) {
|
|
- component.componentSpec.setState(
|
|
|
|
|
|
+ component.setComponentState(
|
|
org.apache.hadoop.yarn.service.api.records.ComponentState
|
|
org.apache.hadoop.yarn.service.api.records.ComponentState
|
|
.FLEXING);
|
|
.FLEXING);
|
|
} else if (component.componentMetrics.containersReady.value()
|
|
} else if (component.componentMetrics.containersReady.value()
|
|
== component.componentMetrics.containersDesired.value()) {
|
|
== component.componentMetrics.containersDesired.value()) {
|
|
- component.componentSpec.setState(
|
|
|
|
|
|
+ component.setComponentState(
|
|
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
|
|
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
|
|
}
|
|
}
|
|
- if (curState != component.componentSpec.getState()) {
|
|
|
|
- LOG.info("[COMPONENT {}] state changed from {} -> {}",
|
|
|
|
- component.componentSpec.getName(), curState,
|
|
|
|
- component.componentSpec.getState());
|
|
|
|
- }
|
|
|
|
// component state change will trigger re-check of service state
|
|
// component state change will trigger re-check of service state
|
|
component.context.getServiceManager().checkAndUpdateServiceState();
|
|
component.context.getServiceManager().checkAndUpdateServiceState();
|
|
}
|
|
}
|
|
@@ -511,8 +510,8 @@ public class Component implements EventHandler<ComponentEvent> {
|
|
// component state change will trigger re-check of service state
|
|
// component state change will trigger re-check of service state
|
|
component.context.getServiceManager().checkAndUpdateServiceState();
|
|
component.context.getServiceManager().checkAndUpdateServiceState();
|
|
}
|
|
}
|
|
- // when the service is stable then the state of component needs to
|
|
|
|
- // transition to stable
|
|
|
|
|
|
+ // triggers the state machine in component to reach appropriate state
|
|
|
|
+ // once the state in spec is changed.
|
|
component.dispatcher.getEventHandler().handle(
|
|
component.dispatcher.getEventHandler().handle(
|
|
new ComponentEvent(component.getName(),
|
|
new ComponentEvent(component.getName(),
|
|
ComponentEventType.CHECK_STABLE));
|
|
ComponentEventType.CHECK_STABLE));
|
|
@@ -544,25 +543,43 @@ public class Component implements EventHandler<ComponentEvent> {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private static class ComponentNeedsUpgradeTransition extends BaseTransition {
|
|
|
|
|
|
+ private static class CompletedAfterUpgradeTransition extends BaseTransition {
|
|
@Override
|
|
@Override
|
|
public void transition(Component component, ComponentEvent event) {
|
|
public void transition(Component component, ComponentEvent event) {
|
|
- component.upgradeInProgress.set(true);
|
|
|
|
- component.upgradeEvent = event;
|
|
|
|
- component.componentSpec.setState(org.apache.hadoop.yarn.service.api.
|
|
|
|
- records.ComponentState.NEEDS_UPGRADE);
|
|
|
|
- component.numContainersThatNeedUpgrade.set(
|
|
|
|
|
|
+ Preconditions.checkNotNull(event.getContainerId());
|
|
|
|
+ component.updateMetrics(event.getStatus());
|
|
|
|
+ component.dispatcher.getEventHandler().handle(
|
|
|
|
+ new ComponentInstanceEvent(event.getContainerId(), STOP)
|
|
|
|
+ .setStatus(event.getStatus()));
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static class NeedsUpgradeTransition extends BaseTransition {
|
|
|
|
+ @Override
|
|
|
|
+ public void transition(Component component, ComponentEvent event) {
|
|
|
|
+ boolean isCancel = event.getType().equals(CANCEL_UPGRADE);
|
|
|
|
+ UpgradeStatus status = !isCancel ? component.upgradeStatus :
|
|
|
|
+ component.cancelUpgradeStatus;
|
|
|
|
+
|
|
|
|
+ status.inProgress.set(true);
|
|
|
|
+ status.targetSpec = event.getTargetSpec();
|
|
|
|
+ status.targetVersion = event.getUpgradeVersion();
|
|
|
|
+ LOG.info("[COMPONENT {}]: need upgrade to {}",
|
|
|
|
+ component.getName(), status.targetVersion);
|
|
|
|
+
|
|
|
|
+ status.containersNeedUpgrade.set(
|
|
component.componentSpec.getNumberOfContainers());
|
|
component.componentSpec.getNumberOfContainers());
|
|
- component.componentSpec.getContainers().forEach(container -> {
|
|
|
|
- container.setState(ContainerState.NEEDS_UPGRADE);
|
|
|
|
- if (event.isExpressUpgrade()) {
|
|
|
|
- ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent(
|
|
|
|
- ContainerId.fromString(container.getId()),
|
|
|
|
- ComponentInstanceEventType.UPGRADE);
|
|
|
|
- LOG.info("Upgrade container {}", container.getId());
|
|
|
|
- component.dispatcher.getEventHandler().handle(upgradeEvent);
|
|
|
|
- }
|
|
|
|
|
|
+
|
|
|
|
+ component.setComponentState(org.apache.hadoop.yarn.service.api.
|
|
|
|
+ records.ComponentState.NEEDS_UPGRADE);
|
|
|
|
+
|
|
|
|
+ component.getAllComponentInstances().forEach(instance -> {
|
|
|
|
+ instance.setContainerState(ContainerState.NEEDS_UPGRADE);
|
|
});
|
|
});
|
|
|
|
+
|
|
|
|
+ if (event.getType().equals(CANCEL_UPGRADE)) {
|
|
|
|
+ component.upgradeStatus.reset();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -572,22 +589,22 @@ public class Component implements EventHandler<ComponentEvent> {
|
|
@Override
|
|
@Override
|
|
public ComponentState transition(Component component,
|
|
public ComponentState transition(Component component,
|
|
ComponentEvent componentEvent) {
|
|
ComponentEvent componentEvent) {
|
|
- org.apache.hadoop.yarn.service.api.records.ComponentState currState =
|
|
|
|
- component.componentSpec.getState();
|
|
|
|
- if (currState.equals(org.apache.hadoop.yarn.service.api.records
|
|
|
|
- .ComponentState.STABLE)) {
|
|
|
|
- return ComponentState.STABLE;
|
|
|
|
- }
|
|
|
|
// checkIfStable also updates the state in definition when STABLE
|
|
// checkIfStable also updates the state in definition when STABLE
|
|
ComponentState targetState = checkIfStable(component);
|
|
ComponentState targetState = checkIfStable(component);
|
|
- if (targetState.equals(STABLE) && component.upgradeInProgress.get()) {
|
|
|
|
- component.componentSpec.overwrite(
|
|
|
|
- component.upgradeEvent.getTargetSpec());
|
|
|
|
- component.upgradeEvent = null;
|
|
|
|
|
|
+
|
|
|
|
+ if (targetState.equals(STABLE) &&
|
|
|
|
+ !(component.upgradeStatus.isCompleted() &&
|
|
|
|
+ component.cancelUpgradeStatus.isCompleted())) {
|
|
|
|
+ // Component stable after upgrade or cancel upgrade
|
|
|
|
+ UpgradeStatus status = !component.cancelUpgradeStatus.isCompleted() ?
|
|
|
|
+ component.cancelUpgradeStatus : component.upgradeStatus;
|
|
|
|
+
|
|
|
|
+ component.componentSpec.overwrite(status.getTargetSpec());
|
|
|
|
+ status.reset();
|
|
|
|
+
|
|
ServiceEvent checkStable = new ServiceEvent(ServiceEventType.
|
|
ServiceEvent checkStable = new ServiceEvent(ServiceEventType.
|
|
CHECK_STABLE);
|
|
CHECK_STABLE);
|
|
component.dispatcher.getEventHandler().handle(checkStable);
|
|
component.dispatcher.getEventHandler().handle(checkStable);
|
|
- component.upgradeInProgress.set(false);
|
|
|
|
}
|
|
}
|
|
return targetState;
|
|
return targetState;
|
|
}
|
|
}
|
|
@@ -625,11 +642,14 @@ public class Component implements EventHandler<ComponentEvent> {
|
|
"[COMPONENT {}]: Assigned {} to component instance {} and launch on host {} ",
|
|
"[COMPONENT {}]: Assigned {} to component instance {} and launch on host {} ",
|
|
getName(), container.getId(), instance.getCompInstanceName(),
|
|
getName(), container.getId(), instance.getCompInstanceName(),
|
|
container.getNodeId());
|
|
container.getNodeId());
|
|
- if (upgradeInProgress.get()) {
|
|
|
|
|
|
+ if (!(upgradeStatus.isCompleted() && cancelUpgradeStatus.isCompleted())) {
|
|
|
|
+ UpgradeStatus status = !cancelUpgradeStatus.isCompleted() ?
|
|
|
|
+ cancelUpgradeStatus : upgradeStatus;
|
|
|
|
+
|
|
scheduler.getContainerLaunchService()
|
|
scheduler.getContainerLaunchService()
|
|
.launchCompInstance(scheduler.getApp(), instance, container,
|
|
.launchCompInstance(scheduler.getApp(), instance, container,
|
|
- createLaunchContext(upgradeEvent.getTargetSpec(),
|
|
|
|
- upgradeEvent.getUpgradeVersion()));
|
|
|
|
|
|
+ createLaunchContext(status.getTargetSpec(),
|
|
|
|
+ status.getTargetVersion()));
|
|
} else {
|
|
} else {
|
|
scheduler.getContainerLaunchService().launchCompInstance(
|
|
scheduler.getContainerLaunchService().launchCompInstance(
|
|
scheduler.getApp(), instance, container,
|
|
scheduler.getApp(), instance, container,
|
|
@@ -830,6 +850,12 @@ public class Component implements EventHandler<ComponentEvent> {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private boolean doesNeedUpgrade() {
|
|
|
|
+ return cancelUpgradeStatus.areContainersUpgrading() ||
|
|
|
|
+ upgradeStatus.areContainersUpgrading() ||
|
|
|
|
+ upgradeStatus.failed.get();
|
|
|
|
+ }
|
|
|
|
+
|
|
public boolean areDependenciesReady() {
|
|
public boolean areDependenciesReady() {
|
|
List<String> dependencies = componentSpec.getDependencies();
|
|
List<String> dependencies = componentSpec.getDependencies();
|
|
if (ServiceUtils.isEmpty(dependencies)) {
|
|
if (ServiceUtils.isEmpty(dependencies)) {
|
|
@@ -911,10 +937,6 @@ public class Component implements EventHandler<ComponentEvent> {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- public void decContainersThatNeedUpgrade() {
|
|
|
|
- numContainersThatNeedUpgrade.decrementAndGet();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
public int getNumReadyInstances() {
|
|
public int getNumReadyInstances() {
|
|
return componentMetrics.containersReady.value();
|
|
return componentMetrics.containersReady.value();
|
|
}
|
|
}
|
|
@@ -972,10 +994,33 @@ public class Component implements EventHandler<ComponentEvent> {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- public ComponentEvent getUpgradeEvent() {
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Returns whether a component is upgrading or not.
|
|
|
|
+ */
|
|
|
|
+ public boolean isUpgrading() {
|
|
|
|
+ this.readLock.lock();
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ return !(upgradeStatus.isCompleted() &&
|
|
|
|
+ cancelUpgradeStatus.isCompleted());
|
|
|
|
+ } finally {
|
|
|
|
+ this.readLock.unlock();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public UpgradeStatus getUpgradeStatus() {
|
|
|
|
+ this.readLock.lock();
|
|
|
|
+ try {
|
|
|
|
+ return upgradeStatus;
|
|
|
|
+ } finally {
|
|
|
|
+ this.readLock.unlock();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public UpgradeStatus getCancelUpgradeStatus() {
|
|
this.readLock.lock();
|
|
this.readLock.lock();
|
|
try {
|
|
try {
|
|
- return upgradeEvent;
|
|
|
|
|
|
+ return cancelUpgradeStatus;
|
|
} finally {
|
|
} finally {
|
|
this.readLock.unlock();
|
|
this.readLock.unlock();
|
|
}
|
|
}
|
|
@@ -1013,6 +1058,70 @@ public class Component implements EventHandler<ComponentEvent> {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Sets the state of the component in the component spec.
|
|
|
|
+ * @param state component state
|
|
|
|
+ */
|
|
|
|
+ private void setComponentState(
|
|
|
|
+ org.apache.hadoop.yarn.service.api.records.ComponentState state) {
|
|
|
|
+ org.apache.hadoop.yarn.service.api.records.ComponentState curState =
|
|
|
|
+ componentSpec.getState();
|
|
|
|
+ if (!curState.equals(state)) {
|
|
|
|
+ componentSpec.setState(state);
|
|
|
|
+ LOG.info("[COMPONENT {}] spec state changed from {} -> {}",
|
|
|
|
+ componentSpec.getName(), curState, state);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Status of upgrade.
|
|
|
|
+ */
|
|
|
|
+ public static class UpgradeStatus {
|
|
|
|
+ private org.apache.hadoop.yarn.service.api.records.Component targetSpec;
|
|
|
|
+ private String targetVersion;
|
|
|
|
+ private AtomicBoolean inProgress = new AtomicBoolean(false);
|
|
|
|
+ private AtomicLong containersNeedUpgrade = new AtomicLong(0);
|
|
|
|
+ private AtomicBoolean failed = new AtomicBoolean(false);
|
|
|
|
+
|
|
|
|
+ public org.apache.hadoop.yarn.service.api.records.
|
|
|
|
+ Component getTargetSpec() {
|
|
|
|
+ return targetSpec;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public String getTargetVersion() {
|
|
|
|
+ return targetVersion;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /*
|
|
|
|
+ * @return whether the upgrade is completed or not
|
|
|
|
+ */
|
|
|
|
+ public boolean isCompleted() {
|
|
|
|
+ return !inProgress.get();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void decContainersThatNeedUpgrade() {
|
|
|
|
+ if (inProgress.get()) {
|
|
|
|
+ containersNeedUpgrade.decrementAndGet();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void containerFailedUpgrade() {
|
|
|
|
+ failed.set(true);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ void reset() {
|
|
|
|
+ containersNeedUpgrade.set(0);
|
|
|
|
+ targetSpec = null;
|
|
|
|
+ targetVersion = null;
|
|
|
|
+ inProgress.set(false);
|
|
|
|
+ failed.set(false);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ boolean areContainersUpgrading() {
|
|
|
|
+ return containersNeedUpgrade.get() != 0;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
public ServiceContext getContext() {
|
|
public ServiceContext getContext() {
|
|
return context;
|
|
return context;
|
|
}
|
|
}
|