Browse Source

AMBARI-8970. Custom Actions for Namenode to execute tasks by calling a python script (alejandro)

Alejandro Fernandez 10 years ago
parent
commit
ccda419012
19 changed files with 325 additions and 217 deletions
  1. 2 2
      ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
  2. 8 8
      ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
  3. 3 3
      ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
  4. 2 2
      ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
  5. 0 3
      ambari-common/src/main/python/resource_management/libraries/functions/dynamic_variable_interpretation.py
  6. 13 6
      ambari-common/src/main/python/resource_management/libraries/script/script.py
  7. 7 2
      ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariActionExecutionHelper.java
  8. 28 9
      ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
  9. 1 1
      ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java
  10. 3 3
      ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeHelper.java
  11. 10 6
      ambari-server/src/main/java/org/apache/ambari/server/state/host/HostImpl.java
  12. 14 33
      ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ExecuteTask.java
  13. 8 2
      ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
  14. 111 0
      ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_upgrade.py
  15. 90 77
      ambari-server/src/main/resources/custom_actions/scripts/ru_execute_tasks.py
  16. 6 48
      ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.2.xml
  17. 9 10
      ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ClusterStackVersionResourceProviderTest.java
  18. 9 1
      ambari-server/src/test/java/org/apache/ambari/server/controller/internal/HostStackVersionResourceProviderTest.java
  19. 1 1
      ambari-web/app/messages.js

+ 2 - 2
ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py

@@ -133,7 +133,7 @@ class CustomServiceOrchestrator():
         hook_dir = self.file_cache.get_hook_base_dir(command, server_url_prefix)
         base_dir = self.file_cache.get_service_base_dir(command, server_url_prefix)
         
-        script_path = self.resolve_script_path(base_dir, script, script_type)
+        script_path = self.resolve_script_path(base_dir, script)
         script_tuple = (script_path, base_dir)
 
       tmpstrucoutfile = os.path.join(self.tmp_dir,
@@ -260,7 +260,7 @@ class CustomServiceOrchestrator():
 
     return result
 
-  def resolve_script_path(self, base_dir, script, script_type):
+  def resolve_script_path(self, base_dir, script):
     """
     Incapsulates logic of script location determination.
     """

+ 8 - 8
ambari-agent/src/main/python/ambari_agent/PythonExecutor.py

@@ -51,7 +51,7 @@ class PythonExecutor:
     pass
 
 
-  def open_subporcess_files(self, tmpoutfile, tmperrfile, override_output_files):
+  def open_subprocess_files(self, tmpoutfile, tmperrfile, override_output_files):
     if override_output_files: # Recreate files
       tmpout =  open(tmpoutfile, 'w')
       tmperr =  open(tmperrfile, 'w')
@@ -83,8 +83,8 @@ class PythonExecutor:
     script_params += [tmpstructedoutfile, logger_level, tmp_dir]
     pythonCommand = self.python_command(script, script_params)
     logger.info("Running command " + pprint.pformat(pythonCommand))
-    if(handle == None) :
-      tmpout, tmperr = self.open_subporcess_files(tmpoutfile, tmperrfile, override_output_files)
+    if handle is None:
+      tmpout, tmperr = self.open_subprocess_files(tmpoutfile, tmperrfile, override_output_files)
 
       process = self.launch_python_subprocess(pythonCommand, tmpout, tmperr)
       # map task_id to pid
@@ -106,7 +106,7 @@ class PythonExecutor:
       background.start()
       return {"exitcode": 777}
 
-  def prepare_process_result (self, process, tmpoutfile, tmperrfile, tmpstructedoutfile, timeout=None):
+  def prepare_process_result(self, process, tmpoutfile, tmperrfile, tmpstructedoutfile, timeout=None):
     out, error, structured_out = self.read_result_from_files(tmpoutfile, tmperrfile, tmpstructedoutfile)
     # Building results
     returncode = process.returncode
@@ -200,7 +200,7 @@ class BackgroundThread(threading.Thread):
     self.pythonExecutor = pythonExecutor
 
   def run(self):
-    process_out, process_err  = self.pythonExecutor.open_subporcess_files(self.holder.out_file, self.holder.err_file, True)
+    process_out, process_err = self.pythonExecutor.open_subprocess_files(self.holder.out_file, self.holder.err_file, True)
 
     logger.info("Starting process command %s" % self.holder.command)
     process = self.pythonExecutor.launch_python_subprocess(self.holder.command, process_out, process_err)
@@ -214,7 +214,7 @@ class BackgroundThread(threading.Thread):
     process.communicate()
 
     self.holder.handle.exitCode = process.returncode
-    process_condenced_result = self.pythonExecutor.prepare_process_result(process, self.holder.out_file, self.holder.err_file, self.holder.structured_out_file)
-    logger.info("Calling callback with args %s" % process_condenced_result)
-    self.holder.handle.on_background_command_complete_callback(process_condenced_result, self.holder.handle)
+    process_condensed_result = self.pythonExecutor.prepare_process_result(process, self.holder.out_file, self.holder.err_file, self.holder.structured_out_file)
+    logger.info("Calling callback with args %s" % process_condensed_result)
+    self.holder.handle.on_background_command_complete_callback(process_condensed_result, self.holder.handle)
     logger.info("Exiting from thread for holder pid %s" % self.holder.handle.pid)

+ 3 - 3
ambari-agent/src/test/python/ambari_agent/TestActionQueue.py

@@ -731,14 +731,14 @@ def patch_output_file(pythonExecutor):
     with tmperr:
       tmperr.write('process_err')
     return proc
-  def open_subporcess_files_win(fout, ferr, f):
+  def open_subprocess_files_win(fout, ferr, f):
     return MagicMock(), MagicMock()
   def read_result_from_files(out_path, err_path, structured_out_path):
     return 'process_out', 'process_err', '{"a": "b."}'
   pythonExecutor.launch_python_subprocess = windows_py
-  pythonExecutor.open_subporcess_files = open_subporcess_files_win
+  pythonExecutor.open_subprocess_files = open_subprocess_files_win
   pythonExecutor.read_result_from_files = read_result_from_files
-    
+
 def wraped(func, before = None, after = None):
     def wrapper(*args, **kwargs):
       if(before is not None):

+ 2 - 2
ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py

@@ -146,13 +146,13 @@ class TestCustomServiceOrchestrator(TestCase):
     # Testing existing path
     exists_mock.return_value = True
     path = orchestrator.\
-      resolve_script_path(os.path.join("HBASE", "package"), os.path.join("scripts", "hbase_master.py"), "PYTHON")
+      resolve_script_path(os.path.join("HBASE", "package"), os.path.join("scripts", "hbase_master.py"))
     self.assertEqual(os.path.join("HBASE", "package", "scripts", "hbase_master.py"), path)
     # Testing not existing path
     exists_mock.return_value = False
     try:
       orchestrator.resolve_script_path("/HBASE",
-                                       os.path.join("scripts", "hbase_master.py"), "PYTHON")
+                                       os.path.join("scripts", "hbase_master.py"))
       self.fail('ExpectedException not thrown')
     except AgentException:
       pass # Expected

