TestRecoveryManager.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522
  1. #!/usr/bin/env python
  2. '''
  3. Licensed to the Apache Software Foundation (ASF) under one
  4. or more contributor license agreements. See the NOTICE file
  5. distributed with this work for additional information
  6. regarding copyright ownership. The ASF licenses this file
  7. to you under the Apache License, Version 2.0 (the
  8. "License"); you may not use this file except in compliance
  9. with the License. You may obtain a copy of the License at
  10. http://www.apache.org/licenses/LICENSE-2.0
  11. Unless required by applicable law or agreed to in writing, software
  12. distributed under the License is distributed on an "AS IS" BASIS,
  13. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. See the License for the specific language governing permissions and
  15. limitations under the License.
  16. '''
  17. from unittest import TestCase
  18. import copy
  19. from ambari_agent.RecoveryManager import RecoveryManager
  20. from mock.mock import patch, MagicMock, call
  21. class TestRecoveryManager(TestCase):
  # STATUS_COMMAND fixture for NODEMANAGER. Its payloadLevel is
  # EXECUTION_COMMAND and it embeds an INSTALL execution command under
  # "executionCommandDetails". Tests that hand it to
  # RecoveryManager.store_or_update_command() pass a deep copy of it.
  # NOTE(review): "commandParams" is nested inside "configurations" here (and in
  # the exec_command fixtures below) instead of being a sibling key; confirm
  # whether that is intentional. The visible tests only compare
  # "configurations" between the generated INSTALL and START commands.
  command = {
    "commandType": "STATUS_COMMAND",
    "payloadLevel": "EXECUTION_COMMAND",
    "componentName": "NODEMANAGER",
    "desiredState": "STARTED",
    "hasStaleConfigs": False,
    "executionCommandDetails": {
      "commandType": "EXECUTION_COMMAND",
      "roleCommand": "INSTALL",
      "role": "NODEMANAGER",
      "hostLevelParams": {
        "custom_command":""},
      "configurations": {
        "capacity-scheduler": {
          "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
        "capacity-calculator": {
          "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
        "commandParams": {
          "service_package_folder": "common-services/YARN/2.1.0.2.0/package"
        }
      }
    }
  }
  # Plain EXECUTION_COMMAND fixtures for NODEMANAGER with roleCommand INSTALL,
  # START and SERVICE_CHECK respectively. test_process_commands feeds them to
  # RecoveryManager.process_execution_commands() and process_status_commands().
  exec_command1 = {
    "commandType": "EXECUTION_COMMAND",
    "roleCommand": "INSTALL",
    "role": "NODEMANAGER",
    "configurations": {
      "capacity-scheduler": {
        "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
      "capacity-calculator": {
        "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
      "commandParams": {
        "service_package_folder": "common-services/YARN/2.1.0.2.0/package"
      }
    }
  }
  exec_command2 = {
    "commandType": "EXECUTION_COMMAND",
    "roleCommand": "START",
    "role": "NODEMANAGER",
    "configurations": {
      "capacity-scheduler": {
        "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
      "capacity-calculator": {
        "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
      "commandParams": {
        "service_package_folder": "common-services/YARN/2.1.0.2.0/package"
      }
    }
  }
  exec_command3 = {
    "commandType": "EXECUTION_COMMAND",
    "roleCommand": "SERVICE_CHECK",
    "role": "NODEMANAGER",
    "configurations": {
      "capacity-scheduler": {
        "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
      "capacity-calculator": {
        "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
      "commandParams": {
        "service_package_folder": "common-services/YARN/2.1.0.2.0/package"
      }
    }
  }
  def setUp(self):
    """Nothing shared to prepare: every test builds its own RecoveryManager."""
  def tearDown(self):
    """Nothing to clean up: each test keeps its RecoveryManager in a local."""
  @patch.object(RecoveryManager, "update_desired_status")
  def test_process_commands(self, mock_uds):
    """Incoming commands are turned into update_desired_status() calls.

    STATUS_COMMANDs report their "desiredState"; EXECUTION_COMMANDs with
    roleCommand INSTALL / START are expected to set "INSTALLED" / "STARTED".
    """
    rm = RecoveryManager(True)

    # No commands at all: no desired-status updates.
    rm.process_status_commands(None)
    self.assertFalse(mock_uds.called)
    rm.process_status_commands([])
    self.assertFalse(mock_uds.called)

    rm.process_status_commands([self.command])
    mock_uds.assert_has_calls([call("NODEMANAGER", "STARTED")])
    mock_uds.reset_mock()

    # All expected calls must go in ONE list: the second positional parameter
    # of assert_has_calls() is `any_order`, so a second list would silently be
    # treated as a truthy flag and never checked.
    rm.process_status_commands([self.command, self.exec_command1, self.command])
    mock_uds.assert_has_calls([call("NODEMANAGER", "STARTED"),
                               call("NODEMANAGER", "STARTED")])
    mock_uds.reset_mock()

    rm.process_execution_commands([self.exec_command1, self.exec_command2, self.exec_command3])
    mock_uds.assert_has_calls([call("NODEMANAGER", "INSTALLED"),
                               call("NODEMANAGER", "STARTED")])
    mock_uds.reset_mock()

    rm.process_execution_commands([self.exec_command1, self.command])
    mock_uds.assert_has_calls([call("NODEMANAGER", "INSTALLED")])
  def test_defaults(self):
    """A RecoveryManager built without arguments is disabled and never recovers."""
    manager = RecoveryManager()
    self.assertFalse(manager.enabled())
    # Neither kind of stored command exists yet.
    for lookup in (manager.get_install_command, manager.get_start_command):
      self.assertEqual(None, lookup("NODEMANAGER"))
    # Even an INSTALLED -> STARTED gap is not reported while disabled.
    manager.update_current_status("NODEMANAGER", "INSTALLED")
    manager.update_desired_status("NODEMANAGER", "STARTED")
    self.assertFalse(manager.requires_recovery("NODEMANAGER"))
  @patch.object(RecoveryManager, "_now_")
  def test_sliding_window(self, time_mock):
    """Validate update_config() and the attempt limits enforced by execute().

    update_config() takes (max_count, window_in_min, retry_gap,
    max_lifetime_count) followed by two boolean flags. That argument order
    matches how test_update_rm_config maps maxCount / windowInMinutes /
    retryGap / maxLifetimeCount.

    The 20 mocked timestamps line up one-to-one with the 20 may_execute() /
    execute() calls below (see the "T = ..." comments). Keep them in step
    when adding or removing calls.
    """
    time_mock.side_effect = \
      [1000, 1001, 1002, 1003, 1004, 1071, 1150, 1151, 1152, 1153, 1400, 1401,
       1500, 1571, 1572, 1653, 1900, 1971, 2300, 2301]
    rm = RecoveryManager(True, False)
    self.assertTrue(rm.enabled())
    # Invalid settings disable recovery: a zero max_count, window, retry gap or
    # lifetime count, a retry gap longer than the window (61 > 60 minutes), or
    # a lifetime count below max_count (4 < 6).
    rm.update_config(0, 60, 5, 12, True, False)
    self.assertFalse(rm.enabled())
    rm.update_config(6, 60, 5, 12, True, False)
    self.assertTrue(rm.enabled())
    rm.update_config(6, 0, 5, 12, True, False)
    self.assertFalse(rm.enabled())
    rm.update_config(6, 60, 0, 12, True, False)
    self.assertFalse(rm.enabled())
    rm.update_config(6, 60, 1, 12, True, False)
    self.assertTrue(rm.enabled())
    rm.update_config(6, 60, 61, 12, True, False)
    self.assertFalse(rm.enabled())
    rm.update_config(6, 60, 5, 0, True, False)
    self.assertFalse(rm.enabled())
    rm.update_config(6, 60, 5, 4, True, False)
    self.assertFalse(rm.enabled())
    # At most 2 attempts per 5-minute window, at least 1 minute apart, and at
    # most 4 attempts in total.
    rm.update_config(2, 5, 1, 4, True, False)
    self.assertTrue(rm.enabled())
    # T = 1000-1002
    self.assertTrue(rm.may_execute("NODEMANAGER"))
    self.assertTrue(rm.may_execute("NODEMANAGER"))
    self.assertTrue(rm.may_execute("NODEMANAGER"))
    # T = 1003-1004
    self.assertTrue(rm.execute("NODEMANAGER"))
    self.assertFalse(rm.execute("NODEMANAGER")) # too soon (1 s later)
    # T = 1071
    self.assertTrue(rm.execute("NODEMANAGER")) # 60+ seconds passed
    # T = 1150-1153
    self.assertFalse(rm.execute("NODEMANAGER")) # limit 2 exceeded in this window
    self.assertFalse(rm.may_execute("NODEMANAGER"))
    # Other component names are unaffected by NODEMANAGER's limit.
    self.assertTrue(rm.execute("DATANODE"))
    self.assertTrue(rm.may_execute("NAMENODE"))
    # T = 1400-1401
    self.assertTrue(rm.execute("NODEMANAGER")) # 5-minute window has elapsed
    self.assertFalse(rm.may_execute("NODEMANAGER")) # too soon
    # Same 2-per-5-minutes limit and 1-minute gap, now with a lifetime cap of
    # 5 and the last flag set to True.
    rm.update_config(2, 5, 1, 5, True, True)
    # T = 1500, 1571, 1572, 1653
    self.assertTrue(rm.execute("NODEMANAGER2"))
    self.assertTrue(rm.may_execute("NODEMANAGER2"))
    self.assertTrue(rm.execute("NODEMANAGER2"))
    self.assertFalse(rm.execute("NODEMANAGER2")) # 3rd attempt in window: max limit
    # T = 1900, 1971
    self.assertTrue(rm.execute("NODEMANAGER2"))
    self.assertTrue(rm.execute("NODEMANAGER2"))
    # T = 2300-2301
    # The 5th successful attempt is allowed; the next one hits the lifetime
    # cap of 5.
    self.assertTrue(rm.execute("NODEMANAGER2"))
    self.assertFalse(rm.execute("NODEMANAGER2"))
    pass
  def test_recovery_required(self):
    """Check requires_recovery() for (current, desired) status pairs.

    Rows are applied in order against one manager per mode. A current status
    of None leaves the value set by the previous row untouched.
    """
    # RecoveryManager(True, False): the flags produced for a FULL registration.
    full_recovery_cases = [
      ("INSTALLED", "INSTALLED", False),
      (None, "STARTED", True),
      ("STARTED", "INSTALLED", True),
      (None, "STARTED", False),
      ("INSTALLED", "XYS", False),
      (None, "", False),
      ("INIT", "INSTALLED", True),
      (None, "STARTED", True),
    ]
    # RecoveryManager(True, True): the flags produced for AUTO_START.
    # NOTE(review): "START" is not the "STARTED" status used elsewhere in this
    # file, so the last two rows may only exercise an unrecognised desired
    # state. Confirm whether "STARTED" was intended.
    auto_start_cases = [
      ("INIT", "INSTALLED", False),
      ("INIT", "START", False),
      ("INSTALLED", "START", False),
    ]
    for flags, cases in (((True, False), full_recovery_cases),
                         ((True, True), auto_start_cases)):
      rm = RecoveryManager(*flags)
      for current, desired, expected in cases:
        if current is not None:
          rm.update_current_status("NODEMANAGER", current)
        rm.update_desired_status("NODEMANAGER", desired)
        check = self.assertTrue if expected else self.assertFalse
        check(rm.requires_recovery("NODEMANAGER"))
  # Pin time.time() to 1 so time-derived values are deterministic.
  # `return_value` keeps returning 1 on every call; a list `side_effect` would
  # raise StopIteration after the first call.
  # NOTE(review): the taskId values 2 and 3 asserted below appear to be seeded
  # from this clock value. Confirm against RecoveryManager.
  @patch('time.time', MagicMock(return_value=1))
  def test_store_from_status_and_use(self):
    """Stored STATUS_COMMAND details yield INSTALL/START auto-execution commands.

    The commands are looked up per component, disappear after
    remove_command(), come back when stored again, and are withheld while
    the manager is paused.
    """
    rm = RecoveryManager(True)
    command1 = copy.deepcopy(self.command)
    rm.store_or_update_command(command1)
    self.assertTrue(rm.command_exists("NODEMANAGER", "EXECUTION_COMMAND"))

    install_command = rm.get_install_command("NODEMANAGER")
    start_command = rm.get_start_command("NODEMANAGER")
    self.assertEqual("INSTALL", install_command["roleCommand"])
    self.assertEqual("START", start_command["roleCommand"])
    self.assertEqual("AUTO_EXECUTION_COMMAND", install_command["commandType"])
    self.assertEqual("AUTO_EXECUTION_COMMAND", start_command["commandType"])
    self.assertEqual("NODEMANAGER", install_command["role"])
    self.assertEqual("NODEMANAGER", start_command["role"])
    self.assertEqual(install_command["configurations"], start_command["configurations"])
    self.assertEqual(2, install_command["taskId"])
    self.assertEqual(3, start_command["taskId"])
    # A component that was never stored has no commands.
    self.assertEqual(None, rm.get_install_command("component2"))
    self.assertEqual(None, rm.get_start_command("component2"))

    # remove_command() reports whether anything was actually removed.
    self.assertTrue(rm.remove_command("NODEMANAGER"))
    self.assertFalse(rm.remove_command("NODEMANAGER"))
    self.assertEqual(None, rm.get_install_command("NODEMANAGER"))
    self.assertEqual(None, rm.get_start_command("NODEMANAGER"))
    self.assertEqual(None, rm.get_install_command("component2"))
    self.assertEqual(None, rm.get_start_command("component2"))

    # Storing again restores the command, but a paused manager hands out none.
    rm.store_or_update_command(command1)
    self.assertTrue(rm.command_exists("NODEMANAGER", "EXECUTION_COMMAND"))
    rm.set_paused(True)
    self.assertEqual(None, rm.get_install_command("NODEMANAGER"))
    self.assertEqual(None, rm.get_start_command("NODEMANAGER"))
  @patch.object(RecoveryManager, "_now_")
  def test_get_recovery_commands(self, time_mock):
    """get_recovery_commands() picks INSTALL / START / RESTART from the statuses.

    NOTE(review): the mocked timestamps are consumed by RecoveryManager
    internals as the test runs. The "Starts at ..." comments record which value
    a get_recovery_commands() call is expected to begin with, so keep the list
    in step with the call sequence.
    """
    time_mock.side_effect = \
      [1000, 1001, 1002, 1003,
       1100, 1101, 1102,
       1200, 1201, 1203,
       4000, 4001, 4002, 4003,
       4100, 4101, 4102, 4103,
       4200, 4201, 4202]
    rm = RecoveryManager(True)
    # Up to 15 attempts per 5-minute window, 1 minute apart, 16 in total.
    rm.update_config(15, 5, 1, 16, True, False)
    command1 = copy.deepcopy(self.command)
    rm.store_or_update_command(command1)
    # INSTALLED but desired STARTED: a START command is produced.
    rm.update_current_status("NODEMANAGER", "INSTALLED")
    rm.update_desired_status("NODEMANAGER", "STARTED")
    commands = rm.get_recovery_commands()
    self.assertEqual(1, len(commands))
    self.assertEqual("START", commands[0]["roleCommand"])
    # INIT but desired STARTED: INSTALL comes first.
    rm.update_current_status("NODEMANAGER", "INIT")
    rm.update_desired_status("NODEMANAGER", "STARTED")
    # Starts at 1100
    commands = rm.get_recovery_commands()
    self.assertEqual(1, len(commands))
    self.assertEqual("INSTALL", commands[0]["roleCommand"])
    # INIT with desired INSTALLED: INSTALL.
    rm.update_current_status("NODEMANAGER", "INIT")
    rm.update_desired_status("NODEMANAGER", "INSTALLED")
    # Starts at 1200
    commands = rm.get_recovery_commands()
    self.assertEqual(1, len(commands))
    self.assertEqual("INSTALL", commands[0]["roleCommand"])
    # With the last update_config() flag True (the flags test_update_rm_config
    # expects for an AUTO_START registration), INIT -> INSTALLED yields nothing.
    rm.update_config(2, 5, 1, 5, True, True)
    rm.update_current_status("NODEMANAGER", "INIT")
    rm.update_desired_status("NODEMANAGER", "INSTALLED")
    commands = rm.get_recovery_commands()
    self.assertEqual(0, len(commands))
    # Last flag back to False and the status command stored again:
    # INIT -> INSTALLED yields INSTALL.
    rm.update_config(12, 5, 1, 15, True, False)
    rm.update_current_status("NODEMANAGER", "INIT")
    rm.update_desired_status("NODEMANAGER", "INSTALLED")
    rm.store_or_update_command(command1)
    commands = rm.get_recovery_commands()
    self.assertEqual(1, len(commands))
    self.assertEqual("INSTALL", commands[0]["roleCommand"])
    # Current == desired and configs not stale: nothing to do.
    rm.update_config_staleness("NODEMANAGER", False)
    rm.update_current_status("NODEMANAGER", "INSTALLED")
    rm.update_desired_status("NODEMANAGER", "INSTALLED")
    commands = rm.get_recovery_commands()
    self.assertEqual(0, len(commands))
    # Store an INSTALLED-desired status command, then mark configs stale:
    # an INSTALL is produced.
    command_install = copy.deepcopy(self.command)
    command_install["desiredState"] = "INSTALLED"
    rm.store_or_update_command(command_install)
    rm.update_config_staleness("NODEMANAGER", True)
    commands = rm.get_recovery_commands()
    self.assertEqual(1, len(commands))
    self.assertEqual("INSTALL", commands[0]["roleCommand"])
    # A STARTED component whose configs are still stale gets a CUSTOM_COMMAND
    # carrying custom_command RESTART.
    rm.update_current_status("NODEMANAGER", "STARTED")
    rm.update_desired_status("NODEMANAGER", "STARTED")
    commands = rm.get_recovery_commands()
    self.assertEqual(1, len(commands))
    self.assertEqual("CUSTOM_COMMAND", commands[0]["roleCommand"])
    self.assertEqual("RESTART", commands[0]["hostLevelParams"]["custom_command"])
    pass
  @patch.object(RecoveryManager, "update_config")
  def test_update_rm_config(self, mock_uc):
    """Registration payloads are translated into update_config() arguments.

    Missing or unusable recovery settings fall back to the (6, 60, 5, 12)
    limits, and "type" selects the two trailing boolean flags.
    """
    rm = RecoveryManager()
    default_limits = (6, 60, 5, 12)
    scenarios = [
      (None, default_limits + (False, True)),
      ({}, default_limits + (False, True)),
      ({"recoveryConfig": {
        "type" : "DEFAULT"}}, default_limits + (False, True)),
      ({"recoveryConfig": {
        "type" : "FULL"}}, default_limits + (True, False)),
      # "max_count": "med" has no effect on the limits.
      ({"recoveryConfig": {
        "type" : "AUTO_START",
        "max_count" : "med"}}, default_limits + (True, True)),
      # maxCount arrives as the string "5"; the expected call carries 5.
      ({"recoveryConfig": {
        "type" : "AUTO_START",
        "maxCount" : "5",
        "windowInMinutes" : 20,
        "retryGap" : 2,
        "maxLifetimeCount" : 5}}, (5, 20, 2, 5, True, True)),
    ]
    for registration, expected_args in scenarios:
      rm.update_configuration_from_registration(registration)
      mock_uc.assert_has_calls([call(*expected_args)])
      mock_uc.reset_mock()
  @patch.object(RecoveryManager, "_now_")
  def test_recovery_report(self, time_mock):
    """get_recovery_status() summarises per-component attempt counts.

    The summary goes from DISABLED (default manager) to RECOVERABLE, then
    PARTIALLY_RECOVERABLE once one component has reached its limit, and
    UNRECOVERABLE once both tracked components have.
    """
    # Nine timestamps for the nine execute() calls below, in call order.
    time_mock.side_effect = \
      [1000, 1071, 1072, 1470, 1471, 1472, 1543, 1644, 1715]
    rm = RecoveryManager()
    rec_st = rm.get_recovery_status()
    self.assertEqual(rec_st, {"summary": "DISABLED"})

    # At most 2 attempts per 5-minute window, 1 minute apart, 4 in total.
    rm.update_config(2, 5, 1, 4, True, True)
    rec_st = rm.get_recovery_status()
    self.assertEqual(rec_st, {"summary": "RECOVERABLE", "componentReports": []})

    rm.execute("PUMA")
    rec_st = rm.get_recovery_status()
    self.assertEqual(rec_st, {"summary": "RECOVERABLE",
                              "componentReports": [{"name": "PUMA", "numAttempts": 1, "limitReached": False}]})

    rm.execute("PUMA")
    rm.execute("LION")
    rec_st = rm.get_recovery_status()
    self.assertEqual(rec_st, {"summary": "RECOVERABLE",
                              "componentReports": [
                                {"name": "LION", "numAttempts": 1, "limitReached": False},
                                {"name": "PUMA", "numAttempts": 2, "limitReached": False}
                              ]})

    # PUMA reaches the lifetime cap of 4 attempts; LION is still below it.
    rm.execute("PUMA")
    rm.execute("LION")
    rm.execute("PUMA")
    rm.execute("PUMA")
    rm.execute("LION")
    rec_st = rm.get_recovery_status()
    self.assertEqual(rec_st, {"summary": "PARTIALLY_RECOVERABLE",
                              "componentReports": [
                                {"name": "LION", "numAttempts": 3, "limitReached": False},
                                {"name": "PUMA", "numAttempts": 4, "limitReached": True}
                              ]})

    # LION reaches the cap too, so nothing is recoverable any more.
    rm.execute("LION")
    rec_st = rm.get_recovery_status()
    self.assertEqual(rec_st, {"summary": "UNRECOVERABLE",
                              "componentReports": [
                                {"name": "LION", "numAttempts": 4, "limitReached": True},
                                {"name": "PUMA", "numAttempts": 4, "limitReached": True}
                              ]})
  @patch.object(RecoveryManager, "_now_")
  def test_command_expiry(self, time_mock):
    """A stored command stops producing recovery commands once it is stale.

    Storing the status command again makes START available again.
    NOTE(review): the mocked timestamps are consumed by RecoveryManager
    internals in call order; keep the list in step with the calls below.
    """
    time_mock.side_effect = \
      [1000, 1001, 1002, 1003, 1104, 1105, 1106, 1807, 1808, 1809, 1810, 1811, 1812]
    rm = RecoveryManager(True)
    # Up to 5 attempts per 5-minute window, 1 minute apart, 11 in total.
    rm.update_config(5, 5, 1, 11, True, False)
    command1 = copy.deepcopy(self.command)
    rm.store_or_update_command(command1)
    rm.update_current_status("NODEMANAGER", "INSTALLED")
    rm.update_desired_status("NODEMANAGER", "STARTED")
    commands = rm.get_recovery_commands()
    self.assertEqual(1, len(commands))
    self.assertEqual("START", commands[0]["roleCommand"])
    # A later call, before expiry, still yields START.
    commands = rm.get_recovery_commands()
    self.assertEqual(1, len(commands))
    self.assertEqual("START", commands[0]["roleCommand"])
    #1807 command is stale
    commands = rm.get_recovery_commands()
    self.assertEqual(0, len(commands))
    # Re-storing the status command makes START available again.
    rm.store_or_update_command(command1)
    commands = rm.get_recovery_commands()
    self.assertEqual(1, len(commands))
    self.assertEqual("START", commands[0]["roleCommand"])
    pass
  def test_command_count(self):
    """has_active_command() stays true until every started command is stopped."""
    rm = RecoveryManager(True)
    self.assertFalse(rm.has_active_command())
    # Two starts followed by two stops: only the final stop clears the flag.
    steps = (
      (rm.start_execution_command, True),
      (rm.start_execution_command, True),
      (rm.stop_execution_command, True),
      (rm.stop_execution_command, False),
    )
    for step, expect_active in steps:
      step()
      if expect_active:
        self.assertTrue(rm.has_active_command())
      else:
        self.assertFalse(rm.has_active_command())