فهرست منبع

AMBARI-3490. Remove RCO management logic at ambari-agent (dlysnichencko)

Lisnichenko Dmitro 11 سال پیش
والد
کامیت
489a193dd2

+ 0 - 4
ambari-agent/pom.xml

@@ -328,10 +328,6 @@
                   <location>../version</location>
                   <location>../version</location>
                   <filter>true</filter>
                   <filter>true</filter>
                 </source>
                 </source>
-                <!--<source>-->
-                  <!--&lt;!&ndash; This file is also included into server rpm&ndash;&gt;-->
-                  <!--<location>../ambari-common/src/main/resources/role_command_order.json</location>-->
-                <!--</source>-->
               </sources>
               </sources>
             </mapping>
             </mapping>
             <!-- -->
             <!-- -->

+ 0 - 163
ambari-agent/src/main/python/ambari_agent/ActionDependencyManager.py

@@ -1,163 +0,0 @@
-#!/usr/bin/env python2.6
-
-'''
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-'''
-
-import logging
-import Queue
-import threading
-import pprint
-import os
-import json
-
-logger = logging.getLogger()
-
-class ActionDependencyManager():
-  """
-  Implemments a scheduler of role commands (like DATANODE-START) based on
-  dependencies between them. Class does not execute actions, it only
-  breaks them on groups that may be executed in parallel.
-  """
-
-  DEPS_FILE_NAME="role_command_order.json"
-  COMMENT_STR="_comment"
-
-  # Dictionary of dependencies. Format:
-  # BlockedRole-Command : [BlockerRole1-Command1, BlockerRole2-Command2, ...]
-
-
-  def __init__(self, config):
-    self.deps = {}
-    self.last_scheduled_group = []
-    self.scheduled_action_groups = Queue.Queue()
-    self.lock = threading.RLock()
-    self.config = config
-    #self.read_dependencies()
-
-
-  def read_dependencies(self):
-    """
-    Load dependencies from file
-    """
-    prefix_dir = self.config.get('agent', 'prefix')
-    action_order_file = os.path.join(prefix_dir, self.DEPS_FILE_NAME)
-    with open(action_order_file) as f:
-      action_order_data = json.load(f)
-    for deps_group in action_order_data.keys():
-      if deps_group != self.COMMENT_STR: # if entry is not a comment
-        deps_group_list = action_order_data[deps_group]
-        for blocked_str in deps_group_list:
-          if blocked_str != self.COMMENT_STR: # if entry is not a comment
-            blocker_list = deps_group_list[blocked_str]
-            if blocked_str not in self.deps:
-              self.deps[blocked_str]=[]
-            for blocker_str in blocker_list:
-              self.deps[blocked_str].append(blocker_str)
-    pass
-
-
-  def is_action_group_available(self):
-    return not self.scheduled_action_groups.empty()
-
-
-  def get_next_action_group(self):
-    """
-    Returns next group of scheduled actions that may be
-    executed in parallel. If queue is empty, blocks until
-    an item is available (until next put_action() call)
-    """
-    next_group = self.scheduled_action_groups.get(block=True)
-    with self.lock: # Synchronized
-      if next_group is self.last_scheduled_group:
-        # Group is not eligible for appending, creating new one
-        self.last_scheduled_group = []
-
-      dump_str = pprint.pformat(next_group)
-      logger.debug("Next action group: {0}".format(dump_str))
-      return next_group
-
-
-  def put_actions(self, actions):
-    """
-    Schedules actions to be executed in some time at future.
-    Here we rely on serial command execution sequence received from server.
-    Some of these commands may be executed in parallel with others, so we
-    unite them into a group.
-    """
-    with self.lock: # Synchronized
-      for action in actions:
-        self.dump_info(action)
-        was_empty = len(self.last_scheduled_group) == 0
-        if self.can_be_executed_in_parallel(action, self.last_scheduled_group):
-          self.last_scheduled_group.append(action)
-        else: # create a new group
-          self.last_scheduled_group = [action]
-          was_empty = True
-        if was_empty:
-          # last_scheduled_group is not empty now, so we add it to the queue
-          self.scheduled_action_groups.put(self.last_scheduled_group)
-
-
-  def dump_info(self, action):
-    """
-    Prints info about command to log
-    """
-    logger.info("Adding " + action['commandType'] + " for service " + \
-                action['serviceName'] + " of cluster " + \
-                action['clusterName'] + " to the queue.")
-    logger.debug(pprint.pformat(action))
-
-
-  def can_be_executed_in_parallel(self, action, group):
-    """
-    Checks whether action may be executed in parallel with a given group
-    """
-    # Hack: parallel execution disabled
-    return False
-
-    # from ActionQueue import ActionQueue
-    # # Empty group is compatible with any action
-    # if not group:
-    #   return True
-    # # Status commands are placed into a separate group to avoid race conditions
-    # if action['commandType'] == ActionQueue.STATUS_COMMAND:
-    #   for scheduled_action in group:
-    #     if scheduled_action['commandType'] != ActionQueue.STATUS_COMMAND:
-    #       return False
-    #   return True
-    # # We avoid executing install/upgrade threads in parallel with anything
-    # standalone_commands = ["INSTALL", ActionQueue.ROLE_COMMAND_UPGRADE]
-    # if action['roleCommand'] in standalone_commands:
-    #   return False
-    # # We can not perform few actions (like STOP and START) for a component
-    # # at the same time
-    # for scheduled_action in group:
-    #   if scheduled_action['role'] == action['role']:
-    #     return False
-    # # In other cases, check dependencies
-    # pattern = "{0}-{1}"
-    # new_action_str = pattern.format(action['role'], action['roleCommand'])
-    # for scheduled_action in group:
-    #   if new_action_str in self.deps:
-    #     blockers = self.deps[new_action_str]
-    #     scheduled_action_str = pattern.format(
-    #       scheduled_action['role'], scheduled_action['roleCommand'])
-    #     if scheduled_action_str in blockers:
-    #       return False
-    # # Everything seems to be ok
-    # return True

+ 26 - 53
ambari-agent/src/main/python/ambari_agent/ActionQueue.py

@@ -17,21 +17,19 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 See the License for the specific language governing permissions and
 limitations under the License.
 limitations under the License.
 '''
 '''
+import Queue
 
 
 import logging
 import logging
 import traceback
 import traceback
 import threading
 import threading
-from threading import Thread
 import pprint
 import pprint
 import os
 import os
 
 
 from LiveStatus import LiveStatus
 from LiveStatus import LiveStatus
 from shell import shellRunner
 from shell import shellRunner
 import PuppetExecutor
 import PuppetExecutor
-import UpgradeExecutor
 import PythonExecutor
 import PythonExecutor
 from ActualConfigHandler import ActualConfigHandler
 from ActualConfigHandler import ActualConfigHandler
-from ActionDependencyManager import ActionDependencyManager
 from CommandStatusDict import CommandStatusDict
 from CommandStatusDict import CommandStatusDict
 
 
 
 
@@ -49,13 +47,12 @@ class ActionQueue(threading.Thread):
 
 
   STATUS_COMMAND = 'STATUS_COMMAND'
   STATUS_COMMAND = 'STATUS_COMMAND'
   EXECUTION_COMMAND = 'EXECUTION_COMMAND'
   EXECUTION_COMMAND = 'EXECUTION_COMMAND'