+ 0 - 3
ambari-common/src/main/python/resource_management/libraries/functions/dynamic_variable_interpretation.py

@@ -70,9 +70,6 @@ def _get_tar_source_and_dest_folder(tarball_prefix):
   if not component_tar_destination_folder.endswith("/"):
     component_tar_destination_folder = component_tar_destination_folder + "/"
 
-  if not component_tar_destination_folder.startswith("hdfs://"):
-    return None, None
-
   return component_tar_source_file, component_tar_destination_folder
 
 

+ 13 - 6
ambari-common/src/main/python/resource_management/libraries/script/script.py

@@ -83,6 +83,13 @@ class Script(object):
   4 path to file with structured command output (file will be created)
   """
   structuredOut = {}
+  command_data_file = ""
+  basedir = ""
+  stroutfile = ""
+  logging_level = ""
+
+  # Class variable
+  tmp_dir = ""
 
   def put_structured_out(self, sout):
     Script.structuredOut.update(sout)
@@ -106,13 +113,13 @@ class Script(object):
      sys.exit(1)
 
     command_name = str.lower(sys.argv[1])
-    command_data_file = sys.argv[2]
-    basedir = sys.argv[3]
+    self.command_data_file = sys.argv[2]
+    self.basedir = sys.argv[3]
     self.stroutfile = sys.argv[4]
-    logging_level = sys.argv[5]
+    self.logging_level = sys.argv[5]
     Script.tmp_dir = sys.argv[6]
 
-    logging_level_str = logging._levelNames[logging_level]
+    logging_level_str = logging._levelNames[self.logging_level]
     chout.setLevel(logging_level_str)
     logger.setLevel(logging_level_str)
 
@@ -123,7 +130,7 @@ class Script(object):
       reload_windows_env()
 
     try:
-      with open(command_data_file, "r") as f:
+      with open(self.command_data_file, "r") as f:
         pass
         Script.config = ConfigDictionary(json.load(f))
         #load passwords here(used on windows to impersonate different users)
@@ -138,7 +145,7 @@ class Script(object):
     # Run class method depending on a command type
     try:
       method = self.choose_method_to_execute(command_name)
-      with Environment(basedir) as env:
+      with Environment(self.basedir) as env:
         method(env)
     except ClientComponentHasNoStatus or ComponentIsNotRunning:
       # Support of component status checks.

+ 7 - 2
ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariActionExecutionHelper.java

@@ -43,6 +43,7 @@ import org.apache.ambari.server.customactions.ActionDefinition;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.ComponentInfo;
+import org.apache.ambari.server.state.ConfigHelper;
 import org.apache.ambari.server.state.ServiceComponentHost;
 import org.apache.ambari.server.state.ServiceInfo;
 import org.apache.ambari.server.state.StackId;
@@ -72,6 +73,8 @@ public class AmbariActionExecutionHelper {
   private AmbariMetaInfo ambariMetaInfo;
   @Inject
   private MaintenanceStateHelper maintenanceStateHelper;
+  @Inject
+  private ConfigHelper configHelper;
 
   /**
    * Validates the request to execute an action.
@@ -336,7 +339,9 @@ public class AmbariActionExecutionHelper {
             hostName, System.currentTimeMillis()), clusterName,
               serviceName, retryAllowed);
 
-      Map<String, Map<String, String>> configurations = new TreeMap<String, Map<String, String>>();
+      Map<String, Map<String, String>> hostConfigTags = configHelper.getEffectiveDesiredTags(cluster, hostName);
+      Map<String, Map<String, String>> hostConfigurations = configHelper.getEffectiveConfigProperties(cluster, hostConfigTags);
+
       Map<String, Map<String, Map<String, String>>> configurationAttributes = new TreeMap<String, Map<String, Map<String, String>>>();
       Map<String, Map<String, String>> configTags = null;
       if (!StringUtils.isEmpty(serviceName) && null != cluster) {
@@ -355,7 +360,7 @@ public class AmbariActionExecutionHelper {
        * TODO Execution command field population should be (partially?)
         * combined with the same code at createHostAction()
         */
-      execCmd.setConfigurations(configurations);
+      execCmd.setConfigurations(hostConfigurations);
       execCmd.setConfigurationAttributes(configurationAttributes);
       execCmd.setConfigurationTags(configTags);
       execCmd.setCommandParams(commandParams);

+ 28 - 9
ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java

@@ -64,7 +64,9 @@ import org.apache.ambari.server.orm.entities.UpgradeItemEntity;
 import org.apache.ambari.server.stack.MasterHostResolver;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.ConfigHelper;
+import org.apache.ambari.server.state.ServiceInfo;
 import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.StackInfo;
 import org.apache.ambari.server.state.UpgradeHelper;
 import org.apache.ambari.server.state.UpgradeHelper.UpgradeGroupHolder;
 import org.apache.ambari.server.state.stack.UpgradePack;
@@ -79,6 +81,9 @@ import org.apache.ambari.server.utils.StageUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.SERVICE_PACKAGE_FOLDER;
+import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.HOOKS_FOLDER;
+
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 
@@ -341,9 +346,6 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
           String configType = formattedVar.substring(0, posConfigFile);
           String propertyName = formattedVar.substring(posConfigFile + 1, formattedVar.length());
           try {
-            // TODO, some properties use 0.0.0.0 to indicate the current host.
-            // Right now, ru_execute_tasks.py is responsible for replacing 0.0.0.0 with the hostname.
-
             String configValue = configHelper.getPropertyValueFromStackDefinitions(cluster, configType, propertyName);
             task = task.replace(origVar, configValue);
           } catch (Exception err) {
@@ -484,9 +486,22 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
     RequestResourceFilter filter = new RequestResourceFilter("", "",
         new ArrayList<String>(wrapper.getHosts()));
 
-
     Map<String, String> params = new HashMap<String, String>();
     params.put("tasks", entity.getTasks());
+    params.put("version", version);
+
+    // Because custom task may end up calling a script/function inside a service, it is necessary to set the
+    // service_package_folder and hooks_folder params.
+    AmbariMetaInfo ambariMetaInfo = m_metaProvider.get();
+    StackId stackId = cluster.getDesiredStackVersion();
+    StackInfo stackInfo = ambariMetaInfo.getStack(stackId.getStackName(), stackId.getStackVersion());
+    if (wrapper.getTasks() != null && wrapper.getTasks().size() > 0) {
+      String serviceName = wrapper.getTasks().get(0).getService();
+      ServiceInfo serviceInfo = ambariMetaInfo.getService(stackId.getStackName(), stackId.getStackVersion(), serviceName);
+      params.put(SERVICE_PACKAGE_FOLDER,
+          serviceInfo.getServicePackageFolder());
+      params.put(HOOKS_FOLDER, stackInfo.getStackHooksFolder());
+    }
 
     ActionExecutionContext actionContext = new ActionExecutionContext(
         cluster.getClusterName(), "ru_execute_tasks",
@@ -497,13 +512,17 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
     Map<String, String> hostLevelParams = new HashMap<String, String>();
     hostLevelParams.put(JDK_LOCATION, getManagementController().getJdkResourceUrl());
 
+    ExecuteCommandJson jsons = commandExecutionHelper.get().getCommandJson(
+        actionContext, cluster);
+
     Stage stage = stageFactory.get().createNew(request.getId().longValue(),
         "/tmp/ambari",
         cluster.getClusterName(),
         cluster.getClusterId(),
         entity.getText(),
-        "{}", "{}",
-        StageUtils.getGson().toJson(hostLevelParams));
+        jsons.getClusterHostInfo(),
+        jsons.getCommandParamsForStage(),
+        jsons.getHostParamsForStage());
 
     stage.setSkippable(UPGRADE_DEFAULT_SKIPPABLE);
 
@@ -589,13 +608,13 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
       filters.add(new RequestResourceFilter(tw.getService(), "", Collections.<String>emptyList()));
     }
 
-    Map<String, String> restartCommandParams = new HashMap<String, String>();
-    restartCommandParams.put("version", version);
+    Map<String, String> commandParams = new HashMap<String, String>();
+    commandParams.put("version", version);
 
     ActionExecutionContext actionContext = new ActionExecutionContext(
         cluster.getClusterName(), "SERVICE_CHECK",
         filters,
-        restartCommandParams);
+        commandParams);
     actionContext.setTimeout(Short.valueOf((short)-1));
 
     ExecuteCommandJson jsons = commandExecutionHelper.get().getCommandJson(

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java

@@ -102,7 +102,7 @@ public class ConfigHelper {
   private Map<String, Map<String, String>> getEffectiveDesiredTags(
       Cluster cluster, Map<String, HostConfig> hostConfigOverrides) {
 
-    Map<String, DesiredConfig> clusterDesired = cluster.getDesiredConfigs();
+    Map<String, DesiredConfig> clusterDesired = (cluster == null) ? new HashMap<String, DesiredConfig>() : cluster.getDesiredConfigs();
 
     Map<String, Map<String,String>> resolved = new TreeMap<String, Map<String, String>>();
 

+ 3 - 3
ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeHelper.java

@@ -17,6 +17,7 @@
  */
 package org.apache.ambari.server.state;
 
+import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -48,6 +49,7 @@ import org.apache.ambari.server.state.stack.upgrade.Grouping;
 import org.apache.ambari.server.state.stack.upgrade.StageWrapper;
 import org.apache.ambari.server.state.stack.upgrade.StageWrapperBuilder;
 import org.apache.ambari.server.state.stack.upgrade.TaskWrapper;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -117,10 +119,8 @@ public class UpgradeHelper {
 
               // Override the hosts with the ordered collection
               hostsType.hosts = order;
-
             } else {
-                // TODO Rolling Upgrade, enable once Namenode HA is a pre-check requirement.
-                // throw new AmbariException(MessageFormat.format("Could not find active and standby namenodes using hosts: {0}", StringUtils.join(hostsType.hosts, ", ").toString()));
+                throw new AmbariException(MessageFormat.format("Could not find active and standby namenodes using hosts: {0}", StringUtils.join(hostsType.hosts, ", ").toString()));
             }
 
             builder.add(hostsType, service.serviceName, svc.isClientOnlyService(), pc);

+ 10 - 6
ambari-server/src/main/java/org/apache/ambari/server/state/host/HostImpl.java

@@ -1226,14 +1226,18 @@ public class HostImpl implements Host {
   @Override
   public Map<String, HostConfig> getDesiredHostConfigs(Cluster cluster) throws AmbariException {
     Map<String, HostConfig> hostConfigMap = new HashMap<String, HostConfig>();
-    for (Map.Entry<String, DesiredConfig> desiredConfigEntry :
-        cluster.getDesiredConfigs().entrySet()) {
-      HostConfig hostConfig = new HostConfig();
-      hostConfig.setDefaultVersionTag(desiredConfigEntry.getValue().getTag());
-      hostConfigMap.put(desiredConfigEntry.getKey(), hostConfig);
+    Map<String, DesiredConfig> clusterDesiredConfigs = (cluster == null) ? new HashMap<String, DesiredConfig>() : cluster.getDesiredConfigs();
+
+    if (clusterDesiredConfigs != null) {
+      for (Map.Entry<String, DesiredConfig> desiredConfigEntry :
+          clusterDesiredConfigs.entrySet()) {
+        HostConfig hostConfig = new HostConfig();
+        hostConfig.setDefaultVersionTag(desiredConfigEntry.getValue().getTag());
+        hostConfigMap.put(desiredConfigEntry.getKey(), hostConfig);
+      }
     }
 
-    Map<Long, ConfigGroup> configGroups = cluster.getConfigGroupsByHostname(getHostName());
+    Map<Long, ConfigGroup> configGroups = (cluster == null) ? new HashMap<Long, ConfigGroup>() : cluster.getConfigGroupsByHostname(getHostName());
 
     if (configGroups != null && !configGroups.isEmpty()) {
       for (ConfigGroup configGroup : configGroups.values()) {

+ 14 - 33
ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ExecuteTask.java

@@ -27,6 +27,7 @@ import javax.xml.bind.annotation.XmlAttribute;
 
 /**
  * Used to represent an execution that should occur on an agent.
+ * An equivalent class exists in the python server-side, called ExecuteTask in ru_execute_tasks.py
  */
 @XmlRootElement
 @XmlAccessorType(XmlAccessType.FIELD)
@@ -43,46 +44,26 @@ public class ExecuteTask extends Task {
   public String hosts;
 
   /**
-   * Command to run under normal conditions.
-   */
-  @XmlElement(name="command")
-  public String command;
-
-  /**
-   * Run the command only if this condition is first met.
-   */
-  @XmlElement(name="first")
-  public String first;
-
-  /**
-   * Run the command unless this condition is met.
+   * Similar to a command, but instead it is a call to invoke the script (using its relative path).
+   * THe script and function elements are used together, and are invoked with additional environment variables.
+   * If both a (script, function) and command are defined, only the (script, function) will be executed.
+   * The service is already specified as part of the group.
    */
-  @XmlElement(name="unless")
-  public String unless;
+  @XmlElement(name="script")
+  public String script;
 
   /**
-   * Command to run if a failure occurs.
+   * Function name to call in the {@see script} element.
    */
-  @XmlElement(name="onfailure")
-  public String onfailure;
+  @XmlElement(name="function")
+  public String function;
 
   /**
-   * Run the command up to x times, until it succeeds.
-   */
-  @XmlElement(name="upto")
-  public String upto;
-
-  /**
-   * If "until" is specified, then sleep this many seconds between attempts.
-   */
-  @XmlElement(name="every")
-  public String every;
-
-  /**
-   * Comma delimited list of return codes to ignore
+   * Command to run under normal conditions.
+   *  If both a function and command are defined, only the function will be executed.
    */
-  @XmlElement(name="ignore")
-  public String ignore;
+  @XmlElement(name="command")
+  public String command;
 
   @Override
   public Task.Type getType() {

+ 8 - 2
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py

@@ -20,8 +20,6 @@ limitations under the License.
 import sys
 import os
 import json
-import subprocess
-from datetime import datetime
 
 from resource_management import *
 from resource_management.libraries.functions.security_commons import build_expectations, \
@@ -31,7 +29,9 @@ from resource_management.libraries.functions.version import compare_versions, \
   format_hdp_stack_version
 from resource_management.libraries.functions.format import format
 from resource_management.libraries.functions.check_process_status import check_process_status
+from resource_management.core.exceptions import Fail
 
+import namenode_upgrade
 from hdfs_namenode import namenode
 from hdfs import hdfs
 import hdfs_rebalance
@@ -47,6 +47,12 @@ class NameNode(Script):
     #TODO we need this for HA because of manual steps
     self.configure(env)
 
+  def prepare_rolling_upgrade(self, env):
+    namenode_upgrade.prepare_rolling_upgrade()
+
+  def finalize_rolling_upgrade(self, env):
+    namenode_upgrade.finalize_rolling_upgrade()
+
   def pre_rolling_restart(self, env):
     Logger.info("Executing Rolling Upgrade pre-restart")
     import params

+ 111 - 0
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_upgrade.py

@@ -0,0 +1,111 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+
+from resource_management.core.logger import Logger
+from resource_management.core.resources.system import Execute
+from resource_management.libraries.functions.format import format
+from resource_management.core.shell import call
+from resource_management.core.exceptions import Fail
+
+
+class SAFEMODE:
+  ON = "ON"
+  OFF = "OFF"
+
+
+safemode_to_instruction = {SAFEMODE.ON: "enter",
+                           SAFEMODE.OFF: "leave"}
+
+
+def reach_safemode_state(secure_user, safemode_state, in_ha):
+  """
+  Enter or leave safemode for the Namenode.
+  @param secure_user: user to perform action as
+  @param safemode_state: ON or OFF
+  @param in_ha: bool indicating if Namenode High Availability is enabled
+  @:return True if successful, false otherwise.
+  """
+  Logger.info("Prepare to leave safemode")
+  import params
+
+  hostname = params.hostname
+  grep = format("Safe mode is {safemode_state} in {hostname}") if in_ha else format("Safe mode is {safemode_state}")
+  safemode_check = format("su - {secure_user} -c 'hdfs dfsadmin -safemode get | grep \"{grep}\"'")
+  code, out = call(safemode_check)
+  Logger.info("Command: %s\nCode: %d." % (safemode_check, code))
+  if code != 0:
+    command = "hdfs dfsadmin -safemode %s" % (safemode_to_instruction[safemode_state])
+    Execute(command,
+            user=secure_user,
+            logoutput=True)
+
+    code, out = call(safemode_check)
+    Logger.info("Command: %s\nCode: %d. Out: %s" % (safemode_check, code, out))
+    if code != 0:
+      return False
+  return True
+
+
+def prepare_rolling_upgrade():
+  """
+  Rolling Upgrade for HDFS Namenode requires the following.
+  1. Leave safemode if the safemode status is not OFF
+  2. Execute a rolling upgrade "prepare"
+  3. Execute a rolling upgrade "query"
+  """
+  Logger.info("Executing Rolling Upgrade prepare")
+  import params
+
+  secure_user = params.hdfs_principal_name if params.security_enabled else params.hdfs_user
+
+  if params.security_enabled:
+    Execute(format("{kinit_path_local} -kt {hdfs_user_keytab} {hdfs_principal_name}"),
+            user=secure_user)
+
+  safemode_transition_successful = reach_safemode_state(secure_user, SAFEMODE.OFF, True)
+  if not safemode_transition_successful:
+    raise Fail("Could leave safemode")
+
+  prepare = "hdfs dfsadmin -rollingUpgrade prepare"
+  query = "hdfs dfsadmin -rollingUpgrade query"
+  Execute(prepare,
+          user=secure_user,
+          logoutput=True)
+  Execute(query,
+          user=secure_user,
+          logoutput=True)
+
+
+def finalize_rolling_upgrade():
+  """
+  Finalize the Namenode upgrade, at which point it cannot be downgraded.
+  """
+  Logger.info("Executing Rolling Upgrade finalize")
+  import params
+
+  secure_user = params.hdfs_principal_name if params.security_enabled else params.hdfs_user
+  finalize_cmd = "hdfs dfsadmin -rollingUpgrade finalize"
+  Execute(finalize_cmd,
+          user=secure_user,
+          logoutput=True)
+
+  safemode_transition_successful = reach_safemode_state(secure_user, SAFEMODE.OFF, True)
+  if not safemode_transition_successful:
+    Logger.warning("Could leave safemode")

+ 90 - 77
ambari-server/src/main/resources/custom_actions/scripts/ru_execute_tasks.py

@@ -20,19 +20,49 @@ Ambari Agent
 
 """
 
