TestController.py 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. '''
  4. Licensed to the Apache Software Foundation (ASF) under one
  5. or more contributor license agreements. See the NOTICE file
  6. distributed with this work for additional information
  7. regarding copyright ownership. The ASF licenses this file
  8. to you under the Apache License, Version 2.0 (the
  9. "License"); you may not use this file except in compliance
  10. with the License. You may obtain a copy of the License at
  11. http://www.apache.org/licenses/LICENSE-2.0
  12. Unless required by applicable law or agreed to in writing, software
  13. distributed under the License is distributed on an "AS IS" BASIS,
  14. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. See the License for the specific language governing permissions and
  16. limitations under the License.
  17. '''
  18. import StringIO
  19. import os
  20. import ssl
  21. import tempfile
  22. import unittest, threading
  23. import sys
  24. from mock.mock import patch, MagicMock, call, Mock
  25. import logging
  26. import platform
  27. from threading import Event
  28. import ambari_simplejson
  29. from ambari_commons import OSCheck
  30. from only_for_platform import not_for_platform, only_for_platform, get_platform, PLATFORM_LINUX, PLATFORM_WINDOWS
  31. from ambari_agent import Controller, ActionQueue, Register
  32. from ambari_agent import hostname
  33. from ambari_agent.Controller import AGENT_AUTO_RESTART_EXIT_CODE
  34. from ambari_commons import OSCheck
  35. from ambari_agent.Hardware import Hardware
  36. from ambari_agent.ExitHelper import ExitHelper
  37. from ambari_agent.AmbariConfig import AmbariConfig
  38. import ambari_commons
  39. OPERATING_SYSTEM_DISTRO = ('Suse','11','Final')
  40. @not_for_platform(PLATFORM_WINDOWS)
  41. @patch.object(OSCheck, "os_distribution", new = OPERATING_SYSTEM_DISTRO)
  42. class TestController(unittest.TestCase):
  43. logger = logging.getLogger()
  44. @patch.object(Controller, "NetUtil", MagicMock())
  45. @patch.object(Controller, "AlertSchedulerHandler", MagicMock())
  46. @patch.object(Controller.Controller, "read_agent_version")
  47. @patch("threading.Thread")
  48. @patch("threading.Lock")
  49. @patch.object(hostname, "hostname")
  50. def setUp(self, hostname_method, lockMock, threadMock, read_agent_versionMock):
  51. Controller.logger = MagicMock()
  52. lockMock.return_value = MagicMock()
  53. hostname_method.return_value = "test_hostname"
  54. read_agent_versionMock.return_value = '2.1.0'
  55. config = MagicMock()
  56. #config.get.return_value = "something"
  57. config.get.return_value = "5"
  58. self.controller = Controller.Controller(config)
  59. self.controller.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS = 0.1
  60. self.controller.netutil.HEARTBEAT_NOT_IDDLE_INTERVAL_SEC = 0.1
  61. @patch.object(OSCheck, "get_os_type")
  62. @patch.object(OSCheck, "get_os_version")
  63. def test_read_agent_version(self, get_os_version_mock, get_os_type_mock):
  64. config = AmbariConfig().getConfig()
  65. tmpdir = tempfile.gettempdir()
  66. config.set('agent', 'prefix', tmpdir)
  67. config.set('agent', 'current_ping_port', '33777')
  68. ver_file = os.path.join(tmpdir, "version")
  69. reference_version = "1.3.0"
  70. with open(ver_file, "w") as text_file:
  71. text_file.write(reference_version)
  72. version = self.controller.read_agent_version(config)
  73. os.remove(ver_file)
  74. self.assertEqual(reference_version, version)
  75. @patch("ambari_simplejson.dumps")
  76. @patch("time.sleep")
  77. @patch("pprint.pformat")
  78. @patch.object(Controller, "randint")
  79. @patch.object(Controller, "LiveStatus")
  80. def test_registerWithServer(self, LiveStatus_mock, randintMock, pformatMock, sleepMock,
  81. dumpsMock):
  82. out = StringIO.StringIO()
  83. sys.stdout = out
  84. LiveStatus_mock.SERVICES = ["foo"]
  85. LiveStatus_mock.CLIENT_COMPONENTS = ["foo"]
  86. LiveStatus_mock.COMPONENTS = ["foo"]
  87. register = MagicMock()
  88. self.controller.register = register
  89. self.controller.sendRequest = MagicMock()
  90. dumpsMock.return_value = '{"valid_object": true}'
  91. self.controller.sendRequest.return_value = {"log":"Error text", "exitstatus":"1"}
  92. self.assertEqual({u'exitstatus': u'1', u'log': u'Error text'}, self.controller.registerWithServer())
  93. self.assertEqual(LiveStatus_mock.SERVICES, [])
  94. self.assertEqual(LiveStatus_mock.CLIENT_COMPONENTS, [])
  95. self.assertEqual(LiveStatus_mock.COMPONENTS, [])
  96. self.controller.sendRequest.return_value = {"responseId":1}
  97. self.assertEqual({"responseId":1}, self.controller.registerWithServer())
  98. self.controller.sendRequest.return_value = {"responseId":1, "statusCommands": "commands", "log":"", "exitstatus":"0"}
  99. self.controller.addToStatusQueue = MagicMock(name="addToStatusQueue")
  100. self.controller.isRegistered = False
  101. self.assertEqual({'exitstatus': '0', 'responseId': 1, 'log': '', 'statusCommands': 'commands'}, self.controller.registerWithServer())
  102. self.controller.addToStatusQueue.assert_called_with("commands")
  103. calls = []
  104. def side_effect(*args):
  105. if len(calls) == 0:
  106. calls.append(1)
  107. raise Exception("test")
  108. return "request"
  109. self.controller.sendRequest.return_value = {"responseId":1}
  110. dumpsMock.side_effect = side_effect
  111. self.controller.isRegistered = False
  112. self.assertEqual({"responseId":1}, self.controller.registerWithServer())
  113. self.assertTrue(randintMock.called)
  114. self.assertTrue(sleepMock.called)
  115. sys.stdout = sys.__stdout__
  116. self.controller.sendRequest = Controller.Controller.sendRequest
  117. self.controller.addToStatusQueue = Controller.Controller.addToStatusQueue
  118. @patch("pprint.pformat")
  119. def test_addToQueue(self, pformatMock):
  120. actionQueue = MagicMock()
  121. self.controller.actionQueue = actionQueue
  122. self.controller.addToQueue(None)
  123. self.assertFalse(actionQueue.put.called)
  124. self.controller.addToQueue("cmd")
  125. self.assertTrue(actionQueue.put.called)
  126. @patch("pprint.pformat")
  127. @patch.object(Controller, "LiveStatus")
  128. def test_addToStatusQueue(self, LiveStatus_mock, pformatMock):
  129. LiveStatus_mock.SERVICES = ["foo"]
  130. LiveStatus_mock.CLIENT_COMPONENTS = ["foo"]
  131. LiveStatus_mock.COMPONENTS = ["foo"]
  132. commands = ambari_simplejson.loads('[{"clusterName":"dummy_cluster"}]')
  133. actionQueue = MagicMock()
  134. self.controller.actionQueue = actionQueue
  135. updateComponents = Mock()
  136. self.controller.updateComponents = updateComponents
  137. self.controller.addToStatusQueue(None)
  138. self.assertFalse(actionQueue.put_status.called)
  139. self.assertFalse(updateComponents.called)
  140. self.controller.addToStatusQueue(commands)
  141. self.assertTrue(actionQueue.put_status.called)
  142. self.assertFalse(updateComponents.called)
  143. LiveStatus_mock.SERVICES = []
  144. LiveStatus_mock.CLIENT_COMPONENTS = []
  145. LiveStatus_mock.COMPONENTS = []
  146. self.controller.addToStatusQueue(commands)
  147. self.assertTrue(updateComponents.called)
  148. self.assertTrue(actionQueue.put_status.called)
  149. @patch.object(Hardware, "_chk_mount", new = MagicMock(return_value=True))
  150. @patch("urllib2.build_opener")
  151. @patch("urllib2.install_opener")
  152. @patch.object(Controller, "ActionQueue")
  153. @patch.object(OSCheck, "get_os_type")
  154. @patch.object(OSCheck, "get_os_version")
  155. def test_run(self, get_os_version_mock, get_os_type_mock, ActionQueue_mock, installMock, buildMock):
  156. aq = MagicMock()
  157. ActionQueue_mock.return_value = aq
  158. get_os_type_mock.return_value = "suse"
  159. get_os_version_mock.return_value = "11"
  160. buildMock.return_value = "opener"
  161. registerAndHeartbeat = MagicMock("registerAndHeartbeat")
  162. calls = []
  163. def side_effect():
  164. if len(calls) == 0:
  165. self.controller.repeatRegistration = True
  166. calls.append(1)
  167. registerAndHeartbeat.side_effect = side_effect
  168. self.controller.registerAndHeartbeat = registerAndHeartbeat
  169. # repeat registration
  170. self.controller.run()
  171. self.assertTrue(buildMock.called)
  172. installMock.called_once_with("opener")
  173. self.assertEqual(2, registerAndHeartbeat.call_count)
  174. # one call, +1
  175. registerAndHeartbeat.side_effect = None
  176. self.controller.run()
  177. self.assertEqual(3, registerAndHeartbeat.call_count)
  178. # Action queue should be started during calls
  179. self.assertTrue(ActionQueue_mock.called)
  180. self.assertTrue(aq.start.called)
  181. @patch.object(Hardware, "_chk_mount", new = MagicMock(return_value=True))
  182. @patch("urllib2.build_opener")
  183. @patch("urllib2.install_opener")
  184. @patch.object(ActionQueue.ActionQueue, "run")
  185. @patch.object(OSCheck, "get_os_type")
  186. @patch.object(OSCheck, "get_os_version")
  187. def test_repeatRegistration(self, get_os_version_mock, get_os_type_mock,
  188. run_mock, installMock, buildMock):
  189. registerAndHeartbeat = MagicMock(name="registerAndHeartbeat")
  190. get_os_type_mock.return_value = "suse"
  191. get_os_version_mock.return_value = "11"
  192. self.controller.registerAndHeartbeat = registerAndHeartbeat
  193. self.controller.run()
  194. self.assertTrue(installMock.called)
  195. self.assertTrue(buildMock.called)
  196. self.controller.registerAndHeartbeat.assert_called_once_with()
  197. calls = []
  198. def switchBool():
  199. if len(calls) == 0:
  200. self.controller.repeatRegistration = True
  201. calls.append(1)
  202. self.controller.repeatRegistration = False
  203. registerAndHeartbeat.side_effect = switchBool
  204. self.controller.run()
  205. self.assertEqual(2, registerAndHeartbeat.call_count)
  206. self.controller.registerAndHeartbeat = \
  207. Controller.Controller.registerAndHeartbeat
  208. @patch("time.sleep")
  209. def test_registerAndHeartbeatWithException(self, sleepMock):
  210. registerWithServer = MagicMock(name="registerWithServer")
  211. registerWithServer.return_value = {"response":"resp"}
  212. self.controller.registerWithServer = registerWithServer
  213. heartbeatWithServer = MagicMock(name="heartbeatWithServer")
  214. self.controller.heartbeatWithServer = heartbeatWithServer
  215. actionQueue = MagicMock(name="actionQueue")
  216. self.controller.actionQueue = actionQueue
  217. Controller.Controller.__sendRequest__ = MagicMock(side_effect=Exception())
  218. self.controller.isRegistered = True
  219. self.controller.registerAndHeartbeat()
  220. registerWithServer.assert_called_once_with()
  221. heartbeatWithServer.assert_called_once_with()
  222. self.controller.registerWithServer =\
  223. Controller.Controller.registerWithServer
  224. self.controller.heartbeatWithServer =\
  225. Controller.Controller.registerWithServer
  226. @patch("time.sleep")
  227. def test_registerAndHeartbeat(self, sleepMock):
  228. registerWithServer = MagicMock(name="registerWithServer")
  229. registerWithServer.return_value = {"response":"resp"}
  230. self.controller.registerWithServer = registerWithServer
  231. heartbeatWithServer = MagicMock(name="heartbeatWithServer")
  232. self.controller.heartbeatWithServer = heartbeatWithServer
  233. actionQueue = MagicMock(name="actionQueue")
  234. self.controller.actionQueue = actionQueue
  235. listener1 = MagicMock()
  236. listener2 = MagicMock()
  237. self.controller.registration_listeners.append(listener1)
  238. self.controller.registration_listeners.append(listener2)
  239. self.controller.isRegistered = True
  240. self.controller.registerAndHeartbeat()
  241. registerWithServer.assert_called_once_with()
  242. heartbeatWithServer.assert_called_once_with()
  243. self.assertTrue(listener1.called)
  244. self.assertTrue(listener2.called)
  245. self.controller.registerWithServer = \
  246. Controller.Controller.registerWithServer
  247. self.controller.heartbeatWithServer = \
  248. Controller.Controller.registerWithServer
  249. @patch("time.sleep")
  250. def test_registerAndHeartbeat_check_registration_listener(self, sleepMock):
  251. registerWithServer = MagicMock(name="registerWithServer")
  252. registerWithServer.return_value = {"response":"resp"}
  253. self.controller.registerWithServer = registerWithServer
  254. heartbeatWithServer = MagicMock(name="heartbeatWithServer")
  255. self.controller.heartbeatWithServer = heartbeatWithServer
  256. actionQueue = MagicMock(name="actionQueue")
  257. self.controller.actionQueue = actionQueue
  258. self.controller.isRegistered = True
  259. self.controller.registerAndHeartbeat()
  260. registerWithServer.assert_called_once_with()
  261. heartbeatWithServer.assert_called_once_with()
  262. self.controller.registerWithServer = \
  263. Controller.Controller.registerWithServer
  264. self.controller.heartbeatWithServer = \
  265. Controller.Controller.registerWithServer
  266. @patch("time.sleep")
  267. @patch.object(Controller.Controller, "sendRequest")
  268. def test_registerWithIOErrors(self, sendRequestMock, sleepMock):
  269. # Check that server continues to heartbeat after connection errors
  270. registerMock = MagicMock(name="Register")
  271. registerMock.build.return_value = {}
  272. actionQueue = MagicMock()
  273. actionQueue.isIdle.return_value = True
  274. self.controller.actionQueue = actionQueue
  275. self.controller.register = registerMock
  276. self.controller.responseId = 1
  277. self.controller.TEST_IOERROR_COUNTER = 1
  278. self.controller.isRegistered = False
  279. def util_throw_IOErrors(*args, **kwargs):
  280. """
  281. Throws IOErrors 10 times and then stops heartbeats/registrations
  282. """
  283. if self.controller.TEST_IOERROR_COUNTER == 10:
  284. self.controller.isRegistered = True
  285. self.controller.TEST_IOERROR_COUNTER += 1
  286. raise IOError("Sample error")
  287. actionQueue.isIdle.return_value = False
  288. sendRequestMock.side_effect = util_throw_IOErrors
  289. self.controller.registerWithServer()
  290. self.assertTrue(sendRequestMock.call_count > 5)
  291. @patch.object(ExitHelper, "exit")
  292. def test_restartAgent(self, exit_mock):
  293. self.controller.restartAgent()
  294. self.assertTrue(exit_mock.called)
  295. self.assertTrue(exit_mock.call_args[0][0] == AGENT_AUTO_RESTART_EXIT_CODE)
  296. @patch("urllib2.Request")
  297. @patch.object(Controller, "security")
  298. def test_sendRequest(self, security_mock, requestMock):
  299. conMock = MagicMock()
  300. security_mock.CachedHTTPSConnection.return_value = conMock
  301. url = "http://ambari.apache.org:8081/agent"
  302. data = "data"
  303. requestMock.return_value = "request"
  304. self.controller.cachedconnect = None
  305. conMock.request.return_value = '{"valid_object": true}'
  306. actual = self.controller.sendRequest(url, data)
  307. expected = ambari_simplejson.loads('{"valid_object": true}')
  308. self.assertEqual(actual, expected)
  309. security_mock.CachedHTTPSConnection.assert_called_once_with(
  310. self.controller.config)
  311. requestMock.called_once_with(url, data,
  312. {'Content-Type': 'application/ambari_simplejson'})
  313. conMock.request.return_value = '{invalid_object}'
  314. try:
  315. self.controller.sendRequest(url, data)
  316. self.fail("Should throw exception!")
  317. except IOError, e: # Expected
  318. self.assertEquals('Response parsing failed! Request data: ' + data +
  319. '; Response: {invalid_object}', str(e))
  320. exceptionMessage = "Connection Refused"
  321. conMock.request.side_effect = Exception(exceptionMessage)
  322. try:
  323. self.controller.sendRequest(url, data)
  324. self.fail("Should throw exception!")
  325. except IOError, e: # Expected
  326. self.assertEquals('Request to ' + url + ' failed due to ' +
  327. exceptionMessage, str(e))
  328. @patch.object(threading._Event, "wait")
  329. @patch("time.sleep")
  330. @patch("ambari_simplejson.dumps")
  331. def test_heartbeatWithServer(self, dumpsMock, sleepMock, event_mock):
  332. out = StringIO.StringIO()
  333. sys.stdout = out
  334. hearbeat = MagicMock()
  335. self.controller.heartbeat = hearbeat
  336. event_mock.return_value = False
  337. dumpsMock.return_value = "data"
  338. sendRequest = MagicMock(name="sendRequest")
  339. self.controller.sendRequest = sendRequest
  340. self.controller.responseId = 1
  341. response = {"responseId":"2", "restartAgent":"false"}
  342. sendRequest.return_value = response
  343. def one_heartbeat(*args, **kwargs):
  344. self.controller.DEBUG_STOP_HEARTBEATING = True
  345. return response
  346. sendRequest.side_effect = one_heartbeat
  347. actionQueue = MagicMock()
  348. actionQueue.isIdle.return_value = True
  349. # one successful request, after stop
  350. self.controller.actionQueue = actionQueue
  351. self.controller.alert_scheduler_handler = MagicMock()
  352. self.controller.heartbeatWithServer()
  353. self.assertTrue(sendRequest.called)
  354. calls = []
  355. def retry(*args, **kwargs):
  356. if len(calls) == 0:
  357. calls.append(1)
  358. response["responseId"] = "3"
  359. raise Exception()
  360. if len(calls) > 0:
  361. self.controller.DEBUG_STOP_HEARTBEATING = True
  362. return response
  363. # exception, retry, successful and stop
  364. sendRequest.side_effect = retry
  365. self.controller.DEBUG_STOP_HEARTBEATING = False
  366. self.controller.heartbeatWithServer()
  367. self.assertEqual(1, self.controller.DEBUG_SUCCESSFULL_HEARTBEATS)
  368. # retry registration
  369. self.controller.responseId = 2
  370. response["registrationCommand"] = "true"
  371. sendRequest.side_effect = one_heartbeat
  372. self.controller.DEBUG_STOP_HEARTBEATING = False
  373. self.controller.heartbeatWithServer()
  374. self.assertTrue(self.controller.repeatRegistration)
  375. # components are not mapped
  376. self.controller.responseId = 2
  377. response["registrationCommand"] = "false"
  378. response["hasMappedComponents"] = False
  379. sendRequest.side_effect = one_heartbeat
  380. self.controller.DEBUG_STOP_HEARTBEATING = False
  381. self.controller.heartbeatWithServer()
  382. self.assertFalse(self.controller.hasMappedComponents)
  383. # components are mapped
  384. self.controller.responseId = 2
  385. response["hasMappedComponents"] = True
  386. sendRequest.side_effect = one_heartbeat
  387. self.controller.DEBUG_STOP_HEARTBEATING = False
  388. self.controller.heartbeatWithServer()
  389. self.assertTrue(self.controller.hasMappedComponents)
  390. # components are mapped
  391. self.controller.responseId = 2
  392. del response["hasMappedComponents"]
  393. sendRequest.side_effect = one_heartbeat
  394. self.controller.DEBUG_STOP_HEARTBEATING = False
  395. self.controller.heartbeatWithServer()
  396. self.assertTrue(self.controller.hasMappedComponents)
  397. # wrong responseId => restart
  398. self.controller.responseId = 2
  399. response = {"responseId":"2", "restartAgent":"false"}
  400. restartAgent = MagicMock(name="restartAgent")
  401. self.controller.restartAgent = restartAgent
  402. self.controller.DEBUG_STOP_HEARTBEATING = False
  403. self.controller.heartbeatWithServer()
  404. restartAgent.assert_called_once_with()
  405. # executionCommands
  406. self.controller.responseId = 1
  407. addToQueue = MagicMock(name="addToQueue")
  408. self.controller.addToQueue = addToQueue
  409. response["executionCommands"] = "executionCommands"
  410. self.controller.DEBUG_STOP_HEARTBEATING = False
  411. self.controller.heartbeatWithServer()
  412. addToQueue.assert_has_calls([call("executionCommands")])
  413. # statusCommands
  414. self.controller.responseId = 1
  415. addToStatusQueue = MagicMock(name="addToStatusQueue")
  416. self.controller.addToStatusQueue = addToStatusQueue
  417. response["statusCommands"] = "statusCommands"
  418. self.controller.DEBUG_STOP_HEARTBEATING = False
  419. self.controller.heartbeatWithServer()
  420. addToStatusQueue.assert_has_calls([call("statusCommands")])
  421. # restartAgent command
  422. self.controller.responseId = 1
  423. self.controller.DEBUG_STOP_HEARTBEATING = False
  424. response["restartAgent"] = "true"
  425. restartAgent = MagicMock(name="restartAgent")
  426. self.controller.restartAgent = restartAgent
  427. self.controller.heartbeatWithServer()
  428. restartAgent.assert_called_once_with()
  429. # actionQueue not idle
  430. self.controller.responseId = 1
  431. self.controller.DEBUG_STOP_HEARTBEATING = False
  432. actionQueue.isIdle.return_value = False
  433. response["restartAgent"] = "false"
  434. self.controller.heartbeatWithServer()
  435. event_mock.assert_any_call(timeout=
  436. self.controller.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS)
  437. # Check that server continues to heartbeat after connection errors
  438. self.controller.responseId = 1
  439. self.controller.TEST_IOERROR_COUNTER = 1
  440. sendRequest.reset()
  441. def util_throw_IOErrors(*args, **kwargs):
  442. """
  443. Throws IOErrors 100 times and then stops heartbeats/registrations
  444. """
  445. if self.controller.TEST_IOERROR_COUNTER == 10:
  446. self.controller.DEBUG_STOP_HEARTBEATING = True
  447. self.controller.TEST_IOERROR_COUNTER += 1
  448. raise IOError("Sample error")
  449. self.controller.DEBUG_STOP_HEARTBEATING = False
  450. actionQueue.isIdle.return_value = False
  451. sendRequest.side_effect = util_throw_IOErrors
  452. self.controller.heartbeatWithServer()
  453. self.assertTrue(sendRequest.call_count > 5)
  454. event_mock.assert_called_with(timeout=
  455. self.controller.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS)
  456. sys.stdout = sys.__stdout__
  457. self.controller.sendRequest = Controller.Controller.sendRequest
  458. self.controller.sendRequest = Controller.Controller.addToQueue
  459. self.controller.sendRequest = Controller.Controller.addToStatusQueue
  460. @patch("pprint.pformat")
  461. @patch("time.sleep")
  462. @patch("ambari_simplejson.loads")
  463. @patch("ambari_simplejson.dumps")
  464. def test_certSigningFailed(self, dumpsMock, loadsMock, sleepMock, pformatMock):
  465. register = MagicMock()
  466. self.controller.register = register
  467. dumpsMock.return_value = "request"
  468. response = {"responseId":1,}
  469. loadsMock.return_value = response
  470. self.controller.sendRequest = Mock(side_effect=ssl.SSLError())
  471. self.controller.repeatRegistration=True
  472. self.controller.registerWithServer()
  473. #Conroller thread and the agent stop if the repeatRegistration flag is False
  474. self.assertFalse(self.controller.repeatRegistration)
  475. @patch.object(Controller, "LiveStatus")
  476. def test_updateComponents(self, LiveStatus_mock):
  477. LiveStatus_mock.SERVICES = []
  478. LiveStatus_mock.CLIENT_COMPONENTS = []
  479. LiveStatus_mock.COMPONENTS = []
  480. self.controller.componentsUrl = "foo_url/"
  481. sendRequest = Mock()
  482. self.controller.sendRequest = sendRequest
  483. self.controller.sendRequest.return_value = {"clusterName":"dummy_cluster_name",
  484. "stackName":"dummy_stack_name",
  485. "stackVersion":"dummy_stack_version",
  486. "components":{"PIG":{"PIG":"CLIENT"},
  487. "MAPREDUCE":{"MAPREDUCE_CLIENT":"CLIENT",
  488. "JOBTRACKER":"MASTER","TASKTRACKER":"SLAVE"}}}
  489. self.controller.updateComponents("dummy_cluster_name")
  490. sendRequest.assert_called_with('foo_url/dummy_cluster_name', None)
  491. services_expected = [u'MAPREDUCE', u'PIG']
  492. client_components_expected = [
  493. {'serviceName':u'MAPREDUCE','componentName':u'MAPREDUCE_CLIENT'},
  494. {'serviceName':u'PIG','componentName':u'PIG'}
  495. ]
  496. components_expected = [
  497. {'serviceName':u'MAPREDUCE','componentName':u'TASKTRACKER'},
  498. {'serviceName':u'MAPREDUCE','componentName':u'JOBTRACKER'}
  499. ]
  500. self.assertEquals(LiveStatus_mock.SERVICES, services_expected)
  501. self.assertEquals(LiveStatus_mock.CLIENT_COMPONENTS, client_components_expected)
  502. self.assertEquals(LiveStatus_mock.COMPONENTS, components_expected)
  503. @patch("socket.gethostbyname")
  504. @patch("ambari_simplejson.dumps")
  505. @patch("time.sleep")
  506. @patch("pprint.pformat")
  507. @patch.object(Controller, "randint")
  508. @patch.object(Controller, "LiveStatus")
  509. def test_recoveryRegConfig(self, LiveStatus_mock, randintMock, pformatMock, sleepMock,
  510. dumpsMock, socketGhbnMock):
  511. self.assertEquals(self.controller.recovery_manager.recovery_enabled, False)
  512. self.assertEquals(self.controller.recovery_manager.auto_start_only, False)
  513. self.assertEquals(self.controller.recovery_manager.max_count, 6)
  514. self.assertEquals(self.controller.recovery_manager.window_in_min, 60)
  515. self.assertEquals(self.controller.recovery_manager.retry_gap, 5)
  516. out = StringIO.StringIO()
  517. sys.stdout = out
  518. dumpsMock.return_value = '{"valid_object": true}'
  519. socketGhbnMock.return_value = "host1"
  520. sendRequest = MagicMock(name="sendRequest")
  521. self.controller.sendRequest = sendRequest
  522. register = MagicMock(name="register")
  523. self.controller.register = register
  524. sendRequest.return_value = {
  525. "responseId": 1,
  526. "recoveryConfig": {
  527. "type": "FULL",
  528. "maxCount": 5,
  529. "windowInMinutes": 50,
  530. "retryGap": 3,
  531. "maxLifetimeCount": 7},
  532. "log": "", "exitstatus": "0"}
  533. self.controller.isRegistered = False
  534. self.controller.registerWithServer()
  535. self.assertEquals(self.controller.recovery_manager.recovery_enabled, True)
  536. self.assertEquals(self.controller.recovery_manager.auto_start_only, False)
  537. self.assertEquals(self.controller.recovery_manager.max_count, 5)
  538. self.assertEquals(self.controller.recovery_manager.window_in_min, 50)
  539. self.assertEquals(self.controller.recovery_manager.retry_gap, 3)
  540. self.assertEquals(self.controller.recovery_manager.max_lifetime_count, 7)
  541. sys.stdout = sys.__stdout__
  542. self.controller.sendRequest = Controller.Controller.sendRequest
  543. self.controller.addToStatusQueue = Controller.Controller.addToStatusQueue
  544. pass
  545. @patch.object(threading._Event, "wait")
  546. @patch("time.sleep")
  547. @patch("ambari_simplejson.dumps")
  548. def test_recoveryHbCmd(self, dumpsMock, sleepMock, event_mock):
  549. out = StringIO.StringIO()
  550. sys.stdout = out
  551. hearbeat = MagicMock()
  552. self.controller.heartbeat = hearbeat
  553. event_mock.return_value = False
  554. dumpsMock.return_value = "data"
  555. sendRequest = MagicMock(name="sendRequest")
  556. self.controller.sendRequest = sendRequest
  557. addToQueue = MagicMock(name="addToQueue")
  558. addToStatusQueue = MagicMock(name="addToStatusQueue")
  559. self.addToQueue = addToQueue
  560. self.addToStatusQueue = addToStatusQueue
  561. process_execution_commands = MagicMock(name="process_execution_commands")
  562. self.controller.recovery_manager.process_execution_commands = process_execution_commands
  563. process_status_commands = MagicMock(name="process_status_commands")
  564. self.controller.recovery_manager.process_status_commands = process_status_commands
  565. set_paused = MagicMock(name = "set_paused")
  566. self.controller.recovery_manager.set_paused = set_paused
  567. self.controller.responseId = 0
  568. response = {"responseId":1,
  569. "statusCommands": "commands2",
  570. "executionCommands" : "commands1",
  571. "log":"",
  572. "exitstatus":"0",
  573. "hasPendingTasks": True}
  574. sendRequest.return_value = response
  575. def one_heartbeat(*args, **kwargs):
  576. self.controller.DEBUG_STOP_HEARTBEATING = True
  577. return response
  578. sendRequest.side_effect = one_heartbeat
  579. actionQueue = MagicMock()
  580. actionQueue.isIdle.return_value = True
  581. # one successful request, after stop
  582. self.controller.actionQueue = actionQueue
  583. self.controller.heartbeatWithServer()
  584. self.assertTrue(sendRequest.called)
  585. self.assertTrue(process_execution_commands.called)
  586. self.assertTrue(process_status_commands.called)
  587. process_execution_commands.assert_called_with("commands1")
  588. process_status_commands.assert_called_with("commands2")
  589. set_paused.assert_called_with(True)
  590. self.controller.heartbeatWithServer()
  591. sys.stdout = sys.__stdout__
  592. self.controller.sendRequest = Controller.Controller.sendRequest
  593. self.controller.sendRequest = Controller.Controller.addToQueue
  594. self.controller.sendRequest = Controller.Controller.addToStatusQueue
  595. pass
  596. if __name__ == "__main__":
  597. unittest.main(verbosity=2)