|
@@ -90,6 +90,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
|
@@ -99,6 +100,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
|
@@ -3552,7 +3554,58 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|
|
verifyAppRunnable(attId5, false);
|
|
|
verifyQueueNumRunnable("queue1", 2, 1);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testMultipleCompletedEvent() throws Exception {
|
|
|
+ // Set up a fair scheduler
|
|
|
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
|
+
|
|
|
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
|
+ out.println("<?xml version=\"1.0\"?>");
|
|
|
+ out.println("<allocations>");
|
|
|
+ out.println("<queue name=\"queue1\">");
|
|
|
+ out.println("<maxAMShare>0.2</maxAMShare>");
|
|
|
+ out.println("</queue>");
|
|
|
+ out.println("</allocations>");
|
|
|
+ out.close();
|
|
|
+
|
|
|
+ scheduler.init(conf);
|
|
|
+ scheduler.start();
|
|
|
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
+
|
|
|
+ // Create a node
|
|
|
+ RMNode node =
|
|
|
+ MockNodes.newNodeInfo(1, Resources.createResource(20480, 20),
|
|
|
+ 0, "127.0.0.1");
|
|
|
+ NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
|
|
|
+ NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
|
|
|
+ scheduler.handle(nodeEvent);
|
|
|
+ scheduler.update();
|
|
|
+
|
|
|
+ // Launch an app
|
|
|
+ ApplicationAttemptId attId1 = createAppAttemptId(1, 1);
|
|
|
+ createApplicationWithAMResource(
|
|
|
+ attId1, "queue1", "user1",
|
|
|
+ Resource.newInstance(1024, 1));
|
|
|
+ createSchedulingRequestExistingApplication(
|
|
|
+ 1024, 1,
|
|
|
+ RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority(), attId1);
|
|
|
+ FSAppAttempt app1 = scheduler.getSchedulerApp(attId1);
|
|
|
+ scheduler.update();
|
|
|
+ scheduler.handle(updateEvent);
|
|
|
+
|
|
|
+ RMContainer container = app1.getLiveContainersMap().
|
|
|
+ values().iterator().next();
|
|
|
+ scheduler.completedContainer(container, SchedulerUtils
|
|
|
+ .createAbnormalContainerStatus(container.getContainerId(),
|
|
|
+ SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL);
|
|
|
+ scheduler.completedContainer(container, SchedulerUtils
|
|
|
+ .createAbnormalContainerStatus(container.getContainerId(),
|
|
|
+ SchedulerUtils.COMPLETED_APPLICATION),
|
|
|
+ RMContainerEventType.FINISHED);
|
|
|
+ assertEquals(Resources.none(), app1.getResourceUsage());
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testQueueMaxAMShare() throws Exception {
|
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|