|
|
@@ -1,5 +1,3 @@
|
|
|
-#!/usr/bin/env python
|
|
|
-
|
|
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
|
|
# contributor license agreements. See the NOTICE file distributed with
|
|
|
# this work for additional information regarding copyright ownership.
|
|
|
@@ -15,31 +13,30 @@
|
|
|
# See the License for the specific language governing permissions and
|
|
|
# limitations under the License.
|
|
|
|
|
|
-import json
|
|
|
import logging
|
|
|
import copy
|
|
|
-import os
|
|
|
import time
|
|
|
import threading
|
|
|
import pprint
|
|
|
|
|
|
from ambari_agent.ActionQueue import ActionQueue
|
|
|
from ambari_agent.LiveStatus import LiveStatus
|
|
|
-
|
|
|
+from ambari_agent.models.commands import CommandStatus, RoleCommand, CustomCommand, AgentCommand
|
|
|
|
|
|
logger = logging.getLogger()
|
|
|
|
|
|
-"""
|
|
|
-RecoveryManager has the following capabilities:
|
|
|
-* Store data needed for execution commands extracted from STATUS command
|
|
|
-* Generate INSTALL command
|
|
|
-* Generate START command
|
|
|
-"""
|
|
|
-
|
|
|
|
|
|
class RecoveryManager:
|
|
|
+ """
|
|
|
+ RecoveryManager has the following capabilities:
|
|
|
+ * Store data needed for execution commands extracted from STATUS command
|
|
|
+ * Generate INSTALL command
|
|
|
+ * Generate START command
|
|
|
+ """
|
|
|
+ BLUEPRINT_STATE_IN_PROGRESS = 'IN_PROGRESS'
|
|
|
COMMAND_TYPE = "commandType"
|
|
|
PAYLOAD_LEVEL = "payloadLevel"
|
|
|
+ SERVICE_NAME = "serviceName"
|
|
|
COMPONENT_NAME = "componentName"
|
|
|
ROLE = "role"
|
|
|
TASK_ID = "taskId"
|
|
|
@@ -57,7 +54,7 @@ class RecoveryManager:
|
|
|
INIT = "INIT" # TODO: What is the state when machine is reset
|
|
|
INSTALL_FAILED = "INSTALL_FAILED"
|
|
|
COMPONENT_UPDATE_KEY_FORMAT = "{0}_UPDATE_TIME"
|
|
|
- COMMAND_REFRESH_DELAY_SEC = 600 #10 minutes
|
|
|
+ COMMAND_REFRESH_DELAY_SEC = 600
|
|
|
|
|
|
FILENAME = "recovery.json"
|
|
|
|
|
|
@@ -77,13 +74,15 @@ class RecoveryManager:
|
|
|
"stale_config": False
|
|
|
}
|
|
|
|
|
|
- def __init__(self, cache_dir, recovery_enabled=False, auto_start_only=False, auto_install_start=False):
|
|
|
+ def __init__(self, initializer_module, recovery_enabled=False, auto_start_only=False, auto_install_start=False):
|
|
|
self.recovery_enabled = recovery_enabled
|
|
|
self.auto_start_only = auto_start_only
|
|
|
self.auto_install_start = auto_install_start
|
|
|
self.max_count = 6
|
|
|
self.window_in_min = 60
|
|
|
self.retry_gap = 5
|
|
|
+ self.window_in_sec = self.window_in_min * 60
|
|
|
+ self.retry_gap_in_sec = self.retry_gap * 60
|
|
|
self.max_lifetime_count = 12
|
|
|
|
|
|
self.id = int(time.time())
|
|
|
@@ -91,36 +90,34 @@ class RecoveryManager:
|
|
|
self.allowed_current_states = [self.INIT, self.INSTALLED]
|
|
|
self.enabled_components = []
|
|
|
self.statuses = {}
|
|
|
+ self.__component_to_service_map = {} # component => service map TODO: fix it later(hack here)
|
|
|
self.__status_lock = threading.RLock()
|
|
|
self.__command_lock = threading.RLock()
|
|
|
self.__active_command_lock = threading.RLock()
|
|
|
self.__cache_lock = threading.RLock()
|
|
|
self.active_command_count = 0
|
|
|
self.cluster_id = None
|
|
|
-
|
|
|
- if not os.path.exists(cache_dir):
|
|
|
- try:
|
|
|
- os.makedirs(cache_dir)
|
|
|
- except:
|
|
|
- logger.critical("[RecoveryManager] Could not create the cache directory {0}".format(cache_dir))
|
|
|
-
|
|
|
- self.__actions_json_file = os.path.join(cache_dir, self.FILENAME)
|
|
|
+ self.initializer_module = initializer_module
|
|
|
+ self.host_level_params_cache = initializer_module.host_level_params_cache
|
|
|
|
|
|
self.actions = {}
|
|
|
-
|
|
|
- self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only, auto_install_start, "")
|
|
|
-
|
|
|
- pass
|
|
|
+ self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only, auto_install_start)
|
|
|
|
|
|
def on_execution_command_start(self):
|
|
|
with self.__active_command_lock:
|
|
|
self.active_command_count += 1
|
|
|
- pass
|
|
|
|
|
|
def on_execution_command_finish(self):
|
|
|
with self.__active_command_lock:
|
|
|
self.active_command_count -= 1
|
|
|
- pass
|
|
|
+
|
|
|
+ def is_blueprint_provisioning_for_component(self, component_name):
|
|
|
+ try:
|
|
|
+ blueprint_state = self.host_level_params_cache[self.cluster_id]['blueprint_provisioning_state'][component_name]
|
|
|
+ except KeyError:
|
|
|
+ blueprint_state = 'NONE'
|
|
|
+
|
|
|
+ return blueprint_state == RecoveryManager.BLUEPRINT_STATE_IN_PROGRESS
|
|
|
|
|
|
def has_active_command(self):
|
|
|
return self.active_command_count > 0
|
|
|
@@ -131,12 +128,10 @@ class RecoveryManager:
|
|
|
def get_current_status(self, component):
|
|
|
if component in self.statuses:
|
|
|
return self.statuses[component]["current"]
|
|
|
- pass
|
|
|
|
|
|
def get_desired_status(self, component):
|
|
|
if component in self.statuses:
|
|
|
return self.statuses[component]["desired"]
|
|
|
- pass
|
|
|
|
|
|
def update_config_staleness(self, component, is_config_stale):
|
|
|
"""
|
|
|
@@ -154,17 +149,10 @@ class RecoveryManager:
|
|
|
pass
|
|
|
|
|
|
self.statuses[component]["stale_config"] = is_config_stale
|
|
|
- pass
|
|
|
|
|
|
def handle_status_change(self, component, component_status):
|
|
|
- if not self.enabled() or not self.configured_for_recovery(component):
|
|
|
- return
|
|
|
-
|
|
|
- if component_status == LiveStatus.LIVE_STATUS:
|
|
|
- self.update_current_status(component, component_status)
|
|
|
- else:
|
|
|
- if (self.get_current_status(component) != self.INSTALL_FAILED):
|
|
|
- self.update_current_status(component, component_status)
|
|
|
+ if component_status == LiveStatus.LIVE_STATUS or self.get_current_status(component) != self.INSTALL_FAILED:
|
|
|
+ self.update_current_status(component, component_status)
|
|
|
|
|
|
def update_current_status(self, component, state):
|
|
|
"""
|
|
|
@@ -184,9 +172,8 @@ class RecoveryManager:
|
|
|
|
|
|
if self.statuses[component]["current"] != state:
|
|
|
logger.info("current status is set to %s for %s", state, component)
|
|
|
- self.statuses[component]["current"] = state
|
|
|
- pass
|
|
|
|
|
|
+ self.statuses[component]["current"] = state
|
|
|
|
|
|
def update_desired_status(self, component, state):
|
|
|
"""
|
|
|
@@ -202,21 +189,16 @@ class RecoveryManager:
|
|
|
logger.info("New status, desired status is set to %s for %s", self.statuses[component]["desired"], component)
|
|
|
finally:
|
|
|
self.__status_lock.release()
|
|
|
- pass
|
|
|
|
|
|
if self.statuses[component]["desired"] != state:
|
|
|
logger.info("desired status is set to %s for %s", state, component)
|
|
|
self.statuses[component]["desired"] = state
|
|
|
- pass
|
|
|
|
|
|
- """
|
|
|
- Whether specific components are enabled for recovery.
|
|
|
- """
|
|
|
def configured_for_recovery(self, component):
|
|
|
- if len(self.enabled_components) > 0 and component in self.enabled_components:
|
|
|
- return True
|
|
|
-
|
|
|
- return False
|
|
|
+ """
|
|
|
+ Whether specific components are enabled for recovery.
|
|
|
+ """
|
|
|
+ return len(self.enabled_components) > 0 and component in self.enabled_components
|
|
|
|
|
|
def requires_recovery(self, component):
|
|
|
"""
|
|
|
@@ -225,23 +207,15 @@ class RecoveryManager:
|
|
|
INIT --> INSTALLED --> STARTED
|
|
|
RE-INSTALLED (if configs do not match)
|
|
|
"""
|
|
|
- if not self.enabled():
|
|
|
- return False
|
|
|
-
|
|
|
- if not self.configured_for_recovery(component):
|
|
|
- return False
|
|
|
-
|
|
|
- if component not in self.statuses:
|
|
|
+ if not self.enabled() or not self.configured_for_recovery(component) or component not in self.statuses:
|
|
|
return False
|
|
|
|
|
|
status = self.statuses[component]
|
|
|
if self.auto_start_only or self.auto_install_start:
|
|
|
- if status["current"] == status["desired"]:
|
|
|
- return False
|
|
|
- if status["desired"] not in self.allowed_desired_states:
|
|
|
+ if status["current"] == status["desired"] or status["desired"] not in self.allowed_desired_states:
|
|
|
return False
|
|
|
else:
|
|
|
- if status["current"] == status["desired"] and status['stale_config'] == False:
|
|
|
+ if status["current"] == status["desired"] and status['stale_config'] is False:
|
|
|
return False
|
|
|
|
|
|
if status["desired"] not in self.allowed_desired_states or status["current"] not in self.allowed_current_states:
|
|
|
@@ -249,9 +223,6 @@ class RecoveryManager:
|
|
|
|
|
|
logger.info("%s needs recovery, desired = %s, and current = %s.", component, status["desired"], status["current"])
|
|
|
return True
|
|
|
- pass
|
|
|
-
|
|
|
-
|
|
|
|
|
|
def get_recovery_status(self):
|
|
|
"""
|
|
|
@@ -268,8 +239,7 @@ class RecoveryManager:
|
|
|
]
|
|
|
}
|
|
|
"""
|
|
|
- report = {}
|
|
|
- report["summary"] = "DISABLED"
|
|
|
+ report = {"summary": "DISABLED"}
|
|
|
if self.enabled():
|
|
|
report["summary"] = "RECOVERABLE"
|
|
|
num_limits_reached = 0
|
|
|
@@ -279,24 +249,23 @@ class RecoveryManager:
|
|
|
try:
|
|
|
for component in self.actions.keys():
|
|
|
action = self.actions[component]
|
|
|
- recovery_state = {}
|
|
|
- recovery_state["name"] = component
|
|
|
- recovery_state["numAttempts"] = action["lifetimeCount"]
|
|
|
- recovery_state["limitReached"] = self.max_lifetime_count <= action["lifetimeCount"]
|
|
|
+ recovery_state = {
|
|
|
+ "name": component,
|
|
|
+ "numAttempts": action["lifetimeCount"],
|
|
|
+ "limitReached": self.max_lifetime_count <= action["lifetimeCount"]
|
|
|
+ }
|
|
|
recovery_states.append(recovery_state)
|
|
|
- if recovery_state["limitReached"] == True:
|
|
|
+ if recovery_state["limitReached"] is True:
|
|
|
num_limits_reached += 1
|
|
|
- pass
|
|
|
finally:
|
|
|
self.__status_lock.release()
|
|
|
|
|
|
- if num_limits_reached > 0:
|
|
|
+ if num_limits_reached > 0 and num_limits_reached == len(recovery_states):
|
|
|
+ report["summary"] = "UNRECOVERABLE"
|
|
|
+ elif num_limits_reached > 0:
|
|
|
report["summary"] = "PARTIALLY_RECOVERABLE"
|
|
|
- if num_limits_reached == len(recovery_states):
|
|
|
- report["summary"] = "UNRECOVERABLE"
|
|
|
|
|
|
return report
|
|
|
- pass
|
|
|
|
|
|
def get_recovery_commands(self):
|
|
|
"""
|
|
|
@@ -308,39 +277,34 @@ class RecoveryManager:
|
|
|
"""
|
|
|
commands = []
|
|
|
for component in self.statuses.keys():
|
|
|
- if self.requires_recovery(component) and self.may_execute(component):
|
|
|
+ if self.configured_for_recovery(component) and self.requires_recovery(component) and self.may_execute(component):
|
|
|
status = copy.deepcopy(self.statuses[component])
|
|
|
command = None
|
|
|
if self.auto_start_only:
|
|
|
- if status["desired"] == self.STARTED:
|
|
|
- if status["current"] == self.INSTALLED:
|
|
|
- command = self.get_start_command(component)
|
|
|
+ if status["desired"] == self.STARTED and status["current"] == self.INSTALLED:
|
|
|
+ command = self.get_start_command(component)
|
|
|
elif self.auto_install_start:
|
|
|
- if status["desired"] == self.STARTED:
|
|
|
- if status["current"] == self.INSTALLED:
|
|
|
- command = self.get_start_command(component)
|
|
|
- elif status["current"] == self.INSTALL_FAILED:
|
|
|
- command = self.get_install_command(component)
|
|
|
- elif status["desired"] == self.INSTALLED:
|
|
|
- if status["current"] == self.INSTALL_FAILED:
|
|
|
+ if status["desired"] == self.STARTED and status["current"] == self.INSTALLED:
|
|
|
+ command = self.get_start_command(component)
|
|
|
+ elif status["desired"] == self.STARTED and status["current"] == self.INSTALL_FAILED:
|
|
|
+ command = self.get_install_command(component)
|
|
|
+ elif status["desired"] == self.INSTALLED and status["current"] == self.INSTALL_FAILED:
|
|
|
command = self.get_install_command(component)
|
|
|
else:
|
|
|
# START, INSTALL, RESTART
|
|
|
if status["desired"] != status["current"]:
|
|
|
- if status["desired"] == self.STARTED:
|
|
|
- if status["current"] == self.INSTALLED:
|
|
|
- command = self.get_start_command(component)
|
|
|
- elif status["current"] == self.INIT:
|
|
|
- command = self.get_install_command(component)
|
|
|
- elif status["current"] == self.INSTALL_FAILED:
|
|
|
- command = self.get_install_command(component)
|
|
|
- elif status["desired"] == self.INSTALLED:
|
|
|
- if status["current"] == self.INIT:
|
|
|
- command = self.get_install_command(component)
|
|
|
- elif status["current"] == self.INSTALL_FAILED:
|
|
|
- command = self.get_install_command(component)
|
|
|
- elif status["current"] == self.STARTED:
|
|
|
- command = self.get_stop_command(component)
|
|
|
+ if status["desired"] == self.STARTED and status["current"] == self.INSTALLED:
|
|
|
+ command = self.get_start_command(component)
|
|
|
+ elif status["desired"] == self.STARTED and status["current"] == self.INIT:
|
|
|
+ command = self.get_install_command(component)
|
|
|
+ elif status["desired"] == self.STARTED and status["current"] == self.INSTALL_FAILED:
|
|
|
+ command = self.get_install_command(component)
|
|
|
+ elif status["desired"] == self.INSTALLED and status["current"] == self.INIT:
|
|
|
+ command = self.get_install_command(component)
|
|
|
+ elif status["desired"] == self.INSTALLED and status["current"] == self.INSTALL_FAILED:
|
|
|
+ command = self.get_install_command(component)
|
|
|
+ elif status["desired"] == self.INSTALLED and status["current"] == self.STARTED:
|
|
|
+ command = self.get_stop_command(component)
|
|
|
else:
|
|
|
if status["current"] == self.INSTALLED:
|
|
|
command = self.get_install_command(component)
|
|
|
@@ -349,11 +313,10 @@ class RecoveryManager:
|
|
|
|
|
|
if command:
|
|
|
self.execute(component)
|
|
|
- logger.info("Created recovery command %s for component %s",
|
|
|
- command[self.ROLE_COMMAND], command[self.ROLE])
|
|
|
+ logger.info("Created recovery command %s for component %s", command[self.ROLE_COMMAND], command[self.ROLE])
|
|
|
commands.append(command)
|
|
|
- return commands
|
|
|
|
|
|
+ return commands
|
|
|
|
|
|
def may_execute(self, action):
|
|
|
"""
|
|
|
@@ -369,8 +332,6 @@ class RecoveryManager:
|
|
|
finally:
|
|
|
self.__status_lock.release()
|
|
|
return self._execute_action_chk_only(action)
|
|
|
- pass
|
|
|
-
|
|
|
|
|
|
def execute(self, action):
|
|
|
"""
|
|
|
@@ -386,8 +347,6 @@ class RecoveryManager:
|
|
|
finally:
|
|
|
self.__status_lock.release()
|
|
|
return self._execute_action_(action)
|
|
|
- pass
|
|
|
-
|
|
|
|
|
|
def _execute_action_(self, action_name):
|
|
|
"""
|
|
|
@@ -398,7 +357,7 @@ class RecoveryManager:
|
|
|
executed = False
|
|
|
seconds_since_last_attempt = now - action_counter["lastAttempt"]
|
|
|
if action_counter["lifetimeCount"] < self.max_lifetime_count:
|
|
|
- #reset if window_in_sec seconds passed since last attempt
|
|
|
+ # reset if window_in_sec seconds passed since last attempt
|
|
|
if seconds_since_last_attempt > self.window_in_sec:
|
|
|
action_counter["count"] = 0
|
|
|
action_counter["lastReset"] = now
|
|
|
@@ -406,7 +365,7 @@ class RecoveryManager:
|
|
|
if action_counter["count"] < self.max_count:
|
|
|
if seconds_since_last_attempt > self.retry_gap_in_sec:
|
|
|
action_counter["count"] += 1
|
|
|
- action_counter["lifetimeCount"] +=1
|
|
|
+ action_counter["lifetimeCount"] += 1
|
|
|
if self.retry_gap > 0:
|
|
|
action_counter["lastAttempt"] = now
|
|
|
action_counter["warnedLastAttempt"] = False
|
|
|
@@ -414,28 +373,27 @@ class RecoveryManager:
|
|
|
action_counter["lastReset"] = now
|
|
|
executed = True
|
|
|
else:
|
|
|
- if action_counter["warnedLastAttempt"] == False:
|
|
|
+ if action_counter["warnedLastAttempt"] is False:
|
|
|
action_counter["warnedLastAttempt"] = True
|
|
|
logger.warn(
|
|
|
"%s seconds has not passed since last occurrence %s seconds back for %s. " +
|
|
|
"Will silently skip execution without warning till retry gap is passed",
|
|
|
self.retry_gap_in_sec, seconds_since_last_attempt, action_name)
|
|
|
else:
|
|
|
- logger.debug(
|
|
|
- "%s seconds has not passed since last occurrence %s seconds back for %s",
|
|
|
- self.retry_gap_in_sec, seconds_since_last_attempt, action_name)
|
|
|
+ logger.debug("%s seconds has not passed since last occurrence %s seconds back for %s",
|
|
|
+ self.retry_gap_in_sec, seconds_since_last_attempt, action_name)
|
|
|
else:
|
|
|
sec_since_last_reset = now - action_counter["lastReset"]
|
|
|
if sec_since_last_reset > self.window_in_sec:
|
|
|
action_counter["count"] = 1
|
|
|
- action_counter["lifetimeCount"] +=1
|
|
|
+ action_counter["lifetimeCount"] += 1
|
|
|
if self.retry_gap > 0:
|
|
|
action_counter["lastAttempt"] = now
|
|
|
action_counter["lastReset"] = now
|
|
|
action_counter["warnedLastReset"] = False
|
|
|
executed = True
|
|
|
else:
|
|
|
- if action_counter["warnedLastReset"] == False:
|
|
|
+ if action_counter["warnedLastReset"] is False:
|
|
|
action_counter["warnedLastReset"] = True
|
|
|
logger.warn("%s occurrences in %s minutes reached the limit for %s. " +
|
|
|
"Will silently skip execution without warning till window is reset",
|
|
|
@@ -444,7 +402,7 @@ class RecoveryManager:
|
|
|
logger.debug("%s occurrences in %s minutes reached the limit for %s",
|
|
|
action_counter["count"], self.window_in_min, action_name)
|
|
|
else:
|
|
|
- if action_counter["warnedThresholdReached"] == False:
|
|
|
+ if action_counter["warnedThresholdReached"] is False:
|
|
|
action_counter["warnedThresholdReached"] = True
|
|
|
logger.warn("%s occurrences in agent life time reached the limit for %s. " +
|
|
|
"Will silently skip execution without warning till window is reset",
|
|
|
@@ -452,47 +410,7 @@ class RecoveryManager:
|
|
|
else:
|
|
|
logger.error("%s occurrences in agent life time reached the limit for %s",
|
|
|
action_counter["lifetimeCount"], action_name)
|
|
|
- self._dump_actions()
|
|
|
return executed
|
|
|
- pass
|
|
|
-
|
|
|
-
|
|
|
- def _dump_actions(self):
|
|
|
- """
|
|
|
- Dump recovery actions to FS
|
|
|
- """
|
|
|
- self.__cache_lock.acquire()
|
|
|
- try:
|
|
|
- with open(self.__actions_json_file, 'w') as f:
|
|
|
- json.dump(self.actions, f, indent=2)
|
|
|
- except Exception, exception:
|
|
|
- logger.exception("Unable to dump actions to {0}".format(self.__actions_json_file))
|
|
|
- return False
|
|
|
- finally:
|
|
|
- self.__cache_lock.release()
|
|
|
-
|
|
|
- return True
|
|
|
- pass
|
|
|
-
|
|
|
-
|
|
|
- def _load_actions(self):
|
|
|
- """
|
|
|
- Loads recovery actions from FS
|
|
|
- """
|
|
|
- self.__cache_lock.acquire()
|
|
|
-
|
|
|
- try:
|
|
|
- if os.path.isfile(self.__actions_json_file):
|
|
|
- with open(self.__actions_json_file, 'r') as fp:
|
|
|
- return json.load(fp)
|
|
|
- except Exception, exception:
|
|
|
- logger.warning("Unable to load recovery actions from {0}.".format(self.__actions_json_file))
|
|
|
- finally:
|
|
|
- self.__cache_lock.release()
|
|
|
-
|
|
|
- return {}
|
|
|
- pass
|
|
|
-
|
|
|
|
|
|
def get_actions_copy(self):
|
|
|
"""
|
|
|
@@ -503,8 +421,6 @@ class RecoveryManager:
|
|
|
return copy.deepcopy(self.actions)
|
|
|
finally:
|
|
|
self.__status_lock.release()
|
|
|
- pass
|
|
|
-
|
|
|
|
|
|
def is_action_info_stale(self, action_name):
|
|
|
"""
|
|
|
@@ -518,7 +434,6 @@ class RecoveryManager:
|
|
|
seconds_since_last_attempt = now - action_counter["lastAttempt"]
|
|
|
return seconds_since_last_attempt > self.window_in_sec
|
|
|
return False
|
|
|
- pass
|
|
|
|
|
|
def _execute_action_chk_only(self, action_name):
|
|
|
"""
|
|
|
@@ -532,31 +447,41 @@ class RecoveryManager:
|
|
|
if action_counter["count"] < self.max_count:
|
|
|
if seconds_since_last_attempt > self.retry_gap_in_sec:
|
|
|
return True
|
|
|
+ else:
|
|
|
+ logger.info("Not running recovery command due to retry_gap = {0} (seconds)".format(self.retry_gap_in_sec))
|
|
|
else:
|
|
|
sec_since_last_reset = now - action_counter["lastReset"]
|
|
|
if sec_since_last_reset > self.window_in_sec:
|
|
|
return True
|
|
|
|
|
|
return False
|
|
|
- pass
|
|
|
|
|
|
def _now_(self):
|
|
|
return int(time.time())
|
|
|
- pass
|
|
|
-
|
|
|
|
|
|
def update_recovery_config(self, dictionary):
|
|
|
- """
|
|
|
- TODO: Server sends the recovery configuration - call update_config after parsing
|
|
|
- "recoveryConfig": {
|
|
|
- "type" : "DEFAULT|AUTO_START|AUTO_INSTALL_START|FULL",
|
|
|
- "maxCount" : 10,
|
|
|
- "windowInMinutes" : 60,
|
|
|
- "retryGap" : 0,
|
|
|
- "components" : "a,b"
|
|
|
- }
|
|
|
- """
|
|
|
-
|
|
|
+ if dictionary and "recoveryConfig" in dictionary:
|
|
|
+ if logger.isEnabledFor(logging.INFO):
|
|
|
+ logger.info("RecoverConfig = %s", pprint.pformat(dictionary["recoveryConfig"]))
|
|
|
+ config = dictionary["recoveryConfig"]
|
|
|
+ if 'components' in config:
|
|
|
+ enabled_components = config['components']
|
|
|
+ enabled_components_list = []
|
|
|
+
|
|
|
+ components = [(item["service_name"], item["component_name"], item["desired_state"]) for item in enabled_components]
|
|
|
+ for service, component, state in components:
|
|
|
+ enabled_components_list.append(component)
|
|
|
+ self.update_desired_status(component, state)
|
|
|
+ # The Recovery Manager is component-oriented; however, the agent needs both the service and the component name to build
|
|
|
+ # commands properly. As a workaround, the server pushes the service name and the agent keeps the component-to-service relation.
|
|
|
+ #
|
|
|
+ # It is important to keep this map up to date, so an existing entry is overwritten whenever the server
|
|
|
+ # pushes a different service <-> component relation.
|
|
|
+ self.__component_to_service_map[component] = service
|
|
|
+
|
|
|
+ self.enabled_components = enabled_components_list
|
|
|
+
|
|
|
+ def on_config_update(self):
|
|
|
recovery_enabled = False
|
|
|
auto_start_only = False
|
|
|
auto_install_start = False
|
|
|
@@ -564,56 +489,51 @@ class RecoveryManager:
|
|
|
window_in_min = 60
|
|
|
retry_gap = 5
|
|
|
max_lifetime_count = 12
|
|
|
- enabled_components = ""
|
|
|
|
|
|
+ cluster_cache = self.initializer_module.configurations_cache[self.cluster_id]
|
|
|
|
|
|
- if dictionary and "recoveryConfig" in dictionary:
|
|
|
- if logger.isEnabledFor(logging.INFO):
|
|
|
- logger.info("RecoverConfig = %s", pprint.pformat(dictionary["recoveryConfig"]))
|
|
|
- config = dictionary["recoveryConfig"]
|
|
|
- if "type" in config:
|
|
|
- if config["type"] in ["AUTO_INSTALL_START", "AUTO_START", "FULL"]:
|
|
|
+ if 'configurations' in cluster_cache and 'cluster-env' in cluster_cache['configurations']:
|
|
|
+ config = cluster_cache['configurations']['cluster-env']
|
|
|
+ if "recovery_type" in config:
|
|
|
+ if config["recovery_type"] in ["AUTO_INSTALL_START", "AUTO_START", "FULL"]:
|
|
|
recovery_enabled = True
|
|
|
- if config["type"] == "AUTO_START":
|
|
|
+ if config["recovery_type"] == "AUTO_START":
|
|
|
auto_start_only = True
|
|
|
- elif config["type"] == "AUTO_INSTALL_START":
|
|
|
+ elif config["recovery_type"] == "AUTO_INSTALL_START":
|
|
|
auto_install_start = True
|
|
|
|
|
|
- if "maxCount" in config:
|
|
|
- max_count = self._read_int_(config["maxCount"], max_count)
|
|
|
- if "windowInMinutes" in config:
|
|
|
- window_in_min = self._read_int_(config["windowInMinutes"], window_in_min)
|
|
|
- if "retryGap" in config:
|
|
|
- retry_gap = self._read_int_(config["retryGap"], retry_gap)
|
|
|
- if 'maxLifetimeCount' in config:
|
|
|
- max_lifetime_count = self._read_int_(config['maxLifetimeCount'], max_lifetime_count)
|
|
|
+ if "recovery_enabled" in config:
|
|
|
+ recovery_enabled = self._read_bool_(config, "recovery_enabled", recovery_enabled)
|
|
|
|
|
|
- if 'components' in config:
|
|
|
- enabled_components = config['components']
|
|
|
+ if "recovery_max_count" in config:
|
|
|
+ max_count = self._read_int_(config, "recovery_max_count", max_count)
|
|
|
+ if "recovery_window_in_minutes" in config:
|
|
|
+ window_in_min = self._read_int_(config, "recovery_window_in_minutes", window_in_min)
|
|
|
+ if "recovery_retry_interval" in config:
|
|
|
+ retry_gap = self._read_int_(config, "recovery_retry_interval", retry_gap)
|
|
|
+ if 'recovery_lifetime_max_count' in config:
|
|
|
+ max_lifetime_count = self._read_int_(config, 'recovery_lifetime_max_count', max_lifetime_count)
|
|
|
|
|
|
self.update_config(max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled, auto_start_only,
|
|
|
- auto_install_start, enabled_components)
|
|
|
- pass
|
|
|
+ auto_install_start)
|
|
|
|
|
|
- """
|
|
|
- Update recovery configuration with the specified values.
|
|
|
-
|
|
|
- max_count - Configured maximum count of recovery attempt allowed per host component in a window.
|
|
|
- window_in_min - Configured window size in minutes.
|
|
|
- retry_gap - Configured retry gap between tries per host component
|
|
|
- max_lifetime_count - Configured maximum lifetime count of recovery attempt allowed per host component.
|
|
|
- recovery_enabled - True or False. Indicates whether recovery is enabled or not.
|
|
|
- auto_start_only - True if AUTO_START recovery type was specified. False otherwise.
|
|
|
- auto_install_start - True if AUTO_INSTALL_START recovery type was specified. False otherwise.
|
|
|
- enabled_components - CSV of componenents enabled for auto start.
|
|
|
- """
|
|
|
def update_config(self, max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled,
|
|
|
- auto_start_only, auto_install_start, enabled_components):
|
|
|
+ auto_start_only, auto_install_start):
|
|
|
"""
|
|
|
+ Update recovery configuration with the specified values.
|
|
|
+
|
|
|
+ max_count - Configured maximum count of recovery attempt allowed per host component in a window.
|
|
|
+ window_in_min - Configured window size in minutes.
|
|
|
+ retry_gap - Configured retry gap between tries per host component
|
|
|
+ max_lifetime_count - Configured maximum lifetime count of recovery attempt allowed per host component.
|
|
|
+ recovery_enabled - True or False. Indicates whether recovery is enabled or not.
|
|
|
+ auto_start_only - True if AUTO_START recovery type was specified. False otherwise.
|
|
|
+ auto_install_start - True if AUTO_INSTALL_START recovery type was specified. False otherwise.
|
|
|
+
|
|
|
Update recovery configuration, recovery is disabled if configuration values
|
|
|
are not correct
|
|
|
"""
|
|
|
- self.recovery_enabled = False;
|
|
|
+ self.recovery_enabled = False
|
|
|
if max_count <= 0:
|
|
|
logger.warn("Recovery disabled: max_count must be a non-negative number")
|
|
|
return
|
|
|
@@ -640,7 +560,6 @@ class RecoveryManager:
|
|
|
self.auto_start_only = auto_start_only
|
|
|
self.auto_install_start = auto_install_start
|
|
|
self.max_lifetime_count = max_lifetime_count
|
|
|
- self.enabled_components = []
|
|
|
|
|
|
self.allowed_desired_states = [self.STARTED, self.INSTALLED]
|
|
|
self.allowed_current_states = [self.INIT, self.INSTALL_FAILED, self.INSTALLED, self.STARTED]
|
|
|
@@ -652,21 +571,7 @@ class RecoveryManager:
|
|
|
self.allowed_desired_states = [self.INSTALLED, self.STARTED]
|
|
|
self.allowed_current_states = [self.INSTALL_FAILED, self.INSTALLED]
|
|
|
|
|
|
- if enabled_components is not None and len(enabled_components) > 0:
|
|
|
- components = enabled_components.split(",")
|
|
|
- for component in components:
|
|
|
- if len(component.strip()) > 0:
|
|
|
- self.enabled_components.append(component.strip())
|
|
|
-
|
|
|
self.recovery_enabled = recovery_enabled
|
|
|
- if self.recovery_enabled:
|
|
|
- logger.info(
|
|
|
- "==> Auto recovery is enabled with maximum %s in %s minutes with gap of %s minutes between and"
|
|
|
- " lifetime max being %s. Enabled components - %s",
|
|
|
- self.max_count, self.window_in_min, self.retry_gap, self.max_lifetime_count,
|
|
|
- ', '.join(self.enabled_components))
|
|
|
- pass
|
|
|
-
|
|
|
|
|
|
def get_unique_task_id(self):
|
|
|
self.id += 1
|
|
|
@@ -679,33 +584,31 @@ class RecoveryManager:
|
|
|
if not self.enabled():
|
|
|
return
|
|
|
|
|
|
- if not command.has_key(self.ROLE_COMMAND) or not self.configured_for_recovery(command['role']):
|
|
|
+ if self.ROLE_COMMAND not in command or not self.configured_for_recovery(command['role']):
|
|
|
return
|
|
|
|
|
|
- if status == ActionQueue.COMPLETED_STATUS:
|
|
|
- if command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_START:
|
|
|
+ if status == CommandStatus.completed:
|
|
|
+ if command[self.ROLE_COMMAND] == RoleCommand.start:
|
|
|
self.update_current_status(command[self.ROLE], LiveStatus.LIVE_STATUS)
|
|
|
- #self.update_config_staleness(command['role'], False)
|
|
|
- logger.info("After EXECUTION_COMMAND (START), with taskId=" + str(command['taskId']) +
|
|
|
- ", current state of " + command[self.ROLE] + " to " +
|
|
|
- self.get_current_status(command[self.ROLE]) )
|
|
|
- elif command['roleCommand'] == ActionQueue.ROLE_COMMAND_STOP or command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_INSTALL:
|
|
|
+ logger.info("After EXECUTION_COMMAND (START), with taskId={}, current state of {} to {}".format(
|
|
|
+ command['taskId'], command[self.ROLE], self.get_current_status(command[self.ROLE])))
|
|
|
+
|
|
|
+ elif command['roleCommand'] == RoleCommand.stop or command[self.ROLE_COMMAND] == RoleCommand.install:
|
|
|
self.update_current_status(command[self.ROLE], LiveStatus.DEAD_STATUS)
|
|
|
- logger.info("After EXECUTION_COMMAND (STOP/INSTALL), with taskId=" + str(command['taskId']) +
|
|
|
- ", current state of " + command[self.ROLE] + " to " +
|
|
|
- self.get_current_status(command[self.ROLE]) )
|
|
|
- elif command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_CUSTOM_COMMAND:
|
|
|
- if command.has_key('custom_command') and command['custom_command'] == ActionQueue.CUSTOM_COMMAND_RESTART:
|
|
|
+ logger.info("After EXECUTION_COMMAND (STOP/INSTALL), with taskId={}, current state of {} to {}".format(
|
|
|
+ command['taskId'], command[self.ROLE], self.get_current_status(command[self.ROLE])))
|
|
|
+
|
|
|
+ elif command[self.ROLE_COMMAND] == RoleCommand.custom_command:
|
|
|
+ if 'custom_command' in command and command['custom_command'] == CustomCommand.restart:
|
|
|
self.update_current_status(command['role'], LiveStatus.LIVE_STATUS)
|
|
|
- #self.update_config_staleness(command['role'], False)
|
|
|
- logger.info("After EXECUTION_COMMAND (RESTART), current state of " + command[self.ROLE] + " to " +
|
|
|
- self.get_current_status(command[self.ROLE]) )
|
|
|
- elif status == ActionQueue.FAILED_STATUS:
|
|
|
- if command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_INSTALL:
|
|
|
+ logger.info("After EXECUTION_COMMAND (RESTART), current state of {} to {}".format(
|
|
|
+ command[self.ROLE], self.get_current_status(command[self.ROLE])))
|
|
|
+
|
|
|
+ elif status == CommandStatus.failed:
|
|
|
+ if command[self.ROLE_COMMAND] == RoleCommand.install:
|
|
|
self.update_current_status(command[self.ROLE], self.INSTALL_FAILED)
|
|
|
- logger.info("After EXECUTION_COMMAND (INSTALL), with taskId=" + str(command['taskId']) +
|
|
|
- ", current state of " + command[self.ROLE] + " to " +
|
|
|
- self.get_current_status(command[self.ROLE]))
|
|
|
+ logger.info("After EXECUTION_COMMAND (INSTALL), with taskId={}, current state of {} to {}".format(
|
|
|
+ command['taskId'], command[self.ROLE], self.get_current_status(command[self.ROLE])))
|
|
|
|
|
|
def process_execution_command(self, command):
|
|
|
"""
|
|
|
@@ -714,28 +617,30 @@ class RecoveryManager:
|
|
|
if not self.enabled():
|
|
|
return
|
|
|
|
|
|
- if not self.COMMAND_TYPE in command or not command[self.COMMAND_TYPE] == ActionQueue.EXECUTION_COMMAND:
|
|
|
+ if self.COMMAND_TYPE not in command or not command[self.COMMAND_TYPE] == AgentCommand.execution:
|
|
|
return
|
|
|
|
|
|
- if not self.ROLE in command:
|
|
|
+ if self.ROLE not in command:
|
|
|
return
|
|
|
|
|
|
- if command[self.ROLE_COMMAND] in (ActionQueue.ROLE_COMMAND_INSTALL, ActionQueue.ROLE_COMMAND_STOP) \
|
|
|
+ if command[self.ROLE_COMMAND] in (RoleCommand.install, RoleCommand.stop) \
|
|
|
and self.configured_for_recovery(command[self.ROLE]):
|
|
|
+
|
|
|
self.update_desired_status(command[self.ROLE], LiveStatus.DEAD_STATUS)
|
|
|
- logger.info("Received EXECUTION_COMMAND (STOP/INSTALL), desired state of " + command[self.ROLE] + " to " +
|
|
|
- self.get_desired_status(command[self.ROLE]) )
|
|
|
- elif command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_START \
|
|
|
- and self.configured_for_recovery(command[self.ROLE]):
|
|
|
+ logger.info("Received EXECUTION_COMMAND (STOP/INSTALL), desired state of {} to {}".format(
|
|
|
+ command[self.ROLE], self.get_desired_status(command[self.ROLE])))
|
|
|
+
|
|
|
+ elif command[self.ROLE_COMMAND] == RoleCommand.start and self.configured_for_recovery(command[self.ROLE]):
|
|
|
self.update_desired_status(command[self.ROLE], LiveStatus.LIVE_STATUS)
|
|
|
- logger.info("Received EXECUTION_COMMAND (START), desired state of " + command[self.ROLE] + " to " +
|
|
|
- self.get_desired_status(command[self.ROLE]) )
|
|
|
- elif command.has_key('custom_command') and \
|
|
|
- command['custom_command'] == ActionQueue.CUSTOM_COMMAND_RESTART \
|
|
|
+ logger.info("Received EXECUTION_COMMAND (START), desired state of {} to {}".format(
|
|
|
+ command[self.ROLE], self.get_desired_status(command[self.ROLE])))
|
|
|
+
|
|
|
+ elif 'custom_command' in command and command['custom_command'] == CustomCommand.restart \
|
|
|
and self.configured_for_recovery(command[self.ROLE]):
|
|
|
+
|
|
|
self.update_desired_status(command[self.ROLE], LiveStatus.LIVE_STATUS)
|
|
|
- logger.info("Received EXECUTION_COMMAND (RESTART), desired state of " + command[self.ROLE] + " to " +
|
|
|
- self.get_desired_status(command[self.ROLE]) )
|
|
|
+ logger.info("Received EXECUTION_COMMAND (RESTART), desired state of {} to {}".format(
|
|
|
+ command[self.ROLE], self.get_desired_status(command[self.ROLE])))
|
|
|
|
|
|
def get_command(self, component, command_name):
|
|
|
"""
|
|
|
@@ -745,16 +650,24 @@ class RecoveryManager:
|
|
|
logger.info("Recovery is paused, tasks waiting in pipeline for this host.")
|
|
|
return None
|
|
|
|
|
|
+ if self.is_blueprint_provisioning_for_component(component):
|
|
|
+ logger.info("Recovery is paused, blueprint is being provisioned.")
|
|
|
+ return None
|
|
|
+
|
|
|
if self.enabled():
|
|
|
command_id = self.get_unique_task_id()
|
|
|
command = {
|
|
|
self.CLUSTER_ID: self.cluster_id,
|
|
|
self.ROLE_COMMAND: command_name,
|
|
|
- self.COMMAND_TYPE: ActionQueue.AUTO_EXECUTION_COMMAND,
|
|
|
+ self.COMMAND_TYPE: AgentCommand.auto_execution,
|
|
|
self.TASK_ID: command_id,
|
|
|
self.ROLE: component,
|
|
|
self.COMMAND_ID: command_id
|
|
|
}
|
|
|
+
|
|
|
+ if component in self.__component_to_service_map:
|
|
|
+ command[self.SERVICE_NAME] = self.__component_to_service_map[component]
|
|
|
+
|
|
|
return command
|
|
|
else:
|
|
|
logger.info("Recovery is not enabled. START command will not be computed.")
|
|
|
@@ -779,19 +692,18 @@ class RecoveryManager:
|
|
|
def get_start_command(self, component):
|
|
|
return self.get_command(component, "START")
|
|
|
|
|
|
- def _read_int_(self, value, default_value=0):
|
|
|
+ def _read_int_(self, config, key, default_value=0):
|
|
|
int_value = default_value
|
|
|
try:
|
|
|
- int_value = int(value)
|
|
|
- except ValueError:
|
|
|
+ int_value = int(config[key])
|
|
|
+ except (ValueError, KeyError):
|
|
|
pass
|
|
|
return int_value
|
|
|
|
|
|
-
|
|
|
-def main(argv=None):
|
|
|
- cmd_mgr = RecoveryManager('/tmp')
|
|
|
- pass
|
|
|
-
|
|
|
-
|
|
|
-if __name__ == '__main__':
|
|
|
- main()
|
|
|
+ def _read_bool_(self, config, key, default_value=False):
|
|
|
+ bool_value = default_value
|
|
|
+ try:
|
|
|
+ bool_value = (config[key].lower() == "true")
|
|
|
+ except KeyError:
|
|
|
+ pass
|
|
|
+ return bool_value
|