Browse Source

AMBARI-12205. RU - Misc issues: ZKFC not upgraded on Standby NN; Flume kill needs signal; Package Installation fails when host has no Stack components (alejandro)

Alejandro Fernandez 10 years ago
parent
commit
416f60063c

+ 1 - 1
ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume.py

@@ -204,7 +204,7 @@ def flume(action = None):
       
       
      if is_flume_process_live(pid_file):
        pid = shell.checked_call(("cat", pid_file), sudo=True)[1].strip()
-        Execute(('kill', pid), sudo=True)
+        Execute(("kill", "-15", pid), sudo=True)    # kill command has to be a tuple
      
      if not await_flume_process_termination(pid_file):
        raise Fail("Can't stop flume agent: {0}".format(agent))

+ 2 - 2
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py

@@ -46,7 +46,7 @@ import namenode_upgrade
 from hdfs_namenode import namenode
 from hdfs import hdfs
 import hdfs_rebalance
-from utils import failover_namenode
+from utils import stop_zkfc_during_ru
 
 
 # hashlib is supplied as of Python 2.5 as the replacement interface for md5
@@ -86,7 +86,7 @@ class NameNode(Script):
     env.set_params(params)
     if rolling_restart and params.dfs_ha_enabled:
       if params.dfs_ha_automatic_failover_enabled:
-        failover_namenode()
+        stop_zkfc_during_ru()
       else:
         raise Fail("Rolling Upgrade - dfs.ha.automatic-failover.enabled must be enabled to perform a rolling restart")
     namenode(action="stop", rolling_restart=rolling_restart, env=env)

+ 29 - 24
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py

@@ -30,6 +30,7 @@ from resource_management.core.shell import as_user, as_sudo
 from resource_management.core.exceptions import ComponentIsNotRunning
 from resource_management.core.logger import Logger
 from resource_management.libraries.functions.curl_krb_request import curl_krb_request
+from resource_management.core.exceptions import Fail
 
 from zkfc_slave import ZkfcSlave
 
@@ -60,23 +61,25 @@ def safe_zkfc_op(action, env):
         zkfc.stop(env)
 
 
-def failover_namenode():
+def stop_zkfc_during_ru():
   """
-  Failover the primary namenode by killing zkfc if it exists on this host (assuming this host is the primary).
+  Restart ZKFC on either the standby or active Namenode. If done on the currently active namenode, wait for it to
+  become the standby.
   """
   import params
   check_service_cmd = format("hdfs haadmin -getServiceState {namenode_id}")
   code, out = shell.call(check_service_cmd, logoutput=True, user=params.hdfs_user)
 
-  state = "unknown"
+  original_state = "unknown"
   if code == 0 and out:
-    state = "active" if "active" in out else ("standby" if "standby" in out else state)
-    Logger.info("Namenode service state: %s" % state)
+    original_state = "active" if "active" in out else ("standby" if "standby" in out else original_state)
+    Logger.info("Namenode service state: %s" % original_state)
 
-  if state == "active":
-    Logger.info("Rolling Upgrade - Initiating namenode failover by killing zkfc on active namenode")
+    msg = "Rolling Upgrade - Killing ZKFC on {0} NameNode host {1} {2}"\
+      .format(original_state, params.hostname, "to initiate a failover" if original_state == "active" else "")
+    Logger.info(msg)
 
-    # Forcefully kill ZKFC on this host to initiate a failover
+    # Forcefully kill ZKFC. If this is the active, will initiate a failover.
     # If ZKFC is already dead, then potentially this node can still be the active one.
     was_zkfc_killed = kill_zkfc(params.hdfs_user)
 
@@ -84,22 +87,24 @@ def failover_namenode():
     check_standby_cmd = format("hdfs haadmin -getServiceState {namenode_id} | grep standby")
 
     # process may already be down.  try one time, then proceed
-    code, out = shell.call(check_standby_cmd, user=params.hdfs_user, logoutput=True)
-    Logger.info(format("Rolling Upgrade - check for standby returned {code}"))
 
