#!/usr/bin/env python
'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
- import logging.handlers
- import signal
- from optparse import OptionParser
- import sys
- import traceback
- import getpass
- import os
- import time
- import platform
- import ConfigParser
- import ProcessHelper
- from Controller import Controller
- import AmbariConfig
- from NetUtil import NetUtil
- from PingPortListener import PingPortListener
- import hostname
- from DataCleaner import DataCleaner
- import socket
- from ambari_agent import shell
# Root logger used throughout this module.
logger = logging.getLogger()
# Record layout: level, timestamp, source file:line, then the message.
formatstr = "%(levelname)s %(asctime)s %(filename)s:%(lineno)d - %(message)s"
# Pid of this process, captured once at import time.
agentPid = os.getpid()

# Agent-wide configuration object and two constants it exposes.
config = AmbariConfig.AmbariConfig()
configFile = config.CONFIG_FILE
two_way_ssl_property = config.TWO_WAY_SSL_PROPERTY

IS_WINDOWS = (platform.system() == "Windows")

# The signal-handling helpers come from a platform-specific module.
if IS_WINDOWS:
    from HeartbeatHandlers_windows import bind_signal_handlers
else:
    from HeartbeatStopHandler_linux import (
        bind_signal_handlers,
        signal_handler,
        debug,
    )
def setup_logging(verbose):
    """Attach a rotating file handler to the logger and set its level.

    verbose -- truthy selects logging.DEBUG, otherwise logging.INFO.
    """
    # Positional arguments: append mode, 10000000-byte size limit, 25 backups.
    file_handler = logging.handlers.RotatingFileHandler(
        AmbariConfig.AmbariConfig.getLogFile(), "a", 10000000, 25)
    file_handler.setFormatter(logging.Formatter(formatstr))
    logger.addHandler(file_handler)

    if verbose:
        level, announcement = logging.DEBUG, "loglevel=logging.DEBUG"
    else:
        level, announcement = logging.INFO, "loglevel=logging.INFO"

    logging.basicConfig(format=formatstr, level=level,
                        filename=AmbariConfig.AmbariConfig.getLogFile())
    logger.setLevel(level)
    logger.info(announcement)
def update_log_level(config):
    """Re-apply the log level from the 'loglevel' option of the 'agent' section.

    config -- object whose get(section, option) returns the configured value.

    The value 'DEBUG' selects logging.DEBUG; any other non-None value selects
    logging.INFO. A None value changes nothing. Any exception raised while
    reading or applying the option is swallowed and reported with a single
    info-level log line.
    """
    try:
        loglevel = config.get('agent', 'loglevel')
        if loglevel is None:
            return

        if loglevel == 'DEBUG':
            level, announcement = logging.DEBUG, "Newloglevel=logging.DEBUG"
        else:
            level, announcement = logging.INFO, "Newloglevel=logging.INFO"

        logging.basicConfig(format=formatstr, level=level,
                            filename=AmbariConfig.AmbariConfig.getLogFile())
        logger.setLevel(level)
        # Always announce at INFO: a DEBUG record emitted right after the
        # level has been set to INFO would be filtered out and never seen.
        logger.info(announcement)
    except Exception:
        # NOTE(review): the text says DEBUG, but nothing here changes the
        # level; the message is kept unchanged -- confirm the intended wording.
        logger.info("Default loglevel=DEBUG")
# TODO: move this function into AmbariConfig
def resolve_ambari_config():
    """Read the agent configuration file into the module-level ``config``.

    The path comes from AmbariConfig.AmbariConfig.getConfigFile(). If the
    file is missing, or reading it raises, the error is logged as a warning
    and the function returns normally.
    """
    config_path = os.path.abspath(AmbariConfig.AmbariConfig.getConfigFile())
    try:
        if not os.path.exists(config_path):
            raise Exception("No config found at {0}, use default".format(config_path))
        config.read(config_path)
    except Exception as err:
        logger.warn(err)
