|
@@ -65,8 +65,8 @@ public abstract class SchedulerNode {
|
|
ResourceUtilization.newInstance(0, 0, 0f);
|
|
ResourceUtilization.newInstance(0, 0, 0f);
|
|
|
|
|
|
/* set of containers that are allocated containers */
|
|
/* set of containers that are allocated containers */
|
|
- protected final Map<ContainerId, RMContainer> launchedContainers =
|
|
|
|
- new HashMap<ContainerId, RMContainer>();
|
|
|
|
|
|
+ private final Map<ContainerId, ContainerInfo> launchedContainers =
|
|
|
|
+ new HashMap<>();
|
|
|
|
|
|
private final RMNode rmNode;
|
|
private final RMNode rmNode;
|
|
private final String nodeName;
|
|
private final String nodeName;
|
|
@@ -148,12 +148,24 @@ public abstract class SchedulerNode {
|
|
* @param rmContainer
|
|
* @param rmContainer
|
|
* allocated container
|
|
* allocated container
|
|
*/
|
|
*/
|
|
- public synchronized void allocateContainer(RMContainer rmContainer) {
|
|
|
|
|
|
+ public void allocateContainer(RMContainer rmContainer) {
|
|
|
|
+ allocateContainer(rmContainer, false);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * The Scheduler has allocated containers on this node to the given
|
|
|
|
+ * application.
|
|
|
|
+ * @param rmContainer Allocated container
|
|
|
|
+ * @param launchedOnNode True if the container has been launched
|
|
|
|
+ */
|
|
|
|
+ private synchronized void allocateContainer(RMContainer rmContainer,
|
|
|
|
+ boolean launchedOnNode) {
|
|
Container container = rmContainer.getContainer();
|
|
Container container = rmContainer.getContainer();
|
|
deductAvailableResource(container.getResource());
|
|
deductAvailableResource(container.getResource());
|
|
++numContainers;
|
|
++numContainers;
|
|
|
|
|
|
- launchedContainers.put(container.getId(), rmContainer);
|
|
|
|
|
|
+ launchedContainers.put(container.getId(),
|
|
|
|
+ new ContainerInfo(rmContainer, launchedOnNode));
|
|
|
|
|
|
LOG.info("Assigned container " + container.getId() + " of capacity "
|
|
LOG.info("Assigned container " + container.getId() + " of capacity "
|
|
+ container.getResource() + " on host " + rmNode.getNodeAddress()
|
|
+ container.getResource() + " on host " + rmNode.getNodeAddress()
|
|
@@ -236,20 +248,25 @@ public abstract class SchedulerNode {
|
|
/**
|
|
/**
|
|
* Release an allocated container on this node.
|
|
* Release an allocated container on this node.
|
|
*
|
|
*
|
|
- * @param container
|
|
|
|
- * container to be released
|
|
|
|
|
|
+ * @param containerId ID of container to be released.
|
|
|
|
+ * @param releasedByNode whether the release originates from a node update.
|
|
*/
|
|
*/
|
|
- public synchronized void releaseContainer(Container container) {
|
|
|
|
- if (!isValidContainer(container.getId())) {
|
|
|
|
- LOG.error("Invalid container released " + container);
|
|
|
|
|
|
+ public synchronized void releaseContainer(ContainerId containerId,
|
|
|
|
+ boolean releasedByNode) {
|
|
|
|
+ ContainerInfo info = launchedContainers.get(containerId);
|
|
|
|
+ if (info == null) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
- /* remove the containers from the nodemanger */
|
|
|
|
- if (null != launchedContainers.remove(container.getId())) {
|
|
|
|
- updateResourceForReleasedContainer(container);
|
|
|
|
|
|
+ if (!releasedByNode && info.launchedOnNode) {
|
|
|
|
+ // wait until node reports container has completed
|
|
|
|
+ return;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ launchedContainers.remove(containerId);
|
|
|
|
+ Container container = info.container.getContainer();
|
|
|
|
+ updateResourceForReleasedContainer(container);
|
|
|
|
+
|
|
LOG.info("Released container " + container.getId() + " of capacity "
|
|
LOG.info("Released container " + container.getId() + " of capacity "
|
|
+ container.getResource() + " on host " + rmNode.getNodeAddress()
|
|
+ container.getResource() + " on host " + rmNode.getNodeAddress()
|
|
+ ", which currently has " + numContainers + " containers, "
|
|
+ ", which currently has " + numContainers + " containers, "
|
|
@@ -257,6 +274,17 @@ public abstract class SchedulerNode {
|
|
+ " available" + ", release resources=" + true);
|
|
+ " available" + ", release resources=" + true);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Inform the node that a container has launched.
|
|
|
|
+ * @param containerId ID of the launched container
|
|
|
|
+ */
|
|
|
|
+ public synchronized void containerStarted(ContainerId containerId) {
|
|
|
|
+ ContainerInfo info = launchedContainers.get(containerId);
|
|
|
|
+ if (info != null) {
|
|
|
|
+ info.launchedOnNode = true;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
private synchronized void addAvailableResource(Resource resource) {
|
|
private synchronized void addAvailableResource(Resource resource) {
|
|
if (resource == null) {
|
|
if (resource == null) {
|
|
LOG.error("Invalid resource addition of null resource for "
|
|
LOG.error("Invalid resource addition of null resource for "
|
|
@@ -305,7 +333,25 @@ public abstract class SchedulerNode {
|
|
}
|
|
}
|
|
|
|
|
|
public synchronized List<RMContainer> getCopiedListOfRunningContainers() {
|
|
public synchronized List<RMContainer> getCopiedListOfRunningContainers() {
|
|
- return new ArrayList<RMContainer>(launchedContainers.values());
|
|
|
|
|
|
+ List<RMContainer> result = new ArrayList<>(launchedContainers.size());
|
|
|
|
+ for (ContainerInfo info : launchedContainers.values()) {
|
|
|
|
+ result.add(info.container);
|
|
|
|
+ }
|
|
|
|
+ return result;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Get the container for the specified container ID.
|
|
|
|
+ * @param containerId The container ID
|
|
|
|
+ * @return The container for the specified container ID
|
|
|
|
+ */
|
|
|
|
+ protected synchronized RMContainer getContainer(ContainerId containerId) {
|
|
|
|
+ RMContainer container = null;
|
|
|
|
+ ContainerInfo info = launchedContainers.get(containerId);
|
|
|
|
+ if (info != null) {
|
|
|
|
+ container = info.container;
|
|
|
|
+ }
|
|
|
|
+ return container;
|
|
}
|
|
}
|
|
|
|
|
|
public synchronized RMContainer getReservedContainer() {
|
|
public synchronized RMContainer getReservedContainer() {
|
|
@@ -321,7 +367,7 @@ public abstract class SchedulerNode {
|
|
if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
|
|
if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
- allocateContainer(rmContainer);
|
|
|
|
|
|
+ allocateContainer(rmContainer, true);
|
|
}
|
|
}
|
|
|
|
|
|
public Set<String> getLabels() {
|
|
public Set<String> getLabels() {
|
|
@@ -377,4 +423,15 @@ public abstract class SchedulerNode {
|
|
public ResourceUtilization getNodeUtilization() {
|
|
public ResourceUtilization getNodeUtilization() {
|
|
return this.nodeUtilization;
|
|
return this.nodeUtilization;
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ private static class ContainerInfo {
|
|
|
|
+ private final RMContainer container;
|
|
|
|
+ private boolean launchedOnNode;
|
|
|
|
+
|
|
|
|
+ public ContainerInfo(RMContainer container, boolean launchedOnNode) {
|
|
|
|
+ this.container = container;
|
|
|
|
+ this.launchedOnNode = launchedOnNode;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|