-  ROLE_COMMAND_UPGRADE = 'UPGRADE'
 
 
   IN_PROGRESS_STATUS = 'IN_PROGRESS'
   IN_PROGRESS_STATUS = 'IN_PROGRESS'
 
 
   def __init__(self, config, controller):
   def __init__(self, config, controller):
     super(ActionQueue, self).__init__()
     super(ActionQueue, self).__init__()
-    self.commandQueue = ActionDependencyManager(config)
+    self.commandQueue = Queue.Queue()
     self.commandStatuses = CommandStatusDict(callback_action =
     self.commandStatuses = CommandStatusDict(callback_action =
       self.status_update_callback)
       self.status_update_callback)
     self.config = config
     self.config = config
@@ -71,51 +68,30 @@ class ActionQueue(threading.Thread):
     return self._stop.isSet()
     return self._stop.isSet()
 
 
   def put(self, commands):
   def put(self, commands):
-    self.commandQueue.put_actions(commands)
+    for command in commands:
+      logger.info("Adding " + command['commandType'] + " for service " + \
+                  command['serviceName'] + " of cluster " + \
+                  command['clusterName'] + " to the queue.")
+      logger.debug(pprint.pformat(command))
+      self.commandQueue.put(command)
 
 
 
 
   def run(self):
   def run(self):
     while not self.stopped():
     while not self.stopped():
-      # Taking a new portion of tasks
-      portion = self.commandQueue.get_next_action_group() # Will block if queue is empty
-      portion = portion[::-1] # Reverse list order
-      self.process_portion_of_actions(portion)
-
-
-  def process_portion_of_actions(self, portion):
-    # starting execution of a group of commands
-    running_list = []
-    finished_list = []
-    while portion or running_list: # While not finished actions in portion
-      # poll threads under execution
-      for thread in running_list:
-        if not thread.is_alive():
-          finished_list.append(thread)
-        # Remove finished from the running list
-      running_list[:] = [b for b in running_list if not b in finished_list]
-      # Start next actions
-      free_slots = self.MAX_CONCURRENT_ACTIONS - len(running_list)
-      while free_slots > 0 and portion: # Start new threads if available
-        command = portion.pop()
-        logger.debug("Took an element of Queue: " + pprint.pformat(command))
-        if command['commandType'] == self.EXECUTION_COMMAND:
-          # Start separate threads for commands of this type
-          action_thread = Thread(target =  self.execute_command_safely, args = (command, ))
-          running_list.append(action_thread)
-          free_slots -= 1
-          action_thread.start()
-        elif command['commandType'] == self.STATUS_COMMAND:
-          # Execute status commands immediately, in current thread
-          self.execute_status_command(command)
-        else:
-          logger.error("Unrecognized command " + pprint.pformat(command))
-    pass
+      command = self.commandQueue.get() # Will block if queue is empty
+      self.process_command(command)
 
 
 
 
-  def execute_command_safely(self, command):
+  def process_command(self, command):
+    logger.debug("Took an element of Queue: " + pprint.pformat(command))
     # make sure we log failures
     # make sure we log failures
     try:
     try:
-      self.execute_command(command)
+      if command['commandType'] == self.EXECUTION_COMMAND:
+        self.execute_command(command)
+      elif command['commandType'] == self.STATUS_COMMAND:
+        self.execute_status_command(command)
+      else:
+        logger.error("Unrecognized command " + pprint.pformat(command))
     except Exception, err:
     except Exception, err:
       # Should not happen
       # Should not happen
       traceback.print_exc()
       traceback.print_exc()
@@ -123,6 +99,9 @@ class ActionQueue(threading.Thread):
 
 
 
 
   def execute_command(self, command):
   def execute_command(self, command):
+    '''
+    Executes commands of type  EXECUTION_COMMAND
+    '''
     clusterName = command['clusterName']
     clusterName = command['clusterName']
     commandId = command['commandId']
     commandId = command['commandId']
 
 
@@ -147,17 +126,8 @@ class ActionQueue(threading.Thread):
       self.config.get('puppet', 'puppet_home'),
       self.config.get('puppet', 'puppet_home'),
       self.config.get('puppet', 'facter_home'),
       self.config.get('puppet', 'facter_home'),
       self.config.get('agent', 'prefix'), self.config)
       self.config.get('agent', 'prefix'), self.config)
-    if command['roleCommand'] == ActionQueue.ROLE_COMMAND_UPGRADE:
-      # Create new instances for the current thread
-      pythonExecutor = PythonExecutor.PythonExecutor(
-          self.config.get('agent', 'prefix'), self.config)
-      upgradeExecutor = UpgradeExecutor.UpgradeExecutor(pythonExecutor,
-          puppetExecutor, self.config)
-      commandresult = upgradeExecutor.perform_stack_upgrade(command, in_progress_status['tmpout'],
-        in_progress_status['tmperr'])
-    else:
-      commandresult = puppetExecutor.runCommand(command, in_progress_status['tmpout'],
-        in_progress_status['tmperr'])
+    commandresult = puppetExecutor.runCommand(command, in_progress_status['tmpout'],
+      in_progress_status['tmperr'])
     # dumping results
     # dumping results
     status = "COMPLETED"
     status = "COMPLETED"
     if commandresult['exitcode'] != 0:
     if commandresult['exitcode'] != 0:
@@ -189,6 +159,9 @@ class ActionQueue(threading.Thread):
 
 
 
 
   def execute_status_command(self, command):
   def execute_status_command(self, command):
+    '''
+    Executes commands of type STATUS_COMMAND
+    '''
     try:
     try:
       cluster = command['clusterName']
       cluster = command['clusterName']
       service = command['serviceName']
       service = command['serviceName']

+ 1 - 1
ambari-agent/src/main/python/ambari_agent/Heartbeat.py

@@ -55,7 +55,7 @@ class Heartbeat:
                 }
                 }
 
 
     commandsInProgress = False
     commandsInProgress = False
-    if self.actionQueue.commandQueue.is_action_group_available():
+    if not self.actionQueue.commandQueue.empty():
       commandsInProgress = True
       commandsInProgress = True
     if len(queueResult) != 0:
     if len(queueResult) != 0:
       heartbeat['reports'] = queueResult['reports']
       heartbeat['reports'] = queueResult['reports']

+ 0 - 207
ambari-agent/src/main/python/ambari_agent/UpgradeExecutor.py

