|
@@ -60,19 +60,19 @@ def safe_zkfc_op(action, env):
|
|
|
if zkfc:
|
|
|
zkfc.stop(env)
|
|
|
|
|
|
-
|
|
|
-def stop_zkfc_during_ru():
|
|
|
+def initiate_safe_zkfc_failover():
|
|
|
"""
|
|
|
- Restart ZKFC on either the standby or active Namenode. If done on the currently active namenode,
|
|
|
- wait for it to become the standby.
|
|
|
- This will run a kinit before executing the 'hdfs haadmin' command.
|
|
|
+ If this is the active namenode, initiate a safe failover and wait for it to become the standby.
|
|
|
+
|
|
|
+ If an error occurs, force a failover to happen by killing zkfc on this host. In this case, during the Restart,
|
|
|
+ will also have to start ZKFC manually.
|
|
|
"""
|
|
|
import params
|
|
|
|
|
|
- # must kinit before running the HDFS command
|
|
|
+ # Must kinit before running the HDFS command
|
|
|
if params.security_enabled:
|
|
|
- Execute(format("{kinit_path_local} -kt {hdfs_user_keytab} {hdfs_principal_name}"),
|
|
|
- user = params.hdfs_user)
|
|
|
+ Execute(format("{kinit_path_local} -kt {hdfs_user_keytab} {hdfs_principal_name}"),
|
|
|
+ user = params.hdfs_user)
|
|
|
|
|
|
check_service_cmd = format("hdfs haadmin -getServiceState {namenode_id}")
|
|
|
code, out = shell.call(check_service_cmd, logoutput=True, user=params.hdfs_user)
|
|
@@ -82,34 +82,39 @@ def stop_zkfc_during_ru():
|
|
|
original_state = "active" if "active" in out else ("standby" if "standby" in out else original_state)
|
|
|
Logger.info("Namenode service state: %s" % original_state)
|
|
|
|
|
|
- msg = "Rolling Upgrade - Killing ZKFC on {0} NameNode host {1} {2}"\
|
|
|
- .format(original_state, params.hostname, "to initiate a failover" if original_state == "active" else "")
|
|
|
- Logger.info(msg)
|
|
|
-
|
|
|
- # Forcefully kill ZKFC. If this is the active, will initiate a failover.
|
|
|
- # If ZKFC is already dead, then potentially this node can still be the active one.
|
|
|
- was_zkfc_killed = kill_zkfc(params.hdfs_user)
|
|
|
-
|
|
|
- # Wait until it transitions to standby
|
|
|
- check_standby_cmd = format("hdfs haadmin -getServiceState {namenode_id} | grep standby")
|
|
|
+ if original_state == "active":
|
|
|
+ msg = "Rolling Upgrade - Initiating a ZKFC failover on {0} NameNode host {1}.".format(original_state, params.hostname)
|
|
|
+ Logger.info(msg)
|
|
|
|
|
|
- # process may already be down. try one time, then proceed
|
|
|
+ check_standby_cmd = format("hdfs haadmin -getServiceState {namenode_id} | grep standby")
|
|
|
+ failover_command = format("hdfs haadmin -failover {namenode_id} {other_namenode_id}")
|
|
|
|
|
|
- if original_state == "active":
|
|
|
- code, out = shell.call(check_standby_cmd, user=params.hdfs_user, logoutput=True)
|
|
|
- Logger.info(format("Rolling Upgrade - check for standby returned {code}"))
|
|
|
+ code, out = shell.call(failover_command, user=params.hdfs_user, logoutput=True)
|
|
|
+ Logger.info(format("Rolling Upgrade - failover command returned {code}"))
|
|
|
+ wait_for_standby = False
|
|
|
|
|
|
- if code == 255 and out:
|
|
|
- Logger.info("Rolling Upgrade - namenode is already down.")
|
|
|
+ if code == 0:
|
|
|
+ wait_for_standby = True
|
|
|
else:
|
|
|
- if was_zkfc_killed:
|
|
|
- # Only mandate that this be the standby namenode if ZKFC was indeed killed to initiate a failover.
|
|
|
- Logger.info("Waiting for this NameNode to become the standby one.")
|
|
|
- Execute(check_standby_cmd,
|
|
|
- user=params.hdfs_user,
|
|
|
- tries=50,
|
|
|
- try_sleep=6,
|
|
|
- logoutput=True)
|
|
|
+ # Try to kill ZKFC manually
|
|
|
+ was_zkfc_killed = kill_zkfc(params.hdfs_user)
|
|
|
+ code, out = shell.call(check_standby_cmd, user=params.hdfs_user, logoutput=True)
|
|
|
+ Logger.info(format("Rolling Upgrade - check for standby returned {code}"))
|
|
|
+
|
|
|
+ if code == 255 and out:
|
|
|
+ Logger.info("Rolling Upgrade - namenode is already down.")
|
|
|
+ else:
|
|
|
+ if was_zkfc_killed:
|
|
|
+ # Only mandate that this be the standby namenode if ZKFC was indeed killed to initiate a failover.
|
|
|
+ wait_for_standby = True
|
|
|
+
|
|
|
+ if wait_for_standby:
|
|
|
+ Logger.info("Waiting for this NameNode to become the standby one.")
|
|
|
+ Execute(check_standby_cmd,
|
|
|
+ user=params.hdfs_user,
|
|
|
+ tries=50,
|
|
|
+ try_sleep=6,
|
|
|
+ logoutput=True)
|
|
|
else:
|
|
|
raise Fail("Unable to determine NameNode HA states by calling command: {0}".format(check_service_cmd))
|
|
|
|