+import os
 import json
 import socket
-import re
 import time
 
-from resource_management import *
-from resource_management.core.shell import call, checked_call
+from resource_management.libraries.script import Script
+from resource_management.libraries.functions.default import default
+from resource_management.core.shell import call
 from resource_management.core.exceptions import Fail
 from resource_management.core.logger import Logger
-from resource_management.libraries.functions.list_ambari_managed_repos import *
+from ambari_agent.FileCache import FileCache
+from ambari_agent.AmbariConfig import AmbariConfig
+
+agent_config = AmbariConfig()
+
+
+class ExecuteTask:
+  """
+  Encapsulate a task that can be executed on the agent.
+  An equivalent class exists in the Java server-side, called ExecuteTask.java
+  """
+
+  def __init__(self, t):
+    """
+    @:param t: Dictionary with string representation
+    """
+    self.type = t["type"] if "type" in t else None
+    self.hosts = t["hosts"] if "hosts" in t else None
+    self.script = t["script"] if "script" in t else None
+    self.function = t["function"] if "function" in t else None
+    self.command = t["command"] if "command" in t else None
+
+  def __str__(self):
+    inner = []
+    if self.type:
+      inner.append("Type: %s" % str(self.type))
+    if self.script and self.function:
+      inner.append("Script: %s - Function: %s" % (str(self.script), str(self.function)))
+    elif self.command:
+      inner.append("Command: %s" % str(self.command))
+    return "Task. %s" % ", ".join(inner)
 
 
-# TODO, HACK
 def replace_variables(cmd, host_name, version):
   if cmd:
     cmd = cmd.replace("{{host_name}}", "{host_name}")
