@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 /**
  * This class tests the internals of PendingReplicationBlocks.java,
@@ -52,13 +53,11 @@ public class TestPendingReplication {
   private static final int DFS_REPLICATION_INTERVAL = 1;
   // Number of datanodes in the cluster
   private static final int DATANODE_COUNT = 5;
-
   @Test
   public void testPendingReplication() {
     PendingReplicationBlocks pendingReplications;
     pendingReplications = new PendingReplicationBlocks(TIMEOUT * 1000);
     pendingReplications.start();
-
     //
     // Add 10 blocks to pendingReplications.
     //
@@ -140,8 +139,7 @@ public class TestPendingReplication {
     //
     // Verify that everything has timed out.
     //
-    assertEquals("Size of pendingReplications ",
-                 0, pendingReplications.size());
+    assertEquals("Size of pendingReplications ", 0, pendingReplications.size());
     Block[] timedOut = pendingReplications.getTimedOutBlocks();
     assertTrue(timedOut != null && timedOut.length == 15);
     for (int i = 0; i < timedOut.length; i++) {
@@ -149,6 +147,98 @@ public class TestPendingReplication {
     }
     pendingReplications.stop();
   }
+
+  /* Test that processPendingReplications will use the most recent
+   * blockinfo from the blocksmap by placing a larger genstamp into
+   * the blocksmap.
+   */
+  @Test
+  public void testProcessPendingReplications() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    conf.setLong(
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, TIMEOUT);
+    MiniDFSCluster cluster = null;
+    Block block;
+    BlockInfoContiguous blockInfo;
+    try {
+      cluster =
+          new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_COUNT).build();
+      cluster.waitActive();
+
+      FSNamesystem fsn = cluster.getNamesystem();
+      BlockManager blkManager = fsn.getBlockManager();
+
+      PendingReplicationBlocks pendingReplications =
+          blkManager.pendingReplications;
+      UnderReplicatedBlocks neededReplications = blkManager.neededReplications;
+      BlocksMap blocksMap = blkManager.blocksMap;
+
+      //
+      // Add 1 block to pendingReplications with GenerationStamp = 0.
+      //
+
+      block = new Block(1, 1, 0);
+      blockInfo = new BlockInfoContiguous(block, (short) 3);
+
+      pendingReplications.increment(block,
+          DatanodeStorageInfo.toDatanodeDescriptors(
+              DFSTestUtil.createDatanodeStorageInfos(1)));
+      BlockCollection bc = Mockito.mock(BlockCollection.class);
+      Mockito.doReturn((short) 3).when(bc).getPreferredBlockReplication();
+      // Place into blocksmap with GenerationStamp = 1
+      blockInfo.setGenerationStamp(1);
+      blocksMap.addBlockCollection(blockInfo, bc);
+
+      assertEquals("Size of pendingReplications ", 1,
+          pendingReplications.size());
+
+      // Add a second block to pendingReplications that has no
+      // corresponding entry in blocksmap
+      block = new Block(2, 2, 0);
+      pendingReplications.increment(block,
+          DatanodeStorageInfo.toDatanodeDescriptors(
+              DFSTestUtil.createDatanodeStorageInfos(1)));
+
+      // verify 2 blocks in pendingReplications
+      assertEquals("Size of pendingReplications ", 2,
+          pendingReplications.size());
+
+      //
+      // Wait for everything to timeout.
+      //
+      while (pendingReplications.size() > 0) {
+        try {
+          Thread.sleep(100);
+        } catch (Exception e) {
+        }
+      }
+
+      //
+      // Verify that block moves to neededReplications
+      //
+      while (neededReplications.size() == 0) {
+        try {
+          Thread.sleep(100);
+        } catch (Exception e) {
+        }
+      }
+
+      // Verify that the generation stamp we will try to replicate
+      // is now 1
+      for (Block b : neededReplications) {
+        assertEquals("Generation stamp is 1 ", 1,
+            b.getGenerationStamp());
+      }
+
+      // Verify size of neededReplications is exactly 1.
+      assertEquals("size of neededReplications is 1 ", 1,
+          neededReplications.size());
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 
 /**
  * Test if DatanodeProtocol#blockReceivedAndDeleted can correctly update the