|
@@ -22,14 +22,20 @@ import static org.mockito.Mockito.mock;
|
|
import static org.mockito.Mockito.when;
|
|
import static org.mockito.Mockito.when;
|
|
|
|
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
|
|
+import java.util.ArrayList;
|
|
import java.util.Arrays;
|
|
import java.util.Arrays;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
|
|
+import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
|
|
|
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
|
|
|
+import org.apache.hadoop.yarn.api.records.Container;
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
|
+import org.apache.hadoop.yarn.api.records.ContainerState;
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
|
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
|
|
|
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
|
@@ -40,6 +46,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
|
@@ -403,6 +410,89 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Test(timeout = 60000) // Checks that a container request is served again after its ALLOCATED container is killed when the NM re-registers.
|
|
|
|
+ public void testResourceRequestRestoreWhenRMContainerIsAtAllocated()
|
|
|
|
+ throws Exception {
|
|
|
|
+ configureScheduler();
|
|
|
|
+ YarnConfiguration conf = getConf();
|
|
|
|
+ MockRM rm1 = new MockRM(conf);
|
|
|
|
+ try {
|
|
|
|
+ rm1.start();
|
|
|
|
+ RMApp app1 =
|
|
|
|
+ rm1.submitApp(200, "name", "user",
|
|
|
|
+ new HashMap<ApplicationAccessType, String>(), false, "default",
|
|
|
|
+ -1, null, "Test", false, true);
|
|
|
|
+ MockNM nm1 =
|
|
|
|
+ new MockNM("127.0.0.1:1234", 10240, rm1.getResourceTrackerService());
|
|
|
|
+ nm1.registerNode();
|
|
|
|
+
|
|
|
|
+ MockNM nm2 =
|
|
|
|
+ new MockNM("127.0.0.1:2351", 10240, rm1.getResourceTrackerService());
|
|
|
|
+ nm2.registerNode();
|
|
|
|
+
|
|
|
|
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
|
|
|
+
|
|
|
|
+ int NUM_CONTAINERS = 1; // container count passed to each am1.allocate(host, ...) request below
|
|
|
|
+ // Request NUM_CONTAINERS containers through the AM, then heartbeat nm1.
|
|
|
|
+ am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS,
|
|
|
|
+ new ArrayList<ContainerId>());
|
|
|
|
+ nm1.nodeHeartbeat(true);
|
|
|
|
+
|
|
|
|
+ // Poll with empty request lists, heartbeating nm1, until NUM_CONTAINERS containers were handed to the AM.
|
|
|
|
+ List<Container> containers =
|
|
|
|
+ am1.allocate(new ArrayList<ResourceRequest>(),
|
|
|
|
+ new ArrayList<ContainerId>()).getAllocatedContainers();
|
|
|
|
+ while (containers.size() != NUM_CONTAINERS) { // NOTE(review): no bound of its own; relies on the @Test timeout
|
|
|
|
+ nm1.nodeHeartbeat(true);
|
|
|
|
+ containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
|
|
|
|
+ new ArrayList<ContainerId>()).getAllocatedContainers());
|
|
|
|
+ Thread.sleep(200);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Report container id 2 as RUNNING through nm1 and wait for the RM to show it RUNNING (presumably the container allocated above; id 1 being the AM - confirm).
|
|
|
|
+ nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2,
|
|
|
|
+ ContainerState.RUNNING);
|
|
|
|
+ ContainerId containerId2 =
|
|
|
|
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
|
|
|
|
+ rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
|
|
|
|
+
|
|
|
|
+ // Request one more container and heartbeat nm2; container id 3 is expected to reach ALLOCATED on nm2 and is never reported as RUNNING.
|
|
|
|
+ am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS,
|
|
|
|
+ new ArrayList<ContainerId>());
|
|
|
|
+ nm2.nodeHeartbeat(true);
|
|
|
|
+ ContainerId containerId3 =
|
|
|
|
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
|
|
|
|
+ rm1.waitForContainerAllocated(nm2, containerId3);
|
|
|
|
+ rm1.waitForState(nm2, containerId3, RMContainerState.ALLOCATED);
|
|
|
|
+
|
|
|
|
+ // Simulate a NodeManager restart by registering nm2 with the RM a second time.
|
|
|
|
+ nm2.registerNode();
|
|
|
|
+
|
|
|
|
+ // The re-registration is expected to kill nm2's containers, so container 3 should move to KILLED.
|
|
|
|
+ rm1.waitForState(nm2, containerId3, RMContainerState.KILLED);
|
|
|
|
+
|
|
|
|
+ // The request behind the killed RMContainer should be restored, so without
|
|
|
|
+ // any new ask the AM should still receive a container on later nm2 heartbeats.
|
|
|
|
+ containers =
|
|
|
|
+ am1.allocate(new ArrayList<ResourceRequest>(),
|
|
|
|
+ new ArrayList<ContainerId>()).getAllocatedContainers();
|
|
|
|
+ while (containers.size() != NUM_CONTAINERS) { // NOTE(review): no bound of its own; relies on the @Test timeout
|
|
|
|
+ nm2.nodeHeartbeat(true);
|
|
|
|
+ containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
|
|
|
|
+ new ArrayList<ContainerId>()).getAllocatedContainers());
|
|
|
|
+ Thread.sleep(200);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ nm2.nodeHeartbeat(am1.getApplicationAttemptId(), 4, // the re-allocated container is expected to carry id 4
|
|
|
|
+ ContainerState.RUNNING);
|
|
|
|
+ ContainerId containerId4 =
|
|
|
|
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), 4);
|
|
|
|
+ rm1.waitForState(nm2, containerId4, RMContainerState.RUNNING);
|
|
|
|
+ } finally {
|
|
|
|
+ rm1.stop();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
private void verifyMaximumResourceCapability(
|
|
private void verifyMaximumResourceCapability(
|
|
Resource expectedMaximumResource, AbstractYarnScheduler scheduler) {
|
|
Resource expectedMaximumResource, AbstractYarnScheduler scheduler) {
|
|
|
|
|