Controller.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. #!/usr/bin/env python2.6
  2. '''
  3. Licensed to the Apache Software Foundation (ASF) under one
  4. or more contributor license agreements. See the NOTICE file
  5. distributed with this work for additional information
  6. regarding copyright ownership. The ASF licenses this file
  7. to you under the Apache License, Version 2.0 (the
  8. "License"); you may not use this file except in compliance
  9. with the License. You may obtain a copy of the License at
  10. http://www.apache.org/licenses/LICENSE-2.0
  11. Unless required by applicable law or agreed to in writing, software
  12. distributed under the License is distributed on an "AS IS" BASIS,
  13. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. See the License for the specific language governing permissions and
  15. limitations under the License.
  16. '''
  17. import logging
  18. import logging.handlers
  19. import signal
  20. import json
  21. import socket
  22. import sys, traceback
  23. import time
  24. import threading
  25. import urllib2
  26. from urllib2 import Request, urlopen, URLError
  27. import httplib
  28. import ssl
  29. import AmbariConfig
  30. import pprint
  31. import ProcessHelper
  32. from Heartbeat import Heartbeat
  33. from Register import Register
  34. from ActionQueue import ActionQueue
  35. from optparse import OptionParser
  36. from wsgiref.simple_server import ServerHandler
  37. import security
  38. from NetUtil import NetUtil
  39. from random import randrange, randint
  40. logger = logging.getLogger()
  41. class Controller(threading.Thread):
  42. def __init__(self, config, range=120):
  43. threading.Thread.__init__(self)
  44. logger.debug('Initializing Controller RPC thread.')
  45. self.lock = threading.Lock()
  46. self.safeMode = True
  47. self.credential = None
  48. self.config = config
  49. self.hostname = socket.gethostname()
  50. server_secured_url = 'https://' + config.get('server', 'hostname') + ':' + config.get('server', 'secured_url_port')
  51. self.registerUrl = server_secured_url + '/agent/v1/register/' + self.hostname
  52. self.heartbeatUrl = server_secured_url + '/agent/v1/heartbeat/' + self.hostname
  53. self.netutil = NetUtil()
  54. self.responseId = -1
  55. self.repeatRegistration = False
  56. self.cachedconnect = None
  57. self.range = range
  58. def start(self):
  59. self.actionQueue = ActionQueue(self.config)
  60. self.actionQueue.start()
  61. self.register = Register()
  62. self.heartbeat = Heartbeat(self.actionQueue)
  63. pass
  64. def __del__(self):
  65. logger.info("Server connection disconnected.")
  66. pass
  67. def registerWithServer(self):
  68. retry=False
  69. firstTime=True
  70. registered=False
  71. id = -1
  72. ret = {}
  73. while not registered:
  74. try:
  75. data = json.dumps(self.register.build(id))
  76. logger.info("Registering with the server " + pprint.pformat(data))
  77. response = self.sendRequest(self.registerUrl, data)
  78. ret = json.loads(response)
  79. logger.info("Registered with the server with " + pprint.pformat(ret))
  80. print("Registered with the server")
  81. self.responseId= int(ret['responseId'])
  82. registered = True
  83. if 'statusCommands' in ret.keys():
  84. logger.info("Got status commands on registration " + pprint.pformat(ret['statusCommands']) )
  85. self.addToQueue(ret['statusCommands'])
  86. pass
  87. pass
  88. except Exception, err:
  89. # try a reconnect only after a certain amount of random time
  90. delay = randint(0, self.range)
  91. logger.info("Unable to connect to: " + self.registerUrl, exc_info = True)
  92. """ Sleeping for {0} seconds and then retrying again """.format(delay)
  93. time.sleep(delay)
  94. pass
  95. pass
  96. return ret
  97. def addToQueue(self, commands):
  98. """Add to the queue for running the commands """
  99. """ Put the required actions into the Queue """
  100. """ Verify if the action is to reboot or not """
  101. if not commands:
  102. logger.info("No commands from the server : " + pprint.pformat(commands))
  103. else:
  104. """Only add to the queue if not empty list """
  105. for command in commands:
  106. logger.info("Adding command to the action queue: \n" +
  107. pprint.pformat(command))
  108. self.actionQueue.put(command)
  109. pass
  110. pass
  111. pass
  112. # For testing purposes
  113. DEBUG_HEARTBEAT_RETRIES = 0
  114. DEBUG_SUCCESSFULL_HEARTBEATS = 0
  115. DEBUG_STOP_HEARTBITTING = False
  116. def heartbeatWithServer(self):
  117. self.DEBUG_HEARTBEAT_RETRIES = 0
  118. self.DEBUG_SUCCESSFULL_HEARTBEATS = 0
  119. retry = False
  120. certVerifFailed = False
  121. config = AmbariConfig.config
  122. hb_interval = config.get('heartbeat', 'state_interval')
  123. #TODO make sure the response id is monotonically increasing
  124. id = 0
  125. while not self.DEBUG_STOP_HEARTBITTING:
  126. try:
  127. if not retry:
  128. data = json.dumps(self.heartbeat.build(self.responseId, int(hb_interval)))
  129. pass
  130. else:
  131. self.DEBUG_HEARTBEAT_RETRIES += 1
  132. response = self.sendRequest(self.heartbeatUrl, data)
  133. response = json.loads(response)
  134. logger.info('Got server response: ' + pprint.pformat(response))
  135. serverId=int(response['responseId'])
  136. if 'registrationCommand' in response.keys():
  137. # check if the registration command is None. If none skip
  138. if response['registrationCommand'] is not None:
  139. logger.info("RegistrationCommand received - repeat agent registration")
  140. self.repeatRegistration = True
  141. return
  142. if serverId!=self.responseId+1:
  143. logger.error("Error in responseId sequence - restarting")
  144. self.restartAgent()
  145. else:
  146. self.responseId=serverId
  147. if 'executionCommands' in response.keys():
  148. self.addToQueue(response['executionCommands'])
  149. pass
  150. if 'statusCommands' in response.keys():
  151. self.addToQueue(response['statusCommands'])
  152. pass
  153. if "true" == response['restartAgent']:
  154. logger.error("Got restartAgent command")
  155. self.restartAgent()
  156. else:
  157. logger.info("No commands sent from the Server.")
  158. pass
  159. if retry:
  160. print("Reconnected to the server")
  161. logger.info("Reconnected to the server")
  162. retry=False
  163. certVerifFailed = False
  164. self.DEBUG_SUCCESSFULL_HEARTBEATS += 1
  165. self.DEBUG_HEARTBEAT_RETRIES = 0
  166. except Exception, err:
  167. #randomize the heartbeat
  168. delay = randint(0, self.range)
  169. time.sleep(delay)
  170. if "code" in err:
  171. logger.error(err.code)
  172. else:
  173. logger.error("Unable to connect to: " + self.heartbeatUrl + " due to " + str(err))
  174. logger.debug("Details: " + str(err), exc_info=True)
  175. if not retry:
  176. print("Connection to the server was lost. Reconnecting...")
  177. if 'certificate verify failed' in str(err) and not certVerifFailed:
  178. print("Server certificate verify failed. Did you regenerate server certificate?")
  179. certVerifFailed = True
  180. self.cachedconnect = None # Previous connection is broken now
  181. retry=True
  182. if self.actionQueue.isIdle():
  183. time.sleep(self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC)
  184. else:
  185. time.sleep(self.netutil.HEARTBEAT_NOT_IDDLE_INTERVAL_SEC)
  186. pass
  187. def run(self):
  188. opener = urllib2.build_opener()
  189. urllib2.install_opener(opener)
  190. while True:
  191. self.repeatRegistration = False
  192. self.registerAndHeartbeat()
  193. if not self.repeatRegistration:
  194. break
  195. pass
  196. def registerAndHeartbeat(self):
  197. registerResponse = self.registerWithServer()
  198. message = registerResponse['response']
  199. logger.info("Response from server = " + message)
  200. time.sleep(self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC)
  201. self.heartbeatWithServer()
  202. def restartAgent(self):
  203. ProcessHelper.restartAgent()
  204. pass
  205. def sendRequest(self, url, data):
  206. if self.cachedconnect is None: # Lazy initialization
  207. self.cachedconnect = security.CachedHTTPSConnection(self.config)
  208. req = urllib2.Request(url, data, {'Content-Type': 'application/json'})
  209. response = self.cachedconnect.request(req)
  210. return response
  211. def main(argv=None):
  212. # Allow Ctrl-C
  213. signal.signal(signal.SIGINT, signal.SIG_DFL)
  214. logger.setLevel(logging.INFO)
  215. formatter = logging.Formatter("%(asctime)s %(filename)s:%(lineno)d - \
  216. %(message)s")
  217. stream_handler = logging.StreamHandler()
  218. stream_handler.setFormatter(formatter)
  219. logger.addHandler(stream_handler)
  220. logger.info('Starting Server RPC Thread: %s' % ' '.join(sys.argv))
  221. config = AmbariConfig.config
  222. collector = Controller(config)
  223. collector.start()
  224. collector.run()
  225. if __name__ == '__main__':
  226. main()