
AMBARI-8703. Rolling Upgrade - Upgrade Pack to run preupgrade tasks only on master for NameNode (alejandro)

Alejandro Fernandez, 10 years ago
parent commit a1348d3c20
20 changed files with 427 additions and 147 deletions
  1. +7 -2     ambari-common/src/main/python/resource_management/libraries/functions/dynamic_variable_interpretation.py
  2. +4 -1     ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
  3. +43 -0    ambari-server/src/main/java/org/apache/ambari/server/stack/HostsType.java
  4. +156 -0   ambari-server/src/main/java/org/apache/ambari/server/stack/MasterHostResolver.java
  5. +10 -0    ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
  6. +33 -98   ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeHelper.java
  7. +24 -0    ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
  8. +5 -5     ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ClusterGrouping.java
  9. +9 -5     ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ColocatedGrouping.java
  10. +7 -0    ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ExecuteTask.java
  11. +13 -8   ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Grouping.java
  12. +1 -1    ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapper.java
  13. +3 -2    ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapperBuilder.java
  14. +0 -3    ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Task.java
  15. +76 -0   ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapperBuilder.java
  16. +1 -0    ambari-server/src/main/resources/custom_actions/scripts/ru_execute_tasks.py
  17. +3 -2    ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/package/scripts/utils.py
  18. +9 -17   ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.2.xml
  19. +22 -2   ambari-server/src/test/java/org/apache/ambari/server/state/UpgradeHelperTest.java
  20. +1 -1    ambari-server/src/test/resources/stacks/HDP/2.1.1/upgrades/upgrade_test.xml

+ 7 - 2
ambari-common/src/main/python/resource_management/libraries/functions/dynamic_variable_interpretation.py

@@ -148,7 +148,7 @@ def copy_tarballs_to_hdfs(tarball_prefix, component_user, file_owner, group_owne
   tmpfile = tempfile.NamedTemporaryFile()
   out = None
   with open(tmpfile.name, 'r+') as file:
-    get_hdp_version_cmd = '/usr/bin/hdp-select versions > %s' % tmpfile.name
+    get_hdp_version_cmd = '/usr/bin/hdp-select status > %s' % tmpfile.name
     code, stdoutdata = shell.call(get_hdp_version_cmd)
     out = file.read()
   pass
@@ -157,7 +157,12 @@ def copy_tarballs_to_hdfs(tarball_prefix, component_user, file_owner, group_owne
                    (get_hdp_version_cmd, str(code), str(out)))
     return 1
 
-  hdp_version = out.strip() # this should include the build number
+  matches = re.findall(r"([\d\.]+\-\d+)", out)
+  hdp_version = matches[0] if matches and len(matches) > 0 else None
+
+  if not hdp_version:
+    Logger.error("Could not parse HDP version from output of hdp-select: %s" % str(out))
+    return 1
 
   file_name = os.path.basename(component_tar_source_file)
   destination_file = os.path.join(component_tar_destination_folder, file_name)

+ 4 - 1
ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java

@@ -62,6 +62,7 @@ import org.apache.ambari.server.orm.entities.UpgradeEntity;
 import org.apache.ambari.server.orm.entities.UpgradeGroupEntity;
 import org.apache.ambari.server.orm.entities.UpgradeItemEntity;
 import org.apache.ambari.server.serveraction.upgrades.ManualStageAction;
+import org.apache.ambari.server.stack.MasterHostResolver;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.ConfigHelper;
 import org.apache.ambari.server.state.StackId;
@@ -357,8 +358,10 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
     Cluster cluster = getManagementController().getClusters().getCluster(clusterName);
     ConfigHelper configHelper = getManagementController().getConfigHelper();
 
+    MasterHostResolver mhr = new MasterHostResolver(cluster);
     UpgradeHelper helper = new UpgradeHelper();
-    List<UpgradeGroupHolder> groups = helper.createUpgrade(cluster, pack);
+
+    List<UpgradeGroupHolder> groups = helper.createUpgrade(cluster, mhr, pack);
     List<UpgradeGroupEntity> groupEntities = new ArrayList<UpgradeGroupEntity>();
 
     final String version = (String) requestMap.get(UPGRADE_VERSION);

+ 43 - 0
ambari-server/src/main/java/org/apache/ambari/server/stack/HostsType.java

@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.stack;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Wrapper around a collection of hosts that may have either a Master or Secondary host.
+ */
+public class HostsType {
+
+  public String master;
+
+  public String secondary;
+
+  /**
+   * Ordered collection of hosts.
+   */
+  public Set<String> hosts;
+
+  public HostsType () {
+    this.master = null;
+    this.secondary = null;
+    this.hosts = new HashSet<String>();
+  }
+}

+ 156 - 0
ambari-server/src/main/java/org/apache/ambari/server/stack/MasterHostResolver.java

@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.stack;
+
+import com.google.common.reflect.TypeToken;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.utils.HTTPUtils;
+
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.ambari.server.utils.StageUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class MasterHostResolver {
+
+  private static Logger LOG = LoggerFactory.getLogger(MasterHostResolver.class);
+
+  private Cluster cluster;
+
+  enum Service {
+    HDFS,
+    HBASE
+  }
+
+  /**
+   * Union of status for several services.
+   */
+  enum Status {
+    ACTIVE,
+    STANDBY
+  }
+
+  public MasterHostResolver() {
+    ;
+  }
+
+  public MasterHostResolver(Cluster cluster) {
+    this.cluster = cluster;
+  }
+
+  /**
+   * Get the master hostname of the given service and component.
+   * @param serviceName Service
+   * @param componentName Component
+   * @return The hostname that is the master of the service and component if successful, null otherwise.
+   */
+  public HostsType getMasterAndHosts(String serviceName, String componentName) {
+    HostsType hostsType = new HostsType();
+
+    if (serviceName == null || componentName == null) {
+      return null;
+    }
+
+    Service s = Service.valueOf(serviceName.toUpperCase());
+
+    Set<String> componentHosts = cluster.getHosts(serviceName, componentName);
+    if (0 == componentHosts.size()) {
+      return null;
+    }
+
+    hostsType.hosts = componentHosts;
+
+    switch (s) {
+      case HDFS:
+        if (componentName.equalsIgnoreCase("NAMENODE")) {
+          Map<Status, String> pair = getNameNodePair(componentHosts);
+          if (pair != null) {
+            hostsType.master = pair.containsKey(Status.ACTIVE) ? pair.get(Status.ACTIVE) :  null;
+            hostsType.secondary = pair.containsKey(Status.STANDBY) ? pair.get(Status.STANDBY) :  null;
+          }
+        }
+        break;
+      case HBASE:
+        if (componentName.equalsIgnoreCase("HBASE_REGIONSERVER")) {
+          // TODO Rolling Upgrade, fill for this Component.
+          ;
+        }
+        break;
+    }
+    return hostsType;
+  }
+
+  /**
+   * Get mapping of the HDFS Namenodes from the state ("active" or "standby") to the hostname.
+   * @param hosts Hosts to lookup.
+   * @return a map from the state ("active" or "standby") to the hostname with that state.
+   */
+  private Map<Status, String> getNameNodePair(Set<String> hosts) {
+    Map<Status, String> stateToHost = new HashMap<Status, String>();
+    if (hosts != null && hosts.size() == 2) {
+      Iterator iter = hosts.iterator();
+
+      while(iter.hasNext()) {
+        String hostname = (String) iter.next();
+        try {
+          // TODO Rolling Upgrade, don't hardcode jmx port number
+          // E.g.,
+          // dfs.namenode.http-address.dev.nn1 : c6401.ambari.apache.org:50070
+          // dfs.namenode.http-address.dev.nn2 : c6402.ambari.apache.org:50070
+          String endpoint = "http://" + hostname + ":50070/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus";
+          String response = HTTPUtils.requestURL(endpoint);
+
+          if (response != null && !response.isEmpty()) {
+            Map<String, ArrayList<HashMap<String, String>>> nameNodeInfo = new HashMap<String, ArrayList<HashMap<String, String>>>();
+            Type type = new TypeToken<Map<String, ArrayList<HashMap<String, String>>>>() {}.getType();
+            nameNodeInfo = StageUtils.getGson().fromJson(response, type);
+
+            try {
+              String state = nameNodeInfo.get("beans").get(0).get("State");
+
+              if (state.equalsIgnoreCase(Status.ACTIVE.toString()) || state.equalsIgnoreCase(Status.STANDBY.toString())) {
+                Status status = Status.valueOf(state.toUpperCase());
+                stateToHost.put(status, hostname);
+              }
+            } catch (Exception e) {
+              throw new Exception("Response from endpoint " + endpoint + " was not formatted correctly. Value: " + response);
+            }
+          } else {
+            throw new Exception("Response from endpoint " + endpoint + " was empty.");
+          }
+        } catch (Exception e) {
+          LOG.warn("Failed to parse namenode jmx endpoint to get state for host " + hostname + ". Error: " + e.getMessage());
+        }
+      }
+
+      if (stateToHost.containsKey(Status.ACTIVE) && stateToHost.containsKey(Status.STANDBY) && !stateToHost.get(Status.ACTIVE).equalsIgnoreCase(stateToHost.get(Status.STANDBY))) {
+        return stateToHost;
+      }
+    }
+
+    return null;
+  }
+}
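
As a usage sketch (not part of the commit): the resolver is constructed with a Cluster, and for HDFS/NAMENODE it returns a HostsType whose master and secondary fields hold the active and standby NameNodes. The cluster reference below is assumed to come from elsewhere, e.g. the management controller, as in UpgradeResourceProvider above.

    import org.apache.ambari.server.stack.HostsType;
    import org.apache.ambari.server.stack.MasterHostResolver;
    import org.apache.ambari.server.state.Cluster;

    public class ResolverUsageSketch {
      // Illustrative only: "cluster" is assumed to be obtained elsewhere.
      public static void printNameNodeRoles(Cluster cluster) {
        MasterHostResolver resolver = new MasterHostResolver(cluster);

        // For HDFS/NAMENODE the resolver probes each NameNode's JMX status
        // endpoint and records the active host as master, the standby as secondary.
        HostsType nn = resolver.getMasterAndHosts("HDFS", "NAMENODE");
        if (nn != null && nn.master != null) {
          System.out.println("Active NameNode:  " + nn.master);
          System.out.println("Standby NameNode: " + nn.secondary);
          System.out.println("All NameNode hosts: " + nn.hosts);
        }
      }
    }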

+ 10 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java

@@ -77,6 +77,16 @@ public interface Cluster {
    */
   public List<ServiceComponentHost> getServiceComponentHosts(String hostname);
 
+
+  /**
+   * Get all of the hosts running the provided service and component.
+   * @param serviceName
+   * @param componentName
+   * @return
+   */
+  public Set<String> getHosts(String serviceName, String componentName);
+
+
   /**
    * Remove ServiceComponentHost from cluster
    * @param svcCompHost

+ 33 - 98
ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeHelper.java

@@ -17,17 +17,14 @@
  */
 package org.apache.ambari.server.state;
 
+import java.text.MessageFormat;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.LinkedHashSet;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
+import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.controller.internal.RequestResourceProvider;
 import org.apache.ambari.server.controller.internal.StageResourceProvider;
 import org.apache.ambari.server.controller.predicate.AndPredicate;
@@ -43,6 +40,9 @@ import org.apache.ambari.server.controller.spi.UnsupportedPropertyException;
 import org.apache.ambari.server.controller.utilities.ClusterControllerHelper;
 import org.apache.ambari.server.controller.utilities.PredicateBuilder;
 import org.apache.ambari.server.controller.utilities.PropertyHelper;
+import org.apache.ambari.server.stack.HostsType;
+import org.apache.ambari.server.stack.MasterHostResolver;
+import org.apache.ambari.server.state.cluster.ClusterImpl;
 import org.apache.ambari.server.state.stack.UpgradePack;
 import org.apache.ambari.server.state.stack.UpgradePack.ProcessingComponent;
 import org.apache.ambari.server.state.stack.upgrade.ClusterGrouping;
@@ -50,7 +50,7 @@ import org.apache.ambari.server.state.stack.upgrade.Grouping;
 import org.apache.ambari.server.state.stack.upgrade.StageWrapper;
 import org.apache.ambari.server.state.stack.upgrade.StageWrapperBuilder;
 import org.apache.ambari.server.state.stack.upgrade.TaskWrapper;
-import org.apache.ambari.server.utils.HTTPUtils;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,68 +61,14 @@ public class UpgradeHelper {
 
   private static Logger LOG = LoggerFactory.getLogger(UpgradeHelper.class);
 
-  /**
-   * Tuple of namenode states
-   */
-  public static class NameNodePair {
-    String activeHostName;
-    String standbyHostName;
-  }
-
-  /**
-   * Retrieve a class that represents a tuple of the active and standby namenodes. This should be called in an HA cluster.
-   * @param hosts
-   * @return
-   */
-  public static NameNodePair getNameNodePair(Set<String> hosts) {
-    if (hosts != null && hosts.size() == 2) {
-      Iterator iter = hosts.iterator();
-      HashMap<String, String> stateToHost = new HashMap<String, String>();
-      Pattern pattern = Pattern.compile("^.*org\\.apache\\.hadoop\\.hdfs\\.server\\.namenode\\.NameNode\".*?\"State\"\\s*:\\s*\"(.+?)\".*$");
-
-      while(iter.hasNext()) {
-        String hostname = (String) iter.next();
-        try {
-          // TODO Rolling Upgrade, don't hardcode jmx port number
-          // E.g.,
-          // dfs.namenode.http-address.dev.nn1 : c6401.ambari.apache.org:50070
-          // dfs.namenode.http-address.dev.nn2 : c6402.ambari.apache.org:50070
-          String endpoint = "http://" + hostname + ":50070/jmx";
-          String response = HTTPUtils.requestURL(endpoint);
-
-          if (response != null && !response.isEmpty()) {
-            Matcher matcher = pattern.matcher(response);
-            if (matcher.matches()) {
-              String state = matcher.group(1);
-              stateToHost.put(state.toLowerCase(), hostname);
-            }
-          } else {
-            throw new Exception("Response from endpoint " + endpoint + " was empty.");
-          }
-        } catch (Exception e) {
-          LOG.warn("Failed to parse namenode jmx endpoint to get state for host " + hostname + ". Error: " + e.getMessage());
-        }
-      }
-
-      if (stateToHost.containsKey("active") && stateToHost.containsKey("standby") && !stateToHost.get("active").equalsIgnoreCase(stateToHost.get("standby"))) {
-        NameNodePair pair = new NameNodePair();
-        pair.activeHostName = stateToHost.get("active");
-        pair.standbyHostName = stateToHost.get("standby");
-        return pair;
-      }
-    }
-
-    return null;
-  }
-
   /**
    * Generates a list of UpgradeGroupHolder items that are used to execute an upgrade
    * @param cluster the cluster
+   * @param mhr Master Host Resolver needed to get master and secondary hosts of several components like NAMENODE
    * @param upgradePack the upgrade pack
    * @return the list of holders
    */
-  public List<UpgradeGroupHolder> createUpgrade(Cluster cluster, UpgradePack upgradePack) {
-
+  public List<UpgradeGroupHolder> createUpgrade(Cluster cluster, MasterHostResolver mhr, UpgradePack upgradePack) throws AmbariException {
     Map<String, Map<String, ProcessingComponent>> allTasks = upgradePack.getTasks();
 
     List<UpgradeGroupHolder> groups = new ArrayList<UpgradeGroupHolder>();
@@ -154,25 +100,38 @@ public class UpgradeHelper {
             continue;
           }
 
-          Set<String> componentHosts = getClusterHosts(cluster, service.serviceName, component);
+          Set<String> componentHosts = cluster.getHosts(service.serviceName, component);
           if (0 == componentHosts.size()) {
             continue;
           }
+          HostsType hostsType = new HostsType();
+          hostsType.hosts = componentHosts;
 
           ProcessingComponent pc = allTasks.get(service.serviceName).get(component);
 
           // Special case for NAMENODE
           if (service.serviceName.equalsIgnoreCase("HDFS") && component.equalsIgnoreCase("NAMENODE")) {
-              NameNodePair pair = getNameNodePair(componentHosts);
-              if (pair != null ) {
-                // The order is important, first do the standby, then the active namenode.
-                Set<String> order = new LinkedHashSet<String>();
-                order.add(pair.standbyHostName);
-                order.add(pair.activeHostName);
-                builder.add(order, service.serviceName, pc);
-              }
-          } else {
-            builder.add(componentHosts, service.serviceName, pc);
+            hostsType = mhr.getMasterAndHosts(service.serviceName, component);
+            if (hostsType != null && hostsType.master != null && componentHosts.contains(hostsType.master) && hostsType.secondary != null && componentHosts.contains(hostsType.secondary)) {
+              // The order is important, first do the standby, then the active namenode.
+              Set<String> order = new LinkedHashSet<String>();
+
+              // TODO Upgrade Pack, somehow, running the secondary first causes them to swap, even before the restart.
+              order.add(hostsType.master);
+              order.add(hostsType.secondary);
+
+              // Override the hosts with the ordered collection
+              hostsType.hosts = order;
+              builder.add(hostsType, service.serviceName, pc);
+            } else {
+              throw new AmbariException(MessageFormat.format("Could not find active and standby namenodes using hosts: {0}", StringUtils.join(componentHosts, ", ").toString()));
+            }
+          }
+          /*
+          TODO Rolling Upgrade, write logic for HBASE
+          */
+          else {
+            builder.add(hostsType, service.serviceName, pc);
           }
         }
       }
@@ -304,30 +263,6 @@ public class UpgradeHelper {
     return resources.iterator().next();
   }
 
-  /**
-   * @param cluster the cluster
-   * @param serviceName name of the service
-   * @param componentName name of the component
-   * @return the set of hosts for the provided service and component
-   */
-  public Set<String> getClusterHosts(Cluster cluster, String serviceName, String componentName) {
-    Map<String, Service> services = cluster.getServices();
-
-    if (!services.containsKey(serviceName)) {
-      return Collections.emptySet();
-    }
-
-    Service service = services.get(serviceName);
-    Map<String, ServiceComponent> components = service.getServiceComponents();
-
-    if (!components.containsKey(componentName) ||
-        components.get(componentName).getServiceComponentHosts().size() == 0) {
-      return Collections.emptySet();
-    }
-
-    return components.get(componentName).getServiceComponentHosts().keySet();
-  }
-
   /**
    * Special handling for ClusterGrouping.
    * @param cluster the cluster
@@ -336,7 +271,7 @@ public class UpgradeHelper {
    */
   private UpgradeGroupHolder getClusterGroupHolder(Cluster cluster, ClusterGrouping grouping) {
 
-    grouping.getBuilder().setHelpers(this, cluster);
+    grouping.getBuilder().setHelpers(cluster);
     List<StageWrapper> wrappers = grouping.getBuilder().build();
 
     if (wrappers.size() > 0) {

+ 24 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java

@@ -2311,6 +2311,30 @@ public class ClusterImpl implements Cluster {
     return failedEvents;
   }
 
+  /**
+   * @param serviceName name of the service
+   * @param componentName name of the component
+   * @return the set of hosts for the provided service and component
+   */
+  @Override
+  public Set<String> getHosts(String serviceName, String componentName) {
+    Map<String, Service> services = this.getServices();
+
+    if (!services.containsKey(serviceName)) {
+      return Collections.emptySet();
+    }
+
+    Service service = services.get(serviceName);
+    Map<String, ServiceComponent> components = service.getServiceComponents();
+
+    if (!components.containsKey(componentName) ||
+        components.get(componentName).getServiceComponentHosts().size() == 0) {
+      return Collections.emptySet();
+    }
+
+    return components.get(componentName).getServiceComponentHosts().keySet();
+  }
+
   private ClusterHealthReport getClusterHealthReport() throws AmbariException {
 
     int staleConfigsHosts = 0;
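
A brief sketch of the contract the new Cluster.getHosts(serviceName, componentName) method gives callers (hypothetical snippet, not in the commit): it returns the hostnames running the component, or an empty set, never null, when the service or component is absent.

    import java.util.Set;
    import org.apache.ambari.server.state.Cluster;

    public class GetHostsSketch {
      // Illustrative only: "cluster" is assumed to be injected or looked up elsewhere.
      public static void listDataNodes(Cluster cluster) {
        Set<String> dataNodes = cluster.getHosts("HDFS", "DATANODE");
        if (dataNodes.isEmpty()) {
          // A missing service or component yields Collections.emptySet(), so no null check is needed.
          System.out.println("No DATANODE hosts found");
          return;
        }
        for (String host : dataNodes) {
          System.out.println("DATANODE on " + host);
        }
      }
    }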

+ 5 - 5
ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ClusterGrouping.java

@@ -30,8 +30,10 @@ import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 import javax.xml.bind.annotation.XmlType;
 
+import org.apache.ambari.server.stack.HostsType;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.UpgradeHelper;
+import org.apache.ambari.server.state.cluster.ClusterImpl;
 import org.apache.ambari.server.state.stack.UpgradePack;
 import org.apache.ambari.server.state.stack.UpgradePack.ProcessingComponent;
 
@@ -71,19 +73,17 @@ public class ClusterGrouping extends Grouping {
   }
 
   public class ClusterBuilder extends StageWrapperBuilder {
-    private UpgradeHelper m_helper = null;
     private Cluster m_cluster = null;
 
     /**
      * @param cluster the cluster to use with this builder
      */
-    public void setHelpers(UpgradeHelper helper, Cluster cluster) {
-      m_helper = helper;
+    public void setHelpers(Cluster cluster) {
       m_cluster = cluster;
     }
 
     @Override
-    public void add(Set<String> hosts, String service, ProcessingComponent pc) {
+    public void add(HostsType hostsType, String service, ProcessingComponent pc) {
       // !!! no-op in this case
     }
 
@@ -101,7 +101,7 @@ public class ClusterGrouping extends Grouping {
         StageWrapper wrapper = null;
 
         if (null != execution.service && null != execution.component) {
-          Set<String> hosts = m_helper.getClusterHosts(m_cluster, execution.service, execution.component);
+          Set<String> hosts = m_cluster.getHosts(execution.service, execution.component);
           // !!! FIXME other types
           if (hosts.size() > 0 && task.getType() == Task.Type.EXECUTE) {
             wrapper = new StageWrapper(

+ 9 - 5
ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ColocatedGrouping.java

@@ -29,6 +29,7 @@ import java.util.Set;
 import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlType;
 
+import org.apache.ambari.server.stack.HostsType;
 import org.apache.ambari.server.state.stack.UpgradePack.ProcessingComponent;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -65,13 +66,16 @@ public class ColocatedGrouping extends Grouping {
     }
 
     @Override
-    public void add(Set<String> hosts, String service, ProcessingComponent pc) {
+    public void add(HostsType hostsType, String service, ProcessingComponent pc) {
 
       int count = Double.valueOf(Math.ceil(
-          (double) batch.percent / 100 * hosts.size())).intValue();
+          (double) batch.percent / 100 * hostsType.hosts.size())).intValue();
 
       int i = 0;
-      for (String host : hosts) {
+      for (String host : hostsType.hosts) {
+        // This class requires inserting a single host into the collection
+        HostsType singleHostsType = new HostsType();
+        singleHostsType.hosts = Collections.singleton(host);
 
         Map<String, List<TaskProxy>> targetMap = ((i++) < count) ? initialBatch : finalBatches;
         List<TaskProxy> targetList = targetMap.get(host);
@@ -85,7 +89,7 @@ public class ColocatedGrouping extends Grouping {
         if (null != pc.preTasks && pc.preTasks.size() > 0) {
           proxy = new TaskProxy();
           proxy.message = getStageText("Preparing", pc.name, Collections.singleton(host));
-          proxy.tasks.add(new TaskWrapper(service, pc.name, Collections.singleton(host), pc.preTasks));
+          proxy.tasks.addAll(TaskWrapperBuilder.getTaskList(service, pc.name, singleHostsType, pc.preTasks));
           proxy.service = service;
           proxy.component = pc.name;
           targetList.add(proxy);
@@ -110,7 +114,7 @@ public class ColocatedGrouping extends Grouping {
           proxy = new TaskProxy();
           proxy.component = pc.name;
           proxy.service = service;
-          proxy.tasks.add(new TaskWrapper(service, pc.name, Collections.singleton(host), pc.postTasks));
+          proxy.tasks.addAll(TaskWrapperBuilder.getTaskList(service, pc.name, singleHostsType, pc.postTasks));
           proxy.message = getStageText("Completing", pc.name, Collections.singleton(host));
           targetList.add(proxy);
         }

+ 7 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ExecuteTask.java

@@ -23,6 +23,7 @@ import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 import javax.xml.bind.annotation.XmlType;
+import javax.xml.bind.annotation.XmlAttribute;
 
 /**
  * Used to represent an execution that should occur on an agent.
@@ -35,6 +36,12 @@ public class ExecuteTask extends Task {
   @XmlTransient
   private Task.Type type = Task.Type.EXECUTE;
 
+  /**
+   * Host to run on. If not specified, run on all hosts. Other values include "master".
+   */
+  @XmlAttribute
+  public String hosts;
+
   /**
    * Command to run under normal conditions.
    */

+ 13 - 8
ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Grouping.java

@@ -27,6 +27,7 @@ import javax.xml.bind.annotation.XmlAttribute;
 import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlSeeAlso;
 
+import org.apache.ambari.server.stack.HostsType;
 import org.apache.ambari.server.state.stack.UpgradePack;
 import org.apache.ambari.server.state.stack.UpgradePack.ProcessingComponent;
 import org.apache.commons.lang.StringUtils;
@@ -62,17 +63,20 @@ public class Grouping {
     /**
      * Add stages where the restart stages are ordered
      * E.g., preupgrade, restart hosts(0), ..., restart hosts(n-1), postupgrade
-     * @param hosts the hosts
+     * @param hostsType the ordered collection of hosts, which may have a master and secondary
      * @param service the service name
      * @param pc the ProcessingComponent derived from the upgrade pack.
      */
     @Override
-    public void add(Set<String> hosts, String service, ProcessingComponent pc) {
+    public void add(HostsType hostsType, String service, ProcessingComponent pc) {
       if (null != pc.preTasks && pc.preTasks.size() > 0) {
+        List<TaskWrapper> preTasks = TaskWrapperBuilder.getTaskList(service, pc.name, hostsType, pc.preTasks);
+        Set<String> preTasksEffectiveHosts = TaskWrapperBuilder.getEffectiveHosts(preTasks);
         StageWrapper stage = new StageWrapper(
             StageWrapper.Type.RU_TASKS,
-            getStageText("Preparing", pc.name, hosts),
-            new TaskWrapper(service, pc.name, hosts, pc.preTasks));
+            getStageText("Preparing", pc.name, preTasksEffectiveHosts),
+            preTasks
+            );
         stages.add(stage);
       }
 
@@ -80,7 +84,7 @@ public class Grouping {
       if (null != pc.tasks && 1 == pc.tasks.size()) {
         Task t = pc.tasks.get(0);
         if (RestartTask.class.isInstance(t)) {
-          for (String hostName : hosts) {
+          for (String hostName : hostsType.hosts) {
             StageWrapper stage = new StageWrapper(
                 StageWrapper.Type.RESTART,
                 getStageText("Restarting", pc.name, Collections.singleton(hostName)),
@@ -91,15 +95,16 @@ public class Grouping {
       }
 
       if (null != pc.postTasks && pc.postTasks.size() > 0) {
+        List<TaskWrapper> postTasks = TaskWrapperBuilder.getTaskList(service, pc.name, hostsType, pc.postTasks);
+        Set<String> postTasksEffectiveHosts = TaskWrapperBuilder.getEffectiveHosts(postTasks);
         StageWrapper stage = new StageWrapper(
             StageWrapper.Type.RU_TASKS,
-            getStageText("Completing", pc.name, hosts),
-            new TaskWrapper(service, pc.name, hosts, pc.postTasks));
+            getStageText("Completing", pc.name, postTasksEffectiveHosts),
+            postTasks);
         stages.add(stage);
       }
 
       serviceChecks.add(service);
-
     }
 
     @Override

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapper.java

@@ -40,7 +40,7 @@ public class StageWrapper {
     this(type, text, Arrays.asList(tasks));
   }
 
-  private StageWrapper(Type type, String text, List<TaskWrapper> tasks) {
+  public StageWrapper(Type type, String text, List<TaskWrapper> tasks) {
     this.type = type;
     this.text = text;
     this.tasks = tasks;

+ 3 - 2
ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapperBuilder.java

@@ -20,6 +20,7 @@ package org.apache.ambari.server.state.stack.upgrade;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.ambari.server.stack.HostsType;
 import org.apache.ambari.server.state.stack.UpgradePack.ProcessingComponent;
 
 /**
@@ -30,11 +31,11 @@ public abstract class StageWrapperBuilder {
   /**
    * Adds a processing component that will be built into stage wrappers.
    *
-   * @param hosts the hosts
+   * @param hostsType the hosts, along with their type
    * @param service the service name
    * @param pc the ProcessingComponent derived from the upgrade pack.
    */
-  public abstract void add(Set<String> hosts, String service, ProcessingComponent pc);
+  public abstract void add(HostsType hostsType, String service, ProcessingComponent pc);
 
   /**
    * Builds the stage wrappers.

+ 0 - 3
ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Task.java

@@ -74,8 +74,5 @@ public abstract class Task {
     public boolean isCommand() {
       return this == RESTART;
     }
-
-
-
   }
 }

+ 76 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapperBuilder.java

@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.state.stack.upgrade;
+
+import org.apache.ambari.server.stack.HostsType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.MessageFormat;
+import java.util.*;
+
+
+/**
+ * Generates a collection of tasks that need to run on a set of hosts during an upgrade.
+ */
+public class TaskWrapperBuilder {
+
+  private static Logger LOG = LoggerFactory.getLogger(TaskWrapperBuilder.class);
+
+  /**
+   * Creates a collection of tasks based on the set of hosts they are allowed to run on
+   * by analyzing the "hosts" attribute of any ExecuteTask objects.
+   * @param service the service name for the tasks
+   * @param component the component name for the tasks
+   * @param hostsType the collection of hosts along with their status
+   * @param tasks collection of tasks
+   */
+  public static List<TaskWrapper> getTaskList(String service, String component, HostsType hostsType, List<Task> tasks) {
+    List<TaskWrapper> collection = new ArrayList<TaskWrapper>();
+    for (Task t : tasks) {
+      if (t.getType().equals(Task.Type.EXECUTE)) {
+        if (((ExecuteTask) t).hosts != null && ((ExecuteTask) t).hosts.equalsIgnoreCase("master")) {
+          if (hostsType.master != null) {
+            collection.add(new TaskWrapper(service, component, Collections.singleton(hostsType.master), t));
+            continue;
+          } else {
+            LOG.error(MessageFormat.format("Found an Execute task for {0} and {1} meant to run on a master but could not find any masters to run on. Skipping this task.", service, component));
+            continue;
+          }
+        }
+      }
+
+      collection.add(new TaskWrapper(service, component, hostsType.hosts, t));
+    }
+
+    return collection;
+  }
+
+  /**
+   * Given a collection of tasks, get the union of the hosts.
+   * @param tasks Collection of tasks
+   * @return Returns the union of the hosts scheduled to perform the tasks.
+   */
+  public static Set<String> getEffectiveHosts(List<TaskWrapper> tasks) {
+    Set<String> effectiveHosts = new HashSet<String>();
+    for(TaskWrapper t : tasks) {
+      effectiveHosts.addAll(t.getHosts());
+    }
+    return effectiveHosts;
+  }
+}
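
To show how the new hosts="master" attribute is interpreted, here is a rough sketch (assumed hostnames, ExecuteTask built via its default JAXB constructor): a task marked for the master is wrapped against only the active host, while other tasks keep the full host set.

    import java.util.Collections;
    import java.util.List;
    import org.apache.ambari.server.stack.HostsType;
    import org.apache.ambari.server.state.stack.upgrade.ExecuteTask;
    import org.apache.ambari.server.state.stack.upgrade.Task;
    import org.apache.ambari.server.state.stack.upgrade.TaskWrapper;
    import org.apache.ambari.server.state.stack.upgrade.TaskWrapperBuilder;

    public class TaskWrapperBuilderSketch {
      public static void main(String[] args) {
        // Assumed HA NameNode pair; hostnames are illustrative.
        HostsType nn = new HostsType();
        nn.master = "c6401.ambari.apache.org";
        nn.secondary = "c6402.ambari.apache.org";
        nn.hosts.add(nn.master);
        nn.hosts.add(nn.secondary);

        // Mirrors <task xsi:type="execute" hosts="master"> in upgrade-2.2.xml.
        ExecuteTask safemodeCheck = new ExecuteTask();
        safemodeCheck.hosts = "master";

        List<TaskWrapper> wrappers = TaskWrapperBuilder.getTaskList(
            "HDFS", "NAMENODE", nn, Collections.<Task>singletonList(safemodeCheck));

        // The execute task is scheduled on the active NameNode only, so the
        // effective host set for the stage collapses to that single host.
        System.out.println(TaskWrapperBuilder.getEffectiveHosts(wrappers));
      }
    }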

+ 1 - 0
ambari-server/src/main/resources/custom_actions/scripts/ru_execute_tasks.py

@@ -35,6 +35,7 @@ from resource_management.libraries.functions.list_ambari_managed_repos import *
 # TODO, HACK
 def replace_variables(cmd, host_name, version):
   if cmd:
+    cmd = cmd.replace("{{host_name}}", "{host_name}")
     cmd = cmd.replace("0.0.0.0", "{host_name}")
     cmd = cmd.replace("{{version}}", "{version}")
     cmd = format(cmd)

+ 3 - 2
ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/package/scripts/utils.py

@@ -76,7 +76,8 @@ def failover_namenode():
     Execute(check_standby_cmd,
             user=params.hdfs_user,
             tries=30,
-            try_sleep=6)
+            try_sleep=6,
+            logoutput=True)
   else:
     Logger.info("Rolling Upgrade - Host %s is the standby namenode." % str(params.hostname))
 
@@ -97,7 +98,7 @@ def kill_zkfc(zkfc_user):
       if code == 0:
         Logger.debug("ZKFC is running and will be killed to initiate namenode failover.")
         kill_command = format("{check_process} && kill -9 `cat {zkfc_pid_file}` > /dev/null 2>&1")
-        checked_call(kill_command)
+        checked_call(kill_command, verbose=True)
 
 
 def get_service_pid_file(name, user):

+ 9 - 17
ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.2.xml

@@ -53,7 +53,6 @@
       <service name="YARN">
         <component>APP_TIMELINE_SERVER</component>
         <component>RESOURCEMANAGER</component>
-        <component>NODEMANAGER</component>
       </service>
       <service name="HBASE">
         <component>HBASE_MASTER</component>
@@ -159,27 +158,19 @@
           FINALIZE rolling upgrade ...
           There is no rolling upgrade in progress or rolling upgrade has already been finalized.
           -->
-          <task xsi:type="execute">
-            <command>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -safemode enter'</command>
+          <task xsi:type="execute" hosts="master">
+            <first>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -safemode get | grep "Safe mode is ON in {{host_name}}"'</first>
+            <command>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -safemode leave'</command>
             <upto>10</upto>
             <every>6</every>
           </task>
 
-          <task xsi:type="execute">
+          <task xsi:type="execute" hosts="master">
             <command>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -rollingUpgrade prepare'</command>
-            <onfailure>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -safemode leave'</onfailure>   <!-- TODO, stay in safemode if in HA. -->
           </task>
 
-          <task xsi:type="execute">
+          <task xsi:type="execute" hosts="master">
             <command>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -rollingUpgrade query'</command>
-            <onfailure>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -safemode leave'</onfailure>   <!-- TODO, stay in safemode if in HA. -->
-          </task>
-
-          <!-- Apparently, the HDFS Namenode restart requires safemode to be OFF when not in HA. -->
-          <task xsi:type="execute">
-            <command>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -safemode leave'</command>
-            <upto>60</upto>
-            <every>1</every>
           </task>
         </pre-upgrade>
 
@@ -189,11 +180,12 @@
 
         <!-- This step should be done once the user clicks on the "Finalize" button. So the name post-upgrade is misleading. -->
         <post-upgrade>
-          <task xsi:type="execute">
+          <task xsi:type="execute" hosts="master">
             <command>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -rollingUpgrade finalize'</command>
           </task>
-          <task xsi:type="execute">
-            <command>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -safemode leave'</command>       <!-- TODO, stay in safemode if in HA. -->
+          <task xsi:type="execute" hosts="master">
+            <first>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -safemode get | grep "Safe mode is ON in {{host_name}}"'</first>
+            <command>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -safemode leave'</command>
           </task>
         </post-upgrade>
       </component>

+ 22 - 2
ambari-server/src/test/java/org/apache/ambari/server/state/UpgradeHelperTest.java

@@ -29,6 +29,8 @@ import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
+import org.apache.ambari.server.stack.HostsType;
+import org.apache.ambari.server.stack.MasterHostResolver;
 import org.apache.ambari.server.state.UpgradeHelper.UpgradeGroupHolder;
 import org.apache.ambari.server.state.stack.UpgradePack;
 import org.junit.After;
@@ -38,6 +40,10 @@ import org.junit.Test;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.persist.PersistService;
+import org.mockito.Mockito;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests the {@link UpgradeHelper} class
@@ -46,6 +52,7 @@ public class UpgradeHelperTest {
 
   private Injector injector;
   private AmbariMetaInfo ambariMetaInfo;
+  private MasterHostResolver m_masterHostResolver;
 
   @Before
   public void before() throws Exception {
@@ -55,6 +62,8 @@ public class UpgradeHelperTest {
 
     ambariMetaInfo = injector.getInstance(AmbariMetaInfo.class);
     ambariMetaInfo.init();
+    
+    m_masterHostResolver = mock(MasterHostResolver.class);
   }
 
   @After
@@ -74,8 +83,14 @@ public class UpgradeHelperTest {
 
     Cluster cluster = makeCluster();
 
+    HostsType hostsType = new HostsType();
+    hostsType.hosts = cluster.getHosts("HDFS", "NAMENODE");
+    hostsType.master = "h1";
+    hostsType.secondary = "h2";
+    when(m_masterHostResolver.getMasterAndHosts(Mockito.matches("HDFS"), Mockito.matches("NAMENODE"))).thenReturn(hostsType);
+
     UpgradeHelper helper = new UpgradeHelper();
-    List<UpgradeGroupHolder> groups = helper.createUpgrade(cluster, upgrade);
+    List<UpgradeGroupHolder> groups = helper.createUpgrade(cluster, m_masterHostResolver, upgrade);
 
     assertEquals(5, groups.size());
 
@@ -86,10 +101,14 @@ public class UpgradeHelperTest {
     assertEquals("POST_CLUSTER", groups.get(4).name);
 
     assertEquals(6, groups.get(1).items.size());
-    assertEquals(2, groups.get(2).items.size());
+    assertEquals(6, groups.get(2).items.size());
     assertEquals(7, groups.get(3).items.size());
   }
 
+  /**
+   * Create an HA cluster
+   * @throws AmbariException
+   */
   public Cluster makeCluster() throws AmbariException {
     Clusters clusters = injector.getInstance(Clusters.class);
     ServiceFactory serviceFactory = injector.getInstance(ServiceFactory.class);
@@ -123,6 +142,7 @@ public class UpgradeHelperTest {
     Service s = c.getService("HDFS");
     ServiceComponent sc = s.addServiceComponent("NAMENODE");
     sc.addServiceComponentHost("h1");
+    sc.addServiceComponentHost("h2");
     sc = s.addServiceComponent("DATANODE");
     sc.addServiceComponentHost("h2");
     sc.addServiceComponentHost("h3");

+ 1 - 1
ambari-server/src/test/resources/stacks/HDP/2.1.1/upgrades/upgrade_test.xml

@@ -107,7 +107,7 @@
     <service name="HDFS">
       <component name="NAMENODE">
         <pre-upgrade>
-          <task xsi:type="execute">
+          <task xsi:type="execute" hosts="master">
             <command>su - {hdfs-user} -c 'dosomething'</command>
           </task>
           <task xsi:type="configure">