@@ -1,207 +0,0 @@
-#!/usr/bin/env python2.6
-
-'''
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-'''
-import json
-import os.path
-import logging
-import pprint
-import re
-from Grep import Grep
-from StackVersionsFileHandler import StackVersionsFileHandler
-
-
-logger = logging.getLogger()
-grep = Grep()
-
-class UpgradeExecutor:
-
-  """ Class that performs the StackVersion stack upgrade"""
-
-  SCRIPT_DIRS = [
-    'pre-upgrade.d',
-    'upgrade.d',
-    'post-upgrade.d'
-  ]
-
-  NAME_PARSING_FAILED_CODE = 999
-
-  def __init__(self, pythonExecutor, puppetExecutor, config):
-    self.pythonExecutor = pythonExecutor
-    self.puppetExecutor = puppetExecutor
-    self.stacksDir = config.get('stack', 'upgradeScriptsDir')
-    self.config = config
-    versionsFileDir = config.get('agent', 'prefix')
-    self.versionsHandler = StackVersionsFileHandler(versionsFileDir)
-
-
-  def perform_stack_upgrade(self, command, tmpout, tmperr):
-    logger.info("Performing stack upgrade")
-    params = command['commandParams']
-    srcStack = params['source_stack_version']
-    tgtStack = params['target_stack_version']
-    component = command['role']
-
-    srcStackTuple = self.split_stack_version(srcStack)
-    tgtStackTuple = self.split_stack_version(tgtStack)
-
-    if srcStackTuple is None or tgtStackTuple is None:
-      errorstr = "Source (%s) or target (%s) version does not match pattern \
-      <Name>-<Version>" % (srcStack, tgtStack)
-      logger.info(errorstr)
-      result = {
-        'exitcode' : 1,
-        'stdout'   : 'None',
-        'stderr'   : errorstr
-      }
-    elif srcStack != tgtStack:
-      paramTuple = sum((srcStackTuple, tgtStackTuple), ())
-      upgradeId = "%s-%s.%s_%s-%s.%s" % paramTuple
-      # Check stack version (do we need upgrade?)
-      basedir = os.path.join(self.stacksDir, upgradeId, component)
-      if not os.path.isdir(basedir):
-        errorstr = "Upgrade %s is not supported (dir %s does not exist)" \
-                   % (upgradeId, basedir)
-        logger.error(errorstr)
-        result = {
-          'exitcode' : 1,
-          'stdout'   : errorstr,
-          'stderr'   : errorstr
-        }
-      else:
-        result = {
-          'exitcode' : 0,
-          'stdout'   : '',
-          'stderr'   : ''
-        }
-        # Request repos update (will be executed once before running any pp file)
-        self.puppetExecutor.discardInstalledRepos()
-        for dir in self.SCRIPT_DIRS:
-          if result['exitcode'] != 0:
-            break
-          tmpRes = self.execute_dir(command, basedir, dir, tmpout, tmperr)
-
-          result = {
-            'exitcode' : result['exitcode'] or tmpRes['exitcode'],
-            'stdout'   : "%s\n%s" % (result['stdout'], tmpRes['stdout']),
-            'stderr'   : "%s\n%s" % (result['stderr'], tmpRes['stderr']),
-          }
-
-        if result['exitcode'] == 0:
-          logger.info("Upgrade %s successfully finished" % upgradeId)
-          self.versionsHandler.write_stack_version(component, tgtStack)
-    else:
-      infostr = "target_stack_version (%s) matches current stack version" \
-          " for component %s, nothing to do" % (tgtStack, component)
-      logger.info(infostr)
-      result = {
-        'exitcode' : 0,
-        'stdout'   : infostr,
-        'stderr'   : 'None'
-      }
-    result = {
-      'exitcode' : result['exitcode'],
-      'stdout'   : grep.tail(result['stdout'], grep.OUTPUT_LAST_LINES),
-      'stderr'   : grep.tail(result['stderr'], grep.OUTPUT_LAST_LINES)
-    }
-    return result
-
-
-  def get_key_func(self, name):
-    """
-    Returns a number from filenames like 70-foobar.* or 999 for not matching
-    filenames
-    """
-    parts = name.split('-', 1)
-    if not parts or not parts[0].isdigit():
-      logger.warn("Can't parse script filename number %s" % name)
-      return self.NAME_PARSING_FAILED_CODE # unknown element will be placed to the end of list
-    return int(parts[0])
-
-
-  def split_stack_version(self, verstr):
-    verdict = json.loads(verstr)
-    stack_name = verdict["stackName"].strip()
-
-    matchObj = re.match( r'(\d+).(\d+)', verdict["stackVersion"].strip(), re.M|re.I)
-    if matchObj:
-      stack_major_ver = matchObj.group(1)
-      stack_minor_ver = matchObj.group(2)
-      return stack_name, stack_major_ver, stack_minor_ver
-    else:
-      return None
-
-
-  def execute_dir(self, command, basedir, dir, tmpout, tmperr):
-    """
-    Executes *.py and *.pp files located in a given directory.
-    Files a executed in a numeric sorting order.
-    """
-    dirpath = os.path.join(basedir, dir)
-    logger.info("Executing %s" % dirpath)
-    if not os.path.isdir(dirpath):
-      warnstr = "Script directory %s does not exist, skipping" % dirpath
-      logger.warn(warnstr)
-      result = {
-        'exitcode' : 0,
-        'stdout'   : warnstr,
-        'stderr'   : 'None'
-      }
-      return result
-    fileList=os.listdir(dirpath)
-    fileList.sort(key = self.get_key_func)
-    formattedResult = {
-      'exitcode' : 0,
-      'stdout'   : '',
-      'stderr'   : ''
-    }
-    for filename in fileList:
-      prevcode = formattedResult['exitcode']
-      if prevcode != 0 or self.get_key_func(filename) == self.NAME_PARSING_FAILED_CODE:
-        break
-      filepath = os.path.join(dirpath, filename)
-      if filename.endswith(".pp"):
-        logger.info("Running puppet file %s" % filepath)
-        result = self.puppetExecutor.run_manifest(command, filepath,
-                                                                tmpout, tmperr)
-      elif filename.endswith(".py"):
-        logger.info("Running python file %s" % filepath)
-        result = self.pythonExecutor.run_file(command, filepath, tmpout, tmperr)
-      elif filename.endswith(".pyc"):
-        pass # skipping compiled files
-      else:
-        warnstr = "Unrecognized file type, skipping: %s" % filepath
-        logger.warn(warnstr)
-        result = {
-          'exitcode' : 0,
-          'stdout'   : warnstr,
-          'stderr'   : 'None'
-        }
-      formattedResult = {
-        'exitcode' : prevcode or result['exitcode'],
-        'stdout'   : "%s\n%s" % (formattedResult['stdout'], result['stdout']),
-        'stderr'   : "%s\n%s" % (formattedResult['stderr'], result['stderr']),
-      }
-    logger.debug("Result of %s: \n %s" % (dirpath, pprint.pformat(formattedResult)))
-    return formattedResult
-
-
-
-
-
-

+ 0 - 180
ambari-agent/src/test/python/TestActionDependencyManager.py

