|
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
import static org.junit.Assert.assertTrue;
|
|
|
|
|
|
+import java.util.ArrayList;
|
|
|
+
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
@@ -28,13 +30,21 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
import org.apache.hadoop.hdfs.protocol.Block;
|
|
|
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
|
|
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
|
|
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
|
|
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
|
|
|
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
|
|
import org.junit.Test;
|
|
|
|
|
|
+import com.google.common.base.Preconditions;
|
|
|
+
|
|
|
/**
|
|
|
* This class tests the internals of PendingReplicationBlocks.java,
|
|
|
* as well as how PendingReplicationBlocks acts in BlockManager
|
|
@@ -44,7 +54,22 @@ public class TestPendingReplication {
|
|
|
private static final int DFS_REPLICATION_INTERVAL = 1;
|
|
|
// Number of datanodes in the cluster
|
|
|
private static final int DATANODE_COUNT = 5;
|
|
|
+
|
|
|
+ private DatanodeDescriptor genDatanodeId(int seed) {
|
|
|
+ seed = seed % 256;
|
|
|
+ String ip = seed + "." + seed + "." + seed + "." + seed;
|
|
|
+ return DFSTestUtil.getDatanodeDescriptor(ip, null);
|
|
|
+ }
|
|
|
|
|
|
+ private DatanodeDescriptor[] genDatanodes(int number) {
|
|
|
+ Preconditions.checkArgument(number >= 0);
|
|
|
+ DatanodeDescriptor[] nodes = new DatanodeDescriptor[number];
|
|
|
+ for (int i = 0; i < number; i++) {
|
|
|
+ nodes[i] = genDatanodeId(i);
|
|
|
+ }
|
|
|
+ return nodes;
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testPendingReplication() {
|
|
|
PendingReplicationBlocks pendingReplications;
|
|
@@ -56,7 +81,7 @@ public class TestPendingReplication {
|
|
|
//
|
|
|
for (int i = 0; i < 10; i++) {
|
|
|
Block block = new Block(i, i, 0);
|
|
|
- pendingReplications.increment(block, i);
|
|
|
+ pendingReplications.increment(block, genDatanodes(i));
|
|
|
}
|
|
|
assertEquals("Size of pendingReplications ",
|
|
|
10, pendingReplications.size());
|
|
@@ -66,15 +91,16 @@ public class TestPendingReplication {
|
|
|
// remove one item and reinsert it
|
|
|
//
|
|
|
Block blk = new Block(8, 8, 0);
|
|
|
- pendingReplications.decrement(blk); // removes one replica
|
|
|
+ pendingReplications.decrement(blk, genDatanodeId(7)); // removes one replica
|
|
|
assertEquals("pendingReplications.getNumReplicas ",
|
|
|
7, pendingReplications.getNumReplicas(blk));
|
|
|
|
|
|
for (int i = 0; i < 7; i++) {
|
|
|
- pendingReplications.decrement(blk); // removes all replicas
|
|
|
+ // removes all replicas
|
|
|
+ pendingReplications.decrement(blk, genDatanodeId(i));
|
|
|
}
|
|
|
assertTrue(pendingReplications.size() == 9);
|
|
|
- pendingReplications.increment(blk, 8);
|
|
|
+ pendingReplications.increment(blk, genDatanodes(8));
|
|
|
assertTrue(pendingReplications.size() == 10);
|
|
|
|
|
|
//
|
|
@@ -102,7 +128,7 @@ public class TestPendingReplication {
|
|
|
|
|
|
for (int i = 10; i < 15; i++) {
|
|
|
Block block = new Block(i, i, 0);
|
|
|
- pendingReplications.increment(block, i);
|
|
|
+ pendingReplications.increment(block, genDatanodes(i));
|
|
|
}
|
|
|
assertTrue(pendingReplications.size() == 15);
|
|
|
|
|
@@ -133,6 +159,101 @@ public class TestPendingReplication {
|
|
|
pendingReplications.stop();
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Test if DatanodeProtocol#blockReceivedAndDeleted can correctly update the
|
|
|
+ * pending replications. Also make sure the blockReceivedAndDeleted call is
|
|
|
+ * idempotent to the pending replications.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testBlockReceived() throws Exception {
|
|
|
+ final Configuration conf = new HdfsConfiguration();
|
|
|
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
|
|
|
+ MiniDFSCluster cluster = null;
|
|
|
+ try {
|
|
|
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
|
|
|
+ DATANODE_COUNT).build();
|
|
|
+ cluster.waitActive();
|
|
|
+
|
|
|
+ DistributedFileSystem hdfs = cluster.getFileSystem();
|
|
|
+ FSNamesystem fsn = cluster.getNamesystem();
|
|
|
+ BlockManager blkManager = fsn.getBlockManager();
|
|
|
+
|
|
|
+ final String file = "/tmp.txt";
|
|
|
+ final Path filePath = new Path(file);
|
|
|
+ short replFactor = 1;
|
|
|
+ DFSTestUtil.createFile(hdfs, filePath, 1024L, replFactor, 0);
|
|
|
+
|
|
|
+ // temporarily stop the heartbeat
|
|
|
+ ArrayList<DataNode> datanodes = cluster.getDataNodes();
|
|
|
+ for (int i = 0; i < DATANODE_COUNT; i++) {
|
|
|
+ DataNodeTestUtils.setHeartbeatsDisabledForTests(datanodes.get(i), true);
|
|
|
+ }
|
|
|
+
|
|
|
+ hdfs.setReplication(filePath, (short) DATANODE_COUNT);
|
|
|
+ BlockManagerTestUtil.computeAllPendingWork(blkManager);
|
|
|
+
|
|
|
+ assertEquals(1, blkManager.pendingReplications.size());
|
|
|
+ INodeFile fileNode = fsn.getFSDirectory().getINode4Write(file).asFile();
|
|
|
+ Block[] blocks = fileNode.getBlocks();
|
|
|
+ assertEquals(DATANODE_COUNT - 1,
|
|
|
+ blkManager.pendingReplications.getNumReplicas(blocks[0]));
|
|
|
+
|
|
|
+ LocatedBlock locatedBlock = hdfs.getClient().getLocatedBlocks(file, 0)
|
|
|
+ .get(0);
|
|
|
+ DatanodeInfo existingDn = (locatedBlock.getLocations())[0];
|
|
|
+ int reportDnNum = 0;
|
|
|
+ String poolId = cluster.getNamesystem().getBlockPoolId();
|
|
|
+ // let two datanodes (other than the one that already has the data) to
|
|
|
+ // report to NN
|
|
|
+ for (int i = 0; i < DATANODE_COUNT && reportDnNum < 2; i++) {
|
|
|
+ if (!datanodes.get(i).getDatanodeId().equals(existingDn)) {
|
|
|
+ DatanodeRegistration dnR = datanodes.get(i).getDNRegistrationForBP(
|
|
|
+ poolId);
|
|
|
+ StorageReceivedDeletedBlocks[] report = {
|
|
|
+ new StorageReceivedDeletedBlocks(dnR.getStorageID(),
|
|
|
+ new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(
|
|
|
+ blocks[0], BlockStatus.RECEIVED_BLOCK, "") }) };
|
|
|
+ cluster.getNameNodeRpc().blockReceivedAndDeleted(dnR, poolId, report);
|
|
|
+ reportDnNum++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ assertEquals(DATANODE_COUNT - 3,
|
|
|
+ blkManager.pendingReplications.getNumReplicas(blocks[0]));
|
|
|
+
|
|
|
+    reportDnNum = 0; // let the same datanodes report again; the call must be idempotent
|
|
|
+ for (int i = 0; i < DATANODE_COUNT && reportDnNum < 2; i++) {
|
|
|
+ if (!datanodes.get(i).getDatanodeId().equals(existingDn)) {
|
|
|
+ DatanodeRegistration dnR = datanodes.get(i).getDNRegistrationForBP(
|
|
|
+ poolId);
|
|
|
+ StorageReceivedDeletedBlocks[] report =
|
|
|
+ { new StorageReceivedDeletedBlocks(dnR.getStorageID(),
|
|
|
+ new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(
|
|
|
+ blocks[0], BlockStatus.RECEIVED_BLOCK, "") }) };
|
|
|
+ cluster.getNameNodeRpc().blockReceivedAndDeleted(dnR, poolId, report);
|
|
|
+ reportDnNum++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ assertEquals(DATANODE_COUNT - 3,
|
|
|
+ blkManager.pendingReplications.getNumReplicas(blocks[0]));
|
|
|
+
|
|
|
+    // re-enable heartbeats on all datanodes and trigger them immediately
|
|
|
+ for (int i = 0; i < DATANODE_COUNT; i++) {
|
|
|
+ DataNodeTestUtils
|
|
|
+ .setHeartbeatsDisabledForTests(datanodes.get(i), false);
|
|
|
+ DataNodeTestUtils.triggerHeartbeat(datanodes.get(i));
|
|
|
+ }
|
|
|
+
|
|
|
+ Thread.sleep(5000);
|
|
|
+ assertEquals(0, blkManager.pendingReplications.size());
|
|
|
+ } finally {
|
|
|
+ if (cluster != null) {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Test if BlockManager can correctly remove corresponding pending records
|
|
|
* when a file is deleted
|