HADOOP-7324. Ganglia plugins for metrics v2. (Priyo Mustafi via llu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1145525 13f79535-47bb-0310-9956-ffa450edef68
Luke Lu, 14 years ago · commit 224972e055

+ 2 - 0
common/CHANGES.txt

@@ -12,6 +12,8 @@ Trunk (unreleased changes)
 
   NEW FEATURES
 
+    HADOOP-7324. Ganglia plugins for metrics v2. (Priyo Mustafi via llu)
+
     HADOOP-7342. Add an utility API in FileUtil for JDK File.list
     avoid NPEs on File.list() (Bharath Mundlapudi via mattf)
 

+ 30 - 0
common/conf/hadoop-metrics2.properties

@@ -25,3 +25,33 @@
 
 #reducetask.sink.file.filename=reducetask-metrics.out
 
+
+#
+# Below are for sending metrics to Ganglia
+#
+# for Ganglia 3.0 support
+# *.sink.ganglia.class=org.apache.hadoop.metrics2.sink.ganglia.GangliaSink30
+#
+# for Ganglia 3.1 support
+# *.sink.ganglia.class=org.apache.hadoop.metrics2.sink.ganglia.GangliaSink31
+
+# *.sink.ganglia.period=10
+
+# default for supportsparse is false
+# *.sink.ganglia.supportsparse=true
+
+#*.sink.ganglia.slope=jvm.metrics.gcCount=zero,jvm.metrics.memHeapUsedM=both
+#*.sink.ganglia.dmax=jvm.metrics.threadsBlocked=70,jvm.metrics.memHeapUsedM=40
+
+#namenode.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649
+
+#datanode.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649
+
+#jobtracker.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649
+
+#tasktracker.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649
+
+#maptask.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649
+
+#reducetask.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649
+
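For illustration, a minimal enabled configuration wiring the NameNode to a
single Ganglia 3.1 gmond might look like the following sketch; the property
keys come from the commented examples above, and gmond.example.com is a
placeholder host.

    *.sink.ganglia.class=org.apache.hadoop.metrics2.sink.ganglia.GangliaSink31
    *.sink.ganglia.period=10
    namenode.sink.ganglia.servers=gmond.example.com:8649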

+ 288 - 0
common/src/java/org/apache/hadoop/metrics2/sink/ganglia/AbstractGangliaSink.java

@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.ganglia;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.MetricsSink;
+import org.apache.hadoop.metrics2.util.Servers;
+import org.apache.hadoop.net.DNS;
+
+/**
+ * This is the base class for Ganglia sink classes using metrics2. Much of the
+ * code has been derived from org.apache.hadoop.metrics.ganglia.GangliaContext.
+ * As per the documentation, sink implementations don't have to worry about
+ * thread safety. Hence the code wasn't written for thread safety and should
+ * be modified if that assumption changes in the future.
+ */
+public abstract class AbstractGangliaSink implements MetricsSink {
+
+  public final Log LOG = LogFactory.getLog(this.getClass());
+
+  /*
+   * Output of "gmetric --help" showing allowable values
+   * -t, --type=STRING
+   *     Either string|int8|uint8|int16|uint16|int32|uint32|float|double
+   * -u, --units=STRING Unit of measure for the value e.g. Kilobytes, Celcius
+   *     (default='')
+   * -s, --slope=STRING Either zero|positive|negative|both
+   *     (default='both')
+   * -x, --tmax=INT The maximum time in seconds between gmetric calls
+   *     (default='60')
+   */
+  public static final String DEFAULT_UNITS = "";
+  public static final int DEFAULT_TMAX = 60;
+  public static final int DEFAULT_DMAX = 0;
+  public static final GangliaSlope DEFAULT_SLOPE = GangliaSlope.both;
+  public static final int DEFAULT_PORT = 8649;
+  public static final String SERVERS_PROPERTY = "servers";
+  public static final int BUFFER_SIZE = 1500; // as per libgmond.c
+  public static final String SUPPORT_SPARSE_METRICS_PROPERTY = "supportsparse";
+  public static final boolean SUPPORT_SPARSE_METRICS_DEFAULT = false;
+  public static final String EQUAL = "=";
+
+  private String hostName = "UNKNOWN.example.com";
+  private DatagramSocket datagramSocket;
+  private List<? extends SocketAddress> metricsServers;
+  private byte[] buffer = new byte[BUFFER_SIZE];
+  private int offset;
+  private boolean supportSparseMetrics = SUPPORT_SPARSE_METRICS_DEFAULT;
+
+  /**
+   * Used for visiting Metrics
+   */
+  protected final GangliaMetricVisitor gangliaMetricVisitor =
+    new GangliaMetricVisitor();
+
+  private SubsetConfiguration conf;
+  private Map<String, GangliaConf> gangliaConfMap;
+  private GangliaConf DEFAULT_GANGLIA_CONF = new GangliaConf();
+
+  /**
+   * Ganglia slope values; each value's wire encoding equals its ordinal
+   */
+  public enum GangliaSlope {
+    zero,       // 0
+    positive,   // 1
+    negative,   // 2
+    both        // 3
+  };
+
+  /**
+   * Enum for the various types of conf properties
+   */
+  public enum GangliaConfType {
+    slope, units, dmax, tmax
+  };
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.metrics2.MetricsPlugin#init(org.apache.commons.configuration
+   * .SubsetConfiguration)
+   */
+  public void init(SubsetConfiguration conf) {
+    LOG.debug("Initializing the GangliaSink for Ganglia metrics.");
+
+    this.conf = conf;
+
+    // Take the hostname from the DNS class.
+    if (conf.getString("slave.host.name") != null) {
+      hostName = conf.getString("slave.host.name");
+    } else {
+      try {
+        hostName = DNS.getDefaultHost(
+            conf.getString("dfs.datanode.dns.interface", "default"),
+            conf.getString("dfs.datanode.dns.nameserver", "default"));
+      } catch (UnknownHostException uhe) {
+        LOG.error(uhe);
+        hostName = "UNKNOWN.example.com";
+      }
+    }
+
+    // load the ganglia servers from properties
+    metricsServers = Servers.parse(conf.getString(SERVERS_PROPERTY),
+        DEFAULT_PORT);
+
+    // extract the Ganglia conf per metric
+    gangliaConfMap = new HashMap<String, GangliaConf>();
+    loadGangliaConf(GangliaConfType.units);
+    loadGangliaConf(GangliaConfType.tmax);
+    loadGangliaConf(GangliaConfType.dmax);
+    loadGangliaConf(GangliaConfType.slope);
+
+    try {
+      datagramSocket = new DatagramSocket();
+    } catch (SocketException se) {
+      LOG.error(se);
+    }
+
+    // see if sparseMetrics is supported. Default is false
+    supportSparseMetrics = conf.getBoolean(SUPPORT_SPARSE_METRICS_PROPERTY,
+        SUPPORT_SPARSE_METRICS_DEFAULT);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.metrics2.MetricsSink#flush()
+   */
+  public void flush() {
+    // nothing to do as we are not buffering data
+  }
+
+  // Load the configurations for a conf type
+  private void loadGangliaConf(GangliaConfType gtype) {
+    String propertyarr[] = conf.getStringArray(gtype.name());
+    if (propertyarr != null && propertyarr.length > 0) {
+      for (String metricNValue : propertyarr) {
+        String metricNValueArr[] = metricNValue.split(EQUAL);
+        if (metricNValueArr.length != 2 || metricNValueArr[0].length() == 0) {
+          LOG.error("Invalid propertylist for " + gtype.name());
+          continue; // skip the malformed entry rather than fail on the split below
+        }
+
+        String metricName = metricNValueArr[0].trim();
+        String metricValue = metricNValueArr[1].trim();
+        GangliaConf gconf = gangliaConfMap.get(metricName);
+        if (gconf == null) {
+          gconf = new GangliaConf();
+          gangliaConfMap.put(metricName, gconf);
+        }
+
+        switch (gtype) {
+        case units:
+          gconf.setUnits(metricValue);
+          break;
+        case dmax:
+          gconf.setDmax(Integer.parseInt(metricValue));
+          break;
+        case tmax:
+          gconf.setTmax(Integer.parseInt(metricValue));
+          break;
+        case slope:
+          gconf.setSlope(GangliaSlope.valueOf(metricValue));
+          break;
+        }
+      }
+    }
+  }
+
+  /**
+   * Lookup GangliaConf from cache. If not found, return default values
+   *
+   * @param metricName
+   * @return looked up GangliaConf
+   */
+  protected GangliaConf getGangliaConfForMetric(String metricName) {
+    GangliaConf gconf = gangliaConfMap.get(metricName);
+
+    return gconf != null ? gconf : DEFAULT_GANGLIA_CONF;
+  }
+
+  /**
+   * @return the hostName
+   */
+  protected String getHostName() {
+    return hostName;
+  }
+
+  /**
+   * Puts a string into the buffer by first writing the size of the string as an
+   * int, followed by the bytes of the string, padded if necessary to a multiple
+   * of 4.
+   * @param s the string to be written to buffer at offset location
+   */
+  protected void xdr_string(String s) {
+    byte[] bytes = s.getBytes();
+    int len = bytes.length;
+    xdr_int(len);
+    System.arraycopy(bytes, 0, buffer, offset, len);
+    offset += len;
+    pad();
+  }
+
+  // Pads the buffer with zero bytes up to the nearest multiple of 4.
+  private void pad() {
+    int newOffset = ((offset + 3) / 4) * 4;
+    while (offset < newOffset) {
+      buffer[offset++] = 0;
+    }
+  }
+
+  /**
+   * Puts an integer into the buffer as 4 bytes, big-endian.
+   */
+  protected void xdr_int(int i) {
+    buffer[offset++] = (byte) ((i >> 24) & 0xff);
+    buffer[offset++] = (byte) ((i >> 16) & 0xff);
+    buffer[offset++] = (byte) ((i >> 8) & 0xff);
+    buffer[offset++] = (byte) (i & 0xff);
+  }
+
+  /**
+   * Sends Ganglia Metrics to the configured hosts
+   * @throws IOException
+   */
+  protected void emitToGangliaHosts() throws IOException {
+    try {
+      for (SocketAddress socketAddress : metricsServers) {
+        DatagramPacket packet =
+          new DatagramPacket(buffer, offset, socketAddress);
+        datagramSocket.send(packet);
+      }
+    } finally {
+      // reset the buffer for the next metric to be built
+      offset = 0;
+    }
+  }
+
+  /**
+   * Reset the buffer for the next metric to be built
+   */
+  void resetBuffer() {
+    offset = 0;
+  }
+
+  /**
+   * @return whether sparse metrics are supported
+   */
+  protected boolean isSupportSparseMetrics() {
+    return supportSparseMetrics;
+  }
+
+  /**
+   * Used only by unit test
+   * @param datagramSocket the datagramSocket to set.
+   */
+  void setDatagramSocket(DatagramSocket datagramSocket) {
+    this.datagramSocket = datagramSocket;
+  }
+}
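The xdr_string/xdr_int/pad helpers above implement just enough XDR for the
Ganglia wire format: a 4-byte big-endian length, the raw string bytes, then
zero padding to the next 4-byte boundary. A self-contained sketch of the same
encoding (class and method names are illustrative, not part of the patch):

    // Reproduces the buffer encoding of AbstractGangliaSink in isolation.
    public class XdrDemo {
      private static byte[] buffer = new byte[1500]; // BUFFER_SIZE in the sink
      private static int offset = 0;

      // write an int as 4 bytes, big-endian (mirrors xdr_int)
      static void xdrInt(int i) {
        buffer[offset++] = (byte) ((i >> 24) & 0xff);
        buffer[offset++] = (byte) ((i >> 16) & 0xff);
        buffer[offset++] = (byte) ((i >> 8) & 0xff);
        buffer[offset++] = (byte) (i & 0xff);
      }

      // write length, bytes, then zero-pad to a 4-byte boundary
      // (mirrors xdr_string + pad)
      static void xdrString(String s) {
        byte[] bytes = s.getBytes();
        xdrInt(bytes.length);
        System.arraycopy(bytes, 0, buffer, offset, bytes.length);
        offset += bytes.length;
        while (offset % 4 != 0) {
          buffer[offset++] = 0;
        }
      }

      public static void main(String[] args) {
        xdrString("gc"); // length 2, 'g' 'c', then 2 pad bytes
        for (int i = 0; i < offset; i++) {
          System.out.printf("%02x ", buffer[i]);
        }
        // prints: 00 00 00 02 67 63 00 00
      }
    }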

+ 95 - 0
common/src/java/org/apache/hadoop/metrics2/sink/ganglia/GangliaConf.java

@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.ganglia;
+
+import org.apache.hadoop.metrics2.sink.ganglia.AbstractGangliaSink.GangliaSlope;
+
+/**
+ * Class used to store per-metric Ganglia properties
+ */
+class GangliaConf {
+  private String units = AbstractGangliaSink.DEFAULT_UNITS;
+  private GangliaSlope slope;
+  private int dmax = AbstractGangliaSink.DEFAULT_DMAX;
+  private int tmax = AbstractGangliaSink.DEFAULT_TMAX;
+
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    buf.append("unit=").append(units).append(", slope=").append(slope)
+        .append(", dmax=").append(dmax).append(", tmax=").append(tmax);
+    return buf.toString();
+  }
+
+  /**
+   * @return the units
+   */
+  String getUnits() {
+    return units;
+  }
+
+  /**
+   * @param units the units to set
+   */
+  void setUnits(String units) {
+    this.units = units;
+  }
+
+  /**
+   * @return the slope
+   */
+  GangliaSlope getSlope() {
+    return slope;
+  }
+
+  /**
+   * @param slope the slope to set
+   */
+  void setSlope(GangliaSlope slope) {
+    this.slope = slope;
+  }
+
+  /**
+   * @return the dmax
+   */
+  int getDmax() {
+    return dmax;
+  }
+
+  /**
+   * @param dmax the dmax to set
+   */
+  void setDmax(int dmax) {
+    this.dmax = dmax;
+  }
+
+  /**
+   * @return the tmax
+   */
+  int getTmax() {
+    return tmax;
+  }
+
+  /**
+   * @param tmax the tmax to set
+   */
+  void setTmax(int tmax) {
+    this.tmax = tmax;
+  }
+}

+ 95 - 0
common/src/java/org/apache/hadoop/metrics2/sink/ganglia/GangliaMetricVisitor.java

@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.ganglia;
+
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsVisitor;
+import org.apache.hadoop.metrics2.sink.ganglia.AbstractGangliaSink.GangliaSlope;
+
+/**
+ * Since implementations of Metric are not public, a visitor is used to figure
+ * out the type and slope of each metric. Counters have "positive" slope.
+ */
+class GangliaMetricVisitor implements MetricsVisitor {
+  private static final String INT32 = "int32";
+  private static final String FLOAT = "float";
+  private static final String DOUBLE = "double";
+
+  private String type;
+  private GangliaSlope slope;
+
+  /**
+   * @return the type of a visited metric
+   */
+  String getType() {
+    return type;
+  }
+
+  /**
+   * @return the slope of a visited metric. Slope is positive for counters and
+   *         null for others
+   */
+  GangliaSlope getSlope() {
+    return slope;
+  }
+
+  @Override
+  public void gauge(MetricsInfo info, int value) {
+    // MetricGaugeInt.class ==> "int32"
+    type = INT32;
+    slope = null; // cannot be determined from the Metric alone
+  }
+
+  @Override
+  public void gauge(MetricsInfo info, long value) {
+    // MetricGaugeLong.class ==> "float"
+    type = FLOAT;
+    slope = null; // cannot be determined from the Metric alone
+  }
+
+  @Override
+  public void gauge(MetricsInfo info, float value) {
+    // MetricGaugeFloat.class ==> "float"
+    type = FLOAT;
+    slope = null; // cannot be determined from the Metric alone
+  }
+
+  @Override
+  public void gauge(MetricsInfo info, double value) {
+    // MetricGaugeDouble.class ==> "double"
+    type = DOUBLE;
+    slope = null; // cannot be determined from the Metric alone
+  }
+
+  @Override
+  public void counter(MetricsInfo info, int value) {
+    // MetricCounterInt.class ==> "int32"
+    type = INT32;
+    // counters have positive slope
+    slope = GangliaSlope.positive;
+  }
+
+  @Override
+  public void counter(MetricsInfo info, long value) {
+    // MetricCounterLong.class ==> "float"
+    type = FLOAT;
+    // counters have positive slope
+    slope = GangliaSlope.positive;
+  }
+}
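A short sketch of how the sinks below drive this visitor; it would have to
live in the same package (GangliaMetricVisitor is package-private), and an
AbstractMetric instance named metric is assumed to be in scope:

    GangliaMetricVisitor visitor = new GangliaMetricVisitor();
    metric.visit(visitor);                   // dispatches to the matching gauge/counter overload
    String type = visitor.getType();         // "int32", "float" or "double"
    GangliaSlope slope = visitor.getSlope(); // GangliaSlope.positive for counters, null for gauges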

+ 186 - 0
common/src/java/org/apache/hadoop/metrics2/sink/ganglia/GangliaSink30.java

@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.ganglia;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.util.MetricsCache;
+import org.apache.hadoop.metrics2.util.MetricsCache.Record;
+
+/**
+ * This code supports Ganglia 3.0
+ * 
+ */
+public class GangliaSink30 extends AbstractGangliaSink {
+
+  public final Log LOG = LogFactory.getLog(this.getClass());
+
+  private MetricsCache metricsCache = new MetricsCache();
+
+  @Override
+  public void putMetrics(MetricsRecord record) {
+    // This method handles both cases: dense publishing of metrics and
+    // sparse (only-on-change) publishing of metrics
+    try {
+      String recordName = record.name();
+      String contextName = record.context();
+
+      StringBuilder sb = new StringBuilder();
+      sb.append(contextName);
+      sb.append('.');
+      sb.append(recordName);
+
+      String groupName = sb.toString();
+      sb.append('.');
+      int sbBaseLen = sb.length();
+
+      String type = null;
+      GangliaSlope slopeFromMetric = null;
+      GangliaSlope calculatedSlope = null;
+      Record cachedMetrics = null;
+      resetBuffer();  // reset the buffer to the beginning
+      if (!isSupportSparseMetrics()) {
+        // for sending dense metrics, update metrics cache
+        // and get the updated data
+        cachedMetrics = metricsCache.update(record);
+
+        if (cachedMetrics != null && cachedMetrics.metricsEntrySet() != null) {
+          for (Map.Entry<String, AbstractMetric> entry : cachedMetrics
+              .metricsEntrySet()) {
+            AbstractMetric metric = entry.getValue();
+            sb.append(metric.name());
+            String name = sb.toString();
+
+            // visit the metric to identify the Ganglia type and
+            // slope
+            metric.visit(gangliaMetricVisitor);
+            type = gangliaMetricVisitor.getType();
+            slopeFromMetric = gangliaMetricVisitor.getSlope();
+
+            GangliaConf gConf = getGangliaConfForMetric(name);
+            calculatedSlope = calculateSlope(gConf, slopeFromMetric);
+
+            // send metric to Ganglia
+            emitMetric(groupName, name, type, metric.value().toString(), gConf,
+                calculatedSlope);
+
+            // reset the length of the buffer for next iteration
+            sb.setLength(sbBaseLen);
+          }
+        }
+      } else {
+        // we support sparse updates
+
+        Collection<AbstractMetric> metrics = (Collection<AbstractMetric>) record
+            .metrics();
+        if (metrics.size() > 0) {
+          // we got metrics, so send the latest values
+          for (AbstractMetric metric : record.metrics()) {
+            sb.append(metric.name());
+            String name = sb.toString();
+
+            // visit the metric to identify the Ganglia type and
+            // slope
+            metric.visit(gangliaMetricVisitor);
+            type = gangliaMetricVisitor.getType();
+            slopeFromMetric = gangliaMetricVisitor.getSlope();
+
+            GangliaConf gConf = getGangliaConfForMetric(name);
+            calculatedSlope = calculateSlope(gConf, slopeFromMetric);
+
+            // send metric to Ganglia
+            emitMetric(groupName, name, type, metric.value().toString(), gConf,
+                calculatedSlope);
+
+            // reset the length of the buffer for next iteration
+            sb.setLength(sbBaseLen);
+          }
+        }
+      }
+    } catch (IOException io) {
+      throw new MetricsException("Failed to putMetrics", io);
+    }
+  }
+
+  // Calculate the slope from properties and metric
+  private GangliaSlope calculateSlope(GangliaConf gConf,
+      GangliaSlope slopeFromMetric) {
+    if (gConf.getSlope() != null) {
+      // if slope has been specified in properties, use that
+      return gConf.getSlope();
+    } else if (slopeFromMetric != null) {
+      // slope not specified in properties, use the one derived from the Metric
+      return slopeFromMetric;
+    } else {
+      return DEFAULT_SLOPE;
+    }
+  }
+
+  /**
+   * The method sends metrics to Ganglia servers. The method has been taken from
+   * org.apache.hadoop.metrics.ganglia.GangliaContext30 with minimal changes in
+   * order to keep it in sync.
+   * @param groupName The group name of the metric
+   * @param name The metric name
+   * @param type The type of the metric
+   * @param value The value of the metric
+   * @param gConf The GangliaConf for this metric
+   * @param gSlope The slope for this metric
+   * @throws IOException
+   */
+  protected void emitMetric(String groupName, String name, String type,
+      String value, GangliaConf gConf, GangliaSlope gSlope) throws IOException {
+
+    if (name == null) {
+      LOG.warn("Metric was emitted with no name.");
+      return;
+    } else if (value == null) {
+      LOG.warn("Metric name " + name + " was emitted with a null value.");
+      return;
+    } else if (type == null) {
+      LOG.warn("Metric name " + name + ", value " + value + " has no type.");
+      return;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Emitting metric " + name + ", type " + type + ", value "
+          + value + ", slope " + gSlope.name() + " from hostname "
+          + getHostName());
+    }
+
+    xdr_int(0); // metric_user_defined
+    xdr_string(type);
+    xdr_string(name);
+    xdr_string(value);
+    xdr_string(gConf.getUnits());
+    xdr_int(gSlope.ordinal());
+    xdr_int(gConf.getTmax());
+    xdr_int(gConf.getDmax());
+
+    // send the metric to Ganglia hosts
+    emitToGangliaHosts();
+  }
+}
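Note the precedence encoded in calculateSlope above: a slope configured in
hadoop-metrics2.properties wins over the visitor-derived slope, which in turn
wins over DEFAULT_SLOPE (both). For instance, enabling the commented-out
example from the properties file in this commit would force gcCount to zero
even though, assuming gcCount is published as a counter, the visitor would
otherwise derive positive:

    *.sink.ganglia.slope=jvm.metrics.gcCount=zero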

+ 103 - 0
common/src/java/org/apache/hadoop/metrics2/sink/ganglia/GangliaSink31.java

@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.ganglia;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * This code supports Ganglia 3.1
+ *
+ */
+public class GangliaSink31 extends GangliaSink30 {
+
+  public final Log LOG = LogFactory.getLog(this.getClass());    
+
+  /**
+   * The method sends metrics to Ganglia servers. The method has been taken from
+   * org.apache.hadoop.metrics.ganglia.GangliaContext31 with minimal changes in
+   * order to keep it in sync.
+   * @param groupName The group name of the metric
+   * @param name The metric name
+   * @param type The type of the metric
+   * @param value The value of the metric
+   * @param gConf The GangliaConf for this metric
+   * @param gSlope The slope for this metric
+   * @throws IOException
+   */
+  protected void emitMetric(String groupName, String name, String type,
+      String value, GangliaConf gConf, GangliaSlope gSlope) 
+    throws IOException {
+
+    if (name == null) {
+      LOG.warn("Metric was emitted with no name.");
+      return;
+    } else if (value == null) {
+      LOG.warn("Metric name " + name +" was emitted with a null value.");
+      return;
+    } else if (type == null) {
+      LOG.warn("Metric name " + name + ", value " + value + " has no type.");
+      return;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Emitting metric " + name + ", type " + type + ", value " + value
+          + ", slope " + gSlope.name()+ " from hostname " + getHostName());
+    }
+
+    // The following XDR recipe was done through a careful reading of
+    // gm_protocol.x in Ganglia 3.1 and carefully examining the output of
+    // the gmetric utility with strace.
+
+    // First we send out a metadata message
+    xdr_int(128);               // metric_id = metadata_msg
+    xdr_string(getHostName());       // hostname
+    xdr_string(name);           // metric name
+    xdr_int(0);                 // spoof = False
+    xdr_string(type);           // metric type
+    xdr_string(name);           // metric name
+    xdr_string(gConf.getUnits());    // units
+    xdr_int(gSlope.ordinal());  // slope
+    xdr_int(gConf.getTmax());        // tmax, the maximum time between metrics
+    xdr_int(gConf.getDmax());        // dmax, the lifetime of the metric in seconds
+    xdr_int(1);                 /*Num of the entries in extra_value field for 
+                                  Ganglia 3.1.x*/
+    xdr_string("GROUP");        /*Group attribute*/
+    xdr_string(groupName);      /*Group value*/
+
+    // send the metric to Ganglia hosts
+    emitToGangliaHosts();
+
+    // Now we send out a message with the actual value.
+    // Technically, we only need to send out the metadata message once for
+    // each metric, but I don't want to have to record which metrics we did and
+    // did not send.
+    xdr_int(133);         // we are sending a string value
+    xdr_string(getHostName()); // hostName
+    xdr_string(name);     // metric name
+    xdr_int(0);           // spoof = False
+    xdr_string("%s");     // format field
+    xdr_string(value);    // metric value
+
+    // send the metric to Ganglia hosts
+    emitToGangliaHosts();
+  }
+}

+ 30 - 6
common/src/java/org/apache/hadoop/metrics2/util/MetricsCache.java

@@ -23,9 +23,6 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
 
-import com.google.common.base.Objects;
-import com.google.common.collect.Maps;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -34,6 +31,9 @@ import org.apache.hadoop.metrics2.AbstractMetric;
 import org.apache.hadoop.metrics2.MetricsRecord;
 import org.apache.hadoop.metrics2.MetricsTag;
 
+import com.google.common.base.Objects;
+import com.google.common.collect.Maps;
+
 /**
  * A metrics cache for sinks that don't support sparse updates.
  */
@@ -68,7 +68,7 @@ public class MetricsCache {
    */
   public static class Record {
     final Map<String, String> tags = Maps.newHashMap();
-    final Map<String, Number> metrics = Maps.newHashMap();
+    final Map<String, AbstractMetric> metrics = Maps.newHashMap();
 
     /**
      * Lookup a tag value
@@ -85,6 +85,16 @@ public class MetricsCache {
      * @return the metric value
      */
     public Number getMetric(String key) {
+      AbstractMetric metric = metrics.get(key);
+      return metric != null ? metric.value() : null;
+    }
+
+    /**
+     * Lookup a metric instance
+     * @param key name of the metric
+     * @return the metric instance
+     */
+    public AbstractMetric getMetricInstance(String key) {
       return metrics.get(key);
     }
 
@@ -96,9 +106,23 @@ public class MetricsCache {
     }
 
     /**
-     * @return entry set of the metrics of the record
+     * @deprecated use metricsEntrySet() instead
+     * @return entry set of metrics
      */
+    @Deprecated
     public Set<Map.Entry<String, Number>> metrics() {
+      Map<String, Number> map = new LinkedHashMap<String, Number>(
+          metrics.size());
+      for (Map.Entry<String, AbstractMetric> mapEntry : metrics.entrySet()) {
+        map.put(mapEntry.getKey(), mapEntry.getValue().value());
+      }
+      return map.entrySet();
+    }
+
+    /**
+     * @return entry set of metrics
+     */
+    public Set<Map.Entry<String, AbstractMetric>> metricsEntrySet() {
       return metrics.entrySet();
     }
 
@@ -141,7 +165,7 @@ public class MetricsCache {
       recordCache.put(tags, record);
     }
     for (AbstractMetric m : mr.metrics()) {
-      record.metrics.put(m.name(), m.value());
+      record.metrics.put(m.name(), m);
     }
     if (includingTags) {
       // mostly for some sinks that include tags as part of a dense schema
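For sink code, the caller-side migration implied by this diff is from the
deprecated metrics() view to metricsEntrySet(). A hedged sketch, assuming a
MetricsRecord named mr is in scope along with the imports shown above:

    MetricsCache cache = new MetricsCache();
    MetricsCache.Record r = cache.update(mr);

    // deprecated: metrics() now copies values into a fresh map on every call
    for (Map.Entry<String, Number> e : r.metrics()) {
      Number value = e.getValue();
    }

    // preferred: iterate the cached AbstractMetric instances directly
    for (Map.Entry<String, AbstractMetric> e : r.metricsEntrySet()) {
      Number value = e.getValue().value();
    }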

+ 197 - 0
common/src/test/core/org/apache/hadoop/metrics2/impl/TestGangliaMetrics.java

@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+import org.apache.hadoop.metrics2.sink.ganglia.AbstractGangliaSink;
+import org.apache.hadoop.metrics2.sink.ganglia.GangliaSink30;
+import org.apache.hadoop.metrics2.sink.ganglia.GangliaSink31;
+import org.apache.hadoop.metrics2.sink.ganglia.GangliaMetricsTestHelper;
+import org.junit.Test;
+
+public class TestGangliaMetrics {
+  public static final Log LOG = LogFactory.getLog(TestGangliaMetrics.class);
+  private final String[] expectedMetrics =
+    { "test.s1rec.C1",
+      "test.s1rec.G1",
+      "test.s1rec.Xxx",
+      "test.s1rec.Yyy",
+      "test.s1rec.S1NumOps",
+      "test.s1rec.S1AvgTime" };
+
+  @Test public void testGangliaMetrics2() throws Exception {
+    ConfigBuilder cb = new ConfigBuilder().add("default.period", 10)
+        .add("test.sink.gsink30.context", "test") // filter out only "test"
+        .add("test.sink.gsink31.context", "test") // filter out only "test"
+        .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test"));
+
+    MetricsSystemImpl ms = new MetricsSystemImpl("Test");
+    ms.start();
+    TestSource s1 = ms.register("s1", "s1 desc", new TestSource("s1rec"));
+    s1.c1.incr();
+    s1.xxx.incr();
+    s1.g1.set(2);
+    s1.yyy.incr(2);
+    s1.s1.add(0);
+
+    final int expectedCountFromGanglia30 = expectedMetrics.length;
+    final int expectedCountFromGanglia31 = 2 * expectedMetrics.length;
+
+    // use a latch to make sure we receive the required records before
+    // shutting down the MetricsSystem
+    CountDownLatch latch = new CountDownLatch(
+        expectedCountFromGanglia30 + expectedCountFromGanglia31);
+
+    // Setup test for GangliaSink30
+    AbstractGangliaSink gsink30 = new GangliaSink30();
+    gsink30.init(cb.subset("test"));
+    MockDatagramSocket mockds30 = new MockDatagramSocket(latch);
+    GangliaMetricsTestHelper.setDatagramSocket(gsink30, mockds30);
+
+    // Setup test for GangliaSink31
+    AbstractGangliaSink gsink31 = new GangliaSink31();
+    gsink31.init(cb.subset("test"));
+    MockDatagramSocket mockds31 = new MockDatagramSocket(latch);
+    GangliaMetricsTestHelper.setDatagramSocket(gsink31, mockds31);
+
+    // register the sinks
+    ms.register("gsink30", "gsink30 desc", gsink30);
+    ms.register("gsink31", "gsink31 desc", gsink31);
+    ms.onTimerEvent();  // trigger something interesting
+
+    // wait for all records, then stop the MetricsSystem. Without this,
+    // the ms sometimes gets shut down before all the sinks have consumed their data
+    latch.await(200, TimeUnit.MILLISECONDS);
+    ms.stop();
+
+    // check GangliaSink30 data
+    checkMetrics(mockds30.getCapturedSend(), expectedCountFromGanglia30);
+
+    // check GangliaSink31 data
+    checkMetrics(mockds31.getCapturedSend(), expectedCountFromGanglia31);
+  }
+
+
+  // check the expected against the actual metrics
+  private void checkMetrics(List<byte[]> bytearrlist, int expectedCount) {
+    boolean[] foundMetrics = new boolean[expectedMetrics.length];
+    for (byte[] bytes : bytearrlist) {
+      String binaryStr = new String(bytes);
+      for (int index = 0; index < expectedMetrics.length; index++) {
+        if (binaryStr.indexOf(expectedMetrics[index]) >= 0) {
+          foundMetrics[index] = true;
+          break;
+        }
+      }
+    }
+
+    for (int index = 0; index < foundMetrics.length; index++) {
+      assertTrue("Missing metrics: " + expectedMetrics[index],
+          foundMetrics[index]);
+    }
+
+    assertEquals("Mismatch in record count: ",
+        expectedCount, bytearrlist.size());
+  }
+
+  @SuppressWarnings("unused")
+  @Metrics(context="test")
+  private static class TestSource {
+    @Metric("C1 desc") MutableCounterLong c1;
+    @Metric("XXX desc") MutableCounterLong xxx;
+    @Metric("G1 desc") MutableGaugeLong g1;
+    @Metric("YYY desc") MutableGaugeLong yyy;
+    @Metric MutableRate s1;
+    final MetricsRegistry registry;
+
+    TestSource(String recName) {
+      registry = new MetricsRegistry(recName);
+    }
+  }
+
+  /**
+   * This class is used to capture data sent to Ganglia servers.
+   *
+   * The initial attempt was to use mockito to mock and capture, but
+   * testing showed that mockito keeps a reference to the byte array;
+   * since the sink code reuses the byte array, all the captured
+   * arrays ended up pointing to the same instance.
+   */
+  private class MockDatagramSocket extends DatagramSocket {
+    private ArrayList<byte[]> capture;
+    private CountDownLatch latch;
+
+    /**
+     * @throws SocketException
+     */
+    public MockDatagramSocket() throws SocketException {
+      capture = new  ArrayList<byte[]>();
+    }
+
+    /**
+     * @param latch
+     * @throws SocketException
+     */
+    public MockDatagramSocket(CountDownLatch latch) throws SocketException {
+      this();
+      this.latch = latch;
+    }
+
+    /* (non-Javadoc)
+     * @see java.net.DatagramSocket#send(java.net.DatagramPacket)
+     */
+    @Override
+    public void send(DatagramPacket p) throws IOException {
+      // capture the byte arrays
+      byte[] bytes = new byte[p.getLength()];
+      System.arraycopy(p.getData(), p.getOffset(), bytes, 0, p.getLength());
+      capture.add(bytes);
+
+      // decrement the latch
+      latch.countDown();
+    }
+
+    /**
+     * @return the captured byte arrays
+     */
+    ArrayList<byte[]> getCapturedSend() {
+      return capture;
+    }
+  }
+}

+ 39 - 0
common/src/test/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaMetricsTestHelper.java

@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.ganglia;
+
+import java.net.DatagramSocket;
+
+/**
+ * Helper class in the same package as ganglia sinks to be used by unit tests
+ */
+public class GangliaMetricsTestHelper {
+
+  /**
+   * Helper method to access the package-private method that sets the
+   * DatagramSocket, needed for unit tests
+   * @param gangliaSink
+   * @param datagramSocket
+   */
+  public static void setDatagramSocket(AbstractGangliaSink gangliaSink,
+      DatagramSocket datagramSocket) {
+
+    gangliaSink.setDatagramSocket(datagramSocket);
+  }
+}

+ 16 - 7
common/src/test/core/org/apache/hadoop/metrics2/util/TestMetricsCache.java

@@ -35,6 +35,7 @@ import static org.apache.hadoop.metrics2.lib.Interns.*;
 public class TestMetricsCache {
   private static final Log LOG = LogFactory.getLog(TestMetricsCache.class);
 
+  @SuppressWarnings("deprecation")
   @Test public void testUpdate() {
     MetricsCache cache = new MetricsCache();
     MetricsRecord mr = makeRecord("r",
@@ -54,25 +55,26 @@ public class TestMetricsCache {
         Arrays.asList(makeMetric("m", 2), makeMetric("m2", 42)));
     cr = cache.update(mr2);
     assertEquals("contains 3 metric", 3, cr.metrics().size());
-    assertEquals("updated metric value", 2, cr.getMetric("m"));
-    assertEquals("old metric value", 1, cr.getMetric("m1"));
-    assertEquals("new metric value", 42, cr.getMetric("m2"));
+    checkMetricValue("updated metric value", cr, "m", 2);
+    checkMetricValue("old metric value", cr, "m1", 1);
+    checkMetricValue("new metric value", cr, "m2", 42);
 
     MetricsRecord mr3 = makeRecord("r",
         Arrays.asList(makeTag("t", "tv3")), // different tag value
         Arrays.asList(makeMetric("m3", 3)));
     cr = cache.update(mr3); // should get a new record
     assertEquals("contains 1 metric", 1, cr.metrics().size());
-    assertEquals("updated metric value", 3, cr.getMetric("m3"));
+    checkMetricValue("updated metric value", cr, "m3", 3);
     // tags cache should be empty so far
     assertEquals("no tags", 0, cr.tags().size());
     // until now
     cr = cache.update(mr3, true);
     assertEquals("Got 1 tag", 1, cr.tags().size());
     assertEquals("Tag value", "tv3", cr.getTag("t"));
-    assertEquals("Metric value", 3, cr.getMetric("m3"));
+    checkMetricValue("Metric value", cr, "m3", 3);
   }
 
+  @SuppressWarnings("deprecation")
   @Test public void testGet() {
     MetricsCache cache = new MetricsCache();
     assertNull("empty", cache.get("r", Arrays.asList(makeTag("t", "t"))));
@@ -85,7 +87,7 @@ public class TestMetricsCache {
 
     assertNotNull("Got record", cr);
     assertEquals("contains 1 metric", 1, cr.metrics().size());
-    assertEquals("new metric value", 1, cr.getMetric("m"));
+    checkMetricValue("new metric value", cr, "m", 1);
   }
 
   /**
@@ -109,7 +111,7 @@ public class TestMetricsCache {
       cr = cache.update(makeRecord("r",
           Arrays.asList(makeTag("t"+ i, ""+ i)),
           Arrays.asList(makeMetric("m", i))));
-      assertEquals("new metrics value", i, cr.getMetric("m"));
+      checkMetricValue("new metric value", cr, "m", i);
       if (i < MetricsCache.MAX_RECS_PER_NAME_DEFAULT) {
         assertNotNull("t0 is still there", cache.get("r", t0));
       }
@@ -117,6 +119,13 @@ public class TestMetricsCache {
     assertNull("t0 is gone", cache.get("r", t0));
   }
 
+  private void checkMetricValue(String description, MetricsCache.Record cr,
+      String key, Number val) {
+    assertEquals(description, val, cr.getMetric(key));
+    assertNotNull("metric not null", cr.getMetricInstance(key));
+    assertEquals(description, val, cr.getMetricInstance(key).value());
+  }
+
   private MetricsRecord makeRecord(String name, Collection<MetricsTag> tags,
                                    Collection<AbstractMetric> metrics) {
     MetricsRecord mr = mock(MetricsRecord.class);