|
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils;
|
|
import org.apache.hadoop.yarn.service.monitor.probe.Probe;
|
|
import org.apache.hadoop.yarn.service.monitor.probe.Probe;
|
|
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
|
|
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
|
|
import org.apache.hadoop.yarn.service.provider.ProviderUtils;
|
|
import org.apache.hadoop.yarn.service.provider.ProviderUtils;
|
|
|
|
+import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
|
|
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
|
|
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
|
|
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
|
|
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
|
|
import org.apache.hadoop.yarn.state.MultipleArcTransition;
|
|
import org.apache.hadoop.yarn.state.MultipleArcTransition;
|
|
@@ -142,6 +143,9 @@ public class Component implements EventHandler<ComponentEvent> {
|
|
// container recovered on AM restart
|
|
// container recovered on AM restart
|
|
.addTransition(INIT, INIT, CONTAINER_RECOVERED,
|
|
.addTransition(INIT, INIT, CONTAINER_RECOVERED,
|
|
new ContainerRecoveredTransition())
|
|
new ContainerRecoveredTransition())
|
|
|
|
+ // instance decommissioned
|
|
|
|
+ .addTransition(INIT, INIT, DECOMMISSION_INSTANCE,
|
|
|
|
+ new DecommissionInstanceTransition())
|
|
|
|
|
|
// container recovered in AM heartbeat
|
|
// container recovered in AM heartbeat
|
|
.addTransition(FLEXING, FLEXING, CONTAINER_RECOVERED,
|
|
.addTransition(FLEXING, FLEXING, CONTAINER_RECOVERED,
|
|
@@ -161,6 +165,9 @@ public class Component implements EventHandler<ComponentEvent> {
|
|
new FlexComponentTransition())
|
|
new FlexComponentTransition())
|
|
.addTransition(FLEXING, EnumSet.of(UPGRADING, FLEXING, STABLE),
|
|
.addTransition(FLEXING, EnumSet.of(UPGRADING, FLEXING, STABLE),
|
|
CHECK_STABLE, new CheckStableTransition())
|
|
CHECK_STABLE, new CheckStableTransition())
|
|
|
|
+ // instance decommissioned
|
|
|
|
+ .addTransition(FLEXING, FLEXING, DECOMMISSION_INSTANCE,
|
|
|
|
+ new DecommissionInstanceTransition())
|
|
|
|
|
|
// container failed while stable
|
|
// container failed while stable
|
|
.addTransition(STABLE, FLEXING, CONTAINER_COMPLETED,
|
|
.addTransition(STABLE, FLEXING, CONTAINER_COMPLETED,
|
|
@@ -173,6 +180,10 @@ public class Component implements EventHandler<ComponentEvent> {
|
|
// For flex down, go to STABLE state
|
|
// For flex down, go to STABLE state
|
|
.addTransition(STABLE, EnumSet.of(STABLE, FLEXING),
|
|
.addTransition(STABLE, EnumSet.of(STABLE, FLEXING),
|
|
FLEX, new FlexComponentTransition())
|
|
FLEX, new FlexComponentTransition())
|
|
|
|
+ // instance decommissioned
|
|
|
|
+ .addTransition(STABLE, STABLE, DECOMMISSION_INSTANCE,
|
|
|
|
+ new DecommissionInstanceTransition())
|
|
|
|
+ // upgrade component
|
|
.addTransition(STABLE, UPGRADING, UPGRADE,
|
|
.addTransition(STABLE, UPGRADING, UPGRADE,
|
|
new NeedsUpgradeTransition())
|
|
new NeedsUpgradeTransition())
|
|
.addTransition(STABLE, CANCEL_UPGRADING, CANCEL_UPGRADE,
|
|
.addTransition(STABLE, CANCEL_UPGRADING, CANCEL_UPGRADE,
|
|
@@ -187,6 +198,9 @@ public class Component implements EventHandler<ComponentEvent> {
|
|
CHECK_STABLE, new CheckStableTransition())
|
|
CHECK_STABLE, new CheckStableTransition())
|
|
.addTransition(UPGRADING, UPGRADING, CONTAINER_COMPLETED,
|
|
.addTransition(UPGRADING, UPGRADING, CONTAINER_COMPLETED,
|
|
new CompletedAfterUpgradeTransition())
|
|
new CompletedAfterUpgradeTransition())
|
|
|
|
+ // instance decommissioned
|
|
|
|
+ .addTransition(UPGRADING, UPGRADING, DECOMMISSION_INSTANCE,
|
|
|
|
+ new DecommissionInstanceTransition())
|
|
|
|
|
|
.addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, FLEXING,
|
|
.addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, FLEXING,
|
|
STABLE), CHECK_STABLE, new CheckStableTransition())
|
|
STABLE), CHECK_STABLE, new CheckStableTransition())
|
|
@@ -194,7 +208,9 @@ public class Component implements EventHandler<ComponentEvent> {
|
|
CONTAINER_COMPLETED, new CompletedAfterUpgradeTransition())
|
|
CONTAINER_COMPLETED, new CompletedAfterUpgradeTransition())
|
|
.addTransition(CANCEL_UPGRADING, FLEXING, CONTAINER_ALLOCATED,
|
|
.addTransition(CANCEL_UPGRADING, FLEXING, CONTAINER_ALLOCATED,
|
|
new ContainerAllocatedTransition())
|
|
new ContainerAllocatedTransition())
|
|
-
|
|
|
|
|
|
+ // instance decommissioned
|
|
|
|
+ .addTransition(CANCEL_UPGRADING, CANCEL_UPGRADING,
|
|
|
|
+ DECOMMISSION_INSTANCE, new DecommissionInstanceTransition())
|
|
.installTopology();
|
|
.installTopology();
|
|
|
|
|
|
public Component(
|
|
public Component(
|
|
@@ -241,6 +257,11 @@ public class Component implements EventHandler<ComponentEvent> {
|
|
ComponentInstanceId id =
|
|
ComponentInstanceId id =
|
|
new ComponentInstanceId(instanceIdCounter.getAndIncrement(),
|
|
new ComponentInstanceId(instanceIdCounter.getAndIncrement(),
|
|
componentSpec.getName());
|
|
componentSpec.getName());
|
|
|
|
+ while (componentSpec.getDecommissionedInstances().contains(id
|
|
|
|
+ .getCompInstanceName())) {
|
|
|
|
+ id = new ComponentInstanceId(instanceIdCounter.getAndIncrement(),
|
|
|
|
+ componentSpec.getName());
|
|
|
|
+ }
|
|
ComponentInstance instance = new ComponentInstance(this, id);
|
|
ComponentInstance instance = new ComponentInstance(this, id);
|
|
compInstances.put(instance.getCompInstanceName(), instance);
|
|
compInstances.put(instance.getCompInstanceName(), instance);
|
|
pendingInstances.add(instance);
|
|
pendingInstances.add(instance);
|
|
@@ -377,6 +398,38 @@ public class Component implements EventHandler<ComponentEvent> {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private static class DecommissionInstanceTransition extends BaseTransition {
|
|
|
|
+ @Override
|
|
|
|
+ public void transition(Component component, ComponentEvent event) {
|
|
|
|
+ String instanceName = event.getInstanceName();
|
|
|
|
+ String hostnameSuffix = component.getHostnameSuffix();
|
|
|
|
+ if (instanceName.endsWith(hostnameSuffix)) {
|
|
|
|
+ instanceName = instanceName.substring(0,
|
|
|
|
+ instanceName.length() - hostnameSuffix.length());
|
|
|
|
+ }
|
|
|
|
+ if (component.getComponentSpec().getDecommissionedInstances()
|
|
|
|
+ .contains(instanceName)) {
|
|
|
|
+ LOG.info("Instance {} already decommissioned", instanceName);
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ component.getComponentSpec().addDecommissionedInstance(instanceName);
|
|
|
|
+ ComponentInstance instance = component.getComponentInstance(instanceName);
|
|
|
|
+ if (instance == null) {
|
|
|
|
+ LOG.info("Instance was null for decommissioned instance {}",
|
|
|
|
+ instanceName);
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ // remove the instance
|
|
|
|
+ component.compInstances.remove(instance.getCompInstanceName());
|
|
|
|
+ component.pendingInstances.remove(instance);
|
|
|
|
+ component.scheduler.getServiceMetrics().containersDesired.decr();
|
|
|
|
+ component.componentMetrics.containersDesired.decr();
|
|
|
|
+ component.getComponentSpec().setNumberOfContainers(component
|
|
|
|
+ .getComponentSpec().getNumberOfContainers() - 1);
|
|
|
|
+ instance.destroy();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
private static class ContainerAllocatedTransition extends BaseTransition {
|
|
private static class ContainerAllocatedTransition extends BaseTransition {
|
|
@Override
|
|
@Override
|
|
public void transition(Component component, ComponentEvent event) {
|
|
public void transition(Component component, ComponentEvent event) {
|
|
@@ -808,10 +861,8 @@ public class Component implements EventHandler<ComponentEvent> {
|
|
|
|
|
|
private void setDesiredContainers(int n) {
|
|
private void setDesiredContainers(int n) {
|
|
int delta = n - scheduler.getServiceMetrics().containersDesired.value();
|
|
int delta = n - scheduler.getServiceMetrics().containersDesired.value();
|
|
- if (delta > 0) {
|
|
|
|
|
|
+ if (delta != 0) {
|
|
scheduler.getServiceMetrics().containersDesired.incr(delta);
|
|
scheduler.getServiceMetrics().containersDesired.incr(delta);
|
|
- } else {
|
|
|
|
- scheduler.getServiceMetrics().containersDesired.decr(delta);
|
|
|
|
}
|
|
}
|
|
componentMetrics.containersDesired.set(n);
|
|
componentMetrics.containersDesired.set(n);
|
|
}
|
|
}
|
|
@@ -1203,4 +1254,9 @@ public class Component implements EventHandler<ComponentEvent> {
|
|
RestartPolicyEnum restartPolicyEnum = getComponentSpec().getRestartPolicy();
|
|
RestartPolicyEnum restartPolicyEnum = getComponentSpec().getRestartPolicy();
|
|
return getRestartPolicyHandler(restartPolicyEnum);
|
|
return getRestartPolicyHandler(restartPolicyEnum);
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ public String getHostnameSuffix() {
|
|
|
|
+ return ServiceApiUtil.getHostnameSuffix(context.service.getName(),
|
|
|
|
+ scheduler.getConfig());
|
|
|
|
+ }
|
|
}
|
|
}
|