|
@@ -286,23 +286,30 @@ public class TestBPOfferService {
|
|
|
public void testMissBlocksWhenReregister() throws Exception {
|
|
|
BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
|
|
|
bpos.start();
|
|
|
+ int totalTestBlocks = 4000;
|
|
|
+ Thread addNewBlockThread = null;
|
|
|
+ final AtomicInteger count = new AtomicInteger(0);
|
|
|
+
|
|
|
try {
|
|
|
waitForBothActors(bpos);
|
|
|
waitForInitialization(bpos);
|
|
|
-
|
|
|
DataNodeFaultInjector.set(new DataNodeFaultInjector() {
|
|
|
public void blockUtilSendFullBlockReport() {
|
|
|
try {
|
|
|
- Thread.sleep(200);
|
|
|
- } catch (InterruptedException e) {
|
|
|
+ GenericTestUtils.waitFor(() -> {
|
|
|
+ if(count.get() > 2000) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }, 100, 1000);
|
|
|
+ } catch (Exception e) {
|
|
|
e.printStackTrace();
|
|
|
}
|
|
|
}
|
|
|
});
|
|
|
|
|
|
countBlockReportItems(FAKE_BLOCK, mockNN1);
|
|
|
- int totalTestBlocks = 4000;
|
|
|
- Thread addNewBlockThread = new Thread(() -> {
|
|
|
+ addNewBlockThread = new Thread(() -> {
|
|
|
for (int i = 0; i < totalTestBlocks; i++) {
|
|
|
SimulatedFSDataset fsDataset = (SimulatedFSDataset) mockFSDataset;
|
|
|
SimulatedStorage simulatedStorage = fsDataset.getStorages().get(0);
|
|
@@ -312,6 +319,7 @@ public class TestBPOfferService {
|
|
|
fsDataset.createRbw(StorageType.DEFAULT, storageId, b, false);
|
|
|
bpos.notifyNamenodeReceivingBlock(b, storageId);
|
|
|
fsDataset.finalizeBlock(b, false);
|
|
|
+ count.addAndGet(1);
|
|
|
Thread.sleep(1);
|
|
|
} catch (Exception e) {
|
|
|
e.printStackTrace();
|
|
@@ -321,7 +329,13 @@ public class TestBPOfferService {
|
|
|
addNewBlockThread.start();
|
|
|
|
|
|
// Make sure that generate blocks for DataNode and IBR not empty now.
|
|
|
- Thread.sleep(200);
|
|
|
+ GenericTestUtils.waitFor(() -> {
|
|
|
+ if(count.get() > 0) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }, 100, 1000);
|
|
|
+
|
|
|
// Trigger re-register using DataNode Command.
|
|
|
datanodeCommands[0] = new DatanodeCommand[]{RegisterCommand.REGISTER};
|
|
|
bpos.triggerHeartbeatForTests();
|
|
@@ -340,6 +354,7 @@ public class TestBPOfferService {
|
|
|
assertTrue(fullBlockReportCount == totalTestBlocks ||
|
|
|
incrBlockReportCount == totalTestBlocks);
|
|
|
} finally {
|
|
|
+ addNewBlockThread.join();
|
|
|
bpos.stop();
|
|
|
bpos.join();
|
|
|
|
|
@@ -698,12 +713,17 @@ public class TestBPOfferService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Record blocks counts of block report and total adding blocks count of IBR
|
|
|
+ * which assume no deleting blocks here.
|
|
|
+ */
|
|
|
private void countBlockReportItems(final ExtendedBlock fakeBlock,
|
|
|
final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
|
|
|
final String fakeBlockPoolId = fakeBlock.getBlockPoolId();
|
|
|
final ArgumentCaptor<StorageBlockReport[]> captor =
|
|
|
ArgumentCaptor.forClass(StorageBlockReport[].class);
|
|
|
|
|
|
+ // Record blocks count about the last time block report.
|
|
|
Mockito.doAnswer((Answer<Object>) invocation -> {
|
|
|
Object[] arguments = invocation.getArguments();
|
|
|
StorageBlockReport[] list = (StorageBlockReport[])arguments[2];
|
|
@@ -716,6 +736,7 @@ public class TestBPOfferService {
|
|
|
Mockito.any()
|
|
|
);
|
|
|
|
|
|
+ // Record total adding blocks count and assume no deleting blocks here.
|
|
|
Mockito.doAnswer((Answer<Object>) invocation -> {
|
|
|
Object[] arguments = invocation.getArguments();
|
|
|
StorageReceivedDeletedBlocks[] list =
|