Prechádzať zdrojové kódy

AMBARI-10029. Node recovery support - phase 2

Sumit Mohanty 10 rokov pred
rodič
commit
a3a0ae041a

+ 1 - 1
ambari-agent/src/main/python/ambari_agent/ActionQueue.py

@@ -400,7 +400,7 @@ class ActionQueue(threading.Thread):
       if component_status_result.has_key('structuredOut'):
         component_extra = component_status_result['structuredOut']
 
-      result = livestatus.build(forsed_component_status= component_status)
+      result = livestatus.build(forced_component_status= component_status)
       if self.controller.recovery_manager.enabled():
         result['sendExecCmdDet'] = str(request_execution_cmd)
 

+ 4 - 1
ambari-agent/src/main/python/ambari_agent/Controller.py

@@ -233,7 +233,6 @@ class Controller(threading.Thread):
           logger.debug("Sending Heartbeat (id = %s): %s", self.responseId, data)
 
         response = self.sendRequest(self.heartbeatUrl, data)
-
         exitStatus = 0
         if 'exitstatus' in response.keys():
           exitStatus = int(response['exitstatus'])
@@ -248,6 +247,10 @@ class Controller(threading.Thread):
         if 'hasMappedComponents' in response.keys():
           self.hasMappedComponents = response['hasMappedComponents'] is not False
 
+        if 'hasPendingTasks' in response.keys():
+          self.recovery_manager.set_paused(response['hasPendingTasks'])
+
+
         if 'registrationCommand' in response.keys():
           # check if the registration command is None. If none skip
           if response['registrationCommand'] is not None:

+ 4 - 4
ambari-agent/src/main/python/ambari_agent/LiveStatus.py

@@ -51,16 +51,16 @@ class LiveStatus:
     #TODO: Should also check belonging of server to cluster
     return component['serviceName'] == self.service
 
-  def build(self, forsed_component_status = None):
+  def build(self, forced_component_status = None):
     """
-    If forsed_component_status is explicitly defined, than StatusCheck methods are
+    If forced_component_status is explicitly defined, than StatusCheck methods are
     not used. This feature has been added to support custom (ver 2.0) services.
     """
     global SERVICES, CLIENT_COMPONENTS, COMPONENTS, LIVE_STATUS, DEAD_STATUS
 
     component = {"serviceName" : self.service, "componentName" : self.component}
-    if forsed_component_status: # If already determined
-      status = forsed_component_status  # Nothing to do
+    if forced_component_status: # If already determined
+      status = forced_component_status  # Nothing to do
     elif component in self.CLIENT_COMPONENTS:
       status = self.DEAD_STATUS # CLIENT components can't have status STARTED
     elif component in self.COMPONENTS:

+ 141 - 30
ambari-agent/src/main/python/ambari_agent/RecoveryManager.py

@@ -43,6 +43,7 @@ class RecoveryManager:
   ROLE = "role"
   TASK_ID = "taskId"
   DESIRED_STATE = "desiredState"
+  HAS_STALE_CONFIG = "hasStaleConfigs"
   EXECUTION_COMMAND_DETAILS = "executionCommandDetails"
   ROLE_COMMAND = "roleCommand"
   PAYLOAD_LEVEL_DEFAULT = "DEFAULT"
@@ -51,6 +52,8 @@ class RecoveryManager:
   STARTED = "STARTED"
   INSTALLED = "INSTALLED"
   INIT = "INIT"  # TODO: What is the state when machine is reset
+  COMPONENT_UPDATE_KEY_FORMAT = "{0}_UPDATE_TIME"
+  COMMAND_REFRESH_DELAY_SEC = 600 #10 minutes
 
   default_action_counter = {
     "lastAttempt": 0,
@@ -62,6 +65,12 @@ class RecoveryManager:
     "warnedThresholdReached": False
   }
 
+  default_component_status = {
+    "current": "",
+    "desired": "",
+    "stale_config": False
+  }
+
 
   def __init__(self, recovery_enabled=False, auto_start_only=False):
     self.recovery_enabled = recovery_enabled
@@ -78,16 +87,42 @@ class RecoveryManager:
     self.actions = {}
     self.statuses = {}
     self.__status_lock = threading.RLock()
+    self.__command_lock = threading.RLock()
+    self.paused = False
 
     self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only)
 
     pass
 
 
+  def set_paused(self, paused):
+    if self.paused != paused:
+      logger.debug("RecoveryManager is transitioning from isPaused = " + str(self.paused) + " to " + str(paused))
+    self.paused = paused
+
   def enabled(self):
     return self.recovery_enabled
 
 
