|
@@ -30,9 +30,11 @@ import java.nio.ByteBuffer;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
+import java.util.HashSet;
|
|
|
import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
import java.util.concurrent.ConcurrentMap;
|
|
|
import java.util.concurrent.CyclicBarrier;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
@@ -866,18 +868,57 @@ public class TestNodeStatusUpdater {
|
|
|
public ContainerState getCurrentState() {
|
|
|
return ContainerState.COMPLETE;
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState getContainerState() {
|
|
|
+ return org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.DONE;
|
|
|
+ }
|
|
|
};
|
|
|
|
|
|
+ ContainerId runningContainerId =
|
|
|
+ ContainerId.newInstance(appAttemptId, 3);
|
|
|
+ Token runningContainerToken =
|
|
|
+ BuilderUtils.newContainerToken(runningContainerId, "anyHost",
|
|
|
+ 1234, "anyUser", BuilderUtils.newResource(1024, 1), 0, 123,
|
|
|
+ "password".getBytes(), 0);
|
|
|
+ Container runningContainer =
|
|
|
+ new ContainerImpl(conf, null, null, null, null, null,
|
|
|
+ BuilderUtils.newContainerTokenIdentifier(runningContainerToken)) {
|
|
|
+ @Override
|
|
|
+ public ContainerState getCurrentState() {
|
|
|
+ return ContainerState.RUNNING;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState getContainerState() {
|
|
|
+ return org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.RUNNING;
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
nm.getNMContext().getApplications().putIfAbsent(appId,
|
|
|
mock(Application.class));
|
|
|
nm.getNMContext().getContainers().put(cId, anyCompletedContainer);
|
|
|
- Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
|
|
|
+ nm.getNMContext().getContainers()
|
|
|
+ .put(runningContainerId, runningContainer);
|
|
|
+
|
|
|
+ Assert.assertEquals(2, nodeStatusUpdater.getContainerStatuses().size());
|
|
|
|
|
|
List<ContainerId> ackedContainers = new ArrayList<ContainerId>();
|
|
|
ackedContainers.add(cId);
|
|
|
+ ackedContainers.add(runningContainerId);
|
|
|
|
|
|
- nodeStatusUpdater.removeCompletedContainersFromContext(ackedContainers);
|
|
|
- Assert.assertTrue(nodeStatusUpdater.getContainerStatuses().isEmpty());
|
|
|
+ nodeStatusUpdater.removeOrTrackCompletedContainersFromContext(ackedContainers);
|
|
|
+
|
|
|
+ Set<ContainerId> containerIdSet = new HashSet<ContainerId>();
|
|
|
+ for (ContainerStatus status : nodeStatusUpdater.getContainerStatuses()) {
|
|
|
+ containerIdSet.add(status.getContainerId());
|
|
|
+ }
|
|
|
+
|
|
|
+ Assert.assertTrue(nodeStatusUpdater.getContainerStatuses().size() == 1);
|
|
|
+ // completed container is removed;
|
|
|
+ Assert.assertFalse(containerIdSet.contains(anyCompletedContainer));
|
|
|
+ // running container is not removed;
|
|
|
+ Assert.assertTrue(containerIdSet.contains(runningContainerId));
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -1467,6 +1508,13 @@ public class TestNodeStatusUpdater {
|
|
|
when(container.getCurrentState()).thenReturn(containerStatus.getState());
|
|
|
when(container.getContainerId()).thenReturn(
|
|
|
containerStatus.getContainerId());
|
|
|
+ if (containerStatus.getState().equals(ContainerState.COMPLETE)) {
|
|
|
+ when(container.getContainerState())
|
|
|
+ .thenReturn(org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.DONE);
|
|
|
+ } else if (containerStatus.getState().equals(ContainerState.RUNNING)) {
|
|
|
+ when(container.getContainerState())
|
|
|
+ .thenReturn(org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.RUNNING);
|
|
|
+ }
|
|
|
return container;
|
|
|
}
|
|
|
|