Browse Source

Branch 2.7 cherry picked commits (#2396)

* [AMBARI-24653] YARN Timeline server v2 related system tests fail. (#2367)

* [AMBARI-24653] YARN Timeline server v2 related system tests fail.

* [AMBARI-24653] YARN Timeline server v2 related system tests fail.

* AMBARI-24661. While registering agent can miss updates from server (aonishuk)

* [AMBARI-24679] Fix race condition in agent during registration and topology updates. (#2368)
avijayanhwx 7 years ago
parent
commit
0e7745639e

+ 15 - 8
ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py

@@ -134,15 +134,22 @@ class HeartbeatThread(threading.Thread):
     self.handle_registration_response(response)
 
     for endpoint, cache, listener, subscribe_to in self.post_registration_requests:
-      # should not hang forever on these requests
-      response = self.blocking_request({'hash': cache.hash}, endpoint, log_handler=listener.get_log_message)
       try:
-        listener.on_event({}, response)
-      except:
-        logger.exception("Exception while handing response to request at {0}. {1}".format(endpoint, response))
-        raise
-
-      self.subscribe_to_topics([subscribe_to])
+        listener.enabled = False
+        self.subscribe_to_topics([subscribe_to])
+        response = self.blocking_request({'hash': cache.hash}, endpoint, log_handler=listener.get_log_message)
+
+        try:
+          listener.on_event({}, response)
+        except:
+          logger.exception("Exception while handing response to request at {0}. {1}".format(endpoint, response))
+          raise
+      finally:
+        with listener.event_queue_lock:
+          logger.info("Enabling events for listener {0}".format(listener))
+          listener.enabled = True
+          # Process queued messages if any
+          listener.dequeue_unprocessed_events()
 
     self.subscribe_to_topics(Constants.POST_REGISTRATION_TOPICS_TO_SUBSCRIBE)
 

+ 39 - 1
ambari-agent/src/main/python/ambari_agent/listeners/__init__.py

@@ -25,15 +25,40 @@ import copy
 from ambari_stomp.adapter.websocket import ConnectionIsAlreadyClosed
 from ambari_agent import Constants
 from ambari_agent.Utils import Utils
+from Queue import Queue
+import threading
 
 logger = logging.getLogger(__name__)
 
 class EventListener(ambari_stomp.ConnectionListener):
+
+  unprocessed_messages_queue = Queue(100)
+
   """
   Base abstract class for event listeners on specific topics.
   """
   def __init__(self, initializer_module):
     self.initializer_module = initializer_module
+    self.enabled = True
+    self.event_queue_lock = threading.RLock()
+
+  def dequeue_unprocessed_events(self):
+    while not self.unprocessed_messages_queue.empty():
+      payload = self.unprocessed_messages_queue.get_nowait()
+      if payload:
+        logger.info("Processing event from unprocessed queue {0} {1}".format(payload[0], payload[1]))
+        destination = payload[0]
+        headers = payload[1]
+        message_json = payload[2]
+        message = payload[3]
+        try:
+          self.on_event(headers, message_json)
+        except Exception as ex:
+          logger.exception("Exception while handing event from {0} {1} {2}".format(destination, headers, message))
+          self.report_status_to_sender(headers, message, ex)
+        else:
+          self.report_status_to_sender(headers, message)
+
 
   def on_message(self, headers, message):
     """
@@ -55,10 +80,23 @@ class EventListener(ambari_stomp.ConnectionListener):
         return
 
       logger.info("Event from server at {0}{1}".format(destination, self.get_log_message(headers, copy.deepcopy(message_json))))
+
+      if not self.enabled:
+        with self.event_queue_lock:
+          if not self.enabled:
+            logger.info("Queuing event as unprocessed {0} since event "
+                        "listener is disabled".format(destination))
+            try:
+              self.unprocessed_messages_queue.put_nowait((destination, headers, message_json, message))
+            except Exception as ex:
+              logger.warning("Cannot queue any more unprocessed events since "
+                           "queue is full! {0} {1}".format(destination, message))
+            return
+
       try:
         self.on_event(headers, message_json)
       except Exception as ex:
-        logger.exception("Exception while handing event from {0} {1}".format(destination, headers, message))
+        logger.exception("Exception while handing event from {0} {1} {2}".format(destination, headers, message))
         self.report_status_to_sender(headers, message, ex)
       else:
         self.report_status_to_sender(headers, message)

+ 1 - 0
ambari-server/src/main/resources/scripts/Ambaripreupload.py

@@ -264,6 +264,7 @@ with Environment() as env:
     params.HdfsResource(format('{hdfs_path_prefix}/mapred'), owner='mapred', type='directory', action=['create_on_execute'])
     params.HdfsResource(format('{hdfs_path_prefix}/mapred/system'), owner='hdfs', type='directory', action=['create_on_execute'])
     params.HdfsResource(format('{hdfs_path_prefix}/mr-history/done'), change_permissions_for_parents=True, owner='mapred', group='hadoop', type='directory', action=['create_on_execute'], mode=0777)
+    params.HdfsResource(format('{hdfs_path_prefix}/user/yarn-ats'), owner='yarn-ats', type='directory', action=['create_on_execute'], mode=0755)
     params.HdfsResource(format('{hdfs_path_prefix}/atshistory/done'), owner='yarn', group='hadoop', type='directory', action=['create_on_execute'], mode=0700)
     params.HdfsResource(format('{hdfs_path_prefix}/atshistory/active'), owner='yarn', group='hadoop', type='directory', action=['create_on_execute'], mode=01777)
     params.HdfsResource(format('{hdfs_path_prefix}/ams/hbase'), owner='ams', type='directory', action=['create_on_execute'], mode=0775)