
HDFS-17477. IncrementalBlockReport race condition additional edge cases (#6748)

dannytbecker 1 year ago
parent
commit
0c35cf0982

+ 13 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -3263,6 +3263,7 @@ public class BlockManager implements BlockStatsMXBean {
     for (BlockReportReplica iblk : report) {
       ReplicaState reportedState = iblk.getState();
 
+      removeQueuedBlock(storageInfo, iblk);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Initial report of block {} on {} size {} replicaState = {}",
             iblk.getBlockName(), storageInfo.getDatanodeDescriptor(),
@@ -3348,6 +3349,7 @@ public class BlockManager implements BlockStatsMXBean {
     // scan the report and process newly reported blocks
     for (BlockReportReplica iblk : newReport) {
       ReplicaState iState = iblk.getState();
+      removeQueuedBlock(storageInfo, iblk);
       LOG.debug("Reported block {} on {} size {} replicaState = {}", iblk, dn,
           iblk.getNumBytes(), iState);
       BlockInfo storedBlock = processReportedBlock(storageInfo,
@@ -3416,7 +3418,6 @@ public class BlockManager implements BlockStatsMXBean {
 
     LOG.debug("Reported block {} on {} size {} replicaState = {}", block, dn,
         block.getNumBytes(), reportedState);
-
     if (shouldPostponeBlocksFromFuture && isGenStampInFuture(block)) {
       queueReportedBlock(storageInfo, block, reportedState,
           QUEUE_REASON_FUTURE_GENSTAMP);
@@ -3496,6 +3497,16 @@ public class BlockManager implements BlockStatsMXBean {
     pendingDNMessages.enqueueReportedBlock(storageInfo, block, reportedState);
   }
 
+  /**
+   * Remove the given reported block from the pending queue in the
+   * standby node. @see PendingDataNodeMessages.
+   */
+  private void removeQueuedBlock(DatanodeStorageInfo storageInfo, Block block) {
+    LOG.debug("Removing queued block {} from datanode {} from pending queue.",
+        block, storageInfo.getDatanodeDescriptor());
+    pendingDNMessages.removeQueuedBlock(storageInfo, block);
+  }
+
   /**
    * Try to process any messages that were previously queued for the given
    * block. This is called from FSEditLogLoader whenever a block's state
@@ -4558,6 +4569,7 @@ public class BlockManager implements BlockStatsMXBean {
 
     final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
 
+    removeQueuedBlock(storageInfo, block);
     LOG.debug("Reported block {} on {} size {} replicaState = {}",
         block, node, block.getNumBytes(), reportedState);
 

+ 15 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java

@@ -95,11 +95,24 @@ class PendingDataNodeMessages {
   
   void enqueueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
       ReplicaState reportedState) {
+    if (BlockIdManager.isStripedBlockID(block.getBlockId())) {
+      Block blkId = new Block(BlockIdManager.convertToStripedID(block
+          .getBlockId()));
+      getBlockQueue(blkId).add(
+          new ReportedBlockInfo(storageInfo, new Block(block), reportedState));
+    } else {
+      block = new Block(block);
+      getBlockQueue(block).add(
+          new ReportedBlockInfo(storageInfo, block, reportedState));
+    }
+    count++;
+  }
+
+  void removeQueuedBlock(DatanodeStorageInfo storageInfo, Block block) {
     if (storageInfo == null || block == null) {
       return;
     }
     block = new Block(block);
-    long genStamp = block.getGenerationStamp();
     Queue<ReportedBlockInfo> queue = null;
     if (BlockIdManager.isStripedBlockID(block.getBlockId())) {
       Block blkId = new Block(BlockIdManager.convertToStripedID(block
@@ -114,12 +127,9 @@ class PendingDataNodeMessages {
     // the old reported block will be processed and marked as corrupt by the ANN.
     // See HDFS-17453
     int size = queue.size();
-    if (queue.removeIf(rbi -> storageInfo.equals(rbi.storageInfo) &&
-        rbi.block.getGenerationStamp() <= genStamp)) {
+    if (queue.removeIf(rbi -> storageInfo.equals(rbi.storageInfo))) {
       count -= (size - queue.size());
     }
-    queue.add(new ReportedBlockInfo(storageInfo, block, reportedState));
-    count++;
   }
   
   /**
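One detail worth noting in enqueueReportedBlock and removeQueuedBlock above: for erasure-coded blocks, both methods key the queue by the striped block group id, so an internal block enqueued under its own id and a later remove for a sibling replica land on the same queue. A hedged sketch of that normalization follows; the mask constant and the negative-id check are assumptions standing in for BlockIdManager's internals, not the real implementation.

// Hypothetical sketch of the queue-key normalization.
final class QueueKeySketch {
  // Assumption: the low bits of a striped block id index the internal block
  // within its group; masking them off yields the block group id.
  private static final long BLOCK_GROUP_INDEX_MASK = 0xF;

  static long toQueueKey(long blockId) {
    boolean striped = blockId < 0; // assumption: striped ids are negative
    return striped ? (blockId & ~BLOCK_GROUP_INDEX_MASK) : blockId;
  }

  private QueueKeySketch() {}
}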

+ 40 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java

@@ -67,14 +67,16 @@ public class TestPendingDataNodeMessages {
     msgs.enqueueReportedBlock(storageInfo1, block1Gs2, ReplicaState.FINALIZED);
     msgs.enqueueReportedBlock(storageInfo2, block1Gs2, ReplicaState.FINALIZED);
     List<ReportedBlockInfo> rbis = Arrays.asList(
+        new ReportedBlockInfo(storageInfo1, block1Gs1, ReplicaState.FINALIZED),
+        new ReportedBlockInfo(storageInfo2, block1Gs1, ReplicaState.FINALIZED),
         new ReportedBlockInfo(storageInfo1, block1Gs2, ReplicaState.FINALIZED),
         new ReportedBlockInfo(storageInfo2, block1Gs2, ReplicaState.FINALIZED));
 
-    assertEquals(2, msgs.count());
+    assertEquals(4, msgs.count());
     
     // Nothing queued yet for block 2
     assertNull(msgs.takeBlockQueue(block2Gs1));
-    assertEquals(2, msgs.count());
+    assertEquals(4, msgs.count());
     
     Queue<ReportedBlockInfo> q =
       msgs.takeBlockQueue(block1Gs2DifferentInstance);
@@ -123,4 +125,40 @@ public class TestPendingDataNodeMessages {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testRemoveQueuedBlock() {
+    DatanodeDescriptor fakeDN1 = DFSTestUtil.getDatanodeDescriptor(
+        "localhost", 8898, "/default-rack");
+    DatanodeDescriptor fakeDN2 = DFSTestUtil.getDatanodeDescriptor(
+        "localhost", 8899, "/default-rack");
+    DatanodeStorage storage1 = new DatanodeStorage("STORAGE_ID_1");
+    DatanodeStorage storage2 = new DatanodeStorage("STORAGE_ID_2");
+    DatanodeStorageInfo storageInfo1 = new DatanodeStorageInfo(fakeDN1, storage1);
+    DatanodeStorageInfo storageInfo2 = new DatanodeStorageInfo(fakeDN2, storage2);
+    msgs.enqueueReportedBlock(storageInfo1, block1Gs1, ReplicaState.FINALIZED);
+    msgs.enqueueReportedBlock(storageInfo2, block1Gs1, ReplicaState.FINALIZED);
+    msgs.enqueueReportedBlock(storageInfo1, block1Gs2, ReplicaState.FINALIZED);
+    msgs.enqueueReportedBlock(storageInfo2, block1Gs2, ReplicaState.FINALIZED);
+    List<ReportedBlockInfo> rbis = Arrays.asList(
+        new ReportedBlockInfo(storageInfo2, block1Gs1, ReplicaState.FINALIZED),
+        new ReportedBlockInfo(storageInfo2, block1Gs2, ReplicaState.FINALIZED));
+
+    assertEquals(4, msgs.count());
+
+    // Nothing queued yet for block 2
+    assertNull(msgs.takeBlockQueue(block2Gs1));
+    assertEquals(4, msgs.count());
+
+    msgs.removeQueuedBlock(storageInfo1, block1Gs1);
+    Queue<ReportedBlockInfo> q =
+        msgs.takeBlockQueue(block1Gs2DifferentInstance);
+    assertEquals(Joiner.on(",").join(rbis),
+        Joiner.on(",").join(q));
+    assertEquals(0, msgs.count());
+
+    // Should be null if we pull again.
+    assertNull(msgs.takeBlockQueue(block1Gs2));
+    assertEquals(0, msgs.count());
+  }
 }

+ 199 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java

@@ -27,14 +27,18 @@ import static org.mockito.Mockito.times;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.concurrent.Phaser;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.mockito.invocation.InvocationOnMock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -246,6 +250,7 @@ public class TestIncrementalBlockReports {
 
       NameNode nn1 = cluster.getNameNode(0);
       NameNode nn2 = cluster.getNameNode(1);
+      BlockManager bm2 = nn2.getNamesystem().getBlockManager();
       FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
       List<InvocationOnMock> ibrsToStandby = new ArrayList<>();
       List<DatanodeProtocolClientSideTranslatorPB> spies = new ArrayList<>();
@@ -262,7 +267,6 @@ public class TestIncrementalBlockReports {
               }
             }
           }
-          ibrsToStandby.add(inv);
           return null;
         }).when(nnSpy).blockReceivedAndDeleted(
             any(DatanodeRegistration.class),
@@ -289,8 +293,9 @@ public class TestIncrementalBlockReports {
         }
       }
 
-      assertEquals("There should be 3 pending messages from DNs", 3,
-          nn2.getNamesystem().getBlockManager().getPendingDataNodeMessageCount());
+      GenericTestUtils.waitFor(() -> bm2.getPendingDataNodeMessageCount() == 0,
+          1000, 30000,
+          "There should be 0 pending DN messages");
       ibrsToStandby.clear();
       // We need to trigger another edit log roll so that the pendingDNMessages
       // are processed.
@@ -308,6 +313,9 @@ public class TestIncrementalBlockReports {
       }
       ibrsToStandby.clear();
       ibrPhaser.arriveAndDeregister();
+      GenericTestUtils.waitFor(() -> bm2.getPendingDataNodeMessageCount() == 0,
+          1000, 30000,
+          "There should be 0 pending DN messages");
       ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_FILE_PATH);
       HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
       LOG.info("==================================");
@@ -325,4 +333,192 @@ public class TestIncrementalBlockReports {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testIBRRaceCondition2() throws Exception {
+    cluster.shutdown();
+    Configuration conf = new Configuration();
+    HAUtil.setAllowStandbyReads(conf, true);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+    cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology())
+        .numDataNodes(3)
+        .build();
+    try {
+      cluster.waitActive();
+      cluster.transitionToActive(0);
+
+      NameNode nn1 = cluster.getNameNode(0);
+      NameNode nn2 = cluster.getNameNode(1);
+      BlockManager bm2 = nn2.getNamesystem().getBlockManager();
+      FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
+      List<InvocationOnMock> ibrsToStandby = new ArrayList<>();
+      List<DatanodeProtocolClientSideTranslatorPB> spies = new ArrayList<>();
+      Phaser ibrPhaser = new Phaser(1);
+      for (DataNode dn : cluster.getDataNodes()) {
+        DatanodeProtocolClientSideTranslatorPB nnSpy =
+            InternalDataNodeTestUtils.spyOnBposToNN(dn, nn2);
+        doAnswer((inv) -> {
+          for (StorageReceivedDeletedBlocks srdb :
+              inv.getArgument(2, StorageReceivedDeletedBlocks[].class)) {
+            for (ReceivedDeletedBlockInfo block : srdb.getBlocks()) {
+              if (block.getStatus().equals(BlockStatus.RECEIVED_BLOCK)) {
+                ibrsToStandby.add(inv);
+                ibrPhaser.arriveAndDeregister();
+              }
+            }
+          }
+          return null;
+        }).when(nnSpy).blockReceivedAndDeleted(
+            any(DatanodeRegistration.class),
+            anyString(),
+            any(StorageReceivedDeletedBlocks[].class));
+        spies.add(nnSpy);
+      }
+
+      LOG.info("==================================");
+      // Force the DNs to delay report to the SNN
+      ibrPhaser.bulkRegister(9);
+      DFSTestUtil.writeFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
+      DFSTestUtil.appendFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
+      DFSTestUtil.appendFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
+      HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
+      // SNN has caught up to the latest edit log so we send the IBRs to SNN
+      int phase = ibrPhaser.arrive();
+      ibrPhaser.awaitAdvanceInterruptibly(phase, 60, TimeUnit.SECONDS);
+      for (InvocationOnMock sendIBRs : ibrsToStandby) {
+        try {
+          sendIBRs.callRealMethod();
+        } catch (Throwable t) {
+          LOG.error("Exception thrown while calling sendIBRs: ", t);
+        }
+      }
+
+      GenericTestUtils.waitFor(() -> bm2.getPendingDataNodeMessageCount() == 0,
+          1000, 30000,
+          "There should be 0 pending DN messages");
+      ibrsToStandby.clear();
+      ibrPhaser.arriveAndDeregister();
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_FILE_PATH);
+      HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
+      LOG.info("==================================");
+
+      // Trigger an active switch to force SNN to mark blocks as corrupt if they
+      // have a bad genstamp in the pendingDNMessages queue.
+      cluster.transitionToStandby(0);
+      cluster.transitionToActive(1);
+      cluster.waitActive(1);
+
+      assertEquals("There should not be any corrupt replicas", 0,
+          nn2.getNamesystem().getBlockManager()
+              .numCorruptReplicas(block.getLocalBlock()));
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testIBRRaceCondition3() throws Exception {
+    cluster.shutdown();
+    Configuration conf = new Configuration();
+    HAUtil.setAllowStandbyReads(conf, true);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+    cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology())
+        .numDataNodes(3)
+        .build();
+    try {
+      cluster.waitActive();
+      cluster.transitionToActive(0);
+
+      NameNode nn1 = cluster.getNameNode(0);
+      NameNode nn2 = cluster.getNameNode(1);
+      BlockManager bm2 = nn2.getNamesystem().getBlockManager();
+      FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
+      LinkedHashMap<Long, List<InvocationOnMock>> ibrsToStandby =
+          new LinkedHashMap<>();
+      AtomicLong lowestGenStamp = new AtomicLong(Long.MAX_VALUE);
+      List<DatanodeProtocolClientSideTranslatorPB> spies = new ArrayList<>();
+      Phaser ibrPhaser = new Phaser(1);
+      for (DataNode dn : cluster.getDataNodes()) {
+        DatanodeProtocolClientSideTranslatorPB nnSpy =
+            InternalDataNodeTestUtils.spyOnBposToNN(dn, nn2);
+        doAnswer((inv) -> {
+          for (StorageReceivedDeletedBlocks srdb :
+              inv.getArgument(2, StorageReceivedDeletedBlocks[].class)) {
+            for (ReceivedDeletedBlockInfo block : srdb.getBlocks()) {
+              if (block.getStatus().equals(BlockStatus.RECEIVED_BLOCK)) {
+                long genStamp = block.getBlock().getGenerationStamp();
+                ibrsToStandby.putIfAbsent(genStamp, new ArrayList<>());
+                ibrsToStandby.get(genStamp).add(inv);
+                lowestGenStamp.getAndUpdate((prev) -> Math.min(prev, genStamp));
+                ibrPhaser.arriveAndDeregister();
+              }
+            }
+          }
+          return null;
+        }).when(nnSpy).blockReceivedAndDeleted(
+            any(DatanodeRegistration.class),
+            anyString(),
+            any(StorageReceivedDeletedBlocks[].class));
+        spies.add(nnSpy);
+      }
+
+      LOG.info("==================================");
+      // Force the DNs to delay report to the SNN
+      ibrPhaser.bulkRegister(9);
+      DFSTestUtil.writeFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
+      DFSTestUtil.appendFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
+      DFSTestUtil.appendFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
+      HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
+      // SNN has caught up to the latest edit log so we send the IBRs to SNN
+      int phase = ibrPhaser.arrive();
+      ibrPhaser.awaitAdvanceInterruptibly(phase, 60, TimeUnit.SECONDS);
+      ibrsToStandby.forEach((genStamp, ibrs) -> {
+        if (lowestGenStamp.get() != genStamp) {
+          ibrs.removeIf(inv -> {
+            try {
+              inv.callRealMethod();
+            } catch (Throwable t) {
+              LOG.error("Exception thrown while calling sendIBRs: ", t);
+            }
+            return true;
+          });
+        }
+      });
+
+      GenericTestUtils.waitFor(() -> bm2.getPendingDataNodeMessageCount() == 0,
+          1000, 30000,
+          "There should be 0 pending DN messages");
+      ibrPhaser.arriveAndDeregister();
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_FILE_PATH);
+      HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
+
+      // Send old ibrs to simulate actual stale or corrupt DNs
+      for (InvocationOnMock sendIBR : ibrsToStandby.get(lowestGenStamp.get())) {
+        try {
+          sendIBR.callRealMethod();
+        } catch (Throwable t) {
+          LOG.error("Exception thrown while calling sendIBRs: ", t);
+        }
+      }
+
+      GenericTestUtils.waitFor(() -> bm2.getPendingDataNodeMessageCount() == 3,
+          1000, 30000,
+          "There should be 0 pending DN messages");
+      LOG.info("==================================");
+
+      // Trigger an active switch to force SNN to mark blocks as corrupt if they
+      // have a bad genstamp in the pendingDNMessages queue.
+      cluster.transitionToStandby(0);
+      cluster.transitionToActive(1);
+      cluster.waitActive(1);
+
+      assertEquals("There should be 1 corrupt replica", 1,
+          nn2.getNamesystem().getBlockManager()
+              .numCorruptReplicas(block.getLocalBlock()));
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
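
To exercise the new regression tests locally, standard Maven surefire filtering from the repository root should work; this is a usage sketch, so adjust the module path or test names if your checkout differs:

mvn -pl hadoop-hdfs-project/hadoop-hdfs test -Dtest=TestPendingDataNodeMessages,TestIncrementalBlockReports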