|
@@ -32,6 +32,9 @@ import org.apache.hadoop.yarn.conf.HAUtil;
|
|
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
@@ -49,6 +52,8 @@ public class TestRMHA {
|
|
|
private Log LOG = LogFactory.getLog(TestRMHA.class);
|
|
|
private final Configuration configuration = new YarnConfiguration();
|
|
|
private MockRM rm = null;
|
|
|
+ private RMApp app = null;
|
|
|
+ private RMAppAttempt attempt = null;
|
|
|
private static final String STATE_ERR =
|
|
|
"ResourceManager is in wrong HA state";
|
|
|
|
|
@@ -103,7 +108,9 @@ public class TestRMHA {
|
|
|
try {
|
|
|
rm.getNewAppId();
|
|
|
rm.registerNode("127.0.0.1:0", 2048);
|
|
|
- rm.submitApp(1024);
|
|
|
+ app = rm.submitApp(1024);
|
|
|
+ attempt = app.getCurrentAppAttempt();
|
|
|
+ rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.SCHEDULED);
|
|
|
} catch (Exception e) {
|
|
|
fail("Unable to perform Active RM functions");
|
|
|
LOG.error("ActiveRM check failed", e);
|
|
@@ -122,7 +129,7 @@ public class TestRMHA {
|
|
|
* become Active
|
|
|
*/
|
|
|
@Test (timeout = 30000)
|
|
|
- public void testStartAndTransitions() throws IOException {
|
|
|
+ public void testStartAndTransitions() throws Exception {
|
|
|
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
|
|
Configuration conf = new YarnConfiguration(configuration);
|
|
|
rm = new MockRM(conf);
|
|
@@ -377,19 +384,34 @@ public class TestRMHA {
|
|
|
|
|
|
private void verifyClusterMetrics(int activeNodes, int appsSubmitted,
|
|
|
int appsPending, int containersPending, int availableMB,
|
|
|
- int activeApplications) {
|
|
|
+ int activeApplications) throws Exception {
|
|
|
+ int timeoutSecs = 0;
|
|
|
QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
|
|
|
- // verify queue metrics
|
|
|
- assertMetric("appsSubmitted", appsSubmitted, metrics.getAppsSubmitted());
|
|
|
- assertMetric("appsPending", appsPending, metrics.getAppsPending());
|
|
|
- assertMetric("containersPending", containersPending,
|
|
|
- metrics.getPendingContainers());
|
|
|
- assertMetric("availableMB", availableMB, metrics.getAvailableMB());
|
|
|
- assertMetric("activeApplications", activeApplications,
|
|
|
- metrics.getActiveApps());
|
|
|
- // verify node metric
|
|
|
ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
|
|
|
- assertMetric("activeNodes", activeNodes, clusterMetrics.getNumActiveNMs());
|
|
|
+ boolean isAllMetricAssertionDone = false;
|
|
|
+ String message = null;
|
|
|
+ while (timeoutSecs++ < 5) {
|
|
|
+ try {
|
|
|
+ // verify queue metrics
|
|
|
+ assertMetric("appsSubmitted", appsSubmitted, metrics.getAppsSubmitted());
|
|
|
+ assertMetric("appsPending", appsPending, metrics.getAppsPending());
|
|
|
+ assertMetric("containersPending", containersPending,
|
|
|
+ metrics.getPendingContainers());
|
|
|
+ assertMetric("availableMB", availableMB, metrics.getAvailableMB());
|
|
|
+ assertMetric("activeApplications", activeApplications,
|
|
|
+ metrics.getActiveApps());
|
|
|
+ // verify node metric
|
|
|
+ assertMetric("activeNodes", activeNodes,
|
|
|
+ clusterMetrics.getNumActiveNMs());
|
|
|
+ isAllMetricAssertionDone = true;
|
|
|
+ break;
|
|
|
+ } catch (AssertionError e) {
|
|
|
+ message = e.getMessage();
|
|
|
+ System.out.println("Waiting for metrics assertion to complete");
|
|
|
+ Thread.sleep(1000);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ assertTrue(message, isAllMetricAssertionDone);
|
|
|
}
|
|
|
|
|
|
private void assertMetric(String metricName, int expected, int actual) {
|