Browse Source

HDFS-15159. Prevent adding same DN multiple times in PendingReconstructionBlocks. Contributed by hemanthboyina.

Ayush Saxena 5 years ago
parent
commit
8a7c54995a

+ 9 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -2004,6 +2004,15 @@ public class BlockManager implements BlockStatsMXBean {
       // This list includes decommissioning or corrupt nodes.
       final Set<Node> excludedNodes = new HashSet<>(rw.getContainingNodes());
 
+      // Exclude all nodes which already exists as targets for the block
+      List<DatanodeStorageInfo> targets =
+          pendingReconstruction.getTargets(rw.getBlock());
+      if (targets != null) {
+        for (DatanodeStorageInfo dn : targets) {
+          excludedNodes.add(dn.getDatanodeDescriptor());
+        }
+      }
+
       // choose replication targets: NOT HOLDING THE GLOBAL LOCK
       final BlockPlacementPolicy placementPolicy =
           placementPolicies.getPolicy(rw.getBlock().getBlockType());

+ 55 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReconstruction.java

@@ -32,6 +32,8 @@ import java.util.ArrayList;
 import java.util.concurrent.TimeoutException;
 
 import com.google.common.base.Supplier;
+
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -56,8 +58,11 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStat
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
+import org.apache.log4j.Level;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class tests the internals of PendingReconstructionBlocks.java, as well
@@ -561,4 +566,54 @@ public class TestPendingReconstruction {
       fsn.writeUnlock();
     }
   }
+
+  @Test
+  public void testPendingReConstructionBlocksForSameDN() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+    cluster.waitActive();
+    DFSTestUtil.setNameNodeLogLevel(Level.DEBUG);
+    LogCapturer logs = GenericTestUtils.LogCapturer
+        .captureLogs(LoggerFactory.getLogger("BlockStateChange"));
+    BlockManager bm = cluster.getNamesystem().getBlockManager();
+    try {
+      DistributedFileSystem dfs = cluster.getFileSystem();
+      // 1. create a file
+      Path filePath = new Path("/tmp.txt");
+      DFSTestUtil.createFile(dfs, filePath, 1024, (short) 1, 0L);
+
+      // 2. disable the IBR
+      for (DataNode dn : cluster.getDataNodes()) {
+        DataNodeTestUtils.pauseIBR(dn);
+      }
+      DatanodeManager datanodeManager =
+          cluster.getNamesystem().getBlockManager().getDatanodeManager();
+      ArrayList<DatanodeDescriptor> dnList =
+          new ArrayList<DatanodeDescriptor>();
+      datanodeManager.fetchDatanodes(dnList, dnList, false);
+
+      LocatedBlock block = NameNodeAdapter
+          .getBlockLocations(cluster.getNameNode(), filePath.toString(), 0, 1)
+          .get(0);
+
+      // 3. set replication as 3
+      dfs.setReplication(filePath, (short) 3);
+
+      // 4 compute replication work twice to make sure the same DN is not adding
+      // twice
+      BlockManagerTestUtil.computeAllPendingWork(bm);
+      BlockManagerTestUtil.computeAllPendingWork(bm);
+      BlockManagerTestUtil.updateState(bm);
+
+      // 5 capture the logs and verify the reconstruction work for block for
+      // same DN
+      String blockName =
+          "to replicate " + block.getBlock().getLocalBlock().toString();
+      assertEquals(1, StringUtils.countMatches(logs.getOutput(), blockName));
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }