Browse Source

HDFS-17003. Erasure Coding: invalidate wrong block after reporting bad blocks from datanode (#5643). Contributed by hfutatzhanghb.

Reviewed-by: Stephen O'Donnell <sodonnel@apache.org>
Reviewed-by: zhangshuyan <zqingchai@gmail.com>
Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
(cherry picked from commit 0e6bd09ae3c79d5869ffd9ab6ad579786b1f3cd7)
hfutatzhanghb 1 year ago
parent
commit
a804f37ed5

+ 16 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -3634,9 +3634,24 @@ public class BlockManager implements BlockStatsMXBean {
     // ConcurrentModificationException, when the block is removed from the node
     DatanodeDescriptor[] nodesCopy =
         nodes.toArray(new DatanodeDescriptor[nodes.size()]);
+
+    DatanodeStorageInfo[] storages = null;
+    if (blk.isStriped()) {
+      storages = getStorages(blk);
+    }
+
     for (DatanodeDescriptor node : nodesCopy) {
+      Block blockToInvalidate = reported;
+      if (storages != null && blk.isStriped()) {
+        for (DatanodeStorageInfo s : storages) {
+          if (s.getDatanodeDescriptor().equals(node)) {
+            blockToInvalidate = getBlockOnStorage(blk, s);
+            break;
+          }
+        }
+      }
       try {
-        if (!invalidateBlock(new BlockToMarkCorrupt(reported, blk, null,
+        if (!invalidateBlock(new BlockToMarkCorrupt(blockToInvalidate, blk, null,
             Reason.ANY), node, numberReplicas)) {
           removedFromBlocksMap = false;
         }

+ 19 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/ReadStripedFileWithDecodingHelper.java

@@ -73,11 +73,11 @@ abstract public class ReadStripedFileWithDecodingHelper {
   public static MiniDFSCluster initializeCluster() throws IOException {
     Configuration conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
-    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 2);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
-        0);
+        2);
     MiniDFSCluster myCluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(NUM_DATANODES)
+        .numDataNodes(NUM_DATANODES + 3)
         .build();
     myCluster.getFileSystem().enableErasureCodingPolicy(
         StripedFileTestUtil.getDefaultECPolicy().getName());
@@ -108,6 +108,22 @@ abstract public class ReadStripedFileWithDecodingHelper {
     return -1;
   }
 
+  // The index begins from 1.
+  public static int findDataNodeAtIndex(MiniDFSCluster cluster,
+      DistributedFileSystem dfs, Path file, long length, int index) throws IOException {
+    BlockLocation[] locs = dfs.getFileBlockLocations(file, 0, length);
+    String name = (locs[0].getNames())[index - 1];
+    int dnIndex = 0;
+    for (DataNode dn : cluster.getDataNodes()) {
+      int port = dn.getXferPort();
+      if (name.contains(Integer.toString(port))) {
+        return dnIndex;
+      }
+      dnIndex++;
+    }
+    return -1;
+  }
+
   /**
    * Cross product of FILE_LENGTHS, NUM_PARITY_UNITS+1, NUM_PARITY_UNITS.
    * Input for parameterized tests classes.

+ 126 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -50,6 +51,7 @@ import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.BLOCK_SIZ
 import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.CELL_SIZE;
 import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.NUM_DATA_UNITS;
 import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.NUM_PARITY_UNITS;
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.findDataNodeAtIndex;
 import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.findFirstDataNode;
 import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.initializeCluster;
 import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.tearDownCluster;
@@ -96,7 +98,7 @@ public class TestReadStripedFileWithDecoding {
         .get(0);
     final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
         CELL_SIZE, NUM_DATA_UNITS, NUM_PARITY_UNITS);
-    // find the first block file
+    // Find the first block file.
     File storageDir = cluster.getInstanceStorageDir(dnIndex, 0);
     File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock());
     Assert.assertTrue("Block file does not exist", blkFile.exists());
@@ -169,6 +171,129 @@ public class TestReadStripedFileWithDecoding {
     }
   }
 
+  /**
+   * This unit test try to cover the below situation:
+   * Suppose we have an EC file with RS(d,p) policy and block group id
+   * is blk_-9223372036845119810_1920002.
+   * If the first and second data block in this ec block group are corrupted,
+   * meanwhile we read this EC file.
+   * It will trigger reportBadBlock RPC and
+   * add the blk_-9223372036845119810_1920002
+   * and blk_-9223372036845119809_1920002 blocks to corruptReplicas.
+   * It will also reconstruct the two blocks and send IBR to namenode,
+   * then execute BlockManager#addStoredBlock and
+   * invalidateCorruptReplicas method. Suppose we first receive the IBR of
+   * blk_-9223372036845119810_1920002, then in invalidateCorruptReplicas method,
+   * it will only invalidate 9223372036845119809_1920002 on the two datanodes contains
+   * the two corrupt blocks.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testCorruptionECBlockInvalidate() throws Exception {
+
+    final Path file = new Path("/invalidate_corrupted");
+    final int length = BLOCK_SIZE * NUM_DATA_UNITS;
+    final byte[] bytes = StripedFileTestUtil.generateBytes(length);
+    DFSTestUtil.writeFile(dfs, file, bytes);
+
+    int dnIndex = findFirstDataNode(cluster, dfs, file,
+        CELL_SIZE * NUM_DATA_UNITS);
+    int dnIndex2 = findDataNodeAtIndex(cluster, dfs, file,
+        CELL_SIZE * NUM_DATA_UNITS, 2);
+    Assert.assertNotEquals(-1, dnIndex);
+    Assert.assertNotEquals(-1, dnIndex2);
+
+    LocatedStripedBlock slb = (LocatedStripedBlock) dfs.getClient()
+        .getLocatedBlocks(file.toString(), 0, CELL_SIZE * NUM_DATA_UNITS)
+        .get(0);
+    final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
+        CELL_SIZE, NUM_DATA_UNITS, NUM_PARITY_UNITS);
+
+    final Block b = blks[0].getBlock().getLocalBlock();
+    final Block b2 = blks[1].getBlock().getLocalBlock();
+
+    // Find the first block file.
+    File storageDir = cluster.getInstanceStorageDir(dnIndex, 0);
+    File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock());
+    Assert.assertTrue("Block file does not exist", blkFile.exists());
+    // Corrupt the block file.
+    LOG.info("Deliberately corrupting file " + blkFile.getName());
+    try (FileOutputStream out = new FileOutputStream(blkFile)) {
+      out.write("corruption".getBytes());
+      out.flush();
+    }
+
+    // Find the second block file.
+    File storageDir2 = cluster.getInstanceStorageDir(dnIndex2, 0);
+    File blkFile2 = MiniDFSCluster.getBlockFile(storageDir2, blks[1].getBlock());
+    Assert.assertTrue("Block file does not exist", blkFile2.exists());
+    // Corrupt the second block file.
+    LOG.info("Deliberately corrupting file " + blkFile2.getName());
+    try (FileOutputStream out = new FileOutputStream(blkFile2)) {
+      out.write("corruption".getBytes());
+      out.flush();
+    }
+
+    // Disable the heartbeat from DN so that the corrupted block record is kept
+    // in NameNode.
+    for (DataNode dataNode : cluster.getDataNodes()) {
+      DataNodeTestUtils.setHeartbeatsDisabledForTests(dataNode, true);
+    }
+    try {
+      // Do stateful read.
+      StripedFileTestUtil.verifyStatefulRead(dfs, file, length, bytes,
+          ByteBuffer.allocate(1024));
+
+      // Check whether the corruption has been reported to the NameNode.
+      final FSNamesystem ns = cluster.getNamesystem();
+      final BlockManager bm = ns.getBlockManager();
+      BlockInfo blockInfo = (ns.getFSDirectory().getINode4Write(file.toString())
+          .asFile().getBlocks())[0];
+      GenericTestUtils.waitFor(() -> {
+        if (bm.getCorruptReplicas(blockInfo) == null) {
+          return false;
+        }
+        return bm.getCorruptReplicas(blockInfo).size() == 2;
+      }, 250, 60000);
+      // Double check.
+      Assert.assertEquals(2, bm.getCorruptReplicas(blockInfo).size());
+
+      DatanodeDescriptor dnd =
+          NameNodeAdapter.getDatanode(ns, cluster.getDataNodes().get(dnIndex).getDatanodeId());
+
+      DatanodeDescriptor dnd2 =
+          NameNodeAdapter.getDatanode(ns, cluster.getDataNodes().get(dnIndex2).getDatanodeId());
+
+      for (DataNode datanode : cluster.getDataNodes()) {
+        if (!datanode.getDatanodeUuid().equals(dnd.getDatanodeUuid()) &&
+            !datanode.getDatanodeUuid().equals(dnd2.getDatanodeUuid())) {
+          DataNodeTestUtils.setHeartbeatsDisabledForTests(datanode, false);
+        }
+      }
+
+      GenericTestUtils.waitFor(() -> {
+        return bm.containsInvalidateBlock(
+            blks[0].getLocations()[0], b) || dnd.containsInvalidateBlock(b);
+      }, 250, 60000);
+      Assert.assertTrue(bm.containsInvalidateBlock(
+          blks[0].getLocations()[0], b) || dnd.containsInvalidateBlock(b));
+
+      GenericTestUtils.waitFor(() -> {
+        return bm.containsInvalidateBlock(
+            blks[1].getLocations()[0], b2) || dnd2.containsInvalidateBlock(b2);
+      }, 250, 60000);
+
+      Assert.assertTrue(bm.containsInvalidateBlock(
+          blks[1].getLocations()[0], b2) || dnd2.containsInvalidateBlock(b2));
+
+    } finally {
+      for (DataNode datanode : cluster.getDataNodes()) {
+        DataNodeTestUtils.setHeartbeatsDisabledForTests(datanode, false);
+      }
+    }
+  }
+
   @Test
   public void testMoreThanOneCorruptedBlock() throws IOException {
     final Path file = new Path("/corrupted");