
AMBARI-8836 - Upgrade pack for Hive (jonathanhurley)

Jonathan Hurley, 10 years ago
commit 181f3ab4de

+ 73 - 2
ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/ConfigureAction.java

@@ -17,23 +17,94 @@
  */
 package org.apache.ambari.server.serveraction.upgrades;
 
+import java.text.MessageFormat;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.controller.ConfigurationRequest;
 import org.apache.ambari.server.serveraction.AbstractServerAction;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Config;
+import org.apache.ambari.server.state.ConfigHelper;
+import org.apache.ambari.server.state.DesiredConfig;
+
+import com.google.inject.Inject;
 
 /**
  * Action that represents a manual stage.
  */
 public class ConfigureAction extends AbstractServerAction {
 
+  /**
+   * Used to lookup the cluster.
+   */
+  @Inject
+  private Clusters m_clusters;
+
+  /**
+   * Used to update the configuration properties.
+   */
+  @Inject
+  private AmbariManagementController m_controller;
+
+  /**
+   * Used to assist in the creation of a {@link ConfigurationRequest} to update
+   * configuration values.
+   */
+  @Inject
+  private ConfigHelper m_configHelper;
+
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public CommandReport execute(
       ConcurrentMap<String, Object> requestSharedDataContext)
       throws AmbariException, InterruptedException {
-    // TODO Auto-generated method stub
-    return createCommandReport(0, HostRoleStatus.COMPLETED, "{}", "", "");
+
+    Map<String,String> commandParameters = getCommandParameters();
+    if( null == commandParameters || commandParameters.isEmpty() ){
+      return createCommandReport(0, HostRoleStatus.FAILED, "{}", "",
+          "Unable to change configuration values without command parameters");
+    }
+
+    String clusterName = commandParameters.get("clusterName");
+    String key = commandParameters.get("key");
+    String value = commandParameters.get("value");
+
+    // such as hdfs-site or hbase-env
+    String configType = commandParameters.get("type");
+
+    if (null == clusterName || null == configType || null == key) {
+      String message = "cluster={0}, type={1}, key={2}";
+      message = MessageFormat.format(message, clusterName, configType, key);
+
+      return createCommandReport(0, HostRoleStatus.FAILED, "{}", "", message);
+    }
+
+    Cluster cluster = m_clusters.getCluster(clusterName);
+    Map<String, DesiredConfig> desiredConfigs = cluster.getDesiredConfigs();
+    DesiredConfig desiredConfig = desiredConfigs.get(configType);
+    Config config = cluster.getConfig(configType, desiredConfig.getTag());
+
+    Map<String, String> propertiesToChange = new HashMap<String, String>();
+    propertiesToChange.put(key, value);
+    config.updateProperties(propertiesToChange);
+
+    String serviceVersionNote = "Stack Upgrade";
+
+    m_configHelper.createConfigType(cluster, m_controller, configType,
+        config.getProperties(), m_controller.getAuthName(), serviceVersionNote);
+
+    String message = "Updated ''{0}'' with ''{1}={2}''";
+    message = MessageFormat.format(message, configType, key, value);
+
+    return createCommandReport(0, HostRoleStatus.COMPLETED, "{}", message, "");
   }
 }

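The new ConfigureAction is driven entirely by its command parameters. A minimal Python sketch of the same validation and reporting flow, assuming a commandParams payload with the four keys the Java code reads (the sample values are hypothetical):

```python
# Sketch of ConfigureAction.execute()'s parameter handling; not Ambari code.
def execute(command_parameters):
    if not command_parameters:
        return ("FAILED", "Unable to change configuration values without command parameters")

    cluster_name = command_parameters.get("clusterName")
    config_type = command_parameters.get("type")  # such as hdfs-site or hbase-env
    key = command_parameters.get("key")
    value = command_parameters.get("value")

    if cluster_name is None or config_type is None or key is None:
        return ("FAILED", "cluster={0}, type={1}, key={2}".format(
            cluster_name, config_type, key))

    # the real action then looks up the cluster, rewrites the property, and
    # persists a new config version via ConfigHelper.createConfigType(...)
    return ("COMPLETED", "Updated '{0}' with '{1}={2}'".format(config_type, key, value))

print(execute({"clusterName": "c1", "type": "hive-site",
               "key": "hive.server2.thrift.port", "value": "10010"}))
```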
+ 135 - 106
ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java

@@ -31,16 +31,8 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.inject.Singleton;
-
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
-
-import com.google.inject.Inject;
-import com.google.inject.persist.Transactional;
-
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.AmbariManagementController;
 import org.apache.ambari.server.controller.ConfigurationRequest;
@@ -52,6 +44,12 @@ import org.apache.ambari.server.upgrade.UpgradeCatalog170;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.google.inject.persist.Transactional;
+
 /**
  * Helper class that works with config traversals.
  */
@@ -89,9 +87,9 @@ public class ConfigHelper {
    */
   public Map<String, Map<String, String>> getEffectiveDesiredTags(
       Cluster cluster, String hostName) throws AmbariException {
-    
+
     Host host = clusters.getHost(hostName);
-    
+
     return getEffectiveDesiredTags(cluster, host.getDesiredHostConfigs(cluster));
   }
 
@@ -103,15 +101,15 @@
    */
   private Map<String, Map<String, String>> getEffectiveDesiredTags(
       Cluster cluster, Map<String, HostConfig> hostConfigOverrides) {
-    
+
     Map<String, DesiredConfig> clusterDesired = cluster.getDesiredConfigs();
-    
+
     Map<String, Map<String,String>> resolved = new TreeMap<String, Map<String, String>>();
-    
+
     // Do not use host component config mappings.  Instead, the rules are:
     // 1) Use the cluster desired config
     // 2) override (1) with config-group overrides
-    
+
     for (Entry<String, DesiredConfig> clusterEntry : clusterDesired.entrySet()) {
       String type = clusterEntry.getKey();
       String tag = clusterEntry.getValue().getTag();
@@ -394,7 +392,7 @@
   public void invalidateStaleConfigsCache(ServiceComponentHost sch) {
     staleConfigsCache.invalidate(sch);
   }
-  
+
   /**
    * Remove configs by type
    * @param type config Type
@@ -402,15 +400,15 @@
   @Transactional
   public void removeConfigsByType(Cluster cluster, String type) {
     Set<String> globalVersions = cluster.getConfigsByType(type).keySet();
-    
+
     for(String version:globalVersions) {
       ClusterConfigEntity clusterConfigEntity = clusterDAO.findConfig
         (cluster.getClusterId(), type, version);
-      
+
       clusterDAO.removeConfig(clusterConfigEntity);
     }
   }
-  
+
   /**
    * Gets all the config dictionary where property with the given name is present in stack definitions
    * @param stackId
@@ -419,30 +417,30 @@
   public Set<String> findConfigTypesByPropertyName(StackId stackId, String propertyName, String clusterName) throws AmbariException {
     StackInfo stack = ambariMetaInfo.getStack(stackId.getStackName(),
         stackId.getStackVersion());
-    
+
     Set<String> result = new HashSet<String>();
 
     for(Service service : clusters.getCluster(clusterName).getServices().values()) {
       Set<PropertyInfo> stackProperties = ambariMetaInfo.getServiceProperties(stack.getName(), stack.getVersion(), service.getName());
       Set<PropertyInfo> stackLevelProperties = ambariMetaInfo.getStackProperties(stack.getName(), stack.getVersion());
       stackProperties.addAll(stackLevelProperties);
-      
+
       for (PropertyInfo stackProperty : stackProperties) {
         if(stackProperty.getName().equals(propertyName)) {
           String configType = fileNameToConfigType(stackProperty.getFilename());
-          
+
           result.add(configType);
         }
       }
     }
-    
+
     return result;
   }
-  
+
   public Set<String> getPropertyValuesWithPropertyType(StackId stackId, PropertyType propertyType, Cluster cluster) throws AmbariException {
     StackInfo stack = ambariMetaInfo.getStack(stackId.getStackName(),
         stackId.getStackVersion());
-    
+
     Set<String> result = new HashSet<String>();
 
     for(Service service : cluster.getServices().values()) {
@@ -456,99 +454,123 @@
         }
       }
     }
-    
+
     Set<PropertyInfo> stackProperties = ambariMetaInfo.getStackProperties(stack.getName(), stack.getVersion());
-    
+
     for (PropertyInfo stackProperty : stackProperties) {
       if(stackProperty.getPropertyTypes().contains(propertyType)) {
         String stackPropertyConfigType = fileNameToConfigType(stackProperty.getFilename());
         result.add(cluster.getDesiredConfigByType(stackPropertyConfigType).getProperties().get(stackProperty.getName()));
       }
     }
-    
+
     return result;
   }
-  
+
   public String getPropertyValueFromStackDefinitions(Cluster cluster, String configType, String propertyName) throws AmbariException {
     StackId stackId = cluster.getCurrentStackVersion();
     StackInfo stack = ambariMetaInfo.getStack(stackId.getStackName(),
         stackId.getStackVersion());
-    
+
     for(ServiceInfo serviceInfo:stack.getServices()) {
       Set<PropertyInfo> serviceProperties = ambariMetaInfo.getServiceProperties(stack.getName(), stack.getVersion(), serviceInfo.getName());
       Set<PropertyInfo> stackProperties = ambariMetaInfo.getStackProperties(stack.getName(), stack.getVersion());
       serviceProperties.addAll(stackProperties);
-      
+
       for (PropertyInfo stackProperty : serviceProperties) {
         String stackPropertyConfigType = fileNameToConfigType(stackProperty.getFilename());
-        
+
         if(stackProperty.getName().equals(propertyName) && stackPropertyConfigType.equals(configType)) {
           return stackProperty.getValue();
        }
       }
-      
+
     }
-    
+
     return null;
   }
-  
+
   public ServiceInfo getPropertyOwnerService(Cluster cluster, String configType, String propertyName) throws AmbariException {
     StackId stackId = cluster.getCurrentStackVersion();
     StackInfo stack = ambariMetaInfo.getStack(stackId.getStackName(), stackId.getStackVersion());
-    
-    for(ServiceInfo serviceInfo:stack.getServices()) {     
+
+    for(ServiceInfo serviceInfo:stack.getServices()) {
       Set<PropertyInfo> serviceProperties = ambariMetaInfo.getServiceProperties(stack.getName(), stack.getVersion(), serviceInfo.getName());
-      
+
       for (PropertyInfo stackProperty : serviceProperties) {
         String stackPropertyConfigType = fileNameToConfigType(stackProperty.getFilename());
-        
+
         if(stackProperty.getName().equals(propertyName) && stackPropertyConfigType.equals(configType)) {
           return serviceInfo;
         }
       }
-      
+
     }
-    
+
     return null;
   }
-  
+
   public Set<PropertyInfo> getServiceProperties(Cluster cluster, String serviceName) throws AmbariException {
     StackId stackId = cluster.getCurrentStackVersion();
     StackInfo stack = ambariMetaInfo.getStack(stackId.getStackName(), stackId.getStackVersion());
-    
+
     return ambariMetaInfo.getServiceProperties(stack.getName(), stack.getVersion(), serviceName);
   }
-  
+
   public Set<PropertyInfo> getStackProperties(Cluster cluster) throws AmbariException {
     StackId stackId = cluster.getCurrentStackVersion();
     StackInfo stack = ambariMetaInfo.getStack(stackId.getStackName(), stackId.getStackVersion());
-    
+
     return ambariMetaInfo.getStackProperties(stack.getName(), stack.getVersion());
   }
-  
-  public void createConfigType(Cluster cluster, AmbariManagementController ambariManagementController, 
-      String configType, Map<String, String> properties, String authName) throws AmbariException {
-    String tag;
-    if(cluster.getConfigsByType(configType) == null) {
-      tag = "version1";
-    } else {
+
+  /**
+   * A helper method to create a new {@link Config} for a given configuration
+   * type. This method will perform the following tasks:
+   * <ul>
+   * <li>Create a {@link Config} in the cluster for the specified type. This
+   * will have the proper versions and tags set automatically.</li>
+   * <li>Set the cluster's {@link DesiredConfig} to the new configuration</li>
+   * <li>Create an entry in the configuration history with a note and username.</li>
+   * </ul>
+   *
+   * @param cluster
+   * @param controller
+   * @param configType
+   * @param properties
+   * @param authenticatedUserName
+   * @param serviceVersionNote
+   * @throws AmbariException
+   */
+  public void createConfigType(Cluster cluster,
+      AmbariManagementController controller, String configType,
+      Map<String, String> properties, String authenticatedUserName,
+      String serviceVersionNote) throws AmbariException {
+
+    String tag = "version1";
+    if (cluster.getConfigsByType(configType) != null) {
       tag = "version" + System.currentTimeMillis();
       tag = "version" + System.currentTimeMillis();
     }
     }
-    
-    ConfigurationRequest cr = new ConfigurationRequest();
-    cr.setClusterName(cluster.getClusterName());
-    cr.setVersionTag(tag);
-    cr.setType(configType);
-    cr.setProperties(properties);
-    ambariManagementController.createConfiguration(cr);
-    
-    Config baseConfig = cluster.getConfig(cr.getType(), cr.getVersionTag());
-    
+
+    // update the configuration
+    ConfigurationRequest configurationRequest = new ConfigurationRequest();
+    configurationRequest.setClusterName(cluster.getClusterName());
+    configurationRequest.setVersionTag(tag);
+    configurationRequest.setType(configType);
+    configurationRequest.setProperties(properties);
+    configurationRequest.setServiceConfigVersionNote(serviceVersionNote);
+    controller.createConfiguration(configurationRequest);
+
+    // create the configuration history entry
+    Config baseConfig = cluster.getConfig(configurationRequest.getType(),
+        configurationRequest.getVersionTag());
+
     if (baseConfig != null) {
-      cluster.addDesiredConfig(authName, Collections.singleton(baseConfig));
+      cluster.addDesiredConfig(authenticatedUserName,
+          Collections.singleton(baseConfig), serviceVersionNote);
     }
   }
-  
+
   /**
    * Since global configs are deprecated since 1.7.0, but still supported.
    * We should automatically map any globals used, to *-env dictionaries.
@@ -557,57 +579,58 @@ public class ConfigHelper {
    */
   public void moveDeprecatedGlobals(StackId stackId, Map<String, Map<String, String>> configurations, String clusterName) {
     Map<String, String> globalConfigurations = new HashMap<String, String>();
-    
+
     if(configurations.get(Configuration.GLOBAL_CONFIG_TAG) == null ||
-        configurations.get(Configuration.GLOBAL_CONFIG_TAG).size() == 0)
+        configurations.get(Configuration.GLOBAL_CONFIG_TAG).size() == 0) {
       return;
-  
+    }
+
     globalConfigurations.putAll(configurations.get(Configuration.GLOBAL_CONFIG_TAG));
-    
+
     if(globalConfigurations!=null && globalConfigurations.size() != 0) {
       LOG.warn("Global configurations are deprecated, "
           + "please use *-env");
     }
-    
+
     for(Map.Entry<String, String> property:globalConfigurations.entrySet()) {
       String propertyName = property.getKey();
       String propertyValue = property.getValue();
-      
+
       Set<String> newConfigTypes = null;
       try{
-        newConfigTypes = this.findConfigTypesByPropertyName(stackId, propertyName, clusterName);
+        newConfigTypes = findConfigTypesByPropertyName(stackId, propertyName, clusterName);
       } catch(AmbariException e) {
         LOG.error("Exception while getting configurations from the stacks", e);
         return;
       }
-      
+
       newConfigTypes.remove(Configuration.GLOBAL_CONFIG_TAG);
-      
+
       String newConfigType = null;
       if(newConfigTypes.size() > 0) {
         newConfigType = newConfigTypes.iterator().next();
       } else {
         newConfigType = UpgradeCatalog170.getAdditionalMappingGlobalToEnv().get(propertyName);
       }
-      
+
       if(newConfigType==null) {
         LOG.warn("Cannot find where to map " + propertyName + " from " + Configuration.GLOBAL_CONFIG_TAG +
             " (value="+propertyValue+")");
         continue;
       }
-      
-      LOG.info("Mapping config " + propertyName + " from " + Configuration.GLOBAL_CONFIG_TAG + 
+
+      LOG.info("Mapping config " + propertyName + " from " + Configuration.GLOBAL_CONFIG_TAG +
           " to " + newConfigType +
           " (value="+propertyValue+")");
-      
+
       configurations.get(Configuration.GLOBAL_CONFIG_TAG).remove(propertyName);
-      
+
       if(!configurations.containsKey(newConfigType)) {
         configurations.put(newConfigType, new HashMap<String, String>());
       }
       configurations.get(newConfigType).put(propertyName, propertyValue);
     }
-    
+
     if(configurations.get(Configuration.GLOBAL_CONFIG_TAG).size() == 0) {
       configurations.remove(Configuration.GLOBAL_CONFIG_TAG);
     }
@@ -620,15 +643,16 @@ public class ConfigHelper {
     }
 
     Map <String, HostConfig> actual = sch.getActualConfigs();
-    if (null == actual || actual.isEmpty())
+    if (null == actual || actual.isEmpty()) {
       return false;
+    }
 
     Cluster cluster = clusters.getClusterById(sch.getClusterId());
     StackId stackId = cluster.getDesiredStackVersion();
-    
+
     Map<String, Map<String, String>> desired = getEffectiveDesiredTags(cluster,
         sch.getHostName());
-    
+
     ServiceInfo serviceInfo = ambariMetaInfo.getService(stackId.getStackName(),
         stackId.getStackVersion(), sch.getServiceName());
     ComponentInfo componentInfo = serviceInfo.getComponentByName(sch.getServiceComponentName());
@@ -643,13 +667,13 @@ public class ConfigHelper {
     boolean stale = false;
 
     Iterator<Entry<String, Map<String, String>>> it = desired.entrySet().iterator();
-    
+
     while (it.hasNext() && !stale) {
       Entry<String, Map<String, String>> desiredEntry = it.next();
-      
+
       String type = desiredEntry.getKey();
       Map<String, String> tags = desiredEntry.getValue();
-      
+
       if (!actual.containsKey(type)) {
         // desired is set, but actual is not
         if (!serviceInfo.hasConfigDependency(type)) {
@@ -658,7 +682,7 @@ public class ConfigHelper {
           // find out if the keys are stale by first checking the target service,
           // then all services
           Collection<String> keys = mergeKeyNames(cluster, type, tags.values());
-          
+
           if (serviceInfo.hasDependencyAndPropertyFor(type, keys) || !hasPropertyFor(stackId, type, keys)) {
             stale = true;
           }
@@ -723,51 +747,55 @@ public class ConfigHelper {
 
     for (ServiceInfo svc : ambariMetaInfo.getServices(stack.getStackName(),
         stack.getStackVersion()).values()) {
-      
-      if (svc.hasDependencyAndPropertyFor(type, keys))
+
+      if (svc.hasDependencyAndPropertyFor(type, keys)) {
         return true;
-      
+      }
+
     }
-    
+
     return false;
   }
-  
+
   /**
    * @return the keys that have changed values
    */
   private Collection<String> findChangedKeys(Cluster cluster, String type,
       Collection<String> desiredTags, Collection<String> actualTags) {
-    
+
     Map<String, String> desiredValues = new HashMap<String, String>();
     Map<String, String> actualValues = new HashMap<String, String>();
-    
+
     for (String tag : desiredTags) {
       Config config = cluster.getConfig(type, tag);
-      if (null != config)
+      if (null != config) {
         desiredValues.putAll(config.getProperties());
+      }
     }
-    
+
     for (String tag : actualTags) {
       Config config = cluster.getConfig(type, tag);
-      if (null != config)
+      if (null != config) {
         actualValues.putAll(config.getProperties());
+      }
     }
-    
+
     List<String> keys = new ArrayList<String>();
-    
+
     for (Entry<String, String> entry : desiredValues.entrySet()) {
       String key = entry.getKey();
       String value = entry.getValue();
-      
-      if (!actualValues.containsKey(key))
+
+      if (!actualValues.containsKey(key)) {
         keys.add(key);
-      else if (!actualValues.get(key).equals(value))
+      } else if (!actualValues.get(key).equals(value)) {
         keys.add(key);
+      }
     }
-    
+
     return keys;
   }
-  
+
   /**
    * @return the map of tags for a desired config
    */
@@ -781,13 +809,14 @@ public class ConfigHelper {
     }
     return map;
   }
-  
+
   /**
    * @return true if the tags are different in any way, even if not-specified
    */
   private boolean isTagChanged(Map<String, String> desiredTags, Map<String, String> actualTags, boolean groupSpecificConfigs) {
-    if (!actualTags.get(CLUSTER_DEFAULT_TAG).equals(desiredTags.get(CLUSTER_DEFAULT_TAG)) && !groupSpecificConfigs)
+    if (!actualTags.get(CLUSTER_DEFAULT_TAG).equals(desiredTags.get(CLUSTER_DEFAULT_TAG)) && !groupSpecificConfigs) {
       return true;
+    }
 
     // if the host has group specific configs for type we should ignore the cluster level configs and compare specifics
     if (groupSpecificConfigs) {
@@ -807,14 +836,14 @@ public class ConfigHelper {
    */
   private Collection<String> mergeKeyNames(Cluster cluster, String type, Collection<String> tags) {
     Set<String> names = new HashSet<String>();
-    
+
     for (String tag : tags) {
       Config config = cluster.getConfig(type, tag);
       if (null != config) {
         names.addAll(config.getProperties().keySet());
       }
     }
-    
+
     return names;
   }
 

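One detail of the reworked createConfigType is worth calling out: the first config of a type gets the tag "version1", and every later one gets a millisecond-timestamp tag. A hedged Python sketch of that rule (the existing_configs argument stands in for cluster.getConfigsByType):

```python
import time

def next_config_tag(existing_configs):
    """Mirror of createConfigType's tag selection: 'version1' for the first
    config of a type, 'version<millis>' for every subsequent one."""
    if existing_configs is None:
        return "version1"
    return "version" + str(int(time.time() * 1000))

print(next_config_tag(None))              # -> version1
print(next_config_tag({"version1": {}}))  # -> e.g. version1418932551000
```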
+ 34 - 29
ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog170.java

@@ -18,9 +18,30 @@
 
 package org.apache.ambari.server.upgrade;
 
-import com.google.common.reflect.TypeToken;
-import com.google.inject.Inject;
-import com.google.inject.Injector;
+import java.lang.reflect.Type;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import javax.persistence.EntityManager;
+import javax.persistence.TypedQuery;
+import javax.persistence.criteria.CriteriaBuilder;
+import javax.persistence.criteria.CriteriaQuery;
+import javax.persistence.criteria.Expression;
+import javax.persistence.criteria.Predicate;
+import javax.persistence.criteria.Root;
+
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.AmbariManagementController;
@@ -70,35 +91,16 @@ import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Config;
 import org.apache.ambari.server.state.ConfigHelper;
-import org.apache.ambari.server.state.alert.Scope;
 import org.apache.ambari.server.state.State;
+import org.apache.ambari.server.state.alert.Scope;
 import org.apache.ambari.server.utils.StageUtils;
 import org.apache.ambari.server.view.ViewRegistry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.persistence.EntityManager;
-import javax.persistence.TypedQuery;
-import javax.persistence.criteria.CriteriaBuilder;
-import javax.persistence.criteria.CriteriaQuery;
-import javax.persistence.criteria.Expression;
-import javax.persistence.criteria.Predicate;
-import javax.persistence.criteria.Root;
-import java.lang.reflect.Type;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
+import com.google.common.reflect.TypeToken;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
 
 /**
  * Upgrade catalog for version 1.7.0.
@@ -1110,11 +1112,12 @@ public class UpgradeCatalog170 extends AbstractUpgradeCatalog {
         Config oldConfig = cluster.getDesiredConfigByType(PIG_PROPERTIES_CONFIG_TYPE);
         if (oldConfig != null) {
           Map<String, String> properties = oldConfig.getProperties();
-          
+
           if(!properties.containsKey(CONTENT_FIELD_NAME)) {
             String value = properties.remove(PIG_CONTENT_FIELD_NAME);
             properties.put(CONTENT_FIELD_NAME, value);
-            configHelper.createConfigType(cluster, ambariManagementController, PIG_PROPERTIES_CONFIG_TYPE, properties, "ambari-upgrade");
+            configHelper.createConfigType(cluster, ambariManagementController,
+                PIG_PROPERTIES_CONFIG_TYPE, properties, "ambari-upgrade", null);
           }
         }
       }
@@ -1212,7 +1215,9 @@ public class UpgradeCatalog170 extends AbstractUpgradeCatalog {
         // if have some custom properties, for own services etc., leave that as it was
         if(unmappedGlobalProperties.size() != 0) {
           LOG.info("Not deleting globals because have custom properties");
-          configHelper.createConfigType(cluster, ambariManagementController, Configuration.GLOBAL_CONFIG_TAG, unmappedGlobalProperties, "ambari-upgrade");
+          configHelper.createConfigType(cluster, ambariManagementController,
+              Configuration.GLOBAL_CONFIG_TAG, unmappedGlobalProperties,
+              "ambari-upgrade", null);
         } else {
           configHelper.removeConfigsByType(cluster, Configuration.GLOBAL_CONFIG_TAG);
         }

+ 26 - 13
ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_metastore.py

@@ -23,42 +23,55 @@ from resource_management import *
 
 from hive import hive
 from hive_service import hive_service
-from mysql_service import mysql_service
 
-class HiveMetastore(Script):
 
+class HiveMetastore(Script):
   def install(self, env):
     import params
-    self.install_packages(env, exclude_packages=params.hive_exclude_packages)
+
+    self.install_packages(env, exclude_packages = params.hive_exclude_packages)
+
 
   def configure(self, env):
     import params
+
     env.set_params(params)
 
-    hive(name='metastore')
+    hive(name = 'metastore')
+
 
-  def start(self, env, rolling_restart=False):
+  def start(self, env, rolling_restart = False):
     import params
+
     env.set_params(params)
-    self.configure(env) # FOR SECURITY
-    hive_service( 'metastore',
-                   action = 'start'
-    )
+    self.configure(env)  # FOR SECURITY
+    hive_service('metastore', action = 'start')
+
 
-  def stop(self, env, rolling_restart=False):
+  def stop(self, env, rolling_restart = False):
     import params
+
     env.set_params(params)
+    hive_service('metastore', action = 'stop' )
 
-    hive_service( 'metastore',
-                   action = 'stop'
-    )
 
   def status(self, env):
     import status_params
+
     env.set_params(status_params)
     pid_file = format("{hive_pid_dir}/{hive_metastore_pid}")
     # Recursively check all existing gmetad pid files
     check_process_status(pid_file)
 
+
+  def pre_rolling_restart(self, env):
+    Logger.info("Executing Metastore Rolling Upgrade pre-restart")
+    import params
+    env.set_params(params)
+
+    if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0:
+      Execute(format("hdp-select set hive-metastore {version}"))
+
+
 if __name__ == "__main__":
   HiveMetastore().execute()

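Each new pre_rolling_restart guards the hdp-select call with a stack-version check. A rough Python sketch of that comparison, assuming dotted numeric version strings (the real compare_versions and format_hdp_stack_version live in resource_management):

```python
def format_hdp_stack_version(version):
    # keep only the leading numeric dotted portion, e.g. "2.2.0.0-2041" -> "2.2.0.0"
    return version.split('-')[0]

def compare_versions(left, right):
    to_tuple = lambda v: tuple(int(p) for p in v.split('.'))
    l, r = to_tuple(left), to_tuple(right)
    return (l > r) - (l < r)

version = "2.2.0.0-2041"  # hypothetical /commandParams/version value
if version and compare_versions(format_hdp_stack_version(version), '2.2.0.0') >= 0:
    print("hdp-select set hive-metastore " + version)
```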
+ 22 - 6
ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_server.py

@@ -17,6 +17,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 
 """
+import hive_server_upgrade
 
 from resource_management import *
 from hive import hive
@@ -30,13 +31,16 @@ class HiveServer(Script):
     import params
     self.install_packages(env, exclude_packages=params.hive_exclude_packages)
 
+
   def configure(self, env):
     import params
     env.set_params(params)
     if not (params.hdp_stack_version != "" and compare_versions(params.hdp_stack_version, '2.2') >=0):
       install_tez_jars()
+
     hive(name='hiveserver2')
 
+
   def start(self, env, rolling_restart=False):
     import params
     env.set_params(params)
@@ -46,25 +50,37 @@ class HiveServer(Script):
     copy_tarballs_to_hdfs('mapreduce', params.tez_user, params.hdfs_user, params.user_group)
     copy_tarballs_to_hdfs('tez', params.tez_user, params.hdfs_user, params.user_group)
 
-    hive_service( 'hiveserver2',
-                  action = 'start'
-    )
+    hive_service( 'hiveserver2', action = 'start',
+      rolling_restart=rolling_restart )
+
 
   def stop(self, env, rolling_restart=False):
     import params
     env.set_params(params)
 
-    hive_service( 'hiveserver2',
-                  action = 'stop'
-    )
+    if rolling_restart:
+      hive_server_upgrade.pre_upgrade_deregister()
+    else:
+      hive_service( 'hiveserver2', action = 'stop' )
+
 
   def status(self, env):
     import status_params
     env.set_params(status_params)
     pid_file = format("{hive_pid_dir}/{hive_pid}")
+
     # Recursively check all existing gmetad pid files
     check_process_status(pid_file)
 
 
+  def pre_rolling_restart(self, env):
+    Logger.info("Executing HiveServer2 Rolling Upgrade pre-restart")
+    import params
+    env.set_params(params)
+
+    if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0:
+      Execute(format("hdp-select set hive-server2 {version}"))
+
+
 if __name__ == "__main__":
   HiveServer().execute()

+ 85 - 0
ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_server_upgrade.py

@@ -0,0 +1,85 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import re
+from resource_management.core.logger import Logger
+from resource_management.core.exceptions import Fail
+from resource_management.core.resources.system import Execute
+from resource_management.core.shell import call
+from resource_management.libraries.functions import format
+
+
+def pre_upgrade_deregister():
+  """
+  Runs the "hive --service hiveserver2 --deregister <version>" command to
+  de-provision the server in preparation for an upgrade. This will contact
+  ZooKeeper to remove the server so that clients that attempt to connect
+  will be directed to other servers automatically. Once all
+  clients have drained, the server will shutdown automatically; this process
+  could take a very long time.
+  This function will obtain the Kerberos ticket if security is enabled.
+  :return:
+  """
+  import params
+
+  Logger.info('HiveServer2 executing "deregister" command in preparation for upgrade...')
+
+  if params.security_enabled:
+    kinit_command=format("{kinit_path_local} -kt {smoke_user_keytab} {smokeuser}; ")
+    Execute(kinit_command,user=params.smokeuser)
+
+  # calculate the current hive server version
+  current_hiveserver_version = _get_current_hiveserver_version()
+  if current_hiveserver_version is None:
+    raise Fail('Unable to determine the current HiveServer2 version to deregister.')
+
+  # deregister
+  command = 'hive --service hiveserver2 --deregister ' + current_hiveserver_version
+  Execute(command, user=params.hive_user, path=params.execute_path, tries=1 )
+
+
+def _get_current_hiveserver_version():
+  """
+  Runs an "hdp-select status hive-server2" check and parses the result in order
+  to obtain the current version of hive.
+
+  :return:  the hiveserver2 version, such as "2.2.0.0-2041"
+  """
+  import params
+
+  try:
+    command = 'hdp-select status hive-server2'
+    return_code, hdp_output = call(command, user=params.hive_user)
+  except Exception, e:
+    Logger.error(str(e))
+    raise Fail('Unable to execute hdp-select command to retrieve the hiveserver2 version.')
+
+  if return_code != 0:
+    raise Fail('Unable to determine the current HiveServer2 version because of a non-zero return code of {0}'.format(str(return_code)))
+
+  # strip "hive-server2 - " off of result and test the version
+  current_hive_server_version = re.sub('hive-server2 - ', '', hdp_output)
+  match = re.match('[0-9]+.[0-9]+.[0-9]+.[0-9]+-[0-9]+', current_hive_server_version)
+
+  if match:
+    return current_hive_server_version
+  else:
+    raise Fail('The extracted hiveserver2 version "{0}" does not match any known pattern'.format(current_hive_server_version))
+
+

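The deregister path depends on parsing hdp-select output. For illustration, the same strip-and-match approach on a sample line (the expected output format is inferred from the tests in this commit):

```python
import re

def parse_hiveserver2_version(hdp_output):
    """Strip the 'hive-server2 - ' prefix and validate the remainder,
    mirroring _get_current_hiveserver_version()."""
    version = re.sub('hive-server2 - ', '', hdp_output)
    if re.match('[0-9]+.[0-9]+.[0-9]+.[0-9]+-[0-9]+', version):
        return version
    raise ValueError('unrecognized hdp-select output: ' + hdp_output)

print(parse_hiveserver2_version('hive-server2 - 2.2.0.0-2041'))  # -> 2.2.0.0-2041
```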
+ 16 - 18
ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_service.py

@@ -23,9 +23,7 @@ import sys
 import time
 from resource_management.core import shell
 
-def hive_service(
-    name,
-    action='start'):
+def hive_service(name, action='start', rolling_restart=False):
 
   import params
 
@@ -38,20 +36,23 @@ def hive_service(
     cmd = format(
       "env JAVA_HOME={java64_home} {start_hiveserver2_path} {hive_log_dir}/hive-server2.out {hive_log_dir}/hive-server2.log {pid_file} {hive_server_conf_dir} {hive_log_dir}")
 
-  process_id_exists = format("ls {pid_file} >/dev/null 2>&1 && ps -p `cat {pid_file}` >/dev/null 2>&1")
-  
+  process_id_exists_command = format("ls {pid_file} >/dev/null 2>&1 && ps -p `cat {pid_file}` >/dev/null 2>&1")
+
   if action == 'start':
     if name == 'hiveserver2':
       check_fs_root()
 
     demon_cmd = format("{cmd}")
-    
-    Execute(demon_cmd,
-            user=params.hive_user,
-            environment={'HADOOP_HOME': params.hadoop_home},
-            path=params.execute_path,
-            not_if=process_id_exists
-    )
+
+    # upgrading hiveserver2 (rolling_restart) means that there is an existing,
+    # de-registering hiveserver2; the pid will still exist, but the new
+    # hiveserver is spinning up on a new port, so the pid will be re-written
+    if rolling_restart:
+      process_id_exists_command = None
+
+    Execute(demon_cmd, user=params.hive_user,
+      environment={'HADOOP_HOME': params.hadoop_home}, path=params.execute_path,
+      not_if=process_id_exists_command )
 
     if params.hive_jdbc_driver == "com.mysql.jdbc.Driver" or \
        params.hive_jdbc_driver == "org.postgresql.Driver" or \
@@ -96,12 +97,9 @@ def hive_service(
 
   elif action == 'stop':
     demon_cmd = format("sudo kill `cat {pid_file}`")
-    Execute(demon_cmd,
-         not_if = format("! ({process_id_exists})")
-    )
-    File(pid_file,
-         action = "delete",
-    )
+    Execute(demon_cmd, not_if = format("! ({process_id_exists_command})"))
+
+    File(pid_file, action = "delete",)
 
 def check_fs_root():
   import params

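The not_if change is the interesting part of hive_service: normally the start command is skipped when the pid file points at a live process, but during a rolling restart the old HiveServer2 is still draining, so the guard must be dropped. A small sketch of that decision (the pid-file path here is illustrative):

```python
def start_guard(pid_file, rolling_restart):
    """Return the not_if guard for starting HiveServer2, mirroring hive_service():
    None during a rolling restart so a second server can start while the old
    one deregisters and drains."""
    if rolling_restart:
        return None
    return ("ls {0} >/dev/null 2>&1 && "
            "ps -p `cat {0}` >/dev/null 2>&1").format(pid_file)

print(start_guard("/var/run/hive/hive-server.pid", rolling_restart=False))
print(start_guard("/var/run/hive/hive-server.pid", rolling_restart=True))  # -> None
```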
+ 14 - 1
ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/params.py

@@ -38,14 +38,27 @@ version = default("/commandParams/version", None)
 # Hadoop params
 # TODO, this logic should initialize these parameters in a file inside the HDP 2.2 stack.
 if hdp_stack_version != "" and compare_versions(hdp_stack_version, '2.2') >=0:
+  # start out with client libraries
   hadoop_bin_dir = "/usr/hdp/current/hadoop-client/bin"
   hadoop_home = '/usr/hdp/current/hadoop-client'
   hive_bin = '/usr/hdp/current/hive-client/bin'
   hive_lib = '/usr/hdp/current/hive-client/lib'
 
+  # if this is a server action, then use the server binaries; smoke tests
+  # use the client binaries
+  command_role = default("/role", "")
+  server_role_dir_mapping = { 'HIVE_SERVER' : 'hive-server2',
+    'HIVE_METASTORE' : 'hive-metastore' }
+
+  if command_role in server_role_dir_mapping:
+    hive_server_root = server_role_dir_mapping[command_role]
+    hive_bin = format('/usr/hdp/current/{hive_server_root}/bin')
+    hive_lib = format('/usr/hdp/current/{hive_server_root}/lib')
+
+  # there are no client versions of these, use server versions directly
   hcat_lib = '/usr/hdp/current/hive-webhcat/share/hcatalog'
   webhcat_bin_dir = '/usr/hdp/current/hive-webhcat/sbin'
-  
+
   hive_specific_configs_supported = True
 else:
   hadoop_bin_dir = "/usr/bin"

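The params.py change swaps client paths for server paths based on the command role. The same lookup, runnable standalone (roles other than the two mapped ones keep the hive-client defaults; the second sample role is hypothetical):

```python
# Mirror of the role-to-directory selection added to params.py.
server_role_dir_mapping = {'HIVE_SERVER': 'hive-server2',
                           'HIVE_METASTORE': 'hive-metastore'}

def hive_dirs(command_role):
    root = server_role_dir_mapping.get(command_role, 'hive-client')
    return ('/usr/hdp/current/{0}/bin'.format(root),
            '/usr/hdp/current/{0}/lib'.format(root))

print(hive_dirs('HIVE_SERVER'))         # server binaries
print(hive_dirs('HIVE_SERVICE_CHECK'))  # hypothetical role; falls back to client binaries
```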
+ 14 - 1
ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/webhcat_server.py

@@ -18,7 +18,6 @@ limitations under the License.
 Ambari Agent
 
 """
-import sys
 from resource_management import *
 
 from webhcat import webhcat
@@ -27,27 +26,41 @@ from webhcat_service import webhcat_service
 class WebHCatServer(Script):
   def install(self, env):
     self.install_packages(env)
+
+
   def configure(self, env):
     import params
     env.set_params(params)
     webhcat()
 
+
   def start(self, env, rolling_restart=False):
     import params
     env.set_params(params)
     self.configure(env) # FOR SECURITY
     webhcat_service(action = 'start')
 
+
   def stop(self, env, rolling_restart=False):
     import params
     env.set_params(params)
 
     webhcat_service(action = 'stop')
 
+
   def status(self, env):
     import status_params
     env.set_params(status_params)
     check_process_status(status_params.webhcat_pid_file)
 
+
+  def pre_rolling_restart(self, env):
+    Logger.info("Executing WebHCat Rolling Upgrade pre-restart")
+    import params
+    env.set_params(params)
+
+    if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0:
+      Execute(format("hdp-select set hive-webhcat {version}"))
+
 if __name__ == "__main__":
   WebHCatServer().execute()

+ 48 - 3
ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.2.xml

@@ -21,7 +21,6 @@
   <target>2.2.*.*</target>
 
   <order>
-
     <group name="ZOOKEEPER" title="Zookeeper">
       <service name="ZOOKEEPER">
         <component>ZOOKEEPER_SERVER</component>
@@ -68,6 +67,14 @@
       </batch>
     </group>
 
+    <group name="HIVE" title="Hive">
+      <service name="HIVE">
+        <component>HIVE_METASTORE</component>
+        <component>HIVE_SERVER</component>
+        <component>WEBHCAT_SERVER</component>
+      </service>
+    </group>
+
     <group name="CLIENTS" title="Client Components">
       <service name="HDFS">
         <component>HDFS_CLIENT</component>
@@ -118,7 +125,7 @@
         </task>
       </execute-stage>
     </group>
-   
+
   </order>
 
 
@@ -312,8 +319,46 @@
         </upgrade>
       </component>
     </service>
-    
+
     <service name="HIVE">
+      <component name="HIVE_METASTORE">
+        <pre-upgrade>
+          <task xsi:type="manual">
+            <message>Backup the Hive Metastore database.</message>
+          </task>
+          <task xsi:type="manual">
+            <message>Run the SQL file at /usr/hdp/$version/hive/scripts/metastore/upgrade to update the Hive Metastore schema.</message>
+          </task>
+        </pre-upgrade>
+        <upgrade>
+          <task xsi:type="restart" />
+        </upgrade>
+      </component>
+
+      <component name="HIVE_SERVER">
+        <pre-upgrade>
+          <task xsi:type="manual">
+            <message>The HiveServer port will now change to 10010. Ensure that this port is available on each HiveServer instance.</message>
+          </task>
+
+          <task xsi:type="configure">
+            <type>hive-site</type>
+            <key>hive.server2.thrift.port</key>
+            <value>10010</value>
+          </task>
+        </pre-upgrade>
+
+        <upgrade>
+          <task xsi:type="restart" />
+        </upgrade>
+      </component>
+
+      <component name="WEBHCAT_SERVER">
+        <upgrade>
+          <task xsi:type="restart" />
+        </upgrade>
+      </component>
+
       <component name="HIVE_CLIENT">
         <upgrade>
           <task xsi:type="restart" />

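Tying the pieces together: the configure task in the upgrade pack carries exactly the type/key/value trio that ConfigureAction validates, with clusterName added by the server. A sketch that pulls those fields out of the XML above (the xsi namespace is omitted for brevity; the parsing itself is illustrative, not Ambari code):

```python
import xml.etree.ElementTree as ET

task_xml = """
<task type="configure">
  <type>hive-site</type>
  <key>hive.server2.thrift.port</key>
  <value>10010</value>
</task>
"""

task = ET.fromstring(task_xml)
command_params = {child.tag: child.text for child in task}
command_params["clusterName"] = "c1"  # supplied by the server at runtime
print(command_params)
# {'type': 'hive-site', 'key': 'hive.server2.thrift.port', 'value': '10010', 'clusterName': 'c1'}
```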
+ 58 - 5
ambari-server/src/test/python/stacks/2.0.6/HIVE/test_hive_server.py

@@ -17,18 +17,17 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 '''
-import os
+import socket
 import subprocess
-from mock.mock import MagicMock, call, patch
+
+from mock.mock import MagicMock, patch
 from resource_management.core import shell
-from resource_management.libraries.functions import hive_check
 from stacks.utils.RMFTestCase import *
 
-import socket
-
 class TestHiveServer(RMFTestCase):
   COMMON_SERVICES_PACKAGE_DIR = "HIVE/0.12.0.2.0/package"
   STACK_VERSION = "2.0.6"
+  UPGRADE_STACK_VERSION = "2.2"
 
   def test_configure_default(self):
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/hive_server.py",
@@ -545,3 +544,57 @@ class TestHiveServer(RMFTestCase):
       self.assert_configure_default()
       self.assertFalse(socket_mock.called)
       self.assertFalse(s.close.called)
+
+
+  @patch("hive_server.HiveServer.pre_rolling_restart")
+  @patch("hive_server.HiveServer.start")
+  @patch("subprocess.Popen")
+  def test_stop_during_upgrade(self, process_mock, hive_server_start_mock,
+    hive_server_pre_rolling_mock):
+
+    process_output = 'hive-server2 - 2.2.0.0-2041'
+
+    process = MagicMock()
+    process.communicate.return_value = [process_output]
+    process.returncode = 0
+    process_mock.return_value = process
+
+    self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/hive_server.py",
+     classname = "HiveServer", command = "restart", config_file = "hive-upgrade.json",
+     hdp_stack_version = self.UPGRADE_STACK_VERSION,
+     target = RMFTestCase.TARGET_COMMON_SERVICES )
+
+    self.assertTrue(process_mock.called)
+    self.assertEqual(process_mock.call_count,2)
+
+    self.assertResourceCalled('Execute', 'hive --service hiveserver2 --deregister 2.2.0.0-2041',
+      path=['/bin:/usr/hdp/current/hive-server2/bin:/usr/hdp/current/hadoop-client/bin'],
+      tries=1, user='hive')
+
+    self.assertResourceCalled('Execute', 'hdp-select set hive-server2 2.2.1.0-2065',)
+
+
+  @patch("hive_server.HiveServer.pre_rolling_restart")
+  @patch("hive_server.HiveServer.start")
+  @patch("subprocess.Popen")
+  def test_stop_during_upgrade_bad_hive_version(self, process_mock, hive_server_start_mock,
+    hive_server_pre_rolling_mock):
+
+    process_output = 'BAD VERSION'
+
+    process = MagicMock()
+    process.communicate.return_value = [process_output]
+    process.returncode = 0
+    process_mock.return_value = process
+
+    try:
+      self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/hive_server.py",
+       classname = "HiveServer", command = "restart", config_file = "hive-upgrade.json",
+       hdp_stack_version = self.UPGRADE_STACK_VERSION,
+       target = RMFTestCase.TARGET_COMMON_SERVICES )
+
+      self.fail("Invalid hive version should have caused an exception")
+    except:
+      pass
+
+    self.assertNoMoreResources()

File diff suppressed because it is too large
+ 229 - 0
ambari-server/src/test/python/stacks/2.2/configs/hive-upgrade.json


Some files were not shown because too many files changed in this diff