ClusterConfiguration.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. #!/usr/bin/env python
  2. """
  3. Licensed to the Apache Software Foundation (ASF) under one
  4. or more contributor license agreements. See the NOTICE file
  5. distributed with this work for additional information
  6. regarding copyright ownership. The ASF licenses this file
  7. to you under the Apache License, Version 2.0 (the
  8. "License"); you may not use this file except in compliance
  9. with the License. You may obtain a copy of the License at
  10. http://www.apache.org/licenses/LICENSE-2.0
  11. Unless required by applicable law or agreed to in writing, software
  12. distributed under the License is distributed on an "AS IS" BASIS,
  13. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. See the License for the specific language governing permissions and
  15. limitations under the License.
  16. """
  17. import logging
  18. import json
  19. import os
  20. import threading
  21. logger = logging.getLogger(__name__)
  22. class ClusterConfiguration():
  23. """
  24. Maintains an in-memory cache and disk cache of the configurations for
  25. every cluster. This is useful for having quick access to any of the
  26. configuration properties.
  27. """
  28. FILENAME = 'configurations.json'
  29. # constants that define which commands hold configurations that can be
  30. # used to populate this cache
  31. EXECUTION_COMMANDS = 'executionCommands'
  32. ALERT_DEFINITION_COMMANDS = 'alertDefinitionCommands'
  33. COMMANDS_WITH_CONFIGURATIONS = [EXECUTION_COMMANDS, ALERT_DEFINITION_COMMANDS]
  34. def __init__(self, cluster_config_cache_dir):
  35. """
  36. Initializes the configuration cache.
  37. :param cluster_config_cache_dir:
  38. :return:
  39. """
  40. self.cluster_config_cache_dir = cluster_config_cache_dir
  41. # keys are cluster names, values are configurations
  42. self.__configurations = {}
  43. self.__file_lock = threading.RLock()
  44. self.__cache_lock = threading.RLock()
  45. self.__config_json_file = os.path.join(self.cluster_config_cache_dir, self.FILENAME)
  46. # ensure that our cache directory exists
  47. if not os.path.exists(cluster_config_cache_dir):
  48. try:
  49. os.makedirs(cluster_config_cache_dir)
  50. except:
  51. logger.critical("Could not create the cluster configuration cache directory {0}".format(cluster_config_cache_dir))
  52. # if the file exists, then load it
  53. try:
  54. if os.path.isfile(self.__config_json_file):
  55. with open(self.__config_json_file, 'r') as fp:
  56. self.__configurations = json.load(fp)
  57. except Exception, exception:
  58. logger.warning("Unable to load configurations from {0}. This file will be regenerated on registration".format(self.__config_json_file))
  59. def update_configurations_from_heartbeat(self, heartbeat):
  60. """
  61. Updates the in-memory and disk-based cluster configurations based on
  62. the heartbeat. This will only update configurations on the following
  63. types of commands in the heartbeat: execution, and alert definition.
  64. :param new_configurations:
  65. :return:
  66. """
  67. heartbeat_keys = heartbeat.keys()
  68. heartbeat_contains_configurations = False
  69. for commandType in self.COMMANDS_WITH_CONFIGURATIONS:
  70. if commandType in heartbeat_keys:
  71. heartbeat_contains_configurations = True
  72. # if this heartbeat doesn't contain a command with configurations, then
  73. # don't process it
  74. if not heartbeat_contains_configurations:
  75. return
  76. if self.EXECUTION_COMMANDS in heartbeat_keys:
  77. execution_commands = heartbeat[self.EXECUTION_COMMANDS]
  78. for command in execution_commands:
  79. if 'clusterName' in command and 'configurations' in command:
  80. cluster_name = command['clusterName']
  81. configurations = command['configurations']
  82. self._update_configurations(cluster_name, configurations)
  83. return
  84. if self.ALERT_DEFINITION_COMMANDS in heartbeat_keys:
  85. alert_definition_commands = heartbeat[self.ALERT_DEFINITION_COMMANDS]
  86. for command in alert_definition_commands:
  87. if 'clusterName' in command and 'configurations' in command:
  88. cluster_name = command['clusterName']
  89. configurations = command['configurations']
  90. self._update_configurations(cluster_name, configurations)
  91. return
  92. def _update_configurations(self, cluster_name, configuration):
  93. """
  94. Thread-safe method for writing out the specified cluster configuration
  95. and updating the in-memory representation.
  96. :param cluster_name:
  97. :param configuration:
  98. :return:
  99. """
  100. logger.info("Updating cached configurations for cluster {0}".format(cluster_name))
  101. self.__cache_lock.acquire()
  102. try:
  103. self.__configurations[cluster_name] = configuration
  104. except Exception, exception :
  105. logger.exception("Unable to update configurations for cluster {0}".format(cluster_name))
  106. finally:
  107. self.__cache_lock.release()
  108. self.__file_lock.acquire()
  109. try:
  110. with open(self.__config_json_file, 'w') as f:
  111. json.dump(self.__configurations, f, indent=2)
  112. except Exception, exception :
  113. logger.exception("Unable to update configurations for cluster {0}".format(cluster_name))
  114. finally:
  115. self.__file_lock.release()
  116. def get_configuration_value(self, cluster_name, key):
  117. """
  118. Gets a value from the cluster configuration map for the given cluster and
  119. key. The key is expected to be of the form 'foo-bar/baz' or
  120. 'foo-bar/bar-baz/foobarbaz' where every / denotes a new mapping
  121. :param key: a lookup key, like 'foo-bar/baz'
  122. :return: the value, or None if not found
  123. """
  124. self.__cache_lock.acquire()
  125. try:
  126. dictionary = self.__configurations[cluster_name]
  127. for layer_key in key.split('/'):
  128. dictionary = dictionary[layer_key]
  129. return dictionary
  130. except Exception:
  131. logger.debug("Cache miss for configuration property {0} in cluster {1}".format(key, cluster_name))
  132. return None
  133. finally:
  134. self.__cache_lock.release()