CommandStatusDict.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. #!/usr/bin/env python
  2. '''
  3. Licensed to the Apache Software Foundation (ASF) under one
  4. or more contributor license agreements. See the NOTICE file
  5. distributed with this work for additional information
  6. regarding copyright ownership. The ASF licenses this file
  7. to you under the Apache License, Version 2.0 (the
  8. "License"); you may not use this file except in compliance
  9. with the License. You may obtain a copy of the License at
  10. http://www.apache.org/licenses/LICENSE-2.0
  11. Unless required by applicable law or agreed to in writing, software
  12. distributed under the License is distributed on an "AS IS" BASIS,
  13. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. See the License for the specific language governing permissions and
  15. limitations under the License.
  16. '''
  17. import json
  18. import logging
  19. import threading
  20. from Grep import Grep
  21. logger = logging.getLogger()
  22. class CommandStatusDict():
  23. """
  24. Holds results for all commands that are being executed or have finished
  25. execution (but are not yet reported). Implementation is thread-safe.
  26. Dict format:
  27. task_id -> (command, cmd_report)
  28. """
  29. def __init__(self, callback_action):
  30. """
  31. callback_action is called every time when status of some command is
  32. updated
  33. """
  34. self.current_state = {} # Contains all statuses
  35. self.callback_action = callback_action
  36. self.lock = threading.RLock()
  37. def put_command_status(self, command, new_report):
  38. """
  39. Stores new version of report for command (replaces previous)
  40. """
  41. if 'taskId' in command:
  42. key = command['taskId']
  43. else: # Status command reports has no task id
  44. key = id(command)
  45. with self.lock: # Synchronized
  46. self.current_state[key] = (command, new_report)
  47. self.callback_action()
  48. def generate_report(self):
  49. """
  50. Generates status reports about commands that are IN_PROGRESS, COMPLETE or
  51. FAILED. Statuses for COMPLETE or FAILED commands are forgotten after
  52. generation
  53. """
  54. from ActionQueue import ActionQueue
  55. with self.lock: # Synchronized
  56. resultReports = []
  57. resultComponentStatus = []
  58. for key, item in self.current_state.items():
  59. command = item[0]
  60. report = item[1]
  61. if command ['commandType'] == ActionQueue.EXECUTION_COMMAND:
  62. if (report['status']) != ActionQueue.IN_PROGRESS_STATUS:
  63. resultReports.append(report)
  64. # Removing complete/failed command status from dict
  65. del self.current_state[key]
  66. else:
  67. in_progress_report = self.generate_in_progress_report(command, report)
  68. resultReports.append(in_progress_report)
  69. elif command ['commandType'] == ActionQueue.STATUS_COMMAND:
  70. resultComponentStatus.append(report)
  71. # Component status is useful once, removing it
  72. del self.current_state[key]
  73. result = {
  74. 'reports': resultReports,
  75. 'componentStatus': resultComponentStatus
  76. }
  77. return result
  78. def generate_in_progress_report(self, command, report):
  79. """
  80. Reads stdout/stderr for IN_PROGRESS command from disk file
  81. and populates other fields of report.
  82. """
  83. from ActionQueue import ActionQueue
  84. try:
  85. tmpout = open(report['tmpout'], 'r').read()
  86. tmperr = open(report['tmperr'], 'r').read()
  87. except Exception, err:
  88. logger.warn(err)
  89. tmpout = '...'
  90. tmperr = '...'
  91. try:
  92. with open(report['structuredOut'], 'r') as fp:
  93. tmpstructuredout = json.load(fp)
  94. except Exception:
  95. tmpstructuredout = {}
  96. grep = Grep()
  97. output = grep.tail(tmpout, Grep.OUTPUT_LAST_LINES)
  98. inprogress = self.generate_report_template(command)
  99. inprogress.update({
  100. 'stdout': grep.filterMarkup(output),
  101. 'stderr': tmperr,
  102. 'structuredOut': tmpstructuredout,
  103. 'exitCode': 777,
  104. 'status': ActionQueue.IN_PROGRESS_STATUS,
  105. })
  106. return inprogress
  107. def generate_report_template(self, command):
  108. """
  109. Generates stub dict for command.
  110. Other fields should be populated manually
  111. """
  112. stub = {
  113. 'role': command['role'],
  114. 'actionId': command['commandId'],
  115. 'taskId': command['taskId'],
  116. 'clusterName': command['clusterName'],
  117. 'serviceName': command['serviceName'],
  118. 'roleCommand': command['roleCommand']
  119. }
  120. return stub