瀏覽代碼

HADOOP-16912. Emit per priority RPC queue time and processing time from DecayRpcScheduler. Contributed by Fengnan Li.

Chao Sun 5 年之前
父節點
當前提交
e3fbdcbc14

+ 21 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java

@@ -42,6 +42,7 @@ import com.google.common.util.concurrent.AtomicDoubleArray;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ipc.metrics.DecayRpcSchedulerDetailedMetrics;
 import org.apache.hadoop.metrics2.MetricsCollector;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.metrics2.MetricsSource;
@@ -154,6 +155,10 @@ public class DecayRpcScheduler implements RpcScheduler,
   private final AtomicDoubleArray responseTimeAvgInLastWindow;
   private final AtomicLongArray responseTimeCountInLastWindow;
 
+  // RPC queue time rates per queue
+  private final DecayRpcSchedulerDetailedMetrics
+      decayRpcSchedulerDetailedMetrics;
+
   // Pre-computed scheduling decisions during the decay sweep are
   // atomically swapped in as a read-only map
   private final AtomicReference<Map<Object, Integer>> scheduleCacheRef =
@@ -236,6 +241,10 @@ public class DecayRpcScheduler implements RpcScheduler,
     Preconditions.checkArgument(topUsersCount > 0,
         "the number of top users for scheduler metrics must be at least 1");
 
+    decayRpcSchedulerDetailedMetrics =
+        DecayRpcSchedulerDetailedMetrics.create(ns);
+    decayRpcSchedulerDetailedMetrics.init(numLevels);
+
     // Setup delay timer
     Timer timer = new Timer(true);
     DecayTask task = new DecayTask(this, timer);
@@ -626,6 +635,11 @@ public class DecayRpcScheduler implements RpcScheduler,
     long queueTime = details.get(Timing.QUEUE, TimeUnit.MILLISECONDS);
     long processingTime = details.get(Timing.PROCESSING, TimeUnit.MILLISECONDS);
 
+    this.decayRpcSchedulerDetailedMetrics.addQueueTime(
+        priorityLevel, queueTime);
+    this.decayRpcSchedulerDetailedMetrics.addProcessingTime(
+        priorityLevel, processingTime);
+
     responseTimeCountInCurrWindow.getAndIncrement(priorityLevel);
     responseTimeTotalInCurrWindow.getAndAdd(priorityLevel,
         queueTime+processingTime);
@@ -987,9 +1001,16 @@ public class DecayRpcScheduler implements RpcScheduler,
     return decayedCallCosts;
   }
 
+  @VisibleForTesting
+  public DecayRpcSchedulerDetailedMetrics
+      getDecayRpcSchedulerDetailedMetrics() {
+    return decayRpcSchedulerDetailedMetrics;
+  }
+
   @Override
   public void stop() {
     metricsProxy.unregisterSource(namespace);
     MetricsProxy.removeInstance(namespace);
+    decayRpcSchedulerDetailedMetrics.shutdown();
   }
 }

+ 135 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/DecayRpcSchedulerDetailedMetrics.java

