Explorar o código

AMBARI-4992. Sometimes cluster installation pauses for few minutes between tasks (dlysnichenko)

Lisnichenko Dmitro hai 11 anos
pai
achega
4e78cb3410

+ 30 - 6
ambari-agent/src/main/python/ambari_agent/ActionQueue.py

@@ -46,6 +46,11 @@ class ActionQueue(threading.Thread):
   # How many actions can be performed in parallel. Feel free to change
   MAX_CONCURRENT_ACTIONS = 5
 
+
+  #How much time(in seconds) we need wait for new incoming execution command before checking
+  #status command queue
+  EXECUTION_COMMAND_WAIT_TIME = 2
+
   STATUS_COMMAND = 'STATUS_COMMAND'
   EXECUTION_COMMAND = 'EXECUTION_COMMAND'
   ROLE_COMMAND_INSTALL = 'INSTALL'
@@ -64,6 +69,7 @@ class ActionQueue(threading.Thread):
   def __init__(self, config, controller):
     super(ActionQueue, self).__init__()
     self.commandQueue = Queue.Queue()
+    self.statusCommandQueue = Queue.Queue()
     self.commandStatuses = CommandStatusDict(callback_action =
       self.status_update_callback)
     self.config = config
@@ -81,6 +87,17 @@ class ActionQueue(threading.Thread):
   def stopped(self):
     return self._stop.isSet()
 
+  def put_status(self, commands):
+    #Was supposed that we got all set of statuses, we don't need to keep old ones
+    self.statusCommandQueue.queue.clear()
+
+    for command in commands:
+      logger.info("Adding " + command['commandType'] + " for service " + \
+                    command['serviceName'] + " of cluster " + \
+                    command['clusterName'] + " to the queue.")
+      logger.debug(pprint.pformat(command))
+      self.statusCommandQueue.put(command)
+
   def put(self, commands):
     for command in commands:
       logger.info("Adding " + command['commandType'] + " for service " + \
@@ -89,14 +106,21 @@ class ActionQueue(threading.Thread):
       logger.debug(pprint.pformat(command))
       self.commandQueue.put(command)
 
-  def empty(self):
-    return self.commandQueue.empty()
-
-
   def run(self):
     while not self.stopped():
-      command = self.commandQueue.get() # Will block if queue is empty
-      self.process_command(command)
+      while  not self.statusCommandQueue.empty():
+        try:
+          command = self.statusCommandQueue.get(False)
+          self.process_command(command)
+        except (Queue.Empty):
+          pass
+      try:
+        command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
+        self.process_command(command)
+      except (Queue.Empty):
+        pass
+
+
 
 
   def process_command(self, command):

+ 10 - 3
ambari-agent/src/main/python/ambari_agent/Controller.py

@@ -107,7 +107,7 @@ class Controller(threading.Thread):
         self.isRegistered = True
         if 'statusCommands' in ret.keys():
           logger.info("Got status commands on registration " + pprint.pformat(ret['statusCommands']) )
-          self.addToQueue(ret['statusCommands'])
+          self.addToStatusQueue(ret['statusCommands'])
           pass
         else:
           self.hasMappedComponents = False
@@ -138,6 +138,13 @@ class Controller(threading.Thread):
       self.actionQueue.put(commands)
     pass
 
+  def addToStatusQueue(self, commands):
+    if not commands:
+      logger.debug("No status commands from the server : " + pprint.pformat(commands))
+    else:
+      self.actionQueue.put_status(commands)
+    pass
+
   # For testing purposes
   DEBUG_HEARTBEAT_RETRIES = 0
   DEBUG_SUCCESSFULL_HEARTBEATS = 0
@@ -190,8 +197,8 @@ class Controller(threading.Thread):
         if 'executionCommands' in response.keys():
           self.addToQueue(response['executionCommands'])
           pass
-        if 'statusCommands' in response.keys() and self.actionQueue.empty():
-          self.addToQueue(response['statusCommands'])
+        if 'statusCommands' in response.keys():
+          self.addToStatusQueue(response['statusCommands'])
           pass
         if "true" == response['restartAgent']:
           logger.error("Got restartAgent command")

+ 27 - 6
ambari-agent/src/test/python/ambari_agent/TestController.py

@@ -78,10 +78,10 @@ class TestController(unittest.TestCase):
     self.assertEqual({"responseId":1}, self.controller.registerWithServer())
 
     self.controller.sendRequest.return_value = '{"responseId":1, "statusCommands": "commands", "log":"", "exitstatus":"0"}'
-    self.controller.addToQueue = MagicMock(name="addToQueue")
+    self.controller.addToStatusQueue = MagicMock(name="addToStatusQueue")
     self.controller.isRegistered = False
     self.assertEqual({'exitstatus': '0', 'responseId': 1, 'log': '', 'statusCommands': 'commands'}, self.controller.registerWithServer())
-    self.controller.addToQueue.assert_called_with("commands")
+    self.controller.addToStatusQueue.assert_called_with("commands")
 
     calls = []
 
@@ -102,7 +102,7 @@ class TestController(unittest.TestCase):
     sys.stdout = sys.__stdout__
 
     self.controller.sendRequest = Controller.Controller.sendRequest
-    self.controller.addToQueue = Controller.Controller.addToQueue
+    self.controller.addToStatusQueue = Controller.Controller.addToStatusQueue
 
 
   @patch("pprint.pformat")
@@ -116,6 +116,17 @@ class TestController(unittest.TestCase):
     self.assertTrue(actionQueue.put.called)
 
 
+  @patch("pprint.pformat")
+  def test_addToStatusQueue(self, pformatMock):
+
+    actionQueue = MagicMock()
+    self.controller.actionQueue = actionQueue
+    self.controller.addToStatusQueue(None)
+    self.assertFalse(actionQueue.put_status.called)
+    self.controller.addToStatusQueue("cmd")
+    self.assertTrue(actionQueue.put_status.called)
+
+
   @patch("urllib2.build_opener")
   @patch("urllib2.install_opener")
   @patch.object(Controller, "ActionQueue")
@@ -369,17 +380,25 @@ class TestController(unittest.TestCase):
 
     restartAgent.assert_called_once_with()
 
-    # executionCommands, statusCommands
+    # executionCommands
     self.controller.responseId = 1
     addToQueue = MagicMock(name="addToQueue")
     self.controller.addToQueue = addToQueue
     response["executionCommands"] = "executionCommands"
+    self.controller.DEBUG_STOP_HEARTBEATING = False
+    self.controller.heartbeatWithServer()
+
+    addToQueue.assert_has_calls([call("executionCommands")])
+
+    # statusCommands
+    self.controller.responseId = 1
+    addToStatusQueue = MagicMock(name="addToStatusQueue")
+    self.controller.addToStatusQueue = addToStatusQueue
     response["statusCommands"] = "statusCommands"
     self.controller.DEBUG_STOP_HEARTBEATING = False
     self.controller.heartbeatWithServer()
 
-    addToQueue.assert_has_calls([call("executionCommands"),
-                                 call("statusCommands")])
+    addToStatusQueue.assert_has_calls([call("statusCommands")])
 
     # restartAgent command
     self.controller.responseId = 1
@@ -404,6 +423,8 @@ class TestController(unittest.TestCase):
     sys.stdout = sys.__stdout__
     self.controller.sendRequest = Controller.Controller.sendRequest
     self.controller.sendRequest = Controller.Controller.addToQueue
+    self.controller.sendRequest = Controller.Controller.addToStatusQueue
+
 
   @patch("pprint.pformat")
   @patch("time.sleep")