|
@@ -33,11 +33,14 @@ from ActualConfigHandler import ActualConfigHandler
|
|
|
from CommandStatusDict import CommandStatusDict
|
|
|
from CustomServiceOrchestrator import CustomServiceOrchestrator
|
|
|
from ambari_agent.BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle
|
|
|
+from ambari_commons.str_utils import split_on_chunks
|
|
|
|
|
|
|
|
|
logger = logging.getLogger()
|
|
|
installScriptHash = -1
|
|
|
|
|
|
+MAX_SYMBOLS_PER_LOG_MESSAGE = 7900
|
|
|
+
|
|
|
class ActionQueue(threading.Thread):
|
|
|
""" Action Queue for the agent. We pick one command at a time from the queue
|
|
|
and execute it
|
|
@@ -343,14 +346,14 @@ class ActionQueue(threading.Thread):
|
|
|
if roleResult['stdout'] != '':
|
|
|
logger.info("Begin command output log for command with id = " + str(command['taskId']) + ", role = "
|
|
|
+ command['role'] + ", roleCommand = " + command['roleCommand'])
|
|
|
- logger.info(roleResult['stdout'])
|
|
|
+ self.log_command_output(roleResult['stdout'])
|
|
|
logger.info("End command output log for command with id = " + str(command['taskId']) + ", role = "
|
|
|
+ command['role'] + ", roleCommand = " + command['roleCommand'])
|
|
|
|
|
|
if roleResult['stderr'] != '':
|
|
|
logger.info("Begin command stderr log for command with id = " + str(command['taskId']) + ", role = "
|
|
|
+ command['role'] + ", roleCommand = " + command['roleCommand'])
|
|
|
- logger.info(roleResult['stderr'])
|
|
|
+ self.log_command_output(roleResult['stderr'])
|
|
|
logger.info("End command stderr log for command with id = " + str(command['taskId']) + ", role = "
|
|
|
+ command['role'] + ", roleCommand = " + command['roleCommand'])
|
|
|
|
|
@@ -396,7 +399,6 @@ class ActionQueue(threading.Thread):
|
|
|
configHandler = ActualConfigHandler(self.config, self.configTags)
|
|
|
#update
|
|
|
if command.has_key('forceRefreshConfigTags') and len(command['forceRefreshConfigTags']) > 0 :
|
|
|
-
|
|
|
forceRefreshConfigTags = command['forceRefreshConfigTags']
|
|
|
logger.info("Got refresh additional component tags command")
|
|
|
|
|
@@ -425,6 +427,20 @@ class ActionQueue(threading.Thread):
|
|
|
|
|
|
self.commandStatuses.put_command_status(command, roleResult)
|
|
|
|
|
|
+ def log_command_output(self, text):
|
|
|
+ """
|
|
|
+ Logs a message as multiple enumerated log messages every of which is not larger than MAX_SYMBOLS_PER_LOG_MESSAGE.
|
|
|
+
|
|
|
+ If logs are redirected to syslog (syslog_enabled=1), this is very useful for logging big messages.
|
|
|
+ As syslog usually truncates long messages.
|
|
|
+ """
|
|
|
+ chunks = split_on_chunks(text, MAX_SYMBOLS_PER_LOG_MESSAGE)
|
|
|
+ if len(chunks) > 1:
|
|
|
+ for i in range(len(chunks)):
|
|
|
+ logger.info("Chunk {0}/{1} of log for command: \n".format(i+1, len(chunks)) + chunks[i])
|
|
|
+ else:
|
|
|
+ logger.info(text)
|
|
|
+
|
|
|
def get_retry_delay(self, last_delay):
|
|
|
"""
|
|
|
Returns exponentially growing delay. The idea being if number of retries is high then the reason to retry
|