|
@@ -25,6 +25,7 @@ from resource_management import Template
|
|
|
|
|
|
from ambari_commons import OSConst
|
|
|
from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl
|
|
|
+from ambari_commons.parallel_processing import PrallelProcessResult, execute_in_parallel, SUCCESS
|
|
|
|
|
|
import httplib
|
|
|
import network
|
|
@@ -39,10 +40,10 @@ import socket
|
|
|
class AMSServiceCheck(Script):
|
|
|
AMS_METRICS_POST_URL = "/ws/v1/timeline/metrics/"
|
|
|
AMS_METRICS_GET_URL = "/ws/v1/timeline/metrics?%s"
|
|
|
- AMS_CONNECT_TRIES = 30
|
|
|
- AMS_CONNECT_TIMEOUT = 15
|
|
|
- AMS_READ_TRIES = 10
|
|
|
- AMS_READ_TIMEOUT = 5
|
|
|
+ AMS_CONNECT_TRIES = 10
|
|
|
+ AMS_CONNECT_TIMEOUT = 10
|
|
|
+ AMS_READ_TRIES = 5
|
|
|
+ AMS_READ_TIMEOUT = 10
|
|
|
|
|
|
@OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY)
|
|
|
def service_check(self, env):
|
|
@@ -62,124 +63,139 @@ class AMSServiceCheck(Script):
|
|
|
if not check_windows_service_exists(params.ams_collector_win_service_name):
|
|
|
raise Fail("Metrics Collector service was not properly installed. Check the logs and retry the installation.")
|
|
|
|
|
|
- @OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT)
|
|
|
- def service_check(self, env):
|
|
|
- import params
|
|
|
-
|
|
|
- Logger.info("Ambari Metrics service check was started.")
|
|
|
- env.set_params(params)
|
|
|
-
|
|
|
+ def service_check_for_single_host(self, metric_collector_host, params):
|
|
|
random_value1 = random.random()
|
|
|
headers = {"Content-type": "application/json"}
|
|
|
ca_certs = os.path.join(params.ams_collector_conf_dir,
|
|
|
params.metric_truststore_ca_certs)
|
|
|
|
|
|
- for i in xrange(0, self.AMS_CONNECT_TRIES):
|
|
|
- try:
|
|
|
- current_time = int(time.time()) * 1000
|
|
|
- metric_json = Template('smoketest_metrics.json.j2', hostname=params.hostname, random1=random_value1,
|
|
|
+ current_time = int(time.time()) * 1000
|
|
|
+ metric_json = Template('smoketest_metrics.json.j2', hostname=params.hostname, random1=random_value1,
|
|
|
current_time=current_time).get_content()
|
|
|
- Logger.info("Generated metrics:\n%s" % metric_json)
|
|
|
-
|
|
|
- Logger.info("Connecting (POST) to %s:%s%s" % (params.metric_collector_host,
|
|
|
- params.metric_collector_port,
|
|
|
- self.AMS_METRICS_POST_URL))
|
|
|
- conn = network.get_http_connection(params.metric_collector_host,
|
|
|
+ try:
|
|
|
+ post_metrics_to_collector(self.AMS_METRICS_POST_URL, metric_collector_host, params.metric_collector_port, params.metric_collector_https_enabled,
|
|
|
+ metric_json, headers, ca_certs, self.AMS_CONNECT_TRIES, self.AMS_CONNECT_TIMEOUT)
|
|
|
+
|
|
|
+ get_metrics_parameters = {
|
|
|
+ "metricNames": "AMBARI_METRICS.SmokeTest.FakeMetric",
|
|
|
+ "appId": "amssmoketestfake",
|
|
|
+ "hostname": params.hostname,
|
|
|
+ "startTime": current_time - 60000,
|
|
|
+ "endTime": current_time + 61000,
|
|
|
+ "precision": "seconds",
|
|
|
+ "grouped": "false",
|
|
|
+ }
|
|
|
+ encoded_get_metrics_parameters = urllib.urlencode(get_metrics_parameters)
|
|
|
+
|
|
|
+ Logger.info("Connecting (GET) to %s:%s%s" % (metric_collector_host,
|
|
|
+ params.metric_collector_port,
|
|
|
+ self.AMS_METRICS_GET_URL % encoded_get_metrics_parameters))
|
|
|
+ for i in xrange(0, self.AMS_READ_TRIES):
|
|
|
+ conn = network.get_http_connection(metric_collector_host,
|
|
|
int(params.metric_collector_port),
|
|
|
params.metric_collector_https_enabled,
|
|
|
ca_certs)
|
|
|
- conn.request("POST", self.AMS_METRICS_POST_URL, metric_json, headers)
|
|
|
-
|
|
|
+ conn.request("GET", self.AMS_METRICS_GET_URL % encoded_get_metrics_parameters)
|
|
|
response = conn.getresponse()
|
|
|
- Logger.info("Http response: %s %s" % (response.status, response.reason))
|
|
|
- except (httplib.HTTPException, socket.error) as ex:
|
|
|
- if i < self.AMS_CONNECT_TRIES - 1: #range/xrange returns items from start to end-1
|
|
|
- time.sleep(self.AMS_CONNECT_TIMEOUT)
|
|
|
- Logger.info("Connection failed. Next retry in %s seconds."
|
|
|
- % (self.AMS_CONNECT_TIMEOUT))
|
|
|
- continue
|
|
|
- else:
|
|
|
- raise Fail("Metrics were not saved. Service check has failed. "
|
|
|
- "\nConnection failed.")
|
|
|
+ Logger.info("Http response for host %s : %s %s" % (metric_collector_host, response.status, response.reason))
|
|
|
|
|
|
- data = response.read()
|
|
|
- Logger.info("Http data: %s" % data)
|
|
|
- conn.close()
|
|
|
+ data = response.read()
|
|
|
+ Logger.info("Http data: %s" % data)
|
|
|
+ conn.close()
|
|
|
|
|
|
- if response.status == 200:
|
|
|
- Logger.info("Metrics were saved.")
|
|
|
- break
|
|
|
- else:
|
|
|
- Logger.info("Metrics were not saved. Service check has failed.")
|
|
|
- if i < self.AMS_CONNECT_TRIES - 1: #range/xrange returns items from start to end-1
|
|
|
- time.sleep(self.AMS_CONNECT_TIMEOUT)
|
|
|
- Logger.info("Next retry in %s seconds."
|
|
|
- % (self.AMS_CONNECT_TIMEOUT))
|
|
|
+ if response.status == 200:
|
|
|
+ Logger.info("Metrics were retrieved from host %s" % metric_collector_host)
|
|
|
+ else:
|
|
|
+ raise Fail("Metrics were not retrieved from host %s. GET request status: %s %s \n%s" %
|
|
|
+ (metric_collector_host, response.status, response.reason, data))
|
|
|
+ data_json = json.loads(data)
|
|
|
+
|
|
|
+ def floats_eq(f1, f2, delta):
|
|
|
+ return abs(f1-f2) < delta
|
|
|
+
|
|
|
+ values_are_present = False
|
|
|
+ for metrics_data in data_json["metrics"]:
|
|
|
+ if (str(current_time) in metrics_data["metrics"] and str(current_time + 1000) in metrics_data["metrics"]
|
|
|
+ and floats_eq(metrics_data["metrics"][str(current_time)], random_value1, 0.0000001)
|
|
|
+ and floats_eq(metrics_data["metrics"][str(current_time + 1000)], current_time, 1)):
|
|
|
+ Logger.info("Values %s and %s were found in the response from host %s." % (metric_collector_host, random_value1, current_time))
|
|
|
+ values_are_present = True
|
|
|
+ break
|
|
|
+ pass
|
|
|
+
|
|
|
+ if not values_are_present:
|
|
|
+ if i < self.AMS_READ_TRIES - 1: #range/xrange returns items from start to end-1
|
|
|
+ Logger.info("Values weren't stored yet. Retrying in %s seconds."
|
|
|
+ % (self.AMS_READ_TIMEOUT))
|
|
|
+ time.sleep(self.AMS_READ_TIMEOUT)
|
|
|
+ else:
|
|
|
+ raise Fail("Values %s and %s were not found in the response." % (random_value1, current_time))
|
|
|
else:
|
|
|
- raise Fail("Metrics were not saved. Service check has failed. POST request status: %s %s \n%s" %
|
|
|
- (response.status, response.reason, data))
|
|
|
-
|
|
|
- get_metrics_parameters = {
|
|
|
- "metricNames": "AMBARI_METRICS.SmokeTest.FakeMetric",
|
|
|
- "appId": "amssmoketestfake",
|
|
|
- "hostname": params.hostname,
|
|
|
- "startTime": current_time - 60000,
|
|
|
- "endTime": current_time + 61000,
|
|
|
- "precision": "seconds",
|
|
|
- "grouped": "false",
|
|
|
- }
|
|
|
- encoded_get_metrics_parameters = urllib.urlencode(get_metrics_parameters)
|
|
|
-
|
|
|
- Logger.info("Connecting (GET) to %s:%s%s" % (params.metric_collector_host,
|
|
|
- params.metric_collector_port,
|
|
|
- self.AMS_METRICS_GET_URL % encoded_get_metrics_parameters))
|
|
|
- for i in xrange(0, self.AMS_READ_TRIES):
|
|
|
- conn = network.get_http_connection(params.metric_collector_host,
|
|
|
- int(params.metric_collector_port),
|
|
|
- params.metric_collector_https_enabled,
|
|
|
- ca_certs)
|
|
|
- conn.request("GET", self.AMS_METRICS_GET_URL % encoded_get_metrics_parameters)
|
|
|
- response = conn.getresponse()
|
|
|
- Logger.info("Http response: %s %s" % (response.status, response.reason))
|
|
|
-
|
|
|
- data = response.read()
|
|
|
- Logger.info("Http data: %s" % data)
|
|
|
- conn.close()
|
|
|
-
|
|
|
- if response.status == 200:
|
|
|
- Logger.info("Metrics were retrieved.")
|
|
|
- else:
|
|
|
- Logger.info("Metrics were not retrieved. Service check has failed.")
|
|
|
- raise Fail("Metrics were not retrieved. Service check has failed. GET request status: %s %s \n%s" %
|
|
|
- (response.status, response.reason, data))
|
|
|
- data_json = json.loads(data)
|
|
|
-
|
|
|
- def floats_eq(f1, f2, delta):
|
|
|
- return abs(f1-f2) < delta
|
|
|
-
|
|
|
- values_are_present = False
|
|
|
- for metrics_data in data_json["metrics"]:
|
|
|
- if (str(current_time) in metrics_data["metrics"] and str(current_time + 1000) in metrics_data["metrics"]
|
|
|
- and floats_eq(metrics_data["metrics"][str(current_time)], random_value1, 0.0000001)
|
|
|
- and floats_eq(metrics_data["metrics"][str(current_time + 1000)], current_time, 1)):
|
|
|
- Logger.info("Values %s and %s were found in the response." % (random_value1, current_time))
|
|
|
- values_are_present = True
|
|
|
break
|
|
|
pass
|
|
|
+ except Fail as ex:
|
|
|
+ Logger.warning("Ambari Metrics service check failed on collector host %s. Reason : %s" % (metric_collector_host, str(ex)))
|
|
|
+ raise Fail("Ambari Metrics service check failed on collector host %s. Reason : %s" % (metric_collector_host, str(ex)))
|
|
|
+
|
|
|
+ @OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT)
|
|
|
+ def service_check(self, env):
|
|
|
+ import params
|
|
|
+
|
|
|
+ Logger.info("Ambari Metrics service check was started.")
|
|
|
+ env.set_params(params)
|
|
|
|
|
|
- if not values_are_present:
|
|
|
- if i < self.AMS_READ_TRIES - 1: #range/xrange returns items from start to end-1
|
|
|
- Logger.info("Values weren't stored yet. Retrying in %s seconds."
|
|
|
- % (self.AMS_READ_TIMEOUT))
|
|
|
- time.sleep(self.AMS_READ_TIMEOUT)
|
|
|
+ results = execute_in_parallel(self.service_check_for_single_host, params.ams_collector_hosts.split(','), params)
|
|
|
+
|
|
|
+ for host in str(params.ams_collector_hosts).split(","):
|
|
|
+ if host in results:
|
|
|
+ if results[host].status == SUCCESS:
|
|
|
+ Logger.info("Ambari Metrics service check passed on host " + host)
|
|
|
+ return
|
|
|
else:
|
|
|
- Logger.info("Values %s and %s were not found in the response." % (random_value1, current_time))
|
|
|
- raise Fail("Values %s and %s were not found in the response." % (random_value1, current_time))
|
|
|
- else:
|
|
|
- break
|
|
|
- pass
|
|
|
- Logger.info("Ambari Metrics service check is finished.")
|
|
|
+ Logger.warning(results[host].result)
|
|
|
+ raise Fail("All metrics collectors are unavailable.")
|
|
|
+
|
|
|
+def post_metrics_to_collector(ams_metrics_post_url, metric_collector_host, metric_collector_port, metric_collector_https_enabled,
|
|
|
+ metric_json, headers, ca_certs, tries = 1, connect_timeout = 10):
|
|
|
+ for i in xrange(0, tries):
|
|
|
+ try:
|
|
|
+ Logger.info("Generated metrics for host %s :\n%s" % (metric_collector_host, metric_json))
|
|
|
+
|
|
|
+ Logger.info("Connecting (POST) to %s:%s%s" % (metric_collector_host,
|
|
|
+ metric_collector_port,
|
|
|
+ ams_metrics_post_url))
|
|
|
+ conn = network.get_http_connection(metric_collector_host,
|
|
|
+ int(metric_collector_port),
|
|
|
+ metric_collector_https_enabled,
|
|
|
+ ca_certs)
|
|
|
+ conn.request("POST", ams_metrics_post_url, metric_json, headers)
|
|
|
|
|
|
+ response = conn.getresponse()
|
|
|
+ Logger.info("Http response for host %s: %s %s" % (metric_collector_host, response.status, response.reason))
|
|
|
+ except (httplib.HTTPException, socket.error) as ex:
|
|
|
+ if i < tries - 1: #range/xrange returns items from start to end-1
|
|
|
+ time.sleep(connect_timeout)
|
|
|
+ Logger.info("Connection failed for host %s. Next retry in %s seconds."
|
|
|
+ % (metric_collector_host, connect_timeout))
|
|
|
+ continue
|
|
|
+ else:
|
|
|
+ raise Fail("Metrics were not saved. Connection failed.")
|
|
|
+
|
|
|
+ data = response.read()
|
|
|
+ Logger.info("Http data: %s" % data)
|
|
|
+ conn.close()
|
|
|
+
|
|
|
+ if response.status == 200:
|
|
|
+ Logger.info("Metrics were saved.")
|
|
|
+ break
|
|
|
+ else:
|
|
|
+ Logger.info("Metrics were not saved.")
|
|
|
+ if i < tries - 1: #range/xrange returns items from start to end-1
|
|
|
+ time.sleep(tries)
|
|
|
+ Logger.info("Next retry in %s seconds."
|
|
|
+ % (tries))
|
|
|
+ else:
|
|
|
+ raise Fail("Metrics were not saved. POST request status: %s %s \n%s" %
|
|
|
+ (response.status, response.reason, data))
|
|
|
if __name__ == "__main__":
|
|
|
AMSServiceCheck().execute()
|
|
|
-
|