@@ -1,180 +0,0 @@
-#!/usr/bin/env python2.6
-
-'''
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-'''
-
-from unittest import TestCase
-from ambari_agent.AmbariConfig import AmbariConfig
-from ambari_agent.ActionQueue import ActionQueue
-from ambari_agent.ActionDependencyManager import ActionDependencyManager
-import os, errno, time, pprint, tempfile, threading, sys
-from mock.mock import patch, MagicMock, call
-
-class TestActionDependencyManager(TestCase):
-
-  dummy_RCO_file = os.path.join('dummy_files', 'test_rco_data.json')
-
-  def setUp(self):
-    self.config = AmbariConfig().getConfig()
-    self.config.set('agent', 'prefix', os.getcwd())
-    ActionDependencyManager.DEPS_FILE_NAME = self.dummy_RCO_file
-
-  # TODO: disabled for now
-  def disabled_test_init(self):
-    """
-    Tests config load
-    """
-    adm = ActionDependencyManager(self.config)
-    deps_dump = pprint.pformat(adm.deps)
-    expected = "{u'DATANODE-STOP': [u'JOBTRACKER-STOP',\n                    " \
-               "u'TASKTRACKER-STOP',\n                    " \
-               "u'RESOURCEMANAGER-STOP',\n                    " \
-               "u'NODEMANAGER-STOP',\n                    " \
-               "u'HISTORYSERVER-STOP',\n                    " \
-               "u'HBASE_MASTER-STOP'],\n u'HBASE_MASTER-START': " \
-               "[u'PEERSTATUS-START'],\n u'JOBTRACKER-START': " \
-               "[u'PEERSTATUS-START'],\n u'RESOURCEMANAGER-START': " \
-               "[u'NAMENODE-START', u'DATANODE-START'],\n " \
-               "u'SECONDARY_NAMENODE-START': [u'DATANODE-START', " \
-               "u'NAMENODE-START'],\n u'SECONDARY_NAMENODE-UPGRADE': " \
-               "[u'NAMENODE-UPGRADE']}"
-    self.assertEqual(deps_dump, expected)
-
-
-  def test_is_action_group_available(self):
-    adm = ActionDependencyManager(self.config)
-    self.assertFalse(adm.is_action_group_available())
-    adm.scheduled_action_groups.put(["test"])
-    self.assertTrue(adm.is_action_group_available())
-
-
-  def test_get_next_action_group(self):
-    adm = ActionDependencyManager(self.config)
-    test1 = ["test1"]
-    test2 = ["test2"]
-    adm.scheduled_action_groups.put(test1)
-    adm.scheduled_action_groups.put(test2)
-    adm.last_scheduled_group = test2
-    self.assertTrue(adm.is_action_group_available())
-    # Taking 1st
-    self.assertEqual(test1, adm.get_next_action_group())
-    self.assertTrue(len(adm.last_scheduled_group) == 1)
-    self.assertTrue(adm.is_action_group_available())
-    # Taking 2nd
-    self.assertEqual(test2, adm.get_next_action_group())
-    self.assertTrue(len(adm.last_scheduled_group) == 0)
-    self.assertTrue(adm.last_scheduled_group is not test2)
-    self.assertFalse(adm.is_action_group_available())
-
-
-  @patch.object(ActionDependencyManager, "dump_info")
-  @patch.object(ActionDependencyManager, "can_be_executed_in_parallel")
-  def test_put_action(self, can_be_executed_in_parallel_mock, dump_info_mock):
-    can_be_executed_in_parallel_mock.side_effect = [True, False, True, False,
-                                                     True, True, True, False]
-    adm = ActionDependencyManager(self.config)
-
-    adm.put_actions(list(range(0, 8)))
-
-    queue = []
-    while adm.is_action_group_available():
-      next = adm.get_next_action_group()
-      queue.append(next)
-
-    str = pprint.pformat(queue)
-    expected = "[[0], [1, 2], [3, 4, 5, 6], [7]]"
-    self.assertEqual(str, expected)
-
-
-  # TODO: disabled for now
-  def disabled_test_can_be_executed_in_parallel(self):
-    adm = ActionDependencyManager(self.config)
-    # empty group
-    group = []
-    install_command = {
-      'role': 'DATANODE',
-      'roleCommand': 'INSTALL',
-      'commandType': ActionQueue.EXECUTION_COMMAND
-    }
-    upgrade_command = {
-      'role': 'DATANODE',
-      'roleCommand': 'UPGRADE',
-      'commandType': ActionQueue.EXECUTION_COMMAND
-    }
-    start_command = {
-      'role': 'DATANODE',
-      'roleCommand': 'START',
-      'commandType': ActionQueue.EXECUTION_COMMAND
-    }
-    stop_command = {
-      'role': 'DATANODE',
-      'roleCommand': 'STOP',
-      'commandType': ActionQueue.EXECUTION_COMMAND
-    }
-    status_command = {
-      'commandType': ActionQueue.STATUS_COMMAND
-    }
-    rm_start_command = {
-      'role': 'RESOURCEMANAGER',
-      'roleCommand': 'START',
-      'commandType': ActionQueue.EXECUTION_COMMAND
-    }
-    hm_start_command = {
-      'role': 'HBASE_MASTER',
-      'roleCommand': 'START',
-      'commandType': ActionQueue.EXECUTION_COMMAND
-    }
-    self.assertTrue(adm.can_be_executed_in_parallel(install_command, group))
-    self.assertTrue(adm.can_be_executed_in_parallel(status_command, group))
-    # multiple status commands
-    group = []
-    for i in range(0, 3):
-      group.append(status_command)
-    self.assertTrue(adm.can_be_executed_in_parallel(status_command, group))
-    self.assertFalse(adm.can_be_executed_in_parallel(install_command, group))
-    # new status command
-    group = [install_command]
-    self.assertFalse(adm.can_be_executed_in_parallel(status_command, group))
-    # install/upgrade commands
-    group = [install_command]
-    self.assertFalse(adm.can_be_executed_in_parallel(install_command, group))
-    self.assertFalse(adm.can_be_executed_in_parallel(upgrade_command, group))
-    self.assertFalse(adm.can_be_executed_in_parallel(status_command, group))
-    self.assertFalse(adm.can_be_executed_in_parallel(start_command, group))
-    group = [upgrade_command]
-    self.assertFalse(adm.can_be_executed_in_parallel(install_command, group))
-    self.assertFalse(adm.can_be_executed_in_parallel(upgrade_command, group))
-    self.assertFalse(adm.can_be_executed_in_parallel(status_command, group))
-    self.assertFalse(adm.can_be_executed_in_parallel(start_command, group))
-    # Other commands
-    group = [start_command]
-    self.assertFalse(adm.can_be_executed_in_parallel(install_command, group))
-    self.assertFalse(adm.can_be_executed_in_parallel(upgrade_command, group))
-    self.assertFalse(adm.can_be_executed_in_parallel(status_command, group))
-    self.assertTrue(adm.can_be_executed_in_parallel(hm_start_command, group))
-    # Check dependency processing
-    group = [start_command]
-    self.assertFalse(adm.can_be_executed_in_parallel(rm_start_command, group))
-    group = [start_command]
-    self.assertTrue(adm.can_be_executed_in_parallel(hm_start_command, group))
-    # actions for the same component
-    group = [start_command]
-    self.assertFalse(adm.can_be_executed_in_parallel(stop_command, group))
-    group = [stop_command]
-    self.assertFalse(adm.can_be_executed_in_parallel(start_command, group))
-

+ 50 - 82
ambari-agent/src/test/python/TestActionQueue.py

@@ -17,13 +17,13 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 See the License for the specific language governing permissions and
 limitations under the License.
 limitations under the License.
 '''
 '''