+  def update_config_staleness(self, component, is_config_stale):
+    """
+    Updates staleness of config
+    """
+    if component not in self.statuses:
+      self.__status_lock.acquire()
+      try:
+        if component not in self.statuses:
+          self.statuses[component] = copy.deepcopy(self.default_component_status)
+      finally:
+        self.__status_lock.release()
+      pass
+
+    self.statuses[component]["stale_config"] = is_config_stale
+    if self.statuses[component]["current"] == self.statuses[component]["desired"] and \
+            self.statuses[component]["stale_config"] == False:
+      self.remove_command(component)
+    pass
+
   def update_current_status(self, component, state):
     """
     Updates the current status of a host component managed by the agent
@@ -96,15 +131,15 @@ class RecoveryManager:
       self.__status_lock.acquire()
       try:
         if component not in self.statuses:
-          self.statuses[component] = {
-            "current": "",
-            "desired": ""
-          }
+          self.statuses[component] = copy.deepcopy(self.default_component_status)
       finally:
         self.__status_lock.release()
       pass
 
     self.statuses[component]["current"] = state
+    if self.statuses[component]["current"] == self.statuses[component]["desired"] and \
+            self.statuses[component]["stale_config"] == False:
+      self.remove_command(component)
     pass
 
 
@@ -116,15 +151,15 @@ class RecoveryManager:
       self.__status_lock.acquire()
       try:
         if component not in self.statuses:
-          self.statuses[component] = {
-            "current": "",
-            "desired": ""
-          }
+          self.statuses[component] = copy.deepcopy(self.default_component_status)
       finally:
         self.__status_lock.release()
       pass
 
     self.statuses[component]["desired"] = state
+    if self.statuses[component]["current"] == self.statuses[component]["desired"] and \
+            self.statuses[component]["stale_config"] == False:
+      self.remove_command(component)
     pass
 
 
@@ -133,7 +168,7 @@ class RecoveryManager:
     Recovery is allowed for:
     INISTALLED --> STARTED
     INIT --> INSTALLED --> STARTED
-    CLIENTs may be RE-INSTALLED (TODO)
+    RE-INSTALLED (if configs do not match)
     """
     if not self.enabled():
       return False
@@ -141,17 +176,18 @@ class RecoveryManager:
     if component not in self.statuses:
       return False
 
-    status = self.statuses[component]
-    if status["current"] == status["desired"]:
-      return False
+    if self.auto_start_only:
+      status = self.statuses[component]
+      if status["current"] == status["desired"]:
+        return False
+    else:
+      status = self.statuses[component]
+      if status["current"] == status["desired"] and status['stale_config'] == False:
+        return False
 
     if status["desired"] not in self.allowed_desired_states or status["current"] not in self.allowed_current_states:
       return False
 
-    ### No recovery to INSTALLED or INIT states
-    if status["current"] == self.STARTED:
-      return False
-
     logger.info("%s needs recovery.", component)
     return True
     pass
@@ -213,14 +249,29 @@ class RecoveryManager:
     for component in self.statuses.keys():
       if self.requires_recovery(component) and self.may_execute(component):
         status = copy.deepcopy(self.statuses[component])
-        if status["desired"] == self.STARTED:
-          if status["current"] == self.INSTALLED:
-            command = self.get_start_command(component)
-          elif status["current"] == self.INIT:
-            command = self.get_install_command(component)
-        elif status["desired"] == self.INSTALLED:
-          if status["current"] == self.INIT:
-            command = self.get_install_command(component)
+        command = None
+        if self.auto_start_only:
+          if status["desired"] == self.STARTED:
+            if status["current"] == self.INSTALLED:
+              command = self.get_start_command(component)
+        else:
+          # START, INSTALL, RESTART
+          if status["desired"] != status["current"]:
+            if status["desired"] == self.STARTED:
+              if status["current"] == self.INSTALLED:
+                command = self.get_start_command(component)
+              elif status["current"] == self.INIT:
+                command = self.get_install_command(component)
+            elif status["desired"] == self.INSTALLED:
+              if status["current"] == self.INIT:
+                command = self.get_install_command(component)
+              # else issue a STOP command
+          else:
+            if status["current"] == self.INSTALLED:
+              command = self.get_install_command(component)
+            elif status["current"] == self.STARTED:
+              command = self.get_restart_command(component)
+
         if command:
           self.execute(component)
           commands.append(command)
@@ -417,7 +468,7 @@ class RecoveryManager:
     self.max_lifetime_count = max_lifetime_count
 
     self.allowed_desired_states = [self.STARTED, self.INSTALLED]
-    self.allowed_current_states = [self.INIT, self.INSTALLED]
+    self.allowed_current_states = [self.INIT, self.INSTALLED, self.STARTED]
 
     if self.auto_start_only:
       self.allowed_desired_states = [self.STARTED]
@@ -481,12 +532,13 @@ class RecoveryManager:
 
       component = command[self.COMPONENT_NAME]
       self.update_desired_status(component, command[self.DESIRED_STATE])