-    if code == 255 and out:
-      Logger.info("Rolling Upgrade - namenode is already down.")
-    else:
-      if was_zkfc_killed:
-        # Only mandate that this be the standby namenode if ZKFC was indeed killed to initiate a failover.
-        Logger.info("Waiting for this NameNode to become the standby one.")
-        Execute(check_standby_cmd,
-                user=params.hdfs_user,
-                tries=50,
-                try_sleep=6,
-                logoutput=True)
+    if original_state == "active":
+      code, out = shell.call(check_standby_cmd, user=params.hdfs_user, logoutput=True)
+      Logger.info(format("Rolling Upgrade - check for standby returned {code}"))
+
+      if code == 255 and out:
+        Logger.info("Rolling Upgrade - namenode is already down.")
+      else:
+        if was_zkfc_killed:
+          # Only mandate that this be the standby namenode if ZKFC was indeed killed to initiate a failover.
+          Logger.info("Waiting for this NameNode to become the standby one.")
+          Execute(check_standby_cmd,
+                  user=params.hdfs_user,
+                  tries=50,
+                  try_sleep=6,
+                  logoutput=True)
   else:
-    Logger.info("Rolling Upgrade - Host %s is already the standby namenode." % str(params.hostname))
+    raise Fail("Unable to determine NameNode HA states by calling command: {0}".format(check_service_cmd))
 
 
 def kill_zkfc(zkfc_user):
@@ -117,8 +122,8 @@ def kill_zkfc(zkfc_user):
       check_process = as_user(format("ls {zkfc_pid_file} > /dev/null 2>&1 && ps -p `cat {zkfc_pid_file}` > /dev/null 2>&1"), user=zkfc_user)
       code, out = shell.call(check_process)
       if code == 0:
-        Logger.debug("ZKFC is running and will be killed to initiate namenode failover.")
-        kill_command = format("kill -9 `cat {zkfc_pid_file}`")
+        Logger.debug("ZKFC is running and will be killed.")
+        kill_command = format("kill -15 `cat {zkfc_pid_file}`")
         Execute(kill_command,
              user=zkfc_user
         )

+ 50 - 28
ambari-server/src/main/resources/custom_actions/scripts/install_packages.py

@@ -48,6 +48,7 @@ class InstallPackages(Script):
 
 
   UBUNTU_REPO_COMPONENTS_POSTFIX = ["main"]
   REPO_FILE_NAME_PREFIX = 'HDP-'
+  STACK_TO_ROOT_FOLDER = {"HDP": "/usr/hdp"}
   
   # Mapping file used to store repository versions without a build number, and the actual version it corresponded to.
   # E.g., HDP 2.2.0.0 => HDP 2.2.0.0-2041
@@ -80,6 +81,17 @@ class InstallPackages(Script):
       package_list = json.loads(config['commandParams']['package_list'])
       stack_id = config['commandParams']['stack_id']
 
+    stack_name = None
+    self.stack_root_folder = None
+    if stack_id and "-" in stack_id:
+      stack_split = stack_id.split("-")
+      if len(stack_split) == 2:
+        stack_name = stack_split[0].upper()
+        if stack_name in self.STACK_TO_ROOT_FOLDER:
+          self.stack_root_folder = self.STACK_TO_ROOT_FOLDER[stack_name]
+    if self.stack_root_folder is None:
+      raise Fail("Cannot determine the stack's root directory by parsing the stack_id property, {0}".format(str(stack_id)))
+
     self.repository_version = self.repository_version.strip()
 
     # Install/update repositories
@@ -120,14 +132,11 @@ class InstallPackages(Script):
       m = re.search("[\d\.]+-\d+", self.repository_version)
       if m:
         # Contains a build number
-        self.structured_output['actual_version'] = self.repository_version
+        self.structured_output['actual_version'] = self.repository_version  # This is the best value known so far.
         self.put_structured_out(self.structured_output)
 
     # Initial list of versions, used to compute the new version installed
-    self.old_versions = []
-    if self.actual_version is None:
-      Logger.info("Calculate the actual version.".format(self.repository_version))
-      self.old_versions = self.hdp_versions()
+    self.old_versions = self.hdp_versions()
 
     try:
       # It's possible for the process to receive a SIGTERM while installing the packages
