|
|
@@ -17,13 +17,21 @@
|
|
|
*/
|
|
|
package org.apache.ambari.server.actionmanager;
|
|
|
|
|
|
-import static org.junit.Assert.assertFalse;
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
+import static org.junit.Assert.assertFalse;
|
|
|
import static org.junit.Assert.assertTrue;
|
|
|
import static org.mockito.Matchers.any;
|
|
|
+import static org.mockito.Matchers.anyCollectionOf;
|
|
|
import static org.mockito.Matchers.anyLong;
|
|
|
import static org.mockito.Matchers.anyString;
|
|
|
-import static org.mockito.Mockito.*;
|
|
|
+import static org.mockito.Matchers.eq;
|
|
|
+import static org.mockito.Mockito.atLeastOnce;
|
|
|
+import static org.mockito.Mockito.doAnswer;
|
|
|
+import static org.mockito.Mockito.mock;
|
|
|
+import static org.mockito.Mockito.times;
|
|
|
+import static org.mockito.Mockito.verify;
|
|
|
+import static org.mockito.Mockito.when;
|
|
|
+
|
|
|
import java.lang.reflect.Type;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
@@ -36,9 +44,8 @@ import java.util.Set;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
-import com.google.common.reflect.TypeToken;
|
|
|
-import com.google.inject.persist.UnitOfWork;
|
|
|
import junit.framework.Assert;
|
|
|
+
|
|
|
import org.apache.ambari.server.AmbariException;
|
|
|
import org.apache.ambari.server.Role;
|
|
|
import org.apache.ambari.server.RoleCommand;
|
|
|
@@ -69,7 +76,6 @@ import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEve
|
|
|
import org.apache.ambari.server.utils.StageUtils;
|
|
|
import org.easymock.Capture;
|
|
|
import org.easymock.EasyMock;
|
|
|
-import org.junit.Ignore;
|
|
|
import org.junit.Test;
|
|
|
import org.mockito.ArgumentCaptor;
|
|
|
import org.mockito.invocation.InvocationOnMock;
|
|
|
@@ -77,6 +83,9 @@ import org.mockito.stubbing.Answer;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
+import com.google.common.reflect.TypeToken;
|
|
|
+import com.google.inject.persist.UnitOfWork;
|
|
|
+
|
|
|
public class TestActionScheduler {
|
|
|
|
|
|
private static final Logger log = LoggerFactory.getLogger(TestActionScheduler.class);
|
|
|
@@ -96,7 +105,7 @@ public class TestActionScheduler {
|
|
|
*/
|
|
|
@Test
|
|
|
public void testActionSchedule() throws Exception {
|
|
|
-
|
|
|
+
|
|
|
Type type = new TypeToken<Map<String, Set<String>>>() {}.getType();
|
|
|
Map<String, List<String>> clusterHostInfo = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO, type);
|
|
|
|
|
|
@@ -116,7 +125,7 @@ public class TestActionScheduler {
|
|
|
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
|
|
|
when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
|
|
|
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
|
|
|
-
|
|
|
+
|
|
|
Host host = mock(Host.class);
|
|
|
HashMap<String, ServiceComponentHost> hosts =
|
|
|
new HashMap<String, ServiceComponentHost>();
|
|
|
@@ -132,6 +141,7 @@ public class TestActionScheduler {
|
|
|
Stage s = StageUtils.getATestStage(1, 977, hostname, CLUSTER_HOST_INFO,
|
|
|
"{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}");
|
|
|
stages.add(s);
|
|
|
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
|
|
|
when(db.getStagesInProgress()).thenReturn(stages);
|
|
|
|
|
|
Request request = mock(Request.class);
|
|
|
@@ -167,7 +177,7 @@ public class TestActionScheduler {
|
|
|
int expectedQueueSize, ActionScheduler scheduler) {
|
|
|
int cycleCount = 0;
|
|
|
while (cycleCount++ <= MAX_CYCLE_ITERATIONS) {
|
|
|
- List<AgentCommand> ac = aq.dequeueAll(hostname);
|
|
|
+ List<AgentCommand> ac = aq.dequeueAll(hostname);
|
|
|
if (ac != null) {
|
|
|
if (ac.size() == expectedQueueSize) {
|
|
|
return ac;
|
|
|
@@ -220,6 +230,7 @@ public class TestActionScheduler {
|
|
|
stages.add(s);
|
|
|
|
|
|
ActionDBAccessor db = mock(ActionDBAccessor.class);
|
|
|
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
|
|
|
when(db.getStagesInProgress()).thenReturn(stages);
|
|
|
Request request = mock(Request.class);
|
|
|
when(request.isExclusive()).thenReturn(false);
|
|
|
@@ -292,7 +303,7 @@ public class TestActionScheduler {
|
|
|
Request request = mock(Request.class);
|
|
|
when(request.isExclusive()).thenReturn(false);
|
|
|
when(db.getRequest(anyLong())).thenReturn(request);
|
|
|
-
|
|
|
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
|
|
|
when(db.getStagesInProgress()).thenReturn(stages);
|
|
|
doAnswer(new Answer() {
|
|
|
@Override
|
|
|
@@ -379,7 +390,7 @@ public class TestActionScheduler {
|
|
|
Request request = mock(Request.class);
|
|
|
when(request.isExclusive()).thenReturn(false);
|
|
|
when(db.getRequest(anyLong())).thenReturn(request);
|
|
|
-
|
|
|
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
|
|
|
when(db.getStagesInProgress()).thenReturn(stages);
|
|
|
|
|
|
doAnswer(new Answer() {
|
|
|
@@ -508,7 +519,7 @@ public class TestActionScheduler {
|
|
|
Request request = mock(Request.class);
|
|
|
when(request.isExclusive()).thenReturn(false);
|
|
|
when(db.getRequest(anyLong())).thenReturn(request);
|
|
|
-
|
|
|
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
|
|
|
when(db.getStagesInProgress()).thenReturn(stages);
|
|
|
doAnswer(new Answer() {
|
|
|
@Override
|
|
|
@@ -572,7 +583,7 @@ public class TestActionScheduler {
|
|
|
Request request = mock(Request.class);
|
|
|
when(request.isExclusive()).thenReturn(false);
|
|
|
when(db.getRequest(anyLong())).thenReturn(request);
|
|
|
-
|
|
|
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
|
|
|
when(db.getStagesInProgress()).thenReturn(stages);
|
|
|
doAnswer(new Answer() {
|
|
|
@Override
|
|
|
@@ -680,7 +691,7 @@ public class TestActionScheduler {
|
|
|
Request request = mock(Request.class);
|
|
|
when(request.isExclusive()).thenReturn(false);
|
|
|
when(db.getRequest(anyLong())).thenReturn(request);
|
|
|
-
|
|
|
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
|
|
|
when(db.getStagesInProgress()).thenReturn(stages);
|
|
|
|
|
|
Properties properties = new Properties();
|
|
|
@@ -764,7 +775,7 @@ public class TestActionScheduler {
|
|
|
Request request = mock(Request.class);
|
|
|
when(request.isExclusive()).thenReturn(false);
|
|
|
when(db.getRequest(anyLong())).thenReturn(request);
|
|
|
-
|
|
|
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
|
|
|
when(db.getStagesInProgress()).thenReturn(stages);
|
|
|
|
|
|
Properties properties = new Properties();
|
|
|
@@ -805,7 +816,7 @@ public class TestActionScheduler {
|
|
|
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
|
|
|
when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
|
|
|
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
|
|
|
-
|
|
|
+
|
|
|
String hostname1 = "ahost.ambari.apache.org";
|
|
|
String hostname2 = "bhost.ambari.apache.org";
|
|
|
HashMap<String, ServiceComponentHost> hosts =
|
|
|
@@ -813,48 +824,48 @@ public class TestActionScheduler {
|
|
|
hosts.put(hostname1, sch);
|
|
|
hosts.put(hostname2, sch);
|
|
|
when(scomp.getServiceComponentHosts()).thenReturn(hosts);
|
|
|
-
|
|
|
+
|
|
|
List<Stage> stages = new ArrayList<Stage>();
|
|
|
Stage backgroundStage = null;
|
|
|
stages.add(//stage with background command
|
|
|
backgroundStage = getStageWithSingleTask(
|
|
|
hostname1, "cluster1", Role.NAMENODE, RoleCommand.CUSTOM_COMMAND, "REBALANCEHDFS", Service.Type.HDFS, 1, 1, 1));
|
|
|
-
|
|
|
+
|
|
|
Assert.assertEquals(AgentCommandType.BACKGROUND_EXECUTION_COMMAND ,backgroundStage.getExecutionCommands(hostname1).get(0).getExecutionCommand().getCommandType());
|
|
|
-
|
|
|
+
|
|
|
stages.add( // Stage with the same hostname, should be scheduled
|
|
|
getStageWithSingleTask(
|
|
|
hostname1, "cluster1", Role.GANGLIA_MONITOR,
|
|
|
RoleCommand.START, Service.Type.GANGLIA, 2, 2, 2));
|
|
|
-
|
|
|
+
|
|
|
stages.add(
|
|
|
getStageWithSingleTask(
|
|
|
hostname2, "cluster1", Role.DATANODE,
|
|
|
RoleCommand.START, Service.Type.HDFS, 3, 3, 3));
|
|
|
-
|
|
|
-
|
|
|
+
|
|
|
+
|
|
|
ActionDBAccessor db = mock(ActionDBAccessor.class);
|
|
|
|
|
|
Request request = mock(Request.class);
|
|
|
when(request.isExclusive()).thenReturn(false);
|
|
|
when(db.getRequest(anyLong())).thenReturn(request);
|
|
|
-
|
|
|
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
|
|
|
when(db.getStagesInProgress()).thenReturn(stages);
|
|
|
-
|
|
|
+
|
|
|
Properties properties = new Properties();
|
|
|
properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "true");
|
|
|
Configuration conf = new Configuration(properties);
|
|
|
ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
|
|
|
new HostsMap((String) null), new ServerActionManagerImpl(fsm),
|
|
|
unitOfWork, conf);
|
|
|
-
|
|
|
+
|
|
|
ActionManager am = new ActionManager(
|
|
|
2, 2, aq, fsm, db, new HostsMap((String) null),
|
|
|
new ServerActionManagerImpl(fsm), unitOfWork,
|
|
|
requestFactory, conf);
|
|
|
-
|
|
|
+
|
|
|
scheduler.doWork();
|
|
|
-
|
|
|
+
|
|
|
Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(0).getHostRoleStatus(hostname1, "NAMENODE"));
|
|
|
Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(2).getHostRoleStatus(hostname2, "DATANODE"));
|
|
|
|
|
|
@@ -901,7 +912,7 @@ public class TestActionScheduler {
|
|
|
Request request = mock(Request.class);
|
|
|
when(request.isExclusive()).thenReturn(false);
|
|
|
when(db.getRequest(anyLong())).thenReturn(request);
|
|
|
-
|
|
|
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
|
|
|
when(db.getStagesInProgress()).thenReturn(stages);
|
|
|
doAnswer(new Answer() {
|
|
|
@Override
|
|
|
@@ -1082,7 +1093,7 @@ public class TestActionScheduler {
|
|
|
Request request = mock(Request.class);
|
|
|
when(request.isExclusive()).thenReturn(false);
|
|
|
when(db.getRequest(anyLong())).thenReturn(request);
|
|
|
-
|
|
|
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
|
|
|
when(db.getStagesInProgress()).thenReturn(stages);
|
|
|
doAnswer(new Answer() {
|
|
|
@Override
|
|
|
@@ -1259,7 +1270,7 @@ public class TestActionScheduler {
|
|
|
Request request = mock(Request.class);
|
|
|
when(request.isExclusive()).thenReturn(false);
|
|
|
when(db.getRequest(anyLong())).thenReturn(request);
|
|
|
-
|
|
|
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
|
|
|
when(db.getStagesInProgress()).thenReturn(stages);
|
|
|
doAnswer(new Answer() {
|
|
|
@Override
|
|
|
@@ -1419,7 +1430,7 @@ public class TestActionScheduler {
|
|
|
assertEquals(new Float(1.0), new Float(s.getSuccessFactor(Role.NAMENODE)));
|
|
|
assertEquals(new Float(1.0), new Float(s.getSuccessFactor(Role.GANGLIA_SERVER)));
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Test
|
|
|
public void testSuccessCriteria() {
|
|
|
RoleStats rs1 = new RoleStats(1, (float)0.5);
|
|
|
@@ -1427,37 +1438,37 @@ public class TestActionScheduler {
|
|
|
assertTrue(rs1.isSuccessFactorMet());
|
|
|
rs1.numSucceeded = 0;
|
|
|
assertFalse(rs1.isSuccessFactorMet());
|
|
|
-
|
|
|
+
|
|
|
RoleStats rs2 = new RoleStats(2, (float)0.5);
|
|
|
rs2.numSucceeded = 1;
|
|
|
assertTrue(rs2.isSuccessFactorMet());
|
|
|
-
|
|
|
+
|
|
|
RoleStats rs3 = new RoleStats(3, (float)0.5);
|
|
|
rs3.numSucceeded = 2;
|
|
|
assertTrue(rs2.isSuccessFactorMet());
|
|
|
rs3.numSucceeded = 1;
|
|
|
assertFalse(rs3.isSuccessFactorMet());
|
|
|
-
|
|
|
+
|
|
|
RoleStats rs4 = new RoleStats(3, (float)1.0);
|
|
|
rs4.numSucceeded = 2;
|
|
|
assertFalse(rs3.isSuccessFactorMet());
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* This test sends verifies that ActionScheduler returns up-to-date cluster host info and caching works correctly.
|
|
|
*/
|
|
|
@Test
|
|
|
public void testClusterHostInfoCache() throws Exception {
|
|
|
-
|
|
|
+
|
|
|
Type type = new TypeToken<Map<String, Set<String>>>() {}.getType();
|
|
|
-
|
|
|
+
|
|
|
//Data for stages
|
|
|
Map<String, Set<String>> clusterHostInfo1 = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO, type);
|
|
|
Map<String, Set<String>> clusterHostInfo2 = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO_UPDATED, type);
|
|
|
int stageId = 1;
|
|
|
int requestId1 = 1;
|
|
|
int requestId2 = 2;
|
|
|
-
|
|
|
+
|
|
|
ActionQueue aq = new ActionQueue();
|
|
|
Properties properties = new Properties();
|
|
|
Configuration conf = new Configuration(properties);
|
|
|
@@ -1492,6 +1503,7 @@ public class TestActionScheduler {
|
|
|
"{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}");
|
|
|
Stage s2 = StageUtils.getATestStage(requestId2, stageId, hostname, CLUSTER_HOST_INFO_UPDATED,
|
|
|
"{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}");
|
|
|
+ when(db.getCommandsInProgressCount()).thenReturn(1);
|
|
|
when(db.getStagesInProgress()).thenReturn(Collections.singletonList(s1));
|
|
|
|
|
|
//Keep large number of attempts so that the task is not expired finally
|
|
|
@@ -1504,12 +1516,12 @@ public class TestActionScheduler {
|
|
|
|
|
|
assertTrue(ac.get(0) instanceof ExecutionCommand);
|
|
|
assertEquals(String.valueOf(requestId1) + "-" + stageId, ((ExecutionCommand) (ac.get(0))).getCommandId());
|
|
|
-
|
|
|
+
|
|
|
assertEquals(clusterHostInfo1, ((ExecutionCommand) (ac.get(0))).getClusterHostInfo());
|
|
|
-
|
|
|
|
|
|
+ when(db.getCommandsInProgressCount()).thenReturn(1);
|
|
|
when(db.getStagesInProgress()).thenReturn(Collections.singletonList(s2));
|
|
|
-
|
|
|
+
|
|
|
//Verify that ActionSheduler does not return cached value of cluster host info for new requestId
|
|
|
ac = waitForQueueSize(hostname, aq, 1, scheduler);
|
|
|
assertTrue(ac.get(0) instanceof ExecutionCommand);
|
|
|
@@ -1572,7 +1584,7 @@ public class TestActionScheduler {
|
|
|
Request request = mock(Request.class);
|
|
|
when(request.isExclusive()).thenReturn(false);
|
|
|
when(db.getRequest(anyLong())).thenReturn(request);
|
|
|
-
|
|
|
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
|
|
|
when(db.getStagesInProgress()).thenReturn(stages);
|
|
|
|
|
|
ActionScheduler scheduler = new ActionScheduler(100, 50000, db, aq, fsm, 3,
|
|
|
@@ -1655,7 +1667,7 @@ public class TestActionScheduler {
|
|
|
Request request = mock(Request.class);
|
|
|
when(request.isExclusive()).thenReturn(false);
|
|
|
when(db.getRequest(anyLong())).thenReturn(request);
|
|
|
-
|
|
|
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
|
|
|
when(db.getStagesInProgress()).thenReturn(stages);
|
|
|
doAnswer(new Answer() {
|
|
|
@Override
|
|
|
@@ -1728,7 +1740,7 @@ public class TestActionScheduler {
|
|
|
Request request = mock(Request.class);
|
|
|
when(request.isExclusive()).thenReturn(false);
|
|
|
when(db.getRequest(anyLong())).thenReturn(request);
|
|
|
-
|
|
|
+ when(db.getCommandsInProgressCount()).thenReturn(stages.size());
|
|
|
when(db.getStagesInProgress()).thenReturn(stages);
|
|
|
|
|
|
List<HostRoleCommand> requestTasks = new ArrayList<HostRoleCommand>();
|
|
|
@@ -1894,7 +1906,7 @@ public class TestActionScheduler {
|
|
|
when(host3.getHostName()).thenReturn(hostname);
|
|
|
|
|
|
ActionDBAccessor db = mock(ActionDBAccessor.class);
|
|
|
-
|
|
|
+ when(db.getCommandsInProgressCount()).thenReturn(stagesInProgress.size());
|
|
|
when(db.getStagesInProgress()).thenReturn(stagesInProgress);
|
|
|
|
|
|
List<HostRoleCommand> requestTasks = new ArrayList<HostRoleCommand>();
|