+      self.update_config_staleness(component, command[self.HAS_STALE_CONFIG])
 
       if payloadLevel == self.PAYLOAD_LEVEL_EXECUTION_COMMAND:
         if self.EXECUTION_COMMAND_DETAILS in command:
           # Store the execution command details
           self.remove_command(component)
-          self.stored_exec_commands[component] = command[self.EXECUTION_COMMAND_DETAILS]
+          self.add_command(component, command[self.EXECUTION_COMMAND_DETAILS])
           logger.debug("Stored command details for " + component)
         else:
           logger.warn("Expected field " + self.EXECUTION_COMMAND_DETAILS + " unavailable.")
@@ -495,6 +547,10 @@ class RecoveryManager:
 
 
   def get_install_command(self, component):
+    if self.paused:
+      logger.info("Recovery is paused, likely tasks waiting in pipeline for this host.")
+      return None
+
     if self.enabled():
       logger.debug("Using stored INSTALL command for %s", component)
       if self.command_exists(component, ActionQueue.EXECUTION_COMMAND):
@@ -504,14 +560,39 @@ class RecoveryManager:
         command[self.TASK_ID] = self.get_unique_task_id()
         return command
       else:
-        logger.info("INSTALL command cannot be computed.")
+        logger.info("INSTALL command cannot be computed as details are not received from Server.")
     else:
       logger.info("Recovery is not enabled. INSTALL command will not be computed.")
     return None
     pass
 
+  def get_restart_command(self, component):
+    if self.paused:
+      logger.info("Recovery is paused, likely tasks waiting in pipeline for this host.")
+      return None
+
+    if self.enabled():
+      logger.debug("Using stored INSTALL command for %s", component)
+      if self.command_exists(component, ActionQueue.EXECUTION_COMMAND):
+        command = copy.deepcopy(self.stored_exec_commands[component])
+        command[self.ROLE_COMMAND] = "CUSTOM_COMMAND"
+        command[self.COMMAND_TYPE] = ActionQueue.AUTO_EXECUTION_COMMAND
+        command[self.TASK_ID] = self.get_unique_task_id()
+        command['hostLevelParams']['custom_command'] = 'RESTART'
+        return command
+      else:
+        logger.info("RESTART command cannot be computed as details are not received from Server.")
+    else:
+      logger.info("Recovery is not enabled. RESTART command will not be computed.")
+    return None
+    pass
+
 
   def get_start_command(self, component):
+    if self.paused:
+      logger.info("Recovery is paused, likely tasks waiting in pipeline for this host.")
+      return None
+
     if self.enabled():
       logger.debug("Using stored START command for %s", component)
       if self.command_exists(component, ActionQueue.EXECUTION_COMMAND):
@@ -521,7 +602,7 @@ class RecoveryManager:
         command[self.TASK_ID] = self.get_unique_task_id()
         return command
       else:
-        logger.info("START command cannot be computed.")
+        logger.info("START command cannot be computed as details are not received from Server.")
     else:
       logger.info("Recovery is not enabled. START command will not be computed.")
 
@@ -531,6 +612,7 @@ class RecoveryManager:
 
   def command_exists(self, component, command_type):
     if command_type == ActionQueue.EXECUTION_COMMAND:
+      self.remove_stale_command(component)
       if component in self.stored_exec_commands:
         return True
 
@@ -538,13 +620,42 @@ class RecoveryManager:
     pass
 
 
+  def remove_stale_command(self, component):
+    component_update_key = self.COMPONENT_UPDATE_KEY_FORMAT.format(component)
+    if component in self.stored_exec_commands:
+      insert_time = self.stored_exec_commands[component_update_key]
+      age = self._now_() - insert_time
+      if self.COMMAND_REFRESH_DELAY_SEC < age:
+        logger.debug("Removing stored command for component : " + str(component) + " as its " + str(age) + " sec old")
+        self.remove_command(component)
+    pass
+
+
   def remove_command(self, component):
     if component in self.stored_exec_commands:
-      del self.stored_exec_commands[component]
-      return True
+      self.__status_lock.acquire()
+      try:
+        component_update_key = self.COMPONENT_UPDATE_KEY_FORMAT.format(component)
+        del self.stored_exec_commands[component]
+        del self.stored_exec_commands[component_update_key]
+        logger.debug("Removed stored command for component : " + str(component))
+        return True
+      finally:
+        self.__status_lock.release()
     return False
 
 
+  def add_command(self, component, command):
+    self.__status_lock.acquire()
+    try:
+      component_update_key = self.COMPONENT_UPDATE_KEY_FORMAT.format(component)
+      self.stored_exec_commands[component] = command
+      self.stored_exec_commands[component_update_key] = self._now_()
+      logger.debug("Added command for component : " + str(component))
+    finally:
+      self.__status_lock.release()
+
+
   def _read_int_(self, value, default_value=0):
     int_value = default_value
     try:

+ 1 - 1
ambari-agent/src/test/python/ambari_agent/TestActionQueue.py

