|
@@ -31,7 +31,11 @@ import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Random;
|
|
|
import java.util.Set;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
+import java.util.concurrent.locks.Lock;
|
|
|
+import java.util.concurrent.locks.ReentrantLock;
|
|
|
|
|
|
import javax.persistence.EntityManager;
|
|
|
|
|
@@ -233,6 +237,22 @@ public class ConfigHelperTest {
|
|
|
return configGroup.getId();
|
|
|
}
|
|
|
|
|
|
+ void applyConfig(Map<String, String> properties, String configType, String configTag) throws Exception {
|
|
|
+ ConfigurationRequest cr = new ConfigurationRequest();
|
|
|
+ cr.setClusterName(clusterName);
|
|
|
+ cr.setType(configType);
|
|
|
+ cr.setVersionTag(configTag);
|
|
|
+ cr.setProperties(properties);
|
|
|
+
|
|
|
+ final ClusterRequest clusterRequest =
|
|
|
+ new ClusterRequest(cluster.getClusterId(), clusterName,
|
|
|
+ cluster.getDesiredStackVersion().getStackVersion(), null);
|
|
|
+
|
|
|
+ clusterRequest.setDesiredConfig(Collections.singletonList(cr));
|
|
|
+ managementController.updateClusters(new HashSet<ClusterRequest>() {{
|
|
|
+ add(clusterRequest);
|
|
|
+ }}, null);
|
|
|
+ }
|
|
|
@Test
|
|
|
public void testEffectiveTagsForHost() throws Exception {
|
|
|
final Config config = new ConfigImpl("core-site");
|
|
@@ -754,6 +774,111 @@ public class ConfigHelperTest {
|
|
|
|
|
|
verify(sch);
|
|
|
}
|
|
|
+ @Test
|
|
|
+ public void testCalculateIsStaleConfigsParallel() throws Exception{
|
|
|
+ Map<String, HostConfig> schReturn = new HashMap<String, HostConfig>();
|
|
|
+ HostConfig hc = new HostConfig();
|
|
|
+ // Put a different version to check for change
|
|
|
+ hc.setDefaultVersionTag("version2");
|
|
|
+ schReturn.put("flume-conf", hc);
|
|
|
+
|
|
|
+ // set up mocks
|
|
|
+ final ServiceComponentHost sch = createNiceMock(ServiceComponentHost.class);
|
|
|
+ // set up expectations
|
|
|
+ expect(sch.getActualConfigs()).andReturn(schReturn).anyTimes();
|
|
|
+ expect(sch.getHostName()).andReturn("h1").anyTimes();
|
|
|
+ expect(sch.getClusterId()).andReturn(1l).anyTimes();
|
|
|
+ expect(sch.getServiceName()).andReturn("FLUME").anyTimes();
|
|
|
+ expect(sch.getServiceComponentName()).andReturn("FLUME_HANDLER").anyTimes();
|
|
|
+ replay(sch);
|
|
|
+ // Cluster level config changes
|
|
|
+
|
|
|
+ final Config config1 = cluster.getDesiredConfigByType("flume-conf");
|
|
|
+
|
|
|
+ applyConfig(new HashMap<String, String>(){{
|
|
|
+ put("property", "1");
|
|
|
+ }}, "flume-conf", "version2");
|
|
|
+
|
|
|
+ final Config config2 = cluster.getDesiredConfigByType("flume-conf");
|
|
|
+
|
|
|
+ applyConfig(new HashMap<String, String>(){{
|
|
|
+ put("property", "2");
|
|
|
+ }}, "flume-conf", "version3");
|
|
|
+
|
|
|
+ final Config config3 = cluster.getDesiredConfigByType("flume-conf");
|
|
|
+
|
|
|
+ cluster.addDesiredConfig("admin", new HashSet<Config>(){{add(config1);}});
|
|
|
+
|
|
|
+ final AtomicBoolean mustBeStale = new AtomicBoolean();
|
|
|
+ mustBeStale.set(false);
|
|
|
+
|
|
|
+ final AtomicBoolean failed = new AtomicBoolean();
|
|
|
+ failed.set(false);
|
|
|
+
|
|
|
+ final AtomicBoolean finished = new AtomicBoolean();
|
|
|
+ finished.set(false);
|
|
|
+ // avoid situations when not checked previous mustBeStale value yes and applied new config version
|
|
|
+ final Lock checkLock = new ReentrantLock();
|
|
|
+
|
|
|
+ // parallel thread that will compare actual stale config with expected accordingly to desired configs, checks if
|
|
|
+ // isStaleConfigs bypassing every cache and returns correct information.
|
|
|
+ Thread parallel = new Thread(new Runnable() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ while(!finished.get()){
|
|
|
+ checkLock.lock();
|
|
|
+ try {
|
|
|
+ boolean isStale = configHelper.isStaleConfigs(sch);
|
|
|
+ if(mustBeStale.get() != isStale){
|
|
|
+ failed.set(true);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ } catch (AmbariException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ } finally {
|
|
|
+ checkLock.unlock();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ parallel.start();
|
|
|
+
|
|
|
+ Random r = new Random();
|
|
|
+ for(int i=0; i< 1000; i++){
|
|
|
+ try {
|
|
|
+ checkLock.lock();
|
|
|
+ switch(r.nextInt(3)) {
|
|
|
+ case 0: {
|
|
|
+ cluster.addDesiredConfig("admin", new HashSet<Config>(){{add(config1);}});
|
|
|
+ mustBeStale.set(true);
|
|
|
+ checkLock.unlock();
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ case 1: {
|
|
|
+ cluster.addDesiredConfig("admin", new HashSet<Config>(){{add(config2);}});
|
|
|
+ mustBeStale.set(false);
|
|
|
+ checkLock.unlock();
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ case 2: {
|
|
|
+ cluster.addDesiredConfig("admin", new HashSet<Config>(){{add(config3);}});
|
|
|
+ mustBeStale.set(true);
|
|
|
+ checkLock.unlock();
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e){
|
|
|
+ checkLock.unlock();
|
|
|
+ }
|
|
|
+ if(!parallel.isAlive()) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ finished.set(true);
|
|
|
+ parallel.join();
|
|
|
+ Assert.assertFalse(failed.get());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public static class RunWithCustomModule {
|