@@ -42,6 +72,17 @@ def replace_variables(cmd, host_name, version):
   return cmd
 
 
+def resolve_ambari_config():
+  config_path = os.path.abspath(AmbariConfig.getConfigFile())
+  try:
+    if os.path.exists(config_path):
+      agent_config.read(config_path)
+    else:
+      raise Exception("No config found at %s" % str(config_path))
+  except Exception, err:
+    Logger.warn(err)
+
+
 class ExecuteUpgradeTasks(Script):
   """
   This script is a part of Rolling Upgrade workflow and is described at
@@ -51,86 +92,58 @@ class ExecuteUpgradeTasks(Script):
   """
 
   def actionexecute(self, env):
-    # Parse parameters
+    resolve_ambari_config()
+
+    # Parse parameters from command json file.
     config = Script.get_config()
 
-    # TODO Rolling Upgrade, should be retrieved from the command.
     host_name = socket.gethostname()
-    version = "2.2.0.0"
+    version = default('/roleParams/version', None)
 
-    # TODO Rolling Upgrade, does this work on Ubuntu?
-    code, out = checked_call("hdp-select")
-    if code == 0 and out:
-      p = re.compile(r"(2\.2\.0\.0\-\d{4})")
-      m = p.search(out)
-      if m and len(m.groups()) == 1:
-        version = m.group(1)
+    # These 2 variables are optional
+    service_package_folder = default('/roleParams/service_package_folder', None)
+    hooks_folder = default('/roleParams/hooks_folder', None)
 
     tasks = json.loads(config['roleParams']['tasks'])
     if tasks:
       for t in tasks:
