|
@@ -17,15 +17,115 @@ limitations under the License.
|
|
|
|
|
|
"""
|
|
|
import os
|
|
|
+import re
|
|
|
|
|
|
from resource_management import *
|
|
|
-import re
|
|
|
+from resource_management.libraries.functions.format import format
|
|
|
+from resource_management.core.shell import call, checked_call
|
|
|
+from resource_management.core.exceptions import ComponentIsNotRunning
|
|
|
+
|
|
|
+from zkfc_slave import ZkfcSlave
|
|
|
+
|
|
|
def safe_zkfc_op(action, env):
    """
    Start or stop the zkfc process only when its current status requires it.

    The status of a fresh ZkfcSlave is probed first. For "start" the process
    is started only if the probe raises ComponentIsNotRunning; for "stop" it
    is stopped only if the probe completes without raising. Any other action
    value is ignored.
    :param action: start or stop
    :param env: environment
    """
    if action not in ("start", "stop"):
        return

    zkfc_slave = None
    try:
        zkfc_slave = ZkfcSlave()
        zkfc_slave.status(env)
    except ComponentIsNotRunning:
        # Not running: a start request has work to do, a stop request has none.
        # zkfc_slave is still None if the constructor itself raised.
        if action == "start" and zkfc_slave:
            zkfc_slave.start(env)
    else:
        # Status probe succeeded: only a stop request needs to act.
        if action == "stop" and zkfc_slave:
            zkfc_slave.stop(env)
|
|
|
|
|
|
|
|
|
-def service(action=None, name=None, user=None, create_pid_dir=False,
|
|
|
def failover_namenode():
    """
    Failover the namenode on this host by killing zkfc, if this host is the active namenode.

    Runs "hdfs haadmin -getServiceState" as params.hdfs_user and classifies the
    output as "active", "standby" or "unknown" (non-zero exit code, empty
    output, or output containing neither word).
    - active: zkfc is killed through kill_zkfc() and the service state is
      re-checked (up to 30 tries, 6 seconds apart) until it reports standby.
    - standby: nothing is killed; the fact is logged.
    - unknown: nothing is killed; the inability to determine the state is
      logged instead of reporting the host as standby.
    """
    import params
    # NOTE(review): {namenode_id} is not a local here; presumably format()
    # resolves it from params - confirm against the format helper.
    check_service_cmd = format("hdfs haadmin -getServiceState {namenode_id}")
    code, out = call(check_service_cmd, verbose=True, logoutput=True, user=params.hdfs_user)

    state = "unknown"
    if code == 0 and out:
        if "active" in out:
            state = "active"
        elif "standby" in out:
            state = "standby"
        Logger.info("Namenode service state: %s" % state)

    if state == "active":
        Logger.info("Rolling Upgrade - Initiating namenode failover by killing zkfc on active namenode")

        # Forcefully kill ZKFC on this host to initiate a failover
        kill_zkfc(params.hdfs_user)

        # Wait until it transitions to standby
        check_standby_cmd = format("hdfs haadmin -getServiceState {namenode_id} | grep standby")
        Execute(check_standby_cmd,
                user=params.hdfs_user,
                tries=30,
                try_sleep=6)
    elif state == "standby":
        Logger.info("Rolling Upgrade - Host %s is the standby namenode." % str(params.hostname))
    else:
        # The state query failed or returned unrecognised output; do not claim
        # that this host is the standby.
        Logger.info("Rolling Upgrade - Unable to determine the namenode state on host %s, skipping failover." % str(params.hostname))
|
|
|
+
|
|
|
+
|
|
|
def kill_zkfc(zkfc_user):
    """
    Forcefully kill the ZKFC process on this host in order to trigger a namenode failover.

    Of the two potential ways to fail over the namenode (especially during a
    Rolling Upgrade), this implements the first:
    Option 1. Kill zkfc on primary namenode provided that the secondary is up and has zkfc running on it.
    Option 2. Silent failover (not supported as of HDP 2.2.0.0)

    Nothing is done unless params.dfs_ha_enabled is set, a pid file path is
    available, and the process recorded in that pid file is found running.
    :param zkfc_user: User that started the ZKFC process.
    """
    import params
    if not params.dfs_ha_enabled:
        return

    # The names zkfc_pid_file and check_process appear as placeholders in the
    # command templates below, so they must stay in sync with those templates.
    zkfc_pid_file = get_service_pid_file("zkfc", zkfc_user)
    if not zkfc_pid_file:
        return

    # Succeeds only if the pid file exists and the pid in it belongs to a live process.
    check_process = format("ls {zkfc_pid_file} > /dev/null 2>&1 && ps -p `cat {zkfc_pid_file}` > /dev/null 2>&1")
    exit_code, _ = call(check_process, verbose=True)
    if exit_code != 0:
        return

    Logger.debug("ZKFC is running and will be killed to initiate namenode failover.")
    # The liveness check is repeated in the same shell command right before the kill -9.
    kill_command = format("{check_process} && kill -9 `cat {zkfc_pid_file}` > /dev/null 2>&1")
    checked_call(kill_command)
|
|
|
+
|
|
|
+
|
|
|
def get_service_pid_file(name, user):
    """
    Build the path of the pid file used by a service started by the given user.

    The path has the form "<hadoop_pid_dir_prefix>/<user>/hadoop-<user>-<name>.pid".
    NOTE(review): hadoop_pid_dir_prefix is not defined in this function;
    presumably format() resolves it from params - confirm.
    :param name: Service name
    :param user: User that started the service.
    :return: PID file path
    """
    import params
    # pid_dir is referenced by name inside the second template.
    pid_dir = format("{hadoop_pid_dir_prefix}/{user}")
    return format("{pid_dir}/hadoop-{user}-{name}.pid")
|
|
|
+
|
|
|
+
|
|
|
+def service(action=None, name=None, user=None, options="", create_pid_dir=False,
|
|
|
create_log_dir=False):
|
|
|
+ """
|
|
|
+ :param action: Either "start" or "stop"
|
|
|
+ :param name: Component name, e.g., "namenode", "datanode", "secondarynamenode", "zkfc"
|
|
|
+ :param user: User to run the command as
|
|
|
+ :param options: Additional options to pass to command as a string
|
|
|
+ :param create_pid_dir: Create PID directory
|
|
|
+ :param create_log_dir: Create log file directory
|
|
|
+ """
|
|
|
import params
|
|
|
|
|
|
+ options = options if options else ""
|
|
|
pid_dir = format("{hadoop_pid_dir_prefix}/{user}")
|
|
|
pid_file = format("{pid_dir}/hadoop-{user}-{name}.pid")
|
|
|
log_dir = format("{hdfs_log_dir_prefix}/{user}")
|
|
@@ -76,15 +176,18 @@ def service(action=None, name=None, user=None, create_pid_dir=False,
|
|
|
except ComponentIsNotRunning:
|
|
|
pass
|
|
|
|
|
|
-
|
|
|
hadoop_daemon = format("{hadoop_bin}/hadoop-daemon.sh")
|
|
|
|
|
|
if user == "root":
|
|
|
- cmd = [hadoop_daemon, "--config", params.hadoop_conf_dir]
|
|
|
- daemon_cmd = as_sudo(cmd + [action, name])
|
|
|
+ cmd = [hadoop_daemon, "--config", params.hadoop_conf_dir, action, name]
|
|
|
+ if options:
|
|
|
+ cmd += [options, ]
|
|
|
+ daemon_cmd = as_sudo(cmd)
|
|
|
else:
|
|
|
- cmd = format("{hadoop_daemon} --config {hadoop_conf_dir}")
|
|
|
- daemon_cmd = as_user(format("{ulimit_cmd} {cmd} {action} {name}"), user)
|
|
|
+ cmd = format("{ulimit_cmd} {hadoop_daemon} --config {hadoop_conf_dir} {action} {name}")
|
|
|
+ if options:
|
|
|
+ cmd += " " + options
|
|
|
+ daemon_cmd = as_user(cmd, user)
|
|
|
|
|
|
service_is_up = check_process if action == "start" else None
|
|
|
#remove pid file from dead process
|
|
@@ -102,6 +205,7 @@ def service(action=None, name=None, user=None, create_pid_dir=False,
|
|
|
action="delete",
|
|
|
)
|
|
|
|
|
|
+
|
|
|
def get_port(address):
|
|
|
"""
|
|
|
Extracts port from the address like 0.0.0.0:1019
|
|
@@ -114,6 +218,7 @@ def get_port(address):
|
|
|
else:
|
|
|
return None
|
|
|
|
|
|
+
|
|
|
def is_secure_port(port):
|
|
|
"""
|
|
|
Returns True if port is root-owned at *nix systems
|