
AMBARI-3308. Parallelize events on the ambari agent wherein the agent can run start/stop/configure/install in parallel. (dlysnichenko)

Lisnichenko Dmitro 11 years ago
commit 8f4ab529c5
26 changed files with 1751 additions and 1067 deletions
  1. + 2 - 0      ambari-agent/conf/unix/ambari-agent.ini
  2. + 4 - 0      ambari-agent/pom.xml
  3. + 163 - 0    ambari-agent/src/main/python/ambari_agent/ActionDependencyManager.py
  4. + 124 - 150  ambari-agent/src/main/python/ambari_agent/ActionQueue.py
  5. + 1 - 0      ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
  6. + 130 - 0    ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
  7. + 13 - 11    ambari-agent/src/main/python/ambari_agent/Controller.py
  8. + 1 - 1      ambari-agent/src/main/python/ambari_agent/Heartbeat.py
  9. + 1 - 1      ambari-agent/src/main/python/ambari_agent/NetUtil.py
  10. + 6 - 7     ambari-agent/src/main/python/ambari_agent/PuppetExecutor.py
  11. + 180 - 0   ambari-agent/src/test/python/TestActionDependencyManager.py
  12. + 341 - 285 ambari-agent/src/test/python/TestActionQueue.py
  13. + 125 - 0   ambari-agent/src/test/python/TestCommandStatusDict.py
  14. + 11 - 5    ambari-agent/src/test/python/TestController.py
  15. + 130 - 105 ambari-agent/src/test/python/TestHeartbeat.py
  16. + 8 - 5     ambari-agent/src/test/python/TestPuppetExecutor.py
  17. + 19 - 0    ambari-agent/src/test/python/dummy_files/test_rco_data.json
  18. + 99 - 0    ambari-common/src/main/resources/role_command_order.json
  19. + 4 - 0     ambari-server/pom.xml
  20. + 28 - 14   ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
  21. + 101 - 478 ambari-server/src/main/java/org/apache/ambari/server/metadata/RoleCommandOrder.java
  22. + 247 - 0   ambari-server/src/test/java/org/apache/ambari/server/metadata/RoleCommandOrderTest.java
  23. + 1 - 1     ambari-server/src/test/java/org/apache/ambari/server/metadata/RoleGraphTest.java
  24. + 2 - 1     ambari-server/src/test/java/org/apache/ambari/server/orm/InMemoryDefaultTestModule.java
  25. + 3 - 3     ambari-server/src/test/java/org/apache/ambari/server/stageplanner/TestStagePlanner.java
  26. + 7 - 0     ambari-server/src/test/resources/test_rco_data.json

+ 2 - 0
ambari-agent/conf/unix/ambari-agent.ini

@@ -34,6 +34,8 @@ puppetmodules=/var/lib/ambari-agent/puppet
 ruby_home=/usr/lib/ambari-agent/lib/ruby-1.8.7-p370
 puppet_home=/usr/lib/ambari-agent/lib/puppet-2.7.9
 facter_home=/usr/lib/ambari-agent/lib/facter-1.6.10
+# Number of seconds to wait before a running puppet process is terminated on timeout
+timeout_seconds = 600
 
 [command]
 maxretries=2

+ 4 - 0
ambari-agent/pom.xml

@@ -328,6 +328,10 @@
                   <location>../version</location>
                   <filter>true</filter>
                 </source>
+                <!--<source>-->
+                  <!--&lt;!&ndash; This file is also included into server rpm&ndash;&gt;-->
+                  <!--<location>../ambari-common/src/main/resources/role_command_order.json</location>-->
+                <!--</source>-->
               </sources>
             </mapping>
             <!-- -->

+ 163 - 0
ambari-agent/src/main/python/ambari_agent/ActionDependencyManager.py

@@ -0,0 +1,163 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import Queue
+import threading
+import pprint
+import os
+import json
+
+logger = logging.getLogger()
+
+class ActionDependencyManager():
+  """
+  Implements a scheduler of role commands (like DATANODE-START) based on
+  dependencies between them. The class does not execute actions; it only
+  breaks them into groups that may be executed in parallel.
+  """
+
+  DEPS_FILE_NAME="role_command_order.json"
+  COMMENT_STR="_comment"
+
+  # Dictionary of dependencies. Format:
+  # BlockedRole-Command : [BlockerRole1-Command1, BlockerRole2-Command2, ...]
+
+
+  def __init__(self, config):
+    self.deps = {}
+    self.last_scheduled_group = []
+    self.scheduled_action_groups = Queue.Queue()
+    self.lock = threading.RLock()
+    self.config = config
+    #self.read_dependencies()
+
+
+  def read_dependencies(self):
+    """
+    Load dependencies from file
+    """
+    prefix_dir = self.config.get('agent', 'prefix')
+    action_order_file = os.path.join(prefix_dir, self.DEPS_FILE_NAME)
+    with open(action_order_file) as f:
+      action_order_data = json.load(f)
+    for deps_group in action_order_data.keys():
+      if deps_group != self.COMMENT_STR: # if entry is not a comment
+        deps_group_list = action_order_data[deps_group]
+        for blocked_str in deps_group_list:
+          if blocked_str != self.COMMENT_STR: # if entry is not a comment
+            blocker_list = deps_group_list[blocked_str]
+            if blocked_str not in self.deps:
+              self.deps[blocked_str]=[]
+            for blocker_str in blocker_list:
+              self.deps[blocked_str].append(blocker_str)
+    pass
+
+
+  def is_action_group_available(self):
+    return not self.scheduled_action_groups.empty()
+
+
+  def get_next_action_group(self):
+    """
+    Returns the next group of scheduled actions that may be
+    executed in parallel. If the queue is empty, blocks until
+    an item is available (i.e. until the next put_actions() call)
+    """
+    next_group = self.scheduled_action_groups.get(block=True)
+    with self.lock: # Synchronized
+      if next_group is self.last_scheduled_group:
+        # Group is not eligible for appending, creating new one
+        self.last_scheduled_group = []
+
+      dump_str = pprint.pformat(next_group)
+      logger.debug("Next action group: {0}".format(dump_str))
+      return next_group
+
+
+  def put_actions(self, actions):
+    """
+    Schedules actions to be executed at some point in the future.
+    Here we rely on the serial command execution sequence received from the server.
+    Some of these commands may be executed in parallel with others, so we
+    unite them into a group.
+    """
+    with self.lock: # Synchronized
+      for action in actions:
+        self.dump_info(action)
+        was_empty = len(self.last_scheduled_group) == 0
+        if self.can_be_executed_in_parallel(action, self.last_scheduled_group):
+          self.last_scheduled_group.append(action)
+        else: # create a new group
+          self.last_scheduled_group = [action]
+          was_empty = True
+        if was_empty:
+          # last_scheduled_group is not empty now, so we add it to the queue
+          self.scheduled_action_groups.put(self.last_scheduled_group)
+
+
+  def dump_info(self, action):
+    """
+    Prints info about command to log
+    """
+    logger.info("Adding " + action['commandType'] + " for service " + \
+                action['serviceName'] + " of cluster " + \
+                action['clusterName'] + " to the queue.")
+    logger.debug(pprint.pformat(action))
+
+
+  def can_be_executed_in_parallel(self, action, group):
+    """
+    Checks whether action may be executed in parallel with a given group
+    """
+    # Hack: parallel execution disabled
+    return False
+
+    # from ActionQueue import ActionQueue
+    # # Empty group is compatible with any action
+    # if not group:
+    #   return True
+    # # Status commands are placed into a separate group to avoid race conditions
+    # if action['commandType'] == ActionQueue.STATUS_COMMAND:
+    #   for scheduled_action in group:
+    #     if scheduled_action['commandType'] != ActionQueue.STATUS_COMMAND:
+    #       return False
+    #   return True
+    # # We avoid executing install/upgrade threads in parallel with anything
+    # standalone_commands = ["INSTALL", ActionQueue.ROLE_COMMAND_UPGRADE]
+    # if action['roleCommand'] in standalone_commands:
+    #   return False
+    # # We can not perform few actions (like STOP and START) for a component
+    # # at the same time
+    # for scheduled_action in group:
+    #   if scheduled_action['role'] == action['role']:
+    #     return False
+    # # In other cases, check dependencies
+    # pattern = "{0}-{1}"
+    # new_action_str = pattern.format(action['role'], action['roleCommand'])
+    # for scheduled_action in group:
+    #   if new_action_str in self.deps:
+    #     blockers = self.deps[new_action_str]
+    #     scheduled_action_str = pattern.format(
+    #       scheduled_action['role'], scheduled_action['roleCommand'])
+    #     if scheduled_action_str in blockers:
+    #       return False
+    # # Everything seems to be ok
+    # return True
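
Note: a minimal sketch of the dependency-file format that read_dependencies() expects, i.e. a JSON object of named groups, each mapping a blocked ROLE-COMMAND to its list of blockers, with "_comment" entries skipped at both levels. The group name and the role-command pairs below are illustrative, not copied from the shipped role_command_order.json.

import json

COMMENT_STR = "_comment"
action_order_data = json.loads("""
{
  "_comment": "top-level comment entries are ignored",
  "general_deps": {
    "_comment": "blocked command -> list of blocker commands",
    "SECONDARY_NAMENODE-START": ["NAMENODE-START"],
    "RESOURCEMANAGER-START": ["NAMENODE-START", "DATANODE-START"]
  }
}
""")

deps = {}
for deps_group, deps_group_list in action_order_data.items():
    if deps_group == COMMENT_STR:        # skip top-level comment entries
        continue
    for blocked_str, blocker_list in deps_group_list.items():
        if blocked_str == COMMENT_STR:   # skip per-group comment entries
            continue
        deps.setdefault(blocked_str, []).extend(blocker_list)

print(deps)  # {'SECONDARY_NAMENODE-START': ['NAMENODE-START'], ...}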

+ 124 - 150
ambari-agent/src/main/python/ambari_agent/ActionQueue.py

@@ -20,19 +20,19 @@ limitations under the License.
 
 import logging
 import traceback
-import Queue
 import threading
+from threading import Thread
 import pprint
 import os
-import time
 
 from LiveStatus import LiveStatus
 from shell import shellRunner
 import PuppetExecutor
 import UpgradeExecutor
 import PythonExecutor
-from Grep import Grep
 from ActualConfigHandler import ActualConfigHandler
+from ActionDependencyManager import ActionDependencyManager
+from CommandStatusDict import CommandStatusDict
 
 
 logger = logging.getLogger()
@@ -40,35 +40,29 @@ installScriptHash = -1
 
 class ActionQueue(threading.Thread):
   """ Action Queue for the agent. We pick one command at a time from the queue
-  and execute that """
+  and execute it.
+  Note: the terms 'action' and 'command' are used interchangeably in this and related classes
+  """
 
-  commandQueue = Queue.Queue()
-  resultQueue = Queue.Queue()
+  # How many actions can be performed in parallel. Feel free to change
+  MAX_CONCURRENT_ACTIONS = 5
 
   STATUS_COMMAND = 'STATUS_COMMAND'
   EXECUTION_COMMAND = 'EXECUTION_COMMAND'
-  UPGRADE_STATUS = 'UPGRADE'
+  ROLE_COMMAND_UPGRADE = 'UPGRADE'
 
-  IDLE_SLEEP_TIME = 5
+  IN_PROGRESS_STATUS = 'IN_PROGRESS'
 
-  def __init__(self, config):
+  def __init__(self, config, controller):
     super(ActionQueue, self).__init__()
+    self.commandQueue = ActionDependencyManager(config)
+    self.commandStatuses = CommandStatusDict(callback_action =
+      self.status_update_callback)
     self.config = config
+    self.controller = controller
     self.sh = shellRunner()
     self._stop = threading.Event()
-    self.maxRetries = config.getint('command', 'maxretries')
-    self.sleepInterval = config.getint('command', 'sleepBetweenRetries')
-    self.puppetExecutor = PuppetExecutor.PuppetExecutor(
-      config.get('puppet', 'puppetmodules'),
-      config.get('puppet', 'puppet_home'),
-      config.get('puppet', 'facter_home'),
-      config.get('agent', 'prefix'), config)
-    self.pythonExecutor = PythonExecutor.PythonExecutor(
-      config.get('agent', 'prefix'), config)
-    self.upgradeExecutor = UpgradeExecutor.UpgradeExecutor(self.pythonExecutor,
-      self.puppetExecutor, config)
     self.tmpdir = config.get('agent', 'prefix')
-    self.commandInProgress = None
 
   def stop(self):
     self._stop.set()
@@ -76,114 +70,61 @@ class ActionQueue(threading.Thread):
   def stopped(self):
     return self._stop.isSet()
 
-  def put(self, command):
-    logger.info("Adding " + command['commandType'] + " for service " +\
-                command['serviceName'] + " of cluster " +\
-                command['clusterName'] + " to the queue.")
-    logger.debug(pprint.pformat(command))
-    self.commandQueue.put(command)
-    pass
+  def put(self, commands):
+    self.commandQueue.put_actions(commands)
+
 
   def run(self):
-    result = []
     while not self.stopped():
-      while not self.commandQueue.empty():
-        command = self.commandQueue.get()
+      # Taking a new portion of tasks
+      portion = self.commandQueue.get_next_action_group() # Will block if queue is empty
+      portion = portion[::-1] # Reverse list order
+      self.process_portion_of_actions(portion)
+
+
+  def process_portion_of_actions(self, portion):
+    # starting execution of a group of commands
+    running_list = []
+    finished_list = []
+    while portion or running_list: # While there are unfinished actions in the portion
+      # poll threads under execution
+      for thread in running_list:
+        if not thread.is_alive():
+          finished_list.append(thread)
+        # Remove finished from the running list
+      running_list[:] = [b for b in running_list if not b in finished_list]
+      # Start next actions
+      free_slots = self.MAX_CONCURRENT_ACTIONS - len(running_list)
+      while free_slots > 0 and portion: # Start new threads if available
+        command = portion.pop()
         logger.debug("Took an element of Queue: " + pprint.pformat(command))
         if command['commandType'] == self.EXECUTION_COMMAND:
-          try:
-            #pass a copy of action since we don't want anything to change in the
-            #action dict
-            result = self.executeCommand(command)
+          # Start separate threads for commands of this type
+          action_thread = Thread(target =  self.execute_command_safely, args = (command, ))
+          running_list.append(action_thread)
+          free_slots -= 1
+          action_thread.start()
+        elif command['commandType'] == self.STATUS_COMMAND:
+          # Execute status commands immediately, in current thread
+          self.execute_status_command(command)
+        else:
+          logger.error("Unrecognized command " + pprint.pformat(command))
+    pass
 
-          except Exception, err:
-            traceback.print_exc()
-            logger.warn(err)
-            pass
 
-          for entry in result:
-            self.resultQueue.put((command['commandType'], entry))
+  def execute_command_safely(self, command):
+    # make sure we log failures
+    try:
+      self.execute_command(command)
+    except Exception, err:
+      # Should not happen
+      traceback.print_exc()
+      logger.warn(err)
 
-        elif command['commandType'] == self.STATUS_COMMAND:
-          try:
-            cluster = command['clusterName']
-            service = command['serviceName']
-            component = command['componentName']
-            configurations = command['configurations']
-            if configurations.has_key('global'):
-              globalConfig = configurations['global']
-            else:
-              globalConfig = {}
-            livestatus = LiveStatus(cluster, service, component,
-              globalConfig, self.config)
-            result = livestatus.build()
-            logger.debug("Got live status for component " + component +\
-                         " of service " + str(service) +\
-                         " of cluster " + str(cluster))
-            logger.debug(pprint.pformat(result))
-            if result is not None:
-              self.resultQueue.put((ActionQueue.STATUS_COMMAND, result))
-
-          except Exception, err:
-            traceback.print_exc()
-            logger.warn(err)
-          pass
-        else:
-          logger.warn("Unrecognized command " + pprint.pformat(command))
-      if not self.stopped():
-        time.sleep(self.IDLE_SLEEP_TIME)
 
-  # Store action result to agent response queue
-  def result(self):
-    resultReports = []
-    resultComponentStatus = []
-    while not self.resultQueue.empty():
-      res = self.resultQueue.get()
-      if res[0] == self.EXECUTION_COMMAND:
-        resultReports.append(res[1])
-      elif res[0] == ActionQueue.STATUS_COMMAND:
-        resultComponentStatus.append(res[1])
-
-    # Building report for command in progress
-    if self.commandInProgress is not None:
-      try:
-        tmpout = open(self.commandInProgress['tmpout'], 'r').read()
-        tmperr = open(self.commandInProgress['tmperr'], 'r').read()
-      except Exception, err:
-        logger.warn(err)
-        tmpout = '...'
-        tmperr = '...'
-      grep = Grep()
-      output = grep.tail(tmpout, Grep.OUTPUT_LAST_LINES)
-      inprogress = {
-        'role': self.commandInProgress['role'],
-        'actionId': self.commandInProgress['actionId'],
-        'taskId': self.commandInProgress['taskId'],
-        'stdout': grep.filterMarkup(output),
-        'clusterName': self.commandInProgress['clusterName'],
-        'stderr': tmperr,
-        'exitCode': 777,
-        'serviceName': self.commandInProgress['serviceName'],
-        'status': 'IN_PROGRESS',
-        'roleCommand': self.commandInProgress['roleCommand']
-      }
-      resultReports.append(inprogress)
-    result = {
-      'reports': resultReports,
-      'componentStatus': resultComponentStatus
-    }
-    return result
-
-  def executeCommand(self, command):
+  def execute_command(self, command):
     clusterName = command['clusterName']
     commandId = command['commandId']
-    hostname = command['hostname']
-    params = command['hostLevelParams']
-    clusterHostInfo = command['clusterHostInfo']
-    roleCommand = command['roleCommand']
-    serviceName = command['serviceName']
-    configurations = command['configurations']
-    result = []
 
     logger.info("Executing command with id = " + str(commandId) +\
                 " for role = " + command['role'] + " of " +\
@@ -192,41 +133,43 @@ class ActionQueue(threading.Thread):
 
     taskId = command['taskId']
     # Preparing 'IN_PROGRESS' report
-    self.commandInProgress = {
-      'role': command['role'],
-      'actionId': commandId,
-      'taskId': taskId,
-      'clusterName': clusterName,
-      'serviceName': serviceName,
+    in_progress_status = self.commandStatuses.generate_report_template(command)
+    in_progress_status.update({
       'tmpout': self.tmpdir + os.sep + 'output-' + str(taskId) + '.txt',
       'tmperr': self.tmpdir + os.sep + 'errors-' + str(taskId) + '.txt',
-      'roleCommand': roleCommand
-    }
+      'status': self.IN_PROGRESS_STATUS
+    })
+    self.commandStatuses.put_command_status(command, in_progress_status)
     # running command
-    if command['commandType'] == ActionQueue.EXECUTION_COMMAND:
-      if command['roleCommand'] == ActionQueue.UPGRADE_STATUS:
-        commandresult = self.upgradeExecutor.perform_stack_upgrade(command, self.commandInProgress['tmpout'],
-          self.commandInProgress['tmperr'])
-      else:
-        commandresult = self.puppetExecutor.runCommand(command, self.commandInProgress['tmpout'],
-          self.commandInProgress['tmperr'])
-      # dumping results
-    self.commandInProgress = None
+    # Create a new instance of executor for the current thread
+    puppetExecutor = PuppetExecutor.PuppetExecutor(
+      self.config.get('puppet', 'puppetmodules'),
+      self.config.get('puppet', 'puppet_home'),
+      self.config.get('puppet', 'facter_home'),
+      self.config.get('agent', 'prefix'), self.config)
+    if command['roleCommand'] == ActionQueue.ROLE_COMMAND_UPGRADE:
+      # Create new instances for the current thread
+      pythonExecutor = PythonExecutor.PythonExecutor(
+          self.config.get('agent', 'prefix'), self.config)
+      upgradeExecutor = UpgradeExecutor.UpgradeExecutor(pythonExecutor,
+          puppetExecutor, self.config)
+      commandresult = upgradeExecutor.perform_stack_upgrade(command, in_progress_status['tmpout'],
+        in_progress_status['tmperr'])
+    else:
+      commandresult = puppetExecutor.runCommand(command, in_progress_status['tmpout'],
+        in_progress_status['tmperr'])
+    # dumping results
     status = "COMPLETED"
     if commandresult['exitcode'] != 0:
       status = "FAILED"
-
+    roleResult = self.commandStatuses.generate_report_template(command)
     # assume some puppet plumbing to run these commands
-    roleResult = {'role': command['role'],
-                  'actionId': commandId,
-                  'taskId': command['taskId'],
-                  'stdout': commandresult['stdout'],
-                  'clusterName': clusterName,
-                  'stderr': commandresult['stderr'],
-                  'exitCode': commandresult['exitcode'],
-                  'serviceName': serviceName,
-                  'status': status,
-                  'roleCommand': roleCommand}
+    roleResult.update({
+      'stdout': commandresult['stdout'],
+      'stderr': commandresult['stderr'],
+      'exitCode': commandresult['exitcode'],
+      'status': status,
+    })
     if roleResult['stdout'] == '':
       roleResult['stdout'] = 'None'
     if roleResult['stderr'] == '':
@@ -242,10 +185,41 @@ class ActionQueue(threading.Thread):
       if command.has_key('roleCommand') and command['roleCommand'] == 'START':
         configHandler.copy_to_component(command['role'])
         roleResult['configurationTags'] = configHandler.read_actual_component(command['role'])
+    self.commandStatuses.put_command_status(command, roleResult)
+
 
-    result.append(roleResult)
-    return result
+  def execute_status_command(self, command):
+    try:
+      cluster = command['clusterName']
+      service = command['serviceName']
+      component = command['componentName']
+      configurations = command['configurations']
+      if configurations.has_key('global'):
+        globalConfig = configurations['global']
+      else:
+        globalConfig = {}
+      livestatus = LiveStatus(cluster, service, component,
+                              globalConfig, self.config)
+      result = livestatus.build()
+      logger.debug("Got live status for component " + component + \
+                   " of service " + str(service) + \
+                   " of cluster " + str(cluster))
+      logger.debug(pprint.pformat(result))
+      if result is not None:
+        self.commandStatuses.put_command_status(command, result)
+    except Exception, err:
+      traceback.print_exc()
+      logger.warn(err)
+    pass
+
+
+  # Store action result to agent response queue
+  def result(self):
+    return self.commandStatuses.generate_report()
 
 
-  def isIdle(self):
-    return self.commandQueue.empty()
+  def status_update_callback(self):
+    """
+    Actions that are executed every time when command status changes
+    """
+    self.controller.heartbeat_wait_event.set()
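
Note: a simplified, self-contained sketch (not the shipped code) of the bounded-parallelism loop that process_portion_of_actions() implements: at most MAX_CONCURRENT_ACTIONS worker threads run EXECUTION_COMMANDs at a time, while STATUS_COMMANDs are handled inline in the current thread. run_command() and run_status() are hypothetical stand-ins for the real executors, and the short polling sleep is added here only to avoid a busy loop.

import time
from threading import Thread

MAX_CONCURRENT_ACTIONS = 5

def run_command(command):
    time.sleep(0.1)              # stand-in for puppet/python execution

def run_status(command):
    pass                         # stand-in for a LiveStatus check

def process_portion(portion):
    portion = portion[::-1]      # pop() takes from the end, so reverse first
    running = []
    while portion or running:
        running = [t for t in running if t.is_alive()]   # drop finished threads
        free_slots = MAX_CONCURRENT_ACTIONS - len(running)
        while free_slots > 0 and portion:
            command = portion.pop()
            if command['commandType'] == 'EXECUTION_COMMAND':
                worker = Thread(target=run_command, args=(command,))
                worker.start()
                running.append(worker)
                free_slots -= 1
            elif command['commandType'] == 'STATUS_COMMAND':
                run_status(command)          # executed inline, not in a thread
        time.sleep(0.01)                     # small pause between polls

process_portion([{'commandType': 'EXECUTION_COMMAND'} for _ in range(8)])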

+ 1 - 0
ambari-agent/src/main/python/ambari_agent/AmbariConfig.py

@@ -45,6 +45,7 @@ upgradeScriptsDir=/var/lib/ambari-agent/upgrade_stack
 puppetmodules=/var/lib/ambari-agent/puppet/
 puppet_home=/root/workspace/puppet-install/puppet-2.7.9
 facter_home=/root/workspace/puppet-install/facter-1.6.10
+timeout_seconds = 600
 
 [command]
 maxretries=2

+ 130 - 0
ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py

@@ -0,0 +1,130 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import threading
+from Grep import Grep
+
+logger = logging.getLogger()
+
+class CommandStatusDict():
+  """
+  Holds results for all commands that are being executed or have finished
+  execution (but are not yet reported). Implementation is thread-safe.
+  Dict format:
+    task_id -> (command, cmd_report)
+  """
+
+  def __init__(self, callback_action):
+    """
+    callback_action is called every time the status of some command is
+    updated
+    """
+    self.current_state = {} # Contains all statuses
+    self.callback_action = callback_action
+    self.lock = threading.RLock()
+
+
+  def put_command_status(self, command, new_report):
+    """
+    Stores new version of report for command (replaces previous)
+    """
+    if 'taskId' in command:
+      key = command['taskId']
+    else: # Status command reports have no task id
+      key = id(command)
+    with self.lock: # Synchronized
+      self.current_state[key] = (command, new_report)
+    self.callback_action()
+
+
+  def generate_report(self):
+    """
+    Generates status reports for commands that are IN_PROGRESS, COMPLETED or
+    FAILED. Statuses of COMPLETED or FAILED commands are forgotten after
+    generation
+    """
+    from ActionQueue import ActionQueue
+    with self.lock: # Synchronized
+      resultReports = []
+      resultComponentStatus = []
+      for key, item in self.current_state.items():
+        command = item[0]
+        report = item[1]
+        if command ['commandType'] == ActionQueue.EXECUTION_COMMAND:
+          if (report['status']) != ActionQueue.IN_PROGRESS_STATUS:
+            resultReports.append(report)
+            # Removing complete/failed command status from dict
+            del self.current_state[key]
+          else:
+            in_progress_report = self.generate_in_progress_report(command, report)
+            resultReports.append(in_progress_report)
+        elif command ['commandType'] == ActionQueue.STATUS_COMMAND:
+          resultComponentStatus.append(report)
+          # Component status is useful once, removing it
+          del self.current_state[key]
+      result = {
+        'reports': resultReports,
+        'componentStatus': resultComponentStatus
+      }
+      return result
+
+
+  def generate_in_progress_report(self, command, report):
+    """
+    Reads stdout/stderr for IN_PROGRESS command from disk file
+    and populates other fields of report.
+    """
+    from ActionQueue import ActionQueue
+    try:
+      tmpout = open(report['tmpout'], 'r').read()
+      tmperr = open(report['tmperr'], 'r').read()
+    except Exception, err:
+      logger.warn(err)
+      tmpout = '...'
+      tmperr = '...'
+    grep = Grep()
+    output = grep.tail(tmpout, Grep.OUTPUT_LAST_LINES)
+    inprogress = self.generate_report_template(command)
+    inprogress.update({
+      'stdout': grep.filterMarkup(output),
+      'stderr': tmperr,
+      'exitCode': 777,
+      'status': ActionQueue.IN_PROGRESS_STATUS,
+    })
+    return inprogress
+
+
+  def generate_report_template(self, command):
+    """
+    Generates stub dict for command.
+    Other fields should be populated manually
+    """
+    stub = {
+      'role': command['role'],
+      'actionId': command['commandId'],
+      'taskId': command['taskId'],
+      'clusterName': command['clusterName'],
+      'serviceName': command['serviceName'],
+      'roleCommand': command['roleCommand']
+    }
+    return stub
+
+
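
Note: an illustrative usage sketch of CommandStatusDict, assuming the ambari_agent package is importable; the command fields and the callback below are fabricated for the example, not taken from this commit.

from ambari_agent.CommandStatusDict import CommandStatusDict

def on_status_change():           # hypothetical callback
    print("a command status changed; wake the heartbeat thread")

statuses = CommandStatusDict(callback_action=on_status_change)
command = {'commandType': 'EXECUTION_COMMAND', 'commandId': '1-1', 'taskId': 3,
           'role': 'DATANODE', 'clusterName': 'c1', 'serviceName': 'HDFS',
           'roleCommand': 'START'}

report = statuses.generate_report_template(command)
report.update({'status': 'COMPLETED', 'exitCode': 0, 'stdout': 'ok', 'stderr': ''})
statuses.put_command_status(command, report)   # invokes on_status_change()

print(statuses.generate_report())  # {'reports': [{...}], 'componentStatus': []}
# COMPLETED/FAILED entries are removed from the dict once they have been reported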

+ 13 - 11
ambari-agent/src/main/python/ambari_agent/Controller.py

@@ -60,6 +60,9 @@ class Controller(threading.Thread):
     self.cachedconnect = None
     self.range = range
     self.hasMappedComponents = True
+    # Event is used for synchronizing heartbeat iterations (it makes manual
+    # interruption of wait() between heartbeats possible)
+    self.heartbeat_wait_event = threading.Event()
 
   def __del__(self):
     logger.info("Server connection disconnected.")
@@ -112,12 +115,7 @@ class Controller(threading.Thread):
       logger.debug("No commands from the server : " + pprint.pformat(commands))
     else:
       """Only add to the queue if not empty list """
-      for command in commands:
-        logger.debug("Adding command to the action queue: \n" +\
-                     pprint.pformat(command))
-        self.actionQueue.put(command)
-        pass
-      pass
+      self.actionQueue.put(commands)
     pass
 
   # For testing purposes
@@ -188,6 +186,7 @@ class Controller(threading.Thread):
         certVerifFailed = False
         self.DEBUG_SUCCESSFULL_HEARTBEATS += 1
         self.DEBUG_HEARTBEAT_RETRIES = 0
+        self.heartbeat_wait_event.clear()
       except ssl.SSLError:
         self.repeatRegistration=False
         return
@@ -207,14 +206,17 @@ class Controller(threading.Thread):
             certVerifFailed = True
         self.cachedconnect = None # Previous connection is broken now
         retry=True
-      if self.actionQueue.isIdle():
-        time.sleep(self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC)
-      else:
-        time.sleep(self.netutil.HEARTBEAT_NOT_IDDLE_INTERVAL_SEC)
+      # Sleep for some time
+      timeout = self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC \
+                - self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS
+      self.heartbeat_wait_event.wait(timeout = timeout)
+      # Sleep a bit more to allow STATUS_COMMAND results to be collected
+      # and sent in one heartbeat. Also avoid server overload with heartbeats
+      time.sleep(self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS)
     pass
 
   def run(self):
-    self.actionQueue = ActionQueue(self.config)
+    self.actionQueue = ActionQueue(self.config, controller=self)
     self.actionQueue.start()
     self.register = Register(self.config)
     self.heartbeat = Heartbeat(self.actionQueue, self.config)
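
Note: a minimal standalone sketch of the heartbeat pacing introduced here; the constant names mirror NetUtil's, the loop body is reduced to the wait logic, and status_update_callback() stands for the hook ActionQueue calls when a command status changes. The Timer call only simulates a command finishing mid-wait.

import threading
import time

HEARTBEAT_IDDLE_INTERVAL_SEC = 10
MINIMUM_INTERVAL_BETWEEN_HEARTBEATS = 0.1

heartbeat_wait_event = threading.Event()

def status_update_callback():
    heartbeat_wait_event.set()     # interrupts the wait() below early

def heartbeat_iteration():
    # ... build and send heartbeat, process the response ...
    heartbeat_wait_event.clear()
    timeout = HEARTBEAT_IDDLE_INTERVAL_SEC - MINIMUM_INTERVAL_BETWEEN_HEARTBEATS
    heartbeat_wait_event.wait(timeout=timeout)       # returns early if set()
    time.sleep(MINIMUM_INTERVAL_BETWEEN_HEARTBEATS)  # let status results batch up

threading.Timer(0.5, status_update_callback).start()  # simulate a finished command
start = time.time()
heartbeat_iteration()
print("iteration took %.1fs instead of ~%ds" % (time.time() - start,
                                                HEARTBEAT_IDDLE_INTERVAL_SEC))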

+ 1 - 1
ambari-agent/src/main/python/ambari_agent/Heartbeat.py

@@ -55,7 +55,7 @@ class Heartbeat:
                 }
 
     commandsInProgress = False
-    if self.actionQueue.commandQueue.empty() == False:
+    if self.actionQueue.commandQueue.is_action_group_available():
       commandsInProgress = True
     if len(queueResult) != 0:
       heartbeat['reports'] = queueResult['reports']

+ 1 - 1
ambari-agent/src/main/python/ambari_agent/NetUtil.py

@@ -25,7 +25,7 @@ class NetUtil:
 
   CONNECT_SERVER_RETRY_INTERVAL_SEC = 10
   HEARTBEAT_IDDLE_INTERVAL_SEC = 10
-  HEARTBEAT_NOT_IDDLE_INTERVAL_SEC = 5
+  MINIMUM_INTERVAL_BETWEEN_HEARTBEATS = 0.1
 
   # Url within server to request during status check. This url
   # should return HTTP code 200

+ 6 - 7
ambari-agent/src/main/python/ambari_agent/PuppetExecutor.py

@@ -40,12 +40,7 @@ class PuppetExecutor:
   """ Class that executes the commands that come from the server using puppet.
   This is the class that provides the pluggable point for executing the puppet"""
 
-  # How many seconds will pass before running puppet is terminated on timeout
-  PUPPET_TIMEOUT_SECONDS = 600
   grep = Grep()
-  event = threading.Event()
-  last_puppet_has_been_killed = False
-
   NO_ERROR = "none"
 
   def __init__(self, puppetModule, puppetInstall, facterInstall, tmpDir, config):
@@ -56,7 +51,10 @@ class PuppetExecutor:
     self.reposInstalled = False
     self.config = config
     self.modulesdir = self.puppetModule + "/modules"
+    self.event = threading.Event()
+    self.last_puppet_has_been_killed = False
     self.sh = shellRunner()
+    self.puppet_timeout = config.get("puppet", "timeout_seconds")
 
   def configureEnviron(self, environ):
     if not self.config.has_option("puppet", "ruby_home"):
@@ -221,7 +219,8 @@ class PuppetExecutor:
       result["stderr"] = str(error)
     puppetOutput = open(tmpoutfile, 'r').read()
     logger.debug("Output from puppet :\n" + puppetOutput)
-    logger.info("Puppet exit code is " + str(returncode))
+    logger.info("Puppet execution process with pid %s exited with code %s." %
+                (str(puppet.pid), str(returncode)))
     if result.has_key("exitcode"):
       result["exitcode"] = max(returncode, result["exitcode"])
     else:
@@ -244,7 +243,7 @@ class PuppetExecutor:
       env=puppetEnv)
 
   def puppet_watchdog_func(self, puppet):
-    self.event.wait(self.PUPPET_TIMEOUT_SECONDS)
+    self.event.wait(float(self.puppet_timeout))
     if puppet.returncode is None:
       logger.error("Task timed out, killing process with PID: " + str(puppet.pid))
       shell.kill_process_with_children(puppet.pid)
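
Note: a simplified sketch of the watchdog behaviour that the new timeout_seconds setting drives: a side thread waits up to the configured timeout and kills the child process if it is still running. The real code uses shell.kill_process_with_children() and the value read from the agent config; 'sleep 5', proc.kill(), and the 2-second timeout below are illustrative (POSIX only).

import subprocess
import threading

def watchdog_func(proc, finished_event, timeout_seconds):
    finished_event.wait(float(timeout_seconds))  # woken early if the command finishes
    if proc.poll() is None:                      # still running after the timeout
        proc.kill()

proc = subprocess.Popen(["sleep", "5"])
finished_event = threading.Event()
watchdog = threading.Thread(target=watchdog_func, args=(proc, finished_event, 2))
watchdog.start()

returncode = proc.wait()
finished_event.set()       # command is done (or was killed); stop the watchdog
watchdog.join()
print("exit code: %s (negative means the process was killed)" % returncode)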

+ 180 - 0
ambari-agent/src/test/python/TestActionDependencyManager.py

@@ -0,0 +1,180 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+from unittest import TestCase
+from ambari_agent.AmbariConfig import AmbariConfig
+from ambari_agent.ActionQueue import ActionQueue
+from ambari_agent.ActionDependencyManager import ActionDependencyManager
+import os, errno, time, pprint, tempfile, threading, sys
+from mock.mock import patch, MagicMock, call
+
+class TestActionDependencyManager(TestCase):
+
+  dummy_RCO_file = os.path.join('dummy_files', 'test_rco_data.json')
+
+  def setUp(self):
+    self.config = AmbariConfig().getConfig()
+    self.config.set('agent', 'prefix', os.getcwd())
+    ActionDependencyManager.DEPS_FILE_NAME = self.dummy_RCO_file
+
+  # TODO: disabled for now
+  def disabled_test_init(self):
+    """
+    Tests config load
+    """
+    adm = ActionDependencyManager(self.config)
+    deps_dump = pprint.pformat(adm.deps)
+    expected = "{u'DATANODE-STOP': [u'JOBTRACKER-STOP',\n                    " \
+               "u'TASKTRACKER-STOP',\n                    " \
+               "u'RESOURCEMANAGER-STOP',\n                    " \
+               "u'NODEMANAGER-STOP',\n                    " \
+               "u'HISTORYSERVER-STOP',\n                    " \
+               "u'HBASE_MASTER-STOP'],\n u'HBASE_MASTER-START': " \
+               "[u'PEERSTATUS-START'],\n u'JOBTRACKER-START': " \
+               "[u'PEERSTATUS-START'],\n u'RESOURCEMANAGER-START': " \
+               "[u'NAMENODE-START', u'DATANODE-START'],\n " \
+               "u'SECONDARY_NAMENODE-START': [u'DATANODE-START', " \
+               "u'NAMENODE-START'],\n u'SECONDARY_NAMENODE-UPGRADE': " \
+               "[u'NAMENODE-UPGRADE']}"
+    self.assertEqual(deps_dump, expected)
+
+
+  def test_is_action_group_available(self):
+    adm = ActionDependencyManager(self.config)
+    self.assertFalse(adm.is_action_group_available())
+    adm.scheduled_action_groups.put(["test"])
+    self.assertTrue(adm.is_action_group_available())
+
+
+  def test_get_next_action_group(self):
+    adm = ActionDependencyManager(self.config)
+    test1 = ["test1"]
+    test2 = ["test2"]
+    adm.scheduled_action_groups.put(test1)
+    adm.scheduled_action_groups.put(test2)
+    adm.last_scheduled_group = test2
+    self.assertTrue(adm.is_action_group_available())
+    # Taking 1st
+    self.assertEqual(test1, adm.get_next_action_group())
+    self.assertTrue(len(adm.last_scheduled_group) == 1)
+    self.assertTrue(adm.is_action_group_available())
+    # Taking 2nd
+    self.assertEqual(test2, adm.get_next_action_group())
+    self.assertTrue(len(adm.last_scheduled_group) == 0)
+    self.assertTrue(adm.last_scheduled_group is not test2)
+    self.assertFalse(adm.is_action_group_available())
+
+
+  @patch.object(ActionDependencyManager, "dump_info")
+  @patch.object(ActionDependencyManager, "can_be_executed_in_parallel")
+  def test_put_action(self, can_be_executed_in_parallel_mock, dump_info_mock):
+    can_be_executed_in_parallel_mock.side_effect = [True, False, True, False,
+                                                     True, True, True, False]
+    adm = ActionDependencyManager(self.config)
+
+    adm.put_actions(list(range(0, 8)))
+
+    queue = []
+    while adm.is_action_group_available():
+      next = adm.get_next_action_group()
+      queue.append(next)
+
+    str = pprint.pformat(queue)
+    expected = "[[0], [1, 2], [3, 4, 5, 6], [7]]"
+    self.assertEqual(str, expected)
+
+
+  # TODO: disabled for now
+  def disabled_test_can_be_executed_in_parallel(self):
+    adm = ActionDependencyManager(self.config)
+    # empty group
+    group = []
+    install_command = {
+      'role': 'DATANODE',
+      'roleCommand': 'INSTALL',
+      'commandType': ActionQueue.EXECUTION_COMMAND
+    }
+    upgrade_command = {
+      'role': 'DATANODE',
+      'roleCommand': 'UPGRADE',
+      'commandType': ActionQueue.EXECUTION_COMMAND
+    }
+    start_command = {
+      'role': 'DATANODE',
+      'roleCommand': 'START',
+      'commandType': ActionQueue.EXECUTION_COMMAND
+    }
+    stop_command = {
+      'role': 'DATANODE',
+      'roleCommand': 'STOP',
+      'commandType': ActionQueue.EXECUTION_COMMAND
+    }
+    status_command = {
+      'commandType': ActionQueue.STATUS_COMMAND
+    }
+    rm_start_command = {
+      'role': 'RESOURCEMANAGER',
+      'roleCommand': 'START',
+      'commandType': ActionQueue.EXECUTION_COMMAND
+    }
+    hm_start_command = {
+      'role': 'HBASE_MASTER',
+      'roleCommand': 'START',
+      'commandType': ActionQueue.EXECUTION_COMMAND
+    }
+    self.assertTrue(adm.can_be_executed_in_parallel(install_command, group))
+    self.assertTrue(adm.can_be_executed_in_parallel(status_command, group))
+    # multiple status commands
+    group = []
+    for i in range(0, 3):
+      group.append(status_command)
+    self.assertTrue(adm.can_be_executed_in_parallel(status_command, group))
+    self.assertFalse(adm.can_be_executed_in_parallel(install_command, group))
+    # new status command
+    group = [install_command]
+    self.assertFalse(adm.can_be_executed_in_parallel(status_command, group))
+    # install/upgrade commands
+    group = [install_command]
+    self.assertFalse(adm.can_be_executed_in_parallel(install_command, group))
+    self.assertFalse(adm.can_be_executed_in_parallel(upgrade_command, group))
+    self.assertFalse(adm.can_be_executed_in_parallel(status_command, group))
+    self.assertFalse(adm.can_be_executed_in_parallel(start_command, group))
+    group = [upgrade_command]
+    self.assertFalse(adm.can_be_executed_in_parallel(install_command, group))
+    self.assertFalse(adm.can_be_executed_in_parallel(upgrade_command, group))
+    self.assertFalse(adm.can_be_executed_in_parallel(status_command, group))
+    self.assertFalse(adm.can_be_executed_in_parallel(start_command, group))
+    # Other commands
+    group = [start_command]
+    self.assertFalse(adm.can_be_executed_in_parallel(install_command, group))
+    self.assertFalse(adm.can_be_executed_in_parallel(upgrade_command, group))
+    self.assertFalse(adm.can_be_executed_in_parallel(status_command, group))
+    self.assertTrue(adm.can_be_executed_in_parallel(hm_start_command, group))
+    # Check dependency processing
+    group = [start_command]
+    self.assertFalse(adm.can_be_executed_in_parallel(rm_start_command, group))
+    group = [start_command]
+    self.assertTrue(adm.can_be_executed_in_parallel(hm_start_command, group))
+    # actions for the same component
+    group = [start_command]
+    self.assertFalse(adm.can_be_executed_in_parallel(stop_command, group))
+    group = [stop_command]
+    self.assertFalse(adm.can_be_executed_in_parallel(start_command, group))
+

+ 341 - 285
ambari-agent/src/test/python/TestActionQueue.py

@@ -19,140 +19,46 @@ limitations under the License.
 '''
 
 from unittest import TestCase
+from ambari_agent.LiveStatus import LiveStatus
+from ambari_agent.PuppetExecutor import PuppetExecutor
 from ambari_agent.ActionQueue import ActionQueue
-import ambari_agent.ActionQueue as AQM
 from ambari_agent.AmbariConfig import AmbariConfig
-from ambari_agent.UpgradeExecutor import UpgradeExecutor
-from ambari_agent.PuppetExecutor import PuppetExecutor
-from ambari_agent.StackVersionsFileHandler import StackVersionsFileHandler
-from ambari_agent.ActualConfigHandler import ActualConfigHandler
+from ambari_agent.ActionDependencyManager import ActionDependencyManager
 import os, errno, time, pprint, tempfile, threading
-import TestStackVersionsFileHandler
+import StringIO
+import sys
+from threading import Thread
 
 from mock.mock import patch, MagicMock, call
+from ambari_agent.StackVersionsFileHandler import StackVersionsFileHandler
+from ambari_agent.UpgradeExecutor import UpgradeExecutor
 
-class TestActionQueue(TestCase):
-  def test_ActionQueueStartStop(self):
-    actionQueue = ActionQueue(AmbariConfig().getConfig())
-    actionQueue.IDLE_SLEEP_TIME = 0.01
-    actionQueue.start()
-    actionQueue.stop()
-    actionQueue.join()
-    self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
-
-  def test_command_in_progress(self):
-    config = AmbariConfig().getConfig()
-    tmpfile = tempfile.gettempdir()
-    config.set('agent', 'prefix', tmpfile)
-    actionQueue = ActionQueue(config)
-    actionQueue.IDLE_SLEEP_TIME = 0.01
-    executor_started_event = threading.Event()
-    end_executor_event = threading.Event()
-    actionQueue.puppetExecutor = FakeExecutor(executor_started_event, end_executor_event)
-    before_start_result = actionQueue.result()
-
-    command = {
-      'commandId': 17,
-      'role' : "role",
-      'taskId' : "taskId",
-      'clusterName' : "clusterName",
-      'serviceName' : "serviceName",
-      'status' : 'IN_PROGRESS',
-      'hostname' : "localhost.localdomain",
-      'hostLevelParams': "hostLevelParams",
-      'clusterHostInfo': "clusterHostInfo",
-      'roleCommand': "roleCommand",
-      'configurations': "configurations",
-      'commandType': "EXECUTION_COMMAND",
-      'configurations':{'global' : {}}
-    }
-    actionQueue.put(command)
-
-    actionQueue.start()
-    executor_started_event.wait()
-    #print ("ii: " + pprint.pformat(actionQueue.commandInProgress))
-    in_progress_result = actionQueue.result()
-    end_executor_event.set()
-    actionQueue.stop()
-    actionQueue.join()
-    after_start_result = actionQueue.result()
-
-    self.assertEquals(len(before_start_result['componentStatus']), 0)
-    self.assertEquals(len(before_start_result['reports']), 0)
-
-    self.assertEquals(len(in_progress_result['componentStatus']), 0)
-    self.assertEquals(len(in_progress_result['reports']), 1)
-    self.assertEquals(in_progress_result['reports'][0]['status'], "IN_PROGRESS")
-    self.assertEquals(in_progress_result['reports'][0]['stdout'], "Dummy output")
-    self.assertEquals(in_progress_result['reports'][0]['exitCode'], 777)
-    self.assertEquals(in_progress_result['reports'][0]['stderr'], 'Dummy err')
-
-    self.assertEquals(len(after_start_result['componentStatus']), 0)
-    self.assertEquals(len(after_start_result['reports']), 1)
-    self.assertEquals(after_start_result['reports'][0]['status'], "COMPLETED")
-    self.assertEquals(after_start_result['reports'][0]['stdout'], "returned stdout")
-    self.assertEquals(after_start_result['reports'][0]['exitCode'], 0)
-    self.assertEquals(after_start_result['reports'][0]['stderr'], 'returned stderr')
-
-    #print("tmpout: " + pprint.pformat(actionQueue.tmpdir))
-    #print("before: " + pprint.pformat(before_start_result))
-    #print("in_progress: " + pprint.pformat(in_progress_result))
-    #print("after: " + pprint.pformat(after_start_result))
-
-  def test_configtags(self):
-    config = AmbariConfig().getConfig()
-    tmpfile = tempfile.gettempdir()
-    config.set('agent', 'prefix', tmpfile)
-    actionQueue = ActionQueue(config)
-    actionQueue.IDLE_SLEEP_TIME = 0.01
-    executor_started_event = threading.Event()
-    end_executor_event = threading.Event()
-    actionQueue.puppetExecutor = FakeExecutor(executor_started_event, end_executor_event)
-
-    command = {
-      'commandId': 17,
-      'role' : "role",
-      'taskId' : "taskId",
-      'clusterName' : "clusterName",
-      'serviceName' : "serviceName",
-      'status' : 'IN_PROGRESS',
-      'hostname' : "localhost.localdomain",
-      'hostLevelParams': "hostLevelParams",
-      'clusterHostInfo': "clusterHostInfo",
-      'roleCommand': "roleCommand",
-      'configurations': "configurations",
-      'commandType': "EXECUTION_COMMAND",
-      'configurations':{'global' : {}},
-      'configurationTags':{'global' : { 'tag': 'v1' }}
-    }
-    actionQueue.put(command)
-
-    actionQueue.start()
-    executor_started_event.wait()
-
-    end_executor_event.set()
-    actionQueue.stop()
-    actionQueue.join()
-    after_start_result = actionQueue.result()
-
-    configname = os.path.join(tmpfile, 'config.json')
-
-    self.assertEquals(len(after_start_result['componentStatus']), 0)
-    self.assertEquals(len(after_start_result['reports']), 1)
-    self.assertEquals(after_start_result['reports'][0]['status'], "COMPLETED")
-    self.assertEquals(after_start_result['reports'][0]['stdout'], "returned stdout")
-    self.assertEquals(after_start_result['reports'][0]['exitCode'], 0)
-    self.assertEquals(after_start_result['reports'][0]['stderr'], 'returned stderr')
-    self.assertEquals(len(after_start_result['reports'][0]['configurationTags']), 1)
-    self.assertEquals(True, os.path.isfile(configname))
 
-    os.remove(configname)
+class TestActionQueue(TestCase):
 
-  @patch.object(ActionQueue, "executeCommand")
-  @patch.object(ActionQueue, "stopped")
-  def test_upgradeCommand_dispatching(self, stopped_method, executeCommand_method):
-    queue = ActionQueue(config = MagicMock())
-    command = {
+  def setUp(self):
+    out = StringIO.StringIO()
+    sys.stdout = out
+    # save original open() method for later use
+    self.original_open = open
+
+
+  def tearDown(self):
+    sys.stdout = sys.__stdout__
+
+  datanode_install_command = {
+    'commandType': 'EXECUTION_COMMAND',
+    'role': u'DATANODE',
+    'roleCommand': u'INSTALL',
+    'commandId': '1-1',
+    'taskId': 3,
+    'clusterName': u'cc',
+    'serviceName': u'HDFS',
+    'configurations':{'global' : {}},
+    'configurationTags':{'global' : { 'tag': 'v1' }}
+  }
+
+  datanode_upgrade_command = {
       'commandId': 17,
       'role' : "role",
       'taskId' : "taskId",
@@ -162,7 +68,6 @@ class TestActionQueue(TestCase):
       'hostname' : "localhost.localdomain",
       'hostLevelParams': "hostLevelParams",
       'clusterHostInfo': "clusterHostInfo",
-      'configurations': "configurations",
       'commandType': "EXECUTION_COMMAND",
       'configurations':{'global' : {}},
       'roleParams': {},
@@ -171,169 +76,320 @@ class TestActionQueue(TestCase):
         'target_stack_version' : 'HDP-1.3.0'
       }
     }
-    result = [{
-      'exitcode' : 0,
-      'stdout'   : 'abc',
-      'stderr'   : 'def'
-    }]
-    executeCommand_method.return_value = result
-    stopped_method.side_effect = [False, False, True, True, True]
-    queue.stopped = stopped_method
-    queue.IDLE_SLEEP_TIME = 0.001
-    queue.put(command)
-    queue.run()
-    self.assertTrue(executeCommand_method.called)
-    self.assertEquals(queue.resultQueue.qsize(), 1)
-    returned_result = queue.resultQueue.get()
-    self.assertTrue(returned_result[1] is result[0])
-
-
-  @patch.object(UpgradeExecutor, "perform_stack_upgrade")
-  @patch.object(PuppetExecutor, "runCommand")
-  @patch.object(ActualConfigHandler, "findRunDir")
-  def test_upgradeCommand_executeCommand(self, action_conf_handler_findRunDir_method,
-                                         puppet_executor_run_command_method, perform_stack_upgrade_method):
-    queue = ActionQueue(config = MagicMock())
-    command = {
-      'commandId': 17,
-      'role' : "role",
-      'taskId' : "taskId",
-      'clusterName' : "clusterName",
-      'serviceName' : "serviceName",
-      'roleCommand' : 'UPGRADE',
-      'hostname' : "localhost.localdomain",
-      'hostLevelParams': "hostLevelParams",
-      'clusterHostInfo': "clusterHostInfo",
-      'configurations': "configurations",
-      'commandType': "EXECUTION_COMMAND",
-      'configurations':{'global' : {}},
-      'roleParams': {},
-      'commandParams' :	{
-        'source_stack_version' : 'HDP-1.2.1',
-        'target_stack_version' : 'HDP-1.3.0'
-      }
+
+  namenode_install_command = {
+    'commandType': 'EXECUTION_COMMAND',
+    'role': u'NAMENODE',
+    'roleCommand': u'INSTALL',
+    'commandId': '1-1',
+    'taskId': 4,
+    'clusterName': u'cc',
+    'serviceName': u'HDFS',
     }
 
-    upgrade_method_return_value = {'exitcode' : 0,
-                                    'stdout'   : 'abc',
-                                    'stderr'   : 'def'}
-
-    perform_stack_upgrade_method.return_value = upgrade_method_return_value
-
-    result = queue.executeCommand(command)
-    expected_result = [{'actionId': 17,
-                        'clusterName': 'clusterName',
-                        'exitCode': 0,
-                        'role': 'role',
-                        'serviceName': 'serviceName',
-                        'status': 'COMPLETED',
-                        'stderr': 'def',
-                        'stdout': 'abc',
-                        'taskId': 'taskId',
-                        'roleCommand': 'UPGRADE'}]
-    self.assertEquals(result, expected_result)
-
-    puppet_executor_run_command_method.return_value = {'exitcode' : 0,
-                                                       'stdout'   : 'abc',
-                                                       'stderr'   : 'def'}
-
-    command['roleCommand'] = 'START'
-    action_conf_handler_findRunDir_method.return_value = AmbariConfig().getConfig().get("stack", "installprefix")
-    expected_result[0]['configurationTags'] = None
-    expected_result[0]['roleCommand'] = 'START'
-    result = queue.executeCommand(command)
-    self.assertEquals(result, expected_result)
-
-    #--------------------------------------------
-    command['roleCommand'] = 'UPGRADE'
-
-    upgrade_method_return_value['exitcode'] = 1
-    upgrade_method_return_value['stdout'] = ''
-    upgrade_method_return_value['stderr'] = ''
-
-    perform_stack_upgrade_method.return_value = upgrade_method_return_value
-    result = queue.executeCommand(command)
-
-    expected_result[0]['roleCommand'] = 'UPGRADE'
-    del expected_result[0]['configurationTags']
-    expected_result[0]['exitCode'] = 1
-    expected_result[0]['stderr'] = 'None'
-    expected_result[0]['stdout'] = 'None'
-    expected_result[0]['status'] = 'FAILED'
-    self.assertEquals(result, expected_result)
-
-
-  @patch.object(ActionQueue, "stopped")
-  @patch.object(AQM.logger, "warn")
-  def test_run_unrecognized_command(self, logger_method, stopped_method):
-    config = AmbariConfig().getConfig()
-    actionQueue = ActionQueue(config)
-    command = {
-        "serviceName" : 'HDFS',
-        "commandType" : "SOME_UNRECOGNIZED_COMMAND",
-        "clusterName" : "",
-        "componentName" : "DATANODE",
-        'configurations':{}
+  snamenode_install_command = {
+    'commandType': 'EXECUTION_COMMAND',
+    'role': u'SECONDARY_NAMENODE',
+    'roleCommand': u'INSTALL',
+    'commandId': '1-1',
+    'taskId': 5,
+    'clusterName': u'cc',
+    'serviceName': u'HDFS',
     }
-    actionQueue.commandQueue.put(command)
-    actionQueue.stopped = stopped_method
-    stopped_method.side_effect = [False, False, True, True, True]
-    actionQueue.IDLE_SLEEP_TIME = 0.001
-    actionQueue.run()
-    self.assertTrue(logger_method.call_args[0][0].startswith('Unrecognized command'))
 
+  nagios_install_command = {
+    'commandType': 'EXECUTION_COMMAND',
+    'role': u'NAGIOS',
+    'roleCommand': u'INSTALL',
+    'commandId': '1-1',
+    'taskId': 6,
+    'clusterName': u'cc',
+    'serviceName': u'HDFS',
+    }
 
-  @patch.object(StackVersionsFileHandler, "read_stack_version")
-  @patch.object(ActionQueue, "stopped")
-  def test_status_command_without_globals_section(self, stopped_method,
-                                                  read_stack_version_method):
-    config = AmbariConfig().getConfig()
-    config.set('agent', 'prefix', TestStackVersionsFileHandler.dummyVersionsFile)
-    queue = ActionQueue(config)
-    statusCommand = {
-      "serviceName" : 'HDFS',
-      "commandType" : "STATUS_COMMAND",
-      "clusterName" : "",
-      "componentName" : "DATANODE",
-      'configurations':{}
+  hbase_install_command = {
+    'commandType': 'EXECUTION_COMMAND',
+    'role': u'HBASE',
+    'roleCommand': u'INSTALL',
+    'commandId': '1-1',
+    'taskId': 7,
+    'clusterName': u'cc',
+    'serviceName': u'HDFS',
     }
-    queue.stopped = stopped_method
-    stopped_method.side_effect = [False, False, True, True, True]
-    read_stack_version_method.return_value="1.3.0"
-    queue.IDLE_SLEEP_TIME = 0.001
-    queue.put(statusCommand)
-    queue.run()
-    returned_result = queue.resultQueue.get()
-    returned_result[1]['status'] = 'INSTALLED' # Patch live value
-    self.assertEquals(returned_result, ('STATUS_COMMAND',
-                                        {'clusterName': '',
-                                         'componentName': 'DATANODE',
-                                         'msg': '',
-                                         'serviceName': 'HDFS',
-                                         'stackVersion': '1.3.0',
-                                         'status': 'INSTALLED'}))
-
-
-class FakeExecutor():
-
-  def __init__(self, executor_started_event, end_executor_event):
-    self.executor_started_event = executor_started_event
-    self.end_executor_event = end_executor_event
-    pass
 
-  def runCommand(self, command, tmpoutpath, tmperrpath):
-    tmpout= open(tmpoutpath, 'w')
-    tmpout.write("Dummy output")
-    tmpout.flush()
-
-    tmperr= open(tmperrpath, 'w')
-    tmperr.write("Dummy err")
-    tmperr.flush()
-
-    self.executor_started_event.set()
-    self.end_executor_event.wait()
-    return {
-      "exitcode": 0,
-      "stdout": "returned stdout",
-      "stderr": "returned stderr"
+  status_command = {
+    "serviceName" : 'HDFS',
+    "commandType" : "STATUS_COMMAND",
+    "clusterName" : "",
+    "componentName" : "DATANODE",
+    'configurations':{}
+  }
+
+
+  @patch.object(ActionDependencyManager, "read_dependencies")
+  @patch.object(ActionDependencyManager, "get_next_action_group")
+  @patch.object(ActionQueue, "process_portion_of_actions")
+  def test_ActionQueueStartStop(self, process_portion_of_actions_mock,
+                          get_next_action_group_mock, read_dependencies_mock):
+    actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
+    actionQueue.start()
+    time.sleep(0.1)
+    actionQueue.stop()
+    actionQueue.join()
+    self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
+    self.assertTrue(get_next_action_group_mock.call_count > 1)
+    self.assertTrue(process_portion_of_actions_mock.call_count > 1)
+
+
+  @patch.object(ActionDependencyManager, "read_dependencies")
+  @patch.object(ActionQueue, "execute_command")
+  @patch.object(ActionQueue, "execute_status_command")
+  def test_process_portion_of_actions(self, execute_status_command_mock,
+            executeCommand_mock, read_dependencies_mock):
+    actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
+    # Test execution of EXECUTION_COMMANDs
+    max = 3
+    actionQueue.MAX_CONCURRENT_ACTIONS = max
+    unfreeze_flag = threading.Event()
+    sync_lock = threading.RLock()
+    stats = {
+      'waiting_threads' : 0
     }
+    def side_effect(self):
+      with sync_lock: # Synchronized to avoid race effects during test execution
+        stats['waiting_threads'] += 1
+      unfreeze_flag.wait()
+    executeCommand_mock.side_effect = side_effect
+    portion = [self.datanode_install_command,
+               self.namenode_install_command,
+               self.snamenode_install_command,
+               self.nagios_install_command,
+               self.hbase_install_command]
+
+    # We call method in a separate thread
+    action_thread = Thread(target =  actionQueue.process_portion_of_actions, args = (portion, ))
+    action_thread.start()
+    # Now we wait to check that only MAX_CONCURRENT_ACTIONS threads are running
+    while stats['waiting_threads'] != max:
+      time.sleep(0.1)
+    self.assertEqual(stats['waiting_threads'], max)
+    # unfreezing waiting threads
+    unfreeze_flag.set()
+    # wait until all threads are finished
+    action_thread.join()
+    self.assertTrue(executeCommand_mock.call_count == 5)
+    self.assertFalse(execute_status_command_mock.called)
+    executeCommand_mock.reset_mock()
+    execute_status_command_mock.reset_mock()
+
+    # Test execution of STATUS_COMMANDs
+    n = 5
+    portion = []
+    for i in range(0, n):
+      status_command = {
+        'componentName': 'DATANODE',
+        'commandType': 'STATUS_COMMAND',
+      }
+      portion.append(status_command)
+    actionQueue.process_portion_of_actions(portion)
+    self.assertTrue(execute_status_command_mock.call_count == n)
+    self.assertFalse(executeCommand_mock.called)
+
+    # Test execution of unknown command
+    unknown_command = {
+      'commandType': 'WRONG_COMMAND',
+    }
+    portion = [unknown_command]
+    actionQueue.process_portion_of_actions(portion)
+    # no exception expected
+    pass
+
+
+  @patch("traceback.print_exc")
+  @patch.object(ActionDependencyManager, "read_dependencies")
+  @patch.object(ActionQueue, "execute_command")
+  def test_execute_command_safely(self, execute_command_mock,
+                                  read_dependencies_mock, print_exc_mock):
+    actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
+    # Try normal execution
+    actionQueue.execute_command_safely('command')
+    # Raise an exception to check proper logging
+    def side_effect(self):
+      raise Exception("TerribleException")
+    execute_command_mock.side_effect = side_effect
+    actionQueue.execute_command_safely('command')
+    self.assertTrue(print_exc_mock.called)
+
+
+  @patch("__builtin__.open")
+  @patch.object(ActionQueue, "status_update_callback")
+  @patch.object(ActionDependencyManager, "read_dependencies")
+  def test_execute_command(self, read_dependencies_mock,
+                           status_update_callback_mock, open_mock):
+    # Make file read calls visible
+    def open_side_effect(file, mode):
+      if mode == 'r':
+        file_mock = MagicMock()
+        file_mock.read.return_value = "Read from " + str(file)
+        return file_mock
+      else:
+        return self.original_open(file, mode)
+    open_mock.side_effect = open_side_effect
+
+    config = AmbariConfig().getConfig()
+    tmpfile = tempfile.gettempdir()
+    config.set('agent', 'prefix', tmpfile)
+    actionQueue = ActionQueue(config, 'dummy_controller')
+    unfreeze_flag = threading.Event()
+    puppet_execution_result_dict = {
+      'stdout': 'out',
+      'stderr': 'stderr',
+      }
+    def side_effect(command, tmpoutfile, tmperrfile):
+      unfreeze_flag.wait()
+      return puppet_execution_result_dict
+    def patched_aq_execute_command(command):
+      # We have to perform patching for the separate thread in that same thread
+      with patch.object(PuppetExecutor, "runCommand") as runCommand_mock:
+        with patch.object(UpgradeExecutor, "perform_stack_upgrade") \
+              as perform_stack_upgrade_mock:
+          runCommand_mock.side_effect = side_effect
+          perform_stack_upgrade_mock.side_effect = side_effect
+          actionQueue.execute_command(command)
+    ### Test install/start/stop command ###
+    ## Test successful execution with configuration tags
+    puppet_execution_result_dict['status'] = 'COMPLETE'
+    puppet_execution_result_dict['exitcode'] = 0
+    # We call method in a separate thread
+    execution_thread = Thread(target = patched_aq_execute_command ,
+                              args = (self.datanode_install_command, ))
+    execution_thread.start()
+    #  check in progress report
+    # wait until ready
+    while True:
+      time.sleep(0.1)
+      report = actionQueue.result()
+      if len(report['reports']) != 0:
+        break
+    expected = {'status': 'IN_PROGRESS',
+                'stderr': 'Read from /tmp/errors-3.txt',
+                'stdout': 'Read from /tmp/output-3.txt',
+                'clusterName': u'cc',
+                'roleCommand': u'INSTALL',
+                'serviceName': u'HDFS',
+                'role': u'DATANODE',
+                'actionId': '1-1',
+                'taskId': 3,
+                'exitCode': 777}
+    self.assertEqual(report['reports'][0], expected)
+    # Continue command execution
+    unfreeze_flag.set()
+    # wait until ready
+    while report['reports'][0]['status'] == 'IN_PROGRESS':
+      time.sleep(0.1)
+      report = actionQueue.result()
+    # check report
+    configname = os.path.join(tmpfile, 'config.json')
+    expected = {'status': 'COMPLETED',
+                'stderr': 'stderr',
+                'stdout': 'out',
+                'clusterName': u'cc',
+                'configurationTags': {'global': {'tag': 'v1'}},
+                'roleCommand': u'INSTALL',
+                'serviceName': u'HDFS',
+                'role': u'DATANODE',
+                'actionId': '1-1',
+                'taskId': 3,
+                'exitCode': 0}
+    self.assertEqual(len(report['reports']), 1)
+    self.assertEqual(report['reports'][0], expected)
+    self.assertTrue(os.path.isfile(configname))
+    # Check that we had 2 status update calls ( IN_PROGRESS and COMPLETE)
+    self.assertEqual(status_update_callback_mock.call_count, 2)
+    os.remove(configname)
+
+    # now should not have reports (read complete/failed reports are deleted)
+    report = actionQueue.result()
+    self.assertEqual(len(report['reports']), 0)
+
+    ## Test failed execution
+    puppet_execution_result_dict['status'] = 'FAILED'
+    puppet_execution_result_dict['exitcode'] = 13
+    # We call method in a separate thread
+    execution_thread = Thread(target = patched_aq_execute_command ,
+                              args = (self.datanode_install_command, ))
+    execution_thread.start()
+    unfreeze_flag.set()
+    #  check in progress report
+    # wait until ready
+    report = actionQueue.result()
+    while len(report['reports']) == 0 or \
+                    report['reports'][0]['status'] == 'IN_PROGRESS':
+      time.sleep(0.1)
+      report = actionQueue.result()
+      # check report
+    expected = {'status': 'FAILED',
+                'stderr': 'stderr',
+                'stdout': 'out',
+                'clusterName': u'cc',
+                'roleCommand': u'INSTALL',
+                'serviceName': u'HDFS',
+                'role': u'DATANODE',
+                'actionId': '1-1',
+                'taskId': 3,
+                'exitCode': 13}
+    self.assertEqual(len(report['reports']), 1)
+    self.assertEqual(report['reports'][0], expected)
+
+    # now should not have reports (read complete/failed reports are deleted)
+    report = actionQueue.result()
+    self.assertEqual(len(report['reports']), 0)
+
+    ### Test upgrade command ###
+    puppet_execution_result_dict['status'] = 'COMPLETE'
+    puppet_execution_result_dict['exitcode'] = 0
+    execution_thread = Thread(target = patched_aq_execute_command ,
+                              args = (self.datanode_upgrade_command, ))
+    execution_thread.start()
+    unfreeze_flag.set()
+    # wait until ready
+    report = actionQueue.result()
+    while len(report['reports']) == 0 or \
+                    report['reports'][0]['status'] == 'IN_PROGRESS':
+      time.sleep(0.1)
+      report = actionQueue.result()
+    # check report
+    expected = {'status': 'COMPLETED',
+                'stderr': 'stderr',
+                'stdout': 'out',
+                'clusterName': 'clusterName',
+                'roleCommand': 'UPGRADE',
+                'serviceName': 'serviceName',
+                'role': 'role',
+                'actionId': 17,
+                'taskId': 'taskId',
+                'exitCode': 0}
+    self.assertEqual(len(report['reports']), 1)
+    self.assertEqual(report['reports'][0], expected)
+
+    # now should not have reports (read complete/failed reports are deleted)
+    report = actionQueue.result()
+    self.assertEqual(len(report['reports']), 0)
+
+
+  @patch.object(ActionQueue, "status_update_callback")
+  @patch.object(StackVersionsFileHandler, "read_stack_version")
+  @patch.object(ActionDependencyManager, "read_dependencies")
+  @patch.object(ActionQueue, "execute_command")
+  @patch.object(LiveStatus, "build")
+  def test_execute_status_command(self, build_mock, execute_command_mock,
+                                  read_dependencies_mock, read_stack_version_mock,
+                                  status_update_callback):
+    actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
+    build_mock.return_value = "dummy report"
+    # Try normal execution
+    actionQueue.execute_status_command(self.status_command)
+    report = actionQueue.result()
+    expected = 'dummy report'
+    self.assertEqual(len(report['componentStatus']), 1)
+    self.assertEqual(report['componentStatus'][0], expected)
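
The tests above pin down the new agent-side contract: process_portion_of_actions must run at most MAX_CONCURRENT_ACTIONS execution commands in parallel and return only once the whole portion has finished. As a rough illustration of that pattern only (not the committed ActionQueue code; execute_command and the command dicts are placeholders), a bounded-parallel runner can be sketched as:

    import threading

    MAX_CONCURRENT_ACTIONS = 3

    def process_portion_of_actions(portion, execute_command):
      # Admission is gated so that at most MAX_CONCURRENT_ACTIONS commands run at once
      slots = threading.Semaphore(MAX_CONCURRENT_ACTIONS)
      threads = []

      def run_one(command):
        try:
          execute_command(command)
        finally:
          slots.release()

      for command in portion:
        slots.acquire()           # blocks while the maximum number of commands is running
        worker = threading.Thread(target=run_one, args=(command,))
        worker.start()
        threads.append(worker)

      for worker in threads:      # the portion is done only when every worker has joined
        worker.join()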

+ 125 - 0
ambari-agent/src/test/python/TestCommandStatusDict.py

@@ -0,0 +1,125 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+import tempfile
+from unittest import TestCase
+from ambari_agent.CommandStatusDict import CommandStatusDict
+import os
+import logging
+import json, pprint
+from mock.mock import patch, MagicMock, call
+
+class TestCommandStatusDict(TestCase):
+
+  logger = logging.getLogger()
+
+  def test_put_and_generate(self):
+    callback_mock = MagicMock()
+    commandStatuses = CommandStatusDict(callback_action = callback_mock)
+    command_in_progress1 = {
+      'commandType': 'EXECUTION_COMMAND',
+      'commandId': '1-1',
+      'clusterName': u'cc',
+      'exitCode': 777,
+      'role': u'DATANODE',
+      'roleCommand': u'INSTALL',
+      'serviceName': u'HDFS',
+      'stderr': '',
+      'stdout': "notice: /Stage[1]/Hdp::Iptables/Service[iptables]/ensure: ensure changed 'running' to 'stopped'\nnotice: /Stage[1]/Hdp/File[/tmp/changeUid.sh]/ensure: defined content as '{md5}32b994a2e970f8acc3c91c198b484654'\nnotice: /Stage[1]/Hdp::Snappy::Package/Hdp::Package[snappy]/Hdp::Package::Process_pkg[snappy]/Package[snappy]/ensure: created\nnotice: /Stage[1]/Hdp/Hdp::Group[nagios_group]/Group[nagios_group]/ensure: created\nnotice: /Stage[1]/Hdp/Hdp::User[nagios_user]/User[nagios]/ensure: created\nnotice: /Stage[1]/Hdp::Snmp/Hdp::Package[snmp]/Hdp::Package::Process_pkg[snmp]/Package[net-snmp-utils]/ensure: created",
+      'taskId': 5
+    }
+    command_in_progress1_report = {
+      'status': 'IN_PROGRESS',
+      'taskId': 5
+    }
+    command_in_progress2 = {
+      'commandType': 'EXECUTION_COMMAND',
+      'commandId': '1-1',
+      'role': u'DATANODE',
+      'roleCommand': u'INSTALL',
+      'taskId': 6,
+      'clusterName': u'cc',
+      'serviceName': u'HDFS',
+    }
+    command_in_progress2_report = {
+      'status': 'IN_PROGRESS',
+      'taskId': 6
+    }
+    finished_command = {
+      'commandType': 'EXECUTION_COMMAND',
+      'role': u'DATANODE',
+      'roleCommand': u'INSTALL',
+      'commandId': '1-1',
+      'taskId': 4,
+      'clusterName': u'cc',
+      'serviceName': u'HDFS',
+    }
+    finished_command_report = {
+      'status': 'COMPLETE',
+      'taskId': 4,
+    }
+    failed_command = {
+      'commandType': 'EXECUTION_COMMAND',
+      'role': u'DATANODE',
+      'roleCommand': u'INSTALL',
+      'commandId': '1-1',
+      'taskId': 3,
+      'clusterName': u'cc',
+      'serviceName': u'HDFS',
+    }
+    failed_command_report = {
+      'status': 'FAILED',
+      'taskId': 3,
+    }
+    status_command = {
+      'componentName': 'DATANODE',
+      'commandType': 'STATUS_COMMAND',
+    }
+    status_command_report = {
+      'componentName': 'DATANODE',
+      'status': 'HEALTHY'
+    }
+    commandStatuses.put_command_status(command_in_progress1, command_in_progress1_report)
+    commandStatuses.put_command_status(command_in_progress2, command_in_progress2_report)
+    commandStatuses.put_command_status(finished_command, finished_command_report)
+    commandStatuses.put_command_status(failed_command, failed_command_report)
+    commandStatuses.put_command_status(status_command, status_command_report)
+    report = commandStatuses.generate_report()
+    expected = \
+      {'componentStatus': [{'status': 'HEALTHY', 'componentName': 'DATANODE'}],
+       'reports': [{'status': 'FAILED', 'taskId': 3},
+                   {'status': 'COMPLETE', 'taskId': 4},
+                   {'status': 'IN_PROGRESS', 'stderr': '...',
+                    'stdout': '...', 'clusterName': u'cc',
+                    'roleCommand': u'INSTALL', 'serviceName': u'HDFS',
+                    'role': u'DATANODE', 'actionId': '1-1', 'taskId': 5,
+                    'exitCode': 777},
+                   {'status': 'IN_PROGRESS',
+                    'stderr': '...',
+                    'stdout': '...',
+                    'clusterName': u'cc',
+                    'roleCommand': u'INSTALL',
+                    'serviceName': u'HDFS',
+                    'role': u'DATANODE',
+                    'actionId': '1-1',
+                    'taskId': 6,
+                    'exitCode': 777}]
+      }
+    self.assertEquals(report, expected)
+

+ 11 - 5
ambari-agent/src/test/python/TestController.py

@@ -21,12 +21,14 @@ limitations under the License.
 
 import StringIO
 import ssl
-import unittest
+import unittest, threading
 from ambari_agent import Controller, ActionQueue
+from  ambari_agent.ActionDependencyManager import ActionDependencyManager
 from ambari_agent import hostname
 import sys
 from mock.mock import patch, MagicMock, call, Mock
 import logging
+from threading import Event
 
 class TestController(unittest.TestCase):
 
@@ -48,7 +50,7 @@ class TestController(unittest.TestCase):
     config.get.return_value = "something"
 
     self.controller = Controller.Controller(config)
-    self.controller.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC = 0.1
+    self.controller.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS = 0.1
     self.controller.netutil.HEARTBEAT_NOT_IDDLE_INTERVAL_SEC = 0.1
 
 
@@ -150,7 +152,10 @@ class TestController(unittest.TestCase):
   @patch("urllib2.build_opener")
   @patch("urllib2.install_opener")
   @patch.object(ActionQueue.ActionQueue, "run")
-  def test_repeatRegistration(self, run_mock, installMock, buildMock):
+  @patch.object(ActionDependencyManager, "read_dependencies")
+  @patch.object(ActionDependencyManager, "dump_info")
+  def test_repeatRegistration(self, dump_info_mock, read_dependencies_mock,
+                              run_mock, installMock, buildMock):
 
     registerAndHeartbeat = MagicMock(name="registerAndHeartbeat")
 
@@ -241,10 +246,11 @@ class TestController(unittest.TestCase):
       {'Content-Type': 'application/json'})
 
 
+  @patch.object(threading._Event, "wait")
   @patch("time.sleep")
   @patch("json.loads")
   @patch("json.dumps")
-  def test_heartbeatWithServer(self, dumpsMock, loadsMock, sleepMock):
+  def test_heartbeatWithServer(self, dumpsMock, loadsMock, sleepMock, event_mock):
 
     out = StringIO.StringIO()
     sys.stdout = out
@@ -366,7 +372,7 @@ class TestController(unittest.TestCase):
     self.controller.heartbeatWithServer()
 
     sleepMock.assert_called_with(
-      self.controller.netutil.HEARTBEAT_NOT_IDDLE_INTERVAL_SEC)
+      self.controller.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS)
 
     sys.stdout = sys.__stdout__
     self.controller.sendRequest = Controller.Controller.sendRequest

+ 130 - 105
ambari-agent/src/test/python/TestHeartbeat.py

@@ -21,6 +21,7 @@ limitations under the License.
 from unittest import TestCase
 import unittest
 from ambari_agent.Heartbeat import Heartbeat
+from ambari_agent.ActionDependencyManager import ActionDependencyManager
 from ambari_agent.ActionQueue import ActionQueue
 from ambari_agent.LiveStatus import LiveStatus
 from ambari_agent import AmbariConfig
@@ -46,8 +47,9 @@ class TestHeartbeat(TestCase):
     sys.stdout = sys.__stdout__
 
 
-  def test_build(self):
-    actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig())
+  @patch.object(ActionDependencyManager, "read_dependencies")
+  def test_build(self, read_dependencies_mock):
+    actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller')
     heartbeat = Heartbeat(actionQueue)
     result = heartbeat.build(100)
     print "Heartbeat: " + str(result)
@@ -64,81 +66,132 @@ class TestHeartbeat(TestCase):
     self.assertEquals(not heartbeat.reports, True, "Heartbeat should not contain task in progress")
 
 
-  @patch.object(StackVersionsFileHandler, "read_stack_version")
-  def test_heartbeat_with_status(self, read_stack_version_method):
-    actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig())
-    read_stack_version_method.return_value="1.3.0"
-    heartbeat = Heartbeat(actionQueue)
-    statusCommand = {
-      "serviceName" : 'HDFS',
-      "commandType" : "STATUS_COMMAND",
-      "clusterName" : "",
-      "componentName" : "DATANODE",
-      'configurations':{'global' : {}}
+  @patch.object(ActionDependencyManager, "read_dependencies")
+  @patch.object(ActionQueue, "result")
+  @patch.object(ActionDependencyManager, "is_action_group_available")
+  @patch.object(HostInfo, "register")
+  def test_no_mapping(self, register_mock, is_action_group_available_mock, result_mock,
+                      read_dependencies_mock):
+    result_mock.return_value = {
+      'reports': [{'status': 'IN_PROGRESS',
+                   'stderr': 'Read from /tmp/errors-3.txt',
+                   'stdout': 'Read from /tmp/output-3.txt',
+                   'clusterName': u'cc',
+                   'roleCommand': u'INSTALL',
+                   'serviceName': u'HDFS',
+                   'role': u'DATANODE',
+                   'actionId': '1-1',
+                   'taskId': 3,
+                   'exitCode': 777}],
+      'componentStatus': [{'status': 'HEALTHY', 'componentName': 'NAMENODE'}]
     }
-    actionQueue.put(statusCommand)
-    actionQueue.start()
-    time.sleep(0.1)
-    actionQueue.stop()
-    actionQueue.join()
-    result = heartbeat.build(101)
-    self.assertEquals(len(result['componentStatus']) > 0, True, 'Heartbeat should contain status of HDFS components')
-
-  @patch.object(StackVersionsFileHandler, "read_stack_version")
-  def test_heartbeat_with_status_multiple(self, read_stack_version_method):
-    actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig())
-    actionQueue.IDLE_SLEEP_TIME = 0.01
-    read_stack_version_method.return_value="1.3.0"
+    actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller')
     heartbeat = Heartbeat(actionQueue)
-    actionQueue.start()
-    max_number_of_status_entries = 0
-    for i in range(1,5):
-      statusCommand = {
-        "serviceName" : 'HDFS',
-        "commandType" : "STATUS_COMMAND",
-        "clusterName" : "",
-        "componentName" : "DATANODE",
-        'configurations':{'global' : {}}
-      }
-      actionQueue.put(statusCommand)
-      time.sleep(0.1)
-      result = heartbeat.build(101)
-      number_of_status_entries = len(result['componentStatus'])
-#      print "Heartbeat with status: " + str(result) + " XXX " + str(number_of_status_entries)
-      if max_number_of_status_entries < number_of_status_entries:
-        max_number_of_status_entries = number_of_status_entries
-    actionQueue.stop()
-    actionQueue.join()
-
-    NUMBER_OF_COMPONENTS = 1
-    self.assertEquals(max_number_of_status_entries == NUMBER_OF_COMPONENTS, True)
-
-  @patch.object(HostInfo, 'register')
-  def test_heartbeat_no_host_check_cmd_in_progress(self, register_mock):
-    actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig())
-    actionQueue.commandInProgress= {
-      'role' : "role",
-      'actionId' : "actionId",
-      'taskId' : "taskId",
-      'stdout' : "stdout",
-      'clusterName' : "clusterName",
-      'stderr' : 'none',
-      'exitCode' : 777,
-      'serviceName' : "serviceName",
-      'status' : 'IN_PROGRESS',
-      'configurations':{'global' : {}},
-      'roleCommand' : 'START'
+    hb = heartbeat.build(id = 10, state_interval=1, componentsMapped=True)
+    self.assertEqual(register_mock.call_args_list[0][0][1], True)
+    register_mock.reset_mock()
+
+    hb = heartbeat.build(id = 0, state_interval=1, componentsMapped=True)
+    self.assertEqual(register_mock.call_args_list[0][0][1], False)
+
+
+  @patch.object(ActionDependencyManager, "read_dependencies")
+  @patch.object(ActionQueue, "result")
+  @patch.object(ActionDependencyManager, "is_action_group_available")
+  def test_build_long_result(self, is_action_group_available_mock, result_mock,
+                  read_dependencies_mock):
+    actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller')
+    result_mock.return_value = {
+      'reports': [{'status': 'IN_PROGRESS',
+            'stderr': 'Read from /tmp/errors-3.txt',
+            'stdout': 'Read from /tmp/output-3.txt',
+            'clusterName': u'cc',
+            'roleCommand': u'INSTALL',
+            'serviceName': u'HDFS',
+            'role': u'DATANODE',
+            'actionId': '1-1',
+            'taskId': 3,
+            'exitCode': 777},
+
+            {'status': 'COMPLETED',
+             'stderr': 'stderr',
+             'stdout': 'out',
+             'clusterName': 'clusterName',
+             'roleCommand': 'UPGRADE',
+             'serviceName': 'serviceName',
+             'role': 'role',
+             'actionId': 17,
+             'taskId': 'taskId',
+             'exitCode': 0},
+
+            {'status': 'FAILED',
+             'stderr': 'stderr',
+             'stdout': 'out',
+             'clusterName': u'cc',
+             'roleCommand': u'INSTALL',
+             'serviceName': u'HDFS',
+             'role': u'DATANODE',
+             'actionId': '1-1',
+             'taskId': 3,
+             'exitCode': 13},
+
+            {'status': 'COMPLETED',
+             'stderr': 'stderr',
+             'stdout': 'out',
+             'clusterName': u'cc',
+             'configurationTags': {'global': {'tag': 'v1'}},
+             'roleCommand': u'INSTALL',
+             'serviceName': u'HDFS',
+             'role': u'DATANODE',
+             'actionId': '1-1',
+             'taskId': 3,
+             'exitCode': 0}
+
+            ],
+      'componentStatus': [
+        {'status': 'HEALTHY', 'componentName': 'DATANODE'},
+        {'status': 'UNHEALTHY', 'componentName': 'NAMENODE'},
+      ],
     }
     heartbeat = Heartbeat(actionQueue)
-    heartbeat.build(12, 6)
-    self.assertTrue(register_mock.called)
-    args, kwargs = register_mock.call_args_list[0]
-    self.assertTrue(args[2])
-    self.assertFalse(args[1])
-
+    hb = heartbeat.build(10)
+    hb['hostname'] = 'hostname'
+    hb['timestamp'] = 'timestamp'
+    expected = {'nodeStatus':
+                  {'status': 'HEALTHY',
+                   'cause': 'NONE'},
+                'timestamp': 'timestamp', 'hostname': 'hostname',
+                'responseId': 10, 'reports': [
+      {'status': 'IN_PROGRESS', 'roleCommand': u'INSTALL',
+       'serviceName': u'HDFS', 'role': u'DATANODE', 'actionId': '1-1',
+       'stderr': 'Read from /tmp/errors-3.txt',
+       'stdout': 'Read from /tmp/output-3.txt', 'clusterName': u'cc',
+       'taskId': 3, 'exitCode': 777},
+      {'status': 'COMPLETED', 'roleCommand': 'UPGRADE',
+       'serviceName': 'serviceName', 'role': 'role', 'actionId': 17,
+       'stderr': 'stderr', 'stdout': 'out', 'clusterName': 'clusterName',
+       'taskId': 'taskId', 'exitCode': 0},
+      {'status': 'FAILED', 'roleCommand': u'INSTALL', 'serviceName': u'HDFS',
+       'role': u'DATANODE', 'actionId': '1-1', 'stderr': 'stderr',
+       'stdout': 'out', 'clusterName': u'cc', 'taskId': 3, 'exitCode': 13},
+      {'status': 'COMPLETED', 'stdout': 'out',
+       'configurationTags': {'global': {'tag': 'v1'}}, 'taskId': 3,
+       'exitCode': 0, 'roleCommand': u'INSTALL', 'clusterName': u'cc',
+       'serviceName': u'HDFS', 'role': u'DATANODE', 'actionId': '1-1',
+       'stderr': 'stderr'}], 'componentStatus': [
+      {'status': 'HEALTHY', 'componentName': 'DATANODE'},
+      {'status': 'UNHEALTHY', 'componentName': 'NAMENODE'}]}
+    self.assertEquals(hb, expected)
+
+
+  @patch.object(ActionDependencyManager, "read_dependencies")
+  @patch.object(ActionDependencyManager, "dump_info")
+  @patch.object(ActionDependencyManager, "can_be_executed_in_parallel")
   @patch.object(HostInfo, 'register')
-  def test_heartbeat_no_host_check_cmd_in_queue(self, register_mock):
-    actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig())
+  def test_heartbeat_no_host_check_cmd_in_queue(self, register_mock,
+      can_be_executed_in_parallel_mock, dump_info_mock, read_dependencies_mock):
+    can_be_executed_in_parallel_mock.return_value = False
+    actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller')
     statusCommand = {
       "serviceName" : 'HDFS',
       "commandType" : "STATUS_COMMAND",
@@ -146,7 +199,7 @@ class TestHeartbeat(TestCase):
       "componentName" : "DATANODE",
       'configurations':{'global' : {}}
     }
-    actionQueue.commandQueue.put(statusCommand)
+    actionQueue.put(list(statusCommand))
 
     heartbeat = Heartbeat(actionQueue)
     heartbeat.build(12, 6)
@@ -155,9 +208,11 @@ class TestHeartbeat(TestCase):
     self.assertTrue(args[2])
     self.assertFalse(args[1])
 
+
+  @patch.object(ActionDependencyManager, "read_dependencies")
   @patch.object(HostInfo, 'register')
-  def test_heartbeat_host_check_no_cmd(self, register_mock):
-    actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig())
+  def test_heartbeat_host_check_no_cmd(self, register_mock, read_dependencies_mock):
+    actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller')
     heartbeat = Heartbeat(actionQueue)
     heartbeat.build(12, 6)
     self.assertTrue(register_mock.called)
@@ -165,36 +220,6 @@ class TestHeartbeat(TestCase):
     self.assertFalse(args[1])
     self.assertFalse(args[2])
 
-  def test_heartbeat_with_task_in_progress(self):
-    actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig())
-    actionQueue.commandInProgress= {
-      'role' : "role",
-      'actionId' : "actionId",
-      'taskId' : "taskId",
-      'stdout' : "stdout",
-      'clusterName' : "clusterName",
-      'stderr' : 'none',
-      'exitCode' : 777,
-      'serviceName' : "serviceName",
-      'status' : 'IN_PROGRESS',
-      'configurations':{'global' : {}},
-      'roleCommand' : 'START'
-    }
-    heartbeat = Heartbeat(actionQueue)
-    result = heartbeat.build(100)
-    #print "Heartbeat: " + str(result)
-    self.assertEquals(len(result['reports']), 1)
-    self.assertEquals(result['reports'][0]['role'], "role")
-    self.assertEquals(result['reports'][0]['actionId'], "actionId")
-    self.assertEquals(result['reports'][0]['taskId'], "taskId")
-    self.assertEquals(result['reports'][0]['stdout'], "...")
-    self.assertEquals(result['reports'][0]['clusterName'], "clusterName")
-    self.assertEquals(result['reports'][0]['stderr'], "...")
-    self.assertEquals(result['reports'][0]['exitCode'], 777)
-    self.assertEquals(result['reports'][0]['serviceName'], "serviceName")
-    self.assertEquals(result['reports'][0]['status'], "IN_PROGRESS")
-    self.assertEquals(result['reports'][0]['roleCommand'], "START")
-    pass
 
 if __name__ == "__main__":
   unittest.main(verbosity=2)

+ 8 - 5
ambari-agent/src/test/python/TestPuppetExecutor.py

@@ -46,7 +46,8 @@ class TestPuppetExecutor(TestCase):
     
   @patch.object(shellRunner,'run')
   def test_isJavaAvailable(self, cmdrun_mock):
-    puppetInstance = PuppetExecutor("/tmp", "/x", "/y", '/tmpdir', None)
+    puppetInstance = PuppetExecutor("/tmp", "/x", "/y", '/tmpdir',
+                                    AmbariConfig().getConfig())
     command = {'configurations':{'global':{'java64_home':'/usr/jdk/jdk123'}}}
     
     cmdrun_mock.return_value = {'exitCode': 1, 'output': 'Command not found', 'error': ''}
@@ -213,15 +214,16 @@ class TestPuppetExecutor(TestCase):
     Tests whether watchdog works
     """
     subproc_mock = self.Subprocess_mockup()
+    config = AmbariConfig().getConfig()
+    config.set('puppet','timeout_seconds',"0.1")
     executor_mock = self.PuppetExecutor_mock("/home/centos/ambari_repo_info/ambari-agent/src/main/puppet/",
       "/usr/",
       "/root/workspace/puppet-install/facter-1.6.10/",
-      "/tmp", AmbariConfig().getConfig(), subproc_mock)
+      "/tmp", config, subproc_mock)
     _, tmpoutfile = tempfile.mkstemp()
     _, tmperrfile = tempfile.mkstemp()
     result = {  }
     puppetEnv = { "RUBYLIB" : ""}
-    executor_mock.PUPPET_TIMEOUT_SECONDS = 0.1
     kill_process_with_children_mock.side_effect = lambda pid : subproc_mock.terminate()
     subproc_mock.returncode = None
     thread = Thread(target =  executor_mock.runPuppetFile, args = ("fake_puppetFile", result, puppetEnv, tmpoutfile, tmperrfile))
@@ -236,15 +238,16 @@ class TestPuppetExecutor(TestCase):
     Tries to catch false positive watchdog invocations
     """
     subproc_mock = self.Subprocess_mockup()
+    config = AmbariConfig().getConfig()
+    config.set('puppet','timeout_seconds',"5")
     executor_mock = self.PuppetExecutor_mock("/home/centos/ambari_repo_info/ambari-agent/src/main/puppet/",
     "/usr/",
     "/root/workspace/puppet-install/facter-1.6.10/",
-    "/tmp", AmbariConfig().getConfig(), subproc_mock)
+    "/tmp", config, subproc_mock)
     _, tmpoutfile = tempfile.mkstemp()
     _, tmperrfile = tempfile.mkstemp()
     result = {  }
     puppetEnv = { "RUBYLIB" : ""}
-    executor_mock.PUPPET_TIMEOUT_SECONDS = 5
     subproc_mock.returncode = 0
     thread = Thread(target =  executor_mock.runPuppetFile, args = ("fake_puppetFile", result, puppetEnv, tmpoutfile, tmperrfile))
     thread.start()
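
Both watchdog tests above now feed the timeout through the agent configuration ([puppet] timeout_seconds) instead of the removed PUPPET_TIMEOUT_SECONDS constant. A minimal sketch of that pattern, assuming a subprocess.Popen-style proc and a kill_process_with_children helper analogous to the one mocked in the test:

    import threading

    def run_with_watchdog(config, proc, kill_process_with_children):
      # The timeout now comes from the agent config rather than a class constant
      timeout = float(config.get('puppet', 'timeout_seconds'))

      def watchdog():
        if proc.poll() is None:              # still running once the timeout expires
          kill_process_with_children(proc.pid)

      timer = threading.Timer(timeout, watchdog)
      timer.start()
      try:
        out, err = proc.communicate()        # wait for puppet to finish (or be killed)
        return proc.returncode, out, err
      finally:
        timer.cancel()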

+ 19 - 0
ambari-agent/src/test/python/dummy_files/test_rco_data.json

@@ -0,0 +1,19 @@
+{
+  "_comment": "a comment",
+    "general_deps" : {
+      "_comment": "a comment",
+      "SECONDARY_NAMENODE-START": ["DATANODE-START"],
+      "DATANODE-STOP": ["JOBTRACKER-STOP", "TASKTRACKER-STOP", "RESOURCEMANAGER-STOP",
+        "NODEMANAGER-STOP", "HISTORYSERVER-STOP", "HBASE_MASTER-STOP"],
+      "_comment": "a comment",
+      "SECONDARY_NAMENODE-UPGRADE": ["NAMENODE-UPGRADE"]
+    },
+    "optional_hcfs": {
+        "HBASE_MASTER-START": ["PEERSTATUS-START"],
+        "JOBTRACKER-START": ["PEERSTATUS-START"]
+    },
+    "optional_no_hcfs": {
+        "SECONDARY_NAMENODE-START": ["NAMENODE-START"],
+        "RESOURCEMANAGER-START": ["NAMENODE-START", "DATANODE-START"]
+    }
+}

+ 99 - 0
ambari-common/src/main/resources/role_command_order.json

@@ -0,0 +1,99 @@
+{
+  "_comment" : "Record format:",
+  "_comment" : "blockedRole-blockedCommand: [blockerRole1-blockerCommand1, blockerRole2-blockerCommand2, ...]",
+  "general_deps" : {
+    "_comment" : "dependencies for all cases",
+    "NAGIOS_SERVER-INSTALL" : ["HIVE_CLIENT-INSTALL", "HCAT-INSTALL",
+        "MAPREDUCE_CLIENT-INSTALL", "OOZIE_CLIENT-INSTALL"],
+    "HBASE_MASTER-START": ["ZOOKEEPER_SERVER-START"],
+    "HBASE_REGIONSERVER-START": ["HBASE_MASTER-START"],
+    "OOZIE_SERVER-START": ["JOBTRACKER-START", "TASKTRACKER-START"],
+    "WEBHCAT_SERVER-START": ["TASKTRACKER-START", "HIVE_SERVER-START"],
+    "HIVE_METASTORE-START": ["MYSQL_SERVER-START"],
+    "HIVE_SERVER-START": ["TASKTRACKER-START", "MYSQL_SERVER-START"],
+    "HUE_SERVER-START": ["HIVE_SERVER-START", "HCAT-START", "OOZIE_SERVER-START"],
+    "FLUME_SERVER-START": ["OOZIE_SERVER-START"],
+    "NAGIOS_SERVER-START": ["HBASE_MASTER-START", "HBASE_REGIONSERVER-START",
+        "GANGLIA_SERVER-START", "GANGLIA_MONITOR-START", "HCAT-START",
+        "HIVE_SERVER-START", "HIVE_METASTORE-START", "HUE_SERVER-START",
+        "JOBTRACKER-START", "TASKTRACKER-START", "ZOOKEEPER_SERVER-START",
+        "MYSQL_SERVER-START", "OOZIE_SERVER-START", "PIG-START", "SQOOP-START",
+        "WEBHCAT_SERVER-START", "FLUME_SERVER-START"],
+    "MAPREDUCE_SERVICE_CHECK-EXECUTE": ["JOBTRACKER-START", "TASKTRACKER-START"],
+    "OOZIE_SERVICE_CHECK-EXECUTE": ["OOZIE_SERVER-START"],
+    "WEBHCAT_SERVICE_CHECK-EXECUTE": ["WEBHCAT_SERVER-START"],
+    "HBASE_SERVICE_CHECK-EXECUTE": ["HBASE_MASTER-START", "HBASE_REGIONSERVER-START"],
+    "HIVE_SERVICE_CHECK-EXECUTE": ["HIVE_SERVER-START", "HIVE_METASTORE-START"],
+    "HCAT_SERVICE_CHECK-EXECUTE": ["HIVE_SERVER-START"],
+    "PIG_SERVICE_CHECK-EXECUTE": ["JOBTRACKER-START", "TASKTRACKER-START"],
+    "SQOOP_SERVICE_CHECK-EXECUTE": ["JOBTRACKER-START", "TASKTRACKER-START"],
+    "ZOOKEEPER_SERVICE_CHECK-EXECUTE": ["ZOOKEEPER_SERVER-START"],
+    "ZOOKEEPER_QUORUM_SERVICE_CHECK-EXECUTE": ["ZOOKEEPER_SERVER-START"],
+    "ZOOKEEPER_SERVER-STOP" : ["HBASE_MASTER-STOP", "HBASE_REGIONSERVER-STOP"],
+    "HBASE_MASTER-STOP": ["HBASE_REGIONSERVER-STOP"],
+    "TASKTRACKER-UPGRADE": ["JOBTRACKER-UPGRADE"],
+    "MAPREDUCE_CLIENT-UPGRADE": ["TASKTRACKER-UPGRADE", "JOBTRACKER-UPGRADE"],
+    "ZOOKEEPER_SERVER-UPGRADE": ["MAPREDUCE_CLIENT-UPGRADE"],
+    "ZOOKEEPER_CLIENT-UPGRADE": ["ZOOKEEPER_SERVER-UPGRADE"],
+    "HBASE_MASTER-UPGRADE": ["ZOOKEEPER_CLIENT-UPGRADE"],
+    "HBASE_REGIONSERVER-UPGRADE": ["HBASE_MASTER-UPGRADE"],
+    "HBASE_CLIENT-UPGRADE": ["HBASE_REGIONSERVER-UPGRADE"],
+    "HIVE_SERVER-UPGRADE" : ["HBASE_CLIENT-UPGRADE"],
+    "HIVE_METASTORE-UPGRADE" : ["HIVE_SERVER-UPGRADE"],
+    "MYSQL_SERVER-UPGRADE": ["HIVE_METASTORE-UPGRADE"],
+    "HIVE_CLIENT-UPGRADE": ["MYSQL_SERVER-UPGRADE"],
+    "HCAT-UPGRADE": ["HIVE_CLIENT-UPGRADE"],
+    "OOZIE_SERVER-UPGRADE" : ["HCAT-UPGRADE"],
+    "OOZIE_CLIENT-UPGRADE" : ["OOZIE_SERVER-UPGRADE"],
+    "WEBHCAT_SERVER-UPGRADE" : ["OOZIE_CLIENT-UPGRADE"],
+    "PIG-UPGRADE" : ["WEBHCAT_SERVER-UPGRADE"],
+    "SQOOP-UPGRADE" : ["PIG-UPGRADE"],
+    "NAGIOS_SERVER-UPGRADE" : ["SQOOP-UPGRADE"],
+    "GANGLIA_SERVER-UPGRADE" : ["NAGIOS_SERVER-UPGRADE"],
+    "GANGLIA_MONITOR-UPGRADE" : ["GANGLIA_SERVER-UPGRADE"]
+  },
+  "_comment" : "HCFS-specific dependencies",
+  "optional_hcfs": {
+    "HBASE_MASTER-START": ["PEERSTATUS-START"],
+    "JOBTRACKER-START": ["PEERSTATUS-START"],
+    "TASKTRACKER-START": ["PEERSTATUS-START"],
+    "HCFS_SERVICE_CHECK-EXECUTE": ["PEERSTATUS-START"],
+    "JOBTRACKER-UPGRADE": ["HCFS_CLIENT-UPGRADE"]
+  },
+  "_comment" : "Dependencies that are used when HCFS is not present in cluster",
+  "optional_no_hcfs": {
+    "SECONDARY_NAMENODE-START": ["NAMENODE-START"],
+    "RESOURCEMANAGER-START": ["NAMENODE-START", "DATANODE-START"],
+    "NODEMANAGER-START": ["NAMENODE-START", "DATANODE-START", "RESOURCEMANAGER-START"],
+    "HISTORYSERVER-START": ["NAMENODE-START", "DATANODE-START"],
+    "HBASE_MASTER-START": ["NAMENODE-START", "DATANODE-START"],
+    "JOBTRACKER-START": ["NAMENODE-START", "DATANODE-START"],
+    "TASKTRACKER-START": ["NAMENODE-START", "DATANODE-START"],
+    "HIVE_SERVER-START": ["DATANODE-START"],
+    "WEBHCAT_SERVER-START": ["DATANODE-START"],
+    "NAGIOS_SERVER-START": ["NAMENODE-START", "SECONDARY_NAMENODE-START",
+        "DATANODE-START", "RESOURCEMANAGER-START", "NODEMANAGER-START", "HISTORYSERVER-START"],
+    "HDFS_SERVICE_CHECK-EXECUTE": ["NAMENODE-START", "DATANODE-START",
+        "SECONDARY_NAMENODE-START"],
+    "MAPREDUCE2_SERVICE_CHECK-EXECUTE": ["NODEMANAGER-START",
+        "RESOURCEMANAGER-START", "HISTORYSERVER-START", "YARN_SERVICE_CHECK-EXECUTE"],
+    "YARN_SERVICE_CHECK-EXECUTE": ["NODEMANAGER-START", "RESOURCEMANAGER-START"],
+    "RESOURCEMANAGER_SERVICE_CHECK-EXECUTE": ["RESOURCEMANAGER-START"],
+    "PIG_SERVICE_CHECK-EXECUTE": ["RESOURCEMANAGER-START", "NODEMANAGER-START"],
+    "NAMENODE-STOP": ["JOBTRACKER-STOP", "TASKTRACKER-STOP", "RESOURCEMANAGER-STOP",
+        "NODEMANAGER-STOP", "HISTORYSERVER-STOP", "HBASE_MASTER-STOP"],
+    "DATANODE-STOP": ["JOBTRACKER-STOP", "TASKTRACKER-STOP", "RESOURCEMANAGER-STOP",
+        "NODEMANAGER-STOP", "HISTORYSERVER-STOP", "HBASE_MASTER-STOP"],
+    "SECONDARY_NAMENODE-UPGRADE": ["NAMENODE-UPGRADE"],
+    "DATANODE-UPGRADE": ["SECONDARY_NAMENODE-UPGRADE"],
+    "HDFS_CLIENT-UPGRADE": ["DATANODE-UPGRADE"],
+    "JOBTRACKER-UPGRADE": ["HDFS_CLIENT-UPGRADE"]
+  },
+  "_comment" : "Dependencies that are used in HA NameNode cluster",
+  "optional_ha": {
+    "NAMENODE-START": ["JOURNALNODE-START", "ZOOKEEPER_SERVER-START"],
+    "ZKFC-START": ["NAMENODE-START"],
+    "NAGIOS_SERVER-START": ["ZKFC-START", "JOURNALNODE-START"]
+  }
+}
+
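
The _comment entries above describe the record format: each key is a blocked role-command pair, and its value lists the role-command pairs that must complete first. A hypothetical reader for one section of this file (shown for illustration only; neither the agent's ActionDependencyManager nor the server's RoleCommandOrder is reproduced here) could look like:

    import json

    def read_dependency_section(path, section):
      # Maps (blocked_role, blocked_command) -> set of (blocker_role, blocker_command)
      with open(path) as f:
        data = json.load(f)
      deps = {}
      for blocked, blockers in data.get(section, {}).items():
        if blocked == "_comment":          # comment entries carry no dependency data
          continue
        blocked_role, blocked_cmd = blocked.split("-")
        for blocker in blockers:
          blocker_role, blocker_cmd = blocker.split("-")
          deps.setdefault((blocked_role, blocked_cmd), set()).add(
              (blocker_role, blocker_cmd))
      return deps

    # e.g. read_dependency_section("role_command_order.json", "general_deps")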

+ 4 - 0
ambari-server/pom.xml

@@ -331,6 +331,10 @@
                  <source>
                   <location>src/main/resources/hive-schema-0.10.0.oracle.sql</location>
                 </source>
+                <source>
+                  <!-- This file is also included in the agent rpm -->
+                  <location>../ambari-common/src/main/resources/role_command_order.json</location>
+                </source>
               </sources>
             </mapping>
             <mapping>

+ 28 - 14
ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java

@@ -249,6 +249,22 @@ public class Configuration {
     "AMBARI_SECURITY_MASTER_KEY";
   public static final String MASTER_KEY_FILENAME_DEFAULT = "master";
 
+  /**
+   * File role_command_order.json
+   */
+  public static final String RCO_FILE_LOCATION_KEY = "server.rco.file";
+
+  /**
+   * Location of role_command_order.json in production system
+   */
+  public static final String RCO_FILE_LOCATION_DEFAULT = ("/var/lib/ambari-server/" +
+          "resources/role_command_order.json").replace("/", File.separator);
+  /**
+   * Location of role_command_order.json when tests are running
+   */
+  public static final String RCO_FILE_LOCATION_TEST = ("../ambari-common/src/" +
+          "main/resources/role_command_order.json").replace("/", File.separator);
+
   private static final Logger LOG = LoggerFactory.getLogger(
       Configuration.class);
 
@@ -265,9 +281,10 @@ public class Configuration {
   }
 
   /**
-   * For Testing only. This is to be able to create Configuration object
-   * for testing.
-   * @param properties properties to use for testing using the Conf object.
+   * This constructor is called from the default constructor and
+   * also from most tests.
+   * @param properties properties to use, both in production and in tests,
+   * via the Conf object.
    */
   public Configuration(Properties properties) {
     this.properties = properties;
@@ -315,6 +332,9 @@ public class Configuration {
     configsMap.put(JAVA_HOME_KEY, properties.getProperty(
         JAVA_HOME_KEY, JAVA_HOME_DEFAULT));
 
+    configsMap.put(RCO_FILE_LOCATION_KEY, properties.getProperty(
+            RCO_FILE_LOCATION_KEY, RCO_FILE_LOCATION_DEFAULT));
+
     File passFile = new File(configsMap.get(SRVR_KSTR_DIR_KEY) + File.separator
         + configsMap.get(SRVR_CRT_PASS_FILE_KEY));
     String password = null;
@@ -450,18 +470,12 @@ public class Configuration {
   }
 
   public File getBootStrapDir() {
-    String fileName = properties.getProperty(BOOTSTRAP_DIR);
-    if (fileName == null) {
-      fileName = BOOTSTRAP_DIR_DEFAULT;
-    }
+    String fileName = properties.getProperty(BOOTSTRAP_DIR, BOOTSTRAP_DIR_DEFAULT);
     return new File(fileName);
   }
 
   public String getBootStrapScript() {
-    String bootscript = properties.getProperty(BOOTSTRAP_SCRIPT);
-    if (bootscript == null) {
-      return BOOTSTRAP_SCRIPT_DEFAULT;
-    }
+    String bootscript = properties.getProperty(BOOTSTRAP_SCRIPT, BOOTSTRAP_SCRIPT_DEFAULT);
     return bootscript;
   }
 
@@ -730,15 +744,15 @@ public class Configuration {
   }
 
   public String getOjdbcJarName() {
-	  return properties.getProperty(OJDBC_JAR_NAME_KEY, OJDBC_JAR_NAME_DEFAULT);
+	return properties.getProperty(OJDBC_JAR_NAME_KEY, OJDBC_JAR_NAME_DEFAULT);
   }
   
   public String getServerDBName() {
-	  return properties.getProperty(SERVER_DB_NAME_KEY, SERVER_DB_NAME_DEFAULT);
+	return properties.getProperty(SERVER_DB_NAME_KEY, SERVER_DB_NAME_DEFAULT);
   }
   
   public String getMySQLJarName() {
-	  return properties.getProperty(MYSQL_JAR_NAME_KEY, MYSQL_JAR_NAME_DEFAULT);
+	return properties.getProperty(MYSQL_JAR_NAME_KEY, MYSQL_JAR_NAME_DEFAULT);
   }
   
   public JPATableGenerationStrategy getJPATableGenerationStrategy() {

+ 101 - 478
ambari-server/src/main/java/org/apache/ambari/server/metadata/RoleCommandOrder.java

@@ -17,16 +17,16 @@
  */
 package org.apache.ambari.server.metadata;
 
-import java.util.HashMap;
-
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
+import java.io.*;
+import java.util.*;
 
+import com.google.inject.Inject;
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.stageplanner.RoleGraphNode;
 import org.apache.ambari.server.state.Cluster;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.ambari.server.AmbariException;
@@ -37,10 +37,18 @@ import org.apache.ambari.server.AmbariException;
  */
 public class RoleCommandOrder {
 
+  @Inject Configuration configs;
+
   private final static Logger LOG =
 			LoggerFactory.getLogger(RoleCommandOrder.class);
-	
-  private static class RoleCommandPair {
+
+  private final static String GENERAL_DEPS_KEY = "general_deps";
+  private final static String HCFS_DEPS_KEY = "optional_hcfs";
+  private final static String NO_HCFS_DEPS_KEY = "optional_no_hcfs";
+  private final static String HA_DEPS_KEY = "optional_ha";
+  private final static String COMMENT_STR = "_comment";
+
+  static class RoleCommandPair {
     Role role;
     RoleCommand cmd;
 
@@ -66,12 +74,31 @@ public class RoleCommandOrder {
       }
       return false;
     }
+
+    Role getRole() {
+      return role;
+    }
+
+    RoleCommand getCmd() {
+      return cmd;
+    }
+
+    @Override
+    public String toString() {
+      return "RoleCommandPair{" +
+              "role=" + role +
+              ", cmd=" + cmd +
+              '}';
+    }
   }
 
   /**
    * key -> blocked role command value -> set of blocker role commands.
    */
-  private Map<RoleCommandPair, Set<RoleCommandPair>> dependencies = new HashMap<RoleCommandPair, Set<RoleCommandPair>>();
+  private Map<RoleCommandPair, Set<RoleCommandPair>> dependencies =
+          new HashMap<RoleCommandPair, Set<RoleCommandPair>>();
+
+
 
   /**
    * Add a pair of tuples where the tuple defined by the first two parameters are blocked on
@@ -91,494 +118,83 @@ public class RoleCommandOrder {
     this.dependencies.get(rcp1).add(rcp2);
   }
 
+  private File getRCOFile() {
+    Map<String, String> configsMap = configs.getConfigsMap();
+    String rcoLocation = configsMap.get(Configuration.RCO_FILE_LOCATION_KEY);
+    File rcoFile = new File(rcoLocation);
+    return rcoFile;
+  }
+
+  void addDependencies(Map<String, Object> jsonSection) {
+    for (Object blockedObj : jsonSection.keySet()) {
+      String blocked = (String) blockedObj;
+      if (COMMENT_STR.equals(blocked)) {
+        continue; // Skip comments
+      }
+      ArrayList<String> blockers = (ArrayList<String>) jsonSection.get(blocked);
+      for (String blocker : blockers) {
+        String [] blockedTuple = blocked.split("-");
+        String blockedRole = blockedTuple[0];
+        String blockedCommand = blockedTuple[1];
+
+        String [] blockerTuple = blocker.split("-");
+        String blockerRole = blockerTuple[0];
+        String blockerCommand = blockerTuple[1];
+
+        addDependency(
+                Role.valueOf(blockedRole), RoleCommand.valueOf(blockedCommand),
+                Role.valueOf(blockerRole), RoleCommand.valueOf(blockerCommand));
+      }
+    }
+  }
+
   public void initialize(Cluster cluster) {
     Boolean hasHCFS = false;
     Boolean isHAEnabled = false;
-    
+
     try {
       if (cluster != null && cluster.getService("HCFS") != null) {
     	  hasHCFS = true;
       } 
     } catch (AmbariException e) {
     }
+
     try {
-      if (cluster != null && cluster.getService("HDFS").getServiceComponent("JOURNALNODE") != null) {
+      if (cluster != null &&
+              cluster.getService("HDFS") != null &&
+              cluster.getService("HDFS").getServiceComponent("JOURNALNODE") != null) {
         isHAEnabled = true;
-      } 
+      }
     } catch (AmbariException e) {
     }
 
+    // Read data from JSON
+    ObjectMapper mapper = new ObjectMapper();
+    File rcoFile = getRCOFile();
+    Map<String,Object> userData = null;
+    try {
+      userData = mapper.readValue(rcoFile, Map.class);
+    } catch (IOException e) {
+      String errorMsg = String.format("Can not read file %s", rcoFile.getAbsolutePath());
+      LOG.error(errorMsg, e);
+      return;
+    }
+
+    Map<String,Object> generalSection = (Map<String, Object>) userData.get(GENERAL_DEPS_KEY);
+    addDependencies(generalSection);
     if (hasHCFS) {
-      // Installs
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.INSTALL, Role.HIVE_CLIENT,
-        RoleCommand.INSTALL);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.INSTALL, Role.HCAT,
-        RoleCommand.INSTALL);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.INSTALL, Role.MAPREDUCE_CLIENT,
-        RoleCommand.INSTALL);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.INSTALL, Role.OOZIE_CLIENT,
-        RoleCommand.INSTALL);
-
-      // Starts
-      if (isHAEnabled) {
-        addDependency(Role.NAMENODE, RoleCommand.START, Role.JOURNALNODE,
-            RoleCommand.START);
-        addDependency(Role.NAMENODE, RoleCommand.START, Role.ZOOKEEPER_SERVER,
-            RoleCommand.START);
-        addDependency(Role.ZKFC, RoleCommand.START, Role.NAMENODE,
-            RoleCommand.START);
-        addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.ZKFC,
-                RoleCommand.START);
-        addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.JOURNALNODE,
-                RoleCommand.START);
-      }
-      addDependency(Role.HBASE_MASTER, RoleCommand.START, Role.ZOOKEEPER_SERVER,
-          RoleCommand.START);
-      addDependency(Role.HBASE_MASTER, RoleCommand.START, Role.PEERSTATUS,
-          RoleCommand.START);
-      addDependency(Role.HBASE_REGIONSERVER, RoleCommand.START,
-          Role.HBASE_MASTER, RoleCommand.START);
-      addDependency(Role.JOBTRACKER, RoleCommand.START, Role.PEERSTATUS,
-          RoleCommand.START);
-      addDependency(Role.TASKTRACKER, RoleCommand.START, Role.PEERSTATUS,
-          RoleCommand.START);
-      addDependency(Role.OOZIE_SERVER, RoleCommand.START, Role.JOBTRACKER,
-          RoleCommand.START);
-      addDependency(Role.OOZIE_SERVER, RoleCommand.START, Role.TASKTRACKER,
-          RoleCommand.START);
-      addDependency(Role.HIVE_SERVER, RoleCommand.START, Role.TASKTRACKER,
-          RoleCommand.START);
-      addDependency(Role.WEBHCAT_SERVER, RoleCommand.START, Role.TASKTRACKER,
-          RoleCommand.START);
-      addDependency(Role.WEBHCAT_SERVER, RoleCommand.START, Role.HIVE_SERVER,
-          RoleCommand.START);
-      addDependency(Role.HIVE_METASTORE, RoleCommand.START, Role.MYSQL_SERVER,
-          RoleCommand.START);
-      addDependency(Role.HIVE_SERVER, RoleCommand.START, Role.MYSQL_SERVER,
-          RoleCommand.START);
-      addDependency(Role.HUE_SERVER, RoleCommand.START, Role.HIVE_SERVER,
-          RoleCommand.START);
-      addDependency(Role.HUE_SERVER, RoleCommand.START, Role.HCAT,
-          RoleCommand.START);
-      addDependency(Role.HUE_SERVER, RoleCommand.START, Role.OOZIE_SERVER,
-          RoleCommand.START); 
-      addDependency(Role.FLUME_SERVER, RoleCommand.START, Role.OOZIE_SERVER,
-          RoleCommand.START);
-      // Nagios
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.HBASE_MASTER,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.HBASE_REGIONSERVER,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.GANGLIA_SERVER,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.GANGLIA_MONITOR,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.HCAT,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.HIVE_SERVER,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.HIVE_METASTORE,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.HUE_SERVER,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.JOBTRACKER,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.TASKTRACKER,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.ZOOKEEPER_SERVER,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.MYSQL_SERVER,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.OOZIE_SERVER,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.PIG,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.SQOOP,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.WEBHCAT_SERVER,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.FLUME_SERVER,
-              RoleCommand.START);
-
-
-
-      // Service checks
-      addDependency(Role.HCFS_SERVICE_CHECK, RoleCommand.EXECUTE, Role.PEERSTATUS,
-          RoleCommand.START);
-      addDependency(Role.MAPREDUCE_SERVICE_CHECK, RoleCommand.EXECUTE,
-          Role.JOBTRACKER, RoleCommand.START);
-      addDependency(Role.MAPREDUCE_SERVICE_CHECK, RoleCommand.EXECUTE,
-          Role.TASKTRACKER, RoleCommand.START);
-      addDependency(Role.OOZIE_SERVICE_CHECK, RoleCommand.EXECUTE,
-          Role.OOZIE_SERVER, RoleCommand.START);
-      addDependency(Role.WEBHCAT_SERVICE_CHECK, RoleCommand.EXECUTE,
-          Role.WEBHCAT_SERVER, RoleCommand.START);
-      addDependency(Role.HBASE_SERVICE_CHECK, RoleCommand.EXECUTE,
-          Role.HBASE_MASTER, RoleCommand.START);
-      addDependency(Role.HBASE_SERVICE_CHECK, RoleCommand.EXECUTE,
-          Role.HBASE_REGIONSERVER, RoleCommand.START);
-      addDependency(Role.HIVE_SERVICE_CHECK, RoleCommand.EXECUTE,
-          Role.HIVE_SERVER, RoleCommand.START);
-      addDependency(Role.HIVE_SERVICE_CHECK, RoleCommand.EXECUTE,
-          Role.HIVE_METASTORE, RoleCommand.START);
-      addDependency(Role.HCAT_SERVICE_CHECK, RoleCommand.EXECUTE,
-          Role.HIVE_SERVER, RoleCommand.START);
-      addDependency(Role.PIG_SERVICE_CHECK, RoleCommand.EXECUTE,
-          Role.JOBTRACKER, RoleCommand.START);
-      addDependency(Role.PIG_SERVICE_CHECK, RoleCommand.EXECUTE,
-         Role.TASKTRACKER, RoleCommand.START);
-      addDependency(Role.SQOOP_SERVICE_CHECK, RoleCommand.EXECUTE,
-          Role.JOBTRACKER, RoleCommand.START);
-      addDependency(Role.SQOOP_SERVICE_CHECK, RoleCommand.EXECUTE,
-          Role.TASKTRACKER, RoleCommand.START);
-      addDependency(Role.ZOOKEEPER_SERVICE_CHECK, RoleCommand.EXECUTE,
-          Role.ZOOKEEPER_SERVER, RoleCommand.START);
-      addDependency(Role.ZOOKEEPER_QUORUM_SERVICE_CHECK, RoleCommand.EXECUTE,
-          Role.ZOOKEEPER_SERVER, RoleCommand.START);
-
-      addDependency(Role.ZOOKEEPER_SERVER, RoleCommand.STOP,
-          Role.HBASE_MASTER, RoleCommand.STOP);
-      addDependency(Role.ZOOKEEPER_SERVER, RoleCommand.STOP,
-          Role.HBASE_REGIONSERVER, RoleCommand.STOP);
-      addDependency(Role.HBASE_MASTER, RoleCommand.STOP,
-          Role.HBASE_REGIONSERVER, RoleCommand.STOP);
-
-      addDependency(Role.JOBTRACKER, RoleCommand.UPGRADE,
-          Role.HCFS_CLIENT, RoleCommand.UPGRADE);
-      addDependency(Role.TASKTRACKER, RoleCommand.UPGRADE,
-          Role.JOBTRACKER, RoleCommand.UPGRADE);
-      addDependency(Role.MAPREDUCE_CLIENT, RoleCommand.UPGRADE,
-          Role.TASKTRACKER, RoleCommand.UPGRADE);
-      addDependency(Role.MAPREDUCE_CLIENT, RoleCommand.UPGRADE,
-          Role.JOBTRACKER, RoleCommand.UPGRADE);
-      addDependency(Role.ZOOKEEPER_SERVER, RoleCommand.UPGRADE,
-          Role.MAPREDUCE_CLIENT, RoleCommand.UPGRADE);
-      addDependency(Role.ZOOKEEPER_CLIENT, RoleCommand.UPGRADE,
-          Role.ZOOKEEPER_SERVER, RoleCommand.UPGRADE);
-      addDependency(Role.HBASE_MASTER, RoleCommand.UPGRADE,
-          Role.ZOOKEEPER_CLIENT, RoleCommand.UPGRADE);
-      addDependency(Role.HBASE_REGIONSERVER, RoleCommand.UPGRADE,
-          Role.HBASE_MASTER, RoleCommand.UPGRADE);
-      addDependency(Role.HBASE_CLIENT, RoleCommand.UPGRADE,
-          Role.HBASE_REGIONSERVER, RoleCommand.UPGRADE);
- 
-      addDependency(Role.JOBTRACKER, RoleCommand.UPGRADE,
-          Role.HCFS_CLIENT, RoleCommand.UPGRADE);
-      addDependency(Role.TASKTRACKER, RoleCommand.UPGRADE,
-          Role.JOBTRACKER, RoleCommand.UPGRADE);
-      addDependency(Role.MAPREDUCE_CLIENT, RoleCommand.UPGRADE,
-          Role.TASKTRACKER, RoleCommand.UPGRADE);
-      addDependency(Role.MAPREDUCE_CLIENT, RoleCommand.UPGRADE,
-          Role.JOBTRACKER, RoleCommand.UPGRADE);
-      addDependency(Role.ZOOKEEPER_SERVER, RoleCommand.UPGRADE,
-          Role.MAPREDUCE_CLIENT, RoleCommand.UPGRADE);
-      addDependency(Role.ZOOKEEPER_CLIENT, RoleCommand.UPGRADE,
-          Role.ZOOKEEPER_SERVER, RoleCommand.UPGRADE);
-      addDependency(Role.HBASE_MASTER, RoleCommand.UPGRADE,
-          Role.ZOOKEEPER_CLIENT, RoleCommand.UPGRADE);
-      addDependency(Role.HBASE_REGIONSERVER, RoleCommand.UPGRADE,
-          Role.HBASE_MASTER, RoleCommand.UPGRADE);
-      addDependency(Role.HBASE_CLIENT, RoleCommand.UPGRADE,
-          Role.HBASE_REGIONSERVER, RoleCommand.UPGRADE);
-      addDependency(Role.HIVE_SERVER, RoleCommand.UPGRADE,
-          Role.HBASE_CLIENT, RoleCommand.UPGRADE);
-      addDependency(Role.HIVE_METASTORE, RoleCommand.UPGRADE,
-          Role.HIVE_SERVER, RoleCommand.UPGRADE);
-      addDependency(Role.MYSQL_SERVER, RoleCommand.UPGRADE,
-          Role.HIVE_METASTORE, RoleCommand.UPGRADE);
-      addDependency(Role.HIVE_CLIENT, RoleCommand.UPGRADE,
-          Role.MYSQL_SERVER, RoleCommand.UPGRADE);
-      addDependency(Role.HCAT, RoleCommand.UPGRADE,
-          Role.HIVE_CLIENT, RoleCommand.UPGRADE);
-      addDependency(Role.OOZIE_SERVER, RoleCommand.UPGRADE,
-          Role.HCAT, RoleCommand.UPGRADE);
-      addDependency(Role.OOZIE_CLIENT, RoleCommand.UPGRADE,
-          Role.OOZIE_SERVER, RoleCommand.UPGRADE);
-      addDependency(Role.WEBHCAT_SERVER, RoleCommand.UPGRADE,
-          Role.OOZIE_CLIENT, RoleCommand.UPGRADE);
-      addDependency(Role.PIG, RoleCommand.UPGRADE,
-          Role.WEBHCAT_SERVER, RoleCommand.UPGRADE);
-      addDependency(Role.SQOOP, RoleCommand.UPGRADE,
-          Role.PIG, RoleCommand.UPGRADE);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.UPGRADE,
-          Role.SQOOP, RoleCommand.UPGRADE);
-      addDependency(Role.GANGLIA_SERVER, RoleCommand.UPGRADE,
-          Role.NAGIOS_SERVER, RoleCommand.UPGRADE);
-      addDependency(Role.GANGLIA_MONITOR, RoleCommand.UPGRADE,
-          Role.GANGLIA_SERVER, RoleCommand.UPGRADE);
+      Map<String,Object> hcfsSection = (Map<String, Object>) userData.get(HCFS_DEPS_KEY);
+      addDependencies(hcfsSection);
     } else {
-      // Installs
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.INSTALL, Role.HIVE_CLIENT,
-        RoleCommand.INSTALL);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.INSTALL, Role.HCAT,
-        RoleCommand.INSTALL);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.INSTALL, Role.MAPREDUCE_CLIENT,
-        RoleCommand.INSTALL);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.INSTALL, Role.OOZIE_CLIENT,
-        RoleCommand.INSTALL);
-
-      // Starts
-      
-      if (isHAEnabled) {
-        addDependency(Role.NAMENODE, RoleCommand.START, Role.JOURNALNODE,
-            RoleCommand.START);
-        addDependency(Role.NAMENODE, RoleCommand.START, Role.ZOOKEEPER_SERVER,
-            RoleCommand.START);
-        addDependency(Role.ZKFC, RoleCommand.START, Role.NAMENODE,
-            RoleCommand.START);
-        addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.ZKFC,
-                RoleCommand.START);
-        addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.JOURNALNODE,
-                RoleCommand.START);
-      }
-      
-      addDependency(Role.SECONDARY_NAMENODE, RoleCommand.START, Role.NAMENODE,
-        RoleCommand.START);
-      addDependency(Role.RESOURCEMANAGER, RoleCommand.START, Role.NAMENODE,
-        RoleCommand.START);
-      addDependency(Role.RESOURCEMANAGER, RoleCommand.START, Role.DATANODE,
-        RoleCommand.START);
-      addDependency(Role.NODEMANAGER, RoleCommand.START, Role.NAMENODE,
-        RoleCommand.START);
-      addDependency(Role.NODEMANAGER, RoleCommand.START, Role.DATANODE,
-        RoleCommand.START);
-      addDependency(Role.NODEMANAGER, RoleCommand.START, Role.RESOURCEMANAGER,
-        RoleCommand.START);
-      addDependency(Role.HISTORYSERVER, RoleCommand.START, Role.NAMENODE,
-        RoleCommand.START);
-      addDependency(Role.HISTORYSERVER, RoleCommand.START, Role.DATANODE,
-        RoleCommand.START);
-      addDependency(Role.HBASE_MASTER, RoleCommand.START, Role.ZOOKEEPER_SERVER,
-        RoleCommand.START);
-      addDependency(Role.HBASE_MASTER, RoleCommand.START, Role.NAMENODE,
-        RoleCommand.START);
-      addDependency(Role.HBASE_MASTER, RoleCommand.START, Role.DATANODE,
-        RoleCommand.START);
-      addDependency(Role.HBASE_REGIONSERVER, RoleCommand.START,
-        Role.HBASE_MASTER, RoleCommand.START);
-      addDependency(Role.JOBTRACKER, RoleCommand.START, Role.NAMENODE,
-        RoleCommand.START);
-      addDependency(Role.JOBTRACKER, RoleCommand.START, Role.DATANODE,
-        RoleCommand.START);
-      addDependency(Role.TASKTRACKER, RoleCommand.START, Role.NAMENODE,
-        RoleCommand.START);
-      addDependency(Role.TASKTRACKER, RoleCommand.START, Role.DATANODE,
-        RoleCommand.START);
-      addDependency(Role.OOZIE_SERVER, RoleCommand.START, Role.JOBTRACKER,
-        RoleCommand.START);
-      addDependency(Role.OOZIE_SERVER, RoleCommand.START, Role.TASKTRACKER,
-        RoleCommand.START);
-      addDependency(Role.HIVE_SERVER, RoleCommand.START, Role.TASKTRACKER,
-        RoleCommand.START);
-      addDependency(Role.HIVE_SERVER, RoleCommand.START, Role.DATANODE,
-        RoleCommand.START);
-      addDependency(Role.WEBHCAT_SERVER, RoleCommand.START, Role.TASKTRACKER,
-        RoleCommand.START);
-      addDependency(Role.WEBHCAT_SERVER, RoleCommand.START, Role.DATANODE,
-        RoleCommand.START);
-      addDependency(Role.WEBHCAT_SERVER, RoleCommand.START, Role.HIVE_SERVER,
-        RoleCommand.START);
-      addDependency(Role.HIVE_METASTORE, RoleCommand.START, Role.MYSQL_SERVER,
-        RoleCommand.START);
-      addDependency(Role.HIVE_SERVER, RoleCommand.START, Role.MYSQL_SERVER,
-        RoleCommand.START);
-      addDependency(Role.HUE_SERVER, RoleCommand.START, Role.HIVE_SERVER,
-        RoleCommand.START);
-      addDependency(Role.HUE_SERVER, RoleCommand.START, Role.HCAT,
-        RoleCommand.START);
-      addDependency(Role.HUE_SERVER, RoleCommand.START, Role.OOZIE_SERVER,
-        RoleCommand.START);
-      addDependency(Role.FLUME_SERVER, RoleCommand.START, Role.OOZIE_SERVER,
-        RoleCommand.START);
-      // Nagios
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.NAMENODE,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.SECONDARY_NAMENODE,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.DATANODE,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.RESOURCEMANAGER,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.NODEMANAGER,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.HBASE_MASTER,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.HBASE_REGIONSERVER,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.GANGLIA_SERVER,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.GANGLIA_MONITOR,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.HCAT,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.HISTORYSERVER,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.HIVE_SERVER,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.HIVE_METASTORE,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.HUE_SERVER,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.JOBTRACKER,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.TASKTRACKER,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.ZOOKEEPER_SERVER,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.MYSQL_SERVER,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.OOZIE_SERVER,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.PIG,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.SQOOP,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.WEBHCAT_SERVER,
-              RoleCommand.START);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.START, Role.FLUME_SERVER,
-              RoleCommand.START);
-
-
-      // Service checks
-      addDependency(Role.HDFS_SERVICE_CHECK, RoleCommand.EXECUTE, Role.NAMENODE,
-        RoleCommand.START);
-      addDependency(Role.HDFS_SERVICE_CHECK, RoleCommand.EXECUTE, Role.DATANODE,
-        RoleCommand.START);
-      addDependency(Role.HDFS_SERVICE_CHECK, RoleCommand.EXECUTE, Role.SECONDARY_NAMENODE,
-        RoleCommand.START);
-      addDependency(Role.MAPREDUCE_SERVICE_CHECK, RoleCommand.EXECUTE,
-        Role.JOBTRACKER, RoleCommand.START);
-      addDependency(Role.MAPREDUCE_SERVICE_CHECK, RoleCommand.EXECUTE,
-        Role.TASKTRACKER, RoleCommand.START);
-      addDependency(Role.MAPREDUCE2_SERVICE_CHECK, RoleCommand.EXECUTE,
-        Role.NODEMANAGER, RoleCommand.START);
-      addDependency(Role.MAPREDUCE2_SERVICE_CHECK, RoleCommand.EXECUTE,
-        Role.RESOURCEMANAGER, RoleCommand.START);
-      addDependency(Role.MAPREDUCE2_SERVICE_CHECK, RoleCommand.EXECUTE,
-          Role.HISTORYSERVER, RoleCommand.START);
-      addDependency(Role.MAPREDUCE2_SERVICE_CHECK, RoleCommand.EXECUTE,
-          Role.YARN_SERVICE_CHECK, RoleCommand.EXECUTE);
-      addDependency(Role.YARN_SERVICE_CHECK, RoleCommand.EXECUTE,
-        Role.NODEMANAGER, RoleCommand.START);
-      addDependency(Role.YARN_SERVICE_CHECK, RoleCommand.EXECUTE,
-        Role.RESOURCEMANAGER, RoleCommand.START);
-      addDependency(Role.RESOURCEMANAGER_SERVICE_CHECK, RoleCommand.EXECUTE,
-        Role.RESOURCEMANAGER, RoleCommand.START);
-      addDependency(Role.OOZIE_SERVICE_CHECK, RoleCommand.EXECUTE,
-        Role.OOZIE_SERVER, RoleCommand.START);
-      addDependency(Role.WEBHCAT_SERVICE_CHECK, RoleCommand.EXECUTE,
-        Role.WEBHCAT_SERVER, RoleCommand.START);
-      addDependency(Role.HBASE_SERVICE_CHECK, RoleCommand.EXECUTE,
-        Role.HBASE_MASTER, RoleCommand.START);
-      addDependency(Role.HBASE_SERVICE_CHECK, RoleCommand.EXECUTE,
-        Role.HBASE_REGIONSERVER, RoleCommand.START);
-      addDependency(Role.HIVE_SERVICE_CHECK, RoleCommand.EXECUTE,
-        Role.HIVE_SERVER, RoleCommand.START);
-      addDependency(Role.HIVE_SERVICE_CHECK, RoleCommand.EXECUTE,
-        Role.HIVE_METASTORE, RoleCommand.START);
-      addDependency(Role.HCAT_SERVICE_CHECK, RoleCommand.EXECUTE,
-        Role.HIVE_SERVER, RoleCommand.START);
-      addDependency(Role.PIG_SERVICE_CHECK, RoleCommand.EXECUTE,
-        Role.JOBTRACKER, RoleCommand.START);
-      addDependency(Role.PIG_SERVICE_CHECK, RoleCommand.EXECUTE,
-        Role.TASKTRACKER, RoleCommand.START);
-      addDependency(Role.PIG_SERVICE_CHECK, RoleCommand.EXECUTE,
-        Role.RESOURCEMANAGER, RoleCommand.START);
-      addDependency(Role.PIG_SERVICE_CHECK, RoleCommand.EXECUTE,
-        Role.NODEMANAGER, RoleCommand.START);
-      addDependency(Role.SQOOP_SERVICE_CHECK, RoleCommand.EXECUTE,
-        Role.JOBTRACKER, RoleCommand.START);
-      addDependency(Role.SQOOP_SERVICE_CHECK, RoleCommand.EXECUTE,
-        Role.TASKTRACKER, RoleCommand.START);
-      addDependency(Role.ZOOKEEPER_SERVICE_CHECK, RoleCommand.EXECUTE,
-        Role.ZOOKEEPER_SERVER, RoleCommand.START);
-      addDependency(Role.ZOOKEEPER_QUORUM_SERVICE_CHECK, RoleCommand.EXECUTE,
-        Role.ZOOKEEPER_SERVER, RoleCommand.START);
-    
-      addDependency(Role.ZOOKEEPER_SERVER, RoleCommand.STOP,
-        Role.HBASE_MASTER, RoleCommand.STOP);
-      addDependency(Role.ZOOKEEPER_SERVER, RoleCommand.STOP,
-        Role.HBASE_REGIONSERVER, RoleCommand.STOP);
-      addDependency(Role.NAMENODE, RoleCommand.STOP,
-        Role.HBASE_MASTER, RoleCommand.STOP);
-      addDependency(Role.DATANODE, RoleCommand.STOP,
-        Role.HBASE_MASTER, RoleCommand.STOP);
-      addDependency(Role.HBASE_MASTER, RoleCommand.STOP,
-        Role.HBASE_REGIONSERVER, RoleCommand.STOP);
-      addDependency(Role.NAMENODE, RoleCommand.STOP,
-        Role.JOBTRACKER, RoleCommand.STOP);
-      addDependency(Role.NAMENODE, RoleCommand.STOP,
-        Role.TASKTRACKER, RoleCommand.STOP);
-      addDependency(Role.NAMENODE, RoleCommand.STOP,
-        Role.RESOURCEMANAGER, RoleCommand.STOP);
-      addDependency(Role.NAMENODE, RoleCommand.STOP,
-        Role.NODEMANAGER, RoleCommand.STOP);
-      addDependency(Role.NAMENODE, RoleCommand.STOP,
-        Role.HISTORYSERVER, RoleCommand.STOP);
-      addDependency(Role.DATANODE, RoleCommand.STOP,
-        Role.JOBTRACKER, RoleCommand.STOP);
-      addDependency(Role.DATANODE, RoleCommand.STOP,
-        Role.TASKTRACKER, RoleCommand.STOP);
-      addDependency(Role.DATANODE, RoleCommand.STOP,
-        Role.RESOURCEMANAGER, RoleCommand.STOP);
-      addDependency(Role.DATANODE, RoleCommand.STOP,
-        Role.NODEMANAGER, RoleCommand.STOP);
-      addDependency(Role.DATANODE, RoleCommand.STOP,
-        Role.HISTORYSERVER, RoleCommand.STOP);
-
-      addDependency(Role.SECONDARY_NAMENODE, RoleCommand.UPGRADE,
-        Role.NAMENODE, RoleCommand.UPGRADE);
-      addDependency(Role.DATANODE, RoleCommand.UPGRADE,
-        Role.SECONDARY_NAMENODE, RoleCommand.UPGRADE);
-      addDependency(Role.HDFS_CLIENT, RoleCommand.UPGRADE,
-        Role.DATANODE, RoleCommand.UPGRADE);
-      addDependency(Role.JOBTRACKER, RoleCommand.UPGRADE,
-        Role.HDFS_CLIENT, RoleCommand.UPGRADE);
-      addDependency(Role.TASKTRACKER, RoleCommand.UPGRADE,
-        Role.JOBTRACKER, RoleCommand.UPGRADE);
-      addDependency(Role.MAPREDUCE_CLIENT, RoleCommand.UPGRADE,
-        Role.TASKTRACKER, RoleCommand.UPGRADE);
-      addDependency(Role.MAPREDUCE_CLIENT, RoleCommand.UPGRADE,
-        Role.TASKTRACKER, RoleCommand.UPGRADE);
-      addDependency(Role.ZOOKEEPER_SERVER, RoleCommand.UPGRADE,
-        Role.MAPREDUCE_CLIENT, RoleCommand.UPGRADE);
-      addDependency(Role.ZOOKEEPER_CLIENT, RoleCommand.UPGRADE,
-        Role.ZOOKEEPER_SERVER, RoleCommand.UPGRADE);
-      addDependency(Role.HBASE_MASTER, RoleCommand.UPGRADE,
-        Role.ZOOKEEPER_CLIENT, RoleCommand.UPGRADE);
-      addDependency(Role.HBASE_REGIONSERVER, RoleCommand.UPGRADE,
-        Role.HBASE_MASTER, RoleCommand.UPGRADE);
-      addDependency(Role.HBASE_CLIENT, RoleCommand.UPGRADE,
-        Role.HBASE_REGIONSERVER, RoleCommand.UPGRADE);
-      addDependency(Role.HIVE_SERVER, RoleCommand.UPGRADE,
-        Role.HBASE_CLIENT, RoleCommand.UPGRADE);
-      addDependency(Role.HIVE_METASTORE, RoleCommand.UPGRADE,
-        Role.HIVE_SERVER, RoleCommand.UPGRADE);
-      addDependency(Role.MYSQL_SERVER, RoleCommand.UPGRADE,
-        Role.HIVE_METASTORE, RoleCommand.UPGRADE);
-      addDependency(Role.HIVE_CLIENT, RoleCommand.UPGRADE,
-        Role.MYSQL_SERVER, RoleCommand.UPGRADE);
-      addDependency(Role.HCAT, RoleCommand.UPGRADE,
-        Role.HIVE_CLIENT, RoleCommand.UPGRADE);
-      addDependency(Role.OOZIE_SERVER, RoleCommand.UPGRADE,
-        Role.HCAT, RoleCommand.UPGRADE);
-      addDependency(Role.OOZIE_CLIENT, RoleCommand.UPGRADE,
-        Role.OOZIE_SERVER, RoleCommand.UPGRADE);
-      addDependency(Role.WEBHCAT_SERVER, RoleCommand.UPGRADE,
-        Role.OOZIE_CLIENT, RoleCommand.UPGRADE);
-      addDependency(Role.PIG, RoleCommand.UPGRADE,
-        Role.WEBHCAT_SERVER, RoleCommand.UPGRADE);
-      addDependency(Role.SQOOP, RoleCommand.UPGRADE,
-        Role.PIG, RoleCommand.UPGRADE);
-      addDependency(Role.NAGIOS_SERVER, RoleCommand.UPGRADE,
-        Role.SQOOP, RoleCommand.UPGRADE);
-      addDependency(Role.GANGLIA_SERVER, RoleCommand.UPGRADE,
-        Role.NAGIOS_SERVER, RoleCommand.UPGRADE);
-      addDependency(Role.GANGLIA_MONITOR, RoleCommand.UPGRADE,
-        Role.GANGLIA_SERVER, RoleCommand.UPGRADE);
+      Map<String,Object> noHcfsSection = (Map<String, Object>) userData.get(NO_HCFS_DEPS_KEY);
+      addDependencies(noHcfsSection);
     }
-    extendTransitiveDependency();
+    if (isHAEnabled) {
+      Map<String,Object> isHASection = (Map<String, Object>) userData.get(HA_DEPS_KEY);
+      addDependencies(isHASection);
     }
+    extendTransitiveDependency();
+  }
 
   /**
    * Returns the dependency order. -1 => rgn1 before rgn2, 0 => they can be
@@ -693,4 +309,11 @@ public class RoleCommandOrder {
     return 0;
   }
 
+
+  /**
+   * For test purposes
+   */
+  Map<RoleCommandPair, Set<RoleCommandPair>> getDependencies() {
+    return dependencies;
+  }
 }
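
The hardcoded chains removed above are now expressed as sections of role_command_order.json and handed to addDependencies(). The parser itself is not shown in this hunk; the following is only a sketch of how such a section could be translated into dependencies, assuming the "ROLE-COMMAND" key format used by test_rco_data.json below and that Role and RoleCommand expose valueOf-style lookups:

    // Sketch only -- not the committed implementation.
    // Each entry maps a blocked "ROLE-COMMAND" key to the "ROLE-COMMAND" values that must run first.
    void addDependencies(Map<String, Object> section) {
      if (section == null) {
        return;
      }
      for (Map.Entry<String, Object> entry : section.entrySet()) {
        String key = entry.getKey();
        if (key.startsWith("_")) {
          continue; // skips metadata such as the "_comment" entry
        }
        String[] blocked = key.split("-");
        @SuppressWarnings("unchecked")
        List<String> blockers = (List<String>) entry.getValue();
        for (String blockerKey : blockers) {
          String[] blocker = blockerKey.split("-");
          addDependency(Role.valueOf(blocked[0]), RoleCommand.valueOf(blocked[1]),
              Role.valueOf(blocker[0]), RoleCommand.valueOf(blocker[1]));
        }
      }
    }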

+ 247 - 0
ambari-server/src/test/java/org/apache/ambari/server/metadata/RoleCommandOrderTest.java

@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.metadata;
+
+import com.google.gson.Gson;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.persist.PersistService;
+import static junit.framework.Assert.*;
+import static org.easymock.EasyMock.*;
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.orm.GuiceJpaInitializer;
+import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
+import org.apache.ambari.server.orm.entities.*;
+import org.apache.ambari.server.state.HostState;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.ServiceComponent;
+import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.cluster.ClusterImpl;
+import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.annotate.JsonMethod;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.*;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.ambari.server.metadata.RoleCommandOrder.RoleCommandPair;
+
+public class RoleCommandOrderTest {
+
+  private Injector injector;
+
+  private final static String TEST_RCO_DATA_FILE = "test_rco_data.json";
+
+  @Before
+  public void setup() throws Exception {
+    injector = Guice.createInjector(new InMemoryDefaultTestModule());
+    injector.getInstance(GuiceJpaInitializer.class);
+  }
+
+  @After
+  public void teardown() {
+    injector.getInstance(PersistService.class).stop();
+  }
+
+
+  /**
+   * Tests building dependencies in an HCFS cluster. Uses the real dependency
+   * mapping file (role_command_order.json).
+   */
+  @Test
+  public void testInitializeAtHCFSCluster() throws AmbariException {
+
+    RoleCommandOrder rco = injector.getInstance(RoleCommandOrder.class);
+    ClusterImpl cluster = createMock(ClusterImpl.class);
+    Service service = createMock(Service.class);
+    expect(cluster.getService("HCFS")).andReturn(service);
+    expect(cluster.getService("HDFS")).andReturn(null);
+    replay(cluster);
+
+    Map<RoleCommandPair, Set<RoleCommandPair>> deps = rco.getDependencies();
+    assertTrue("Dependencies are empty before initialization", deps.size() == 0);
+    rco.initialize(cluster);
+    assertTrue("Dependencies are loaded after initialization", deps.size() > 0);
+    verify(cluster);
+    // Check that HDFS components are not present in dependencies
+    // Checking blocked roles
+    assertFalse(dependenciesContainBlockedRole(deps, Role.DATANODE));
+    assertFalse(dependenciesContainBlockedRole(deps, Role.NAMENODE));
+    assertFalse(dependenciesContainBlockedRole(deps, Role.SECONDARY_NAMENODE));
+    assertFalse(dependenciesContainBlockedRole(deps, Role.JOURNALNODE));
+    assertFalse(dependenciesContainBlockedRole(deps, Role.NAMENODE_SERVICE_CHECK));
+    assertFalse(dependenciesContainBlockedRole(deps, Role.HDFS_SERVICE_CHECK));
+    assertFalse(dependenciesContainBlockedRole(deps, Role.HDFS_CLIENT));
+    // Checking blocker roles
+    assertFalse(dependenciesContainBlockerRole(deps, Role.DATANODE));
+    assertFalse(dependenciesContainBlockerRole(deps, Role.NAMENODE));
+    assertFalse(dependenciesContainBlockerRole(deps, Role.SECONDARY_NAMENODE));
+    assertFalse(dependenciesContainBlockerRole(deps, Role.JOURNALNODE));
+    assertFalse(dependenciesContainBlockerRole(deps, Role.NAMENODE_SERVICE_CHECK));
+    assertFalse(dependenciesContainBlockerRole(deps, Role.HDFS_SERVICE_CHECK));
+    assertFalse(dependenciesContainBlockerRole(deps, Role.HDFS_CLIENT));
+
+    // And that some HCFS components are present (section has been loaded)
+    assertTrue(dependenciesContainBlockerRole(deps, Role.PEERSTATUS));
+
+  }
+
+
+  /**
+   * Tests building dependencies in a non-HA-enabled HDFS cluster. Uses the real
+   * dependency mapping file (role_command_order.json).
+   */
+  @Test
+  public void testInitializeAtHDFSCluster() throws AmbariException {
+
+    RoleCommandOrder rco = injector.getInstance(RoleCommandOrder.class);
+    ClusterImpl cluster = createMock(ClusterImpl.class);
+    expect(cluster.getService("HCFS")).andReturn(null);
+
+    Service hdfsService = createMock(Service.class);
+
+    expect(cluster.getService("HDFS")).andReturn(hdfsService).atLeastOnce();
+    expect(hdfsService.getServiceComponent("JOURNALNODE")).andReturn(null);
+
+    replay(cluster);
+    replay(hdfsService);
+
+    Map<RoleCommandPair, Set<RoleCommandPair>> deps = rco.getDependencies();
+    assertTrue("Dependencies are empty before initialization", deps.size() == 0);
+    rco.initialize(cluster);
+    assertTrue("Dependencies are loaded after initialization", deps.size() > 0);
+    verify(cluster);
+    verify(hdfsService);
+    // Check that HCFS components are not present in dependencies
+    // Checking blocked roles
+    assertFalse(dependenciesContainBlockedRole(deps, Role.PEERSTATUS));
+    assertFalse(dependenciesContainBlockedRole(deps, Role.HCFS_SERVICE_CHECK));
+    assertFalse(dependenciesContainBlockedRole(deps, Role.HCFS_CLIENT));
+    // Checking blocker roles
+    assertFalse(dependenciesContainBlockerRole(deps, Role.PEERSTATUS));
+    assertFalse(dependenciesContainBlockerRole(deps, Role.HCFS_SERVICE_CHECK));
+    assertFalse(dependenciesContainBlockerRole(deps, Role.HCFS_CLIENT));
+
+    // And that some HDFS components are present (section has been loaded)
+    assertTrue(dependenciesContainBlockerRole(deps, Role.DATANODE));
+    // Check that no HA NN dependencies are present
+    assertFalse(dependenciesContainBlockerRole(deps, Role.JOURNALNODE));
+    assertFalse(dependenciesContainBlockedRole(deps, Role.ZKFC));
+  }
+
+
+  /**
+   * Tests building dependencies in an HA-enabled HDFS cluster. Uses the real
+   * dependency mapping file (role_command_order.json).
+   */
+  @Test
+  public void testInitializeAtHaHDFSCluster() throws AmbariException {
+    RoleCommandOrder rco = injector.getInstance(RoleCommandOrder.class);
+    ClusterImpl cluster = createMock(ClusterImpl.class);
+    expect(cluster.getService("HCFS")).andReturn(null);
+
+    Service hdfsService = createMock(Service.class);
+    ServiceComponent journalnodeSC = createMock(ServiceComponent.class);
+
+    expect(cluster.getService("HDFS")).andReturn(hdfsService).atLeastOnce();
+    expect(hdfsService.getServiceComponent("JOURNALNODE")).andReturn(journalnodeSC);
+
+    replay(cluster);
+    replay(hdfsService);
+
+    Map<RoleCommandPair, Set<RoleCommandPair>> deps = rco.getDependencies();
+    assertTrue("Dependencies are empty before initialization", deps.size() == 0);
+    rco.initialize(cluster);
+    assertTrue("Dependencies are loaded after initialization", deps.size() > 0);
+    verify(cluster);
+    verify(hdfsService);
+    // Check that HCFS components are not present in dependencies
+    // Checking blocked roles
+    assertFalse(dependenciesContainBlockedRole(deps, Role.PEERSTATUS));
+    assertFalse(dependenciesContainBlockedRole(deps, Role.HCFS_SERVICE_CHECK));
+    assertFalse(dependenciesContainBlockedRole(deps, Role.HCFS_CLIENT));
+    // Checking blocker roles
+    assertFalse(dependenciesContainBlockerRole(deps, Role.PEERSTATUS));
+    assertFalse(dependenciesContainBlockerRole(deps, Role.HCFS_SERVICE_CHECK));
+    assertFalse(dependenciesContainBlockerRole(deps, Role.HCFS_CLIENT));
+
+    // And that some HDFS components are present (section has been loaded)
+    assertTrue(dependenciesContainBlockerRole(deps, Role.DATANODE));
+    // Check that some HA NN dependencies are present
+    assertTrue(dependenciesContainBlockerRole(deps, Role.JOURNALNODE));
+    assertTrue(dependenciesContainBlockedRole(deps, Role.ZKFC));
+  }
+
+
+  @Test
+  public void testAddDependencies() throws IOException {
+    RoleCommandOrder rco = injector.getInstance(RoleCommandOrder.class);
+
+    InputStream testJsonIS = getClass().getClassLoader().
+            getResourceAsStream(TEST_RCO_DATA_FILE);
+    ObjectMapper mapper = new ObjectMapper();
+    Map<String,Object> testData = mapper.readValue(testJsonIS, Map.class);
+    rco.addDependencies(testData);
+
+    mapper.setVisibility(JsonMethod.ALL, JsonAutoDetect.Visibility.ANY);
+    String dump = mapper.writeValueAsString(rco.getDependencies());
+    String expected = "{\"RoleCommandPair{role=SECONDARY_NAMENODE, " +
+            "cmd=UPGRADE}\":[{\"role\":\"NAMENODE\",\"cmd\":\"UPGRADE\"}]," +
+            "\"RoleCommandPair{role=SECONDARY_NAMENODE, cmd=START}\":" +
+            "[{\"role\":\"NAMENODE\",\"cmd\":\"START\"}]," +
+            "\"RoleCommandPair{role=DATANODE, cmd=STOP}\":" +
+            "[{\"role\":\"HBASE_MASTER\",\"cmd\":\"STOP\"}," +
+            "{\"role\":\"RESOURCEMANAGER\",\"cmd\":\"STOP\"}," +
+            "{\"role\":\"TASKTRACKER\",\"cmd\":\"STOP\"},{\"role\":" +
+            "\"NODEMANAGER\",\"cmd\":\"STOP\"},{\"role\":\"HISTORYSERVER\"," +
+            "\"cmd\":\"STOP\"},{\"role\":\"JOBTRACKER\",\"cmd\":\"STOP\"}]}";
+    assertEquals(expected, dump);
+  }
+
+  private boolean dependenciesContainBlockedRole(Map<RoleCommandPair,
+          Set<RoleCommandPair>> deps, Role blocked) {
+    for (RoleCommandPair blockedPair : deps.keySet()) {
+      if (blockedPair.getRole() == blocked) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean dependenciesContainBlockerRole(Map<RoleCommandPair,
+          Set<RoleCommandPair>> deps, Role blocker) {
+    for(Set<RoleCommandPair> blockerSet: deps.values()) {
+      for (RoleCommandPair roleCommandPair : blockerSet) {
+        if (roleCommandPair.getRole() == blocker) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+}
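
The test changes below all follow one pattern: RoleCommandOrder is no longer created with new, but obtained from the Guice injector (the test module also points Configuration.RCO_FILE_LOCATION_KEY at the test mapping file). A minimal sketch of obtaining and initializing it in a test, using only calls that already appear in the tests in this commit:

    // Sketch: obtaining RoleCommandOrder through Guice instead of "new RoleCommandOrder()".
    Injector injector = Guice.createInjector(new InMemoryDefaultTestModule());
    injector.getInstance(GuiceJpaInitializer.class);   // brings up the in-memory persistence the tests rely on
    RoleCommandOrder rco = injector.getInstance(RoleCommandOrder.class);
    rco.initialize(cluster);                           // cluster: a ClusterImpl, real or mocked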

+ 1 - 1
ambari-server/src/test/java/org/apache/ambari/server/metadata/RoleGraphTest.java

@@ -146,7 +146,7 @@ public class RoleGraphTest {
 
   @Test
   public void testValidateOrder() {
-    RoleCommandOrder rco = new RoleCommandOrder();
+    RoleCommandOrder rco = injector.getInstance(RoleCommandOrder.class);
     ClusterEntity entity = createDummyData();
     ClusterImpl cluster = new ClusterImpl(entity, injector);
     rco.initialize(cluster);

+ 2 - 1
ambari-server/src/test/java/org/apache/ambari/server/orm/InMemoryDefaultTestModule.java

@@ -31,13 +31,14 @@ public class InMemoryDefaultTestModule extends AbstractModule {
   @Override
   protected void configure() {
     properties.setProperty(Configuration.SERVER_PERSISTENCE_TYPE_KEY, "in-memory");
-//    properties.setProperty(Configuration.SERVER_PERSISTENCE_TYPE_KEY, "local");
     properties.setProperty(Configuration.METADETA_DIR_PATH,
         "src/test/resources/stacks");
     properties.setProperty(Configuration.SERVER_VERSION_FILE,
             "target/version");
     properties.setProperty(Configuration.OS_VERSION_KEY,
         "centos5");
+    properties.setProperty(Configuration.RCO_FILE_LOCATION_KEY,
+            Configuration.RCO_FILE_LOCATION_TEST);
     try {
       install(new ControllerModule(properties));
     } catch (Exception e) {

+ 3 - 3
ambari-server/src/test/java/org/apache/ambari/server/stageplanner/TestStagePlanner.java

@@ -150,7 +150,7 @@ public class TestStagePlanner {
 
   @Test
   public void testSingleStagePlan() {
-    RoleCommandOrder rco = new RoleCommandOrder();
+    RoleCommandOrder rco = injector.getInstance(RoleCommandOrder.class);
     ClusterEntity entity = createDummyData();
     ClusterImpl cluster = new ClusterImpl(entity, injector);
     rco.initialize(cluster);
@@ -170,7 +170,7 @@ public class TestStagePlanner {
 
   @Test
   public void testMultiStagePlan() {
-    RoleCommandOrder rco = new RoleCommandOrder();
+    RoleCommandOrder rco = injector.getInstance(RoleCommandOrder.class);
     ClusterEntity entity = createDummyData();
     ClusterImpl cluster = new ClusterImpl(entity, injector);
     rco.initialize(cluster);
@@ -196,7 +196,7 @@ public class TestStagePlanner {
 
   @Test
   public void testManyStages() {
-    RoleCommandOrder rco = new RoleCommandOrder();
+    RoleCommandOrder rco = injector.getInstance(RoleCommandOrder.class);
     ClusterEntity entity = createDummyData();
     ClusterImpl cluster = new ClusterImpl(entity, injector);
     rco.initialize(cluster);

+ 7 - 0
ambari-server/src/test/resources/test_rco_data.json

@@ -0,0 +1,7 @@
+{
+  "_comment": "a comment",
+  "SECONDARY_NAMENODE-START": ["NAMENODE-START"],
+  "DATANODE-STOP": ["JOBTRACKER-STOP", "TASKTRACKER-STOP", "RESOURCEMANAGER-STOP",
+    "NODEMANAGER-STOP", "HISTORYSERVER-STOP", "HBASE_MASTER-STOP"],
+  "SECONDARY_NAMENODE-UPGRADE": ["NAMENODE-UPGRADE"]
+}
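
For reference, this test data uses the same convention as role_command_order.json: each key names the blocked "ROLE-COMMAND" pair, the value lists the "ROLE-COMMAND" pairs that must finish first, and the "_comment" entry is metadata rather than a dependency. A purely hypothetical extra rule, shown only to illustrate the format (it is not part of the commit):

    {
      "_comment": "hypothetical example: OOZIE_SERVER start waits for NAMENODE and DATANODE start",
      "OOZIE_SERVER-START": ["NAMENODE-START", "DATANODE-START"]
    }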