-        Logger.info("Task: %s" % str(t))
-        command = t["command"] if "command" in t else None
-        first = t["first"] if "first" in t else None
-        unless = t["unless"] if "unless" in t else None
-        on_failure = t["onfailure"] if "onfailure" in t else None
-
-        # Run at most x times
-        upto = None
-        try:
-          upto = int(t["upto"]) if "upto" in t else None
-        except ValueError, e:
-          Logger.warning("Could not retrieve 'upto' value from task.")
-
-        # If upto is set, repeat every x seconds
-        every = int(t["every"]) if "every" in t and upto else 0
-        if every < 0:
-          every = 0
-        effective_times = upto if upto else 1
-
-        # Set of return codes to ignore
-        ignore_return_codes = t["ignore"] if "ignore" in t else set()
-        if ignore_return_codes:
-          ignore_return_codes = set([int(e) for e in ignore_return_codes.split(",")])
-
-        if command:
-          command = replace_variables(command, host_name, version)
-          first = replace_variables(first, host_name, version)
-          unless = replace_variables(unless, host_name, version)
-
-          if first:
-            code, out = call(first)
-            Logger.info("Pre-condition command. Code: %s, Out: %s" % (str(code), str(out)))
-            if code != 0:
-              break
-
-          if unless:
-            code, out = call(unless)
-            Logger.info("Unless command. Code: %s, Out: %s" % (str(code), str(out)))
-            if code == 0:
-              break
-
-          for i in range(1, effective_times+1):
-            # TODO, Execute already has a tries and try_sleep, see hdfs_namenode.py for an example
-            code, out = call(command)
-            Logger.info("Command. Code: %s, Out: %s" % (str(code), str(out)))
-
-            if code == 0 or code in ignore_return_codes:
-              break
-
-            if i == effective_times:
-              err_msg = Logger.filter_text("Execution of '%s' returned %d. %s" % (command, code, out))
-              try:
-                if on_failure:
-                  on_failure = replace_variables(on_failure, host_name, version)
-                  code_failure_handler, out_failure_handler = call(on_failure)
-                  Logger.error("Failure Handler. Code: %s, Out: %s" % (str(code_failure_handler), str(out_failure_handler)))
-              except:
-                pass
-              raise Fail(err_msg)
-
-            if upto:
-              time.sleep(every)
+        task = ExecuteTask(t)
+        Logger.info(str(task))
+
+        # If a (script, function) exists, it overwrites the command.
+        if task.script and task.function and service_package_folder and hooks_folder:
+          file_cache = FileCache(agent_config)
+          command_paths = {"commandParams":
+                                 {"service_package_folder": service_package_folder,
+                                  "hooks_folder": hooks_folder
+                                 }
+                              }
+          server_url_prefix = default('/hostLevelParams/jdk_location', "")
+          base_dir = file_cache.get_service_base_dir(command_paths, server_url_prefix)
+          script_path = os.path.join(base_dir, task.script)
+          if not os.path.exists(script_path):
+            message = "Script %s does not exist" % str(script_path)
+            raise Fail(message)
+
+          # Notice that the script_path is now the fully qualified path, and the
+          # same command-#.json file is used.
+          command_params = ["python",
+                            script_path,
+                            task.function,
+                            self.command_data_file,
+                            self.basedir,
+                            self.stroutfile,
+                            self.logging_level,
+                            Script.get_tmp_dir()]
+
+          task.command = " ".join(command_params)
+
+        if task.command:
+          task.command = replace_variables(task.command, host_name, version)
+          code, out = call(task.command)
+          Logger.info("Command: %s\nCode: %s, Out: %s" % (task.command, str(code), str(out)))
+          if code != 0:
+            raise Fail(out)
 
 if __name__ == "__main__":
   ExecuteUpgradeTasks().execute()

