HDFS-14920. Erasure Coding: Decommission may hang If one or more datanodes are out of service during decommission. Contributed by Fei Hui.

Ayush Saxena, 5 years ago
commit 9d25ae7669

+ 43 - 18
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -2302,8 +2302,14 @@ public class BlockManager implements BlockStatsMXBean {
     final boolean isStriped = block.isStriped();
     DatanodeDescriptor decommissionedSrc = null;
 
-    BitSet bitSet = isStriped ?
-        new BitSet(((BlockInfoStriped) block).getTotalBlockNum()) : null;
+    BitSet liveBitSet = null;
+    BitSet decommissioningBitSet = null;
+    if (isStriped) {
+      int blockNum = ((BlockInfoStriped) block).getTotalBlockNum();
+      liveBitSet = new BitSet(blockNum);
+      decommissioningBitSet = new BitSet(blockNum);
+    }
+
     for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
       final DatanodeDescriptor node = getDatanodeDescriptorFromStorage(storage);
       final StoredReplicaState state = checkReplicaOnStorage(numReplicas, block,
@@ -2354,14 +2360,8 @@ public class BlockManager implements BlockStatsMXBean {
       if (isStriped) {
         blockIndex = ((BlockInfoStriped) block)
             .getStorageBlockIndex(storage);
-        if (state == StoredReplicaState.LIVE) {
-          if (!bitSet.get(blockIndex)) {
-            bitSet.set(blockIndex);
-          } else {
-            numReplicas.subtract(StoredReplicaState.LIVE, 1);
-            numReplicas.add(StoredReplicaState.REDUNDANT, 1);
-          }
-        }
+        countLiveAndDecommissioningReplicas(numReplicas, state,
+            liveBitSet, decommissioningBitSet, blockIndex);
       }
 
       if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit) {
@@ -4207,7 +4207,9 @@ public class BlockManager implements BlockStatsMXBean {
    * by the state of those replicas.
    * For a striped block, this includes nodes storing blocks belonging to the
    * striped block group. But note we exclude duplicated internal block replicas
-   * for calculating {@link NumberReplicas#liveReplicas}.
+   * for calculating {@link NumberReplicas#liveReplicas}. If an internal block
+   * has a replica on a decommissioning node as well as on a live node, it is
+   * counted as live, not decommissioning.
    */
   public NumberReplicas countNodes(BlockInfo b) {
     return countNodes(b, false);
@@ -4281,17 +4283,40 @@ public class BlockManager implements BlockStatsMXBean {
   private void countReplicasForStripedBlock(NumberReplicas counters,
       BlockInfoStriped block, Collection<DatanodeDescriptor> nodesCorrupt,
       boolean inStartupSafeMode) {
-    BitSet bitSet = new BitSet(block.getTotalBlockNum());
+    BitSet liveBitSet = new BitSet(block.getTotalBlockNum());
+    BitSet decommissioningBitSet = new BitSet(block.getTotalBlockNum());
     for (StorageAndBlockIndex si : block.getStorageAndIndexInfos()) {
       StoredReplicaState state = checkReplicaOnStorage(counters, block,
           si.getStorage(), nodesCorrupt, inStartupSafeMode);
-      if (state == StoredReplicaState.LIVE) {
-        if (!bitSet.get(si.getBlockIndex())) {
-          bitSet.set(si.getBlockIndex());
-        } else {
-          counters.subtract(StoredReplicaState.LIVE, 1);
-          counters.add(StoredReplicaState.REDUNDANT, 1);
+      countLiveAndDecommissioningReplicas(counters, state, liveBitSet,
+          decommissioningBitSet, si.getBlockIndex());
+    }
+  }
+
+  /**
+   * Count distinct live and decommissioning internal blocks by blockIndex.
+   * If one replica of a given index is decommissioning and another replica
+   * of the same index is live, the internal block for that index is live.
+   */
+  private void countLiveAndDecommissioningReplicas(NumberReplicas counters,
+      StoredReplicaState state, BitSet liveBitSet,
+      BitSet decommissioningBitSet, byte blockIndex) {
+    if (state == StoredReplicaState.LIVE) {
+      if (!liveBitSet.get(blockIndex)) {
+        liveBitSet.set(blockIndex);
+        // Subtract from decommissioning: a live replica supersedes it.
+        if (decommissioningBitSet.get(blockIndex)) {
+          counters.subtract(StoredReplicaState.DECOMMISSIONING, 1);
         }
+      } else {
+        counters.subtract(StoredReplicaState.LIVE, 1);
+        counters.add(StoredReplicaState.REDUNDANT, 1);
+      }
+    } else if (state == StoredReplicaState.DECOMMISSIONING) {
+      if (liveBitSet.get(blockIndex) || decommissioningBitSet.get(blockIndex)) {
+        counters.subtract(StoredReplicaState.DECOMMISSIONING, 1);
+      } else {
+        decommissioningBitSet.set(blockIndex);
       }
     }
   }
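
To make the counting rule easier to follow outside the diff context, here is a minimal, standalone sketch of the per-index logic that countLiveAndDecommissioningReplicas applies above. The Replica class, the count() helper and the returned int[] are illustrative only and not HDFS APIs; in the patch itself the base counts come from checkReplicaOnStorage(), and the new method only applies the per-index adjustments.

import java.util.BitSet;

/**
 * Simplified, standalone sketch of the per-index counting rule.
 */
public class StripedReplicaCountingSketch {

  enum State { LIVE, DECOMMISSIONING }

  static final class Replica {
    final int blockIndex;
    final State state;
    Replica(int blockIndex, State state) {
      this.blockIndex = blockIndex;
      this.state = state;
    }
  }

  /** Returns {live, decommissioning, redundant} distinct internal blocks. */
  static int[] count(int totalBlockNum, Replica... replicas) {
    int live = 0;
    int decommissioning = 0;
    int redundant = 0;
    BitSet liveBitSet = new BitSet(totalBlockNum);
    BitSet decommissioningBitSet = new BitSet(totalBlockNum);
    for (Replica r : replicas) {
      if (r.state == State.LIVE) {
        if (!liveBitSet.get(r.blockIndex)) {
          liveBitSet.set(r.blockIndex);
          live++;
          // A live copy supersedes a decommissioning copy of the same index.
          if (decommissioningBitSet.get(r.blockIndex)) {
            decommissioning--;
          }
        } else {
          // A second live copy of the same index is redundant, not live.
          redundant++;
        }
      } else if (!liveBitSet.get(r.blockIndex)
          && !decommissioningBitSet.get(r.blockIndex)) {
        // Count an index as decommissioning only if no live or
        // decommissioning copy of it has been seen yet.
        decommissioningBitSet.set(r.blockIndex);
        decommissioning++;
      }
    }
    return new int[] {live, decommissioning, redundant};
  }

  public static void main(String[] args) {
    // RS-6-3 block group (indices 0..8): index 0 is missing, indices 1..4
    // are on decommissioning nodes, and index 1 also has a freshly
    // replicated copy on a live node.
    int[] c = count(9,
        new Replica(1, State.DECOMMISSIONING),
        new Replica(2, State.DECOMMISSIONING),
        new Replica(3, State.DECOMMISSIONING),
        new Replica(4, State.DECOMMISSIONING),
        new Replica(5, State.LIVE),
        new Replica(6, State.LIVE),
        new Replica(7, State.LIVE),
        new Replica(8, State.LIVE),
        new Replica(1, State.LIVE));
    // Prints "5 live, 3 decommissioning, 0 redundant", matching the asserts
    // in the new testDecommissionWithMissingBlock test below.
    System.out.println(c[0] + " live, " + c[1] + " decommissioning, "
        + c[2] + " redundant");
  }
}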

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java

@@ -41,6 +41,8 @@ public class NumberReplicas extends EnumCounters<NumberReplicas.StoredReplicaSta
     // replicas for the same internal block
     LIVE,
     READONLY,
+    // Decommissioning replicas. For a striped block, this value excludes
+    // redundant and live replicas for the same internal block.
     DECOMMISSIONING,
     DECOMMISSIONED,
     // We need live ENTERING_MAINTENANCE nodes to continue

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java

@@ -29,6 +29,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.util.Daemon;
@@ -278,6 +279,13 @@ class PendingReconstructionBlocks {
     }
   }
 
+  /**
+   * @return timer thread.
+   */
+  @VisibleForTesting
+  public Daemon getTimerThread() {
+    return timerThread;
+  }
   /*
    * Shuts down the pending reconstruction monitor thread.
    * Waits for the thread to exit.
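
The getTimerThread() accessor is exposed so that tests can interrupt the pending-reconstruction monitor and have it run its next check immediately instead of sleeping out its full interval (see wakeupPendingReconstructionTimerThread in BlockManagerTestUtil below). A minimal sketch of that sleep-and-interrupt pattern, with illustrative names rather than the actual HDFS monitor implementation:

/**
 * Minimal sketch of a periodic monitor that can be "woken up" by
 * interrupting its thread. Names are illustrative; this is not the
 * PendingReconstructionBlocks monitor itself.
 */
class PeriodicMonitorSketch implements Runnable {
  private final long checkIntervalMs;
  private volatile boolean running = true;

  PeriodicMonitorSketch(long checkIntervalMs) {
    this.checkIntervalMs = checkIntervalMs;
  }

  @Override
  public void run() {
    while (running) {
      check();
      try {
        Thread.sleep(checkIntervalMs);
      } catch (InterruptedException e) {
        // Interrupted (e.g. by a test): loop around and run check() right
        // away instead of waiting for the rest of the interval.
      }
    }
  }

  void check() {
    // Re-queue work that has been pending for too long.
  }

  void stop() {
    running = false;
  }
}

// Calling interrupt() on the monitor's thread forces an immediate re-check,
// which is what wakeupPendingReconstructionTimerThread() relies on.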

+ 205 - 20
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -31,7 +32,6 @@ import java.util.BitSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.base.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -139,7 +140,7 @@ public class TestDecommissionWithStriped {
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
         false);
 
-    numDNs = dataBlocks + parityBlocks + 2;
+    numDNs = dataBlocks + parityBlocks + 5;
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
     cluster.waitActive();
     dfs = cluster.getFileSystem(0);
@@ -633,25 +634,21 @@ public class TestDecommissionWithStriped {
         new DatanodeStorageInfo[] {target.getStorageInfos()[0]});
 
     // dn0 replicates in success
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        return dn0.getNumberOfReplicateBlocks() == 0;
-      }
-    }, 100, 60000);
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        Iterator<DatanodeStorageInfo> it =
-            bm.getStoredBlock(targetBlk).getStorageInfos();
-        while(it.hasNext()) {
-          if (it.next().getDatanodeDescriptor().equals(target)) {
-            return true;
+    GenericTestUtils.waitFor(
+        () -> dn0.getNumberOfReplicateBlocks() == 0,
+        100, 60000);
+    GenericTestUtils.waitFor(
+        () -> {
+          Iterator<DatanodeStorageInfo> it =
+              bm.getStoredBlock(targetBlk).getStorageInfos();
+          while(it.hasNext()) {
+            if (it.next().getDatanodeDescriptor().equals(target)) {
+              return true;
+            }
           }
-        }
-        return false;
-      }
-    }, 100, 60000);
+          return false;
+        },
+        100, 60000);
 
     // There are 8 live replicas
     BlockInfoStriped blockInfo =
@@ -709,4 +706,192 @@ public class TestDecommissionWithStriped {
     }
     return null;
   }
+
+  @Test (timeout = 120000)
+  public void testDecommissionWithMissingBlock() throws Exception {
+    // Write ec file.
+    Path ecFile = new Path(ecDir, "missingOneInternalBLockFile");
+    int writeBytes = cellSize * 6;
+    writeStripedFile(dfs, ecFile, writeBytes);
+
+    final List<DatanodeInfo> decommisionNodes = new ArrayList<DatanodeInfo>();
+    LocatedBlock lb = dfs.getClient().getLocatedBlocks(ecFile.toString(), 0)
+        .get(0);
+    LocatedStripedBlock lsb = (LocatedStripedBlock)lb;
+    DatanodeInfo[] dnLocs = lsb.getLocations();
+    BlockInfoStriped blockInfo =
+        (BlockInfoStriped)bm.getStoredBlock(
+            new Block(lsb.getBlock().getBlockId()));
+
+    assertEquals(dataBlocks + parityBlocks, dnLocs.length);
+    int decommNodeIndex = 1;
+    int numDecommission = 4;
+    int stopNodeIndex = 0;
+
+    // Add the 4 nodes and start decommissioning them,
+    // so that they are all decommissioning at the same time.
+    for (int i = decommNodeIndex; i < numDecommission + decommNodeIndex; ++i) {
+      decommisionNodes.add(dnLocs[i]);
+      DatanodeDescriptor dn = bm.getDatanodeManager()
+          .getDatanode(dnLocs[i].getDatanodeUuid());
+      dn.startDecommission();
+    }
+    GenericTestUtils.waitFor(
+        () -> bm.countNodes(blockInfo).decommissioning() == numDecommission,
+        100, 10000);
+
+    // The Namenode is not handling these decommissioning nodes yet.
+    assertEquals(0, bm.getDatanodeManager().getDatanodeAdminManager()
+        .getNumPendingNodes());
+
+    // Replicate the block on dn1 to another datanode,
+    // so that one of the 4 decommissioning replicas also has a live copy.
+    final byte blockIndex = lsb.getBlockIndices()[decommNodeIndex];
+    final Block targetBlk = new Block(lsb.getBlock().getBlockId() + blockIndex,
+        cellSize, lsb.getBlock().getGenerationStamp());
+    DatanodeInfo extraDn = getDatanodeOutOfTheBlock(lsb);
+    DatanodeDescriptor target = bm.getDatanodeManager()
+        .getDatanode(extraDn.getDatanodeUuid());
+    DatanodeDescriptor dnStartIndexDecommission = bm.getDatanodeManager()
+        .getDatanode(dnLocs[decommNodeIndex].getDatanodeUuid());
+    dnStartIndexDecommission.addBlockToBeReplicated(targetBlk,
+        new DatanodeStorageInfo[] {target.getStorageInfos()[0]});
+
+    // Wait for replication success.
+    GenericTestUtils.waitFor(
+        () -> {
+          Iterator<DatanodeStorageInfo> it =
+              bm.getStoredBlock(targetBlk).getStorageInfos();
+          while(it.hasNext()) {
+            if (it.next().getDatanodeDescriptor().equals(target)) {
+              return true;
+            }
+          }
+          return false;
+        },
+        100, 60000);
+
+    // Reopen ecFile, get the new locations.
+    lb = dfs.getClient().getLocatedBlocks(ecFile.toString(), 0)
+        .get(0);
+    lsb = (LocatedStripedBlock)lb;
+    DatanodeInfo[] newDnLocs = lsb.getLocations();
+
+    // Now the block group has 10 reported block locations.
+    assertEquals(10, newDnLocs.length);
+
+    // Stop the dn0 (stopNodeIndex) datanode,
+    // so that the internal block on this datanode goes missing.
+    DataNode dn = cluster.getDataNode(dnLocs[stopNodeIndex].getIpcPort());
+    cluster.stopDataNode(dnLocs[stopNodeIndex].getXferAddr());
+    cluster.setDataNodeDead(dn.getDatanodeId());
+
+    // So far there are 4 decommissioning nodes, 1 replica has been
+    // replicated, and 1 replica is missing. That leaves 8 counted internal
+    // blocks: 5 live and 3 decommissioning.
+    assertEquals(5, bm.countNodes(blockInfo).liveReplicas());
+    assertEquals(3, bm.countNodes(blockInfo).decommissioning());
+
+    // Handle decommission nodes in a new thread.
+    // Verify that nodes are decommissioned.
+    final CountDownLatch decomStarted = new CountDownLatch(1);
+    new Thread(
+        () -> {
+          try {
+            decomStarted.countDown();
+            decommissionNode(0, decommisionNodes, AdminStates.DECOMMISSIONED);
+          } catch (Exception e) {
+            LOG.error("Exception while decommissioning", e);
+            Assert.fail("Shouldn't throw exception!");
+          }
+        }).start();
+    decomStarted.await(5, TimeUnit.SECONDS);
+
+    // Wake up the pending reconstruction monitor to reconstruct the block.
+    BlockManagerTestUtil.wakeupPendingReconstructionTimerThread(bm);
+
+    // Wait for decommissioning
+    GenericTestUtils.waitFor(
+        // Whether there are 9 live replicas after decommission.
+        () -> bm.countNodes(blockInfo).liveReplicas() == 9,
+        100, 60000);
+
+    StripedFileTestUtil.checkData(dfs, ecFile, writeBytes, decommisionNodes,
+        null, blockGroupSize);
+    cleanupFile(dfs, ecFile);
+  }
+
+  @Test (timeout = 120000)
+  public void testCountNodes() throws Exception {
+    // Write ec file.
+    Path ecFile = new Path(ecDir, "testCountNodes");
+    int writeBytes = cellSize * 6;
+    writeStripedFile(dfs, ecFile, writeBytes);
+
+    List<LocatedBlock> lbs = ((HdfsDataInputStream) dfs.open(ecFile))
+        .getAllBlocks();
+    LocatedStripedBlock blk = (LocatedStripedBlock) lbs.get(0);
+    DatanodeInfo[] dnList = blk.getLocations();
+    DatanodeDescriptor dn0 = bm.getDatanodeManager()
+        .getDatanode(dnList[0].getDatanodeUuid());
+    dn0.startDecommission();
+
+    // Replicate dn0 block to another dn
+    final byte blockIndex = blk.getBlockIndices()[0];
+    final Block targetBlk = new Block(blk.getBlock().getBlockId() + blockIndex,
+        cellSize, blk.getBlock().getGenerationStamp());
+    DatanodeInfo extraDn = getDatanodeOutOfTheBlock(blk);
+    DatanodeDescriptor target = bm.getDatanodeManager()
+        .getDatanode(extraDn.getDatanodeUuid());
+    dn0.addBlockToBeReplicated(targetBlk,
+        new DatanodeStorageInfo[] {target.getStorageInfos()[0]});
+
+    // dn0 replicates successfully.
+    GenericTestUtils.waitFor(
+        () -> dn0.getNumberOfReplicateBlocks() == 0,
+        100, 60000);
+    GenericTestUtils.waitFor(
+        () -> {
+          Iterator<DatanodeStorageInfo> it =
+              bm.getStoredBlock(targetBlk).getStorageInfos();
+          while(it.hasNext()) {
+            if (it.next().getDatanodeDescriptor().equals(target)) {
+              return true;
+            }
+          }
+          return false;
+        },
+        100, 60000);
+
+    // There are 9 live replicas, 0 decommissioning replicas.
+    BlockInfoStriped blockInfo =
+        (BlockInfoStriped)bm.getStoredBlock(
+            new Block(blk.getBlock().getBlockId()));
+    Iterator<BlockInfoStriped.StorageAndBlockIndex> it =
+        blockInfo.getStorageAndIndexInfos().iterator();
+    DatanodeStorageInfo decommissioningStorage = null;
+    DatanodeStorageInfo liveStorage = null;
+    while(it.hasNext()) {
+      BlockInfoStriped.StorageAndBlockIndex si = it.next();
+      if(si.getStorage().getDatanodeDescriptor().equals(dn0)) {
+        decommissioningStorage = si.getStorage();
+      }
+      if(si.getStorage().getDatanodeDescriptor().equals(target)) {
+        liveStorage = si.getStorage();
+      }
+    }
+    assertNotNull(decommissioningStorage);
+    assertNotNull(liveStorage);
+
+    // Adjust internal block locations
+    // [b0(decommissioning), b1, b2, b3, b4, b5, b6, b7, b8, b0(live)] changed
+    // to [b0(live), b1, b2, b3, b4, b5, b6, b7, b8, b0(decommissioning)]
+    BlockManagerTestUtil.removeStorage(blockInfo, decommissioningStorage);
+    BlockManagerTestUtil.addStorage(blockInfo, liveStorage, targetBlk);
+    BlockManagerTestUtil.addStorage(blockInfo, decommissioningStorage,
+        targetBlk);
+    assertEquals(0, bm.countNodes(blockInfo).decommissioning());
+    assertEquals(9, bm.countNodes(blockInfo).liveReplicas());
+    cleanupFile(dfs, ecFile);
+  }
 }
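
testCountNodes above exercises the opposite iteration order (the live copy of index 0 seen before the decommissioning copy) and still expects 9 live and 0 decommissioning, i.e. the counting rule is order-independent. The same property can be checked with the count() sketch shown earlier (illustrative code, reusing that sketch's hypothetical Replica and State names), for example by appending this to its main():

    // Order independence: whether the live or the decommissioning copy of
    // index 0 is seen first, the result is 1 live and 0 decommissioning.
    int[] decomFirst = count(9,
        new Replica(0, State.DECOMMISSIONING),
        new Replica(0, State.LIVE));
    int[] liveFirst = count(9,
        new Replica(0, State.LIVE),
        new Replica(0, State.DECOMMISSIONING));
    System.out.println(decomFirst[0] + " live, " + decomFirst[1]
        + " decommissioning / " + liveFirst[0] + " live, " + liveFirst[1]
        + " decommissioning");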

+ 24 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java

@@ -160,6 +160,14 @@ public class BlockManagerTestUtil {
     }
   }
 
+  /**
+   * Wakeup the timer thread of PendingReconstructionBlocks.
+   */
+  public static void wakeupPendingReconstructionTimerThread(
+      final BlockManager blockManager) {
+    blockManager.pendingReconstruction.getTimerThread().interrupt();
+  }
+
   public static HeartbeatManager getHeartbeatManager(
       final BlockManager blockManager) {
     return blockManager.getDatanodeManager().getHeartbeatManager();
@@ -386,4 +394,20 @@ public class BlockManagerTestUtil {
           nn.getNamesystem().getBlockManager().getDatanodeManager();
       return !dnm.getNetworkTopology().contains(dnm.getDatanode(dnUuid));
   }
+
+  /**
+   * Remove storage from block.
+   */
+  public static void removeStorage(BlockInfo block,
+      DatanodeStorageInfo storage) {
+    block.removeStorage(storage);
+  }
+
+  /**
+   * Add storage to block.
+   */
+  public static void addStorage(BlockInfo block, DatanodeStorageInfo storage,
+      Block reportedBlock) {
+    block.addStorage(storage, reportedBlock);
+  }
 }