|
@@ -19,6 +19,7 @@
|
|
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
|
|
|
|
|
import static org.hamcrest.core.Is.is;
|
|
|
+import static org.junit.Assert.assertEquals;
|
|
|
import static org.junit.Assert.assertFalse;
|
|
|
import static org.junit.Assert.assertThat;
|
|
|
import static org.junit.Assert.assertTrue;
|
|
@@ -56,6 +57,7 @@ import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Collections;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
@@ -401,6 +403,46 @@ public class TestSlowDiskTracker {
|
|
|
assertTrue(tracker.getSlowDiskReportAsJsonString() == null);
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testRemoveInvalidReport() throws Exception {
|
|
|
+ MiniDFSCluster cluster =
|
|
|
+ new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
|
|
|
+ try {
|
|
|
+ NameNode nn = cluster.getNameNode(0);
|
|
|
+
|
|
|
+ DatanodeManager datanodeManager =
|
|
|
+ nn.getNamesystem().getBlockManager().getDatanodeManager();
|
|
|
+ SlowDiskTracker slowDiskTracker = datanodeManager.getSlowDiskTracker();
|
|
|
+ slowDiskTracker.setReportValidityMs(OUTLIERS_REPORT_INTERVAL * 3);
|
|
|
+ assertTrue(slowDiskTracker.getSlowDisksReport().isEmpty());
|
|
|
+ slowDiskTracker.addSlowDiskReport(
|
|
|
+ "dn1",
|
|
|
+ generateSlowDiskReport("disk1",
|
|
|
+ Collections.singletonMap(DiskOp.WRITE, 1.3)));
|
|
|
+ slowDiskTracker.addSlowDiskReport(
|
|
|
+ "dn2",
|
|
|
+ generateSlowDiskReport("disk2",
|
|
|
+ Collections.singletonMap(DiskOp.WRITE, 1.1)));
|
|
|
+
|
|
|
+ // wait for slow disk report
|
|
|
+ GenericTestUtils.waitFor(() -> !slowDiskTracker.getSlowDisksReport()
|
|
|
+ .isEmpty(), 500, 5000);
|
|
|
+ Map<String, DiskLatency> slowDisksReport =
|
|
|
+ getSlowDisksReportForTesting(slowDiskTracker);
|
|
|
+ assertEquals(2, slowDisksReport.size());
|
|
|
+
|
|
|
+ // wait for invalid report to be removed
|
|
|
+ Thread.sleep(OUTLIERS_REPORT_INTERVAL * 3);
|
|
|
+ GenericTestUtils.waitFor(() -> slowDiskTracker.getSlowDisksReport()
|
|
|
+ .isEmpty(), 500, 5000);
|
|
|
+ slowDisksReport = getSlowDisksReportForTesting(slowDiskTracker);
|
|
|
+ assertEquals(0, slowDisksReport.size());
|
|
|
+
|
|
|
+ } finally {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private boolean isDiskInReports(ArrayList<DiskLatency> reports,
|
|
|
String dataNodeID, String disk, DiskOp diskOp, double latency) {
|
|
|
String diskID = SlowDiskTracker.getSlowDiskIDForReport(dataNodeID, disk);
|
|
@@ -430,6 +472,14 @@ public class TestSlowDiskTracker {
|
|
|
tracker.addSlowDiskReport(dnID, slowDiskReport);
|
|
|
}
|
|
|
|
|
|
+ private SlowDiskReports generateSlowDiskReport(String disk,
|
|
|
+ Map<DiskOp, Double> latencies) {
|
|
|
+ Map<String, Map<DiskOp, Double>> slowDisk = Maps.newHashMap();
|
|
|
+ slowDisk.put(disk, latencies);
|
|
|
+ SlowDiskReports slowDiskReport = SlowDiskReports.create(slowDisk);
|
|
|
+ return slowDiskReport;
|
|
|
+ }
|
|
|
+
|
|
|
Map<String, DiskLatency> getSlowDisksReportForTesting(
|
|
|
SlowDiskTracker slowDiskTracker) {
|
|
|
Map<String, DiskLatency> slowDisksMap = Maps.newHashMap();
|