|
@@ -18,35 +18,8 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
|
|
|
|
|
-import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES;
|
|
|
-import static org.junit.Assert.assertEquals;
|
|
|
-import static org.junit.Assert.assertFalse;
|
|
|
-import static org.junit.Assert.assertNotEquals;
|
|
|
-import static org.junit.Assert.assertNotNull;
|
|
|
-import static org.junit.Assert.assertNull;
|
|
|
-import static org.junit.Assert.assertTrue;
|
|
|
-import static org.junit.Assert.fail;
|
|
|
-import static org.mockito.Mockito.mock;
|
|
|
-import static org.mockito.Mockito.when;
|
|
|
-
|
|
|
-import java.io.File;
|
|
|
-import java.io.FileWriter;
|
|
|
-import java.io.IOException;
|
|
|
-import java.io.PrintWriter;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.Arrays;
|
|
|
-import java.util.Collection;
|
|
|
-import java.util.Collections;
|
|
|
-import java.util.HashMap;
|
|
|
-import java.util.HashSet;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
-import java.util.Set;
|
|
|
-import java.util.stream.Collectors;
|
|
|
-
|
|
|
-import javax.xml.parsers.ParserConfigurationException;
|
|
|
-
|
|
|
import com.google.common.collect.Lists;
|
|
|
+import com.google.common.collect.Sets;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
|
import org.apache.hadoop.ha.HAServiceProtocol;
|
|
@@ -73,8 +46,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
|
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
|
import org.apache.hadoop.yarn.event.Event;
|
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
-import org.apache.hadoop.yarn.exceptions
|
|
|
- .SchedulerInvalidResoureRequestException;
|
|
|
+import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
|
|
@@ -101,8 +73,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
|
|
-
|
|
|
-
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
|
@@ -129,7 +99,32 @@ import org.junit.Test;
|
|
|
import org.mockito.Mockito;
|
|
|
import org.xml.sax.SAXException;
|
|
|
|
|
|
-import com.google.common.collect.Sets;
|
|
|
+import javax.xml.parsers.ParserConfigurationException;
|
|
|
+import java.io.File;
|
|
|
+import java.io.FileWriter;
|
|
|
+import java.io.IOException;
|
|
|
+import java.io.PrintWriter;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.Collection;
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.HashSet;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES;
|
|
|
+import static org.junit.Assert.assertEquals;
|
|
|
+import static org.junit.Assert.assertFalse;
|
|
|
+import static org.junit.Assert.assertNotEquals;
|
|
|
+import static org.junit.Assert.assertNotNull;
|
|
|
+import static org.junit.Assert.assertNull;
|
|
|
+import static org.junit.Assert.assertTrue;
|
|
|
+import static org.junit.Assert.fail;
|
|
|
+import static org.mockito.Mockito.mock;
|
|
|
+import static org.mockito.Mockito.when;
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public class TestFairScheduler extends FairSchedulerTestBase {
|
|
@@ -610,14 +605,24 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|
|
|
|
|
NodeUpdateSchedulerEvent nodeEvent = new NodeUpdateSchedulerEvent(node1);
|
|
|
|
|
|
+ // Send 4 node heartbeats, this should be enough to allocate 4 containers
|
|
|
+ // As we have 2 queues with capacity: 2GB,2cores, we could only have
|
|
|
+ // 4 containers at most
|
|
|
scheduler.handle(nodeEvent);
|
|
|
scheduler.handle(nodeEvent);
|
|
|
scheduler.handle(nodeEvent);
|
|
|
scheduler.handle(nodeEvent);
|
|
|
+ drainEventsOnRM();
|
|
|
+
|
|
|
+ // Apps should be running with 2 containers
|
|
|
+ assertEquals("App 1 is not running with the correct number of containers",
|
|
|
+ 2, scheduler.getSchedulerApp(attId1).getLiveContainers().size());
|
|
|
+ assertEquals("App 2 is not running with the correct number of containers",
|
|
|
+ 2, scheduler.getSchedulerApp(attId2).getLiveContainers().size());
|
|
|
+
|
|
|
+ //ensure that a 5th node heartbeat does not allocate more containers
|
|
|
scheduler.handle(nodeEvent);
|
|
|
- scheduler.handle(nodeEvent);
|
|
|
- scheduler.handle(nodeEvent);
|
|
|
- scheduler.handle(nodeEvent);
|
|
|
+ drainEventsOnRM();
|
|
|
|
|
|
// Apps should be running with 2 containers
|
|
|
assertEquals("App 1 is not running with the correct number of containers",
|
|
@@ -635,12 +640,15 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|
|
out.close();
|
|
|
|
|
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
-
|
|
|
scheduler.update();
|
|
|
+
|
|
|
+ // Send 2 node heartbeats, this should be enough to allocate 2
|
|
|
+ // more containers.
|
|
|
+ // As we have 2 queues with capacity: 3GB,3cores, we could only have
|
|
|
+ // 6 containers at most
|
|
|
scheduler.handle(nodeEvent);
|
|
|
scheduler.handle(nodeEvent);
|
|
|
- scheduler.handle(nodeEvent);
|
|
|
- scheduler.handle(nodeEvent);
|
|
|
+ drainEventsOnRM();
|
|
|
|
|
|
// Apps should be running with 3 containers now
|
|
|
assertEquals("App 1 is not running with the correct number of containers",
|
|
@@ -657,10 +665,16 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|
|
out.println("</allocations>");
|
|
|
out.close();
|
|
|
|
|
|
+ //ensure that a 7th node heartbeat does not allocate more containers
|
|
|
+ scheduler.handle(nodeEvent);
|
|
|
+ drainEventsOnRM();
|
|
|
+ assertEquals(6, scheduler.getRootQueueMetrics().getAllocatedContainers());
|
|
|
+
|
|
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
|
|
scheduler.update();
|
|
|
scheduler.handle(nodeEvent);
|
|
|
+ drainEventsOnRM();
|
|
|
|
|
|
// Apps still should be running with 3 containers because we don't preempt
|
|
|
assertEquals("App 1 is not running with the correct number of containers",
|
|
@@ -669,6 +683,12 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|
|
3, scheduler.getSchedulerApp(attId2).getLiveContainers().size());
|
|
|
}
|
|
|
|
|
|
+ private void drainEventsOnRM() {
|
|
|
+ if (resourceManager instanceof MockRM) {
|
|
|
+ ((MockRM) resourceManager).drainEvents();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testFairShareWithZeroWeight() throws IOException {
|
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
@@ -5180,9 +5200,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|
|
|
|
|
// after YARN-5375, scheduler event is processed in rm main dispatcher,
|
|
|
// wait it processed, or may lead dead lock
|
|
|
- if (resourceManager instanceof MockRM) {
|
|
|
- ((MockRM) resourceManager).drainEvents();
|
|
|
- }
|
|
|
+ drainEventsOnRM();
|
|
|
|
|
|
NodeAddedSchedulerEvent nodeAddEvent1 =
|
|
|
new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes()
|