+ 6 - 48
ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.2.xml

@@ -114,17 +114,16 @@
     </group>
 
     <group xsi:type="cluster" name="POST_CLUSTER" title="Finalize Upgrade">
-      <!--
       <execute-stage title="Confirm Finalize">
         <task xsi:type="manual">
           <message>Please confirm you are ready to finalize</message>
         </task>
       </execute-stage>
-      -->
+
       <execute-stage service="HDFS" component="NAMENODE" title="Execute HDFS Finalize">
         <task xsi:type="execute" hosts="master">
-          <first>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -rollingUpgrade finalize'</first>
-          <command>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -safemode leave'</command>
+          <script>scripts/namenode.py</script>
+          <function>finalize_rolling_upgrade</function>
         </task>
       </execute-stage>
       <execute-stage title="Save Cluster State" service="" component="">
@@ -133,8 +132,7 @@
       </execute-stage>
     </group>
   </order>
-  
-  
+
   <processing>
     <service name="ZOOKEEPER">
       <component name="ZOOKEEPER_SERVER">
@@ -169,47 +167,9 @@
     <service name="HDFS">
       <component name="NAMENODE">
         <pre-upgrade>
-          <!-- Backup the image,
-          Enter Safemode if not already in it,
-
-          $ su hdfs -c 'hdfs dfsadmin -safemode get'
-          Safe mode is OFF
-
-          $ su hdfs -c 'hdfs dfsadmin -safemode enter'
-          Safe mode is OFF
-
-          $ su hdfs -c 'hdfs dfsadmin -rollingUpgrade prepare'
-          PREPARE rolling upgrade ...
-          Proceed with rolling upgrade:
-          Block Pool ID: BP-1917654970-192.168.64.107-1416527263491
-          Start Time: Fri Nov 21 22:31:03 UTC 2014 (=1416609063176)
-          Finalize Time: <NOT FINALIZED>
-
-          $ su hdfs -c 'hdfs dfsadmin -rollingUpgrade query'
-          QUERY rolling upgrade ...
-          Proceed with rolling upgrade:
-          Block Pool ID: BP-1917654970-192.168.64.107-1416527263491
-          Start Time: Sat Nov 22 02:44:21 UTC 2014 (=1416624261594)
-          Finalize Time: <NOT FINALIZED>
-
-          This should be the last thing called on each service once the user decides to commit to finalizing the entire upgrade.
-          $ su hdfs -c 'hdfs dfsadmin -rollingUpgrade finalize'
-          FINALIZE rolling upgrade ...
-          There is no rolling upgrade in progress or rolling upgrade has already been finalized.
-          -->
-          <task xsi:type="execute" hosts="master">
-            <first>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -safemode get | grep "Safe mode is ON in {{host_name}}"'</first>
-            <command>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -safemode leave'</command>
-            <upto>10</upto>
-            <every>6</every>
-          </task>
-
           <task xsi:type="execute" hosts="master">