@@ -678,9 +678,9 @@ class TestActionQueue(TestCase):
     gpeo_mock.return_value = 1
     config.get_parallel_exec_option = gpeo_mock
     actionQueue = ActionQueue(config, dummy_controller)
-    actionQueue.start()
     actionQueue.put([self.datanode_install_command, self.hbase_install_command])
     self.assertEqual(2, actionQueue.commandQueue.qsize())
+    actionQueue.start()
     time.sleep(1)
     actionQueue.stop()
     actionQueue.join()

+ 9 - 1
ambari-agent/src/test/python/ambari_agent/TestController.py

@@ -675,9 +675,16 @@ class TestController(unittest.TestCase):
     self.controller.recovery_manager.process_execution_commands = process_execution_commands
     process_status_commands = MagicMock(name="process_status_commands")
     self.controller.recovery_manager.process_status_commands = process_status_commands
+    set_paused = MagicMock(name = "set_paused")
+    self.controller.recovery_manager.set_paused = set_paused
 
     self.controller.responseId = 0
-    response = {"responseId":1, "statusCommands": "commands2", "executionCommands" : "commands1", "log":"", "exitstatus":"0"}
+    response = {"responseId":1,
+                "statusCommands": "commands2",
+                "executionCommands" : "commands1",
+                "log":"",
+                "exitstatus":"0",
+                "hasPendingTasks": True}
     sendRequest.return_value = response
 
     def one_heartbeat(*args, **kwargs):
@@ -697,6 +704,7 @@ class TestController(unittest.TestCase):
     self.assertTrue(process_status_commands.called)
     process_execution_commands.assert_called_with("commands1")
     process_status_commands.assert_called_with("commands2")
+    set_paused.assert_called_with(True)
 
     self.controller.heartbeatWithServer()
     sys.stdout = sys.__stdout__

+ 5 - 5
ambari-agent/src/test/python/ambari_agent/TestLiveStatus.py

@@ -165,20 +165,20 @@ class TestLiveStatus(TestCase):
     result = livestatus.build()
     self.assertTrue(len(result) > 0, 'Livestatus should not be empty')
     self.assertTrue(result.has_key('configurationTags'))
-    # Test build status with forsed_component_status
+    # Test build status with forced_component_status
     ## Alive
     livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config, {})
-    result = livestatus.build(forsed_component_status = LiveStatus.LIVE_STATUS)
+    result = livestatus.build(forced_component_status = LiveStatus.LIVE_STATUS)
     self.assertTrue(len(result) > 0, 'Livestatus should not be empty')
     self.assertTrue(result['status'], LiveStatus.LIVE_STATUS)
     ## Dead
     livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config, {})
-    result = livestatus.build(forsed_component_status = LiveStatus.DEAD_STATUS)
+    result = livestatus.build(forced_component_status = LiveStatus.DEAD_STATUS)
     self.assertTrue(len(result) > 0, 'Livestatus should not be empty')
     self.assertTrue(result['status'], LiveStatus.DEAD_STATUS)
 
     livestatus = LiveStatus('c1', 'TEZ', 'TEZ_CLIENT', { }, config, {})
-    result = livestatus.build(forsed_component_status = LiveStatus.LIVE_STATUS)
+    result = livestatus.build(forced_component_status = LiveStatus.LIVE_STATUS)
     self.assertTrue(len(result) > 0, 'Livestatus should not be empty')
     self.assertTrue(result['status'], LiveStatus.LIVE_STATUS)
 
@@ -197,7 +197,7 @@ class TestLiveStatus(TestCase):
                             'SOME_UNKNOWN_COMPONENT', {}, config, {})
     livestatus.versionsHandler.versionsFilePath = "ambari_agent" + \
                       os.sep + "dummy_files" + os.sep + "dummy_current_stack"
