Browse Source

HADOOP-10181. GangliaContext does not work with multicast ganglia setup. Contributed by Andrew Johnson.

cnauroth 10 years ago
parent
commit
8004a00230

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -811,6 +811,9 @@ Release 2.7.0 - UNRELEASED
     HADOOP-11432. Fix SymlinkBaseTest#testCreateLinkUsingPartQualPath2.
     (Liang Xie via gera)
 
+    HADOOP-10181. GangliaContext does not work with multicast ganglia setup.
+    (Andrew Johnson via cnauroth)
+
 Release 2.6.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 25 - 8
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/ganglia/GangliaContext.java

@@ -21,10 +21,7 @@
 package org.apache.hadoop.metrics.ganglia;
 
 import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.SocketAddress;
-import java.net.SocketException;
+import java.net.*;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -54,13 +51,16 @@ public class GangliaContext extends AbstractMetricsContext {
   private static final String SLOPE_PROPERTY = "slope";
   private static final String TMAX_PROPERTY = "tmax";
   private static final String DMAX_PROPERTY = "dmax";
-    
+  private static final String MULTICAST_PROPERTY = "multicast";
+  private static final String MULTICAST_TTL_PROPERTY = "multicast.ttl";
+
   private static final String DEFAULT_UNITS = "";
   private static final String DEFAULT_SLOPE = "both";
   private static final int DEFAULT_TMAX = 60;
   private static final int DEFAULT_DMAX = 0;
   private static final int DEFAULT_PORT = 8649;
   private static final int BUFFER_SIZE = 1500;       // as per libgmond.c
+  private static final int DEFAULT_MULTICAST_TTL = 1;
 
   private final Log LOG = LogFactory.getLog(this.getClass());    
 
@@ -83,6 +83,8 @@ public class GangliaContext extends AbstractMetricsContext {
   private Map<String,String> slopeTable;
   private Map<String,String> tmaxTable;
   private Map<String,String> dmaxTable;
+  private boolean multicastEnabled;
+  private int multicastTtl;
     
   protected DatagramSocket datagramSocket;
     
@@ -104,11 +106,26 @@ public class GangliaContext extends AbstractMetricsContext {
     slopeTable = getAttributeTable(SLOPE_PROPERTY);
     tmaxTable  = getAttributeTable(TMAX_PROPERTY);
     dmaxTable  = getAttributeTable(DMAX_PROPERTY);
+    multicastEnabled = Boolean.parseBoolean(getAttribute(MULTICAST_PROPERTY));
+    String multicastTtlValue = getAttribute(MULTICAST_TTL_PROPERTY);
+    if (multicastEnabled) {
+      if (multicastTtlValue == null) {
+        multicastTtl = DEFAULT_MULTICAST_TTL;
+      } else {
+        multicastTtl = Integer.parseInt(multicastTtlValue);
+      }
+    }
         
     try {
-      datagramSocket = new DatagramSocket();
-    } catch (SocketException se) {
-      se.printStackTrace();
+      if (multicastEnabled) {
+        LOG.info("Enabling multicast for Ganglia with TTL " + multicastTtl);
+        datagramSocket = new MulticastSocket();
+        ((MulticastSocket) datagramSocket).setTimeToLive(multicastTtl);
+      } else {
+        datagramSocket = new DatagramSocket();
+      }
+    } catch (IOException e) {
+      LOG.error(e);
     }
   }
 

+ 6 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/ganglia/package.html

@@ -54,6 +54,12 @@ These are the implementation specific factory attributes
     <dt><i>contextName</i>.period</dt>
     <dd>The period in seconds on which the metric data is sent to the
     server(s).</dd>
+
+    <dt><i>contextName</i>.multicast</dt>
+    <dd>Enable multicast for Ganglia</dd>
+
+    <dt><i>contextName</i>.multicast.ttl</dt>
+    <dd>TTL for multicast packets</dd>
     
     <dt><i>contextName</i>.units.<i>recordName</i>.<i>metricName</i></dt>
     <dd>The units for the specified metric in the specified record.</dd>

+ 27 - 9
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/ganglia/AbstractGangliaSink.java

@@ -19,12 +19,7 @@
 package org.apache.hadoop.metrics2.sink.ganglia;
 
 import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.SocketException;
-import java.net.UnknownHostException;
+import java.net.*;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -64,7 +59,11 @@ public abstract class AbstractGangliaSink implements MetricsSink {
   public static final int DEFAULT_DMAX = 0;
   public static final GangliaSlope DEFAULT_SLOPE = GangliaSlope.both;
   public static final int DEFAULT_PORT = 8649;
+  public static final boolean DEFAULT_MULTICAST_ENABLED = false;
+  public static final int DEFAULT_MULTICAST_TTL = 1;
   public static final String SERVERS_PROPERTY = "servers";
+  public static final String MULTICAST_ENABLED_PROPERTY = "multicast";
+  public static final String MULTICAST_TTL_PROPERTY = "multicast.ttl";
   public static final int BUFFER_SIZE = 1500; // as per libgmond.c
   public static final String SUPPORT_SPARSE_METRICS_PROPERTY = "supportsparse";
   public static final boolean SUPPORT_SPARSE_METRICS_DEFAULT = false;
@@ -73,6 +72,8 @@ public abstract class AbstractGangliaSink implements MetricsSink {
   private String hostName = "UNKNOWN.example.com";
   private DatagramSocket datagramSocket;
   private List<? extends SocketAddress> metricsServers;
+  private boolean multicastEnabled;
+  private int multicastTtl;
   private byte[] buffer = new byte[BUFFER_SIZE];
   private int offset;
   private boolean supportSparseMetrics = SUPPORT_SPARSE_METRICS_DEFAULT;
@@ -134,6 +135,9 @@ public abstract class AbstractGangliaSink implements MetricsSink {
     // load the gannglia servers from properties
     metricsServers = Servers.parse(conf.getString(SERVERS_PROPERTY),
         DEFAULT_PORT);
+    multicastEnabled = conf.getBoolean(MULTICAST_ENABLED_PROPERTY,
+            DEFAULT_MULTICAST_ENABLED);
+    multicastTtl = conf.getInt(MULTICAST_TTL_PROPERTY, DEFAULT_MULTICAST_TTL);
 
     // extract the Ganglia conf per metrics
     gangliaConfMap = new HashMap<String, GangliaConf>();
@@ -143,9 +147,15 @@ public abstract class AbstractGangliaSink implements MetricsSink {
     loadGangliaConf(GangliaConfType.slope);
 
     try {
-      datagramSocket = new DatagramSocket();
-    } catch (SocketException se) {
-      LOG.error(se);
+      if (multicastEnabled) {
+        LOG.info("Enabling multicast for Ganglia with TTL " + multicastTtl);
+        datagramSocket = new MulticastSocket();
+        ((MulticastSocket) datagramSocket).setTimeToLive(multicastTtl);
+      } else {
+        datagramSocket = new DatagramSocket();
+      }
+    } catch (IOException e) {
+      LOG.error(e);
     }
 
     // see if sparseMetrics is supported. Default is false
@@ -295,4 +305,12 @@ public abstract class AbstractGangliaSink implements MetricsSink {
   void setDatagramSocket(DatagramSocket datagramSocket) {
     this.datagramSocket = datagramSocket;
   }
+
+  /**
+   * Used only by unit tests
+   * @return the datagramSocket for this sink
+   */
+  DatagramSocket getDatagramSocket() {
+    return datagramSocket;
+  }
 }

+ 41 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics/ganglia/TestGangliaContext.java

@@ -22,13 +22,54 @@
 package org.apache.hadoop.metrics.ganglia;
 
 import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.hadoop.metrics.ContextFactory;
 import org.apache.hadoop.metrics.spi.AbstractMetricsContext;
 
+import java.net.MulticastSocket;
+
 public class TestGangliaContext {
+  @Test
+  public void testShouldCreateDatagramSocketByDefault() throws Exception {
+    GangliaContext context = new GangliaContext();
+    context.init("gangliaContext", ContextFactory.getFactory());
+    assertFalse("Created MulticastSocket", context.datagramSocket instanceof MulticastSocket);
+  }
+
+  @Test
+  public void testShouldCreateDatagramSocketIfMulticastIsDisabled() throws Exception {
+    GangliaContext context = new GangliaContext();
+    ContextFactory factory = ContextFactory.getFactory();
+    factory.setAttribute("gangliaContext.multicast", "false");
+    context.init("gangliaContext", factory);
+    assertFalse("Created MulticastSocket", context.datagramSocket instanceof MulticastSocket);
+  }
+
+  @Test
+  public void testShouldCreateMulticastSocket() throws Exception {
+    GangliaContext context = new GangliaContext();
+    ContextFactory factory = ContextFactory.getFactory();
+    factory.setAttribute("gangliaContext.multicast", "true");
+    context.init("gangliaContext", factory);
+    assertTrue("Did not create MulticastSocket", context.datagramSocket instanceof MulticastSocket);
+    MulticastSocket multicastSocket = (MulticastSocket) context.datagramSocket;
+    assertEquals("Did not set default TTL", multicastSocket.getTimeToLive(), 1);
+  }
+
+  @Test
+  public void testShouldSetMulticastSocketTtl() throws Exception {
+    GangliaContext context = new GangliaContext();
+    ContextFactory factory = ContextFactory.getFactory();
+    factory.setAttribute("gangliaContext.multicast", "true");
+    factory.setAttribute("gangliaContext.multicast.ttl", "10");
+    context.init("gangliaContext", factory);
+    MulticastSocket multicastSocket = (MulticastSocket) context.datagramSocket;
+    assertEquals("Did not set TTL", multicastSocket.getTimeToLive(), 10);
+  }
   
   @Test
   public void testCloseShouldCloseTheSocketWhichIsCreatedByInit() throws Exception {

+ 81 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/ganglia/TestGangliaSink.java

@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.ganglia;
+
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.hadoop.metrics2.impl.ConfigBuilder;
+import org.junit.Test;
+
+import java.net.DatagramSocket;
+import java.net.MulticastSocket;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestGangliaSink {
+    @Test
+    public void testShouldCreateDatagramSocketByDefault() throws Exception {
+        SubsetConfiguration conf = new ConfigBuilder()
+                .subset("test.sink.ganglia");
+
+        GangliaSink30 gangliaSink = new GangliaSink30();
+        gangliaSink.init(conf);
+        DatagramSocket socket = gangliaSink.getDatagramSocket();
+        assertFalse("Did not create DatagramSocket", socket == null || socket instanceof MulticastSocket);
+    }
+
+    @Test
+    public void testShouldCreateDatagramSocketIfMulticastIsDisabled() throws Exception {
+        SubsetConfiguration conf = new ConfigBuilder()
+                .add("test.sink.ganglia.multicast", false)
+                .subset("test.sink.ganglia");
+        GangliaSink30 gangliaSink = new GangliaSink30();
+        gangliaSink.init(conf);
+        DatagramSocket socket = gangliaSink.getDatagramSocket();
+        assertFalse("Did not create DatagramSocket", socket == null || socket instanceof MulticastSocket);
+    }
+
+    @Test
+    public void testShouldCreateMulticastSocket() throws Exception {
+        SubsetConfiguration conf = new ConfigBuilder()
+                .add("test.sink.ganglia.multicast", true)
+                .subset("test.sink.ganglia");
+        GangliaSink30 gangliaSink = new GangliaSink30();
+        gangliaSink.init(conf);
+        DatagramSocket socket = gangliaSink.getDatagramSocket();
+        assertTrue("Did not create MulticastSocket", socket != null && socket instanceof MulticastSocket);
+        int ttl = ((MulticastSocket) socket).getTimeToLive();
+        assertEquals("Did not set default TTL", 1, ttl);
+    }
+
+    @Test
+    public void testShouldSetMulticastSocketTtl() throws Exception {
+        SubsetConfiguration conf = new ConfigBuilder()
+                .add("test.sink.ganglia.multicast", true)
+                .add("test.sink.ganglia.multicast.ttl", 3)
+                .subset("test.sink.ganglia");
+        GangliaSink30 gangliaSink = new GangliaSink30();
+        gangliaSink.init(conf);
+        DatagramSocket socket = gangliaSink.getDatagramSocket();
+        assertTrue("Did not create MulticastSocket", socket != null && socket instanceof MulticastSocket);
+        int ttl = ((MulticastSocket) socket).getTimeToLive();
+        assertEquals("Did not set TTL", 3, ttl);
+    }
+}