|
@@ -46,10 +46,6 @@ import java.util.TreeMap;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
-import com.google.inject.Inject;
|
|
|
-import com.google.inject.persist.PersistService;
|
|
|
-import junit.framework.Assert;
|
|
|
-
|
|
|
import org.apache.ambari.server.AmbariException;
|
|
|
import org.apache.ambari.server.Role;
|
|
|
import org.apache.ambari.server.RoleCommand;
|
|
@@ -98,9 +94,13 @@ import org.slf4j.LoggerFactory;
|
|
|
import com.google.common.reflect.TypeToken;
|
|
|
import com.google.inject.AbstractModule;
|
|
|
import com.google.inject.Guice;
|
|
|
+import com.google.inject.Inject;
|
|
|
import com.google.inject.Injector;
|
|
|
+import com.google.inject.persist.PersistService;
|
|
|
import com.google.inject.persist.UnitOfWork;
|
|
|
|
|
|
+import junit.framework.Assert;
|
|
|
+
|
|
|
public class TestActionScheduler {
|
|
|
|
|
|
private static final Logger log = LoggerFactory.getLogger(TestActionScheduler.class);
|
|
@@ -113,7 +113,6 @@ public class TestActionScheduler {
|
|
|
|
|
|
private static Injector injector;
|
|
|
|
|
|
- private final String serverHostname = StageUtils.getHostName();
|
|
|
private final String hostname = "ahost.ambari.apache.org";
|
|
|
private final int MAX_CYCLE_ITERATIONS = 100;
|
|
|
|
|
@@ -291,9 +290,9 @@ public class TestActionScheduler {
|
|
|
when(request.isExclusive()).thenReturn(false);
|
|
|
when(db.getRequestEntity(anyLong())).thenReturn(request);
|
|
|
|
|
|
- doAnswer(new Answer() {
|
|
|
+ doAnswer(new Answer<Void>() {
|
|
|
@Override
|
|
|
- public Object answer(InvocationOnMock invocation) throws Throwable {
|
|
|
+ public Void answer(InvocationOnMock invocation) throws Throwable {
|
|
|
String host = (String) invocation.getArguments()[0];
|
|
|
String role = (String) invocation.getArguments()[3];
|
|
|
HostRoleCommand command = s.getHostRoleCommand(host, role);
|
|
@@ -371,9 +370,9 @@ public class TestActionScheduler {
|
|
|
|
|
|
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
|
|
|
when(db.getStagesInProgress()).thenReturn(stages);
|
|
|
- doAnswer(new Answer() {
|
|
|
+ doAnswer(new Answer<Void>() {
|
|
|
@Override
|
|
|
- public Object answer(InvocationOnMock invocation) throws Throwable {
|
|
|
+ public Void answer(InvocationOnMock invocation) throws Throwable {
|
|
|
String host = (String) invocation.getArguments()[0];
|
|
|
String role = (String) invocation.getArguments()[3];
|
|
|
HostRoleCommand command = s.getHostRoleCommand(host, role);
|
|
@@ -466,9 +465,9 @@ public class TestActionScheduler {
|
|
|
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
|
|
|
when(db.getStagesInProgress()).thenReturn(stages);
|
|
|
|
|
|
- doAnswer(new Answer() {
|
|
|
+ doAnswer(new Answer<Void>() {
|
|
|
@Override
|
|
|
- public Object answer(InvocationOnMock invocation) throws Throwable {
|
|
|
+ public Void answer(InvocationOnMock invocation) throws Throwable {
|
|
|
String host = (String) invocation.getArguments()[0];
|
|
|
String role = (String) invocation.getArguments()[3];
|
|
|
//HostRoleCommand command = stages.get(0).getHostRoleCommand(host, role);
|
|
@@ -482,9 +481,9 @@ public class TestActionScheduler {
|
|
|
}
|
|
|
}).when(db).timeoutHostRole(anyString(), anyLong(), anyLong(), anyString());
|
|
|
|
|
|
- doAnswer(new Answer() {
|
|
|
+ doAnswer(new Answer<Void>() {
|
|
|
@Override
|
|
|
- public Object answer(InvocationOnMock invocation) throws Throwable {
|
|
|
+ public Void answer(InvocationOnMock invocation) throws Throwable {
|
|
|
Long requestId = (Long) invocation.getArguments()[0];
|
|
|
for (Stage stage : stages) {
|
|
|
if (requestId.equals(stage.getRequestId())) {
|
|
@@ -564,27 +563,7 @@ public class TestActionScheduler {
|
|
|
Properties properties = new Properties();
|
|
|
Configuration conf = new Configuration(properties);
|
|
|
Clusters fsm = mock(Clusters.class);
|
|
|
- Cluster oneClusterMock = mock(Cluster.class);
|
|
|
- Service serviceObj = mock(Service.class);
|
|
|
- ServiceComponent scomp = mock(ServiceComponent.class);
|
|
|
- ServiceComponentHost sch = mock(ServiceComponentHost.class);
|
|
|
UnitOfWork unitOfWork = mock(UnitOfWork.class);
|
|
|
- Host host = mock(Host.class);
|
|
|
-
|
|
|
- when(host.getHostName()).thenReturn(serverHostname);
|
|
|
- when(host.getState()).thenReturn(HostState.HEALTHY);
|
|
|
-
|
|
|
- when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
|
|
|
- when(fsm.getHost(anyString())).thenReturn(host);
|
|
|
- when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
|
|
|
- when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
|
|
|
- when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
|
|
|
- when(serviceObj.getCluster()).thenReturn(oneClusterMock);
|
|
|
-
|
|
|
- HashMap<String, ServiceComponentHost> hosts =
|
|
|
- new HashMap<String, ServiceComponentHost>();
|
|
|
- hosts.put(serverHostname, sch);
|
|
|
- when(scomp.getServiceComponentHosts()).thenReturn(hosts);
|
|
|
|
|
|
List<Stage> stages = new ArrayList<Stage>();
|
|
|
Map<String, String> payload = new HashMap<String, String>();
|
|
@@ -599,37 +578,44 @@ public class TestActionScheduler {
|
|
|
|
|
|
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
|
|
|
when(db.getStagesInProgress()).thenReturn(stages);
|
|
|
- doAnswer(new Answer() {
|
|
|
+ doAnswer(new Answer<Void>() {
|
|
|
@Override
|
|
|
- public Object answer(InvocationOnMock invocation) throws Throwable {
|
|
|
+ public Void answer(InvocationOnMock invocation) throws Throwable {
|
|
|
String host = (String) invocation.getArguments()[0];
|
|
|
String role = (String) invocation.getArguments()[3];
|
|
|
CommandReport commandReport = (CommandReport) invocation.getArguments()[4];
|
|
|
- HostRoleCommand command = s.getHostRoleCommand(host, role);
|
|
|
+
|
|
|
+ HostRoleCommand command = null;
|
|
|
+ if (null == host) {
|
|
|
+ command = s.getHostRoleCommand(null, role);
|
|
|
+ } else {
|
|
|
+ command = s.getHostRoleCommand(host, role);
|
|
|
+ }
|
|
|
+
|
|
|
command.setStatus(HostRoleStatus.valueOf(commandReport.getStatus()));
|
|
|
return null;
|
|
|
}
|
|
|
}).when(db).updateHostRoleState(anyString(), anyLong(), anyLong(), anyString(), any(CommandReport.class));
|
|
|
|
|
|
- doAnswer(new Answer() {
|
|
|
+ doAnswer(new Answer<HostRoleCommand>() {
|
|
|
@Override
|
|
|
- public Object answer(InvocationOnMock invocation) throws Throwable {
|
|
|
- return s.getHostRoleCommand(serverHostname, "AMBARI_SERVER_ACTION");
|
|
|
+ public HostRoleCommand answer(InvocationOnMock invocation) throws Throwable {
|
|
|
+ return s.getHostRoleCommand(null, "AMBARI_SERVER_ACTION");
|
|
|
}
|
|
|
}).when(db).getTask(anyLong());
|
|
|
- doAnswer(new Answer() {
|
|
|
+
|
|
|
+ doAnswer(new Answer<List<HostRoleCommand>>() {
|
|
|
@Override
|
|
|
- public Object answer(InvocationOnMock invocation) throws Throwable {
|
|
|
- String host = (String) invocation.getArguments()[0];
|
|
|
+ public List<HostRoleCommand> answer(InvocationOnMock invocation) throws Throwable {
|
|
|
String role = (String) invocation.getArguments()[1];
|
|
|
HostRoleStatus status = (HostRoleStatus) invocation.getArguments()[2];
|
|
|
|
|
|
- HostRoleCommand task = s.getHostRoleCommand(host, role);
|
|
|
+ HostRoleCommand task = s.getHostRoleCommand(null, role);
|
|
|
|
|
|
if (task.getStatus() == status) {
|
|
|
return Arrays.asList(task);
|
|
|
} else {
|
|
|
- return null;
|
|
|
+ return Collections.emptyList();
|
|
|
}
|
|
|
}
|
|
|
}).when(db).getTasksByHostRoleAndStatus(anyString(), anyString(), any(HostRoleStatus.class));
|
|
@@ -639,13 +625,13 @@ public class TestActionScheduler {
|
|
|
new HostsMap((String) null), unitOfWork, null, conf);
|
|
|
|
|
|
int cycleCount = 0;
|
|
|
- while (!stages.get(0).getHostRoleStatus(serverHostname, "AMBARI_SERVER_ACTION")
|
|
|
+ while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION")
|
|
|
.equals(HostRoleStatus.COMPLETED) && cycleCount++ <= MAX_CYCLE_ITERATIONS) {
|
|
|
scheduler.doWork();
|
|
|
scheduler.getServerActionExecutor().doWork();
|
|
|
}
|
|
|
|
|
|
- assertEquals(stages.get(0).getHostRoleStatus(serverHostname, "AMBARI_SERVER_ACTION"),
|
|
|
+ assertEquals(stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION"),
|
|
|
HostRoleStatus.COMPLETED);
|
|
|
}
|
|
|
|
|
@@ -658,26 +644,7 @@ public class TestActionScheduler {
|
|
|
Properties properties = new Properties();
|
|
|
Configuration conf = new Configuration(properties);
|
|
|
Clusters fsm = mock(Clusters.class);
|
|
|
- Cluster oneClusterMock = mock(Cluster.class);
|
|
|
- Service serviceObj = mock(Service.class);
|
|
|
- ServiceComponent scomp = mock(ServiceComponent.class);
|
|
|
- ServiceComponentHost sch = mock(ServiceComponentHost.class);
|
|
|
UnitOfWork unitOfWork = mock(UnitOfWork.class);
|
|
|
- when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
|
|
|
- when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
|
|
|
- when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
|
|
|
- when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
|
|
|
- when(serviceObj.getCluster()).thenReturn(oneClusterMock);
|
|
|
-
|
|
|
- Host host = mock(Host.class);
|
|
|
- HashMap<String, ServiceComponentHost> hosts =
|
|
|
- new HashMap<String, ServiceComponentHost>();
|
|
|
- hosts.put(serverHostname, sch);
|
|
|
- when(scomp.getServiceComponentHosts()).thenReturn(hosts);
|
|
|
-
|
|
|
- when(fsm.getHost(anyString())).thenReturn(host);
|
|
|
- when(host.getState()).thenReturn(HostState.HEALTHY);
|
|
|
- when(host.getHostName()).thenReturn(serverHostname);
|
|
|
|
|
|
List<Stage> stages = new ArrayList<Stage>();
|
|
|
Map<String, String> payload = new HashMap<String, String>();
|
|
@@ -693,38 +660,46 @@ public class TestActionScheduler {
|
|
|
|
|
|
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
|
|
|
when(db.getStagesInProgress()).thenReturn(stages);
|
|
|
- doAnswer(new Answer() {
|
|
|
+ doAnswer(new Answer<Void>() {
|
|
|
@Override
|
|
|
- public Object answer(InvocationOnMock invocation) throws Throwable {
|
|
|
+ public Void answer(InvocationOnMock invocation) throws Throwable {
|
|
|
String host = (String) invocation.getArguments()[0];
|
|
|
String role = (String) invocation.getArguments()[3];
|
|
|
CommandReport commandReport = (CommandReport) invocation.getArguments()[4];
|
|
|
- HostRoleCommand command = s.getHostRoleCommand(host, role);
|
|
|
+
|
|
|
+ HostRoleCommand command = null;
|
|
|
+ if (null == host) {
|
|
|
+ command = s.getHostRoleCommand(null, role);
|
|
|
+ } else {
|
|
|
+ command = s.getHostRoleCommand(host, role);
|
|
|
+ }
|
|
|
+
|
|
|
command.setStatus(HostRoleStatus.valueOf(commandReport.getStatus()));
|
|
|
return null;
|
|
|
}
|
|
|
}).when(db).updateHostRoleState(anyString(), anyLong(), anyLong(), anyString(), any(CommandReport.class));
|
|
|
|
|
|
- doAnswer(new Answer() {
|
|
|
+ doAnswer(new Answer<HostRoleCommand>() {
|
|
|
@Override
|
|
|
- public Object answer(InvocationOnMock invocation) throws Throwable {
|
|
|
- return s.getHostRoleCommand(serverHostname, "AMBARI_SERVER_ACTION");
|
|
|
+ public HostRoleCommand answer(InvocationOnMock invocation) throws Throwable {
|
|
|
+ return s.getHostRoleCommand(null, "AMBARI_SERVER_ACTION");
|
|
|
}
|
|
|
}).when(db).getTask(anyLong());
|
|
|
- doAnswer(new Answer() {
|
|
|
+
|
|
|
+ doAnswer(new Answer<List<HostRoleCommand>>() {
|
|
|
@Override
|
|
|
- public Object answer(InvocationOnMock invocation) throws Throwable {
|
|
|
- String host = (String) invocation.getArguments()[0];
|
|
|
+ public List<HostRoleCommand> answer(InvocationOnMock invocation) throws Throwable {
|
|
|
String role = (String) invocation.getArguments()[1];
|
|
|
HostRoleStatus status = (HostRoleStatus) invocation.getArguments()[2];
|
|
|
|
|
|
- HostRoleCommand task = s.getHostRoleCommand(host, role);
|
|
|
+ HostRoleCommand task = s.getHostRoleCommand(null, role);
|
|
|
|
|
|
if (task.getStatus() == status) {
|
|
|
return Arrays.asList(task);
|
|
|
} else {
|
|
|
- return null;
|
|
|
+ return Collections.emptyList();
|
|
|
}
|
|
|
+
|
|
|
}
|
|
|
}).when(db).getTasksByHostRoleAndStatus(anyString(), anyString(), any(HostRoleStatus.class));
|
|
|
|
|
@@ -733,14 +708,14 @@ public class TestActionScheduler {
|
|
|
new HostsMap((String) null), unitOfWork, null, conf);
|
|
|
|
|
|
int cycleCount = 0;
|
|
|
- while (!stages.get(0).getHostRoleStatus(serverHostname, "AMBARI_SERVER_ACTION").isCompletedState()
|
|
|
+ while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION").isCompletedState()
|
|
|
&& cycleCount++ <= MAX_CYCLE_ITERATIONS) {
|
|
|
scheduler.doWork();
|
|
|
scheduler.getServerActionExecutor().doWork();
|
|
|
}
|
|
|
|
|
|
assertEquals(HostRoleStatus.TIMEDOUT,
|
|
|
- stages.get(0).getHostRoleStatus(serverHostname, "AMBARI_SERVER_ACTION"));
|
|
|
+ stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION"));
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -749,21 +724,7 @@ public class TestActionScheduler {
|
|
|
Properties properties = new Properties();
|
|
|
Configuration conf = new Configuration(properties);
|
|
|
Clusters fsm = mock(Clusters.class);
|
|
|
- Cluster oneClusterMock = mock(Cluster.class);
|
|
|
- Service serviceObj = mock(Service.class);
|
|
|
- ServiceComponent scomp = mock(ServiceComponent.class);
|
|
|
- ServiceComponentHost sch = mock(ServiceComponentHost.class);
|
|
|
UnitOfWork unitOfWork = mock(UnitOfWork.class);
|
|
|
- when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
|
|
|
- when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
|
|
|
- when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
|
|
|
- when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
|
|
|
- when(serviceObj.getCluster()).thenReturn(oneClusterMock);
|
|
|
-
|
|
|
- HashMap<String, ServiceComponentHost> hosts =
|
|
|
- new HashMap<String, ServiceComponentHost>();
|
|
|
- hosts.put(serverHostname, sch);
|
|
|
- when(scomp.getServiceComponentHosts()).thenReturn(hosts);
|
|
|
|
|
|
List<Stage> stages = new ArrayList<Stage>();
|
|
|
Map<String, String> payload = new HashMap<String, String>();
|
|
@@ -779,37 +740,45 @@ public class TestActionScheduler {
|
|
|
|
|
|
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
|
|
|
when(db.getStagesInProgress()).thenReturn(stages);
|
|
|
- doAnswer(new Answer() {
|
|
|
+
|
|
|
+ doAnswer(new Answer<Void>() {
|
|
|
@Override
|
|
|
- public Object answer(InvocationOnMock invocation) throws Throwable {
|
|
|
+ public Void answer(InvocationOnMock invocation) throws Throwable {
|
|
|
String host = (String) invocation.getArguments()[0];
|
|
|
String role = (String) invocation.getArguments()[3];
|
|
|
CommandReport commandReport = (CommandReport) invocation.getArguments()[4];
|
|
|
- HostRoleCommand command = s.getHostRoleCommand(host, role);
|
|
|
+
|
|
|
+ HostRoleCommand command = null;
|
|
|
+ if (null == host) {
|
|
|
+ command = s.getHostRoleCommand(null, role);
|
|
|
+ } else {
|
|
|
+ command = s.getHostRoleCommand(host, role);
|
|
|
+ }
|
|
|
+
|
|
|
command.setStatus(HostRoleStatus.valueOf(commandReport.getStatus()));
|
|
|
return null;
|
|
|
}
|
|
|
}).when(db).updateHostRoleState(anyString(), anyLong(), anyLong(), anyString(), any(CommandReport.class));
|
|
|
|
|
|
- doAnswer(new Answer() {
|
|
|
+ doAnswer(new Answer<HostRoleCommand>() {
|
|
|
@Override
|
|
|
- public Object answer(InvocationOnMock invocation) throws Throwable {
|
|
|
- return s.getHostRoleCommand(serverHostname, "AMBARI_SERVER_ACTION");
|
|
|
+ public HostRoleCommand answer(InvocationOnMock invocation) throws Throwable {
|
|
|
+ return s.getHostRoleCommand(null, "AMBARI_SERVER_ACTION");
|
|
|
}
|
|
|
}).when(db).getTask(anyLong());
|
|
|
- doAnswer(new Answer() {
|
|
|
+
|
|
|
+ doAnswer(new Answer<List<HostRoleCommand>>() {
|
|
|
@Override
|
|
|
- public Object answer(InvocationOnMock invocation) throws Throwable {
|
|
|
- String host = (String) invocation.getArguments()[0];
|
|
|
+ public List<HostRoleCommand> answer(InvocationOnMock invocation) throws Throwable {
|
|
|
String role = (String) invocation.getArguments()[1];
|
|
|
HostRoleStatus status = (HostRoleStatus) invocation.getArguments()[2];
|
|
|
|
|
|
- HostRoleCommand task = s.getHostRoleCommand(host, role);
|
|
|
+ HostRoleCommand task = s.getHostRoleCommand(null, role);
|
|
|
|
|
|
if (task.getStatus() == status) {
|
|
|
return Arrays.asList(task);
|
|
|
} else {
|
|
|
- return null;
|
|
|
+ return Collections.emptyList();
|
|
|
}
|
|
|
}
|
|
|
}).when(db).getTasksByHostRoleAndStatus(anyString(), anyString(), any(HostRoleStatus.class));
|
|
@@ -818,12 +787,12 @@ public class TestActionScheduler {
|
|
|
new HostsMap((String) null), unitOfWork, null, conf);
|
|
|
|
|
|
int cycleCount = 0;
|
|
|
- while (!stages.get(0).getHostRoleStatus(serverHostname, "AMBARI_SERVER_ACTION")
|
|
|
+ while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION")
|
|
|
.equals(HostRoleStatus.FAILED) && cycleCount++ <= MAX_CYCLE_ITERATIONS) {
|
|
|
scheduler.doWork();
|
|
|
scheduler.getServerActionExecutor().doWork();
|
|
|
}
|
|
|
- assertEquals(stages.get(0).getHostRoleStatus(serverHostname, "AMBARI_SERVER_ACTION"),
|
|
|
+ assertEquals(stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION"),
|
|
|
HostRoleStatus.FAILED);
|
|
|
assertEquals("test", stages.get(0).getRequestContext());
|
|
|
}
|
|
@@ -831,14 +800,15 @@ public class TestActionScheduler {
|
|
|
private Stage getStageWithServerAction(long requestId, long stageId,
|
|
|
Map<String, String> payload, String requestContext,
|
|
|
int timeout) {
|
|
|
- String serverHostname = StageUtils.getHostName();
|
|
|
+
|
|
|
Stage stage = stageFactory.createNew(requestId, "/tmp", "cluster1", 1L, requestContext, CLUSTER_HOST_INFO,
|
|
|
"{}", "{}");
|
|
|
stage.setStageId(stageId);
|
|
|
|
|
|
- stage.addServerActionCommand(MockServerAction.class.getName(), Role.AMBARI_SERVER_ACTION,
|
|
|
+ stage.addServerActionCommand(MockServerAction.class.getName(), null,
|
|
|
+ Role.AMBARI_SERVER_ACTION,
|
|
|
RoleCommand.EXECUTE, "cluster1",
|
|
|
- new ServiceComponentHostServerActionEvent(serverHostname, System.currentTimeMillis()),
|
|
|
+ new ServiceComponentHostServerActionEvent(null, System.currentTimeMillis()),
|
|
|
payload,
|
|
|
null, null, timeout, false);
|
|
|
|
|
@@ -859,7 +829,6 @@ public class TestActionScheduler {
|
|
|
ServiceComponent scomp = mock(ServiceComponent.class);
|
|
|
ServiceComponentHost sch = mock(ServiceComponentHost.class);
|
|
|
UnitOfWork unitOfWork = mock(UnitOfWork.class);
|
|
|
- RequestFactory requestFactory = mock(RequestFactory.class);
|
|
|
when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
|
|
|
when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
|
|
|
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
|
|
@@ -917,10 +886,6 @@ public class TestActionScheduler {
|
|
|
ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
|
|
|
new HostsMap((String) null), unitOfWork, null, conf);
|
|
|
|
|
|
- ActionManager am = new ActionManager(
|
|
|
- 2, 2, aq, fsm, db, new HostsMap((String) null),
|
|
|
- unitOfWork, requestFactory, conf, null);
|
|
|
-
|
|
|
scheduler.doWork();
|
|
|
|
|
|
Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(0).getHostRoleStatus(hostname1, "DATANODE"));
|
|
@@ -943,7 +908,6 @@ public class TestActionScheduler {
|
|
|
ServiceComponent scomp = mock(ServiceComponent.class);
|
|
|
ServiceComponentHost sch = mock(ServiceComponentHost.class);
|
|
|
UnitOfWork unitOfWork = mock(UnitOfWork.class);
|
|
|
- RequestFactory requestFactory = mock(RequestFactory.class);
|
|
|
when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
|
|
|
when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
|
|
|
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
|
|
@@ -1011,11 +975,6 @@ public class TestActionScheduler {
|
|
|
new HostsMap((String) null),
|
|
|
unitOfWork, null, conf);
|
|
|
|
|
|
- ActionManager am = new ActionManager(
|
|
|
- 2, 2, aq, fsm, db, new HostsMap((String) null),
|
|
|
- unitOfWork,
|
|
|
- requestFactory, conf, null);
|
|
|
-
|
|
|
scheduler.doWork();
|
|
|
|
|
|
Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(0).getHostRoleStatus(hostname1, "HIVE_CLIENT"));
|
|
@@ -1038,7 +997,6 @@ public class TestActionScheduler {
|
|
|
ServiceComponent scomp = mock(ServiceComponent.class);
|
|
|
ServiceComponentHost sch = mock(ServiceComponentHost.class);
|
|
|
UnitOfWork unitOfWork = mock(UnitOfWork.class);
|
|
|
- RequestFactory requestFactory = mock(RequestFactory.class);
|
|
|
when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
|
|
|
when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
|
|
|
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
|
|
@@ -1088,11 +1046,6 @@ public class TestActionScheduler {
|
|
|
new HostsMap((String) null),
|
|
|
unitOfWork, null, conf);
|
|
|
|
|
|
- ActionManager am = new ActionManager(
|
|
|
- 2, 2, aq, fsm, db, new HostsMap((String) null),
|
|
|
- unitOfWork,
|
|
|
- requestFactory, conf, null);
|
|
|
-
|
|
|
scheduler.doWork();
|
|
|
|
|
|
Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(0).getHostRoleStatus(hostname1, "NAMENODE"));
|
|
@@ -1143,16 +1096,15 @@ public class TestActionScheduler {
|
|
|
|
|
|
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
|
|
|
when(db.getStagesInProgress()).thenReturn(stages);
|
|
|
- doAnswer(new Answer() {
|
|
|
+ doAnswer(new Answer<Void>() {
|
|
|
@Override
|
|
|
- public Object answer(InvocationOnMock invocation) throws Throwable {
|
|
|
+ public Void answer(InvocationOnMock invocation) throws Throwable {
|
|
|
List<CommandReport> reports = (List<CommandReport>) invocation.getArguments()[0];
|
|
|
for (CommandReport report : reports) {
|
|
|
String actionId = report.getActionId();
|
|
|
long[] requestStageIds = StageUtils.getRequestStage(actionId);
|
|
|
Long requestId = requestStageIds[0];
|
|
|
Long stageId = requestStageIds[1];
|
|
|
- String role = report.getRole();
|
|
|
Long id = report.getTaskId();
|
|
|
for (Stage stage : stages) {
|
|
|
if (requestId.equals(stage.getRequestId()) && stageId.equals(stage.getStageId())) {
|
|
@@ -1184,9 +1136,9 @@ public class TestActionScheduler {
|
|
|
return null;
|
|
|
}
|
|
|
});
|
|
|
- doAnswer(new Answer() {
|
|
|
+ doAnswer(new Answer<Void>() {
|
|
|
@Override
|
|
|
- public Object answer(InvocationOnMock invocation) throws Throwable {
|
|
|
+ public Void answer(InvocationOnMock invocation) throws Throwable {
|
|
|
Long requestId = (Long) invocation.getArguments()[0];
|
|
|
for (Stage stage : stages) {
|
|
|
if (requestId.equals(stage.getRequestId())) {
|
|
@@ -1251,7 +1203,6 @@ public class TestActionScheduler {
|
|
|
ServiceComponent scomp = mock(ServiceComponent.class);
|
|
|
ServiceComponentHost sch = mock(ServiceComponentHost.class);
|
|
|
UnitOfWork unitOfWork = mock(UnitOfWork.class);
|
|
|
- RequestFactory requestFactory = mock(RequestFactory.class);
|
|
|
when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
|
|
|
when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
|
|
|
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
|
|
@@ -1332,9 +1283,9 @@ public class TestActionScheduler {
|
|
|
|
|
|
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
|
|
|
when(db.getStagesInProgress()).thenReturn(stages);
|
|
|
- doAnswer(new Answer() {
|
|
|
+ doAnswer(new Answer<Void>() {
|
|
|
@Override
|
|
|
- public Object answer(InvocationOnMock invocation) throws Throwable {
|
|
|
+ public Void answer(InvocationOnMock invocation) throws Throwable {
|
|
|
String host = (String) invocation.getArguments()[0];
|
|
|
Long requestId = (Long) invocation.getArguments()[1];
|
|
|
Long stageId = (Long) invocation.getArguments()[2];
|
|
@@ -1365,9 +1316,9 @@ public class TestActionScheduler {
|
|
|
return null;
|
|
|
}
|
|
|
});
|
|
|
- doAnswer(new Answer() {
|
|
|
+ doAnswer(new Answer<Void>() {
|
|
|
@Override
|
|
|
- public Object answer(InvocationOnMock invocation) throws Throwable {
|
|
|
+ public Void answer(InvocationOnMock invocation) throws Throwable {
|
|
|
Long requestId = (Long) invocation.getArguments()[0];
|
|
|
for (Stage stage : stages) {
|
|
|
if (requestId.equals(stage.getRequestId())) {
|
|
@@ -1390,8 +1341,6 @@ public class TestActionScheduler {
|
|
|
ActionScheduler scheduler = new ActionScheduler(100, 10000, db, aq, fsm, 3,
|
|
|
new HostsMap((String) null),
|
|
|
unitOfWork, null, conf);
|
|
|
- ActionManager am = new ActionManager(
|
|
|
- 2, 10000, aq, fsm, db, new HostsMap((String) null), unitOfWork, requestFactory, conf, null);
|
|
|
|
|
|
scheduler.doWork();
|
|
|
|
|
@@ -1511,16 +1460,15 @@ public class TestActionScheduler {
|
|
|
|
|
|
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
|
|
|
when(db.getStagesInProgress()).thenReturn(stages);
|
|
|
- doAnswer(new Answer() {
|
|
|
+ doAnswer(new Answer<Void>() {
|
|
|
@Override
|
|
|
- public Object answer(InvocationOnMock invocation) throws Throwable {
|
|
|
+ public Void answer(InvocationOnMock invocation) throws Throwable {
|
|
|
List<CommandReport> reports = (List<CommandReport>) invocation.getArguments()[0];
|
|
|
for (CommandReport report : reports) {
|
|
|
String actionId = report.getActionId();
|
|
|
long[] requestStageIds = StageUtils.getRequestStage(actionId);
|
|
|
Long requestId = requestStageIds[0];
|
|
|
Long stageId = requestStageIds[1];
|
|
|
- String role = report.getRole();
|
|
|
Long id = report.getTaskId();
|
|
|
for (Stage stage : stages) {
|
|
|
if (requestId.equals(stage.getRequestId()) && stageId.equals(stage.getStageId())) {
|
|
@@ -1552,9 +1500,9 @@ public class TestActionScheduler {
|
|
|
return null;
|
|
|
}
|
|
|
});
|
|
|
- doAnswer(new Answer() {
|
|
|
+ doAnswer(new Answer<Void>() {
|
|
|
@Override
|
|
|
- public Object answer(InvocationOnMock invocation) throws Throwable {
|
|
|
+ public Void answer(InvocationOnMock invocation) throws Throwable {
|
|
|
Long requestId = (Long) invocation.getArguments()[0];
|
|
|
for (Stage stage : stages) {
|
|
|
if (requestId.equals(stage.getRequestId())) {
|
|
@@ -1882,26 +1830,11 @@ public class TestActionScheduler {
|
|
|
Properties properties = new Properties();
|
|
|
Configuration conf = new Configuration(properties);
|
|
|
Clusters fsm = mock(Clusters.class);
|
|
|
- Cluster oneClusterMock = mock(Cluster.class);
|
|
|
- Service serviceObj = mock(Service.class);
|
|
|
- ServiceComponent scomp = mock(ServiceComponent.class);
|
|
|
- ServiceComponentHost sch = mock(ServiceComponentHost.class);
|
|
|
UnitOfWork unitOfWork = mock(UnitOfWork.class);
|
|
|
- when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
|
|
|
- when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
|
|
|
- when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
|
|
|
- when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
|
|
|
- when(serviceObj.getCluster()).thenReturn(oneClusterMock);
|
|
|
-
|
|
|
- HashMap<String, ServiceComponentHost> hosts =
|
|
|
- new HashMap<String, ServiceComponentHost>();
|
|
|
- hosts.put(serverHostname, sch);
|
|
|
- when(scomp.getServiceComponentHosts()).thenReturn(hosts);
|
|
|
|
|
|
List<Stage> stages = new ArrayList<Stage>();
|
|
|
Map<String, String> payload = new HashMap<String, String>();
|
|
|
final Stage s = getStageWithServerAction(1, 977, payload, "test", 300);
|
|
|
- s.getExecutionCommands().get(serverHostname).get(0).getExecutionCommand().setServiceName(null);
|
|
|
stages.add(s);
|
|
|
|
|
|
ActionDBAccessor db = mock(ActionDBAccessor.class);
|
|
@@ -1912,37 +1845,45 @@ public class TestActionScheduler {
|
|
|
|
|
|
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
|
|
|
when(db.getStagesInProgress()).thenReturn(stages);
|
|
|
- doAnswer(new Answer() {
|
|
|
+ doAnswer(new Answer<Void>() {
|
|
|
@Override
|
|
|
- public Object answer(InvocationOnMock invocation) throws Throwable {
|
|
|
+ public Void answer(InvocationOnMock invocation) throws Throwable {
|
|
|
String host = (String) invocation.getArguments()[0];
|
|
|
String role = (String) invocation.getArguments()[3];
|
|
|
CommandReport commandReport = (CommandReport) invocation.getArguments()[4];
|
|
|
- HostRoleCommand command = s.getHostRoleCommand(host, role);
|
|
|
+
|
|
|
+ HostRoleCommand command = null;
|
|
|
+ if (null == host) {
|
|
|
+ command = s.getHostRoleCommand(null, role);
|
|
|
+ } else {
|
|
|
+ command = s.getHostRoleCommand(host, role);
|
|
|
+ }
|
|
|
+
|
|
|
command.setStatus(HostRoleStatus.valueOf(commandReport.getStatus()));
|
|
|
return null;
|
|
|
}
|
|
|
}).when(db).updateHostRoleState(anyString(), anyLong(), anyLong(), anyString(), any(CommandReport.class));
|
|
|
- doAnswer(new Answer() {
|
|
|
+
|
|
|
+ doAnswer(new Answer<List<HostRoleCommand>>() {
|
|
|
@Override
|
|
|
- public Object answer(InvocationOnMock invocation) throws Throwable {
|
|
|
- String host = (String) invocation.getArguments()[0];
|
|
|
+ public List<HostRoleCommand> answer(InvocationOnMock invocation) throws Throwable {
|
|
|
String role = (String) invocation.getArguments()[1];
|
|
|
HostRoleStatus status = (HostRoleStatus) invocation.getArguments()[2];
|
|
|
|
|
|
- HostRoleCommand task = s.getHostRoleCommand(host, role);
|
|
|
+ HostRoleCommand task = s.getHostRoleCommand(null, role);
|
|
|
|
|
|
if (task.getStatus() == status) {
|
|
|
return Arrays.asList(task);
|
|
|
} else {
|
|
|
- return null;
|
|
|
+ return Collections.emptyList();
|
|
|
}
|
|
|
}
|
|
|
}).when(db).getTasksByHostRoleAndStatus(anyString(), anyString(), any(HostRoleStatus.class));
|
|
|
- doAnswer(new Answer() {
|
|
|
+
|
|
|
+ doAnswer(new Answer<HostRoleCommand>() {
|
|
|
@Override
|
|
|
- public Object answer(InvocationOnMock invocation) throws Throwable {
|
|
|
- return s.getHostRoleCommand(serverHostname, "AMBARI_SERVER_ACTION");
|
|
|
+ public HostRoleCommand answer(InvocationOnMock invocation) throws Throwable {
|
|
|
+ return s.getHostRoleCommand(null, "AMBARI_SERVER_ACTION");
|
|
|
}
|
|
|
}).when(db).getTask(anyLong());
|
|
|
|
|
@@ -1951,13 +1892,13 @@ public class TestActionScheduler {
|
|
|
new HostsMap((String) null), unitOfWork, null, conf);
|
|
|
|
|
|
int cycleCount = 0;
|
|
|
- while (!stages.get(0).getHostRoleStatus(serverHostname, "AMBARI_SERVER_ACTION")
|
|
|
+ while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION")
|
|
|
.equals(HostRoleStatus.COMPLETED) && cycleCount++ <= MAX_CYCLE_ITERATIONS) {
|
|
|
scheduler.doWork();
|
|
|
scheduler.getServerActionExecutor().doWork();
|
|
|
}
|
|
|
|
|
|
- assertEquals(stages.get(0).getHostRoleStatus(serverHostname, "AMBARI_SERVER_ACTION"),
|
|
|
+ assertEquals(stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION"),
|
|
|
HostRoleStatus.COMPLETED);
|
|
|
}
|
|
|
|
|
@@ -1970,7 +1911,6 @@ public class TestActionScheduler {
|
|
|
ServiceComponent scomp = mock(ServiceComponent.class);
|
|
|
ServiceComponentHost sch = mock(ServiceComponentHost.class);
|
|
|
UnitOfWork unitOfWork = mock(UnitOfWork.class);
|
|
|
- RequestFactory requestFactory = mock(RequestFactory.class);
|
|
|
when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
|
|
|
when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
|
|
|
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
|
|
@@ -2019,16 +1959,15 @@ public class TestActionScheduler {
|
|
|
}
|
|
|
when(db.getRequestTasks(anyLong())).thenReturn(requestTasks);
|
|
|
when(db.getAllStages(anyLong())).thenReturn(stages);
|
|
|
- doAnswer(new Answer() {
|
|
|
+ doAnswer(new Answer<Void>() {
|
|
|
@Override
|
|
|
- public Object answer(InvocationOnMock invocation) throws Throwable {
|
|
|
+ public Void answer(InvocationOnMock invocation) throws Throwable {
|
|
|
List<CommandReport> reports = (List<CommandReport>) invocation.getArguments()[0];
|
|
|
for (CommandReport report : reports) {
|
|
|
String actionId = report.getActionId();
|
|
|
long[] requestStageIds = StageUtils.getRequestStage(actionId);
|
|
|
Long requestId = requestStageIds[0];
|
|
|
Long stageId = requestStageIds[1];
|
|
|
- String role = report.getRole();
|
|
|
Long id = report.getTaskId();
|
|
|
for (Stage stage : stages) {
|
|
|
if (requestId.equals(stage.getRequestId()) && stageId.equals(stage.getStageId())) {
|
|
@@ -2060,9 +1999,9 @@ public class TestActionScheduler {
|
|
|
return null;
|
|
|
}
|
|
|
});
|
|
|
- doAnswer(new Answer() {
|
|
|
+ doAnswer(new Answer<Void>() {
|
|
|
@Override
|
|
|
- public Object answer(InvocationOnMock invocation) throws Throwable {
|
|
|
+ public Void answer(InvocationOnMock invocation) throws Throwable {
|
|
|
Long requestId = (Long) invocation.getArguments()[0];
|
|
|
for (Stage stage : stages) {
|
|
|
if (requestId.equals(stage.getRequestId())) {
|
|
@@ -2086,10 +2025,6 @@ public class TestActionScheduler {
|
|
|
ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
|
|
|
new HostsMap((String) null), unitOfWork, null, conf);
|
|
|
|
|
|
- ActionManager am = new ActionManager(
|
|
|
- 2, 2, aq, fsm, db, new HostsMap((String) null),
|
|
|
- unitOfWork, requestFactory, conf, null);
|
|
|
-
|
|
|
scheduler.doWork();
|
|
|
|
|
|
String reason = "Some reason";
|
|
@@ -2116,7 +2051,6 @@ public class TestActionScheduler {
|
|
|
ServiceComponent scomp = mock(ServiceComponent.class);
|
|
|
ServiceComponentHost sch = mock(ServiceComponentHost.class);
|
|
|
UnitOfWork unitOfWork = mock(UnitOfWork.class);
|
|
|
- RequestFactory requestFactory = mock(RequestFactory.class);
|
|
|
when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
|
|
|
when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
|
|
|
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
|
|
@@ -2184,16 +2118,15 @@ public class TestActionScheduler {
|
|
|
}
|
|
|
when(db.getRequestTasks(anyLong())).thenReturn(requestTasks);
|
|
|
when(db.getAllStages(anyLong())).thenReturn(stagesInProgress);
|
|
|
- doAnswer(new Answer() {
|
|
|
+ doAnswer(new Answer<Void>() {
|
|
|
@Override
|
|
|
- public Object answer(InvocationOnMock invocation) throws Throwable {
|
|
|
+ public Void answer(InvocationOnMock invocation) throws Throwable {
|
|
|
List<CommandReport> reports = (List<CommandReport>) invocation.getArguments()[0];
|
|
|
for (CommandReport report : reports) {
|
|
|
String actionId = report.getActionId();
|
|
|
long[] requestStageIds = StageUtils.getRequestStage(actionId);
|
|
|
Long requestId = requestStageIds[0];
|
|
|
Long stageId = requestStageIds[1];
|
|
|
- String role = report.getRole();
|
|
|
Long id = report.getTaskId();
|
|
|
for (Stage stage : stagesInProgress) {
|
|
|
if (requestId.equals(stage.getRequestId()) && stageId.equals(stage.getStageId())) {
|
|
@@ -2227,9 +2160,9 @@ public class TestActionScheduler {
|
|
|
});
|
|
|
|
|
|
final Map<Long, Boolean> startedRequests = new HashMap<Long, Boolean>();
|
|
|
- doAnswer(new Answer() {
|
|
|
+ doAnswer(new Answer<Void>() {
|
|
|
@Override
|
|
|
- public Object answer(InvocationOnMock invocation) throws Throwable {
|
|
|
+ public Void answer(InvocationOnMock invocation) throws Throwable {
|
|
|
startedRequests.put((Long)invocation.getArguments()[0], true);
|
|
|
return null;
|
|
|
}
|
|
@@ -2252,10 +2185,6 @@ public class TestActionScheduler {
|
|
|
ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
|
|
|
new HostsMap((String) null), unitOfWork, null, conf);
|
|
|
|
|
|
- ActionManager am = new ActionManager(
|
|
|
- 2, 2, aq, fsm, db, new HostsMap((String) null),
|
|
|
- unitOfWork, requestFactory, conf, null);
|
|
|
-
|
|
|
// Execution of request 1
|
|
|
|
|
|
scheduler.doWork();
|