|
|
@@ -21,12 +21,14 @@ limitations under the License.
|
|
|
|
|
|
from ambari_agent import hostname
|
|
|
from ambari_agent.ClusterCache import ClusterCache
|
|
|
-from ambari_agent.Utils import ImmutableDictionary
|
|
|
+from ambari_agent.Utils import ImmutableDictionary, synchronized
|
|
|
|
|
|
from collections import defaultdict
|
|
|
+import threading
|
|
|
import logging
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
+topology_update_lock = threading.RLock()
|
|
|
|
|
|
class ClusterTopologyCache(ClusterCache):
|
|
|
"""
|
|
|
@@ -53,6 +55,7 @@ class ClusterTopologyCache(ClusterCache):
|
|
|
def get_cache_name(self):
|
|
|
return 'topology'
|
|
|
|
|
|
+ @synchronized(topology_update_lock)
|
|
|
def on_cache_update(self):
|
|
|
self.cluster_host_info = None
|
|
|
|
|
|
@@ -95,6 +98,7 @@ class ClusterTopologyCache(ClusterCache):
|
|
|
self.hosts_to_id = ImmutableDictionary(hosts_to_id)
|
|
|
self.components_by_key = ImmutableDictionary(components_by_key)
|
|
|
|
|
|
+ @synchronized(topology_update_lock)
|
|
|
def get_cluster_host_info(self, cluster_id):
|
|
|
"""
|
|
|
Get dictionary used in commands as clusterHostInfo
|
|
|
@@ -124,6 +128,7 @@ class ClusterTopologyCache(ClusterCache):
|
|
|
self.cluster_host_info = cluster_host_info
|
|
|
return cluster_host_info
|
|
|
|
|
|
+ @synchronized(topology_update_lock)
|
|
|
def get_component_info_by_key(self, cluster_id, service_name, component_name):
|
|
|
"""
|
|
|
Find component by service_name and component_name in list of component dictionaries.
|
|
|
@@ -135,12 +140,15 @@ class ClusterTopologyCache(ClusterCache):
|
|
|
except KeyError:
|
|
|
return None
|
|
|
|
|
|
+ @synchronized(topology_update_lock)
|
|
|
def get_cluster_local_components(self, cluster_id):
|
|
|
return self.cluster_local_components[cluster_id]
|
|
|
|
|
|
+ @synchronized(topology_update_lock)
|
|
|
def get_cluster_component_version_map(self, cluster_id):
|
|
|
return self.component_version_map[cluster_id]
|
|
|
|
|
|
+ @synchronized(topology_update_lock)
|
|
|
def get_host_info_by_id(self, cluster_id, host_id):
|
|
|
"""
|
|
|
Find host by id in list of host dictionaries.
|
|
|
@@ -150,10 +158,12 @@ class ClusterTopologyCache(ClusterCache):
|
|
|
except KeyError:
|
|
|
return None
|
|
|
|
|
|
+ @synchronized(topology_update_lock)
|
|
|
def get_current_host_info(self, cluster_id):
|
|
|
current_host_id = self.current_host_ids_to_cluster[cluster_id]
|
|
|
return self.get_host_info_by_id(cluster_id, current_host_id)
|
|
|
|
|
|
+ @synchronized(topology_update_lock)
|
|
|
def get_current_host_id(self, cluster_id):
|
|
|
try:
|
|
|
return self.current_host_ids_to_cluster[cluster_id]
|