|
@@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
|
|
@@ -1355,7 +1356,7 @@ public class TestLeafQueue {
|
|
// TODO, fix headroom in the future patch
|
|
// TODO, fix headroom in the future patch
|
|
assertEquals(1*GB, app_0.getHeadroom().getMemorySize());
|
|
assertEquals(1*GB, app_0.getHeadroom().getMemorySize());
|
|
// User limit = 2G, 2 in use
|
|
// User limit = 2G, 2 in use
|
|
- assertEquals(0*GB, app_1.getHeadroom().getMemorySize());
|
|
|
|
|
|
+ assertEquals(1*GB, app_1.getHeadroom().getMemorySize());
|
|
// the application is not yet active
|
|
// the application is not yet active
|
|
|
|
|
|
// Again one to user_0 since he hasn't exceeded user limit yet
|
|
// Again one to user_0 since he hasn't exceeded user limit yet
|
|
@@ -1366,8 +1367,8 @@ public class TestLeafQueue {
|
|
assertEquals(3*GB, a.getUsedResources().getMemorySize());
|
|
assertEquals(3*GB, a.getUsedResources().getMemorySize());
|
|
assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
|
|
assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
|
|
assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
|
|
assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
|
|
- assertEquals(1*GB, app_0.getHeadroom().getMemorySize()); // 4G - 3G
|
|
|
|
- assertEquals(1*GB, app_1.getHeadroom().getMemorySize()); // 4G - 3G
|
|
|
|
|
|
+ assertEquals(0*GB, app_0.getHeadroom().getMemorySize());
|
|
|
|
+ assertEquals(0*GB, app_1.getHeadroom().getMemorySize()); // 4G - 3G
|
|
|
|
|
|
// Submit requests for app_1 and set max-cap
|
|
// Submit requests for app_1 and set max-cap
|
|
a.setMaxCapacity(.1f);
|
|
a.setMaxCapacity(.1f);
|
|
@@ -3874,7 +3875,7 @@ public class TestLeafQueue {
|
|
|
|
|
|
// Queue "test" consumes 100% of the cluster, so its capacity and absolute
|
|
// Queue "test" consumes 100% of the cluster, so its capacity and absolute
|
|
// capacity are both 1.0f.
|
|
// capacity are both 1.0f.
|
|
- Queue queue = createQueue("test", null, 1.0f, 1.0f);
|
|
|
|
|
|
+ Queue queue = createQueue("test", null, 1.0f, 1.0f, res);
|
|
final String user = "user1";
|
|
final String user = "user1";
|
|
FiCaSchedulerApp app =
|
|
FiCaSchedulerApp app =
|
|
new FiCaSchedulerApp(appAttId, user, queue,
|
|
new FiCaSchedulerApp(appAttId, user, queue,
|
|
@@ -3891,7 +3892,8 @@ public class TestLeafQueue {
|
|
|
|
|
|
// Queue "test2" is a child of root and its capacity is 50% of root. As a
|
|
// Queue "test2" is a child of root and its capacity is 50% of root. As a
|
|
// child of root, its absolute capaicty is also 50%.
|
|
// child of root, its absolute capaicty is also 50%.
|
|
- queue = createQueue("test2", null, 0.5f, 0.5f);
|
|
|
|
|
|
+ queue = createQueue("test2", null, 0.5f, 0.5f,
|
|
|
|
+ Resources.divideAndCeil(dominantResourceCalculator, res, 2));
|
|
app = new FiCaSchedulerApp(appAttId, user, queue,
|
|
app = new FiCaSchedulerApp(appAttId, user, queue,
|
|
queue.getAbstractUsersManager(), rmContext);
|
|
queue.getAbstractUsersManager(), rmContext);
|
|
app.getAppAttemptResourceUsage().incUsed(requestedResource);
|
|
app.getAppAttemptResourceUsage().incUsed(requestedResource);
|
|
@@ -3903,7 +3905,8 @@ public class TestLeafQueue {
|
|
|
|
|
|
// Queue "test2.1" is 50% of queue "test2", which is 50% of the cluster.
|
|
// Queue "test2.1" is 50% of queue "test2", which is 50% of the cluster.
|
|
// Therefore, "test2.1" capacity is 50% and absolute capacity is 25%.
|
|
// Therefore, "test2.1" capacity is 50% and absolute capacity is 25%.
|
|
- AbstractCSQueue qChild = createQueue("test2.1", queue, 0.5f, 0.25f);
|
|
|
|
|
|
+ AbstractCSQueue qChild = createQueue("test2.1", queue, 0.5f, 0.25f,
|
|
|
|
+ Resources.divideAndCeil(dominantResourceCalculator, res, 4));
|
|
app = new FiCaSchedulerApp(appAttId, user, qChild,
|
|
app = new FiCaSchedulerApp(appAttId, user, qChild,
|
|
qChild.getAbstractUsersManager(), rmContext);
|
|
qChild.getAbstractUsersManager(), rmContext);
|
|
app.getAppAttemptResourceUsage().incUsed(requestedResource);
|
|
app.getAppAttemptResourceUsage().incUsed(requestedResource);
|
|
@@ -3922,7 +3925,7 @@ public class TestLeafQueue {
|
|
}
|
|
}
|
|
|
|
|
|
private AbstractCSQueue createQueue(String name, Queue parent, float capacity,
|
|
private AbstractCSQueue createQueue(String name, Queue parent, float capacity,
|
|
- float absCap) {
|
|
|
|
|
|
+ float absCap, Resource res) {
|
|
CSQueueMetrics metrics = CSQueueMetrics.forQueue(name, parent, false, cs.getConf());
|
|
CSQueueMetrics metrics = CSQueueMetrics.forQueue(name, parent, false, cs.getConf());
|
|
QueueInfo queueInfo = QueueInfo.newInstance(name, capacity, 1.0f, 0, null,
|
|
QueueInfo queueInfo = QueueInfo.newInstance(name, capacity, 1.0f, 0, null,
|
|
null, QueueState.RUNNING, null, "", null, false);
|
|
null, QueueState.RUNNING, null, "", null, false);
|
|
@@ -3934,6 +3937,13 @@ public class TestLeafQueue {
|
|
QueueCapacities qCaps = mock(QueueCapacities.class);
|
|
QueueCapacities qCaps = mock(QueueCapacities.class);
|
|
when(qCaps.getAbsoluteCapacity(any())).thenReturn(absCap);
|
|
when(qCaps.getAbsoluteCapacity(any())).thenReturn(absCap);
|
|
when(queue.getQueueCapacities()).thenReturn(qCaps);
|
|
when(queue.getQueueCapacities()).thenReturn(qCaps);
|
|
|
|
+ QueueResourceQuotas qQuotas = mock(QueueResourceQuotas.class);
|
|
|
|
+ when(qQuotas.getConfiguredMinResource(any())).thenReturn(res);
|
|
|
|
+ when(qQuotas.getConfiguredMaxResource(any())).thenReturn(res);
|
|
|
|
+ when(qQuotas.getEffectiveMinResource(any())).thenReturn(res);
|
|
|
|
+ when(qQuotas.getEffectiveMaxResource(any())).thenReturn(res);
|
|
|
|
+ when(queue.getQueueResourceQuotas()).thenReturn(qQuotas);
|
|
|
|
+ when(queue.getEffectiveCapacity(any())).thenReturn(res);
|
|
return queue;
|
|
return queue;
|
|
}
|
|
}
|
|
|
|
|