test_flume.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413
  1. #!/usr/bin/env python
  2. '''
  3. Licensed to the Apache Software Foundation (ASF) under one
  4. or more contributor license agreements. See the NOTICE file
  5. distributed with this work for additional information
  6. regarding copyright ownership. The ASF licenses this file
  7. to you under the Apache License, Version 2.0 (the
  8. "License"); you may not use this file except in compliance
  9. with the License. You may obtain a copy of the License at
  10. http://www.apache.org/licenses/LICENSE-2.0
  11. Unless required by applicable law or agreed to in writing, software
  12. distributed under the License is distributed on an "AS IS" BASIS,
  13. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. See the License for the specific language governing permissions and
  15. limitations under the License.
  16. '''
  17. from mock.mock import MagicMock, call, patch
  18. from stacks.utils.RMFTestCase import *
  19. import resource_management.core.source
  20. import os
  class TestFlumeHandler(RMFTestCase):
    """Tests for the FlumeHandler script commands (configure/start/stop/status).

    Every test runs flume_handler.py through ``executeScript`` with a JSON
    config fixture and then checks the expected ``assertResourceCalled``
    sequence and/or the patched collaborators.
    """

    # Script under test; shared by every executeScript call in this class.
    _SCRIPT = "2.0.6/services/FLUME/package/scripts/flume_handler.py"

    # Expected content of every agent's ambari-meta.json in these fixtures.
    _META_JSON = '{"channels_count": 1, "sinks_count": 1, "sources_count": 1}'

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    def _execute(self, command, config_file="default.json"):
      """Run one FlumeHandler command against the given config fixture."""
      self.executeScript(self._SCRIPT,
                         classname = "FlumeHandler",
                         command = command,
                         config_file = config_file)

    def _run_status(self):
      """Run the status command, tolerating the exception it is expected to raise."""
      try:
        self._execute("status")
      except Exception:
        # expected since ComponentIsNotRunning gets raised
        # NOTE(review): narrowed from a bare ``except:``; assumes
        # ComponentIsNotRunning derives from Exception - confirm.
        pass

    def _flume_conf(self):
      """Return the per-agent properties parsed from the fixture's flume-conf."""
      return build_flume(self.getConfig()['configurations']['flume-conf']['content'])

    def _flume_env(self):
      """Return an InlineTemplate built from the fixture's flume-env content."""
      return InlineTemplate(self.getConfig()['configurations']['flume-env']['content'])

    def _assert_base_directories(self):
      """Assert the two directories checked first in every configure sequence."""
      self.assertResourceCalled('Directory', '/etc/flume/conf', recursive=True)
      self.assertResourceCalled('Directory', '/var/log/flume', owner = 'flume')

    def _assert_agent_files(self, agent, properties):
      """Assert the conf dir, flume.conf, log4j.properties and ambari-meta.json of one agent."""
      conf_dir = '/etc/flume/conf/' + agent
      self.assertResourceCalled('Directory', conf_dir)
      self.assertResourceCalled('PropertiesFile', conf_dir + '/flume.conf',
                                mode = 0o644,
                                properties = properties)
      self.assertResourceCalled('File', conf_dir + '/log4j.properties',
                                content = Template('log4j.properties.j2', agent_name = agent),
                                mode = 0o644)
      self.assertResourceCalled('File', conf_dir + '/ambari-meta.json',
                                content = self._META_JSON,
                                mode = 0o644)

    def _assert_agent_env(self, agent, content):
      """Assert the flume-env.sh file of one agent with the given content."""
      self.assertResourceCalled('File', '/etc/flume/conf/' + agent + '/flume-env.sh',
                                owner = "flume",
                                content = content)

    def _assert_agent_started(self, agent):
      """Assert the two Execute resources (launch, then pid capture) for one agent."""
      names = {'agent': agent}
      launch_cmd = (
        "/usr/bin/sudo su flume -l -s /bin/bash -c "
        "'export PATH=/bin JAVA_HOME=/usr/jdk64/jdk1.7.0_45 > /dev/null ; "
        "/usr/bin/flume-ng agent --name %(agent)s "
        "--conf /etc/flume/conf/%(agent)s "
        "--conf-file /etc/flume/conf/%(agent)s/flume.conf "
        "-Dflume.monitoring.type=ganglia "
        "-Dflume.monitoring.hosts=c6401.ambari.apache.org:8655' &"
      ) % names
      pid_cmd = ('pgrep -o -u flume -f ^/usr/jdk64/jdk1.7.0_45.*%(agent)s.* '
                 '> /var/run/flume/%(agent)s.pid') % names

      self.assertResourceCalled('Execute', launch_cmd,
                                environment = {'JAVA_HOME': u'/usr/jdk64/jdk1.7.0_45'},
                                wait_for_finish = False,
                                )
      self.assertResourceCalled('Execute', pid_cmd,
                                logoutput = True,
                                tries = 20,
                                try_sleep = 6)

    def _assert_agent_stopped(self, agent):
      """Assert the kill Execute and the pid-file delete for one agent."""
      pid_file = '/var/run/flume/%s.pid' % agent
      self.assertResourceCalled('Execute', 'kill `cat %s` > /dev/null 2>&1' % pid_file,
                                ignore_failures = True)
      self.assertResourceCalled('File', pid_file, action = ['delete'])

    def _assert_configured_with_env_path(self, expected_path):
      """Assert the a1 configure sequence and that flume-env mentions expected_path."""
      self._assert_base_directories()
      self._assert_agent_files('a1', self._flume_conf()['a1'])

      content = self._flume_env()
      self.assertTrue(expected_path in content.get_content())
      self._assert_agent_env('a1', content)

    # ------------------------------------------------------------------
    # shared configure assertions
    # ------------------------------------------------------------------

    def assert_configure_default(self):
      """Assert the configure sequence for the single agent ``a1``."""
      self._assert_base_directories()
      self._assert_agent_files('a1', self._flume_conf()['a1'])
      self._assert_agent_env('a1', self._flume_env())

    def assert_configure_many(self):
      """Assert the configure sequence for agents ``a1`` and then ``b1``."""
      self._assert_base_directories()
      top = self._flume_conf()
      for agent in ('a1', 'b1'):
        self._assert_agent_files(agent, top[agent])
        self._assert_agent_env(agent, self._flume_env())

    # ------------------------------------------------------------------
    # configure
    # ------------------------------------------------------------------

    def test_configure_default(self):
      self._execute("configure")

      self.assert_configure_default()
      self.assertNoMoreResources()

    @patch("flume.find_expected_agent_names")
    @patch("os.unlink")
    def test_configure_with_existing(self, os_unlink_mock, expected_names_mock):
      expected_names_mock.return_value = ["x1"]

      self._execute("configure")

      self.assertTrue(os_unlink_mock.called)
      os_unlink_mock.assert_called_with('/etc/flume/conf/x1/ambari-meta.json')
      self.assert_configure_default()
      self.assertNoMoreResources()

    def test_flume_env_not_22(self):
      self._execute("configure")

      self._assert_configured_with_env_path('/usr/lib/hive')

    def test_flume_env_with_22(self):
      self._execute("configure", config_file="flume_22.json")

      self._assert_configured_with_env_path('/usr/hdp/current/hive-metastore')

    # ------------------------------------------------------------------
    # start
    # ------------------------------------------------------------------

    @patch("os.path.isfile")
    @patch("flume.cmd_target_names")
    @patch("flume._set_desired_state")
    def test_start_default(self, set_desired_mock, cmd_target_names_mock, os_path_isfile_mock):
      # 1st call is to check if the conf file is there - that should be True
      # 2nd call is to check if the process is live - that should be False
      os_path_isfile_mock.side_effect = [True, False]
      cmd_target_names_mock.return_value = ["a1"]

      self._execute("start")

      self.assert_configure_default()
      self.assertTrue(set_desired_mock.called)
      self.assertEqual(set_desired_mock.call_args[0][0], 'STARTED')
      self._assert_agent_started('a1')
      self.assertNoMoreResources()

    # An earlier definition of test_start_single used to precede this one.
    # Python keeps only the last function bound to a name in a class body, so
    # that copy never ran, and its expectations (tries = 10, a different su
    # command line) disagreed with the live test below. It has been removed.
    @patch("os.path.isfile")
    def test_start_single(self, os_path_isfile_mock):
      # 1st call is to check if the conf file is there - that should be True
      # 2nd call is to check if the process is live - that should be False
      os_path_isfile_mock.side_effect = [True, False]

      self._execute("start", config_file="flume_target.json")

      self.assert_configure_many()
      self._assert_agent_started('b1')
      self.assertNoMoreResources()

    # ------------------------------------------------------------------
    # stop
    # ------------------------------------------------------------------

    @patch("glob.glob")
    @patch("flume._set_desired_state")
    def test_stop_default(self, set_desired_mock, glob_mock):
      glob_mock.side_effect = [['/var/run/flume/a1/pid'], ['/etc/flume/conf/a1/ambari-meta.json']]

      self._execute("stop")

      self.assertTrue(glob_mock.called)
      self.assertTrue(set_desired_mock.called)
      self.assertEqual(set_desired_mock.call_args[0][0], 'INSTALLED')
      self._assert_agent_stopped('a1')
      self.assertNoMoreResources()

    @patch("glob.glob")
    def test_stop_single(self, glob_mock):
      glob_mock.return_value = ['/var/run/flume/b1.pid']

      self._execute("stop", config_file="flume_target.json")

      self.assertTrue(glob_mock.called)
      self._assert_agent_stopped('b1')
      self.assertNoMoreResources()

    # ------------------------------------------------------------------
    # status
    # ------------------------------------------------------------------

    @patch("resource_management.libraries.script.Script.put_structured_out")
    @patch("sys.exit")
    def test_status_default(self, sys_exit_mock, structured_out_mock):
      self._run_status()

      # test that the method was called with empty processes
      self.assertTrue(structured_out_mock.called)
      structured_out_mock.assert_called_with({'processes': []})
      self.assertNoMoreResources()

    @patch("resource_management.libraries.script.Script.put_structured_out")
    @patch("glob.glob")
    @patch("sys.exit")
    def test_status_with_result(self, sys_exit_mock, glob_mock, structured_out_mock):
      glob_mock.return_value = ['/etc/flume/conf/a1/ambari-meta.json']

      self._run_status()

      self.assertTrue(structured_out_mock.called)
      # call_args[0] is a tuple, whose first element is the actual call argument
      struct_out = structured_out_mock.call_args[0][0]
      self.assertTrue('processes' in struct_out)
      self.assertNoMoreResources()

    @patch("resource_management.libraries.script.Script.put_structured_out")
    @patch("glob.glob")
    @patch("sys.exit")
    def test_status_no_agents(self, sys_exit_mock, glob_mock, structured_out_mock):
      glob_mock.return_value = []

      self._run_status()

      self.assertTrue(structured_out_mock.called)
      # call_args[0] is a tuple, whose first element is the actual call argument
      struct_out = structured_out_mock.call_args[0][0]
      self.assertTrue('processes' in struct_out)
      self.assertNoMoreResources()
  def build_flume(content):
    """Parse flume-conf text into per-agent property dictionaries.

    ``content`` is read line by line as ``key = value`` pairs; blank lines and
    lines starting with ``#`` are skipped. Properties are grouped under the
    first dot-separated component of their key. Only groups for which some key
    ends with ``.sources`` are treated as agents and kept in the result.

    Returns a dict mapping agent name -> {full property key: value}.
    """
    result = {}
    agent_names = set()

    for line in content.split('\n'):
      rline = line.strip()
      if not rline or rline.startswith('#'):
        continue

      # Split on the first '=' only, so a value that itself contains '=' is
      # kept whole; a line without '=' yields an empty value instead of
      # raising IndexError.
      lhs, _, rhs = rline.partition('=')
      lhs = lhs.strip()
      rhs = rhs.strip()

      part0 = lhs.split('.')[0]
      if lhs.endswith(".sources"):
        agent_names.add(part0)

      result.setdefault(part0, {})[lhs] = rhs

    # trim out non-agents; iterate over a snapshot of the keys because
    # deleting from a dict while iterating it fails on Python 3
    for k in list(result):
      if k not in agent_names:
        del result[k]

    return result