|
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
|
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
|
|
import org.apache.hadoop.hdfs.client.BlockReportOptions;
|
|
|
import org.apache.hadoop.hdfs.protocol.Block;
|
|
|
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
|
@@ -42,11 +43,13 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
|
|
import org.junit.Test;
|
|
|
import org.mockito.Mockito;
|
|
|
|
|
|
+import java.net.InetSocketAddress;
|
|
|
+
|
|
|
/**
|
|
|
* Test manually requesting that the DataNode send a block report.
|
|
|
*/
|
|
|
public final class TestTriggerBlockReport {
|
|
|
- private void testTriggerBlockReport(boolean incremental) throws Exception {
|
|
|
+ private void testTriggerBlockReport(boolean incremental, boolean withSpecificNN) throws Exception {
|
|
|
Configuration conf = new HdfsConfiguration();
|
|
|
|
|
|
// Set a really long value for dfs.blockreport.intervalMsec and
|
|
@@ -57,16 +60,24 @@ public final class TestTriggerBlockReport {
|
|
|
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1080L);
|
|
|
|
|
|
final MiniDFSCluster cluster =
|
|
|
- new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
|
|
+ new MiniDFSCluster.Builder(conf).nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(1).build();
|
|
|
cluster.waitActive();
|
|
|
- FileSystem fs = cluster.getFileSystem();
|
|
|
- DatanodeProtocolClientSideTranslatorPB spy =
|
|
|
+ cluster.transitionToActive(0);
|
|
|
+ FileSystem fs = cluster.getFileSystem(0);
|
|
|
+ DatanodeProtocolClientSideTranslatorPB spyOnNn0 =
|
|
|
+ InternalDataNodeTestUtils.spyOnBposToNN(
|
|
|
+ cluster.getDataNodes().get(0), cluster.getNameNode(0));
|
|
|
+ DatanodeProtocolClientSideTranslatorPB spyOnNn1 =
|
|
|
InternalDataNodeTestUtils.spyOnBposToNN(
|
|
|
- cluster.getDataNodes().get(0), cluster.getNameNode());
|
|
|
+ cluster.getDataNodes().get(0), cluster.getNameNode(1));
|
|
|
DFSTestUtil.createFile(fs, new Path("/abc"), 16, (short) 1, 1L);
|
|
|
|
|
|
- // We should get 1 incremental block report.
|
|
|
- Mockito.verify(spy, timeout(60000).times(1)).blockReceivedAndDeleted(
|
|
|
+ // We should get 1 incremental block report on both NNs.
|
|
|
+ Mockito.verify(spyOnNn0, timeout(60000).times(1)).blockReceivedAndDeleted(
|
|
|
+ any(DatanodeRegistration.class),
|
|
|
+ anyString(),
|
|
|
+ any(StorageReceivedDeletedBlocks[].class));
|
|
|
+ Mockito.verify(spyOnNn1, timeout(60000).times(1)).blockReceivedAndDeleted(
|
|
|
any(DatanodeRegistration.class),
|
|
|
anyString(),
|
|
|
any(StorageReceivedDeletedBlocks[].class));
|
|
@@ -75,12 +86,21 @@ public final class TestTriggerBlockReport {
|
|
|
// since the interval we configured is so long.
|
|
|
for (int i = 0; i < 3; i++) {
|
|
|
Thread.sleep(10);
|
|
|
- Mockito.verify(spy, times(0)).blockReport(
|
|
|
+ Mockito.verify(spyOnNn0, times(0)).blockReport(
|
|
|
any(DatanodeRegistration.class),
|
|
|
anyString(),
|
|
|
any(StorageBlockReport[].class),
|
|
|
any());
|
|
|
- Mockito.verify(spy, times(1)).blockReceivedAndDeleted(
|
|
|
+ Mockito.verify(spyOnNn0, times(1)).blockReceivedAndDeleted(
|
|
|
+ any(DatanodeRegistration.class),
|
|
|
+ anyString(),
|
|
|
+ any(StorageReceivedDeletedBlocks[].class));
|
|
|
+ Mockito.verify(spyOnNn1, times(0)).blockReport(
|
|
|
+ any(DatanodeRegistration.class),
|
|
|
+ anyString(),
|
|
|
+ any(StorageBlockReport[].class),
|
|
|
+ any());
|
|
|
+ Mockito.verify(spyOnNn1, times(1)).blockReceivedAndDeleted(
|
|
|
any(DatanodeRegistration.class),
|
|
|
anyString(),
|
|
|
any(StorageReceivedDeletedBlocks[].class));
|
|
@@ -91,20 +111,21 @@ public final class TestTriggerBlockReport {
|
|
|
ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo(
|
|
|
new Block(5678, 512, 1000), BlockStatus.DELETED_BLOCK, null);
|
|
|
DataNode datanode = cluster.getDataNodes().get(0);
|
|
|
- BPServiceActor actor =
|
|
|
- datanode.getAllBpOs().get(0).getBPServiceActors().get(0);
|
|
|
- final FsDatasetSpi<?> dataset = datanode.getFSDataset();
|
|
|
- final DatanodeStorage storage;
|
|
|
- try (FsDatasetSpi.FsVolumeReferences volumes =
|
|
|
- dataset.getFsVolumeReferences()) {
|
|
|
- storage = dataset.getStorage(volumes.get(0).getStorageID());
|
|
|
+ for (BPServiceActor actor : datanode.getAllBpOs().get(0).getBPServiceActors()) {
|
|
|
+ final FsDatasetSpi<?> dataset = datanode.getFSDataset();
|
|
|
+ final DatanodeStorage storage;
|
|
|
+ try (FsDatasetSpi.FsVolumeReferences volumes = dataset.getFsVolumeReferences()) {
|
|
|
+ storage = dataset.getStorage(volumes.get(0).getStorageID());
|
|
|
+ }
|
|
|
+ actor.getIbrManager().addRDBI(rdbi, storage);
|
|
|
}
|
|
|
|
|
|
- actor.getIbrManager().addRDBI(rdbi, storage);
|
|
|
-
|
|
|
// Manually trigger a block report.
|
|
|
+ // Only trigger block report to NN1 when testing triggering block report on specific namenode.
|
|
|
+ InetSocketAddress nnAddr = withSpecificNN ? cluster.getNameNode(1).getServiceRpcAddress() : null;
|
|
|
datanode.triggerBlockReport(
|
|
|
new BlockReportOptions.Factory().
|
|
|
+ setNamenodeAddr(nnAddr).
|
|
|
setIncremental(incremental).
|
|
|
build()
|
|
|
);
|
|
@@ -112,13 +133,25 @@ public final class TestTriggerBlockReport {
|
|
|
// triggerBlockReport returns before the block report is
|
|
|
// actually sent. Wait for it to be sent here.
|
|
|
if (incremental) {
|
|
|
- Mockito.verify(spy, timeout(60000).times(2)).
|
|
|
+ Mockito.verify(spyOnNn1, timeout(60000).times(2)).
|
|
|
+ blockReceivedAndDeleted(
|
|
|
+ any(DatanodeRegistration.class),
|
|
|
+ anyString(),
|
|
|
+ any(StorageReceivedDeletedBlocks[].class));
|
|
|
+ int nn0IncrBlockReport = withSpecificNN ? 1 : 2;
|
|
|
+ Mockito.verify(spyOnNn0, timeout(60000).times(nn0IncrBlockReport)).
|
|
|
blockReceivedAndDeleted(
|
|
|
any(DatanodeRegistration.class),
|
|
|
anyString(),
|
|
|
any(StorageReceivedDeletedBlocks[].class));
|
|
|
} else {
|
|
|
- Mockito.verify(spy, timeout(60000)).blockReport(
|
|
|
+ Mockito.verify(spyOnNn1, timeout(60000).times(1)).blockReport(
|
|
|
+ any(DatanodeRegistration.class),
|
|
|
+ anyString(),
|
|
|
+ any(StorageBlockReport[].class),
|
|
|
+ any());
|
|
|
+ int nn0BlockReport = withSpecificNN ? 0 : 1;
|
|
|
+ Mockito.verify(spyOnNn0, timeout(60000).times(nn0BlockReport)).blockReport(
|
|
|
any(DatanodeRegistration.class),
|
|
|
anyString(),
|
|
|
any(StorageBlockReport[].class),
|
|
@@ -130,11 +163,13 @@ public final class TestTriggerBlockReport {
|
|
|
|
|
|
@Test
|
|
|
public void testTriggerFullBlockReport() throws Exception {
|
|
|
- testTriggerBlockReport(false);
|
|
|
+ testTriggerBlockReport(false, false);
|
|
|
+ testTriggerBlockReport(false, true);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
public void testTriggerIncrementalBlockReport() throws Exception {
|
|
|
- testTriggerBlockReport(true);
|
|
|
+ testTriggerBlockReport(true, false);
|
|
|
+ testTriggerBlockReport(true, true);
|
|
|
}
|
|
|
}
|