+from Queue import Queue
 
 
 from unittest import TestCase
 from unittest import TestCase
 from ambari_agent.LiveStatus import LiveStatus
 from ambari_agent.LiveStatus import LiveStatus
 from ambari_agent.PuppetExecutor import PuppetExecutor
 from ambari_agent.PuppetExecutor import PuppetExecutor
 from ambari_agent.ActionQueue import ActionQueue
 from ambari_agent.ActionQueue import ActionQueue
 from ambari_agent.AmbariConfig import AmbariConfig
 from ambari_agent.AmbariConfig import AmbariConfig
-from ambari_agent.ActionDependencyManager import ActionDependencyManager
 import os, errno, time, pprint, tempfile, threading
 import os, errno, time, pprint, tempfile, threading
 import StringIO
 import StringIO
 import sys
 import sys
@@ -31,7 +31,6 @@ from threading import Thread
 
 
 from mock.mock import patch, MagicMock, call
 from mock.mock import patch, MagicMock, call
 from ambari_agent.StackVersionsFileHandler import StackVersionsFileHandler
 from ambari_agent.StackVersionsFileHandler import StackVersionsFileHandler
-from ambari_agent.UpgradeExecutor import UpgradeExecutor
 
 
 
 
 class TestActionQueue(TestCase):
 class TestActionQueue(TestCase):
@@ -126,106 +125,79 @@ class TestActionQueue(TestCase):
   }
   }
 
 
 
 
-  @patch.object(ActionDependencyManager, "read_dependencies")
-  @patch.object(ActionDependencyManager, "get_next_action_group")
-  @patch.object(ActionQueue, "process_portion_of_actions")
-  def test_ActionQueueStartStop(self, process_portion_of_actions_mock,
-                          get_next_action_group_mock, read_dependencies_mock):
+  @patch.object(ActionQueue, "process_command")
+  @patch.object(Queue, "get")
+  def test_ActionQueueStartStop(self, get_mock, process_command_mock):
     actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
     actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
     actionQueue.start()
     actionQueue.start()
     time.sleep(0.1)
     time.sleep(0.1)
     actionQueue.stop()
     actionQueue.stop()
     actionQueue.join()
     actionQueue.join()
     self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
     self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
-    self.assertTrue(get_next_action_group_mock.call_count > 1)
-    self.assertTrue(process_portion_of_actions_mock.call_count > 1)
+    self.assertTrue(process_command_mock.call_count > 1)
 
 
 
 
-  @patch.object(ActionDependencyManager, "read_dependencies")
+  @patch("traceback.print_exc")
   @patch.object(ActionQueue, "execute_command")
   @patch.object(ActionQueue, "execute_command")
   @patch.object(ActionQueue, "execute_status_command")
   @patch.object(ActionQueue, "execute_status_command")
-  def test_process_portion_of_actions(self, execute_status_command_mock,
-            executeCommand_mock, read_dependencies_mock):
+  def test_process_command(self, execute_status_command_mock,
+                           execute_command_mock, print_exc_mock):
     actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
     actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
-    # Test execution of EXECUTION_COMMANDs
-    max = 3
-    actionQueue.MAX_CONCURRENT_ACTIONS = max
-    unfreeze_flag = threading.Event()
-    sync_lock = threading.RLock()
-    stats = {
-      'waiting_threads' : 0
+    execution_command = {
+      'commandType' : ActionQueue.EXECUTION_COMMAND,
     }
     }
-    def side_effect(self):
-      with sync_lock: # Synchtonized to avoid race effects during test execution
-        stats['waiting_threads'] += 1
-      unfreeze_flag.wait()
-    executeCommand_mock.side_effect = side_effect
-    portion = [self.datanode_install_command,
-               self.namenode_install_command,
-               self.snamenode_install_command,
-               self.nagios_install_command,
-               self.hbase_install_command]
+    status_command = {
+      'commandType' : ActionQueue.STATUS_COMMAND,
+    }
+    wrong_command = {
+      'commandType' : "SOME_WRONG_COMMAND",
+    }
+    # Try wrong command
+    actionQueue.process_command(wrong_command)
+    self.assertFalse(execute_command_mock.called)
+    self.assertFalse(execute_status_command_mock.called)
+    self.assertFalse(print_exc_mock.called)
 
 
-    # We call method in a separate thread
-    action_thread = Thread(target =  actionQueue.process_portion_of_actions, args = (portion, ))
-    action_thread.start()
-    # Now we wait to check that only MAX_CONCURRENT_ACTIONS threads are running
-    while stats['waiting_threads'] != max:
-      time.sleep(0.1)
-    self.assertEqual(stats['waiting_threads'], max)
-    # unfreezing waiting threads
-    unfreeze_flag.set()
-    # wait until all threads are finished
-    action_thread.join()
-    self.assertTrue(executeCommand_mock.call_count == 5)
+    execute_command_mock.reset_mock()
+    execute_status_command_mock.reset_mock()
+    print_exc_mock.reset_mock()
+    # Try normal execution
+    actionQueue.process_command(execution_command)
+    self.assertTrue(execute_command_mock.called)
     self.assertFalse(execute_status_command_mock.called)
     self.assertFalse(execute_status_command_mock.called)
-    executeCommand_mock.reset_mock()
+    self.assertFalse(print_exc_mock.called)
+
+    execute_command_mock.reset_mock()
     execute_status_command_mock.reset_mock()
     execute_status_command_mock.reset_mock()
+    print_exc_mock.reset_mock()
 
 
-    # Test execution of STATUS_COMMANDs
-    n = 5
-    portion = []
-    for i in range(0, n):
-      status_command = {
-        'componentName': 'DATANODE',
-        'commandType': 'STATUS_COMMAND',
-      }
-      portion.append(status_command)
-    actionQueue.process_portion_of_actions(portion)
-    self.assertTrue(execute_status_command_mock.call_count == n)
-    self.assertFalse(executeCommand_mock.called)
-
-    # Test execution of unknown command
-    unknown_command = {
-      'commandType': 'WRONG_COMMAND',
-    }
-    portion = [unknown_command]
-    actionQueue.process_portion_of_actions(portion)
-    # no exception expected
-    pass
+    actionQueue.process_command(status_command)
+    self.assertFalse(execute_command_mock.called)
+    self.assertTrue(execute_status_command_mock.called)
+    self.assertFalse(print_exc_mock.called)
 
 
+    execute_command_mock.reset_mock()
+    execute_status_command_mock.reset_mock()
+    print_exc_mock.reset_mock()
 
 
-  @patch("traceback.print_exc")
-  @patch.object(ActionDependencyManager, "read_dependencies")
-  @patch.object(ActionQueue, "execute_command")
-  def test_execute_command_safely(self, execute_command_mock,
-                                  read_dependencies_mock, print_exc_mock):
-    actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
-    # Try normal execution
-    actionQueue.execute_command_safely('command')
-    # Try exception ro check proper logging
+    # Try exception to check proper logging
     def side_effect(self):
     def side_effect(self):
       raise Exception("TerribleException")
       raise Exception("TerribleException")
     execute_command_mock.side_effect = side_effect
     execute_command_mock.side_effect = side_effect
-    actionQueue.execute_command_safely('command')
+    actionQueue.process_command(execution_command)
+    self.assertTrue(print_exc_mock.called)
+
+    print_exc_mock.reset_mock()
+
+    execute_status_command_mock.side_effect = side_effect
+    actionQueue.process_command(execution_command)
     self.assertTrue(print_exc_mock.called)
     self.assertTrue(print_exc_mock.called)
 
 
 
 
