Controller.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430
  1. #!/usr/bin/env python
  2. '''
  3. Licensed to the Apache Software Foundation (ASF) under one
  4. or more contributor license agreements. See the NOTICE file
  5. distributed with this work for additional information
  6. regarding copyright ownership. The ASF licenses this file
  7. to you under the Apache License, Version 2.0 (the
  8. "License"); you may not use this file except in compliance
  9. with the License. You may obtain a copy of the License at
  10. http://www.apache.org/licenses/LICENSE-2.0
  11. Unless required by applicable law or agreed to in writing, software
  12. distributed under the License is distributed on an "AS IS" BASIS,
  13. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. See the License for the specific language governing permissions and
  15. limitations under the License.
  16. '''
  17. import logging
  18. import signal
  19. import json
  20. import sys
  21. import platform
  22. import os
  23. import socket
  24. import time
  25. import threading
  26. import urllib2
  27. import pprint
  28. from random import randint
  29. import hostname
  30. import security
  31. import ssl
  32. import AmbariConfig
  33. from Heartbeat import Heartbeat
  34. from Register import Register
  35. from ActionQueue import ActionQueue
  36. from FileCache import FileCache
  37. from NetUtil import NetUtil
  38. from LiveStatus import LiveStatus
  39. from AlertSchedulerHandler import AlertSchedulerHandler
  40. from HeartbeatHandlers import HeartbeatStopHandlers, bind_signal_handlers
  41. logger = logging.getLogger()
  42. AGENT_AUTO_RESTART_EXIT_CODE = 77
  43. class Controller(threading.Thread):
  44. def __init__(self, config, heartbeat_stop_callback = None, range=30):
  45. threading.Thread.__init__(self)
  46. logger.debug('Initializing Controller RPC thread.')
  47. if heartbeat_stop_callback is None:
  48. heartbeat_stop_callback = HeartbeatStopHandlers()
  49. self.lock = threading.Lock()
  50. self.safeMode = True
  51. self.credential = None
  52. self.config = config
  53. self.hostname = hostname.hostname(config)
  54. self.serverHostname = config.get('server', 'hostname')
  55. server_secured_url = 'https://' + self.serverHostname + \
  56. ':' + config.get('server', 'secured_url_port')
  57. self.registerUrl = server_secured_url + '/agent/v1/register/' + self.hostname
  58. self.heartbeatUrl = server_secured_url + '/agent/v1/heartbeat/' + self.hostname
  59. self.componentsUrl = server_secured_url + '/agent/v1/components/'
  60. self.netutil = NetUtil(heartbeat_stop_callback)
  61. self.responseId = -1
  62. self.repeatRegistration = False
  63. self.isRegistered = False
  64. self.cachedconnect = None
  65. self.range = range
  66. self.hasMappedComponents = True
  67. # Event is used for synchronizing heartbeat iterations (to make possible
  68. # manual wait() interruption between heartbeats )
  69. self.heartbeat_stop_callback = heartbeat_stop_callback
  70. # List of callbacks that are called at agent registration
  71. self.registration_listeners = []
  72. # pull config directory out of config
  73. cache_dir = config.get('agent', 'cache_dir')
  74. if cache_dir is None:
  75. cache_dir = '/var/lib/ambari-agent/cache'
  76. stacks_cache_dir = os.path.join(cache_dir, FileCache.STACKS_CACHE_DIRECTORY)
  77. common_services_cache_dir = os.path.join(cache_dir, FileCache.COMMON_SERVICES_DIRECTORY)
  78. host_scripts_cache_dir = os.path.join(cache_dir, FileCache.HOST_SCRIPTS_CACHE_DIRECTORY)
  79. alerts_cache_dir = os.path.join(cache_dir, 'alerts')
  80. self.alert_scheduler_handler = AlertSchedulerHandler(alerts_cache_dir,
  81. stacks_cache_dir, common_services_cache_dir, host_scripts_cache_dir, config)
  82. def __del__(self):
  83. logger.info("Server connection disconnected.")
  84. pass
  85. def registerWithServer(self):
  86. """
  87. :return: returning from current method without setting self.isRegistered
  88. to True will lead to agent termination.
  89. """
  90. LiveStatus.SERVICES = []
  91. LiveStatus.CLIENT_COMPONENTS = []
  92. LiveStatus.COMPONENTS = []
  93. ret = {}
  94. while not self.isRegistered:
  95. try:
  96. data = json.dumps(self.register.build())
  97. prettyData = pprint.pformat(data)
  98. try:
  99. server_ip = socket.gethostbyname(self.hostname)
  100. logger.info("Registering with %s (%s) (agent=%s)", self.hostname, server_ip, prettyData)
  101. except socket.error:
  102. logger.warn("Unable to determine the IP address of '%s', agent registration may fail (agent=%s)",
  103. self.hostname, prettyData)
  104. ret = self.sendRequest(self.registerUrl, data)
  105. # exitstatus is a code of error which was rised on server side.
  106. # exitstatus = 0 (OK - Default)
  107. # exitstatus = 1 (Registration failed because different version of agent and server)
  108. exitstatus = 0
  109. if 'exitstatus' in ret.keys():
  110. exitstatus = int(ret['exitstatus'])
  111. if exitstatus == 1:
  112. # log - message, which will be printed to agents log
  113. if 'log' in ret.keys():
  114. log = ret['log']
  115. logger.error(log)
  116. self.isRegistered = False
  117. self.repeatRegistration = False
  118. return ret
  119. logger.info("Registration Successful (response=%s)", pprint.pformat(ret))
  120. self.responseId = int(ret['responseId'])
  121. self.isRegistered = True
  122. if 'statusCommands' in ret.keys():
  123. logger.info("Got status commands on registration " + pprint.pformat(ret['statusCommands']))
  124. self.addToStatusQueue(ret['statusCommands'])
  125. pass
  126. else:
  127. self.hasMappedComponents = False
  128. if 'alertDefinitionCommands' in ret.keys():
  129. logger.info("Got alert definition update on registration " + pprint.pformat(ret['alertDefinitionCommands']))
  130. self.alert_scheduler_handler.update_definitions(ret['alertDefinitionCommands'])
  131. pass
  132. pass
  133. except ssl.SSLError:
  134. self.repeatRegistration = False
  135. self.isRegistered = False
  136. return
  137. except Exception:
  138. # try a reconnect only after a certain amount of random time
  139. delay = randint(0, self.range)
  140. logger.error("Unable to connect to: " + self.registerUrl, exc_info=True)
  141. """ Sleeping for {0} seconds and then retrying again """.format(delay)
  142. time.sleep(delay)
  143. pass
  144. return ret
  145. def cancelCommandInQueue(self, commands):
  146. """ Remove from the queue commands, kill the process if it's in progress """
  147. if commands:
  148. try:
  149. self.actionQueue.cancel(commands)
  150. except Exception, err:
  151. logger.error("Exception occurred on commands cancel: %s", err.message)
  152. pass
  153. pass
  154. def addToQueue(self, commands):
  155. """Add to the queue for running the commands """
  156. """ Put the required actions into the Queue """
  157. """ Verify if the action is to reboot or not """
  158. if not commands:
  159. logger.debug("No commands received from %s", self.serverHostname)
  160. else:
  161. """Only add to the queue if not empty list """
  162. self.actionQueue.put(commands)
  163. pass
  164. def addToStatusQueue(self, commands):
  165. if not commands:
  166. logger.debug("No status commands received from %s", self.serverHostname)
  167. else:
  168. if not LiveStatus.SERVICES:
  169. self.updateComponents(commands[0]['clusterName'])
  170. self.actionQueue.put_status(commands)
  171. pass
  172. # For testing purposes
  173. DEBUG_HEARTBEAT_RETRIES = 0
  174. DEBUG_SUCCESSFULL_HEARTBEATS = 0
  175. DEBUG_STOP_HEARTBEATING = False
  176. def trigger_heartbeat(self):
  177. self.heartbeat_stop_callback.set_heartbeat()
  178. def heartbeatWithServer(self):
  179. self.DEBUG_HEARTBEAT_RETRIES = 0
  180. self.DEBUG_SUCCESSFULL_HEARTBEATS = 0
  181. retry = False
  182. certVerifFailed = False
  183. hb_interval = self.config.get('heartbeat', 'state_interval')
  184. while not self.DEBUG_STOP_HEARTBEATING:
  185. try:
  186. if not retry:
  187. data = json.dumps(
  188. self.heartbeat.build(self.responseId, int(hb_interval), self.hasMappedComponents))
  189. pass
  190. else:
  191. self.DEBUG_HEARTBEAT_RETRIES += 1
  192. if logger.isEnabledFor(logging.DEBUG):
  193. logger.debug("Sending Heartbeat (id = %s): %s", self.responseId, data)
  194. response = self.sendRequest(self.heartbeatUrl, data)
  195. exitStatus = 0
  196. if 'exitstatus' in response.keys():
  197. exitStatus = int(response['exitstatus'])
  198. if exitStatus != 0:
  199. raise Exception(response)
  200. serverId = int(response['responseId'])
  201. if logger.isEnabledFor(logging.DEBUG):
  202. logger.debug('Heartbeat response (id = %s): %s', serverId, pprint.pformat(response))
  203. else:
  204. logger.info('Heartbeat response received (id = %s)', serverId)
  205. if 'hasMappedComponents' in response.keys():
  206. self.hasMappedComponents = response['hasMappedComponents'] is not False
  207. if 'registrationCommand' in response.keys():
  208. # check if the registration command is None. If none skip
  209. if response['registrationCommand'] is not None:
  210. logger.info("RegistrationCommand received - repeat agent registration")
  211. self.isRegistered = False
  212. self.repeatRegistration = True
  213. return
  214. if serverId != self.responseId + 1:
  215. logger.error("Error in responseId sequence - restarting")
  216. self.restartAgent()
  217. else:
  218. self.responseId = serverId
  219. response_keys = response.keys()
  220. if 'cancelCommands' in response_keys:
  221. self.cancelCommandInQueue(response['cancelCommands'])
  222. if 'executionCommands' in response_keys:
  223. execution_commands = response['executionCommands']
  224. self.addToQueue(execution_commands)
  225. self.alert_scheduler_handler.update_configurations(execution_commands)
  226. if 'statusCommands' in response_keys:
  227. self.addToStatusQueue(response['statusCommands'])
  228. if 'alertDefinitionCommands' in response_keys:
  229. self.alert_scheduler_handler.update_definitions(response['alertDefinitionCommands'], True)
  230. if 'alertExecutionCommands' in response_keys:
  231. self.alert_scheduler_handler.execute_alert(response['alertExecutionCommands'])
  232. if "true" == response['restartAgent']:
  233. logger.error("Received the restartAgent command")
  234. self.restartAgent()
  235. else:
  236. logger.info("No commands sent from %s", self.serverHostname)
  237. if retry:
  238. logger.info("Reconnected to %s", self.heartbeatUrl)
  239. retry = False
  240. certVerifFailed = False
  241. self.DEBUG_SUCCESSFULL_HEARTBEATS += 1
  242. self.DEBUG_HEARTBEAT_RETRIES = 0
  243. self.heartbeat_stop_callback.reset_heartbeat()
  244. except ssl.SSLError:
  245. self.repeatRegistration=False
  246. self.isRegistered = False
  247. return
  248. except Exception, err:
  249. if "code" in err:
  250. logger.error(err.code)
  251. else:
  252. logException = False
  253. if logger.isEnabledFor(logging.DEBUG):
  254. logException = True
  255. exceptionMessage = str(err)
  256. errorMessage = "Unable to reconnect to {0} (attempts={1}, details={2})".format(self.heartbeatUrl, self.DEBUG_HEARTBEAT_RETRIES, exceptionMessage)
  257. if not retry:
  258. errorMessage = "Connection to {0} was lost (details={1})".format(self.serverHostname, exceptionMessage)
  259. logger.error(errorMessage, exc_info=logException)
  260. if 'certificate verify failed' in str(err) and not certVerifFailed:
  261. logger.warn("Server certificate verify failed. Did you regenerate server certificate?")
  262. certVerifFailed = True
  263. self.cachedconnect = None # Previous connection is broken now
  264. retry = True
  265. #randomize the heartbeat
  266. delay = randint(0, self.range)
  267. time.sleep(delay)
  268. # Sleep for some time
  269. timeout = self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC \
  270. - self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS
  271. if 0 == self.heartbeat_stop_callback.wait(timeout, self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS):
  272. # Stop loop when stop event received
  273. logger.info("Stop event received")
  274. self.DEBUG_STOP_HEARTBEATING=True
  275. pass
  276. def run(self):
  277. self.actionQueue = ActionQueue(self.config, controller=self)
  278. self.actionQueue.start()
  279. self.register = Register(self.config)
  280. self.heartbeat = Heartbeat(self.actionQueue, self.config, self.alert_scheduler_handler.collector())
  281. opener = urllib2.build_opener()
  282. urllib2.install_opener(opener)
  283. while True:
  284. self.repeatRegistration = False
  285. self.registerAndHeartbeat()
  286. if not self.repeatRegistration:
  287. break
  288. pass
  289. def registerAndHeartbeat(self):
  290. registerResponse = self.registerWithServer()
  291. message = registerResponse['response']
  292. logger.info("Registration response from %s was %s", self.serverHostname, message)
  293. self.alert_scheduler_handler.start()
  294. if self.isRegistered:
  295. # Clearing command queue to stop executing "stale" commands
  296. # after registration
  297. logger.info('Resetting ActionQueue...')
  298. self.actionQueue.reset()
  299. # Process callbacks
  300. for callback in self.registration_listeners:
  301. callback()
  302. time.sleep(self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC)
  303. self.heartbeatWithServer()
  304. def restartAgent(self):
  305. os._exit(AGENT_AUTO_RESTART_EXIT_CODE)
  306. pass
  307. def sendRequest(self, url, data):
  308. response = None
  309. try:
  310. if self.cachedconnect is None: # Lazy initialization
  311. self.cachedconnect = security.CachedHTTPSConnection(self.config)
  312. req = urllib2.Request(url, data, {'Content-Type': 'application/json'})
  313. response = self.cachedconnect.request(req)
  314. return json.loads(response)
  315. except Exception, exception:
  316. if response is None:
  317. raise IOError('Request to {0} failed due to {1}'.format(url, str(exception)))
  318. else:
  319. raise IOError('Response parsing failed! Request data: ' + str(data)
  320. + '; Response: ' + str(response))
  321. def updateComponents(self, cluster_name):
  322. logger.info("Updating components map of cluster " + cluster_name)
  323. # May throw IOError on server connection error
  324. response = self.sendRequest(self.componentsUrl + cluster_name, None)
  325. logger.debug("Response from %s was %s", self.serverHostname, str(response))
  326. for service, components in response['components'].items():
  327. LiveStatus.SERVICES.append(service)
  328. for component, category in components.items():
  329. if category == 'CLIENT':
  330. LiveStatus.CLIENT_COMPONENTS.append({"serviceName": service, "componentName": component})
  331. else:
  332. LiveStatus.COMPONENTS.append({"serviceName": service, "componentName": component})
  333. logger.info("Components map updated")
  334. logger.debug("LiveStatus.SERVICES" + str(LiveStatus.SERVICES))
  335. logger.debug("LiveStatus.CLIENT_COMPONENTS" + str(LiveStatus.CLIENT_COMPONENTS))
  336. logger.debug("LiveStatus.COMPONENTS" + str(LiveStatus.COMPONENTS))
  337. pass
  338. def main(argv=None):
  339. # Allow Ctrl-C
  340. logger.setLevel(logging.INFO)
  341. formatter = logging.Formatter("%(asctime)s %(filename)s:%(lineno)d - \
  342. %(message)s")
  343. stream_handler = logging.StreamHandler()
  344. stream_handler.setFormatter(formatter)
  345. logger.addHandler(stream_handler)
  346. logger.info('Starting Server RPC Thread: %s' % ' '.join(sys.argv))
  347. config = AmbariConfig.config
  348. heartbeat_stop_callback = bind_signal_handlers()
  349. collector = Controller(config, heartbeat_stop_callback)
  350. collector.start()
  351. collector.run()
  352. if __name__ == '__main__':
  353. main()