|
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.conf.HAUtil;
|
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
|
import org.junit.Before;
|
|
import org.junit.Before;
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
|
|
|
|
@@ -138,32 +139,38 @@ public class TestRMHA {
|
|
rm.start();
|
|
rm.start();
|
|
checkMonitorHealth();
|
|
checkMonitorHealth();
|
|
checkStandbyRMFunctionality();
|
|
checkStandbyRMFunctionality();
|
|
-
|
|
|
|
|
|
+ verifyClusterMetrics(0, 0, 0, 0, 0, 0);
|
|
|
|
+
|
|
// 1. Transition to Standby - must be a no-op
|
|
// 1. Transition to Standby - must be a no-op
|
|
rm.adminService.transitionToStandby(requestInfo);
|
|
rm.adminService.transitionToStandby(requestInfo);
|
|
checkMonitorHealth();
|
|
checkMonitorHealth();
|
|
checkStandbyRMFunctionality();
|
|
checkStandbyRMFunctionality();
|
|
-
|
|
|
|
|
|
+ verifyClusterMetrics(0, 0, 0, 0, 0, 0);
|
|
|
|
+
|
|
// 2. Transition to active
|
|
// 2. Transition to active
|
|
rm.adminService.transitionToActive(requestInfo);
|
|
rm.adminService.transitionToActive(requestInfo);
|
|
checkMonitorHealth();
|
|
checkMonitorHealth();
|
|
checkActiveRMFunctionality();
|
|
checkActiveRMFunctionality();
|
|
-
|
|
|
|
|
|
+ verifyClusterMetrics(1, 1, 1, 1, 2048, 1);
|
|
|
|
+
|
|
// 3. Transition to active - no-op
|
|
// 3. Transition to active - no-op
|
|
rm.adminService.transitionToActive(requestInfo);
|
|
rm.adminService.transitionToActive(requestInfo);
|
|
checkMonitorHealth();
|
|
checkMonitorHealth();
|
|
checkActiveRMFunctionality();
|
|
checkActiveRMFunctionality();
|
|
-
|
|
|
|
|
|
+ verifyClusterMetrics(1, 2, 2, 2, 2048, 2);
|
|
|
|
+
|
|
// 4. Transition to standby
|
|
// 4. Transition to standby
|
|
rm.adminService.transitionToStandby(requestInfo);
|
|
rm.adminService.transitionToStandby(requestInfo);
|
|
checkMonitorHealth();
|
|
checkMonitorHealth();
|
|
checkStandbyRMFunctionality();
|
|
checkStandbyRMFunctionality();
|
|
-
|
|
|
|
|
|
+ verifyClusterMetrics(0, 0, 0, 0, 0, 0);
|
|
|
|
+
|
|
// 5. Transition to active to check Active->Standby->Active works
|
|
// 5. Transition to active to check Active->Standby->Active works
|
|
rm.adminService.transitionToActive(requestInfo);
|
|
rm.adminService.transitionToActive(requestInfo);
|
|
checkMonitorHealth();
|
|
checkMonitorHealth();
|
|
checkActiveRMFunctionality();
|
|
checkActiveRMFunctionality();
|
|
-
|
|
|
|
|
|
+ verifyClusterMetrics(1, 1, 1, 1, 2048, 1);
|
|
|
|
+
|
|
// 6. Stop the RM. All services should stop and RM should not be ready to
|
|
// 6. Stop the RM. All services should stop and RM should not be ready to
|
|
// become active
|
|
// become active
|
|
rm.stop();
|
|
rm.stop();
|
|
@@ -367,6 +374,27 @@ public class TestRMHA {
|
|
fail("Should not throw any exceptions.");
|
|
fail("Should not throw any exceptions.");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ /**
+ * Asserts that the metrics currently reported by the {@code rm} under test
+ * match the expected values. Queue-level values are read from the root queue
+ * metrics of the RM's resource scheduler; the node count is read from
+ * {@code ClusterMetrics.getMetrics()}. Each comparison goes through
+ * {@code assertMetric}, so a mismatch names the offending metric.
+ *
+ * Note that {@code activeNodes} is the first parameter but is checked last,
+ * after all queue metrics.
+ *
+ * @param activeNodes expected {@code ClusterMetrics#getNumActiveNMs()}
+ * @param appsSubmitted expected {@code QueueMetrics#getAppsSubmitted()}
+ * @param appsPending expected {@code QueueMetrics#getAppsPending()}
+ * @param containersPending expected {@code QueueMetrics#getPendingContainers()}
+ * @param availableMB expected {@code QueueMetrics#getAvailableMB()}
+ * @param activeApplications expected {@code QueueMetrics#getActiveApps()}
+ */
|
|
|
|
+ private void verifyClusterMetrics(int activeNodes, int appsSubmitted,
|
|
|
|
+ int appsPending, int containersPending, int availableMB,
|
|
|
|
+ int activeApplications) {
|
|
|
|
+ QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
|
|
|
|
+ // Verify the queue-level metrics taken from the scheduler's root queue.
|
|
|
|
+ assertMetric("appsSubmitted", appsSubmitted, metrics.getAppsSubmitted());
|
|
|
|
+ assertMetric("appsPending", appsPending, metrics.getAppsPending());
|
|
|
|
+ assertMetric("containersPending", containersPending,
|
|
|
|
+ metrics.getPendingContainers());
|
|
|
|
+ assertMetric("availableMB", availableMB, metrics.getAvailableMB());
|
|
|
|
+ assertMetric("activeApplications", activeApplications,
|
|
|
|
+ metrics.getActiveApps());
|
|
|
|
+ // Verify the node-level metric: the number of active NMs from ClusterMetrics.
|
|
|
|
+ ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
|
|
|
|
+ assertMetric("activeNodes", activeNodes, clusterMetrics.getNumActiveNMs());
|
|
|
|
+ }
|
|
|
|
+ /**
+ * Asserts that a single metric has the expected value, failing with the
+ * message {@code "Incorrect value for metric " + metricName} otherwise.
+ *
+ * @param metricName name of the metric, used only in the failure message
+ * @param expected value the metric should have
+ * @param actual value read from the metrics source
+ */
|
|
|
|
+ private void assertMetric(String metricName, int expected, int actual) {
|
|
|
|
+ assertEquals("Incorrect value for metric " + metricName, expected, actual);
|
|
|
|
+ }
|
|
|
|
|
|
@SuppressWarnings("rawtypes")
|
|
@SuppressWarnings("rawtypes")
|
|
class MyCountingDispatcher extends AbstractService implements Dispatcher {
|
|
class MyCountingDispatcher extends AbstractService implements Dispatcher {
|