|
@@ -37,10 +37,12 @@ import org.apache.hadoop.test.GenericTestUtils;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
+import org.apache.hadoop.yarn.api.records.Container;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
|
@@ -62,6 +64,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueu
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
|
|
+import org.apache.hadoop.yarn.util.ControlledClock;
|
|
|
+import org.apache.hadoop.yarn.util.SystemClock;
|
|
|
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
|
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
|
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
|
@@ -479,6 +483,7 @@ public class TestWorkPreservingRMRestart {
|
|
|
@Test(timeout = 20000)
|
|
|
public void testAMfailedBetweenRMRestart() throws Exception {
|
|
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
+ conf.setLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, 0);
|
|
|
memStore.init(conf);
|
|
|
rm1 = new MockRM(conf, memStore);
|
|
|
rm1.start();
|
|
@@ -762,4 +767,55 @@ public class TestWorkPreservingRMRestart {
|
|
|
Thread.sleep(200);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ @Test (timeout = 20000) // new containers must not be handed out before the recovery scheduling wait has passed
|
|
|
+ public void testNewContainersNotAllocatedDuringSchedulerRecovery()
|
|
|
+ throws Exception {
|
|
|
+ conf.setLong(
|
|
|
+ YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, 4000);
|
|
|
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
+ memStore.init(conf);
|
|
|
+ rm1 = new MockRM(conf, memStore);
|
|
|
+ rm1.start();
|
|
|
+ MockNM nm1 =
|
|
|
+ new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
|
|
+ nm1.registerNode();
|
|
|
+ RMApp app1 = rm1.submitApp(200);
|
|
|
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
|
|
+
|
|
|
+ // Restart the RM on the same state store; the NM and the AM are then pointed at rm2 and re-register.
|
|
|
+ rm2 = new MockRM(conf, memStore);
|
|
|
+ rm2.start();
|
|
|
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
|
|
+ nm1.registerNode();
|
|
|
+ ControlledClock clock = new ControlledClock(new SystemClock());
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
+ ((RMContextImpl)rm2.getRMContext()).setSystemClock(clock);
|
|
|
+ am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
|
|
|
+ am1.registerAppAttempt(true);
|
|
|
+ rm2.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
|
|
|
+
|
|
|
+ // The AM requests new containers from rm2 (arguments presumably host, memory, count - confirm against MockAM).
|
|
|
+ am1.allocate("127.0.0.1", 1000, 1, new ArrayList<ContainerId>());
|
|
|
+
|
|
|
+ List<Container> containers = new ArrayList<Container>();
|
|
|
+ clock.setTime(startTime + 2000);
|
|
|
+ nm1.nodeHeartbeat(true);
|
|
|
+
|
|
|
+ // Sleep for a while, since allocation happens asynchronously.
|
|
|
+ Thread.sleep(3000);
|
|
|
+ containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
|
|
|
+ new ArrayList<ContainerId>()).getAllocatedContainers());
|
|
|
+ // The controlled clock is only 2000 ms past startTime, short of the 4000 ms wait set above, so nothing may be allocated yet.
|
|
|
+ Assert.assertTrue(containers.isEmpty());
|
|
|
+
|
|
|
+ clock.setTime(startTime + 8000);
|
|
|
+ nm1.nodeHeartbeat(true);
|
|
|
+ // With the clock moved to startTime + 8000, poll until a container is returned; only the 20 s test timeout bounds this loop.
|
|
|
+ while (containers.isEmpty()) {
|
|
|
+ containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
|
|
|
+ new ArrayList<ContainerId>()).getAllocatedContainers());
|
|
|
+ Thread.sleep(500);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|