|
@@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
|
@@ -67,7 +68,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQu
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
|
|
import org.apache.hadoop.yarn.util.Clock;
|
|
|
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
|
|
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
|
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
|
|
+import org.apache.hadoop.yarn.util.resource.Resources;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Rule;
|
|
|
import org.junit.Test;
|
|
@@ -382,7 +385,7 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|
|
// we verify both that C has priority on B and D (has it has >0 guarantees)
|
|
|
// and that B and D are force to share their over capacity fairly (as they
|
|
|
// are both zero-guarantees) hence D sees some of its containers preempted
|
|
|
- verify(mDisp, times(14)).handle(argThat(new IsPreemptionRequestFor(appC)));
|
|
|
+ verify(mDisp, times(15)).handle(argThat(new IsPreemptionRequestFor(appC)));
|
|
|
}
|
|
|
|
|
|
|
|
@@ -407,8 +410,8 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|
|
|
|
|
// XXX note: compensating for rounding error in Resources.multiplyTo
|
|
|
// which is likely triggered since we use small numbers for readability
|
|
|
- verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA)));
|
|
|
- verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appE)));
|
|
|
+ verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
|
|
|
+ verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appE)));
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -571,7 +574,35 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|
|
verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA)));
|
|
|
setAMContainer = false;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testPreemptionWithVCoreResource() {
|
|
|
+ int[][] qData = new int[][] {
|
|
|
+ // / A B
|
|
|
+ { 100, 100, 100 }, // maxcap
|
|
|
+ { 5, 1, 1 }, // apps
|
|
|
+ { 2, 0, 0 }, // subqueues
|
|
|
+ };
|
|
|
+
|
|
|
+ // Resources can be set like memory:vcores
|
|
|
+ String[][] resData = new String[][] {
|
|
|
+ // / A B
|
|
|
+ { "100:100", "50:50", "50:50" },// abs
|
|
|
+ { "10:100", "10:100", "0" }, // used
|
|
|
+ { "70:20", "70:20", "10:100" }, // pending
|
|
|
+ { "0", "0", "0" }, // reserved
|
|
|
+ { "-1", "1:10", "1:10" }, // req granularity
|
|
|
+ };
|
|
|
+
|
|
|
+ // Passing last param as TRUE to use DominantResourceCalculator
|
|
|
+ ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData, resData,
|
|
|
+ true);
|
|
|
+ policy.editSchedule();
|
|
|
+
|
|
|
+ // 5 containers will be preempted here
|
|
|
+ verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appA)));
|
|
|
+ }
|
|
|
+
|
|
|
static class IsPreemptionRequestFor
|
|
|
extends ArgumentMatcher<ContainerPreemptEvent> {
|
|
|
private final ApplicationAttemptId appAttId;
|
|
@@ -598,37 +629,103 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|
|
ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) {
|
|
|
ProportionalCapacityPreemptionPolicy policy =
|
|
|
new ProportionalCapacityPreemptionPolicy(conf, mDisp, mCS, mClock);
|
|
|
+ Resource clusterResources =
|
|
|
+ Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0);
|
|
|
ParentQueue mRoot = buildMockRootQueue(rand, qData);
|
|
|
when(mCS.getRootQueue()).thenReturn(mRoot);
|
|
|
|
|
|
- Resource clusterResources =
|
|
|
- Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0);
|
|
|
when(mCS.getClusterResource()).thenReturn(clusterResources);
|
|
|
return policy;
|
|
|
}
|
|
|
|
|
|
+ ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData,
|
|
|
+ String[][] resData) {
|
|
|
+ return buildPolicy(qData, resData, false);
|
|
|
+ }
|
|
|
+
|
|
|
+ ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData,
|
|
|
+ String[][] resData, boolean useDominantResourceCalculator) {
|
|
|
+ if (useDominantResourceCalculator) {
|
|
|
+ when(mCS.getResourceCalculator()).thenReturn(
|
|
|
+ new DominantResourceCalculator());
|
|
|
+ }
|
|
|
+ ProportionalCapacityPreemptionPolicy policy =
|
|
|
+ new ProportionalCapacityPreemptionPolicy(conf, mDisp, mCS, mClock);
|
|
|
+ Resource clusterResources = leafAbsCapacities(
|
|
|
+ parseResourceDetails(resData[0]), qData[2]);
|
|
|
+ when(mCS.getClusterResource()).thenReturn(clusterResources);
|
|
|
+ ParentQueue mRoot = buildMockRootQueue(rand, resData, qData);
|
|
|
+ when(mCS.getRootQueue()).thenReturn(mRoot);
|
|
|
+
|
|
|
+ return policy;
|
|
|
+ }
|
|
|
+
|
|
|
ParentQueue buildMockRootQueue(Random r, int[]... queueData) {
|
|
|
- int[] abs = queueData[0];
|
|
|
- int[] maxCap = queueData[1];
|
|
|
- int[] used = queueData[2];
|
|
|
- int[] pending = queueData[3];
|
|
|
- int[] reserved = queueData[4];
|
|
|
- int[] apps = queueData[5];
|
|
|
- int[] gran = queueData[6];
|
|
|
- int[] queues = queueData[7];
|
|
|
+ Resource[] abs = generateResourceList(queueData[0]);
|
|
|
+ Resource[] used = generateResourceList(queueData[2]);
|
|
|
+ Resource[] pending = generateResourceList(queueData[3]);
|
|
|
+ Resource[] reserved = generateResourceList(queueData[4]);
|
|
|
+ Resource[] gran = generateResourceList(queueData[6]);
|
|
|
+ int[] maxCap = queueData[1];
|
|
|
+ int[] apps = queueData[5];
|
|
|
+ int[] queues = queueData[7];
|
|
|
+
|
|
|
+ return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues);
|
|
|
+ }
|
|
|
+
|
|
|
+ private ParentQueue buildMockRootQueue(Random rand2, String[][] resData,
|
|
|
+ int[][] queueData) {
|
|
|
+ Resource[] abs = parseResourceDetails(resData[0]);
|
|
|
+ Resource[] used = parseResourceDetails(resData[1]);
|
|
|
+ Resource[] pending = parseResourceDetails(resData[2]);
|
|
|
+ Resource[] reserved = parseResourceDetails(resData[3]);
|
|
|
+ Resource[] gran = parseResourceDetails(resData[4]);
|
|
|
+ int[] maxCap = queueData[0];
|
|
|
+ int[] apps = queueData[1];
|
|
|
+ int[] queues = queueData[2];
|
|
|
+
|
|
|
+ return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues);
|
|
|
+ }
|
|
|
+
|
|
|
+ Resource[] parseResourceDetails(String[] resData) {
|
|
|
+ List<Resource> resourceList = new ArrayList<Resource>();
|
|
|
+ for (int i = 0; i < resData.length; i++) {
|
|
|
+ String[] resource = resData[i].split(":");
|
|
|
+ if (resource.length == 1) {
|
|
|
+ resourceList.add(Resource.newInstance(Integer.valueOf(resource[0]), 0));
|
|
|
+ } else {
|
|
|
+ resourceList.add(Resource.newInstance(Integer.valueOf(resource[0]),
|
|
|
+ Integer.valueOf(resource[1])));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return resourceList.toArray(new Resource[resourceList.size()]);
|
|
|
+ }
|
|
|
|
|
|
- return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues);
|
|
|
+ Resource[] generateResourceList(int[] qData) {
|
|
|
+ List<Resource> resourceList = new ArrayList<Resource>();
|
|
|
+ for (int i = 0; i < qData.length; i++) {
|
|
|
+ resourceList.add(Resource.newInstance(qData[i], 0));
|
|
|
+ }
|
|
|
+ return resourceList.toArray(new Resource[resourceList.size()]);
|
|
|
}
|
|
|
|
|
|
- ParentQueue mockNested(int[] abs, int[] maxCap, int[] used,
|
|
|
- int[] pending, int[] reserved, int[] apps, int[] gran, int[] queues) {
|
|
|
- float tot = leafAbsCapacities(abs, queues);
|
|
|
+ ParentQueue mockNested(Resource[] abs, int[] maxCap, Resource[] used,
|
|
|
+ Resource[] pending, Resource[] reserved, int[] apps, Resource[] gran,
|
|
|
+ int[] queues) {
|
|
|
+ ResourceCalculator rc = mCS.getResourceCalculator();
|
|
|
+ Resource tot = leafAbsCapacities(abs, queues);
|
|
|
Deque<ParentQueue> pqs = new LinkedList<ParentQueue>();
|
|
|
ParentQueue root = mockParentQueue(null, queues[0], pqs);
|
|
|
+ ResourceUsage resUsage = new ResourceUsage();
|
|
|
+ resUsage.setUsed(used[0]);
|
|
|
when(root.getQueueName()).thenReturn("/");
|
|
|
- when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot);
|
|
|
- when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot);
|
|
|
- when(root.getAbsoluteMaximumCapacity()).thenReturn(maxCap[0] / tot);
|
|
|
+ when(root.getAbsoluteUsedCapacity()).thenReturn(
|
|
|
+ Resources.divide(rc, tot, used[0], tot));
|
|
|
+ when(root.getAbsoluteCapacity()).thenReturn(
|
|
|
+ Resources.divide(rc, tot, abs[0], tot));
|
|
|
+ when(root.getAbsoluteMaximumCapacity()).thenReturn(
|
|
|
+ maxCap[0] / (float) tot.getMemory());
|
|
|
+ when(root.getQueueResourceUsage()).thenReturn(resUsage);
|
|
|
|
|
|
for (int i = 1; i < queues.length; ++i) {
|
|
|
final CSQueue q;
|
|
@@ -636,14 +733,20 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|
|
final String queueName = "queue" + ((char)('A' + i - 1));
|
|
|
if (queues[i] > 0) {
|
|
|
q = mockParentQueue(p, queues[i], pqs);
|
|
|
+ ResourceUsage resUsagePerQueue = new ResourceUsage();
|
|
|
+ resUsagePerQueue.setUsed(used[i]);
|
|
|
+ when(q.getQueueResourceUsage()).thenReturn(resUsagePerQueue);
|
|
|
} else {
|
|
|
q = mockLeafQueue(p, tot, i, abs, used, pending, reserved, apps, gran);
|
|
|
}
|
|
|
when(q.getParent()).thenReturn(p);
|
|
|
when(q.getQueueName()).thenReturn(queueName);
|
|
|
- when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
|
|
|
- when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
|
|
|
- when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot);
|
|
|
+ when(q.getAbsoluteUsedCapacity()).thenReturn(
|
|
|
+ Resources.divide(rc, tot, used[i], tot));
|
|
|
+ when(q.getAbsoluteCapacity()).thenReturn(
|
|
|
+ Resources.divide(rc, tot, abs[i], tot));
|
|
|
+ when(q.getAbsoluteMaximumCapacity()).thenReturn(
|
|
|
+ maxCap[i] / (float) tot.getMemory());
|
|
|
}
|
|
|
assert 0 == pqs.size();
|
|
|
return root;
|
|
@@ -663,11 +766,17 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|
|
return pq;
|
|
|
}
|
|
|
|
|
|
- LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs,
|
|
|
- int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) {
|
|
|
+ LeafQueue mockLeafQueue(ParentQueue p, Resource tot, int i, Resource[] abs,
|
|
|
+ Resource[] used, Resource[] pending, Resource[] reserved, int[] apps,
|
|
|
+ Resource[] gran) {
|
|
|
LeafQueue lq = mock(LeafQueue.class);
|
|
|
- when(lq.getTotalResourcePending()).thenReturn(
|
|
|
- Resource.newInstance(pending[i], 0));
|
|
|
+ ResourceCalculator rc = mCS.getResourceCalculator();
|
|
|
+ when(lq.getTotalResourcePending()).thenReturn(pending[i]);
|
|
|
+ // need to set pending resource in resource usage as well
|
|
|
+ ResourceUsage ru = new ResourceUsage();
|
|
|
+ ru.setPending(pending[i]);
|
|
|
+ ru.setUsed(used[i]);
|
|
|
+ when(lq.getQueueResourceUsage()).thenReturn(ru);
|
|
|
// consider moving where CapacityScheduler::comparator accessible
|
|
|
NavigableSet<FiCaSchedulerApp> qApps = new TreeSet<FiCaSchedulerApp>(
|
|
|
new Comparator<FiCaSchedulerApp>() {
|
|
@@ -679,9 +788,9 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|
|
});
|
|
|
// applications are added in global L->R order in queues
|
|
|
if (apps[i] != 0) {
|
|
|
- int aUsed = used[i] / apps[i];
|
|
|
- int aPending = pending[i] / apps[i];
|
|
|
- int aReserve = reserved[i] / apps[i];
|
|
|
+ Resource aUsed = Resources.divideAndCeil(rc, used[i], apps[i]);
|
|
|
+ Resource aPending = Resources.divideAndCeil(rc, pending[i], apps[i]);
|
|
|
+ Resource aReserve = Resources.divideAndCeil(rc, reserved[i], apps[i]);
|
|
|
for (int a = 0; a < apps[i]; ++a) {
|
|
|
qApps.add(mockApp(i, appAlloc, aUsed, aPending, aReserve, gran[i]));
|
|
|
++appAlloc;
|
|
@@ -695,9 +804,10 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|
|
return lq;
|
|
|
}
|
|
|
|
|
|
- FiCaSchedulerApp mockApp(int qid, int id, int used, int pending, int reserved,
|
|
|
- int gran) {
|
|
|
+ FiCaSchedulerApp mockApp(int qid, int id, Resource used, Resource pending,
|
|
|
+ Resource reserved, Resource gran) {
|
|
|
FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
|
|
|
+ ResourceCalculator rc = mCS.getResourceCalculator();
|
|
|
|
|
|
ApplicationId appId = ApplicationId.newInstance(TS, id);
|
|
|
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(appId, 0);
|
|
@@ -705,22 +815,28 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|
|
when(app.getApplicationAttemptId()).thenReturn(appAttId);
|
|
|
|
|
|
int cAlloc = 0;
|
|
|
- Resource unit = Resource.newInstance(gran, 0);
|
|
|
+ Resource unit = gran;
|
|
|
List<RMContainer> cReserved = new ArrayList<RMContainer>();
|
|
|
- for (int i = 0; i < reserved; i += gran) {
|
|
|
+ Resource resIter = Resource.newInstance(0, 0);
|
|
|
+ for (; Resources.lessThan(rc, mCS.getClusterResource(), resIter, reserved); Resources
|
|
|
+ .addTo(resIter, gran)) {
|
|
|
cReserved.add(mockContainer(appAttId, cAlloc, unit, 1));
|
|
|
++cAlloc;
|
|
|
}
|
|
|
when(app.getReservedContainers()).thenReturn(cReserved);
|
|
|
|
|
|
List<RMContainer> cLive = new ArrayList<RMContainer>();
|
|
|
- for (int i = 0; i < used; i += gran) {
|
|
|
+ Resource usedIter = Resource.newInstance(0, 0);
|
|
|
+ int i = 0;
|
|
|
+ for (; Resources.lessThan(rc, mCS.getClusterResource(), usedIter, used); Resources
|
|
|
+ .addTo(usedIter, gran)) {
|
|
|
if(setAMContainer && i == 0){
|
|
|
cLive.add(mockContainer(appAttId, cAlloc, unit, 0));
|
|
|
}else{
|
|
|
cLive.add(mockContainer(appAttId, cAlloc, unit, 1));
|
|
|
}
|
|
|
++cAlloc;
|
|
|
+ ++i;
|
|
|
}
|
|
|
when(app.getLiveContainers()).thenReturn(cLive);
|
|
|
return app;
|
|
@@ -752,6 +868,16 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|
|
return ret;
|
|
|
}
|
|
|
|
|
|
+ static Resource leafAbsCapacities(Resource[] abs, int[] subqueues) {
|
|
|
+ Resource ret = Resource.newInstance(0, 0);
|
|
|
+ for (int i = 0; i < abs.length; ++i) {
|
|
|
+ if (0 == subqueues[i]) {
|
|
|
+ Resources.addTo(ret, abs[i]);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return ret;
|
|
|
+ }
|
|
|
+
|
|
|
void printString(CSQueue nq, String indent) {
|
|
|
if (nq instanceof ParentQueue) {
|
|
|
System.out.println(indent + nq.getQueueName()
|