Browse Source

AMBARI-9577. RU - Repo version on all hosts are set to "Out of Sync" after adding Storm (dlysnichenko)

Lisnichenko Dmitro 10 years ago
parent
commit
fe2122c72f

+ 11 - 1
ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/HostVersionOutOfSyncListener.java

@@ -35,6 +35,7 @@ import org.apache.ambari.server.orm.entities.HostVersionEntity;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.RepositoryVersionState;
+import org.apache.ambari.server.state.ServiceComponent;
 import org.apache.ambari.server.state.StackId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -111,7 +113,15 @@ public class HostVersionOutOfSyncListener {
       Cluster cluster = clusters.get().getClusterById(event.getClusterId());
       Set<String> changedRepositoryVersions = new HashSet<String>();
       StackId currentStackId = cluster.getCurrentStackVersion();
-      for (String hostName : clusters.get().getHostsForCluster(cluster.getClusterName()).keySet()) {
+      Map<String, ServiceComponent> serviceComponents = cluster.getService(event.getServiceName()).getServiceComponents();
+      // Determine hosts that become OUT_OF_SYNC when adding components for new service
+      Set<String> affectedHosts = new HashSet<String>();
+      for (ServiceComponent component : serviceComponents.values()) {
+        for (String hostname : component.getServiceComponentHosts().keySet()) {
+          affectedHosts.add(hostname);
+        }
+      }
+      for (String hostName : affectedHosts) {
         List<HostVersionEntity> hostVersionEntities =
             hostVersionDAO.get().findByClusterAndHost(cluster.getClusterName(), hostName);
         for (HostVersionEntity hostVersionEntity : hostVersionEntities) {

+ 109 - 3
ambari-server/src/test/java/org/apache/ambari/server/events/listeners/upgrade/HostVersionOutOfSyncListenerTest.java

@@ -24,10 +24,15 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import javax.persistence.RollbackException;
 
@@ -51,6 +56,9 @@ import org.apache.ambari.server.state.RepositoryVersionState;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Host;
 import org.apache.ambari.server.state.HostState;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.ServiceComponent;
+import org.apache.ambari.server.state.ServiceComponentHostFactory;
 import org.apache.ambari.server.state.StackId;
 import org.junit.After;
 import org.junit.Before;
@@ -64,6 +72,7 @@ import com.google.inject.persist.PersistService;
 
 public class HostVersionOutOfSyncListenerTest {
   private static final Logger LOG = LoggerFactory.getLogger(HostVersionOutOfSyncListenerTest.class);
+  private final String stackId = "HDP-0.1";
 
   private Clusters clusters;
   private Cluster c1;
@@ -71,6 +80,7 @@ public class HostVersionOutOfSyncListenerTest {
   private OrmTestHelper helper;
   private HostVersionDAO hostVersionDAO;
   private AmbariMetaInfo metaInfo;
+  private ServiceComponentHostFactory serviceComponentHostFactory;
 
 
   @Before
@@ -81,12 +91,13 @@ public class HostVersionOutOfSyncListenerTest {
     metaInfo = injector.getInstance(AmbariMetaInfo.class);
     helper = injector.getInstance(OrmTestHelper.class);
     hostVersionDAO = injector.getInstance(HostVersionDAO.class);
+    serviceComponentHostFactory = injector.getInstance(ServiceComponentHostFactory.class);
     metaInfo.init();
     clusters.addCluster("c1");
     c1 = clusters.getCluster("c1");
     addHost("h1");
 
-    StackId stackId = new StackId("HDP-0.1");
+    StackId stackId = new StackId(this.stackId);
     c1.setDesiredStackVersion(stackId);
     helper.getOrCreateRepositoryVersion(stackId.getStackName(), stackId.getStackVersion());
     c1.createClusterVersion(stackId.getStackName(), stackId.getStackVersion(), "admin", RepositoryVersionState.UPGRADING);
@@ -99,6 +110,81 @@ public class HostVersionOutOfSyncListenerTest {
     injector.getInstance(PersistService.class).stop();
   }
 
+  /**
+   * When a service is added to a cluster, all non-CURRENT host versions on
+   * all affected hosts (where host new components are installed)
+   * should transition to OUT_OF_SYNC state
+   */
+  @Test
+  public void testOnServiceEvent() throws AmbariException {
+    // Configuring 3-node cluster with 2 repo versions
+    String CURRENT_VERSION = "1.0-2086";
+    String INSTALLED_VERSION = "1.0-1000";
+
+    Host h1 = clusters.getHost("h1");
+    h1.setState(HostState.HEALTHY);
+
+    addHost("h2");
+    clusters.mapHostToCluster("h2", "c1");
+    addHost("h3");
+    clusters.mapHostToCluster("h3", "c1");
+
+
+    StackId stackId = new StackId(this.stackId);
+    RepositoryVersionEntity repositoryVersionEntity = helper.getOrCreateRepositoryVersion(stackId.getStackId(),
+            INSTALLED_VERSION);
+    c1.createClusterVersion(stackId.getStackId(), INSTALLED_VERSION, "admin", RepositoryVersionState.INSTALLING);
+    c1.setCurrentStackVersion(stackId);
+    c1.recalculateAllClusterVersionStates();
+    checkStackVersionState(stackId.getStackId(), INSTALLED_VERSION, RepositoryVersionState.INSTALLING);
+
+    checkStackVersionState(stackId.getStackId(), CURRENT_VERSION, RepositoryVersionState.CURRENT);
+
+    HostVersionEntity hv1 = helper.createHostVersion("h1", repositoryVersionEntity, RepositoryVersionState.INSTALLED);
+    c1.recalculateAllClusterVersionStates();
+    checkStackVersionState(stackId.getStackId(), INSTALLED_VERSION, RepositoryVersionState.INSTALLED);
+    checkStackVersionState(stackId.getStackId(), CURRENT_VERSION, RepositoryVersionState.CURRENT);
+
+    // Add new host and verify that it has all host versions present
+    List<HostVersionEntity> h2Versions = hostVersionDAO.findAll();
+
+    // Check before adding service
+    for (HostVersionEntity hostVersionEntity : h2Versions) {
+      if (hostVersionEntity.getRepositoryVersion().toString().equals(INSTALLED_VERSION)) {
+        assertEquals(hostVersionEntity.getState(), RepositoryVersionState.INSTALLED);
+      }
+    }
+
+    //Add HDFS service
+    List<String> hostList = new ArrayList<String>();
+    hostList.add("h1");
+    hostList.add("h2");
+    hostList.add("h3");
+    Map<String, List<Integer>> hdfsTopology = new HashMap<String, List<Integer>>();
+    hdfsTopology.put("NAMENODE", Collections.singletonList(0));
+    hdfsTopology.put("SECONDARY_NAMENODE", Collections.singletonList(1));
+    List<Integer> datanodeHosts = Arrays.asList(0, 1);
+    hdfsTopology.put("DATANODE", new ArrayList<Integer>(datanodeHosts));
+    addService(c1, hostList, hdfsTopology, "HDFS");
+
+    // Check result
+    Set<String> changedHosts = new HashSet<String>();
+    changedHosts.add("h1");
+    changedHosts.add("h2");
+
+    checkStackVersionState(stackId.getStackId(), INSTALLED_VERSION, RepositoryVersionState.INSTALLED);
+    for (HostVersionEntity hostVersionEntity : h2Versions) {
+      if (hostVersionEntity.getRepositoryVersion().toString().equals(INSTALLED_VERSION)) {
+        if (changedHosts.contains(hostVersionEntity.getHostName())) {
+          assertEquals(hostVersionEntity.getState(), RepositoryVersionState.OUT_OF_SYNC);
+        } else {
+          assertEquals(hostVersionEntity.getState(), RepositoryVersionState.INSTALLED);
+        }
+      }
+    }
+  }
+
+
   /**
    * When a host is added to a cluster that has non-CURRENT cluster_version records,
    * then Ambari needs to insert host_verion record for each one of
@@ -110,7 +196,7 @@ public class HostVersionOutOfSyncListenerTest {
     Host h1 = clusters.getHost("h1");
     h1.setState(HostState.HEALTHY);
 
-    StackId stackId = new StackId("HDP-0.1");
+    StackId stackId = new StackId(this.stackId);
     RepositoryVersionEntity repositoryVersionEntity = helper.getOrCreateRepositoryVersion(stackId.getStackId(),
             "1.0-1000");
     c1.createClusterVersion(stackId.getStackId(), "1.0-1000", "admin", RepositoryVersionState.INSTALLING);
@@ -137,7 +223,6 @@ public class HostVersionOutOfSyncListenerTest {
         assertEquals(hostVersionEntity.getState(), RepositoryVersionState.OUT_OF_SYNC);
       }
     }
-
   }
 
   private void addHost(String hostname) throws AmbariException{
@@ -155,6 +240,27 @@ public class HostVersionOutOfSyncListenerTest {
     host1.persist();
   }
 
+  private void addService(Cluster cl, List<String> hostList,
+                                Map<String, List<Integer>> topology, String serviceName
+                          ) throws AmbariException {
+    cl.setDesiredStackVersion(new StackId(stackId));
+    cl.addService(serviceName);
+
+    for (Map.Entry<String, List<Integer>> component : topology.entrySet()) {
+
+      String componentName = component.getKey();
+      cl.getService(serviceName).addServiceComponent(componentName);
+
+      for (Integer hostIndex : component.getValue()) {
+        cl.getService(serviceName)
+                .getServiceComponent(componentName)
+                .addServiceComponentHost(
+                        serviceComponentHostFactory.createNew(cl.getService(serviceName)
+                                .getServiceComponent(componentName), hostList.get(hostIndex)));
+      }
+    }
+  }
+
   private void checkStackVersionState(String stack, String version, RepositoryVersionState state) {
     Collection<ClusterVersionEntity> allClusterVersions = c1.getAllClusterVersions();
     for (ClusterVersionEntity entity : allClusterVersions) {