|
@@ -169,6 +169,7 @@ public class TestDataNodeMetrics {
|
|
|
conf.setInt(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, interval);
|
|
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
|
|
.numDataNodes(3).build();
|
|
|
+ DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
|
|
|
try {
|
|
|
cluster.waitActive();
|
|
|
DistributedFileSystem fs = cluster.getFileSystem();
|
|
@@ -190,22 +191,30 @@ public class TestDataNodeMetrics {
|
|
|
DataNodeFaultInjector.set(injector);
|
|
|
Path testFile = new Path("/testFlushNanosMetric.txt");
|
|
|
FSDataOutputStream fout = fs.create(testFile);
|
|
|
+ DFSOutputStream dout = (DFSOutputStream) fout.getWrappedStream();
|
|
|
fout.write(new byte[1]);
|
|
|
fout.hsync();
|
|
|
+ DatanodeInfo[] pipeline = dout.getPipeline();
|
|
|
fout.close();
|
|
|
+ dout.close();
|
|
|
+ DatanodeInfo headDatanodeInfo = pipeline[0];
|
|
|
List<DataNode> datanodes = cluster.getDataNodes();
|
|
|
- DataNode datanode = datanodes.get(0);
|
|
|
- MetricsRecordBuilder dnMetrics = getMetrics(datanode.getMetrics().name());
|
|
|
+ DataNode headNode = datanodes.stream().filter(d -> d.getDatanodeId().equals(headDatanodeInfo))
|
|
|
+ .findFirst().orElseGet(null);
|
|
|
+ assertNotNull("Could not find the head of the datanode write pipeline",
|
|
|
+ headNode);
|
|
|
+ MetricsRecordBuilder dnMetrics = getMetrics(headNode.getMetrics().name());
|
|
|
assertTrue("More than 1 packet received",
|
|
|
- getLongCounter("TotalPacketsReceived", dnMetrics) > 1L);
|
|
|
+ getLongCounter("PacketsReceived", dnMetrics) > 1L);
|
|
|
assertTrue("More than 1 slow packet to mirror",
|
|
|
- getLongCounter("TotalPacketsSlowWriteToMirror", dnMetrics) > 1L);
|
|
|
- assertCounter("TotalPacketsSlowWriteToDisk", 1L, dnMetrics);
|
|
|
- assertCounter("TotalPacketsSlowWriteToOsCache", 0L, dnMetrics);
|
|
|
+ getLongCounter("PacketsSlowWriteToMirror", dnMetrics) > 1L);
|
|
|
+ assertCounter("PacketsSlowWriteToDisk", 1L, dnMetrics);
|
|
|
+ assertCounter("PacketsSlowWriteToOsCache", 0L, dnMetrics);
|
|
|
} finally {
|
|
|
if (cluster != null) {
|
|
|
cluster.shutdown();
|
|
|
}
|
|
|
+ DataNodeFaultInjector.set(oldInjector);
|
|
|
}
|
|
|
}
|
|
|
|