|
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
|
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
|
|
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
|
|
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
|
|
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
|
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
|
|
|
+import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
|
|
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
|
|
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
|
|
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
|
|
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
|
|
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
|
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
|
@@ -136,6 +137,50 @@ public class TestBlockReportLease {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Test
|
|
|
|
+ public void testCheckBlockReportLeaseWhenDnUnregister() throws Exception {
|
|
|
|
+ HdfsConfiguration conf = new HdfsConfiguration();
|
|
|
|
+ Random rand = new Random();
|
|
|
|
+
|
|
|
|
+ try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build()) {
|
|
|
|
+ FSNamesystem fsn = cluster.getNamesystem();
|
|
|
|
+ BlockManager blockManager = fsn.getBlockManager();
|
|
|
|
+ String poolId = cluster.getNamesystem().getBlockPoolId();
|
|
|
|
+ NamenodeProtocols rpcServer = cluster.getNameNodeRpc();
|
|
|
|
+
|
|
|
|
+ // Remove the unique DataNode to simulate the unregistered situation.
|
|
|
|
+ // This is similar to starting NameNode, and DataNodes are not registered yet.
|
|
|
|
+ DataNode dn = cluster.getDataNodes().get(0);
|
|
|
|
+ blockManager.getDatanodeManager().getDatanodeMap().remove(dn.getDatanodeUuid());
|
|
|
|
+
|
|
|
|
+ // Trigger BlockReport.
|
|
|
|
+ DatanodeRegistration dnRegistration = dn.getDNRegistrationForBP(poolId);
|
|
|
|
+ StorageReport[] storages = dn.getFSDataset().getStorageReports(poolId);
|
|
|
|
+ ExecutorService pool = Executors.newFixedThreadPool(1);
|
|
|
|
+ BlockReportContext brContext = new BlockReportContext(1, 0,
|
|
|
|
+ rand.nextLong(), 1);
|
|
|
|
+ Future<DatanodeCommand> sendBRFuture = pool.submit(() -> {
|
|
|
|
+ // Build every storage with 100 blocks for sending report.
|
|
|
|
+ DatanodeStorage[] datanodeStorages
|
|
|
|
+ = new DatanodeStorage[storages.length];
|
|
|
|
+ for (int i = 0; i < storages.length; i++) {
|
|
|
|
+ datanodeStorages[i] = storages[i].getStorage();
|
|
|
|
+ }
|
|
|
|
+ StorageBlockReport[] reports = createReports(datanodeStorages, 100);
|
|
|
|
+
|
|
|
|
+ // Send blockReport.
|
|
|
|
+ return rpcServer.blockReport(dnRegistration, poolId, reports,
|
|
|
|
+ brContext);
|
|
|
|
+ });
|
|
|
|
+
|
|
|
|
+ // When unregistered DataNode triggering the block report, will throw an
|
|
|
|
+ // UnregisteredNodeException. After NameNode processing, RegisterCommand
|
|
|
|
+ // is returned to the DataNode.
|
|
|
|
+ DatanodeCommand datanodeCommand = sendBRFuture.get();
|
|
|
|
+ assertTrue(datanodeCommand instanceof RegisterCommand);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
private StorageBlockReport[] createReports(DatanodeStorage[] dnStorages,
|
|
private StorageBlockReport[] createReports(DatanodeStorage[] dnStorages,
|
|
int numBlocks) {
|
|
int numBlocks) {
|
|
int longsPerBlock = 3;
|
|
int longsPerBlock = 3;
|