-    result = livestatus.build(forsed_component_status = "STARTED")
+    result = livestatus.build(forced_component_status = "STARTED")
     result_str = pprint.pformat(result)
     self.assertEqual(result_str,
                      "{'clusterName': '',\n "

+ 83 - 3
ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py

@@ -31,10 +31,13 @@ class TestRecoveryManager(TestCase):
     "payloadLevel": "EXECUTION_COMMAND",
     "componentName": "NODEMANAGER",
     "desiredState": "STARTED",
+    "hasStaleConfigs": False,
     "executionCommandDetails": {
       "commandType": "EXECUTION_COMMAND",
       "roleCommand": "INSTALL",
       "role": "NODEMANAGER",
+      "hostLevelParams": {
+        "custom_command":""},
       "configurations": {
         "capacity-scheduler": {
           "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
@@ -227,7 +230,7 @@ class TestRecoveryManager(TestCase):
 
     rm.update_current_status("NODEMANAGER", "STARTED")
     rm.update_desired_status("NODEMANAGER", "INSTALLED")
-    self.assertFalse(rm.requires_recovery("NODEMANAGER"))
+    self.assertTrue(rm.requires_recovery("NODEMANAGER"))
 
     rm.update_desired_status("NODEMANAGER", "STARTED")
     self.assertFalse(rm.requires_recovery("NODEMANAGER"))
@@ -296,14 +299,26 @@ class TestRecoveryManager(TestCase):
     self.assertEqual(None, rm.get_install_command("component2"))
     self.assertEqual(None, rm.get_start_command("component2"))
 
+    rm.store_or_update_command(command1)
+    self.assertTrue(rm.command_exists("NODEMANAGER", "EXECUTION_COMMAND"))
+    rm.set_paused(True)
+
+    self.assertEqual(None, rm.get_install_command("NODEMANAGER"))
+    self.assertEqual(None, rm.get_start_command("NODEMANAGER"))
+
     pass
 
   @patch.object(RecoveryManager, "_now_")
   def test_get_recovery_commands(self, time_mock):
     time_mock.side_effect = \
-      [1000, 2000, 3000, 4000, 5000, 6000]
+      [1000, 1001, 1002, 1003,
+       1100, 1101, 1102,
+       1200, 1201, 1203,
+       4000, 4001, 4002, 4003,
+       4100, 4101, 4102, 4103,
+       4200, 4201, 4202]
     rm = RecoveryManager(True)
-    rm.update_config(10, 5, 1, 11, True, False)
+    rm.update_config(15, 5, 1, 16, True, False)
 
     command1 = copy.deepcopy(self.command)
 
@@ -319,6 +334,7 @@ class TestRecoveryManager(TestCase):
     rm.update_current_status("NODEMANAGER", "INIT")
     rm.update_desired_status("NODEMANAGER", "STARTED")
 
+    # Starts at 1100
     commands = rm.get_recovery_commands()
     self.assertEqual(1, len(commands))
     self.assertEqual("INSTALL", commands[0]["roleCommand"])
@@ -326,6 +342,7 @@ class TestRecoveryManager(TestCase):
     rm.update_current_status("NODEMANAGER", "INIT")
     rm.update_desired_status("NODEMANAGER", "INSTALLED")
 
+    # Starts at 1200
     commands = rm.get_recovery_commands()
     self.assertEqual(1, len(commands))
     self.assertEqual("INSTALL", commands[0]["roleCommand"])
@@ -336,6 +353,36 @@ class TestRecoveryManager(TestCase):
 
     commands = rm.get_recovery_commands()
     self.assertEqual(0, len(commands))
+
+    rm.update_config(12, 5, 1, 15, True, False)
+    rm.update_current_status("NODEMANAGER", "INIT")
+    rm.update_desired_status("NODEMANAGER", "INSTALLED")
+
+    rm.store_or_update_command(command1)
+    commands = rm.get_recovery_commands()
+    self.assertEqual(1, len(commands))
+    self.assertEqual("INSTALL", commands[0]["roleCommand"])
+
+    rm.update_config_staleness("NODEMANAGER", False)
+    rm.update_current_status("NODEMANAGER", "INSTALLED")
+    rm.update_desired_status("NODEMANAGER", "INSTALLED")
+    commands = rm.get_recovery_commands()
+    self.assertEqual(0, len(commands))
+
+    command_install = copy.deepcopy(self.command)
+    command_install["desiredState"] = "INSTALLED"
+    rm.store_or_update_command(command_install)
+    rm.update_config_staleness("NODEMANAGER", True)
+    commands = rm.get_recovery_commands()
+    self.assertEqual(1, len(commands))
+    self.assertEqual("INSTALL", commands[0]["roleCommand"])
+
+    rm.update_current_status("NODEMANAGER", "STARTED")
+    rm.update_desired_status("NODEMANAGER", "STARTED")
+    commands = rm.get_recovery_commands()
+    self.assertEqual(1, len(commands))
+    self.assertEqual("CUSTOM_COMMAND", commands[0]["roleCommand"])
+    self.assertEqual("RESTART", commands[0]["hostLevelParams"]["custom_command"])
     pass
 
   @patch.object(RecoveryManager, "update_config")
@@ -427,4 +474,37 @@ class TestRecoveryManager(TestCase):
                                  {"name": "LION", "numAttempts": 4, "limitReached": True},
                                  {"name": "PUMA", "numAttempts": 4, "limitReached": True}
                                ]})
+    pass
+
+  @patch.object(RecoveryManager, "_now_")
+  def test_command_expiry(self, time_mock):
+    time_mock.side_effect = \
+      [1000, 1001, 1002, 1003, 1104, 1105, 1106, 1807, 1808, 1809, 1810, 1811, 1812]
+
+    rm = RecoveryManager(True)
+    rm.update_config(5, 5, 1, 11, True, False)
+
+    command1 = copy.deepcopy(self.command)
+
+    rm.store_or_update_command(command1)
+
+    rm.update_current_status("NODEMANAGER", "INSTALLED")
+    rm.update_desired_status("NODEMANAGER", "STARTED")
+
+    commands = rm.get_recovery_commands()
+    self.assertEqual(1, len(commands))
+    self.assertEqual("START", commands[0]["roleCommand"])
+
+    commands = rm.get_recovery_commands()
+    self.assertEqual(1, len(commands))
+    self.assertEqual("START", commands[0]["roleCommand"])
+
+    #1807 command is stale
+    commands = rm.get_recovery_commands()
+    self.assertEqual(0, len(commands))
+
+    rm.store_or_update_command(command1)
+    commands = rm.get_recovery_commands()
+    self.assertEqual(1, len(commands))
+    self.assertEqual("START", commands[0]["roleCommand"])
     pass

+ 21 - 0
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java

@@ -222,6 +222,7 @@ class ActionScheduler implements Runnable {
           LOG.debug("There are no stages currently in progress.");
         }
 
+        actionQueue.updateListOfHostsWithPendingTask(null);
         return;
       }
 
