|
@@ -19,6 +19,8 @@
|
|
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
|
|
|
import junit.framework.Assert;
|
|
|
|
|
@@ -28,13 +30,17 @@ import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.net.NetworkTopology;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
+import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
import org.apache.hadoop.yarn.api.records.Priority;
|
|
|
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
|
|
import org.apache.hadoop.yarn.event.InlineDispatcher;
|
|
|
+import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
|
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.Application;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
|
@@ -43,7 +49,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
|
|
import org.junit.After;
|
|
@@ -55,6 +64,9 @@ public class TestFifoScheduler {
|
|
|
|
|
|
private ResourceManager resourceManager = null;
|
|
|
|
|
|
+ private static final RecordFactory recordFactory =
|
|
|
+ RecordFactoryProvider.getRecordFactory(null);
|
|
|
+
|
|
|
@Before
|
|
|
public void setUp() throws Exception {
|
|
|
resourceManager = new ResourceManager();
|
|
@@ -78,14 +90,38 @@ public class TestFifoScheduler {
|
|
|
.getRMContext());
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
+ private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
|
|
|
+ ApplicationAttemptId attId = recordFactory
|
|
|
+ .newRecordInstance(ApplicationAttemptId.class);
|
|
|
+ ApplicationId appIdImpl = recordFactory
|
|
|
+ .newRecordInstance(ApplicationId.class);
|
|
|
+ appIdImpl.setId(appId);
|
|
|
+ attId.setAttemptId(attemptId);
|
|
|
+ attId.setApplicationId(appIdImpl);
|
|
|
+ return attId;
|
|
|
+ }
|
|
|
+
|
|
|
+ private ResourceRequest createResourceRequest(int memory, String host,
|
|
|
+ int priority, int numContainers) {
|
|
|
+ ResourceRequest request = recordFactory
|
|
|
+ .newRecordInstance(ResourceRequest.class);
|
|
|
+ request.setCapability(Resources.createResource(memory));
|
|
|
+ request.setHostName(host);
|
|
|
+ request.setNumContainers(numContainers);
|
|
|
+ Priority prio = recordFactory.newRecordInstance(Priority.class);
|
|
|
+ prio.setPriority(priority);
|
|
|
+ request.setPriority(prio);
|
|
|
+ return request;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test(timeout=5000)
|
|
|
public void testFifoSchedulerCapacityWhenNoNMs() {
|
|
|
FifoScheduler scheduler = new FifoScheduler();
|
|
|
QueueInfo queueInfo = scheduler.getQueueInfo(null, false, false);
|
|
|
Assert.assertEquals(0.0f, queueInfo.getCurrentCapacity());
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
+ @Test(timeout=5000)
|
|
|
public void testAppAttemptMetrics() throws Exception {
|
|
|
AsyncDispatcher dispatcher = new InlineDispatcher();
|
|
|
RMContext rmContext = new RMContextImpl(dispatcher, null,
|
|
@@ -111,6 +147,59 @@ public class TestFifoScheduler {
|
|
|
Assert.assertEquals(1, metrics.getAppsSubmitted());
|
|
|
}
|
|
|
|
|
|
+ @Test(timeout=2000)
|
|
|
+ public void testNodeLocalAssignment() throws Exception {
|
|
|
+ AsyncDispatcher dispatcher = new InlineDispatcher();
|
|
|
+ RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
|
|
|
+ null, null, null);
|
|
|
+
|
|
|
+ FifoScheduler scheduler = new FifoScheduler();
|
|
|
+ scheduler.reinitialize(new Configuration(), rmContext);
|
|
|
+
|
|
|
+ RMNode node0 = MockNodes.newNodeInfo(1,
|
|
|
+ Resources.createResource(1024 * 64), 1234);
|
|
|
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node0);
|
|
|
+ scheduler.handle(nodeEvent1);
|
|
|
+
|
|
|
+ int _appId = 1;
|
|
|
+ int _appAttemptId = 1;
|
|
|
+ ApplicationAttemptId appAttemptId = createAppAttemptId(_appId,
|
|
|
+ _appAttemptId);
|
|
|
+ AppAddedSchedulerEvent appEvent1 = new AppAddedSchedulerEvent(appAttemptId,
|
|
|
+ "queue1", "user1");
|
|
|
+ scheduler.handle(appEvent1);
|
|
|
+
|
|
|
+ int memory = 64;
|
|
|
+ int nConts = 3;
|
|
|
+ int priority = 20;
|
|
|
+
|
|
|
+ List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
|
|
|
+ ResourceRequest nodeLocal = createResourceRequest(memory,
|
|
|
+ node0.getHostName(), priority, nConts);
|
|
|
+ ResourceRequest rackLocal = createResourceRequest(memory,
|
|
|
+ node0.getRackName(), priority, nConts);
|
|
|
+ ResourceRequest any = createResourceRequest(memory, ResourceRequest.ANY, priority,
|
|
|
+ nConts);
|
|
|
+ ask.add(nodeLocal);
|
|
|
+ ask.add(rackLocal);
|
|
|
+ ask.add(any);
|
|
|
+ scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>());
|
|
|
+
|
|
|
+ NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0);
|
|
|
+
|
|
|
+ // Before the node update event, there are 3 local requests outstanding
|
|
|
+ Assert.assertEquals(3, nodeLocal.getNumContainers());
|
|
|
+
|
|
|
+ scheduler.handle(node0Update);
|
|
|
+
|
|
|
+ // After the node update event, check that there are no more local requests
|
|
|
+ // outstanding
|
|
|
+ Assert.assertEquals(0, nodeLocal.getNumContainers());
|
|
|
+ //Also check that the containers were scheduled
|
|
|
+ SchedulerAppReport info = scheduler.getSchedulerAppInfo(appAttemptId);
|
|
|
+ Assert.assertEquals(3, info.getLiveContainers().size());
|
|
|
+ }
|
|
|
+
|
|
|
// @Test
|
|
|
public void testFifoScheduler() throws Exception {
|
|
|
|