- """
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- """
- import os
- import re
- import urllib2
- import ambari_simplejson as json # simplejson is much faster than the Python 2.6 json module and has the same function set.
- from resource_management.core.resources.system import Directory, File, Execute
- from resource_management.libraries.functions.format import format
- from resource_management.libraries.functions import check_process_status
- from resource_management.libraries.functions.version import compare_versions
- from resource_management.core import shell
- from resource_management.core.shell import as_user, as_sudo
- from resource_management.core.exceptions import ComponentIsNotRunning
- from resource_management.core.logger import Logger
- from resource_management.libraries.functions.curl_krb_request import curl_krb_request
- from zkfc_slave import ZkfcSlave
- def safe_zkfc_op(action, env):
-   """
-   Idempotent operation on the zkfc process to either start or stop it.
-   :param action: start or stop
-   :param env: environment
-   """
-   Logger.info("Performing action {0} on zkfc.".format(action))
-   zkfc = None
-   if action == "start":
-     try:
-       zkfc = ZkfcSlave()
-       zkfc.status(env)
-     except ComponentIsNotRunning:
-       if zkfc:
-         zkfc.start(env)
-   if action == "stop":
-     try:
-       zkfc = ZkfcSlave()
-       zkfc.status(env)
-     except ComponentIsNotRunning:
-       pass
-     else:
-       if zkfc:
-         zkfc.stop(env)
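- # Illustrative behavior: safe_zkfc_op("stop", env) becomes a no-op when the status
- # check raises ComponentIsNotRunning, and safe_zkfc_op("start", env) only starts
- # ZKFC when that same check fails; that is what makes both calls idempotent.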
- def failover_namenode():
-   """
-   Failover the primary namenode by killing zkfc if it exists on this host (assuming this host is the primary).
-   """
-   import params
-   check_service_cmd = format("hdfs haadmin -getServiceState {namenode_id}")
-   code, out = shell.call(check_service_cmd, logoutput=True, user=params.hdfs_user)
-   state = "unknown"
-   if code == 0 and out:
-     state = "active" if "active" in out else ("standby" if "standby" in out else state)
-     Logger.info("Namenode service state: %s" % state)
-   if state == "active":
-     Logger.info("Rolling Upgrade - Initiating namenode failover by killing zkfc on active namenode")
-     # Forcefully kill ZKFC on this host to initiate a failover.
-     # If ZKFC is already dead, then potentially this node can still be the active one.
-     was_zkfc_killed = kill_zkfc(params.hdfs_user)
-     # Wait until it transitions to standby
-     check_standby_cmd = format("hdfs haadmin -getServiceState {namenode_id} | grep standby")
-     # process may already be down. try one time, then proceed
-     code, out = shell.call(check_standby_cmd, user=params.hdfs_user, logoutput=True)
-     Logger.info(format("Rolling Upgrade - check for standby returned {code}"))
-     if code == 255 and out:
-       Logger.info("Rolling Upgrade - namenode is already down.")
-     else:
-       if was_zkfc_killed:
-         # Only mandate that this be the standby namenode if ZKFC was indeed killed to initiate a failover.
-         Logger.info("Waiting for this NameNode to become the standby one.")
-         Execute(check_standby_cmd,
-                 user=params.hdfs_user,
-                 tries=50,
-                 try_sleep=6,
-                 logoutput=True)
-   else:
-     Logger.info("Rolling Upgrade - Host %s is already the standby namenode." % str(params.hostname))
- def kill_zkfc(zkfc_user):
-   """
-   There are two potential methods for failing over the namenode, especially during a Rolling Upgrade.
-   Option 1. Kill zkfc on the primary namenode provided that the secondary is up and has zkfc running on it.
-   Option 2. Silent failover (not supported as of HDP 2.2.0.0)
-   :param zkfc_user: User that started the ZKFC process.
-   :return: True if ZKFC was killed, otherwise False.
-   """
-   import params
-   if params.dfs_ha_enabled:
-     zkfc_pid_file = get_service_pid_file("zkfc", zkfc_user)
-     if zkfc_pid_file:
-       check_process = as_user(format("ls {zkfc_pid_file} > /dev/null 2>&1 && ps -p `cat {zkfc_pid_file}` > /dev/null 2>&1"), user=zkfc_user)
-       code, out = shell.call(check_process)
-       if code == 0:
-         Logger.debug("ZKFC is running and will be killed to initiate namenode failover.")
-         kill_command = format("kill -9 `cat {zkfc_pid_file}`")
-         Execute(kill_command,
-                 user=zkfc_user
-         )
-         File(zkfc_pid_file,
-              action="delete",
-         )
-         return True
-   return False
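- # Killing ZKFC (rather than the NameNode itself) lets its ZooKeeper session expire, so
- # the ZKFC on the other NameNode wins the lock election and promotes its NameNode to
- # active; the check above only kills ZKFC when its pid file points at a live process.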
- def get_service_pid_file(name, user):
-   """
-   Get the pid file path that was used to start the service by the user.
-   :param name: Service name
-   :param user: User that started the service.
-   :return: PID file path
-   """
-   import params
-   pid_dir = format("{hadoop_pid_dir_prefix}/{user}")
-   pid_file = format("{pid_dir}/hadoop-{user}-{name}.pid")
-   return pid_file
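- # Example: assuming hadoop_pid_dir_prefix is "/var/run/hadoop" (an illustrative value),
- # get_service_pid_file("zkfc", "hdfs") resolves to
- #   /var/run/hadoop/hdfs/hadoop-hdfs-zkfc.pid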
- def service(action=None, name=None, user=None, options="", create_pid_dir=False,
-             create_log_dir=False):
-   """
-   :param action: Either "start" or "stop"
-   :param name: Component name, e.g., "namenode", "datanode", "secondarynamenode", "zkfc"
-   :param user: User to run the command as
-   :param options: Additional options to pass to the command as a string
-   :param create_pid_dir: Create PID directory
-   :param create_log_dir: Create log file directory
-   """
-   import params
-   options = options if options else ""
-   pid_dir = format("{hadoop_pid_dir_prefix}/{user}")
-   pid_file = format("{pid_dir}/hadoop-{user}-{name}.pid")
-   hadoop_env_exports = {
-     'HADOOP_LIBEXEC_DIR': params.hadoop_libexec_dir
-   }
-   log_dir = format("{hdfs_log_dir_prefix}/{user}")
-   # NFS GATEWAY is always started by root using jsvc due to rpcbind bugs
-   # on Linux such as CentOS6.2. https://bugzilla.redhat.com/show_bug.cgi?id=731542
-   if name == "nfs3":
-     pid_file = format("{pid_dir}/hadoop_privileged_nfs3.pid")
-     custom_export = {
-       'HADOOP_PRIVILEGED_NFS_USER': params.hdfs_user,
-       'HADOOP_PRIVILEGED_NFS_PID_DIR': pid_dir,
-       'HADOOP_PRIVILEGED_NFS_LOG_DIR': log_dir
-     }
-     hadoop_env_exports.update(custom_export)
-   check_process = as_user(format(
-     "ls {pid_file} >/dev/null 2>&1 &&"
-     " ps -p `cat {pid_file}` >/dev/null 2>&1"), user=params.hdfs_user)
-   # on STOP directories shouldn't be created
-   # since during stop still old dirs are used (which were created during previous start)
-   if action != "stop":
-     if name == "nfs3":
-       Directory(params.hadoop_pid_dir_prefix,
-                 mode=0755,
-                 owner=params.root_user,
-                 group=params.root_group
-       )
-     else:
-       Directory(params.hadoop_pid_dir_prefix,
-                 mode=0755,
-                 owner=params.hdfs_user,
-                 group=params.user_group
-       )
-     if create_pid_dir:
-       Directory(pid_dir,
-                 owner=user,
-                 recursive=True)
-     if create_log_dir:
-       if name == "nfs3":
-         Directory(log_dir,
-                   mode=0775,
-                   owner=params.root_user,
-                   group=params.user_group)
-       else:
-         Directory(log_dir,
-                   owner=user,
-                   recursive=True)
-   if params.security_enabled and name == "datanode":
-     ## The directory where pid files are stored in the secure data environment.
-     hadoop_secure_dn_pid_dir = format("{hadoop_pid_dir_prefix}/{hdfs_user}")
-     hadoop_secure_dn_pid_file = format("{hadoop_secure_dn_pid_dir}/hadoop_secure_dn.pid")
-     # At Champlain stack and further, we may start datanode as a non-root even in secure cluster
-     if not (params.hdp_stack_version != "" and compare_versions(params.hdp_stack_version, '2.2') >= 0) or params.secure_dn_ports_are_in_use:
-       user = "root"
-       pid_file = format(
-         "{hadoop_pid_dir_prefix}/{hdfs_user}/hadoop-{hdfs_user}-{name}.pid")
-     if action == 'stop' and (params.hdp_stack_version != "" and compare_versions(params.hdp_stack_version, '2.2') >= 0) and \
-        os.path.isfile(hadoop_secure_dn_pid_file):
-       # We need special handling for this case to handle the situation
-       # when we configure a non-root secure DN and then restart it
-       # to pick up new configs. Otherwise we will not be able to stop
-       # a running instance.
-       user = "root"
-       try:
-         check_process_status(hadoop_secure_dn_pid_file)
-         custom_export = {
-           'HADOOP_SECURE_DN_USER': params.hdfs_user
-         }
-         hadoop_env_exports.update(custom_export)
-       except ComponentIsNotRunning:
-         pass
-   hadoop_daemon = format("{hadoop_bin}/hadoop-daemon.sh")
-   if user == "root":
-     cmd = [hadoop_daemon, "--config", params.hadoop_conf_dir, action, name]
-     if options:
-       cmd += [options, ]
-     daemon_cmd = as_sudo(cmd)
-   else:
-     cmd = format("{ulimit_cmd} {hadoop_daemon} --config {hadoop_conf_dir} {action} {name}")
-     if options:
-       cmd += " " + options
-     daemon_cmd = as_user(cmd, user)
-   service_is_up = check_process if action == "start" else None
-   # remove pid file from dead process
-   File(pid_file,
-        action="delete",
-        not_if=check_process
-   )
-   Execute(daemon_cmd,
-           not_if=service_is_up,
-           environment=hadoop_env_exports
-   )
-   if action == "stop":
-     File(pid_file,
-          action="delete",
-     )
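- # Illustrative call, assuming a typical NameNode start path (the parameter values are
- # examples; the real call sites live in the component scripts):
- #   service(action="start", name="namenode", user=params.hdfs_user,
- #           create_pid_dir=True, create_log_dir=True)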
- def get_jmx_data(nn_address, modeler_type, metric, encrypted=False, security_enabled=False):
-   """
-   :param nn_address: Namenode Address, e.g., host:port, ** MAY ** be preceded with "http://" or "https://" already.
-                      If not preceded, will use the encrypted param to determine.
-   :param modeler_type: Modeler type to query using startswith function
-   :param metric: Metric to return
-   :return: Return an object representation of the metric, or None if it does not exist
-   """
-   if not nn_address or not modeler_type or not metric:
-     return None
-   nn_address = nn_address.strip()
-   if not nn_address.startswith("http"):
-     nn_address = ("https://" if encrypted else "http://") + nn_address
-   if not nn_address.endswith("/"):
-     nn_address = nn_address + "/"
-   nn_address = nn_address + "jmx"
-   Logger.info("Retrieve modeler: %s, metric: %s from JMX endpoint %s" % (modeler_type, metric, nn_address))
-   if security_enabled:
-     import params
-     data, error_msg, time_millis = curl_krb_request(params.tmp_dir, params.smoke_user_keytab, params.smokeuser_principal, nn_address,
-                                                     "jn_upgrade", params.kinit_path_local, False, None, params.smoke_user)
-   else:
-     data = urllib2.urlopen(nn_address).read()
-   my_data = None
-   if data:
-     data_dict = json.loads(data)
-     if data_dict:
-       for el in data_dict['beans']:
-         if el is not None and el['modelerType'] is not None and el['modelerType'].startswith(modeler_type):
-           if metric in el:
-             my_data = el[metric]
-             if my_data:
-               my_data = json.loads(str(my_data))
-               break
-   return my_data
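- # For illustration, a caller might read a journal-related bean from the NameNode JMX
- # endpoint; the host, bean, and metric names below are examples, not the only valid ones:
- #   txn_info = get_jmx_data("c6401.ambari.apache.org:50070",
- #                           "Hadoop:service=NameNode,name=JournalTransactionInfo",
- #                           "JournalTransactionInfo")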
- def get_port(address):
-   """
-   Extracts the port from an address like 0.0.0.0:1019
-   """
-   if address is None:
-     return None
-   m = re.search(r'(?:http(?:s)?://)?([\w\d.]*):(\d{1,5})', address)
-   if m is not None and len(m.groups()) >= 2:
-     return int(m.group(2))
-   else:
-     return None
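- # e.g. get_port("0.0.0.0:1019") -> 1019, get_port("https://nn.example.com:50470/") -> 50470,
- # and get_port("localhost") -> None because no ":port" suffix is present.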
- def is_secure_port(port):
-   """
-   Returns True if the port is root-owned on *nix systems
-   """
-   if port is not None:
-     return port < 1024
-   else:
-     return False
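- # e.g. is_secure_port(1019) -> True (privileged, requires root to bind),
- # is_secure_port(50010) -> False, is_secure_port(None) -> False.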