@@ -238,12 +239,17 @@ class ActionScheduler implements Runnable {
           LOG.debug("There are no stages currently in progress.");
         }
 
+        actionQueue.updateListOfHostsWithPendingTask(null);
         return;
       }
 
       int i_stage = 0;
 
+      HashSet<String> hostsWithTasks = getListOfHostsWithPendingTask(stages);
+      actionQueue.updateListOfHostsWithPendingTask(hostsWithTasks);
+
       stages = filterParallelPerHostStages(stages);
+      // At this point the stages is a filtered list
 
       boolean exclusiveRequestIsGoing = false;
       // This loop greatly depends on the fact that order of stages in
@@ -400,6 +406,21 @@ class ActionScheduler implements Runnable {
     }
   }
 
+  /**
+   * Returns the list of hosts that have a task assigned
+   *
+   * @param stages
+   *
+   * @return
+   */
+  private HashSet<String> getListOfHostsWithPendingTask(List<Stage> stages) {
+    HashSet<String> hostsWithTasks = new HashSet<String>();
+    for (Stage s : stages) {
+      hostsWithTasks.addAll(s.getHosts());
+    }
+    return hostsWithTasks;
+  }
+
   /**
    * Returns filtered list of stages following the rule:
    * 1) remove stages that has the same host. Leave only first stage, the rest that have same host of any operation will be filtered

+ 32 - 0
ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java

@@ -19,6 +19,7 @@ package org.apache.ambari.server.agent;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
@@ -37,8 +38,12 @@ public class ActionQueue {
 
   private static Logger LOG = LoggerFactory.getLogger(ActionQueue.class);
 
+  private static HashSet<String> EMPTY_HOST_LIST = new HashSet<String>();
+
   final ConcurrentMap<String, Queue<AgentCommand>> hostQueues;
 
+  HashSet<String> hostsWithPendingTask = new HashSet<String>();
+
   public ActionQueue() {
     hostQueues = new ConcurrentHashMap<String, Queue<AgentCommand>>();
   }
@@ -174,4 +179,31 @@ public class ActionQueue {
 
     return l;
   }
+
+  /**
+   * Update the cache of hosts that have pending tasks
+   *
+   * @param hosts
+   */
+  public void updateListOfHostsWithPendingTask(HashSet<String> hosts) {
+    if (hosts != null) {
+      hostsWithPendingTask = hosts;
+    } else if (hostsWithPendingTask.size() > 0) {
+      hostsWithPendingTask = EMPTY_HOST_LIST;
+    }
+  }
+
+  /**
+   * Checks whether host has pending tasks
+   * @param hostName
+   * @return
+   */
+  public boolean hasPendingTask(String hostName) {
+    HashSet<String> copyHostsWithTaskPending = hostsWithPendingTask;
+    if (copyHostsWithTaskPending != null) {
+      return copyHostsWithTaskPending.contains(hostName);
+    }
+
+    return false;
+  }
 }

+ 6 - 0
ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java

