|
@@ -635,6 +635,79 @@ public class TestActionScheduler {
|
|
|
HostRoleStatus.COMPLETED);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Test server actions in multiple requests.
|
|
|
+ *
|
|
|
+ * This is used to make sure the server-side actions do not get filtered out from
|
|
|
+ * {@link org.apache.ambari.server.actionmanager.ActionScheduler#filterParallelPerHostStages(java.util.List)}
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testServerActionInMultipleRequests() throws Exception {
|
|
|
+ ActionQueue aq = new ActionQueue();
|
|
|
+ Clusters fsm = mock(Clusters.class);
|
|
|
+ Cluster oneClusterMock = mock(Cluster.class);
|
|
|
+ Service serviceObj = mock(Service.class);
|
|
|
+ ServiceComponent scomp = mock(ServiceComponent.class);
|
|
|
+ ServiceComponentHost sch = mock(ServiceComponentHost.class);
|
|
|
+ UnitOfWork unitOfWork = mock(UnitOfWork.class);
|
|
|
+ when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
|
|
|
+ when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
|
|
|
+ when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
|
|
|
+ when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
|
|
|
+ when(serviceObj.getCluster()).thenReturn(oneClusterMock);
|
|
|
+
|
|
|
+ String clusterName = "cluster1";
|
|
|
+ String hostname1 = "ahost.ambari.apache.org";
|
|
|
+ String hostname2 = "bhost.ambari.apache.org";
|
|
|
+ HashMap<String, ServiceComponentHost> hosts =
|
|
|
+ new HashMap<String, ServiceComponentHost>();
|
|
|
+ hosts.put(hostname1, sch);
|
|
|
+ hosts.put(hostname2, sch);
|
|
|
+ hosts.put(Stage.INTERNAL_HOSTNAME, sch);
|
|
|
+ when(scomp.getServiceComponentHosts()).thenReturn(hosts);
|
|
|
+
|
|
|
+ List<Stage> stages = new ArrayList<Stage>();
|
|
|
+ Stage stage01 = createStage(clusterName, 0, 1);
|
|
|
+ addTask(stage01, Stage.INTERNAL_HOSTNAME, clusterName, Role.AMBARI_SERVER_ACTION, RoleCommand.ACTIONEXECUTE, "AMBARI", 1);
|
|
|
+
|
|
|
+ Stage stage11 = createStage("cluster1", 1, 1);
|
|
|
+ addTask(stage11, hostname1, clusterName, Role.KERBEROS_CLIENT, RoleCommand.CUSTOM_COMMAND, "KERBEROS", 2);
|
|
|
+
|
|
|
+ Stage stage02 = createStage("cluster1", 0, 2);
|
|
|
+ addTask(stage02, Stage.INTERNAL_HOSTNAME, clusterName, Role.AMBARI_SERVER_ACTION, RoleCommand.ACTIONEXECUTE, "AMBARI", 3);
|
|
|
+
|
|
|
+ Stage stage12 = createStage("cluster1", 1, 2);
|
|
|
+ addTask(stage12, hostname2, clusterName, Role.KERBEROS_CLIENT, RoleCommand.CUSTOM_COMMAND, "KERBEROS", 4);
|
|
|
+
|
|
|
+ stages.add(stage01);
|
|
|
+ stages.add(stage11);
|
|
|
+ stages.add(stage02);
|
|
|
+ stages.add(stage12);
|
|
|
+
|
|
|
+ ActionDBAccessor db = mock(ActionDBAccessor.class);
|
|
|
+
|
|
|
+ RequestEntity request = mock(RequestEntity.class);
|
|
|
+ when(request.isExclusive()).thenReturn(false);
|
|
|
+ when(db.getRequestEntity(anyLong())).thenReturn(request);
|
|
|
+
|
|
|
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
|
|
|
+ when(db.getStagesInProgress()).thenReturn(stages);
|
|
|
+
|
|
|
+ Properties properties = new Properties();
|
|
|
+ properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "true");
|
|
|
+ Configuration conf = new Configuration(properties);
|
|
|
+ ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
|
|
|
+ new HostsMap((String) null),
|
|
|
+ unitOfWork, EasyMock.createNiceMock(AmbariEventPublisher.class), conf);
|
|
|
+
|
|
|
+ scheduler.doWork();
|
|
|
+
|
|
|
+ Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(0).getHostRoleStatus(Stage.INTERNAL_HOSTNAME, Role.AMBARI_SERVER_ACTION.name()));
|
|
|
+ Assert.assertEquals(HostRoleStatus.PENDING, stages.get(1).getHostRoleStatus(hostname1, Role.KERBEROS_CLIENT.name()));
|
|
|
+ Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(2).getHostRoleStatus(Stage.INTERNAL_HOSTNAME, Role.AMBARI_SERVER_ACTION.name()));
|
|
|
+ Assert.assertEquals(HostRoleStatus.PENDING, stages.get(3).getHostRoleStatus(hostname2, Role.KERBEROS_CLIENT.name()));
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Test server action
|
|
|
*/
|
|
@@ -1560,21 +1633,32 @@ public class TestActionScheduler {
|
|
|
return report;
|
|
|
}
|
|
|
|
|
|
- private Stage getStageWithSingleTask(String hostname, String clusterName, Role role,
|
|
|
- RoleCommand roleCommand, Service.Type service, int taskId,
|
|
|
- int stageId, int requestId) {
|
|
|
+ private Stage createStage(String clusterName, int stageId, int requestId) {
|
|
|
Stage stage = stageFactory.createNew(requestId, "/tmp", clusterName, 1L, "getStageWithSingleTask",
|
|
|
CLUSTER_HOST_INFO, "{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}");
|
|
|
stage.setStageId(stageId);
|
|
|
+ return stage;
|
|
|
+ }
|
|
|
+
|
|
|
+ private Stage addTask(Stage stage, String hostname, String clusterName, Role role,
|
|
|
+ RoleCommand roleCommand, String serviceName, int taskId) {
|
|
|
stage.addHostRoleExecutionCommand(hostname, role, roleCommand,
|
|
|
new ServiceComponentHostUpgradeEvent(role.toString(), hostname, System.currentTimeMillis(), "HDP-0.2"),
|
|
|
- clusterName, service.toString(), false, false);
|
|
|
+ clusterName, serviceName, false, false);
|
|
|
stage.getExecutionCommandWrapper(hostname,
|
|
|
role.toString()).getExecutionCommand();
|
|
|
stage.getOrderedHostRoleCommands().get(0).setTaskId(taskId);
|
|
|
return stage;
|
|
|
}
|
|
|
|
|
|
+ private Stage getStageWithSingleTask(String hostname, String clusterName, Role role,
|
|
|
+ RoleCommand roleCommand, Service.Type service, int taskId,
|
|
|
+ int stageId, int requestId) {
|
|
|
+ Stage stage = createStage(clusterName, stageId, requestId);
|
|
|
+ return addTask(stage, hostname, clusterName, role, roleCommand, service.name(), taskId);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
private Stage getStageWithSingleTask(String hostname, String clusterName, Role role, RoleCommand roleCommand,
|
|
|
String customCommandName, Service.Type service, int taskId, int stageId, int requestId) {
|
|
|
Stage stage = getStageWithSingleTask(hostname, clusterName, role, roleCommand, service, taskId, stageId, requestId);
|