TestController.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. '''
  4. Licensed to the Apache Software Foundation (ASF) under one
  5. or more contributor license agreements. See the NOTICE file
  6. distributed with this work for additional information
  7. regarding copyright ownership. The ASF licenses this file
  8. to you under the Apache License, Version 2.0 (the
  9. "License"); you may not use this file except in compliance
  10. with the License. You may obtain a copy of the License at
  11. http://www.apache.org/licenses/LICENSE-2.0
  12. Unless required by applicable law or agreed to in writing, software
  13. distributed under the License is distributed on an "AS IS" BASIS,
  14. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. See the License for the specific language governing permissions and
  16. limitations under the License.
  17. '''
  18. import StringIO
  19. import ssl
  20. import unittest, threading
  21. import sys
  22. from mock.mock import patch, MagicMock, call, Mock
  23. import logging
  24. import platform
  25. from threading import Event
  26. import json
  27. with patch("platform.linux_distribution", return_value = ('Suse','11','Final')):
  28. from ambari_agent import Controller, ActionQueue
  29. from ambari_agent import hostname
  30. from ambari_agent.Controller import AGENT_AUTO_RESTART_EXIT_CODE
  31. from ambari_commons import OSCheck
  32. @patch.object(platform, "linux_distribution", new = ('Suse','11','Final'))
  33. class TestController(unittest.TestCase):
  34. logger = logging.getLogger()
  35. @patch("threading.Thread")
  36. @patch("threading.Lock")
  37. @patch.object(Controller, "NetUtil")
  38. @patch.object(hostname, "hostname")
  39. def setUp(self, hostname_method, NetUtil_mock, lockMock, threadMock):
  40. Controller.logger = MagicMock()
  41. lockMock.return_value = MagicMock()
  42. NetUtil_mock.return_value = MagicMock()
  43. hostname_method.return_value = "test_hostname"
  44. config = MagicMock()
  45. config.get.return_value = "something"
  46. self.controller = Controller.Controller(config)
  47. self.controller.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS = 0.1
  48. self.controller.netutil.HEARTBEAT_NOT_IDDLE_INTERVAL_SEC = 0.1
  49. @patch("json.dumps")
  50. @patch("time.sleep")
  51. @patch("pprint.pformat")
  52. @patch.object(Controller, "randint")
  53. @patch.object(Controller, "LiveStatus")
  54. def test_registerWithServer(self, LiveStatus_mock, randintMock, pformatMock, sleepMock,
  55. dumpsMock):
  56. out = StringIO.StringIO()
  57. sys.stdout = out
  58. LiveStatus_mock.SERVICES = ["foo"]
  59. LiveStatus_mock.CLIENT_COMPONENTS = ["foo"]
  60. LiveStatus_mock.COMPONENTS = ["foo"]
  61. register = MagicMock()
  62. self.controller.register = register
  63. self.controller.sendRequest = MagicMock()
  64. dumpsMock.return_value = '{"valid_object": true}'
  65. self.controller.sendRequest.return_value = {"log":"Error text", "exitstatus":"1"}
  66. self.assertEqual({u'exitstatus': u'1', u'log': u'Error text'}, self.controller.registerWithServer())
  67. self.assertEqual(LiveStatus_mock.SERVICES, [])
  68. self.assertEqual(LiveStatus_mock.CLIENT_COMPONENTS, [])
  69. self.assertEqual(LiveStatus_mock.COMPONENTS, [])
  70. self.controller.sendRequest.return_value = {"responseId":1}
  71. self.assertEqual({"responseId":1}, self.controller.registerWithServer())
  72. self.controller.sendRequest.return_value = {"responseId":1, "statusCommands": "commands", "log":"", "exitstatus":"0"}
  73. self.controller.addToStatusQueue = MagicMock(name="addToStatusQueue")
  74. self.controller.isRegistered = False
  75. self.assertEqual({'exitstatus': '0', 'responseId': 1, 'log': '', 'statusCommands': 'commands'}, self.controller.registerWithServer())
  76. self.controller.addToStatusQueue.assert_called_with("commands")
  77. calls = []
  78. def side_effect(*args):
  79. if len(calls) == 0:
  80. calls.append(1)
  81. raise Exception("test")
  82. return "request"
  83. self.controller.sendRequest.return_value = {"responseId":1}
  84. dumpsMock.side_effect = side_effect
  85. self.controller.isRegistered = False
  86. self.assertEqual({"responseId":1}, self.controller.registerWithServer())
  87. self.assertTrue(randintMock.called)
  88. self.assertTrue(sleepMock.called)
  89. sys.stdout = sys.__stdout__
  90. self.controller.sendRequest = Controller.Controller.sendRequest
  91. self.controller.addToStatusQueue = Controller.Controller.addToStatusQueue
  92. @patch("pprint.pformat")
  93. def test_addToQueue(self, pformatMock):
  94. actionQueue = MagicMock()
  95. self.controller.actionQueue = actionQueue
  96. self.controller.addToQueue(None)
  97. self.assertFalse(actionQueue.put.called)
  98. self.controller.addToQueue("cmd")
  99. self.assertTrue(actionQueue.put.called)
  100. @patch("pprint.pformat")
  101. @patch.object(Controller, "LiveStatus")
  102. def test_addToStatusQueue(self, LiveStatus_mock, pformatMock):
  103. LiveStatus_mock.SERVICES = ["foo"]
  104. LiveStatus_mock.CLIENT_COMPONENTS = ["foo"]
  105. LiveStatus_mock.COMPONENTS = ["foo"]
  106. commands = json.loads('[{"clusterName":"dummy_cluster"}]')
  107. actionQueue = MagicMock()
  108. self.controller.actionQueue = actionQueue
  109. updateComponents = Mock()
  110. self.controller.updateComponents = updateComponents
  111. self.controller.addToStatusQueue(None)
  112. self.assertFalse(actionQueue.put_status.called)
  113. self.assertFalse(updateComponents.called)
  114. self.controller.addToStatusQueue(commands)
  115. self.assertTrue(actionQueue.put_status.called)
  116. self.assertFalse(updateComponents.called)
  117. LiveStatus_mock.SERVICES = []
  118. LiveStatus_mock.CLIENT_COMPONENTS = []
  119. LiveStatus_mock.COMPONENTS = []
  120. self.controller.addToStatusQueue(commands)
  121. self.assertTrue(updateComponents.called)
  122. self.assertTrue(actionQueue.put_status.called)
  123. @patch("urllib2.build_opener")
  124. @patch("urllib2.install_opener")
  125. @patch.object(Controller, "ActionQueue")
  126. @patch.object(OSCheck, "get_os_type")
  127. @patch.object(OSCheck, "get_os_version")
  128. def test_run(self, get_os_version_mock, get_os_type_mock, ActionQueue_mock, installMock, buildMock):
  129. aq = MagicMock()
  130. ActionQueue_mock.return_value = aq
  131. get_os_type_mock.return_value = "suse"
  132. get_os_version_mock.return_value = "11"
  133. buildMock.return_value = "opener"
  134. registerAndHeartbeat = MagicMock("registerAndHeartbeat")
  135. calls = []
  136. def side_effect():
  137. if len(calls) == 0:
  138. self.controller.repeatRegistration = True
  139. calls.append(1)
  140. registerAndHeartbeat.side_effect = side_effect
  141. self.controller.registerAndHeartbeat = registerAndHeartbeat
  142. # repeat registration
  143. self.controller.run()
  144. self.assertTrue(buildMock.called)
  145. installMock.called_once_with("opener")
  146. self.assertEqual(2, registerAndHeartbeat.call_count)
  147. # one call, +1
  148. registerAndHeartbeat.side_effect = None
  149. self.controller.run()
  150. self.assertEqual(3, registerAndHeartbeat.call_count)
  151. # Action queue should be started during calls
  152. self.assertTrue(ActionQueue_mock.called)
  153. self.assertTrue(aq.start.called)
  154. @patch("urllib2.build_opener")
  155. @patch("urllib2.install_opener")
  156. @patch.object(ActionQueue.ActionQueue, "run")
  157. @patch.object(OSCheck, "get_os_type")
  158. @patch.object(OSCheck, "get_os_version")
  159. def test_repeatRegistration(self, get_os_version_mock, get_os_type_mock,
  160. run_mock, installMock, buildMock):
  161. registerAndHeartbeat = MagicMock(name="registerAndHeartbeat")
  162. get_os_type_mock.return_value = "suse"
  163. get_os_version_mock.return_value = "11"
  164. self.controller.registerAndHeartbeat = registerAndHeartbeat
  165. self.controller.run()
  166. self.assertTrue(installMock.called)
  167. self.assertTrue(buildMock.called)
  168. self.controller.registerAndHeartbeat.assert_called_once_with()
  169. calls = []
  170. def switchBool():
  171. if len(calls) == 0:
  172. self.controller.repeatRegistration = True
  173. calls.append(1)
  174. self.controller.repeatRegistration = False
  175. registerAndHeartbeat.side_effect = switchBool
  176. self.controller.run()
  177. self.assertEqual(2, registerAndHeartbeat.call_count)
  178. self.controller.registerAndHeartbeat = \
  179. Controller.Controller.registerAndHeartbeat
  180. @patch("time.sleep")
  181. def test_registerAndHeartbeatWithException(self, sleepMock):
  182. registerWithServer = MagicMock(name="registerWithServer")
  183. registerWithServer.return_value = {"response":"resp"}
  184. self.controller.registerWithServer = registerWithServer
  185. heartbeatWithServer = MagicMock(name="heartbeatWithServer")
  186. self.controller.heartbeatWithServer = heartbeatWithServer
  187. Controller.Controller.__sendRequest__ = MagicMock(side_effect=Exception())
  188. self.controller.isRegistered = True
  189. self.controller.registerAndHeartbeat()
  190. registerWithServer.assert_called_once_with()
  191. heartbeatWithServer.assert_called_once_with()
  192. self.controller.registerWithServer =\
  193. Controller.Controller.registerWithServer
  194. self.controller.heartbeatWithServer =\
  195. Controller.Controller.registerWithServer
  196. @patch("time.sleep")
  197. def test_registerAndHeartbeat(self, sleepMock):
  198. registerWithServer = MagicMock(name="registerWithServer")
  199. registerWithServer.return_value = {"response":"resp"}
  200. self.controller.registerWithServer = registerWithServer
  201. heartbeatWithServer = MagicMock(name="heartbeatWithServer")
  202. self.controller.heartbeatWithServer = heartbeatWithServer
  203. listener1 = MagicMock()
  204. listener2 = MagicMock()
  205. self.controller.registration_listeners.append(listener1)
  206. self.controller.registration_listeners.append(listener2)
  207. self.controller.isRegistered = True
  208. self.controller.registerAndHeartbeat()
  209. registerWithServer.assert_called_once_with()
  210. heartbeatWithServer.assert_called_once_with()
  211. self.assertTrue(listener1.called)
  212. self.assertTrue(listener2.called)
  213. self.controller.registerWithServer = \
  214. Controller.Controller.registerWithServer
  215. self.controller.heartbeatWithServer = \
  216. Controller.Controller.registerWithServer
  217. @patch("time.sleep")
  218. def test_registerAndHeartbeat_check_registration_listener(self, sleepMock):
  219. registerWithServer = MagicMock(name="registerWithServer")
  220. registerWithServer.return_value = {"response":"resp"}
  221. self.controller.registerWithServer = registerWithServer
  222. heartbeatWithServer = MagicMock(name="heartbeatWithServer")
  223. self.controller.heartbeatWithServer = heartbeatWithServer
  224. self.controller.isRegistered = True
  225. self.controller.registerAndHeartbeat()
  226. registerWithServer.assert_called_once_with()
  227. heartbeatWithServer.assert_called_once_with()
  228. self.controller.registerWithServer = \
  229. Controller.Controller.registerWithServer
  230. self.controller.heartbeatWithServer = \
  231. Controller.Controller.registerWithServer
  232. @patch("os._exit")
  233. def test_restartAgent(self, os_exit_mock):
  234. self.controller.restartAgent()
  235. self.assertTrue(os_exit_mock.called)
  236. self.assertTrue(os_exit_mock.call_args[0][0] == AGENT_AUTO_RESTART_EXIT_CODE)
  237. @patch("urllib2.Request")
  238. @patch.object(Controller, "security")
  239. def test_sendRequest(self, security_mock, requestMock):
  240. conMock = MagicMock()
  241. security_mock.CachedHTTPSConnection.return_value = conMock
  242. url = "url"
  243. data = "data"
  244. requestMock.return_value = "request"
  245. self.controller.cachedconnect = None
  246. conMock.request.return_value = '{"valid_object": true}'
  247. actual = self.controller.sendRequest(url, data)
  248. expected = json.loads('{"valid_object": true}')
  249. self.assertEqual(actual, expected)
  250. security_mock.CachedHTTPSConnection.assert_called_once_with(
  251. self.controller.config)
  252. requestMock.called_once_with(url, data,
  253. {'Content-Type': 'application/json'})
  254. conMock.request.return_value = '{invalid_object}'
  255. actual = self.controller.sendRequest(url, data)
  256. expected = {'exitstatus': 1, 'log': ('Response parsing failed! Request data: ' + data
  257. + '; Response: {invalid_object}')}
  258. self.assertEqual(actual, expected)
  259. conMock.request.side_effect = Exception()
  260. actual = self.controller.sendRequest(url, data)
  261. expected = {'exitstatus': 1, 'log': 'Request failed! Data: ' + data}
  262. self.assertEqual(actual, expected)
  263. @patch.object(threading._Event, "wait")
  264. @patch("time.sleep")
  265. @patch("json.dumps")
  266. def test_heartbeatWithServer(self, dumpsMock, sleepMock, event_mock):
  267. out = StringIO.StringIO()
  268. sys.stdout = out
  269. hearbeat = MagicMock()
  270. self.controller.heartbeat = hearbeat
  271. dumpsMock.return_value = "data"
  272. sendRequest = MagicMock(name="sendRequest")
  273. self.controller.sendRequest = sendRequest
  274. self.controller.responseId = 1
  275. response = {"responseId":"2", "restartAgent":"false"}
  276. sendRequest.return_value = response
  277. def one_heartbeat(*args, **kwargs):
  278. self.controller.DEBUG_STOP_HEARTBEATING = True
  279. return response
  280. sendRequest.side_effect = one_heartbeat
  281. actionQueue = MagicMock()
  282. actionQueue.isIdle.return_value = True
  283. # one successful request, after stop
  284. self.controller.actionQueue = actionQueue
  285. self.controller.heartbeatWithServer()
  286. self.assertTrue(sendRequest.called)
  287. calls = []
  288. def retry(*args, **kwargs):
  289. if len(calls) == 0:
  290. calls.append(1)
  291. response["responseId"] = "3"
  292. raise Exception()
  293. if len(calls) > 0:
  294. self.controller.DEBUG_STOP_HEARTBEATING = True
  295. return response
  296. # exception, retry, successful and stop
  297. sendRequest.side_effect = retry
  298. self.controller.DEBUG_STOP_HEARTBEATING = False
  299. self.controller.heartbeatWithServer()
  300. self.assertEqual(1, self.controller.DEBUG_SUCCESSFULL_HEARTBEATS)
  301. # retry registration
  302. self.controller.responseId = 2
  303. response["registrationCommand"] = "true"
  304. sendRequest.side_effect = one_heartbeat
  305. self.controller.DEBUG_STOP_HEARTBEATING = False
  306. self.controller.heartbeatWithServer()
  307. self.assertTrue(self.controller.repeatRegistration)
  308. # components are not mapped
  309. self.controller.responseId = 2
  310. response["registrationCommand"] = "false"
  311. response["hasMappedComponents"] = False
  312. sendRequest.side_effect = one_heartbeat
  313. self.controller.DEBUG_STOP_HEARTBEATING = False
  314. self.controller.heartbeatWithServer()
  315. self.assertFalse(self.controller.hasMappedComponents)
  316. # components are mapped
  317. self.controller.responseId = 2
  318. response["hasMappedComponents"] = True
  319. sendRequest.side_effect = one_heartbeat
  320. self.controller.DEBUG_STOP_HEARTBEATING = False
  321. self.controller.heartbeatWithServer()
  322. self.assertTrue(self.controller.hasMappedComponents)
  323. # components are mapped
  324. self.controller.responseId = 2
  325. del response["hasMappedComponents"]
  326. sendRequest.side_effect = one_heartbeat
  327. self.controller.DEBUG_STOP_HEARTBEATING = False
  328. self.controller.heartbeatWithServer()
  329. self.assertTrue(self.controller.hasMappedComponents)
  330. # wrong responseId => restart
  331. self.controller.responseId = 2
  332. response = {"responseId":"2", "restartAgent":"false"}
  333. restartAgent = MagicMock(name="restartAgent")
  334. self.controller.restartAgent = restartAgent
  335. self.controller.DEBUG_STOP_HEARTBEATING = False
  336. self.controller.heartbeatWithServer()
  337. restartAgent.assert_called_once_with()
  338. # executionCommands
  339. self.controller.responseId = 1
  340. addToQueue = MagicMock(name="addToQueue")
  341. self.controller.addToQueue = addToQueue
  342. response["executionCommands"] = "executionCommands"
  343. self.controller.DEBUG_STOP_HEARTBEATING = False
  344. self.controller.heartbeatWithServer()
  345. addToQueue.assert_has_calls([call("executionCommands")])
  346. # statusCommands
  347. self.controller.responseId = 1
  348. addToStatusQueue = MagicMock(name="addToStatusQueue")
  349. self.controller.addToStatusQueue = addToStatusQueue
  350. response["statusCommands"] = "statusCommands"
  351. self.controller.DEBUG_STOP_HEARTBEATING = False
  352. self.controller.heartbeatWithServer()
  353. addToStatusQueue.assert_has_calls([call("statusCommands")])
  354. # restartAgent command
  355. self.controller.responseId = 1
  356. self.controller.DEBUG_STOP_HEARTBEATING = False
  357. response["restartAgent"] = "true"
  358. restartAgent = MagicMock(name="restartAgent")
  359. self.controller.restartAgent = restartAgent
  360. self.controller.heartbeatWithServer()
  361. restartAgent.assert_called_once_with()
  362. # actionQueue not idle
  363. self.controller.responseId = 1
  364. self.controller.DEBUG_STOP_HEARTBEATING = False
  365. actionQueue.isIdle.return_value = False
  366. response["restartAgent"] = "false"
  367. self.controller.heartbeatWithServer()
  368. sleepMock.assert_called_with(
  369. self.controller.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS)
  370. sys.stdout = sys.__stdout__
  371. self.controller.sendRequest = Controller.Controller.sendRequest
  372. self.controller.sendRequest = Controller.Controller.addToQueue
  373. self.controller.sendRequest = Controller.Controller.addToStatusQueue
  374. @patch("pprint.pformat")
  375. @patch("time.sleep")
  376. @patch("json.loads")
  377. @patch("json.dumps")
  378. def test_certSigningFailed(self, dumpsMock, loadsMock, sleepMock, pformatMock):
  379. register = MagicMock()
  380. self.controller.register = register
  381. dumpsMock.return_value = "request"
  382. response = {"responseId":1,}
  383. loadsMock.return_value = response
  384. self.controller.sendRequest = Mock(side_effect=ssl.SSLError())
  385. self.controller.repeatRegistration=True
  386. self.controller.registerWithServer()
  387. #Conroller thread and the agent stop if the repeatRegistration flag is False
  388. self.assertFalse(self.controller.repeatRegistration)
  389. @patch.object(Controller, "LiveStatus")
  390. def test_updateComponents(self, LiveStatus_mock):
  391. LiveStatus_mock.SERVICES = []
  392. LiveStatus_mock.CLIENT_COMPONENTS = []
  393. LiveStatus_mock.COMPONENTS = []
  394. self.controller.componentsUrl = "foo_url/"
  395. sendRequest = Mock()
  396. self.controller.sendRequest = sendRequest
  397. self.controller.sendRequest.return_value = {"clusterName":"dummy_cluster_name",
  398. "stackName":"dummy_stack_name",
  399. "stackVersion":"dummy_stack_version",
  400. "components":{"PIG":{"PIG":"CLIENT"},
  401. "MAPREDUCE":{"MAPREDUCE_CLIENT":"CLIENT",
  402. "JOBTRACKER":"MASTER","TASKTRACKER":"SLAVE"}}}
  403. self.controller.updateComponents("dummy_cluster_name")
  404. sendRequest.assert_called_with('foo_url/dummy_cluster_name', None)
  405. services_expected = [u'MAPREDUCE', u'PIG']
  406. client_components_expected = [
  407. {'serviceName':u'MAPREDUCE','componentName':u'MAPREDUCE_CLIENT'},
  408. {'serviceName':u'PIG','componentName':u'PIG'}
  409. ]
  410. components_expected = [
  411. {'serviceName':u'MAPREDUCE','componentName':u'TASKTRACKER'},
  412. {'serviceName':u'MAPREDUCE','componentName':u'JOBTRACKER'}
  413. ]
  414. self.assertEquals(LiveStatus_mock.SERVICES, services_expected)
  415. self.assertEquals(LiveStatus_mock.CLIENT_COMPONENTS, client_components_expected)
  416. self.assertEquals(LiveStatus_mock.COMPONENTS, components_expected)
  417. if __name__ == "__main__":
  418. unittest.main(verbosity=2)