|
@@ -23,7 +23,6 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.C
|
|
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
|
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
|
|
import static org.junit.jupiter.api.Assertions.assertNull;
|
|
|
-import static org.junit.jupiter.api.Assertions.assertTrue;
|
|
|
import static org.junit.jupiter.api.Assertions.fail;
|
|
|
import static org.mockito.Mockito.when;
|
|
|
|
|
@@ -35,8 +34,10 @@ import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.ConcurrentMap;
|
|
|
+import java.util.concurrent.TimeoutException;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
|
+import org.apache.hadoop.test.GenericTestUtils;
|
|
|
import org.apache.hadoop.thirdparty.com.google.common.collect.Iterators;
|
|
|
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
@@ -625,7 +626,6 @@ public class TestCapacitySchedulerMultiNodes {
|
|
|
// mock node tracker with 2000 nodes
|
|
|
// to simulate the scenario where there are many nodes in the cluster
|
|
|
List<FiCaSchedulerNode> mockNodes = new ArrayList<>();
|
|
|
- long ss = System.currentTimeMillis();
|
|
|
for (int i = 0; i < 2000; i++) {
|
|
|
FiCaSchedulerNode node =
|
|
|
TestUtils.getMockNode("host" + i + ":1234", "", 0, 10 * GB, 10);
|
|
@@ -660,26 +660,34 @@ public class TestCapacitySchedulerMultiNodes {
|
|
|
// create an unsatisfied request which will reach the headroom
|
|
|
am1.allocate("*", 2 * GB, 10, new ArrayList<>());
|
|
|
|
|
|
- // verify that when headroom is reached for an unsatisfied request,
|
|
|
- // scheduler should only check the request once before checking all nodes.
|
|
|
- CandidateNodeSet<FiCaSchedulerNode> candidates =
|
|
|
- new SimpleCandidateNodeSet<>(Collections.emptyMap(), "");
|
|
|
- int numSchedulingCycles = 10;
|
|
|
- long startTime = System.currentTimeMillis();
|
|
|
- for (int i = 0; i < numSchedulingCycles; i++) {
|
|
|
- spyCs.allocateContainersToNode(candidates, false);
|
|
|
+ List<Long> elapsedMsLst = new ArrayList<>();
|
|
|
+ try {
|
|
|
+ GenericTestUtils.waitFor(() -> {
|
|
|
+ // verify that when headroom is reached for an unsatisfied request,
|
|
|
+ // scheduler should only check the request once before checking all nodes.
|
|
|
+ CandidateNodeSet<FiCaSchedulerNode> candidates =
|
|
|
+ new SimpleCandidateNodeSet<>(Collections.emptyMap(), "");
|
|
|
+ int numSchedulingCycles = 10;
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
+ for (int i = 0; i < numSchedulingCycles; i++) {
|
|
|
+ spyCs.allocateContainersToNode(candidates, false);
|
|
|
+ }
|
|
|
+ long avgElapsedMs =
|
|
|
+ (System.currentTimeMillis() - startTime) / numSchedulingCycles;
|
|
|
+ LOG.info("Average elapsed time for a scheduling cycle: {} ms",
|
|
|
+ avgElapsedMs);
|
|
|
+
|
|
|
+ elapsedMsLst.add(avgElapsedMs);
|
|
|
+ // verify that the scheduling cycle is less than 10ms,
|
|
|
+ // ideally the latency should be less than 2ms.
|
|
|
+ return avgElapsedMs < 10;
|
|
|
+ }, 500, 3000);
|
|
|
+ } catch (TimeoutException e) {
|
|
|
+ fail("Scheduling cycle expected to be less than 10ms, " +
|
|
|
+ "but took too long, elapsedMs:" + elapsedMsLst);
|
|
|
+ } finally {
|
|
|
+ rm.stop();
|
|
|
}
|
|
|
- long avgElapsedMs =
|
|
|
- (System.currentTimeMillis() - startTime) / numSchedulingCycles;
|
|
|
- LOG.info("Average elapsed time for a scheduling cycle: {} ms",
|
|
|
- avgElapsedMs);
|
|
|
- // verify that the scheduling cycle is less than 5ms,
|
|
|
- // ideally the latency should be less than 2ms.
|
|
|
- assertTrue(avgElapsedMs < 5,
|
|
|
- String.format("%d ms elapsed in average for a scheduling cycle, " +
|
|
|
- "expected to be less than 5ms.", avgElapsedMs));
|
|
|
-
|
|
|
- rm.stop();
|
|
|
}
|
|
|
|
|
|
private static void moveReservation(CapacityScheduler cs,
|