|
@@ -27,6 +27,7 @@ import datetime
|
|
|
import time
|
|
|
import re
|
|
|
import copy
|
|
|
+from optparse import OptionGroup
|
|
|
from flask import Flask, Response, jsonify, request, abort
|
|
|
from flask.ext.cors import CORS
|
|
|
from flask_restful import Resource, Api, reqparse
|
|
@@ -35,6 +36,7 @@ from flask_restful import Resource, Api, reqparse
|
|
|
|
|
|
class Params:
|
|
|
|
|
|
+ ACTION = None
|
|
|
AMS_HOSTNAME = 'localhost'
|
|
|
AMS_PORT = '6188'
|
|
|
AMS_APP_ID = None
|
|
@@ -51,6 +53,9 @@ class Params:
|
|
|
FLASK_SERVER_NAME = None
|
|
|
METRICS_FOR_HOSTS = {}
|
|
|
HOSTS_WITH_COMPONENTS = {}
|
|
|
+ INPUT_DIR = None
|
|
|
+ VERBOSE = None
|
|
|
+ AGGREGATE = None
|
|
|
|
|
|
@staticmethod
|
|
|
def get_collector_uri(metricNames, hostname=None):
|
|
@@ -116,43 +121,117 @@ class Utils:
|
|
|
else:
|
|
|
return -1
|
|
|
|
|
|
+ @staticmethod
|
|
|
+ def read_json_file(filename):
|
|
|
+ with open(filename) as f:
|
|
|
+ return json.load(f)
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def get_configs():
|
|
|
+ conf_file = None
|
|
|
+ if Params.INPUT_DIR:
|
|
|
+ for metrics_dir in AmsMetricsProcessor.get_metrics_dirs(Params.INPUT_DIR):
|
|
|
+ for dir_item in os.listdir(metrics_dir):
|
|
|
+ dir_item_path = os.path.join(Params.INPUT_DIR, metrics_dir, dir_item)
|
|
|
+ if dir_item == "configs":
|
|
|
+ conf_file = dir_item_path
|
|
|
+ break
|
|
|
+ if conf_file:
|
|
|
+ break
|
|
|
+
|
|
|
+ if os.path.exists(conf_file):
|
|
|
+ json = Utils.read_json_file(conf_file)
|
|
|
+ Params.AMS_APP_ID = json['APP_ID']
|
|
|
+ Params.START_TIME = json['START_TIME']
|
|
|
+ Params.END_TIME = json['END_TIME']
|
|
|
+ Params.AGGREGATE = json['AGGREGATE']
|
|
|
+ else:
|
|
|
+ logger.warn('Not found config file in {0}'.format(os.path.join(Params.INPUT_DIR), "configs"))
|
|
|
+ logger.info('Aborting...')
|
|
|
+ sys.exit(1)
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def set_configs():
|
|
|
+ conf_file = os.path.join(Params.OUT_DIR, "configs")
|
|
|
+ aggregate = True if not Params.HOSTS else False
|
|
|
+ properties = {"APP_ID" : Params.AMS_APP_ID, "START_TIME" : Params.START_TIME, "END_TIME" : Params.END_TIME, "AGGREGATE" : aggregate}
|
|
|
+
|
|
|
+ with open(conf_file, 'w') as file:
|
|
|
+ file.write(json.dumps(properties))
|
|
|
|
|
|
class AmsMetricsProcessor:
|
|
|
|
|
|
@staticmethod
|
|
|
- def get_metrics_for_host(metrics, host=None):
|
|
|
- metrics_result = {}
|
|
|
+ def write_metrics_to_file(metrics, host=None):
|
|
|
+
|
|
|
for metric in metrics:
|
|
|
uri = Params.get_collector_uri(metric, host)
|
|
|
- logger.debug('Request URI: %s' % str(uri))
|
|
|
- metrics_dict = Utils.get_data_from_url(uri)
|
|
|
- if metrics_dict and "metrics" in metrics_dict and metrics_dict["metrics"]:
|
|
|
- metrics_result[metric] = metrics_dict
|
|
|
- else:
|
|
|
- logger.debug("No found metric {0} on host {1}".format(metric, host))
|
|
|
-
|
|
|
- return metrics_result
|
|
|
+ logger.info('Request URI: %s' % str(uri))
|
|
|
+ metrics_json = Utils.get_data_from_url(uri)
|
|
|
+ if metrics_json:
|
|
|
+ if host:
|
|
|
+ path = os.path.join(Params.OUT_DIR, host, metric)
|
|
|
+ else:
|
|
|
+ path = os.path.join(Params.OUT_DIR, metric)
|
|
|
+ logger.info('Writing metric file: %s' % path)
|
|
|
+ with open(path, 'w') as file:
|
|
|
+ file.write(json.dumps(metrics_json))
|
|
|
|
|
|
@staticmethod
|
|
|
def get_metrics_metadata():
|
|
|
-
|
|
|
app_metrics_metadata = []
|
|
|
for metric in Params.METRICS:
|
|
|
- app_metrics_metadata.append({"metricname": metric, "seriesStartTime": Params.START_TIME, "supportsAggregation": "true", "type": "UNDEFINED"})
|
|
|
- logger.debug("Adding {0} to metadata".format(metric))
|
|
|
+ if not Params.AGGREGATE:
|
|
|
+ app_metrics_metadata.append({"metricname": metric, "seriesStartTime": Params.START_TIME, "supportsAggregation": "false", "type": "UNDEFINED"})
|
|
|
+ else:
|
|
|
+ app_metrics_metadata.append({"metricname": metric, "seriesStartTime": Params.START_TIME, "supportsAggregation": "false"})
|
|
|
+ logger.debug("Adding {0} to metadata".format(metric))
|
|
|
|
|
|
return {Params.AMS_APP_ID : app_metrics_metadata}
|
|
|
|
|
|
@staticmethod
|
|
|
def get_hosts_with_components():
|
|
|
hosts_with_components = {}
|
|
|
- for host in Params.HOSTS:
|
|
|
- hosts_with_components[host] = [Params.AMS_APP_ID]
|
|
|
- return hosts_with_components
|
|
|
+ if Params.AGGREGATE:
|
|
|
+ return {"fakehostname" : [Params.AMS_APP_ID]}
|
|
|
+ else:
|
|
|
+ for host in Params.HOSTS:
|
|
|
+ hosts_with_components[host] = [Params.AMS_APP_ID]
|
|
|
+ return hosts_with_components
|
|
|
|
|
|
@staticmethod
|
|
|
- def export_ams_metrics():
|
|
|
+ def get_metrics_dirs(d):
|
|
|
+ for o in os.listdir(d):
|
|
|
+ if 'ambari_metrics_export_' in o and os.path.isdir(os.path.join(d, o)):
|
|
|
+ yield os.path.join(d, o)
|
|
|
+
|
|
|
|
|
|
+ @staticmethod
|
|
|
+ def ger_metrics_from_input_dir():
|
|
|
+ metrics_for_hosts = {}
|
|
|
+
|
|
|
+ for metrics_dir in AmsMetricsProcessor.get_metrics_dirs(Params.INPUT_DIR):
|
|
|
+ for dir_item in os.listdir(metrics_dir):
|
|
|
+ dir_item_path = os.path.join(metrics_dir, dir_item)
|
|
|
+ if os.path.isdir(dir_item_path):
|
|
|
+ if dir_item not in Params.HOSTS:
|
|
|
+ Params.HOSTS.append(os.path.basename(dir_item))
|
|
|
+ metrics_for_hosts[dir_item] = {}
|
|
|
+ for metric in os.listdir(dir_item_path):
|
|
|
+ if metric not in Params.METRICS:
|
|
|
+ Params.METRICS.append(os.path.basename(metric))
|
|
|
+ metric_file = os.path.join(dir_item_path, metric)
|
|
|
+ metrics_for_hosts[dir_item][metric] = Utils.read_json_file(metric_file)
|
|
|
+ elif os.path.isfile(dir_item_path):
|
|
|
+ if dir_item not in Params.METRICS and dir_item != "configs":
|
|
|
+ Params.METRICS.append(os.path.basename(dir_item))
|
|
|
+ metric_file = os.path.join(Params.INPUT_DIR, dir_item_path)
|
|
|
+ metrics_for_hosts[dir_item] = Utils.read_json_file(metric_file)
|
|
|
+
|
|
|
+ return metrics_for_hosts
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def export_ams_metrics():
|
|
|
if not os.path.exists(Params.METRICS_FILE):
|
|
|
logger.error('Metrics file is required.')
|
|
|
sys.exit(1)
|
|
@@ -162,24 +241,31 @@ class AmsMetricsProcessor:
|
|
|
Params.METRICS.append(line.strip())
|
|
|
logger.info('Reading hosts file.')
|
|
|
|
|
|
+ logger.info('Reading hosts file.')
|
|
|
if Params.HOSTS_FILE and os.path.exists(Params.HOSTS_FILE):
|
|
|
with open(Params.HOSTS_FILE, 'r') as file:
|
|
|
for line in file:
|
|
|
Params.HOSTS.append(line.strip())
|
|
|
else:
|
|
|
logger.info('No hosts file found, aggregate metrics will be exported.')
|
|
|
- hosts_metrics = {}
|
|
|
+
|
|
|
if Params.HOSTS:
|
|
|
for host in Params.HOSTS:
|
|
|
- hosts_metrics[host] = AmsMetricsProcessor.get_metrics_for_host(Params.METRICS, host)
|
|
|
- return hosts_metrics
|
|
|
+ os.makedirs(os.path.join(Params.OUT_DIR, host)) # create host dir
|
|
|
+ AmsMetricsProcessor.write_metrics_to_file(Params.METRICS, host)
|
|
|
else:
|
|
|
- return AmsMetricsProcessor.get_metrics_for_host(Params.METRICS, None)
|
|
|
+ os.makedirs(os.path.join(Params.OUT_DIR))
|
|
|
+ AmsMetricsProcessor.write_metrics_to_file(Params.METRICS, None)
|
|
|
|
|
|
def process(self):
|
|
|
- self.metrics_for_hosts = self.export_ams_metrics()
|
|
|
- self.metrics_metadata = self.get_metrics_metadata()
|
|
|
- self.hosts_with_components = self.get_hosts_with_components()
|
|
|
+ if Params.ACTION == "export":
|
|
|
+ self.export_ams_metrics()
|
|
|
+ Utils.set_configs()
|
|
|
+ else:
|
|
|
+ Utils.get_configs()
|
|
|
+ self.metrics_for_hosts = self.ger_metrics_from_input_dir()
|
|
|
+ self.metrics_metadata = self.get_metrics_metadata()
|
|
|
+ self.hosts_with_components = self.get_hosts_with_components()
|
|
|
|
|
|
|
|
|
class FlaskServer():
|
|
@@ -195,7 +281,7 @@ class FlaskServer():
|
|
|
|
|
|
logger.info("Start Flask server. Server URL = " + Params.FLASK_SERVER_NAME + ":5000")
|
|
|
|
|
|
- app.run(debug=True,
|
|
|
+ app.run(debug=Params.VERBOSE,
|
|
|
host=Params.FLASK_SERVER_NAME,
|
|
|
port=5000)
|
|
|
|
|
@@ -237,18 +323,26 @@ class MetricsResource(Resource):
|
|
|
metric_dict = {"metrics" : []}
|
|
|
for host_name in host_names:
|
|
|
if metric_name in self.ams_metrics_processor.metrics_for_hosts[host_name]:
|
|
|
- metric_dict["metrics"].append(self.ams_metrics_processor.metrics_for_hosts[host_name][metric_name]["metrics"][0])
|
|
|
+ if len(self.ams_metrics_processor.metrics_for_hosts[host_name][metric_name]["metrics"]) > 0:
|
|
|
+ metric_dict["metrics"].append(self.ams_metrics_processor.metrics_for_hosts[host_name][metric_name]["metrics"][0])
|
|
|
else:
|
|
|
continue
|
|
|
+
|
|
|
+ elif Params.AGGREGATE:
|
|
|
+ for metric in self.ams_metrics_processor.metrics_for_hosts:
|
|
|
+ if metric_name == metric:
|
|
|
+ metric_dict = self.ams_metrics_processor.metrics_for_hosts[metric_name]
|
|
|
+ break
|
|
|
+
|
|
|
else:
|
|
|
for host in self.ams_metrics_processor.metrics_for_hosts:
|
|
|
for metric in self.ams_metrics_processor.metrics_for_hosts[host]:
|
|
|
- if metric_name == metric:
|
|
|
- metric_dict = self.ams_metrics_processor.metrics_for_hosts[host][metric_name]
|
|
|
- break
|
|
|
+ if metric_name == metric and len(self.ams_metrics_processor.metrics_for_hosts[host][metric_name]["metrics"]) > 0:
|
|
|
+ metric_dict = self.ams_metrics_processor.metrics_for_hosts[host][metric_name]
|
|
|
+ break
|
|
|
|
|
|
if metric_dict:
|
|
|
- metrics_json_new = copy.copy(metric_dict)
|
|
|
+ metrics_json_new = copy.deepcopy(metric_dict)
|
|
|
for i in range (0, len(metrics_json_new["metrics"])):
|
|
|
metrics_json_new["metrics"][i]["metricname"] += separator + operation
|
|
|
return jsonify(metrics_json_new)
|
|
@@ -266,78 +360,123 @@ def main():
|
|
|
'from Ambari Metrics Service to a output dir. '
|
|
|
'The metrics will be exported to a file with name of '
|
|
|
'the metric and in a directory with the name as the '
|
|
|
- 'hostname under the output dir.')
|
|
|
+ 'hostname under the output dir. '
|
|
|
+ 'Also this python program is a thin REST server '
|
|
|
+ 'that implements a subset of the Ambari Metrics Service metrics server interfaces. '
|
|
|
+ 'You can use it to visualize information exported by the AMS thin client')
|
|
|
|
|
|
d = datetime.datetime.now()
|
|
|
time_suffix = '{0}-{1}-{2}-{3}-{4}-{5}'.format(d.year, d.month, d.day,
|
|
|
d.hour, d.minute, d.second)
|
|
|
print 'Time: %s' % time_suffix
|
|
|
+
|
|
|
logfile = os.path.join('/tmp', 'ambari_metrics_export.out')
|
|
|
|
|
|
- parser.add_option("-v", "--verbose", dest="verbose", action="store_false",
|
|
|
+ output_dir = os.path.join('/tmp', 'ambari_metrics_export_' + time_suffix)
|
|
|
+
|
|
|
+ parser.add_option("-a", "--action", dest="action", default="set_action", help="Use action 'export' for exporting AMS metrics. "
|
|
|
+ "Use action 'run' for run REST server")
|
|
|
+ parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
|
|
|
default=False, help="output verbosity.")
|
|
|
- parser.add_option("-s", "--host", dest="server_hostname",
|
|
|
+ parser.add_option("-l", "--logfile", dest="log_file", default=logfile,
|
|
|
+ metavar='FILE', help="Log file. [default: %s]" % logfile)
|
|
|
+
|
|
|
+ export_options_group = OptionGroup(parser, "Required options for action 'export'")
|
|
|
+ #export metrics -----------------------------------------------------
|
|
|
+ export_options_group.add_option("-s", "--host", dest="server_hostname",
|
|
|
help="AMS host name.")
|
|
|
- parser.add_option("-p", "--port", dest="server_port",
|
|
|
+ export_options_group.add_option("-p", "--port", dest="server_port",
|
|
|
default="6188", help="AMS port. [default: 6188]")
|
|
|
- parser.add_option("-a", "--app-id", dest="app_id",
|
|
|
+ export_options_group.add_option("-c", "--app-id", dest="app_id",
|
|
|
help="AMS app id.")
|
|
|
- parser.add_option("-m", "--metrics-file", dest="metrics_file",
|
|
|
+ export_options_group.add_option("-m", "--metrics-file", dest="metrics_file",
|
|
|
help="Metrics file with metric names to query. New line separated.")
|
|
|
- parser.add_option("-f", "--host-file", dest="hosts_file",
|
|
|
+ export_options_group.add_option("-f", "--host-file", dest="hosts_file",
|
|
|
help="Host file with hostnames to query. New line separated.")
|
|
|
- parser.add_option("-l", "--logfile", dest="log_file", default=logfile,
|
|
|
- metavar='FILE', help="Log file. [default: %s]" % logfile)
|
|
|
- parser.add_option("-r", "--precision", dest="precision",
|
|
|
+ export_options_group.add_option("-r", "--precision", dest="precision",
|
|
|
default='minutes', help="AMS API precision, default = minutes.")
|
|
|
- parser.add_option("-b", "--start_time", dest="start_time",
|
|
|
+ export_options_group.add_option("-b", "--start_time", dest="start_time",
|
|
|
help="Start time in milliseconds since epoch or UTC timestamp in YYYY-MM-DDTHH:mm:ssZ format.")
|
|
|
- parser.add_option("-e", "--end_time", dest="end_time",
|
|
|
+ export_options_group.add_option("-e", "--end_time", dest="end_time",
|
|
|
help="End time in milliseconds since epoch or UTC timestamp in YYYY-MM-DDTHH:mm:ssZ format.")
|
|
|
- parser.add_option("-n", "--flask-server_name", dest="server_name",
|
|
|
+ export_options_group.add_option("-o", "--output-dir", dest="output_dir", default=output_dir,
|
|
|
+ help="Output dir. [default: %s]" % output_dir)
|
|
|
+ parser.add_option_group(export_options_group)
|
|
|
+ #start Flask server -----------------------------------------------------
|
|
|
+
|
|
|
+ run_server_option_group = OptionGroup(parser, "Required options for action 'run'")
|
|
|
+
|
|
|
+ run_server_option_group.add_option("-i", "--input-dir", dest="input_dir",
|
|
|
+ default='/tmp', help="Input directory for AMS metrics exports. [default: /tmp]")
|
|
|
+ run_server_option_group.add_option("-n", "--flask-server_name", dest="server_name",
|
|
|
help="Flask server name, default = 127.0.0.1", default="127.0.0.1")
|
|
|
|
|
|
+ parser.add_option_group(run_server_option_group)
|
|
|
(options, args) = parser.parse_args()
|
|
|
|
|
|
- Params.AMS_HOSTNAME = options.server_hostname
|
|
|
|
|
|
- Params.AMS_PORT = options.server_port
|
|
|
+ #export metrics -----------------------------------------------------
|
|
|
+ Params.ACTION = options.action
|
|
|
|
|
|
- Params.AMS_APP_ID = options.app_id
|
|
|
+ Params.VERBOSE = options.verbose
|
|
|
|
|
|
- if Params.AMS_APP_ID != "HOST":
|
|
|
- Params.AMS_APP_ID = Params.AMS_APP_ID.lower()
|
|
|
+ Utils.setup_logger(options.verbose, options.log_file)
|
|
|
|
|
|
- Params.METRICS_FILE = options.metrics_file
|
|
|
+ if Params.ACTION == "export":
|
|
|
|
|
|
- Params.HOSTS_FILE = options.hosts_file
|
|
|
+ Params.AMS_HOSTNAME = options.server_hostname
|
|
|
|
|
|
- Params.PRECISION = options.precision
|
|
|
+ Params.AMS_PORT = options.server_port
|
|
|
|
|
|
- Params.FLASK_SERVER_NAME = options.server_name
|
|
|
+ Params.AMS_APP_ID = options.app_id
|
|
|
|
|
|
- Utils.setup_logger(options.verbose, options.log_file)
|
|
|
+ if Params.AMS_APP_ID != "HOST":
|
|
|
+ Params.AMS_APP_ID = Params.AMS_APP_ID.lower()
|
|
|
|
|
|
- Params.START_TIME = Utils.get_epoch(options.start_time)
|
|
|
+ Params.METRICS_FILE = options.metrics_file
|
|
|
|
|
|
- if Params.START_TIME == -1:
|
|
|
- logger.warn('No start time provided, or it is in the wrong format. Please '
|
|
|
- 'provide milliseconds since epoch or a value in YYYY-MM-DDTHH:mm:ssZ format')
|
|
|
- logger.info('Aborting...')
|
|
|
- sys.exit(1)
|
|
|
+ Params.HOSTS_FILE = options.hosts_file
|
|
|
|
|
|
- Params.END_TIME = Utils.get_epoch(options.end_time)
|
|
|
+ Params.PRECISION = options.precision
|
|
|
|
|
|
- if Params.END_TIME == -1:
|
|
|
- logger.warn('No end time provided, or it is in the wrong format. Please '
|
|
|
- 'provide milliseconds since epoch or a value in YYYY-MM-DDTHH:mm:ssZ format')
|
|
|
+ Params.OUT_DIR = options.output_dir
|
|
|
+
|
|
|
+ if Params.START_TIME == -1:
|
|
|
+ logger.warn('No start time provided, or it is in the wrong format. Please '
|
|
|
+ 'provide milliseconds since epoch or a value in YYYY-MM-DDTHH:mm:ssZ format')
|
|
|
+ logger.info('Aborting...')
|
|
|
+ sys.exit(1)
|
|
|
+
|
|
|
+ Params.END_TIME = Utils.get_epoch(options.end_time)
|
|
|
+
|
|
|
+ if Params.END_TIME == -1:
|
|
|
+ logger.warn('No end time provided, or it is in the wrong format. Please '
|
|
|
+ 'provide milliseconds since epoch or a value in YYYY-MM-DDTHH:mm:ssZ format')
|
|
|
+ logger.info('Aborting...')
|
|
|
+ sys.exit(1)
|
|
|
+
|
|
|
+ Params.START_TIME = Utils.get_epoch(options.start_time)
|
|
|
+
|
|
|
+ ams_metrics_processor = AmsMetricsProcessor()
|
|
|
+ ams_metrics_processor.process()
|
|
|
+
|
|
|
+
|
|
|
+ elif Params.ACTION == "run":
|
|
|
+ #start Flask server -----------------------------------------------------
|
|
|
+ Params.INPUT_DIR = options.input_dir
|
|
|
+
|
|
|
+ Params.FLASK_SERVER_NAME = options.server_name
|
|
|
+
|
|
|
+ ams_metrics_processor = AmsMetricsProcessor()
|
|
|
+ ams_metrics_processor.process()
|
|
|
+ FlaskServer(ams_metrics_processor)
|
|
|
+
|
|
|
+ else:
|
|
|
+ logger.warn('Action \'{0}\' not supported. Please use action \'export\' for exporting AMS metrics '
|
|
|
+ 'or use action \'run\' for starting REST server'.format(Params.ACTION))
|
|
|
logger.info('Aborting...')
|
|
|
sys.exit(1)
|
|
|
|
|
|
- ams_metrics_processor = AmsMetricsProcessor()
|
|
|
- ams_metrics_processor.process()
|
|
|
- FlaskServer(ams_metrics_processor)
|
|
|
-
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
try:
|