
HDFS-14768. EC: Busy DN replica should be considered in live replica check. Contributed by guojh.
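In short: chooseSourceDatanodes() used to silently skip any DataNode that had reached its replication stream limit, so a live internal block of a striped group sitting on a busy node looked missing and the NameNode scheduled an unnecessary reconstruction for it. This patch threads a new liveBusyBlockIndices list through scheduleReconstruction() into ErasureCodingWork, whose hasAllInternalBlocks() now counts live-but-busy indices as present. Minimal sketches of the two pieces follow the BlockManager.java and ErasureCodingWork.java diffs below.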

Surendra Singh Lilhore 5 years ago
parent
commit
02009c3bb7

+ 22 - 11
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -854,7 +854,7 @@ public class BlockManager implements BlockStatsMXBean {
     }
     // source node returned is not used
     chooseSourceDatanodes(blockInfo, containingNodes,
-        containingLiveReplicasNodes, numReplicas,
+        containingLiveReplicasNodes, numReplicas, new ArrayList<Byte>(),
         new ArrayList<Byte>(), LowRedundancyBlocks.LEVEL);

     // containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are
@@ -2024,9 +2024,10 @@ public class BlockManager implements BlockStatsMXBean {
     List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
     NumberReplicas numReplicas = new NumberReplicas();
     List<Byte> liveBlockIndices = new ArrayList<>();
+    List<Byte> liveBusyBlockIndices = new ArrayList<>();
     final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block,
         containingNodes, liveReplicaNodes, numReplicas,
-        liveBlockIndices, priority);
+        liveBlockIndices, liveBusyBlockIndices, priority);
     short requiredRedundancy = getExpectedLiveRedundancyNum(block,
         numReplicas);
     if(srcNodes == null || srcNodes.length == 0) {
@@ -2079,9 +2080,13 @@ public class BlockManager implements BlockStatsMXBean {
       for (int i = 0 ; i < liveBlockIndices.size(); i++) {
         indices[i] = liveBlockIndices.get(i);
       }
+      byte[] busyIndices = new byte[liveBusyBlockIndices.size()];
+      for (int i = 0; i < liveBusyBlockIndices.size(); i++) {
+        busyIndices[i] = liveBusyBlockIndices.get(i);
+      }
       return new ErasureCodingWork(getBlockPoolId(), block, bc, srcNodes,
           containingNodes, liveReplicaNodes, additionalReplRequired,
-          priority, indices);
+          priority, indices, busyIndices);
     } else {
       return new ReplicationWork(block, bc, srcNodes,
           containingNodes, liveReplicaNodes, additionalReplRequired,
@@ -2293,8 +2298,8 @@ public class BlockManager implements BlockStatsMXBean {
   DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
       List<DatanodeDescriptor> containingNodes,
       List<DatanodeStorageInfo> nodesContainingLiveReplicas,
-      NumberReplicas numReplicas,
-      List<Byte> liveBlockIndices, int priority) {
+      NumberReplicas numReplicas, List<Byte> liveBlockIndices,
+      List<Byte> liveBusyBlockIndices, int priority) {
     containingNodes.clear();
     nodesContainingLiveReplicas.clear();
     List<DatanodeDescriptor> srcNodes = new ArrayList<>();
@@ -2347,12 +2352,6 @@ public class BlockManager implements BlockStatsMXBean {
         continue;
       }

-      if (priority != LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY
-          && (!node.isDecommissionInProgress() && !node.isEnteringMaintenance())
-          && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) {
-        continue; // already reached replication limit
-      }
-
       // for EC here need to make sure the numReplicas replicates state correct
       // because in the scheduleReconstruction it need the numReplicas to check
       // whether need to reconstruct the ec internal block
@@ -2364,7 +2363,19 @@ public class BlockManager implements BlockStatsMXBean {
             liveBitSet, decommissioningBitSet, blockIndex);
       }

+      if (priority != LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY
+          && (!node.isDecommissionInProgress() && !node.isEnteringMaintenance())
+          && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) {
+        if (isStriped && state == StoredReplicaState.LIVE) {
+          liveBusyBlockIndices.add(blockIndex);
+        }
+        continue; // already reached replication limit
+      }
+
       if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit) {
+        if (isStriped && state == StoredReplicaState.LIVE) {
+          liveBusyBlockIndices.add(blockIndex);
+        }
         continue;
       }


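The heart of the change is the relocated limit check above: it now runs after the replica's StoredReplicaState and blockIndex have been computed, so a busy node can still be skipped as a reconstruction source while its live striped index is recorded instead of vanishing. A minimal, self-contained sketch of that bookkeeping, using hypothetical stand-in types rather than the real BlockManager/DatanodeDescriptor classes:

import java.util.ArrayList;
import java.util.List;

// Sketch only: DN and MAX_REPLICATION_STREAMS are invented stand-ins.
public class SourceSelectionSketch {
  static class DN {
    final byte blockIndex;     // index of the striped internal block it holds
    final boolean live;        // stand-in for StoredReplicaState.LIVE
    final int pendingStreams;  // stand-in for getNumberOfBlocksToBeReplicated()
    DN(byte blockIndex, boolean live, int pendingStreams) {
      this.blockIndex = blockIndex;
      this.live = live;
      this.pendingStreams = pendingStreams;
    }
  }

  static final int MAX_REPLICATION_STREAMS = 2;

  // Mirrors the reordered check in chooseSourceDatanodes(): a busy node is
  // still rejected as a source, but a live striped replica on it is now
  // remembered in liveBusyBlockIndices rather than dropped entirely.
  static List<DN> chooseSources(List<DN> replicas,
      List<Byte> liveBusyBlockIndices) {
    List<DN> srcNodes = new ArrayList<>();
    for (DN node : replicas) {
      if (node.pendingStreams >= MAX_REPLICATION_STREAMS) {
        if (node.live) {
          liveBusyBlockIndices.add(node.blockIndex); // busy, but data is there
        }
        continue; // already reached replication limit
      }
      srcNodes.add(node);
    }
    return srcNodes;
  }

  public static void main(String[] args) {
    List<Byte> busy = new ArrayList<>();
    List<DN> replicas = new ArrayList<>();
    replicas.add(new DN((byte) 0, true, 0));
    replicas.add(new DN((byte) 1, true, 5)); // live but over the stream limit
    System.out.println(chooseSources(replicas, busy).size()); // 1
    System.out.println(busy);                                 // [1]
  }
}
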
+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java

@@ -634,7 +634,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
     return new BlockIterator(startBlock, getStorageInfos());
   }

-  void incrementPendingReplicationWithoutTargets() {
+  @VisibleForTesting
+  public void incrementPendingReplicationWithoutTargets() {
     pendingReplicationWithoutTargets++;
   }


+ 9 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java

@@ -31,6 +31,7 @@ import java.util.Set;

 class ErasureCodingWork extends BlockReconstructionWork {
   private final byte[] liveBlockIndicies;
+  private final byte[] liveBusyBlockIndicies;
   private final String blockPoolId;

   public ErasureCodingWork(String blockPoolId, BlockInfo block,
@@ -38,12 +39,13 @@ class ErasureCodingWork extends BlockReconstructionWork {
       DatanodeDescriptor[] srcNodes,
       List<DatanodeDescriptor> containingNodes,
       List<DatanodeStorageInfo> liveReplicaStorages,
-      int additionalReplRequired,
-      int priority, byte[] liveBlockIndicies) {
+      int additionalReplRequired, int priority,
+      byte[] liveBlockIndicies, byte[] liveBusyBlockIndicies) {
     super(block, bc, srcNodes, containingNodes,
         liveReplicaStorages, additionalReplRequired, priority);
     this.blockPoolId = blockPoolId;
     this.liveBlockIndicies = liveBlockIndicies;
+    this.liveBusyBlockIndicies = liveBusyBlockIndicies;
     LOG.debug("Creating an ErasureCodingWork to {} reconstruct ",
         block);
   }
@@ -70,13 +72,17 @@ class ErasureCodingWork extends BlockReconstructionWork {
    */
   private boolean hasAllInternalBlocks() {
     final BlockInfoStriped block = (BlockInfoStriped) getBlock();
-    if (getSrcNodes().length < block.getRealTotalBlockNum()) {
+    if (liveBlockIndicies.length
+        + liveBusyBlockIndicies.length < block.getRealTotalBlockNum()) {
       return false;
     }
     BitSet bitSet = new BitSet(block.getTotalBlockNum());
     for (byte index : liveBlockIndicies) {
       bitSet.set(index);
     }
+    for (byte busyIndex: liveBusyBlockIndicies) {
+      bitSet.set(busyIndex);
+    }
     for (int i = 0; i < block.getRealDataBlockNum(); i++) {
       if (!bitSet.get(i)) {
         return false;

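A runnable sketch of the reworked check, assuming a hypothetical RS-6-3 layout (6 data + 3 parity internal blocks, so getRealTotalBlockNum() == 9); the helper below is a simplified stand-in for hasAllInternalBlocks(), not the full method:

import java.util.BitSet;

public class BusyIndexCheckSketch {
  // Simplified stand-in for ErasureCodingWork#hasAllInternalBlocks():
  // an internal block now counts as present whether its DataNode is
  // idle (live) or merely busy, since the data exists either way.
  static boolean hasAllInternalBlocks(byte[] liveIndices, byte[] busyIndices,
      int realTotalBlockNum, int realDataBlockNum) {
    if (liveIndices.length + busyIndices.length < realTotalBlockNum) {
      return false; // too few replicas overall
    }
    BitSet bitSet = new BitSet(realTotalBlockNum);
    for (byte index : liveIndices) {
      bitSet.set(index);
    }
    for (byte busyIndex : busyIndices) {
      bitSet.set(busyIndex); // the HDFS-14768 addition: busy still counts
    }
    for (int i = 0; i < realDataBlockNum; i++) {
      if (!bitSet.get(i)) {
        return false; // a data block is genuinely missing
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // Index 1 lives on a DataNode that has hit its replication stream
    // limit; the other eight internal blocks are live. Before this fix
    // the busy replica was invisible here and block 1 was rebuilt needlessly.
    byte[] live = {0, 2, 3, 4, 5, 6, 7, 8};
    byte[] busy = {1};
    System.out.println(hasAllInternalBlocks(live, busy, 9, 6));        // true
    System.out.println(hasAllInternalBlocks(live, new byte[0], 9, 6)); // false
  }
}
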
+ 66 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java

@@ -40,14 +40,16 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
@@ -55,6 +57,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.security.token.Token;
@@ -81,6 +84,9 @@ public class TestDecommissionWithStriped {
   // replication interval
   private static final int NAMENODE_REPLICATION_INTERVAL = 1;

+  private int replicationStreamsHardLimit =
+      DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT;
+
   private Path decommissionDir;
   private Path hostsFile;
   private Path excludeFile;
@@ -273,7 +279,6 @@ public class TestDecommissionWithStriped {
         fsn.getNumDecomLiveDataNodes());

     // Ensure decommissioned datanode is not automatically shutdown
-    DFSClient client = getDfsClient(cluster.getNameNode(0), conf);
     assertEquals("All datanodes must be alive", numDNs,
         client.datanodeReport(DatanodeReportType.LIVE).length);

@@ -283,6 +288,65 @@ public class TestDecommissionWithStriped {
     cleanupFile(dfs, ecFile);
   }

+  /**
+   * DN decommission shouldn't trigger reconstruction of a block on a busy DN.
+   * @throws Exception
+   */
+  @Test(timeout = 120000)
+  public void testDecommissionWithBusyNode() throws Exception {
+    byte busyDNIndex = 1;
+    byte decommisionDNIndex = 0;
+    //1. create EC file
+    final Path ecFile = new Path(ecDir, "testDecommissionWithBusyNode");
+    int writeBytes = cellSize * dataBlocks;
+    writeStripedFile(dfs, ecFile, writeBytes);
+    Assert.assertEquals(0, bm.numOfUnderReplicatedBlocks());
+    FileChecksum fileChecksum1 = dfs.getFileChecksum(ecFile, writeBytes);
+
+    //2. make one DN busy
+    final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
+        .getINode4Write(ecFile.toString()).asFile();
+    BlockInfo firstBlock = fileNode.getBlocks()[0];
+    DatanodeStorageInfo[] dnStorageInfos = bm.getStorages(firstBlock);
+    DatanodeDescriptor busyNode =
+        dnStorageInfos[busyDNIndex].getDatanodeDescriptor();
+    for (int j = 0; j < replicationStreamsHardLimit; j++) {
+      busyNode.incrementPendingReplicationWithoutTargets();
+    }
+
+    //3. decommission one node
+    List<DatanodeInfo> decommisionNodes = new ArrayList<>();
+    decommisionNodes.add(
+        dnStorageInfos[decommisionDNIndex].getDatanodeDescriptor());
+    decommissionNode(0, decommisionNodes, AdminStates.DECOMMISSIONED);
+    assertEquals(decommisionNodes.size(), fsn.getNumDecomLiveDataNodes());
+
+    //4. wait for the decommissioned DN's block to be replicated
+    Thread.sleep(3000);
+    DatanodeStorageInfo[] newDnStorageInfos = bm.getStorages(firstBlock);
+    Assert.assertEquals("Busy DN shouldn't be reconstructed",
+        dnStorageInfos[busyDNIndex].getStorageID(),
+        newDnStorageInfos[busyDNIndex].getStorageID());
+
+    //5. check the decommissioned DN's block index; it should be reconstructed
+    LocatedBlocks lbs = cluster.getNameNodeRpc().getBlockLocations(
+        ecFile.toString(), 0, writeBytes);
+    LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
+    int decommissionBlockIndexCount = 0;
+    for (byte index : bg.getBlockIndices()) {
+      if (index == decommisionDNIndex) {
+        decommissionBlockIndexCount++;
+      }
+    }
+
+    Assert.assertEquals("Decommission DN block should be reconstructed", 2,
+        decommissionBlockIndexCount);
+
+    FileChecksum fileChecksum2 = dfs.getFileChecksum(ecFile, writeBytes);
+    Assert.assertTrue("Checksum mismatches!",
+        fileChecksum1.equals(fileChecksum2));
+  }
+
   /**
    * Tests to verify that the file checksum should be able to compute after the
    * Tests to verify that the file checksum should be able to compute after the
    * decommission operation.
    * decommission operation.

+ 14 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java

@@ -660,6 +660,7 @@ public class TestBlockManager {
             liveNodes,
             new NumberReplicas(),
             new ArrayList<Byte>(),
+            new ArrayList<Byte>(),
             LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY)[0]);

     assertEquals("Does not choose a source node for a less-than-highest-priority"
@@ -671,6 +672,7 @@ public class TestBlockManager {
             liveNodes,
             new NumberReplicas(),
             new ArrayList<Byte>(),
+            new ArrayList<Byte>(),
             LowRedundancyBlocks.QUEUE_VERY_LOW_REDUNDANCY).length);

     // Increase the replication count to test replication count > hard limit
@@ -685,6 +687,7 @@ public class TestBlockManager {
             liveNodes,
             new NumberReplicas(),
             new ArrayList<Byte>(),
+            new ArrayList<Byte>(),
             LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY).length);
   }

@@ -730,13 +733,15 @@ public class TestBlockManager {
     List<DatanodeStorageInfo> liveNodes = new LinkedList<DatanodeStorageInfo>();
     NumberReplicas numReplicas = new NumberReplicas();
     List<Byte> liveBlockIndices = new ArrayList<>();
+    List<Byte> liveBusyBlockIndices = new ArrayList<>();

     bm.chooseSourceDatanodes(
             aBlockInfoStriped,
             cntNodes,
             liveNodes,
             numReplicas, liveBlockIndices,
-            LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY);
+            liveBusyBlockIndices,
+            LowRedundancyBlocks.QUEUE_VERY_LOW_REDUNDANCY);

     assertEquals("Choose the source node for reconstruction with one node reach"
             + " the MAX maxReplicationStreams, the numReplicas still return the"
@@ -791,12 +796,14 @@ public class TestBlockManager {
         new LinkedList<DatanodeStorageInfo>();
     NumberReplicas numReplicas = new NumberReplicas();
     List<Byte> liveBlockIndices = new ArrayList<>();
+    List<Byte> liveBusyBlockIndices = new ArrayList<>();

     bm.chooseSourceDatanodes(
         aBlockInfoStriped,
         containingNodes,
         nodesContainingLiveReplicas,
         numReplicas, liveBlockIndices,
+        liveBusyBlockIndices,
         LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY);
     assertEquals("There are 5 live replicas in " +
             "[ds2, ds3, ds4, ds5, ds6] datanodes ",
@@ -828,7 +835,9 @@ public class TestBlockManager {
             bm.getStoredBlock(aBlock),
             cntNodes,
             liveNodes,
-            new NumberReplicas(), new LinkedList<Byte>(),
+            new NumberReplicas(),
+            new LinkedList<Byte>(),
+            new ArrayList<Byte>(),
             LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY)[0]);

@@ -842,7 +851,9 @@ public class TestBlockManager {
             bm.getStoredBlock(aBlock),
             cntNodes,
             liveNodes,
-            new NumberReplicas(), new LinkedList<Byte>(),
+            new NumberReplicas(),
+            new LinkedList<Byte>(),
+            new ArrayList<Byte>(),
             LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY).length);
   }