|
@@ -21,13 +21,25 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
|
|
import static org.junit.Assert.assertFalse;
|
|
|
import static org.junit.Assert.assertTrue;
|
|
|
|
|
|
+import java.util.Collection;
|
|
|
+import java.util.Comparator;
|
|
|
+import java.util.Stack;
|
|
|
+
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
+import org.apache.hadoop.yarn.api.records.Priority;
|
|
|
+import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
|
|
+import org.junit.Assert;
|
|
|
import org.junit.Test;
|
|
|
import org.mockito.Mockito;
|
|
|
|
|
|
public class TestSchedulingPolicy {
|
|
|
+ private static final Log LOG = LogFactory.getLog(TestSchedulingPolicy.class);
|
|
|
|
|
|
@Test(timeout = 1000)
|
|
|
public void testParseSchedulingPolicy()
|
|
@@ -125,4 +137,220 @@ public class TestSchedulingPolicy {
|
|
|
assertFalse(ERR,
|
|
|
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ANY));
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test whether {@link FairSharePolicy.FairShareComparator} is transitive.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testFairShareComparatorTransitivity() {
|
|
|
+ FairSharePolicy policy = new FairSharePolicy();
|
|
|
+ Comparator<Schedulable> fairShareComparator = policy.getComparator();
|
|
|
+ FairShareComparatorTester tester =
|
|
|
+ new FairShareComparatorTester(fairShareComparator);
|
|
|
+ tester.testTransitivity();
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This class is responsible for testing the transitivity of
|
|
|
+ * {@link FairSharePolicy.FairShareComparator}. We will generate
|
|
|
+ * a lot of triples(each triple contains three {@link Schedulable}),
|
|
|
+ * and then we verify transitivity by using each triple.
|
|
|
+ *
|
|
|
+ * <p>How to generate:</p>
|
|
|
+ * For each field in {@link Schedulable} we all have a data collection. We
|
|
|
+ * combine these data to construct a {@link Schedulable}, and generate all
|
|
|
+ * cases of triple by DFS(depth first search algorithm). We can get 100% code
|
|
|
+ * coverage by DFS.
|
|
|
+ */
|
|
|
+ private class FairShareComparatorTester {
|
|
|
+ private Comparator<Schedulable> fairShareComparator;
|
|
|
+
|
|
|
+ // Use the following data collections to generate three Schedulable.
|
|
|
+ private Resource minShare = Resource.newInstance(0, 1);
|
|
|
+
|
|
|
+ private Resource demand = Resource.newInstance(4, 1);
|
|
|
+
|
|
|
+ private String[] nameCollection = {"A", "B", "C"};
|
|
|
+
|
|
|
+ private long[] startTimeColloection = {1L, 2L, 3L};
|
|
|
+
|
|
|
+ private Resource[] usageCollection = {
|
|
|
+ Resource.newInstance(0, 1), Resource.newInstance(2, 1),
|
|
|
+ Resource.newInstance(4, 1) };
|
|
|
+
|
|
|
+ private ResourceWeights[] weightsCollection = {
|
|
|
+ new ResourceWeights(0.0f), new ResourceWeights(1.0f),
|
|
|
+ new ResourceWeights(2.0f) };
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ public FairShareComparatorTester(
|
|
|
+ Comparator<Schedulable> fairShareComparator) {
|
|
|
+ this.fairShareComparator = fairShareComparator;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testTransitivity() {
|
|
|
+ generateAndTest(new Stack<Schedulable>());
|
|
|
+ }
|
|
|
+
|
|
|
+ private void generateAndTest(Stack<Schedulable> genSchedulable) {
|
|
|
+ if (genSchedulable.size() == 3) {
|
|
|
+ // We get three Schedulable objects, let's use them to check the
|
|
|
+ // comparator.
|
|
|
+ Assert.assertTrue("The comparator must ensure transitivity",
|
|
|
+ checkTransitivity(genSchedulable));
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ for (int i = 0; i < nameCollection.length; i++) {
|
|
|
+ for (int j = 0; j < startTimeColloection.length; j++) {
|
|
|
+ for (int k = 0; k < usageCollection.length; k++) {
|
|
|
+ for (int t = 0; t < weightsCollection.length; t++) {
|
|
|
+ genSchedulable.push(createSchedulable(i, j, k, t));
|
|
|
+ generateAndTest(genSchedulable);
|
|
|
+ genSchedulable.pop();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ private Schedulable createSchedulable(
|
|
|
+ int nameIdx, int startTimeIdx, int usageIdx, int weightsIdx) {
|
|
|
+ return new MockSchedulable(minShare, demand, nameCollection[nameIdx],
|
|
|
+ startTimeColloection[startTimeIdx], usageCollection[usageIdx],
|
|
|
+ weightsCollection[weightsIdx]);
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean checkTransitivity(
|
|
|
+ Collection<Schedulable> schedulableObjs) {
|
|
|
+
|
|
|
+ Assert.assertEquals(3, schedulableObjs.size());
|
|
|
+ Schedulable[] copy = schedulableObjs.toArray(new Schedulable[3]);
|
|
|
+
|
|
|
+ if (fairShareComparator.compare(copy[0], copy[1]) > 0) {
|
|
|
+ swap(copy, 0, 1);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (fairShareComparator.compare(copy[1], copy[2]) > 0) {
|
|
|
+ swap(copy, 1, 2);
|
|
|
+
|
|
|
+ if (fairShareComparator.compare(copy[0], copy[1]) > 0) {
|
|
|
+ swap(copy, 0, 1);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Here, we have got the following condition:
|
|
|
+ // copy[0] <= copy[1] && copy[1] <= copy[2]
|
|
|
+ //
|
|
|
+ // So, just check copy[0] <= copy[2]
|
|
|
+ if (fairShareComparator.compare(copy[0], copy[2]) <= 0) {
|
|
|
+ return true;
|
|
|
+ } else {
|
|
|
+ LOG.fatal("Failure data: " + copy[0] + " " + copy[1] + " " + copy[2]);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void swap(Schedulable[] array, int x, int y) {
|
|
|
+ Schedulable tmp = array[x];
|
|
|
+ array[x] = array[y];
|
|
|
+ array[y] = tmp;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ private class MockSchedulable implements Schedulable {
|
|
|
+ private Resource minShare;
|
|
|
+ private Resource demand;
|
|
|
+ private String name;
|
|
|
+ private long startTime;
|
|
|
+ private Resource usage;
|
|
|
+ private ResourceWeights weights;
|
|
|
+
|
|
|
+ public MockSchedulable(Resource minShare, Resource demand, String name,
|
|
|
+ long startTime, Resource usage, ResourceWeights weights) {
|
|
|
+ this.minShare = minShare;
|
|
|
+ this.demand = demand;
|
|
|
+ this.name = name;
|
|
|
+ this.startTime = startTime;
|
|
|
+ this.usage = usage;
|
|
|
+ this.weights = weights;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String getName() {
|
|
|
+ return name;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Resource getDemand() {
|
|
|
+ return demand;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Resource getResourceUsage() {
|
|
|
+ return usage;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Resource getMinShare() {
|
|
|
+ return minShare;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public ResourceWeights getWeights() {
|
|
|
+ return weights;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public long getStartTime() {
|
|
|
+ return startTime;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Resource getMaxShare() {
|
|
|
+ throw new UnsupportedOperationException();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Priority getPriority() {
|
|
|
+ throw new UnsupportedOperationException();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void updateDemand() {
|
|
|
+ throw new UnsupportedOperationException();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Resource assignContainer(FSSchedulerNode node) {
|
|
|
+ throw new UnsupportedOperationException();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public RMContainer preemptContainer() {
|
|
|
+ throw new UnsupportedOperationException();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Resource getFairShare() {
|
|
|
+ throw new UnsupportedOperationException();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void setFairShare(Resource fairShare) {
|
|
|
+ throw new UnsupportedOperationException();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String toString() {
|
|
|
+ return "{name:" + name + ", start:" + startTime + ", usage:" + usage +
|
|
|
+ ", weights:" + weights + ", demand:" + demand +
|
|
|
+ ", minShare:" + minShare + "}";
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
}
|