Controller.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. #!/usr/bin/env python
  2. '''
  3. Licensed to the Apache Software Foundation (ASF) under one
  4. or more contributor license agreements. See the NOTICE file
  5. distributed with this work for additional information
  6. regarding copyright ownership. The ASF licenses this file
  7. to you under the Apache License, Version 2.0 (the
  8. "License"); you may not use this file except in compliance
  9. with the License. You may obtain a copy of the License at
  10. http://www.apache.org/licenses/LICENSE-2.0
  11. Unless required by applicable law or agreed to in writing, software
  12. distributed under the License is distributed on an "AS IS" BASIS,
  13. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. See the License for the specific language governing permissions and
  15. limitations under the License.
  16. '''
  17. import logging
  18. import json
  19. import sys
  20. import os
  21. import socket
  22. import time
  23. import threading
  24. import urllib2
  25. import pprint
  26. from random import randint
  27. import hostname
  28. import security
  29. import ssl
  30. import AmbariConfig
  31. from ambari_agent.Heartbeat import Heartbeat
  32. from ambari_agent.Register import Register
  33. from ambari_agent.ActionQueue import ActionQueue
  34. from ambari_agent.FileCache import FileCache
  35. from ambari_agent.NetUtil import NetUtil
  36. from ambari_agent.LiveStatus import LiveStatus
  37. from ambari_agent.AlertSchedulerHandler import AlertSchedulerHandler
  38. from ambari_agent.ClusterConfiguration import ClusterConfiguration
  39. from ambari_agent.RecoveryManager import RecoveryManager
  40. from ambari_agent.HeartbeatHandlers import HeartbeatStopHandlers, bind_signal_handlers
  41. logger = logging.getLogger(__name__)
  42. AGENT_AUTO_RESTART_EXIT_CODE = 77
  43. class Controller(threading.Thread):
  44. def __init__(self, config, heartbeat_stop_callback = None, range=30):
  45. threading.Thread.__init__(self)
  46. logger.debug('Initializing Controller RPC thread.')
  47. if heartbeat_stop_callback is None:
  48. heartbeat_stop_callback = HeartbeatStopHandlers()
  49. self.lock = threading.Lock()
  50. self.safeMode = True
  51. self.credential = None
  52. self.config = config
  53. self.hostname = hostname.hostname(config)
  54. self.serverHostname = hostname.server_hostname(config)
  55. server_secured_url = 'https://' + self.serverHostname + \
  56. ':' + config.get('server', 'secured_url_port')
  57. self.registerUrl = server_secured_url + '/agent/v1/register/' + self.hostname
  58. self.heartbeatUrl = server_secured_url + '/agent/v1/heartbeat/' + self.hostname
  59. self.componentsUrl = server_secured_url + '/agent/v1/components/'
  60. self.netutil = NetUtil(heartbeat_stop_callback)
  61. self.responseId = -1
  62. self.repeatRegistration = False
  63. self.isRegistered = False
  64. self.cachedconnect = None
  65. self.range = range
  66. self.hasMappedComponents = True
  67. # Event is used for synchronizing heartbeat iterations (to make possible
  68. # manual wait() interruption between heartbeats )
  69. self.heartbeat_stop_callback = heartbeat_stop_callback
  70. # List of callbacks that are called at agent registration
  71. self.registration_listeners = []
  72. self.recovery_manager = RecoveryManager()
  73. # pull config directory out of config
  74. cache_dir = config.get('agent', 'cache_dir')
  75. if cache_dir is None:
  76. cache_dir = '/var/lib/ambari-agent/cache'
  77. stacks_cache_dir = os.path.join(cache_dir, FileCache.STACKS_CACHE_DIRECTORY)
  78. common_services_cache_dir = os.path.join(cache_dir, FileCache.COMMON_SERVICES_DIRECTORY)
  79. host_scripts_cache_dir = os.path.join(cache_dir, FileCache.HOST_SCRIPTS_CACHE_DIRECTORY)
  80. alerts_cache_dir = os.path.join(cache_dir, 'alerts')
  81. cluster_config_cache_dir = os.path.join(cache_dir, 'cluster_configuration')
  82. self.cluster_configuration = ClusterConfiguration(cluster_config_cache_dir)
  83. self.alert_scheduler_handler = AlertSchedulerHandler(alerts_cache_dir,
  84. stacks_cache_dir, common_services_cache_dir, host_scripts_cache_dir,
  85. self.cluster_configuration, config)
  86. self.alert_scheduler_handler.start()
  87. def __del__(self):
  88. logger.info("Server connection disconnected.")
  89. def registerWithServer(self):
  90. """
  91. :return: returning from current method without setting self.isRegistered
  92. to True will lead to agent termination.
  93. """
  94. LiveStatus.SERVICES = []
  95. LiveStatus.CLIENT_COMPONENTS = []
  96. LiveStatus.COMPONENTS = []
  97. ret = {}
  98. while not self.isRegistered:
  99. try:
  100. data = json.dumps(self.register.build())
  101. prettyData = pprint.pformat(data)
  102. try:
  103. server_ip = socket.gethostbyname(self.hostname)
  104. logger.info("Registering with %s (%s) (agent=%s)", self.hostname, server_ip, prettyData)
  105. except socket.error:
  106. logger.warn("Unable to determine the IP address of '%s', agent registration may fail (agent=%s)",
  107. self.hostname, prettyData)
  108. ret = self.sendRequest(self.registerUrl, data)
  109. prettyData = pprint.pformat(ret)
  110. logger.debug("Registration response is %s", prettyData)
  111. # exitstatus is a code of error which was raised on server side.
  112. # exitstatus = 0 (OK - Default)
  113. # exitstatus = 1 (Registration failed because different version of agent and server)
  114. exitstatus = 0
  115. if 'exitstatus' in ret.keys():
  116. exitstatus = int(ret['exitstatus'])
  117. if exitstatus == 1:
  118. # log - message, which will be printed to agents log
  119. if 'log' in ret.keys():
  120. log = ret['log']
  121. logger.error(log)
  122. self.isRegistered = False
  123. self.repeatRegistration = False
  124. return ret
  125. self.responseId = int(ret['responseId'])
  126. logger.info("Registration Successful (response id = %s)", self.responseId)
  127. self.isRegistered = True
  128. if 'statusCommands' in ret.keys():
  129. logger.debug("Got status commands on registration.")
  130. self.addToStatusQueue(ret['statusCommands'])
  131. else:
  132. self.hasMappedComponents = False
  133. # always update cached cluster configurations on registration
  134. self.cluster_configuration.update_configurations_from_heartbeat(ret)
  135. self.recovery_manager.update_configuration_from_registration(ret)
  136. self.config.update_configuration_from_registration(ret)
  137. logger.debug("Updated config:" + str(self.config))
  138. # always update alert definitions on registration
  139. self.alert_scheduler_handler.update_definitions(ret)
  140. except ssl.SSLError:
  141. self.repeatRegistration = False
  142. self.isRegistered = False
  143. return
  144. except Exception, ex:
  145. # try a reconnect only after a certain amount of random time
  146. delay = randint(0, self.range)
  147. logger.error("Unable to connect to: " + self.registerUrl, exc_info=True)
  148. logger.error("Error:" + str(ex))
  149. logger.warn(""" Sleeping for {0} seconds and then trying again """.format(delay,))
  150. time.sleep(delay)
  151. return ret
  152. def cancelCommandInQueue(self, commands):
  153. """ Remove from the queue commands, kill the process if it's in progress """
  154. if commands:
  155. try:
  156. self.actionQueue.cancel(commands)
  157. except Exception, err:
  158. logger.error("Exception occurred on commands cancel: %s", err.message)
  159. def addToQueue(self, commands):
  160. """Add to the queue for running the commands """
  161. """ Put the required actions into the Queue """
  162. """ Verify if the action is to reboot or not """
  163. if not commands:
  164. logger.debug("No commands received from %s", self.serverHostname)
  165. else:
  166. """Only add to the queue if not empty list """
  167. self.actionQueue.put(commands)
  168. def addToStatusQueue(self, commands):
  169. if not commands:
  170. logger.debug("No status commands received from %s", self.serverHostname)
  171. else:
  172. if not LiveStatus.SERVICES:
  173. self.updateComponents(commands[0]['clusterName'])
  174. self.actionQueue.put_status(commands)
  175. # For testing purposes
  176. DEBUG_HEARTBEAT_RETRIES = 0
  177. DEBUG_SUCCESSFULL_HEARTBEATS = 0
  178. DEBUG_STOP_HEARTBEATING = False
  179. def trigger_heartbeat(self):
  180. self.heartbeat_stop_callback.set_heartbeat()
  181. def heartbeatWithServer(self):
  182. self.DEBUG_HEARTBEAT_RETRIES = 0
  183. self.DEBUG_SUCCESSFULL_HEARTBEATS = 0
  184. retry = False
  185. certVerifFailed = False
  186. hb_interval = self.config.get('heartbeat', 'state_interval')
  187. while not self.DEBUG_STOP_HEARTBEATING:
  188. try:
  189. if not retry:
  190. data = json.dumps(
  191. self.heartbeat.build(self.responseId, int(hb_interval), self.hasMappedComponents))
  192. else:
  193. self.DEBUG_HEARTBEAT_RETRIES += 1
  194. if logger.isEnabledFor(logging.DEBUG):
  195. logger.debug("Sending Heartbeat (id = %s): %s", self.responseId, data)
  196. response = self.sendRequest(self.heartbeatUrl, data)
  197. exitStatus = 0
  198. if 'exitstatus' in response.keys():
  199. exitStatus = int(response['exitstatus'])
  200. if exitStatus != 0:
  201. raise Exception(response)
  202. serverId = int(response['responseId'])
  203. logger.info('Heartbeat response received (id = %s)', serverId)
  204. if 'hasMappedComponents' in response.keys():
  205. self.hasMappedComponents = response['hasMappedComponents'] is not False
  206. if 'hasPendingTasks' in response.keys():
  207. self.recovery_manager.set_paused(response['hasPendingTasks'])
  208. if 'registrationCommand' in response.keys():
  209. # check if the registration command is None. If none skip
  210. if response['registrationCommand'] is not None:
  211. logger.info("RegistrationCommand received - repeat agent registration")
  212. self.isRegistered = False
  213. self.repeatRegistration = True
  214. return
  215. if serverId != self.responseId + 1:
  216. logger.error("Error in responseId sequence - restarting")
  217. self.restartAgent()
  218. else:
  219. self.responseId = serverId
  220. # if the response contains configurations, update the in-memory and
  221. # disk-based configuration cache (execution and alert commands have this)
  222. self.cluster_configuration.update_configurations_from_heartbeat(response)
  223. response_keys = response.keys()
  224. if 'cancelCommands' in response_keys:
  225. self.cancelCommandInQueue(response['cancelCommands'])
  226. if 'executionCommands' in response_keys:
  227. execution_commands = response['executionCommands']
  228. self.recovery_manager.process_execution_commands(execution_commands)
  229. self.addToQueue(execution_commands)
  230. if 'statusCommands' in response_keys:
  231. # try storing execution command details and desired state
  232. self.recovery_manager.process_status_commands(response['statusCommands'])
  233. self.addToStatusQueue(response['statusCommands'])
  234. if not self.actionQueue.tasks_in_progress_or_pending():
  235. recovery_commands = self.recovery_manager.get_recovery_commands()
  236. for recovery_command in recovery_commands:
  237. logger.info("Adding recovery command %s for component %s",
  238. recovery_command['roleCommand'], recovery_command['role'])
  239. self.addToQueue([recovery_command])
  240. if 'alertDefinitionCommands' in response_keys:
  241. self.alert_scheduler_handler.update_definitions(response)
  242. if 'alertExecutionCommands' in response_keys:
  243. self.alert_scheduler_handler.execute_alert(response['alertExecutionCommands'])
  244. if "true" == response['restartAgent']:
  245. logger.error("Received the restartAgent command")
  246. self.restartAgent()
  247. else:
  248. logger.debug("No commands sent from %s", self.serverHostname)
  249. if retry:
  250. logger.info("Reconnected to %s", self.heartbeatUrl)
  251. retry = False
  252. certVerifFailed = False
  253. self.DEBUG_SUCCESSFULL_HEARTBEATS += 1
  254. self.DEBUG_HEARTBEAT_RETRIES = 0
  255. self.heartbeat_stop_callback.reset_heartbeat()
  256. except ssl.SSLError:
  257. self.repeatRegistration=False
  258. self.isRegistered = False
  259. return
  260. except Exception, err:
  261. if "code" in err:
  262. logger.error(err.code)
  263. else:
  264. logException = False
  265. if logger.isEnabledFor(logging.DEBUG):
  266. logException = True
  267. exceptionMessage = str(err)
  268. errorMessage = "Unable to reconnect to {0} (attempts={1}, details={2})".format(self.heartbeatUrl, self.DEBUG_HEARTBEAT_RETRIES, exceptionMessage)
  269. if not retry:
  270. errorMessage = "Connection to {0} was lost (details={1})".format(self.serverHostname, exceptionMessage)
  271. logger.error(errorMessage, exc_info=logException)
  272. if 'certificate verify failed' in str(err) and not certVerifFailed:
  273. logger.warn("Server certificate verify failed. Did you regenerate server certificate?")
  274. certVerifFailed = True
  275. self.cachedconnect = None # Previous connection is broken now
  276. retry = True
  277. #randomize the heartbeat
  278. delay = randint(0, self.range)
  279. time.sleep(delay)
  280. # Sleep for some time
  281. timeout = self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC \
  282. - self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS
  283. if 0 == self.heartbeat_stop_callback.wait(timeout, self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS):
  284. # Stop loop when stop event received
  285. logger.info("Stop event received")
  286. self.DEBUG_STOP_HEARTBEATING=True
  287. def run(self):
  288. self.actionQueue = ActionQueue(self.config, controller=self)
  289. self.actionQueue.start()
  290. self.register = Register(self.config)
  291. self.heartbeat = Heartbeat(self.actionQueue, self.config, self.alert_scheduler_handler.collector())
  292. opener = urllib2.build_opener()
  293. urllib2.install_opener(opener)
  294. while True:
  295. self.repeatRegistration = False
  296. self.registerAndHeartbeat()
  297. if not self.repeatRegistration:
  298. break
  299. def registerAndHeartbeat(self):
  300. registerResponse = self.registerWithServer()
  301. if "response" in registerResponse:
  302. message = registerResponse["response"]
  303. logger.info("Registration response from %s was %s", self.serverHostname, message)
  304. if self.isRegistered:
  305. # Clearing command queue to stop executing "stale" commands
  306. # after registration
  307. logger.info('Resetting ActionQueue...')
  308. self.actionQueue.reset()
  309. # Process callbacks
  310. for callback in self.registration_listeners:
  311. callback()
  312. time.sleep(self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC)
  313. self.heartbeatWithServer()
  314. def restartAgent(self):
  315. sys.exit(AGENT_AUTO_RESTART_EXIT_CODE)
  316. def sendRequest(self, url, data):
  317. response = None
  318. try:
  319. if self.cachedconnect is None: # Lazy initialization
  320. self.cachedconnect = security.CachedHTTPSConnection(self.config)
  321. req = urllib2.Request(url, data, {'Content-Type': 'application/json'})
  322. response = self.cachedconnect.request(req)
  323. return json.loads(response)
  324. except Exception, exception:
  325. if response is None:
  326. raise IOError('Request to {0} failed due to {1}'.format(url, str(exception)))
  327. else:
  328. raise IOError('Response parsing failed! Request data: ' + str(data)
  329. + '; Response: ' + str(response))
  330. def updateComponents(self, cluster_name):
  331. logger.debug("Updating components map of cluster " + cluster_name)
  332. # May throw IOError on server connection error
  333. response = self.sendRequest(self.componentsUrl + cluster_name, None)
  334. logger.debug("Response from %s was %s", self.serverHostname, str(response))
  335. for service, components in response['components'].items():
  336. LiveStatus.SERVICES.append(service)
  337. for component, category in components.items():
  338. if category == 'CLIENT':
  339. LiveStatus.CLIENT_COMPONENTS.append({"serviceName": service, "componentName": component})
  340. else:
  341. LiveStatus.COMPONENTS.append({"serviceName": service, "componentName": component})
  342. logger.debug("Components map updated")
  343. logger.debug("LiveStatus.SERVICES" + str(LiveStatus.SERVICES))
  344. logger.debug("LiveStatus.CLIENT_COMPONENTS" + str(LiveStatus.CLIENT_COMPONENTS))
  345. logger.debug("LiveStatus.COMPONENTS" + str(LiveStatus.COMPONENTS))
  346. def main(argv=None):
  347. # Allow Ctrl-C
  348. formatter = logging.Formatter("%(asctime)s %(filename)s:%(lineno)d - \
  349. %(message)s")
  350. stream_handler = logging.StreamHandler()
  351. stream_handler.setFormatter(formatter)
  352. logger.addHandler(stream_handler)
  353. logger.info('Starting Server RPC Thread: %s' % ' '.join(sys.argv))
  354. config = AmbariConfig.config
  355. heartbeat_stop_callback = bind_signal_handlers()
  356. collector = Controller(config, heartbeat_stop_callback)
  357. collector.start()
  358. collector.run()
  359. if __name__ == '__main__':
  360. main()