#!/usr/bin/env python2.6

'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import errno
import os
import pprint
import StringIO
import sys
import tempfile
import threading
import time
from threading import Thread
from unittest import TestCase

from mock.mock import patch, MagicMock, call

from ambari_agent.ActionDependencyManager import ActionDependencyManager
from ambari_agent.ActionQueue import ActionQueue
from ambari_agent.AmbariConfig import AmbariConfig
from ambari_agent.LiveStatus import LiveStatus
from ambari_agent.PuppetExecutor import PuppetExecutor
from ambari_agent.StackVersionsFileHandler import StackVersionsFileHandler
from ambari_agent.UpgradeExecutor import UpgradeExecutor
- class TestActionQueue(TestCase):
- def setUp(self):
- out = StringIO.StringIO()
- sys.stdout = out
- # save original open() method for later use
- self.original_open = open
- def tearDown(self):
- sys.stdout = sys.__stdout__
- datanode_install_command = {
- 'commandType': 'EXECUTION_COMMAND',
- 'role': u'DATANODE',
- 'roleCommand': u'INSTALL',
- 'commandId': '1-1',
- 'taskId': 3,
- 'clusterName': u'cc',
- 'serviceName': u'HDFS',
- 'configurations':{'global' : {}},
- 'configurationTags':{'global' : { 'tag': 'v1' }}
- }
- datanode_upgrade_command = {
- 'commandId': 17,
- 'role' : "role",
- 'taskId' : "taskId",
- 'clusterName' : "clusterName",
- 'serviceName' : "serviceName",
- 'roleCommand' : 'UPGRADE',
- 'hostname' : "localhost.localdomain",
- 'hostLevelParams': "hostLevelParams",
- 'clusterHostInfo': "clusterHostInfo",
- 'commandType': "EXECUTION_COMMAND",
- 'configurations':{'global' : {}},
- 'roleParams': {},
- 'commandParams' : {
- 'source_stack_version' : 'HDP-1.2.1',
- 'target_stack_version' : 'HDP-1.3.0'
- }
- }
- namenode_install_command = {
- 'commandType': 'EXECUTION_COMMAND',
- 'role': u'NAMENODE',
- 'roleCommand': u'INSTALL',
- 'commandId': '1-1',
- 'taskId': 4,
- 'clusterName': u'cc',
- 'serviceName': u'HDFS',
- }
- snamenode_install_command = {
- 'commandType': 'EXECUTION_COMMAND',
- 'role': u'SECONDARY_NAMENODE',
- 'roleCommand': u'INSTALL',
- 'commandId': '1-1',
- 'taskId': 5,
- 'clusterName': u'cc',
- 'serviceName': u'HDFS',
- }
- nagios_install_command = {
- 'commandType': 'EXECUTION_COMMAND',
- 'role': u'NAGIOS',
- 'roleCommand': u'INSTALL',
- 'commandId': '1-1',
- 'taskId': 6,
- 'clusterName': u'cc',
- 'serviceName': u'HDFS',
- }
- hbase_install_command = {
- 'commandType': 'EXECUTION_COMMAND',
- 'role': u'HBASE',
- 'roleCommand': u'INSTALL',
- 'commandId': '1-1',
- 'taskId': 7,
- 'clusterName': u'cc',
- 'serviceName': u'HDFS',
- }
- status_command = {
- "serviceName" : 'HDFS',
- "commandType" : "STATUS_COMMAND",
- "clusterName" : "",
- "componentName" : "DATANODE",
- 'configurations':{}
- }
- @patch.object(ActionDependencyManager, "read_dependencies")
- @patch.object(ActionDependencyManager, "get_next_action_group")
- @patch.object(ActionQueue, "process_portion_of_actions")
- def test_ActionQueueStartStop(self, process_portion_of_actions_mock,
- get_next_action_group_mock, read_dependencies_mock):
- actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
- actionQueue.start()
- time.sleep(0.1)
- actionQueue.stop()
- actionQueue.join()
- self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
- self.assertTrue(get_next_action_group_mock.call_count > 1)
- self.assertTrue(process_portion_of_actions_mock.call_count > 1)
- @patch.object(ActionDependencyManager, "read_dependencies")
- @patch.object(ActionQueue, "execute_command")
- @patch.object(ActionQueue, "execute_status_command")
- def test_process_portion_of_actions(self, execute_status_command_mock,
- executeCommand_mock, read_dependencies_mock):
- actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
- # Test execution of EXECUTION_COMMANDs
- max = 3
- actionQueue.MAX_CONCURRENT_ACTIONS = max
- unfreeze_flag = threading.Event()
- sync_lock = threading.RLock()
- stats = {
- 'waiting_threads' : 0
- }
- def side_effect(self):
- with sync_lock: # Synchtonized to avoid race effects during test execution
- stats['waiting_threads'] += 1
- unfreeze_flag.wait()
- executeCommand_mock.side_effect = side_effect
- portion = [self.datanode_install_command,
- self.namenode_install_command,
- self.snamenode_install_command,
- self.nagios_install_command,
- self.hbase_install_command]
- # We call method in a separate thread
- action_thread = Thread(target = actionQueue.process_portion_of_actions, args = (portion, ))
- action_thread.start()
- # Now we wait to check that only MAX_CONCURRENT_ACTIONS threads are running
- while stats['waiting_threads'] != max:
- time.sleep(0.1)
- self.assertEqual(stats['waiting_threads'], max)
- # unfreezing waiting threads
- unfreeze_flag.set()
- # wait until all threads are finished
- action_thread.join()
- self.assertTrue(executeCommand_mock.call_count == 5)
- self.assertFalse(execute_status_command_mock.called)
- executeCommand_mock.reset_mock()
- execute_status_command_mock.reset_mock()
- # Test execution of STATUS_COMMANDs
- n = 5
- portion = []
- for i in range(0, n):
- status_command = {
- 'componentName': 'DATANODE',
- 'commandType': 'STATUS_COMMAND',
- }
- portion.append(status_command)
- actionQueue.process_portion_of_actions(portion)
- self.assertTrue(execute_status_command_mock.call_count == n)
- self.assertFalse(executeCommand_mock.called)
- # Test execution of unknown command
- unknown_command = {
- 'commandType': 'WRONG_COMMAND',
- }
- portion = [unknown_command]
- actionQueue.process_portion_of_actions(portion)
- # no exception expected
- pass
- @patch("traceback.print_exc")
- @patch.object(ActionDependencyManager, "read_dependencies")
- @patch.object(ActionQueue, "execute_command")
- def test_execute_command_safely(self, execute_command_mock,
- read_dependencies_mock, print_exc_mock):
- actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
- # Try normal execution
- actionQueue.execute_command_safely('command')
- # Try exception ro check proper logging
- def side_effect(self):
- raise Exception("TerribleException")
- execute_command_mock.side_effect = side_effect
- actionQueue.execute_command_safely('command')
- self.assertTrue(print_exc_mock.called)
- @patch("__builtin__.open")
- @patch.object(ActionQueue, "status_update_callback")
- @patch.object(ActionDependencyManager, "read_dependencies")
- def test_execute_command(self, read_dependencies_mock,
- status_update_callback_mock, open_mock):
- # Make file read calls visible
- def open_side_effect(file, mode):
- if mode == 'r':
- file_mock = MagicMock()
- file_mock.read.return_value = "Read from " + str(file)
- return file_mock
- else:
- return self.original_open(file, mode)
- open_mock.side_effect = open_side_effect
- config = AmbariConfig().getConfig()
- tmpfile = tempfile.gettempdir()
- config.set('agent', 'prefix', tmpfile)
- actionQueue = ActionQueue(config, 'dummy_controller')
- unfreeze_flag = threading.Event()
- puppet_execution_result_dict = {
- 'stdout': 'out',
- 'stderr': 'stderr',
- }
- def side_effect(command, tmpoutfile, tmperrfile):
- unfreeze_flag.wait()
- return puppet_execution_result_dict
- def patched_aq_execute_command(command):
- # We have to perform patching for separate thread in the same thread
- with patch.object(PuppetExecutor, "runCommand") as runCommand_mock:
- with patch.object(UpgradeExecutor, "perform_stack_upgrade") \
- as perform_stack_upgrade_mock:
- runCommand_mock.side_effect = side_effect
- perform_stack_upgrade_mock.side_effect = side_effect
- actionQueue.execute_command(command)
- ### Test install/start/stop command ###
- ## Test successful execution with configuration tags
- puppet_execution_result_dict['status'] = 'COMPLETE'
- puppet_execution_result_dict['exitcode'] = 0
- # We call method in a separate thread
- execution_thread = Thread(target = patched_aq_execute_command ,
- args = (self.datanode_install_command, ))
- execution_thread.start()
- # check in progress report
- # wait until ready
- while True:
- time.sleep(0.1)
- report = actionQueue.result()
- if len(report['reports']) != 0:
- break
- expected = {'status': 'IN_PROGRESS',
- 'stderr': 'Read from /tmp/errors-3.txt',
- 'stdout': 'Read from /tmp/output-3.txt',
- 'clusterName': u'cc',
- 'roleCommand': u'INSTALL',
- 'serviceName': u'HDFS',
- 'role': u'DATANODE',
- 'actionId': '1-1',
- 'taskId': 3,
- 'exitCode': 777}
- self.assertEqual(report['reports'][0], expected)
- # Continue command execution
- unfreeze_flag.set()
- # wait until ready
- while report['reports'][0]['status'] == 'IN_PROGRESS':
- time.sleep(0.1)
- report = actionQueue.result()
- # check report
- configname = os.path.join(tmpfile, 'config.json')
- expected = {'status': 'COMPLETED',
- 'stderr': 'stderr',
- 'stdout': 'out',
- 'clusterName': u'cc',
- 'configurationTags': {'global': {'tag': 'v1'}},
- 'roleCommand': u'INSTALL',
- 'serviceName': u'HDFS',
- 'role': u'DATANODE',
- 'actionId': '1-1',
- 'taskId': 3,
- 'exitCode': 0}
- self.assertEqual(len(report['reports']), 1)
- self.assertEqual(report['reports'][0], expected)
- self.assertTrue(os.path.isfile(configname))
- # Check that we had 2 status update calls ( IN_PROGRESS and COMPLETE)
- self.assertEqual(status_update_callback_mock.call_count, 2)
- os.remove(configname)
- # now should not have reports (read complete/failed reports are deleted)
- report = actionQueue.result()
- self.assertEqual(len(report['reports']), 0)
- ## Test failed execution
- puppet_execution_result_dict['status'] = 'FAILED'
- puppet_execution_result_dict['exitcode'] = 13
- # We call method in a separate thread
- execution_thread = Thread(target = patched_aq_execute_command ,
- args = (self.datanode_install_command, ))
- execution_thread.start()
- unfreeze_flag.set()
- # check in progress report
- # wait until ready
- report = actionQueue.result()
- while len(report['reports']) == 0 or \
- report['reports'][0]['status'] == 'IN_PROGRESS':
- time.sleep(0.1)
- report = actionQueue.result()
- # check report
- expected = {'status': 'FAILED',
- 'stderr': 'stderr',
- 'stdout': 'out',
- 'clusterName': u'cc',
- 'roleCommand': u'INSTALL',
- 'serviceName': u'HDFS',
- 'role': u'DATANODE',
- 'actionId': '1-1',
- 'taskId': 3,
- 'exitCode': 13}
- self.assertEqual(len(report['reports']), 1)
- self.assertEqual(report['reports'][0], expected)
- # now should not have reports (read complete/failed reports are deleted)
- report = actionQueue.result()
- self.assertEqual(len(report['reports']), 0)
- ### Test upgrade command ###
- puppet_execution_result_dict['status'] = 'COMPLETE'
- puppet_execution_result_dict['exitcode'] = 0
- execution_thread = Thread(target = patched_aq_execute_command ,
- args = (self.datanode_upgrade_command, ))
- execution_thread.start()
- unfreeze_flag.set()
- # wait until ready
- report = actionQueue.result()
- while len(report['reports']) == 0 or \
- report['reports'][0]['status'] == 'IN_PROGRESS':
- time.sleep(0.1)
- report = actionQueue.result()
- # check report
- expected = {'status': 'COMPLETED',
- 'stderr': 'stderr',
- 'stdout': 'out',
- 'clusterName': 'clusterName',
- 'roleCommand': 'UPGRADE',
- 'serviceName': 'serviceName',
- 'role': 'role',
- 'actionId': 17,
- 'taskId': 'taskId',
- 'exitCode': 0}
- self.assertEqual(len(report['reports']), 1)
- self.assertEqual(report['reports'][0], expected)
- # now should not have reports (read complete/failed reports are deleted)
- report = actionQueue.result()
- self.assertEqual(len(report['reports']), 0)
- @patch.object(ActionQueue, "status_update_callback")
- @patch.object(StackVersionsFileHandler, "read_stack_version")
- @patch.object(ActionDependencyManager, "read_dependencies")
- @patch.object(ActionQueue, "execute_command")
- @patch.object(LiveStatus, "build")
- def test_execute_status_command(self, build_mock, execute_command_mock,
- read_dependencies_mock, read_stack_version_mock,
- status_update_callback):
- actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
- build_mock.return_value = "dummy report"
- # Try normal execution
- actionQueue.execute_status_command(self.status_command)
- report = actionQueue.result()
- expected = 'dummy report'
- self.assertEqual(len(report['componentStatus']), 1)
- self.assertEqual(report['componentStatus'][0], expected)