@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc.metrics;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This class is for maintaining queue (priority) level related
+ * statistics when FairCallQueue is used and publishing them
+ * through the metrics interface.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+@Metrics(about="Per queue(priority) metrics",
+    context="decayrpcschedulerdetailed")
+public class DecayRpcSchedulerDetailedMetrics {
+
+  @Metric private MutableRatesWithAggregation rpcQueueRates;
+  @Metric private MutableRatesWithAggregation rpcProcessingRates;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DecayRpcSchedulerDetailedMetrics.class);
+  private final MetricsRegistry registry;
+  private final String name;
+  private String[] queueNamesForLevels;
+  private String[] processingNamesForLevels;
+
+  DecayRpcSchedulerDetailedMetrics(String ns) {
+    name = "DecayRpcSchedulerDetailedMetrics."+ ns;
+    registry = new MetricsRegistry("decayrpcschedulerdetailed")
+        .tag("port", "RPC port", String.valueOf(ns));
+    LOG.debug(registry.info().toString());
+  }
+
+  public static DecayRpcSchedulerDetailedMetrics create(String ns) {
+    DecayRpcSchedulerDetailedMetrics m =
+        new DecayRpcSchedulerDetailedMetrics(ns);
+    return DefaultMetricsSystem.instance().register(m.name, null, m);
+  }
+
+  /**
+   * Initialize the metrics for JMX with priority levels.
+   */
+  public void init(int numLevels) {
+    LOG.info("Initializing RPC stats for {} priority levels", numLevels);
+    queueNamesForLevels = new String[numLevels];
+    processingNamesForLevels = new String[numLevels];
+    for (int i = 0; i < numLevels; i++) {
+      queueNamesForLevels[i] = getQueueName(i+1);
+      processingNamesForLevels[i] = getProcessingName(i+1);
+    }
+    rpcQueueRates.init(queueNamesForLevels);
+    rpcProcessingRates.init(processingNamesForLevels);
+  }
+
+  /**
+   * Instrument a Call queue time based on its priority.
+   *
+   * @param priority of the RPC call
+   * @param queueTime of the RPC call in the queue of the priority
+   */
+  public void addQueueTime(int priority, long queueTime) {
+    rpcQueueRates.add(queueNamesForLevels[priority], queueTime);
+  }
+
+  /**
+   * Instrument a Call processing time based on its priority.
+   *
+   * @param priority of the RPC call
+   * @param processingTime of the RPC call in the queue of the priority
+   */
+  public void addProcessingTime(int priority, long processingTime) {
+    rpcProcessingRates.add(processingNamesForLevels[priority], processingTime);
+  }
+
+  /**
+   * Shutdown the instrumentation process.
+   */
+  public void shutdown() {
+    DefaultMetricsSystem.instance().unregisterSource(name);
+  }
+
+  /**
+   * Returns the rate name inside the metric.
+   */
+  public String getQueueName(int priority) {
+    return "DecayRPCSchedulerPriority."+priority+".RpcQueueTime";
+  }
+
+  /**
+   * Returns the rate name inside the metric.
+   */
+  public String getProcessingName(int priority) {
+    return "DecayRPCSchedulerPriority."+priority+".RpcProcessingTime";
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  @VisibleForTesting
+  MutableRatesWithAggregation getRpcQueueRates() {
+    return rpcQueueRates;
+  }
+
+  @VisibleForTesting
+  MutableRatesWithAggregation getRpcProcessingRates() {
+    return rpcProcessingRates;
+  }
+}

+ 12 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java

@@ -79,6 +79,18 @@ public class MutableRatesWithAggregation extends MutableMetric {
     }
   }
 
