|
@@ -31,6 +31,7 @@ from resource_management.core.exceptions import ComponentIsNotRunning
|
|
|
from resource_management.core.logger import Logger
|
|
|
from resource_management.libraries.functions.curl_krb_request import curl_krb_request
|
|
|
from resource_management.core.exceptions import Fail
|
|
|
+from resource_management.libraries.functions.namenode_ha_utils import get_namenode_states
|
|
|
|
|
|
from zkfc_slave import ZkfcSlave
|
|
|
|
|
@@ -74,50 +75,59 @@ def initiate_safe_zkfc_failover():
|
|
|
Execute(format("{kinit_path_local} -kt {hdfs_user_keytab} {hdfs_principal_name}"),
|
|
|
user = params.hdfs_user)
|
|
|
|
|
|
- check_service_cmd = format("hdfs haadmin -getServiceState {namenode_id}")
|
|
|
- code, out = shell.call(check_service_cmd, logoutput=True, user=params.hdfs_user)
|
|
|
-
|
|
|
- original_state = "unknown"
|
|
|
- if code == 0 and out:
|
|
|
- original_state = "active" if "active" in out else ("standby" if "standby" in out else original_state)
|
|
|
- Logger.info("Namenode service state: %s" % original_state)
|
|
|
-
|
|
|
- if original_state == "active":
|
|
|
- msg = "Rolling Upgrade - Initiating a ZKFC failover on {0} NameNode host {1}.".format(original_state, params.hostname)
|
|
|
- Logger.info(msg)
|
|
|
-
|
|
|
- check_standby_cmd = format("hdfs haadmin -getServiceState {namenode_id} | grep standby")
|
|
|
- failover_command = format("hdfs haadmin -failover {namenode_id} {other_namenode_id}")
|
|
|
-
|
|
|
- code, out = shell.call(failover_command, user=params.hdfs_user, logoutput=True)
|
|
|
- Logger.info(format("Rolling Upgrade - failover command returned {code}"))
|
|
|
- wait_for_standby = False
|
|
|
-
|
|
|
- if code == 0:
|
|
|
- wait_for_standby = True
|
|
|
+ active_namenode_id = None
|
|
|
+ standby_namenode_id = None
|
|
|
+ active_namenodes, standby_namenodes, unknown_namenodes = get_namenode_states(params.hdfs_site, params.security_enabled, params.hdfs_user)
|
|
|
+ if active_namenodes:
|
|
|
+ active_namenode_id = active_namenodes[0][0]
|
|
|
+ if standby_namenodes:
|
|
|
+ standby_namenode_id = standby_namenodes[0][0]
|
|
|
+
|
|
|
+ if active_namenode_id:
|
|
|
+ Logger.info(format("Active NameNode id: {active_namenode_id}"))
|
|
|
+ if standby_namenode_id:
|
|
|
+ Logger.info(format("Standby NameNode id: {standby_namenode_id}"))
|
|
|
+ if unknown_namenodes:
|
|
|
+ for unknown_namenode in unknown_namenodes:
|
|
|
+ Logger.info("NameNode HA state for {0} is unknown".format(unknown_namenode[0]))
|
|
|
+
|
|
|
+ if params.namenode_id == active_namenode_id and params.other_namenode_id == standby_namenode_id:
|
|
|
+ # Failover if this NameNode is active and other NameNode is up and in standby (i.e. ready to become active on failover)
|
|
|
+ Logger.info(format("NameNode {namenode_id} is active and NameNode {other_namenode_id} is in standby"))
|
|
|
+
|
|
|
+ failover_command = format("hdfs haadmin -failover {namenode_id} {other_namenode_id}")
|
|
|
+ check_standby_cmd = format("hdfs haadmin -getServiceState {namenode_id} | grep standby")
|
|
|
+
|
|
|
+ msg = "Rolling Upgrade - Initiating a ZKFC failover on active NameNode host {0}.".format(params.hostname)
|
|
|
+ Logger.info(msg)
|
|
|
+ code, out = shell.call(failover_command, user=params.hdfs_user, logoutput=True)
|
|
|
+ Logger.info(format("Rolling Upgrade - failover command returned {code}"))
|
|
|
+ wait_for_standby = False
|
|
|
+
|
|
|
+ if code == 0:
|
|
|
+ wait_for_standby = True
|
|
|
+ else:
|
|
|
+ # Try to kill ZKFC manually
|
|
|
+ was_zkfc_killed = kill_zkfc(params.hdfs_user)
|
|
|
+ code, out = shell.call(check_standby_cmd, user=params.hdfs_user, logoutput=True)
|
|
|
+ Logger.info(format("Rolling Upgrade - check for standby returned {code}"))
|
|
|
+ if code == 255 and out:
|
|
|
+ Logger.info("Rolling Upgrade - NameNode is already down.")
|
|
|
else:
|
|
|
- # Try to kill ZKFC manually
|
|
|
- was_zkfc_killed = kill_zkfc(params.hdfs_user)
|
|
|
- code, out = shell.call(check_standby_cmd, user=params.hdfs_user, logoutput=True)
|
|
|
- Logger.info(format("Rolling Upgrade - check for standby returned {code}"))
|
|
|
-
|
|
|
- if code == 255 and out:
|
|
|
- Logger.info("Rolling Upgrade - namenode is already down.")
|
|
|
- else:
|
|
|
- if was_zkfc_killed:
|
|
|
- # Only mandate that this be the standby namenode if ZKFC was indeed killed to initiate a failover.
|
|
|
- wait_for_standby = True
|
|
|
-
|
|
|
- if wait_for_standby:
|
|
|
- Logger.info("Waiting for this NameNode to become the standby one.")
|
|
|
- Execute(check_standby_cmd,
|
|
|
- user=params.hdfs_user,
|
|
|
- tries=50,
|
|
|
- try_sleep=6,
|
|
|
- logoutput=True)
|
|
|
+ if was_zkfc_killed:
|
|
|
+ # Only mandate that this be the standby namenode if ZKFC was indeed killed to initiate a failover.
|
|
|
+ wait_for_standby = True
|
|
|
+
|
|
|
+ if wait_for_standby:
|
|
|
+ Logger.info("Waiting for this NameNode to become the standby one.")
|
|
|
+ Execute(check_standby_cmd,
|
|
|
+ user=params.hdfs_user,
|
|
|
+ tries=50,
|
|
|
+ try_sleep=6,
|
|
|
+ logoutput=True)
|
|
|
else:
|
|
|
- raise Fail("Unable to determine NameNode HA states by calling command: {0}".format(check_service_cmd))
|
|
|
-
|
|
|
+ msg = "Rolling Upgrade - Skipping ZKFC failover on NameNode host {0}.".format(params.hostname)
|
|
|
+ Logger.info(msg)
|
|
|
|
|
|
def kill_zkfc(zkfc_user):
|
|
|
"""
|
|
@@ -129,36 +139,21 @@ def kill_zkfc(zkfc_user):
|
|
|
"""
|
|
|
import params
|
|
|
if params.dfs_ha_enabled:
|
|
|
- zkfc_pid_file = get_service_pid_file("zkfc", zkfc_user)
|
|
|
- if zkfc_pid_file:
|
|
|
+ if params.zkfc_pid_file:
|
|
|
check_process = as_user(format("ls {zkfc_pid_file} > /dev/null 2>&1 && ps -p `cat {zkfc_pid_file}` > /dev/null 2>&1"), user=zkfc_user)
|
|
|
code, out = shell.call(check_process)
|
|
|
if code == 0:
|
|
|
Logger.debug("ZKFC is running and will be killed.")
|
|
|
kill_command = format("kill -15 `cat {zkfc_pid_file}`")
|
|
|
Execute(kill_command,
|
|
|
- user=zkfc_user
|
|
|
+ user=zkfc_user
|
|
|
)
|
|
|
- File(zkfc_pid_file,
|
|
|
+ File(params.zkfc_pid_file,
|
|
|
action = "delete",
|
|
|
- )
|
|
|
+ )
|
|
|
return True
|
|
|
return False
|
|
|
|
|
|
-
|
|
|
-def get_service_pid_file(name, user):
|
|
|
- """
|
|
|
- Get the pid file path that was used to start the service by the user.
|
|
|
- :param name: Service name
|
|
|
- :param user: User that started the service.
|
|
|
- :return: PID file path
|
|
|
- """
|
|
|
- import params
|
|
|
- pid_dir = format("{hadoop_pid_dir_prefix}/{user}")
|
|
|
- pid_file = format("{pid_dir}/hadoop-{user}-{name}.pid")
|
|
|
- return pid_file
|
|
|
-
|
|
|
-
|
|
|
def service(action=None, name=None, user=None, options="", create_pid_dir=False,
|
|
|
create_log_dir=False):
|
|
|
"""
|