|
@@ -259,6 +259,7 @@ public class NMClientAsyncImpl extends NMClientAsync {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Deprecated
|
|
|
public void increaseContainerResourceAsync(Container container) {
|
|
|
if (!(callbackHandler instanceof AbstractCallbackHandler)) {
|
|
|
LOG.error("Callback handler does not implement container resource "
|
|
@@ -274,7 +275,7 @@ public class NMClientAsyncImpl extends NMClientAsync {
|
|
|
" is neither started nor scheduled to start"));
|
|
|
}
|
|
|
try {
|
|
|
- events.put(new IncreaseContainerResourceEvent(container));
|
|
|
+ events.put(new UpdateContainerResourceEvent(container, true));
|
|
|
} catch (InterruptedException e) {
|
|
|
LOG.warn("Exception when scheduling the event of increasing resource of "
|
|
|
+ "Container " + container.getId());
|
|
@@ -282,6 +283,30 @@ public class NMClientAsyncImpl extends NMClientAsync {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public void updateContainerResourceAsync(Container container) {
|
|
|
+ if (!(callbackHandler instanceof AbstractCallbackHandler)) {
|
|
|
+ LOG.error("Callback handler does not implement container resource "
|
|
|
+ + "increase callback methods");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ AbstractCallbackHandler handler = (AbstractCallbackHandler) callbackHandler;
|
|
|
+ if (containers.get(container.getId()) == null) {
|
|
|
+ handler.onUpdateContainerResourceError(
|
|
|
+ container.getId(),
|
|
|
+ RPCUtil.getRemoteException(
|
|
|
+ "Container " + container.getId() +
|
|
|
+ " is neither started nor scheduled to start"));
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ events.put(new UpdateContainerResourceEvent(container, false));
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ LOG.warn("Exception when scheduling the event of increasing resource of "
|
|
|
+ + "Container " + container.getId());
|
|
|
+ handler.onUpdateContainerResourceError(container.getId(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public void reInitializeContainerAsync(ContainerId containerId,
|
|
|
ContainerLaunchContext containerLaunchContex, boolean autoCommit){
|
|
@@ -427,7 +452,7 @@ public class NMClientAsyncImpl extends NMClientAsync {
|
|
|
START_CONTAINER,
|
|
|
STOP_CONTAINER,
|
|
|
QUERY_CONTAINER,
|
|
|
- INCREASE_CONTAINER_RESOURCE,
|
|
|
+ UPDATE_CONTAINER_RESOURCE,
|
|
|
REINITIALIZE_CONTAINER,
|
|
|
RESTART_CONTAINER,
|
|
|
ROLLBACK_LAST_REINIT,
|
|
@@ -503,14 +528,20 @@ public class NMClientAsyncImpl extends NMClientAsync {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- protected static class IncreaseContainerResourceEvent extends ContainerEvent {
|
|
|
+ protected static class UpdateContainerResourceEvent extends ContainerEvent {
|
|
|
private Container container;
|
|
|
+ private boolean isIncreaseEvent;
|
|
|
|
|
|
- public IncreaseContainerResourceEvent(Container container) {
|
|
|
+ // UpdateContainerResourceEvent constructor takes in a
|
|
|
+ // flag to support callback API's calling through the deprecated
|
|
|
+ // increaseContainerResource
|
|
|
+ public UpdateContainerResourceEvent(Container container,
|
|
|
+ boolean isIncreaseEvent) {
|
|
|
super(container.getId(), container.getNodeId(),
|
|
|
container.getContainerToken(),
|
|
|
- ContainerEventType.INCREASE_CONTAINER_RESOURCE);
|
|
|
+ ContainerEventType.UPDATE_CONTAINER_RESOURCE);
|
|
|
this.container = container;
|
|
|
+ this.isIncreaseEvent = isIncreaseEvent;
|
|
|
}
|
|
|
|
|
|
public Container getContainer() {
|
|
@@ -536,8 +567,8 @@ public class NMClientAsyncImpl extends NMClientAsync {
|
|
|
|
|
|
// Transitions from RUNNING state
|
|
|
.addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
|
|
|
- ContainerEventType.INCREASE_CONTAINER_RESOURCE,
|
|
|
- new IncreaseContainerResourceTransition())
|
|
|
+ ContainerEventType.UPDATE_CONTAINER_RESOURCE,
|
|
|
+ new UpdateContainerResourceTransition())
|
|
|
|
|
|
// Transitions for Container Upgrade
|
|
|
.addTransition(ContainerState.RUNNING,
|
|
@@ -566,7 +597,7 @@ public class NMClientAsyncImpl extends NMClientAsync {
|
|
|
.addTransition(ContainerState.DONE, ContainerState.DONE,
|
|
|
EnumSet.of(ContainerEventType.START_CONTAINER,
|
|
|
ContainerEventType.STOP_CONTAINER,
|
|
|
- ContainerEventType.INCREASE_CONTAINER_RESOURCE))
|
|
|
+ ContainerEventType.UPDATE_CONTAINER_RESOURCE))
|
|
|
|
|
|
// Transition from FAILED state
|
|
|
.addTransition(ContainerState.FAILED, ContainerState.FAILED,
|
|
@@ -576,7 +607,7 @@ public class NMClientAsyncImpl extends NMClientAsync {
|
|
|
ContainerEventType.RESTART_CONTAINER,
|
|
|
ContainerEventType.COMMIT_LAST_REINT,
|
|
|
ContainerEventType.ROLLBACK_LAST_REINIT,
|
|
|
- ContainerEventType.INCREASE_CONTAINER_RESOURCE));
|
|
|
+ ContainerEventType.UPDATE_CONTAINER_RESOURCE));
|
|
|
|
|
|
protected static class StartContainerTransition implements
|
|
|
MultipleArcTransition<StatefulContainer, ContainerEvent,
|
|
@@ -628,46 +659,61 @@ public class NMClientAsyncImpl extends NMClientAsync {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- protected static class IncreaseContainerResourceTransition implements
|
|
|
+ protected static class UpdateContainerResourceTransition implements
|
|
|
SingleArcTransition<StatefulContainer, ContainerEvent> {
|
|
|
+
|
|
|
+ @SuppressWarnings("deprecation")
|
|
|
@Override
|
|
|
public void transition(
|
|
|
StatefulContainer container, ContainerEvent event) {
|
|
|
+ boolean isIncreaseEvent = false;
|
|
|
if (!(container.nmClientAsync.getCallbackHandler()
|
|
|
instanceof AbstractCallbackHandler)) {
|
|
|
LOG.error("Callback handler does not implement container resource "
|
|
|
- + "increase callback methods");
|
|
|
+ + "update callback methods");
|
|
|
return;
|
|
|
}
|
|
|
AbstractCallbackHandler handler =
|
|
|
(AbstractCallbackHandler) container.nmClientAsync
|
|
|
.getCallbackHandler();
|
|
|
try {
|
|
|
- if (!(event instanceof IncreaseContainerResourceEvent)) {
|
|
|
+ if (!(event instanceof UpdateContainerResourceEvent)) {
|
|
|
throw new AssertionError("Unexpected event type. Expecting:"
|
|
|
- + "IncreaseContainerResourceEvent. Got:" + event);
|
|
|
+ + "UpdateContainerResourceEvent. Got:" + event);
|
|
|
}
|
|
|
- IncreaseContainerResourceEvent increaseEvent =
|
|
|
- (IncreaseContainerResourceEvent) event;
|
|
|
- container.nmClientAsync.getClient().increaseContainerResource(
|
|
|
- increaseEvent.getContainer());
|
|
|
+ UpdateContainerResourceEvent updateEvent =
|
|
|
+ (UpdateContainerResourceEvent) event;
|
|
|
+ container.nmClientAsync.getClient().updateContainerResource(
|
|
|
+ updateEvent.getContainer());
|
|
|
+ isIncreaseEvent = updateEvent.isIncreaseEvent;
|
|
|
try {
|
|
|
- handler.onContainerResourceIncreased(
|
|
|
- increaseEvent.getContainerId(), increaseEvent.getContainer()
|
|
|
- .getResource());
|
|
|
+ //If isIncreaseEvent is set, set the appropriate callbacks
|
|
|
+ //for backward compatibility
|
|
|
+ if (isIncreaseEvent) {
|
|
|
+ handler.onContainerResourceIncreased(updateEvent.getContainerId(),
|
|
|
+ updateEvent.getContainer().getResource());
|
|
|
+ } else {
|
|
|
+ handler.onContainerResourceUpdated(updateEvent.getContainerId(),
|
|
|
+ updateEvent.getContainer().getResource());
|
|
|
+ }
|
|
|
} catch (Throwable thr) {
|
|
|
// Don't process user created unchecked exception
|
|
|
LOG.info("Unchecked exception is thrown from "
|
|
|
- + "onContainerResourceIncreased for Container "
|
|
|
+ + "onContainerResourceUpdated for Container "
|
|
|
+ event.getContainerId(), thr);
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
try {
|
|
|
- handler.onIncreaseContainerResourceError(event.getContainerId(), e);
|
|
|
+ if (isIncreaseEvent) {
|
|
|
+ handler
|
|
|
+ .onIncreaseContainerResourceError(event.getContainerId(), e);
|
|
|
+ } else {
|
|
|
+ handler.onUpdateContainerResourceError(event.getContainerId(), e);
|
|
|
+ }
|
|
|
} catch (Throwable thr) {
|
|
|
// Don't process user created unchecked exception
|
|
|
LOG.info("Unchecked exception is thrown from "
|
|
|
- + "onIncreaseContainerResourceError for Container "
|
|
|
+ + "onUpdateContainerResourceError for Container "
|
|
|
+ event.getContainerId(), thr);
|
|
|
}
|
|
|
}
|