-            <command>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -rollingUpgrade prepare'</command>
-          </task>
-
-          <task xsi:type="execute" hosts="master">
-            <command>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -rollingUpgrade query'</command>
+            <script>scripts/namenode.py</script>
+            <function>prepare_rolling_upgrade</function>
           </task>
         </pre-upgrade>
 
@@ -235,7 +195,6 @@
           <task xsi:type="restart" />
         </upgrade>
       </component>
-
     </service>
 
     <service name="MAPREDUCE2">
@@ -384,6 +343,5 @@
         </upgrade>
       </component>
     </service>
-
   </processing>
 </upgrade>

+ 9 - 10
ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ClusterStackVersionResourceProviderTest.java

@@ -45,22 +45,15 @@ import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.orm.dao.RepositoryVersionDAO;
 import org.apache.ambari.server.orm.entities.HostVersionEntity;
 import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
-import org.apache.ambari.server.state.AgentVersion;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
-import org.apache.ambari.server.state.Config;
-import org.apache.ambari.server.state.DesiredConfig;
+import org.apache.ambari.server.state.ConfigHelper;
 import org.apache.ambari.server.state.Host;
-import org.apache.ambari.server.state.HostConfig;
-import org.apache.ambari.server.state.HostEvent;
-import org.apache.ambari.server.state.HostHealthStatus;
-import org.apache.ambari.server.state.HostState;
-import org.apache.ambari.server.state.MaintenanceState;
 import org.apache.ambari.server.state.ServiceComponentHost;
 import org.apache.ambari.server.state.ServiceInfo;
 import org.apache.ambari.server.state.ServiceOsSpecific;
 import org.apache.ambari.server.state.StackId;
-import org.apache.ambari.server.state.StackInfo;
+import org.apache.ambari.server.state.cluster.ClusterImpl;
 import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
 import org.junit.After;
 import org.junit.Before;
