TestController.py 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. '''
  4. Licensed to the Apache Software Foundation (ASF) under one
  5. or more contributor license agreements. See the NOTICE file
  6. distributed with this work for additional information
  7. regarding copyright ownership. The ASF licenses this file
  8. to you under the Apache License, Version 2.0 (the
  9. "License"); you may not use this file except in compliance
  10. with the License. You may obtain a copy of the License at
  11. http://www.apache.org/licenses/LICENSE-2.0
  12. Unless required by applicable law or agreed to in writing, software
  13. distributed under the License is distributed on an "AS IS" BASIS,
  14. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. See the License for the specific language governing permissions and
  16. limitations under the License.
  17. '''
  18. import StringIO
  19. import os
  20. import ssl
  21. import tempfile
  22. import unittest, threading
  23. import sys
  24. from mock.mock import patch, MagicMock, call, Mock
  25. import logging
  26. import platform
  27. from threading import Event
  28. import ambari_simplejson
  29. from ambari_commons import OSCheck
  30. from only_for_platform import not_for_platform, os_distro_value, PLATFORM_WINDOWS
  31. from ambari_agent import Controller, ActionQueue, Register
  32. from ambari_agent import hostname
  33. from ambari_agent.Controller import AGENT_AUTO_RESTART_EXIT_CODE
  34. from ambari_commons import OSCheck
  35. from ambari_agent.Hardware import Hardware
  36. from ambari_agent.ExitHelper import ExitHelper
  37. from ambari_agent.AmbariConfig import AmbariConfig
  38. from ambari_agent.Facter import FacterLinux
  39. import ambari_commons
  40. @not_for_platform(PLATFORM_WINDOWS)
  41. @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  42. class TestController(unittest.TestCase):
  43. logger = logging.getLogger()
  @patch.object(Controller, "NetUtil", MagicMock())
  @patch.object(Controller, "AlertSchedulerHandler", MagicMock())
  @patch.object(Controller.Controller, "read_agent_version")
  @patch("threading.Thread")
  @patch("threading.Lock")
  @patch.object(hostname, "hostname")
  def setUp(self, hostname_method, lockMock, threadMock, read_agent_versionMock):
    """
    Create a fresh Controller for every test.

    Hostname lookup, threading.Lock, threading.Thread and the agent version
    lookup are mocked only while the Controller is being constructed.
    """
    Controller.logger = MagicMock()

    read_agent_versionMock.return_value = '2.1.0'
    hostname_method.return_value = "test_hostname"
    lockMock.return_value = MagicMock()

    # Any config.get(...) lookup made by the Controller answers "5".
    agent_config = MagicMock()
    agent_config.get.return_value = "5"

    self.controller = Controller.Controller(agent_config, "test_server")

    # Short intervals so that heartbeat loops do not slow the tests down.
    self.controller.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS = 0.1
    self.controller.netutil.HEARTBEAT_NOT_IDDLE_INTERVAL_SEC = 0.1
  @patch.object(OSCheck, "get_os_type")
  @patch.object(OSCheck, "get_os_version")
  def test_read_agent_version(self, get_os_version_mock, get_os_type_mock):
    """
    read_agent_version() is expected to return what was written to the
    "version" file located in the directory configured as the agent prefix.
    """
    config = AmbariConfig().getConfig()
    tmpdir = tempfile.gettempdir()
    config.set('agent', 'prefix', tmpdir)
    config.set('agent', 'current_ping_port', '33777')

    ver_file = os.path.join(tmpdir, "version")
    reference_version = "1.3.0"
    with open(ver_file, "w") as text_file:
      text_file.write(reference_version)

    try:
      version = self.controller.read_agent_version(config)
    finally:
      # Always remove the fixture, even when the call under test raises, so a
      # failure here does not leave a stray file in the shared temp directory.
      os.remove(ver_file)

    self.assertEqual(reference_version, version)
  @patch("ambari_simplejson.dumps")
  @patch("time.sleep")
  @patch("pprint.pformat")
  @patch.object(Controller, "randint")
  @patch.object(Controller, "LiveStatus")
  def test_registerWithServer(self, LiveStatus_mock, randintMock, pformatMock, sleepMock,
                              dumpsMock):
    """
    Exercise registerWithServer() against several canned server responses:
    an error response, a bare responseId, a response carrying status commands,
    and a first attempt that fails while the request is being serialized.
    """
    # Swallow console output for the duration of the test. The cleanup puts
    # back whatever stream was installed before (which may be a test runner's
    # capture object), even when an assertion below fails.
    self.addCleanup(setattr, sys, "stdout", sys.stdout)
    sys.stdout = StringIO.StringIO()

    LiveStatus_mock.SERVICES = ["foo"]
    LiveStatus_mock.CLIENT_COMPONENTS = ["foo"]
    LiveStatus_mock.COMPONENTS = ["foo"]

    register = MagicMock()
    self.controller.register = register

    self.controller.sendRequest = MagicMock()

    # Error response: returned as is, and the LiveStatus lists are cleared.
    dumpsMock.return_value = '{"valid_object": true}'
    self.controller.sendRequest.return_value = {"log":"Error text", "exitstatus":"1"}

    self.assertEqual({u'exitstatus': u'1', u'log': u'Error text'}, self.controller.registerWithServer())
    self.assertEqual(LiveStatus_mock.SERVICES, [])
    self.assertEqual(LiveStatus_mock.CLIENT_COMPONENTS, [])
    self.assertEqual(LiveStatus_mock.COMPONENTS, [])

    # Minimal successful response.
    self.controller.sendRequest.return_value = {"responseId":1}
    self.assertEqual({"responseId":1}, self.controller.registerWithServer())

    # Response with status commands: they are handed to addToStatusQueue.
    self.controller.sendRequest.return_value = {"responseId":1, "statusCommands": "commands", "log":"", "exitstatus":"0"}
    self.controller.addToStatusQueue = MagicMock(name="addToStatusQueue")
    self.controller.isRegistered = False
    self.assertEqual({'exitstatus': '0', 'responseId': 1, 'log': '', 'statusCommands': 'commands'}, self.controller.registerWithServer())
    self.controller.addToStatusQueue.assert_called_with("commands")

    calls = []

    def side_effect(*args):
      # Fail on the first serialization only, succeed afterwards.
      if not calls:
        calls.append(1)
        raise Exception("test")
      return "request"

    # First attempt raises; the retry is expected to go through randint/sleep.
    self.controller.sendRequest.return_value = {"responseId":1}
    dumpsMock.side_effect = side_effect
    self.controller.isRegistered = False
    self.assertEqual({"responseId":1}, self.controller.registerWithServer())
    self.assertTrue(randintMock.called)
    self.assertTrue(sleepMock.called)

    self.controller.sendRequest = Controller.Controller.sendRequest
    self.controller.addToStatusQueue = Controller.Controller.addToStatusQueue
  @patch("pprint.pformat")
  def test_addToQueue(self, pformatMock):
    """
    addToQueue() must not touch the action queue for None and must put
    something on it for a non-empty command.
    """
    queue_mock = MagicMock()
    self.controller.actionQueue = queue_mock

    # Nothing to enqueue.
    self.controller.addToQueue(None)
    self.assertFalse(queue_mock.put.called)

    # A real command reaches the queue.
    self.controller.addToQueue("cmd")
    self.assertTrue(queue_mock.put.called)
  @patch("pprint.pformat")
  @patch.object(Controller, "LiveStatus")
  def test_addToStatusQueue(self, LiveStatus_mock, pformatMock):
    """
    addToStatusQueue() ignores None, enqueues status commands, and calls
    updateComponents only when the LiveStatus lists are empty.
    """
    live_status_lists = ("SERVICES", "CLIENT_COMPONENTS", "COMPONENTS")

    for list_name in live_status_lists:
      setattr(LiveStatus_mock, list_name, ["foo"])

    commands = ambari_simplejson.loads('[{"clusterName":"dummy_cluster"}]')

    queue_mock = MagicMock()
    status_hook = MagicMock(name="process_status_commands")
    update_hook = Mock()
    self.controller.actionQueue = queue_mock
    self.controller.recovery_manager.process_status_commands = status_hook
    self.controller.updateComponents = update_hook

    # No commands: nothing is enqueued and components are not refreshed.
    self.controller.addToStatusQueue(None)
    self.assertFalse(queue_mock.put_status.called)
    self.assertFalse(update_hook.called)

    # Commands with populated LiveStatus lists: enqueue only.
    self.controller.addToStatusQueue(commands)
    self.assertTrue(queue_mock.put_status.called)
    self.assertFalse(update_hook.called)

    # Commands with empty LiveStatus lists: components are refreshed as well.
    for list_name in live_status_lists:
      setattr(LiveStatus_mock, list_name, [])

    self.controller.addToStatusQueue(commands)
    self.assertTrue(update_hook.called)
    self.assertTrue(queue_mock.put_status.called)
    self.assertTrue(status_hook.called)
  @patch("subprocess.Popen")
  @patch.object(Hardware, "_chk_writable_mount", new = MagicMock(return_value=True))
  @patch.object(FacterLinux, "facterInfo", new = MagicMock(return_value={}))
  @patch.object(FacterLinux, "__init__", new = MagicMock(return_value = None))
  @patch("urllib2.build_opener")
  @patch("urllib2.install_opener")
  @patch.object(Controller, "ActionQueue")
  @patch.object(OSCheck, "get_os_type")
  @patch.object(OSCheck, "get_os_version")
  def test_run(self, get_os_version_mock, get_os_type_mock, ActionQueue_mock, installMock, buildMock, Popen_mock):
    """
    run() must install the URL opener, start the action queue and call
    registerAndHeartbeat again when repeatRegistration was requested.
    """
    aq = MagicMock()
    ActionQueue_mock.return_value = aq
    get_os_type_mock.return_value = "suse"
    get_os_version_mock.return_value = "11"
    buildMock.return_value = "opener"

    # `name=` is required here: the first positional argument of MagicMock is
    # `spec`, so a bare string would be used as a spec instead of a label.
    registerAndHeartbeat = MagicMock(name="registerAndHeartbeat")
    calls = []

    def side_effect():
      # Ask for a second registration round on the first invocation only.
      if not calls:
        self.controller.repeatRegistration = True
      calls.append(1)

    registerAndHeartbeat.side_effect = side_effect
    self.controller.registerAndHeartbeat = registerAndHeartbeat

    # repeat registration
    self.controller.run()
    self.assertTrue(buildMock.called)
    # A real assertion: `called_once_with` would merely create a child mock
    # and could never fail.
    installMock.assert_called_once_with("opener")
    self.assertEqual(2, registerAndHeartbeat.call_count)

    # one call, +1
    registerAndHeartbeat.side_effect = None
    self.controller.run()
    self.assertEqual(3, registerAndHeartbeat.call_count)

    # Action queue should be started during calls
    self.assertTrue(ActionQueue_mock.called)
    self.assertTrue(aq.start.called)
  @patch("subprocess.Popen")
  @patch.object(Hardware, "_chk_writable_mount", new = MagicMock(return_value=True))
  @patch.object(FacterLinux, "facterInfo", new = MagicMock(return_value={}))
  @patch.object(FacterLinux, "__init__", new = MagicMock(return_value = None))
  @patch("urllib2.build_opener")
  @patch("urllib2.install_opener")
  @patch.object(ActionQueue.ActionQueue, "run")
  @patch.object(OSCheck, "get_os_type")
  @patch.object(OSCheck, "get_os_version")
  def test_repeatRegistration(self, get_os_version_mock, get_os_type_mock,
                              run_mock, installMock, buildMock, Popen_mock):
    """
    Each run() is expected to call registerAndHeartbeat exactly once when the
    repeatRegistration flag is False after the call returns.
    """
    heartbeat_cycle = MagicMock(name="registerAndHeartbeat")

    get_os_type_mock.return_value = "suse"
    get_os_version_mock.return_value = "11"
    self.controller.registerAndHeartbeat = heartbeat_cycle

    # First run: a single registration/heartbeat cycle.
    self.controller.run()
    self.assertTrue(installMock.called)
    self.assertTrue(buildMock.called)
    self.controller.registerAndHeartbeat.assert_called_once_with()

    calls = []

    def switchBool():
      first_invocation = len(calls) == 0
      if first_invocation:
        self.controller.repeatRegistration = True
        calls.append(1)
      # The flag always ends up False, so run() does not loop again.
      self.controller.repeatRegistration = False

    heartbeat_cycle.side_effect = switchBool

    # Second run adds exactly one more call to the total.
    self.controller.run()
    self.assertEqual(2, heartbeat_cycle.call_count)

    self.controller.registerAndHeartbeat = Controller.Controller.registerAndHeartbeat
  @patch("time.sleep")
  def test_registerAndHeartbeatWithException(self, sleepMock):
    """
    registerAndHeartbeat() must still register and heartbeat exactly once
    while Controller.__sendRequest__ is replaced by a mock that raises.
    """
    registerWithServer = MagicMock(name="registerWithServer")
    registerWithServer.return_value = {"response":"resp"}
    self.controller.registerWithServer = registerWithServer
    heartbeatWithServer = MagicMock(name="heartbeatWithServer")
    self.controller.heartbeatWithServer = heartbeatWithServer
    actionQueue = MagicMock(name="actionQueue")
    self.controller.actionQueue = actionQueue

    # Patch the class attribute only for the duration of the call. A plain
    # assignment would stay on the Controller class and leak into every test
    # that runs afterwards. create=True covers the case where the attribute
    # does not exist on the class.
    with patch.object(Controller.Controller, "__sendRequest__",
                      MagicMock(side_effect=Exception()), create=True):
      self.controller.isRegistered = True
      self.controller.registerAndHeartbeat()

    registerWithServer.assert_called_once_with()
    heartbeatWithServer.assert_called_once_with()

    # Put the real implementations back, each under its own name.
    self.controller.registerWithServer = \
      Controller.Controller.registerWithServer
    self.controller.heartbeatWithServer = \
      Controller.Controller.heartbeatWithServer
  @patch("time.sleep")
  def test_registerAndHeartbeat(self, sleepMock):
    """
    registerAndHeartbeat() must register once, heartbeat once and invoke
    every callable stored in controller.registration_listeners.
    """
    registerWithServer = MagicMock(name="registerWithServer")
    registerWithServer.return_value = {"response":"resp"}
    self.controller.registerWithServer = registerWithServer
    heartbeatWithServer = MagicMock(name="heartbeatWithServer")
    self.controller.heartbeatWithServer = heartbeatWithServer
    actionQueue = MagicMock(name="actionQueue")
    self.controller.actionQueue = actionQueue

    listener1 = MagicMock()
    listener2 = MagicMock()
    self.controller.registration_listeners.append(listener1)
    self.controller.registration_listeners.append(listener2)

    self.controller.isRegistered = True
    self.controller.registerAndHeartbeat()

    registerWithServer.assert_called_once_with()
    heartbeatWithServer.assert_called_once_with()
    self.assertTrue(listener1.called)
    self.assertTrue(listener2.called)

    # Put the real implementations back, each under its own name (the
    # heartbeat slot used to be overwritten with registerWithServer).
    self.controller.registerWithServer = \
      Controller.Controller.registerWithServer
    self.controller.heartbeatWithServer = \
      Controller.Controller.heartbeatWithServer
  @patch("time.sleep")
  def test_registerAndHeartbeat_check_registration_listener(self, sleepMock):
    """
    registerAndHeartbeat() must register once and heartbeat once when no
    extra registration listeners have been added by the test.

    NOTE(review): despite its name this test asserts nothing about
    registration listeners - confirm whether a listener check is missing.
    """
    registerWithServer = MagicMock(name="registerWithServer")
    registerWithServer.return_value = {"response":"resp"}
    self.controller.registerWithServer = registerWithServer
    heartbeatWithServer = MagicMock(name="heartbeatWithServer")
    self.controller.heartbeatWithServer = heartbeatWithServer
    actionQueue = MagicMock(name="actionQueue")
    self.controller.actionQueue = actionQueue

    self.controller.isRegistered = True
    self.controller.registerAndHeartbeat()

    registerWithServer.assert_called_once_with()
    heartbeatWithServer.assert_called_once_with()

    # Put the real implementations back, each under its own name (the
    # heartbeat slot used to be overwritten with registerWithServer).
    self.controller.registerWithServer = \
      Controller.Controller.registerWithServer
    self.controller.heartbeatWithServer = \
      Controller.Controller.heartbeatWithServer
  @patch("time.sleep")
  @patch.object(Controller.Controller, "sendRequest")
  def test_registerWithIOErrors(self, sendRequestMock, sleepMock):
    """
    Registration must keep being attempted while sendRequest raises IOError.
    """
    queue_mock = MagicMock()
    queue_mock.isIdle.return_value = True
    self.controller.actionQueue = queue_mock

    register_stub = MagicMock(name="Register")
    register_stub.build.return_value = {}
    self.controller.register = register_stub

    self.controller.responseId = 1
    self.controller.TEST_IOERROR_COUNTER = 1
    self.controller.isRegistered = False

    def util_throw_IOErrors(*args, **kwargs):
      """
      Raise IOError on every call; the call that finds the counter at 10
      also marks the controller as registered.
      """
      counter_exhausted = self.controller.TEST_IOERROR_COUNTER == 10
      if counter_exhausted:
        self.controller.isRegistered = True
      self.controller.TEST_IOERROR_COUNTER += 1
      raise IOError("Sample error")

    queue_mock.isIdle.return_value = False
    sendRequestMock.side_effect = util_throw_IOErrors

    self.controller.registerWithServer()

    # Several attempts must have been made despite the errors.
    self.assertTrue(sendRequestMock.call_count > 5)
  @patch.object(ExitHelper, "exit")
  def test_restartAgent(self, exit_mock):
    """
    restartAgent() must call ExitHelper.exit with the auto-restart exit code
    as its first positional argument.
    """
    self.controller.restartAgent()

    self.assertTrue(exit_mock.called)
    positional_args = exit_mock.call_args[0]
    self.assertTrue(positional_args[0] == AGENT_AUTO_RESTART_EXIT_CODE)
  @patch("urllib2.Request")
  @patch.object(Controller, "security")
  def test_sendRequest(self, security_mock, requestMock):
    """
    sendRequest() must return the parsed JSON response, and raise IOError
    with a descriptive message when the response cannot be parsed or the
    connection itself raises.
    """
    conMock = MagicMock()
    security_mock.CachedHTTPSConnection.return_value = conMock

    url = "http://ambari.apache.org:8081/agent"
    data = "data"
    requestMock.return_value = "request"

    # Valid response: the connection is created on first use and the parsed
    # object is returned.
    self.controller.cachedconnect = None
    conMock.request.return_value = '{"valid_object": true}'
    actual = self.controller.sendRequest(url, data)
    expected = ambari_simplejson.loads('{"valid_object": true}')
    self.assertEqual(actual, expected)

    security_mock.CachedHTTPSConnection.assert_called_once_with(
      self.controller.config, self.controller.serverHostname)
    # NOTE(review): `called_once_with` is NOT an assertion - it only creates a
    # child mock, so this line verifies nothing. It is left unchanged because
    # the expected Content-Type below looks like the result of a mechanical
    # json -> ambari_simplejson rename; confirm the header really sent by
    # Controller.sendRequest before turning this into assert_called_once_with.
    requestMock.called_once_with(url, data,
      {'Content-Type': 'application/ambari_simplejson'})

    # Unparseable response.
    conMock.request.return_value = '{invalid_object}'
    with self.assertRaises(IOError) as parse_failure:
      self.controller.sendRequest(url, data)
    self.assertEqual('Response parsing failed! Request data: ' + data +
                     '; Response: {invalid_object}', str(parse_failure.exception))

    # Failure raised by the connection.
    exceptionMessage = "Connection Refused"
    conMock.request.side_effect = Exception(exceptionMessage)
    with self.assertRaises(IOError) as request_failure:
      self.controller.sendRequest(url, data)
    self.assertEqual('Request to ' + url + ' failed due to ' +
                     exceptionMessage, str(request_failure.exception))
  @patch.object(ExitHelper, "exit")
  @patch.object(threading._Event, "wait")
  @patch("time.sleep")
  @patch("ambari_simplejson.dumps")
  def test_heartbeatWithServer(self, dumpsMock, sleepMock, event_mock, exit_mock):
    """
    Drive heartbeatWithServer() through a series of canned server responses
    and check how the controller reacts to each of them.

    Every scenario ends the loop by setting DEBUG_STOP_HEARTBEATING from the
    sendRequest side effect.
    """
    # Swallow console output for the duration of the test. The cleanup puts
    # back whatever stream was installed before, even when an assertion fails.
    self.addCleanup(setattr, sys, "stdout", sys.stdout)
    sys.stdout = StringIO.StringIO()

    hearbeat = MagicMock()
    self.controller.heartbeat = hearbeat
    event_mock.return_value = False
    dumpsMock.return_value = "data"

    sendRequest = MagicMock(name="sendRequest")
    self.controller.sendRequest = sendRequest

    self.controller.responseId = 1
    response = {"responseId":"2", "restartAgent":"false"}
    sendRequest.return_value = response

    def one_heartbeat(*args, **kwargs):
      # Answer once and ask the loop to stop.
      self.controller.DEBUG_STOP_HEARTBEATING = True
      return response

    sendRequest.side_effect = one_heartbeat

    actionQueue = MagicMock()
    actionQueue.isIdle.return_value = True

    # one successful request, after stop
    self.controller.actionQueue = actionQueue
    self.controller.alert_scheduler_handler = MagicMock()
    self.controller.heartbeatWithServer()
    self.assertTrue(sendRequest.called)

    calls = []

    def retry(*args, **kwargs):
      # First call fails, the following one succeeds and stops the loop.
      if len(calls) == 0:
        calls.append(1)
        response["responseId"] = "3"
        raise Exception()
      if len(calls) > 0:
        self.controller.DEBUG_STOP_HEARTBEATING = True
        return response

    # exception, retry, successful and stop
    sendRequest.side_effect = retry
    self.controller.DEBUG_STOP_HEARTBEATING = False
    self.controller.heartbeatWithServer()
    self.assertEqual(1, self.controller.DEBUG_SUCCESSFULL_HEARTBEATS)

    # retry registration
    self.controller.responseId = 2
    response["registrationCommand"] = "true"
    sendRequest.side_effect = one_heartbeat
    self.controller.DEBUG_STOP_HEARTBEATING = False
    self.controller.heartbeatWithServer()
    self.assertTrue(self.controller.repeatRegistration)

    # components are not mapped
    self.controller.responseId = 2
    response["registrationCommand"] = "false"
    response["hasMappedComponents"] = False
    sendRequest.side_effect = one_heartbeat
    self.controller.DEBUG_STOP_HEARTBEATING = False
    self.controller.heartbeatWithServer()
    self.assertFalse(self.controller.hasMappedComponents)

    # components are mapped
    self.controller.responseId = 2
    response["hasMappedComponents"] = True
    sendRequest.side_effect = one_heartbeat
    self.controller.DEBUG_STOP_HEARTBEATING = False
    self.controller.heartbeatWithServer()
    self.assertTrue(self.controller.hasMappedComponents)

    # key absent from the response: the controller still reports mapped
    self.controller.responseId = 2
    del response["hasMappedComponents"]
    sendRequest.side_effect = one_heartbeat
    self.controller.DEBUG_STOP_HEARTBEATING = False
    self.controller.heartbeatWithServer()
    self.assertTrue(self.controller.hasMappedComponents)

    # wrong responseId => restart
    # Rebinding `response` is visible to one_heartbeat through its closure.
    self.controller.responseId = 2
    response = {"responseId":"2", "restartAgent":"false"}
    restartAgent = MagicMock(name="restartAgent")
    self.controller.restartAgent = restartAgent
    self.controller.DEBUG_STOP_HEARTBEATING = False
    self.controller.heartbeatWithServer()
    restartAgent.assert_called_with()

    # executionCommands
    self.controller.responseId = 1
    addToQueue = MagicMock(name="addToQueue")
    self.controller.addToQueue = addToQueue
    response["executionCommands"] = "executionCommands"
    self.controller.DEBUG_STOP_HEARTBEATING = False
    self.controller.heartbeatWithServer()
    addToQueue.assert_has_calls([call("executionCommands")])

    # statusCommands
    self.controller.responseId = 1
    addToStatusQueue = MagicMock(name="addToStatusQueue")
    self.controller.addToStatusQueue = addToStatusQueue
    response["statusCommands"] = "statusCommands"
    self.controller.DEBUG_STOP_HEARTBEATING = False
    self.controller.heartbeatWithServer()
    addToStatusQueue.assert_has_calls([call("statusCommands")])

    # restartAgent command
    self.controller.responseId = 1
    self.controller.DEBUG_STOP_HEARTBEATING = False
    response["restartAgent"] = "true"
    restartAgent = MagicMock(name="restartAgent")
    self.controller.restartAgent = restartAgent
    self.controller.heartbeatWithServer()
    restartAgent.assert_called_with()

    # actionQueue not idle
    self.controller.responseId = 1
    self.controller.DEBUG_STOP_HEARTBEATING = False
    actionQueue.isIdle.return_value = False
    response["restartAgent"] = "false"
    self.controller.heartbeatWithServer()

    # Check that server continues to heartbeat after connection errors
    self.controller.responseId = 1
    self.controller.TEST_IOERROR_COUNTER = 1
    # reset_mock() clears the recorded calls so that the call_count check
    # below counts only the failing heartbeats. The former `reset()` call was
    # an auto-created child mock and reset nothing.
    sendRequest.reset_mock()

    def util_throw_IOErrors(*args, **kwargs):
      """
      Raise IOError on every call; the call that finds the counter at 10
      also asks the heartbeat loop to stop.
      """
      if self.controller.TEST_IOERROR_COUNTER == 10:
        self.controller.DEBUG_STOP_HEARTBEATING = True
      self.controller.TEST_IOERROR_COUNTER += 1
      raise IOError("Sample error")

    self.controller.DEBUG_STOP_HEARTBEATING = False
    actionQueue.isIdle.return_value = False
    sendRequest.side_effect = util_throw_IOErrors
    self.controller.heartbeatWithServer()
    self.assertTrue(sendRequest.call_count > 5)

    # Put the real implementations back, each under its own name (all three
    # used to be assigned to `sendRequest`).
    self.controller.sendRequest = Controller.Controller.sendRequest
    self.controller.addToQueue = Controller.Controller.addToQueue
    self.controller.addToStatusQueue = Controller.Controller.addToStatusQueue
  @patch("pprint.pformat")
  @patch("time.sleep")
  @patch("ambari_simplejson.loads")
  @patch("ambari_simplejson.dumps")
  def test_certSigningFailed(self, dumpsMock, loadsMock, sleepMock, pformatMock):
    """
    An ssl.SSLError raised by sendRequest during registration must leave
    repeatRegistration set to False.
    """
    self.controller.register = MagicMock()

    dumpsMock.return_value = "request"
    loadsMock.return_value = {"responseId": 1}

    # Every request fails with an SSL error.
    self.controller.sendRequest = Mock(side_effect=ssl.SSLError())

    self.controller.repeatRegistration = True
    self.controller.registerWithServer()

    # Controller thread and the agent stop if the repeatRegistration flag is False
    self.assertFalse(self.controller.repeatRegistration)
  @patch.object(Controller, "LiveStatus")
  def test_updateComponents(self, LiveStatus_mock):
    """
    updateComponents() must request <componentsUrl><cluster name> and fill
    the LiveStatus service, client-component and component lists from the
    returned mapping.
    """
    for list_name in ("SERVICES", "CLIENT_COMPONENTS", "COMPONENTS"):
      setattr(LiveStatus_mock, list_name, [])

    self.controller.componentsUrl = "foo_url/"

    server_answer = {
      "clusterName": "dummy_cluster_name",
      "stackName": "dummy_stack_name",
      "stackVersion": "dummy_stack_version",
      "components": {
        "PIG": {"PIG": "CLIENT"},
        "MAPREDUCE": {
          "MAPREDUCE_CLIENT": "CLIENT",
          "JOBTRACKER": "MASTER",
          "TASKTRACKER": "SLAVE",
        },
      },
    }
    sendRequest = Mock()
    self.controller.sendRequest = sendRequest
    self.controller.sendRequest.return_value = server_answer

    self.controller.updateComponents("dummy_cluster_name")
    sendRequest.assert_called_with('foo_url/dummy_cluster_name', None)

    services_expected = [u'MAPREDUCE', u'PIG']
    client_components_expected = [
      {'serviceName': u'MAPREDUCE', 'componentName': u'MAPREDUCE_CLIENT'},
      {'serviceName': u'PIG', 'componentName': u'PIG'},
    ]
    components_expected = [
      {'serviceName': u'MAPREDUCE', 'componentName': u'TASKTRACKER'},
      {'serviceName': u'MAPREDUCE', 'componentName': u'JOBTRACKER'},
    ]

    self.assertEqual(LiveStatus_mock.SERVICES, services_expected)
    self.assertEqual(LiveStatus_mock.CLIENT_COMPONENTS, client_components_expected)
    self.assertEqual(LiveStatus_mock.COMPONENTS, components_expected)
  @patch("socket.gethostbyname")
  @patch("ambari_simplejson.dumps")
  @patch("time.sleep")
  @patch("pprint.pformat")
  @patch.object(Controller, "randint")
  @patch.object(Controller, "LiveStatus")
  def test_recoveryRegConfig(self, LiveStatus_mock, randintMock, pformatMock, sleepMock,
                             dumpsMock, socketGhbnMock):
    """
    A "recoveryConfig" section in the registration response must be applied
    to the controller's recovery manager.
    """
    # Values before registration.
    self.assertEqual(self.controller.recovery_manager.recovery_enabled, False)
    self.assertEqual(self.controller.recovery_manager.auto_start_only, False)
    self.assertEqual(self.controller.recovery_manager.max_count, 6)
    self.assertEqual(self.controller.recovery_manager.window_in_min, 60)
    self.assertEqual(self.controller.recovery_manager.retry_gap, 5)

    # Swallow console output for the duration of the test. The cleanup puts
    # back whatever stream was installed before, even when an assertion fails.
    self.addCleanup(setattr, sys, "stdout", sys.stdout)
    sys.stdout = StringIO.StringIO()

    dumpsMock.return_value = '{"valid_object": true}'
    socketGhbnMock.return_value = "host1"

    sendRequest = MagicMock(name="sendRequest")
    self.controller.sendRequest = sendRequest
    register = MagicMock(name="register")
    self.controller.register = register

    sendRequest.return_value = {
      "responseId": 1,
      "recoveryConfig": {
        "type": "FULL",
        "maxCount": 5,
        "windowInMinutes": 50,
        "retryGap": 3,
        "maxLifetimeCount": 7},
      "log": "", "exitstatus": "0"}
    self.controller.isRegistered = False
    self.controller.registerWithServer()

    # Values taken over from the response.
    self.assertEqual(self.controller.recovery_manager.recovery_enabled, True)
    self.assertEqual(self.controller.recovery_manager.auto_start_only, False)
    self.assertEqual(self.controller.recovery_manager.max_count, 5)
    self.assertEqual(self.controller.recovery_manager.window_in_min, 50)
    self.assertEqual(self.controller.recovery_manager.retry_gap, 3)
    self.assertEqual(self.controller.recovery_manager.max_lifetime_count, 7)

    self.controller.sendRequest = Controller.Controller.sendRequest
    self.controller.addToStatusQueue = Controller.Controller.addToStatusQueue
  @patch.object(ExitHelper, "exit")
  @patch.object(threading._Event, "wait")
  @patch("time.sleep")
  @patch("ambari_simplejson.dumps")
  def test_recoveryHbCmd(self, dumpsMock, sleepMock, event_mock, exit_mock):
    """
    A heartbeat response with execution commands and pending tasks must be
    forwarded to the recovery manager: process_execution_commands receives
    the commands and set_paused is called with True.
    """
    # Swallow console output for the duration of the test. The cleanup puts
    # back whatever stream was installed before, even when an assertion fails.
    self.addCleanup(setattr, sys, "stdout", sys.stdout)
    sys.stdout = StringIO.StringIO()

    hearbeat = MagicMock()
    self.controller.heartbeat = hearbeat
    event_mock.return_value = False
    dumpsMock.return_value = "data"

    sendRequest = MagicMock(name="sendRequest")
    self.controller.sendRequest = sendRequest

    addToQueue = MagicMock(name="addToQueue")
    addToStatusQueue = MagicMock(name="addToStatusQueue")
    # NOTE(review): these two mocks are stored on the TestCase, not on
    # self.controller, so the controller's real addToQueue/addToStatusQueue
    # are used below. Left unchanged because redirecting them could change
    # which code path the assertions exercise - confirm the intent.
    self.addToQueue = addToQueue
    self.addToStatusQueue = addToStatusQueue

    process_execution_commands = MagicMock(name="process_execution_commands")
    self.controller.recovery_manager.process_execution_commands = process_execution_commands
    process_status_commands = MagicMock(name="process_status_commands")
    self.controller.recovery_manager.process_status_commands = process_status_commands
    set_paused = MagicMock(name = "set_paused")
    self.controller.recovery_manager.set_paused = set_paused

    self.controller.responseId = 0
    response = {"responseId":1,
                "statusCommands": "commands2",
                "executionCommands" : "commands1",
                "log":"",
                "exitstatus":"0",
                "hasPendingTasks": True}
    sendRequest.return_value = response

    def one_heartbeat(*args, **kwargs):
      # Answer once and ask the loop to stop.
      self.controller.DEBUG_STOP_HEARTBEATING = True
      return response

    sendRequest.side_effect = one_heartbeat

    actionQueue = MagicMock()
    actionQueue.isIdle.return_value = True

    # one successful request, after stop
    self.controller.actionQueue = actionQueue
    self.controller.heartbeatWithServer()
    self.assertTrue(sendRequest.called)
    self.assertTrue(process_execution_commands.called)
    self.assertFalse(process_status_commands.called)
    process_execution_commands.assert_called_with("commands1")
    set_paused.assert_called_with(True)

    self.controller.heartbeatWithServer()

    # Put the real implementations back, each under its own name (all three
    # used to be assigned to `sendRequest`).
    self.controller.sendRequest = Controller.Controller.sendRequest
    self.controller.addToQueue = Controller.Controller.addToQueue
    self.controller.addToStatusQueue = Controller.Controller.addToStatusQueue
  603. if __name__ == "__main__":
  604. unittest.main(verbosity=2)