浏览代码

AMBARI-11539. Auto recovery should recover components to INSTALLED if desired state is INSTALLED but the component instance is running (smohanty)

Sumit Mohanty 10 年之前
父节点
当前提交
3c563d3b28

+ 1 - 1
ambari-agent/src/main/python/ambari_agent/ActionQueue.py

@@ -422,7 +422,7 @@ class ActionQueue(threading.Thread):
       else:
       else:
         component_status = LiveStatus.DEAD_STATUS
         component_status = LiveStatus.DEAD_STATUS
         self.controller.recovery_manager.update_current_status(component, component_status)
         self.controller.recovery_manager.update_current_status(component, component_status)
-        request_execution_cmd = self.controller.recovery_manager.requires_recovery(component)
+      request_execution_cmd = self.controller.recovery_manager.requires_recovery(component)
 
 
       if component_status_result.has_key('structuredOut'):
       if component_status_result.has_key('structuredOut'):
         component_extra = component_status_result['structuredOut']
         component_extra = component_status_result['structuredOut']

+ 25 - 2
ambari-agent/src/main/python/ambari_agent/RecoveryManager.py

@@ -194,6 +194,8 @@ class RecoveryManager:
       status = self.statuses[component]
       status = self.statuses[component]
       if status["current"] == status["desired"]:
       if status["current"] == status["desired"]:
         return False
         return False
+      if status["desired"] not in self.allowed_desired_states:
+        return False
     else:
     else:
       status = self.statuses[component]
       status = self.statuses[component]
       if status["current"] == status["desired"] and status['stale_config'] == False:
       if status["current"] == status["desired"] and status['stale_config'] == False:
@@ -279,7 +281,8 @@ class RecoveryManager:
             elif status["desired"] == self.INSTALLED:
             elif status["desired"] == self.INSTALLED:
               if status["current"] == self.INIT:
               if status["current"] == self.INIT:
                 command = self.get_install_command(component)
                 command = self.get_install_command(component)
-              # else issue a STOP command
+              elif status["current"] == self.STARTED:
+                command = self.get_stop_command(component)
           else:
           else:
             if status["current"] == self.INSTALLED:
             if status["current"] == self.INSTALLED:
               command = self.get_install_command(component)
               command = self.get_install_command(component)
@@ -523,7 +526,7 @@ class RecoveryManager:
       for command in commands:
       for command in commands:
         if self.COMMAND_TYPE in command and command[self.COMMAND_TYPE] == ActionQueue.EXECUTION_COMMAND:
         if self.COMMAND_TYPE in command and command[self.COMMAND_TYPE] == ActionQueue.EXECUTION_COMMAND:
           if self.ROLE in command:
           if self.ROLE in command:
-            if command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_INSTALL:
+            if command[self.ROLE_COMMAND] in (ActionQueue.ROLE_COMMAND_INSTALL, ActionQueue.ROLE_COMMAND_STOP):
               self.update_desired_status(command[self.ROLE], LiveStatus.DEAD_STATUS)
               self.update_desired_status(command[self.ROLE], LiveStatus.DEAD_STATUS)
             if command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_START:
             if command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_START:
               self.update_desired_status(command[self.ROLE], LiveStatus.LIVE_STATUS)
               self.update_desired_status(command[self.ROLE], LiveStatus.LIVE_STATUS)
@@ -580,6 +583,26 @@ class RecoveryManager:
     return None
     return None
     pass
     pass
 
 
+  def get_stop_command(self, component):
+    if self.paused:
+      logger.info("Recovery is paused, likely tasks waiting in pipeline for this host.")
+      return None
+
+    if self.enabled():
+      logger.debug("Using stored STOP command for %s", component)
+      if self.command_exists(component, ActionQueue.EXECUTION_COMMAND):
+        command = copy.deepcopy(self.stored_exec_commands[component])
+        command[self.ROLE_COMMAND] = "STOP"
+        command[self.COMMAND_TYPE] = ActionQueue.AUTO_EXECUTION_COMMAND
+        command[self.TASK_ID] = self.get_unique_task_id()
+        return command
+      else:
+        logger.info("STOP command cannot be computed as details are not received from Server.")
+    else:
+      logger.info("Recovery is not enabled. STOP command will not be computed.")
+    return None
+    pass
+
   def get_restart_command(self, component):
   def get_restart_command(self, component):
     if self.paused:
     if self.paused:
       logger.info("Recovery is paused, likely tasks waiting in pipeline for this host.")
       logger.info("Recovery is paused, likely tasks waiting in pipeline for this host.")

+ 8 - 1
ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py

@@ -316,7 +316,8 @@ class TestRecoveryManager(TestCase):
        1200, 1201, 1203,
        1200, 1201, 1203,
        4000, 4001, 4002, 4003,
        4000, 4001, 4002, 4003,
        4100, 4101, 4102, 4103,
        4100, 4101, 4102, 4103,
-       4200, 4201, 4202]
+       4200, 4201, 4202,
+       4300, 4301, 4302]
     rm = RecoveryManager(True)
     rm = RecoveryManager(True)
     rm.update_config(15, 5, 1, 16, True, False)
     rm.update_config(15, 5, 1, 16, True, False)
 
 
@@ -383,6 +384,12 @@ class TestRecoveryManager(TestCase):
     self.assertEqual(1, len(commands))
     self.assertEqual(1, len(commands))
     self.assertEqual("CUSTOM_COMMAND", commands[0]["roleCommand"])
     self.assertEqual("CUSTOM_COMMAND", commands[0]["roleCommand"])
     self.assertEqual("RESTART", commands[0]["hostLevelParams"]["custom_command"])
     self.assertEqual("RESTART", commands[0]["hostLevelParams"]["custom_command"])
+
+    rm.update_current_status("NODEMANAGER", "STARTED")
+    rm.update_desired_status("NODEMANAGER", "INSTALLED")
+    commands = rm.get_recovery_commands()
+    self.assertEqual(1, len(commands))
+    self.assertEqual("STOP", commands[0]["roleCommand"])
     pass
     pass
 
 
   @patch.object(RecoveryManager, "update_config")
   @patch.object(RecoveryManager, "update_config")