@@ -103,6 +96,7 @@ public class ClusterStackVersionResourceProviderTest {
   private Injector injector;
   private AmbariMetaInfo ambariMetaInfo;
   private RepositoryVersionDAO repositoryVersionDAOMock;
+  private ConfigHelper configHelper;
 
   private String operatingSystemsJson = "[\n" +
           "   {\n" +
@@ -126,6 +120,7 @@ public class ClusterStackVersionResourceProviderTest {
   public void setup() throws Exception {
     // Create instances of mocks
     repositoryVersionDAOMock = createNiceMock(RepositoryVersionDAO.class);
+    configHelper = createNiceMock(ConfigHelper.class);
     // Initialize injector
     InMemoryDefaultTestModule module = new InMemoryDefaultTestModule();
     injector = Guice.createInjector(Modules.override(module).with(new MockModule()));
@@ -178,6 +173,9 @@ public class ClusterStackVersionResourceProviderTest {
 
     AbstractControllerResourceProvider.init(resourceProviderFactory);
 
+    Map<String, Map<String, String>> hostConfigTags = new HashMap<String, Map<String, String>>();
+    expect(configHelper.getEffectiveDesiredTags(anyObject(ClusterImpl.class), anyObject(String.class))).andReturn(hostConfigTags);
+
     expect(managementController.getClusters()).andReturn(clusters).anyTimes();
     expect(managementController.getAmbariMetaInfo()).andReturn(ambariMetaInfo).anyTimes();
     expect(managementController.getAuthName()).andReturn("admin").anyTimes();
@@ -204,7 +202,7 @@ public class ClusterStackVersionResourceProviderTest {
 
     // replay
     replay(managementController, response, clusters, resourceProviderFactory, csvResourceProvider,
-            cluster, repositoryVersionDAOMock, sch, actionManager);
+        cluster, repositoryVersionDAOMock, configHelper, sch, actionManager);
 
     ResourceProvider provider = AbstractControllerResourceProvider.getResourceProvider(
         type,
@@ -240,6 +238,7 @@ public class ClusterStackVersionResourceProviderTest {
     @Override
     protected void configure() {
       bind(RepositoryVersionDAO.class).toInstance(repositoryVersionDAOMock);
+      bind(ConfigHelper.class).toInstance(configHelper);
     }
   }
 }

+ 9 - 1
ambari-server/src/test/java/org/apache/ambari/server/controller/internal/HostStackVersionResourceProviderTest.java

@@ -42,12 +42,14 @@ import org.apache.ambari.server.orm.entities.HostVersionEntity;
 import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.ConfigHelper;
 import org.apache.ambari.server.state.Host;
 import org.apache.ambari.server.state.RepositoryVersionState;
 import org.apache.ambari.server.state.ServiceComponentHost;
 import org.apache.ambari.server.state.ServiceInfo;
 import org.apache.ambari.server.state.ServiceOsSpecific;
 import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.cluster.ClusterImpl;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -81,6 +83,7 @@ public class HostStackVersionResourceProviderTest {
   private AmbariMetaInfo ambariMetaInfo;
   private RepositoryVersionDAO repositoryVersionDAOMock;
   private HostVersionDAO hostVersionDAOMock;
+  private ConfigHelper configHelper;
 
   private String operatingSystemsJson = "[\n" +
           "   {\n" +
@@ -105,6 +108,7 @@ public class HostStackVersionResourceProviderTest {
     // Create instances of mocks
     repositoryVersionDAOMock = createNiceMock(RepositoryVersionDAO.class);
     hostVersionDAOMock = createNiceMock(HostVersionDAO.class);
+    configHelper = createNiceMock(ConfigHelper.class);
     // Initialize injector
     InMemoryDefaultTestModule module = new InMemoryDefaultTestModule();
     injector = Guice.createInjector(Modules.override(module).with(new MockModule()));
@@ -156,6 +160,9 @@ public class HostStackVersionResourceProviderTest {
 
     AbstractControllerResourceProvider.init(resourceProviderFactory);
 
+    Map<String, Map<String, String>> hostConfigTags = new HashMap<String, Map<String, String>>();
+    expect(configHelper.getEffectiveDesiredTags(anyObject(ClusterImpl.class), anyObject(String.class))).andReturn(hostConfigTags);
+
     expect(managementController.getClusters()).andReturn(clusters).anyTimes();
     expect(managementController.getAmbariMetaInfo()).andReturn(ambariMetaInfo).anyTimes();
     expect(managementController.getActionManager()).andReturn(actionManager).anyTimes();
@@ -186,7 +193,7 @@ public class HostStackVersionResourceProviderTest {
 
     // replay
     replay(managementController, response, clusters, resourceProviderFactory, csvResourceProvider,
-            cluster, repositoryVersionDAOMock, sch, actionManager, hostVersionEntityMock, hostVersionDAOMock);
+            cluster, repositoryVersionDAOMock, configHelper, sch, actionManager, hostVersionEntityMock, hostVersionDAOMock);
 
     ResourceProvider provider = AbstractControllerResourceProvider.getResourceProvider(
         type,
@@ -224,6 +231,7 @@ public class HostStackVersionResourceProviderTest {
     protected void configure() {
       bind(RepositoryVersionDAO.class).toInstance(repositoryVersionDAOMock);
       bind(HostVersionDAO.class).toInstance(hostVersionDAOMock);
+      bind(ConfigHelper.class).toInstance(configHelper);
     }
   }
 }

+ 1 - 1
ambari-web/app/messages.js

@@ -1335,7 +1335,7 @@ Em.I18n.translations = {
   'admin.stackUpgrade.dialog.keepRunning': "Keep running Upgrade in background",
   'admin.stackUpgrade.dialog.failed': "Failed on:",
   'admin.stackUpgrade.dialog.manual': "Manual steps required",
-  'admin.stackUpgrade.dialog.manualDone': "I have preformed the manual steps above.",
+  'admin.stackUpgrade.dialog.manualDone': "I have performed the manual steps above.",
   'admin.stackUpgrade.dialog.closeProgress': "Upgrade is in progress. \n If you dismiss this window, Upgrade will keep running in background.",
   'admin.stackUpgrade.dialog.closePause': "Upgrade is paused. \n If you dismiss this window, you can resume Upgrade later.",
   'admin.stackUpgrade.preupgradeCheck.header': "Upgrade to {0}",