@@ -31,13 +31,14 @@ import java.util.IdentityHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.LinkedBlockingDeque;
 
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.mapred.FairScheduler.JobInfo;
 import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
 import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
@@ -3092,8 +3093,6 @@ public class TestFairScheduler extends TestCase {
     assertNull(scheduler.assignTasks(tracker("tt2")));
   }
 
-
-
   class TestJobSchedulableSort extends JobSchedulable {
 
     private final double testFairShare;
@@ -3154,10 +3153,6 @@ public class TestFairScheduler extends TestCase {
 
   public void testFairShareComparator()
   {
-    List<TestJobSchedulableSort> jobs = new ArrayList<TestJobSchedulableSort>();
-    final int iterations = 100;
-    int jobCount = 100;
-
     Comparator<Schedulable> comparator = new
       SchedulingAlgorithms.FairShareComparator();
 
@@ -3186,8 +3181,64 @@ public class TestFairScheduler extends TestCase {
     // s3 has a higher running task to weight ratio (infinity)
     assertTrue(comparator.compare(s1, s3) < 0);
   }
-
-
+
+  /**
+   * Verifies that sorting JobSchedulables with a custom WeightAdjuster
+   * that returns a different value on every call does not break the sort.
+   * If a job's weight changes while the sort is in progress, JDK 7's
+   * TimSort fails with "Comparison method violates its general contract!".
+   */
+  public void testJobSchedulableSortingWithCustomWeightAdjuster() throws
+      IOException, InterruptedException {
+    final int iterations = 100, jobCount = 100, racks = 100, nodesPerRack = 2;
+    final int totalTaskTrackers = nodesPerRack * racks;
+
+    setUpCluster(racks, nodesPerRack, true);
+
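+    // A WeightAdjuster that returns a different random weight on every
+    // call, so comparisons made while the scheduler sorts its jobs are
+    // deliberately inconsistent.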
+    scheduler.weightAdjuster = new WeightAdjuster() {
+      Random r = new Random();
+
+      @Override
+      public double adjustWeight(JobInProgress job, TaskType taskType,
+          double curWeight) {
+        return curWeight * r.nextInt(100);
+      }
+    };
+
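+    // Submit a pool of running jobs so the scheduler has a non-trivial
+    // list to sort on each heartbeat.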
+    for (int j = 0; j < jobCount; j++) {
+      advanceTime(100);
+      submitJob(JobStatus.RUNNING, 2 * iterations, iterations);
+      scheduler.updateMetrics();
+    }
+
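+    // Collect whatever gets assigned; the test makes no assertions on
+    // the tasks themselves.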
+    final LinkedBlockingDeque<Task> tasks =
+        new LinkedBlockingDeque<Task>();
+
+    final String taskTrackerNamePrefix = "tt";
+
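+    // Each assignTasks() call sorts the runnable jobs using the unstable
+    // weights. The test passes as long as no call fails with JDK 7
+    // TimSort's "Comparison method violates its general contract!"
+    // IllegalArgumentException.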
+    Random r1 = new Random();
+    for (int i = 0; i < iterations; i++) {
+      int randomTaskTrackerId = r1.nextInt(totalTaskTrackers) + 1;
+      String taskTrackerName = taskTrackerNamePrefix + randomTaskTrackerId;
+      List<Task> assignedTasks =
+          scheduler.assignTasks(tracker(taskTrackerName));
+      if (assignedTasks != null) {
+        tasks.addAll(assignedTasks);
+      }
+    }
+  }
+
   /**
    * Ask scheduler to update metrics and then verify that they're all
    * correctly published to the metrics context