|
@@ -1,19 +1,19 @@
|
|
|
/*******************************************************************************
|
|
|
- * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
- * or more contributor license agreements. See the NOTICE file
|
|
|
- * distributed with this work for additional information
|
|
|
- * regarding copyright ownership. The ASF licenses this file
|
|
|
- * to you under the Apache License, Version 2.0 (the
|
|
|
- * "License"); you may not use this file except in compliance
|
|
|
- * with the License. You may obtain a copy of the License at
|
|
|
- *
|
|
|
- * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
- *
|
|
|
- * Unless required by applicable law or agreed to in writing, software
|
|
|
- * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
- * See the License for the specific language governing permissions and
|
|
|
- * limitations under the License.
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ * <p/>
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ * <p/>
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
*******************************************************************************/
|
|
|
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
|
|
|
|
|
@@ -73,11 +73,11 @@ public class ReservationSystemTestUtil {
|
|
|
public static ReservationSchedulerConfiguration createConf(
|
|
|
String reservationQ, long timeWindow, float instConstraint,
|
|
|
float avgConstraint) {
|
|
|
- ReservationSchedulerConfiguration conf = mock
|
|
|
- (ReservationSchedulerConfiguration.class);
|
|
|
+ ReservationSchedulerConfiguration conf =
|
|
|
+ mock(ReservationSchedulerConfiguration.class);
|
|
|
when(conf.getReservationWindow(reservationQ)).thenReturn(timeWindow);
|
|
|
- when(conf.getInstantaneousMaxCapacity(reservationQ)).thenReturn
|
|
|
- (instConstraint);
|
|
|
+ when(conf.getInstantaneousMaxCapacity(reservationQ))
|
|
|
+ .thenReturn(instConstraint);
|
|
|
when(conf.getAverageCapacity(reservationQ)).thenReturn(avgConstraint);
|
|
|
return conf;
|
|
|
}
|
|
@@ -91,21 +91,8 @@ public class ReservationSystemTestUtil {
|
|
|
Assert.assertEquals(8192, plan.getTotalCapacity().getMemory());
|
|
|
Assert.assertTrue(
|
|
|
plan.getReservationAgent() instanceof AlignedPlannerWithGreedy);
|
|
|
- Assert.assertTrue(
|
|
|
- plan.getSharingPolicy() instanceof CapacityOverTimePolicy);
|
|
|
- }
|
|
|
-
|
|
|
- public static void validateNewReservationQueue(
|
|
|
- AbstractReservationSystem reservationSystem, String newQ) {
|
|
|
- Plan newPlan = reservationSystem.getPlan(newQ);
|
|
|
- Assert.assertNotNull(newPlan);
|
|
|
- Assert.assertTrue(newPlan instanceof InMemoryPlan);
|
|
|
- Assert.assertEquals(newQ, newPlan.getQueueName());
|
|
|
- Assert.assertEquals(1024, newPlan.getTotalCapacity().getMemory());
|
|
|
Assert
|
|
|
- .assertTrue(newPlan.getReservationAgent() instanceof AlignedPlannerWithGreedy);
|
|
|
- Assert
|
|
|
- .assertTrue(newPlan.getSharingPolicy() instanceof CapacityOverTimePolicy);
|
|
|
+ .assertTrue(plan.getSharingPolicy() instanceof CapacityOverTimePolicy);
|
|
|
}
|
|
|
|
|
|
public static void setupFSAllocationFile(String allocationFile)
|
|
@@ -129,7 +116,8 @@ public class ReservationSystemTestUtil {
|
|
|
out.println("<reservation></reservation>");
|
|
|
out.println("<weight>8</weight>");
|
|
|
out.println("</queue>");
|
|
|
- out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
|
|
|
+ out.println(
|
|
|
+ "<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
|
|
|
out.println("</allocations>");
|
|
|
out.close();
|
|
|
}
|
|
@@ -153,21 +141,20 @@ public class ReservationSystemTestUtil {
|
|
|
out.println("</queue>");
|
|
|
out.println("<queue name=\"dedicated\">");
|
|
|
out.println("<reservation></reservation>");
|
|
|
- out.println("<weight>80</weight>");
|
|
|
+ out.println("<weight>10</weight>");
|
|
|
out.println("</queue>");
|
|
|
out.println("<queue name=\"reservation\">");
|
|
|
out.println("<reservation></reservation>");
|
|
|
- out.println("<weight>10</weight>");
|
|
|
+ out.println("<weight>80</weight>");
|
|
|
out.println("</queue>");
|
|
|
- out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
|
|
|
+ out.println(
|
|
|
+ "<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
|
|
|
out.println("</allocations>");
|
|
|
out.close();
|
|
|
}
|
|
|
|
|
|
- public static FairScheduler setupFairScheduler(
|
|
|
- ReservationSystemTestUtil testUtil,
|
|
|
- RMContext rmContext, Configuration conf, int numContainers) throws
|
|
|
- IOException {
|
|
|
+ public static FairScheduler setupFairScheduler(RMContext rmContext,
|
|
|
+ Configuration conf, int numContainers) throws IOException {
|
|
|
FairScheduler scheduler = new FairScheduler();
|
|
|
scheduler.setRMContext(rmContext);
|
|
|
|
|
@@ -178,7 +165,8 @@ public class ReservationSystemTestUtil {
|
|
|
scheduler.reinitialize(conf, rmContext);
|
|
|
|
|
|
|
|
|
- Resource resource = testUtil.calculateClusterResource(numContainers);
|
|
|
+ Resource resource =
|
|
|
+ ReservationSystemTestUtil.calculateClusterResource(numContainers);
|
|
|
RMNode node1 = MockNodes.newNodeInfo(1, resource, 1, "127.0.0.1");
|
|
|
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
|
|
scheduler.handle(nodeEvent1);
|
|
@@ -224,8 +212,9 @@ public class ReservationSystemTestUtil {
|
|
|
return cs;
|
|
|
}
|
|
|
|
|
|
- public static void initializeRMContext(int numContainers,
|
|
|
- AbstractYarnScheduler scheduler, RMContext mockRMContext) {
|
|
|
+ @SuppressWarnings("rawtypes") public static void initializeRMContext(
|
|
|
+ int numContainers, AbstractYarnScheduler scheduler,
|
|
|
+ RMContext mockRMContext) {
|
|
|
|
|
|
when(mockRMContext.getScheduler()).thenReturn(scheduler);
|
|
|
Resource r = calculateClusterResource(numContainers);
|
|
@@ -233,18 +222,17 @@ public class ReservationSystemTestUtil {
|
|
|
}
|
|
|
|
|
|
public static RMContext createRMContext(Configuration conf) {
|
|
|
- RMContext mockRmContext =
|
|
|
- Mockito.spy(new RMContextImpl(null, null, null, null, null, null,
|
|
|
+ RMContext mockRmContext = Mockito.spy(
|
|
|
+ new RMContextImpl(null, null, null, null, null, null,
|
|
|
new RMContainerTokenSecretManager(conf),
|
|
|
new NMTokenSecretManagerInRM(conf),
|
|
|
new ClientToAMTokenSecretManagerInRM(), null));
|
|
|
|
|
|
RMNodeLabelsManager nlm = mock(RMNodeLabelsManager.class);
|
|
|
- when(
|
|
|
- nlm.getQueueResource(any(String.class), anySetOf(String.class),
|
|
|
+ when(nlm.getQueueResource(any(String.class), anySetOf(String.class),
|
|
|
any(Resource.class))).thenAnswer(new Answer<Resource>() {
|
|
|
- @Override
|
|
|
- public Resource answer(InvocationOnMock invocation) throws Throwable {
|
|
|
+ @Override public Resource answer(InvocationOnMock invocation)
|
|
|
+ throws Throwable {
|
|
|
Object[] args = invocation.getArguments();
|
|
|
return (Resource) args[2];
|
|
|
}
|
|
@@ -252,8 +240,8 @@ public class ReservationSystemTestUtil {
|
|
|
|
|
|
when(nlm.getResourceByLabel(any(String.class), any(Resource.class)))
|
|
|
.thenAnswer(new Answer<Resource>() {
|
|
|
- @Override
|
|
|
- public Resource answer(InvocationOnMock invocation) throws Throwable {
|
|
|
+ @Override public Resource answer(InvocationOnMock invocation)
|
|
|
+ throws Throwable {
|
|
|
Object[] args = invocation.getArguments();
|
|
|
return (Resource) args[1];
|
|
|
}
|
|
@@ -263,21 +251,22 @@ public class ReservationSystemTestUtil {
|
|
|
return mockRmContext;
|
|
|
}
|
|
|
|
|
|
- public static void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
|
|
|
+ public static void setupQueueConfiguration(
|
|
|
+ CapacitySchedulerConfiguration conf) {
|
|
|
// Define default queue
|
|
|
final String defQ = CapacitySchedulerConfiguration.ROOT + ".default";
|
|
|
conf.setCapacity(defQ, 10);
|
|
|
|
|
|
// Define top-level queues
|
|
|
- conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {
|
|
|
- "default", "a", reservationQ });
|
|
|
+ conf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
|
|
+ new String[] { "default", "a", reservationQ });
|
|
|
|
|
|
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
|
|
|
conf.setCapacity(A, 10);
|
|
|
|
|
|
final String dedicated =
|
|
|
- CapacitySchedulerConfiguration.ROOT
|
|
|
- + CapacitySchedulerConfiguration.DOT + reservationQ;
|
|
|
+ CapacitySchedulerConfiguration.ROOT + CapacitySchedulerConfiguration.DOT
|
|
|
+ + reservationQ;
|
|
|
conf.setCapacity(dedicated, 80);
|
|
|
// Set as reservation queue
|
|
|
conf.setReservable(dedicated, true);
|
|
@@ -290,44 +279,43 @@ public class ReservationSystemTestUtil {
|
|
|
conf.setCapacity(A2, 70);
|
|
|
}
|
|
|
|
|
|
- public String getFullReservationQueueName() {
|
|
|
+ public static String getFullReservationQueueName() {
|
|
|
return CapacitySchedulerConfiguration.ROOT
|
|
|
+ CapacitySchedulerConfiguration.DOT + reservationQ;
|
|
|
}
|
|
|
|
|
|
- public String getreservationQueueName() {
|
|
|
+ public static String getReservationQueueName() {
|
|
|
return reservationQ;
|
|
|
}
|
|
|
|
|
|
- public void updateQueueConfiguration(CapacitySchedulerConfiguration conf,
|
|
|
- String newQ) {
|
|
|
+ public static void updateQueueConfiguration(
|
|
|
+ CapacitySchedulerConfiguration conf, String newQ) {
|
|
|
// Define default queue
|
|
|
- final String prefix =
|
|
|
- CapacitySchedulerConfiguration.ROOT
|
|
|
- + CapacitySchedulerConfiguration.DOT;
|
|
|
+ final String prefix = CapacitySchedulerConfiguration.ROOT
|
|
|
+ + CapacitySchedulerConfiguration.DOT;
|
|
|
final String defQ = prefix + "default";
|
|
|
conf.setCapacity(defQ, 5);
|
|
|
|
|
|
// Define top-level queues
|
|
|
- conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {
|
|
|
- "default", "a", reservationQ, newQ });
|
|
|
+ conf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
|
|
+ new String[] { "default", "a", reservationQ, newQ });
|
|
|
|
|
|
final String A = prefix + "a";
|
|
|
conf.setCapacity(A, 5);
|
|
|
|
|
|
final String dedicated = prefix + reservationQ;
|
|
|
- conf.setCapacity(dedicated, 80);
|
|
|
+ conf.setCapacity(dedicated, 10);
|
|
|
// Set as reservation queue
|
|
|
conf.setReservable(dedicated, true);
|
|
|
|
|
|
- conf.setCapacity(prefix + newQ, 10);
|
|
|
+ conf.setCapacity(prefix + newQ, 80);
|
|
|
// Set as reservation queue
|
|
|
conf.setReservable(prefix + newQ, true);
|
|
|
|
|
|
// Define 2nd-level queues
|
|
|
final String A1 = A + ".a1";
|
|
|
final String A2 = A + ".a2";
|
|
|
- conf.setQueues(A, new String[]{"a1", "a2"});
|
|
|
+ conf.setQueues(A, new String[] { "a1", "a2" });
|
|
|
conf.setCapacity(A1, 30);
|
|
|
conf.setCapacity(A2, 70);
|
|
|
}
|
|
@@ -349,9 +337,8 @@ public class ReservationSystemTestUtil {
|
|
|
int gang = 1 + rand.nextInt(9);
|
|
|
int par = (rand.nextInt(1000) + 1) * gang;
|
|
|
long dur = rand.nextInt(2 * 3600 * 1000); // random duration within 2h
|
|
|
- ReservationRequest r =
|
|
|
- ReservationRequest.newInstance(Resource.newInstance(1024, 1), par,
|
|
|
- gang, dur);
|
|
|
+ ReservationRequest r = ReservationRequest
|
|
|
+ .newInstance(Resource.newInstance(1024, 1), par, gang, dur);
|
|
|
ReservationRequests reqs = new ReservationRequestsPBImpl();
|
|
|
reqs.setReservationResources(Collections.singletonList(r));
|
|
|
rand.nextInt(3);
|
|
@@ -364,53 +351,19 @@ public class ReservationSystemTestUtil {
|
|
|
|
|
|
}
|
|
|
|
|
|
- public static ReservationDefinition generateBigRR(Random rand, long i) {
|
|
|
- rand.setSeed(i);
|
|
|
- long now = System.currentTimeMillis();
|
|
|
-
|
|
|
- // start time at random in the next 2 hours
|
|
|
- long arrival = rand.nextInt(2 * 3600 * 1000);
|
|
|
- // deadline at random in the next day
|
|
|
- long deadline = rand.nextInt(24 * 3600 * 1000);
|
|
|
-
|
|
|
- // create a request with a single atomic ask
|
|
|
- ReservationDefinition rr = new ReservationDefinitionPBImpl();
|
|
|
- rr.setArrival(now + arrival);
|
|
|
- rr.setDeadline(now + deadline);
|
|
|
-
|
|
|
- int gang = 1;
|
|
|
- int par = 100000; // 100k tasks
|
|
|
- long dur = rand.nextInt(60 * 1000); // 1min tasks
|
|
|
- ReservationRequest r =
|
|
|
- ReservationRequest.newInstance(Resource.newInstance(1024, 1), par,
|
|
|
- gang, dur);
|
|
|
- ReservationRequests reqs = new ReservationRequestsPBImpl();
|
|
|
- reqs.setReservationResources(Collections.singletonList(r));
|
|
|
- rand.nextInt(3);
|
|
|
- ReservationRequestInterpreter[] type =
|
|
|
- ReservationRequestInterpreter.values();
|
|
|
- reqs.setInterpreter(type[rand.nextInt(type.length)]);
|
|
|
- rr.setReservationRequests(reqs);
|
|
|
-
|
|
|
- return rr;
|
|
|
- }
|
|
|
-
|
|
|
public static Map<ReservationInterval, Resource> generateAllocation(
|
|
|
long startTime, long step, int[] alloc) {
|
|
|
- Map<ReservationInterval, Resource> req =
|
|
|
- new TreeMap<ReservationInterval, Resource>();
|
|
|
+ Map<ReservationInterval, Resource> req = new TreeMap<>();
|
|
|
for (int i = 0; i < alloc.length; i++) {
|
|
|
- req.put(new ReservationInterval(startTime + i * step, startTime + (i + 1)
|
|
|
- * step), ReservationSystemUtil.toResource(ReservationRequest
|
|
|
- .newInstance(
|
|
|
- Resource.newInstance(1024, 1), alloc[i])));
|
|
|
+ req.put(new ReservationInterval(startTime + i * step,
|
|
|
+ startTime + (i + 1) * step), ReservationSystemUtil.toResource(
|
|
|
+ ReservationRequest
|
|
|
+ .newInstance(Resource.newInstance(1024, 1), alloc[i])));
|
|
|
}
|
|
|
return req;
|
|
|
}
|
|
|
|
|
|
public static Resource calculateClusterResource(int numContainers) {
|
|
|
- Resource clusterResource = Resource.newInstance(numContainers * 1024,
|
|
|
- numContainers);
|
|
|
- return clusterResource;
|
|
|
+ return Resource.newInstance(numContainers * 1024, numContainers);
|
|
|
}
|
|
|
}
|