@@ -981,6 +981,7 @@ public class HeartBeatHandler {
   /**
    * Annotate the response with some housekeeping details.
    * hasMappedComponents - indicates if any components are mapped to the host
+   * hasPendingTasks - indicates if any tasks are pending for the host (they may not be sent yet)
    * @param hostname
    * @param response
    * @throws org.apache.ambari.server.AmbariException
@@ -993,6 +994,11 @@ public class HeartBeatHandler {
         break;
       }
     }
+
+    if(actionQueue.hasPendingTask(hostname)) {
+      LOG.debug("Host " + hostname + " has pending tasks");
+      response.setHasPendingTasks(true);
+    }
   }
 
   /**

+ 11 - 0
ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatResponse.java

@@ -66,6 +66,9 @@ public class HeartBeatResponse {
   @SerializedName("hasMappedComponents")
   private boolean hasMappedComponents = false;
 
+  @SerializedName("hasPendingTasks")
+  private boolean hasPendingTasks = false;
+
   public long getResponseId() {
     return responseId;
   }
@@ -144,6 +147,14 @@ public class HeartBeatResponse {
     this.hasMappedComponents = hasMappedComponents;
   }
 
+  public boolean hasPendingTasks() {
+    return hasPendingTasks;
+  }
+
+  public void setHasPendingTasks(boolean hasPendingTasks) {
+    this.hasPendingTasks = hasPendingTasks;
+  }
+
   public void addExecutionCommand(ExecutionCommand execCmd) {
     executionCommands.add(execCmd);
   }

+ 1 - 0
ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java

@@ -304,6 +304,7 @@ public class HeartbeatMonitor implements Runnable {
 
     // If Agent wants the command and the States differ
     statusCmd.setDesiredState(sch.getDesiredState());
+    statusCmd.setHasStaleConfigs(configHelper.isStaleConfigs(sch));
     if (getAgentRequests().shouldSendExecutionDetails(hostname, componentName)) {
       LOG.info(componentName + " is at " + sch.getState() + " adding more payload per agent ask");
       statusCmd.setPayloadLevel(StatusCommand.StatusCommandPayload.EXECUTION_COMMAND);

+ 11 - 0
ambari-server/src/main/java/org/apache/ambari/server/agent/StatusCommand.java

@@ -62,6 +62,9 @@ public class StatusCommand extends AgentCommand {
   @SerializedName("desiredState")
   private State desiredState;
 
+  @SerializedName("hasStaleConfigs")
+  private Boolean hasStaleConfigs;
+
   @SerializedName("executionCommandDetails")
   private ExecutionCommand executionCommand;
 
@@ -81,6 +84,14 @@ public class StatusCommand extends AgentCommand {
     this.desiredState = desiredState;
   }
 
+  public Boolean getHasStaleConfigs() {
+    return hasStaleConfigs;
+  }
+
+  public void setHasStaleConfigs(Boolean hasStaleConfigs) {
+    this.hasStaleConfigs = hasStaleConfigs;
+  }
+
   public StatusCommandPayload getPayloadLevel() {
     return payloadLevel;
   }

+ 115 - 3
ambari-server/src/test/java/org/apache/ambari/server/agent/TestActionQueue.java

@@ -18,9 +18,11 @@
 package org.apache.ambari.server.agent;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 
 import org.apache.ambari.server.agent.AgentCommand.AgentCommandType;
@@ -39,7 +41,9 @@ public class TestActionQueue {
     enum OpType {
       ENQUEUE,
       DEQUEUE,
-      DEQUEUEALL
+      DEQUEUEALL,
+      CHECKPENDING,
+      UPDATEHOSTLIST
     }
 
     private volatile boolean shouldRun = true;
@@ -76,8 +80,12 @@ public class TestActionQueue {
         case DEQUEUE:
           dequeueOp();
           break;
-        case DEQUEUEALL:
-          dequeueAllOp();
+          case DEQUEUEALL:
+            dequeueAllOp();
+          case CHECKPENDING:
+            checkPending();
+          case UPDATEHOSTLIST:
+            updateHostList();
           break;
         }
       } catch (Exception ex) {
@@ -86,6 +94,34 @@ public class TestActionQueue {
       }
     }
 
+    private void checkPending() throws InterruptedException {
+      while (shouldRun) {
+        int index = 0;
+        for (String host: hosts) {
+          actionQueue.hasPendingTask(host);
+          opCounts[index]++;
+          index++;
+        }
+        Thread.sleep(1);
+      }
+    }
+
+    private void updateHostList() throws InterruptedException {
+      HashSet<String> hostsWithTasks = new HashSet<String>();
+      while (shouldRun) {
+        for (String host: hosts) {
+          hostsWithTasks.add(host);
+          if (hostsWithTasks.size() % 2 == 0) {
+            actionQueue.updateListOfHostsWithPendingTask(hostsWithTasks);
+          } else {
+            actionQueue.updateListOfHostsWithPendingTask(null);
+          }
+          opCounts[0]++;
+        }
+        Thread.sleep(1);
+      }
+    }
+
     private void enqueueOp() throws InterruptedException {
       while (shouldRun) {
         int index = 0;
@@ -215,6 +251,82 @@ public class TestActionQueue {
     }
   }
 
+  @Test
+  public void testConcurrentHostCheck() throws InterruptedException {
+    ActionQueue aq = new ActionQueue();
+    String[] hosts = new String[] { "h0", "h1", "h2", "h3", "h4" };
+
+    ActionQueueOperation[] hostCheckers = new ActionQueueOperation[threadCount];
+    ActionQueueOperation[] hostUpdaters = new ActionQueueOperation[threadCount];
+
+    List<Thread> producers = new ArrayList<Thread>();
+    List<Thread> consumers = new ArrayList<Thread>();
+
+    for (int i = 0; i < threadCount; i++) {
+      hostCheckers[i] = new ActionQueueOperation(aq, hosts,
+                                                   ActionQueueOperation.OpType.CHECKPENDING);
+      Thread t = new Thread(hostCheckers[i]);
+      consumers.add(t);
+      t.start();
+    }
+
+    for (int i = 0; i < threadCount; i++) {
+      hostUpdaters[i] = new ActionQueueOperation(aq, hosts,
+                                                   ActionQueueOperation.OpType.UPDATEHOSTLIST);
+      Thread t = new Thread(hostUpdaters[i]);
+      producers.add(t);
+      t.start();
+    }
+
+    // Run for some time
+    Thread.sleep(100);
+
+    for (int i = 0; i < threadCount; i++) {
+      hostUpdaters[i].stop();
+    }
+
+    for (Thread producer : producers) {
+      producer.join();
+    }
+
+    for (int i = 0; i < threadCount; i++) {
+      hostCheckers[i].stop();
+    }
+
+    for (Thread consumer : consumers) {
+      consumer.join();
+    }
+
+    int totalChecks = 0;
+    int totalUpdates = 0;
+    for (int i = 0; i < threadCount; i++) {
+      totalChecks += hostUpdaters[i].getOpCounts()[0];
+      for (int h = 0; h<hosts.length; h++) {
+        totalUpdates += hostCheckers[i].getOpCounts()[h];
+      }
+    }
+    LOG.info("Report: totalChecks: " + totalChecks + ", totalUpdates: " + totalUpdates);
+
+    HashSet<String> hostsWithPendingtasks = new HashSet<String>();
+    aq.updateListOfHostsWithPendingTask(hostsWithPendingtasks);
+    hostsWithPendingtasks.add("h1");
+    aq.updateListOfHostsWithPendingTask(hostsWithPendingtasks);
+    assertTrue(aq.hasPendingTask("h1"));
+    assertFalse(aq.hasPendingTask("h2"));
+
+    hostsWithPendingtasks.add("h1");
+    hostsWithPendingtasks.add("h2");
+    aq.updateListOfHostsWithPendingTask(hostsWithPendingtasks);
+    assertTrue(aq.hasPendingTask("h1"));
+    assertTrue(aq.hasPendingTask("h2"));
+
+    hostsWithPendingtasks.clear();
+    hostsWithPendingtasks.add("h2");
+    aq.updateListOfHostsWithPendingTask(hostsWithPendingtasks);
+    assertFalse(aq.hasPendingTask("h1"));
+    assertTrue(aq.hasPendingTask("h2"));
+  }
+
   /**
    * @throws Exception
    */

+ 9 - 1
ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatMonitor.java

@@ -245,6 +245,12 @@ public class TestHeartbeatMonitor {
     hdfs.getServiceComponent(Role.HDFS_CLIENT.name()).getServiceComponentHost(hostname1).setState(State.INSTALLED);
     hdfs.getServiceComponent(Role.HDFS_CLIENT.name()).getServiceComponentHost(hostname2).setState(State.INSTALLED);
 
+    hdfs.getServiceComponent(Role.DATANODE.name()).getServiceComponentHost(hostname1).setDesiredState(State.INSTALLED);
+    hdfs.getServiceComponent(Role.NAMENODE.name()).getServiceComponentHost(hostname1).setDesiredState(State.INSTALLED);
+    hdfs.getServiceComponent(Role.SECONDARY_NAMENODE.name()).getServiceComponentHost(hostname1).setDesiredState(State.INSTALLED);
+    hdfs.getServiceComponent(Role.HDFS_CLIENT.name()).getServiceComponentHost(hostname1).setDesiredState(State.INSTALLED);
+    hdfs.getServiceComponent(Role.HDFS_CLIENT.name()).getServiceComponentHost(hostname2).setDesiredState(State.INSTALLED);
+
     ActionQueue aq = new ActionQueue();
     ActionManager am = mock(ActionManager.class);
     HeartbeatMonitor hm = new HeartbeatMonitor(clusters, aq, am,
@@ -283,8 +289,10 @@ public class TestHeartbeatMonitor {
       containsSECONDARY_NAMENODEStatus |= cmd.getComponentName().
         equals("SECONDARY_NAMENODE");
       containsHDFS_CLIENTStatus |= cmd.getComponentName().equals
-        ("HDFS_CLIENT");
+          ("HDFS_CLIENT");
       assertTrue(cmd.getConfigurations().size() > 0);
+      assertEquals(State.INSTALLED, cmd.getDesiredState());
+      assertEquals(false, cmd.getHasStaleConfigs());
     }
     assertTrue(containsDATANODEStatus);
     assertTrue(containsNAMENODEStatus);