Browse Source

HDFS-11551. Support Timeout when checking single disk. Contributed by Hanisha Koneru.

Hanisha Koneru 8 years ago
parent
commit
f16bb79957

+ 24 - 4
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowDiskReports.java

@@ -48,7 +48,7 @@ public final class SlowDiskReports {
   private final Map<String, Map<DiskOp, Double>> slowDisks;
 
   /**
-   * An object representing a SlowPeerReports with no entries. Should
+   * An object representing a SlowDiskReports with no entries. Should
    * be used instead of null or creating new objects when there are
    * no slow peers to report.
    */
@@ -119,8 +119,28 @@ public final class SlowDiskReports {
    * Lists the types of operations on which disk latencies are measured.
    */
   public enum DiskOp {
-    METADATA,
-    READ,
-    WRITE
+    METADATA("MetadataOp"),
+    READ("ReadIO"),
+    WRITE("WriteIO");
+
+    private final String value;
+
+    DiskOp(final String v) {
+      this.value = v;
+    }
+
+    @Override
+    public String toString() {
+      return value;
+    }
+
+    public static DiskOp fromValue(final String value) {
+      for (DiskOp as : DiskOp.values()) {
+        if (as.value.equals(value)) {
+          return as;
+        }
+      }
+      return null;
+    }
   }
 }

+ 33 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
+import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@@ -174,9 +175,15 @@ public class DatanodeManager {
    * True if we should process latency metrics from downstream peers.
    */
   private final boolean dataNodePeerStatsEnabled;
+  /**
+   *  True if we should process latency metrics from individual DN disks.
+   */
+  private final boolean dataNodeDiskStatsEnabled;
 
   @Nullable
   private final SlowPeerTracker slowPeerTracker;
+  @Nullable
+  private final SlowDiskTracker slowDiskTracker;
   
   /**
    * The minimum time between resending caching directives to Datanodes,
@@ -200,9 +207,16 @@ public class DatanodeManager {
     this.dataNodePeerStatsEnabled = conf.getBoolean(
         DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
         DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT);
+    this.dataNodeDiskStatsEnabled = Util.isDiskStatsEnabled(conf.getDouble(
+        DFSConfigKeys.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_FRACTION_KEY,
+        DFSConfigKeys.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_FRACTION_DEFAULT));
 
+    final Timer timer = new Timer();
     this.slowPeerTracker = dataNodePeerStatsEnabled ?
-        new SlowPeerTracker(conf, new Timer()) : null;
+        new SlowPeerTracker(conf, timer) : null;
+
+    this.slowDiskTracker = dataNodeDiskStatsEnabled ?
+        new SlowDiskTracker(conf, timer) : null;
 
     networktopology = NetworkTopology.getInstance(conf);
 
@@ -1581,6 +1595,16 @@ public class DatanodeManager {
       }
     }
 
+    if (slowDiskTracker != null) {
+      if (!slowDisks.getSlowDisks().isEmpty()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("DataNode " + nodeReg + " reported slow disks: " +
+              slowDisks.getSlowDisks());
+        }
+        slowDiskTracker.addSlowDiskReport(nodeReg.getIpcAddr(false), slowDisks);
+      }
+    }
+
     if (!cmds.isEmpty()) {
       return cmds.toArray(new DatanodeCommand[cmds.size()]);
     }
@@ -1792,5 +1816,13 @@ public class DatanodeManager {
   public String getSlowPeersReport() {
     return slowPeerTracker != null ? slowPeerTracker.getJson() : null;
   }
+
+  /**
+   * Use only for testing.
+   */
+  @VisibleForTesting
+  public SlowDiskTracker getSlowDiskTracker() {
+    return slowDiskTracker;
+  }
 }
 

+ 291 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowDiskTracker.java

@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Doubles;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
+import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports.DiskOp;
+import org.apache.hadoop.util.Timer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This class aggregates information from {@link SlowDiskReports} received via
+ * heartbeats.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class SlowDiskTracker {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SlowPeerTracker.class);
+
+  /**
+   * Time duration after which a report is considered stale. This is
+   * set to DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY * 3 i.e.
+   * maintained for at least two successive reports.
+   */
+  private long reportValidityMs;
+
+  /**
+   * Timer object for querying the current time. Separated out for
+   * unit testing.
+   */
+  private final Timer timer;
+
+  /**
+   * Number of disks to include in JSON report per operation. We will return
+   * disks with the highest latency.
+   */
+  private static final int MAX_DISKS_TO_REPORT = 5;
+  private static final String DATANODE_DISK_SEPARATOR = ":";
+  private final long reportGenerationIntervalMs;
+
+  private volatile long lastUpdateTime;
+  private AtomicBoolean isUpdateInProgress = new AtomicBoolean(false);
+
+  /**
+   * Information about disks that have been reported as being slow.
+   * It is map of (Slow Disk ID) -> (DiskLatency). The DiskLatency contains
+   * the disk ID, the latencies reported and the timestamp when the report
+   * was received.
+   */
+  private final ConcurrentHashMap<String, DiskLatency> diskIDLatencyMap;
+
+  /**
+   * Map of slow disk -> diskOperations it has been reported slow in.
+   */
+  private volatile ArrayList<DiskLatency> slowDisksReport =
+      Lists.newArrayList();
+  private volatile ArrayList<DiskLatency> oldSlowDisksCheck;
+
+  public SlowDiskTracker(Configuration conf, Timer timer) {
+    this.timer = timer;
+    this.lastUpdateTime = timer.monotonicNow();
+    this.diskIDLatencyMap = new ConcurrentHashMap<String, DiskLatency>();
+    this.reportGenerationIntervalMs = conf.getTimeDuration(
+        DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY,
+        DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    this.reportValidityMs = reportGenerationIntervalMs * 3;
+  }
+
+  @VisibleForTesting
+  public static String getSlowDiskIDForReport(String datanodeID,
+      String slowDisk) {
+    return datanodeID + DATANODE_DISK_SEPARATOR + slowDisk;
+  }
+
+  public void addSlowDiskReport(String dataNodeID,
+      SlowDiskReports dnSlowDiskReport) {
+    Map<String, Map<DiskOp, Double>> slowDisks =
+        dnSlowDiskReport.getSlowDisks();
+
+    long now = timer.monotonicNow();
+
+    for (Map.Entry<String, Map<DiskOp, Double>> slowDiskEntry :
+        slowDisks.entrySet()) {
+
+      String diskID = getSlowDiskIDForReport(dataNodeID,
+          slowDiskEntry.getKey());
+
+      Map<DiskOp, Double> latencies = slowDiskEntry.getValue();
+
+      DiskLatency diskLatency = new DiskLatency(diskID, latencies, now);
+      diskIDLatencyMap.put(diskID, diskLatency);
+    }
+
+    checkAndUpdateReportIfNecessary();
+  }
+
+  private void checkAndUpdateReportIfNecessary() {
+    // Check if it is time for update
+    long now = timer.monotonicNow();
+    if (now - lastUpdateTime > reportGenerationIntervalMs) {
+      updateSlowDiskReportAsync(now);
+    }
+  }
+
+  @VisibleForTesting
+  public void updateSlowDiskReportAsync(final long now) {
+    if (isUpdateInProgress.compareAndSet(false, true)) {
+      lastUpdateTime = now;
+      new Thread(new Runnable() {
+        @Override
+        public void run() {
+          slowDisksReport = getSlowDisks(diskIDLatencyMap,
+              MAX_DISKS_TO_REPORT, now);
+
+          cleanUpOldReports(now);
+
+          isUpdateInProgress.set(false);
+        }
+      }).start();
+    }
+  }
+
+  /**
+   * This structure is a thin wrapper over disk latencies.
+   */
+  public static class DiskLatency {
+    @JsonProperty("SlowDiskID")
+    final private String slowDiskID;
+    @JsonProperty("Latencies")
+    final private Map<DiskOp, Double> latencyMap;
+    @JsonIgnore
+    private long timestamp;
+
+    /**
+     * Constructor needed by Jackson for Object mapping.
+     */
+    public DiskLatency(
+        @JsonProperty("SlowDiskID") String slowDiskID,
+        @JsonProperty("Latencies") Map<DiskOp, Double> latencyMap) {
+      this.slowDiskID = slowDiskID;
+      this.latencyMap = latencyMap;
+    }
+
+    public DiskLatency(String slowDiskID, Map<DiskOp, Double> latencyMap,
+        long timestamp) {
+      this.slowDiskID = slowDiskID;
+      this.latencyMap = latencyMap;
+      this.timestamp = timestamp;
+    }
+
+    String getSlowDiskID() {
+      return this.slowDiskID;
+    }
+
+    double getMaxLatency() {
+      double maxLatency = 0;
+      for (double latency : latencyMap.values()) {
+        if (latency > maxLatency) {
+          maxLatency = latency;
+        }
+      }
+      return maxLatency;
+    }
+
+    Double getLatency(DiskOp op) {
+      return this.latencyMap.get(op);
+    }
+  }
+
+  /**
+   * Retrieve a list of stop low disks i.e disks with the highest max latencies.
+   * @param numDisks number of disks to return. This is to limit the size of
+   *                 the generated JSON.
+   */
+  private ArrayList<DiskLatency> getSlowDisks(
+      Map<String, DiskLatency> reports, int numDisks, long now) {
+    if (reports.isEmpty()) {
+      return new ArrayList(ImmutableList.of());
+    }
+
+    final PriorityQueue<DiskLatency> topNReports = new PriorityQueue<>(
+        reports.size(),
+        new Comparator<DiskLatency>() {
+          @Override
+          public int compare(DiskLatency o1, DiskLatency o2) {
+            return Doubles.compare(
+                o1.getMaxLatency(), o2.getMaxLatency());
+          }
+        });
+
+    ArrayList<DiskLatency> oldSlowDiskIDs = Lists.newArrayList();
+
+    for (Map.Entry<String, DiskLatency> entry : reports.entrySet()) {
+      DiskLatency diskLatency = entry.getValue();
+      if (now - diskLatency.timestamp < reportValidityMs) {
+        if (topNReports.size() < numDisks) {
+          topNReports.add(diskLatency);
+        } else if (topNReports.peek().getMaxLatency() <
+            diskLatency.getMaxLatency()) {
+          topNReports.poll();
+          topNReports.add(diskLatency);
+        }
+      } else {
+        oldSlowDiskIDs.add(diskLatency);
+      }
+    }
+
+    oldSlowDisksCheck = oldSlowDiskIDs;
+
+    return Lists.newArrayList(topNReports);
+  }
+
+  /**
+   * Retrieve all valid reports as a JSON string.
+   * @return serialized representation of valid reports. null if
+   *         serialization failed.
+   */
+  public String getSlowDiskReportAsJsonString() {
+    ObjectMapper objectMapper = new ObjectMapper();
+    try {
+      return objectMapper.writeValueAsString(slowDisksReport);
+    } catch (JsonProcessingException e) {
+      // Failed to serialize. Don't log the exception call stack.
+      LOG.debug("Failed to serialize statistics" + e);
+      return null;
+    }
+  }
+
+  private void cleanUpOldReports(long now) {
+    if (oldSlowDisksCheck != null) {
+      for (DiskLatency oldDiskLatency : oldSlowDisksCheck) {
+        diskIDLatencyMap.remove(oldDiskLatency.getSlowDiskID(), oldDiskLatency);
+      }
+    }
+    // Replace oldSlowDiskIDsCheck with an empty ArrayList
+    oldSlowDisksCheck = null;
+  }
+
+  @VisibleForTesting
+  ArrayList<DiskLatency> getSlowDisksReport() {
+    return this.slowDisksReport;
+  }
+
+  @VisibleForTesting
+  long getReportValidityMs() {
+    return reportValidityMs;
+  }
+
+  @VisibleForTesting
+  void setReportValidityMs(long reportValidityMs) {
+    this.reportValidityMs = reportValidityMs;
+  }
+}

+ 16 - 19
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode.metrics;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -33,9 +32,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Set;
 
 /**
  * This class detects and maintains DataNode disk outliers and their
@@ -122,43 +121,41 @@ public class DataNodeDiskMetrics {
 
   private void detectAndUpdateDiskOutliers(Map<String, Double> metadataOpStats,
       Map<String, Double> readIoStats, Map<String, Double> writeIoStats) {
-    Set<String> diskOutliersSet = Sets.newHashSet();
+    Map<String, Map<DiskOp, Double>> diskStats = Maps.newHashMap();
 
     // Get MetadataOp Outliers
     Map<String, Double> metadataOpOutliers = slowDiskDetector
         .getOutliers(metadataOpStats);
-    if (!metadataOpOutliers.isEmpty()) {
-      diskOutliersSet.addAll(metadataOpOutliers.keySet());
+    for (Map.Entry<String, Double> entry : metadataOpOutliers.entrySet()) {
+      addDiskStat(diskStats, entry.getKey(), DiskOp.METADATA, entry.getValue());
     }
 
     // Get ReadIo Outliers
     Map<String, Double> readIoOutliers = slowDiskDetector
         .getOutliers(readIoStats);
-    if (!readIoOutliers.isEmpty()) {
-      diskOutliersSet.addAll(readIoOutliers.keySet());
+    for (Map.Entry<String, Double> entry : readIoOutliers.entrySet()) {
+      addDiskStat(diskStats, entry.getKey(), DiskOp.READ, entry.getValue());
     }
 
     // Get WriteIo Outliers
     Map<String, Double> writeIoOutliers = slowDiskDetector
         .getOutliers(writeIoStats);
-    if (!readIoOutliers.isEmpty()) {
-      diskOutliersSet.addAll(writeIoOutliers.keySet());
-    }
-
-    Map<String, Map<DiskOp, Double>> diskStats =
-        Maps.newHashMap();
-    for (String disk : diskOutliersSet) {
-      Map<DiskOp, Double> diskStat = Maps.newHashMap();
-      diskStat.put(DiskOp.METADATA, metadataOpStats.get(disk));
-      diskStat.put(DiskOp.READ, readIoStats.get(disk));
-      diskStat.put(DiskOp.WRITE, writeIoStats.get(disk));
-      diskStats.put(disk, diskStat);
+    for (Map.Entry<String, Double> entry : writeIoOutliers.entrySet()) {
+      addDiskStat(diskStats, entry.getKey(), DiskOp.WRITE, entry.getValue());
     }
 
     diskOutliersStats = diskStats;
     LOG.debug("Updated disk outliers.");
   }
 
+  private void addDiskStat(Map<String, Map<DiskOp, Double>> diskStats,
+      String disk, DiskOp diskOp, double latency) {
+    if (!diskStats.containsKey(disk)) {
+      diskStats.put(disk, new HashMap<DiskOp, Double>());
+    }
+    diskStats.get(disk).put(diskOp, latency);
+  }
+
   public Map<String, Map<DiskOp, Double>> getDiskOutliersStats() {
     return diskOutliersStats;
   }

+ 448 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowDiskTracker.java

@@ -0,0 +1,448 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.hdfs.DFSConfigKeys
+    .DFS_DATANODE_FILEIO_PROFILING_SAMPLING_FRACTION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys
+    .DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
+import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports.DiskOp;
+import org.apache.hadoop.hdfs.server.blockmanagement.SlowDiskTracker
+    .DiskLatency;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.FakeTimer;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.Maps;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for {@link SlowDiskTracker}.
+ */
+public class TestSlowDiskTracker {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      TestSlowDiskTracker.class);
+
+  /**
+   * Set a timeout for every test case.
+   */
+  @Rule
+  public Timeout testTimeout = new Timeout(300_000);
+
+  private static Configuration conf;
+  private SlowDiskTracker tracker;
+  private FakeTimer timer;
+  private long reportValidityMs;
+  private static final long OUTLIERS_REPORT_INTERVAL = 1000;
+
+  static {
+    conf = new HdfsConfiguration();
+    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+    conf.setDouble(DFS_DATANODE_FILEIO_PROFILING_SAMPLING_FRACTION_KEY, 1.0);
+    conf.setTimeDuration(DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY,
+        OUTLIERS_REPORT_INTERVAL, TimeUnit.MILLISECONDS);
+  }
+  @Before
+  public void setup() {
+    timer = new FakeTimer();
+    tracker = new SlowDiskTracker(conf, timer);
+    reportValidityMs = tracker.getReportValidityMs();
+  }
+
+  @Test
+  public void testDataNodeHeartbeatSlowDiskReport() throws Exception {
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+        .build();
+    try {
+      DataNode dn1 = cluster.getDataNodes().get(0);
+      DataNode dn2 = cluster.getDataNodes().get(1);
+      NameNode nn = cluster.getNameNode(0);
+
+      DatanodeManager datanodeManager = nn.getNamesystem().getBlockManager()
+          .getDatanodeManager();
+      final SlowDiskTracker slowDiskTracker = datanodeManager.getSlowDiskTracker();
+      slowDiskTracker.setReportValidityMs(OUTLIERS_REPORT_INTERVAL * 100);
+
+      dn1.getDiskMetrics().addSlowDiskForTesting("disk1", ImmutableMap.of(
+          DiskOp.WRITE, 1.3));
+      dn1.getDiskMetrics().addSlowDiskForTesting("disk2", ImmutableMap.of(
+          DiskOp.READ, 1.6, DiskOp.WRITE, 1.1));
+      dn2.getDiskMetrics().addSlowDiskForTesting("disk1", ImmutableMap.of(
+          DiskOp.METADATA, 0.8));
+      dn2.getDiskMetrics().addSlowDiskForTesting("disk2", ImmutableMap.of(
+          DiskOp.WRITE, 1.3));
+
+      String dn1ID = dn1.getDatanodeId().getIpcAddr(false);
+      String dn2ID = dn2.getDatanodeId().getIpcAddr(false);
+
+      // Advance the timer and wait for NN to receive reports from DataNodes.
+      Thread.sleep(OUTLIERS_REPORT_INTERVAL);
+
+      // Wait for NN to receive reports from all DNs
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return (slowDiskTracker.getSlowDisksReport().size() == 4);
+        }
+      }, 1000, 100000);
+
+      Map<String, DiskLatency> slowDisksReport = getSlowDisksReportForTesting(
+          slowDiskTracker);
+
+      assertThat(slowDisksReport.size(), is(4));
+      assertTrue(Math.abs(slowDisksReport.get(dn1ID + ":disk1")
+          .getLatency(DiskOp.WRITE) - 1.3) < 0.0000001);
+      assertTrue(Math.abs(slowDisksReport.get(dn1ID + ":disk2")
+          .getLatency(DiskOp.READ) - 1.6) < 0.0000001);
+      assertTrue(Math.abs(slowDisksReport.get(dn1ID + ":disk2")
+          .getLatency(DiskOp.WRITE) - 1.1) < 0.0000001);
+      assertTrue(Math.abs(slowDisksReport.get(dn2ID + ":disk1")
+          .getLatency(DiskOp.METADATA) - 0.8) < 0.0000001);
+      assertTrue(Math.abs(slowDisksReport.get(dn2ID + ":disk2")
+          .getLatency(DiskOp.WRITE) - 1.3) < 0.0000001);
+
+      // Test the slow disk report JSON string
+      ArrayList<DiskLatency> jsonReport = getAndDeserializeJson(
+          slowDiskTracker.getSlowDiskReportAsJsonString());
+
+      assertThat(jsonReport.size(), is(4));
+      assertTrue(isDiskInReports(jsonReport, dn1ID, "disk1", DiskOp.WRITE, 1.3));
+      assertTrue(isDiskInReports(jsonReport, dn1ID, "disk2", DiskOp.READ, 1.6));
+      assertTrue(isDiskInReports(jsonReport, dn1ID, "disk2", DiskOp.WRITE, 1.1));
+      assertTrue(isDiskInReports(jsonReport, dn2ID, "disk1", DiskOp.METADATA,
+          0.8));
+      assertTrue(isDiskInReports(jsonReport, dn2ID, "disk2", DiskOp.WRITE, 1.3));
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Edge case, there are no reports to retrieve.
+   */
+  @Test
+  public void testEmptyReports() {
+    tracker.updateSlowDiskReportAsync(timer.monotonicNow());
+    assertTrue(getSlowDisksReportForTesting(tracker).isEmpty());
+  }
+
+  @Test
+  public void testReportsAreRetrieved() throws Exception {
+    addSlowDiskForTesting("dn1", "disk1",
+        ImmutableMap.of(DiskOp.METADATA, 1.1, DiskOp.READ, 1.8));
+    addSlowDiskForTesting("dn1", "disk2",
+        ImmutableMap.of(DiskOp.READ, 1.3));
+    addSlowDiskForTesting("dn2", "disk2",
+        ImmutableMap.of(DiskOp.READ, 1.1));
+
+    tracker.updateSlowDiskReportAsync(timer.monotonicNow());
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return !tracker.getSlowDisksReport().isEmpty();
+      }
+    }, 500, 5000);
+
+    Map<String, DiskLatency> reports = getSlowDisksReportForTesting(tracker);
+
+    assertThat(reports.size(), is(3));
+    assertTrue(Math.abs(reports.get("dn1:disk1")
+        .getLatency(DiskOp.METADATA) - 1.1) < 0.0000001);
+    assertTrue(Math.abs(reports.get("dn1:disk1")
+        .getLatency(DiskOp.READ) - 1.8) < 0.0000001);
+    assertTrue(Math.abs(reports.get("dn1:disk2")
+        .getLatency(DiskOp.READ) - 1.3) < 0.0000001);
+    assertTrue(Math.abs(reports.get("dn2:disk2")
+        .getLatency(DiskOp.READ) - 1.1) < 0.0000001);
+  }
+
+  /**
+   * Test that when all reports are expired, we get back nothing.
+   */
+  @Test
+  public void testAllReportsAreExpired() throws Exception {
+    addSlowDiskForTesting("dn1", "disk1",
+        ImmutableMap.of(DiskOp.METADATA, 1.1, DiskOp.READ, 1.8));
+    addSlowDiskForTesting("dn1", "disk2",
+        ImmutableMap.of(DiskOp.READ, 1.3));
+    addSlowDiskForTesting("dn2", "disk2",
+        ImmutableMap.of(DiskOp.WRITE, 1.1));
+
+    // No reports should expire after 1ms.
+    timer.advance(1);
+    tracker.updateSlowDiskReportAsync(timer.monotonicNow());
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return !tracker.getSlowDisksReport().isEmpty();
+      }
+    }, 500, 5000);
+
+    Map<String, DiskLatency> reports = getSlowDisksReportForTesting(tracker);
+
+    assertThat(reports.size(), is(3));
+    assertTrue(Math.abs(reports.get("dn1:disk1")
+        .getLatency(DiskOp.METADATA) - 1.1) < 0.0000001);
+    assertTrue(Math.abs(reports.get("dn1:disk1")
+        .getLatency(DiskOp.READ) - 1.8) < 0.0000001);
+    assertTrue(Math.abs(reports.get("dn1:disk2")
+        .getLatency(DiskOp.READ) - 1.3) < 0.0000001);
+    assertTrue(Math.abs(reports.get("dn2:disk2")
+        .getLatency(DiskOp.WRITE) - 1.1) < 0.0000001);
+
+    // All reports should expire after REPORT_VALIDITY_MS.
+    timer.advance(reportValidityMs);
+    tracker.updateSlowDiskReportAsync(timer.monotonicNow());
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return tracker.getSlowDisksReport().isEmpty();
+      }
+    }, 500, 3000);
+
+    reports = getSlowDisksReportForTesting(tracker);
+
+    assertThat(reports.size(), is(0));
+  }
+
+  /**
+   * Test the case when a subset of reports has expired.
+   * Ensure that we only get back non-expired reports.
+   */
+  @Test
+  public void testSomeReportsAreExpired() throws Exception {
+    addSlowDiskForTesting("dn1", "disk1",
+        ImmutableMap.of(DiskOp.METADATA, 1.1, DiskOp.READ, 1.8));
+    addSlowDiskForTesting("dn1", "disk2",
+        ImmutableMap.of(DiskOp.READ, 1.3));
+    timer.advance(reportValidityMs);
+    addSlowDiskForTesting("dn2", "disk2",
+        ImmutableMap.of(DiskOp.WRITE, 1.1));
+
+    tracker.updateSlowDiskReportAsync(timer.monotonicNow());
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return !tracker.getSlowDisksReport().isEmpty();
+      }
+    }, 500, 5000);
+
+    Map<String, DiskLatency> reports = getSlowDisksReportForTesting(tracker);
+
+    assertThat(reports.size(), is(1));
+    assertTrue(Math.abs(reports.get("dn2:disk2")
+        .getLatency(DiskOp.WRITE) - 1.1) < 0.0000001);
+  }
+
+  /**
+   * Test the case when an expired report is replaced by a valid one.
+   */
+  @Test
+  public void testReplacement() throws Exception {
+    addSlowDiskForTesting("dn1", "disk1",
+        ImmutableMap.of(DiskOp.METADATA, 1.1, DiskOp.READ, 1.8));
+    timer.advance(reportValidityMs);
+    addSlowDiskForTesting("dn1", "disk1",
+        ImmutableMap.of(DiskOp.READ, 1.4));
+
+    tracker.updateSlowDiskReportAsync(timer.monotonicNow());
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return !tracker.getSlowDisksReport().isEmpty();
+      }
+    }, 500, 5000);
+
+    Map<String, DiskLatency> reports = getSlowDisksReportForTesting(tracker);
+
+    assertThat(reports.size(), is(1));
+    assertTrue(reports.get("dn1:disk1").getLatency(DiskOp.METADATA) == null);
+    assertTrue(Math.abs(reports.get("dn1:disk1")
+        .getLatency(DiskOp.READ) - 1.4) < 0.0000001);
+  }
+
+  @Test
+  public void testGetJson() throws Exception {
+    addSlowDiskForTesting("dn1", "disk1",
+        ImmutableMap.of(DiskOp.METADATA, 1.1, DiskOp.READ, 1.8));
+    addSlowDiskForTesting("dn1", "disk2",
+        ImmutableMap.of(DiskOp.READ, 1.3));
+    addSlowDiskForTesting("dn2", "disk2",
+        ImmutableMap.of(DiskOp.WRITE, 1.1));
+    addSlowDiskForTesting("dn3", "disk1",
+        ImmutableMap.of(DiskOp.WRITE, 1.1));
+
+    tracker.updateSlowDiskReportAsync(timer.monotonicNow());
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return tracker.getSlowDiskReportAsJsonString() != null;
+      }
+    }, 500, 5000);
+
+    ArrayList<DiskLatency> jsonReport = getAndDeserializeJson(
+        tracker.getSlowDiskReportAsJsonString());
+
+    // And ensure its contents are what we expect.
+    assertThat(jsonReport.size(), is(4));
+    assertTrue(isDiskInReports(jsonReport, "dn1", "disk1", DiskOp.METADATA,
+        1.1));
+    assertTrue(isDiskInReports(jsonReport, "dn1", "disk1", DiskOp.READ, 1.8));
+    assertTrue(isDiskInReports(jsonReport, "dn1", "disk2", DiskOp.READ, 1.3));
+    assertTrue(isDiskInReports(jsonReport, "dn2", "disk2", DiskOp.WRITE, 1.1));
+    assertTrue(isDiskInReports(jsonReport, "dn3", "disk1", DiskOp.WRITE, 1.1));
+  }
+
+  @Test
+  public void testGetJsonSizeIsLimited() throws Exception {
+    addSlowDiskForTesting("dn1", "disk1",
+        ImmutableMap.of(DiskOp.READ, 1.1));
+    addSlowDiskForTesting("dn1", "disk2",
+        ImmutableMap.of(DiskOp.READ, 1.2));
+    addSlowDiskForTesting("dn1", "disk3",
+        ImmutableMap.of(DiskOp.READ, 1.3));
+    addSlowDiskForTesting("dn2", "disk1",
+        ImmutableMap.of(DiskOp.READ, 1.4));
+    addSlowDiskForTesting("dn2", "disk2",
+        ImmutableMap.of(DiskOp.READ, 1.5));
+    addSlowDiskForTesting("dn3", "disk1",
+        ImmutableMap.of(DiskOp.WRITE, 1.6));
+    addSlowDiskForTesting("dn3", "disk2",
+        ImmutableMap.of(DiskOp.READ, 1.7));
+    addSlowDiskForTesting("dn3", "disk3",
+        ImmutableMap.of(DiskOp.READ, 1.2));
+
+    tracker.updateSlowDiskReportAsync(timer.monotonicNow());
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return tracker.getSlowDiskReportAsJsonString() != null;
+      }
+    }, 500, 5000);
+
+    ArrayList<DiskLatency> jsonReport = getAndDeserializeJson(
+        tracker.getSlowDiskReportAsJsonString());
+
+    // Ensure that only the top 5 highest latencies are in the report.
+    assertThat(jsonReport.size(), is(5));
+    assertTrue(isDiskInReports(jsonReport, "dn3", "disk2", DiskOp.READ, 1.7));
+    assertTrue(isDiskInReports(jsonReport, "dn3", "disk1", DiskOp.WRITE, 1.6));
+    assertTrue(isDiskInReports(jsonReport, "dn2", "disk2", DiskOp.READ, 1.5));
+    assertTrue(isDiskInReports(jsonReport, "dn2", "disk1", DiskOp.READ, 1.4));
+    assertTrue(isDiskInReports(jsonReport, "dn1", "disk3", DiskOp.READ, 1.3));
+
+    // Remaining nodes should be in the list.
+    assertFalse(isDiskInReports(jsonReport, "dn1", "disk1", DiskOp.READ, 1.1));
+    assertFalse(isDiskInReports(jsonReport, "dn1", "disk2", DiskOp.READ, 1.2));
+    assertFalse(isDiskInReports(jsonReport, "dn3", "disk3", DiskOp.READ, 1.2));
+  }
+
+  @Test
+  public void testEmptyReport() throws Exception {
+    addSlowDiskForTesting("dn1", "disk1",
+        ImmutableMap.of(DiskOp.READ, 1.1));
+    timer.advance(reportValidityMs);
+
+    tracker.updateSlowDiskReportAsync(timer.monotonicNow());
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return tracker.getSlowDiskReportAsJsonString() != null;
+      }
+    }, 500, 5000);
+
+    ArrayList<DiskLatency> jsonReport = getAndDeserializeJson(
+        tracker.getSlowDiskReportAsJsonString());
+
+    assertTrue(jsonReport.isEmpty());
+  }
+
+  private boolean isDiskInReports(ArrayList<DiskLatency> reports,
+      String dataNodeID, String disk, DiskOp diskOp, double latency) {
+    String diskID = SlowDiskTracker.getSlowDiskIDForReport(dataNodeID, disk);
+    for (DiskLatency diskLatency : reports) {
+      if (diskLatency.getSlowDiskID().equals(diskID)) {
+        if (diskLatency.getLatency(diskOp) == null) {
+          return false;
+        }
+        if (Math.abs(diskLatency.getLatency(diskOp) - latency) < 0.0000001) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  private ArrayList<DiskLatency> getAndDeserializeJson(
+      final String json) throws IOException {
+    return (new ObjectMapper()).readValue(json,
+        new TypeReference<ArrayList<DiskLatency>>() {});
+  }
+
+  private void addSlowDiskForTesting(String dnID, String disk,
+      Map<DiskOp, Double> latencies) {
+    Map<String, Map<DiskOp, Double>> slowDisk = Maps.newHashMap();
+    slowDisk.put(disk, latencies);
+    SlowDiskReports slowDiskReport = SlowDiskReports.create(slowDisk);
+    tracker.addSlowDiskReport(dnID, slowDiskReport);
+  }
+
+  Map<String, DiskLatency> getSlowDisksReportForTesting(
+      SlowDiskTracker slowDiskTracker) {
+    Map<String, DiskLatency> slowDisksMap = Maps.newHashMap();
+    for (DiskLatency diskLatency : slowDiskTracker.getSlowDisksReport()) {
+      slowDisksMap.put(diskLatency.getSlowDiskID(), diskLatency);
+    }
+    return slowDisksMap;
+  }
+}