|
@@ -436,14 +436,15 @@ public class TestDecommissionWithStriped {
|
|
|
return new DFSClient(nn.getNameNodeAddress(), conf);
|
|
|
}
|
|
|
|
|
|
- private void writeStripedFile(DistributedFileSystem dfs, Path ecFile,
|
|
|
- int writeBytes) throws IOException, Exception {
|
|
|
+ private byte[] writeStripedFile(DistributedFileSystem fs, Path ecFile,
|
|
|
+ int writeBytes) throws Exception {
|
|
|
byte[] bytes = StripedFileTestUtil.generateBytes(writeBytes);
|
|
|
- DFSTestUtil.writeFile(dfs, ecFile, new String(bytes));
|
|
|
- StripedFileTestUtil.waitBlockGroupsReported(dfs, ecFile.toString());
|
|
|
+ DFSTestUtil.writeFile(fs, ecFile, new String(bytes));
|
|
|
+ StripedFileTestUtil.waitBlockGroupsReported(fs, ecFile.toString());
|
|
|
|
|
|
- StripedFileTestUtil.checkData(dfs, ecFile, writeBytes,
|
|
|
+ StripedFileTestUtil.checkData(fs, ecFile, writeBytes,
|
|
|
new ArrayList<DatanodeInfo>(), null, blockGroupSize);
|
|
|
+ return bytes;
|
|
|
}
|
|
|
|
|
|
private void writeConfigFile(Path name, List<String> nodes)
|
|
@@ -894,4 +895,81 @@ public class TestDecommissionWithStriped {
|
|
|
assertEquals(9, bm.countNodes(blockInfo).liveReplicas());
|
|
|
cleanupFile(dfs, ecFile);
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test recovery for an ec block, its storage array contains these internal
|
|
|
+ * blocks which are {b0, b1, b2, b3, null, b5, b6, b7, b8, b0, b1, b2,
|
|
|
+ * b3}, array[0]{b0} in decommissioning, array[1-3]{b1, b2, b3} are
|
|
|
+ * in decommissioned. array[4] is null, array[5-12]{b[5-8],b[0-3]} are
|
|
|
+ * in live.
|
|
|
+ */
|
|
|
+ @Test (timeout = 120000)
|
|
|
+ public void testRecoveryWithDecommission() throws Exception {
|
|
|
+ final Path ecFile = new Path(ecDir, "testRecoveryWithDecommission");
|
|
|
+ int writeBytes = cellSize * dataBlocks;
|
|
|
+ byte[] originBytesArray = writeStripedFile(dfs, ecFile, writeBytes);
|
|
|
+ List<LocatedBlock> lbs = ((HdfsDataInputStream) dfs.open(ecFile))
|
|
|
+ .getAllBlocks();
|
|
|
+ LocatedStripedBlock blk = (LocatedStripedBlock) lbs.get(0);
|
|
|
+ DatanodeInfo[] dnList = blk.getLocations();
|
|
|
+ BlockInfoStriped blockInfo =
|
|
|
+ (BlockInfoStriped)bm.getStoredBlock(
|
|
|
+ new Block(blk.getBlock().getBlockId()));
|
|
|
+
|
|
|
+ // Decommission datanode dn0 contains block b0
|
|
|
+ // Aim to add storageinfo of replicated block b0 to storages[9] of ec block
|
|
|
+ List<DatanodeInfo> decommissionedNodes = new ArrayList<>();
|
|
|
+ decommissionedNodes.add(dnList[0]);
|
|
|
+ decommissionNode(0, decommissionedNodes, AdminStates.DECOMMISSIONED);
|
|
|
+
|
|
|
+ // Now storages of ec block are (b0{decommissioned}, b[1-8]{live},
|
|
|
+ // b0{live})
|
|
|
+ assertEquals(9, bm.countNodes(blockInfo).liveReplicas());
|
|
|
+ assertEquals(1, bm.countNodes(blockInfo).decommissioned());
|
|
|
+
|
|
|
+ int decommissionNodesNum = 4;
|
|
|
+
|
|
|
+ // Decommission nodes contain blocks of b[0-3]
|
|
|
+ // dn0 has been decommissioned
|
|
|
+ for (int i = 1; i < decommissionNodesNum; i++) {
|
|
|
+ decommissionedNodes.add(dnList[i]);
|
|
|
+ }
|
|
|
+ decommissionNode(0, decommissionedNodes, AdminStates.DECOMMISSIONED);
|
|
|
+
|
|
|
+ // Now storages of ec block are (b[0-3]{decommissioned}, b[4-8]{live},
|
|
|
+ // b0{live}, b[1-3]{live})
|
|
|
+ // There are 9 live and 4 decommissioned internal blocks
|
|
|
+ assertEquals(9, bm.countNodes(blockInfo).liveReplicas());
|
|
|
+ assertEquals(4, bm.countNodes(blockInfo).decommissioned());
|
|
|
+
|
|
|
+ // There are no reconstruction tasks
|
|
|
+ assertEquals(0, bm.getDatanodeManager().getDatanodeAdminManager()
|
|
|
+ .getNumPendingNodes());
|
|
|
+ assertEquals(0, bm.getUnderReplicatedNotMissingBlocks());
|
|
|
+
|
|
|
+ // Set dn0 in decommissioning
|
|
|
+ // So that the block on dn0 can be used for reconstruction task
|
|
|
+ DatanodeDescriptor dn0 = bm.getDatanodeManager()
|
|
|
+ .getDatanode(dnList[0].getDatanodeUuid());
|
|
|
+ dn0.startDecommission();
|
|
|
+
|
|
|
+ // Stop the datanode contains b4
|
|
|
+ DataNode dn = cluster.getDataNode(
|
|
|
+ dnList[decommissionNodesNum].getIpcPort());
|
|
|
+ cluster.stopDataNode(dnList[decommissionNodesNum].getXferAddr());
|
|
|
+ cluster.setDataNodeDead(dn.getDatanodeId());
|
|
|
+
|
|
|
+ // Now storages of ec block are (b[0]{decommissioning},
|
|
|
+ // b[1-3]{decommissioned}, null, b[5-8]{live}, b0{live}, b[1-3]{live})
|
|
|
+ // There are 8 live and 1 decommissioning internal blocks
|
|
|
+ // Wait for reconstruction EC block.
|
|
|
+ GenericTestUtils.waitFor(
|
|
|
+ () -> bm.countNodes(blockInfo).liveReplicas() == 9,
|
|
|
+ 100, 10000);
|
|
|
+
|
|
|
+ byte[] readBytesArray = new byte[writeBytes];
|
|
|
+ StripedFileTestUtil.verifyPread(dfs, ecFile, writeBytes,
|
|
|
+ originBytesArray, readBytesArray, ecPolicy);
|
|
|
+ cleanupFile(dfs, ecFile);
|
|
|
+ }
|
|
|
}
|