TestController.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408
  1. #!/usr/bin/env python2.6
  2. # -*- coding: utf-8 -*-
  3. '''
  4. Licensed to the Apache Software Foundation (ASF) under one
  5. or more contributor license agreements. See the NOTICE file
  6. distributed with this work for additional information
  7. regarding copyright ownership. The ASF licenses this file
  8. to you under the Apache License, Version 2.0 (the
  9. "License"); you may not use this file except in compliance
  10. with the License. You may obtain a copy of the License at
  11. http://www.apache.org/licenses/LICENSE-2.0
  12. Unless required by applicable law or agreed to in writing, software
  13. distributed under the License is distributed on an "AS IS" BASIS,
  14. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. See the License for the specific language governing permissions and
  16. limitations under the License.
  17. '''
  18. import StringIO
  19. import ssl
  20. import unittest, threading
  21. from ambari_agent import Controller, ActionQueue
  22. from ambari_agent import hostname
  23. import sys
  24. from ambari_agent.Controller import AGENT_AUTO_RESTART_EXIT_CODE
  25. from mock.mock import patch, MagicMock, call, Mock
  26. import logging
  27. from threading import Event
  28. class TestController(unittest.TestCase):
  29. logger = logging.getLogger()
  30. @patch("threading.Thread")
  31. @patch("threading.Lock")
  32. @patch.object(Controller, "NetUtil")
  33. @patch.object(hostname, "hostname")
  34. def setUp(self, hostname_method, NetUtil_mock, lockMock, threadMock):
  35. Controller.logger = MagicMock()
  36. lockMock.return_value = MagicMock()
  37. NetUtil_mock.return_value = MagicMock()
  38. hostname_method.return_value = "test_hostname"
  39. config = MagicMock()
  40. config.get.return_value = "something"
  41. self.controller = Controller.Controller(config)
  42. self.controller.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS = 0.1
  43. self.controller.netutil.HEARTBEAT_NOT_IDDLE_INTERVAL_SEC = 0.1
  44. @patch("json.dumps")
  45. @patch("time.sleep")
  46. @patch("pprint.pformat")
  47. @patch.object(Controller, "randint")
  48. def test_registerWithServer(self, randintMock, pformatMock, sleepMock,
  49. dumpsMock):
  50. out = StringIO.StringIO()
  51. sys.stdout = out
  52. register = MagicMock()
  53. self.controller.register = register
  54. self.controller.sendRequest = MagicMock()
  55. dumpsMock.return_value = "request"
  56. self.controller.sendRequest.return_value = '{"log":"Error text", "exitstatus":"1"}'
  57. self.assertEqual({u'exitstatus': u'1', u'log': u'Error text'}, self.controller.registerWithServer())
  58. self.controller.sendRequest.return_value = '{"responseId":1}'
  59. self.assertEqual({"responseId":1}, self.controller.registerWithServer())
  60. self.controller.sendRequest.return_value = '{"responseId":1, "statusCommands": "commands", "log":"", "exitstatus":"0"}'
  61. self.controller.addToQueue = MagicMock(name="addToQueue")
  62. self.controller.isRegistered = False
  63. self.assertEqual({'exitstatus': '0', 'responseId': 1, 'log': '', 'statusCommands': 'commands'}, self.controller.registerWithServer())
  64. self.controller.addToQueue.assert_called_with("commands")
  65. calls = []
  66. def side_effect(*args):
  67. if len(calls) == 0:
  68. calls.append(1)
  69. raise Exception("test")
  70. return "request"
  71. self.controller.sendRequest.return_value = '{"responseId":1}'
  72. dumpsMock.side_effect = side_effect
  73. self.controller.isRegistered = False
  74. self.assertEqual({"responseId":1}, self.controller.registerWithServer())
  75. self.assertTrue(randintMock.called)
  76. self.assertTrue(sleepMock.called)
  77. sys.stdout = sys.__stdout__
  78. self.controller.sendRequest = Controller.Controller.sendRequest
  79. self.controller.addToQueue = Controller.Controller.addToQueue
  80. @patch("pprint.pformat")
  81. def test_addToQueue(self, pformatMock):
  82. actionQueue = MagicMock()
  83. self.controller.actionQueue = actionQueue
  84. self.controller.addToQueue(None)
  85. self.assertFalse(actionQueue.put.called)
  86. self.controller.addToQueue("cmd")
  87. self.assertTrue(actionQueue.put.called)
  88. @patch("urllib2.build_opener")
  89. @patch("urllib2.install_opener")
  90. @patch.object(Controller, "ActionQueue")
  91. def test_run(self, ActionQueue_mock, installMock, buildMock):
  92. aq = MagicMock()
  93. ActionQueue_mock.return_value = aq
  94. buildMock.return_value = "opener"
  95. registerAndHeartbeat = MagicMock("registerAndHeartbeat")
  96. calls = []
  97. def side_effect():
  98. if len(calls) == 0:
  99. self.controller.repeatRegistration = True
  100. calls.append(1)
  101. registerAndHeartbeat.side_effect = side_effect
  102. self.controller.registerAndHeartbeat = registerAndHeartbeat
  103. # repeat registration
  104. self.controller.run()
  105. self.assertTrue(buildMock.called)
  106. installMock.called_once_with("opener")
  107. self.assertEqual(2, registerAndHeartbeat.call_count)
  108. # one call, +1
  109. registerAndHeartbeat.side_effect = None
  110. self.controller.run()
  111. self.assertEqual(3, registerAndHeartbeat.call_count)
  112. # Action queue should be started during calls
  113. self.assertTrue(ActionQueue_mock.called)
  114. self.assertTrue(aq.start.called)
  115. @patch("urllib2.build_opener")
  116. @patch("urllib2.install_opener")
  117. @patch.object(ActionQueue.ActionQueue, "run")
  118. def test_repeatRegistration(self,
  119. run_mock, installMock, buildMock):
  120. registerAndHeartbeat = MagicMock(name="registerAndHeartbeat")
  121. self.controller.registerAndHeartbeat = registerAndHeartbeat
  122. self.controller.run()
  123. self.assertTrue(installMock.called)
  124. self.assertTrue(buildMock.called)
  125. self.controller.registerAndHeartbeat.assert_called_once_with()
  126. calls = []
  127. def switchBool():
  128. if len(calls) == 0:
  129. self.controller.repeatRegistration = True
  130. calls.append(1)
  131. self.controller.repeatRegistration = False
  132. registerAndHeartbeat.side_effect = switchBool
  133. self.controller.run()
  134. self.assertEqual(2, registerAndHeartbeat.call_count)
  135. self.controller.registerAndHeartbeat = \
  136. Controller.Controller.registerAndHeartbeat
  137. @patch("time.sleep")
  138. def test_registerAndHeartbeatWithException(self, sleepMock):
  139. registerWithServer = MagicMock(name="registerWithServer")
  140. registerWithServer.return_value = {"response":"resp"}
  141. self.controller.registerWithServer = registerWithServer
  142. heartbeatWithServer = MagicMock(name="heartbeatWithServer")
  143. self.controller.heartbeatWithServer = heartbeatWithServer
  144. Controller.Controller.__sendRequest__ = MagicMock(side_effect=Exception())
  145. self.controller.isRegistered = True
  146. self.controller.registerAndHeartbeat()
  147. registerWithServer.assert_called_once_with()
  148. heartbeatWithServer.assert_called_once_with()
  149. self.controller.registerWithServer =\
  150. Controller.Controller.registerWithServer
  151. self.controller.heartbeatWithServer =\
  152. Controller.Controller.registerWithServer
  153. @patch("time.sleep")
  154. def test_registerAndHeartbeat(self, sleepMock):
  155. registerWithServer = MagicMock(name="registerWithServer")
  156. registerWithServer.return_value = {"response":"resp"}
  157. self.controller.registerWithServer = registerWithServer
  158. heartbeatWithServer = MagicMock(name="heartbeatWithServer")
  159. self.controller.heartbeatWithServer = heartbeatWithServer
  160. self.controller.isRegistered = True;
  161. self.controller.registerAndHeartbeat()
  162. registerWithServer.assert_called_once_with()
  163. heartbeatWithServer.assert_called_once_with()
  164. self.controller.registerWithServer = \
  165. Controller.Controller.registerWithServer
  166. self.controller.heartbeatWithServer = \
  167. Controller.Controller.registerWithServer
  168. @patch("os._exit")
  169. def test_restartAgent(self, os_exit_mock):
  170. self.controller.restartAgent()
  171. self.assertTrue(os_exit_mock.called)
  172. self.assertTrue(os_exit_mock.call_args[0][0] == AGENT_AUTO_RESTART_EXIT_CODE)
  173. @patch("urllib2.Request")
  174. @patch.object(Controller, "security")
  175. def test_sendRequest(self, security_mock, requestMock):
  176. conMock = MagicMock()
  177. conMock.request.return_value = "response"
  178. security_mock.CachedHTTPSConnection.return_value = conMock
  179. url = "url"
  180. data = "data"
  181. requestMock.return_value = "request"
  182. self.controller.cachedconnect = None
  183. self.assertEqual("response", self.controller.sendRequest(url, data))
  184. security_mock.CachedHTTPSConnection.assert_called_once_with(
  185. self.controller.config)
  186. requestMock.called_once_with(url, data,
  187. {'Content-Type': 'application/json'})
  188. @patch.object(threading._Event, "wait")
  189. @patch("time.sleep")
  190. @patch("json.loads")
  191. @patch("json.dumps")
  192. def test_heartbeatWithServer(self, dumpsMock, loadsMock, sleepMock, event_mock):
  193. out = StringIO.StringIO()
  194. sys.stdout = out
  195. hearbeat = MagicMock()
  196. self.controller.heartbeat = hearbeat
  197. dumpsMock.return_value = "data"
  198. sendRequest = MagicMock(name="sendRequest")
  199. self.controller.sendRequest = sendRequest
  200. self.controller.responseId = 1
  201. response = {"responseId":"2", "restartAgent":"false"}
  202. loadsMock.return_value = response
  203. def one_heartbeat(*args, **kwargs):
  204. self.controller.DEBUG_STOP_HEARTBEATING = True
  205. return "data"
  206. sendRequest.side_effect = one_heartbeat
  207. actionQueue = MagicMock()
  208. actionQueue.isIdle.return_value = True
  209. # one successful request, after stop
  210. self.controller.actionQueue = actionQueue
  211. self.controller.heartbeatWithServer()
  212. self.assertTrue(sendRequest.called)
  213. calls = []
  214. def retry(*args, **kwargs):
  215. if len(calls) == 0:
  216. calls.append(1)
  217. response["responseId"] = "3"
  218. raise Exception()
  219. if len(calls) > 0:
  220. self.controller.DEBUG_STOP_HEARTBEATING = True
  221. return "data"
  222. # exception, retry, successful and stop
  223. sendRequest.side_effect = retry
  224. self.controller.DEBUG_STOP_HEARTBEATING = False
  225. self.controller.heartbeatWithServer()
  226. self.assertEqual(1, self.controller.DEBUG_SUCCESSFULL_HEARTBEATS)
  227. # retry registration
  228. response["registrationCommand"] = "true"
  229. sendRequest.side_effect = one_heartbeat
  230. self.controller.DEBUG_STOP_HEARTBEATING = False
  231. self.controller.heartbeatWithServer()
  232. self.assertTrue(self.controller.repeatRegistration)
  233. # components are not mapped
  234. response["registrationCommand"] = "false"
  235. response["hasMappedComponents"] = False
  236. sendRequest.side_effect = one_heartbeat
  237. self.controller.DEBUG_STOP_HEARTBEATING = False
  238. self.controller.heartbeatWithServer()
  239. self.assertFalse(self.controller.hasMappedComponents)
  240. # components are mapped
  241. response["hasMappedComponents"] = True
  242. sendRequest.side_effect = one_heartbeat
  243. self.controller.DEBUG_STOP_HEARTBEATING = False
  244. self.controller.heartbeatWithServer()
  245. self.assertTrue(self.controller.hasMappedComponents)
  246. # components are mapped
  247. del response["hasMappedComponents"]
  248. sendRequest.side_effect = one_heartbeat
  249. self.controller.DEBUG_STOP_HEARTBEATING = False
  250. self.controller.heartbeatWithServer()
  251. self.assertTrue(self.controller.hasMappedComponents)
  252. # wrong responseId => restart
  253. response = {"responseId":"2", "restartAgent":"false"}
  254. loadsMock.return_value = response
  255. restartAgent = MagicMock(name="restartAgent")
  256. self.controller.restartAgent = restartAgent
  257. self.controller.DEBUG_STOP_HEARTBEATING = False
  258. self.controller.heartbeatWithServer()
  259. restartAgent.assert_called_once_with()
  260. # executionCommands, statusCommands
  261. self.controller.responseId = 1
  262. addToQueue = MagicMock(name="addToQueue")
  263. self.controller.addToQueue = addToQueue
  264. response["executionCommands"] = "executionCommands"
  265. response["statusCommands"] = "statusCommands"
  266. self.controller.DEBUG_STOP_HEARTBEATING = False
  267. self.controller.heartbeatWithServer()
  268. addToQueue.assert_has_calls([call("executionCommands"),
  269. call("statusCommands")])
  270. # restartAgent command
  271. self.controller.responseId = 1
  272. self.controller.DEBUG_STOP_HEARTBEATING = False
  273. response["restartAgent"] = "true"
  274. restartAgent = MagicMock(name="restartAgent")
  275. self.controller.restartAgent = restartAgent
  276. self.controller.heartbeatWithServer()
  277. restartAgent.assert_called_once_with()
  278. # actionQueue not idle
  279. self.controller.responseId = 1
  280. self.controller.DEBUG_STOP_HEARTBEATING = False
  281. actionQueue.isIdle.return_value = False
  282. response["restartAgent"] = "false"
  283. self.controller.heartbeatWithServer()
  284. sleepMock.assert_called_with(
  285. self.controller.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS)
  286. sys.stdout = sys.__stdout__
  287. self.controller.sendRequest = Controller.Controller.sendRequest
  288. self.controller.sendRequest = Controller.Controller.addToQueue
  289. @patch("pprint.pformat")
  290. @patch("time.sleep")
  291. @patch("json.loads")
  292. @patch("json.dumps")
  293. def test_certSigningFailed(self, dumpsMock, loadsMock, sleepMock, pformatMock):
  294. register = MagicMock()
  295. self.controller.register = register
  296. dumpsMock.return_value = "request"
  297. response = {"responseId":1,}
  298. loadsMock.return_value = response
  299. self.controller.sendRequest = Mock(side_effect=ssl.SSLError())
  300. self.controller.repeatRegistration=True
  301. self.controller.registerWithServer()
  302. #Conroller thread and the agent stop if the repeatRegistration flag is False
  303. self.assertFalse(self.controller.repeatRegistration)
  304. if __name__ == "__main__":
  305. unittest.main(verbosity=2)