
HDFS-8827. Erasure Coding: Fix NPE when NameNode processes over-replicated striped blocks. Contributed by Walter Su and Takuya Fukudome.

Jing Zhao committed 9 years ago
commit fbf7e81ca0

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt

@@ -391,3 +391,6 @@
 
     HDFS-8857. Erasure Coding: Fix ArrayIndexOutOfBoundsException in
     TestWriteStripedFileWithFailure. (Li Bo)
+
+    HDFS-8827. Erasure Coding: Fix NPE when NameNode processes over-replicated
+    striped blocks. (Walter Su and Takuya Fukudome via jing9)

+ 14 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -3135,14 +3135,13 @@ public class BlockManager {
     assert namesystem.hasWriteLock();
     // first form a rack to datanodes map and
     BlockCollection bc = getBlockCollection(storedBlock);
-    final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(
-        bc.getStoragePolicyID());
-    final List<StorageType> excessTypes = storagePolicy.chooseExcess(
-        replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
     if (storedBlock.isStriped()) {
-      chooseExcessReplicasStriped(bc, nonExcess, storedBlock, delNodeHint,
-          excessTypes);
+      chooseExcessReplicasStriped(bc, nonExcess, storedBlock, delNodeHint);
     } else {
+      final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(
+          bc.getStoragePolicyID());
+      final List<StorageType> excessTypes = storagePolicy.chooseExcess(
+          replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
       chooseExcessReplicasContiguous(bc, nonExcess, storedBlock,
           replication, addedNode, delNodeHint, excessTypes);
     }
@@ -3216,8 +3215,7 @@ public class BlockManager {
   private void chooseExcessReplicasStriped(BlockCollection bc,
       final Collection<DatanodeStorageInfo> nonExcess,
       BlockInfo storedBlock,
-      DatanodeDescriptor delNodeHint,
-      List<StorageType> excessTypes) {
+      DatanodeDescriptor delNodeHint) {
     assert storedBlock instanceof BlockInfoStriped;
     BlockInfoStriped sblk = (BlockInfoStriped) storedBlock;
     short groupSize = sblk.getTotalBlockNum();
@@ -3237,6 +3235,14 @@ public class BlockManager {
       found.set(index);
       storage2index.put(storage, index);
     }
+    // The number of target replicas to keep equals the number of internal
+    // block indices actually found.
+    int numOfTarget = found.cardinality();
+
+    final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(
+        bc.getStoragePolicyID());
+    final List<StorageType> excessTypes = storagePolicy.chooseExcess(
+        (short)numOfTarget, DatanodeStorageInfo.toStorageTypes(nonExcess));
 
     // use delHint only if delHint is duplicated
     final DatanodeStorageInfo delStorageHint =
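
The heart of the fix is the switch from the file's replication factor to the count of internal-block indices actually found when asking the storage policy for excess storage types. A minimal, self-contained toy example (not HDFS code; the group size and index values below are invented for illustration) shows how the BitSet cardinality from the hunk above yields the number of replicas to keep:

    import java.util.BitSet;

    public class StripedTargetCountToy {
      public static void main(String[] args) {
        int groupSize = 9;                              // e.g. 6 data + 3 parity internal blocks
        int[] reportedIndices = {0, 1, 2, 2, 2, 3, 4};  // index 2 reported three times
        BitSet found = new BitSet(groupSize);
        for (int idx : reportedIndices) {
          found.set(idx);                               // mark each internal-block index seen
        }
        // Mirrors "int numOfTarget = found.cardinality();" in the patch: keep one
        // replica per distinct index, not "replication" copies of the whole group.
        int numOfTarget = found.cardinality();                // 5
        int excess = reportedIndices.length - numOfTarget;    // 2 excess replicas of index 2
        System.out.println("keep " + numOfTarget + ", remove " + excess);
      }
    }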

+ 151 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java

@@ -24,9 +24,14 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.junit.After;
 import org.junit.Before;
@@ -35,6 +40,7 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -49,7 +55,7 @@ public class TestAddOverReplicatedStripedBlocks {
   private final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS;
   private final short GROUP_SIZE = DATA_BLK_NUM + PARITY_BLK_NUM;
   private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
-  private final int NUM_STRIPE_PER_BLOCK = 1;
+  private final int NUM_STRIPE_PER_BLOCK = 4;
   private final int BLOCK_SIZE = NUM_STRIPE_PER_BLOCK * CELLSIZE;
   private final int numDNs = GROUP_SIZE + 3;
 
@@ -57,6 +63,8 @@ public class TestAddOverReplicatedStripedBlocks {
   public void setup() throws IOException {
     Configuration conf = new Configuration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    // disable block recovery
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
     SimulatedFSDataset.setFactory(conf);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
     cluster.waitActive();
@@ -113,4 +121,146 @@ public class TestAddOverReplicatedStripedBlocks {
         filePath.toString(), 0, fileLen);
     DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE);
   }
+
+  @Test
+  public void testProcessOverReplicatedSBSmallerThanFullBlocks()
+      throws Exception {
+    // Create an EC file that doesn't fill all internal blocks.
+    int fileLen = CELLSIZE * (DATA_BLK_NUM - 1);
+    byte[] content = new byte[fileLen];
+    DFSTestUtil.writeFile(fs, filePath, new String(content));
+    LocatedBlocks lbs = cluster.getNameNodeRpc().getBlockLocations(
+        filePath.toString(), 0, fileLen);
+    LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
+    long gs = bg.getBlock().getGenerationStamp();
+    String bpid = bg.getBlock().getBlockPoolId();
+    long groupId = bg.getBlock().getBlockId();
+    Block blk = new Block(groupId, BLOCK_SIZE, gs);
+    cluster.triggerBlockReports();
+    List<DatanodeInfo> infos = Arrays.asList(bg.getLocations());
+
+    // Let an internal block be over-replicated with 2 redundant blocks, so the
+    // number of internal blocks exceeds GROUP_SIZE. (5 data blocks +
+    // 3 parity blocks + 2 redundant blocks > GROUP_SIZE)
+    blk.setBlockId(groupId + 2);
+    List<DataNode> dataNodeList = cluster.getDataNodes();
+    for (int i = 0; i < numDNs; i++) {
+      if (!infos.contains(dataNodeList.get(i).getDatanodeId())) {
+        cluster.injectBlocks(i, Arrays.asList(blk), bpid);
+        System.out.println("XXX: inject block into datanode " + i);
+      }
+    }
+
+    // update blocksMap
+    cluster.triggerBlockReports();
+    // add to invalidates
+    cluster.triggerHeartbeats();
+    // datanode delete block
+    cluster.triggerHeartbeats();
+    // update blocksMap
+    cluster.triggerBlockReports();
+
+    // verify that all internal blocks exist
+    lbs = cluster.getNameNodeRpc().getBlockLocations(
+        filePath.toString(), 0, fileLen);
+    DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE - 1);
+  }
+
+  @Test
+  public void testProcessOverReplicatedAndCorruptStripedBlock()
+      throws Exception {
+    long fileLen = DATA_BLK_NUM * BLOCK_SIZE;
+    DFSTestUtil.createStripedFile(cluster, filePath, null, 1,
+        NUM_STRIPE_PER_BLOCK, false);
+    LocatedBlocks lbs = cluster.getNameNodeRpc().getBlockLocations(
+        filePath.toString(), 0, fileLen);
+    LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
+    long gs = bg.getBlock().getGenerationStamp();
+    String bpid = bg.getBlock().getBlockPoolId();
+    long groupId = bg.getBlock().getBlockId();
+    Block blk = new Block(groupId, BLOCK_SIZE, gs);
+    BlockInfoStriped blockInfo = new BlockInfoStriped(blk,
+        ErasureCodingSchemaManager.getSystemDefaultSchema(), CELLSIZE);
+    for (int i = 0; i < GROUP_SIZE; i++) {
+      blk.setBlockId(groupId + i);
+      cluster.injectBlocks(i, Arrays.asList(blk), bpid);
+    }
+    cluster.triggerBlockReports();
+
+    // let an internal block be corrupt
+    BlockManager bm = cluster.getNamesystem().getBlockManager();
+    List<DatanodeInfo> infos = Arrays.asList(bg.getLocations());
+    List<String> storages = Arrays.asList(bg.getStorageIDs());
+    cluster.getNamesystem().writeLock();
+    try {
+      bm.findAndMarkBlockAsCorrupt(lbs.getLastLocatedBlock().getBlock(),
+          infos.get(0), storages.get(0), "TEST");
+    } finally {
+      cluster.getNamesystem().writeUnlock();
+    }
+    assertEquals(1, bm.countNodes(blockInfo).corruptReplicas());
+
+    // let an internal block be over-replicated with 2 redundant blocks.
+    blk.setBlockId(groupId + 2);
+    cluster.injectBlocks(numDNs - 3, Arrays.asList(blk), bpid);
+    cluster.injectBlocks(numDNs - 2, Arrays.asList(blk), bpid);
+
+    // update blocksMap
+    cluster.triggerBlockReports();
+    // add to invalidates
+    cluster.triggerHeartbeats();
+    // datanode delete block
+    cluster.triggerHeartbeats();
+    // update blocksMap
+    cluster.triggerBlockReports();
+
+    // verify that all internal blocks exist
+    lbs = cluster.getNameNodeRpc().getBlockLocations(
+        filePath.toString(), 0, fileLen);
+    DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE);
+  }
+
+  @Test
+  public void testProcessOverReplicatedAndMissingStripedBlock()
+      throws Exception {
+    long fileLen = CELLSIZE * DATA_BLK_NUM;
+    DFSTestUtil.createStripedFile(cluster, filePath, null, 1,
+        NUM_STRIPE_PER_BLOCK, false);
+    LocatedBlocks lbs = cluster.getNameNodeRpc().getBlockLocations(
+        filePath.toString(), 0, fileLen);
+    LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
+    long gs = bg.getBlock().getGenerationStamp();
+    String bpid = bg.getBlock().getBlockPoolId();
+    long groupId = bg.getBlock().getBlockId();
+    Block blk = new Block(groupId, BLOCK_SIZE, gs);
+    // only inject GROUP_SIZE - 1 blocks, so there is one block missing
+    for (int i = 0; i < GROUP_SIZE - 1; i++) {
+      blk.setBlockId(groupId + i);
+      cluster.injectBlocks(i, Arrays.asList(blk), bpid);
+    }
+    cluster.triggerBlockReports();
+
+    // Let an internal block be over-replicated with 2 redundant blocks, so the
+    // number of internal blocks exceeds GROUP_SIZE. (GROUP_SIZE - 1 injected
+    // blocks + 2 redundant blocks > GROUP_SIZE)
+    blk.setBlockId(groupId + 2);
+    cluster.injectBlocks(numDNs - 3, Arrays.asList(blk), bpid);
+    cluster.injectBlocks(numDNs - 2, Arrays.asList(blk), bpid);
+
+    // update blocksMap
+    cluster.triggerBlockReports();
+    // add to invalidates
+    cluster.triggerHeartbeats();
+    // datanode delete block
+    cluster.triggerHeartbeats();
+    // update blocksMap
+    cluster.triggerBlockReports();
+
+    // Since one block is missing, after the over-replicated blocks are deleted
+    // we are left with GROUP_SIZE - 1 blocks.
+    lbs = cluster.getNameNodeRpc().getBlockLocations(
+        filePath.toString(), 0, fileLen);
+    DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE - 1);
+  }
+
 }
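
All three new tests repeat the same report/heartbeat sequence after injecting the redundant replicas. A hedged sketch of how that sequence could be factored into a helper inside this test class (the helper name is hypothetical and not part of the patch; MiniDFSCluster's triggerBlockReports() and triggerHeartbeats() are the same calls used above):

    // Hypothetical helper, not in the patch: drives the NameNode/DataNode
    // exchange that the tests above spell out inline.
    private void removeExcessReplicas(MiniDFSCluster cluster) throws Exception {
      cluster.triggerBlockReports();  // update blocksMap with the injected replicas
      cluster.triggerHeartbeats();    // NameNode schedules excess replicas for invalidation
      cluster.triggerHeartbeats();    // DataNodes delete the invalidated replicas
      cluster.triggerBlockReports();  // update blocksMap after the deletions
    }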