Ver código fonte

AMBARI-11146. Upgrade Is Allowed When A Service Is Installed After Distribute Repositories (alejandro)

Alejandro Fernandez 10 anos atrás
pai
commit
7e4f57b164

+ 10 - 3
ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/HostVersionOutOfSyncListener.java

@@ -34,6 +34,7 @@ import org.apache.ambari.server.orm.dao.HostVersionDAO;
 import org.apache.ambari.server.orm.entities.ClusterVersionEntity;
 import org.apache.ambari.server.orm.entities.HostEntity;
 import org.apache.ambari.server.orm.entities.HostVersionEntity;
+import org.apache.ambari.server.orm.entities.StackEntity;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.RepositoryVersionState;
@@ -54,7 +55,7 @@ import com.google.inject.persist.Transactional;
  * {@link org.apache.ambari.server.events.ServiceComponentInstalledEvent}
  * to update {@link org.apache.ambari.server.state.RepositoryVersionState}
  *
- * @see org.apache.ambari.server.state.Cluster#recalculateClusterVersionState(String)
+ * @see org.apache.ambari.server.state.Cluster#recalculateClusterVersionState(StackId, String)
  */
 @Singleton
 @EagerSingleton
@@ -92,7 +93,10 @@ public class HostVersionOutOfSyncListener {
 
       StackId currentStackId = cluster.getCurrentStackVersion();
       for (HostVersionEntity hostVersionEntity : hostVersionEntities) {
-        if (hostVersionEntity.getRepositoryVersion().getStack().equals(currentStackId.getStackId())
+        StackEntity hostStackEntity = hostVersionEntity.getRepositoryVersion().getStack();
+        StackId hostStackId = new StackId(hostStackEntity.getStackName(), hostStackEntity.getStackVersion());
+
+        if (currentStackId.equals(hostStackId)
             && hostVersionEntity.getState().equals(RepositoryVersionState.INSTALLED)) {
           hostVersionEntity.setState(RepositoryVersionState.OUT_OF_SYNC);
           hostVersionDAO.get().merge(hostVersionEntity);
@@ -128,7 +132,10 @@ public class HostVersionOutOfSyncListener {
         List<HostVersionEntity> hostVersionEntities =
             hostVersionDAO.get().findByClusterAndHost(cluster.getClusterName(), hostName);
         for (HostVersionEntity hostVersionEntity : hostVersionEntities) {
-          if (hostVersionEntity.getRepositoryVersion().getStack().equals(currentStackId.getStackId())
+          StackEntity hostStackEntity = hostVersionEntity.getRepositoryVersion().getStack();
+          StackId hostStackId = new StackId(hostStackEntity.getStackName(), hostStackEntity.getStackVersion());
+          
+          if (currentStackId.equals(hostStackId)
               && hostVersionEntity.getState().equals(RepositoryVersionState.INSTALLED)) {
             hostVersionEntity.setState(RepositoryVersionState.OUT_OF_SYNC);
             hostVersionDAO.get().merge(hostVersionEntity);

+ 140 - 22
ambari-server/src/test/java/org/apache/ambari/server/events/listeners/upgrade/HostVersionOutOfSyncListenerTest.java

@@ -19,6 +19,7 @@
 package org.apache.ambari.server.events.listeners.upgrade;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -30,8 +31,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.inject.Inject;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.events.ServiceComponentInstalledEvent;
+import org.apache.ambari.server.events.ServiceInstalledEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.orm.OrmTestHelper;
@@ -44,8 +49,12 @@ import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Host;
 import org.apache.ambari.server.state.HostState;
 import org.apache.ambari.server.state.RepositoryVersionState;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.ServiceComponent;
 import org.apache.ambari.server.state.ServiceComponentHostFactory;
+import org.apache.ambari.server.state.ServiceFactory;
 import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.utils.EventBusSynchronizer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -58,26 +67,40 @@ import com.google.inject.persist.PersistService;
 
 public class HostVersionOutOfSyncListenerTest {
   private static final Logger LOG = LoggerFactory.getLogger(HostVersionOutOfSyncListenerTest.class);
-  private final String stackId = "HDP-0.1";
+  private final String stackId = "HDP-2.2.0";
 
+  private Injector injector;
+
+  @Inject
   private Clusters clusters;
   private Cluster c1;
-  private Injector injector;
+
+  @Inject
   private OrmTestHelper helper;
+
+  @Inject
   private HostVersionDAO hostVersionDAO;
+
+  @Inject
   private AmbariMetaInfo metaInfo;
+
+  @Inject
   private ServiceComponentHostFactory serviceComponentHostFactory;
 
+  @Inject
+  private ServiceFactory serviceFactory;
+
+  @Inject
+  private AmbariEventPublisher m_eventPublisher;
+
 
   @Before
   public void setup() throws Exception {
     injector = Guice.createInjector(new InMemoryDefaultTestModule());
     injector.getInstance(GuiceJpaInitializer.class);
-    clusters = injector.getInstance(Clusters.class);
-    metaInfo = injector.getInstance(AmbariMetaInfo.class);
-    helper = injector.getInstance(OrmTestHelper.class);
-    hostVersionDAO = injector.getInstance(HostVersionDAO.class);
-    serviceComponentHostFactory = injector.getInstance(ServiceComponentHostFactory.class);
+
+    EventBusSynchronizer.synchronizeAmbariEventPublisher(injector);
+    injector.injectMembers(this);
 
     StackId stackId = new StackId(this.stackId);
     clusters.addCluster("c1", stackId);
@@ -95,16 +118,16 @@ public class HostVersionOutOfSyncListenerTest {
     injector.getInstance(PersistService.class).stop();
   }
 
-  /**
-   * When a service is added to a cluster, all non-CURRENT host versions on
-   * all affected hosts (where host new components are installed)
-   * should transition to OUT_OF_SYNC state
+
+  /***
+   * Shared between several test cases.
+   * @param INSTALLED_VERSION Version to treat as INSTALLED
+   * @param stackId Stack Id to use
+   * @throws AmbariException
    */
-  @Test
-  public void testOnServiceEvent() throws AmbariException {
+  private void createClusterAndHosts(String INSTALLED_VERSION, StackId stackId) throws AmbariException {
     // Configuring 3-node cluster with 2 repo versions
     String CURRENT_VERSION = "1.0-2086";
-    String INSTALLED_VERSION = "1.0-1000";
 
     Host h1 = clusters.getHost("h1");
     h1.setState(HostState.HEALTHY);
@@ -114,10 +137,6 @@ public class HostVersionOutOfSyncListenerTest {
     addHost("h3");
     clusters.mapHostToCluster("h3", "c1");
 
-
-    StackId stackId = new StackId(this.stackId);
-    RepositoryVersionEntity repositoryVersionEntity = helper.getOrCreateRepositoryVersion(stackId,
-            INSTALLED_VERSION);
     c1.createClusterVersion(stackId, INSTALLED_VERSION, "admin", RepositoryVersionState.INSTALLING);
     c1.setCurrentStackVersion(stackId);
     c1.recalculateAllClusterVersionStates();
@@ -125,6 +144,19 @@ public class HostVersionOutOfSyncListenerTest {
 
     checkStackVersionState(stackId.getStackId(), CURRENT_VERSION, RepositoryVersionState.CURRENT);
 
+    // Add ZK service with only ZOOKEEPER_SERVER
+    List<String> hostList = new ArrayList<String>();
+    hostList.add("h1");
+    hostList.add("h2");
+    hostList.add("h3");
+    Map<String, List<Integer>> zkTopology = new HashMap<String, List<Integer>>();
+    List<Integer> zkServerHosts = Arrays.asList(0, 1, 2);
+    zkTopology.put("ZOOKEEPER_SERVER", new ArrayList<Integer>(zkServerHosts));
+    addService(c1, hostList, zkTopology, "ZOOKEEPER");
+
+    // Register and install new version
+    RepositoryVersionEntity repositoryVersionEntity = helper.getOrCreateRepositoryVersion(stackId,
+        INSTALLED_VERSION);
     HostVersionEntity hv1 = helper.createHostVersion("h1", repositoryVersionEntity, RepositoryVersionState.INSTALLED);
     c1.recalculateAllClusterVersionStates();
     checkStackVersionState(stackId.getStackId(), INSTALLED_VERSION, RepositoryVersionState.INSTALLED);
@@ -135,10 +167,24 @@ public class HostVersionOutOfSyncListenerTest {
 
     // Check before adding service
     for (HostVersionEntity hostVersionEntity : h2Versions) {
-      if (hostVersionEntity.getRepositoryVersion().toString().equals(INSTALLED_VERSION)) {
+      if (hostVersionEntity.getRepositoryVersion().getVersion().equals(INSTALLED_VERSION)) {
         assertEquals(hostVersionEntity.getState(), RepositoryVersionState.INSTALLED);
       }
     }
+  }
+
+  /**
+   * When a service is added to a cluster, all non-CURRENT host versions on
+   * all affected hosts (where host new components are installed)
+   * should transition to OUT_OF_SYNC state
+   */
+  @Test
+  public void testOnServiceEvent() throws AmbariException {
+    String INSTALLED_VERSION = "1.0-1000";
+    StackId stackId = new StackId(this.stackId);
+
+    createClusterAndHosts(INSTALLED_VERSION, stackId);
+
 
     //Add HDFS service
     List<String> hostList = new ArrayList<String>();
@@ -157,16 +203,66 @@ public class HostVersionOutOfSyncListenerTest {
     changedHosts.add("h1");
     changedHosts.add("h2");
 
+    List<HostVersionEntity> hostVersions = hostVersionDAO.findAll();
+
+    boolean atLeastOneOutOfSync = false;
     checkStackVersionState(stackId.getStackId(), INSTALLED_VERSION, RepositoryVersionState.INSTALLED);
-    for (HostVersionEntity hostVersionEntity : h2Versions) {
-      if (hostVersionEntity.getRepositoryVersion().toString().equals(INSTALLED_VERSION)) {
+    for (HostVersionEntity hostVersionEntity : hostVersions) {
+      if (hostVersionEntity.getRepositoryVersion().getVersion().equals(INSTALLED_VERSION)) {
         if (changedHosts.contains(hostVersionEntity.getHostName())) {
           assertEquals(hostVersionEntity.getState(), RepositoryVersionState.OUT_OF_SYNC);
+          atLeastOneOutOfSync = true;
         } else {
           assertEquals(hostVersionEntity.getState(), RepositoryVersionState.INSTALLED);
         }
       }
     }
+
+    assertTrue(atLeastOneOutOfSync);
+  }
+
+  /**
+   * When a new service is added to a cluster with components, all INSTALLED host versions on
+   * all affected hosts (where host new components are installed)
+   * should transition to OUT_OF_SYNC state
+   */
+  @Test
+  public void testOnServiceComponentEvent() throws AmbariException {
+    String INSTALLED_VERSION = "1.0-1000";
+    StackId stackId = new StackId(this.stackId);
+
+    createClusterAndHosts(INSTALLED_VERSION, stackId);
+
+    //Add ZOOKEEPER_CLIENT component
+    List<String> hostList = new ArrayList<String>();
+    hostList.add("h1");
+    hostList.add("h2");
+    hostList.add("h3");
+    addServiceComponent(c1, hostList, "ZOOKEEPER", "ZOOKEEPER_CLIENT");
+
+    // Check result
+    Set<String> changedHosts = new HashSet<String>();
+    changedHosts.add("h1");
+    changedHosts.add("h2");
+    changedHosts.add("h3");
+
+    checkStackVersionState(stackId.getStackId(), INSTALLED_VERSION, RepositoryVersionState.INSTALLED);
+    List<HostVersionEntity> hostVersions = hostVersionDAO.findAll();
+
+    boolean atLeastOneOutOfSync = false;
+    for (HostVersionEntity hostVersionEntity : hostVersions) {
+      RepositoryVersionEntity repoVersion = hostVersionEntity.getRepositoryVersion();
+      if (repoVersion.getVersion().equals(INSTALLED_VERSION)) {
+        if (changedHosts.contains(hostVersionEntity.getHostName())) {
+          assertEquals(hostVersionEntity.getState(), RepositoryVersionState.OUT_OF_SYNC);
+          atLeastOneOutOfSync = true;
+        } else {
+          assertEquals(hostVersionEntity.getState(), RepositoryVersionState.INSTALLED);
+        }
+      }
+    }
+
+    assertTrue(atLeastOneOutOfSync);
   }
 
 
@@ -227,7 +323,8 @@ public class HostVersionOutOfSyncListenerTest {
   private void addService(Cluster cl, List<String> hostList,
                                 Map<String, List<Integer>> topology, String serviceName
                           ) throws AmbariException {
-    cl.setDesiredStackVersion(new StackId(stackId));
+    StackId stackIdObj = new StackId(stackId);
+    cl.setDesiredStackVersion(stackIdObj);
     cl.addService(serviceName);
 
     for (Map.Entry<String, List<Integer>> component : topology.entrySet()) {
@@ -242,6 +339,27 @@ public class HostVersionOutOfSyncListenerTest {
                         serviceComponentHostFactory.createNew(cl.getService(serviceName)
                                 .getServiceComponent(componentName), hostList.get(hostIndex)));
       }
+
+      ServiceInstalledEvent event = new ServiceInstalledEvent(cl.getClusterId(),
+          stackIdObj.getStackName(), stackIdObj.getStackVersion(), serviceName);
+      m_eventPublisher.publish(event);
+    }
+  }
+
+  private void addServiceComponent(Cluster cl, List<String> hostList,
+                                   String serviceName, String componentName) throws AmbariException {
+    StackId stackIdObj = new StackId(stackId);
+    Service service = cl.getService(serviceName);
+    service.addServiceComponent(componentName);
+    ServiceComponent component = service.getServiceComponent(componentName);
+
+    for(String hostName : hostList) {
+      component.addServiceComponentHost(serviceComponentHostFactory.createNew(cl.getService(serviceName)
+          .getServiceComponent(componentName), hostName));
+      ServiceComponentInstalledEvent event = new ServiceComponentInstalledEvent(cl.getClusterId(),
+          stackIdObj.getStackName(), stackIdObj.getStackVersion(),
+          serviceName, componentName, hostName);
+      m_eventPublisher.publish(event);
     }
   }
 

+ 2 - 0
ambari-server/src/test/java/org/apache/ambari/server/utils/EventBusSynchronizer.java

@@ -26,6 +26,7 @@ import org.apache.ambari.server.events.listeners.alerts.AlertReceivedListener;
 import org.apache.ambari.server.events.listeners.alerts.AlertServiceStateListener;
 import org.apache.ambari.server.events.listeners.alerts.AlertStateChangedListener;
 import org.apache.ambari.server.events.listeners.upgrade.DistributeRepositoriesActionListener;
+import org.apache.ambari.server.events.listeners.upgrade.HostVersionOutOfSyncListener;
 import org.apache.ambari.server.events.publishers.AlertEventPublisher;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 
@@ -108,6 +109,7 @@ public class EventBusSynchronizer {
     synchronizedBus.register(injector.getInstance(AlertLifecycleListener.class));
     synchronizedBus.register(injector.getInstance(AlertServiceStateListener.class));
     synchronizedBus.register(injector.getInstance(DistributeRepositoriesActionListener.class));
+    synchronizedBus.register(injector.getInstance(HostVersionOutOfSyncListener.class));
   }
 
   /**