123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430 |
- #!/usr/bin/env python
- '''
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- '''
- import logging
- import signal
- import json
- import sys
- import platform
- import os
- import socket
- import time
- import threading
- import urllib2
- import pprint
- from random import randint
- import hostname
- import security
- import ssl
- import AmbariConfig
- from Heartbeat import Heartbeat
- from Register import Register
- from ActionQueue import ActionQueue
- from FileCache import FileCache
- from NetUtil import NetUtil
- from LiveStatus import LiveStatus
- from AlertSchedulerHandler import AlertSchedulerHandler
- from HeartbeatHandlers import HeartbeatStopHandlers, bind_signal_handlers
- logger = logging.getLogger()
- AGENT_AUTO_RESTART_EXIT_CODE = 77
- class Controller(threading.Thread):
- def __init__(self, config, heartbeat_stop_callback = None, range=30):
- threading.Thread.__init__(self)
- logger.debug('Initializing Controller RPC thread.')
- if heartbeat_stop_callback is None:
- heartbeat_stop_callback = HeartbeatStopHandlers()
- self.lock = threading.Lock()
- self.safeMode = True
- self.credential = None
- self.config = config
- self.hostname = hostname.hostname(config)
- self.serverHostname = config.get('server', 'hostname')
- server_secured_url = 'https://' + self.serverHostname + \
- ':' + config.get('server', 'secured_url_port')
- self.registerUrl = server_secured_url + '/agent/v1/register/' + self.hostname
- self.heartbeatUrl = server_secured_url + '/agent/v1/heartbeat/' + self.hostname
- self.componentsUrl = server_secured_url + '/agent/v1/components/'
- self.netutil = NetUtil(heartbeat_stop_callback)
- self.responseId = -1
- self.repeatRegistration = False
- self.isRegistered = False
- self.cachedconnect = None
- self.range = range
- self.hasMappedComponents = True
- # Event is used for synchronizing heartbeat iterations (to make possible
- # manual wait() interruption between heartbeats )
- self.heartbeat_stop_callback = heartbeat_stop_callback
- # List of callbacks that are called at agent registration
- self.registration_listeners = []
- # pull config directory out of config
- cache_dir = config.get('agent', 'cache_dir')
- if cache_dir is None:
- cache_dir = '/var/lib/ambari-agent/cache'
- stacks_cache_dir = os.path.join(cache_dir, FileCache.STACKS_CACHE_DIRECTORY)
- common_services_cache_dir = os.path.join(cache_dir, FileCache.COMMON_SERVICES_DIRECTORY)
- host_scripts_cache_dir = os.path.join(cache_dir, FileCache.HOST_SCRIPTS_CACHE_DIRECTORY)
- alerts_cache_dir = os.path.join(cache_dir, 'alerts')
-
- self.alert_scheduler_handler = AlertSchedulerHandler(alerts_cache_dir,
- stacks_cache_dir, common_services_cache_dir, host_scripts_cache_dir, config)
- def __del__(self):
- logger.info("Server connection disconnected.")
- pass
- def registerWithServer(self):
- """
- :return: returning from current method without setting self.isRegistered
- to True will lead to agent termination.
- """
- LiveStatus.SERVICES = []
- LiveStatus.CLIENT_COMPONENTS = []
- LiveStatus.COMPONENTS = []
- ret = {}
- while not self.isRegistered:
- try:
- data = json.dumps(self.register.build())
- prettyData = pprint.pformat(data)
- try:
- server_ip = socket.gethostbyname(self.hostname)
- logger.info("Registering with %s (%s) (agent=%s)", self.hostname, server_ip, prettyData)
- except socket.error:
- logger.warn("Unable to determine the IP address of '%s', agent registration may fail (agent=%s)",
- self.hostname, prettyData)
- ret = self.sendRequest(self.registerUrl, data)
- # exitstatus is a code of error which was rised on server side.
- # exitstatus = 0 (OK - Default)
- # exitstatus = 1 (Registration failed because different version of agent and server)
- exitstatus = 0
- if 'exitstatus' in ret.keys():
- exitstatus = int(ret['exitstatus'])
- if exitstatus == 1:
- # log - message, which will be printed to agents log
- if 'log' in ret.keys():
- log = ret['log']
- logger.error(log)
- self.isRegistered = False
- self.repeatRegistration = False
- return ret
- logger.info("Registration Successful (response=%s)", pprint.pformat(ret))
- self.responseId = int(ret['responseId'])
- self.isRegistered = True
- if 'statusCommands' in ret.keys():
- logger.info("Got status commands on registration " + pprint.pformat(ret['statusCommands']))
- self.addToStatusQueue(ret['statusCommands'])
- pass
- else:
- self.hasMappedComponents = False
- if 'alertDefinitionCommands' in ret.keys():
- logger.info("Got alert definition update on registration " + pprint.pformat(ret['alertDefinitionCommands']))
- self.alert_scheduler_handler.update_definitions(ret['alertDefinitionCommands'])
- pass
- pass
- except ssl.SSLError:
- self.repeatRegistration = False
- self.isRegistered = False
- return
- except Exception:
- # try a reconnect only after a certain amount of random time
- delay = randint(0, self.range)
- logger.error("Unable to connect to: " + self.registerUrl, exc_info=True)
- """ Sleeping for {0} seconds and then retrying again """.format(delay)
- time.sleep(delay)
- pass
- return ret
- def cancelCommandInQueue(self, commands):
- """ Remove from the queue commands, kill the process if it's in progress """
- if commands:
- try:
- self.actionQueue.cancel(commands)
- except Exception, err:
- logger.error("Exception occurred on commands cancel: %s", err.message)
- pass
- pass
- def addToQueue(self, commands):
- """Add to the queue for running the commands """
- """ Put the required actions into the Queue """
- """ Verify if the action is to reboot or not """
- if not commands:
- logger.debug("No commands received from %s", self.serverHostname)
- else:
- """Only add to the queue if not empty list """
- self.actionQueue.put(commands)
- pass
- def addToStatusQueue(self, commands):
- if not commands:
- logger.debug("No status commands received from %s", self.serverHostname)
- else:
- if not LiveStatus.SERVICES:
- self.updateComponents(commands[0]['clusterName'])
- self.actionQueue.put_status(commands)
- pass
- # For testing purposes
- DEBUG_HEARTBEAT_RETRIES = 0
- DEBUG_SUCCESSFULL_HEARTBEATS = 0
- DEBUG_STOP_HEARTBEATING = False
- def trigger_heartbeat(self):
- self.heartbeat_stop_callback.set_heartbeat()
- def heartbeatWithServer(self):
- self.DEBUG_HEARTBEAT_RETRIES = 0
- self.DEBUG_SUCCESSFULL_HEARTBEATS = 0
- retry = False
- certVerifFailed = False
- hb_interval = self.config.get('heartbeat', 'state_interval')
- while not self.DEBUG_STOP_HEARTBEATING:
- try:
- if not retry:
- data = json.dumps(
- self.heartbeat.build(self.responseId, int(hb_interval), self.hasMappedComponents))
- pass
- else:
- self.DEBUG_HEARTBEAT_RETRIES += 1
- if logger.isEnabledFor(logging.DEBUG):
- logger.debug("Sending Heartbeat (id = %s): %s", self.responseId, data)
- response = self.sendRequest(self.heartbeatUrl, data)
- exitStatus = 0
- if 'exitstatus' in response.keys():
- exitStatus = int(response['exitstatus'])
- if exitStatus != 0:
- raise Exception(response)
- serverId = int(response['responseId'])
- if logger.isEnabledFor(logging.DEBUG):
- logger.debug('Heartbeat response (id = %s): %s', serverId, pprint.pformat(response))
- else:
- logger.info('Heartbeat response received (id = %s)', serverId)
- if 'hasMappedComponents' in response.keys():
- self.hasMappedComponents = response['hasMappedComponents'] is not False
- if 'registrationCommand' in response.keys():
- # check if the registration command is None. If none skip
- if response['registrationCommand'] is not None:
- logger.info("RegistrationCommand received - repeat agent registration")
- self.isRegistered = False
- self.repeatRegistration = True
- return
- if serverId != self.responseId + 1:
- logger.error("Error in responseId sequence - restarting")
- self.restartAgent()
- else:
- self.responseId = serverId
- response_keys = response.keys()
- if 'cancelCommands' in response_keys:
- self.cancelCommandInQueue(response['cancelCommands'])
- if 'executionCommands' in response_keys:
- execution_commands = response['executionCommands']
- self.addToQueue(execution_commands)
- self.alert_scheduler_handler.update_configurations(execution_commands)
- if 'statusCommands' in response_keys:
- self.addToStatusQueue(response['statusCommands'])
- if 'alertDefinitionCommands' in response_keys:
- self.alert_scheduler_handler.update_definitions(response['alertDefinitionCommands'], True)
- if 'alertExecutionCommands' in response_keys:
- self.alert_scheduler_handler.execute_alert(response['alertExecutionCommands'])
- if "true" == response['restartAgent']:
- logger.error("Received the restartAgent command")
- self.restartAgent()
- else:
- logger.info("No commands sent from %s", self.serverHostname)
- if retry:
- logger.info("Reconnected to %s", self.heartbeatUrl)
- retry = False
- certVerifFailed = False
- self.DEBUG_SUCCESSFULL_HEARTBEATS += 1
- self.DEBUG_HEARTBEAT_RETRIES = 0
- self.heartbeat_stop_callback.reset_heartbeat()
- except ssl.SSLError:
- self.repeatRegistration=False
- self.isRegistered = False
- return
- except Exception, err:
- if "code" in err:
- logger.error(err.code)
- else:
- logException = False
- if logger.isEnabledFor(logging.DEBUG):
- logException = True
- exceptionMessage = str(err)
- errorMessage = "Unable to reconnect to {0} (attempts={1}, details={2})".format(self.heartbeatUrl, self.DEBUG_HEARTBEAT_RETRIES, exceptionMessage)
- if not retry:
- errorMessage = "Connection to {0} was lost (details={1})".format(self.serverHostname, exceptionMessage)
- logger.error(errorMessage, exc_info=logException)
- if 'certificate verify failed' in str(err) and not certVerifFailed:
- logger.warn("Server certificate verify failed. Did you regenerate server certificate?")
- certVerifFailed = True
- self.cachedconnect = None # Previous connection is broken now
- retry = True
- #randomize the heartbeat
- delay = randint(0, self.range)
- time.sleep(delay)
- # Sleep for some time
- timeout = self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC \
- - self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS
- if 0 == self.heartbeat_stop_callback.wait(timeout, self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS):
- # Stop loop when stop event received
- logger.info("Stop event received")
- self.DEBUG_STOP_HEARTBEATING=True
- pass
- def run(self):
- self.actionQueue = ActionQueue(self.config, controller=self)
- self.actionQueue.start()
- self.register = Register(self.config)
- self.heartbeat = Heartbeat(self.actionQueue, self.config, self.alert_scheduler_handler.collector())
- opener = urllib2.build_opener()
- urllib2.install_opener(opener)
- while True:
- self.repeatRegistration = False
- self.registerAndHeartbeat()
- if not self.repeatRegistration:
- break
- pass
- def registerAndHeartbeat(self):
- registerResponse = self.registerWithServer()
- message = registerResponse['response']
- logger.info("Registration response from %s was %s", self.serverHostname, message)
- self.alert_scheduler_handler.start()
- if self.isRegistered:
- # Clearing command queue to stop executing "stale" commands
- # after registration
- logger.info('Resetting ActionQueue...')
- self.actionQueue.reset()
- # Process callbacks
- for callback in self.registration_listeners:
- callback()
- time.sleep(self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC)
- self.heartbeatWithServer()
- def restartAgent(self):
- os._exit(AGENT_AUTO_RESTART_EXIT_CODE)
- pass
- def sendRequest(self, url, data):
- response = None
- try:
- if self.cachedconnect is None: # Lazy initialization
- self.cachedconnect = security.CachedHTTPSConnection(self.config)
- req = urllib2.Request(url, data, {'Content-Type': 'application/json'})
- response = self.cachedconnect.request(req)
- return json.loads(response)
- except Exception, exception:
- if response is None:
- raise IOError('Request to {0} failed due to {1}'.format(url, str(exception)))
- else:
- raise IOError('Response parsing failed! Request data: ' + str(data)
- + '; Response: ' + str(response))
- def updateComponents(self, cluster_name):
- logger.info("Updating components map of cluster " + cluster_name)
- # May throw IOError on server connection error
- response = self.sendRequest(self.componentsUrl + cluster_name, None)
- logger.debug("Response from %s was %s", self.serverHostname, str(response))
- for service, components in response['components'].items():
- LiveStatus.SERVICES.append(service)
- for component, category in components.items():
- if category == 'CLIENT':
- LiveStatus.CLIENT_COMPONENTS.append({"serviceName": service, "componentName": component})
- else:
- LiveStatus.COMPONENTS.append({"serviceName": service, "componentName": component})
- logger.info("Components map updated")
- logger.debug("LiveStatus.SERVICES" + str(LiveStatus.SERVICES))
- logger.debug("LiveStatus.CLIENT_COMPONENTS" + str(LiveStatus.CLIENT_COMPONENTS))
- logger.debug("LiveStatus.COMPONENTS" + str(LiveStatus.COMPONENTS))
- pass
- def main(argv=None):
- # Allow Ctrl-C
- logger.setLevel(logging.INFO)
- formatter = logging.Formatter("%(asctime)s %(filename)s:%(lineno)d - \
- %(message)s")
- stream_handler = logging.StreamHandler()
- stream_handler.setFormatter(formatter)
- logger.addHandler(stream_handler)
- logger.info('Starting Server RPC Thread: %s' % ' '.join(sys.argv))
- config = AmbariConfig.config
- heartbeat_stop_callback = bind_signal_handlers()
- collector = Controller(config, heartbeat_stop_callback)
- collector.start()
- collector.run()
- if __name__ == '__main__':
- main()
|