+
   @patch("__builtin__.open")
   @patch("__builtin__.open")
   @patch.object(ActionQueue, "status_update_callback")
   @patch.object(ActionQueue, "status_update_callback")
-  @patch.object(ActionDependencyManager, "read_dependencies")
-  def test_execute_command(self, read_dependencies_mock,
-                           status_update_callback_mock, open_mock):
+  def test_execute_command(self, status_update_callback_mock, open_mock):
     # Make file read calls visible
     # Make file read calls visible
     def open_side_effect(file, mode):
     def open_side_effect(file, mode):
       if mode == 'r':
       if mode == 'r':
@@ -251,10 +223,7 @@ class TestActionQueue(TestCase):
     def patched_aq_execute_command(command):
     def patched_aq_execute_command(command):
       # We have to perform patching for separate thread in the same thread
       # We have to perform patching for separate thread in the same thread
       with patch.object(PuppetExecutor, "runCommand") as runCommand_mock:
       with patch.object(PuppetExecutor, "runCommand") as runCommand_mock:
-        with patch.object(UpgradeExecutor, "perform_stack_upgrade") \
-              as perform_stack_upgrade_mock:
           runCommand_mock.side_effect = side_effect
           runCommand_mock.side_effect = side_effect
-          perform_stack_upgrade_mock.side_effect = side_effect
           actionQueue.execute_command(command)
           actionQueue.execute_command(command)
     ### Test install/start/stop command ###
     ### Test install/start/stop command ###
     ## Test successful execution with configuration tags
     ## Test successful execution with configuration tags
@@ -379,11 +348,10 @@ class TestActionQueue(TestCase):
 
 
   @patch.object(ActionQueue, "status_update_callback")
   @patch.object(ActionQueue, "status_update_callback")
   @patch.object(StackVersionsFileHandler, "read_stack_version")
   @patch.object(StackVersionsFileHandler, "read_stack_version")
-  @patch.object(ActionDependencyManager, "read_dependencies")
   @patch.object(ActionQueue, "execute_command")
   @patch.object(ActionQueue, "execute_command")
   @patch.object(LiveStatus, "build")
   @patch.object(LiveStatus, "build")
   def test_execute_status_command(self, build_mock, execute_command_mock,
   def test_execute_status_command(self, build_mock, execute_command_mock,
-                                  read_dependencies_mock, read_stack_version_mock,
+                                  read_stack_version_mock,
                                   status_update_callback):
                                   status_update_callback):
     actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
     actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
     build_mock.return_value = "dummy report"
     build_mock.return_value = "dummy report"
@@ -392,4 +360,4 @@ class TestActionQueue(TestCase):
     report = actionQueue.result()
     report = actionQueue.result()
     expected = 'dummy report'
     expected = 'dummy report'
     self.assertEqual(len(report['componentStatus']), 1)
     self.assertEqual(len(report['componentStatus']), 1)
-    self.assertEqual(report['componentStatus'][0], expected)
+    self.assertEqual(report['componentStatus'][0], expected)

+ 1 - 4
ambari-agent/src/test/python/TestController.py

@@ -23,7 +23,6 @@ import StringIO
 import ssl
 import ssl
 import unittest, threading
 import unittest, threading
 from ambari_agent import Controller, ActionQueue
 from ambari_agent import Controller, ActionQueue
-from  ambari_agent.ActionDependencyManager import ActionDependencyManager
 from ambari_agent import hostname
 from ambari_agent import hostname
 import sys
 import sys
 from ambari_agent.Controller import AGENT_AUTO_RESTART_EXIT_CODE
 from ambari_agent.Controller import AGENT_AUTO_RESTART_EXIT_CODE
@@ -153,9 +152,7 @@ class TestController(unittest.TestCase):
   @patch("urllib2.build_opener")
   @patch("urllib2.build_opener")
   @patch("urllib2.install_opener")
   @patch("urllib2.install_opener")
   @patch.object(ActionQueue.ActionQueue, "run")
   @patch.object(ActionQueue.ActionQueue, "run")
-  @patch.object(ActionDependencyManager, "read_dependencies")
-  @patch.object(ActionDependencyManager, "dump_info")
-  def test_repeatRegistration(self, dump_info_mock, read_dependencies_mock,
+  def test_repeatRegistration(self,
                               run_mock, installMock, buildMock):
                               run_mock, installMock, buildMock):
 
 
     registerAndHeartbeat = MagicMock(name="registerAndHeartbeat")
     registerAndHeartbeat = MagicMock(name="registerAndHeartbeat")

+ 8 - 22
ambari-agent/src/test/python/TestHeartbeat.py

@@ -21,7 +21,6 @@ limitations under the License.
 from unittest import TestCase
 from unittest import TestCase
 import unittest
 import unittest
 from ambari_agent.Heartbeat import Heartbeat
 from ambari_agent.Heartbeat import Heartbeat
-from ambari_agent.ActionDependencyManager import ActionDependencyManager
 from ambari_agent.ActionQueue import ActionQueue
 from ambari_agent.ActionQueue import ActionQueue
 from ambari_agent.LiveStatus import LiveStatus
 from ambari_agent.LiveStatus import LiveStatus
 from ambari_agent import AmbariConfig
 from ambari_agent import AmbariConfig
@@ -47,8 +46,7 @@ class TestHeartbeat(TestCase):
     sys.stdout = sys.__stdout__
     sys.stdout = sys.__stdout__
 
 
 
 
-  @patch.object(ActionDependencyManager, "read_dependencies")
-  def test_build(self, read_dependencies_mock):
+  def test_build(self):
     actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller')
     actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller')
     heartbeat = Heartbeat(actionQueue)
     heartbeat = Heartbeat(actionQueue)
     result = heartbeat.build(100)
     result = heartbeat.build(100)
@@ -66,12 +64,9 @@ class TestHeartbeat(TestCase):
     self.assertEquals(not heartbeat.reports, True, "Heartbeat should not contain task in progress")
     self.assertEquals(not heartbeat.reports, True, "Heartbeat should not contain task in progress")
 
 
 
 
-  @patch.object(ActionDependencyManager, "read_dependencies")
   @patch.object(ActionQueue, "result")
   @patch.object(ActionQueue, "result")
-  @patch.object(ActionDependencyManager, "is_action_group_available")
   @patch.object(HostInfo, "register")
   @patch.object(HostInfo, "register")