@@ -207,34 +216,43 @@ class InstallPackages(Script):
     """
     """
     After packages are installed, determine what the new actual version is, in order to save it.
     After packages are installed, determine what the new actual version is, in order to save it.
     """
     """
+    Logger.info("Attempting to determine actual version with build number.")
+    Logger.info("Old versions: {0}".format(self.old_versions))
 
 
-    # If needed to calculate the actual_version, add it to the structured out file.
-    if self.actual_version is None:
-      Logger.info("Attempting to determine actual version with build number.")
-      Logger.info("Old versions: {0}".format(self.old_versions))
-
-      new_versions = self.hdp_versions()
-      Logger.info("New versions: {0}".format(new_versions))
+    new_versions = self.hdp_versions()
+    Logger.info("New versions: {0}".format(new_versions))
 
 
-      deltas = set(new_versions) - set(self.old_versions)
-      Logger.info("Deltas: {0}".format(deltas))
+    deltas = set(new_versions) - set(self.old_versions)
+    Logger.info("Deltas: {0}".format(deltas))
 
 
-      if 1 == len(deltas):
-        self.actual_version = next(iter(deltas)).strip()
+    if 1 == len(deltas):
+      self.actual_version = next(iter(deltas)).strip()
+      self.structured_output['actual_version'] = self.actual_version
+      self.put_structured_out(self.structured_output)
+      self.write_actual_version_to_file(self.actual_version)
+    else:
+      Logger.info("Cannot determine a new actual version installed by using the delta method.")
+      # If the first install attempt does a partial install and is unable to report this to the server,
+      # then a subsequent attempt will report an empty delta. For this reason, it is important to search the
+      # repo version history file to determine if we previously did write an actual_version.
+      self.actual_version = self.get_actual_version_from_file()
+      if self.actual_version is not None:
+        self.actual_version = self.actual_version.strip()
         self.structured_output['actual_version'] = self.actual_version
         self.structured_output['actual_version'] = self.actual_version
         self.put_structured_out(self.structured_output)
         self.put_structured_out(self.structured_output)
-        self.write_actual_version_to_file(self.actual_version)
+        Logger.info("Found actual version {0} by parsing file {1}".format(self.actual_version, self.REPO_VERSION_HISTORY_FILE))
       else:
       else:
-        Logger.info("Cannot determine a new actual version installed by using the delta method. "
-                    "This is expected during the first install attempt since not all packages will yield a new version in \"hdp-select versions\".")
-        # If the first install attempt does a partial install and is unable to report this to the server,
-        # then a subsequent attempt will report an empty delta. For this reason, it is important to search the
-        # repo version history file to determine if we previously did write an actual_version.
-        self.actual_version = self.get_actual_version_from_file()
-        if self.actual_version is not None:
-          self.actual_version = self.actual_version.strip()
-          self.structured_output['actual_version'] = self.actual_version
-          self.put_structured_out(self.structured_output)
+        # It's likely that this host does not have any Stack Components installed, so only contains AMS.
+        if not os.path.exists(self.stack_root_folder):
+          # Special case when this host does not contain any HDP components, but still contains other components like AMS.
+          msg = "Could not determine actual version. This stack's root directory ({0}) is not present on this host, so this host does not contain any versionable components. " \
+                "Therefore, ignore this host and allow other hosts to report the correct repository version.".format(self.stack_root_folder)
+          Logger.info(msg)
+        else:
+          msg = "Could not determine actual version. This stack's root directory ({0}) exists but was not able to determine the actual repository version installed. " \
+                "Try reinstalling packages again.".format(self.stack_root_folder)
+          raise Fail(msg)
+
 
 
   def install_packages(self, package_list):
   def install_packages(self, package_list):
     """
     """
@@ -277,7 +295,11 @@ class InstallPackages(Script):
             Package(package, action="remove")
     else:
       # Compute the actual version in order to save it in structured out
-      self.compute_actual_version()
+      try:
+        self.compute_actual_version()
+      except Exception, err:
+        ret_code = 1
+        Logger.logger.exception("Failure while computing actual version. Error: {0}".format(str(err)))
 
     pass
     return ret_code