|
@@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueu
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueueMetrics;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerTestBase;
|
|
@@ -158,6 +159,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
|
|
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
|
|
nm1.registerNode();
|
|
|
RMApp app1 = rm1.submitApp(200);
|
|
|
+ Resource amResources = app1.getAMResourceRequests().get(0).getCapability();
|
|
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
|
|
|
|
|
// clear queue metrics
|
|
@@ -240,7 +242,8 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
|
|
if (getSchedulerType() == SchedulerType.CAPACITY) {
|
|
|
checkCSQueue(rm2, schedulerApp, nmResource, nmResource, usedResources, 2);
|
|
|
} else {
|
|
|
- checkFSQueue(rm2, schedulerApp, usedResources, availableResources);
|
|
|
+ checkFSQueue(rm2, schedulerApp, usedResources, availableResources,
|
|
|
+ amResources);
|
|
|
}
|
|
|
|
|
|
// *********** check scheduler attempt state.********
|
|
@@ -310,6 +313,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
|
|
RMApp app1 = rm1.submitApp(200, "dynamicQApp",
|
|
|
UserGroupInformation.getCurrentUser().getShortUserName(), null,
|
|
|
ReservationSystemTestUtil.getReservationQueueName());
|
|
|
+ Resource amResources = app1.getAMResourceRequests().get(0).getCapability();
|
|
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
|
|
|
|
|
// clear queue metrics
|
|
@@ -384,7 +388,8 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
|
|
if (getSchedulerType() == SchedulerType.CAPACITY) {
|
|
|
checkCSQueue(rm2, schedulerApp, nmResource, nmResource, usedResources, 2);
|
|
|
} else {
|
|
|
- checkFSQueue(rm2, schedulerApp, usedResources, availableResources);
|
|
|
+ checkFSQueue(rm2, schedulerApp, usedResources, availableResources,
|
|
|
+ amResources);
|
|
|
}
|
|
|
|
|
|
// *********** check scheduler attempt state.********
|
|
@@ -456,7 +461,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
|
|
|
|
|
private void checkFSQueue(ResourceManager rm,
|
|
|
SchedulerApplication schedulerApp, Resource usedResources,
|
|
|
- Resource availableResources) throws Exception {
|
|
|
+ Resource availableResources, Resource amResources) throws Exception {
|
|
|
// waiting for RM's scheduling apps
|
|
|
int retry = 0;
|
|
|
Resource assumedFairShare = Resource.newInstance(8192, 8);
|
|
@@ -488,6 +493,16 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
|
|
assertMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemorySize(),
|
|
|
availableResources.getVirtualCores(), usedResources.getMemorySize(),
|
|
|
usedResources.getVirtualCores());
|
|
|
+
|
|
|
+ // ************ check AM resources ****************
|
|
|
+ assertEquals(amResources,
|
|
|
+ schedulerApp.getCurrentAppAttempt().getAMResource());
|
|
|
+ FSQueueMetrics fsQueueMetrics =
|
|
|
+ (FSQueueMetrics) schedulerApp.getQueue().getMetrics();
|
|
|
+ assertEquals(amResources.getMemorySize(),
|
|
|
+ fsQueueMetrics.getAMResourceUsageMB());
|
|
|
+ assertEquals(amResources.getVirtualCores(),
|
|
|
+ fsQueueMetrics.getAMResourceUsageVCores());
|
|
|
}
|
|
|
|
|
|
// create 3 container reports for AM
|