|
@@ -93,6 +93,8 @@ import org.apache.hadoop.yarn.api.records.URL;
|
|
|
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
|
|
|
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
|
|
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
|
|
|
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
|
|
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
|
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
|
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
|
@@ -247,6 +249,8 @@ public class ApplicationMaster {
|
|
|
// Execution type of the containers.
|
|
|
// Default GUARANTEED.
|
|
|
private ExecutionType containerType = ExecutionType.GUARANTEED;
|
|
|
+ // Whether to automatically promote opportunistic containers.
|
|
|
+ private boolean autoPromoteContainers = false;
|
|
|
|
|
|
// Resource profile for the container
|
|
|
private String containerResourceProfile = "";
|
|
@@ -420,6 +424,9 @@ public class ApplicationMaster {
|
|
|
"Environment for shell script. Specified as env_key=env_val pairs");
|
|
|
opts.addOption("container_type", true,
|
|
|
"Container execution type, GUARANTEED or OPPORTUNISTIC");
|
|
|
+ opts.addOption("promote_opportunistic_after_start", false,
|
|
|
+ "Flag to indicate whether to automatically promote opportunistic"
|
|
|
+ + " containers to guaranteed.");
|
|
|
opts.addOption("container_memory", true,
|
|
|
"Amount of memory in MB to be requested to run the shell command");
|
|
|
opts.addOption("container_vcores", true,
|
|
@@ -576,6 +583,9 @@ public class ApplicationMaster {
|
|
|
}
|
|
|
containerType = ExecutionType.valueOf(containerTypeStr);
|
|
|
}
|
|
|
+ if (cliParser.hasOption("promote_opportunistic_after_start")) {
|
|
|
+ autoPromoteContainers = true;
|
|
|
+ }
|
|
|
containerMemory = Integer.parseInt(cliParser.getOptionValue(
|
|
|
"container_memory", "-1"));
|
|
|
containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
|
|
@@ -977,7 +987,24 @@ public class ApplicationMaster {
|
|
|
|
|
|
@Override
|
|
|
public void onContainersUpdated(
|
|
|
- List<UpdatedContainer> containers) {}
|
|
|
+ List<UpdatedContainer> containers) {
|
|
|
+ for (UpdatedContainer container : containers) {
|
|
|
+ LOG.info("Container {} updated, updateType={}, resource={}, "
|
|
|
+ + "execType={}",
|
|
|
+ container.getContainer().getId(),
|
|
|
+ container.getUpdateType().toString(),
|
|
|
+ container.getContainer().getResource().toString(),
|
|
|
+ container.getContainer().getExecutionType());
|
|
|
+
|
|
|
+ // TODO Remove this line with finalized updateContainer API.
|
|
|
+ // Currently nm client needs to notify the NM to update container
|
|
|
+ // execution type via NMClient#updateContainerResource() or
|
|
|
+ // NMClientAsync#updateContainerResourceAsync() when
|
|
|
+ // auto-update.containers is disabled, but this API is
|
|
|
+ // under evolving and will need to be replaced by a proper new API.
|
|
|
+ nmClientAsync.updateContainerResourceAsync(container.getContainer());
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
@Override
|
|
|
public void onShutdownRequest() {
|
|
@@ -1004,7 +1031,7 @@ public class ApplicationMaster {
|
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|
|
|
- static class NMCallbackHandler extends NMClientAsync.AbstractCallbackHandler {
|
|
|
+ class NMCallbackHandler extends NMClientAsync.AbstractCallbackHandler {
|
|
|
|
|
|
private ConcurrentMap<ContainerId, Container> containers =
|
|
|
new ConcurrentHashMap<ContainerId, Container>();
|
|
@@ -1033,6 +1060,24 @@ public class ApplicationMaster {
|
|
|
LOG.debug("Container Status: id=" + containerId + ", status=" +
|
|
|
containerStatus);
|
|
|
}
|
|
|
+
|
|
|
+ // If promote_opportunistic_after_start is set, automatically promote
|
|
|
+ // opportunistic containers to guaranteed.
|
|
|
+ if (autoPromoteContainers) {
|
|
|
+ if (containerStatus.getState() == ContainerState.RUNNING) {
|
|
|
+ Container container = containers.get(containerId);
|
|
|
+ if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
|
|
|
+ // Promote container
|
|
|
+ LOG.info("Promoting container {} to {}", container.getId(),
|
|
|
+ container.getExecutionType());
|
|
|
+ UpdateContainerRequest updateRequest = UpdateContainerRequest
|
|
|
+ .newInstance(container.getVersion(), container.getId(),
|
|
|
+ ContainerUpdateType.PROMOTE_EXECUTION_TYPE, null,
|
|
|
+ ExecutionType.GUARANTEED);
|
|
|
+ amRMClient.requestContainerUpdate(container, updateRequest);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|