|
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
|
|
|
+import org.apache.hadoop.hdfs.server.protocol.InvalidBlockReportLeaseException;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
|
|
@@ -41,12 +42,14 @@ import org.junit.Test;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
import java.util.Random;
|
|
|
+import java.util.concurrent.ExecutionException;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Executors;
|
|
|
import java.util.concurrent.Future;
|
|
|
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
import static org.junit.Assert.assertTrue;
|
|
|
+import static org.junit.jupiter.api.Assertions.assertNotNull;
|
|
|
import static org.mockito.ArgumentMatchers.any;
|
|
|
import static org.mockito.Mockito.doAnswer;
|
|
|
import static org.mockito.Mockito.spy;
|
|
@@ -137,6 +140,72 @@ public class TestBlockReportLease {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testExceptionThrownWhenFBRLeaseExpired() throws Exception {
|
|
|
+ HdfsConfiguration conf = new HdfsConfiguration();
|
|
|
+ Random rand = new Random();
|
|
|
+
|
|
|
+ try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
|
|
+ .numDataNodes(1).build()) {
|
|
|
+ cluster.waitActive();
|
|
|
+
|
|
|
+ FSNamesystem fsn = cluster.getNamesystem();
|
|
|
+ BlockManager blockManager = fsn.getBlockManager();
|
|
|
+ BlockManager spyBlockManager = spy(blockManager);
|
|
|
+ fsn.setBlockManagerForTesting(spyBlockManager);
|
|
|
+ String poolId = cluster.getNamesystem().getBlockPoolId();
|
|
|
+
|
|
|
+ NamenodeProtocols rpcServer = cluster.getNameNodeRpc();
|
|
|
+
|
|
|
+ // Test based on one DataNode report to Namenode
|
|
|
+ DataNode dn = cluster.getDataNodes().get(0);
|
|
|
+ DatanodeDescriptor datanodeDescriptor = spyBlockManager
|
|
|
+ .getDatanodeManager().getDatanode(dn.getDatanodeId());
|
|
|
+
|
|
|
+ DatanodeRegistration dnRegistration = dn.getDNRegistrationForBP(poolId);
|
|
|
+ StorageReport[] storages = dn.getFSDataset().getStorageReports(poolId);
|
|
|
+
|
|
|
+ // Send heartbeat and request full block report lease
|
|
|
+ HeartbeatResponse hbResponse = rpcServer.sendHeartbeat(
|
|
|
+ dnRegistration, storages, 0, 0, 0, 0, 0, null, true,
|
|
|
+ SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);
|
|
|
+
|
|
|
+ // Remove full block report lease about dn
|
|
|
+ spyBlockManager.getBlockReportLeaseManager()
|
|
|
+ .removeLease(datanodeDescriptor);
|
|
|
+
|
|
|
+ ExecutorService pool = Executors.newFixedThreadPool(1);
|
|
|
+
|
|
|
+ // Trigger sendBlockReport
|
|
|
+ BlockReportContext brContext = new BlockReportContext(1, 0,
|
|
|
+ rand.nextLong(), hbResponse.getFullBlockReportLeaseId());
|
|
|
+ Future<DatanodeCommand> sendBRfuturea = pool.submit(() -> {
|
|
|
+ // Build every storage with 100 blocks for sending report
|
|
|
+ DatanodeStorage[] datanodeStorages
|
|
|
+ = new DatanodeStorage[storages.length];
|
|
|
+ for (int i = 0; i < storages.length; i++) {
|
|
|
+ datanodeStorages[i] = storages[i].getStorage();
|
|
|
+ }
|
|
|
+ StorageBlockReport[] reports = createReports(datanodeStorages, 100);
|
|
|
+
|
|
|
+ // Send blockReport
|
|
|
+ return rpcServer.blockReport(dnRegistration, poolId, reports,
|
|
|
+ brContext);
|
|
|
+ });
|
|
|
+
|
|
|
+ // Get result, it will not null if process successfully
|
|
|
+ ExecutionException exception = null;
|
|
|
+ try {
|
|
|
+ sendBRfuturea.get();
|
|
|
+ } catch (ExecutionException e) {
|
|
|
+ exception = e;
|
|
|
+ }
|
|
|
+ assertNotNull(exception);
|
|
|
+ assertEquals(InvalidBlockReportLeaseException.class,
|
|
|
+ exception.getCause().getClass());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testCheckBlockReportLeaseWhenDnUnregister() throws Exception {
|
|
|
HdfsConfiguration conf = new HdfsConfiguration();
|