1
0
Kaynağa Gözat

HDFS-11194. Maintain aggregated peer performance metrics on NameNode.

Arpit Agarwal 8 yıl önce
ebeveyn
işleme
b57368b6f8
42 değiştirilmiş dosya ile 1721 ekleme ve 107 silme
  1. 46 11
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java
  2. 7 6
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java
  3. 107 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowPeerReports.java
  4. 18 8
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  5. 8 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
  6. 2 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
  7. 44 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
  8. 41 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
  9. 273 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java
  10. 33 5
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
  11. 11 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
  12. 11 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
  13. 6 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
  14. 3 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
  15. 54 25
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
  16. 194 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/SlowNodeDetector.java
  17. 6 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  18. 6 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
  19. 7 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
  20. 6 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeStatusMXBean.java
  21. 7 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
  22. 20 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
  23. 7 11
      hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
  24. 22 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
  25. 10 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
  26. 3 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
  27. 226 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowPeerTracker.java
  28. 3 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
  29. 4 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
  30. 3 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
  31. 23 7
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
  32. 5 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
  33. 5 3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
  34. 3 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
  35. 2 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
  36. 3 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
  37. 142 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java
  38. 335 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestSlowNodeDetector.java
  39. 5 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
  40. 3 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
  41. 3 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
  42. 4 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java

+ 46 - 11
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.metrics2.lib;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
@@ -39,6 +40,9 @@ import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
 
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import javax.annotation.Nullable;
+
 import static org.apache.hadoop.metrics2.lib.Interns.*;
 
 /**
@@ -63,7 +67,10 @@ public class RollingAverages extends MutableMetric implements Closeable {
           .setNameFormat("RollingAverages-%d").build());
 
   private ScheduledFuture<?> scheduledTask = null;
+
+  @Nullable
   private Map<String, MutableRate> currentSnapshot;
+
   private final int numWindows;
   private final String avgInfoNameTemplate;
   private final String avgInfoDescTemplate;
@@ -100,31 +107,31 @@ public class RollingAverages extends MutableMetric implements Closeable {
 
   /**
    * Constructor of {@link RollingAverages}.
-   * @param windowSize
-   *          The number of seconds of each window for which sub set of samples
-   *          are gathered to compute the rolling average, A.K.A. roll over
-   *          interval.
+   * @param windowSizeMs
+   *          The number of milliseconds of each window for which subset
+   *          of samples are gathered to compute the rolling average, A.K.A.
+   *          roll over interval.
    * @param numWindows
    *          The number of windows maintained to compute the rolling average.
    * @param valueName
    *          of the metric (e.g. "Time", "Latency")
    */
   public RollingAverages(
-      final int windowSize,
+      final long windowSizeMs,
       final int numWindows,
       final String valueName) {
     String uvName = StringUtils.capitalize(valueName);
     String lvName = StringUtils.uncapitalize(valueName);
-    avgInfoNameTemplate = "%s" + "RollingAvg"+ uvName;
+    avgInfoNameTemplate = "[%s]" + "RollingAvg"+ uvName;
     avgInfoDescTemplate = "Rolling average "+ lvName +" for "+ "%s";
     this.numWindows = numWindows;
     scheduledTask = SCHEDULER.scheduleAtFixedRate(new RatesRoller(this),
-        windowSize, windowSize, TimeUnit.SECONDS);
+        windowSizeMs, windowSizeMs, TimeUnit.MILLISECONDS);
   }
 
   /**
    * Constructor of {@link RollingAverages}.
-   * @param windowSize
+   * @param windowSizeMs
    *          The number of seconds of each window for which sub set of samples
    *          are gathered to compute rolling average, also A.K.A roll over
    *          interval.
@@ -133,9 +140,9 @@ public class RollingAverages extends MutableMetric implements Closeable {
    *          average of the rolling averages.
    */
   public RollingAverages(
-      final int windowSize,
+      final long windowSizeMs,
       final int numWindows) {
-    this(windowSize, numWindows, "Time");
+    this(windowSizeMs, numWindows, "Time");
   }
 
   @Override
