|
@@ -39,6 +39,7 @@ import java.util.Collections;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import javax.xml.parsers.ParserConfigurationException;
|
|
import javax.xml.parsers.ParserConfigurationException;
|
|
|
|
|
|
@@ -86,6 +87,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedS
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
|
@@ -100,6 +103,9 @@ import org.mockito.invocation.InvocationOnMock;
|
|
import org.mockito.stubbing.Answer;
|
|
import org.mockito.stubbing.Answer;
|
|
import org.xml.sax.SAXException;
|
|
import org.xml.sax.SAXException;
|
|
|
|
|
|
|
|
+import com.google.common.collect.ImmutableSet;
|
|
|
|
+import com.google.common.collect.UnmodifiableIterator;
|
|
|
|
+
|
|
public class TestFairScheduler {
|
|
public class TestFairScheduler {
|
|
|
|
|
|
private class MockClock implements Clock {
|
|
private class MockClock implements Clock {
|
|
@@ -2312,4 +2318,91 @@ public class TestFairScheduler {
|
|
Assert.assertEquals(1, consumption.getVirtualCores());
|
|
Assert.assertEquals(1, consumption.getVirtualCores());
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testAggregateCapacityTrackingWithPreemptionEnabled() throws Exception {
|
|
|
|
+ int KB = 1024;
|
|
|
|
+ int iterationNumber = 10;
|
|
|
|
+ Configuration conf = createConfiguration();
|
|
|
|
+ conf.setBoolean("yarn.scheduler.fair.preemption", true);
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
+ RMNode node = MockNodes.newNodeInfo(1, Resources.createResource(KB * iterationNumber));
|
|
|
|
+ NodeAddedSchedulerEvent nodeAddEvent = new NodeAddedSchedulerEvent(node);
|
|
|
|
+ scheduler.handle(nodeAddEvent);
|
|
|
|
+
|
|
|
|
+ for (int i = 0; i < iterationNumber; i++) {
|
|
|
|
+ createSchedulingRequest(KB, "queue1", "user1", 1);
|
|
|
|
+ scheduler.update();
|
|
|
|
+ NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
|
|
|
|
+ scheduler.handle(updateEvent);
|
|
|
|
+
|
|
|
|
+ assertEquals(KB,
|
|
|
|
+ scheduler.getQueueManager().getQueue("queue1").getResourceUsage().getMemory());
|
|
|
|
+ TimeUnit.SECONDS.sleep(1);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static final class ExternalAppAddedSchedulerEvent extends SchedulerEvent {
|
|
|
|
+ public ExternalAppAddedSchedulerEvent() {
|
|
|
|
+ super(SchedulerEventType.APP_ADDED);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static final class ExternalNodeRemovedSchedulerEvent extends SchedulerEvent {
|
|
|
|
+ public ExternalNodeRemovedSchedulerEvent() {
|
|
|
|
+ super(SchedulerEventType.NODE_REMOVED);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static final class ExternalNodeUpdateSchedulerEvent extends SchedulerEvent {
|
|
|
|
+ public ExternalNodeUpdateSchedulerEvent() {
|
|
|
|
+ super(SchedulerEventType.NODE_UPDATE);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static final class ExternalNodeAddedSchedulerEvent extends SchedulerEvent {
|
|
|
|
+ public ExternalNodeAddedSchedulerEvent() {
|
|
|
|
+ super(SchedulerEventType.NODE_ADDED);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static final class ExternalAppRemovedSchedulerEvent extends SchedulerEvent {
|
|
|
|
+ public ExternalAppRemovedSchedulerEvent() {
|
|
|
|
+ super(SchedulerEventType.APP_REMOVED);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static final class ExternalContainerExpiredSchedulerEvent extends SchedulerEvent {
|
|
|
|
+ public ExternalContainerExpiredSchedulerEvent() {
|
|
|
|
+ super(SchedulerEventType.CONTAINER_EXPIRED);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * try to handle external events type
|
|
|
|
+ * and get {@code RuntimeException}
|
|
|
|
+ *
|
|
|
|
+ * @throws Exception
|
|
|
|
+ */
|
|
|
|
+ @Test
|
|
|
|
+ public void testSchedulerHandleFailWithExternalEvents() throws Exception {
|
|
|
|
+ Configuration conf = createConfiguration();
|
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
+ ImmutableSet<? extends SchedulerEvent> externalEvents = ImmutableSet.of(new ExternalAppAddedSchedulerEvent(),
|
|
|
|
+ new ExternalNodeRemovedSchedulerEvent(), new ExternalNodeUpdateSchedulerEvent(),
|
|
|
|
+ new ExternalNodeAddedSchedulerEvent(), new ExternalAppRemovedSchedulerEvent(),
|
|
|
|
+ new ExternalContainerExpiredSchedulerEvent());
|
|
|
|
+
|
|
|
|
+ UnmodifiableIterator<? extends SchedulerEvent> iter = externalEvents.iterator();
|
|
|
|
+ while(iter.hasNext())
|
|
|
|
+ handleExternalEvent(iter.next());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void handleExternalEvent(SchedulerEvent event) throws Exception {
|
|
|
|
+ try {
|
|
|
|
+ scheduler.handle(event);
|
|
|
|
+ } catch(RuntimeException ex) {
|
|
|
|
+ //expected
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|