|
@@ -18,6 +18,7 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
|
|
|
|
|
|
|
+import java.io.IOException;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
|
@@ -30,6 +31,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
|
|
import org.junit.Assert;
|
|
import org.junit.Assert;
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
@@ -46,9 +48,23 @@ public class TestSchedulingWithAllocationRequestId
|
|
LoggerFactory.getLogger(TestSchedulingWithAllocationRequestId.class);
|
|
LoggerFactory.getLogger(TestSchedulingWithAllocationRequestId.class);
|
|
private static final int GB = 1024;
|
|
private static final int GB = 1024;
|
|
|
|
|
|
- @Test
|
|
|
|
|
|
+ public TestSchedulingWithAllocationRequestId(SchedulerType type) throws IOException {
|
|
|
|
+ super(type);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public YarnConfiguration getConf() {
|
|
|
|
+ YarnConfiguration conf = super.getConf();
|
|
|
|
+ if (getSchedulerType().equals(SchedulerType.FAIR)) {
|
|
|
|
+ // Some tests here rely on being able to assign multiple containers with
|
|
|
|
+ // a single heartbeat
|
|
|
|
+ conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true);
|
|
|
|
+ }
|
|
|
|
+ return conf;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test (timeout = 10000)
|
|
public void testMultipleAllocationRequestIds() throws Exception {
|
|
public void testMultipleAllocationRequestIds() throws Exception {
|
|
- configureScheduler();
|
|
|
|
YarnConfiguration conf = getConf();
|
|
YarnConfiguration conf = getConf();
|
|
MockRM rm = new MockRM(conf);
|
|
MockRM rm = new MockRM(conf);
|
|
try {
|
|
try {
|
|
@@ -63,32 +79,20 @@ public class TestSchedulingWithAllocationRequestId
|
|
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
|
|
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
|
|
am1.registerAppAttempt();
|
|
am1.registerAppAttempt();
|
|
|
|
|
|
- // add request for containers with id 10 & 20
|
|
|
|
- am1.addRequests(new String[] {"127.0.0.1" }, 2 * GB, 1, 1, 10L);
|
|
|
|
- AllocateResponse allocResponse = am1.schedule(); // send the request
|
|
|
|
- am1.addRequests(new String[] {"127.0.0.2" }, 2 * GB, 1, 2, 20L);
|
|
|
|
- allocResponse = am1.schedule(); // send the request
|
|
|
|
|
|
+ // send requests for containers with id 10 & 20
|
|
|
|
+ am1.allocate(am1.createReq(
|
|
|
|
+ new String[] {"127.0.0.1"}, 2 * GB, 1, 1, 10L), null);
|
|
|
|
+ am1.allocate(am1.createReq(
|
|
|
|
+ new String[] {"127.0.0.2"}, 2 * GB, 1, 2, 20L), null);
|
|
|
|
|
|
// check if request id 10 is satisfied
|
|
// check if request id 10 is satisfied
|
|
- nm1.nodeHeartbeat(true);
|
|
|
|
- allocResponse = am1.schedule(); // send the request
|
|
|
|
- while (allocResponse.getAllocatedContainers().size() < 1) {
|
|
|
|
- LOG.info("Waiting for containers to be created for app 1...");
|
|
|
|
- Thread.sleep(100);
|
|
|
|
- allocResponse = am1.schedule();
|
|
|
|
- }
|
|
|
|
|
|
+ AllocateResponse allocResponse = waitForAllocResponse(rm, am1, nm1, 1);
|
|
List<Container> allocated = allocResponse.getAllocatedContainers();
|
|
List<Container> allocated = allocResponse.getAllocatedContainers();
|
|
Assert.assertEquals(1, allocated.size());
|
|
Assert.assertEquals(1, allocated.size());
|
|
checkAllocatedContainer(allocated.get(0), 2 * GB, nm1.getNodeId(), 10);
|
|
checkAllocatedContainer(allocated.get(0), 2 * GB, nm1.getNodeId(), 10);
|
|
|
|
|
|
// check now if request id 20 is satisfied
|
|
// check now if request id 20 is satisfied
|
|
- nm2.nodeHeartbeat(true);
|
|
|
|
- while (allocResponse.getAllocatedContainers().size() < 2) {
|
|
|
|
- LOG.info("Waiting for containers to be created for app 1...");
|
|
|
|
- Thread.sleep(100);
|
|
|
|
- allocResponse = am1.schedule();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
|
|
+ allocResponse = waitForAllocResponse(rm, am1, nm2, 2);
|
|
allocated = allocResponse.getAllocatedContainers();
|
|
allocated = allocResponse.getAllocatedContainers();
|
|
Assert.assertEquals(2, allocated.size());
|
|
Assert.assertEquals(2, allocated.size());
|
|
for (Container container : allocated) {
|
|
for (Container container : allocated) {
|
|
@@ -101,9 +105,8 @@ public class TestSchedulingWithAllocationRequestId
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
|
|
+ @Test (timeout = 10000)
|
|
public void testMultipleAllocationRequestDiffPriority() throws Exception {
|
|
public void testMultipleAllocationRequestDiffPriority() throws Exception {
|
|
- configureScheduler();
|
|
|
|
YarnConfiguration conf = getConf();
|
|
YarnConfiguration conf = getConf();
|
|
MockRM rm = new MockRM(conf);
|
|
MockRM rm = new MockRM(conf);
|
|
try {
|
|
try {
|
|
@@ -118,20 +121,14 @@ public class TestSchedulingWithAllocationRequestId
|
|
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
|
|
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
|
|
am1.registerAppAttempt();
|
|
am1.registerAppAttempt();
|
|
|
|
|
|
- // add request for containers with id 10 & 20
|
|
|
|
- am1.addRequests(new String[] {"127.0.0.1" }, 2 * GB, 2, 1, 10L);
|
|
|
|
- AllocateResponse allocResponse = am1.schedule(); // send the request
|
|
|
|
- am1.addRequests(new String[] {"127.0.0.2" }, 2 * GB, 1, 2, 20L);
|
|
|
|
- allocResponse = am1.schedule(); // send the request
|
|
|
|
|
|
+ // send requests for containers with id 10 & 20
|
|
|
|
+ am1.allocate(am1.createReq(
|
|
|
|
+ new String[] {"127.0.0.1"}, 2 * GB, 2, 1, 10L), null);
|
|
|
|
+ am1.allocate(am1.createReq(
|
|
|
|
+ new String[] {"127.0.0.2"}, 2 * GB, 1, 2, 20L), null);
|
|
|
|
|
|
// check if request id 20 is satisfied first
|
|
// check if request id 20 is satisfied first
|
|
- nm2.nodeHeartbeat(true);
|
|
|
|
- while (allocResponse.getAllocatedContainers().size() < 2) {
|
|
|
|
- LOG.info("Waiting for containers to be created for app 1...");
|
|
|
|
- Thread.sleep(100);
|
|
|
|
- allocResponse = am1.schedule();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
|
|
+ AllocateResponse allocResponse = waitForAllocResponse(rm, am1, nm2, 2);
|
|
List<Container> allocated = allocResponse.getAllocatedContainers();
|
|
List<Container> allocated = allocResponse.getAllocatedContainers();
|
|
Assert.assertEquals(2, allocated.size());
|
|
Assert.assertEquals(2, allocated.size());
|
|
for (Container container : allocated) {
|
|
for (Container container : allocated) {
|
|
@@ -139,13 +136,7 @@ public class TestSchedulingWithAllocationRequestId
|
|
}
|
|
}
|
|
|
|
|
|
// check now if request id 10 is satisfied
|
|
// check now if request id 10 is satisfied
|
|
- nm1.nodeHeartbeat(true);
|
|
|
|
- allocResponse = am1.schedule(); // send the request
|
|
|
|
- while (allocResponse.getAllocatedContainers().size() < 1) {
|
|
|
|
- LOG.info("Waiting for containers to be created for app 1...");
|
|
|
|
- Thread.sleep(100);
|
|
|
|
- allocResponse = am1.schedule();
|
|
|
|
- }
|
|
|
|
|
|
+ allocResponse = waitForAllocResponse(rm, am1, nm1, 1);
|
|
allocated = allocResponse.getAllocatedContainers();
|
|
allocated = allocResponse.getAllocatedContainers();
|
|
Assert.assertEquals(1, allocated.size());
|
|
Assert.assertEquals(1, allocated.size());
|
|
checkAllocatedContainer(allocated.get(0), 2 * GB, nm1.getNodeId(), 10);
|
|
checkAllocatedContainer(allocated.get(0), 2 * GB, nm1.getNodeId(), 10);
|
|
@@ -164,9 +155,8 @@ public class TestSchedulingWithAllocationRequestId
|
|
allocated.getAllocationRequestId());
|
|
allocated.getAllocationRequestId());
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
|
|
+ @Test (timeout = 10000)
|
|
public void testMultipleAppsWithAllocationReqId() throws Exception {
|
|
public void testMultipleAppsWithAllocationReqId() throws Exception {
|
|
- configureScheduler();
|
|
|
|
YarnConfiguration conf = getConf();
|
|
YarnConfiguration conf = getConf();
|
|
MockRM rm = new MockRM(conf);
|
|
MockRM rm = new MockRM(conf);
|
|
try {
|
|
try {
|
|
@@ -190,19 +180,11 @@ public class TestSchedulingWithAllocationRequestId
|
|
|
|
|
|
// Submit app1 RR with allocationReqId = 5
|
|
// Submit app1 RR with allocationReqId = 5
|
|
int numContainers = 1;
|
|
int numContainers = 1;
|
|
- am1.addRequests(new String[] {host0, host1 }, 1 * GB, 1, numContainers,
|
|
|
|
- 5L);
|
|
|
|
- AllocateResponse allocResponse = am1.schedule();
|
|
|
|
-
|
|
|
|
- // wait for containers to be allocated.
|
|
|
|
- nm1.nodeHeartbeat(true);
|
|
|
|
- allocResponse = am1.schedule(); // send the request
|
|
|
|
- while (allocResponse.getAllocatedContainers().size() < 1) {
|
|
|
|
- LOG.info("Waiting for containers to be created for app 1...");
|
|
|
|
- Thread.sleep(100);
|
|
|
|
- allocResponse = am1.schedule();
|
|
|
|
- }
|
|
|
|
|
|
+ am1.allocate(am1.createReq(
|
|
|
|
+ new String[] {host0, host1}, 1 * GB, 1, numContainers, 5L), null);
|
|
|
|
|
|
|
|
+ // wait for container to be allocated.
|
|
|
|
+ AllocateResponse allocResponse = waitForAllocResponse(rm, am1, nm1, 1);
|
|
List<Container> allocated = allocResponse.getAllocatedContainers();
|
|
List<Container> allocated = allocResponse.getAllocatedContainers();
|
|
Assert.assertEquals(1, allocated.size());
|
|
Assert.assertEquals(1, allocated.size());
|
|
checkAllocatedContainer(allocated.get(0), 1 * GB, nm1.getNodeId(), 5L);
|
|
checkAllocatedContainer(allocated.get(0), 1 * GB, nm1.getNodeId(), 5L);
|
|
@@ -212,55 +194,31 @@ public class TestSchedulingWithAllocationRequestId
|
|
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
|
|
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
|
|
|
|
|
|
// Submit app2 RR with allocationReqId = 5
|
|
// Submit app2 RR with allocationReqId = 5
|
|
- am2.addRequests(new String[] {host0, host1 }, 2 * GB, 1, numContainers,
|
|
|
|
- 5L);
|
|
|
|
- am2.schedule();
|
|
|
|
-
|
|
|
|
- // wait for containers to be allocated.
|
|
|
|
- nm2.nodeHeartbeat(true);
|
|
|
|
- allocResponse = am2.schedule(); // send the request
|
|
|
|
- while (allocResponse.getAllocatedContainers().size() < 1) {
|
|
|
|
- LOG.info("Waiting for containers to be created for app 1...");
|
|
|
|
- Thread.sleep(100);
|
|
|
|
- allocResponse = am2.schedule();
|
|
|
|
- }
|
|
|
|
|
|
+ am2.allocate(am1.createReq(
|
|
|
|
+ new String[] {host0, host1}, 2 * GB, 1, numContainers, 5L), null);
|
|
|
|
|
|
|
|
+ // wait for container to be allocated.
|
|
|
|
+ allocResponse = waitForAllocResponse(rm, am2, nm2, 1);
|
|
allocated = allocResponse.getAllocatedContainers();
|
|
allocated = allocResponse.getAllocatedContainers();
|
|
Assert.assertEquals(1, allocated.size());
|
|
Assert.assertEquals(1, allocated.size());
|
|
checkAllocatedContainer(allocated.get(0), 2 * GB, nm2.getNodeId(), 5L);
|
|
checkAllocatedContainer(allocated.get(0), 2 * GB, nm2.getNodeId(), 5L);
|
|
|
|
|
|
// Now submit app2 RR with allocationReqId = 10
|
|
// Now submit app2 RR with allocationReqId = 10
|
|
- am2.addRequests(new String[] {host0, host1 }, 3 * GB, 1, numContainers,
|
|
|
|
- 10L);
|
|
|
|
- am2.schedule();
|
|
|
|
-
|
|
|
|
- // wait for containers to be allocated.
|
|
|
|
- nm1.nodeHeartbeat(true);
|
|
|
|
- allocResponse = am2.schedule(); // send the request
|
|
|
|
- while (allocResponse.getAllocatedContainers().size() < 1) {
|
|
|
|
- LOG.info("Waiting for containers to be created for app 1...");
|
|
|
|
- Thread.sleep(100);
|
|
|
|
- allocResponse = am2.schedule();
|
|
|
|
- }
|
|
|
|
|
|
+ am2.allocate(am1.createReq(
|
|
|
|
+ new String[] {host0, host1}, 3 * GB, 1, numContainers, 10L), null);
|
|
|
|
|
|
|
|
+ // wait for container to be allocated.
|
|
|
|
+ allocResponse = waitForAllocResponse(rm, am2, nm1, 1);
|
|
allocated = allocResponse.getAllocatedContainers();
|
|
allocated = allocResponse.getAllocatedContainers();
|
|
Assert.assertEquals(1, allocated.size());
|
|
Assert.assertEquals(1, allocated.size());
|
|
checkAllocatedContainer(allocated.get(0), 3 * GB, nm1.getNodeId(), 10L);
|
|
checkAllocatedContainer(allocated.get(0), 3 * GB, nm1.getNodeId(), 10L);
|
|
|
|
|
|
// Now submit app1 RR with allocationReqId = 10
|
|
// Now submit app1 RR with allocationReqId = 10
|
|
- am1.addRequests(new String[] {host0, host1 }, 4 * GB, 1, numContainers,
|
|
|
|
- 10L);
|
|
|
|
- am1.schedule();
|
|
|
|
-
|
|
|
|
- // wait for containers to be allocated.
|
|
|
|
- nm2.nodeHeartbeat(true);
|
|
|
|
- allocResponse = am1.schedule(); // send the request
|
|
|
|
- while (allocResponse.getAllocatedContainers().size() < 1) {
|
|
|
|
- LOG.info("Waiting for containers to be created for app 1...");
|
|
|
|
- Thread.sleep(100);
|
|
|
|
- allocResponse = am1.schedule();
|
|
|
|
- }
|
|
|
|
|
|
+ am1.allocate(am1.createReq(
|
|
|
|
+ new String[] {host0, host1}, 4 * GB, 1, numContainers, 10L), null);
|
|
|
|
|
|
|
|
+ // wait for container to be allocated.
|
|
|
|
+ allocResponse = waitForAllocResponse(rm, am1, nm2, 1);
|
|
allocated = allocResponse.getAllocatedContainers();
|
|
allocated = allocResponse.getAllocatedContainers();
|
|
Assert.assertEquals(1, allocated.size());
|
|
Assert.assertEquals(1, allocated.size());
|
|
checkAllocatedContainer(allocated.get(0), 4 * GB, nm2.getNodeId(), 10L);
|
|
checkAllocatedContainer(allocated.get(0), 4 * GB, nm2.getNodeId(), 10L);
|
|
@@ -271,4 +229,17 @@ public class TestSchedulingWithAllocationRequestId
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private AllocateResponse waitForAllocResponse(MockRM rm, MockAM am, MockNM nm,
|
|
|
|
+ int size) throws Exception {
|
|
|
|
+ AllocateResponse allocResponse = am.doHeartbeat();
|
|
|
|
+ while (allocResponse.getAllocatedContainers().size() < size) {
|
|
|
|
+ LOG.info("Waiting for containers to be created for app...");
|
|
|
|
+ nm.nodeHeartbeat(true);
|
|
|
|
+ ((AbstractYarnScheduler) rm.getResourceScheduler()).update();
|
|
|
|
+ Thread.sleep(100);
|
|
|
|
+ allocResponse = am.doHeartbeat();
|
|
|
|
+ }
|
|
|
|
+ return allocResponse;
|
|
|
|
+ }
|
|
|
|
+
|
|
}
|
|
}
|