Bläddra i källkod

HDFS-16423. Balancer should not get blocks on stale storages (#3883)

Reviewed-by: litao <tomleescut@gmail.com>
Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
qinyuren 3 år sedan
förälder
incheckning
db2c3200e6

+ 10 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -1664,9 +1664,16 @@ public class BlockManager implements BlockStatsMXBean {
     if(numBlocks == 0) {
       return new BlocksWithLocations(new BlockWithLocations[0]);
     }
+
+    // only iterate over storages whose block contents are not stale
+    DatanodeStorageInfo[] storageInfos = Arrays
+        .stream(node.getStorageInfos())
+        .filter(s -> !s.areBlockContentsStale())
+        .toArray(DatanodeStorageInfo[]::new);
+
     // starting from a random block
     int startBlock = ThreadLocalRandom.current().nextInt(numBlocks);
-    Iterator<BlockInfo> iter = node.getBlockIterator(startBlock);
+    Iterator<BlockInfo> iter = node.getBlockIterator(startBlock, storageInfos);
     List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
     List<BlockInfo> pending = new ArrayList<BlockInfo>();
     long totalSize = 0;
@@ -1685,8 +1692,8 @@ public class BlockManager implements BlockStatsMXBean {
       }
     }
     if(totalSize<size) {
-      iter = node.getBlockIterator(); // start from the beginning
-      for(int i=0; i<startBlock&&totalSize<size; i++) {
+      iter = node.getBlockIterator(0, storageInfos); // start from the beginning
+      for(int i = 0; i < startBlock && totalSize < size && iter.hasNext(); i++) {
         curBlock = iter.next();
         if(!curBlock.isComplete())  continue;
         if (curBlock.getNumBytes() < minBlockSize) {

+ 11 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java

@@ -647,6 +647,17 @@ public class DatanodeDescriptor extends DatanodeInfo {
     return new BlockIterator(startBlock, getStorageInfos());
   }
 
+  /**
+   * Get an iterator over the specified storages, starting from the specified block.
+   *
+   * @param startBlock the block at which iteration starts
+   * @param storageInfos the storages to iterate over
+   */
+  Iterator<BlockInfo> getBlockIterator(
+      final int startBlock, final DatanodeStorageInfo[] storageInfos) {
+    return new BlockIterator(startBlock, storageInfos);
+  }
+
   @VisibleForTesting
   public void incrementPendingReplicationWithoutTargets() {
     pendingReplicationWithoutTargets++;

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java

@@ -168,6 +168,11 @@ public class DatanodeStorageInfo {
     return blockContentsStale;
   }
 
+  @VisibleForTesting
+  public void setBlockContentsStale(boolean value) {
+    blockContentsStale = value;
+  }
+
   void markStaleAfterFailover() {
     heartbeatedSinceFailover = false;
     blockContentsStale = true;

+ 65 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java

@@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
@@ -468,4 +469,68 @@ public class TestGetBlocks {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testReadSkipStaleStorage() throws Exception {
+    final short repFactor = (short) 1;
+    final int blockNum = 64;
+    final int storageNum = 2;
+    final int fileLen = BLOCK_SIZE * blockNum;
+    final Path path = new Path("testReadSkipStaleStorage");
+    final Configuration conf = new HdfsConfiguration();
+
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1)
+        .storagesPerDatanode(storageNum)
+        .build();
+    cluster.waitActive();
+
+    FileSystem fs = cluster.getFileSystem();
+    DFSTestUtil.createFile(fs, path, false, 1024, fileLen,
+        BLOCK_SIZE, repFactor, 0, true);
+
+    // get datanode info
+    ClientProtocol client = NameNodeProxies.createProxy(conf,
+        cluster.getFileSystem(0).getUri(),
+        ClientProtocol.class).getProxy();
+    DatanodeInfo[] dataNodes = client.getDatanodeReport(DatanodeReportType.ALL);
+
+    // get storage info
+    BlockManager bm0 = cluster.getNamesystem(0).getBlockManager();
+    DatanodeStorageInfo[] storageInfos = bm0.getDatanodeManager()
+        .getDatanode(dataNodes[0].getDatanodeUuid()).getStorageInfos();
+
+    InetSocketAddress addr = new InetSocketAddress("localhost",
+        cluster.getNameNodePort());
+    NamenodeProtocol namenode = NameNodeProxies.createProxy(conf,
+        DFSUtilClient.getNNUri(addr), NamenodeProtocol.class).getProxy();
+
+    // check that the number of returned blocks equals blockNum
+    BlockWithLocations[] blocks = namenode.getBlocks(
+        dataNodes[0], fileLen*2, 0, 0).getBlocks();
+    assertEquals(blockNum, blocks.length);
+
+    // calculate the block count on storage[0]
+    int count = 0;
+    for (BlockWithLocations b : blocks) {
+      for (String s : b.getStorageIDs()) {
+        if (s.equals(storageInfos[0].getStorageID())) {
+          count++;
+        }
+      }
+    }
+
+    // mark storage[0] as stale
+    storageInfos[0].setBlockContentsStale(true);
+    blocks = namenode.getBlocks(
+        dataNodes[0], fileLen*2, 0, 0).getBlocks();
+    assertEquals(blockNum - count, blocks.length);
+
+    // mark all storages as stale
+    bm0.getDatanodeManager().markAllDatanodesStale();
+    blocks = namenode.getBlocks(
+        dataNodes[0], fileLen*2, 0, 0).getBlocks();
+    assertEquals(0, blocks.length);
+  }
 }

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerService.java

@@ -126,6 +126,7 @@ public class TestBalancerService {
     TestBalancer.initConf(conf);
     try {
       setupCluster(conf);
+      TestBalancerWithHANameNodes.waitStoragesNoStale(cluster, client, 0);
       long totalCapacity = addOneDataNode(conf); // make cluster imbalanced
 
       Thread balancerThread =
@@ -193,6 +194,7 @@ public class TestBalancerService {
       cluster.transitionToActive(0);
       cluster.waitActive();
 
+      TestBalancerWithHANameNodes.waitStoragesNoStale(cluster, client, 0);
       long totalCapacity = addOneDataNode(conf);
       TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
           cluster, BalancerParameters.DEFAULT);

+ 36 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java

@@ -47,12 +47,17 @@ import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
 import org.junit.Test;
 import org.slf4j.LoggerFactory;
@@ -75,6 +80,26 @@ public class TestBalancerWithHANameNodes {
     TestBalancer.initTestSetup();
   }
 
+  public static void waitStoragesNoStale(MiniDFSCluster cluster,
+      ClientProtocol client, int nnIndex) throws Exception {
+    // trigger a full block report and wait until no storage is stale
+    cluster.triggerBlockReports();
+    DatanodeInfo[] dataNodes = client.getDatanodeReport(HdfsConstants.DatanodeReportType.ALL);
+    GenericTestUtils.waitFor(() -> {
+      BlockManager bm = cluster.getNamesystem(nnIndex).getBlockManager();
+      for (DatanodeInfo dn : dataNodes) {
+        DatanodeStorageInfo[] storageInfos = bm.getDatanodeManager()
+            .getDatanode(dn.getDatanodeUuid()).getStorageInfos();
+        for (DatanodeStorageInfo s : storageInfos) {
+          if (s.areBlockContentsStale()) {
+            return false;
+          }
+        }
+      }
+      return true;
+    }, 300, 60000);
+  }
+
   /**
    * Test a cluster with even distribution, then a new empty node is added to
    * the cluster. Test start a cluster with specified number of nodes, and fills
@@ -103,13 +128,17 @@ public class TestBalancerWithHANameNodes {
       client = NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf),
           ClientProtocol.class).getProxy();
 
-      doTest(conf);
+      doTest(conf, true);
     } finally {
       cluster.shutdown();
     }
   }
 
   void doTest(Configuration conf) throws Exception {
+    doTest(conf, false);
+  }
+
+  void doTest(Configuration conf, boolean withHA) throws Exception {
     int numOfDatanodes = TEST_CAPACITIES.length;
     long totalCapacity = TestBalancer.sum(TEST_CAPACITIES);
     // fill up the cluster to be 30% full
@@ -123,6 +152,12 @@ public class TestBalancerWithHANameNodes {
       HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0),
           cluster.getNameNode(1));
     }
+
+    // all storages are stale after an HA failover
+    if (withHA) {
+      waitStoragesNoStale(cluster, client, 0);
+    }
+
     // start up an empty node with the same capacity and on the same rack
     long newNodeCapacity = TestBalancer.CAPACITY; // new node's capacity
     String newNodeRack = TestBalancer.RACK2; // new node's rack