|
@@ -27,9 +27,11 @@ import java.util.Collection;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
+import java.util.BitSet;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
+import com.google.common.base.Supplier;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.BlockLocation;
|
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
@@ -43,14 +45,19 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
|
|
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
|
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
|
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
|
|
+import org.apache.hadoop.hdfs.protocol.Block;
|
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
|
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
|
|
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
|
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
|
|
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
|
|
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
|
|
import org.apache.hadoop.security.token.Token;
|
|
|
+import org.apache.hadoop.test.GenericTestUtils;
|
|
|
import org.apache.hadoop.test.PathUtils;
|
|
|
import org.junit.After;
|
|
|
import org.junit.Assert;
|
|
@@ -585,4 +592,121 @@ public class TestDecommissionWithStriped {
|
|
|
}
|
|
|
return null;
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Simulate that There are 2 nodes(dn0,dn1) in decommission. Firstly dn0
|
|
|
+ * replicates in success, dn1 replicates in failure. Decommissions go on.
|
|
|
+ */
|
|
|
+ @Test (timeout = 120000)
|
|
|
+ public void testDecommissionWithFailedReplicating() throws Exception {
|
|
|
+
|
|
|
+ // Write ec file.
|
|
|
+ Path ecFile = new Path(ecDir, "firstReplicationFailedFile");
|
|
|
+ int writeBytes = cellSize * 6;
|
|
|
+ writeStripedFile(dfs, ecFile, writeBytes);
|
|
|
+
|
|
|
+ // Get 2 nodes of ec block and set them in decommission.
|
|
|
+ // The 2 nodes are not in pendingNodes of DatanodeAdminManager.
|
|
|
+ List<LocatedBlock> lbs = ((HdfsDataInputStream) dfs.open(ecFile))
|
|
|
+ .getAllBlocks();
|
|
|
+ LocatedStripedBlock blk = (LocatedStripedBlock) lbs.get(0);
|
|
|
+ DatanodeInfo[] dnList = blk.getLocations();
|
|
|
+ DatanodeDescriptor dn0 = bm.getDatanodeManager()
|
|
|
+ .getDatanode(dnList[0].getDatanodeUuid());
|
|
|
+ dn0.startDecommission();
|
|
|
+ DatanodeDescriptor dn1 = bm.getDatanodeManager()
|
|
|
+ .getDatanode(dnList[1].getDatanodeUuid());
|
|
|
+ dn1.startDecommission();
|
|
|
+
|
|
|
+ assertEquals(0, bm.getDatanodeManager().getDatanodeAdminManager()
|
|
|
+ .getNumPendingNodes());
|
|
|
+
|
|
|
+ // Replicate dn0 block to another dn
|
|
|
+ // Simulate that dn0 replicates in success, dn1 replicates in failure.
|
|
|
+ final byte blockIndex = blk.getBlockIndices()[0];
|
|
|
+ final Block targetBlk = new Block(blk.getBlock().getBlockId() + blockIndex,
|
|
|
+ cellSize, blk.getBlock().getGenerationStamp());
|
|
|
+ DatanodeInfo extraDn = getDatanodeOutOfTheBlock(blk);
|
|
|
+ DatanodeDescriptor target = bm.getDatanodeManager()
|
|
|
+ .getDatanode(extraDn.getDatanodeUuid());
|
|
|
+ dn0.addBlockToBeReplicated(targetBlk,
|
|
|
+ new DatanodeStorageInfo[] {target.getStorageInfos()[0]});
|
|
|
+
|
|
|
+ // dn0 replicates in success
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean get() {
|
|
|
+ return dn0.getNumberOfReplicateBlocks() == 0;
|
|
|
+ }
|
|
|
+ }, 100, 60000);
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean get() {
|
|
|
+ Iterator<DatanodeStorageInfo> it =
|
|
|
+ bm.getStoredBlock(targetBlk).getStorageInfos();
|
|
|
+ while(it.hasNext()) {
|
|
|
+ if (it.next().getDatanodeDescriptor().equals(target)) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }, 100, 60000);
|
|
|
+
|
|
|
+ // There are 8 live replicas
|
|
|
+ BlockInfoStriped blockInfo =
|
|
|
+ (BlockInfoStriped)bm.getStoredBlock(
|
|
|
+ new Block(blk.getBlock().getBlockId()));
|
|
|
+ assertEquals(8, bm.countNodes(blockInfo).liveReplicas());
|
|
|
+
|
|
|
+ // Add the 2 nodes to pendingNodes of DatanodeAdminManager
|
|
|
+ bm.getDatanodeManager().getDatanodeAdminManager()
|
|
|
+ .getPendingNodes().add(dn0);
|
|
|
+ bm.getDatanodeManager().getDatanodeAdminManager()
|
|
|
+ .getPendingNodes().add(dn1);
|
|
|
+
|
|
|
+ waitNodeState(dn0, AdminStates.DECOMMISSIONED);
|
|
|
+ waitNodeState(dn1, AdminStates.DECOMMISSIONED);
|
|
|
+
|
|
|
+ // There are 9 live replicas
|
|
|
+ assertEquals(9, bm.countNodes(blockInfo).liveReplicas());
|
|
|
+
|
|
|
+ // After dn0 & dn1 decommissioned, all internal Blocks(0~8) are there
|
|
|
+ Iterator<DatanodeStorageInfo> it = blockInfo.getStorageInfos();
|
|
|
+ BitSet indexBitSet = new BitSet(9);
|
|
|
+ while(it.hasNext()) {
|
|
|
+ DatanodeStorageInfo storageInfo = it.next();
|
|
|
+ if(storageInfo.getDatanodeDescriptor().equals(dn0)
|
|
|
+ || storageInfo.getDatanodeDescriptor().equals(dn1)) {
|
|
|
+ // Skip decommissioned nodes
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ byte index = blockInfo.getStorageBlockIndex(storageInfo);
|
|
|
+ indexBitSet.set(index);
|
|
|
+ }
|
|
|
+ for (int i = 0; i < 9; ++i) {
|
|
|
+ assertEquals(true, indexBitSet.get(i));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get a Datanode which does not contain the block.
|
|
|
+ */
|
|
|
+ private DatanodeInfo getDatanodeOutOfTheBlock(LocatedStripedBlock blk)
|
|
|
+ throws Exception {
|
|
|
+ DatanodeInfo[] allDnInfos = client.datanodeReport(DatanodeReportType.LIVE);
|
|
|
+ DatanodeInfo[] blkDnInos= blk.getLocations();
|
|
|
+ for (DatanodeInfo dnInfo : allDnInfos) {
|
|
|
+ boolean in = false;
|
|
|
+ for (DatanodeInfo blkDnInfo : blkDnInos) {
|
|
|
+ if (blkDnInfo.equals(dnInfo)) {
|
|
|
+ in = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if(!in) {
|
|
|
+ return dnInfo;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
}
|