-  def test_no_mapping(self, register_mock, is_action_group_available_mock, result_mock,
-                      read_dependencies_mock):
+  def test_no_mapping(self, register_mock, result_mock):
     result_mock.return_value = {
     result_mock.return_value = {
       'reports': [{'status': 'IN_PROGRESS',
       'reports': [{'status': 'IN_PROGRESS',
                    'stderr': 'Read from /tmp/errors-3.txt',
                    'stderr': 'Read from /tmp/errors-3.txt',
@@ -95,11 +90,8 @@ class TestHeartbeat(TestCase):
     self.assertEqual(register_mock.call_args_list[0][0][1], False)
     self.assertEqual(register_mock.call_args_list[0][0][1], False)
 
 
 
 
-  @patch.object(ActionDependencyManager, "read_dependencies")
   @patch.object(ActionQueue, "result")
   @patch.object(ActionQueue, "result")
-  @patch.object(ActionDependencyManager, "is_action_group_available")
-  def test_build_long_result(self, is_action_group_available_mock, result_mock,
-                  read_dependencies_mock):
+  def test_build_long_result(self, result_mock):
     actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller')
     actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller')
     result_mock.return_value = {
     result_mock.return_value = {
       'reports': [{'status': 'IN_PROGRESS',
       'reports': [{'status': 'IN_PROGRESS',
@@ -184,22 +176,17 @@ class TestHeartbeat(TestCase):
     self.assertEquals(hb, expected)
     self.assertEquals(hb, expected)
 
 
 
 
-  @patch.object(ActionDependencyManager, "read_dependencies")
-  @patch.object(ActionDependencyManager, "dump_info")
-  @patch.object(ActionDependencyManager, "can_be_executed_in_parallel")
   @patch.object(HostInfo, 'register')
   @patch.object(HostInfo, 'register')
-  def test_heartbeat_no_host_check_cmd_in_queue(self, register_mock,
-      can_be_executed_in_parallel_mock, dump_info_mock, read_dependencies_mock):
-    can_be_executed_in_parallel_mock.return_value = False
+  def test_heartbeat_no_host_check_cmd_in_queue(self, register_mock):
     actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller')
     actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller')
     statusCommand = {
     statusCommand = {
       "serviceName" : 'HDFS',
       "serviceName" : 'HDFS',
       "commandType" : "STATUS_COMMAND",
       "commandType" : "STATUS_COMMAND",
-      "clusterName" : "",
+      "clusterName" : "c1",
       "componentName" : "DATANODE",
       "componentName" : "DATANODE",
       'configurations':{'global' : {}}
       'configurations':{'global' : {}}
     }
     }
-    actionQueue.put(list(statusCommand))
+    actionQueue.put([statusCommand])
 
 
     heartbeat = Heartbeat(actionQueue)
     heartbeat = Heartbeat(actionQueue)
     heartbeat.build(12, 6)
     heartbeat.build(12, 6)
@@ -209,9 +196,8 @@ class TestHeartbeat(TestCase):
     self.assertFalse(args[1])
     self.assertFalse(args[1])
 
 
 
 
-  @patch.object(ActionDependencyManager, "read_dependencies")
   @patch.object(HostInfo, 'register')
   @patch.object(HostInfo, 'register')
-  def test_heartbeat_host_check_no_cmd(self, register_mock, read_dependencies_mock):
+  def test_heartbeat_host_check_no_cmd(self, register_mock):
     actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller')
     actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller')
     heartbeat = Heartbeat(actionQueue)
     heartbeat = Heartbeat(actionQueue)
     heartbeat.build(12, 6)
     heartbeat.build(12, 6)
@@ -222,4 +208,4 @@ class TestHeartbeat(TestCase):
 
 
 
 
 if __name__ == "__main__":
 if __name__ == "__main__":
-  unittest.main(verbosity=2)
+  unittest.main(verbosity=2)

+ 0 - 264
ambari-agent/src/test/python/TestUpgradeExecutor.py

@@ -1,264 +0,0 @@
-#!/usr/bin/env python2.6
-
-'''
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-'''
-
-from unittest import TestCase
-import unittest
-import StringIO
-import socket
-import os, sys, pprint, json
-from mock.mock import patch
-from mock.mock import MagicMock
-from mock.mock import create_autospec
-import os, errno, tempfile
-from ambari_agent import UpgradeExecutor
-import logging
-from ambari_agent import AmbariConfig
-from ambari_agent.StackVersionsFileHandler import StackVersionsFileHandler
-
-class TestUpgradeExecutor(TestCase):
-
-  logger = logging.getLogger()
-
-  @patch.object(StackVersionsFileHandler, 'write_stack_version')
-  @patch('os.path.isdir')
-  def test_perform_stack_upgrade(self, isdir_method, write_stack_version_method):
-    puppetExecutor = MagicMock()
-    executor = UpgradeExecutor.UpgradeExecutor('pythonExecutor',
-      puppetExecutor, AmbariConfig.AmbariConfig().getConfig())
-
-    # Checking matching versions
-    command = {
-      'commandParams' :	{
-        'source_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}',
-        'target_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}',
-       },
-      'role' : 'HDFS'
-    }
-    result = executor.perform_stack_upgrade(command, 'tmpout', 'tmperr')
-    self.assertTrue('matches current stack version' in result['stdout'])
-    self.assertFalse(write_stack_version_method.called)
-    # Checking unsupported update
-    write_stack_version_method.reset()
-    command = {
-      'commandParams' :	{
-        'source_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.0.1\"}',
-        'target_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}',
-      },
-      'role' : 'HDFS'
-    }
-    isdir_method.return_value = False
-    result = executor.perform_stack_upgrade(command, 'tmpout', 'tmperr')
-    self.assertTrue('not supported' in result['stderr'])
-    self.assertFalse(write_stack_version_method.called)
-    # Checking wrong source version
-    write_stack_version_method.reset()
-    command = {
-      'commandParams' :	{
-        'source_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"Wrong\"}',
-        'target_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}',
-        },
-      'role' : 'HDFS'
-    }
-    result = executor.perform_stack_upgrade(command, 'tmpout', 'tmperr')
-    self.assertTrue('does not match pattern' in result['stderr'])
-    self.assertFalse(write_stack_version_method.called)
-    # Checking wrong target version
-    write_stack_version_method.reset()
-    command = {
-      'commandParams' :	{
-        'source_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}',
-        'target_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"Wrong\"}',
-        },
-      'role' : 'HDFS'
-    }
-    result = executor.perform_stack_upgrade(command, 'tmpout', 'tmperr')
-    self.assertTrue('does not match pattern' in result['stderr'])
-    self.assertFalse(write_stack_version_method.called)
-    # Checking successful result
-    write_stack_version_method.reset()
-    command = {
-      'commandParams' :	{
-        'source_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.0.1\"}',
-        'target_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}',
-      },
-      'role' : 'HDFS'
-    }
-    isdir_method.return_value = True
-    executor.execute_dir = lambda command, basedir, dir, tmpout, tmperr : \
-      {
-        'exitcode' : 0,
-        'stdout'   : "output - %s" % dir,
-        'stderr'   : "errors - %s" % dir,
-      }
-    result = executor.perform_stack_upgrade(command, 'tmpout', 'tmperr')
-    self.assertTrue(write_stack_version_method.called)
-    self.assertEquals(result['exitcode'],0)
-    self.assertEquals(result['stdout'],'output - pre-upgrade.d\noutput - upgrade.d\noutput - post-upgrade.d')
-    self.assertEquals(result['stderr'],'errors - pre-upgrade.d\nerrors - upgrade.d\nerrors - post-upgrade.d')
-    # Checking failed result
-    write_stack_version_method.reset()
-    command = {
-      'commandParams' :	{
-        'source_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.0.1\"}',
-        'target_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}',
-      },
-      'role' : 'HDFS'
-    }
-    isdir_method.return_value = True
-    executor.execute_dir = lambda command, basedir, dir, tmpout, tmperr :\
-    {
-      'exitcode' : 1,
-      'stdout'   : "output - %s" % dir,
-      'stderr'   : "errors - %s" % dir,
-      }
-    result = executor.perform_stack_upgrade(command, 'tmpout', 'tmperr')
-    self.assertTrue(write_stack_version_method.called)
-    self.assertEquals(result['exitcode'],1)
-    self.assertEquals(result['stdout'],'output - pre-upgrade.d')
-    self.assertEquals(result['stderr'],'errors - pre-upgrade.d')
-
-
-  def test_get_key_func(self):
-    executor = UpgradeExecutor.UpgradeExecutor('pythonExecutor',
-                 'puppetExecutor', AmbariConfig.AmbariConfig().getConfig())
-    # Checking unparseable
-    self.assertEqual(executor.get_key_func('fdsfds'), 999)
-    self.assertEqual(executor.get_key_func('99dfsfd'), 999)
-    self.assertEqual(executor.get_key_func('-fdfds'), 999)
-    # checking parseable
-    self.assertEqual(executor.get_key_func('99'), 99)
-    self.assertEqual(executor.get_key_func('45-install'), 45)
-    self.assertEqual(executor.get_key_func('33-install-staff'), 33)
-    #checking sorting of full list
-    testlist1 = ['7-fdfd', '10-erewfds', '11-fdfdfd', '1-hh', '20-kk', '01-tt']
-    testlist1.sort(key = executor.get_key_func)
-    self.assertEqual(testlist1,
-        ['1-hh', '01-tt', '7-fdfd', '10-erewfds', '11-fdfdfd', '20-kk'])
-
-
-  def test_split_stack_version(self):
-    executor = UpgradeExecutor.UpgradeExecutor('pythonExecutor',
-             'puppetExecutor', AmbariConfig.AmbariConfig().getConfig())
-    result = executor.split_stack_version('{\"stackName\":\"HDP\",\"stackVersion\":\"1.2.1\"}')
-    self.assertEquals(result, ('HDP', '1', '2'))
-    result = executor.split_stack_version('{\"stackName\":\"HDP\",\"stackVersion\":\"1.3\"}')
-    self.assertEquals(result, ('HDP', '1', '3'))
-    result = executor.split_stack_version('{\"stackName\":\"ComplexStackVersion\",\"stackVersion\":\"1.3.4.2.2\"}')
-    self.assertEquals(result, ('ComplexStackVersion', '1', '3'))
-    result = executor.split_stack_version('{\"stackName\":\"HDP\",\"stackVersion\":\"1\"}')
-    self.assertEquals(result, None)
-    pass
-
-
-  @patch('os.listdir')
-  @patch('os.path.isdir')
-  @patch.object(UpgradeExecutor.UpgradeExecutor, 'get_key_func')
-  def test_execute_dir(self, get_key_func_method, isdir_method, listdir_method):
-    pythonExecutor = MagicMock()
-    puppetExecutor = MagicMock()
-
-    command = {'debug': 'command'}
-    isdir_method.return_value = True
-    # Mocking sort() method of list
-    class MyList(list):
-      pass
-    files = MyList(['first.py', 'second.pp', 'third.py', 'fourth.nm',
-             'fifth-failing.py', 'six.py'])
-    files.sort = lambda key: None
-    listdir_method.return_value = files
-    # fifth-failing.py will fail
-    pythonExecutor.run_file.side_effect = [
-      {'exitcode' : 0,
-       'stdout'   : "stdout - first.py",
-       'stderr'   : "stderr - first.py"},
-      {'exitcode' : 0,
-       'stdout'   : "stdout - third.py",
-       'stderr'   : "stderr - third.py"},
-      {'exitcode' : 1,
-       'stdout'   : "stdout - fifth-failing.py",
-       'stderr'   : "stderr - fifth-failing.py"},
-      {'exitcode' : 0,
-       'stdout'   : "stdout - six.py",
-       'stderr'   : "stderr - six.py"},
-    ]
-    puppetExecutor.run_manifest.side_effect = [
-      {'exitcode' : 0,
-       'stdout'   : "stdout - second.pp",
-       'stderr'   : "stderr - second.pp"},
-    ]
-
-    executor = UpgradeExecutor.UpgradeExecutor(pythonExecutor,
-        puppetExecutor, AmbariConfig.AmbariConfig().getConfig())
-
-    result= executor.execute_dir(command, 'basedir', 'dir', 'tmpout', 'tmperr')
-    self.assertEquals(result['exitcode'],1)
-    self.assertEquals(result['stdout'],"\nstdout - first.py\nstdout - second.pp\nstdout - third.py\nUnrecognized file type, skipping: basedir/dir/fourth.nm\nstdout - fifth-failing.py")
-    self.assertEquals(result['stderr'],"\nstderr - first.py\nstderr - second.pp\nstderr - third.py\nNone\nstderr - fifth-failing.py")
-
-
-  @patch('os.path.isdir')
-  def test_execute_dir_not_existing(self, isdir_method):
-    pythonExecutor = MagicMock()
-    puppetExecutor = MagicMock()
-
-    command = {'debug': 'command'}
-    isdir_method.return_value = False
-
-    executor = UpgradeExecutor.UpgradeExecutor(pythonExecutor,
-        puppetExecutor, AmbariConfig.AmbariConfig().getConfig())
-
-    result= executor.execute_dir(command, 'basedir', 'not_existing_dir', 'tmpout', 'tmperr')
-    self.assertEquals(result['exitcode'],0)
-    self.assertEquals(result['stdout'],'Script directory basedir/not_existing_dir does not exist, skipping')
-    self.assertEquals(result['stderr'],'None')
-
-
-  @patch('os.listdir')
-  @patch('os.path.isdir')
-  def test_execute_dir_ignore_badly_named(self, isdir_method, listdir_method):
-    pythonExecutor = MagicMock()
-    puppetExecutor = MagicMock()
-
-    command = {'debug': 'command'}
-    isdir_method.return_value = True
-    files = ['00-first.py', 'badly-named.pp', '10-second.pp', '20-wrong.cpp']
-    listdir_method.return_value = files
-    # fifth-failing.py will fail
-    pythonExecutor.run_file.side_effect = [
-      {'exitcode' : 0,
-       'stdout'   : "stdout - python.py",
-       'stderr'   : "stderr - python.py"},
-    ]
-    puppetExecutor.run_manifest.side_effect = [
-      {'exitcode' : 0,
-       'stdout'   : "stdout - puppet.pp",
-       'stderr'   : "stderr - puppet.pp"},
-    ]
-
-    executor = UpgradeExecutor.UpgradeExecutor(pythonExecutor,
-        puppetExecutor, AmbariConfig.AmbariConfig().getConfig())
-
-    result= executor.execute_dir(command, 'basedir', 'dir', 'tmpout', 'tmperr')
-    self.assertEquals(result['exitcode'],0)
-    self.assertEquals(result['stdout'],'\nstdout - python.py\nstdout - puppet.pp\nUnrecognized file type, skipping: basedir/dir/20-wrong.cpp')
-    self.assertEquals(result['stderr'],'\nstderr - python.py\nstderr - puppet.pp\nNone')
-
-if __name__ == "__main__":
-  unittest.main(verbosity=2)