@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.metrics;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.util.Daemon;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class detects and maintains DataNode disk outliers and their
+ * latencies for different ops (metadata, read, write).
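+ * <p>
+ * A background daemon periodically samples the per-volume mean latencies
+ * exposed by {@link DataNodeVolumeMetrics} and keeps the latest set of
+ * volumes flagged as slow by the {@link OutlierDetector}.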
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class DataNodeDiskMetrics {
+
+  public static final Logger LOG = LoggerFactory.getLogger(
+      DataNodeDiskMetrics.class);
+
+  private DataNode dn;
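+  // Parameters for the OutlierDetector: the minimum number of disks to run
+  // detection over, and the low latency threshold (in ms) for slow disks.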
+  private final long MIN_OUTLIER_DETECTION_DISKS = 5;
+  private final long SLOW_DISK_LOW_THRESHOLD_MS = 20;
+  private final long detectionInterval;
+  private volatile boolean shouldRun;
+  private OutlierDetector slowDiskDetector;
+  private Daemon slowDiskDetectionDaemon;
+  private volatile Map<String, Map<DiskOutlierDetectionOp, Double>>
+      diskOutliersStats;
+
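+  /**
+   * Creates the disk metrics tracker and starts the background disk
+   * outlier detection daemon.
+   *
+   * @param dn the DataNode whose volumes are monitored
+   * @param diskOutlierDetectionIntervalMs interval, in milliseconds,
+   *          between two consecutive outlier detection runs
+   */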
+  public DataNodeDiskMetrics(DataNode dn, long diskOutlierDetectionIntervalMs) {
+    this.dn = dn;
+    this.detectionInterval = diskOutlierDetectionIntervalMs;
+    slowDiskDetector = new OutlierDetector(MIN_OUTLIER_DETECTION_DISKS,
+        SLOW_DISK_LOW_THRESHOLD_MS);
+    shouldRun = true;
+    startDiskOutlierDetectionThread();
+  }
+
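+  /**
+   * Starts a daemon thread that periodically collects the mean metadata,
+   * read and write I/O latencies of every volume and runs outlier
+   * detection over them.
+   */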
+  private void startDiskOutlierDetectionThread() {
+    slowDiskDetectionDaemon = new Daemon(new Runnable() {
+      @Override
+      public void run() {
+        while (shouldRun) {
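+          // Mean latency per volume for each op type, keyed by volume path.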
+          Map<String, Double> metadataOpStats = Maps.newHashMap();
+          Map<String, Double> readIoStats = Maps.newHashMap();
+          Map<String, Double> writeIoStats = Maps.newHashMap();
+          FsDatasetSpi.FsVolumeReferences fsVolumeReferences = null;
+          try {
+            fsVolumeReferences = dn.getFSDataset().getFsVolumeReferences();
+            Iterator<FsVolumeSpi> volumeIterator = fsVolumeReferences
+                .iterator();
+            while (volumeIterator.hasNext()) {
+              FsVolumeSpi volume = volumeIterator.next();
+              DataNodeVolumeMetrics metrics = volume.getMetrics();
+              String volumeName = volume.getBasePath();
+
+              metadataOpStats.put(volumeName,
+                  metrics.getMetadataOperationMean());
+              readIoStats.put(volumeName, metrics.getReadIoMean());
+              writeIoStats.put(volumeName, metrics.getWriteIoMean());
+            }
+          } finally {
+            if (fsVolumeReferences != null) {
+              try {
+                fsVolumeReferences.close();
+              } catch (IOException e) {
+                LOG.error("Error in releasing FS Volume references", e);
+              }
+            }
+          }
+          if (metadataOpStats.isEmpty() && readIoStats.isEmpty() &&
+              writeIoStats.isEmpty()) {
+            LOG.debug("No disk stats available for detecting outliers.");
+            return;
+          }
+
+          detectAndUpdateDiskOutliers(metadataOpStats, readIoStats,
+              writeIoStats);
+
+          try {
+            Thread.sleep(detectionInterval);
+          } catch (InterruptedException e) {
+            LOG.error("Disk Outlier Detection thread interrupted", e);
+            Thread.currentThread().interrupt();
+          }
+        }
+      }
+    });
+    slowDiskDetectionDaemon.start();
+  }
+
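+  /**
+   * Runs outlier detection on each op type and records the latencies of
+   * every volume that is an outlier for at least one of them.
+   */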
+  private void detectAndUpdateDiskOutliers(Map<String, Double> metadataOpStats,
+      Map<String, Double> readIoStats, Map<String, Double> writeIoStats) {
+    Set<String> diskOutliersSet = Sets.newHashSet();
+
+    // Get MetadataOp Outliers
+    Map<String, Double> metadataOpOutliers = slowDiskDetector
+        .getOutliers(metadataOpStats);
+    if (!metadataOpOutliers.isEmpty()) {
+      diskOutliersSet.addAll(metadataOpOutliers.keySet());
+    }
+
+    // Get ReadIo Outliers
+    Map<String, Double> readIoOutliers = slowDiskDetector
+        .getOutliers(readIoStats);
+    if (!readIoOutliers.isEmpty()) {
+      diskOutliersSet.addAll(readIoOutliers.keySet());
+    }
+
+    // Get WriteIo Outliers
+    Map<String, Double> writeIoOutliers = slowDiskDetector
+        .getOutliers(writeIoStats);
+    if (!writeIoOutliers.isEmpty()) {
+      diskOutliersSet.addAll(writeIoOutliers.keySet());
+    }
+
+    Map<String, Map<DiskOutlierDetectionOp, Double>> diskStats =
+        Maps.newHashMap();
+    for (String disk : diskOutliersSet) {
+      Map<DiskOutlierDetectionOp, Double> diskStat = Maps.newHashMap();
+      diskStat.put(DiskOutlierDetectionOp.METADATA, metadataOpStats.get(disk));
+      diskStat.put(DiskOutlierDetectionOp.READ, readIoStats.get(disk));
+      diskStat.put(DiskOutlierDetectionOp.WRITE, writeIoStats.get(disk));
+      diskStats.put(disk, diskStat);
+    }
+
+    diskOutliersStats = diskStats;
+    LOG.debug("Updated disk outliers.");
+  }
+
+  /**
+   * Lists the types of operations on which disk latencies are measured.
+   */
+  public enum DiskOutlierDetectionOp {
+    METADATA,
+    READ,
+    WRITE
+  }
+
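+  /**
+   * Returns the most recently detected disk outliers along with their mean
+   * latencies per op type. The returned map is replaced wholesale on each
+   * detection run.
+   */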
+  public Map<String,
+      Map<DiskOutlierDetectionOp, Double>> getDiskOutliersStats() {
+    return diskOutliersStats;
+  }
+
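+  /**
+   * Stops the outlier detection daemon and waits for it to exit.
+   */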
+  public void shutdownAndWait() {
+    shouldRun = false;
+    slowDiskDetectionDaemon.interrupt();
+    try {
+      slowDiskDetectionDaemon.join();
+    } catch (InterruptedException e) {
+      LOG.error("Disk Outlier Detection daemon did not shutdown", e);
+    }
+  }
+}