
AMBARI-9604 Fix Flume Agent graphs on Ambari service page (dsen)

Dmytro Sen, 10 years ago
commit 075acd71f3
24 changed files with 980 additions and 575 deletions
  1. +6 -2  ambari-metrics/ambari-metrics-flume-sink/src/main/conf/flume-metrics2.properties.j2
  2. +21 -4  ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
  3. +10 -1  ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
  4. +169 -0  ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Function.java
  5. +96 -8  ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
  6. +158 -79  ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
  7. +2 -2  ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
  8. +42 -0  ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetricReader.java
  9. +3 -1  ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
  10. +3 -2  ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
  11. +65 -0  ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricReader.java
  12. +47 -0  ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/FunctionTest.java
  13. +65 -0  ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java
  14. +76 -4  ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
  15. +1 -1  ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
  16. +17 -5  ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
  17. +13 -2  ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java
  18. +160 -450  ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/metrics.json
  19. +3 -1  ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume.py
  20. +4 -0  ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py
  21. +6 -2  ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/templates/flume-metrics2.properties.j2
  22. +6 -6  ambari-web/app/utils/ajax/ajax.js
  23. +5 -3  ambari-web/app/views/main/service/info/metrics/flume/channel_size_mma.js
  24. +2 -2  ambari-web/app/views/main/service/info/metrics/flume/channel_sum.js

+ 6 - 2
ambari-metrics/ambari-metrics-flume-sink/src/main/conf/flume-metrics2.properties.j2

@@ -16,7 +16,11 @@
 # limitations under the License.
 #}
 
-collector={{metric_collector_host}}:8188
+collector={{metric_collector_host}}
+port={{metric_collector_port}}
 collectionFrequency=60000
 maxRowCacheSize=10000
-sendInterval=59000
+sendInterval=59000
+
+# Metric names having type COUNTER
+counters=EventTakeSuccessCount,EventPutSuccessCount,EventTakeAttemptCount,EventPutAttemptCount
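
For reference, once Ambari renders this template the file might look like the following. The hostname is hypothetical; 6188 is the default collector port used elsewhere in this patch:

    collector=metrics-collector.example.com
    port=6188
    collectionFrequency=60000
    maxRowCacheSize=10000
    sendInterval=59000

    # Metric names having type COUNTER
    counters=EventTakeSuccessCount,EventPutSuccessCount,EventTakeAttemptCount,EventPutAttemptCount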

+ 21 - 4
ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java

@@ -24,6 +24,7 @@ import org.apache.flume.Context;
 import org.apache.flume.FlumeException;
 import org.apache.flume.instrumentation.MonitorService;
 import org.apache.flume.instrumentation.util.JMXPollUtil;
+import org.apache.hadoop.metrics2.MetricType;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
@@ -38,8 +39,11 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -51,6 +55,8 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
   private ScheduledExecutorService scheduledExecutorService;
   private long pollFrequency;
   private String hostname;
+  private final static String COUNTER_METRICS_PROPERTY = "counters";
+  private final Set<String> counterMetrics = new HashSet<String>();
 
   @Override
   public void start() {
@@ -84,13 +90,18 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     int metricsSendInterval = Integer.parseInt(configuration.getProperty(METRICS_SEND_INTERVAL,
         String.valueOf(TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS)));
     metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval);
-    collectorUri = "http://" + configuration.getProperty(COLLECTOR_HOST_PROPERTY) + "/ws/v1/timeline/metrics";
+    String collectorHostname = configuration.getProperty(COLLECTOR_HOST_PROPERTY);
+    String port = configuration.getProperty(COLLECTOR_PORT_PROPERTY);
+    collectorUri = "http://" + collectorHostname + ":" + port + "/ws/v1/timeline/metrics";
     List<InetSocketAddress> socketAddresses =
-        Servers.parse(configuration.getProperty(configuration.getProperty(COLLECTOR_HOST_PROPERTY)), 6188);
+      Servers.parse(collectorHostname, Integer.valueOf(port));
     if (socketAddresses != null && !socketAddresses.isEmpty()) {
       socketAddress = socketAddresses.get(0);
     }
     pollFrequency = Long.parseLong(configuration.getProperty("collectionFrequency"));
+
+    String[] metrics = configuration.getProperty(COUNTER_METRICS_PROPERTY).trim().split(",");
+    Collections.addAll(counterMetrics, metrics);
   }
 
   @Override
@@ -147,7 +158,7 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
           TimelineMetric timelineMetric = createTimelineMetric(currentTimeMillis,
               component, attributeName, attributeValue);
           // Put intermediate values into the cache until it is time to send
-          metricsCache.putTimelineMetric(timelineMetric);
+          metricsCache.putTimelineMetric(timelineMetric, getMetricType(attributeName));
 
           TimelineMetric cachedMetric = metricsCache.getTimelineMetric(attributeName);
 
@@ -168,7 +179,8 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
       TimelineMetric timelineMetric = new TimelineMetric();
       timelineMetric.setMetricName(attributeName);
       timelineMetric.setHostName(hostname);
-      timelineMetric.setAppId("flume." + component);
+      timelineMetric.setInstanceId(component);
+      timelineMetric.setAppId("FLUME_HANDLER");
       timelineMetric.setStartTime(currentTimeMillis);
       timelineMetric.setType(ClassUtils.getShortCanonicalName(
           attributeValue, "Number"));
@@ -176,4 +188,9 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
       return timelineMetric;
     }
   }
+
+  private MetricType getMetricType(String attributeName) {
+    return counterMetrics.contains(attributeName) ?
+      MetricType.COUNTER : MetricType.GAUGE;
+  }
 }
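
A minimal, self-contained sketch of the two pieces introduced above: the collector URI now built from separate host and port properties, and the counter/gauge split driven by the "counters" property. Property values are examples, not project defaults:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Properties;
    import java.util.Set;

    public class SinkConfigSketch {
      public static void main(String[] args) {
        Properties conf = new Properties();
        conf.setProperty("collector", "metrics-collector.example.com"); // hypothetical host
        conf.setProperty("port", "6188");                               // hypothetical port
        conf.setProperty("counters",
            "EventTakeSuccessCount,EventPutSuccessCount,EventTakeAttemptCount,EventPutAttemptCount");

        // Host and port are separate properties now, joined when building the URI.
        String uri = "http://" + conf.getProperty("collector") + ":"
            + conf.getProperty("port") + "/ws/v1/timeline/metrics";

        // Same parsing as initializeConfiguration(): trim, split on commas, collect into a set.
        Set<String> counters = new HashSet<String>(
            Arrays.asList(conf.getProperty("counters").trim().split(",")));

        System.out.println(uri);
        // Names listed under "counters" are reported as COUNTER, everything else as GAUGE.
        System.out.println(counters.contains("EventTakeSuccessCount") ? "COUNTER" : "GAUGE");
        System.out.println(counters.contains("ChannelSize") ? "COUNTER" : "GAUGE");
      }
    }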

+ 10 - 1
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java

@@ -203,13 +203,15 @@ public abstract class AbstractTimelineAggregator implements Runnable {
 
     Connection conn = null;
     PreparedStatement stmt = null;
+    ResultSet rs = null;
 
     try {
       conn = hBaseAccessor.getConnection();
+      // FLUME 2. aggregate and ignore the instance
       stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
 
       LOG.debug("Query issued @: " + new Date());
-      ResultSet rs = stmt.executeQuery();
+      rs = stmt.executeQuery();
       LOG.debug("Query returned @: " + new Date());
 
       aggregate(rs, startTime, endTime);
@@ -222,6 +224,13 @@ public abstract class AbstractTimelineAggregator implements Runnable {
       LOG.error("Exception during aggregating metrics.", e);
       success = false;
     } finally {
+      if (rs != null) {
+        try {
+          rs.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
       if (stmt != null) {
         try {
           stmt.close();
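
The fix above promotes the ResultSet to a method-scoped variable so it can be closed in the finally block alongside the statement. On Java 7+ the same guarantee can be had with try-with-resources; a sketch under that assumption (jdbcUrl and sql are placeholders):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    class AggregatorQuerySketch {
      static void runQuery(String jdbcUrl, String sql) throws SQLException {
        // Resources close automatically in reverse order: rs, then stmt, then conn.
        try (Connection conn = DriverManager.getConnection(jdbcUrl);
             PreparedStatement stmt = conn.prepareStatement(sql);
             ResultSet rs = stmt.executeQuery()) {
          while (rs.next()) {
            // aggregate(rs, startTime, endTime) would consume the rows here
          }
        }
      }
    }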

+ 169 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Function.java

@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+/**
+ * Is used to determine metrics aggregate table.
+ *
+ * @see org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.TimelineWebServices#getTimelineMetric
+ * @see org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.TimelineWebServices#getTimelineMetrics
+ */
+public class Function {
+  public static Function DEFAULT_VALUE_FUNCTION =
+    new Function(ReadFunction.VALUE, null);
+  private static final String SUFFIX_SEPARATOR = "\\._";
+
+  private ReadFunction readFunction = ReadFunction.VALUE;
+  private PostProcessingFunction postProcessingFunction = null;
+
+  public Function(){
+
+  }
+
+  public Function(ReadFunction readFunction,
+                  PostProcessingFunction ppFunction){
+    if (readFunction != null) {
+      this.readFunction = readFunction;
+    }
+    this.postProcessingFunction = ppFunction;
+  }
+
+  public static Function fromMetricName(String metricName){
+    // gets postprocessing, and aggregation function
+    // ex. Metric._rate._avg
+    String[] parts = metricName.split(SUFFIX_SEPARATOR);
+
+    ReadFunction readFunction = ReadFunction.VALUE;
+    PostProcessingFunction ppFunction = null;
+
+    if (parts.length == 3) {
+      ppFunction = PostProcessingFunction.getFunction(parts[1]);
+      readFunction = ReadFunction.getFunction(parts[2]);
+    } else if (parts.length == 2) {
+      ppFunction = null;
+      readFunction = ReadFunction.getFunction(parts[1]);
+    }
+
+
+    return new Function(readFunction, ppFunction);
+  }
+
+  public String getSuffix(){
+    return (postProcessingFunction == null)? readFunction.getSuffix() :
+      postProcessingFunction.getSuffix() + readFunction.getSuffix();
+  }
+
+  public ReadFunction getReadFunction() {
+    return readFunction;
+  }
+
+  @Override
+  public String toString() {
+    return "Function{" +
+      "readFunction=" + readFunction +
+      ", postProcessingFunction=" + postProcessingFunction +
+      '}';
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (!(o instanceof Function)) return false;
+
+    Function function = (Function) o;
+
+    return postProcessingFunction == function.postProcessingFunction
+      && readFunction == function.readFunction;
+
+  }
+
+  @Override
+  public int hashCode() {
+    int result = readFunction.hashCode();
+    result = 31 * result + (postProcessingFunction != null ?
+      postProcessingFunction.hashCode() : 0);
+    return result;
+  }
+
+  public enum PostProcessingFunction {
+    NONE(""),
+    RATE("._rate");
+
+    PostProcessingFunction(String suffix){
+      this.suffix = suffix;
+    }
+
+    private String suffix = "";
+
+    public String getSuffix(){
+      return suffix;
+    }
+
+    public static PostProcessingFunction getFunction(String functionName) throws
+      FunctionFormatException {
+      if (functionName == null) {
+        return NONE;
+      }
+
+      try {
+        return PostProcessingFunction.valueOf(functionName.toUpperCase());
+      } catch (IllegalArgumentException e) {
+        throw new FunctionFormatException("Function should be value, avg, min, " +
+          "max", e);
+      }
+    }
+  }
+
+  public enum ReadFunction {
+    VALUE(""),
+    AVG("._avg"),
+    MIN("._min"),
+    MAX("._max"),
+    SUM("._sum");
+
+    private final String suffix;
+
+    ReadFunction(String suffix){
+      this.suffix = suffix;
+    }
+
+    public String getSuffix() {
+      return suffix;
+    }
+
+    public static ReadFunction getFunction(String functionName) throws
+      FunctionFormatException {
+      if (functionName == null) {
+        return VALUE;
+      }
+      try {
+        return ReadFunction.valueOf(functionName.toUpperCase());
+      } catch (IllegalArgumentException e) {
+        throw new FunctionFormatException(
+          "Function should be value, avg, min, max, sum. Got " + functionName, e);
+      }
+    }
+  }
+
+  public static class FunctionFormatException extends IllegalArgumentException {
+    public FunctionFormatException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+}
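
A short usage sketch for the suffix parsing (assumed to live in the same package as Function; metric names are illustrative, and the same behavior is exercised by FunctionTest later in this commit):

    public class FunctionUsageSketch {
      public static void main(String[] args) {
        // "._rate._avg" -> post-processing RATE, read function AVG
        System.out.println(Function.fromMetricName("EventTakeSuccessCount._rate._avg"));

        // "._max" -> read function MAX, no post-processing
        System.out.println(Function.fromMetricName("ChannelSize._max"));

        // No suffix -> DEFAULT_VALUE_FUNCTION (raw stored values)
        System.out.println(Function.fromMetricName("ChannelSize"));

        // An unknown suffix such as "._median" throws FunctionFormatException.
      }
    }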

+ 96 - 8
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java

@@ -27,7 +27,9 @@ import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -107,14 +109,94 @@ public class HBaseTimelineMetricStore extends AbstractService
       Long startTime, Long endTime, Precision precision, Integer limit,
       boolean groupedByHosts) throws SQLException, IOException {
 
-    Condition condition = new DefaultCondition(metricNames, hostname, applicationId,
-      instanceId, startTime, endTime, precision, limit, groupedByHosts);
+    Map<String, List<Function>> metricFunctions =
+      parseMetricNamesToAggregationFunctions(metricNames);
+
+    Condition condition = new DefaultCondition(
+      new ArrayList<String>(metricFunctions.keySet()),
+      hostname, applicationId, instanceId, startTime, endTime,
+      precision, limit, groupedByHosts);
 
     if (hostname == null) {
-      return hBaseAccessor.getAggregateMetricRecords(condition);
+      TimelineMetrics metrics = hBaseAccessor.getAggregateMetricRecords
+        (condition,  metricFunctions);
+
+      return postProcessMetrics(metrics);
+    }
+
+    return postProcessMetrics(
+      hBaseAccessor.getMetricRecords(condition, metricFunctions));
+  }
+
+  private TimelineMetrics postProcessMetrics(TimelineMetrics metrics) {
+    List<TimelineMetric> metricsList = metrics.getMetrics();
+
+    for (TimelineMetric metric: metricsList){
+      String name = metric.getMetricName();
+      if (name.contains("._rate")){
+        updateValueAsRate(metric.getMetricValues());
+      }
     }
 
-    return hBaseAccessor.getMetricRecords(condition);
+    return metrics;
+  }
+
+  private Map<Long, Double> updateValueAsRate(Map<Long, Double> metricValues) {
+    Long prevTime = null;
+    long step;
+
+    for (Map.Entry<Long, Double> timeValueEntry : metricValues.entrySet()) {
+      Long currTime = timeValueEntry.getKey();
+      Double currVal = timeValueEntry.getValue();
+
+      if (prevTime != null) {
+        step = currTime - prevTime;
+        Double rate = currVal / step;
+        timeValueEntry.setValue(rate);
+      } else {
+        timeValueEntry.setValue(0.0);
+      }
+
+      prevTime = currTime;
+    }
+
+    return metricValues;
+  }
+
+  public static HashMap<String, List<Function>>
+  parseMetricNamesToAggregationFunctions(List<String> metricNames) {
+    HashMap<String, List<Function>> metricsFunctions = new HashMap<String,
+      List<Function>>();
+
+    for (String metricName : metricNames){
+      Function function = Function.DEFAULT_VALUE_FUNCTION;
+      String cleanMetricName = metricName;
+
+      try {
+        function = Function.fromMetricName(metricName);
+        int functionStartIndex = metricName.indexOf("._");
+        if(functionStartIndex > 0 ) {
+          cleanMetricName = metricName.substring(0, functionStartIndex);
+        }
+      } catch (Function.FunctionFormatException ffe){
+        // unknown function so
+        // fallback to VALUE, and fullMetricName
+      }
+
+      addFunctionToMetricName(metricsFunctions, cleanMetricName, function);
+    }
+
+    return metricsFunctions;
+  }
+
+  private static void addFunctionToMetricName(
+    HashMap<String, List<Function>> metricsFunctions, String cleanMetricName,
+    Function function) {
+
+    List<Function> functionsList = metricsFunctions.get(cleanMetricName);
+    if (functionsList==null) functionsList = new ArrayList<Function>(1);
+    functionsList.add(function);
+    metricsFunctions.put(cleanMetricName, functionsList);
   }
 
   @Override
@@ -123,10 +205,16 @@ public class HBaseTimelineMetricStore extends AbstractService
       Long endTime, Precision precision, Integer limit)
       throws SQLException, IOException {
 
-    TimelineMetrics metrics = hBaseAccessor.getMetricRecords(
-      new DefaultCondition(Collections.singletonList(metricName), hostname,
-        applicationId, instanceId, startTime, endTime, precision, limit, true)
-    );
+    Map<String, List<Function>> metricFunctions =
+      parseMetricNamesToAggregationFunctions(Collections.singletonList(metricName));
+
+    Condition condition = new DefaultCondition(
+      new ArrayList<String>(metricFunctions.keySet()), hostname, applicationId,
+      instanceId, startTime, endTime, precision, limit, true);
+    TimelineMetrics metrics = hBaseAccessor.getMetricRecords(condition,
+      metricFunctions);
+
+    metrics = postProcessMetrics(metrics);
 
     TimelineMetric metric = new TimelineMetric();
     List<TimelineMetric> metricList = metrics.getMetrics();
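
Note that updateValueAsRate divides each raw value by the time step in milliseconds (it does not difference consecutive values), and zeroes the first point, which has no predecessor. A worked sketch of that loop with made-up timestamps and values:

    import java.util.Map;
    import java.util.TreeMap;

    public class RateSketch {
      public static void main(String[] args) {
        Map<Long, Double> values = new TreeMap<Long, Double>();
        values.put(60000L, 600.0);   // t=60s,  raw value
        values.put(120000L, 1200.0); // t=120s
        values.put(180000L, 1500.0); // t=180s

        Long prevTime = null;
        for (Map.Entry<Long, Double> e : values.entrySet()) {
          if (prevTime != null) {
            long step = e.getKey() - prevTime; // step is in milliseconds,
            e.setValue(e.getValue() / step);   // so this yields a per-millisecond rate
          } else {
            e.setValue(0.0);                   // first point has no predecessor
          }
          prevTime = e.getKey();
        }
        System.out.println(values); // {60000=0.0, 120000=0.02, 180000=0.025}
      }
    }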

+ 158 - 79
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java

@@ -34,6 +34,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -77,6 +78,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 public class PhoenixHBaseAccessor {
 
   private static final Log LOG = LogFactory.getLog(PhoenixHBaseAccessor.class);
+  private static final TimelineMetricReader timelineMetricReader = new TimelineMetricReader();
   private final Configuration hbaseConf;
   private final Configuration metricsConf;
   private final RetryCounterFactory retryCounterFactory;
@@ -150,50 +152,49 @@ public class PhoenixHBaseAccessor {
 
   private static TimelineMetric getLastTimelineMetricFromResultSet(ResultSet rs)
     throws SQLException, IOException {
-    TimelineMetric metric = getTimelineMetricCommonsFromResultSet(rs);
+    TimelineMetric metric = timelineMetricReader
+      .getTimelineMetricCommonsFromResultSet(rs);
     metric.setMetricValues(readLastMetricValueFromJSON(rs.getString("METRICS")));
 
     return metric;
   }
 
-  static TimelineMetric getTimelineMetricFromResultSet(ResultSet rs)
-    throws SQLException, IOException {
-    TimelineMetric metric = getTimelineMetricCommonsFromResultSet(rs);
-    Map<Long, Double> sortedByTimeMetrics =
-      new TreeMap<Long, Double>(readMetricFromJSON(rs.getString("METRICS")));
-    metric.setMetricValues(sortedByTimeMetrics);
-    return metric;
-  }
-
-  /**
-   * Returns common part of timeline metrics record without the values.
-   */
-  private static TimelineMetric getTimelineMetricCommonsFromResultSet(ResultSet rs) 
-    throws SQLException {
-    TimelineMetric metric = new TimelineMetric();
-    metric.setMetricName(rs.getString("METRIC_NAME"));
-    metric.setAppId(rs.getString("APP_ID"));
-    metric.setInstanceId(rs.getString("INSTANCE_ID"));
-    metric.setHostName(rs.getString("HOSTNAME"));
-    metric.setTimestamp(rs.getLong("SERVER_TIME"));
-    metric.setStartTime(rs.getLong("START_TIME"));
-    metric.setType(rs.getString("UNITS"));
-    return metric;
-  }
+  static TimelineMetric getAggregatedTimelineMetricFromResultSet(
+    ResultSet rs, Function f) throws SQLException, IOException {
 
-  static TimelineMetric getAggregatedTimelineMetricFromResultSet(ResultSet rs)
-      throws SQLException, IOException {
     TimelineMetric metric = new TimelineMetric();
-    metric.setMetricName(rs.getString("METRIC_NAME"));
     metric.setHostName(rs.getString("HOSTNAME"));
     metric.setAppId(rs.getString("APP_ID"));
     metric.setInstanceId(rs.getString("INSTANCE_ID"));
     metric.setTimestamp(rs.getLong("SERVER_TIME"));
     metric.setStartTime(rs.getLong("SERVER_TIME"));
     metric.setType(rs.getString("UNITS"));
+
+    // get functions for metricnames
+
+    double value;
+    switch(f.getReadFunction()){
+      case AVG:
+        value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT");
+        break;
+      case MIN:
+        value = rs.getDouble("METRIC_MIN");
+        break;
+      case MAX:
+        value = rs.getDouble("METRIC_MAX");
+        break;
+      case SUM:
+        value = rs.getDouble("METRIC_SUM");
+        break;
+      default:
+        value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT");
+        break;
+    }
+
+    metric.setMetricName(rs.getString("METRIC_NAME") + f.getSuffix());
+
     Map<Long, Double> valueMap = new TreeMap<Long, Double>();
-    valueMap.put(rs.getLong("SERVER_TIME"),
-        rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT"));
+    valueMap.put(rs.getLong("SERVER_TIME"), value);
     metric.setMetricValues(valueMap);
     return metric;
   }
@@ -238,18 +239,6 @@ public class PhoenixHBaseAccessor {
     return metricHostAggregate;
   }
 
-  static TimelineClusterMetric getTimelineMetricClusterKeyFromResultSet(ResultSet rs)
-    throws SQLException, IOException {
-    TimelineClusterMetric metric = new TimelineClusterMetric(
-      rs.getString("METRIC_NAME"),
-      rs.getString("APP_ID"),
-      rs.getString("INSTANCE_ID"),
-      rs.getLong("SERVER_TIME"),
-      rs.getString("UNITS"));
-
-    return metric;
-  }
-
   static MetricClusterAggregate getMetricClusterAggregateFromResultSet(ResultSet rs)
     throws SQLException {
     MetricClusterAggregate agg = new MetricClusterAggregate();
@@ -411,7 +400,8 @@ public class PhoenixHBaseAccessor {
   }
 
   @SuppressWarnings("unchecked")
-  public TimelineMetrics getMetricRecords(final Condition condition)
+  public TimelineMetrics getMetricRecords(
+    final Condition condition, Map<String, List<Function>> metricFunctions)
     throws SQLException, IOException {
 
     verifyCondition(condition);
@@ -429,19 +419,7 @@ public class PhoenixHBaseAccessor {
 
         ResultSet rs = stmt.executeQuery();
         while (rs.next()) {
-          TimelineMetric metric;
-          if (condition.getPrecision() == Precision.HOURS
-              || condition.getPrecision() == Precision.MINUTES) {
-            metric = getAggregatedTimelineMetricFromResultSet(rs);
-          } else {
-            metric = getTimelineMetricFromResultSet(rs);
-          }
-
-          if (condition.isGrouped()) {
-            metrics.addOrMergeTimelineMetric(metric);
-          } else {
-            metrics.getMetrics().add(metric);
-          }
+          appendMetricFromResultSet(metrics, condition, metricFunctions, rs);
         }
       }
 
@@ -466,6 +444,40 @@ public class PhoenixHBaseAccessor {
     return metrics;
   }
 
+  private void appendMetricFromResultSet(
+    TimelineMetrics metrics, Condition condition, Map<String,
+    List<Function>> metricFunctions, ResultSet rs)
+    throws SQLException, IOException {
+    if (condition.getPrecision() == Precision.HOURS
+      || condition.getPrecision() == Precision.MINUTES) {
+
+      String metricName = rs.getString("METRIC_NAME");
+      List<Function> functions = metricFunctions.get(metricName);
+
+      for (Function f : functions) {
+        TimelineMetric metric;
+
+        metric = getAggregatedTimelineMetricFromResultSet(rs, f);
+
+        if (condition.isGrouped()) {
+          metrics.addOrMergeTimelineMetric(metric);
+        } else {
+          metrics.getMetrics().add(metric);
+        }
+      }
+    }
+    else {
+      TimelineMetric metric;
+      metric = timelineMetricReader.getTimelineMetricFromResultSet(rs);
+
+      if (condition.isGrouped()) {
+        metrics.addOrMergeTimelineMetric(metric);
+      } else {
+        metrics.getMetrics().add(metric);
+      }
+    }
+  }
+
   private PreparedStatement getLatestMetricRecords(
     Condition condition, Connection conn, TimelineMetrics metrics)
     throws SQLException, IOException {
@@ -495,13 +507,16 @@ public class PhoenixHBaseAccessor {
    * @return @TimelineMetrics
    * @throws SQLException
    */
-  public TimelineMetrics getAggregateMetricRecords(final Condition condition)
+  public TimelineMetrics getAggregateMetricRecords(
+    final Condition condition,
+    Map<String, List<Function>> metricFunctions)
     throws SQLException {
 
     verifyCondition(condition);
 
     Connection conn = getConnection();
     PreparedStatement stmt = null;
+    ResultSet rs = null;
     TimelineMetrics metrics = new TimelineMetrics();
 
     try {
@@ -511,23 +526,20 @@ public class PhoenixHBaseAccessor {
       } else {
         stmt = PhoenixTransactSQL.prepareGetAggregateSqlStmt(conn, condition);
 
-        ResultSet rs = stmt.executeQuery();
+        rs = stmt.executeQuery();
         while (rs.next()) {
-          TimelineMetric metric;
-          if (condition.getPrecision() == Precision.HOURS) {
-            metric = getAggregateHoursTimelineMetricFromResultSet(rs);
-          } else {
-            metric = getAggregateTimelineMetricFromResultSet(rs);
-          }
-
-          if (condition.isGrouped()) {
-            metrics.addOrMergeTimelineMetric(metric);
-          } else {
-            metrics.getMetrics().add(metric);
-          }
+          appendAggregateMetricFromResultSet(metrics, condition,
+            metricFunctions, rs);
         }
       }
     } finally {
+      if (rs != null) {
+        try {
+          rs.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
       if (stmt != null) {
         try {
           stmt.close();
@@ -548,6 +560,31 @@ public class PhoenixHBaseAccessor {
     return metrics;
   }
 
+  private void appendAggregateMetricFromResultSet(
+    TimelineMetrics metrics, Condition condition,
+    Map<String, List<Function>> metricFunctions, ResultSet rs)
+    throws SQLException {
+
+    String metricName = rs.getString("METRIC_NAME");
+    List<Function> functions = metricFunctions.get(metricName);
+
+    for (Function aggregateFunction : functions) {
+      TimelineMetric metric;
+
+      if (condition.getPrecision() == Precision.HOURS) {
+        metric = getAggregateHoursTimelineMetricFromResultSet(rs, aggregateFunction);
+      } else {
+        metric = getAggregateTimelineMetricFromResultSet(rs, aggregateFunction);
+      }
+
+      if (condition.isGrouped()) {
+        metrics.addOrMergeTimelineMetric(metric);
+      } else {
+        metrics.getMetrics().add(metric);
+      }
+    }
+  }
+
   private PreparedStatement getLatestAggregateMetricRecords(
     Condition condition, Connection conn, TimelineMetrics metrics)
     throws SQLException {
@@ -564,7 +601,8 @@ public class PhoenixHBaseAccessor {
 
       ResultSet rs = stmt.executeQuery();
       while (rs.next()) {
-        TimelineMetric metric = getAggregateTimelineMetricFromResultSet(rs);
+        TimelineMetric metric = getAggregateTimelineMetricFromResultSet(rs,
+          new Function());
         metrics.getMetrics().add(metric);
       }
     }
@@ -572,32 +610,73 @@ public class PhoenixHBaseAccessor {
     return stmt;
   }
 
-  private TimelineMetric getAggregateTimelineMetricFromResultSet(ResultSet rs) throws SQLException {
+  private TimelineMetric getAggregateTimelineMetricFromResultSet(
+    ResultSet rs, Function f) throws SQLException {
     TimelineMetric metric = new TimelineMetric();
-    metric.setMetricName(rs.getString("METRIC_NAME"));
     metric.setAppId(rs.getString("APP_ID"));
     metric.setInstanceId(rs.getString("INSTANCE_ID"));
     metric.setTimestamp(rs.getLong("SERVER_TIME"));
     metric.setStartTime(rs.getLong("SERVER_TIME"));
+
+    double value;
+    switch(f.getReadFunction()){
+      case AVG:
+        value = rs.getDouble("METRIC_SUM") / rs.getInt("HOSTS_COUNT");
+        break;
+      case MIN:
+        value = rs.getDouble("METRIC_MIN");
+        break;
+      case MAX:
+        value = rs.getDouble("METRIC_MAX");
+        break;
+      case SUM:
+        value = rs.getDouble("METRIC_SUM");
+        break;
+      default:
+        value = rs.getDouble("METRIC_SUM") / rs.getInt("HOSTS_COUNT");
+        break;
+    }
+
+    metric.setMetricName(rs.getString("METRIC_NAME") + f.getSuffix());
+
     Map<Long, Double> valueMap = new TreeMap<Long, Double>();
-    valueMap.put(rs.getLong("SERVER_TIME"),
-      rs.getDouble("METRIC_SUM") / rs.getInt("HOSTS_COUNT"));
+    valueMap.put(rs.getLong("SERVER_TIME"), value);
     metric.setMetricValues(valueMap);
 
     return metric;
   }
 
   private TimelineMetric getAggregateHoursTimelineMetricFromResultSet(
-    ResultSet rs) throws SQLException {
+    ResultSet rs, Function f) throws SQLException {
     TimelineMetric metric = new TimelineMetric();
-    metric.setMetricName(rs.getString("METRIC_NAME"));
     metric.setAppId(rs.getString("APP_ID"));
     metric.setInstanceId(rs.getString("INSTANCE_ID"));
     metric.setTimestamp(rs.getLong("SERVER_TIME"));
     metric.setStartTime(rs.getLong("SERVER_TIME"));
+
+    double value;
+    switch(f.getReadFunction()){
+      case AVG:
+        value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT");
+        break;
+      case MIN:
+        value = rs.getDouble("METRIC_MIN");
+        break;
+      case MAX:
+        value = rs.getDouble("METRIC_MAX");
+        break;
+      case SUM:
+        value = rs.getDouble("METRIC_SUM");
+        break;
+      default:
+        value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT");
+        break;
+    }
+
+    metric.setMetricName(rs.getString("METRIC_NAME") + f.getSuffix());
+
     Map<Long, Double> valueMap = new TreeMap<Long, Double>();
-    valueMap.put(rs.getLong("SERVER_TIME"),
-        rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT"));
+    valueMap.put(rs.getLong("SERVER_TIME"), value);
     metric.setMetricValues(valueMap);
 
     return metric;
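
All three result-set readers above pick the value column from the requested read function; the only difference is the divisor used for AVG (METRIC_COUNT for the host and hourly tables, HOSTS_COUNT for the cluster aggregate table). A sketch of the shared pattern, with column names taken from the diff but the helper itself purely illustrative:

    import java.sql.ResultSet;
    import java.sql.SQLException;

    class AggregateValueSketch {
      static double readValue(ResultSet rs, Function.ReadFunction f,
                              String countColumn) throws SQLException {
        switch (f) {
          case MIN: return rs.getDouble("METRIC_MIN");
          case MAX: return rs.getDouble("METRIC_MAX");
          case SUM: return rs.getDouble("METRIC_SUM");
          default:  // AVG and VALUE fall back to sum / count, as in the diff
            return rs.getDouble("METRIC_SUM") / rs.getInt(countColumn);
        }
      }
    }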

+ 2 - 2
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java

@@ -683,7 +683,7 @@ public class PhoenixTransactSQL {
     public String getHostname() {
       return hostname == null || hostname.isEmpty() ? null : hostname;
     }
-    
+
     public Precision getPrecision() {
       return precision;
     }
@@ -694,7 +694,7 @@ public class PhoenixTransactSQL {
 
     public String getAppId() {
       if (appId != null && !appId.isEmpty()) {
-        if (!appId.equals("HOST")) {
+        if (!(appId.equals("HOST") || appId.equals("FLUME_HANDLER")) ) {
           return appId.toLowerCase();
         } else {
           return appId;
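
With this change, FLUME_HANDLER joins HOST as an appId that is passed through unchanged rather than lowercased, matching the appId the Flume sink now reports. An illustration of the resulting behavior ("DataNode" is just an example id):

    // getAppId("HOST")          -> "HOST"           (unchanged, as before)
    // getAppId("FLUME_HANDLER") -> "FLUME_HANDLER"  (unchanged, new in this patch)
    // getAppId("DataNode")      -> "datanode"       (other ids are lowercased)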

+ 42 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetricReader.java

@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public class TimelineClusterMetricReader {
+
+  private boolean ignoreInstance;
+
+  public TimelineClusterMetricReader(boolean ignoreInstance) {
+    this.ignoreInstance = ignoreInstance;
+  }
+
+  public TimelineClusterMetric fromResultSet(ResultSet rs)
+    throws SQLException {
+
+    return new TimelineClusterMetric(
+      rs.getString("METRIC_NAME"),
+      rs.getString("APP_ID"),
+      ignoreInstance ? null : rs.getString("INSTANCE_ID"),
+      rs.getLong("SERVER_TIME"),
+      rs.getString("UNITS"));
+  }
+}
+
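
When ignoreInstance is true, rows that differ only in INSTANCE_ID collapse onto the same TimelineClusterMetric key, which is what lets the cluster aggregator fold Flume's per-instance metrics together. A sketch of the effect (same package as TimelineClusterMetric assumed; field-based equals assumed; values illustrative):

    public class IgnoreInstanceSketch {
      public static void main(String[] args) {
        long t = 1423000000000L; // illustrative server time
        TimelineClusterMetric a = new TimelineClusterMetric(
            "ChannelSize", "FLUME_HANDLER", null, t, null); // row instance "i1", ignored
        TimelineClusterMetric b = new TimelineClusterMetric(
            "ChannelSize", "FLUME_HANDLER", null, t, null); // row instance "i2", ignored
        System.out.println(a.equals(b)); // true -> both rows aggregate into one metric
      }
    }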

+ 3 - 1
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java

@@ -55,6 +55,8 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
   private final Long sleepIntervalMillis;
   public final int timeSliceIntervalMillis;
   private final Integer checkpointCutOffMultiplier;
+  private TimelineMetricReader timelineMetricReader =
+    new TimelineMetricReader(true);
 
   public TimelineMetricClusterAggregator(PhoenixHBaseAccessor hBaseAccessor,
                                          Configuration metricsConf) {
@@ -125,7 +127,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
 
     while (rs.next()) {
       TimelineMetric metric =
-        PhoenixHBaseAccessor.getTimelineMetricFromResultSet(rs);
+        timelineMetricReader.getTimelineMetricFromResultSet(rs);
 
       Map<TimelineClusterMetric, Double> clusterMetrics =
         sliceFromTimelineMetric(metric, timeSlices);

+ 3 - 2
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java

@@ -29,7 +29,6 @@ import java.util.HashMap;
 import java.util.Map;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
@@ -53,6 +52,8 @@ public class TimelineMetricClusterAggregatorHourly extends
   private final Integer checkpointCutOffMultiplier;
   private long checkpointCutOffIntervalMillis;
   private static final Long NATIVE_TIME_RANGE_DELTA = 3600000l; // 1 hour
+  private final TimelineClusterMetricReader timelineClusterMetricReader
+     = new TimelineClusterMetricReader(true);
 
   public TimelineMetricClusterAggregatorHourly(
     PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
@@ -115,7 +116,7 @@ public class TimelineMetricClusterAggregatorHourly extends
 
     while (rs.next()) {
       TimelineClusterMetric currentMetric =
-        getTimelineMetricClusterKeyFromResultSet(rs);
+        timelineClusterMetricReader.fromResultSet(rs);
       MetricClusterAggregate currentHostAggregate =
         getMetricClusterAggregateFromResultSet(rs);
 

+ 65 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricReader.java

@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class TimelineMetricReader {
+
+  private boolean ignoreInstance = false;
+
+  public TimelineMetricReader() {}
+
+  public TimelineMetricReader(boolean ignoreInstance) {
+    this.ignoreInstance = ignoreInstance;
+  }
+
+  public TimelineMetric getTimelineMetricFromResultSet(ResultSet rs)
+    throws SQLException, IOException {
+    TimelineMetric metric = getTimelineMetricCommonsFromResultSet(rs);
+    Map<Long, Double> sortedByTimeMetrics = new TreeMap<Long, Double>(
+        PhoenixHBaseAccessor.readMetricFromJSON(rs.getString("METRICS")));
+    metric.setMetricValues(sortedByTimeMetrics);
+    return metric;
+  }
+
+  /**
+   * Returns common part of timeline metrics record without the values.
+   */
+  public TimelineMetric getTimelineMetricCommonsFromResultSet(ResultSet rs)
+    throws SQLException {
+    TimelineMetric metric = new TimelineMetric();
+    metric.setMetricName(rs.getString("METRIC_NAME"));
+    metric.setAppId(rs.getString("APP_ID"));
+    if (!ignoreInstance) metric.setInstanceId(rs.getString("INSTANCE_ID"));
+    metric.setHostName(rs.getString("HOSTNAME"));
+    metric.setTimestamp(rs.getLong("SERVER_TIME"));
+    metric.setStartTime(rs.getLong("START_TIME"));
+    metric.setType(rs.getString("UNITS"));
+    return metric;
+  }
+
+}
+

+ 47 - 0
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/FunctionTest.java

@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.junit.Test;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Function.fromMetricName;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Function.ReadFunction.AVG;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Function.PostProcessingFunction.RATE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class FunctionTest {
+
+  @Test
+  public void testCreation() throws Exception {
+    Function f = fromMetricName("Metric._avg");
+    assertThat(f).isEqualTo(new Function(AVG, null));
+
+
+    f = fromMetricName("Metric._rate._avg");
+    assertThat(f).isEqualTo(new Function(AVG, RATE));
+
+    f = fromMetricName("bytes_in");
+    assertThat(f).isEqualTo(Function.DEFAULT_VALUE_FUNCTION);
+  }
+
+
+  @Test(expected = Function.FunctionFormatException.class)
+  public void testNotAFunction() throws Exception {
+    Function f = fromMetricName("bytes._not._afunction");
+  }
+}

+ 65 - 0
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java

@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Function.ReadFunction.AVG;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Function.PostProcessingFunction.RATE;
+import static org.assertj.core.api.Assertions.*;
+
+public class HBaseTimelineMetricStoreTest {
+
+  public static final String MEM_METRIC = "mem";
+  public static final String BYTES_IN_METRIC = "bytes_in";
+  public static final String BYTES_NOT_AFUNCTION_METRIC = "bytes._not" +
+    "._afunction";
+
+  @Test
+  public void testParseMetricNamesToAggregationFunctions() throws Exception {
+    //given
+    List<String> metricNames = Arrays.asList(
+      MEM_METRIC + "._avg",
+      MEM_METRIC + "._rate._avg",
+      BYTES_IN_METRIC,
+      BYTES_NOT_AFUNCTION_METRIC);
+
+    //when
+    HashMap<String, List<Function>> mfm = HBaseTimelineMetricStore
+      .parseMetricNamesToAggregationFunctions(metricNames);
+
+    //then
+    assertThat(mfm).hasSize(3)
+      .containsKeys(MEM_METRIC, BYTES_IN_METRIC, BYTES_NOT_AFUNCTION_METRIC);
+
+    assertThat(mfm.get(MEM_METRIC)).containsOnly(
+      new Function(AVG, null),
+      new Function(AVG, RATE));
+
+    assertThat(mfm.get(BYTES_IN_METRIC))
+      .contains(Function.DEFAULT_VALUE_FUNCTION);
+
+    assertThat(mfm.get(BYTES_NOT_AFUNCTION_METRIC))
+      .contains(Function.DEFAULT_VALUE_FUNCTION);
+
+  }
+}

+ 76 - 4
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java

@@ -44,6 +44,8 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   private Connection conn;
   private PhoenixHBaseAccessor hdb;
+  private final TimelineClusterMetricReader metricReader = new
+    TimelineClusterMetricReader(false);
 
   @Before
   public void setUp() throws Exception {
@@ -106,8 +108,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
 
     int recordCount = 0;
     while (rs.next()) {
-      TimelineClusterMetric currentMetric =
-        PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet(rs);
+      TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs);
       MetricClusterAggregate currentHostAggregate =
         PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet(rs);
 
@@ -124,6 +125,78 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   }
 
 
+  @Test
+  public void testShouldAggregateClusterIgnoringInstance() throws
+    Exception {
+    // GIVEN
+    TimelineMetricClusterAggregator agg =
+      new TimelineMetricClusterAggregator(hdb, new Configuration());
+
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000;
+
+    /**
+     * Here we have two nodes with two instances each:
+     *              | local1 | local2 |
+     *  instance i1 |   1    |   2    |
+     *  instance i2 |   3    |   4    |
+     *
+     */
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+      "i1", "disk_free", 1));
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+      "i1", "disk_free", 2));
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+      "i2", "disk_free", 3));
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+      "i2", "disk_free", 4));
+    ctime += minute;
+
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+      "i1", "disk_free", 1));
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+      "i1", "disk_free", 3));
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+      "i2", "disk_free", 2));
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+      "i2", "disk_free", 4));
+    // WHEN
+    long endTime = ctime + minute;
+    boolean success = agg.doWork(startTime, endTime);
+
+    //THEN
+    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+      endTime, null, null, true);
+    condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
+      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
+      METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
+
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
+      (conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+
+    int recordCount = 0;
+    while (rs.next()) {
+      TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs);
+//        PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet(rs);
+      MetricClusterAggregate currentHostAggregate =
+        PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet(rs);
+
+      if ("disk_free".equals(currentMetric.getMetricName())) {
+        System.out.println("OUTPUT: " + currentMetric + " - " +
+          currentHostAggregate);
+        assertEquals(4, currentHostAggregate.getNumberOfHosts());
+        assertEquals(4.0, currentHostAggregate.getMax());
+        assertEquals(1.0, currentHostAggregate.getMin());
+        assertEquals(10.0, currentHostAggregate.getSum());
+        recordCount++;
+      } else {
+        fail("Unexpected entry");
+      }
+    }
+  }
+
   @Test
   public void testShouldAggregateDifferentMetricsOnClusterProperly()
     throws Exception {
@@ -167,8 +240,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
 
     int recordCount = 0;
     while (rs.next()) {
-      TimelineClusterMetric currentMetric =
-        PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet(rs);
+      TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs);
       MetricClusterAggregate currentHostAggregate =
         PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet(rs);
 

+ 1 - 1
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java

@@ -85,7 +85,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
 
     Condition queryCondition = new DefaultCondition(null, "local", null, null,
       startTime, startTime + (15 * 60 * 1000), null, null, false);
-    TimelineMetrics recordRead = hdb.getMetricRecords(queryCondition);
+    TimelineMetrics recordRead = hdb.getMetricRecords(queryCondition, null);
 
     // THEN
     assertThat(recordRead.getMetrics()).hasSize(2)

+ 17 - 5
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java

@@ -31,6 +31,7 @@ import java.sql.Statement;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 import static junit.framework.Assert.assertEquals;
@@ -92,7 +93,8 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
     PhoenixTransactSQL.Condition condition = new PhoenixTransactSQL.DefaultCondition(
         Collections.singletonList("disk_free"), "local1", null, null, startTime,
         endTime, Precision.SECONDS, null, true);
-    TimelineMetrics timelineMetrics = hdb.getMetricRecords(condition);
+    TimelineMetrics timelineMetrics = hdb.getMetricRecords(condition,
+      singletonValueFunctionMap("disk_free"));
 
     //THEN
     assertEquals(1, timelineMetrics.getMetrics().size());
@@ -126,7 +128,8 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
     PhoenixTransactSQL.Condition condition = new PhoenixTransactSQL.DefaultCondition(
         Collections.singletonList("disk_free"), "local1", null, null, startTime,
         endTime, Precision.MINUTES, null, false);
-    TimelineMetrics timelineMetrics = hdb.getMetricRecords(condition);
+    TimelineMetrics timelineMetrics = hdb.getMetricRecords(condition,
+      singletonValueFunctionMap("disk_free"));
 
     //THEN
     assertEquals(1, timelineMetrics.getMetrics().size());
@@ -176,7 +179,8 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
     PhoenixTransactSQL.Condition condition = new PhoenixTransactSQL.DefaultCondition(
         Collections.singletonList("disk_used"), "test_host", "test_app", null,
         startTime, endTime, Precision.HOURS, null, true);
-    TimelineMetrics timelineMetrics = hdb.getMetricRecords(condition);
+    TimelineMetrics timelineMetrics = hdb.getMetricRecords(condition,
+      singletonValueFunctionMap("disk_used"));
 
     //THEN
     assertEquals(1, timelineMetrics.getMetrics().size());
@@ -216,7 +220,8 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
     PhoenixTransactSQL.Condition condition = new PhoenixTransactSQL.DefaultCondition(
         Collections.singletonList("disk_free"), null, null, null,
         startTime, endTime, Precision.SECONDS, null, true);
-    TimelineMetrics timelineMetrics = hdb.getAggregateMetricRecords(condition);
+    TimelineMetrics timelineMetrics = hdb.getAggregateMetricRecords(condition,
+      singletonValueFunctionMap("disk_free"));
 
     //THEN
     assertEquals(1, timelineMetrics.getMetrics().size());
@@ -257,7 +262,8 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
     PhoenixTransactSQL.Condition condition = new PhoenixTransactSQL.DefaultCondition(
         Collections.singletonList("disk_used"), null, null, null,
         startTime, ctime + minute, Precision.HOURS, null, true);
-    TimelineMetrics timelineMetrics = hdb.getAggregateMetricRecords(condition);
+    TimelineMetrics timelineMetrics = hdb.getAggregateMetricRecords(condition,
+      singletonValueFunctionMap("disk_used"));
 
     // THEN
     assertEquals(1, timelineMetrics.getMetrics().size());
@@ -268,4 +274,10 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
     assertEquals(1, metric.getMetricValues().size());
     assertEquals(2.0, metric.getMetricValues().values().iterator().next(), 0.00001);
   }
+
+  private Map<String, List<Function>> singletonValueFunctionMap(String
+                                                                  metricName) {
+    return Collections.singletonMap(metricName, Collections.singletonList
+      (new Function()));
+  }
 }

+ 13 - 2
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java

@@ -43,9 +43,17 @@ public class MetricTestHelper {
                                                         String host,
                                                         String metricName,
                                                         double val) {
+    return prepareSingleTimelineMetric(startTime, host, null, metricName, val);
+  }
+
+  public static TimelineMetrics prepareSingleTimelineMetric(long startTime,
+                                                        String host,
+                                                        String instanceId,
+                                                        String metricName,
+                                                        double val) {
     TimelineMetrics m = new TimelineMetrics();
     m.setMetrics(Arrays.asList(
-        createTimelineMetric(startTime, metricName, host, val)));
+        createTimelineMetric(startTime, metricName, host, instanceId, val)));
 
     return m;
   }
@@ -54,10 +62,12 @@ public class MetricTestHelper {
   public static TimelineMetric createTimelineMetric(long startTime,
                                                 String metricName,
                                                 String host,
+                                                String instanceId,
                                                 double val) {
     TimelineMetric m = new TimelineMetric();
     m.setAppId("host");
     m.setHostName(host);
+    m.setInstanceId(instanceId);
     m.setMetricName(metricName);
     m.setStartTime(startTime);
     Map<Long, Double> vals = new HashMap<Long, Double>();
@@ -75,6 +85,7 @@ public class MetricTestHelper {
     TimelineMetric metric = new TimelineMetric();
     metric.setMetricName("disk_used");
     metric.setAppId("test_app");
+    metric.setInstanceId("test_instance");
     metric.setHostName("test_host");
     metric.setTimestamp(startTime);
 
@@ -84,7 +95,7 @@ public class MetricTestHelper {
   public static TimelineClusterMetric createEmptyTimelineClusterMetric(
       String name, long startTime) {
     TimelineClusterMetric metric = new TimelineClusterMetric(name,
-        "test_app", null, startTime, null);
+        "test_app", "instance_id", startTime, null);
 
     return metric;
   }

+ 160 - 450
ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/metrics.json

@@ -4,351 +4,206 @@
         {
         "type": "ganglia",
         "metrics": {
-          "metrics/boottime":{
-            "metric":"boottime",
+          "metrics/flume/flume/CHANNEL/ChannelCapacity":{
+            "metric":"ChannelCapacity",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/cpu/cpu_aidle":{
-            "metric":"cpu_aidle",
+          "metrics/flume/flume/CHANNEL/StartTime":{
+            "metric":"StartTime",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/cpu/cpu_idle":{
-            "metric":"cpu_idle",
+          "metrics/flume/flume/CHANNEL/EventTakeAttemptCount":{
+            "metric":"EventTakeAttemptCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/cpu/cpu_nice":{
-            "metric":"cpu_nice",
+          "metrics/flume/flume/CHANNEL/EventTakeSuccessCount":{
+            "metric":"EventTakeSuccessCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/cpu/cpu_num":{
-            "metric":"cpu_num",
+          "metrics/flume/flume/CHANNEL/EventPutAttemptCount":{
+            "metric":"EventPutAttemptCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/cpu/cpu_speed":{
-            "metric":"cpu_speed",
+          "metrics/flume/flume/CHANNEL/StopTime":{
+            "metric":"StopTime",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/cpu/cpu_system":{
-            "metric":"cpu_system",
+          "metrics/flume/flume/CHANNEL/ChannelFillPercentage":{
+            "metric":"ChannelFillPercentage",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/cpu/cpu_user":{
-            "metric":"cpu_user",
+          "metrics/flume/flume/CHANNEL/ChannelSize":{
+            "metric":"ChannelSize",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/cpu/cpu_wio":{
-            "metric":"cpu_wio",
+          "metrics/flume/flume/CHANNEL/EventPutSuccessCount":{
+            "metric":"EventPutSuccessCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/disk/disk_free":{
-            "metric":"disk_free",
+          "metrics/flume/flume/SINK/ConnectionCreatedCount":{
+            "metric":"ConnectionCreatedCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/disk/disk_total":{
-            "metric":"disk_total",
+          "metrics/flume/flume/SINK/BatchCompleteCount":{
+            "metric":"BatchCompleteCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/disk/part_max_used":{
-            "metric":"part_max_used",
+          "metrics/flume/flume/SINK/EventDrainSuccessCount":{
+            "metric":"EventDrainSuccessCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/jvm/gcCount":{
-            "metric":"jvm.metrics.gcCount",
+          "metrics/flume/flume/SINK/StartTime":{
+            "metric":"StartTime",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/jvm/gcTimeMillis":{
-            "metric":"jvm.metrics.gcTimeMillis",
+          "metrics/flume/flume/SINK/EventDrainAttemptCount":{
+            "metric":"EventDrainAttemptCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/jvm/logError":{
-            "metric":"jvm.metrics.logError",
+          "metrics/flume/flume/SINK/ConnectionFailedCount":{
+            "metric":"ConnectionFailedCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/jvm/logFatal":{
-            "metric":"jvm.metrics.logFatal",
+          "metrics/flume/flume/SINK/BatchUnderflowCount":{
+            "metric":"BatchUnderflowCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/jvm/logInfo":{
-            "metric":"jvm.metrics.logInfo",
+          "metrics/flume/flume/SINK/ConnectionClosedCount":{
+            "metric":"ConnectionClosedCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/jvm/logWarn":{
-            "metric":"jvm.metrics.logWarn",
+          "metrics/flume/flume/SINK/StopTime":{
+            "metric":"StopTime",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/jvm/maxMemoryM":{
-            "metric":"jvm.metrics.maxMemoryM",
+          "metrics/flume/flume/SINK/BatchEmptyCount":{
+            "metric":"BatchEmptyCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/jvm/memHeapCommittedM":{
-            "metric":"jvm.metrics.memHeapCommittedM",
+          "metrics/flume/flume/SOURCE/AppendBatchReceivedCount":{
+            "metric":"AppendBatchReceivedCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/jvm/memHeapUsedM":{
-            "metric":"jvm.metrics.memHeapUsedM",
+          "metrics/flume/flume/SOURCE/AppendAcceptedCount":{
+            "metric":"AppendAcceptedCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/jvm/memNonHeapCommittedM":{
-            "metric":"jvm.metrics.memNonHeapCommittedM",
+          "metrics/flume/flume/SOURCE/StartTime":{
+            "metric":"StartTime",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/jvm/memNonHeapUsedM":{
-            "metric":"jvm.metrics.memNonHeapUsedM",
+          "metrics/flume/flume/SOURCE/OpenConnectionCount":{
+            "metric":"OpenConnectionCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/jvm/threadsBlocked":{
-            "metric":"jvm.metrics.threadsBlocked",
+          "metrics/flume/flume/SOURCE/AppendBatchAcceptedCount":{
+            "metric":"AppendBatchAcceptedCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/jvm/threadsNew":{
-            "metric":"jvm.metrics.threadsNew",
+          "metrics/flume/flume/SOURCE/AppendReceivedCount":{
+            "metric":"AppendReceivedCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/jvm/threadsRunnable":{
-            "metric":"jvm.metrics.threadsRunnable",
+          "metrics/flume/flume/SOURCE/EventReceivedCount":{
+            "metric":"EventReceivedCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/jvm/threadsTerminated":{
-            "metric":"jvm.metrics.threadsTerminated",
+          "metrics/flume/flume/SOURCE/StopTime":{
+            "metric":"StopTime",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/jvm/threadsTimedWaiting":{
-            "metric":"jvm.metrics.threadsTimedWaiting",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/jvm/threadsWaiting":{
-            "metric":"jvm.metrics.threadsWaiting",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/flume/$1/CHANNEL/$2/ChannelCapacity":{
-            "metric":"(\\w+).CHANNEL.(\\w+).ChannelCapacity",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/flume/$1/CHANNEL/$2/StartTime":{
-            "metric":"(\\w+).CHANNEL.(\\w+).StartTime",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/flume/$1/CHANNEL/$2/EventTakeAttemptCount":{
-            "metric":"(\\w+).CHANNEL.(\\w+).EventTakeAttemptCount",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/flume/$1/CHANNEL/$2/EventTakeSuccessCount":{
-            "metric":"(\\w+).CHANNEL.(\\w+).EventTakeSuccessCount",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/flume/$1/CHANNEL/$2/EventPutAttemptCount":{
-            "metric":"(\\w+).CHANNEL.(\\w+).EventPutAttemptCount",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/flume/$1/CHANNEL/$2/StopTime":{
-            "metric":"(\\w+).CHANNEL.(\\w+).StopTime",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/flume/$1/CHANNEL/$2/ChannelFillPercentage":{
-            "metric":"(\\w+).CHANNEL.(\\w+).ChannelFillPercentage",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/flume/$1/CHANNEL/$2/ChannelSize":{
-            "metric":"(\\w+).CHANNEL.(\\w+).ChannelSize",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/flume/$1/CHANNEL/$2/EventPutSuccessCount":{
-            "metric":"(\\w+).CHANNEL.(\\w+).EventPutSuccessCount",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/flume/$1/SINK/$2/ConnectionCreatedCount":{
-            "metric":"(\\w+).SINK.(\\w+).ConnectionCreatedCount",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/flume/$1/SINK/$2/BatchCompleteCount":{
-            "metric":"(\\w+).SINK.(\\w+).BatchCompleteCount",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/flume/$1/SINK/$2/EventDrainSuccessCount":{
-            "metric":"(\\w+).SINK.(\\w+).EventDrainSuccessCount",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/flume/$1/SINK/$2/StartTime":{
-            "metric":"(\\w+).SINK.(\\w+).StartTime",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/flume/$1/SINK/$2/EventDrainAttemptCount":{
-            "metric":"(\\w+).SINK.(\\w+).EventDrainAttemptCount",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/flume/$1/SINK/$2/ConnectionFailedCount":{
-            "metric":"(\\w+).SINK.(\\w+).ConnectionFailedCount",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/flume/$1/SINK/$2/BatchUnderflowCount":{
-            "metric":"(\\w+).SINK.(\\w+).BatchUnderflowCount",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/flume/$1/SINK/$2/ConnectionClosedCount":{
-            "metric":"(\\w+).SINK.(\\w+).ConnectionClosedCount",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/flume/$1/SINK/$2/StopTime":{
-            "metric":"(\\w+).SINK.(\\w+).StopTime",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/flume/$1/SINK/$2/BatchEmptyCount":{
-            "metric":"(\\w+).SINK.(\\w+).BatchEmptyCount",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/flume/$1/SOURCE/$2/AppendBatchReceivedCount":{
-            "metric":"(\\w+).SOURCE.(\\w+).AppendBatchReceivedCount",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/flume/$1/SOURCE/$2/AppendAcceptedCount":{
-            "metric":"(\\w+).SOURCE.(\\w+).AppendAcceptedCount",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/flume/$1/SOURCE/$2/StartTime":{
-            "metric":"(\\w+).SOURCE.(\\w+).StartTime",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/flume/$1/SOURCE/$2/OpenConnectionCount":{
-            "metric":"(\\w+).SOURCE.(\\w+).OpenConnectionCount",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/flume/$1/SOURCE/$2/AppendBatchAcceptedCount":{
-            "metric":"(\\w+).SOURCE.(\\w+).AppendBatchAcceptedCount",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/flume/$1/SOURCE/$2/AppendReceivedCount":{
-            "metric":"(\\w+).SOURCE.(\\w+).AppendReceivedCount",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/flume/$1/SOURCE/$2/EventReceivedCount":{
-            "metric":"(\\w+).SOURCE.(\\w+).EventReceivedCount",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/flume/$1/SOURCE/$2/StopTime":{
-            "metric":"(\\w+).SOURCE.(\\w+).StopTime",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/flume/$1/SOURCE/$2/EventAcceptedCount":{
-            "metric":"(\\w+).SOURCE.(\\w+).EventAcceptedCount",
+          "metrics/flume/flume/SOURCE/EventAcceptedCount":{
+            "metric":"EventAcceptedCount",
             "pointInTime":true,
             "temporal":true
           },
 
-          "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")EventTakeSuccessCount/rate/min": {
-            "metric":"(\\w+).CHANNEL.(\\w+).EventTakeSuccessCount._rate._min",
+          "metrics/flume/flume/CHANNEL/EventTakeSuccessCount/rate/min": {
+            "metric":"EventTakeSuccessCount._rate._min",
             "pointInTime":false,
             "temporal":true
           },
-          "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")EventTakeSuccessCount/rate/max": {
-            "metric":"(\\w+).CHANNEL.(\\w+).EventTakeSuccessCount._rate._max",
+          "metrics/flume/flume/CHANNEL/EventTakeSuccessCount/rate/max": {
+            "metric":"EventTakeSuccessCount._rate._max",
             "pointInTime":false,
             "temporal":true
           },
-          "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")EventTakeSuccessCount/rate/avg": {
-            "metric":"(\\w+).CHANNEL.(\\w+).EventTakeSuccessCount._rate._avg",
+          "metrics/flume/flume/CHANNEL/EventTakeSuccessCount/rate/avg": {
+            "metric":"EventTakeSuccessCount._rate._avg",
             "pointInTime":false,
             "temporal":true
           },
-          "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")EventTakeSuccessCount/rate/sum": {
-            "metric":"(\\w+).CHANNEL.(\\w+).EventTakeSuccessCount._rate._sum",
+          "metrics/flume/flume/CHANNEL/EventTakeSuccessCount/rate/sum": {
+            "metric":"EventTakeSuccessCount._rate._sum",
             "pointInTime":false,
             "temporal":true
           },
 
-          "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")EventPutSuccessCount/rate/avg": {
-            "metric":"(\\w+).CHANNEL.(\\w+).EventPutSuccessCount._rate._avg",
+          "metrics/flume/flume/CHANNEL/EventPutSuccessCount/rate/avg": {
+            "metric":"EventPutSuccessCount._rate._avg",
             "pointInTime":false,
             "temporal":true
           },
-          "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")EventPutSuccessCount/rate/max": {
-            "metric":"(\\w+).CHANNEL.(\\w+).EventPutSuccessCount._rate._max",
+          "metrics/flume/flume/CHANNEL/EventPutSuccessCount/rate/max": {
+            "metric":"EventPutSuccessCount._rate._max",
             "pointInTime":false,
             "temporal":true
           },
-          "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")EventPutSuccessCount/rate/min": {
-            "metric":"(\\w+).CHANNEL.(\\w+).EventPutSuccessCount._rate._min",
+          "metrics/flume/flume/CHANNEL/EventPutSuccessCount/rate/min": {
+            "metric":"EventPutSuccessCount._rate._min",
             "pointInTime":false,
             "temporal":true
           },
-          "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")EventPutSuccessCount/rate/sum": {
-            "metric":"(\\w+).CHANNEL.(\\w+).EventPutSuccessCount._rate._sum",
+          "metrics/flume/flume/CHANNEL/EventPutSuccessCount/rate/sum": {
+            "metric":"EventPutSuccessCount._rate._sum",
             "pointInTime":false,
             "temporal":true
           },
           
-          "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")ChannelSize/avg": {
-            "metric":"(\\w+).CHANNEL.(\\w+).ChannelSize._avg",
+          "metrics/flume/flume/CHANNEL/ChannelSize/rate/avg": {
+            "metric":"ChannelSize._rate._avg",
             "pointInTime":false,
             "temporal":true
           },
-          "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")ChannelSize/max": {
-            "metric":"(\\w+).CHANNEL.(\\w+).ChannelSize._max",
+          "metrics/flume/flume/CHANNEL/ChannelSize/rate/max": {
+            "metric":"ChannelSize._rate._max",
             "pointInTime":false,
             "temporal":true
           },
-          "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")ChannelSize/min": {
-            "metric":"(\\w+).CHANNEL.(\\w+).ChannelSize._min",
+          "metrics/flume/flume/CHANNEL/ChannelSize/rate/min": {
+            "metric":"ChannelSize._rate._min",
             "pointInTime":false,
             "temporal":true
           },
-          "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")ChannelSize/sum": {
-            "metric":"(\\w+).CHANNEL.(\\w+).ChannelSize._sum",
+          "metrics/flume/flume/CHANNEL/ChannelSize/rate/sum": {
+            "metric":"ChannelSize._rate._sum",
             "pointInTime":false,
             "temporal":true
           }
@@ -360,351 +215,206 @@
       {
         "type": "ganglia",
         "metrics": {
-          "metrics/boottime":{
-            "metric":"boottime",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/cpu/cpu_aidle":{
-            "metric":"cpu_aidle",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/cpu/cpu_idle":{
-            "metric":"cpu_idle",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/cpu/cpu_nice":{
-            "metric":"cpu_nice",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/cpu/cpu_num":{
-            "metric":"cpu_num",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/cpu/cpu_speed":{
-            "metric":"cpu_speed",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/cpu/cpu_system":{
-            "metric":"cpu_system",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/cpu/cpu_user":{
-            "metric":"cpu_user",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/cpu/cpu_wio":{
-            "metric":"cpu_wio",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/disk/disk_free":{
-            "metric":"disk_free",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/disk/disk_total":{
-            "metric":"disk_total",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/disk/part_max_used":{
-            "metric":"part_max_used",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/jvm/gcCount":{
-            "metric":"jvm.metrics.gcCount",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/jvm/gcTimeMillis":{
-            "metric":"jvm.metrics.gcTimeMillis",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/jvm/logError":{
-            "metric":"jvm.metrics.logError",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/jvm/logFatal":{
-            "metric":"jvm.metrics.logFatal",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/jvm/logInfo":{
-            "metric":"jvm.metrics.logInfo",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/jvm/logWarn":{
-            "metric":"jvm.metrics.logWarn",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/jvm/maxMemoryM":{
-            "metric":"jvm.metrics.maxMemoryM",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/jvm/memHeapCommittedM":{
-            "metric":"jvm.metrics.memHeapCommittedM",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/jvm/memHeapUsedM":{
-            "metric":"jvm.metrics.memHeapUsedM",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/jvm/memNonHeapCommittedM":{
-            "metric":"jvm.metrics.memNonHeapCommittedM",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/jvm/memNonHeapUsedM":{
-            "metric":"jvm.metrics.memNonHeapUsedM",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/jvm/threadsBlocked":{
-            "metric":"jvm.metrics.threadsBlocked",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/jvm/threadsNew":{
-            "metric":"jvm.metrics.threadsNew",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/jvm/threadsRunnable":{
-            "metric":"jvm.metrics.threadsRunnable",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/jvm/threadsTerminated":{
-            "metric":"jvm.metrics.threadsTerminated",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/jvm/threadsTimedWaiting":{
-            "metric":"jvm.metrics.threadsTimedWaiting",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/jvm/threadsWaiting":{
-            "metric":"jvm.metrics.threadsWaiting",
-            "pointInTime":true,
-            "temporal":true
-          },
-          "metrics/flume/$1/CHANNEL/$2/ChannelCapacity":{
-            "metric":"(\\w+).CHANNEL.(\\w+).ChannelCapacity",
+          "metrics/flume/flume/CHANNEL/ChannelCapacity":{
+            "metric":"ChannelCapacity",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/flume/$1/CHANNEL/$2/StartTime":{
-            "metric":"(\\w+).CHANNEL.(\\w+).StartTime",
+          "metrics/flume/flume/CHANNEL/StartTime":{
+            "metric":"StartTime",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/flume/$1/CHANNEL/$2/EventTakeAttemptCount":{
-            "metric":"(\\w+).CHANNEL.(\\w+).EventTakeAttemptCount",
+          "metrics/flume/flume/CHANNEL/EventTakeAttemptCount":{
+            "metric":"EventTakeAttemptCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/flume/$1/CHANNEL/$2/EventTakeSuccessCount":{
-            "metric":"(\\w+).CHANNEL.(\\w+).EventTakeSuccessCount",
+          "metrics/flume/flume/CHANNEL/EventTakeSuccessCount":{
+            "metric":"EventTakeSuccessCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/flume/$1/CHANNEL/$2/EventPutAttemptCount":{
-            "metric":"(\\w+).CHANNEL.(\\w+).EventPutAttemptCount",
+          "metrics/flume/flume/CHANNEL/EventPutAttemptCount":{
+            "metric":"EventPutAttemptCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/flume/$1/CHANNEL/$2/StopTime":{
-            "metric":"(\\w+).CHANNEL.(\\w+).StopTime",
+          "metrics/flume/flume/CHANNEL/StopTime":{
+            "metric":"StopTime",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/flume/$1/CHANNEL/$2/ChannelFillPercentage":{
-            "metric":"(\\w+).CHANNEL.(\\w+).ChannelFillPercentage",
+          "metrics/flume/flume/CHANNEL/ChannelFillPercentage":{
+            "metric":"ChannelFillPercentage",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/flume/$1/CHANNEL/$2/ChannelSize":{
-            "metric":"(\\w+).CHANNEL.(\\w+).ChannelSize",
+          "metrics/flume/flume/CHANNEL/ChannelSize":{
+            "metric":"ChannelSize",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/flume/$1/CHANNEL/$2/EventPutSuccessCount":{
-            "metric":"(\\w+).CHANNEL.(\\w+).EventPutSuccessCount",
+          "metrics/flume/flume/CHANNEL/EventPutSuccessCount":{
+            "metric":"EventPutSuccessCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/flume/$1/SINK/$2/ConnectionCreatedCount":{
-            "metric":"(\\w+).SINK.(\\w+).ConnectionCreatedCount",
+          "metrics/flume/flume/SINK/ConnectionCreatedCount":{
+            "metric":"ConnectionCreatedCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/flume/$1/SINK/$2/BatchCompleteCount":{
-            "metric":"(\\w+).SINK.(\\w+).BatchCompleteCount",
+          "metrics/flume/flume/SINK/BatchCompleteCount":{
+            "metric":"BatchCompleteCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/flume/$1/SINK/$2/EventDrainSuccessCount":{
-            "metric":"(\\w+).SINK.(\\w+).EventDrainSuccessCount",
+          "metrics/flume/flume/SINK/EventDrainSuccessCount":{
+            "metric":"EventDrainSuccessCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/flume/$1/SINK/$2/StartTime":{
-            "metric":"(\\w+).SINK.(\\w+).StartTime",
+          "metrics/flume/flume/SINK/StartTime":{
+            "metric":"StartTime",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/flume/$1/SINK/$2/EventDrainAttemptCount":{
-            "metric":"(\\w+).SINK.(\\w+).EventDrainAttemptCount",
+          "metrics/flume/flume/SINK/EventDrainAttemptCount":{
+            "metric":"EventDrainAttemptCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/flume/$1/SINK/$2/ConnectionFailedCount":{
-            "metric":"(\\w+).SINK.(\\w+).ConnectionFailedCount",
+          "metrics/flume/flume/SINK/ConnectionFailedCount":{
+            "metric":"ConnectionFailedCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/flume/$1/SINK/$2/BatchUnderflowCount":{
-            "metric":"(\\w+).SINK.(\\w+).BatchUnderflowCount",
+          "metrics/flume/flume/SINK/BatchUnderflowCount":{
+            "metric":"BatchUnderflowCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/flume/$1/SINK/$2/ConnectionClosedCount":{
-            "metric":"(\\w+).SINK.(\\w+).ConnectionClosedCount",
+          "metrics/flume/flume/SINK/ConnectionClosedCount":{
+            "metric":"ConnectionClosedCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/flume/$1/SINK/$2/StopTime":{
-            "metric":"(\\w+).SINK.(\\w+).StopTime",
+          "metrics/flume/flume/SINK/StopTime":{
+            "metric":"StopTime",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/flume/$1/SINK/$2/BatchEmptyCount":{
-            "metric":"(\\w+).SINK.(\\w+).BatchEmptyCount",
+          "metrics/flume/flume/SINK/BatchEmptyCount":{
+            "metric":"BatchEmptyCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/flume/$1/SOURCE/$2/AppendBatchReceivedCount":{
-            "metric":"(\\w+).SOURCE.(\\w+).AppendBatchReceivedCount",
+          "metrics/flume/flume/SOURCE/AppendBatchReceivedCount":{
+            "metric":"AppendBatchReceivedCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/flume/$1/SOURCE/$2/AppendAcceptedCount":{
-            "metric":"(\\w+).SOURCE.(\\w+).AppendAcceptedCount",
+          "metrics/flume/flume/SOURCE/AppendAcceptedCount":{
+            "metric":"AppendAcceptedCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/flume/$1/SOURCE/$2/StartTime":{
-            "metric":"(\\w+).SOURCE.(\\w+).StartTime",
+          "metrics/flume/flume/SOURCE/StartTime":{
+            "metric":"StartTime",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/flume/$1/SOURCE/$2/OpenConnectionCount":{
-            "metric":"(\\w+).SOURCE.(\\w+).OpenConnectionCount",
+          "metrics/flume/flume/SOURCE/OpenConnectionCount":{
+            "metric":"OpenConnectionCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/flume/$1/SOURCE/$2/AppendBatchAcceptedCount":{
-            "metric":"(\\w+).SOURCE.(\\w+).AppendBatchAcceptedCount",
+          "metrics/flume/flume/SOURCE/AppendBatchAcceptedCount":{
+            "metric":"AppendBatchAcceptedCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/flume/$1/SOURCE/$2/AppendReceivedCount":{
-            "metric":"(\\w+).SOURCE.(\\w+).AppendReceivedCount",
+          "metrics/flume/flume/SOURCE/AppendReceivedCount":{
+            "metric":"AppendReceivedCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/flume/$1/SOURCE/$2/EventReceivedCount":{
-            "metric":"(\\w+).SOURCE.(\\w+).EventReceivedCount",
+          "metrics/flume/flume/SOURCE/EventReceivedCount":{
+            "metric":"EventReceivedCount",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/flume/$1/SOURCE/$2/StopTime":{
-            "metric":"(\\w+).SOURCE.(\\w+).StopTime",
+          "metrics/flume/flume/SOURCE/StopTime":{
+            "metric":"StopTime",
             "pointInTime":true,
             "temporal":true
           },
-          "metrics/flume/$1/SOURCE/$2/EventAcceptedCount":{
-            "metric":"(\\w+).SOURCE.(\\w+).EventAcceptedCount",
+          "metrics/flume/flume/SOURCE/EventAcceptedCount":{
+            "metric":"EventAcceptedCount",
             "pointInTime":true,
             "temporal":true
           },
 
-          "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")EventTakeSuccessCount/rate/avg": {
-            "metric":"(\\w+).CHANNEL.(\\w+).EventTakeSuccessCount._rate._avg",
+          "metrics/flume/flume/CHANNEL/EventTakeSuccessCount/rate/avg": {
+            "metric":"EventTakeSuccessCount._rate._avg",
             "pointInTime":false,
             "temporal":true
           },
-          "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")EventTakeSuccessCount/rate/max": {
-            "metric":"(\\w+).CHANNEL.(\\w+).EventTakeSuccessCount._rate._max",
+          "metrics/flume/flume/CHANNEL/EventTakeSuccessCount/rate/max": {
+            "metric":"EventTakeSuccessCount._rate._max",
             "pointInTime":false,
             "temporal":true
           },
-          "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")EventTakeSuccessCount/rate/min": {
-            "metric":"(\\w+).CHANNEL.(\\w+).EventTakeSuccessCount._rate._min",
+          "metrics/flume/flume/CHANNEL/EventTakeSuccessCount/rate/min": {
+            "metric":"EventTakeSuccessCount._rate._min",
             "pointInTime":false,
             "temporal":true
           },
-          "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")EventTakeSuccessCount/rate/sum": {
-            "metric":"(\\w+).CHANNEL.(\\w+).EventTakeSuccessCount._rate._sum",
+          "metrics/flume/flume/CHANNEL/EventTakeSuccessCount/rate/sum": {
+            "metric":"EventTakeSuccessCount._rate._sum",
             "pointInTime":false,
             "temporal":true
           },
 
-          "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")EventPutSuccessCount/rate/avg": {
-            "metric":"(\\w+).CHANNEL.(\\w+).EventPutSuccessCount._rate._avg",
+          "metrics/flume/flume/CHANNEL/EventPutSuccessCount/rate/avg": {
+            "metric":"EventPutSuccessCount._rate._avg",
             "pointInTime":false,
             "temporal":true
           },
-          "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")EventPutSuccessCount/rate/max": {
-            "metric":"(\\w+).CHANNEL.(\\w+).EventPutSuccessCount._rate._max",
+          "metrics/flume/flume/CHANNEL/EventPutSuccessCount/rate/max": {
+            "metric":"EventPutSuccessCount._rate._max",
             "pointInTime":false,
             "temporal":true
           },
-          "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")EventPutSuccessCount/rate/min": {
-            "metric":"(\\w+).CHANNEL.(\\w+).EventPutSuccessCount._rate._min",
+          "metrics/flume/flume/CHANNEL/EventPutSuccessCount/rate/min": {
+            "metric":"EventPutSuccessCount._rate._min",
             "pointInTime":false,
             "temporal":true
           },
-          "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")EventPutSuccessCount/rate/sum": {
-            "metric":"(\\w+).CHANNEL.(\\w+).EventPutSuccessCount._rate._sum",
+          "metrics/flume/flume/CHANNEL/EventPutSuccessCount/rate/sum": {
+            "metric":"EventPutSuccessCount._rate._sum",
             "pointInTime":false,
             "temporal":true
           },
           
-          "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")ChannelSize/avg": {
-            "metric":"(\\w+).CHANNEL.(\\w+).ChannelSize._avg",
+          "metrics/flume/flume/CHANNEL/ChannelSize/rate/avg": {
+            "metric":"ChannelSize._rate._avg",
             "pointInTime":false,
             "temporal":true
           },
-          "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")ChannelSize/max": {
-            "metric":"(\\w+).CHANNEL.(\\w+).ChannelSize._max",
+          "metrics/flume/flume/CHANNEL/ChannelSize/rate/max": {
+            "metric":"ChannelSize._rate._max",
             "pointInTime":false,
             "temporal":true
           },
-          "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")ChannelSize/min": {
-            "metric":"(\\w+).CHANNEL.(\\w+).ChannelSize._min",
+          "metrics/flume/flume/CHANNEL/ChannelSize/rate/min": {
+            "metric":"ChannelSize._rate._min",
             "pointInTime":false,
             "temporal":true
           },
-          "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")ChannelSize/sum": {
-            "metric":"(\\w+).CHANNEL.(\\w+).ChannelSize._sum",
+          "metrics/flume/flume/CHANNEL/ChannelSize/rate/sum": {
+            "metric":"ChannelSize._rate._sum",
             "pointInTime":false,
             "temporal":true
           }
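
The substance of the metrics.json rewrite: property ids no longer capture the
agent and component names through regex groups, because the timeline sink now
reports bare metric names; each id is the literal
metrics/flume/flume/<TYPE>/<metric> path, and the ChannelSize min/max/avg/sum
graphs now read the derived ._rate. series. For contrast, a sketch of how the
old parameterized form resolved (pattern and substitution target are from the
removed entries; dots are escaped here for correctness, the input is
illustrative):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    // old style: capture agent and channel names, then substitute them into
    // "metrics/flume/$1/CHANNEL/$2/ChannelSize"
    static String resolveOldStyle(String gangliaName) {
      Pattern old = Pattern.compile("(\\w+)\\.CHANNEL\\.(\\w+)\\.ChannelSize");
      Matcher m = old.matcher(gangliaName);
      return m.matches()
          ? "metrics/flume/" + m.group(1) + "/CHANNEL/" + m.group(2) + "/ChannelSize"
          : null;
    }

    // resolveOldStyle("a1.CHANNEL.c1.ChannelSize")
    //   -> "metrics/flume/a1/CHANNEL/c1/ChannelSize"
    // new style needs no substitution: metrics/flume/flume/CHANNEL/ChannelSize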

+ 3 - 1
ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume.py

@@ -140,7 +140,9 @@ def flume(action = None):
           extra_args = '-Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts={0}:{1}'
           extra_args = extra_args.format(params.ganglia_server_host, '8655')
         if params.has_metric_collector:
-          extra_args = '-Dflume.monitoring.type=org.apache.hadoop.metrics2.sink.flume.FlumeTimelineMetricsSink'
+          extra_args = '-Dflume.monitoring.type=org.apache.hadoop.metrics2.sink.flume.FlumeTimelineMetricsSink ' \
+                       '-Dflume.monitoring.node={0}:{1}'
+          extra_args = extra_args.format(params.metric_collector_host, params.metric_collector_port)
 
         flume_cmd = flume_base.format(agent, flume_agent_conf_dir,
            flume_agent_conf_file, extra_args, agent)
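
With a metrics collector in the cluster, the agent launch now names both the
sink class and its target, e.g. (host illustrative; 6188 is the default port
from params.py below):

    -Dflume.monitoring.type=org.apache.hadoop.metrics2.sink.flume.FlumeTimelineMetricsSink
    -Dflume.monitoring.node=ams-host.example.com:6188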

+ 4 - 0
ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py

@@ -84,3 +84,7 @@ ams_collector_hosts = default("/clusterHostInfo/metrics_collector_hosts", [])
 has_metric_collector = not len(ams_collector_hosts) == 0
 if has_metric_collector:
   metric_collector_host = ams_collector_hosts[0]
+  metric_collector_port = default("/configurations/ams-site/timeline.metrics.service.webapp.address", "0.0.0.0:6188")
+  if metric_collector_port and metric_collector_port.find(':') != -1:
+    metric_collector_port = metric_collector_port.split(':')[1]
+  pass
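
Worked through with the default: a timeline.metrics.service.webapp.address of
0.0.0.0:6188 yields metric_collector_port = 6188; a value with no colon is
left as-is, so a bare port number also works.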

+ 6 - 2
ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/templates/flume-metrics2.properties.j2

@@ -16,7 +16,11 @@
 # limitations under the License.
 #}
 
-collector={{metric_collector_host}}:6188
+collector={{metric_collector_host}}
+port={{metric_collector_port}}
 collectionFrequency=60000
 maxRowCacheSize=10000
-sendInterval=59000
+sendInterval=59000
+
+# Metric names having type COUNTER
+counters=EventTakeSuccessCount,EventPutSuccessCount,EventTakeAttemptCount,EventPutAttemptCount
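
The counters list marks metrics that grow monotonically, so rate series (the
._rate suffixes consumed in metrics.json above) can be derived from successive
samples. A minimal sketch of that derivation, assuming per-second rates; the
actual computation lives in FlumeTimelineMetricsSink/AMS and is not part of
this diff:

    // delta of a counter over the sampling interval, per second
    static double counterRate(long prevTimeMillis, double prevValue,
                              long currTimeMillis, double currValue) {
      return (currValue - prevValue) / ((currTimeMillis - prevTimeMillis) / 1000.0);
    }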

+ 6 - 6
ambari-web/app/utils/ajax/ajax.js

@@ -794,32 +794,32 @@ var urls = {
     'mock': '/data/cluster_metrics/cpu_1hr.json'
   },
   'service.metrics.flume.channel_fill_percent': {
-    'real': '/clusters/{clusterName}/services/FLUME/components/FLUME_HANDLER?fields=host_components/metrics/flume/flume/CHANNEL/*/ChannelFillPercentage[{fromSeconds},{toSeconds},{stepSeconds}]',
+    'real': '/clusters/{clusterName}/services/FLUME/components/FLUME_HANDLER?fields=host_components/metrics/flume/flume/CHANNEL/ChannelFillPercentage[{fromSeconds},{toSeconds},{stepSeconds}]',
     'mock': '/data/services/metrics/flume/channelFillPct.json',
     'testInProduction': true
   },
   'service.metrics.flume.channel_size': {
-    'real': '/clusters/{clusterName}/services/FLUME/components/FLUME_HANDLER?fields=host_components/metrics/flume/flume/CHANNEL/*/ChannelSize[{fromSeconds},{toSeconds},{stepSeconds}]',
+    'real': '/clusters/{clusterName}/services/FLUME/components/FLUME_HANDLER?fields=host_components/metrics/flume/flume/CHANNEL/ChannelSize[{fromSeconds},{toSeconds},{stepSeconds}]',
     'mock': '/data/services/metrics/flume/channelSize.json',
     'testInProduction': true
   },
   'service.metrics.flume.sink_drain_success': {
-    'real': '/clusters/{clusterName}/services/FLUME/components/FLUME_HANDLER?fields=host_components/metrics/flume/flume/SINK/*/EventDrainSuccessCount[{fromSeconds},{toSeconds},{stepSeconds}]',
+    'real': '/clusters/{clusterName}/services/FLUME/components/FLUME_HANDLER?fields=host_components/metrics/flume/flume/SINK/EventDrainSuccessCount[{fromSeconds},{toSeconds},{stepSeconds}]',
     'mock': '/data/services/metrics/flume/sinkDrainSuccessCount.json',
     'testInProduction': true
   },
   'service.metrics.flume.sink_connection_failed': {
-    'real': '/clusters/{clusterName}/services/FLUME/components/FLUME_HANDLER?fields=host_components/metrics/flume/flume/SINK/*/ConnectionFailedCount[{fromSeconds},{toSeconds},{stepSeconds}]',
+    'real': '/clusters/{clusterName}/services/FLUME/components/FLUME_HANDLER?fields=host_components/metrics/flume/flume/SINK/ConnectionFailedCount[{fromSeconds},{toSeconds},{stepSeconds}]',
     'mock': '/data/services/metrics/flume/sinkConnectionFailedCount.json',
     'testInProduction': true
   },
   'service.metrics.flume.source_accepted': {
-    'real': '/clusters/{clusterName}/services/FLUME/components/FLUME_HANDLER?fields=host_components/metrics/flume/flume/SOURCE/*/EventAcceptedCount[{fromSeconds},{toSeconds},{stepSeconds}]',
+    'real': '/clusters/{clusterName}/services/FLUME/components/FLUME_HANDLER?fields=host_components/metrics/flume/flume/SOURCE/EventAcceptedCount[{fromSeconds},{toSeconds},{stepSeconds}]',
     'mock': '/data/services/metrics/flume/sourceEventAccepted.json',
     'testInProduction': true
   },
   'service.metrics.flume.channel_size_for_all': {
-    'real': '/clusters/{clusterName}/services/FLUME/components/FLUME_HANDLER?fields=metrics/flume/flume/CHANNEL/ChannelSize/_sum[{fromSeconds},{toSeconds},{stepSeconds}]'
+    'real': '/clusters/{clusterName}/services/FLUME/components/FLUME_HANDLER?fields=metrics/flume/flume/CHANNEL/ChannelSize/rate[{fromSeconds},{toSeconds},{stepSeconds}]'
   },
   'service.metrics.flume.gc': {
     'real': '/clusters/{clusterName}/services/FLUME/components/FLUME_HANDLER?fields=host_components/metrics/jvm/gcTimeMillis[{fromSeconds},{toSeconds},{stepSeconds}]',
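
After placeholder substitution, the channel-size-for-all request resolves to
something like (cluster name and time range illustrative):

    /clusters/c1/services/FLUME/components/FLUME_HANDLER?fields=metrics/flume/flume/CHANNEL/ChannelSize/rate[1423000000,1423003600,15]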

+ 5 - 3
ambari-web/app/views/main/service/info/metrics/flume/channel_size_mma.js

@@ -31,16 +31,18 @@ App.ChartServiceMetricsFlume_ChannelSizeMMA = App.ChartLinearTimeView.extend({
   title: Em.I18n.t('services.service.info.metrics.flume.channelSizeMMA'),
   renderer: 'line',
   ajaxIndex: 'service.metrics.flume.channel_size_for_all',
+  yAxisFormatter: App.ChartLinearTimeView.CreateRateFormatter('',
+    App.ChartLinearTimeView.DefaultFormatter),
 
   transformToSeries: function (jsonData) {
     var seriesArray = [];
     var self = this;
 
-    if (Em.get(jsonData, "metrics.flume.flume.CHANNEL.ChannelSize")) {
-      for ( var cname in jsonData.metrics.flume.flume.CHANNEL.ChannelSize) {
+    if (Em.get(jsonData, "metrics.flume.flume.CHANNEL.ChannelSize.rate")) {
+      for ( var cname in jsonData.metrics.flume.flume.CHANNEL.ChannelSize.rate) {
         if(cname != "sum"){
           var seriesName = Em.I18n.t('services.service.info.metrics.flume.channelType').format(cname);
-          var seriesData = jsonData.metrics.flume.flume.CHANNEL.ChannelSize[cname];
+          var seriesData = jsonData.metrics.flume.flume.CHANNEL.ChannelSize.rate[cname];
           if (seriesData) {
             seriesArray.push(self.transformData(seriesData, seriesName));
           }

+ 2 - 2
ambari-web/app/views/main/service/info/metrics/flume/channel_sum.js

@@ -36,9 +36,9 @@ App.ChartServiceMetricsFlume_ChannelSizeSum = App.ChartLinearTimeView.extend({
   transformToSeries: function (jsonData) {
     var seriesArray = [];
     var self = this;
-    if(Em.get(jsonData, "metrics.flume.flume.CHANNEL.ChannelSize.sum")){
+    if(Em.get(jsonData, "metrics.flume.flume.CHANNEL.ChannelSize.rate.sum")){
       var seriesName = Em.I18n.t('services.service.info.metrics.flume.channelSizeSum');
-      var seriesData = jsonData.metrics.flume.flume.CHANNEL.ChannelSize.sum;
+      var seriesData = jsonData.metrics.flume.flume.CHANNEL.ChannelSize.rate.sum;
       if (seriesData) {
         seriesArray.push(self.transformData(seriesData, seriesName));
       }
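
Both chart views now walk metrics.flume.flume.CHANNEL.ChannelSize.rate, so the
response they consume is shaped roughly as below (datapoints assumed to be
[value, timestamp] pairs; values illustrative). The MMA view plots every key
under rate except "sum"; the sum view reads rate.sum directly:

    {
      "metrics": {
        "flume": { "flume": { "CHANNEL": { "ChannelSize": { "rate": {
          "avg": [[0.5, 1423000000], [0.7, 1423000015]],
          "sum": [[2.0, 1423000000], [2.8, 1423000015]]
        }}}}}
      }
    }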