@@ -213,7 +220,7 @@ public class RollingAverages extends MutableMetric implements Closeable {
    * Iterates over snapshot to capture all Avg metrics into rolling structure
    * {@link RollingAverages#averages}.
    */
-  private void rollOverAvgs() {
+  private synchronized void rollOverAvgs() {
     if (currentSnapshot == null) {
       return;
     }
@@ -248,4 +255,32 @@ public class RollingAverages extends MutableMetric implements Closeable {
     }
     scheduledTask = null;
   }
+
+  /**
+   * Retrieve a map of metric name -> (aggregate).
+   * Filter out entries that don't have at least minSamples.
+   *
+   * @return a map of peer DataNode Id to the average latency to that
+   *         node seen over the measurement period.
+   */
+  public synchronized Map<String, Double> getStats(long minSamples) {
+    final Map<String, Double> stats = new HashMap<>();
+
+    for (final Entry<String, LinkedBlockingDeque<SumAndCount>> entry
+        : averages.entrySet()) {
+      final String name = entry.getKey();
+      double totalSum = 0;
+      long totalCount = 0;
+
+      for (final SumAndCount sumAndCount : entry.getValue()) {
+        totalCount += sumAndCount.getCount();
+        totalSum += sumAndCount.getSum();
+      }
+
+      if (totalCount > minSamples) {
+        stats.put(name, totalSum / totalCount);
+      }
+    }
+    return stats;
+  }
 }

+ 7 - 6
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java

@@ -42,7 +42,8 @@ public class TestRollingAverages {
   public void testRollingAveragesEmptyRollover() throws Exception {
     final MetricsRecordBuilder rb = mockMetricsRecordBuilder();
     /* 5s interval and 2 windows */
-    try (final RollingAverages rollingAverages = new RollingAverages(5, 2)) {
+    try (final RollingAverages rollingAverages =
+             new RollingAverages(5000, 2)) {
       /* Check it initially */
       rollingAverages.snapshot(rb, true);
       verify(rb, never()).addGauge(
@@ -74,10 +75,10 @@ public class TestRollingAverages {
   public void testRollingAveragesRollover() throws Exception {
     final MetricsRecordBuilder rb = mockMetricsRecordBuilder();
     final String name = "foo2";
-    final int windowSize = 5; // 5s roll over interval
+    final int windowSizeMs = 5000; // 5s roll over interval
     final int numWindows = 2;
     final int numOpsPerIteration = 1000;
-    try (RollingAverages rollingAverages = new RollingAverages(windowSize,
+    try (RollingAverages rollingAverages = new RollingAverages(windowSizeMs,
         numWindows)) {
 
       /* Push values for three intervals */
@@ -92,7 +93,7 @@ public class TestRollingAverages {
          * Sleep until 1s after the next windowSize seconds interval, to let the
          * metrics roll over
          */
-        final long sleep = (start + (windowSize * 1000 * i) + 1000)
+        final long sleep = (start + (windowSizeMs * i) + 1000)
             - Time.monotonicNow();
         Thread.sleep(sleep);
 
@@ -110,12 +111,12 @@ public class TestRollingAverages {
         final long rollingTotal = i > 1 ? 2 * numOpsPerIteration
             : numOpsPerIteration;
         verify(rb).addGauge(
-            info("Foo2RollingAvgTime", "Rolling average time for foo2"),
+            info("[Foo2]RollingAvgTime", "Rolling average time for foo2"),
             rollingSum / rollingTotal);
 
         /* Verify the metrics were added the right number of times */
         verify(rb, times(i)).addGauge(
-            eq(info("Foo2RollingAvgTime", "Rolling average time for foo2")),
+            eq(info("[Foo2]RollingAvgTime", "Rolling average time for foo2")),
             anyDouble());
       }
     }

+ 107 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowPeerReports.java

@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.protocol;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.Map;
+
+/**
+ * A class that allows a DataNode to communicate information about all
+ * its peer DataNodes that appear to be slow.
+ *
+ * The wire representation of this structure is a list of
+ * SlowPeerReportProto messages.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class SlowPeerReports {
+  /**
+   * A map from the DataNode's DataNodeUUID to its aggregate latency
+   * as seen by the reporting node.
+   *
+   * The exact choice of the aggregate is opaque to the NameNode but it
+   * should be chosen consistently by all DataNodes in the cluster.
+   * Examples of aggregates are 90th percentile (good) and mean (not so
+   * good).
+   *
+   * The NameNode must not attempt to interpret the aggregate latencies
+   * beyond exposing them as a diagnostic. e.g. metrics. Also, comparing
+   * latencies across reports from different DataNodes may not be not
+   * meaningful and must be avoided.
+   */
+  @Nonnull
+  private final Map<String, Double> slowPeers;
+
+  /**
+   * An object representing a SlowPeerReports with no entries. Should
+   * be used instead of null or creating new objects when there are
+   * no slow peers to report.
+   */
+  public static final SlowPeerReports EMPTY_REPORT =
+      new SlowPeerReports(ImmutableMap.of());
+
+  private SlowPeerReports(Map<String, Double> slowPeers) {
+    this.slowPeers = slowPeers;
+  }
+
+  public static SlowPeerReports create(
+      @Nullable Map<String, Double> slowPeers) {
+    if (slowPeers == null || slowPeers.isEmpty()) {
+      return EMPTY_REPORT;
+    }
+    return new SlowPeerReports(slowPeers);
+  }
+
+  public Map<String, Double> getSlowPeers() {
+    return slowPeers;
+  }
+
+  public boolean haveSlowPeers() {
+    return slowPeers.size() > 0;
+  }
+
+  /**
+   * Return true if the two objects represent the same set slow peer
+   * entries. Primarily for unit testing convenience.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (!(o instanceof SlowPeerReports)) {
+      return false;
+    }
+
+    SlowPeerReports that = (SlowPeerReports) o;
+
+    return slowPeers.equals(that.slowPeers);
+  }
+
+  @Override
+  public int hashCode() {
+    return slowPeers.hashCode();
+  }
+}

+ 18 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -457,14 +457,19 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_METRICS_SESSION_ID_KEY =
       HdfsClientConfigKeys.DeprecatedKeys.DFS_METRICS_SESSION_ID_KEY;
   public static final String  DFS_METRICS_PERCENTILES_INTERVALS_KEY = "dfs.metrics.percentiles.intervals";
-  public static final String  DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY =
-      "dfs.metrics.rolling.average.window.size";
-  public static final int     DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_DEFAULT =
-      3600;
-  public static final String  DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY =
-      "dfs.metrics.rolling.average.window.numbers";
-  public static final int DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_DEFAULT =
-      48;
+
+  // The following setting is not meant to be changed by administrators.
+  public static final String DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY =
+      "dfs.metrics.rolling.averages.window.length";
+  public static final String DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_DEFAULT =
+      "5m";
+
+  // The following setting is not meant to be changed by administrators.
+  public static final String DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY =
+      "dfs.metrics.rolling.average.num.windows";
+  public static final int DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_DEFAULT =
+      36;
+
   public static final String  DFS_DATANODE_PEER_STATS_ENABLED_KEY =
       "dfs.datanode.peer.stats.enabled";
   public static final boolean DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT = false;
@@ -669,6 +674,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT = "dfs.block.misreplication.processing.limit";
   public static final int     DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT_DEFAULT = 10000;
 
+  public static final String DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY =
+      "dfs.datanode.slow.peers.report.interval";
+  public static final String DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT =
+      "30m";
+
   // property for fsimage compression
   public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
   public static final boolean DFS_IMAGE_COMPRESS_DEFAULT = false;

+ 8 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java

@@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo.Capability;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -71,6 +72,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 
+import javax.annotation.Nonnull;
+
 /**
  * This class is the client side translator to translate the requests made on
  * {@link DatanodeProtocol} interfaces to the RPC server implementing
@@ -132,7 +135,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements
       StorageReport[] reports, long cacheCapacity, long cacheUsed,
       int xmitsInProgress, int xceiverCount, int failedVolumes,
       VolumeFailureSummary volumeFailureSummary,
-      boolean requestFullBlockReportLease) throws IOException {
+      boolean requestFullBlockReportLease,
+      @Nonnull SlowPeerReports slowPeers) throws IOException {
     HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
         .setRegistration(PBHelper.convert(registration))
         .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
@@ -149,6 +153,9 @@ public class DatanodeProtocolClientSideTranslatorPB implements
       builder.setVolumeFailureSummary(PBHelper.convertVolumeFailureSummary(
           volumeFailureSummary));
     }
+    if (slowPeers.haveSlowPeers()) {
+      builder.addAllSlowPeers(PBHelper.convertSlowPeerInfo(slowPeers));
+    }
     HeartbeatResponseProto resp;
     try {
       resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build());

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java

@@ -120,7 +120,8 @@ public class DatanodeProtocolServerSideTranslatorPB implements
           report, request.getCacheCapacity(), request.getCacheUsed(),
           request.getXmitsInProgress(),
           request.getXceiverCount(), request.getFailedVolumes(),
-          volumeFailureSummary, request.getRequestFullBlockReportLease());
+          volumeFailureSummary, request.getRequestFullBlockReportLease(),
+          PBHelper.convertSlowPeerInfo(request.getSlowPeersList()));
     } catch (IOException e) {
       throw new ServiceException(e);
     }

+ 44 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java

@@ -20,7 +20,10 @@ package org.apache.hadoop.hdfs.protocolPB;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import com.google.protobuf.ByteString;
 
@@ -44,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.FinalizeComm
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.KeyUpdateCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowPeerReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto;
@@ -107,6 +111,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStat
 import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 
 /**
@@ -829,6 +834,45 @@ public class PBHelper {
     return builder.build();
   }
 
+  public static List<SlowPeerReportProto> convertSlowPeerInfo(
+      SlowPeerReports slowPeers) {
+    if (slowPeers.getSlowPeers().size() == 0) {
+      return Collections.emptyList();
+    }
+
+    List<SlowPeerReportProto> slowPeerInfoProtos =
+        new ArrayList<>(slowPeers.getSlowPeers().size());
+    for (Map.Entry<String, Double> entry :
+        slowPeers.getSlowPeers().entrySet()) {
+      slowPeerInfoProtos.add(SlowPeerReportProto.newBuilder()
+              .setDataNodeId(entry.getKey())
+              .setAggregateLatency(entry.getValue())
+              .build());
+    }
+    return slowPeerInfoProtos;
+  }
+
+  public static SlowPeerReports convertSlowPeerInfo(
+      List<SlowPeerReportProto> slowPeerProtos) {
+
+    // No slow peers, or possibly an older DataNode.
+    if (slowPeerProtos == null || slowPeerProtos.size() == 0) {
+      return SlowPeerReports.EMPTY_REPORT;
+    }
+
+    Map<String, Double> slowPeersMap = new HashMap<>(slowPeerProtos.size());
+    for (SlowPeerReportProto proto : slowPeerProtos) {
+      if (!proto.hasDataNodeId()) {
+        // The DataNodeId should be reported.
+        continue;
+      }
+      slowPeersMap.put(
+          proto.getDataNodeId(),
+          proto.hasAggregateLatency() ? proto.getAggregateLatency() : 0.0);
+    }
+    return SlowPeerReports.create(slowPeersMap);
+  }
+
   public static JournalInfo convert(JournalInfoProto info) {
     int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0;
     int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0;

+ 41 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -50,7 +50,10 @@ import org.apache.hadoop.net.*;
 import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Timer;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.InetAddress;
@@ -172,6 +175,14 @@ public class DatanodeManager {
    */
   private final HashMap<String, Integer> datanodesSoftwareVersions =
     new HashMap<>(4, 0.75f);
+
+  /**
+   * True if we should process latency metrics from downstream peers.
+   */
+  private final boolean dataNodePeerStatsEnabled;
+
+  @Nullable
+  private final SlowPeerTracker slowPeerTracker;
   
   /**
    * The minimum time between resending caching directives to Datanodes,
@@ -194,6 +205,12 @@ public class DatanodeManager {
     this.decomManager = new DecommissionManager(namesystem, blockManager,
         heartbeatManager);
     this.fsClusterStats = newFSClusterStats();
+    this.dataNodePeerStatsEnabled = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
+        DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT);
+
+    this.slowPeerTracker = dataNodePeerStatsEnabled ?
+        new SlowPeerTracker(conf, new Timer()) : null;
 
     this.defaultXferPort = NetUtils.createSocketAddr(
           conf.getTrimmed(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY,
@@ -1566,7 +1583,8 @@ public class DatanodeManager {
       StorageReport[] reports, final String blockPoolId,
       long cacheCapacity, long cacheUsed, int xceiverCount, 
       int maxTransfers, int failedVolumes,
-      VolumeFailureSummary volumeFailureSummary) throws IOException {
+      VolumeFailureSummary volumeFailureSummary,
+      @Nonnull SlowPeerReports slowPeers) throws IOException {
     final DatanodeDescriptor nodeinfo;
     try {
       nodeinfo = getDatanode(nodeReg);
@@ -1632,6 +1650,19 @@ public class DatanodeManager {
       nodeinfo.setBalancerBandwidth(0);
     }
 
+    if (slowPeerTracker != null) {
+      final Map<String, Double> slowPeersMap = slowPeers.getSlowPeers();
+      if (!slowPeersMap.isEmpty()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("DataNode " + nodeReg + " reported slow peers: " +
+              slowPeersMap);
+        }
+        for (String slowNodeId : slowPeersMap.keySet()) {
+          slowPeerTracker.addReport(slowNodeId, nodeReg.getIpcAddr(false));
+        }
+      }
+    }
+
     if (!cmds.isEmpty()) {
       return cmds.toArray(new DatanodeCommand[cmds.size()]);
     }
@@ -1834,5 +1865,14 @@ public class DatanodeManager {
     this.blockInvalidateLimit = Math.max(20 * (int) (intervalSeconds),
         DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT);
   }
+
+  /**
+   * Retrieve information about slow peers as a JSON.
+   * Returns null if we are not tracking slow peers.
+   * @return
+   */
+  public String getSlowPeersReport() {
+    return slowPeerTracker != null ? slowPeerTracker.getJson() : null;
+  }
 }
 

+ 273 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java

@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.primitives.Ints;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.util.Timer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * This class aggregates information from {@link SlowPeerReports} received via
+ * heartbeats.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class SlowPeerTracker {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SlowPeerTracker.class);
+
+  /**
+   * Time duration after which a report is considered stale. This is
+   * set to DFS_DATANODE_SLOW_PEER_REPORT_INTERVAL_KEY * 3 i.e.
+   * maintained for at least two successive reports.
+   */
+  private final long reportValidityMs;
+
+  /**
+   * Timer object for querying the current time. Separated out for
+   * unit testing.
+   */
+  private final Timer timer;
+
+  /**
+   * Number of nodes to include in JSON report. We will return nodes with
+   * the highest number of votes from peers.
+   */
+  private static final int MAX_NODES_TO_REPORT = 5;
+
+  /**
+   * Information about peers that have reported a node as being slow.
+   * Each outer map entry is a map of (DatanodeId) -> (timestamp),
+   * mapping reporting nodes to the timestamp of the last report from
+   * that node.
+   *
+   * DatanodeId could be the DataNodeId or its address. We
+   * don't care as long as the caller uses it consistently.
+   *
+   * Stale reports are not evicted proactively and can potentially
+   * hang around forever.
+   */
+  private final ConcurrentMap<String, ConcurrentMap<String, Long>>
+      allReports;
+
+  public SlowPeerTracker(Configuration conf, Timer timer) {
+    this.timer = timer;
+    this.allReports = new ConcurrentHashMap<>();
+    this.reportValidityMs = conf.getTimeDuration(
+        DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY,
+        DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS) * 3;
+  }
+
+  /**
+   * Add a new report. DatanodeIds can be the DataNodeIds or addresses
+   * We don't care as long as the caller is consistent.
+   *
+   * @param reportingNode DataNodeId of the node reporting on its peer.
+   * @param slowNode DataNodeId of the peer suspected to be slow.
+   */
+  public void addReport(String slowNode,
+                        String reportingNode) {
+    ConcurrentMap<String, Long> nodeEntries = allReports.get(slowNode);
+
+    if (nodeEntries == null) {
+      // putIfAbsent guards against multiple writers.
+      allReports.putIfAbsent(slowNode, new ConcurrentHashMap<>());
+      nodeEntries = allReports.get(slowNode);
+    }
+
+    // Replace the existing entry from this node, if any.
+    nodeEntries.put(reportingNode, timer.monotonicNow());
+  }
+
+  /**
+   * Retrieve the non-expired reports that mark a given DataNode
+   * as slow. Stale reports are excluded.
+   *
+   * @param slowNode target node Id.
+   * @return set of reports which implicate the target node as being slow.
+   */
+  public Set<String> getReportsForNode(String slowNode) {
+    final ConcurrentMap<String, Long> nodeEntries =
+        allReports.get(slowNode);
+
+    if (nodeEntries == null || nodeEntries.isEmpty()) {
+      return Collections.emptySet();
+    }
+
+    return filterNodeReports(nodeEntries, timer.monotonicNow());
+  }
+
+  /**
+   * Retrieve all reports for all nodes. Stale reports are excluded.
+   *
+   * @return map from SlowNodeId -> (set of nodes reporting peers).
+   */
+  public Map<String, SortedSet<String>> getReportsForAllDataNodes() {
+    if (allReports.isEmpty()) {
+      return ImmutableMap.of();
+    }
+
+    final Map<String, SortedSet<String>> allNodesValidReports = new HashMap<>();
+    final long now = timer.monotonicNow();
+
+    for (Map.Entry<String, ConcurrentMap<String, Long>> entry :
+        allReports.entrySet()) {
+      SortedSet<String> validReports = filterNodeReports(entry.getValue(), now);
+      if (!validReports.isEmpty()) {
+        allNodesValidReports.put(entry.getKey(), validReports);
+      }
+    }
+    return allNodesValidReports;
+  }
+
+  /**
+   * Filter the given reports to return just the valid ones.
+   *
+   * @param reports
+   * @param now
+   * @return
+   */
+  private SortedSet<String> filterNodeReports(
+      ConcurrentMap<String, Long> reports, long now) {
+    final SortedSet<String> validReports = new TreeSet<>();
+
+    for (Map.Entry<String, Long> entry : reports.entrySet()) {
+      if (now - entry.getValue() < reportValidityMs) {
+        validReports.add(entry.getKey());
+      }
+    }
+    return validReports;
+  }
+
+  /**
+   * Retrieve all valid reports as a JSON string.
+   * @return serialized representation of valid reports. null if
+   *         serialization failed.
+   */
+  public String getJson() {
+    Collection<ReportForJson> validReports = getJsonReports(
+        MAX_NODES_TO_REPORT);
+    ObjectMapper objectMapper = new ObjectMapper();
+    try {
+      return objectMapper.writeValueAsString(validReports);
+    } catch (JsonProcessingException e) {
+      // Failed to serialize. Don't log the exception call stack.
+      LOG.debug("Failed to serialize statistics" + e);
+      return null;
+    }
+  }
+
+  /**
+   * This structure is a thin wrapper over reports to make Json
+   * [de]serialization easy.
+   */
+  public static class ReportForJson {
+    @JsonProperty("SlowNode")
+    final private String slowNode;
+
+    @JsonProperty("ReportingNodes")
+    final private SortedSet<String> reportingNodes;
+
+    public ReportForJson(
+        @JsonProperty("SlowNode") String slowNode,
+        @JsonProperty("ReportingNodes") SortedSet<String> reportingNodes) {
+      this.slowNode = slowNode;
+      this.reportingNodes = reportingNodes;
+    }
+
+    public String getSlowNode() {
+      return slowNode;
+    }
+
+    public SortedSet<String> getReportingNodes() {
+      return reportingNodes;
+    }
+  }
+
+  /**
+   * Retrieve reports in a structure for generating JSON, limiting the
+   * output to the top numNodes nodes i.e nodes with the most reports.
+   * @param numNodes number of nodes to return. This is to limit the
+   *                 size of the generated JSON.
+   */
+  private Collection<ReportForJson> getJsonReports(int numNodes) {
+    if (allReports.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    final PriorityQueue<ReportForJson> topNReports =
+        new PriorityQueue<>(allReports.size(),
+            new Comparator<ReportForJson>() {
+          @Override
+          public int compare(ReportForJson o1, ReportForJson o2) {
+            return Ints.compare(o1.reportingNodes.size(),
+                o2.reportingNodes.size());
+          }
+        });
+
+    final long now = timer.monotonicNow();
+
+    for (Map.Entry<String, ConcurrentMap<String, Long>> entry :
+        allReports.entrySet()) {
+      SortedSet<String> validReports = filterNodeReports(
+          entry.getValue(), now);
+      if (!validReports.isEmpty()) {
+        if (topNReports.size() < numNodes) {
+          topNReports.add(new ReportForJson(entry.getKey(), validReports));
+        } else if (topNReports.peek().getReportingNodes().size() <
+            validReports.size()){
+          // Remove the lowest element
+          topNReports.poll();
+          topNReports.add(new ReportForJson(entry.getKey(), validReports));
+        }
+      }
+    }
+    return topNReports;
+  }
+
+  @VisibleForTesting
+  long getReportValidityMs() {
+    return reportValidityMs;
+  }
+}

+ 33 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

@@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
@@ -127,7 +128,8 @@ class BPServiceActor implements Runnable {
     this.ibrManager = new IncrementalBlockReportManager(dnConf.ibrInterval);
     prevBlockReportId = ThreadLocalRandom.current().nextLong();
     scheduler = new Scheduler(dnConf.heartBeatInterval,
-        dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval);
+        dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval,
+        dnConf.slowPeersReportIntervalMs);
     // get the value of maxDataLength.
     this.maxDataLength = dnConf.getMaxDataLength();
   }
@@ -489,12 +491,18 @@ class BPServiceActor implements Runnable {
                 " storage reports from service actor: " + this);
     }
     
-    scheduler.updateLastHeartbeatTime(monotonicNow());
+    final long now = monotonicNow();
+    scheduler.updateLastHeartbeatTime(now);
     VolumeFailureSummary volumeFailureSummary = dn.getFSDataset()
         .getVolumeFailureSummary();
     int numFailedVolumes = volumeFailureSummary != null ?
         volumeFailureSummary.getFailedStorageLocations().length : 0;
-    return bpNamenode.sendHeartbeat(bpRegistration,
+    final boolean slowPeersReportDue = scheduler.isSlowPeersReportDue(now);
+    final SlowPeerReports slowPeers =
+        slowPeersReportDue && dn.getPeerMetrics() != null ?
+            SlowPeerReports.create(dn.getPeerMetrics().getOutliers()) :
+            SlowPeerReports.EMPTY_REPORT;
+    HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
         reports,
         dn.getFSDataset().getCacheCapacity(),
         dn.getFSDataset().getCacheUsed(),
@@ -502,7 +510,14 @@ class BPServiceActor implements Runnable {
         dn.getXceiverCount(),
         numFailedVolumes,
         volumeFailureSummary,
-        requestBlockReportLease);
+        requestBlockReportLease,
+        slowPeers);
+
+    if (slowPeersReportDue) {
+      // If the report was due and successfully sent, schedule the next one.
+      scheduler.scheduleNextSlowPeerReport();
+    }
+    return response;
   }
 
   @VisibleForTesting
@@ -1079,18 +1094,23 @@ class BPServiceActor implements Runnable {
     @VisibleForTesting
     boolean resetBlockReportTime = true;
 
+    @VisibleForTesting
+    volatile long nextSlowPeersReportTime = monotonicNow();
+
     private final AtomicBoolean forceFullBlockReport =
         new AtomicBoolean(false);
 
     private final long heartbeatIntervalMs;
     private final long lifelineIntervalMs;
     private final long blockReportIntervalMs;
+    private final long slowPeersReportIntervalMs;
 
     Scheduler(long heartbeatIntervalMs, long lifelineIntervalMs,
-        long blockReportIntervalMs) {
+              long blockReportIntervalMs, long slowPeersReportIntervalMs) {
       this.heartbeatIntervalMs = heartbeatIntervalMs;
       this.lifelineIntervalMs = lifelineIntervalMs;
       this.blockReportIntervalMs = blockReportIntervalMs;
+      this.slowPeersReportIntervalMs = slowPeersReportIntervalMs;
       scheduleNextLifeline(nextHeartbeatTime);
     }
 
@@ -1123,6 +1143,10 @@ class BPServiceActor implements Runnable {
       lastBlockReportTime = blockReportTime;
     }
 
+    void scheduleNextSlowPeerReport() {
+      nextSlowPeersReportTime = monotonicNow() + slowPeersReportIntervalMs;
+    }
+
     long getLastHearbeatTime() {
       return (monotonicNow() - lastHeartbeatTime)/1000;
     }
@@ -1149,6 +1173,10 @@ class BPServiceActor implements Runnable {
       return nextBlockReportTime - curTime <= 0;
     }
 
+    boolean isSlowPeersReportDue(long curTime) {
+      return nextSlowPeersReportTime - curTime <= 0;
+    }
+
     void forceFullBlockReportNow() {
       forceFullBlockReport.set(true);
       resetBlockReportTime = true;

+ 11 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
@@ -93,7 +94,7 @@ class BlockReceiver implements Closeable {
   protected final String inAddr;
   protected final String myAddr;
   private String mirrorAddr;
-  private String bracketedMirrorAddr;
+  private String mirrorNameForMetrics;
   private DataOutputStream mirrorOut;
   private Daemon responder = null;
   private DataTransferThrottler throttler;
@@ -843,10 +844,9 @@ class BlockReceiver implements Closeable {
    * </p>
    */
   private void trackSendPacketToLastNodeInPipeline(final long elapsedMs) {
-    if (isPenultimateNode && mirrorAddr != null) {
-      datanode.getPeerMetrics().addSendPacketDownstream(
-          bracketedMirrorAddr,
-          elapsedMs);
+    final DataNodePeerMetrics peerMetrics = datanode.getPeerMetrics();
+    if (peerMetrics != null && isPenultimateNode) {
+      peerMetrics.addSendPacketDownstream(mirrorNameForMetrics, elapsedMs);
     }
   }
 
@@ -927,8 +927,13 @@ class BlockReceiver implements Closeable {
     boolean responderClosed = false;
     mirrorOut = mirrOut;
     mirrorAddr = mirrAddr;
-    bracketedMirrorAddr = "[" + mirrAddr + "]";
     isPenultimateNode = ((downstreams != null) && (downstreams.length == 1));
+    if (isPenultimateNode) {
+      mirrorNameForMetrics = (downstreams[0].getInfoSecurePort() != 0 ?
+          downstreams[0].getInfoSecureAddr() : downstreams[0].getInfoAddr());
+      LOG.debug("Will collect peer metrics for downstream node {}",
+          mirrorNameForMetrics);
+    }
     throttler = throttlerArg;
 
     this.replyOut = replyOut;

+ 11 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java

@@ -30,6 +30,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
@@ -94,6 +96,8 @@ public class DNConf {
   private final long lifelineIntervalMs;
   final long blockReportInterval;
   final long blockReportSplitThreshold;
+  final boolean peerStatsEnabled;
+  final long slowPeersReportIntervalMs;
   final long ibrInterval;
   final long initialBlockReportDelayMs;
   final long cacheReportInterval;
@@ -173,6 +177,13 @@ public class DNConf {
     this.blockReportInterval = getConf().getLong(
         DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
         DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
+    this.peerStatsEnabled = getConf().getBoolean(
+        DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
+        DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT);
+    this.slowPeersReportIntervalMs = getConf().getTimeDuration(
+        DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY,
+        DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
     this.ibrInterval = getConf().getLong(
         DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY,
         DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_DEFAULT);

+ 6 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -334,6 +334,7 @@ public class DataNode extends ReconfigurableBase
   private int infoSecurePort;
 
   DataNodeMetrics metrics;
+  @Nullable
   private DataNodePeerMetrics peerMetrics;
   private InetSocketAddress streamingAddr;
   
@@ -422,6 +423,7 @@ public class DataNode extends ReconfigurableBase
     this.blockScanner = new BlockScanner(this, this.getConf());
     this.pipelineSupportECN = false;
     this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
+    this.dnConf = new DNConf(this);
     initOOBTimeout();
     storageLocationChecker = null;
     volumeChecker = new DatasetVolumeChecker(conf, new Timer());
@@ -1363,7 +1365,8 @@ public class DataNode extends ReconfigurableBase
     initIpcServer();
 
     metrics = DataNodeMetrics.create(getConf(), getDisplayName());
-    peerMetrics = DataNodePeerMetrics.create(getConf(), getDisplayName());
+    peerMetrics = dnConf.peerStatsEnabled ?
+        DataNodePeerMetrics.create(getConf(), getDisplayName()) : null;
     metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
 
     ecWorker = new ErasureCodingWorker(getConf(), this);
@@ -3456,6 +3459,7 @@ public class DataNode extends ReconfigurableBase
 
   @Override // DataNodeMXBean
   public String getSendPacketDownstreamAvgInfo() {
-    return peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson();
+    return peerMetrics != null ?
+        peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson() : null;
   }
 }

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -341,7 +341,9 @@ class DataXceiver extends Receiver implements Runnable {
    * the thread dies away.
    */
   private void collectThreadLocalStates() {
-    datanode.getPeerMetrics().collectThreadLocalStates();
+    if (datanode.getPeerMetrics() != null) {
+      datanode.getPeerMetrics().collectThreadLocalStates();
+    }
   }
 
   @Override

+ 54 - 25
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java

@@ -18,40 +18,59 @@
 
 package org.apache.hadoop.hdfs.server.datanode.metrics;
 
-import java.util.concurrent.ThreadLocalRandom;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.metrics2.MetricsJsonBuilder;
 import org.apache.hadoop.metrics2.lib.RollingAverages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 
 /**
  * This class maintains DataNode peer metrics (e.g. numOps, AvgTime, etc.) for
  * various peer operations.
  */
 @InterfaceAudience.Private
+@InterfaceStability.Unstable
 public class DataNodePeerMetrics {
 
-  static final Log LOG = LogFactory.getLog(DataNodePeerMetrics.class);
+  public static final Logger LOG = LoggerFactory.getLogger(
+      DataNodePeerMetrics.class);
 
   private final RollingAverages sendPacketDownstreamRollingAvgerages;
 
   private final String name;
-  private final boolean peerStatsEnabled;
+
+  /**
+   * Threshold in milliseconds below which a DataNode is definitely not slow.
+   */
+  private static final long LOW_THRESHOLD_MS = 5;
+
+  private final SlowNodeDetector slowNodeDetector;
+
+  /**
+   * Minimum number of packet send samples which are required to qualify
+   * for outlier detection. If the number of samples is below this then
+   * outlier detection is skipped.
+   */
+  @VisibleForTesting
+  static final long MIN_OUTLIER_DETECTION_SAMPLES = 1000;
 
   public DataNodePeerMetrics(
       final String name,
-      final int windowSize,
-      final int numWindows,
-      final boolean peerStatsEnabled) {
+      final long windowSizeMs,
+      final int numWindows) {
     this.name = name;
-    this.peerStatsEnabled = peerStatsEnabled;
+    this.slowNodeDetector = new SlowNodeDetector(LOW_THRESHOLD_MS);
     sendPacketDownstreamRollingAvgerages = new RollingAverages(
-        windowSize,
-        numWindows);
+        windowSizeMs, numWindows);
   }
 
   public String name() {
@@ -66,21 +85,18 @@ public class DataNodePeerMetrics {
         ? "UndefinedDataNodeName" + ThreadLocalRandom.current().nextInt()
         : dnName.replace(':', '-'));
 
-    final int windowSize = conf.getInt(
-            DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY,
-            DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_DEFAULT);
+    final long windowSizeMs = conf.getTimeDuration(
+            DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY,
+            DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_DEFAULT,
+            TimeUnit.MILLISECONDS);
     final int numWindows = conf.getInt(
-        DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY,
-        DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_DEFAULT);
-    final boolean peerStatsEnabled = conf.getBoolean(
-        DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
-        DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT);
+        DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY,
+        DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_DEFAULT);
 
     return new DataNodePeerMetrics(
         name,
-        windowSize,
-        numWindows,
-        peerStatsEnabled);
+        windowSizeMs,
+        numWindows);
   }
 
   /**
@@ -94,9 +110,7 @@ public class DataNodePeerMetrics {
   public void addSendPacketDownstream(
       final String peerAddr,
       final long elapsedMs) {
-    if (peerStatsEnabled) {
-      sendPacketDownstreamRollingAvgerages.add(peerAddr, elapsedMs);
-    }
+    sendPacketDownstreamRollingAvgerages.add(peerAddr, elapsedMs);
   }
 
   /**
@@ -114,4 +128,19 @@ public class DataNodePeerMetrics {
   public void collectThreadLocalStates() {
     sendPacketDownstreamRollingAvgerages.collectThreadLocalStates();
   }
+
+  /**
+   * Retrieve the set of dataNodes that look significantly slower
+   * than their peers.
+   */
+  public Map<String, Double> getOutliers() {
+    // This maps the metric name to the aggregate latency.
+    // The metric name is the datanode ID.
+    final Map<String, Double> stats =
+        sendPacketDownstreamRollingAvgerages.getStats(
+            MIN_OUTLIER_DETECTION_SAMPLES);
+    LOG.trace("DataNodePeerMetrics: Got stats: {}", stats);
+
+    return slowNodeDetector.getOutliers(stats);
+  }
 }

+ 194 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/SlowNodeDetector.java

@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.metrics;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * A utility class to help detect nodes whose aggregate latency
+ * is an outlier within a given set.
+ *
+ * We use the median absolute deviation for outlier detection as
+ * described in the following publication:
+ *
+ * Leys, C., et al., Detecting outliers: Do not use standard deviation
+ * around the mean, use absolute deviation around the median.
+ * http://dx.doi.org/10.1016/j.jesp.2013.03.013
+ *
+ * We augment the above scheme with the following heuristics to be even
+ * more conservative:
+ *
+ *  1. Skip outlier detection if the sample size is too small.
+ *  2. Never flag nodes whose aggregate latency is below a low threshold.
+ *  3. Never flag nodes whose aggregate latency is less than a small
+ *     multiple of the median.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class SlowNodeDetector {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SlowNodeDetector.class);
+
+  /**
+   * Minimum number of peers to run outlier detection.
+   */
+  private static long minOutlierDetectionPeers = 10;
+
+  /**
+   * The multiplier is from Leys, C. et al.
+   */
+  private static final double MAD_MULTIPLIER = (double) 1.4826;
+
+  /**
+   * Threshold in milliseconds below which a DataNode is definitely not slow.
+   */
+  private final long lowThresholdMs;
+
+  /**
+   * Deviation multiplier. A sample is considered to be an outlier if it
+   * exceeds the median by (multiplier * median abs. deviation). 3 is a
+   * conservative choice.
+   */
+  private static final int DEVIATION_MULTIPLIER = 3;
+
+  /**
+   * If most of the samples are clustered together, the MAD can be
+   * low. The median multiplier introduces another safeguard to avoid
+   * overaggressive outlier detection.
+   */
+  @VisibleForTesting
+  static final int MEDIAN_MULTIPLIER = 3;
+
+  public SlowNodeDetector(long lowThresholdMs) {
+    this.lowThresholdMs = lowThresholdMs;
+  }
+
+  /**
+   * Return a set of DataNodes whose latency is much higher than
+   * their peers. The input is a map of (node -> aggregate latency)
+   * entries.
+   *
+   * The aggregate may be an arithmetic mean or a percentile e.g.
+   * 90th percentile. Percentiles are a better choice than median
+   * since latency is usually not a normal distribution.
+   *
+   * This method allocates temporary memory O(n) and
+   * has run time O(n.log(n)), where n = stats.size().
+   *
+   * @return
+   */
+  public Map<String, Double> getOutliers(Map<String, Double> stats) {
+    if (stats.size() < minOutlierDetectionPeers) {
+      LOG.debug("Skipping statistical outlier detection as we don't have " +
+              "latency data for enough peers. Have {}, need at least {}",
+          stats.size(), minOutlierDetectionPeers);
+      return ImmutableMap.of();
+    }
+    // Compute the median absolute deviation of the aggregates.
+    final List<Double> sorted = new ArrayList<>(stats.values());
+    Collections.sort(sorted);
+    final Double median = computeMedian(sorted);
+    final Double mad = computeMad(sorted);
+    Double upperLimitLatency = Math.max(
+        lowThresholdMs, median * MEDIAN_MULTIPLIER);
+    upperLimitLatency = Math.max(
+        upperLimitLatency, median + (DEVIATION_MULTIPLIER * mad));
+
+    final Map<String, Double> slowNodes = new HashMap<>();
+
+    LOG.trace("getOutliers: List={}, MedianLatency={}, " +
+        "MedianAbsoluteDeviation={}, upperLimitLatency={}",
+        sorted, median, mad, upperLimitLatency);
+
+    // Find nodes whose latency exceeds the threshold.
+    for (Map.Entry<String, Double> entry : stats.entrySet()) {
+      if (entry.getValue() > upperLimitLatency) {
+        slowNodes.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    return slowNodes;
+  }
+
+  /**
+   * Compute the Median Absolute Deviation of a sorted list.
+   */
+  public static Double computeMad(List<Double> sortedValues) {
+    if (sortedValues.size() == 0) {
+      throw new IllegalArgumentException(
+          "Cannot compute the Median Absolute Deviation " +
+              "of an empty list.");
+    }
+
+    // First get the median of the values.
+    Double median = computeMedian(sortedValues);
+    List<Double> deviations = new ArrayList<>(sortedValues);
+
+    // Then update the list to store deviation from the median.
+    for (int i = 0; i < sortedValues.size(); ++i) {
+      deviations.set(i, Math.abs(sortedValues.get(i) - median));
+    }
+
+    // Finally get the median absolute deviation.
+    Collections.sort(deviations);
+    return computeMedian(deviations) * MAD_MULTIPLIER;
+  }
+
+  /**
+   * Compute the median of a sorted list.
+   */
+  public static Double computeMedian(List<Double> sortedValues) {
+    if (sortedValues.size() == 0) {
+      throw new IllegalArgumentException(
+          "Cannot compute the median of an empty list.");
+    }
+
+    Double median = sortedValues.get(sortedValues.size() / 2);
+    if (sortedValues.size() % 2 == 0) {
+      median += sortedValues.get((sortedValues.size() / 2) - 1);
+      median /= 2;
+    }
+    return median;
+  }
+
+  /**
+   * This method *must not* be used outside of unit tests.
+   */
+  @VisibleForTesting
+  static void setMinOutlierDetectionPeers(long minOutlierDetectionPeers) {
+    SlowNodeDetector.minOutlierDetectionPeers = minOutlierDetectionPeers;
+  }
+
+  @VisibleForTesting
+  static long getMinOutlierDetectionPeers() {
+    return minOutlierDetectionPeers;
+  }
+}

+ 6 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -129,6 +129,7 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import javax.annotation.Nonnull;
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
@@ -255,6 +256,7 @@ import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
@@ -3639,7 +3641,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       StorageReport[] reports, long cacheCapacity, long cacheUsed,
       int xceiverCount, int xmitsInProgress, int failedVolumes,
       VolumeFailureSummary volumeFailureSummary,
-      boolean requestFullBlockReportLease) throws IOException {
+      boolean requestFullBlockReportLease,
+      @Nonnull SlowPeerReports slowPeers) throws IOException {
     readLock();
     try {
       //get datanode commands
@@ -3647,7 +3650,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           - xmitsInProgress;
       DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
           nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
-          xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary);
+          xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary,
+          slowPeers);
       long blockReportLeaseId = 0;
       if (requestFullBlockReportLease) {
         blockReportLeaseId =  blockManager.requestBlockReportLeaseId(nodeReg);

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -1822,6 +1822,12 @@ public class NameNode extends ReconfigurableBase implements
     return getNamesystem().getBytesInFuture();
   }
 
+  @Override
+  public String getSlowPeersReport() {
+    return namesystem.getBlockManager().getDatanodeManager()
+        .getSlowPeersReport();
+  }
+
   /**
    * Shutdown the NN immediately in an ungraceful way. Used when it would be
    * unsafe for the NN to continue operating, e.g. during a failed HA state

+ 7 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -155,6 +155,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -205,6 +206,8 @@ import org.slf4j.Logger;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.BlockingService;
 
+import javax.annotation.Nonnull;
+
 /**
  * This class is responsible for handling all of the RPC calls to the NameNode.
  * It is created, started, and stopped by {@link NameNode}.
@@ -1418,12 +1421,14 @@ public class NameNodeRpcServer implements NamenodeProtocols {
       StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
       int xmitsInProgress, int xceiverCount,
       int failedVolumes, VolumeFailureSummary volumeFailureSummary,
-      boolean requestFullBlockReportLease) throws IOException {
+      boolean requestFullBlockReportLease,
+      @Nonnull SlowPeerReports slowPeers) throws IOException {
     checkNNStartup();
     verifyRequest(nodeReg);
     return namesystem.handleHeartbeat(nodeReg, report,
         dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
-        failedVolumes, volumeFailureSummary, requestFullBlockReportLease);
+        failedVolumes, volumeFailureSummary, requestFullBlockReportLease,
+        slowPeers);
   }
 
   @Override // DatanodeProtocol

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeStatusMXBean.java

@@ -69,4 +69,10 @@ public interface NameNodeStatusMXBean {
    * @return number of bytes that can be deleted if exited from safe mode.
    */
   long getBytesWithFutureGenerationStamps();
+
+  /**
+   * Retrieves information about slow DataNodes, if the feature is
+   * enabled. The report is in a JSON format.
+   */
+  String getSlowPeersReport();
 }

+ 7 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java

@@ -29,6 +29,8 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.io.retry.Idempotent;
 import org.apache.hadoop.security.KerberosInfo;
 
+import javax.annotation.Nonnull;
+
 /**********************************************************************
  * Protocol that a DFS datanode uses to communicate with the NameNode.
  * It's used to upload current load information and block reports.
@@ -105,6 +107,9 @@ public interface DatanodeProtocol {
    * @param volumeFailureSummary info about volume failures
    * @param requestFullBlockReportLease whether to request a full block
    *                                    report lease.
+   * @param slowPeers Details of peer DataNodes that were detected as being
+   *                  slow to respond to packet writes. Empty report if no
+   *                  slow peers were detected by the DataNode.
    * @throws IOException on error
    */
   @Idempotent
@@ -116,7 +121,8 @@ public interface DatanodeProtocol {
                                        int xceiverCount,
                                        int failedVolumes,
                                        VolumeFailureSummary volumeFailureSummary,
-                                       boolean requestFullBlockReportLease)
+                                       boolean requestFullBlockReportLease,
+                                       @Nonnull SlowPeerReports slowPeers)
       throws IOException;
 
   /**

+ 20 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto

@@ -195,6 +195,7 @@ message VolumeFailureSummaryProto {
  * cacheCapacity - total cache capacity available at the datanode
  * cacheUsed - amount of cache used
  * volumeFailureSummary - info about volume failures
+ * slowPeers - info about peer DataNodes that are suspected to be slow.
  */
 message HeartbeatRequestProto {
   required DatanodeRegistrationProto registration = 1; // Datanode info
@@ -206,6 +207,7 @@ message HeartbeatRequestProto {
   optional uint64 cacheUsed = 7 [default = 0 ];
   optional VolumeFailureSummaryProto volumeFailureSummary = 8;
   optional bool requestFullBlockReportLease = 9 [ default = false ];
+  repeated SlowPeerReportProto slowPeers = 10;
 }
 
 /**
@@ -385,6 +387,24 @@ message CommitBlockSynchronizationRequestProto {
 message CommitBlockSynchronizationResponseProto {
 }
 
+/**
+ * Information about a single slow peer that may be reported by
+ * the DataNode to the NameNode as part of the heartbeat request.
+ * The message includes the peer's DataNodeId and its
+ * aggregate packet latency as observed by the reporting DataNode.
+ * (DataNodeId must be transmitted as a string for protocol compability
+ *  with earlier versions of Hadoop).
+ *
+ * The exact choice of the aggregate is opaque to the NameNode but it
+ * _should_ be chosen consistenly by all DataNodes in the cluster.
+ * Examples of aggregates are 90th percentile (good) and mean (not so
+ * good).
+ */
+message SlowPeerReportProto {
+  optional string dataNodeId = 1;
+  optional double aggregateLatency = 2;
+}
+
 /**
  * Protocol used from datanode to the namenode
  * See the request and response for details of rpc call.

+ 7 - 11
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -1980,19 +1980,15 @@
 </property>
 
 <property>
-  <name>dfs.metrics.rolling.average.window.size</name>
-  <value>3600</value>
+  <name>dfs.datanode.slow.peers.report.interval</name>
+  <value>30m</value>
   <description>
-    The number of seconds of each window for which sub set of samples are gathered
-    to compute the rolling average, A.K.A. roll over interval.
-  </description>
-</property>
+    This setting controls how frequently DataNodes will report their peer
+    latencies to the NameNode via heartbeats.  This setting supports
+    multiple time unit suffixes as described in dfs.heartbeat.interval.
+    If no suffix is specified then milliseconds is assumed.
 
-<property>
-  <name>dfs.metrics.rolling.average.window.numbers</name>
-  <value>48</value>
-  <description>
-    The number of windows maintained to compute the rolling average.
+    It is ignored if dfs.datanode.peer.stats.enabled is false.
   </description>
 </property>
 

+ 22 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java

@@ -28,6 +28,7 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclEntryScope;
 import org.apache.hadoop.fs.permission.AclEntryType;
@@ -92,6 +93,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
@@ -770,6 +772,26 @@ public class TestPBHelper {
     assertEquals(dnInfos0.getNonDfsUsed(), dnInfos3.getNonDfsUsed());
   }
 
+  @Test
+  public void testSlowPeerInfoPBHelper() {
+    // Test with a map that has a few slow peer entries.
+    final SlowPeerReports slowPeers = SlowPeerReports.create(
+        ImmutableMap.of("peer1", 0.0, "peer2", 1.0, "peer3", 2.0));
+    SlowPeerReports slowPeersConverted1 = PBHelper.convertSlowPeerInfo(
+        PBHelper.convertSlowPeerInfo(slowPeers));
+    assertTrue(
+        "Expected map:" + slowPeers + ", got map:" +
+            slowPeersConverted1.getSlowPeers(),
+        slowPeersConverted1.equals(slowPeers));
+
+    // Test with an empty map.
+    SlowPeerReports slowPeersConverted2 = PBHelper.convertSlowPeerInfo(
+        PBHelper.convertSlowPeerInfo(SlowPeerReports.EMPTY_REPORT));
+    assertTrue(
+        "Expected empty map:" + ", got map:" + slowPeersConverted2,
+        slowPeersConverted2.equals(SlowPeerReports.EMPTY_REPORT));
+  }
+
   private void assertBlockECRecoveryInfoEquals(
       BlockECReconstructionInfo blkECRecoveryInfo1,
       BlockECReconstructionInfo blkECRecoveryInfo2) {

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java

@@ -42,13 +42,23 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 import org.mockito.Mockito;
 
 /**
  * Test if FSNamesystem handles heartbeat right
  */
 public class TestHeartbeatHandling {
+
+
+  /**
+   * Set a timeout for every test case.
+   */
+  @Rule
+  public Timeout testTimeout = new Timeout(300_000);
+
   /**
    * Test if
    * {@link FSNamesystem#handleHeartbeat}

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 import com.google.common.base.Supplier;
 import java.util.ArrayList;
 import java.util.Collection;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -40,6 +41,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeRef
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
@@ -112,7 +114,7 @@ public class TestNameNodePrunesMissingStorages {
       // Stop the DataNode and send fake heartbeat with missing storage.
       cluster.stopDataNode(0);
       cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0,
-          0, null, true);
+          0, null, true, SlowPeerReports.EMPTY_REPORT);
 
       // Check that the missing storage was pruned.
       assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest));

+ 226 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowPeerTracker.java

@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.blockmanagement.SlowPeerTracker.ReportForJson;
+import org.apache.hadoop.util.FakeTimer;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Set;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Tests for {@link SlowPeerTracker}.
+ */
+public class TestSlowPeerTracker {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      TestSlowPeerTracker.class);
+
+  /**
+   * Set a timeout for every test case.
+   */
+  @Rule
+  public Timeout testTimeout = new Timeout(300_000);
+
+  private Configuration conf;
+  private SlowPeerTracker tracker;
+  private FakeTimer timer;
+  private long reportValidityMs;
+
+  @Before
+  public void setup() {
+    conf = new HdfsConfiguration();
+    timer = new FakeTimer();
+    tracker = new SlowPeerTracker(conf, timer);
+    reportValidityMs = tracker.getReportValidityMs();
+  }
+
+  /**
+   * Edge case, there are no reports to retrieve.
+   */
+  @Test
+  public void testEmptyReports() {
+    assertTrue(tracker.getReportsForAllDataNodes().isEmpty());
+    assertTrue(tracker.getReportsForNode("noSuchNode").isEmpty());
+  }
+
+  @Test
+  public void testReportsAreRetrieved() {
+    tracker.addReport("node2", "node1");
+    tracker.addReport("node3", "node1");
+    tracker.addReport("node3", "node2");
+
+    assertThat(tracker.getReportsForAllDataNodes().size(), is(2));
+    assertThat(tracker.getReportsForNode("node2").size(), is(1));
+    assertThat(tracker.getReportsForNode("node3").size(), is(2));
+    assertThat(tracker.getReportsForNode("node1").size(), is(0));
+  }
+
+  /**
+   * Test that when all reports are expired, we get back nothing.
+   */
+  @Test
+  public void testAllReportsAreExpired() {
+    tracker.addReport("node2", "node1");
+    tracker.addReport("node3", "node2");
+    tracker.addReport("node1", "node3");
+
+    // No reports should expire after 1ms.
+    timer.advance(1);
+    assertThat(tracker.getReportsForAllDataNodes().size(), is(3));
+
+    // All reports should expire after REPORT_VALIDITY_MS.
+    timer.advance(reportValidityMs);
+    assertTrue(tracker.getReportsForAllDataNodes().isEmpty());
+    assertTrue(tracker.getReportsForNode("node1").isEmpty());
+    assertTrue(tracker.getReportsForNode("node2").isEmpty());
+    assertTrue(tracker.getReportsForNode("node3").isEmpty());
+  }
+
+  /**
+   * Test the case when a subset of reports has expired.
+   * Ensure that we only get back non-expired reports.
+   */
+  @Test
+  public void testSomeReportsAreExpired() {
+    tracker.addReport("node3", "node1");
+    tracker.addReport("node3", "node2");
+    timer.advance(reportValidityMs);
+    tracker.addReport("node3", "node4");
+    assertThat(tracker.getReportsForAllDataNodes().size(), is(1));
+    assertThat(tracker.getReportsForNode("node3").size(), is(1));
+    assertTrue(tracker.getReportsForNode("node3").contains("node4"));
+  }
+
+  /**
+   * Test the case when an expired report is replaced by a valid one.
+   */
+  @Test
+  public void testReplacement() {
+    tracker.addReport("node2", "node1");
+    timer.advance(reportValidityMs); // Expire the report.
+    assertThat(tracker.getReportsForAllDataNodes().size(), is(0));
+
+    // This should replace the expired report with a newer valid one.
+    tracker.addReport("node2", "node1");
+    assertThat(tracker.getReportsForAllDataNodes().size(), is(1));
+    assertThat(tracker.getReportsForNode("node2").size(), is(1));
+  }
+
+  @Test
+  public void testGetJson() throws IOException {
+    tracker.addReport("node1", "node2");
+    tracker.addReport("node2", "node3");
+    tracker.addReport("node2", "node1");
+    tracker.addReport("node4", "node1");
+
+    final Set<ReportForJson> reports = getAndDeserializeJson();
+
+    // And ensure its contents are what we expect.
+    assertThat(reports.size(), is(3));
+    assertTrue(isNodeInReports(reports, "node1"));
+    assertTrue(isNodeInReports(reports, "node2"));
+    assertTrue(isNodeInReports(reports, "node4"));
+
+    assertFalse(isNodeInReports(reports, "node3"));
+  }
+
+  @Test
+  public void testGetJsonSizeIsLimited() throws IOException {
+    tracker.addReport("node1", "node2");
+    tracker.addReport("node1", "node3");
+    tracker.addReport("node2", "node3");
+    tracker.addReport("node2", "node4");
+    tracker.addReport("node3", "node4");
+    tracker.addReport("node3", "node5");
+    tracker.addReport("node4", "node6");
+    tracker.addReport("node5", "node6");
+    tracker.addReport("node5", "node7");
+    tracker.addReport("node6", "node7");
+    tracker.addReport("node6", "node8");
+
+    final Set<ReportForJson> reports = getAndDeserializeJson();
+
+    // Ensure that node4 is not in the list since it was
+    // tagged by just one peer and we already have 5 other nodes.
+    assertFalse(isNodeInReports(reports, "node4"));
+
+    // Remaining nodes should be in the list.
+    assertTrue(isNodeInReports(reports, "node1"));
+    assertTrue(isNodeInReports(reports, "node2"));
+    assertTrue(isNodeInReports(reports, "node3"));
+    assertTrue(isNodeInReports(reports, "node5"));
+    assertTrue(isNodeInReports(reports, "node6"));
+  }
+
+  @Test
+  public void testLowRankedElementsIgnored() throws IOException {
+    // Insert 5 nodes with 2 peer reports each.
+    for (int i = 0; i < 5; ++i) {
+      tracker.addReport("node" + i, "reporter1");
+      tracker.addReport("node" + i, "reporter2");
+    }
+
+    // Insert 10 nodes with 1 peer report each.
+    for (int i = 10; i < 20; ++i) {
+      tracker.addReport("node" + i, "reporter1");
+    }
+
+    final Set<ReportForJson> reports = getAndDeserializeJson();
+
+    // Ensure that only the first 5 nodes with two reports each were
+    // included in the JSON.
+    for (int i = 0; i < 5; ++i) {
+      assertTrue(isNodeInReports(reports, "node" + i));
+    }
+  }
+
+  private boolean isNodeInReports(
+      Set<ReportForJson> reports, String node) {
+    for (ReportForJson report : reports) {
+      if (report.getSlowNode().equalsIgnoreCase(node)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private Set<ReportForJson> getAndDeserializeJson()
+      throws IOException {
+    final String json = tracker.getJson();
+    LOG.info("Got JSON: {}", json);
+    return (new ObjectMapper()).readValue(
+        json, new TypeReference<Set<ReportForJson>>() {});
+  }
+}

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.junit.Assert;
@@ -136,7 +137,8 @@ public class InternalDataNodeTestUtils {
             Mockito.any(StorageReport[].class), Mockito.anyLong(),
             Mockito.anyLong(), Mockito.anyInt(), Mockito.anyInt(),
             Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class),
-            Mockito.anyBoolean())).thenReturn(
+            Mockito.anyBoolean(),
+            Mockito.any(SlowPeerReports.class))).thenReturn(
         new HeartbeatResponse(new DatanodeCommand[0], new NNHAStatusHeartbeat(
             HAServiceState.ACTIVE, 1), null, ThreadLocalRandom.current()
             .nextLong() | 1L));

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java

@@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -119,7 +120,7 @@ public class TestBPOfferService {
     Mockito.doReturn(conf).when(mockDn).getConf();
     Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf();
     Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn"))
-    .when(mockDn).getMetrics();
+        .when(mockDn).getMetrics();
 
     // Set up a simulated dataset with our fake BP
     mockFSDataset = Mockito.spy(new SimulatedFSDataset(null, conf));
@@ -152,7 +153,8 @@ public class TestBPOfferService {
           Mockito.anyInt(),
           Mockito.anyInt(),
           Mockito.any(VolumeFailureSummary.class),
-          Mockito.anyBoolean());
+          Mockito.anyBoolean(),
+          Mockito.any(SlowPeerReports.class));
     mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
     datanodeCommands[nnIdx] = new DatanodeCommand[0];
     return mock;

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java

@@ -65,6 +65,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -217,7 +218,8 @@ public class TestBlockRecovery {
             Mockito.anyInt(),
             Mockito.anyInt(),
             Mockito.any(VolumeFailureSummary.class),
-            Mockito.anyBoolean()))
+            Mockito.anyBoolean(),
+            Mockito.any(SlowPeerReports.class)))
         .thenReturn(new HeartbeatResponse(
             new DatanodeCommand[0],
             new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),

+ 23 - 7
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java

@@ -20,10 +20,15 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.server.datanode.BPServiceActor.Scheduler;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
 import static java.lang.Math.abs;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertFalse;
@@ -31,11 +36,6 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
-import org.apache.hadoop.hdfs.server.datanode.BPServiceActor.Scheduler;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Random;
 
 
 /**
@@ -51,6 +51,7 @@ public class TestBpServiceActorScheduler {
   private static final long HEARTBEAT_INTERVAL_MS = 5000;      // 5 seconds
   private static final long LIFELINE_INTERVAL_MS = 3 * HEARTBEAT_INTERVAL_MS;
   private static final long BLOCK_REPORT_INTERVAL_MS = 10000;  // 10 seconds
+  private static final long SLOW_PEER_REPORT_INTERVAL_MS = 10000;  // 10 seconds
   private final Random random = new Random(System.nanoTime());
 
   @Test
@@ -180,13 +181,28 @@ public class TestBpServiceActorScheduler {
     }
   }
 
+  @Test
+  public void testSlowPeerReportScheduling() {
+    for (final long now : getTimestamps()) {
+      Scheduler scheduler = makeMockScheduler(now);
+      assertTrue(scheduler.isSlowPeersReportDue(now));
+      scheduler.scheduleNextSlowPeerReport();
+      assertFalse(scheduler.isSlowPeersReportDue(now));
+      assertFalse(scheduler.isSlowPeersReportDue(now + 1));
+      assertTrue(scheduler.isSlowPeersReportDue(
+          now + SLOW_PEER_REPORT_INTERVAL_MS));
+    }
+  }
+
   private Scheduler makeMockScheduler(long now) {
     LOG.info("Using now = " + now);
-    Scheduler mockScheduler = spy(new Scheduler(HEARTBEAT_INTERVAL_MS,
-        LIFELINE_INTERVAL_MS, BLOCK_REPORT_INTERVAL_MS));
+    Scheduler mockScheduler = spy(new Scheduler(
+        HEARTBEAT_INTERVAL_MS, LIFELINE_INTERVAL_MS,
+        BLOCK_REPORT_INTERVAL_MS, SLOW_PEER_REPORT_INTERVAL_MS));
     doReturn(now).when(mockScheduler).monotonicNow();
     mockScheduler.nextBlockReportTime = now;
     mockScheduler.nextHeartbeatTime = now;
+    mockScheduler.nextSlowPeersReportTime = now;
     return mockScheduler;
   }
 

+ 5 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java

@@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -167,7 +168,8 @@ public class TestDataNodeLifeline {
             anyInt(),
             anyInt(),
             any(VolumeFailureSummary.class),
-            anyBoolean());
+            anyBoolean(),
+            any(SlowPeerReports.class));
 
     // Intercept lifeline to trigger latch count-down on each call.
     doAnswer(new LatchCountingAnswer<Void>(lifelinesSent))
@@ -230,7 +232,8 @@ public class TestDataNodeLifeline {
             anyInt(),
             anyInt(),
             any(VolumeFailureSummary.class),
-            anyBoolean());
+            anyBoolean(),
+            any(SlowPeerReports.class));
 
     // While waiting on the latch for the expected number of heartbeat messages,
     // poll DataNode tracking information.  We expect that the DataNode always

+ 5 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -41,9 +42,10 @@ public class TestDataNodePeerMetrics {
     final int numOpsPerIteration = 1000;
 
     final Configuration conf = new HdfsConfiguration();
-    conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY,
-        windowSize);
-    conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY,
+    conf.setTimeDuration(
+        DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY,
+        windowSize, TimeUnit.SECONDS);
+    conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY,
         numWindows);
     conf.setBoolean(DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, true);
 

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java

@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
@@ -218,7 +219,8 @@ public class TestDatanodeProtocolRetryPolicy {
            Mockito.anyInt(),
            Mockito.anyInt(),
            Mockito.any(VolumeFailureSummary.class),
-           Mockito.anyBoolean());
+           Mockito.anyBoolean(),
+           Mockito.any(SlowPeerReports.class));
 
     dn = new DataNode(conf, locations, null, null) {
       @Override

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java

@@ -67,6 +67,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.io.IOUtils;
@@ -172,7 +173,7 @@ public class TestFsDatasetCache {
         (DatanodeRegistration) any(),
         (StorageReport[]) any(), anyLong(), anyLong(),
         anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(),
-        anyBoolean());
+        anyBoolean(), any(SlowPeerReports.class));
   }
 
   private static DatanodeCommand[] cacheBlock(HdfsBlockLocation loc) {

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.junit.After;
@@ -106,7 +107,8 @@ public class TestStorageReport {
         any(DatanodeRegistration.class),
         captor.capture(),
         anyLong(), anyLong(), anyInt(), anyInt(), anyInt(),
-        Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean());
+        Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(),
+        Mockito.any(SlowPeerReports.class));
 
     StorageReport[] reports = captor.getValue();
 

+ 142 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java

@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.metrics;
+
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Random;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Test that the {@link DataNodePeerMetrics} class is able to detect
+ * outliers i.e. slow nodes via the metrics it maintains.
+ */
+public class TestDataNodeOutlierDetectionViaMetrics {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestDataNodeOutlierDetectionViaMetrics.class);
+
+  /**
+   * Set a timeout for every test case.
+   */
+  @Rule
+  public Timeout testTimeout = new Timeout(300_000);
+
+  // A few constants to keep the test run time short.
+  private static final int WINDOW_INTERVAL_SECONDS = 3;
+  private static final int ROLLING_AVERAGE_WINDOWS = 10;
+  private static final int SLOW_NODE_LATENCY_MS = 20_000;
+  private static final int FAST_NODE_MAX_LATENCY_MS = 5;
+
+  private Random random = new Random(System.currentTimeMillis());
+
+  @Before
+  public void setup() {
+    GenericTestUtils.setLogLevel(DataNodePeerMetrics.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(SlowNodeDetector.LOG, Level.ALL);
+  }
+
+  /**
+   * Test that a very slow peer is detected as an outlier.
+   */
+  @Test
+  public void testOutlierIsDetected() throws Exception {
+    final String slowNodeName = "SlowNode";
+
+    DataNodePeerMetrics peerMetrics = new DataNodePeerMetrics(
+        "PeerMetrics-For-Test", WINDOW_INTERVAL_SECONDS,
+        ROLLING_AVERAGE_WINDOWS);
+
+    injectFastNodesSamples(peerMetrics);
+    injectSlowNodeSamples(peerMetrics, slowNodeName);
+
+    // Trigger a snapshot.
+    peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson();
+
+    final Map<String, Double> outliers = peerMetrics.getOutliers();
+    LOG.info("Got back outlier nodes: {}", outliers);
+    assertThat(outliers.size(), is(1));
+    assertTrue(outliers.containsKey(slowNodeName));
+  }
+
+  /**
+   * Test that when there are no outliers, we get back nothing.
+   */
+  @Test
+  public void testWithNoOutliers() throws Exception {
+    DataNodePeerMetrics peerMetrics = new DataNodePeerMetrics(
+        "PeerMetrics-For-Test", WINDOW_INTERVAL_SECONDS,
+        ROLLING_AVERAGE_WINDOWS);
+
+    injectFastNodesSamples(peerMetrics);
+
+    // Trigger a snapshot.
+    peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson();
+
+    // Ensure that we get back the outlier.
+    assertTrue(peerMetrics.getOutliers().isEmpty());
+  }
+
+  /**
+   * Inject fake stats for MIN_OUTLIER_DETECTION_PEERS fast nodes.
+   *
+   * @param peerMetrics
+   */
+  public void injectFastNodesSamples(DataNodePeerMetrics peerMetrics) {
+    for (int nodeIndex = 0;
+         nodeIndex < SlowNodeDetector.getMinOutlierDetectionPeers();
+         ++nodeIndex) {
+      final String nodeName = "FastNode-" + nodeIndex;
+      LOG.info("Generating stats for node {}", nodeName);
+      for (int i = 0;
+           i < 2 * DataNodePeerMetrics.MIN_OUTLIER_DETECTION_SAMPLES;
+           ++i) {
+        peerMetrics.addSendPacketDownstream(
+            nodeName, random.nextInt(FAST_NODE_MAX_LATENCY_MS));
+      }
+    }
+  }
+
+  /**
+   * Inject fake stats for one extremely slow node.
+   */
+  public void injectSlowNodeSamples(
+      DataNodePeerMetrics peerMetrics, String slowNodeName)
+      throws InterruptedException {
+
+    // And the one slow node.
+    for (int i = 0;
+         i < 2 * DataNodePeerMetrics.MIN_OUTLIER_DETECTION_SAMPLES;
+         ++i) {
+      peerMetrics.addSendPacketDownstream(
+          slowNodeName, SLOW_NODE_LATENCY_MS);
+    }
+  }
+}

+ 335 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestSlowNodeDetector.java

@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.metrics;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link SlowNodeDetector}.
+ */
+public class TestSlowNodeDetector {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestSlowNodeDetector.class);
+
+  /**
+   * Set a timeout for every test case.
+   */
+  @Rule
+  public Timeout testTimeout = new Timeout(300_000);
+
+  private final static double LOW_THRESHOLD = 1000;
+  private final static long MIN_OUTLIER_DETECTION_PEERS = 3;
+
+  // Randomly generated test cases for median and MAD. The first entry
+  // in each pair is the expected median and the second entry is the
+  // expected Median Absolute Deviation. The small sets of size 1 and 2
+  // exist to test the edge cases however in practice the MAD of a very
+  // small set is not useful.
+  private Map<List<Double>, Pair<Double, Double>> medianTestMatrix =
+      new ImmutableMap.Builder<List<Double>, Pair<Double, Double>>()
+          // Single element.
+          .put(new ImmutableList.Builder<Double>()
+                  .add(9.6502431302).build(),
+              Pair.of(9.6502431302, 0.0))
+
+          // Two elements.
+          .put(new ImmutableList.Builder<Double>()
+                  .add(1.72168104625)
+                  .add(11.7872544459).build(),
+              Pair.of(6.75446774606, 7.4616095611))
+
+          // The Remaining lists were randomly generated with sizes 3-10.
+          .put(new ImmutableList.Builder<Double>()
+                  .add(76.2635686249)
+                  .add(27.0652018553)
+                  .add(1.3868476443)
+                  .add(49.7194624164)
+                  .add(47.385680883)
+                  .add(57.8721199173).build(),
+              Pair.of(48.5525716497, 22.837202532))
+
+          .put(new ImmutableList.Builder<Double>()
+                  .add(86.0573389581)
+                  .add(93.2399572424)
+                  .add(64.9545429122)
+                  .add(35.8509730085)
+                  .add(1.6534313654).build(),
+              Pair.of(64.9545429122, 41.9360180373))
+
+          .put(new ImmutableList.Builder<Double>()
+                  .add(5.00127007366)
+                  .add(37.9790589127)
+                  .add(67.5784746266).build(),
+              Pair.of(37.9790589127, 43.8841594039))
+
+          .put(new ImmutableList.Builder<Double>()
+                  .add(1.43442932944)
+                  .add(70.6769829947)
+                  .add(37.47579656)
+                  .add(51.1126141394)
+                  .add(72.2465914419)
+                  .add(32.2930549225)
+                  .add(39.677459781).build(),
+              Pair.of(39.677459781, 16.9537852208))
+
+          .put(new ImmutableList.Builder<Double>()
+                  .add(26.7913745214)
+                  .add(68.9833706658)
+                  .add(29.3882180746)
+                  .add(68.3455244453)
+                  .add(74.9277265022)
+                  .add(12.1469972942)
+                  .add(72.5395402683)
+                  .add(7.87917492506)
+                  .add(33.3253447774)
+                  .add(72.2753759125).build(),
+              Pair.of(50.8354346113, 31.9881230079))
+
+          .put(new ImmutableList.Builder<Double>()
+                  .add(38.6482290705)
+                  .add(88.0690746319)
+                  .add(50.6673611649)
+                  .add(64.5329814115)
+                  .add(25.2580979294)
+                  .add(59.6709630711)
+                  .add(71.5406993741)
+                  .add(81.3073035091)
+                  .add(20.5549547284).build(),
+              Pair.of(59.6709630711, 31.1683520683))
+
+          .put(new ImmutableList.Builder<Double>()
+                  .add(87.352734249)
+                  .add(65.4760359094)
+                  .add(28.9206803169)
+                  .add(36.5908574008)
+                  .add(87.7407653175)
+                  .add(99.3704511335)
+                  .add(41.3227434076)
+                  .add(46.2713494909)
+                  .add(3.49940920921).build(),
+              Pair.of(46.2713494909, 28.4729106898))
+
+          .put(new ImmutableList.Builder<Double>()
+                  .add(95.3251533286)
+                  .add(27.2777870437)
+                  .add(43.73477168).build(),
+              Pair.of(43.73477168, 24.3991619317))
+
+          .build();
+
+  // A test matrix that maps inputs to the expected output list of
+  // slow nodes i.e. outliers.
+  private Map<Map<String, Double>, Set<String>> outlierTestMatrix =
+      new ImmutableMap.Builder<Map<String, Double>, Set<String>>()
+          // The number of samples is too low and all samples are below
+          // the low threshold. Nothing should be returned.
+          .put(ImmutableMap.of(
+              "n1", 0.0,
+              "n2", LOW_THRESHOLD + 1),
+              ImmutableSet.of())
+
+          // A statistical outlier below the low threshold must not be
+          // returned.
+          .put(ImmutableMap.of(
+              "n1", 1.0,
+              "n2", 1.0,
+              "n3", LOW_THRESHOLD - 1),
+              ImmutableSet.of())
+
+          // A statistical outlier above the low threshold must be returned.
+          .put(ImmutableMap.of(
+              "n1", 1.0,
+              "n2", 1.0,
+              "n3", LOW_THRESHOLD + 1),
+              ImmutableSet.of("n3"))
+
+          // A statistical outlier must not be returned if it is within a
+          // MEDIAN_MULTIPLIER multiple of the median.
+          .put(ImmutableMap.of(
+              "n1", LOW_THRESHOLD + 0.1,
+              "n2", LOW_THRESHOLD + 0.1,
+              "n3", LOW_THRESHOLD * SlowNodeDetector.MEDIAN_MULTIPLIER - 0.1),
+              ImmutableSet.of())
+
+          // A statistical outlier must be returned if it is outside a
+          // MEDIAN_MULTIPLIER multiple of the median.
+          .put(ImmutableMap.of(
+              "n1", LOW_THRESHOLD + 0.1,
+              "n2", LOW_THRESHOLD + 0.1,
+              "n3", (LOW_THRESHOLD + 0.1) *
+                  SlowNodeDetector.MEDIAN_MULTIPLIER + 0.1),
+              ImmutableSet.of("n3"))
+
+          // Only the statistical outliers n3 and n11 should be returned.
+          .put(new ImmutableMap.Builder<String, Double>()
+                  .put("n1", 1029.4322)
+                  .put("n2", 2647.876)
+                  .put("n3", 9194.312)
+                  .put("n4", 2.2)
+                  .put("n5", 2012.92)
+                  .put("n6", 1843.81)
+                  .put("n7", 1201.43)
+                  .put("n8", 6712.01)
+                  .put("n9", 3278.554)
+                  .put("n10", 2091.765)
+                  .put("n11", 9194.77).build(),
+              ImmutableSet.of("n3", "n11"))
+
+          // The following input set has multiple outliers.
+          //   - The low outliers (n4, n6) should not be returned.
+          //   - High outlier n2 is within 3 multiples of the median
+          //     and so it should not be returned.
+          //   - Only the high outlier n8 should be returned.
+          .put(new ImmutableMap.Builder<String, Double>()
+                  .put("n1", 5002.0)
+                  .put("n2", 9001.0)
+                  .put("n3", 5004.0)
+                  .put("n4", 1001.0)
+                  .put("n5", 5003.0)
+                  .put("n6", 2001.0)
+                  .put("n7", 5000.0)
+                  .put("n8", 101002.0)
+                  .put("n9", 5001.0)
+                  .put("n10", 5002.0)
+                  .put("n11", 5105.0)
+                  .put("n12", 5006.0).build(),
+              ImmutableSet.of("n8"))
+
+          .build();
+
+
+  private SlowNodeDetector slowNodeDetector;
+
+  @Before
+  public void setup() {
+    slowNodeDetector = new SlowNodeDetector((long) LOW_THRESHOLD);
+    SlowNodeDetector.setMinOutlierDetectionPeers(MIN_OUTLIER_DETECTION_PEERS);
+    GenericTestUtils.setLogLevel(SlowNodeDetector.LOG, Level.ALL);
+  }
+
+  @Test
+  public void testOutliersFromTestMatrix() {
+    for (Map.Entry<Map<String, Double>, Set<String>> entry :
+        outlierTestMatrix.entrySet()) {
+
+      LOG.info("Verifying set {}", entry.getKey());
+      final Set<String> outliers =
+          slowNodeDetector.getOutliers(entry.getKey()).keySet();
+      assertTrue(
+          "Running outlier detection on " + entry.getKey() +
+              " was expected to yield set " + entry.getValue() + ", but " +
+              " we got set " + outliers,
+          outliers.equals(entry.getValue()));
+    }
+  }
+
+  /**
+   * Unit test for {@link SlowNodeDetector#computeMedian(List)}.
+   */
+  @Test
+  public void testMediansFromTestMatrix() {
+    for (Map.Entry<List<Double>, Pair<Double, Double>> entry :
+        medianTestMatrix.entrySet()) {
+      final List<Double> inputList = new ArrayList<>(entry.getKey());
+      Collections.sort(inputList);
+      final Double median = SlowNodeDetector.computeMedian(inputList);
+      final Double expectedMedian = entry.getValue().getLeft();
+
+      // Ensure that the median is within 0.001% of expected.
+      // We need some fudge factor for floating point comparison.
+      final Double errorPercent =
+          Math.abs(median - expectedMedian) * 100.0 / expectedMedian;
+
+      assertTrue(
+          "Set " + inputList + "; Expected median: " +
+              expectedMedian + ", got: " + median,
+          errorPercent < 0.001);
+    }
+  }
+
+  /**
+   * Unit test for {@link SlowNodeDetector#computeMad(List)}.
+   */
+  @Test
+  public void testMadsFromTestMatrix() {
+    for (Map.Entry<List<Double>, Pair<Double, Double>> entry :
+        medianTestMatrix.entrySet()) {
+      final List<Double> inputList = new ArrayList<>(entry.getKey());
+      Collections.sort(inputList);
+      final Double mad = SlowNodeDetector.computeMad(inputList);
+      final Double expectedMad = entry.getValue().getRight();
+
+      // Ensure that the MAD is within 0.001% of expected.
+      // We need some fudge factor for floating point comparison.
+      if (entry.getKey().size() > 1) {
+        final Double errorPercent =
+            Math.abs(mad - expectedMad) * 100.0 / expectedMad;
+
+        assertTrue(
+            "Set " + entry.getKey() + "; Expected M.A.D.: " +
+                expectedMad + ", got: " + mad,
+            errorPercent < 0.001);
+      } else {
+        // For an input list of size 1, the MAD should be 0.0.
+        final Double epsilon = 0.000001; // Allow for some FP math error.
+        assertTrue(
+            "Set " + entry.getKey() + "; Expected M.A.D.: " +
+                expectedMad + ", got: " + mad,
+            mad < epsilon);
+      }
+    }
+  }
+
+  /**
+   * Verify that {@link SlowNodeDetector#computeMedian(List)} throws when
+   * passed an empty list.
+   */
+  @Test(expected=IllegalArgumentException.class)
+  public void testMedianOfEmptyList() {
+    SlowNodeDetector.computeMedian(Collections.emptyList());
+  }
+
+  /**
+   * Verify that {@link SlowNodeDetector#computeMad(List)} throws when
+   * passed an empty list.
+   */
+  @Test(expected=IllegalArgumentException.class)
+  public void testMadOfEmptyList() {
+    SlowNodeDetector.computeMedian(Collections.emptyList());
+  }
+}

+ 5 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java

@@ -64,6 +64,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -951,7 +952,8 @@ public class NNThroughputBenchmark implements Tool {
       StorageReport[] rep = { new StorageReport(storage, false,
           DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0L) };
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep,
-          0L, 0L, 0, 0, 0, null, true).getCommands();
+          0L, 0L, 0, 0, 0, null, true,
+          SlowPeerReports.EMPTY_REPORT).getCommands();
       if(cmds != null) {
         for (DatanodeCommand cmd : cmds ) {
           if(LOG.isDebugEnabled()) {
@@ -1000,7 +1002,8 @@ public class NNThroughputBenchmark implements Tool {
       StorageReport[] rep = { new StorageReport(storage,
           false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0) };
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
-          rep, 0L, 0L, 0, 0, 0, null, true).getCommands();
+          rep, 0L, 0L, 0, 0, 0, null, true,
+          SlowPeerReports.EMPTY_REPORT).getCommands();
       if (cmds != null) {
         for (DatanodeCommand cmd : cmds) {
           if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java

@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.AccessControlException;
@@ -122,7 +123,8 @@ public class NameNodeAdapter {
       DatanodeDescriptor dd, FSNamesystem namesystem) throws IOException {
     return namesystem.handleHeartbeat(nodeReg,
         BlockManagerTestUtil.getStorageReportsForDatanode(dd),
-        dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true);
+        dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true,
+        SlowPeerReports.EMPTY_REPORT);
   }
 
   public static boolean setReplication(final FSNamesystem ns,

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java

@@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -132,7 +133,8 @@ public class TestDeadDatanode {
         new DatanodeStorage(reg.getDatanodeUuid()),
         false, 0, 0, 0, 0, 0) };
     DatanodeCommand[] cmd =
-        dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true).getCommands();
+        dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true,
+            SlowPeerReports.EMPTY_REPORT).getCommands();
     assertEquals(1, cmd.length);
     assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER
         .getAction());

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java

@@ -66,6 +66,10 @@ public class TestHdfsConfigFields extends TestConfigurationFieldsBase {
     // Purposely hidden, based on comments in DFSConfigKeys
     configurationPropsToSkipCompare
         .add(DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY);
+    configurationPropsToSkipCompare
+        .add(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY);
+    configurationPropsToSkipCompare
+        .add(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY);
 
     // Fully deprecated properties?
     configurationPropsToSkipCompare