|
@@ -26,12 +26,14 @@ import static org.mockito.Mockito.when;
|
|
|
|
|
|
import java.lang.reflect.Type;
|
|
import java.lang.reflect.Type;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
|
|
+import java.util.Collections;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
|
|
|
|
import com.google.common.reflect.TypeToken;
|
|
import com.google.common.reflect.TypeToken;
|
|
import com.google.inject.persist.UnitOfWork;
|
|
import com.google.inject.persist.UnitOfWork;
|
|
|
|
+
|
|
import junit.framework.Assert;
|
|
import junit.framework.Assert;
|
|
|
|
|
|
import org.apache.ambari.server.Role;
|
|
import org.apache.ambari.server.Role;
|
|
@@ -64,7 +66,10 @@ public class TestActionScheduler {
|
|
private static final String CLUSTER_HOST_INFO = "{all_hosts=[c6403.ambari.apache.org," +
|
|
private static final String CLUSTER_HOST_INFO = "{all_hosts=[c6403.ambari.apache.org," +
|
|
" c6401.ambari.apache.org, c6402.ambari.apache.org], slave_hosts=[c6403.ambari.apache.org," +
|
|
" c6401.ambari.apache.org, c6402.ambari.apache.org], slave_hosts=[c6403.ambari.apache.org," +
|
|
" c6401.ambari.apache.org, c6402.ambari.apache.org]}";
|
|
" c6401.ambari.apache.org, c6402.ambari.apache.org]}";
|
|
-;
|
|
|
|
|
|
+ private static final String CLUSTER_HOST_INFO_UPDATED = "{all_hosts=[c6401.ambari.apache.org,"
|
|
|
|
+ + " c6402.ambari.apache.org], slave_hosts=[c6401.ambari.apache.org,"
|
|
|
|
+ + " c6402.ambari.apache.org]}";
|
|
|
|
+
|
|
|
|
|
|
/**
|
|
/**
|
|
* This test sends a new action to the action scheduler and verifies that the action
|
|
* This test sends a new action to the action scheduler and verifies that the action
|
|
@@ -131,7 +136,7 @@ public class TestActionScheduler {
|
|
private List<AgentCommand> waitForQueueSize(String hostname, ActionQueue aq,
|
|
private List<AgentCommand> waitForQueueSize(String hostname, ActionQueue aq,
|
|
int expectedQueueSize) throws InterruptedException {
|
|
int expectedQueueSize) throws InterruptedException {
|
|
while (true) {
|
|
while (true) {
|
|
- List<AgentCommand> ac = aq.dequeueAll(hostname);
|
|
|
|
|
|
+ List<AgentCommand> ac = aq.dequeueAll(hostname);
|
|
if (ac != null) {
|
|
if (ac != null) {
|
|
if (ac.size() == expectedQueueSize) {
|
|
if (ac.size() == expectedQueueSize) {
|
|
return ac;
|
|
return ac;
|
|
@@ -619,4 +624,68 @@ public class TestActionScheduler {
|
|
rs4.numSucceeded = 2;
|
|
rs4.numSucceeded = 2;
|
|
assertFalse(rs3.isSuccessFactorMet());
|
|
assertFalse(rs3.isSuccessFactorMet());
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * This test sends verifies that ActionScheduler returns up-to-date cluster host info and caching works correctly.
|
|
|
|
+ */
|
|
|
|
+ @Test
|
|
|
|
+ public void testClusterHostInfoCache() throws Exception {
|
|
|
|
+
|
|
|
|
+ Type type = new TypeToken<Map<String, List<String>>>() {}.getType();
|
|
|
|
+
|
|
|
|
+ //Data for stages
|
|
|
|
+ Map<String, List<String>> clusterHostInfo1 = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO, type);
|
|
|
|
+ Map<String, List<String>> clusterHostInfo2 = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO_UPDATED, type);
|
|
|
|
+ int stageId = 1;
|
|
|
|
+ int requestId1 = 1;
|
|
|
|
+ int requestId2 = 2;
|
|
|
|
+
|
|
|
|
+ ActionQueue aq = new ActionQueue();
|
|
|
|
+ Clusters fsm = mock(Clusters.class);
|
|
|
|
+ Cluster oneClusterMock = mock(Cluster.class);
|
|
|
|
+ Service serviceObj = mock(Service.class);
|
|
|
|
+ ServiceComponent scomp = mock(ServiceComponent.class);
|
|
|
|
+ ServiceComponentHost sch = mock(ServiceComponentHost.class);
|
|
|
|
+ UnitOfWork unitOfWork = mock(UnitOfWork.class);
|
|
|
|
+ when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
|
|
|
|
+ when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
|
|
|
|
+ when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
|
|
|
|
+ when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
|
|
|
|
+ when(serviceObj.getCluster()).thenReturn(oneClusterMock);
|
|
|
|
+ String hostname = "ahost.ambari.apache.org";
|
|
|
|
+ Host host = mock(Host.class);
|
|
|
|
+ when(fsm.getHost(anyString())).thenReturn(host);
|
|
|
|
+ when(host.getState()).thenReturn(HostState.HEALTHY);
|
|
|
|
+ when(host.getHostName()).thenReturn(hostname);
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ ActionDBAccessor db = mock(ActionDBAccessorImpl.class);
|
|
|
|
+ Stage s1 = StageUtils.getATestStage(requestId1, stageId, hostname, CLUSTER_HOST_INFO);
|
|
|
|
+ Stage s2 = StageUtils.getATestStage(requestId2, stageId, hostname, CLUSTER_HOST_INFO_UPDATED);
|
|
|
|
+ when(db.getStagesInProgress()).thenReturn(Collections.singletonList(s1));
|
|
|
|
+
|
|
|
|
+ //Keep large number of attempts so that the task is not expired finally
|
|
|
|
+ //Small action timeout to test rescheduling
|
|
|
|
+ ActionScheduler scheduler = new ActionScheduler(100, 100, db, aq, fsm,
|
|
|
|
+ 10000, new HostsMap((String) null), null, unitOfWork);
|
|
|
|
+ scheduler.setTaskTimeoutAdjustment(false);
|
|
|
|
+ // Start the thread
|
|
|
|
+ scheduler.start();
|
|
|
|
+
|
|
|
|
+ List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1);
|
|
|
|
+
|
|
|
|
+ assertTrue(ac.get(0) instanceof ExecutionCommand);
|
|
|
|
+ assertEquals(String.valueOf(requestId1) + "-" + stageId, ((ExecutionCommand) (ac.get(0))).getCommandId());
|
|
|
|
+ assertEquals(clusterHostInfo1, ((ExecutionCommand) (ac.get(0))).getClusterHostInfo());
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ when(db.getStagesInProgress()).thenReturn(Collections.singletonList(s2));
|
|
|
|
+
|
|
|
|
+ //Verify that ActionSheduler does not return cached value of cluster host info for new requestId
|
|
|
|
+ ac = waitForQueueSize(hostname, aq, 1);
|
|
|
|
+ assertTrue(ac.get(0) instanceof ExecutionCommand);
|
|
|
|
+ assertEquals(String.valueOf(requestId2) + "-" + stageId, ((ExecutionCommand) (ac.get(0))).getCommandId());
|
|
|
|
+ assertEquals(clusterHostInfo2, ((ExecutionCommand) (ac.get(0))).getClusterHostInfo());
|
|
|
|
+
|
|
|
|
+ }
|
|
}
|
|
}
|