
AMBARI-8450. Rolling Upgrade - Flush upgrade pack for HDFS and ZK (alejandro)

Alejandro Fernandez committed 10 years ago
parent
commit
6b7a1c9b0d
19 changed files with 448 additions and 34 deletions
  1. + 2 - 2
      ambari-common/src/main/python/resource_management/core/shell.py
  2. + 58 - 1
      ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
  3. + 1 - 1
      ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java
  4. + 39 - 0
      ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ExecuteTask.java
  5. + 1 - 1
      ambari-server/src/main/java/org/apache/ambari/server/upgrade/AbstractUpgradeCatalog.java
  6. + 91 - 6
      ambari-server/src/main/resources/custom_actions/scripts/ru_execute_tasks.py
  7. + 21 - 3
      ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/package/scripts/hdfs_namenode.py
  8. + 7 - 0
      ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/package/scripts/utils.py
  9. + 7 - 7
      ambari-server/src/main/resources/stacks/HDP/2.0.6/services/ZOOKEEPER/package/files/zkSmoke.sh
  10. + 2 - 2
      ambari-server/src/main/resources/stacks/HDP/2.0.6/services/ZOOKEEPER/package/scripts/params.py
  11. + 2 - 2
      ambari-server/src/main/resources/stacks/HDP/2.0.6/services/ZOOKEEPER/package/scripts/service_check.py
  12. + 19 - 0
      ambari-server/src/main/resources/stacks/HDP/2.0.6/services/ZOOKEEPER/package/scripts/zookeeper_service.py
  13. + 138 - 7
      ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.2.xml
  14. + 16 - 0
      ambari-server/src/test/python/stacks/2.0.6/HDFS/test_datanode.py
  15. + 6 - 0
      ambari-server/src/test/python/stacks/2.0.6/HDFS/test_journalnode.py
  16. + 14 - 2
      ambari-server/src/test/python/stacks/2.0.6/HDFS/test_namenode.py
  17. + 6 - 0
      ambari-server/src/test/python/stacks/2.0.6/HDFS/test_snamenode.py
  18. + 4 - 0
      ambari-server/src/test/python/stacks/2.0.6/HDFS/test_zkfc.py
  19. + 14 - 0
      ambari-server/src/test/python/stacks/2.0.6/ZOOKEEPER/test_zookeeper_server.py

+ 2 - 2
ambari-common/src/main/python/resource_management/core/shell.py

@@ -31,9 +31,9 @@ from exceptions import Fail
 from exceptions import ExecuteTimeoutException
 from resource_management.core.logger import Logger
 
-def checked_call(command, logoutput=False, 
+def checked_call(command, logoutput=False, throw_on_failure=True, 
          cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, path=None, sudo=False):
-  return _call(command, logoutput, True, cwd, env, preexec_fn, user, wait_for_finish, timeout, path, sudo)
+  return _call(command, logoutput, throw_on_failure, cwd, env, preexec_fn, user, wait_for_finish, timeout, path, sudo)
 
 def call(command, logoutput=False, 
          cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, path=None, sudo=False):
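
For illustration, a minimal sketch of the new flag (not part of the commit; the command here is arbitrary). With throw_on_failure=False, checked_call hands the exit code back to the caller instead of raising Fail on a non-zero exit:

    from resource_management.core.shell import checked_call

    # With throw_on_failure=True (the default), a non-zero exit raises Fail;
    # with throw_on_failure=False, the caller inspects the return code itself.
    code, out = checked_call("hdfs dfsadmin -safemode get", throw_on_failure=False)
    if code != 0:
        print("dfsadmin exited with %d: %s" % (code, out))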

+ 58 - 1
ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java

@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.regex.*;
 
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.StaticallyInject;
@@ -57,6 +58,7 @@ import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
 import org.apache.ambari.server.orm.entities.UpgradeEntity;
 import org.apache.ambari.server.orm.entities.UpgradeItemEntity;
 import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.ConfigHelper;
 import org.apache.ambari.server.state.Service;
 import org.apache.ambari.server.state.ServiceComponent;
 import org.apache.ambari.server.state.StackId;
@@ -68,6 +70,8 @@ import org.apache.ambari.server.utils.StageUtils;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Manages the ability to start and get status of upgrades.
@@ -100,6 +104,8 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
   private static Provider<AmbariActionExecutionHelper> actionExecutionHelper;
   @Inject
   private static Provider<AmbariCustomCommandExecutionHelper> commandExecutionHelper;
+  @Inject
+  private ConfigHelper configHelper;
 
   static {
     // properties
@@ -113,6 +119,9 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
     KEY_PROPERTY_IDS.put(Resource.Type.Cluster, UPGRADE_CLUSTER_NAME);
   }
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(UpgradeResourceProvider.class);
+
   /**
    * Constructor.
    *
@@ -268,6 +277,46 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
     return up;
   }
 
+  /**
+   * Inject variables into the list of {@link org.apache.ambari.server.orm.entities.UpgradeItemEntity} items, whose
+   * tasks may use strings like {{configType/propertyName}} that need to be retrieved from the properties.
+   * @param configHelper Configuration Helper
+   * @param cluster Cluster
+   * @param items Collection of items whose tasks will be injected.
+   * @return Return the collection of items with the injected properties.
+   */
+  private List<UpgradeItemEntity> injectVariables(ConfigHelper configHelper, Cluster cluster, List<UpgradeItemEntity> items) {
+    final String regexp = "(\\{\\{.*?\\}\\})";
+
+    for (UpgradeItemEntity upgradeItem : items) {
+      String task = upgradeItem.getTasks();
+      if (task != null && !task.isEmpty()) {
+        Matcher m = Pattern.compile(regexp).matcher(task);
+        while(m.find()) {
+          String origVar = m.group(1);
+          String formattedVar = origVar.substring(2, origVar.length() - 2).trim();
+
+          int posConfigFile = formattedVar.indexOf("/");
+          if (posConfigFile > 0) {
+            String configType = formattedVar.substring(0, posConfigFile);
+            String propertyName = formattedVar.substring(posConfigFile + 1, formattedVar.length());
+            try {
+              // TODO, some properties use 0.0.0.0 to indicate the current host.
+              // Right now, ru_execute_tasks.py is responsible for replacing 0.0.0.0 with the hostname.
+
+              String configValue = configHelper.getPropertyValueFromStackDefinitions(cluster, configType, propertyName);
+              task = task.replace(origVar, configValue);
+            } catch (Exception err) {
+              LOG.error(String.format("Exception trying to retrieve property %s/%s. Error: %s", configType, propertyName, err.getMessage()));
+            }
+          }
+        }
+        upgradeItem.setTasks(task);
+      }
+    }
+    return items;
+  }
+
   private UpgradeEntity createUpgrade(UpgradePack pack, Map<String, Object> requestMap)
     throws AmbariException {
 
@@ -278,6 +327,7 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
     }
 
     Cluster cluster = getManagementController().getClusters().getCluster(clusterName);
+    ConfigHelper configHelper = getManagementController().getConfigHelper();
     Map<String, Service> clusterServices = cluster.getServices();
 
     Map<String, Map<String, ProcessingComponent>> tasks = pack.getTasks();
@@ -341,25 +391,32 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
       items.add(holder.upgradeItemEntity);
     }
 
+    // This should be the last thing just before finalizing
     for (StageHolder holder : postUpgrades) {
       holder.upgradeItemEntity.setHosts(gson.toJson(holder.hosts));
       holder.upgradeItemEntity.setTasks(gson.toJson(holder.taskHolder.tasks));
       items.add(holder.upgradeItemEntity);
     }
 
+    items = injectVariables(configHelper, cluster, items);
+
     entity.setClusterId(Long.valueOf(cluster.getClusterId()));
     entity.setUpgradeItems(items);
 
     RequestStageContainer req = createRequest((String) requestMap.get(UPGRADE_VERSION));
 
+    // All of the Pre-Upgrades occur first, potentially in several stages.
+    // Should include things like entering safe mode, backing up data, changing the version using hdp-select, etc.
     for (StageHolder holder : preUpgrades) {
       createUpgradeTaskStage(cluster, req, holder);
     }
 
+    // The restart occurs after all of the Pre-Upgrades are done, and is meant to change the pointers and configs.
     for (StageHolder holder : restart) {
       createRestartStage(cluster, req, holder);
     }
 
+    // Post-Upgrades require the user to click on the "Finalize" button.
     for (StageHolder holder : postUpgrades) {
       createUpgradeTaskStage(cluster, req, holder);
     }
@@ -534,7 +591,7 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
 
     // !!! TODO when the custom action is underway, change this
     Map<String, String> params = new HashMap<String, String>();
-    params.put("tasks", "TheTaskInfo");
+    params.put("tasks", holder.upgradeItemEntity.getTasks());
 
     ActionExecutionContext actionContext = new ActionExecutionContext(
         cluster.getClusterName(), "ru_execute_tasks",
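
For reference, a minimal Python sketch (not part of the commit) of the token substitution that injectVariables performs, with a plain dict standing in for ConfigHelper.getPropertyValueFromStackDefinitions:

    import re

    # Hypothetical stand-in for the stack-definition property lookup.
    STACK_DEFAULTS = {("hadoop-env", "hdfs_user"): "hdfs"}

    def inject_variables(task):
        # Replace each {{config-type/property-name}} token with its stack value.
        for token in re.findall(r"\{\{.*?\}\}", task):
            inner = token[2:-2].strip()
            if "/" in inner:
                config_type, prop = inner.split("/", 1)
                value = STACK_DEFAULTS.get((config_type, prop))
                if value is not None:
                    task = task.replace(token, value)
        return task

    print(inject_variables("su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -safemode enter'"))
    # Prints: su - hdfs -c 'hdfs dfsadmin -safemode enter'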

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java

@@ -467,7 +467,7 @@ public class ConfigHelper {
     return result;
   }
   
-  public String getPropertyValueFromStackDefenitions(Cluster cluster, String configType, String propertyName) throws AmbariException {
+  public String getPropertyValueFromStackDefinitions(Cluster cluster, String configType, String propertyName) throws AmbariException {
     StackId stackId = cluster.getCurrentStackVersion();
     StackInfo stack = ambariMetaInfo.getStack(stackId.getStackName(),
         stackId.getStackVersion());

+ 39 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ExecuteTask.java

@@ -35,9 +35,48 @@ public class ExecuteTask extends Task {
   @XmlTransient
   private Task.Type type = Task.Type.EXECUTE;
 
+  /**
+   * Command to run under normal conditions.
+   */
   @XmlElement(name="command")
   public String command;
 
+  /**
+   * Run the command only if this condition is first met.
+   */
+  @XmlElement(name="first")
+  public String first;
+
+  /**
+   * Run the command unless this condition is met.
+   */
+  @XmlElement(name="unless")
+  public String unless;
+
+  /**
+   * Command to run if a failure occurs.
+   */
+  @XmlElement(name="onfailure")
+  public String onfailure;
+
+  /**
+   * Run the command up to x times, until it succeeds.
+   */
+  @XmlElement(name="upto")
+  public String upto;
+
+  /**
+   * If "upto" is specified, then sleep this many seconds between attempts.
+   */
+  @XmlElement(name="every")
+  public String every;
+
+  /**
+   * Comma-delimited list of return codes to ignore.
+   */
+  @XmlElement(name="ignore")
+  public String ignore;
+
   @Override
   public Task.Type getType() {
     return type;
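
To illustrate how these fields travel to the agent, a hedged sketch (field values hypothetical) of one execute task serialized into the "tasks" role parameter that ru_execute_tasks.py parses:

    import json

    # Hypothetical payload mirroring config['roleParams']['tasks'] on the agent.
    payload = json.dumps([{
        "command": "su - hdfs -c 'hdfs dfsadmin -safemode leave'",
        "upto": "60",     # retry the command up to 60 times,
        "every": "1",     # sleeping 1 second between attempts
        "ignore": "255",  # treat exit code 255 as acceptable
    }])

    for task in json.loads(payload):
        print(task["command"])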

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/upgrade/AbstractUpgradeCatalog.java

@@ -256,7 +256,7 @@ public abstract class AbstractUpgradeCatalog implements UpgradeCatalog {
         Map<String, String> properties = new HashMap<String, String>();
         
         for(String propertyName:propertyNames) {
-          String propertyValue = configHelper.getPropertyValueFromStackDefenitions(cluster, configType, propertyName);
+          String propertyValue = configHelper.getPropertyValueFromStackDefinitions(cluster, configType, propertyName);
           
           if(propertyValue == null) {
             LOG.info("Config " + propertyName + " from " + configType + " is not found in xml definitions." +

+ 91 - 6
ambari-server/src/main/resources/custom_actions/scripts/ru_execute_tasks.py

@@ -21,11 +21,24 @@ Ambari Agent
 """
 
 import json
-import sys
-import traceback
+import socket
+import re
+import time
+
 from resource_management import *
+from resource_management.core.shell import checked_call
+from resource_management.core.exceptions import Fail
+from resource_management.core.logger import Logger
 from resource_management.libraries.functions.list_ambari_managed_repos import *
-from ambari_commons.os_check import OSCheck, OSConst
+
+
+# TODO, HACK
+def replace_variables(cmd, host_name, version):
+  if cmd:
+    cmd = cmd.replace("0.0.0.0", "{host_name}")
+    cmd = cmd.replace("{{version}}", "{version}")
+    cmd = format(cmd)
+  return cmd
 
 
 class ExecuteUpgradeTasks(Script):
@@ -37,13 +50,85 @@ class ExecuteUpgradeTasks(Script):
   """
 
   def actionexecute(self, env):
-
     # Parse parameters
     config = Script.get_config()
-    #tasks = json.loads(config['roleParams']['tasks'])
 
-    print str(config)
+    # TODO HACK, should be retrieved from the command.
+    host_name = socket.gethostname()
+    version = "2.2.0.0"
+
+    code, out = checked_call("hdp-select")
+    if code == 0 and out:
+      p = re.compile(r"(2\.2\.0\.0\-\d{4})")
+      m = p.search(out)
+      if m and len(m.groups()) == 1:
+        version = m.group(1)
+
+    tasks = json.loads(config['roleParams']['tasks'])
+    if tasks:
+      for t in tasks:
+        Logger.info("Task: %s" % str(t))
+        command = t["command"] if "command" in t else None
+        first = t["first"] if "first" in t else None
+        unless = t["unless"] if "unless" in t else None
+        on_failure = t["onfailure"] if "onfailure" in t else None
+
+        # Run at most x times
+        upto = None
+        try:
+          upto = int(t["upto"]) if "upto" in t else None
+        except ValueError, e:
+          Logger.warning("Could not retrieve 'upto' value from task.")
+
+        # If upto is set, repeat every x seconds
+        every = int(t["every"]) if "every" in t and upto else 0
+        if every < 0:
+          every = 0
+        effective_times = upto if upto else 1
+
+        # Set of return codes to ignore
+        ignore_return_codes = t["ignore"] if "ignore" in t else set()
+        if ignore_return_codes:
+          ignore_return_codes = set([int(e) for e in ignore_return_codes.split(",")])
+
+        if command:
+          command = replace_variables(command, host_name, version)
+          first = replace_variables(first, host_name, version)
+          unless = replace_variables(unless, host_name, version)
+
+          if first:
+            code, out = checked_call(first, throw_on_failure=False)
+            Logger.info("Pre-condition command. Code: %s, Out: %s" % (str(code), str(out)))
+            if code != 0:
+              break
+
+          if unless:
+            code, out = checked_call(unless, throw_on_failure=False)
+            Logger.info("Unless command. Code: %s, Out: %s" % (str(code), str(out)))
+            if code == 0:
+              break
+
+          for i in range(1, effective_times+1):
+            # TODO, Execute already has a tries and try_sleep, see hdfs_namenode.py for an example
+            code, out = checked_call(command, throw_on_failure=False)
+            Logger.info("Command. Code: %s, Out: %s" % (str(code), str(out)))
+
+            if code == 0 or code in ignore_return_codes:
+              break
+
+            if i == effective_times:
+              err_msg = Logger.get_protected_text("Execution of '%s' returned %d. %s" % (command, code, out))
+              try:
+                if on_failure:
+                  on_failure = replace_variables(on_failure, host_name, version)
+                  code_failure_handler, out_failure_handler = checked_call(on_failure, throw_on_failure=False)
+                  Logger.error("Failure Handler. Code: %s, Out: %s" % (str(code_failure_handler), str(out_failure_handler)))
+              except:
+                pass
+              raise Fail(err_msg)
 
+            if upto:
+              time.sleep(every)
 
 if __name__ == "__main__":
   ExecuteUpgradeTasks().execute()

+ 21 - 3
ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/package/scripts/hdfs_namenode.py

@@ -17,6 +17,7 @@ limitations under the License.
 
 """
 
+from resource_management.core import shell
 from resource_management import *
 from utils import service
 
@@ -50,6 +51,11 @@ def namenode(action=None, do_format=True):
       create_pid_dir=True,
       create_log_dir=True
     )
+
+    if params.security_enabled:
+      Execute(format("{kinit_path_local} -kt {hdfs_user_keytab} {hdfs_principal_name}"),
+              user = params.hdfs_user)
+
     if params.dfs_ha_enabled:
       dfs_check_nn_status_cmd = format("su -s /bin/bash - {hdfs_user} -c 'export PATH=$PATH:{hadoop_bin_dir} ; hdfs --config {hadoop_conf_dir} haadmin -getServiceState {namenode_id} | grep active > /dev/null'")
     else:
@@ -57,9 +63,21 @@ def namenode(action=None, do_format=True):
 
     namenode_safe_mode_off = format("su -s /bin/bash - {hdfs_user} -c 'export PATH=$PATH:{hadoop_bin_dir} ; hdfs --config {hadoop_conf_dir} dfsadmin -safemode get' | grep 'Safe mode is OFF'")
 
-    if params.security_enabled:
-      Execute(format("{kinit_path_local} -kt {hdfs_user_keytab} {hdfs_principal_name}"),
-              user = params.hdfs_user)
+    # If HA is enabled and this NameNode is the standby, stay in safemode; otherwise, leave safemode.
+    leave_safe_mode = True
+    if dfs_check_nn_status_cmd is not None:
+      code, out = shell.checked_call(dfs_check_nn_status_cmd, throw_on_failure=False)
+      if code != 0:
+        leave_safe_mode = False
+
+    if leave_safe_mode:
+      # If the NameNode does not report 'Safe mode is OFF' (i.e., safemode is ON), explicitly leave it.
+      code, out = shell.checked_call(namenode_safe_mode_off, throw_on_failure=False)
+      if code != 0:
+        leave_safe_mode_cmd = format("su -s /bin/bash - {hdfs_user} -c 'export PATH=$PATH:{hadoop_bin_dir} ; hdfs --config {hadoop_conf_dir} dfsadmin -safemode leave'")
+        Execute(leave_safe_mode_cmd)
+
+    # Verify that the NameNode reports safemode OFF.
     Execute(namenode_safe_mode_off,
             tries=40,
             try_sleep=10,

+ 7 - 0
ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/package/scripts/utils.py

@@ -101,6 +101,13 @@ def service(action=None, name=None, user=None, create_pid_dir=False,
   Execute(daemon_cmd,
           not_if=service_is_up
   )
+
+  # After performing the desired action, run a follow-up health check.
+  if action == "start":
+    Execute("hdfs dfsadmin -report -live",
+            user=params.hdfs_principal_name if params.security_enabled else params.hdfs_user
+    )
+
   if action == "stop":
     File(pid_file,
          action="delete",

+ 7 - 7
ambari-server/src/main/resources/stacks/HDP/2.0.6/services/ZOOKEEPER/package/files/zkSmoke.sh

@@ -20,7 +20,7 @@
 #
 #
 
-smoke_script=$1
+zk_cli_shell=$1
 smoke_user=$2
 conf_dir=$3
 client_port=$4
@@ -51,17 +51,17 @@ function verify_output() {
 }
 
 # Delete /zk_smoketest znode if exists
-su -s /bin/bash - $smoke_user -c "source $conf_dir/zookeeper-env.sh ;  echo delete /zk_smoketest | ${smoke_script} -server $zk_node1:$client_port" 2>&1>$test_output_file
+su -s /bin/bash - $smoke_user -c "source $conf_dir/zookeeper-env.sh ;  echo delete /zk_smoketest | ${zk_cli_shell} -server $zk_node1:$client_port" 2>&1>$test_output_file
 # Create /zk_smoketest znode on one zookeeper server
-su -s /bin/bash - $smoke_user -c "source $conf_dir/zookeeper-env.sh ; echo create /zk_smoketest smoke_data | ${smoke_script} -server $zk_node1:$client_port" 2>&1>>$test_output_file
+su -s /bin/bash - $smoke_user -c "source $conf_dir/zookeeper-env.sh ; echo create /zk_smoketest smoke_data | ${zk_cli_shell} -server $zk_node1:$client_port" 2>&1>>$test_output_file
 verify_output
 
 for i in $zkhosts ; do
   echo "Running test on host $i"
   # Verify the data associated with znode across all the nodes in the zookeeper quorum
-  su -s /bin/bash - $smoke_user -c "source $conf_dir/zookeeper-env.sh ; echo 'get /zk_smoketest' | ${smoke_script} -server $i:$client_port"
-  su -s /bin/bash - $smoke_user -c "source $conf_dir/zookeeper-env.sh ; echo 'ls /' | ${smoke_script} -server $i:$client_port"
-  output=$(su -s /bin/bash - $smoke_user -c "source $conf_dir/zookeeper-env.sh ; echo 'get /zk_smoketest' | ${smoke_script} -server $i:$client_port")
+  su -s /bin/bash - $smoke_user -c "source $conf_dir/zookeeper-env.sh ; echo 'get /zk_smoketest' | ${zk_cli_shell} -server $i:$client_port"
+  su -s /bin/bash - $smoke_user -c "source $conf_dir/zookeeper-env.sh ; echo 'ls /' | ${zk_cli_shell} -server $i:$client_port"
+  output=$(su -s /bin/bash - $smoke_user -c "source $conf_dir/zookeeper-env.sh ; echo 'get /zk_smoketest' | ${zk_cli_shell} -server $i:$client_port")
   echo $output | grep smoke_data
   if [[ $? -ne 0 ]] ; then
     echo "Data associated with znode /zk_smoketests is not consistent on host $i"
@@ -69,7 +69,7 @@ for i in $zkhosts ; do
   fi
 done
 
-su -s /bin/bash - $smoke_user -c "source $conf_dir/zookeeper-env.sh ; echo 'delete /zk_smoketest' | ${smoke_script} -server $zk_node1:$client_port"
+su -s /bin/bash - $smoke_user -c "source $conf_dir/zookeeper-env.sh ; echo 'delete /zk_smoketest' | ${zk_cli_shell} -server $zk_node1:$client_port"
 if [[ "$ZOOKEEPER_EXIT_CODE" -ne "0" ]] ; then
   echo "Zookeeper Smoke Test: Failed" 
 else

+ 2 - 2
ambari-server/src/main/resources/stacks/HDP/2.0.6/services/ZOOKEEPER/package/scripts/params.py

@@ -34,11 +34,11 @@ hdp_stack_version = format_hdp_stack_version(hdp_stack_version)
 if hdp_stack_version != "" and compare_versions(hdp_stack_version, '2.2') >= 0:
   zk_home = '/usr/hdp/current/zookeeper-client'
   zk_bin = '/usr/hdp/current/zookeeper-client/bin'
-  smoke_script = '/usr/hdp/current/zookeeper-client/bin/zkCli.sh'
+  zk_cli_shell = '/usr/hdp/current/zookeeper-client/bin/zkCli.sh'
 else:
   zk_home = '/usr'
   zk_bin = '/usr/lib/zookeeper/bin'
-  smoke_script = "/usr/lib/zookeeper/bin/zkCli.sh"
+  zk_cli_shell = "/usr/lib/zookeeper/bin/zkCli.sh"
 
 config_dir = "/etc/zookeeper/conf"
 zk_user =  config['configurations']['zookeeper-env']['zk_user']

+ 2 - 2
ambari-server/src/main/resources/stacks/HDP/2.0.6/services/ZOOKEEPER/package/scripts/service_check.py

@@ -31,11 +31,11 @@ class ZookeeperServiceCheck(Script):
          content=StaticFile('zkSmoke.sh')
     )
 
-    cmd_qourum = format("{tmp_dir}/zkSmoke.sh {smoke_script} {smokeuser} {config_dir} {clientPort} "
+    cmd_quorum = format("{tmp_dir}/zkSmoke.sh {zk_cli_shell} {smokeuser} {config_dir} {clientPort} "
                   "{security_enabled} {kinit_path_local} {smokeUserKeytab}",
                   smokeUserKeytab=params.smoke_user_keytab if params.security_enabled else "no_keytab")
 
-    Execute(cmd_qourum,
+    Execute(cmd_quorum,
             tries=3,
             try_sleep=5,
             path='/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin',

+ 19 - 0
ambari-server/src/main/resources/stacks/HDP/2.0.6/services/ZOOKEEPER/package/scripts/zookeeper_service.py

@@ -33,6 +33,25 @@ def zookeeper_service(action='start'):
             not_if=no_op_test,
             user=params.zk_user
     )
+
+    if params.security_enabled:
+      kinit_cmd = format("{kinit_path_local} -kt {smoke_user_keytab} {smokeuser};")
+
+      Execute(kinit_cmd,
+              user=params.smokeuser
+      )
+
+    # Ensure that a quorum is still formed.
+    create_command = format("echo 'create /zk_test mydata' | {zk_cli_shell}")
+    list_command = format("echo 'ls /' | {zk_cli_shell}")
+    delete_command = format("echo 'delete /zk_test ' | {zk_cli_shell}")
+    Execute(create_command,
+            user=params.smokeuser)
+    Execute(list_command,
+            user=params.smokeuser)
+    Execute(delete_command,
+            user=params.smokeuser)
+
   elif action == 'stop':
     daemon_cmd = format("source {config_dir}/zookeeper-env.sh ; {cmd} stop")
     rm_pid = format("rm -f {zk_pid_file}")

+ 138 - 7
ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.2.xml

@@ -15,6 +15,20 @@
    See the License for the specific language governing permissions and
    limitations under the License.
 -->
+
+<!-- Sample Usage of tasks.
+<task xsi:type="execute">
+  <command>echo 'Hello World'</command>
+</task>
+<task xsi:type="configure">
+  <key>prop1</key>
+  <value>value1</value>
+</task>
+<task xsi:type="manual">
+  <message>Please perform the following manual step</message>
+</task>
+-->
+
 <upgrade xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
   <target>2.2.*.*</target>
   <order>
@@ -23,9 +37,9 @@
       <component>ZOOKEEPER_CLIENT</component>
     </service>
     <service name="HDFS">
-      <component>JOURNALNODE</component>
       <component>NAMENODE</component>
       <component>DATANODE</component>
+      <component>JOURNALNODE</component>
     </service>
   </order>
   <processing>
@@ -34,35 +48,152 @@
         <batch xsi:type="count">
           <count>1</count>
         </batch>
+        <!-- TODO, optimization
+        <pre-upgrade>
+          Find the leader by running
+          echo stat | nc localhost 2181
+          on the ZK nodes until one of them replies with a value (standalone or replicated).
+          Store that leader and perform the upgrade on it last; this is only an optional optimization.
+        </pre-upgrade>
+        -->
+
+        <!-- ZK Server Restart (or Start, implicitly) must do the following:
+        Before continuing to the next ZK host, make sure that a quorum is established.
+        Start the shell, /usr/hdp/current/zookeeper-client/bin/zkCli.sh
+        Then run,
+        $ create /zk_test mydata
+        $ ls /
+        [hiveserver2, zookeeper, zk_test]
+
+        Finally, delete it,
+        $ delete /zk_test
+
+        $ quit
+        -->
       </component>
     </service>
     <service name="HDFS">
       <component name="NAMENODE">
         <pre-upgrade>
+          <!-- Back up the image,
+          enter safemode if not already in it,
+
+          $ su hdfs -c 'hdfs dfsadmin -safemode get'
+          Safe mode is OFF
+
+          $ su hdfs -c 'hdfs dfsadmin -safemode enter'
+          Safe mode is ON
+
+          $ su hdfs -c 'hdfs dfsadmin -rollingUpgrade prepare'
+          PREPARE rolling upgrade ...
+          Proceed with rolling upgrade:
+          Block Pool ID: BP-1917654970-192.168.64.107-1416527263491
+          Start Time: Fri Nov 21 22:31:03 UTC 2014 (=1416609063176)
+          Finalize Time: <NOT FINALIZED>
+
+          $ su hdfs -c 'hdfs dfsadmin -rollingUpgrade query'
+          QUERY rolling upgrade ...
+          Proceed with rolling upgrade:
+          Block Pool ID: BP-1917654970-192.168.64.107-1416527263491
+          Start Time: Sat Nov 22 02:44:21 UTC 2014 (=1416624261594)
+          Finalize Time: <NOT FINALIZED>
+
+          This should be the last thing called on each service once the user decides to commit to finalizing the entire upgrade.
+          $ su hdfs -c 'hdfs dfsadmin -rollingUpgrade finalize'
+          FINALIZE rolling upgrade ...
+          There is no rolling upgrade in progress or rolling upgrade has already been finalized.
+          -->
+          <task xsi:type="execute">
+            <command>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -safemode enter'</command>
+            <upto>10</upto>
+            <every>6</every>
+          </task>
+
           <task xsi:type="execute">
-            <command>su - {hdfs-user} -c 'hdfs dfsadmin -rollingUpgrade prepare'</command>
+            <command>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -rollingUpgrade prepare'</command>
+            <onfailure>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -safemode leave'</onfailure>   <!-- TODO, stay in safemode if in HA. -->
           </task>
+
+          <task xsi:type="execute">
+            <command>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -rollingUpgrade query'</command>
+            <onfailure>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -safemode leave'</onfailure>   <!-- TODO, stay in safemode if in HA. -->
+          </task>
+
+          <!-- Apparently, the HDFS Namenode restart requires safemode to be OFF when not in HA. -->
           <task xsi:type="execute">
-            <command>su - {hdfs-user} -c 'hdfs dfsadmin -rollingUpgrade query'</command>
-            <until>Proceed with rolling upgrade</until>
+            <command>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -safemode leave'</command>
+            <upto>60</upto>
+            <every>1</every>
           </task>
         </pre-upgrade>
+
+        <!-- This step should run only once the user clicks the "Finalize" button, so the name "post-upgrade" is misleading. -->
         <post-upgrade>
           <task xsi:type="execute">
-            <command>su - {hdfs-user} -c 'hdfs dfsadmin -rollingUpgrade finalize'</command>
+            <command>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -rollingUpgrade finalize'</command>
+          </task>
+          <task xsi:type="execute">
+            <command>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -safemode leave'</command>       <!-- TODO, stay in safemode if in HA. -->
           </task>
         </post-upgrade>
       </component>
+
       <component name="DATANODE">
         <batch xsi:type="percent">
           <percent>20</percent>
         </batch>
-        <upgrade>
+        <pre-upgrade>
+          <!-- Shut down the datanode,
+
+          Will retry 50 times.
+          Property dfs.datanode.ipc.address = 0.0.0.0:8010 needs to evaluate to the current host.
+          $ su hdfs -c 'hdfs dfsadmin -shutdownDatanode <DATANODE_HOST:IPC_PORT> upgrade'
+          E.g.,
+          $ su hdfs -c 'hdfs dfsadmin -shutdownDatanode c6407.ambari.apache.org:8010 upgrade'
+
+          Will retry 50 times.
+          $ su hdfs -c 'hdfs dfsadmin -getDatanodeInfo c6407.ambari.apache.org:8010'
+          Datanode unreachable.
+
+          Change the version,
+          $ hdp-select set hadoop-hdfs-datanode 2.2.0.1-885
+
+          Start the datanode,
+          $ su - hdfs -c '/usr/hdp/current/hadoop-hdfs-datanode/../hadoop/sbin/hadoop-daemon.sh start datanode'
+          starting datanode, logging to /var/log/hadoop/hdfs/hadoop-hdfs-datanode-c6407.ambari.apache.org.out
+
+          Verify it is live,
+          $ su - hdfs -c 'hdfs dfsadmin -report -live'
+          Live datanodes (1):
+          -->
+          <task xsi:type="execute">
+            <first>su {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -getDatanodeInfo {{hdfs-site/dfs.datanode.ipc.address}}'</first>
+            <command>su {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -shutdownDatanode {{hdfs-site/dfs.datanode.ipc.address}} upgrade'</command>
+          </task>
+
+          <!-- After shutting down the datanode, this command is expected to fail with 255, so ignore only that return code. -->
           <task xsi:type="execute">
-            <command>su - {hdfs-user} -c 'hdp-select hadoop-hdfs-datanode {version}'</command>
+            <command>su {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -getDatanodeInfo {{hdfs-site/dfs.datanode.ipc.address}}'</command>
+            <ignore>255</ignore>
           </task>
+
+          <!-- TODO, move this to HDFS Datanode restart. -->
+          <task xsi:type="execute">
+            <command>hdp-select set hadoop-hdfs-datanode {{version}}</command>
+          </task>
+        </pre-upgrade>
+      </component>
+
+      <component name="JOURNALNODE">
+        <!-- Recommended after the Namenode, and only needed when HA is enabled. -->
+        <batch xsi:type="count">
+          <count>1</count>
+        </batch>
+        <upgrade>
+          <!-- TODO -->
         </upgrade>
       </component>
+
     </service>
   </processing>
 </upgrade>

+ 16 - 0
ambari-server/src/test/python/stacks/2.0.6/HDFS/test_datanode.py

@@ -60,6 +60,10 @@ class TestDatanode(RMFTestCase):
     self.assertResourceCalled('Execute', 'ulimit -c unlimited;  su -s /bin/bash - hdfs -c \'export HADOOP_LIBEXEC_DIR=/usr/lib/hadoop/libexec && /usr/lib/hadoop/sbin/hadoop-daemon.sh --config /etc/hadoop/conf start datanode\'',
                               not_if = 'ls /var/run/hadoop/hdfs/hadoop-hdfs-datanode.pid >/dev/null 2>&1 && ps -p `cat /var/run/hadoop/hdfs/hadoop-hdfs-datanode.pid` >/dev/null 2>&1',
                               )
+    self.assertResourceCalled('Execute',
+                              'hdfs dfsadmin -report -live',
+                              user='hdfs'
+                              )
     self.assertNoMoreResources()
 
   @patch("os.path.exists", new = MagicMock(return_value=False))
@@ -130,6 +134,10 @@ class TestDatanode(RMFTestCase):
     self.assertResourceCalled('Execute', 'ulimit -c unlimited;  su -s /bin/bash - root -c \'export HADOOP_LIBEXEC_DIR=/usr/lib/hadoop/libexec && /usr/lib/hadoop/sbin/hadoop-daemon.sh --config /etc/hadoop/conf start datanode\'',
                               not_if = 'ls /var/run/hadoop/hdfs/hadoop-hdfs-datanode.pid >/dev/null 2>&1 && ps -p `cat /var/run/hadoop/hdfs/hadoop-hdfs-datanode.pid` >/dev/null 2>&1',
                               )
+    self.assertResourceCalled('Execute',
+                              'hdfs dfsadmin -report -live',
+                              user='hdfs'
+                              )
     self.assertNoMoreResources()
 
   def test_start_secured_HDP22_root(self):
@@ -165,6 +173,10 @@ class TestDatanode(RMFTestCase):
     self.assertResourceCalled('Execute', 'ulimit -c unlimited;  su -s /bin/bash - root -c \'export HADOOP_LIBEXEC_DIR=/usr/hdp/current/hadoop-client/libexec && /usr/hdp/current/hadoop-client/sbin/hadoop-daemon.sh --config /etc/hadoop/conf start datanode\'',
                               not_if = 'ls /var/run/hadoop/hdfs/hadoop-hdfs-datanode.pid >/dev/null 2>&1 && ps -p `cat /var/run/hadoop/hdfs/hadoop-hdfs-datanode.pid` >/dev/null 2>&1',
                               )
+    self.assertResourceCalled('Execute',
+                              'hdfs dfsadmin -report -live',
+                              user='hdfs'
+                              )
     self.assertNoMoreResources()
 
   def test_start_secured_HDP22_non_root_https_only(self):
@@ -203,6 +215,10 @@ class TestDatanode(RMFTestCase):
     self.assertResourceCalled('Execute', 'ulimit -c unlimited;  su -s /bin/bash - hdfs -c \'export HADOOP_LIBEXEC_DIR=/usr/hdp/current/hadoop-client/libexec && /usr/hdp/current/hadoop-client/sbin/hadoop-daemon.sh --config /etc/hadoop/conf start datanode\'',
                               not_if = 'ls /var/run/hadoop/hdfs/hadoop-hdfs-datanode.pid >/dev/null 2>&1 && ps -p `cat /var/run/hadoop/hdfs/hadoop-hdfs-datanode.pid` >/dev/null 2>&1',
                               )
+    self.assertResourceCalled('Execute',
+                              'hdfs dfsadmin -report -live',
+                              user='hdfs'
+                              )
     self.assertNoMoreResources()
 
   @patch("os.path.exists", new = MagicMock(return_value=False))

+ 6 - 0
ambari-server/src/test/python/stacks/2.0.6/HDFS/test_journalnode.py

@@ -59,6 +59,9 @@ class TestJournalnode(RMFTestCase):
     self.assertResourceCalled('Execute', 'ulimit -c unlimited;  su -s /bin/bash - hdfs -c \'export HADOOP_LIBEXEC_DIR=/usr/lib/hadoop/libexec && /usr/lib/hadoop/sbin/hadoop-daemon.sh --config /etc/hadoop/conf start journalnode\'',
                               not_if = 'ls /var/run/hadoop/hdfs/hadoop-hdfs-journalnode.pid >/dev/null 2>&1 && ps -p `cat /var/run/hadoop/hdfs/hadoop-hdfs-journalnode.pid` >/dev/null 2>&1',
                               )
+    self.assertResourceCalled('Execute', 'hdfs dfsadmin -report -live',
+                              user='hdfs'
+                              )
     self.assertNoMoreResources()
 
   def test_stop_default(self):
@@ -123,6 +126,9 @@ class TestJournalnode(RMFTestCase):
     self.assertResourceCalled('Execute', 'ulimit -c unlimited;  su -s /bin/bash - hdfs -c \'export HADOOP_LIBEXEC_DIR=/usr/lib/hadoop/libexec && /usr/lib/hadoop/sbin/hadoop-daemon.sh --config /etc/hadoop/conf start journalnode\'',
                               not_if = 'ls /var/run/hadoop/hdfs/hadoop-hdfs-journalnode.pid >/dev/null 2>&1 && ps -p `cat /var/run/hadoop/hdfs/hadoop-hdfs-journalnode.pid` >/dev/null 2>&1',
                               )
+    self.assertResourceCalled('Execute', 'hdfs dfsadmin -report -live',
+                              user='hdfs'
+                              )
     self.assertNoMoreResources()
 
   def test_stop_secured(self):

+ 14 - 2
ambari-server/src/test/python/stacks/2.0.6/HDFS/test_namenode.py

@@ -80,6 +80,9 @@ class TestNamenode(RMFTestCase):
     self.assertResourceCalled('Execute', 'ulimit -c unlimited;  su -s /bin/bash - hdfs -c \'export HADOOP_LIBEXEC_DIR=/usr/lib/hadoop/libexec && /usr/lib/hadoop/sbin/hadoop-daemon.sh --config /etc/hadoop/conf start namenode\'',
                               not_if = 'ls /var/run/hadoop/hdfs/hadoop-hdfs-namenode.pid >/dev/null 2>&1 && ps -p `cat /var/run/hadoop/hdfs/hadoop-hdfs-namenode.pid` >/dev/null 2>&1',
                               )
+    self.assertResourceCalled('Execute', 'hdfs dfsadmin -report -live',
+                              user='hdfs'
+                              )
     self.assertResourceCalled('Execute', "su -s /bin/bash - hdfs -c 'export PATH=$PATH:/usr/bin ; hdfs --config /etc/hadoop/conf dfsadmin -safemode get' | grep 'Safe mode is OFF'",
                               tries = 40,
                               only_if = None,
@@ -189,8 +192,11 @@ class TestNamenode(RMFTestCase):
     self.assertResourceCalled('Execute', 'ulimit -c unlimited;  su -s /bin/bash - hdfs -c \'export HADOOP_LIBEXEC_DIR=/usr/lib/hadoop/libexec && /usr/lib/hadoop/sbin/hadoop-daemon.sh --config /etc/hadoop/conf start namenode\'',
                               not_if = 'ls /var/run/hadoop/hdfs/hadoop-hdfs-namenode.pid >/dev/null 2>&1 && ps -p `cat /var/run/hadoop/hdfs/hadoop-hdfs-namenode.pid` >/dev/null 2>&1',
                               )
+    self.assertResourceCalled('Execute', 'hdfs dfsadmin -report -live',
+                              user='hdfs'
+                              )
     self.assertResourceCalled('Execute', '/usr/bin/kinit -kt /etc/security/keytabs/hdfs.headless.keytab hdfs',
-                              user = 'hdfs',
+                              user='hdfs',
                               )
     self.assertResourceCalled('Execute', "su -s /bin/bash - hdfs -c 'export PATH=$PATH:/usr/bin ; hdfs --config /etc/hadoop/conf dfsadmin -safemode get' | grep 'Safe mode is OFF'",
                               tries = 40,
@@ -281,6 +287,9 @@ class TestNamenode(RMFTestCase):
     self.assertResourceCalled('Execute', 'ulimit -c unlimited;  su -s /bin/bash - hdfs -c \'export HADOOP_LIBEXEC_DIR=/usr/lib/hadoop/libexec && /usr/lib/hadoop/sbin/hadoop-daemon.sh --config /etc/hadoop/conf start namenode\'',
                               not_if = 'ls /var/run/hadoop/hdfs/hadoop-hdfs-namenode.pid >/dev/null 2>&1 && ps -p `cat /var/run/hadoop/hdfs/hadoop-hdfs-namenode.pid` >/dev/null 2>&1',
                               )
+    self.assertResourceCalled('Execute', 'hdfs dfsadmin -report -live',
+                              user='hdfs'
+                              )
     self.assertResourceCalled('Execute', "su -s /bin/bash - hdfs -c 'export PATH=$PATH:/usr/bin ; hdfs --config /etc/hadoop/conf dfsadmin -safemode get' | grep 'Safe mode is OFF'",
                               tries = 40,
                               only_if = "su -s /bin/bash - hdfs -c 'export PATH=$PATH:/usr/bin ; hdfs --config /etc/hadoop/conf haadmin -getServiceState nn1 | grep active > /dev/null'",
@@ -352,8 +361,11 @@ class TestNamenode(RMFTestCase):
     self.assertResourceCalled('Execute', 'ulimit -c unlimited;  su -s /bin/bash - hdfs -c \'export HADOOP_LIBEXEC_DIR=/usr/lib/hadoop/libexec && /usr/lib/hadoop/sbin/hadoop-daemon.sh --config /etc/hadoop/conf start namenode\'',
                               not_if = 'ls /var/run/hadoop/hdfs/hadoop-hdfs-namenode.pid >/dev/null 2>&1 && ps -p `cat /var/run/hadoop/hdfs/hadoop-hdfs-namenode.pid` >/dev/null 2>&1',
                               )
+    self.assertResourceCalled('Execute', 'hdfs dfsadmin -report -live',
+                              user='hdfs',
+                              )
     self.assertResourceCalled('Execute', '/usr/bin/kinit -kt /etc/security/keytabs/hdfs.headless.keytab hdfs',
-                              user = 'hdfs',
+                              user='hdfs',
                               )
     self.assertResourceCalled('Execute', "su -s /bin/bash - hdfs -c 'export PATH=$PATH:/usr/bin ; hdfs --config /etc/hadoop/conf dfsadmin -safemode get' | grep 'Safe mode is OFF'",
                               tries = 40,

+ 6 - 0
ambari-server/src/test/python/stacks/2.0.6/HDFS/test_snamenode.py

@@ -69,6 +69,9 @@ class TestSNamenode(RMFTestCase):
     self.assertResourceCalled('Execute', 'ulimit -c unlimited;  su -s /bin/bash - hdfs -c \'export HADOOP_LIBEXEC_DIR=/usr/lib/hadoop/libexec && /usr/lib/hadoop/sbin/hadoop-daemon.sh --config /etc/hadoop/conf start secondarynamenode\'',
                               not_if = 'ls /var/run/hadoop/hdfs/hadoop-hdfs-secondarynamenode.pid >/dev/null 2>&1 && ps -p `cat /var/run/hadoop/hdfs/hadoop-hdfs-secondarynamenode.pid` >/dev/null 2>&1',
                               )
+    self.assertResourceCalled('Execute', 'hdfs dfsadmin -report -live',
+                              user='hdfs'
+                              )
     self.assertNoMoreResources()
 
   def test_stop_default(self):
@@ -148,6 +151,9 @@ class TestSNamenode(RMFTestCase):
     self.assertResourceCalled('Execute', 'ulimit -c unlimited;  su -s /bin/bash - hdfs -c \'export HADOOP_LIBEXEC_DIR=/usr/lib/hadoop/libexec && /usr/lib/hadoop/sbin/hadoop-daemon.sh --config /etc/hadoop/conf start secondarynamenode\'',
                               not_if = 'ls /var/run/hadoop/hdfs/hadoop-hdfs-secondarynamenode.pid >/dev/null 2>&1 && ps -p `cat /var/run/hadoop/hdfs/hadoop-hdfs-secondarynamenode.pid` >/dev/null 2>&1',
                               )
+    self.assertResourceCalled('Execute', 'hdfs dfsadmin -report -live',
+                              user='hdfs'
+                              )
     self.assertNoMoreResources()
 
   def test_stop_secured(self):

+ 4 - 0
ambari-server/src/test/python/stacks/2.0.6/HDFS/test_zkfc.py

@@ -79,6 +79,8 @@ class TestZkfc(RMFTestCase):
     self.assertResourceCalled('Execute', 'ulimit -c unlimited;  su -s /bin/bash - hdfs -c \'export HADOOP_LIBEXEC_DIR=/usr/lib/hadoop/libexec && /usr/lib/hadoop/sbin/hadoop-daemon.sh --config /etc/hadoop/conf start zkfc\'',
                               not_if = 'ls /var/run/hadoop/hdfs/hadoop-hdfs-zkfc.pid >/dev/null 2>&1 && ps -p `cat /var/run/hadoop/hdfs/hadoop-hdfs-zkfc.pid` >/dev/null 2>&1',
                               )
+    self.assertResourceCalled('Execute', "hdfs dfsadmin -report -live",
+                              user="hdfs")
     self.assertNoMoreResources()
 
 
@@ -164,6 +166,8 @@ class TestZkfc(RMFTestCase):
     self.assertResourceCalled('Execute', 'ulimit -c unlimited;  su -s /bin/bash - hdfs -c \'export HADOOP_LIBEXEC_DIR=/usr/lib/hadoop/libexec && /usr/lib/hadoop/sbin/hadoop-daemon.sh --config /etc/hadoop/conf start zkfc\'',
                               not_if = 'ls /var/run/hadoop/hdfs/hadoop-hdfs-zkfc.pid >/dev/null 2>&1 && ps -p `cat /var/run/hadoop/hdfs/hadoop-hdfs-zkfc.pid` >/dev/null 2>&1',
                               )
+    self.assertResourceCalled('Execute', "hdfs dfsadmin -report -live",
+                              user="hdfs")
     self.assertNoMoreResources()
 
   def test_stop_secured(self):

+ 14 - 0
ambari-server/src/test/python/stacks/2.0.6/ZOOKEEPER/test_zookeeper_server.py

@@ -44,6 +44,12 @@ class TestZookeeperServer(RMFTestCase):
                     not_if = 'ls /var/run/zookeeper/zookeeper_server.pid >/dev/null 2>&1 && ps -p `cat /var/run/zookeeper/zookeeper_server.pid` >/dev/null 2>&1',
                     user = 'zookeeper'
     )
+    self.assertResourceCalled('Execute', "echo 'create /zk_test mydata' | /usr/lib/zookeeper/bin/zkCli.sh",
+                              user="ambari-qa")
+    self.assertResourceCalled('Execute', "echo 'ls /' | /usr/lib/zookeeper/bin/zkCli.sh",
+                              user="ambari-qa")
+    self.assertResourceCalled('Execute', "echo 'delete /zk_test ' | /usr/lib/zookeeper/bin/zkCli.sh",
+                              user="ambari-qa")
     self.assertNoMoreResources()
 
   def test_stop_default(self):
@@ -80,6 +86,14 @@ class TestZookeeperServer(RMFTestCase):
                   not_if = 'ls /var/run/zookeeper/zookeeper_server.pid >/dev/null 2>&1 && ps -p `cat /var/run/zookeeper/zookeeper_server.pid` >/dev/null 2>&1',
                   user = 'zookeeper'
     )
+    self.assertResourceCalled('Execute', "/usr/bin/kinit -kt /etc/security/keytabs/smokeuser.headless.keytab ambari-qa;",
+                              user="ambari-qa")
+    self.assertResourceCalled('Execute', "echo 'create /zk_test mydata' | /usr/lib/zookeeper/bin/zkCli.sh",
+                              user="ambari-qa")
+    self.assertResourceCalled('Execute', "echo 'ls /' | /usr/lib/zookeeper/bin/zkCli.sh",
+                              user="ambari-qa")
+    self.assertResourceCalled('Execute', "echo 'delete /zk_test ' | /usr/lib/zookeeper/bin/zkCli.sh",
+                              user="ambari-qa")
     self.assertNoMoreResources()
 
   def test_stop_secured(self):