
AMBARI-16766 Implement in ams collector batch insert operations to ams-hbase (dsen)

Dmytro Sen, 9 years ago
parent commit 7dfce9b11c

+ 1 - 1
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java

@@ -285,7 +285,7 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
     // Error indicated by the Sql exception
     TimelinePutResponse response = new TimelinePutResponse();
 
-    hBaseAccessor.insertMetricRecordsWithMetadata(metricMetadataManager, metrics);
+    hBaseAccessor.insertMetricRecordsWithMetadata(metricMetadataManager, metrics, false);
 
     return response;
   }
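The collector-side call site changes accordingly: the put path now passes skipCache = false, so metrics arriving over the REST API are routed through the new write cache instead of being committed per request. Callers that need an immediate synchronous commit (tests, for example) pass true.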

+ 38 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsCacheCommitterThread.java

@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class MetricsCacheCommitterThread implements Runnable {
+
+    private static final Log LOG = LogFactory.getLog(MetricsCacheCommitterThread.class);
+    private static PhoenixHBaseAccessor phoenixHBaseAccessor;
+
+    public MetricsCacheCommitterThread(PhoenixHBaseAccessor phoenixHBaseAccessor) {
+        this.phoenixHBaseAccessor = phoenixHBaseAccessor;
+    }
+    @Override
+    public void run() {
+        LOG.debug("Checking if metrics cache is empty");
+        if (!phoenixHBaseAccessor.isInsertCacheEmpty()) {
+            phoenixHBaseAccessor.commitMetricsFromCache();
+        }
+    }
+}
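The committer itself is a plain Runnable that only checks whether the queue has anything to flush and delegates to the accessor. A minimal sketch of how such a committer is driven, with illustrative names (the real wiring sits in the PhoenixHBaseAccessor constructor later in this diff):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class CommitterWiringSketch {
      public static void main(String[] args) throws InterruptedException {
        Runnable committer = new Runnable() {
          @Override
          public void run() {
            // stand-in for isInsertCacheEmpty()/commitMetricsFromCache()
            System.out.println("flush cache if non-empty");
          }
        };
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        // scheduleWithFixedDelay measures the delay from the end of one run to
        // the start of the next, so a slow flush never overlaps the following one.
        executor.scheduleWithFixedDelay(committer, 0, 3, TimeUnit.SECONDS);
        Thread.sleep(10000);  // let a few flushes happen, then exit
        executor.shutdown();
      }
    }

One quirk in the class above: phoenixHBaseAccessor is declared static yet assigned from the constructor. This works only because the collector constructs a single accessor; a second instance would silently rebind the field for the first one's committer.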

+ 147 - 76
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java

@@ -60,14 +60,20 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -91,6 +97,9 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_SPLIT_POINTS;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CONTAINER_METRICS_TTL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_SIZE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_ENABLED;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CONTAINER_METRICS_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_CONTAINER_METRICS_TABLE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_HOSTED_APPS_METADATA_TABLE_SQL;
@@ -149,6 +158,12 @@ public class PhoenixHBaseAccessor {
   private final RetryCounterFactory retryCounterFactory;
   private final PhoenixConnectionProvider dataSource;
   private final long outOfBandTimeAllowance;
+  private final int cacheSize;
+  private final boolean cacheEnabled;
+  private final BlockingQueue<TimelineMetrics> insertCache;
+  private ScheduledExecutorService scheduledExecutorService;
+  private MetricsCacheCommitterThread metricsCommiterThread;
+  private final int cacheCommitInterval;
   private final boolean skipBlockCacheForAggregatorsEnabled;
   private final String timelineMetricsTablesDurability;
 
@@ -182,9 +197,13 @@ public class PhoenixHBaseAccessor {
     }
     this.dataSource = dataSource;
     this.retryCounterFactory = new RetryCounterFactory(metricsConf.getInt(GLOBAL_MAX_RETRIES, 10),
-      (int) SECONDS.toMillis(metricsConf.getInt(GLOBAL_RETRY_INTERVAL, 5)));
+      (int) SECONDS.toMillis(metricsConf.getInt(GLOBAL_RETRY_INTERVAL, 3)));
     this.outOfBandTimeAllowance = metricsConf.getLong(OUT_OFF_BAND_DATA_TIME_ALLOWANCE,
       DEFAULT_OUT_OF_BAND_TIME_ALLOWANCE);
+    this.cacheEnabled = Boolean.valueOf(metricsConf.get(TIMELINE_METRICS_CACHE_ENABLED, "true"));
+    this.cacheSize = Integer.valueOf(metricsConf.get(TIMELINE_METRICS_CACHE_SIZE, "150"));
+    this.cacheCommitInterval = Integer.valueOf(metricsConf.get(TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, "3"));
+    this.insertCache = new ArrayBlockingQueue<TimelineMetrics>(cacheSize);
     this.skipBlockCacheForAggregatorsEnabled = metricsConf.getBoolean(AGGREGATORS_SKIP_BLOCK_CACHE, false);
     this.timelineMetricsTablesDurability = metricsConf.get(TIMELINE_METRICS_TABLES_DURABILITY, "");
 
@@ -197,6 +216,107 @@ public class PhoenixHBaseAccessor {
     tableTTL.put(METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME, metricsConf.get(CLUSTER_MINUTE_TABLE_TTL, String.valueOf(30 * 86400))); //30 days
     tableTTL.put(METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME, metricsConf.get(CLUSTER_HOUR_TABLE_TTL, String.valueOf(365 * 86400))); //1 year
     tableTTL.put(METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME, metricsConf.get(CLUSTER_DAILY_TABLE_TTL, String.valueOf(730 * 86400))); //2 years
+
+    if (cacheEnabled) {
+      LOG.debug("Initialising and starting metrics cache committer thread...");
+      metricsCommiterThread = new MetricsCacheCommitterThread(this);
+      scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+      scheduledExecutorService.scheduleWithFixedDelay(metricsCommiterThread, 0, cacheCommitInterval, TimeUnit.SECONDS);
+    }
+  }
+
+  public boolean isInsertCacheEmpty() {
+    return insertCache.isEmpty();
+  }
+
+  public void commitMetricsFromCache() {
+    LOG.debug("Clearing metrics cache");
+    List<TimelineMetrics> metricsArray = new ArrayList<TimelineMetrics>(insertCache.size());
+    while (!insertCache.isEmpty()) {
+      metricsArray.add(insertCache.poll());
+    }
+    if (metricsArray.size() > 0) {
+      commitMetrics(metricsArray);
+    }
+  }
+
+  public void commitMetrics(TimelineMetrics timelineMetrics) {
+    commitMetrics(Collections.singletonList(timelineMetrics));
+  }
+
+  public void commitMetrics(Collection<TimelineMetrics> timelineMetricsCollection) {
+    LOG.debug("Committing metrics to store");
+    Connection conn = null;
+    PreparedStatement metricRecordStmt = null;
+    long currentTime = System.currentTimeMillis();
+
+    try {
+      conn = getConnection();
+      metricRecordStmt = conn.prepareStatement(String.format(
+              UPSERT_METRICS_SQL, METRICS_RECORD_TABLE_NAME));
+      for (TimelineMetrics timelineMetrics : timelineMetricsCollection) {
+        for (TimelineMetric metric : timelineMetrics.getMetrics()) {
+          if (Math.abs(currentTime - metric.getStartTime()) > outOfBandTimeAllowance) {
+            // If timeseries start time is way in the past : discard
+            LOG.debug("Discarding out of band timeseries, currentTime = "
+                    + currentTime + ", startTime = " + metric.getStartTime()
+                    + ", hostname = " + metric.getHostName());
+            continue;
+          }
+
+          metricRecordStmt.clearParameters();
+
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("host: " + metric.getHostName() + ", " +
+                    "metricName = " + metric.getMetricName() + ", " +
+                    "values: " + metric.getMetricValues());
+          }
+          double[] aggregates = AggregatorUtils.calculateAggregates(
+                  metric.getMetricValues());
+
+          metricRecordStmt.setString(1, metric.getMetricName());
+          metricRecordStmt.setString(2, metric.getHostName());
+          metricRecordStmt.setString(3, metric.getAppId());
+          metricRecordStmt.setString(4, metric.getInstanceId());
+          metricRecordStmt.setLong(5, currentTime);
+          metricRecordStmt.setLong(6, metric.getStartTime());
+          metricRecordStmt.setString(7, metric.getUnits());
+          metricRecordStmt.setDouble(8, aggregates[0]);
+          metricRecordStmt.setDouble(9, aggregates[1]);
+          metricRecordStmt.setDouble(10, aggregates[2]);
+          metricRecordStmt.setLong(11, (long) aggregates[3]);
+          String json = TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
+          metricRecordStmt.setString(12, json);
+
+          try {
+            metricRecordStmt.executeUpdate();
+          } catch (SQLException sql) {
+            LOG.error("Failed on insert records to store.", sql);
+          }
+        }
+      }
+
+      // commit() blocked if HBase unavailable
+      conn.commit();
+    } catch (Exception exception){
+      exception.printStackTrace();
+    }
+    finally {
+      if (metricRecordStmt != null) {
+        try {
+          metricRecordStmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
   }
 
   private static TimelineMetric getLastTimelineMetricFromResultSet(ResultSet rs)
@@ -553,94 +673,45 @@ public class PhoenixHBaseAccessor {
   }
 
   public void insertMetricRecordsWithMetadata(TimelineMetricMetadataManager metadataManager,
-                                              TimelineMetrics metrics) throws SQLException, IOException {
+                                              TimelineMetrics metrics, boolean skipCache) throws SQLException, IOException {
     List<TimelineMetric> timelineMetrics = metrics.getMetrics();
     if (timelineMetrics == null || timelineMetrics.isEmpty()) {
       LOG.debug("Empty metrics insert request.");
       return;
     }
-
-    Connection conn = getConnection();
-    PreparedStatement metricRecordStmt = null;
-    long currentTime = System.currentTimeMillis();
-
-    try {
-      metricRecordStmt = conn.prepareStatement(String.format(
-        UPSERT_METRICS_SQL, METRICS_RECORD_TABLE_NAME));
-
-      for (TimelineMetric metric : timelineMetrics) {
-        if (Math.abs(currentTime - metric.getStartTime()) > outOfBandTimeAllowance) {
-          // If timeseries start time is way in the past : discard
-          LOG.debug("Discarding out of band timeseries, currentTime = "
-            + currentTime + ", startTime = " + metric.getStartTime()
-            + ", hostname = " + metric.getHostName());
-          continue;
-        }
-
-        metricRecordStmt.clearParameters();
-
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("host: " + metric.getHostName() + ", " +
-            "metricName = " + metric.getMetricName() + ", " +
-            "values: " + metric.getMetricValues());
-        }
-        double[] aggregates =  AggregatorUtils.calculateAggregates(
-          metric.getMetricValues());
-
-        metricRecordStmt.setString(1, metric.getMetricName());
-        metricRecordStmt.setString(2, metric.getHostName());
-        metricRecordStmt.setString(3, metric.getAppId());
-        metricRecordStmt.setString(4, metric.getInstanceId());
-        metricRecordStmt.setLong(5, currentTime);
-        metricRecordStmt.setLong(6, metric.getStartTime());
-        metricRecordStmt.setString(7, metric.getUnits());
-        metricRecordStmt.setDouble(8, aggregates[0]);
-        metricRecordStmt.setDouble(9, aggregates[1]);
-        metricRecordStmt.setDouble(10, aggregates[2]);
-        metricRecordStmt.setLong(11, (long) aggregates[3]);
-        String json = TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
-        metricRecordStmt.setString(12, json);
-
-        try {
-          metricRecordStmt.executeUpdate();
-
-          if (metadataManager != null) {
-            // Write to metadata cache on successful write to store
-            metadataManager.putIfModifiedTimelineMetricMetadata(
-              metadataManager.getTimelineMetricMetadata(metric));
-
-            metadataManager.putIfModifiedHostedAppsMetadata(
-              metric.getHostName(), metric.getAppId());
-          }
-
-        } catch (SQLException sql) {
-          LOG.error("Failed on insert records to store.", sql);
-        }
+    for (TimelineMetric tm: timelineMetrics) {
+      // Write to metadata cache on successful write to store
+      if (metadataManager != null) {
+        metadataManager.putIfModifiedTimelineMetricMetadata(
+                metadataManager.getTimelineMetricMetadata(tm));
+
+        metadataManager.putIfModifiedHostedAppsMetadata(
+                tm.getHostName(), tm.getAppId());
       }
+    }
 
-      // commit() blocked if HBase unavailable
-      conn.commit();
-
-    } finally {
-      if (metricRecordStmt != null) {
-        try {
-          metricRecordStmt.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
+    if  (!skipCache && cacheEnabled) {
+      LOG.debug("Adding metrics to cache");
+      if (insertCache.size() >= cacheSize) {
+        commitMetricsFromCache();
       }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException sql) {
-          // Ignore
-        }
+      try {
+        insertCache.put(metrics); // blocked while the queue is full
+      } catch (InterruptedException e) {
+        e.printStackTrace();
       }
+    } else {
+      LOG.debug("Skipping metrics cache");
+      commitMetrics(metrics);
     }
   }
 
+  public void insertMetricRecords(TimelineMetrics metrics, boolean skipCache) throws SQLException, IOException {
+    insertMetricRecordsWithMetadata(null, metrics, skipCache);
+  }
+
   public void insertMetricRecords(TimelineMetrics metrics) throws SQLException, IOException {
-    insertMetricRecordsWithMetadata(null, metrics);
+    insertMetricRecords(metrics, false);
   }
 
   @SuppressWarnings("unchecked")
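Taken together, these changes turn per-request commits into a write-behind batch: producers drop whole TimelineMetrics payloads into a bounded ArrayBlockingQueue, and the queue is drained into a single Phoenix transaction either by the scheduled committer or by a producer that finds the queue at capacity. A compact, self-contained sketch of the pattern, with illustrative names rather than the Ambari API:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class WriteBehindCache<T> {
      private final BlockingQueue<T> cache;
      private final int capacity;

      public WriteBehindCache(int capacity, int flushIntervalSeconds) {
        this.capacity = capacity;
        this.cache = new ArrayBlockingQueue<T>(capacity);
        Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(
            new Runnable() {
              @Override
              public void run() { flush(); }  // the MetricsCacheCommitterThread role
            }, 0, flushIntervalSeconds, TimeUnit.SECONDS);
      }

      // The insertMetricRecordsWithMetadata role: flush first if full, then enqueue.
      public void add(T item) throws InterruptedException {
        if (cache.size() >= capacity) {
          flush();
        }
        cache.put(item);  // blocks if other producers refilled the queue first
      }

      // The commitMetricsFromCache role.
      private synchronized void flush() {
        List<T> batch = new ArrayList<T>(cache.size());
        cache.drainTo(batch);  // atomic drain; the patch uses an isEmpty()/poll() loop
        if (!batch.isEmpty()) {
          commit(batch);
        }
      }

      // The commitMetrics role: one prepared statement, one commit for the batch.
      protected void commit(List<T> batch) {
        System.out.println("committing " + batch.size() + " payload(s) in one transaction");
      }
    }

Two details above differ deliberately from the patch: drainTo is the atomic form of the isEmpty()/poll() loop, which can slip a null into the batch if the producer-side flush races the scheduled one, and synchronizing flush() serializes those two callers. In the patch the race is otherwise benign; competing flushers simply commit smaller batches.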

+ 9 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java

@@ -42,6 +42,15 @@ public class TimelineMetricConfiguration {
   public static final String TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR =
     "timeline.metrics.aggregator.checkpoint.dir";
 
+  public static final String TIMELINE_METRICS_CACHE_SIZE =
+    "timeline.metrics.cache.size";
+
+  public static final String TIMELINE_METRICS_CACHE_COMMIT_INTERVAL =
+    "timeline.metrics.cache.commit.interval";
+
+  public static final String TIMELINE_METRICS_CACHE_ENABLED =
+    "timeline.metrics.cache.enabled";
+
   public static final String DEFAULT_CHECKPOINT_LOCATION =
     System.getProperty("java.io.tmpdir");
 

+ 38 - 1
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java

@@ -21,10 +21,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConnectionProvider;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
@@ -41,6 +41,8 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -206,4 +208,39 @@ public class PhoenixHBaseAccessorTest {
     PowerMock.verifyAll();
   }
 
+  @Test
+  public void testMetricsCacheCommittingWhenFull() throws IOException, SQLException {
+    Configuration hbaseConf = new Configuration();
+    hbaseConf.setStrings(ZOOKEEPER_QUORUM, "quorum");
+    Configuration metricsConf = new Configuration();
+    metricsConf.setStrings(TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_SIZE, "1");
+    metricsConf.setStrings(TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, "100");
+    final Connection connection = EasyMock.createNiceMock(Connection.class);
+
+
+    PhoenixHBaseAccessor accessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf) {
+      @Override
+      public void commitMetrics(Collection<TimelineMetrics> timelineMetricsCollection) {
+        try {
+          connection.commit();
+        } catch (SQLException e) {
+          e.printStackTrace();
+        }
+      }
+    };
+
+    TimelineMetrics timelineMetrics = EasyMock.createNiceMock(TimelineMetrics.class);
+    EasyMock.expect(timelineMetrics.getMetrics()).andReturn(Collections.singletonList(new TimelineMetric())).anyTimes();
+    connection.commit();
+    EasyMock.expectLastCall().once();
+
+    EasyMock.replay(timelineMetrics, connection);
+
+    accessor.insertMetricRecords(timelineMetrics);
+    accessor.insertMetricRecords(timelineMetrics);
+    accessor.insertMetricRecords(timelineMetrics);
+
+    EasyMock.verify(timelineMetrics, connection);
+  }
+
 }
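This test pins the cache size to 1 and the commit interval to 100 seconds, so the background committer never fires within the test window. Every insert after the first finds the queue at capacity and flushes synchronously through the overridden commitMetrics, so verify() observes the expected commit() on the mocked connection.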

+ 1 - 1
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java

@@ -69,7 +69,7 @@ public class TestMetadataManager extends AbstractMiniHBaseClusterTest {
     }});
     timelineMetrics.getMetrics().add(metric2);
 
-    hdb.insertMetricRecordsWithMetadata(metadataManager, timelineMetrics);
+    hdb.insertMetricRecordsWithMetadata(metadataManager, timelineMetrics, true);
   }
 
   @Test(timeout = 180000)
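Note the third argument here is true: this test asserts against stored data immediately after the call, so it bypasses the cache and takes the synchronous commitMetrics path (metadata is still recorded either way, since the metadata loop runs before the cache branch).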

+ 24 - 0
ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml

@@ -552,6 +552,30 @@
       utilization only for user queries.
     </description>
   </property>
+  <property>
+    <name>timeline.metrics.cache.commit.interval</name>
+    <value>3</value>
+    <description>
+      Time in seconds between committing metrics from cache
+    </description>
+    <value-attributes>
+      <type>int</type>
+    </value-attributes>
+  </property>
+  <property>
+    <name>timeline.metrics.cache.size</name>
+    <value>150</value>
+    <description>
+      Size of array blocking queue used to cache metrics
+    </description>
+  </property>
+  <property>
+    <name>timeline.metrics.cache.enabled</name>
+    <value>true</value>
+    <description>
+      If set to true PhoenixHBaseAccessor will use cache to store metrics before committing them
+    </description>
+  </property>
   <property>
     <name>timeline.metrics.service.http.policy</name>
     <value>HTTP_ONLY</value>
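Operationally, timeline.metrics.cache.size bounds how many TimelineMetrics payloads can wait in memory (the ArrayBlockingQueue capacity is fixed, and writers block once it fills), timeline.metrics.cache.commit.interval bounds how long a cached metric can wait before the background commit, and setting timeline.metrics.cache.enabled to false restores the old synchronous per-request commit.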

+ 45 - 33
ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py

@@ -41,6 +41,8 @@ class AMSServiceCheck(Script):
   AMS_METRICS_GET_URL = "/ws/v1/timeline/metrics?%s"
   AMS_CONNECT_TRIES = 30
   AMS_CONNECT_TIMEOUT = 15
+  AMS_READ_TRIES = 10
+  AMS_READ_TIMEOUT = 5
 
   @OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY)
   def service_check(self, env):
@@ -131,41 +133,51 @@ class AMSServiceCheck(Script):
     Logger.info("Connecting (GET) to %s:%s%s" % (params.metric_collector_host,
                                                  params.metric_collector_port,
                                               self.AMS_METRICS_GET_URL % encoded_get_metrics_parameters))
+    for i in xrange(0, self.AMS_READ_TRIES):
+      conn = network.get_http_connection(params.metric_collector_host,
+                                         int(params.metric_collector_port),
+                                         params.metric_collector_https_enabled,
+                                         ca_certs)
+      conn.request("GET", self.AMS_METRICS_GET_URL % encoded_get_metrics_parameters)
+      response = conn.getresponse()
+      Logger.info("Http response: %s %s" % (response.status, response.reason))
 
-    conn = network.get_http_connection(params.metric_collector_host,
-                                       int(params.metric_collector_port),
-                                       params.metric_collector_https_enabled,
-                                       ca_certs)
-    conn.request("GET", self.AMS_METRICS_GET_URL % encoded_get_metrics_parameters)
-    response = conn.getresponse()
-    Logger.info("Http response: %s %s" % (response.status, response.reason))
-
-    data = response.read()
-    Logger.info("Http data: %s" % data)
-    conn.close()
-
-    if response.status == 200:
-      Logger.info("Metrics were retrieved.")
-    else:
-      Logger.info("Metrics were not retrieved. Service check has failed.")
-      raise Fail("Metrics were not retrieved. Service check has failed. GET request status: %s %s \n%s" %
-                 (response.status, response.reason, data))
-    data_json = json.loads(data)
-
-    def floats_eq(f1, f2, delta):
-      return abs(f1-f2) < delta
-
-    for metrics_data in data_json["metrics"]:
-      if (str(current_time) in metrics_data["metrics"] and str(current_time + 1000) in metrics_data["metrics"]
-          and floats_eq(metrics_data["metrics"][str(current_time)], random_value1, 0.0000001)
-          and floats_eq(metrics_data["metrics"][str(current_time + 1000)], current_time, 1)):
-        Logger.info("Values %s and %s were found in the response." % (random_value1, current_time))
-        break
-      pass
-    else:
-      Logger.info("Values %s and %s were not found in the response." % (random_value1, current_time))
-      raise Fail("Values %s and %s were not found in the response." % (random_value1, current_time))
+      data = response.read()
+      Logger.info("Http data: %s" % data)
+      conn.close()
 
+      if response.status == 200:
+        Logger.info("Metrics were retrieved.")
+      else:
+        Logger.info("Metrics were not retrieved. Service check has failed.")
+        raise Fail("Metrics were not retrieved. Service check has failed. GET request status: %s %s \n%s" %
+                   (response.status, response.reason, data))
+      data_json = json.loads(data)
+
+      def floats_eq(f1, f2, delta):
+        return abs(f1-f2) < delta
+
+      values_are_present = False
+      for metrics_data in data_json["metrics"]:
+        if (str(current_time) in metrics_data["metrics"] and str(current_time + 1000) in metrics_data["metrics"]
+            and floats_eq(metrics_data["metrics"][str(current_time)], random_value1, 0.0000001)
+            and floats_eq(metrics_data["metrics"][str(current_time + 1000)], current_time, 1)):
+          Logger.info("Values %s and %s were found in the response." % (random_value1, current_time))
+          values_are_present = True
+          break
+          pass
+
+      if not values_are_present:
+        if i < self.AMS_READ_TRIES - 1:  #range/xrange returns items from start to end-1
+          Logger.info("Values weren't stored yet. Retrying in %s seconds."
+                    % (self.AMS_READ_TIMEOUT))
+          time.sleep(self.AMS_READ_TIMEOUT)
+        else:
+          Logger.info("Values %s and %s were not found in the response." % (random_value1, current_time))
+          raise Fail("Values %s and %s were not found in the response." % (random_value1, current_time))
+      else:
+        break
+        pass
     Logger.info("Ambari Metrics service check is finished.")
 
 if __name__ == "__main__":
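The retry loop is the operational consequence of the write cache: a successful POST no longer guarantees the metric is immediately queryable, since it may sit in the collector's cache for up to the commit interval. The check therefore polls the GET endpoint up to AMS_READ_TRIES (10) times, sleeping AMS_READ_TIMEOUT (5) seconds between attempts, before failing.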