|
@@ -100,6 +100,12 @@ public class TestCapacitySchedulerNodeLabelUpdate {
|
|
|
checkAMUsedResource(rm, queueName, memory, RMNodeLabelsManager.NO_LABEL);
|
|
|
}
|
|
|
|
|
|
+ private void checkUsedCapacity(MockRM rm, String queueName, int capacity,
|
|
|
+ int total) {
|
|
|
+ checkUsedCapacity(rm, queueName, capacity, total,
|
|
|
+ RMNodeLabelsManager.NO_LABEL);
|
|
|
+ }
|
|
|
+
|
|
|
private void checkUsedResource(MockRM rm, String queueName, int memory,
|
|
|
String label) {
|
|
|
CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
|
|
@@ -108,6 +114,15 @@ public class TestCapacitySchedulerNodeLabelUpdate {
|
|
|
.getMemory());
|
|
|
}
|
|
|
|
|
|
+ private void checkUsedCapacity(MockRM rm, String queueName, int capacity,
|
|
|
+ int total, String label) {
|
|
|
+ float epsillon = 0.0001f;
|
|
|
+ CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
|
|
|
+ CSQueue queue = scheduler.getQueue(queueName);
|
|
|
+ Assert.assertEquals((float)capacity/total,
|
|
|
+ queue.getQueueCapacities().getUsedCapacity(label), epsillon);
|
|
|
+ }
|
|
|
+
|
|
|
private void checkAMUsedResource(MockRM rm, String queueName, int memory,
|
|
|
String label) {
|
|
|
CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
|
|
@@ -188,7 +203,7 @@ public class TestCapacitySchedulerNodeLabelUpdate {
|
|
|
rm.stop();
|
|
|
}
|
|
|
|
|
|
- @Test (timeout = 60000)
|
|
|
+ @Test
|
|
|
public void testResourceUsageWhenNodeUpdatesPartition()
|
|
|
throws Exception {
|
|
|
// set node -> label
|
|
@@ -233,16 +248,23 @@ public class TestCapacitySchedulerNodeLabelUpdate {
|
|
|
// queue-a used x=1G, ""=1G
|
|
|
checkUsedResource(rm, "a", 1024, "x");
|
|
|
checkUsedResource(rm, "a", 1024);
|
|
|
+ checkUsedCapacity(rm, "a", 1024, 8000, "x");
|
|
|
+ checkUsedCapacity(rm, "a", 1024, 8000);
|
|
|
|
|
|
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
|
|
FiCaSchedulerApp app = cs.getApplicationAttempt(am1.getApplicationAttemptId());
|
|
|
|
|
|
// change h1's label to z
|
|
|
+ mgr.replaceLabelsOnNode(ImmutableMap.of(nm1.getNodeId(), toSet("z")));
|
|
|
cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(),
|
|
|
toSet("z"))));
|
|
|
+ Thread.sleep(100);
|
|
|
checkUsedResource(rm, "a", 0, "x");
|
|
|
checkUsedResource(rm, "a", 1024, "z");
|
|
|
checkUsedResource(rm, "a", 1024);
|
|
|
+ checkUsedCapacity(rm, "a", 0, 8000, "x");
|
|
|
+ checkUsedCapacity(rm, "a", 1024, 8000, "z");
|
|
|
+ checkUsedCapacity(rm, "a", 1024, 8000);
|
|
|
checkUsedResource(rm, "root", 0, "x");
|
|
|
checkUsedResource(rm, "root", 1024, "z");
|
|
|
checkUsedResource(rm, "root", 1024);
|
|
@@ -254,12 +276,18 @@ public class TestCapacitySchedulerNodeLabelUpdate {
|
|
|
app.getAppAttemptResourceUsage().getUsed("z").getMemory());
|
|
|
|
|
|
// change h1's label to y
|
|
|
+ mgr.replaceLabelsOnNode(ImmutableMap.of(nm1.getNodeId(), toSet("y")));
|
|
|
cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(),
|
|
|
toSet("y"))));
|
|
|
+ Thread.sleep(100);
|
|
|
checkUsedResource(rm, "a", 0, "x");
|
|
|
checkUsedResource(rm, "a", 1024, "y");
|
|
|
checkUsedResource(rm, "a", 0, "z");
|
|
|
checkUsedResource(rm, "a", 1024);
|
|
|
+ checkUsedCapacity(rm, "a", 0, 8000, "x");
|
|
|
+ checkUsedCapacity(rm, "a", 1024, 16000, "y");
|
|
|
+ checkUsedCapacity(rm, "a", 0, 8000, "z");
|
|
|
+ checkUsedCapacity(rm, "a", 1024, 8000);
|
|
|
checkUsedResource(rm, "root", 0, "x");
|
|
|
checkUsedResource(rm, "root", 1024, "y");
|
|
|
checkUsedResource(rm, "root", 0, "z");
|
|
@@ -278,11 +306,17 @@ public class TestCapacitySchedulerNodeLabelUpdate {
|
|
|
Set<String> emptyLabels = new HashSet<>();
|
|
|
Map<NodeId,Set<String>> map = ImmutableMap.of(nm1.getNodeId(),
|
|
|
emptyLabels);
|
|
|
+ mgr.replaceLabelsOnNode(map);
|
|
|
cs.handle(new NodeLabelsUpdateSchedulerEvent(map));
|
|
|
+ Thread.sleep(100);
|
|
|
checkUsedResource(rm, "a", 0, "x");
|
|
|
checkUsedResource(rm, "a", 0, "y");
|
|
|
checkUsedResource(rm, "a", 0, "z");
|
|
|
checkUsedResource(rm, "a", 2048);
|
|
|
+ checkUsedCapacity(rm, "a", 0, 8000, "x");
|
|
|
+ checkUsedCapacity(rm, "a", 0, 8000, "y");
|
|
|
+ checkUsedCapacity(rm, "a", 0, 8000, "z");
|
|
|
+ checkUsedCapacity(rm, "a", 2048, 16000);
|
|
|
checkUsedResource(rm, "root", 0, "x");
|
|
|
checkUsedResource(rm, "root", 0, "y");
|
|
|
checkUsedResource(rm, "root", 0, "z");
|
|
@@ -314,6 +348,10 @@ public class TestCapacitySchedulerNodeLabelUpdate {
|
|
|
checkUsedResource(rm, "a", 0, "y");
|
|
|
checkUsedResource(rm, "a", 0, "z");
|
|
|
checkUsedResource(rm, "a", 0);
|
|
|
+ checkUsedCapacity(rm, "a", 0, 8000, "x");
|
|
|
+ checkUsedCapacity(rm, "a", 0, 8000, "y");
|
|
|
+ checkUsedCapacity(rm, "a", 0, 8000, "z");
|
|
|
+ checkUsedCapacity(rm, "a", 0, 16000);
|
|
|
checkUsedResource(rm, "root", 0, "x");
|
|
|
checkUsedResource(rm, "root", 0, "y");
|
|
|
checkUsedResource(rm, "root", 0, "z");
|