|
@@ -55,7 +55,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|
import org.apache.hadoop.yarn.api.records.Priority;
|
|
import org.apache.hadoop.yarn.api.records.Priority;
|
|
-import org.apache.hadoop.yarn.api.records.QueueACL;
|
|
|
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
@@ -86,7 +85,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSc
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
|
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
import org.apache.hadoop.yarn.util.Clock;
|
|
import org.apache.hadoop.yarn.util.Clock;
|
|
@@ -121,6 +119,7 @@ public class TestFairScheduler {
|
|
|
|
|
|
private FairScheduler scheduler;
|
|
private FairScheduler scheduler;
|
|
private ResourceManager resourceManager;
|
|
private ResourceManager resourceManager;
|
|
|
|
+ private Configuration conf;
|
|
private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
|
private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
|
|
|
|
|
private int APP_ID = 1; // Incrementing counter for schedling apps
|
|
private int APP_ID = 1; // Incrementing counter for schedling apps
|
|
@@ -130,7 +129,7 @@ public class TestFairScheduler {
|
|
@Before
|
|
@Before
|
|
public void setUp() throws IOException {
|
|
public void setUp() throws IOException {
|
|
scheduler = new FairScheduler();
|
|
scheduler = new FairScheduler();
|
|
- Configuration conf = createConfiguration();
|
|
|
|
|
|
+ conf = createConfiguration();
|
|
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
|
|
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
|
|
conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
|
|
conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
|
|
1024);
|
|
1024);
|
|
@@ -145,7 +144,6 @@ public class TestFairScheduler {
|
|
((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
|
|
((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
|
|
resourceManager.getRMContext().getStateStore().start();
|
|
resourceManager.getRMContext().getStateStore().start();
|
|
|
|
|
|
- scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
// to initialize the master key
|
|
// to initialize the master key
|
|
resourceManager.getRMContainerTokenSecretManager().rollMasterKey();
|
|
resourceManager.getRMContainerTokenSecretManager().rollMasterKey();
|
|
}
|
|
}
|
|
@@ -291,7 +289,6 @@ public class TestFairScheduler {
|
|
|
|
|
|
@Test(timeout=2000)
|
|
@Test(timeout=2000)
|
|
public void testLoadConfigurationOnInitialize() throws IOException {
|
|
public void testLoadConfigurationOnInitialize() throws IOException {
|
|
- Configuration conf = createConfiguration();
|
|
|
|
conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true);
|
|
conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true);
|
|
conf.setInt(FairSchedulerConfiguration.MAX_ASSIGN, 3);
|
|
conf.setInt(FairSchedulerConfiguration.MAX_ASSIGN, 3);
|
|
conf.setBoolean(FairSchedulerConfiguration.SIZE_BASED_WEIGHT, true);
|
|
conf.setBoolean(FairSchedulerConfiguration.SIZE_BASED_WEIGHT, true);
|
|
@@ -362,6 +359,8 @@ public class TestFairScheduler {
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testAggregateCapacityTracking() throws Exception {
|
|
public void testAggregateCapacityTracking() throws Exception {
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
+
|
|
// Add a node
|
|
// Add a node
|
|
RMNode node1 =
|
|
RMNode node1 =
|
|
MockNodes
|
|
MockNodes
|
|
@@ -384,7 +383,9 @@ public class TestFairScheduler {
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
- public void testSimpleFairShareCalculation() {
|
|
|
|
|
|
+ public void testSimpleFairShareCalculation() throws IOException {
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
+
|
|
// Add one big node (only care about aggregate capacity)
|
|
// Add one big node (only care about aggregate capacity)
|
|
RMNode node1 =
|
|
RMNode node1 =
|
|
MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024), 1,
|
|
MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024), 1,
|
|
@@ -409,7 +410,9 @@ public class TestFairScheduler {
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
- public void testSimpleHierarchicalFairShareCalculation() {
|
|
|
|
|
|
+ public void testSimpleHierarchicalFairShareCalculation() throws IOException {
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
+
|
|
// Add one big node (only care about aggregate capacity)
|
|
// Add one big node (only care about aggregate capacity)
|
|
int capacity = 10 * 24;
|
|
int capacity = 10 * 24;
|
|
RMNode node1 =
|
|
RMNode node1 =
|
|
@@ -440,7 +443,9 @@ public class TestFairScheduler {
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
- public void testHierarchicalQueuesSimilarParents() {
|
|
|
|
|
|
+ public void testHierarchicalQueuesSimilarParents() throws IOException {
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
+
|
|
QueueManager queueManager = scheduler.getQueueManager();
|
|
QueueManager queueManager = scheduler.getQueueManager();
|
|
FSLeafQueue leafQueue = queueManager.getLeafQueue("parent.child", true);
|
|
FSLeafQueue leafQueue = queueManager.getLeafQueue("parent.child", true);
|
|
Assert.assertEquals(2, queueManager.getLeafQueues().size());
|
|
Assert.assertEquals(2, queueManager.getLeafQueues().size());
|
|
@@ -462,8 +467,9 @@ public class TestFairScheduler {
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
- public void testSchedulerRootQueueMetrics() throws InterruptedException {
|
|
|
|
-
|
|
|
|
|
|
+ public void testSchedulerRootQueueMetrics() throws Exception {
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
+
|
|
// Add a node
|
|
// Add a node
|
|
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
|
|
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
|
|
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
|
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
|
@@ -500,7 +506,9 @@ public class TestFairScheduler {
|
|
}
|
|
}
|
|
|
|
|
|
@Test (timeout = 5000)
|
|
@Test (timeout = 5000)
|
|
- public void testSimpleContainerAllocation() {
|
|
|
|
|
|
+ public void testSimpleContainerAllocation() throws IOException {
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
+
|
|
// Add a node
|
|
// Add a node
|
|
RMNode node1 =
|
|
RMNode node1 =
|
|
MockNodes
|
|
MockNodes
|
|
@@ -546,7 +554,9 @@ public class TestFairScheduler {
|
|
}
|
|
}
|
|
|
|
|
|
@Test (timeout = 5000)
|
|
@Test (timeout = 5000)
|
|
- public void testSimpleContainerReservation() throws InterruptedException {
|
|
|
|
|
|
+ public void testSimpleContainerReservation() throws Exception {
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
+
|
|
// Add a node
|
|
// Add a node
|
|
RMNode node1 =
|
|
RMNode node1 =
|
|
MockNodes
|
|
MockNodes
|
|
@@ -598,7 +608,6 @@ public class TestFairScheduler {
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testUserAsDefaultQueue() throws Exception {
|
|
public void testUserAsDefaultQueue() throws Exception {
|
|
- Configuration conf = createConfiguration();
|
|
|
|
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
|
|
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
|
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
RMContext rmContext = resourceManager.getRMContext();
|
|
RMContext rmContext = resourceManager.getRMContext();
|
|
@@ -617,14 +626,24 @@ public class TestFairScheduler {
|
|
assertEquals(0, scheduler.getQueueManager().getLeafQueue("default", true)
|
|
assertEquals(0, scheduler.getQueueManager().getLeafQueue("default", true)
|
|
.getRunnableAppSchedulables().size());
|
|
.getRunnableAppSchedulables().size());
|
|
assertEquals("root.user1", rmApp.getQueue());
|
|
assertEquals("root.user1", rmApp.getQueue());
|
|
-
|
|
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testNotUserAsDefaultQueue() throws Exception {
|
|
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
|
|
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
|
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
- scheduler.getQueueManager().initialize();
|
|
|
|
|
|
+ RMContext rmContext = resourceManager.getRMContext();
|
|
|
|
+ Map<ApplicationId, RMApp> appsMap = rmContext.getRMApps();
|
|
|
|
+ ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
|
|
|
|
+ RMApp rmApp = new RMAppImpl(appAttemptId.getApplicationId(), rmContext, conf,
|
|
|
|
+ null, null, null, ApplicationSubmissionContext.newInstance(null, null,
|
|
|
|
+ null, null, null, false, false, 0, null, null), null, null, 0, null);
|
|
|
|
+ appsMap.put(appAttemptId.getApplicationId(), rmApp);
|
|
|
|
+
|
|
AppAddedSchedulerEvent appAddedEvent2 = new AppAddedSchedulerEvent(
|
|
AppAddedSchedulerEvent appAddedEvent2 = new AppAddedSchedulerEvent(
|
|
- createAppAttemptId(2, 1), "default", "user2");
|
|
|
|
|
|
+ appAttemptId, "default", "user2");
|
|
scheduler.handle(appAddedEvent2);
|
|
scheduler.handle(appAddedEvent2);
|
|
- assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true)
|
|
|
|
|
|
+ assertEquals(0, scheduler.getQueueManager().getLeafQueue("user1", true)
|
|
.getRunnableAppSchedulables().size());
|
|
.getRunnableAppSchedulables().size());
|
|
assertEquals(1, scheduler.getQueueManager().getLeafQueue("default", true)
|
|
assertEquals(1, scheduler.getQueueManager().getLeafQueue("default", true)
|
|
.getRunnableAppSchedulables().size());
|
|
.getRunnableAppSchedulables().size());
|
|
@@ -634,7 +653,7 @@ public class TestFairScheduler {
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testEmptyQueueName() throws Exception {
|
|
public void testEmptyQueueName() throws Exception {
|
|
- Configuration conf = createConfiguration();
|
|
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
|
|
// only default queue
|
|
// only default queue
|
|
assertEquals(1, scheduler.getQueueManager().getLeafQueues().size());
|
|
assertEquals(1, scheduler.getQueueManager().getLeafQueues().size());
|
|
@@ -653,7 +672,6 @@ public class TestFairScheduler {
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testAssignToQueue() throws Exception {
|
|
public void testAssignToQueue() throws Exception {
|
|
- Configuration conf = createConfiguration();
|
|
|
|
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
|
|
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
|
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
|
|
@@ -672,9 +690,10 @@ public class TestFairScheduler {
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testQueuePlacementWithPolicy() throws Exception {
|
|
public void testQueuePlacementWithPolicy() throws Exception {
|
|
- Configuration conf = createConfiguration();
|
|
|
|
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
|
|
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
|
|
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
|
|
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
+
|
|
ApplicationAttemptId appId;
|
|
ApplicationAttemptId appId;
|
|
Map<ApplicationAttemptId, FSSchedulerApp> apps = scheduler.applications;
|
|
Map<ApplicationAttemptId, FSSchedulerApp> apps = scheduler.applications;
|
|
|
|
|
|
@@ -684,10 +703,10 @@ public class TestFairScheduler {
|
|
rules.add(new QueuePlacementRule.PrimaryGroup().initialize(false, null));
|
|
rules.add(new QueuePlacementRule.PrimaryGroup().initialize(false, null));
|
|
rules.add(new QueuePlacementRule.SecondaryGroupExistingQueue().initialize(false, null));
|
|
rules.add(new QueuePlacementRule.SecondaryGroupExistingQueue().initialize(false, null));
|
|
rules.add(new QueuePlacementRule.Default().initialize(true, null));
|
|
rules.add(new QueuePlacementRule.Default().initialize(true, null));
|
|
- Set<String> queues = Sets.newHashSet("root.user1", "root.user3group",
|
|
|
|
|
|
+ Set<String> queues = Sets.newHashSet("root.user1", "root.user3group",
|
|
"root.user4subgroup1", "root.user4subgroup2" , "root.user5subgroup2");
|
|
"root.user4subgroup1", "root.user4subgroup2" , "root.user5subgroup2");
|
|
- scheduler.getQueueManager().placementPolicy = new QueuePlacementPolicy(
|
|
|
|
- rules, queues, conf);
|
|
|
|
|
|
+ scheduler.getAllocationConfiguration().placementPolicy =
|
|
|
|
+ new QueuePlacementPolicy(rules, queues, conf);
|
|
appId = createSchedulingRequest(1024, "somequeue", "user1");
|
|
appId = createSchedulingRequest(1024, "somequeue", "user1");
|
|
assertEquals("root.somequeue", apps.get(appId).getQueueName());
|
|
assertEquals("root.somequeue", apps.get(appId).getQueueName());
|
|
appId = createSchedulingRequest(1024, "default", "user1");
|
|
appId = createSchedulingRequest(1024, "default", "user1");
|
|
@@ -706,8 +725,8 @@ public class TestFairScheduler {
|
|
rules.add(new QueuePlacementRule.User().initialize(false, null));
|
|
rules.add(new QueuePlacementRule.User().initialize(false, null));
|
|
rules.add(new QueuePlacementRule.Specified().initialize(true, null));
|
|
rules.add(new QueuePlacementRule.Specified().initialize(true, null));
|
|
rules.add(new QueuePlacementRule.Default().initialize(true, null));
|
|
rules.add(new QueuePlacementRule.Default().initialize(true, null));
|
|
- scheduler.getQueueManager().placementPolicy = new QueuePlacementPolicy(
|
|
|
|
- rules, queues, conf);
|
|
|
|
|
|
+ scheduler.getAllocationConfiguration().placementPolicy =
|
|
|
|
+ new QueuePlacementPolicy(rules, queues, conf);
|
|
appId = createSchedulingRequest(1024, "somequeue", "user1");
|
|
appId = createSchedulingRequest(1024, "somequeue", "user1");
|
|
assertEquals("root.user1", apps.get(appId).getQueueName());
|
|
assertEquals("root.user1", apps.get(appId).getQueueName());
|
|
appId = createSchedulingRequest(1024, "somequeue", "otheruser");
|
|
appId = createSchedulingRequest(1024, "somequeue", "otheruser");
|
|
@@ -718,9 +737,7 @@ public class TestFairScheduler {
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testFairShareWithMinAlloc() throws Exception {
|
|
public void testFairShareWithMinAlloc() throws Exception {
|
|
- Configuration conf = createConfiguration();
|
|
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
- scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
|
|
|
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
out.println("<?xml version=\"1.0\"?>");
|
|
out.println("<?xml version=\"1.0\"?>");
|
|
@@ -733,9 +750,8 @@ public class TestFairScheduler {
|
|
out.println("</queue>");
|
|
out.println("</queue>");
|
|
out.println("</allocations>");
|
|
out.println("</allocations>");
|
|
out.close();
|
|
out.close();
|
|
-
|
|
|
|
- QueueManager queueManager = scheduler.getQueueManager();
|
|
|
|
- queueManager.initialize();
|
|
|
|
|
|
+
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
|
|
// Add one big node (only care about aggregate capacity)
|
|
// Add one big node (only care about aggregate capacity)
|
|
RMNode node1 =
|
|
RMNode node1 =
|
|
@@ -767,6 +783,8 @@ public class TestFairScheduler {
|
|
*/
|
|
*/
|
|
@Test
|
|
@Test
|
|
public void testQueueDemandCalculation() throws Exception {
|
|
public void testQueueDemandCalculation() throws Exception {
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
+
|
|
ApplicationAttemptId id11 = createAppAttemptId(1, 1);
|
|
ApplicationAttemptId id11 = createAppAttemptId(1, 1);
|
|
scheduler.addApplication(id11, "root.queue1", "user1");
|
|
scheduler.addApplication(id11, "root.queue1", "user1");
|
|
ApplicationAttemptId id21 = createAppAttemptId(2, 1);
|
|
ApplicationAttemptId id21 = createAppAttemptId(2, 1);
|
|
@@ -812,6 +830,8 @@ public class TestFairScheduler {
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testAppAdditionAndRemoval() throws Exception {
|
|
public void testAppAdditionAndRemoval() throws Exception {
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
+
|
|
AppAddedSchedulerEvent appAddedEvent1 = new AppAddedSchedulerEvent(
|
|
AppAddedSchedulerEvent appAddedEvent1 = new AppAddedSchedulerEvent(
|
|
createAppAttemptId(1, 1), "default", "user1");
|
|
createAppAttemptId(1, 1), "default", "user1");
|
|
scheduler.handle(appAddedEvent1);
|
|
scheduler.handle(appAddedEvent1);
|
|
@@ -834,133 +854,10 @@ public class TestFairScheduler {
|
|
.getRunnableAppSchedulables().size());
|
|
.getRunnableAppSchedulables().size());
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
- public void testAllocationFileParsing() throws Exception {
|
|
|
|
- Configuration conf = createConfiguration();
|
|
|
|
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
|
|
- scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
-
|
|
|
|
- PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
|
|
- out.println("<?xml version=\"1.0\"?>");
|
|
|
|
- out.println("<allocations>");
|
|
|
|
- // Give queue A a minimum of 1024 M
|
|
|
|
- out.println("<queue name=\"queueA\">");
|
|
|
|
- out.println("<minResources>1024mb,0vcores</minResources>");
|
|
|
|
- out.println("</queue>");
|
|
|
|
- // Give queue B a minimum of 2048 M
|
|
|
|
- out.println("<queue name=\"queueB\">");
|
|
|
|
- out.println("<minResources>2048mb,0vcores</minResources>");
|
|
|
|
- out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>");
|
|
|
|
- out.println("<schedulingPolicy>fair</schedulingPolicy>");
|
|
|
|
- out.println("</queue>");
|
|
|
|
- // Give queue C no minimum
|
|
|
|
- out.println("<queue name=\"queueC\">");
|
|
|
|
- out.println("<aclSubmitApps>alice,bob admins</aclSubmitApps>");
|
|
|
|
- out.println("</queue>");
|
|
|
|
- // Give queue D a limit of 3 running apps
|
|
|
|
- out.println("<queue name=\"queueD\">");
|
|
|
|
- out.println("<maxRunningApps>3</maxRunningApps>");
|
|
|
|
- out.println("</queue>");
|
|
|
|
- // Give queue E a preemption timeout of one minute
|
|
|
|
- out.println("<queue name=\"queueE\">");
|
|
|
|
- out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>");
|
|
|
|
- out.println("</queue>");
|
|
|
|
- // Set default limit of apps per queue to 15
|
|
|
|
- out.println("<queueMaxAppsDefault>15</queueMaxAppsDefault>");
|
|
|
|
- // Set default limit of apps per user to 5
|
|
|
|
- out.println("<userMaxAppsDefault>5</userMaxAppsDefault>");
|
|
|
|
- // Give user1 a limit of 10 jobs
|
|
|
|
- out.println("<user name=\"user1\">");
|
|
|
|
- out.println("<maxRunningApps>10</maxRunningApps>");
|
|
|
|
- out.println("</user>");
|
|
|
|
- // Set default min share preemption timeout to 2 minutes
|
|
|
|
- out.println("<defaultMinSharePreemptionTimeout>120"
|
|
|
|
- + "</defaultMinSharePreemptionTimeout>");
|
|
|
|
- // Set fair share preemption timeout to 5 minutes
|
|
|
|
- out.println("<fairSharePreemptionTimeout>300</fairSharePreemptionTimeout>");
|
|
|
|
- // Set default scheduling policy to DRF
|
|
|
|
- out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
|
|
|
|
- out.println("</allocations>");
|
|
|
|
- out.close();
|
|
|
|
-
|
|
|
|
- QueueManager queueManager = scheduler.getQueueManager();
|
|
|
|
- queueManager.initialize();
|
|
|
|
-
|
|
|
|
- assertEquals(6, queueManager.getLeafQueues().size()); // 5 in file + default queue
|
|
|
|
- assertEquals(Resources.createResource(0),
|
|
|
|
- queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
|
|
|
|
- assertEquals(Resources.createResource(0),
|
|
|
|
- queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
|
|
|
|
-
|
|
|
|
- assertEquals(Resources.createResource(1024, 0),
|
|
|
|
- queueManager.getMinResources("root.queueA"));
|
|
|
|
- assertEquals(Resources.createResource(2048, 0),
|
|
|
|
- queueManager.getMinResources("root.queueB"));
|
|
|
|
- assertEquals(Resources.createResource(0),
|
|
|
|
- queueManager.getMinResources("root.queueC"));
|
|
|
|
- assertEquals(Resources.createResource(0),
|
|
|
|
- queueManager.getMinResources("root.queueD"));
|
|
|
|
- assertEquals(Resources.createResource(0),
|
|
|
|
- queueManager.getMinResources("root.queueE"));
|
|
|
|
-
|
|
|
|
- assertEquals(15, queueManager.getQueueMaxApps("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
|
|
|
|
- assertEquals(15, queueManager.getQueueMaxApps("root.queueA"));
|
|
|
|
- assertEquals(15, queueManager.getQueueMaxApps("root.queueB"));
|
|
|
|
- assertEquals(15, queueManager.getQueueMaxApps("root.queueC"));
|
|
|
|
- assertEquals(3, queueManager.getQueueMaxApps("root.queueD"));
|
|
|
|
- assertEquals(15, queueManager.getQueueMaxApps("root.queueE"));
|
|
|
|
- assertEquals(10, queueManager.getUserMaxApps("user1"));
|
|
|
|
- assertEquals(5, queueManager.getUserMaxApps("user2"));
|
|
|
|
-
|
|
|
|
- // Root should get * ACL
|
|
|
|
- assertEquals("*",queueManager.getQueueAcl("root",
|
|
|
|
- QueueACL.ADMINISTER_QUEUE).getAclString());
|
|
|
|
- assertEquals("*", queueManager.getQueueAcl("root",
|
|
|
|
- QueueACL.SUBMIT_APPLICATIONS).getAclString());
|
|
|
|
-
|
|
|
|
- // Unspecified queues should get default ACL
|
|
|
|
- assertEquals(" ",queueManager.getQueueAcl("root.queueA",
|
|
|
|
- QueueACL.ADMINISTER_QUEUE).getAclString());
|
|
|
|
- assertEquals(" ", queueManager.getQueueAcl("root.queueA",
|
|
|
|
- QueueACL.SUBMIT_APPLICATIONS).getAclString());
|
|
|
|
-
|
|
|
|
- // Queue B ACL
|
|
|
|
- assertEquals("alice,bob admins",queueManager.getQueueAcl("root.queueB",
|
|
|
|
- QueueACL.ADMINISTER_QUEUE).getAclString());
|
|
|
|
-
|
|
|
|
- // Queue C ACL
|
|
|
|
- assertEquals("alice,bob admins",queueManager.getQueueAcl("root.queueC",
|
|
|
|
- QueueACL.SUBMIT_APPLICATIONS).getAclString());
|
|
|
|
-
|
|
|
|
- assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root." +
|
|
|
|
- YarnConfiguration.DEFAULT_QUEUE_NAME));
|
|
|
|
- assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueA"));
|
|
|
|
- assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueB"));
|
|
|
|
- assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueC"));
|
|
|
|
- assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueD"));
|
|
|
|
- assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueA"));
|
|
|
|
- assertEquals(60000, queueManager.getMinSharePreemptionTimeout("root.queueE"));
|
|
|
|
- assertEquals(300000, queueManager.getFairSharePreemptionTimeout());
|
|
|
|
-
|
|
|
|
- // Verify existing queues have default scheduling policy
|
|
|
|
- assertEquals(DominantResourceFairnessPolicy.NAME,
|
|
|
|
- queueManager.getQueue("root").getPolicy().getName());
|
|
|
|
- assertEquals(DominantResourceFairnessPolicy.NAME,
|
|
|
|
- queueManager.getQueue("root.queueA").getPolicy().getName());
|
|
|
|
- // Verify default is overriden if specified explicitly
|
|
|
|
- assertEquals(FairSharePolicy.NAME,
|
|
|
|
- queueManager.getQueue("root.queueB").getPolicy().getName());
|
|
|
|
- // Verify new queue gets default scheduling policy
|
|
|
|
- assertEquals(DominantResourceFairnessPolicy.NAME,
|
|
|
|
- queueManager.getLeafQueue("root.newqueue", true).getPolicy().getName());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
@Test
|
|
@Test
|
|
public void testHierarchicalQueueAllocationFileParsing() throws IOException, SAXException,
|
|
public void testHierarchicalQueueAllocationFileParsing() throws IOException, SAXException,
|
|
AllocationConfigurationException, ParserConfigurationException {
|
|
AllocationConfigurationException, ParserConfigurationException {
|
|
- Configuration conf = createConfiguration();
|
|
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
- scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
|
|
|
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
out.println("<?xml version=\"1.0\"?>");
|
|
out.println("<?xml version=\"1.0\"?>");
|
|
@@ -980,9 +877,9 @@ public class TestFairScheduler {
|
|
out.println("</allocations>");
|
|
out.println("</allocations>");
|
|
out.close();
|
|
out.close();
|
|
|
|
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
+
|
|
QueueManager queueManager = scheduler.getQueueManager();
|
|
QueueManager queueManager = scheduler.getQueueManager();
|
|
- queueManager.initialize();
|
|
|
|
-
|
|
|
|
Collection<FSLeafQueue> leafQueues = queueManager.getLeafQueues();
|
|
Collection<FSLeafQueue> leafQueues = queueManager.getLeafQueues();
|
|
Assert.assertEquals(4, leafQueues.size());
|
|
Assert.assertEquals(4, leafQueues.size());
|
|
Assert.assertNotNull(queueManager.getLeafQueue("queueA", false));
|
|
Assert.assertNotNull(queueManager.getLeafQueue("queueA", false));
|
|
@@ -995,9 +892,7 @@ public class TestFairScheduler {
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testConfigureRootQueue() throws Exception {
|
|
public void testConfigureRootQueue() throws Exception {
|
|
- Configuration conf = createConfiguration();
|
|
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
- scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
|
|
|
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
out.println("<?xml version=\"1.0\"?>");
|
|
out.println("<?xml version=\"1.0\"?>");
|
|
@@ -1014,9 +909,9 @@ public class TestFairScheduler {
|
|
out.println("</queue>");
|
|
out.println("</queue>");
|
|
out.println("</allocations>");
|
|
out.println("</allocations>");
|
|
out.close();
|
|
out.close();
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
QueueManager queueManager = scheduler.getQueueManager();
|
|
QueueManager queueManager = scheduler.getQueueManager();
|
|
- queueManager.initialize();
|
|
|
|
|
|
|
|
FSQueue root = queueManager.getRootQueue();
|
|
FSQueue root = queueManager.getRootQueue();
|
|
assertTrue(root.getPolicy() instanceof DominantResourceFairnessPolicy);
|
|
assertTrue(root.getPolicy() instanceof DominantResourceFairnessPolicy);
|
|
@@ -1025,136 +920,9 @@ public class TestFairScheduler {
|
|
assertNotNull(queueManager.getLeafQueue("child2", false));
|
|
assertNotNull(queueManager.getLeafQueue("child2", false));
|
|
}
|
|
}
|
|
|
|
|
|
- /**
|
|
|
|
- * Verify that you can't place queues at the same level as the root queue in
|
|
|
|
- * the allocations file.
|
|
|
|
- */
|
|
|
|
- @Test (expected = AllocationConfigurationException.class)
|
|
|
|
- public void testQueueAlongsideRoot() throws Exception {
|
|
|
|
- Configuration conf = createConfiguration();
|
|
|
|
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
|
|
- scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
-
|
|
|
|
- PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
|
|
- out.println("<?xml version=\"1.0\"?>");
|
|
|
|
- out.println("<allocations>");
|
|
|
|
- out.println("<queue name=\"root\">");
|
|
|
|
- out.println("</queue>");
|
|
|
|
- out.println("<queue name=\"other\">");
|
|
|
|
- out.println("</queue>");
|
|
|
|
- out.println("</allocations>");
|
|
|
|
- out.close();
|
|
|
|
-
|
|
|
|
- QueueManager queueManager = scheduler.getQueueManager();
|
|
|
|
- queueManager.initialize();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Test
|
|
|
|
- public void testBackwardsCompatibleAllocationFileParsing() throws Exception {
|
|
|
|
- Configuration conf = createConfiguration();
|
|
|
|
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
|
|
- scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
-
|
|
|
|
- PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
|
|
- out.println("<?xml version=\"1.0\"?>");
|
|
|
|
- out.println("<allocations>");
|
|
|
|
- // Give queue A a minimum of 1024 M
|
|
|
|
- out.println("<pool name=\"queueA\">");
|
|
|
|
- out.println("<minResources>1024mb,0vcores</minResources>");
|
|
|
|
- out.println("</pool>");
|
|
|
|
- // Give queue B a minimum of 2048 M
|
|
|
|
- out.println("<pool name=\"queueB\">");
|
|
|
|
- out.println("<minResources>2048mb,0vcores</minResources>");
|
|
|
|
- out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>");
|
|
|
|
- out.println("</pool>");
|
|
|
|
- // Give queue C no minimum
|
|
|
|
- out.println("<pool name=\"queueC\">");
|
|
|
|
- out.println("<aclSubmitApps>alice,bob admins</aclSubmitApps>");
|
|
|
|
- out.println("</pool>");
|
|
|
|
- // Give queue D a limit of 3 running apps
|
|
|
|
- out.println("<pool name=\"queueD\">");
|
|
|
|
- out.println("<maxRunningApps>3</maxRunningApps>");
|
|
|
|
- out.println("</pool>");
|
|
|
|
- // Give queue E a preemption timeout of one minute
|
|
|
|
- out.println("<pool name=\"queueE\">");
|
|
|
|
- out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>");
|
|
|
|
- out.println("</pool>");
|
|
|
|
- // Set default limit of apps per queue to 15
|
|
|
|
- out.println("<queueMaxAppsDefault>15</queueMaxAppsDefault>");
|
|
|
|
- // Set default limit of apps per user to 5
|
|
|
|
- out.println("<userMaxAppsDefault>5</userMaxAppsDefault>");
|
|
|
|
- // Give user1 a limit of 10 jobs
|
|
|
|
- out.println("<user name=\"user1\">");
|
|
|
|
- out.println("<maxRunningApps>10</maxRunningApps>");
|
|
|
|
- out.println("</user>");
|
|
|
|
- // Set default min share preemption timeout to 2 minutes
|
|
|
|
- out.println("<defaultMinSharePreemptionTimeout>120"
|
|
|
|
- + "</defaultMinSharePreemptionTimeout>");
|
|
|
|
- // Set fair share preemption timeout to 5 minutes
|
|
|
|
- out.println("<fairSharePreemptionTimeout>300</fairSharePreemptionTimeout>");
|
|
|
|
- out.println("</allocations>");
|
|
|
|
- out.close();
|
|
|
|
-
|
|
|
|
- QueueManager queueManager = scheduler.getQueueManager();
|
|
|
|
- queueManager.initialize();
|
|
|
|
-
|
|
|
|
- assertEquals(6, queueManager.getLeafQueues().size()); // 5 in file + default queue
|
|
|
|
- assertEquals(Resources.createResource(0),
|
|
|
|
- queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
|
|
|
|
- assertEquals(Resources.createResource(0),
|
|
|
|
- queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
|
|
|
|
-
|
|
|
|
- assertEquals(Resources.createResource(1024, 0),
|
|
|
|
- queueManager.getMinResources("root.queueA"));
|
|
|
|
- assertEquals(Resources.createResource(2048, 0),
|
|
|
|
- queueManager.getMinResources("root.queueB"));
|
|
|
|
- assertEquals(Resources.createResource(0),
|
|
|
|
- queueManager.getMinResources("root.queueC"));
|
|
|
|
- assertEquals(Resources.createResource(0),
|
|
|
|
- queueManager.getMinResources("root.queueD"));
|
|
|
|
- assertEquals(Resources.createResource(0),
|
|
|
|
- queueManager.getMinResources("root.queueE"));
|
|
|
|
-
|
|
|
|
- assertEquals(15, queueManager.getQueueMaxApps("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
|
|
|
|
- assertEquals(15, queueManager.getQueueMaxApps("root.queueA"));
|
|
|
|
- assertEquals(15, queueManager.getQueueMaxApps("root.queueB"));
|
|
|
|
- assertEquals(15, queueManager.getQueueMaxApps("root.queueC"));
|
|
|
|
- assertEquals(3, queueManager.getQueueMaxApps("root.queueD"));
|
|
|
|
- assertEquals(15, queueManager.getQueueMaxApps("root.queueE"));
|
|
|
|
- assertEquals(10, queueManager.getUserMaxApps("user1"));
|
|
|
|
- assertEquals(5, queueManager.getUserMaxApps("user2"));
|
|
|
|
-
|
|
|
|
- // Unspecified queues should get default ACL
|
|
|
|
- assertEquals(" ", queueManager.getQueueAcl("root.queueA",
|
|
|
|
- QueueACL.ADMINISTER_QUEUE).getAclString());
|
|
|
|
- assertEquals(" ", queueManager.getQueueAcl("root.queueA",
|
|
|
|
- QueueACL.SUBMIT_APPLICATIONS).getAclString());
|
|
|
|
-
|
|
|
|
- // Queue B ACL
|
|
|
|
- assertEquals("alice,bob admins", queueManager.getQueueAcl("root.queueB",
|
|
|
|
- QueueACL.ADMINISTER_QUEUE).getAclString());
|
|
|
|
-
|
|
|
|
- // Queue C ACL
|
|
|
|
- assertEquals("alice,bob admins", queueManager.getQueueAcl("root.queueC",
|
|
|
|
- QueueACL.SUBMIT_APPLICATIONS).getAclString());
|
|
|
|
-
|
|
|
|
-
|
|
|
|
- assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root." +
|
|
|
|
- YarnConfiguration.DEFAULT_QUEUE_NAME));
|
|
|
|
- assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueA"));
|
|
|
|
- assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueB"));
|
|
|
|
- assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueC"));
|
|
|
|
- assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueD"));
|
|
|
|
- assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueA"));
|
|
|
|
- assertEquals(60000, queueManager.getMinSharePreemptionTimeout("root.queueE"));
|
|
|
|
- assertEquals(300000, queueManager.getFairSharePreemptionTimeout());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
@Test (timeout = 5000)
|
|
@Test (timeout = 5000)
|
|
public void testIsStarvedForMinShare() throws Exception {
|
|
public void testIsStarvedForMinShare() throws Exception {
|
|
- Configuration conf = createConfiguration();
|
|
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
- scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
|
|
|
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
out.println("<?xml version=\"1.0\"?>");
|
|
out.println("<?xml version=\"1.0\"?>");
|
|
@@ -1168,8 +936,7 @@ public class TestFairScheduler {
|
|
out.println("</allocations>");
|
|
out.println("</allocations>");
|
|
out.close();
|
|
out.close();
|
|
|
|
|
|
- QueueManager queueManager = scheduler.getQueueManager();
|
|
|
|
- queueManager.initialize();
|
|
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
|
|
// Add one big node (only care about aggregate capacity)
|
|
// Add one big node (only care about aggregate capacity)
|
|
RMNode node1 =
|
|
RMNode node1 =
|
|
@@ -1212,9 +979,7 @@ public class TestFairScheduler {
|
|
|
|
|
|
@Test (timeout = 5000)
|
|
@Test (timeout = 5000)
|
|
public void testIsStarvedForFairShare() throws Exception {
|
|
public void testIsStarvedForFairShare() throws Exception {
|
|
- Configuration conf = createConfiguration();
|
|
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
- scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
|
|
|
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
out.println("<?xml version=\"1.0\"?>");
|
|
out.println("<?xml version=\"1.0\"?>");
|
|
@@ -1228,9 +993,8 @@ public class TestFairScheduler {
|
|
out.println("</allocations>");
|
|
out.println("</allocations>");
|
|
out.close();
|
|
out.close();
|
|
|
|
|
|
- QueueManager queueManager = scheduler.getQueueManager();
|
|
|
|
- queueManager.initialize();
|
|
|
|
-
|
|
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
+
|
|
// Add one big node (only care about aggregate capacity)
|
|
// Add one big node (only care about aggregate capacity)
|
|
RMNode node1 =
|
|
RMNode node1 =
|
|
MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
|
|
MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
|
|
@@ -1277,13 +1041,9 @@ public class TestFairScheduler {
|
|
* now this means decreasing order of priority.
|
|
* now this means decreasing order of priority.
|
|
*/
|
|
*/
|
|
public void testChoiceOfPreemptedContainers() throws Exception {
|
|
public void testChoiceOfPreemptedContainers() throws Exception {
|
|
- Configuration conf = createConfiguration();
|
|
|
|
-
|
|
|
|
conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
|
|
conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
|
|
conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
|
|
conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
|
|
-
|
|
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE);
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE);
|
|
- scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
|
|
|
|
MockClock clock = new MockClock();
|
|
MockClock clock = new MockClock();
|
|
scheduler.setClock(clock);
|
|
scheduler.setClock(clock);
|
|
@@ -1305,9 +1065,8 @@ public class TestFairScheduler {
|
|
out.println("</queue>");
|
|
out.println("</queue>");
|
|
out.println("</allocations>");
|
|
out.println("</allocations>");
|
|
out.close();
|
|
out.close();
|
|
-
|
|
|
|
- QueueManager queueManager = scheduler.getQueueManager();
|
|
|
|
- queueManager.initialize();
|
|
|
|
|
|
+
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
|
|
// Create four nodes
|
|
// Create four nodes
|
|
RMNode node1 =
|
|
RMNode node1 =
|
|
@@ -1443,15 +1202,16 @@ public class TestFairScheduler {
|
|
* Tests the timing of decision to preempt tasks.
|
|
* Tests the timing of decision to preempt tasks.
|
|
*/
|
|
*/
|
|
public void testPreemptionDecision() throws Exception {
|
|
public void testPreemptionDecision() throws Exception {
|
|
- Configuration conf = createConfiguration();
|
|
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
MockClock clock = new MockClock();
|
|
MockClock clock = new MockClock();
|
|
scheduler.setClock(clock);
|
|
scheduler.setClock(clock);
|
|
- scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
|
|
|
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
out.println("<?xml version=\"1.0\"?>");
|
|
out.println("<?xml version=\"1.0\"?>");
|
|
out.println("<allocations>");
|
|
out.println("<allocations>");
|
|
|
|
+ out.println("<queue name=\"default\">");
|
|
|
|
+ out.println("<maxResources>0mb,0vcores</maxResources>");
|
|
|
|
+ out.println("</queue>");
|
|
out.println("<queue name=\"queueA\">");
|
|
out.println("<queue name=\"queueA\">");
|
|
out.println("<weight>.25</weight>");
|
|
out.println("<weight>.25</weight>");
|
|
out.println("<minResources>1024mb,0vcores</minResources>");
|
|
out.println("<minResources>1024mb,0vcores</minResources>");
|
|
@@ -1473,8 +1233,7 @@ public class TestFairScheduler {
|
|
out.println("</allocations>");
|
|
out.println("</allocations>");
|
|
out.close();
|
|
out.close();
|
|
|
|
|
|
- QueueManager queueManager = scheduler.getQueueManager();
|
|
|
|
- queueManager.initialize();
|
|
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
|
|
// Create four nodes
|
|
// Create four nodes
|
|
RMNode node1 =
|
|
RMNode node1 =
|
|
@@ -1570,7 +1329,9 @@ public class TestFairScheduler {
|
|
}
|
|
}
|
|
|
|
|
|
@Test (timeout = 5000)
|
|
@Test (timeout = 5000)
|
|
- public void testMultipleContainersWaitingForReservation() {
|
|
|
|
|
|
+ public void testMultipleContainersWaitingForReservation() throws IOException {
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
+
|
|
// Add a node
|
|
// Add a node
|
|
RMNode node1 =
|
|
RMNode node1 =
|
|
MockNodes
|
|
MockNodes
|
|
@@ -1600,9 +1361,7 @@ public class TestFairScheduler {
|
|
@Test (timeout = 5000)
|
|
@Test (timeout = 5000)
|
|
public void testUserMaxRunningApps() throws Exception {
|
|
public void testUserMaxRunningApps() throws Exception {
|
|
// Set max running apps
|
|
// Set max running apps
|
|
- Configuration conf = createConfiguration();
|
|
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
- scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
|
|
|
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
out.println("<?xml version=\"1.0\"?>");
|
|
out.println("<?xml version=\"1.0\"?>");
|
|
@@ -1613,8 +1372,7 @@ public class TestFairScheduler {
|
|
out.println("</allocations>");
|
|
out.println("</allocations>");
|
|
out.close();
|
|
out.close();
|
|
|
|
|
|
- QueueManager queueManager = scheduler.getQueueManager();
|
|
|
|
- queueManager.initialize();
|
|
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
|
|
// Add a node
|
|
// Add a node
|
|
RMNode node1 =
|
|
RMNode node1 =
|
|
@@ -1654,7 +1412,9 @@ public class TestFairScheduler {
|
|
}
|
|
}
|
|
|
|
|
|
@Test (timeout = 5000)
|
|
@Test (timeout = 5000)
|
|
- public void testReservationWhileMultiplePriorities() {
|
|
|
|
|
|
+ public void testReservationWhileMultiplePriorities() throws IOException {
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
+
|
|
// Add a node
|
|
// Add a node
|
|
RMNode node1 =
|
|
RMNode node1 =
|
|
MockNodes
|
|
MockNodes
|
|
@@ -1717,9 +1477,7 @@ public class TestFairScheduler {
|
|
@Test
|
|
@Test
|
|
public void testAclSubmitApplication() throws Exception {
|
|
public void testAclSubmitApplication() throws Exception {
|
|
// Set acl's
|
|
// Set acl's
|
|
- Configuration conf = createConfiguration();
|
|
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
- scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
|
|
|
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
out.println("<?xml version=\"1.0\"?>");
|
|
out.println("<?xml version=\"1.0\"?>");
|
|
@@ -1735,8 +1493,7 @@ public class TestFairScheduler {
|
|
out.println("</allocations>");
|
|
out.println("</allocations>");
|
|
out.close();
|
|
out.close();
|
|
|
|
|
|
- QueueManager queueManager = scheduler.getQueueManager();
|
|
|
|
- queueManager.initialize();
|
|
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
|
|
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1",
|
|
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1",
|
|
"norealuserhasthisname", 1);
|
|
"norealuserhasthisname", 1);
|
|
@@ -1751,6 +1508,8 @@ public class TestFairScheduler {
|
|
|
|
|
|
@Test (timeout = 5000)
|
|
@Test (timeout = 5000)
|
|
public void testMultipleNodesSingleRackRequest() throws Exception {
|
|
public void testMultipleNodesSingleRackRequest() throws Exception {
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
+
|
|
RMNode node1 =
|
|
RMNode node1 =
|
|
MockNodes
|
|
MockNodes
|
|
.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
|
|
.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
|
|
@@ -1797,6 +1556,8 @@ public class TestFairScheduler {
|
|
|
|
|
|
@Test (timeout = 5000)
|
|
@Test (timeout = 5000)
|
|
public void testFifoWithinQueue() throws Exception {
|
|
public void testFifoWithinQueue() throws Exception {
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
+
|
|
RMNode node1 =
|
|
RMNode node1 =
|
|
MockNodes
|
|
MockNodes
|
|
.newNodeInfo(1, Resources.createResource(3072, 3), 1, "127.0.0.1");
|
|
.newNodeInfo(1, Resources.createResource(3072, 3), 1, "127.0.0.1");
|
|
@@ -1837,11 +1598,9 @@ public class TestFairScheduler {
|
|
}
|
|
}
|
|
|
|
|
|
@Test(timeout = 3000)
|
|
@Test(timeout = 3000)
|
|
- public void testMaxAssign() throws AllocationConfigurationException {
|
|
|
|
- // set required scheduler configs
|
|
|
|
- scheduler.assignMultiple = true;
|
|
|
|
- scheduler.getQueueManager().getLeafQueue("root.default", true)
|
|
|
|
- .setPolicy(SchedulingPolicy.getDefault());
|
|
|
|
|
|
+ public void testMaxAssign() throws Exception {
|
|
|
|
+ conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true);
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
|
|
RMNode node =
|
|
RMNode node =
|
|
MockNodes.newNodeInfo(1, Resources.createResource(16384, 16), 0,
|
|
MockNodes.newNodeInfo(1, Resources.createResource(16384, 16), 0,
|
|
@@ -1884,6 +1643,8 @@ public class TestFairScheduler {
|
|
*/
|
|
*/
|
|
@Test(timeout = 5000)
|
|
@Test(timeout = 5000)
|
|
public void testAssignContainer() throws Exception {
|
|
public void testAssignContainer() throws Exception {
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
+
|
|
final String user = "user1";
|
|
final String user = "user1";
|
|
final String fifoQueue = "fifo";
|
|
final String fifoQueue = "fifo";
|
|
final String fairParent = "fairParent";
|
|
final String fairParent = "fairParent";
|
|
@@ -1951,9 +1712,7 @@ public class TestFairScheduler {
|
|
@Test
|
|
@Test
|
|
public void testNotAllowSubmitApplication() throws Exception {
|
|
public void testNotAllowSubmitApplication() throws Exception {
|
|
// Set acl's
|
|
// Set acl's
|
|
- Configuration conf = createConfiguration();
|
|
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
- scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
out.println("<?xml version=\"1.0\"?>");
|
|
out.println("<?xml version=\"1.0\"?>");
|
|
out.println("<allocations>");
|
|
out.println("<allocations>");
|
|
@@ -1967,8 +1726,8 @@ public class TestFairScheduler {
|
|
out.println("</queue>");
|
|
out.println("</queue>");
|
|
out.println("</allocations>");
|
|
out.println("</allocations>");
|
|
out.close();
|
|
out.close();
|
|
- QueueManager queueManager = scheduler.getQueueManager();
|
|
|
|
- queueManager.initialize();
|
|
|
|
|
|
+
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
|
|
int appId = this.APP_ID++;
|
|
int appId = this.APP_ID++;
|
|
String user = "usernotallow";
|
|
String user = "usernotallow";
|
|
@@ -2017,7 +1776,9 @@ public class TestFairScheduler {
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
- public void testReservationThatDoesntFit() {
|
|
|
|
|
|
+ public void testReservationThatDoesntFit() throws IOException {
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
+
|
|
RMNode node1 =
|
|
RMNode node1 =
|
|
MockNodes
|
|
MockNodes
|
|
.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
|
|
.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
|
|
@@ -2043,7 +1804,9 @@ public class TestFairScheduler {
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
- public void testRemoveNodeUpdatesRootQueueMetrics() {
|
|
|
|
|
|
+ public void testRemoveNodeUpdatesRootQueueMetrics() throws IOException {
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
+
|
|
assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
|
|
assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
|
|
assertEquals(0, scheduler.getRootQueueMetrics().getAvailableVirtualCores());
|
|
assertEquals(0, scheduler.getRootQueueMetrics().getAvailableVirtualCores());
|
|
|
|
|
|
@@ -2069,7 +1832,9 @@ public class TestFairScheduler {
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
- public void testStrictLocality() {
|
|
|
|
|
|
+ public void testStrictLocality() throws IOException {
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
+
|
|
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
|
|
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
|
|
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
|
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
|
scheduler.handle(nodeEvent1);
|
|
scheduler.handle(nodeEvent1);
|
|
@@ -2107,7 +1872,9 @@ public class TestFairScheduler {
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
- public void testCancelStrictLocality() {
|
|
|
|
|
|
+ public void testCancelStrictLocality() throws IOException {
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
+
|
|
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
|
|
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
|
|
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
|
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
|
scheduler.handle(nodeEvent1);
|
|
scheduler.handle(nodeEvent1);
|
|
@@ -2155,7 +1922,9 @@ public class TestFairScheduler {
|
|
* a reservation on another.
|
|
* a reservation on another.
|
|
*/
|
|
*/
|
|
@Test
|
|
@Test
|
|
- public void testReservationsStrictLocality() {
|
|
|
|
|
|
+ public void testReservationsStrictLocality() throws IOException {
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
+
|
|
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
|
|
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
|
|
RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 2, "127.0.0.2");
|
|
RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 2, "127.0.0.2");
|
|
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node1);
|
|
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node1);
|
|
@@ -2193,7 +1962,9 @@ public class TestFairScheduler {
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
- public void testNoMoreCpuOnNode() {
|
|
|
|
|
|
+ public void testNoMoreCpuOnNode() throws IOException {
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
+
|
|
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 1),
|
|
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 1),
|
|
1, "127.0.0.1");
|
|
1, "127.0.0.1");
|
|
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
|
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
|
@@ -2213,6 +1984,8 @@ public class TestFairScheduler {
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testBasicDRFAssignment() throws Exception {
|
|
public void testBasicDRFAssignment() throws Exception {
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
+
|
|
RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 5));
|
|
RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 5));
|
|
NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
|
|
NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
|
|
scheduler.handle(nodeEvent);
|
|
scheduler.handle(nodeEvent);
|
|
@@ -2251,6 +2024,8 @@ public class TestFairScheduler {
|
|
*/
|
|
*/
|
|
@Test
|
|
@Test
|
|
public void testBasicDRFWithQueues() throws Exception {
|
|
public void testBasicDRFWithQueues() throws Exception {
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
+
|
|
RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 7),
|
|
RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 7),
|
|
1, "127.0.0.1");
|
|
1, "127.0.0.1");
|
|
NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
|
|
NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
|
|
@@ -2285,6 +2060,8 @@ public class TestFairScheduler {
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testDRFHierarchicalQueues() throws Exception {
|
|
public void testDRFHierarchicalQueues() throws Exception {
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
+
|
|
RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(12288, 12),
|
|
RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(12288, 12),
|
|
1, "127.0.0.1");
|
|
1, "127.0.0.1");
|
|
NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
|
|
NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
|
|
@@ -2349,9 +2126,9 @@ public class TestFairScheduler {
|
|
|
|
|
|
@Test(timeout = 30000)
|
|
@Test(timeout = 30000)
|
|
public void testHostPortNodeName() throws Exception {
|
|
public void testHostPortNodeName() throws Exception {
|
|
- scheduler.getConf().setBoolean(YarnConfiguration
|
|
|
|
|
|
+ conf.setBoolean(YarnConfiguration
|
|
.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true);
|
|
.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true);
|
|
- scheduler.reinitialize(scheduler.getConf(),
|
|
|
|
|
|
+ scheduler.reinitialize(conf,
|
|
resourceManager.getRMContext());
|
|
resourceManager.getRMContext());
|
|
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024),
|
|
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024),
|
|
1, "127.0.0.1", 1);
|
|
1, "127.0.0.1", 1);
|
|
@@ -2426,9 +2203,7 @@ public class TestFairScheduler {
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testUserAndQueueMaxRunningApps() throws Exception {
|
|
public void testUserAndQueueMaxRunningApps() throws Exception {
|
|
- Configuration conf = createConfiguration();
|
|
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
- scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
|
|
|
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
out.println("<?xml version=\"1.0\"?>");
|
|
out.println("<?xml version=\"1.0\"?>");
|
|
@@ -2442,8 +2217,7 @@ public class TestFairScheduler {
|
|
out.println("</allocations>");
|
|
out.println("</allocations>");
|
|
out.close();
|
|
out.close();
|
|
|
|
|
|
- QueueManager queueManager = scheduler.getQueueManager();
|
|
|
|
- queueManager.initialize();
|
|
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
|
|
// exceeds no limits
|
|
// exceeds no limits
|
|
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1");
|
|
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1");
|
|
@@ -2479,9 +2253,7 @@ public class TestFairScheduler {
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testMaxRunningAppsHierarchicalQueues() throws Exception {
|
|
public void testMaxRunningAppsHierarchicalQueues() throws Exception {
|
|
- Configuration conf = createConfiguration();
|
|
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
- scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
MockClock clock = new MockClock();
|
|
MockClock clock = new MockClock();
|
|
scheduler.setClock(clock);
|
|
scheduler.setClock(clock);
|
|
|
|
|
|
@@ -2499,8 +2271,7 @@ public class TestFairScheduler {
|
|
out.println("</allocations>");
|
|
out.println("</allocations>");
|
|
out.close();
|
|
out.close();
|
|
|
|
|
|
- QueueManager queueManager = scheduler.getQueueManager();
|
|
|
|
- queueManager.initialize();
|
|
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
|
|
// exceeds no limits
|
|
// exceeds no limits
|
|
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1.sub1", "user1");
|
|
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1.sub1", "user1");
|
|
@@ -2629,10 +2400,8 @@ public class TestFairScheduler {
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testDontAllowUndeclaredPools() throws Exception{
|
|
public void testDontAllowUndeclaredPools() throws Exception{
|
|
- Configuration conf = createConfiguration();
|
|
|
|
conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false);
|
|
conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false);
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
- scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
|
|
|
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
out.println("<?xml version=\"1.0\"?>");
|
|
out.println("<?xml version=\"1.0\"?>");
|
|
@@ -2642,8 +2411,8 @@ public class TestFairScheduler {
|
|
out.println("</allocations>");
|
|
out.println("</allocations>");
|
|
out.close();
|
|
out.close();
|
|
|
|
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
QueueManager queueManager = scheduler.getQueueManager();
|
|
QueueManager queueManager = scheduler.getQueueManager();
|
|
- queueManager.initialize();
|
|
|
|
|
|
|
|
FSLeafQueue jerryQueue = queueManager.getLeafQueue("jerry", false);
|
|
FSLeafQueue jerryQueue = queueManager.getLeafQueue("jerry", false);
|
|
FSLeafQueue defaultQueue = queueManager.getLeafQueue("default", false);
|
|
FSLeafQueue defaultQueue = queueManager.getLeafQueue("default", false);
|
|
@@ -2672,6 +2441,8 @@ public class TestFairScheduler {
|
|
@SuppressWarnings("resource")
|
|
@SuppressWarnings("resource")
|
|
@Test
|
|
@Test
|
|
public void testBlacklistNodes() throws Exception {
|
|
public void testBlacklistNodes() throws Exception {
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
+
|
|
final int GB = 1024;
|
|
final int GB = 1024;
|
|
String host = "127.0.0.1";
|
|
String host = "127.0.0.1";
|
|
RMNode node =
|
|
RMNode node =
|