+  /**
+   * Initialize the registry with all rate names passed in.
+   * This is an alternative to the above init function since this metric
+   * can be used more than just for rpc name.
+   * @param names the array of all rate names
+   */
+  public void init(String[] names) {
+    for (String name : names) {
+      addMetricIfNotExists(name);
+    }
+  }
+
   /**
    * Add a rate sample for a rate metric.
    * @param name of the rate metric

+ 11 - 0
hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md

@@ -123,6 +123,17 @@ FairCallQueue metrics will only exist if FairCallQueue is enabled. Each metric e
 | `FairCallQueueSize_p`*Priority* | Current number of calls in priority queue |
 | `FairCallQueueOverflowedCalls_p`*Priority* | Total number of overflowed calls in priority queue |
 
+DecayRpcSchedulerDetailed
+-------------------------
+
+DecayRpcSchedulerDetailed metrics only exist when DecayRpcScheduler is used (FairCallQueue enabled). It is an addition
+to FairCallQueue metrics. For each level of priority, rpcqueue and rpcprocessing detailed metrics are exposed.
+
+| Name | Description |
+|:---- | :---- |
+|  `DecayRPCSchedulerPriority.`*Priority*`.RpcQueueTime` | RpcQueueTime metrics for each priority |
+|  `DecayRPCSchedulerPriority.`*Priority*`.RpcProcessingTime` | RpcProcessingTime metrics for each priority |
+
 rpcdetailed context
 ===================
 

+ 32 - 29
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java

@@ -66,15 +66,15 @@ public class TestDecayRpcScheduler {
   @SuppressWarnings("deprecation")
   public void testParsePeriod() {
     // By default
-    scheduler = new DecayRpcScheduler(1, "", new Configuration());
+    scheduler = new DecayRpcScheduler(1, "ipc.1", new Configuration());
     assertEquals(DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_DEFAULT,
       scheduler.getDecayPeriodMillis());
 
     // Custom
     Configuration conf = new Configuration();
-    conf.setLong("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY,
+    conf.setLong("ipc.2." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY,
       1058);
-    scheduler = new DecayRpcScheduler(1, "ns", conf);
+    scheduler = new DecayRpcScheduler(1, "ipc.2", conf);
     assertEquals(1058L, scheduler.getDecayPeriodMillis());
   }
 
@@ -82,15 +82,15 @@ public class TestDecayRpcScheduler {
   @SuppressWarnings("deprecation")
   public void testParseFactor() {
     // Default
-    scheduler = new DecayRpcScheduler(1, "", new Configuration());
+    scheduler = new DecayRpcScheduler(1, "ipc.3", new Configuration());
     assertEquals(DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_DEFAULT,
       scheduler.getDecayFactor(), 0.00001);
 
     // Custom
     Configuration conf = new Configuration();
-    conf.set("prefix." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY,
+    conf.set("ipc.4." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY,
       "0.125");
-    scheduler = new DecayRpcScheduler(1, "prefix", conf);
+    scheduler = new DecayRpcScheduler(1, "ipc.4", conf);
     assertEquals(0.125, scheduler.getDecayFactor(), 0.00001);
   }
 
@@ -106,23 +106,23 @@ public class TestDecayRpcScheduler {
   public void testParseThresholds() {
     // Defaults vary by number of queues
     Configuration conf = new Configuration();
-    scheduler = new DecayRpcScheduler(1, "", conf);
+    scheduler = new DecayRpcScheduler(1, "ipc.5", conf);
     assertEqualDecimalArrays(new double[]{}, scheduler.getThresholds());
 
-    scheduler = new DecayRpcScheduler(2, "", conf);
+    scheduler = new DecayRpcScheduler(2, "ipc.6", conf);
     assertEqualDecimalArrays(new double[]{0.5}, scheduler.getThresholds());
 
-    scheduler = new DecayRpcScheduler(3, "", conf);
+    scheduler = new DecayRpcScheduler(3, "ipc.7", conf);
     assertEqualDecimalArrays(new double[]{0.25, 0.5}, scheduler.getThresholds());
 
-    scheduler = new DecayRpcScheduler(4, "", conf);
+    scheduler = new DecayRpcScheduler(4, "ipc.8", conf);
     assertEqualDecimalArrays(new double[]{0.125, 0.25, 0.5}, scheduler.getThresholds());
 
     // Custom
     conf = new Configuration();
-    conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY,
+    conf.set("ipc.9." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY,
       "1, 10, 20, 50, 85");
-    scheduler = new DecayRpcScheduler(6, "ns", conf);
+    scheduler = new DecayRpcScheduler(6, "ipc.9", conf);
     assertEqualDecimalArrays(new double[]{0.01, 0.1, 0.2, 0.5, 0.85}, scheduler.getThresholds());
   }
 
@@ -130,8 +130,9 @@ public class TestDecayRpcScheduler {
   @SuppressWarnings("deprecation")
   public void testAccumulate() {
     Configuration conf = new Configuration();
-    conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush
-    scheduler = new DecayRpcScheduler(1, "ns", conf);
+    conf.set("ipc.10." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY,
+        "99999999"); // Never flush
+    scheduler = new DecayRpcScheduler(1, "ipc.10", conf);
 
     assertEquals(0, scheduler.getCallCostSnapshot().size()); // empty first
 
@@ -151,11 +152,11 @@ public class TestDecayRpcScheduler {
   @SuppressWarnings("deprecation")
   public void testDecay() throws Exception {
     Configuration conf = new Configuration();
-    conf.setLong("ns." // Never decay
+    conf.setLong("ipc.11." // Never decay
         + DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY, 999999999);
-    conf.setDouble("ns."
+    conf.setDouble("ipc.11."
         + DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_KEY, 0.5);
-    scheduler = new DecayRpcScheduler(1, "ns", conf);
+    scheduler = new DecayRpcScheduler(1, "ipc.11", conf);
 
     assertEquals(0, scheduler.getTotalCallSnapshot());
 
@@ -202,7 +203,7 @@ public class TestDecayRpcScheduler {
   @SuppressWarnings("deprecation")
   public void testPriority() throws Exception {
     Configuration conf = new Configuration();
-    final String namespace = "ns";
+    final String namespace = "ipc.12";
     conf.set(namespace + "." + DecayRpcScheduler
         .IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush
     conf.set(namespace + "." + DecayRpcScheduler
@@ -239,9 +240,11 @@ public class TestDecayRpcScheduler {
   @SuppressWarnings("deprecation")
   public void testPeriodic() throws InterruptedException {
     Configuration conf = new Configuration();
-    conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "10");
-    conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY, "0.5");
-    scheduler = new DecayRpcScheduler(1, "ns", conf);
+    conf.set(
+        "ipc.13." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "10");
+    conf.set(
+        "ipc.13." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY, "0.5");
+    scheduler = new DecayRpcScheduler(1, "ipc.13", conf);
 
     assertEquals(10, scheduler.getDecayPeriodMillis());
     assertEquals(0, scheduler.getTotalCallSnapshot());
@@ -269,7 +272,7 @@ public class TestDecayRpcScheduler {
       // MetricsSystemImpl to true
       DefaultMetricsSystem.initialize("NameNode");
       Configuration conf = new Configuration();
-      scheduler = new DecayRpcScheduler(1, "ns", conf);
+      scheduler = new DecayRpcScheduler(1, "ipc.14", conf);
       // check if there is npe in log
       assertFalse(bytes.toString().contains("NullPointerException"));
     } finally {
@@ -280,7 +283,7 @@ public class TestDecayRpcScheduler {
 
   @Test
   public void testUsingWeightedTimeCostProvider() {
-    scheduler = getSchedulerWithWeightedTimeCostProvider(3);
+    scheduler = getSchedulerWithWeightedTimeCostProvider(3, "ipc.15");
 
     // 3 details in increasing order of cost. Although medium has a longer
     // duration, the shared lock is weighted less than the exclusive lock
@@ -330,7 +333,7 @@ public class TestDecayRpcScheduler {
 
   @Test
   public void testUsingWeightedTimeCostProviderWithZeroCostCalls() {
-    scheduler = getSchedulerWithWeightedTimeCostProvider(2);
+    scheduler = getSchedulerWithWeightedTimeCostProvider(2, "ipc.16");
 
     ProcessingDetails emptyDetails =
         new ProcessingDetails(TimeUnit.MILLISECONDS);
@@ -347,7 +350,7 @@ public class TestDecayRpcScheduler {
 
   @Test
   public void testUsingWeightedTimeCostProviderNoRequests() {
-    scheduler = getSchedulerWithWeightedTimeCostProvider(2);
+    scheduler = getSchedulerWithWeightedTimeCostProvider(2, "ipc.18");
 
     assertEquals(0, scheduler.getPriorityLevel(mockCall("A")));
   }
@@ -357,13 +360,13 @@ public class TestDecayRpcScheduler {
    * normal decaying disabled.
    */
   private static DecayRpcScheduler getSchedulerWithWeightedTimeCostProvider(
-      int priorityLevels) {
+      int priorityLevels, String ns) {
     Configuration conf = new Configuration();
-    conf.setClass("ns." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY,
+    conf.setClass(ns + "." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY,
         WeightedTimeCostProvider.class, CostProvider.class);
-    conf.setLong("ns."
+    conf.setLong(ns + "."
         + DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY, 999999);
-    return new DecayRpcScheduler(priorityLevels, "ns", conf);
+    return new DecayRpcScheduler(priorityLevels, ns, conf);
   }
 
   /**

+ 45 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/metrics/TestDecayRpcSchedulerDetailedMetrics.java

@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc.metrics;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.DecayRpcScheduler;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.junit.Test;
+
+public class TestDecayRpcSchedulerDetailedMetrics {
+
+  @Test
+  public void metricsRegistered() {
+    Configuration conf = new Configuration();
+    DecayRpcScheduler scheduler = new DecayRpcScheduler(4, "ipc.8020", conf);
+    MetricsSystem metricsSystem = DefaultMetricsSystem.instance();
+    DecayRpcSchedulerDetailedMetrics metrics =
+        scheduler.getDecayRpcSchedulerDetailedMetrics();
+
+    assertNotNull(metricsSystem.getSource(metrics.getName()));
+
+    scheduler.stop();
+
+    assertNull(metricsSystem.getSource(metrics.getName()));
+  }
+}

+ 13 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableMetrics.java

@@ -149,6 +149,19 @@ public class TestMutableMetrics {
     assertGauge("BarAvgTime", 0.0, rb);
   }
 
+  @Test public void testMutableRatesWithAggregationInitWithArray() {
+    MetricsRecordBuilder rb = mockMetricsRecordBuilder();
+    MutableRatesWithAggregation rates = new MutableRatesWithAggregation();
+
+    rates.init(new String[]{"Foo", "Bar"});
+    rates.snapshot(rb, false);
+
+    assertCounter("FooNumOps", 0L, rb);
+    assertGauge("FooAvgTime", 0.0, rb);
+    assertCounter("BarNumOps", 0L, rb);
+    assertGauge("BarAvgTime", 0.0, rb);
+  }
+
   @Test public void testMutableRatesWithAggregationSingleThread() {
     MutableRatesWithAggregation rates = new MutableRatesWithAggregation();