
HADOOP-12985. Support MetricsSource interface for DecayRpcScheduler Metrics. Contributed by Xiaoyu Yao.

Xiaoyu Yao, 9 years ago
parent commit 5bd7b592e5

+ 124 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java

@@ -32,11 +32,19 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongArray;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.AtomicDoubleArray;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.Interns;
 import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.metrics2.util.Metrics2Util.NameValuePair;
+import org.apache.hadoop.metrics2.util.Metrics2Util.TopN;
 
 import org.codehaus.jackson.map.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
@@ -49,7 +57,8 @@ import org.slf4j.LoggerFactory;
  * for large periods (on the order of seconds), as it offloads work to the
  * decay sweep.
  */
-public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean {
+public class DecayRpcScheduler implements RpcScheduler,
+    DecayRpcSchedulerMXBean, MetricsSource {
   /**
    * Period controls how many milliseconds between each decay sweep.
    */
@@ -107,6 +116,12 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
       IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_THRESHOLDS_KEY =
       "decay-scheduler.backoff.responsetime.thresholds";
 
+  // Specifies the top N user's call count and scheduler decision
+  // Metrics2 Source
+  public static final String DECAYSCHEDULER_METRICS_TOP_USER_COUNT =
+      "decay-scheduler.metrics.top.user.count";
+  public static final int DECAYSCHEDULER_METRICS_TOP_USER_COUNT_DEFAULT = 10;
+
   public static final Logger LOG =
       LoggerFactory.getLogger(DecayRpcScheduler.class);
 
@@ -138,6 +153,8 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
   private final IdentityProvider identityProvider;
   private final boolean backOffByResponseTimeEnabled;
   private final long[] backOffResponseTimeThresholds;
+  private final String namespace;
+  private final int topUsersCount; // e.g., report top 10 users' metrics
 
   /**
    * This TimerTask will call decayCurrentCounts until
@@ -179,6 +196,7 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
           "at least 1");
     }
     this.numLevels = numLevels;
+    this.namespace = ns;
     this.decayFactor = parseDecayFactor(ns, conf);
     this.decayPeriodMillis = parseDecayPeriodMillis(ns, conf);
     this.identityProvider = this.parseIdentityProvider(ns, conf);
@@ -199,8 +217,15 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
     responseTimeAvgInLastWindow = new AtomicDoubleArray(numLevels);
     responseTimeCountInLastWindow = new AtomicLongArray(numLevels);
 
+    topUsersCount =
+        conf.getInt(DECAYSCHEDULER_METRICS_TOP_USER_COUNT,
+            DECAYSCHEDULER_METRICS_TOP_USER_COUNT_DEFAULT);
+    Preconditions.checkArgument(topUsersCount > 0,
+        "the number of top users for scheduler metrics must be at least 1");
+
     MetricsProxy prox = MetricsProxy.getInstance(ns, numLevels);
     prox.setDelegate(this);
+    prox.registerMetrics2Source(ns);
   }
 
   // Load configs
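
Note: the new top-user count is read with a plain conf.getInt, so unlike the other decay-scheduler keys it is not prefixed with the per-port namespace, and a non-positive value is rejected by the Preconditions check. A minimal configuration sketch (fragment; the "ipc.8020" namespace and the value 20 are assumptions for illustration):

    Configuration conf = new Configuration();
    // Global key, no namespace prefix; the default is 10.
    conf.setInt(DecayRpcScheduler.DECAYSCHEDULER_METRICS_TOP_USER_COUNT, 20);
    // Other scheduler keys keep the namespace prefix, as in the TestRPC changes below.
    conf.setInt("ipc.8020." + CommonConfigurationKeys.IPC_SCHEDULER_PRIORITY_LEVELS_KEY, 2);
    // Constructing the scheduler registers "DecayRpcSchedulerMetrics2.ipc.8020"
    // with the default metrics system through the MetricsProxy shown below.
    DecayRpcScheduler scheduler = new DecayRpcScheduler(2, "ipc.8020", conf);
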
@@ -615,7 +640,8 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
    * MetricsProxy is a singleton because we may init multiple schedulers and we
    * want to clean up resources when a new scheduler replaces the old one.
    */
-  private static final class MetricsProxy implements DecayRpcSchedulerMXBean {
+  public static final class MetricsProxy implements DecayRpcSchedulerMXBean,
+      MetricsSource {
     // One singleton per namespace
     private static final HashMap<String, MetricsProxy> INSTANCES =
       new HashMap<String, MetricsProxy>();
@@ -646,6 +672,11 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
       this.delegate = new WeakReference<DecayRpcScheduler>(obj);
     }
 
+    void registerMetrics2Source(String namespace) {
+      final String name = "DecayRpcSchedulerMetrics2." + namespace;
+      DefaultMetricsSystem.instance().register(name, name, this);
+    }
+
     @Override
     public String getSchedulingDecisionSummary() {
       DecayRpcScheduler scheduler = delegate.get();
@@ -704,6 +735,14 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
         return scheduler.getResponseTimeCountInLastWindow();
       }
     }
+
+    @Override
+    public void getMetrics(MetricsCollector collector, boolean all) {
+      DecayRpcScheduler scheduler = delegate.get();
+      if (scheduler != null) {
+        scheduler.getMetrics(collector, all);
+      }
+    }
   }
 
   public int getUniqueIdentityCount() {
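
The proxy stays registered under one stable name per namespace while scheduler instances come and go behind the WeakReference. Consumers address the record by that name; the TestRPC changes below read it exactly this way. A condensed sketch (namespace value assumed for illustration):

    // org.apache.hadoop.test.MetricsAsserts helpers, as used in the test below.
    MetricsRecordBuilder rb =
        MetricsAsserts.getMetrics("DecayRpcSchedulerMetrics2.ipc.8020");
    long callVolume = MetricsAsserts.getLongCounter("CallVolume", rb);
    int uniqueCallers = MetricsAsserts.getIntCounter("UniqueCallers", rb);
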
@@ -731,6 +770,89 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
     return ret;
   }
 
+  @Override
+  public void getMetrics(MetricsCollector collector, boolean all) {
+    // Metrics2 interface to act as a Metric source
+    try {
+      MetricsRecordBuilder rb = collector.addRecord(getClass().getName())
+          .setContext(namespace);
+      addTotalCallVolume(rb);
+      addUniqueIdentityCount(rb);
+      addTopNCallerSummary(rb);
+      addAvgResponseTimePerPriority(rb);
+      addCallVolumePerPriority(rb);
+    } catch (Exception e) {
+      LOG.warn("Exception thrown while metric collection. Exception : "
+          + e.getMessage());
+    }
+  }
+
+  // Key: UniqueCallers
+  private void addUniqueIdentityCount(MetricsRecordBuilder rb) {
+    rb.addCounter(Interns.info("UniqueCallers", "Total unique callers"),
+        getUniqueIdentityCount());
+  }
+
+  // Key: CallVolume
+  private void addTotalCallVolume(MetricsRecordBuilder rb) {
+    rb.addCounter(Interns.info("CallVolume", "Total Call Volume"),
+        getTotalCallVolume());
+  }
+
+  // Key: Priority.0.CallVolume
+  private void addCallVolumePerPriority(MetricsRecordBuilder rb) {
+    for (int i = 0; i < responseTimeCountInLastWindow.length(); i++) {
+      rb.addGauge(Interns.info("Priority." + i + ".CallVolume", "Call volume " +
+          "of priority "+ i), responseTimeCountInLastWindow.get(i));
+    }
+  }
+
+  // Key: Priority.0.AvgResponseTime
+  private void addAvgResponseTimePerPriority(MetricsRecordBuilder rb) {
+    for (int i = 0; i < responseTimeAvgInLastWindow.length(); i++) {
+      rb.addGauge(Interns.info("Priority." + i + ".AvgResponseTime", "Average" +
+          " response time of priority " + i),
+          responseTimeAvgInLastWindow.get(i));
+    }
+  }
+
+  // Key: Top.0.Caller(xyz).Volume and Top.0.Caller(xyz).Priority
+  private void addTopNCallerSummary(MetricsRecordBuilder rb) {
+    final int topCallerCount = 10;
+    TopN topNCallers = getTopCallers(topCallerCount);
+    Map<Object, Integer> decisions = scheduleCacheRef.get();
+    for (int i=0; i < topNCallers.size(); i++) {
+      NameValuePair entry =  topNCallers.poll();
+      String topCaller = "Top." + (topCallerCount - i) + "." +
+          "Caller(" + entry.getName() + ")";
+      String topCallerVolume = topCaller + ".Volume";
+      String topCallerPriority = topCaller + ".Priority";
+      rb.addCounter(Interns.info(topCallerVolume, topCallerVolume),
+          entry.getValue());
+      Integer priority = decisions.get(entry.getName());
+      if (priority != null) {
+        rb.addCounter(Interns.info(topCallerPriority, topCallerPriority),
+            priority);
+      }
+    }
+  }
+
+  // Get the top N callers' call count and scheduler decision
+  private TopN getTopCallers(int n) {
+    TopN topNCallers = new TopN(n);
+    Iterator<Map.Entry<Object, AtomicLong>> it =
+        callCounts.entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry<Object, AtomicLong> entry = it.next();
+      String caller = entry.getKey().toString();
+      Long count = entry.getValue().get();
+      if (count > 0) {
+        topNCallers.offer(new NameValuePair(caller, count));
+      }
+    }
+    return topNCallers;
+  }
+
   public String getSchedulingDecisionSummary() {
     Map<Object, Integer> decisions = scheduleCacheRef.get();
     if (decisions == null) {
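
On the key layout produced by addTopNCallerSummary(): TopN is a min-heap, so poll() returns the lightest retained caller first and the rank suffix counts down from the hard-coded topCallerCount of 10; a matching Top.N.Caller(name).Priority counter is added only when the scheduling-decision cache has an entry for that caller. A condensed sketch with hypothetical callers (the size is captured before the loop for clarity):

    TopN callers = new TopN(10);
    callers.offer(new NameValuePair("alice", 50));
    callers.offer(new NameValuePair("bob", 30));
    callers.offer(new NameValuePair("carol", 20));
    int size = callers.size();                 // 3
    for (int i = 0; i < size; i++) {
      NameValuePair entry = callers.poll();    // min-heap: smallest count first
      // i=0 -> Top.10.Caller(carol).Volume = 20
      // i=1 -> Top.9.Caller(bob).Volume = 30
      // i=2 -> Top.8.Caller(alice).Volume = 50
      System.out.println("Top." + (10 - i) + ".Caller(" + entry.getName()
          + ").Volume = " + entry.getValue());
    }
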

+ 0 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/MetricsUtil.java

@@ -101,5 +101,4 @@ public class MetricsUtil {
     }
     return hostName;
   }
-
 }

+ 105 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/util/Metrics2Util.java

@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.util;
+
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Utility class to simplify creation of hadoop metrics2 source/sink.
+ */
+@InterfaceAudience.Private
+public class Metrics2Util {
+  /**
+   * A pair of a name and its corresponding value. Defines a custom
+   * comparator so the TopN PriorityQueue sorts based on the count.
+   */
+  @InterfaceAudience.Private
+  public static class NameValuePair implements Comparable<NameValuePair> {
+    private String name;
+    private long value;
+
+    public NameValuePair(String metricName, long value) {
+      this.name = metricName;
+      this.value = value;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public long getValue() {
+      return value;
+    }
+
+    @Override
+    public int compareTo(NameValuePair other) {
+      return (int) (value - other.value);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other instanceof NameValuePair) {
+        return compareTo((NameValuePair)other) == 0;
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return Long.valueOf(value).hashCode();
+    }
+  }
+
+  /**
+   * A fixed-size priority queue, used to retrieve top-n of offered entries.
+   */
+  @InterfaceAudience.Private
+  public static class TopN extends PriorityQueue<NameValuePair> {
+    private static final long serialVersionUID = 5134028249611535803L;
+    private int n; // > 0
+    private long total = 0;
+
+    public TopN(int n) {
+      super(n);
+      this.n = n;
+    }
+
+    @Override
+    public boolean offer(NameValuePair entry) {
+      updateTotal(entry.value);
+      if (size() == n) {
+        NameValuePair smallest = peek();
+        if (smallest.value >= entry.value) {
+          return false;
+        }
+        poll(); // remove smallest
+      }
+      return super.offer(entry);
+    }
+
+    private void updateTotal(long value) {
+      total += value;
+    }
+
+    public long getTotal() {
+      return total;
+    }
+  }
+}
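
A quick usage sketch for the new utility (hypothetical demo class, names, and counts): the queue keeps only the n largest entries, evicting the current minimum when a larger value arrives, while getTotal() accumulates every offered value whether or not it was retained.

    import org.apache.hadoop.metrics2.util.Metrics2Util.NameValuePair;
    import org.apache.hadoop.metrics2.util.Metrics2Util.TopN;

    public class TopNDemo {                        // hypothetical demo class
      public static void main(String[] args) {
        TopN top = new TopN(2);
        top.offer(new NameValuePair("alice", 50));
        top.offer(new NameValuePair("bob", 30));
        top.offer(new NameValuePair("carol", 20)); // rejected: not larger than the minimum, 30
        top.offer(new NameValuePair("dave", 40));  // evicts bob (30)
        // Retained: dave (40) and alice (50); peek() returns dave, the smaller.
        System.out.println(top.getTotal());        // 140: sum of all four offered values
      }
    }
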

+ 68 - 22
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ipc;
 
+import com.google.common.base.Supplier;
 import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -1025,7 +1026,6 @@ public class TestRPC extends TestRpcBase {
    */
   @Test (timeout=30000)
   public void testClientBackOffByResponseTime() throws Exception {
-    Server server;
     final TestRpcService proxy;
     boolean succeeded = false;
     final int numClients = 1;
@@ -1038,28 +1038,9 @@ public class TestRPC extends TestRpcBase {
     final ExecutorService executorService =
         Executors.newFixedThreadPool(numClients);
     conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
-    final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0.";
-    conf.setBoolean(ns + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);
-    conf.setStrings(ns + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY,
-        "org.apache.hadoop.ipc.FairCallQueue");
-    conf.setStrings(ns + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY,
-        "org.apache.hadoop.ipc.DecayRpcScheduler");
-    conf.setInt(ns + CommonConfigurationKeys.IPC_SCHEDULER_PRIORITY_LEVELS_KEY,
-        2);
-    conf.setBoolean(ns +
-        DecayRpcScheduler.IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_ENABLE_KEY,
-        true);
-    // set a small thresholds 2s and 4s for level 0 and level 1 for testing
-    conf.set(ns +
-        DecayRpcScheduler.IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_THRESHOLDS_KEY
-        , "2s, 4s");
 
-    // Set max queue size to 3 so that 2 calls from the test won't trigger
-    // back off because the queue is full.
-    RPC.Builder builder = newServerBuilder(conf)
-        .setQueueSizePerHandler(queueSizePerHandler).setNumHandlers(1)
-        .setVerbose(true);
-    server = setupTestServer(builder);
+    final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0";
+    Server server = setupDecayRpcSchedulerandTestServer(ns + ".");
 
     @SuppressWarnings("unchecked")
     CallQueueManager<Call> spy = spy((CallQueueManager<Call>) Whitebox
@@ -1068,6 +1049,13 @@ public class TestRPC extends TestRpcBase {
 
     Exception lastException = null;
     proxy = getClient(addr, conf);
+
+    MetricsRecordBuilder rb1 =
+        getMetrics("DecayRpcSchedulerMetrics2." + ns);
+    final long beginCallVolume = MetricsAsserts.getLongCounter("CallVolume", rb1);
+    final int beginUniqueCaller = MetricsAsserts.getIntCounter("UniqueCallers",
+        rb1);
+
     try {
       // start a sleep RPC call that sleeps 3s.
       for (int i = 0; i < numClients; i++) {
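
One detail worth noting: the configuration keys are written with the dotted namespace prefix ("ipc.0."), while the metrics source registered by DecayRpcScheduler uses the bare namespace ("ipc.0"), so the record is fetched without the trailing dot. In condensed form:

    final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0";   // "ipc.0"
    Server server = setupDecayRpcSchedulerandTestServer(ns + ".");    // config keys use "ipc.0.*"
    MetricsRecordBuilder rb = getMetrics("DecayRpcSchedulerMetrics2." + ns);
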
@@ -1095,6 +1083,36 @@ public class TestRPC extends TestRpcBase {
         } else {
           lastException = unwrapExeption;
         }
+
+        // Lets Metric system update latest metrics
+        GenericTestUtils.waitFor(new Supplier<Boolean>() {
+          @Override
+          public Boolean get() {
+            MetricsRecordBuilder rb2 =
+              getMetrics("DecayRpcSchedulerMetrics2." + ns);
+            long callVolume1 = MetricsAsserts.getLongCounter("CallVolume", rb2);
+            int uniqueCaller1 = MetricsAsserts.getIntCounter("UniqueCallers",
+              rb2);
+            long callVolumePriority0 = MetricsAsserts.getLongGauge(
+                "Priority.0.CallVolume", rb2);
+            long callVolumePriority1 = MetricsAsserts.getLongGauge(
+                "Priority.1.CallVolume", rb2);
+            double avgRespTimePriority0 = MetricsAsserts.getDoubleGauge(
+                "Priority.0.AvgResponseTime", rb2);
+            double avgRespTimePriority1 = MetricsAsserts.getDoubleGauge(
+                "Priority.1.AvgResponseTime", rb2);
+
+            LOG.info("CallVolume1: " + callVolume1);
+            LOG.info("UniqueCaller: " + uniqueCaller1);
+            LOG.info("Priority.0.CallVolume: " + callVolumePriority0);
+            LOG.info("Priority.1.CallVolume: " + callVolumePriority1);
+            LOG.info("Priority.0.AvgResponseTime: " + avgRespTimePriority0);
+            LOG.info("Priority.1.AvgResponseTime: " + avgRespTimePriority1);
+
+            return callVolume1 > beginCallVolume
+                && uniqueCaller1 > beginUniqueCaller;
+          }
+        }, 30, 60000);
       }
     } finally {
       executorService.shutdown();
@@ -1106,6 +1124,34 @@ public class TestRPC extends TestRpcBase {
     assertTrue("RetriableException not received", succeeded);
   }
 
+  private Server setupDecayRpcSchedulerandTestServer(String ns)
+      throws Exception {
+    final int queueSizePerHandler = 3;
+
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+    conf.setBoolean(ns + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);
+    conf.setStrings(ns + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY,
+        "org.apache.hadoop.ipc.FairCallQueue");
+    conf.setStrings(ns + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY,
+        "org.apache.hadoop.ipc.DecayRpcScheduler");
+    conf.setInt(ns + CommonConfigurationKeys.IPC_SCHEDULER_PRIORITY_LEVELS_KEY,
+        2);
+    conf.setBoolean(ns +
+            DecayRpcScheduler.IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_ENABLE_KEY,
+        true);
+    // set a small thresholds 2s and 4s for level 0 and level 1 for testing
+    conf.set(ns +
+            DecayRpcScheduler.IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_THRESHOLDS_KEY
+        , "2s, 4s");
+
+    // Set max queue size to 3 so that 2 calls from the test won't trigger
+    // back off because the queue is full.
+    RPC.Builder builder = newServerBuilder(conf)
+        .setQueueSizePerHandler(queueSizePerHandler).setNumHandlers(1)
+        .setVerbose(true);
+    return setupTestServer(builder);
+  }
+
   /**
    *  Test RPC timeout.
    */

+ 3 - 70
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java

@@ -20,17 +20,17 @@ package org.apache.hadoop.hdfs.server.namenode.top.window;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.Stack;
 import java.util.concurrent.ConcurrentHashMap;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.primitives.Ints;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.metrics2.util.Metrics2Util.NameValuePair;
+import org.apache.hadoop.metrics2.util.Metrics2Util.TopN;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -209,7 +209,7 @@ public class RollingWindowManager {
       }
       for (int i = 0; i < size; i++) {
         NameValuePair userEntry = reverse.pop();
-        User user = new User(userEntry.name, userEntry.value);
+        User user = new User(userEntry.getName(), userEntry.getValue());
         op.addUser(user);
       }
     }
@@ -276,71 +276,4 @@ public class RollingWindowManager {
     }
     return window;
   }
-
-  /**
-   * A pair of a name and its corresponding value. Defines a custom 
-   * comparator so the TopN PriorityQueue sorts based on the count.
-   */
-  static private class NameValuePair implements Comparable<NameValuePair> {
-    String name;
-    long value;
-
-    public NameValuePair(String metricName, long value) {
-      this.name = metricName;
-      this.value = value;
-    }
-
-    @Override
-    public int compareTo(NameValuePair other) {
-      return (int) (value - other.value);
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      if (other instanceof NameValuePair) {
-        return compareTo((NameValuePair)other) == 0;
-      }
-      return false;
-    }
-
-    @Override
-    public int hashCode() {
-      return Long.valueOf(value).hashCode();
-    }
-  }
-
-  /**
-   * A fixed-size priority queue, used to retrieve top-n of offered entries.
-   */
-  static private class TopN extends PriorityQueue<NameValuePair> {
-    private static final long serialVersionUID = 5134028249611535803L;
-    int n; // > 0
-    private long total = 0;
-
-    TopN(int n) {
-      super(n);
-      this.n = n;
-    }
-
-    @Override
-    public boolean offer(NameValuePair entry) {
-      updateTotal(entry.value);
-      if (size() == n) {
-        NameValuePair smallest = peek();
-        if (smallest.value >= entry.value) {
-          return false;
-        }
-        poll(); // remove smallest
-      }
-      return super.offer(entry);
-    }
-
-    private void updateTotal(long value) {
-      total += value;
-    }
-
-    public long getTotal() {
-      return total;
-    }
-  }
 }