Controller.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. #!/usr/bin/env python
  2. '''
  3. Licensed to the Apache Software Foundation (ASF) under one
  4. or more contributor license agreements. See the NOTICE file
  5. distributed with this work for additional information
  6. regarding copyright ownership. The ASF licenses this file
  7. to you under the Apache License, Version 2.0 (the
  8. "License"); you may not use this file except in compliance
  9. with the License. You may obtain a copy of the License at
  10. http://www.apache.org/licenses/LICENSE-2.0
  11. Unless required by applicable law or agreed to in writing, software
  12. distributed under the License is distributed on an "AS IS" BASIS,
  13. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. See the License for the specific language governing permissions and
  15. limitations under the License.
  16. '''
  17. import logging
  18. import signal
  19. import json
  20. import sys
  21. import os
  22. import time
  23. import threading
  24. import urllib2
  25. import pprint
  26. from random import randint
  27. import hostname
  28. import AmbariConfig
  29. import ProcessHelper
  30. from Heartbeat import Heartbeat
  31. from Register import Register
  32. from ActionQueue import ActionQueue
  33. import security
  34. from NetUtil import NetUtil
  35. import ssl
  36. logger = logging.getLogger()
  37. AGENT_AUTO_RESTART_EXIT_CODE = 77
  38. class Controller(threading.Thread):
  39. def __init__(self, config, range=30):
  40. threading.Thread.__init__(self)
  41. logger.debug('Initializing Controller RPC thread.')
  42. self.lock = threading.Lock()
  43. self.safeMode = True
  44. self.credential = None
  45. self.config = config
  46. self.hostname = hostname.hostname()
  47. server_secured_url = 'https://' + config.get('server', 'hostname') + \
  48. ':' + config.get('server', 'secured_url_port')
  49. self.registerUrl = server_secured_url + '/agent/v1/register/' + self.hostname
  50. self.heartbeatUrl = server_secured_url + '/agent/v1/heartbeat/' + self.hostname
  51. self.netutil = NetUtil()
  52. self.responseId = -1
  53. self.repeatRegistration = False
  54. self.isRegistered = False
  55. self.cachedconnect = None
  56. self.range = range
  57. self.hasMappedComponents = True
  58. # Event is used for synchronizing heartbeat iterations (to make possible
  59. # manual wait() interruption between heartbeats )
  60. self.heartbeat_wait_event = threading.Event()
  61. # List of callbacks that are called at agent registration
  62. self.registration_listeners = []
  63. def __del__(self):
  64. logger.info("Server connection disconnected.")
  65. pass
  66. def registerWithServer(self):
  67. id = -1
  68. ret = {}
  69. while not self.isRegistered:
  70. try:
  71. data = json.dumps(self.register.build(id))
  72. logger.info("Registering with the server " + pprint.pformat(data))
  73. response = self.sendRequest(self.registerUrl, data)
  74. ret = json.loads(response)
  75. exitstatus = 0
  76. # exitstatus is a code of error which was rised on server side.
  77. # exitstatus = 0 (OK - Default)
  78. # exitstatus = 1 (Registration failed because
  79. # different version of agent and server)
  80. if 'exitstatus' in ret.keys():
  81. exitstatus = int(ret['exitstatus'])
  82. # log - message, which will be printed to agents log
  83. if 'log' in ret.keys():
  84. log = ret['log']
  85. if exitstatus == 1:
  86. logger.error(log)
  87. self.isRegistered = False
  88. self.repeatRegistration=False
  89. return ret
  90. logger.info("Registered with the server with " + pprint.pformat(ret))
  91. print("Registered with the server")
  92. self.responseId= int(ret['responseId'])
  93. self.isRegistered = True
  94. if 'statusCommands' in ret.keys():
  95. logger.info("Got status commands on registration " + pprint.pformat(ret['statusCommands']) )
  96. self.addToStatusQueue(ret['statusCommands'])
  97. pass
  98. else:
  99. self.hasMappedComponents = False
  100. pass
  101. except ssl.SSLError:
  102. self.repeatRegistration=False
  103. self.isRegistered = False
  104. return
  105. except Exception, err:
  106. # try a reconnect only after a certain amount of random time
  107. delay = randint(0, self.range)
  108. logger.info("Unable to connect to: " + self.registerUrl, exc_info = True)
  109. """ Sleeping for {0} seconds and then retrying again """.format(delay)
  110. time.sleep(delay)
  111. pass
  112. pass
  113. return ret
  114. def addToQueue(self, commands):
  115. """Add to the queue for running the commands """
  116. """ Put the required actions into the Queue """
  117. """ Verify if the action is to reboot or not """
  118. if not commands:
  119. logger.debug("No commands from the server : " + pprint.pformat(commands))
  120. else:
  121. """Only add to the queue if not empty list """
  122. self.actionQueue.put(commands)
  123. pass
  124. def addToStatusQueue(self, commands):
  125. if not commands:
  126. logger.debug("No status commands from the server : " + pprint.pformat(commands))
  127. else:
  128. self.actionQueue.put_status(commands)
  129. pass
  130. # For testing purposes
  131. DEBUG_HEARTBEAT_RETRIES = 0
  132. DEBUG_SUCCESSFULL_HEARTBEATS = 0
  133. DEBUG_STOP_HEARTBEATING = False
  134. def heartbeatWithServer(self):
  135. self.DEBUG_HEARTBEAT_RETRIES = 0
  136. self.DEBUG_SUCCESSFULL_HEARTBEATS = 0
  137. retry = False
  138. certVerifFailed = False
  139. config = AmbariConfig.config
  140. hb_interval = config.get('heartbeat', 'state_interval')
  141. #TODO make sure the response id is monotonically increasing
  142. id = 0
  143. while not self.DEBUG_STOP_HEARTBEATING:
  144. try:
  145. if not retry:
  146. data = json.dumps(
  147. self.heartbeat.build(self.responseId, int(hb_interval), self.hasMappedComponents))
  148. logger.debug("Sending request: " + data)
  149. pass
  150. else:
  151. self.DEBUG_HEARTBEAT_RETRIES += 1
  152. response = self.sendRequest(self.heartbeatUrl, data)
  153. response = json.loads(response)
  154. logger.debug('Got server response: ' + pprint.pformat(response))
  155. serverId=int(response['responseId'])
  156. if 'hasMappedComponents' in response.keys():
  157. self.hasMappedComponents = response['hasMappedComponents'] != False
  158. if 'registrationCommand' in response.keys():
  159. # check if the registration command is None. If none skip
  160. if response['registrationCommand'] is not None:
  161. logger.info("RegistrationCommand received - repeat agent registration")
  162. self.isRegistered = False
  163. self.repeatRegistration = True
  164. return
  165. if serverId!=self.responseId+1:
  166. logger.error("Error in responseId sequence - restarting")
  167. self.restartAgent()
  168. else:
  169. self.responseId=serverId
  170. if 'executionCommands' in response.keys():
  171. self.addToQueue(response['executionCommands'])
  172. pass
  173. if 'statusCommands' in response.keys():
  174. self.addToStatusQueue(response['statusCommands'])
  175. pass
  176. if "true" == response['restartAgent']:
  177. logger.error("Got restartAgent command")
  178. self.restartAgent()
  179. else:
  180. logger.info("No commands sent from the Server.")
  181. pass
  182. if retry:
  183. print("Reconnected to the server")
  184. logger.info("Reconnected to the server")
  185. retry=False
  186. certVerifFailed = False
  187. self.DEBUG_SUCCESSFULL_HEARTBEATS += 1
  188. self.DEBUG_HEARTBEAT_RETRIES = 0
  189. self.heartbeat_wait_event.clear()
  190. except ssl.SSLError:
  191. self.repeatRegistration=False
  192. self.isRegistered = False
  193. return
  194. except Exception, err:
  195. #randomize the heartbeat
  196. delay = randint(0, self.range)
  197. time.sleep(delay)
  198. if "code" in err:
  199. logger.error(err.code)
  200. else:
  201. logger.error("Unable to connect to: " + self.heartbeatUrl + " due to " + str(err))
  202. logger.debug("Details: " + str(err), exc_info=True)
  203. if not retry:
  204. print("Connection to the server was lost. Reconnecting...")
  205. if 'certificate verify failed' in str(err) and not certVerifFailed:
  206. print("Server certificate verify failed. Did you regenerate server certificate?")
  207. certVerifFailed = True
  208. self.cachedconnect = None # Previous connection is broken now
  209. retry=True
  210. # Sleep for some time
  211. timeout = self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC \
  212. - self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS
  213. self.heartbeat_wait_event.wait(timeout = timeout)
  214. # Sleep a bit more to allow STATUS_COMMAND results to be collected
  215. # and sent in one heartbeat. Also avoid server overload with heartbeats
  216. time.sleep(self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS)
  217. pass
  218. def run(self):
  219. self.actionQueue = ActionQueue(self.config, controller=self)
  220. self.actionQueue.start()
  221. self.register = Register(self.config)
  222. self.heartbeat = Heartbeat(self.actionQueue, self.config)
  223. opener = urllib2.build_opener()
  224. urllib2.install_opener(opener)
  225. while True:
  226. self.repeatRegistration = False
  227. self.registerAndHeartbeat()
  228. if not self.repeatRegistration:
  229. break
  230. pass
  231. def registerAndHeartbeat(self):
  232. registerResponse = self.registerWithServer()
  233. message = registerResponse['response']
  234. logger.info("Response from server = " + message)
  235. if self.isRegistered:
  236. # Process callbacks
  237. for callback in self.registration_listeners:
  238. callback()
  239. time.sleep(self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC)
  240. self.heartbeatWithServer()
  241. def restartAgent(self):
  242. os._exit(AGENT_AUTO_RESTART_EXIT_CODE)
  243. pass
  244. def sendRequest(self, url, data):
  245. if self.cachedconnect is None: # Lazy initialization
  246. self.cachedconnect = security.CachedHTTPSConnection(self.config)
  247. req = urllib2.Request(url, data, {'Content-Type': 'application/json'})
  248. response = self.cachedconnect.request(req)
  249. return response
  250. def main(argv=None):
  251. # Allow Ctrl-C
  252. signal.signal(signal.SIGINT, signal.SIG_DFL)
  253. logger.setLevel(logging.INFO)
  254. formatter = logging.Formatter("%(asctime)s %(filename)s:%(lineno)d - \
  255. %(message)s")
  256. stream_handler = logging.StreamHandler()
  257. stream_handler.setFormatter(formatter)
  258. logger.addHandler(stream_handler)
  259. logger.info('Starting Server RPC Thread: %s' % ' '.join(sys.argv))
  260. config = AmbariConfig.config
  261. collector = Controller(config)
  262. collector.start()
  263. collector.run()
  264. if __name__ == '__main__':
  265. main()