|
@@ -57,6 +57,10 @@ public class DataNodeDiskMetrics {
|
|
|
private volatile Map<String, Map<DiskOp, Double>>
|
|
|
diskOutliersStats = Maps.newHashMap();
|
|
|
|
|
|
+ // Adding for test purpose. When addSlowDiskForTesting() called from test
|
|
|
+ // code, status should not be overridden by daemon thread.
|
|
|
+ private boolean overrideStatus = true;
|
|
|
+
|
|
|
public DataNodeDiskMetrics(DataNode dn, long diskOutlierDetectionIntervalMs) {
|
|
|
this.dn = dn;
|
|
|
this.detectionInterval = diskOutlierDetectionIntervalMs;
|
|
@@ -71,41 +75,43 @@ public class DataNodeDiskMetrics {
|
|
|
@Override
|
|
|
public void run() {
|
|
|
while (shouldRun) {
|
|
|
- Map<String, Double> metadataOpStats = Maps.newHashMap();
|
|
|
- Map<String, Double> readIoStats = Maps.newHashMap();
|
|
|
- Map<String, Double> writeIoStats = Maps.newHashMap();
|
|
|
- FsDatasetSpi.FsVolumeReferences fsVolumeReferences = null;
|
|
|
- try {
|
|
|
- fsVolumeReferences = dn.getFSDataset().getFsVolumeReferences();
|
|
|
- Iterator<FsVolumeSpi> volumeIterator = fsVolumeReferences
|
|
|
- .iterator();
|
|
|
- while (volumeIterator.hasNext()) {
|
|
|
- FsVolumeSpi volume = volumeIterator.next();
|
|
|
- DataNodeVolumeMetrics metrics = volumeIterator.next().getMetrics();
|
|
|
- String volumeName = volume.getBaseURI().getPath();
|
|
|
-
|
|
|
- metadataOpStats.put(volumeName,
|
|
|
- metrics.getMetadataOperationMean());
|
|
|
- readIoStats.put(volumeName, metrics.getReadIoMean());
|
|
|
- writeIoStats.put(volumeName, metrics.getWriteIoMean());
|
|
|
- }
|
|
|
- } finally {
|
|
|
- if (fsVolumeReferences != null) {
|
|
|
- try {
|
|
|
- fsVolumeReferences.close();
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.error("Error in releasing FS Volume references", e);
|
|
|
+ if (dn.getFSDataset() != null) {
|
|
|
+ Map<String, Double> metadataOpStats = Maps.newHashMap();
|
|
|
+ Map<String, Double> readIoStats = Maps.newHashMap();
|
|
|
+ Map<String, Double> writeIoStats = Maps.newHashMap();
|
|
|
+ FsDatasetSpi.FsVolumeReferences fsVolumeReferences = null;
|
|
|
+ try {
|
|
|
+ fsVolumeReferences = dn.getFSDataset().getFsVolumeReferences();
|
|
|
+ Iterator<FsVolumeSpi> volumeIterator = fsVolumeReferences
|
|
|
+ .iterator();
|
|
|
+ while (volumeIterator.hasNext()) {
|
|
|
+ FsVolumeSpi volume = volumeIterator.next();
|
|
|
+ DataNodeVolumeMetrics metrics = volume.getMetrics();
|
|
|
+ String volumeName = volume.getBaseURI().getPath();
|
|
|
+
|
|
|
+ metadataOpStats.put(volumeName,
|
|
|
+ metrics.getMetadataOperationMean());
|
|
|
+ readIoStats.put(volumeName, metrics.getReadIoMean());
|
|
|
+ writeIoStats.put(volumeName, metrics.getWriteIoMean());
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ if (fsVolumeReferences != null) {
|
|
|
+ try {
|
|
|
+ fsVolumeReferences.close();
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.error("Error in releasing FS Volume references", e);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
- }
|
|
|
- if (metadataOpStats.isEmpty() && readIoStats.isEmpty() &&
|
|
|
- writeIoStats.isEmpty()) {
|
|
|
- LOG.debug("No disk stats available for detecting outliers.");
|
|
|
- return;
|
|
|
- }
|
|
|
+ if (metadataOpStats.isEmpty() && readIoStats.isEmpty()
|
|
|
+ && writeIoStats.isEmpty()) {
|
|
|
+ LOG.debug("No disk stats available for detecting outliers.");
|
|
|
+ continue;
|
|
|
+ }
|
|
|
|
|
|
- detectAndUpdateDiskOutliers(metadataOpStats, readIoStats,
|
|
|
- writeIoStats);
|
|
|
+ detectAndUpdateDiskOutliers(metadataOpStats, readIoStats,
|
|
|
+ writeIoStats);
|
|
|
+ }
|
|
|
|
|
|
try {
|
|
|
Thread.sleep(detectionInterval);
|
|
@@ -143,9 +149,10 @@ public class DataNodeDiskMetrics {
|
|
|
for (Map.Entry<String, Double> entry : writeIoOutliers.entrySet()) {
|
|
|
addDiskStat(diskStats, entry.getKey(), DiskOp.WRITE, entry.getValue());
|
|
|
}
|
|
|
-
|
|
|
- diskOutliersStats = diskStats;
|
|
|
- LOG.debug("Updated disk outliers.");
|
|
|
+ if (overrideStatus) {
|
|
|
+ diskOutliersStats = diskStats;
|
|
|
+ LOG.debug("Updated disk outliers.");
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private void addDiskStat(Map<String, Map<DiskOp, Double>> diskStats,
|
|
@@ -176,6 +183,7 @@ public class DataNodeDiskMetrics {
|
|
|
@VisibleForTesting
|
|
|
public void addSlowDiskForTesting(String slowDiskPath,
|
|
|
Map<DiskOp, Double> latencies) {
|
|
|
+ overrideStatus = false;
|
|
|
if (latencies == null) {
|
|
|
diskOutliersStats.put(slowDiskPath, ImmutableMap.of());
|
|
|
} else {
|