|
@@ -24,7 +24,6 @@ import static org.mockito.Matchers.any;
|
|
|
import static org.mockito.Matchers.anyLong;
|
|
|
import static org.mockito.Matchers.anyString;
|
|
|
import static org.mockito.Mockito.*;
|
|
|
-
|
|
|
import java.lang.reflect.Type;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collections;
|
|
@@ -33,12 +32,9 @@ import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Properties;
|
|
|
import java.util.Set;
|
|
|
-
|
|
|
import com.google.common.reflect.TypeToken;
|
|
|
import com.google.inject.persist.UnitOfWork;
|
|
|
-
|
|
|
import junit.framework.Assert;
|
|
|
-
|
|
|
import org.apache.ambari.server.Role;
|
|
|
import org.apache.ambari.server.RoleCommand;
|
|
|
import org.apache.ambari.server.actionmanager.ActionScheduler.RoleStats;
|
|
@@ -57,10 +53,15 @@ import org.apache.ambari.server.state.HostState;
|
|
|
import org.apache.ambari.server.state.Service;
|
|
|
import org.apache.ambari.server.state.ServiceComponent;
|
|
|
import org.apache.ambari.server.state.ServiceComponentHost;
|
|
|
+import org.apache.ambari.server.state.ServiceComponentHostEvent;
|
|
|
+import org.apache.ambari.server.state.ServiceComponentHostEventType;
|
|
|
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostInstallEvent;
|
|
|
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpFailedEvent;
|
|
|
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent;
|
|
|
import org.apache.ambari.server.utils.StageUtils;
|
|
|
+import org.junit.Ignore;
|
|
|
import org.junit.Test;
|
|
|
+import org.mockito.ArgumentCaptor;
|
|
|
import org.mockito.invocation.InvocationOnMock;
|
|
|
import org.mockito.stubbing.Answer;
|
|
|
import org.slf4j.Logger;
|
|
@@ -277,6 +278,141 @@ public class TestActionScheduler {
|
|
|
scheduler.stop();
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testOpFailedEventRaisedForAbortedHostRole() throws Exception {
|
|
|
+ ActionQueue aq = new ActionQueue();
|
|
|
+ Properties properties = new Properties();
|
|
|
+ Configuration conf = new Configuration(properties);
|
|
|
+ Clusters fsm = mock(Clusters.class);
|
|
|
+ Cluster oneClusterMock = mock(Cluster.class);
|
|
|
+ Service serviceObj = mock(Service.class);
|
|
|
+ ServiceComponent scomp = mock(ServiceComponent.class);
|
|
|
+ ServiceComponentHost sch1 = mock(ServiceComponentHost.class);
|
|
|
+ ServiceComponentHost sch2 = mock(ServiceComponentHost.class);
|
|
|
+ String hostname1 = "host1";
|
|
|
+ String hostname2 = "host2";
|
|
|
+ Host host1 = mock(Host.class);
|
|
|
+ Host host2 = mock(Host.class);
|
|
|
+ when(fsm.getHost(hostname1)).thenReturn(host1);
|
|
|
+ when(fsm.getHost(hostname2)).thenReturn(host2);
|
|
|
+ when(host1.getState()).thenReturn(HostState.HEARTBEAT_LOST);
|
|
|
+ when(host2.getState()).thenReturn(HostState.HEALTHY);
|
|
|
+ when(host1.getHostName()).thenReturn(hostname1);
|
|
|
+ when(host2.getHostName()).thenReturn(hostname2);
|
|
|
+ when(scomp.getServiceComponentHost(hostname1)).thenReturn(sch1);
|
|
|
+ when(scomp.getServiceComponentHost(hostname2)).thenReturn(sch2);
|
|
|
+
|
|
|
+ UnitOfWork unitOfWork = mock(UnitOfWork.class);
|
|
|
+ when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
|
|
|
+ when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
|
|
|
+ when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
|
|
|
+ when(serviceObj.getCluster()).thenReturn(oneClusterMock);
|
|
|
+
|
|
|
+ final List<Stage> stages = new ArrayList<Stage>();
|
|
|
+ Stage stage = new Stage(1, "/tmp", "cluster1", "stageWith2Tasks",
|
|
|
+ CLUSTER_HOST_INFO);
|
|
|
+ addInstallTaskToStage(stage, hostname1, "cluster1", Role.DATANODE,
|
|
|
+ RoleCommand.INSTALL, Service.Type.HDFS, 1);
|
|
|
+ addInstallTaskToStage(stage, hostname2, "cluster1", Role.NAMENODE,
|
|
|
+ RoleCommand.INSTALL, Service.Type.HDFS, 2);
|
|
|
+ stages.add(stage);
|
|
|
+
|
|
|
+ ActionDBAccessor db = mock(ActionDBAccessor.class);
|
|
|
+
|
|
|
+ when(db.getStagesInProgress()).thenReturn(stages);
|
|
|
+
|
|
|
+ doAnswer(new Answer() {
|
|
|
+ @Override
|
|
|
+ public Object answer(InvocationOnMock invocation) throws Throwable {
|
|
|
+ String host = (String) invocation.getArguments()[0];
|
|
|
+ String role = (String) invocation.getArguments()[3];
|
|
|
+ //HostRoleCommand command = stages.get(0).getHostRoleCommand(host, role);
|
|
|
+ for (HostRoleCommand command : stages.get(0).getOrderedHostRoleCommands()) {
|
|
|
+ if (command.getHostName().equals(host) && command.getRole().name()
|
|
|
+ .equals(role)) {
|
|
|
+ command.setStatus(HostRoleStatus.TIMEDOUT);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }).when(db).timeoutHostRole(anyString(), anyLong(), anyLong(), anyString());
|
|
|
+
|
|
|
+ doAnswer(new Answer() {
|
|
|
+ @Override
|
|
|
+ public Object answer(InvocationOnMock invocation) throws Throwable {
|
|
|
+ Long requestId = (Long) invocation.getArguments()[0];
|
|
|
+ for (Stage stage : stages) {
|
|
|
+ if (requestId.equals(stage.getRequestId())) {
|
|
|
+ for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
|
|
|
+ if (command.getStatus() == HostRoleStatus.QUEUED ||
|
|
|
+ command.getStatus() == HostRoleStatus.IN_PROGRESS ||
|
|
|
+ command.getStatus() == HostRoleStatus.PENDING) {
|
|
|
+ command.setStatus(HostRoleStatus.ABORTED);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }).when(db).abortOperation(anyLong());
|
|
|
+
|
|
|
+ ArgumentCaptor<ServiceComponentHostEvent> eventsCapture1 =
|
|
|
+ ArgumentCaptor.forClass(ServiceComponentHostEvent.class);
|
|
|
+ ArgumentCaptor<ServiceComponentHostEvent> eventsCapture2 =
|
|
|
+ ArgumentCaptor.forClass(ServiceComponentHostEvent.class);
|
|
|
+
|
|
|
+ // Make sure the NN install doesn't timeout
|
|
|
+ ActionScheduler scheduler = new ActionScheduler(100, 50000, db, aq, fsm, 3,
|
|
|
+ new HostsMap((String) null), null, unitOfWork, conf);
|
|
|
+ scheduler.setTaskTimeoutAdjustment(false);
|
|
|
+ // Start the thread
|
|
|
+ scheduler.start();
|
|
|
+
|
|
|
+ while (!stages.get(0).getHostRoleStatus(hostname1, "DATANODE")
|
|
|
+ .equals(HostRoleStatus.TIMEDOUT) && !stages.get(0).getHostRoleStatus
|
|
|
+ (hostname2, "NAMENODE").equals(HostRoleStatus.ABORTED)) {
|
|
|
+
|
|
|
+ Thread.sleep(100L);
|
|
|
+ }
|
|
|
+
|
|
|
+ Assert.assertEquals(HostRoleStatus.TIMEDOUT,
|
|
|
+ stages.get(0).getHostRoleStatus(hostname1, "DATANODE"));
|
|
|
+ Assert.assertEquals(HostRoleStatus.ABORTED,
|
|
|
+ stages.get(0).getHostRoleStatus(hostname2, "NAMENODE"));
|
|
|
+
|
|
|
+ verify(sch1, atLeastOnce()).handleEvent(eventsCapture1.capture());
|
|
|
+ verify(sch2, atLeastOnce()).handleEvent(eventsCapture2.capture());
|
|
|
+
|
|
|
+ List<ServiceComponentHostEvent> eventTypes = eventsCapture1.getAllValues();
|
|
|
+ eventTypes.addAll(eventsCapture2.getAllValues());
|
|
|
+
|
|
|
+ Assert.assertNotNull(eventTypes);
|
|
|
+
|
|
|
+ ServiceComponentHostOpFailedEvent datanodeFailedEvent = null;
|
|
|
+ ServiceComponentHostOpFailedEvent namenodeFailedEvent = null;
|
|
|
+
|
|
|
+ for (ServiceComponentHostEvent eventType : eventTypes) {
|
|
|
+ if (eventType instanceof ServiceComponentHostOpFailedEvent) {
|
|
|
+ ServiceComponentHostOpFailedEvent event =
|
|
|
+ (ServiceComponentHostOpFailedEvent) eventType;
|
|
|
+
|
|
|
+ if (event.getServiceComponentName().equals("DATANODE")) {
|
|
|
+ datanodeFailedEvent = event;
|
|
|
+ } else if (event.getServiceComponentName().equals("NAMENODE")) {
|
|
|
+ namenodeFailedEvent = event;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ Assert.assertNotNull("Datanode should be in Install failed state.",
|
|
|
+ datanodeFailedEvent);
|
|
|
+ Assert.assertNotNull("Namenode should be in Install failed state.",
|
|
|
+ namenodeFailedEvent);
|
|
|
+
|
|
|
+ scheduler.stop();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Test server action
|
|
|
*/
|
|
@@ -1027,6 +1163,24 @@ public class TestActionScheduler {
|
|
|
return stage;
|
|
|
}
|
|
|
|
|
|
+ private void addInstallTaskToStage(Stage stage, String hostname,
|
|
|
+ String clusterName, Role role,
|
|
|
+ RoleCommand roleCommand, Service.Type service,
|
|
|
+ int taskId) {
|
|
|
+
|
|
|
+ stage.addHostRoleExecutionCommand(hostname, role, roleCommand,
|
|
|
+ new ServiceComponentHostInstallEvent(role.toString(), hostname,
|
|
|
+ System.currentTimeMillis(), "HDP-0.2"), clusterName, service.toString());
|
|
|
+ ExecutionCommand command = stage.getExecutionCommandWrapper
|
|
|
+ (hostname, role.toString()).getExecutionCommand();
|
|
|
+ command.setTaskId(taskId);
|
|
|
+ for (HostRoleCommand cmd :stage.getOrderedHostRoleCommands()) {
|
|
|
+ if (cmd.getHostName().equals(hostname) && cmd.getRole().equals(role)) {
|
|
|
+ cmd.setTaskId(taskId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testSuccessFactors() {
|
|
|
Stage s = StageUtils.getATestStage(1, 1, CLUSTER_HOST_INFO);
|