def perform_prestart_checks(expected_hostname):
    """Exit with status 1 unless the agent's start-up preconditions hold.

    expected_hostname -- hostname the agent is expected to have (obtained
                         from the server during bootstrap), or None to skip
                         the hostname check.

    Checks, in order:
      1. the hostname reported by hostname.hostname(config) equals
         expected_hostname (only when one is given);
      2. the pid file does not already exist (not checked on Windows);
      3. the 'prefix' option of the 'agent' section is configured and names
         an existing directory.
    """
    if expected_hostname is not None:
        current_hostname = hostname.hostname(config)
        if current_hostname != expected_hostname:
            print("Determined hostname does not match expected. Please check agent "
                  "log for details")
            msg = ("Ambari agent machine hostname ({0}) does not match expected ambari "
                   "server hostname ({1}). Aborting registration. Please check hostname, "
                   "hostname -f and /etc/hosts file to confirm your "
                   "hostname is setup correctly").format(current_hostname, expected_hostname)
            logger.error(msg)
            sys.exit(1)

    # Check if there is another instance running
    if os.path.isfile(ProcessHelper.pidfile) and not IS_WINDOWS:
        print("%s already exists, exiting" % ProcessHelper.pidfile)
        sys.exit(1)
    # Check that the configured ambari prefix directory exists
    elif config.has_option('agent', 'prefix') and not os.path.isdir(
            os.path.abspath(config.get('agent', 'prefix'))):
        msg = "Ambari prefix dir %s does not exists, can't continue" \
              % config.get("agent", "prefix")
        logger.error(msg)
        print(msg)
        sys.exit(1)
    elif not config.has_option('agent', 'prefix'):
        # There is no value to interpolate here, so the message carries no
        # placeholder (a stray "%s" used to be printed literally).
        msg = "Ambari prefix dir not configured, can't continue"
        logger.error(msg)
        print(msg)
        sys.exit(1)
def daemonize():
    """Write this process's pid to the agent pid file.

    Daemonization itself is currently done by the /usr/sbin/ambari-agent
    script (nohup); the agent only dumps its own pid to a file. The pid
    directory is created with mode 0755 when it does not exist yet.
    """
    if not os.path.exists(ProcessHelper.piddir):
        os.makedirs(ProcessHelper.piddir, 0o755)
    # The context manager closes the file explicitly, so the pid is flushed
    # to disk without relying on garbage collection of the file object.
    with open(ProcessHelper.pidfile, 'w') as pid_file:
        pid_file.write(str(os.getpid()))
def stop_agent():
    """Stop the agent named by the pid file, then exit the current process.

    Sends SIGTERM to the process group of the recorded pid, waits 5 seconds
    and exits with status 0 if the pid file is gone by then. If the pid file
    still exists, or any step fails after the pid was read, the group is
    sent SIGKILL and the process exits with status 1. If no valid pid could
    be read, "Agent process is not running" is printed and the process exits
    with status 1. This function never returns.
    """
    pid = -1
    try:
        with open(ProcessHelper.pidfile, 'r') as pid_file:
            # Convert before assigning: on unparsable content `pid` must stay
            # -1 so the handler below does not pass raw file text to `kill`.
            pid = int(pid_file.read())
        kill_process_group(pid, signal.SIGTERM)
        time.sleep(5)
        if os.path.exists(ProcessHelper.pidfile):
            raise Exception("PID file still exists.")
        os._exit(0)
    except Exception:
        if pid == -1:
            print("Agent process is not running")
        else:
            kill_process_group(pid, signal.SIGKILL)
        os._exit(1)
-
def kill_process_group(pid, sig):
    """Send signal `sig` to the process group that `pid` belongs to.

    The work is done by a shell command: `ps -o pgid=` looks up the group id
    of `pid`, and `sudo kill` signals that group (negative id).
    """
    runner = shell.shellRunner()
    command = "sudo kill -{0} -$(ps -o pgid= {1} | grep -o [0-9]*)".format(sig, pid)
    runner.run(command)
