
HDFS-17094. EC: Fix bug in block recovery when there are stale datanodes. (#5854)

Reviewed-by: He Xiaoqiao <hexiaoqiao@apache.org>
Signed-off-by: Tao Li <tomscut@apache.org>
zhangshuyan · 1 year ago · commit 708ee3b4b8

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java

@@ -147,6 +147,14 @@ public class BlockUnderConstructionFeature {
     return indices;
   }
 
+  public byte[] getBlockIndicesForSpecifiedStorages(List<Integer> storageIdx) {
+    byte[] indices = new byte[storageIdx.size()];
+    for (int i = 0; i < indices.length; i++) {
+      indices[i] = BlockIdManager.getBlockIndex(replicas[storageIdx.get(i)]);
+    }
+    return indices;
+  }
+
   public int getNumExpectedLocations() {
     return replicas.length;
   }
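
The new helper mirrors the existing getBlockIndices(), but restricts the result to a caller-supplied subset of storages, so that entry i of the returned array describes the same storage as entry i of a filtered location list. A minimal standalone sketch of that mapping (the byte array is a simplified stand-in for the real replicas/BlockIdManager machinery, and the class name is made up):

    import java.util.Arrays;
    import java.util.List;

    public class BlockIndexSubsetSketch {
      // Simplified stand-in for replicas[]: the EC block index stored at
      // each of the nine expected storages of an RS-6-3 block group.
      private static final byte[] REPLICA_BLOCK_INDICES =
          {0, 1, 2, 3, 4, 5, 6, 7, 8};

      // Analogous to getBlockIndicesForSpecifiedStorages: return only the
      // indices of the storages the caller selected, in the caller's order.
      static byte[] blockIndicesFor(List<Integer> storageIdx) {
        byte[] indices = new byte[storageIdx.size()];
        for (int i = 0; i < indices.length; i++) {
          indices[i] = REPLICA_BLOCK_INDICES[storageIdx.get(i)];
        }
        return indices;
      }

      public static void main(String[] args) {
        // Storage 0 was filtered out (e.g. stale); storages 1..8 survive.
        System.out.println(Arrays.toString(
            blockIndicesFor(List.of(1, 2, 3, 4, 5, 6, 7, 8))));
        // -> [1, 2, 3, 4, 5, 6, 7, 8]
      }
    }

Preserving this positional alignment is the point: the recovery path pairs locations and block indices by position, which is exactly the invariant the bug fixed below had broken.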

+ 7 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -1701,9 +1701,11 @@ public class DatanodeManager {
       // Skip stale nodes during recovery
       final List<DatanodeStorageInfo> recoveryLocations =
           new ArrayList<>(storages.length);
-      for (DatanodeStorageInfo storage : storages) {
-        if (!storage.getDatanodeDescriptor().isStale(staleInterval)) {
-          recoveryLocations.add(storage);
+      final List<Integer> storageIdx = new ArrayList<>(storages.length);
+      for (int i = 0; i < storages.length; ++i) {
+        if (!storages[i].getDatanodeDescriptor().isStale(staleInterval)) {
+          recoveryLocations.add(storages[i]);
+          storageIdx.add(i);
         }
       }
       // If we are performing a truncate recovery then set recovery fields
@@ -1736,7 +1738,8 @@ public class DatanodeManager {
         rBlock = new RecoveringBlock(primaryBlock, recoveryInfos,
             uc.getBlockRecoveryId());
         if (b.isStriped()) {
-          rBlock = new RecoveringStripedBlock(rBlock, uc.getBlockIndices(),
+          rBlock = new RecoveringStripedBlock(rBlock,
+              uc.getBlockIndicesForSpecifiedStorages(storageIdx),
               ((BlockInfoStriped) b).getErasureCodingPolicy());
         }
       }
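
Before this patch, recoveryLocations was filtered for staleness but the striped block indices still came from uc.getBlockIndices(), which covers every expected storage. As soon as one node was skipped, position i of the location list no longer matched position i of the index array, so the RecoveringStripedBlock told the recovery worker to treat an internal block as the wrong EC index. A self-contained sketch of the misalignment, using made-up datanode names and an RS-6-3-like layout:

    import java.util.ArrayList;
    import java.util.List;

    public class StaleFilterMisalignmentDemo {
      public static void main(String[] args) {
        // Hypothetical block group: nine expected storages, dn0 is stale.
        String[] storages = {"dn0", "dn1", "dn2", "dn3", "dn4",
            "dn5", "dn6", "dn7", "dn8"};
        byte[] allIndices = {0, 1, 2, 3, 4, 5, 6, 7, 8};
        boolean[] stale = new boolean[storages.length];
        stale[0] = true;

        // Mirror of the fixed loop: filter the locations AND remember
        // which storage positions survived.
        List<String> recoveryLocations = new ArrayList<>();
        List<Integer> storageIdx = new ArrayList<>();
        for (int i = 0; i < storages.length; i++) {
          if (!stale[i]) {
            recoveryLocations.add(storages[i]);
            storageIdx.add(i);
          }
        }

        // Old behavior: indices were NOT filtered, so position 0 of the
        // location list (dn1) was paired with block index 0. Wrong.
        System.out.println(recoveryLocations.get(0) + " <-> " + allIndices[0]);

        // New behavior: derive the index array from the same surviving
        // subset, so position 0 pairs dn1 with block index 1. Correct.
        byte[] filtered = new byte[storageIdx.size()];
        for (int i = 0; i < filtered.length; i++) {
          filtered[i] = allIndices[storageIdx.get(i)];
        }
        System.out.println(recoveryLocations.get(0) + " <-> " + filtered[0]);
      }
    }

The fix records storageIdx alongside the filtered locations and builds the block-index array from that same subset, restoring the pairing.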

+ 60 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecoveryStriped.java

@@ -30,6 +30,10 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.BlockRecoveryWorker;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestInterDatanodeProtocol;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -188,6 +192,62 @@ public class TestLeaseRecoveryStriped {
     }
   }
 
+  /**
+   * Test lease recovery for an EC policy when one internal block is
+   * located on a stale datanode.
+   */
+  @Test
+  public void testLeaseRecoveryWithStaleDataNode() {
+    LOG.info("blockLengthsSuite: " +
+        Arrays.toString(blockLengthsSuite));
+    long staleInterval = conf.getLong(
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
+
+    for (int i = 0; i < blockLengthsSuite.length; i++) {
+      BlockLengths blockLengths = blockLengthsSuite[i];
+      try {
+        writePartialBlocks(blockLengths.getBlockLengths());
+
+        // Get block info for the last block and mark corresponding datanode
+        // as stale.
+        LocatedBlock locatedblock =
+            TestInterDatanodeProtocol.getLastLocatedBlock(
+                dfs.dfs.getNamenode(), p.toString());
+        DatanodeInfo firstDataNode = locatedblock.getLocations()[0];
+        DatanodeDescriptor dnDes = cluster.getNameNode().getNamesystem()
+            .getBlockManager().getDatanodeManager()
+            .getDatanode(firstDataNode);
+        DataNodeTestUtils.setHeartbeatsDisabledForTests(
+            cluster.getDataNode(dnDes.getIpcPort()), true);
+        DFSTestUtil.resetLastUpdatesWithOffset(dnDes, -(staleInterval + 1));
+
+        long[] longArray = new long[blockLengths.getBlockLengths().length - 1];
+        for (int j = 0; j < longArray.length; ++j) {
+          longArray[j] = blockLengths.getBlockLengths()[j + 1];
+        }
+        int safeLength = (int) StripedBlockUtil.getSafeLength(ecPolicy,
+            longArray);
+        int checkDataLength = Math.min(testFileLength, safeLength);
+        recoverLease();
+        List<Long> oldGS = new ArrayList<>();
+        oldGS.add(1001L);
+        StripedFileTestUtil.checkData(dfs, p, checkDataLength,
+            new ArrayList<>(), oldGS, blockGroupSize);
+
+        DataNodeTestUtils.setHeartbeatsDisabledForTests(
+            cluster.getDataNode(dnDes.getIpcPort()), false);
+        DFSTestUtil.resetLastUpdatesWithOffset(dnDes, 0);
+
+      } catch (Throwable e) {
+        String msg = "failed testCase at i=" + i + ", blockLengths="
+            + blockLengths + "\n"
+            + StringUtils.stringifyException(e);
+        Assert.fail(msg);
+      }
+    }
+  }
+
   @Test
   public void testSafeLength() {
     checkSafeLength(0, 0); // Length of: 0
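
For reference, the expected data length in the new test comes from StripedBlockUtil.getSafeLength(), applied to the internal block lengths with the stale node's block excluded. A rough sketch of the rule as I understand it (an assumption about the algorithm, not a copy of the HDFS implementation): a stripe is decodable only if at least numDataUnits internal blocks cover it, so the safe length is set by the numDataUnits-th largest internal block length, floored to a whole cell:

    import java.util.Arrays;

    public class SafeLengthSketch {
      // Assumed safe-length rule: find the numDataUnits-th largest
      // internal block length, count the full cells it covers, and
      // multiply by the stripe's data width.
      static long safeLength(long[] blockLens, int numDataUnits, int cellSize) {
        long[] sorted = Arrays.copyOf(blockLens, blockLens.length);
        Arrays.sort(sorted);
        long lastFullStripe = sorted[sorted.length - numDataUnits] / cellSize;
        return lastFullStripe * numDataUnits * (long) cellSize;
      }

      public static void main(String[] args) {
        // RS-6-3 with 1 MB cells; nine internal blocks of varying length.
        int cell = 1024 * 1024;
        long[] lens = {3L * cell, 3L * cell, 2L * cell, 2L * cell, 2L * cell,
            2L * cell, cell, cell, cell};
        // The 6th-largest length is 2 MB -> 2 full stripes -> 2 * 6 MB.
        System.out.println(safeLength(lens, 6, cell)); // 12582912
      }
    }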