
HDFS-16371. Exclude slow disks when choosing volume (#3753)

litao, 3 years ago
Commit: 46b02788bd

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -701,6 +701,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.datanode.max.disks.to.report";
   public static final int DFS_DATANODE_MAX_DISKS_TO_REPORT_DEFAULT =
       5;
+  public static final String DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY =
+      "dfs.datanode.max.slowdisks.to.exclude";
+  public static final int DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_DEFAULT =
+      0;
   public static final String  DFS_DATANODE_HOST_NAME_KEY =
       HdfsClientConfigKeys.DeprecatedKeys.DFS_DATANODE_HOST_NAME_KEY;
   public static final String  DFS_NAMENODE_CHECKPOINT_DIR_KEY =

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -374,7 +374,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
             RoundRobinVolumeChoosingPolicy.class,
             VolumeChoosingPolicy.class), conf);
     volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(),
-        blockChooserImpl, conf);
+        blockChooserImpl, conf, datanode.getDiskMetrics());
     asyncDiskService = new FsDatasetAsyncDiskService(datanode, this);
     asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode, conf);
     deletingBlock = new HashMap<String, Set<Long>>();

+ 14 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java

@@ -33,6 +33,7 @@ import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeDiskMetrics;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.AutoCloseableLock;
@@ -67,15 +69,17 @@ class FsVolumeList {
   private final boolean enableSameDiskTiering;
   private final MountVolumeMap mountVolumeMap;
   private Map<URI, Double> capacityRatioMap;
+  private final DataNodeDiskMetrics diskMetrics;
 
   FsVolumeList(List<VolumeFailureInfo> initialVolumeFailureInfos,
       BlockScanner blockScanner,
       VolumeChoosingPolicy<FsVolumeImpl> blockChooser,
-      Configuration config) {
+      Configuration config, DataNodeDiskMetrics dataNodeDiskMetrics) {
     this.blockChooser = blockChooser;
     this.blockScanner = blockScanner;
     this.checkDirsLock = new AutoCloseableLock();
     this.checkDirsLockCondition = checkDirsLock.newCondition();
+    this.diskMetrics = dataNodeDiskMetrics;
     for (VolumeFailureInfo volumeFailureInfo: initialVolumeFailureInfos) {
       volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(),
           volumeFailureInfo);
@@ -100,6 +104,15 @@ class FsVolumeList {
 
   private FsVolumeReference chooseVolume(List<FsVolumeImpl> list,
       long blockSize, String storageId) throws IOException {
+
+    // Exclude slow disks when choosing volume.
+    if (diskMetrics != null) {
+      List<String> slowDisksToExclude = diskMetrics.getSlowDisksToExclude();
+      list = list.stream()
+          .filter(volume -> !slowDisksToExclude.contains(volume.getBaseURI().getPath()))
+          .collect(Collectors.toList());
+    }
+
     while (true) {
       FsVolumeImpl volume = blockChooser.chooseVolume(list, blockSize,
           storageId);
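
For reference, here is a minimal, self-contained sketch of what the new filter does, taken out of FsVolumeList: drop any candidate volume whose base path appears in the slow-disk list reported by DataNodeDiskMetrics before the VolumeChoosingPolicy runs. The class name and sample paths below are illustrative only, not HDFS code.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Standalone sketch of the slow-disk filter added to chooseVolume().
public class SlowDiskFilterSketch {
  static List<String> excludeSlowDisks(List<String> volumePaths,
      List<String> slowDisksToExclude) {
    // Keep only volumes that are not flagged as slow.
    return volumePaths.stream()
        .filter(path -> !slowDisksToExclude.contains(path))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> volumes = Arrays.asList("/data1/dn/", "/data2/dn/", "/data3/dn/");
    List<String> slow = Arrays.asList("/data2/dn/");
    // Prints [/data1/dn/, /data3/dn/]; the slow volume never reaches the
    // VolumeChoosingPolicy.
    System.out.println(excludeSlowDisks(volumes, slow));
  }
}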

+ 63 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java

@@ -34,9 +34,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * This class detects and maintains DataNode disk outliers and their
@@ -69,6 +73,14 @@ public class DataNodeDiskMetrics {
    * Threshold in milliseconds below which a disk is definitely not slow.
    */
   private final long lowThresholdMs;
+  /**
+   * The maximum number of slow disks to exclude when choosing a volume.
+   */
+  private int maxSlowDisksToExclude;
+  /**
+   * List of slow disks that need to be excluded.
+   */
+  private List<String> slowDisksToExclude = new ArrayList<>();
 
   public DataNodeDiskMetrics(DataNode dn, long diskOutlierDetectionIntervalMs,
       Configuration conf) {
@@ -80,6 +92,9 @@ public class DataNodeDiskMetrics {
     lowThresholdMs =
         conf.getLong(DFSConfigKeys.DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY,
             DFSConfigKeys.DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_DEFAULT);
+    maxSlowDisksToExclude =
+        conf.getInt(DFSConfigKeys.DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY,
+            DFSConfigKeys.DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_DEFAULT);
     slowDiskDetector =
         new OutlierDetector(minOutlierDetectionDisks, lowThresholdMs);
     shouldRun = true;
@@ -127,6 +142,21 @@ public class DataNodeDiskMetrics {
 
             detectAndUpdateDiskOutliers(metadataOpStats, readIoStats,
                 writeIoStats);
+
+            // Sort the slow disks by latency and keep the top maxSlowDisksToExclude of them.
+            if (maxSlowDisksToExclude > 0) {
+              ArrayList<DiskLatency> diskLatencies = new ArrayList<>();
+              for (Map.Entry<String, Map<DiskOp, Double>> diskStats :
+                  diskOutliersStats.entrySet()) {
+                diskLatencies.add(new DiskLatency(diskStats.getKey(), diskStats.getValue()));
+              }
+
+              Collections.sort(diskLatencies, (o1, o2)
+                  -> Double.compare(o2.getMaxLatency(), o1.getMaxLatency()));
+
+              slowDisksToExclude = diskLatencies.stream().limit(maxSlowDisksToExclude)
+                  .map(DiskLatency::getSlowDisk).collect(Collectors.toList());
+            }
           }
 
           try {
@@ -171,6 +201,35 @@ public class DataNodeDiskMetrics {
     }
   }
 
+  /**
+   * This structure is a wrapper over disk latencies.
+   */
+  public static class DiskLatency {
+    private final String slowDisk;
+    private final Map<DiskOp, Double> latencyMap;
+
+    public DiskLatency(
+        String slowDiskID,
+        Map<DiskOp, Double> latencyMap) {
+      this.slowDisk = slowDiskID;
+      this.latencyMap = latencyMap;
+    }
+
+    double getMaxLatency() {
+      double maxLatency = 0;
+      for (double latency : latencyMap.values()) {
+        if (latency > maxLatency) {
+          maxLatency = latency;
+        }
+      }
+      return maxLatency;
+    }
+
+    public String getSlowDisk() {
+      return slowDisk;
+    }
+  }
+
   private void addDiskStat(Map<String, Map<DiskOp, Double>> diskStats,
       String disk, DiskOp diskOp, double latency) {
     if (!diskStats.containsKey(disk)) {
@@ -206,4 +265,8 @@ public class DataNodeDiskMetrics {
       diskOutliersStats.put(slowDiskPath, latencies);
     }
   }
+
+  public List<String> getSlowDisksToExclude() {
+    return slowDisksToExclude;
+  }
 }
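
To illustrate the selection step outside of the detection thread, here is a self-contained sketch of the same idea: rank each outlier disk by its worst per-operation latency and keep at most maxSlowDisksToExclude of them. The class name, map shape, and sample values are illustrative only.

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Standalone sketch of the "top N slowest disks" selection.
public class TopSlowDisksSketch {
  static List<String> topSlowDisks(Map<String, Map<String, Double>> outliers,
      int maxSlowDisksToExclude) {
    return outliers.entrySet().stream()
        // The worst latency across READ/WRITE/METADATA decides the ranking.
        .sorted(Comparator.comparingDouble(
            (Map.Entry<String, Map<String, Double>> e) ->
                e.getValue().values().stream()
                    .mapToDouble(Double::doubleValue).max().orElse(0))
            .reversed())
        .limit(maxSlowDisksToExclude)
        .map(Map.Entry::getKey)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Map<String, Map<String, Double>> outliers = Map.of(
        "/data1/dn", Map.of("WRITE", 2.0, "READ", 1.0),
        "/data2/dn", Map.of("WRITE", 1.0, "READ", 1.0));
    // With maxSlowDisksToExclude = 1, only /data1/dn (worst latency 2.0) is excluded.
    System.out.println(topSlowDisks(outliers, 1));
  }
}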

+ 9 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -2483,6 +2483,15 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.datanode.max.slowdisks.to.exclude</name>
+  <value>0</value>
+  <description>
+    The maximum number of slow disks to exclude when choosing a volume. By default, this
+    parameter is set to 0, which disables excluding slow disks when choosing a volume.
+  </description>
+</property>
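
The feature can also be switched on from code. Below is a minimal sketch using the key this patch introduces and the profiling key the test below already exercises; the values are examples, not tuning recommendations.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

// Minimal sketch of enabling slow-disk exclusion programmatically.
public class EnableSlowDiskExclusionSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Exclude at most one slow disk per DataNode when choosing a volume.
    conf.setInt(DFSConfigKeys.DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY, 1);
    // Disk I/O profiling must be enabled so slow-disk outliers are detected.
    conf.setInt(
        DFSConfigKeys.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY, 30);
    System.out.println(conf.getInt(
        DFSConfigKeys.DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY, 0));
  }
}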
+
 <property>
   <name>hadoop.user.group.metrics.percentiles.intervals</name>
   <value></value>

+ 108 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java

@@ -21,6 +21,7 @@ import java.util.function.Supplier;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DF;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
@@ -32,12 +33,16 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
+import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.util.StringUtils;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -53,6 +58,7 @@ import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DU_RESERVED_PERCENTAGE_KEY;
 import static org.junit.Assert.assertEquals;
@@ -73,6 +79,10 @@ public class TestFsVolumeList {
   private FsDatasetImpl dataset = null;
   private String baseDir;
   private BlockScanner blockScanner;
+  private static final int NUM_DATANODES = 3;
+  private static final int STORAGES_PER_DATANODE = 3;
+  private static final int DEFAULT_BLOCK_SIZE = 102400;
+  private static final int BUFFER_LENGTH = 1024;
 
   @Before
   public void setUp() {
@@ -89,7 +99,7 @@ public class TestFsVolumeList {
   public void testGetNextVolumeWithClosedVolume() throws IOException {
     FsVolumeList volumeList = new FsVolumeList(
         Collections.<VolumeFailureInfo>emptyList(),
-        blockScanner, blockChooser, conf);
+        blockScanner, blockChooser, conf, null);
     final List<FsVolumeImpl> volumes = new ArrayList<>();
     for (int i = 0; i < 3; i++) {
       File curDir = new File(baseDir, "nextvolume-" + i);
@@ -132,7 +142,7 @@ public class TestFsVolumeList {
   @Test(timeout=30000)
   public void testReleaseVolumeRefIfNoBlockScanner() throws IOException {
     FsVolumeList volumeList = new FsVolumeList(
-        Collections.<VolumeFailureInfo>emptyList(), null, blockChooser, conf);
+        Collections.<VolumeFailureInfo>emptyList(), null, blockChooser, conf, null);
     File volDir = new File(baseDir, "volume-0");
     volDir.mkdirs();
     FsVolumeImpl volume = new FsVolumeImplBuilder()
@@ -511,7 +521,7 @@ public class TestFsVolumeList {
         .build();
     FsVolumeList volumeList = new FsVolumeList(
         Collections.<VolumeFailureInfo>emptyList(),
-        blockScanner, blockChooser, conf);
+        blockScanner, blockChooser, conf, null);
     volumeList.addVolume(archivalVolume.obtainReference());
     volumeList.addVolume(diskVolume.obtainReference());
 
@@ -620,4 +630,99 @@ public class TestFsVolumeList {
     mountVolumeMap.removeVolume(spyArchivalVolume);
     assertEquals(dfCapacity - duReserved, spyDiskVolume.getCapacity());
   }
+
+  @Test
+  public void testExcludeSlowDiskWhenChoosingVolume() throws Exception {
+    conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+    // Set datanode outliers report interval to 1s.
+    conf.setStrings(DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY, "1s");
+    // Enable datanode disk metrics collector.
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY, 30);
+    // Enable excluding slow disks when choosing volume.
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY, 1);
+    // Ensure that each volume's capacity is larger than DEFAULT_BLOCK_SIZE.
+    long capacity = 10 * DEFAULT_BLOCK_SIZE;
+    long[][] capacities = new long[NUM_DATANODES][STORAGES_PER_DATANODE];
+    String[] hostnames = new String[NUM_DATANODES];
+    for (int i = 0; i < NUM_DATANODES; i++) {
+      hostnames[i] = i + "." + i + "." + i + "." + i;
+      for (int j = 0; j < STORAGES_PER_DATANODE; j++) {
+        capacities[i][j] = capacity;
+      }
+    }
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .hosts(hostnames)
+        .numDataNodes(NUM_DATANODES)
+        .storagesPerDatanode(STORAGES_PER_DATANODE)
+        .storageCapacities(capacities).build();
+    cluster.waitActive();
+    FileSystem fs = cluster.getFileSystem();
+
+    // Create file for each datanode.
+    ArrayList<DataNode> dataNodes = cluster.getDataNodes();
+    DataNode dn0 = dataNodes.get(0);
+    DataNode dn1 = dataNodes.get(1);
+    DataNode dn2 = dataNodes.get(2);
+
+    // Mock that the first disk of each datanode is the slowest disk.
+    String slowDisk0OnDn0 = dn0.getFSDataset().getFsVolumeReferences().getReference(0)
+        .getVolume().getBaseURI().getPath();
+    String slowDisk0OnDn1 = dn1.getFSDataset().getFsVolumeReferences().getReference(0)
+        .getVolume().getBaseURI().getPath();
+    String slowDisk0OnDn2 = dn2.getFSDataset().getFsVolumeReferences().getReference(0)
+        .getVolume().getBaseURI().getPath();
+
+    String slowDisk1OnDn0 = dn0.getFSDataset().getFsVolumeReferences().getReference(1)
+        .getVolume().getBaseURI().getPath();
+    String slowDisk1OnDn1 = dn1.getFSDataset().getFsVolumeReferences().getReference(1)
+        .getVolume().getBaseURI().getPath();
+    String slowDisk1OnDn2 = dn2.getFSDataset().getFsVolumeReferences().getReference(1)
+        .getVolume().getBaseURI().getPath();
+
+    dn0.getDiskMetrics().addSlowDiskForTesting(slowDisk0OnDn0, ImmutableMap.of(
+        SlowDiskReports.DiskOp.READ, 1.0, SlowDiskReports.DiskOp.WRITE, 1.5,
+        SlowDiskReports.DiskOp.METADATA, 2.0));
+    dn1.getDiskMetrics().addSlowDiskForTesting(slowDisk0OnDn1, ImmutableMap.of(
+        SlowDiskReports.DiskOp.READ, 1.0, SlowDiskReports.DiskOp.WRITE, 1.5,
+        SlowDiskReports.DiskOp.METADATA, 2.0));
+    dn2.getDiskMetrics().addSlowDiskForTesting(slowDisk0OnDn2, ImmutableMap.of(
+        SlowDiskReports.DiskOp.READ, 1.0, SlowDiskReports.DiskOp.WRITE, 1.5,
+        SlowDiskReports.DiskOp.METADATA, 2.0));
+
+    dn0.getDiskMetrics().addSlowDiskForTesting(slowDisk1OnDn0, ImmutableMap.of(
+        SlowDiskReports.DiskOp.READ, 1.0, SlowDiskReports.DiskOp.WRITE, 1.0,
+        SlowDiskReports.DiskOp.METADATA, 1.0));
+    dn1.getDiskMetrics().addSlowDiskForTesting(slowDisk1OnDn1, ImmutableMap.of(
+        SlowDiskReports.DiskOp.READ, 1.0, SlowDiskReports.DiskOp.WRITE, 1.0,
+        SlowDiskReports.DiskOp.METADATA, 1.0));
+    dn2.getDiskMetrics().addSlowDiskForTesting(slowDisk1OnDn2, ImmutableMap.of(
+        SlowDiskReports.DiskOp.READ, 1.0, SlowDiskReports.DiskOp.WRITE, 1.0,
+        SlowDiskReports.DiskOp.METADATA, 1.0));
+
+    // Wait until the slow disk metrics have been collected on each datanode.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override public Boolean get() {
+        return dn0.getDiskMetrics().getSlowDisksToExclude().size() == 1 &&
+            dn1.getDiskMetrics().getSlowDisksToExclude().size() == 1 &&
+            dn2.getDiskMetrics().getSlowDisksToExclude().size() == 1;
+      }
+    }, 1000, 5000);
+
+    // Create a file with 3 replicas.
+    DFSTestUtil.createFile(fs, new Path("/file0"), false, BUFFER_LENGTH, 1000,
+        DEFAULT_BLOCK_SIZE, (short) 3, 0, false, null);
+
+    // Assert that the number of blocks created on each slow disk is 0.
+    Assert.assertEquals(0, dn0.getVolumeReport().stream()
+        .filter(v -> (v.getPath() + "/").equals(slowDisk0OnDn0)).collect(Collectors.toList()).get(0)
+        .getNumBlocks());
+    Assert.assertEquals(0, dn1.getVolumeReport().stream()
+        .filter(v -> (v.getPath() + "/").equals(slowDisk0OnDn1)).collect(Collectors.toList()).get(0)
+        .getNumBlocks());
+    Assert.assertEquals(0, dn2.getVolumeReport().stream()
+        .filter(v -> (v.getPath() + "/").equals(slowDisk0OnDn2)).collect(Collectors.toList()).get(0)
+        .getNumBlocks());
+  }
 }