|
@@ -31,9 +31,10 @@ import org.apache.hadoop.yarn.api.records.*;
|
|
|
import org.apache.hadoop.yarn.client.api.YarnClient;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
-import org.apache.hadoop.yarn.service.api.records.ComponentState;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
|
|
import org.apache.hadoop.yarn.service.api.records.Component;
|
|
|
+import org.apache.hadoop.yarn.service.api.records.ComponentState;
|
|
|
+import org.apache.hadoop.yarn.service.api.records.Configuration;
|
|
|
import org.apache.hadoop.yarn.service.api.records.Container;
|
|
|
import org.apache.hadoop.yarn.service.api.records.ContainerState;
|
|
|
import org.apache.hadoop.yarn.service.api.records.PlacementConstraint;
|
|
@@ -62,7 +63,7 @@ import java.util.*;
|
|
|
import java.util.concurrent.TimeoutException;
|
|
|
|
|
|
import static org.apache.hadoop.yarn.api.records.YarnApplicationState.FINISHED;
|
|
|
-import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.YARN_SERVICE_BASE_PATH;
|
|
|
+import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.*;
|
|
|
import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.EXIT_COMMAND_ARGUMENT_ERROR;
|
|
|
import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.EXIT_NOT_FOUND;
|
|
|
|
|
@@ -582,6 +583,109 @@ public class TestYarnNativeServices extends ServiceTestUtils {
|
|
|
return containerIds;
|
|
|
}
|
|
|
|
|
|
+ // Test to verify component health threshold monitor. It uses anti-affinity
|
|
|
+ // placement policy to make it easier to simulate container failure by
|
|
|
+ // allocating more containers than the no of NMs.
|
|
|
+ // 1. Start mini cluster with 3 NMs and scheduler placement-constraint handler
|
|
|
+ // 2. Create an example service of 3 containers with anti-affinity placement
|
|
|
+ // policy and health threshold = 65%, window = 3 secs, init-delay = 0 secs,
|
|
|
+ // poll-frequency = 1 secs
|
|
|
+ // 3. Flex the component to 4 containers. This makes health = 75%, so based on
|
|
|
+ // threshold the service will continue to run beyond the window of 3 secs.
|
|
|
+ // 4. Flex the component to 5 containers. This makes health = 60%, so based on
|
|
|
+ // threshold the service will be stopped after the window of 3 secs.
|
|
|
+ @Test (timeout = 200000)
|
|
|
+ public void testComponentHealthThresholdMonitor() throws Exception {
|
|
|
+ // We need to enable scheduler placement-constraint at the cluster level to
|
|
|
+ // let apps use placement policies.
|
|
|
+ YarnConfiguration conf = new YarnConfiguration();
|
|
|
+ conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
|
|
|
+ YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
|
|
|
+ setConf(conf);
|
|
|
+ setupInternal(3);
|
|
|
+ ServiceClient client = createClient(getConf());
|
|
|
+ Service exampleApp = new Service();
|
|
|
+ exampleApp.setName("example-app");
|
|
|
+ exampleApp.setVersion("v1");
|
|
|
+ Component comp = createComponent("compa", 3L, "sleep 1000");
|
|
|
+ PlacementPolicy pp = new PlacementPolicy();
|
|
|
+ PlacementConstraint pc = new PlacementConstraint();
|
|
|
+ pc.setName("CA1");
|
|
|
+ pc.setTargetTags(Collections.singletonList("compa"));
|
|
|
+ pc.setScope(PlacementScope.NODE);
|
|
|
+ pc.setType(PlacementType.ANTI_AFFINITY);
|
|
|
+ pp.setConstraints(Collections.singletonList(pc));
|
|
|
+ comp.setPlacementPolicy(pp);
|
|
|
+ Configuration config = new Configuration();
|
|
|
+ config.setProperty(CONTAINER_HEALTH_THRESHOLD_PERCENT, "65");
|
|
|
+ config.setProperty(CONTAINER_HEALTH_THRESHOLD_WINDOW_SEC, "3");
|
|
|
+ config.setProperty(CONTAINER_HEALTH_THRESHOLD_INIT_DELAY_SEC, "0");
|
|
|
+ config.setProperty(CONTAINER_HEALTH_THRESHOLD_POLL_FREQUENCY_SEC, "1");
|
|
|
+ config.setProperty(DEFAULT_READINESS_CHECK_ENABLED, "false");
|
|
|
+ comp.setConfiguration(config);
|
|
|
+ exampleApp.addComponent(comp);
|
|
|
+ // Make sure AM does not come up after service is killed for this test
|
|
|
+ Configuration serviceConfig = new Configuration();
|
|
|
+ serviceConfig.setProperty(AM_RESTART_MAX, "1");
|
|
|
+ exampleApp.setConfiguration(serviceConfig);
|
|
|
+ client.actionCreate(exampleApp);
|
|
|
+ waitForServiceToBeStable(client, exampleApp);
|
|
|
+
|
|
|
+ // Check service is stable and all 3 containers are running
|
|
|
+ Service service = client.getStatus(exampleApp.getName());
|
|
|
+ Component component = service.getComponent("compa");
|
|
|
+ Assert.assertEquals("Service state should be STABLE", ServiceState.STABLE,
|
|
|
+ service.getState());
|
|
|
+ Assert.assertEquals("3 containers are expected to be running", 3,
|
|
|
+ component.getContainers().size());
|
|
|
+
|
|
|
+ // Flex compa up to 4 - will make health 75% (3 out of 4 running), but still
|
|
|
+ // above threshold of 65%, so service will continue to run.
|
|
|
+ Map<String, Long> compCounts = new HashMap<>();
|
|
|
+ compCounts.put("compa", 4L);
|
|
|
+ exampleApp.getComponent("compa").setNumberOfContainers(4L);
|
|
|
+ client.flexByRestService(exampleApp.getName(), compCounts);
|
|
|
+ try {
|
|
|
+ // Wait for 6 secs (window 3 secs + 1 for next poll + 2 for buffer). Since
|
|
|
+ // the service will never go to stable state (because of anti-affinity the
|
|
|
+ // 4th container will never be allocated) it will timeout. However, after
|
|
|
+ // the timeout the service should continue to run since health is 75%
|
|
|
+ // which is above the threshold of 65%.
|
|
|
+ waitForServiceToBeStable(client, exampleApp, 6000);
|
|
|
+ Assert.fail("Service should not be in a stable state. It should throw "
|
|
|
+ + "a timeout exception.");
|
|
|
+ } catch (Exception e) {
|
|
|
+ // Check that service state is STARTED and only 3 containers are running
|
|
|
+ service = client.getStatus(exampleApp.getName());
|
|
|
+ component = service.getComponent("compa");
|
|
|
+ Assert.assertEquals("Service state should be STARTED",
|
|
|
+ ServiceState.STARTED, service.getState());
|
|
|
+ Assert.assertEquals("Component state should be FLEXING",
|
|
|
+ ComponentState.FLEXING, component.getState());
|
|
|
+ Assert.assertEquals("3 containers are expected to be running", 3,
|
|
|
+ component.getContainers().size());
|
|
|
+ }
|
|
|
+
|
|
|
+ // Flex compa up to 5 - will make health 60% (3 out of 5 running), so
|
|
|
+ // service will stop since it is below threshold of 65%.
|
|
|
+ compCounts.put("compa", 5L);
|
|
|
+ exampleApp.getComponent("compa").setNumberOfContainers(5L);
|
|
|
+ client.flexByRestService(exampleApp.getName(), compCounts);
|
|
|
+ try {
|
|
|
+ // Wait for 14 secs (window 3 secs + 1 for next poll + 2 for buffer + 5
|
|
|
+ // secs of service wait before shutting down + 3 secs app cleanup so that
|
|
|
+ // API returns that service is in FAILED state). Note, because of
|
|
|
+ // anti-affinity the 4th and 5th container will never be allocated.
|
|
|
+ waitForServiceToBeInState(client, exampleApp, ServiceState.FAILED,
|
|
|
+ 14000);
|
|
|
+ } catch (Exception e) {
|
|
|
+ Assert.fail("Should not have thrown exception");
|
|
|
+ }
|
|
|
+
|
|
|
+ LOG.info("Destroy service {}", exampleApp);
|
|
|
+ client.actionDestroy(exampleApp.getName());
|
|
|
+ }
|
|
|
+
|
|
|
// Check containers launched are in dependency order
|
|
|
// Get all containers into a list and sort based on container launch time e.g.
|
|
|
// compa-c1, compa-c2, compb-c1, compb-c2;
|