def reset_agent(options):
    """Re-point the agent at a server host and delete its certificates.

    options -- argument sequence; options[2] is the new server hostname.

    The 'hostname' option of the 'server' section in the agent config file
    is rewritten when it differs from the new host. Everything below the
    directory named by the 'keysdir' option of the 'security' section is
    then removed. The process exits with status 0 on success and 1 on any
    error; this function never returns.
    """
    try:
        # Update the agent config file
        parser = ConfigParser.ConfigParser()
        parser.read(configFile)
        current_host = parser.get('server', 'hostname')
        requested_host = options[2]
        if requested_host is not None and current_host != requested_host:
            print("Updating server host from " + current_host + " to " + requested_host)
            parser.set('server', 'hostname', requested_host)
            with open(configFile, "wb") as config_out:
                parser.write(config_out)

        # Clear the agent certificates
        keys_dir = parser.get('security', 'keysdir')
        print("Removing Agent certificates...")
        # Walk bottom-up so each sub-directory is already empty when removed;
        # keys_dir itself is left in place.
        for parent, subdirs, filenames in os.walk(keys_dir, topdown=False):
            for filename in filenames:
                os.remove(os.path.join(parent, filename))
            for subdir in subdirs:
                os.rmdir(os.path.join(parent, subdir))
    except Exception as err:
        print("A problem occurred while trying to reset the agent: " + str(err))
        os._exit(1)

    os._exit(0)
# heartbeat_stop_callback is the event object handed to NetUtil and Controller
# so that their loops can be interrupted from outside the process. This is
# needed on Windows, where SIGTERM is not available.
def main(heartbeat_stop_callback=None):
    """Agent entry point: parse options, prepare the process, run the controller.

    heartbeat_stop_callback -- object passed through to NetUtil and
                               Controller (see the comment above).

    Steps, in order: parse -v/-e options, set up logging, load the default
    config, bind signal handlers, handle the 'stop' and 'reset' commands,
    read the config file, optionally start the data cleaner, run the
    pre-start checks, write the pid file (non-Windows), start the ping port
    listener, re-apply the log level, wait for the server and run the
    controller until it finishes.
    """
    option_parser = OptionParser()
    option_parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
                             help="verbose log output", default=False)
    option_parser.add_option("-e", "--expected-hostname", dest="expected_hostname",
                             action="store",
                             help="expected hostname of current host. If hostname differs, agent will fail",
                             default=None)
    options, _positional = option_parser.parse_args()
    expected_hostname = options.expected_hostname

    # The result is not used; the call is retained so that behaviour
    # (including any failure it may raise) is unchanged.
    getpass.getuser()

    setup_logging(options.verbose)

    config.load({'agent': {'prefix': '/home/ambari'}})

    bind_signal_handlers(agentPid)

    argv = sys.argv
    if len(argv) > 1 and argv[1] == 'stop':
        stop_agent()

    if len(argv) > 2 and argv[1] == 'reset':
        reset_agent(argv)

    # Check for ambari configuration file.
    resolve_ambari_config()

    # Start the data cleanup daemon when a positive interval is configured
    data_cleaner = None
    if config.has_option('agent', 'data_cleanup_interval'):
        if int(config.get('agent', 'data_cleanup_interval')) > 0:
            data_cleaner = DataCleaner(config)
            data_cleaner.start()

    perform_prestart_checks(expected_hostname)

    if not IS_WINDOWS:
        daemonize()

    # Start the ping port listener; failure to create it is fatal
    try:
        ping_port_listener = PingPortListener(config)
    except Exception as ex:
        err_message = "Failed to start ping port listener of: " + str(ex)
        logger.error(err_message)
        sys.stderr.write(err_message)
        sys.exit(1)
    ping_port_listener.start()

    update_log_level(config)

    server_hostname = config.get('server', 'hostname')
    server_url = config.get_api_url()

    # The IP lookup is informational only; a failure is logged, not raised
    try:
        server_ip = socket.gethostbyname(server_hostname)
        logger.info('Connecting to Ambari server at %s (%s)', server_url, server_ip)
    except socket.error:
        logger.warn("Unable to determine the IP address of the Ambari server '%s'", server_hostname)

    # Wait until the server is reachable
    net_util = NetUtil(heartbeat_stop_callback)
    _retries, connected = net_util.try_to_connect(server_url, -1, logger)
    # Per the original comment, a false `connected` means the agent was
    # stopped through the stop event while waiting.
    if connected:
        # Launch Controller communication and block until it finishes
        controller = Controller(config, heartbeat_stop_callback)
        controller.start()
        controller.join()
    # NOTE(review): the source's indentation was lost; this call is placed at
    # function level (after the `if connected` block) -- confirm.
    if not IS_WINDOWS:
        stop_agent()
    logger.info("finished")
if __name__ == "__main__":
    # Bind the platform signal handlers first; the value they return is the
    # stop callback handed to main().
    main(bind_signal_handlers(agentPid))
|