"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
import re
import urllib2
import ambari_simplejson as json # simplejson is much faster compared to the Python 2.6 json module and provides the same set of functions.

from resource_management.core.resources.system import Directory, File, Execute
from resource_management.libraries.functions.format import format
from resource_management.libraries.functions import check_process_status
from resource_management.libraries.functions.version import compare_versions
from resource_management.core import shell
from resource_management.core.shell import as_user, as_sudo
from resource_management.core.exceptions import ComponentIsNotRunning
from resource_management.core.logger import Logger
from resource_management.libraries.functions.curl_krb_request import curl_krb_request
from resource_management.core.exceptions import Fail
from resource_management.libraries.functions.namenode_ha_utils import get_namenode_states
from resource_management.libraries.script.script import Script

from zkfc_slave import ZkfcSlave

def safe_zkfc_op(action, env):
  """
  Idempotent operation on the zkfc process to either start or stop it.
  :param action: start or stop
  :param env: environment
  """
  Logger.info("Performing action {0} on zkfc.".format(action))
  zkfc = None
  if action == "start":
    try:
      zkfc = ZkfcSlave()
      zkfc.status(env)
    except ComponentIsNotRunning:
      if zkfc:
        zkfc.start(env)

  if action == "stop":
    try:
      zkfc = ZkfcSlave()
      zkfc.status(env)
    except ComponentIsNotRunning:
      pass
    else:
      if zkfc:
        zkfc.stop(env)

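# Illustrative usage of safe_zkfc_op() above (caller context assumed, e.g. a
# rolling-upgrade restart hook): safe_zkfc_op("start", env) starts ZKFC only if
# it is not already running, and safe_zkfc_op("stop", env) stops it only if it
# is running, so repeated calls are safe.
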
def initiate_safe_zkfc_failover():
  """
  If this is the active NameNode, initiate a safe failover and wait for it to become the standby.

  If an error occurs, force a failover by killing ZKFC on this host; in that case, the Restart
  will also have to start ZKFC manually.
  """
  import params

  # Must kinit before running the HDFS command
  if params.security_enabled:
    Execute(format("{kinit_path_local} -kt {hdfs_user_keytab} {hdfs_principal_name}"),
            user = params.hdfs_user)

  active_namenode_id = None
  standby_namenode_id = None
  active_namenodes, standby_namenodes, unknown_namenodes = get_namenode_states(params.hdfs_site, params.security_enabled, params.hdfs_user)
  if active_namenodes:
    active_namenode_id = active_namenodes[0][0]
  if standby_namenodes:
    standby_namenode_id = standby_namenodes[0][0]

  if active_namenode_id:
    Logger.info(format("Active NameNode id: {active_namenode_id}"))
  if standby_namenode_id:
    Logger.info(format("Standby NameNode id: {standby_namenode_id}"))
  if unknown_namenodes:
    for unknown_namenode in unknown_namenodes:
      Logger.info("NameNode HA state for {0} is unknown".format(unknown_namenode[0]))

  if params.namenode_id == active_namenode_id and params.other_namenode_id == standby_namenode_id:
    # Failover if this NameNode is active and the other NameNode is up and in standby (i.e. ready to become active on failover)
    Logger.info(format("NameNode {namenode_id} is active and NameNode {other_namenode_id} is in standby"))

    failover_command = format("hdfs haadmin -failover {namenode_id} {other_namenode_id}")
    check_standby_cmd = format("hdfs haadmin -getServiceState {namenode_id} | grep standby")

    msg = "Rolling Upgrade - Initiating a ZKFC failover on active NameNode host {0}.".format(params.hostname)
    Logger.info(msg)
    code, out = shell.call(failover_command, user=params.hdfs_user, logoutput=True)
    Logger.info(format("Rolling Upgrade - failover command returned {code}"))
    wait_for_standby = False

    if code == 0:
      wait_for_standby = True
    else:
      # Try to kill ZKFC manually
      was_zkfc_killed = kill_zkfc(params.hdfs_user)
      code, out = shell.call(check_standby_cmd, user=params.hdfs_user, logoutput=True)
      Logger.info(format("Rolling Upgrade - check for standby returned {code}"))
      if code == 255 and out:
        Logger.info("Rolling Upgrade - NameNode is already down.")
      else:
        if was_zkfc_killed:
          # Only mandate that this be the standby NameNode if ZKFC was indeed killed to initiate a failover.
          wait_for_standby = True

    if wait_for_standby:
      Logger.info("Waiting for this NameNode to become the standby one.")
      Execute(check_standby_cmd,
              user=params.hdfs_user,
              tries=50,
              try_sleep=6,
              logoutput=True)
  else:
    msg = "Rolling Upgrade - Skipping ZKFC failover on NameNode host {0}.".format(params.hostname)
    Logger.info(msg)

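# Illustrative expansion of the commands built in initiate_safe_zkfc_failover()
# above, assuming hypothetical NameNode ids namenode_id = "nn1" and
# other_namenode_id = "nn2":
#   hdfs haadmin -failover nn1 nn2
#   hdfs haadmin -getServiceState nn1 | grep standby
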
def kill_zkfc(zkfc_user):
  """
  There are two potential methods for failing over the NameNode, especially during a Rolling Upgrade.
  Option 1. Kill ZKFC on the active NameNode, provided that the standby is up and has ZKFC running on it.
  Option 2. Silent failover (not supported as of HDP 2.2.0.0)
  :param zkfc_user: User that started the ZKFC process.
  :return: True if ZKFC was killed, otherwise False.
  """
  import params
  if params.dfs_ha_enabled:
    if params.zkfc_pid_file:
      check_process = as_user(format("ls {zkfc_pid_file} > /dev/null 2>&1 && ps -p `cat {zkfc_pid_file}` > /dev/null 2>&1"), user=zkfc_user)
      code, out = shell.call(check_process)
      if code == 0:
        Logger.debug("ZKFC is running and will be killed.")
        kill_command = format("kill -15 `cat {zkfc_pid_file}`")
        Execute(kill_command,
                user=zkfc_user
        )
        File(params.zkfc_pid_file,
             action = "delete",
             )
        return True
  return False

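# Note on kill_zkfc() above: kill -15 sends SIGTERM so ZKFC can exit cleanly and
# release its ZooKeeper lock, which lets the other NameNode's ZKFC win the
# election; the pid file is then deleted so later status checks do not trip over
# a stale pid.
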
def service(action=None, name=None, user=None, options="", create_pid_dir=False,
            create_log_dir=False):
  """
  :param action: Either "start" or "stop"
  :param name: Component name, e.g., "namenode", "datanode", "secondarynamenode", "zkfc"
  :param user: User to run the command as
  :param options: Additional options to pass to the command, as a string
  :param create_pid_dir: Create PID directory
  :param create_log_dir: Create log file directory
  """
  import params

  options = options if options else ""
  pid_dir = format("{hadoop_pid_dir_prefix}/{user}")
  pid_file = format("{pid_dir}/hadoop-{user}-{name}.pid")
  hadoop_env_exports = {
    'HADOOP_LIBEXEC_DIR': params.hadoop_libexec_dir
  }
  log_dir = format("{hdfs_log_dir_prefix}/{user}")

  # NFS GATEWAY is always started by root using jsvc due to rpcbind bugs
  # on Linux such as CentOS6.2. https://bugzilla.redhat.com/show_bug.cgi?id=731542
  if name == "nfs3":
    pid_file = format("{pid_dir}/hadoop_privileged_nfs3.pid")
    custom_export = {
      'HADOOP_PRIVILEGED_NFS_USER': params.hdfs_user,
      'HADOOP_PRIVILEGED_NFS_PID_DIR': pid_dir,
      'HADOOP_PRIVILEGED_NFS_LOG_DIR': log_dir
    }
    hadoop_env_exports.update(custom_export)

  process_id_exists_command = as_sudo(["test", "-f", pid_file]) + " && " + as_sudo(["pgrep", "-F", pid_file])

  # On STOP the directories shouldn't be created, since the old dirs
  # (created during the previous start) are still in use.
  if action != "stop":
    if name == "nfs3":
      Directory(params.hadoop_pid_dir_prefix,
                mode=0755,
                owner=params.root_user,
                group=params.root_group
      )
    else:
      Directory(params.hadoop_pid_dir_prefix,
                mode=0755,
                owner=params.hdfs_user,
                group=params.user_group
      )
    if create_pid_dir:
      Directory(pid_dir,
                owner=user,
                recursive=True)
    if create_log_dir:
      if name == "nfs3":
        Directory(log_dir,
                  mode=0775,
                  owner=params.root_user,
                  group=params.user_group)
      else:
        Directory(log_dir,
                  owner=user,
                  recursive=True)

  if params.security_enabled and name == "datanode":
    ## The directory where pid files are stored in the secure data environment.
    hadoop_secure_dn_pid_dir = format("{hadoop_pid_dir_prefix}/{hdfs_user}")
    hadoop_secure_dn_pid_file = format("{hadoop_secure_dn_pid_dir}/hadoop_secure_dn.pid")

    # At the Champlain stack and later, we may start the datanode as a non-root user even in a secure cluster
    if not (params.hdp_stack_version != "" and compare_versions(params.hdp_stack_version, '2.2') >= 0) or params.secure_dn_ports_are_in_use:
      user = "root"
      pid_file = format(
        "{hadoop_pid_dir_prefix}/{hdfs_user}/hadoop-{hdfs_user}-{name}.pid")

    if action == 'stop' and (params.hdp_stack_version != "" and compare_versions(params.hdp_stack_version, '2.2') >= 0) and \
        os.path.isfile(hadoop_secure_dn_pid_file):
      # We need special handling for the case when we configure a non-root secure DN
      # and then restart it to pick up new configs. Otherwise we will not be able to
      # stop a running instance.
      user = "root"

      try:
        check_process_status(hadoop_secure_dn_pid_file)

        custom_export = {
          'HADOOP_SECURE_DN_USER': params.hdfs_user
        }
        hadoop_env_exports.update(custom_export)

      except ComponentIsNotRunning:
        pass

  hadoop_daemon = format("{hadoop_bin}/hadoop-daemon.sh")

  if user == "root":
    cmd = [hadoop_daemon, "--config", params.hadoop_conf_dir, action, name]
    if options:
      cmd += [options, ]
    daemon_cmd = as_sudo(cmd)
  else:
    cmd = format("{ulimit_cmd} {hadoop_daemon} --config {hadoop_conf_dir} {action} {name}")
    if options:
      cmd += " " + options
    daemon_cmd = as_user(cmd, user)

  if action == "start":
    # Remove the pid file left behind by a dead process.
    File(pid_file, action="delete", not_if=process_id_exists_command)
    Execute(daemon_cmd, not_if=process_id_exists_command, environment=hadoop_env_exports)
  elif action == "stop":
    Execute(daemon_cmd, only_if=process_id_exists_command, environment=hadoop_env_exports)
    File(pid_file, action="delete")

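# Illustrative call of service() above (values assumed; mirrors how the HDFS
# component scripts would start a daemon):
#   service(action="start", name="namenode", user=params.hdfs_user,
#           create_pid_dir=True, create_log_dir=True)
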
def get_jmx_data(nn_address, modeler_type, metric, encrypted=False, security_enabled=False):
  """
  :param nn_address: NameNode address, e.g., host:port. ** MAY ** already be preceded by "http://" or "https://".
  If not, the encrypted param determines the scheme.
  :param modeler_type: Modeler type to query, matched using startswith
  :param metric: Metric to return
  :return: Return an object representation of the metric, or None if it does not exist
  """
  if not nn_address or not modeler_type or not metric:
    return None

  nn_address = nn_address.strip()
  if not nn_address.startswith("http"):
    nn_address = ("https://" if encrypted else "http://") + nn_address
  if not nn_address.endswith("/"):
    nn_address = nn_address + "/"

  nn_address = nn_address + "jmx"
  Logger.info("Retrieve modeler: %s, metric: %s from JMX endpoint %s" % (modeler_type, metric, nn_address))

  if security_enabled:
    import params
    data, error_msg, time_millis = curl_krb_request(params.tmp_dir, params.smoke_user_keytab, params.smokeuser_principal, nn_address,
                                                    "jn_upgrade", params.kinit_path_local, False, None, params.smoke_user)
  else:
    data = urllib2.urlopen(nn_address).read()

  my_data = None
  if data:
    data_dict = json.loads(data)
    if data_dict:
      for el in data_dict['beans']:
        if el is not None and el['modelerType'] is not None and el['modelerType'].startswith(modeler_type):
          if metric in el:
            my_data = el[metric]
            if my_data:
              my_data = json.loads(str(my_data))
            break

  return my_data

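# Illustrative call of get_jmx_data() above (host, bean, and metric names are
# hypothetical):
#   get_jmx_data("c6401.ambari.apache.org:8480", "Hadoop:service=JournalNode", "JournalsStatus")
# would fetch http://c6401.ambari.apache.org:8480/jmx and return the parsed
# value of that metric from the first matching bean, or None if it is absent.
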
def get_port(address):
  """
  Extracts the port from an address like 0.0.0.0:1019
  """
  if address is None:
    return None
  m = re.search(r'(?:http(?:s)?://)?([\w\d.]*):(\d{1,5})', address)
  if m is not None and len(m.groups()) >= 2:
    return int(m.group(2))
  else:
    return None

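# For example, get_port("0.0.0.0:1019") returns 1019 and
# get_port("https://nn.example.com:50470") returns 50470; an address without a
# port yields None.
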
def is_secure_port(port):
  """
  Returns True if the port is a privileged (root-only) port on *nix systems.
  """
  if port is not None:
    return port < 1024
  else:
    return False

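# For example, is_secure_port(get_port("0.0.0.0:1019")) is True (a privileged
# secure-DataNode port), while is_secure_port(50010) is False.
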
def is_previous_fs_image():
  """
  Return True if there is a 'previous' directory in any of the HDFS NameNode name directories.
  """
  import params
  if params.dfs_name_dir:
    nn_name_dirs = params.dfs_name_dir.split(',')
    for nn_dir in nn_name_dirs:
      prev_dir = os.path.join(nn_dir, "previous")
      if os.path.isdir(prev_dir):
        return True
  return False

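# Note on is_previous_fs_image() above: a 'previous' directory under a NameNode
# name dir is what HDFS keeps for rollback after an upgrade, so its presence
# indicates an upgrade that has not been finalized yet.
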
def get_hdfs_binary(distro_component_name):
  """
  Get the hdfs binary to use depending on the stack and version.
  :param distro_component_name: e.g., hadoop-hdfs-namenode, hadoop-hdfs-datanode
  :return: The hdfs binary to use
  """
  import params
  hdfs_binary = "hdfs"
  if params.stack_name == "HDP":
    # This was used in HDP 2.1 and earlier
    hdfs_binary = "hdfs"
    if Script.is_hdp_stack_greater_or_equal("2.2"):
      hdfs_binary = "/usr/hdp/current/{0}/bin/hdfs".format(distro_component_name)

  return hdfs_binary

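# For example, get_hdfs_binary("hadoop-hdfs-namenode") returns
# "/usr/hdp/current/hadoop-hdfs-namenode/bin/hdfs" on HDP 2.2 or later, and the
# plain "hdfs" on older or non-HDP stacks.
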
def get_dfsadmin_base_command(hdfs_binary, use_specific_namenode = False):
  """
  Get the dfsadmin base command, constructed from the hdfs_binary path and passing the NameNode address as an explicit -fs argument.
  :param hdfs_binary: path to the hdfs binary to use
  :param use_specific_namenode: if set and NameNode HA is enabled, the dfsadmin command will target
  the current NameNode's RPC address
  :return: the constructed dfsadmin base command
  """
  import params
  dfsadmin_base_command = ""
  if params.dfs_ha_enabled and use_specific_namenode:
    dfsadmin_base_command = format("{hdfs_binary} dfsadmin -fs hdfs://{params.namenode_rpc}")
  else:
    dfsadmin_base_command = format("{hdfs_binary} dfsadmin -fs {params.namenode_address}")
  return dfsadmin_base_command
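
# For example, with HA enabled and use_specific_namenode=True this yields a
# command of the form "hdfs dfsadmin -fs hdfs://<namenode_rpc>"; otherwise it
# yields "hdfs dfsadmin -fs <namenode_address>" (placeholders stand for the
# values resolved from params).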