|
@@ -325,15 +325,14 @@ public class TestBPOfferService {
|
|
|
}
|
|
|
}).when(mockDn).initBlockPool(Mockito.any(BPOfferService.class));
|
|
|
BPOfferService bpos = setupBPOSForNNs(mockDn, mockNN1, mockNN2);
|
|
|
+ List<BPServiceActor> actors = bpos.getBPServiceActors();
|
|
|
+ assertEquals(2, actors.size());
|
|
|
bpos.start();
|
|
|
try {
|
|
|
waitForInitialization(bpos);
|
|
|
- List<BPServiceActor> actors = bpos.getBPServiceActors();
|
|
|
- // even if one of the actor initialization fails also other will be
|
|
|
- // running until both failed.
|
|
|
- assertEquals(2, actors.size());
|
|
|
- BPServiceActor actor = actors.get(0);
|
|
|
- waitForBlockReport(actor.getNameNodeProxy());
|
|
|
+ // even if one of the actor initialization fails, the other one will be
|
|
|
+ // finish block report.
|
|
|
+ waitForBlockReport(mockNN1, mockNN2);
|
|
|
} finally {
|
|
|
bpos.stop();
|
|
|
}
|
|
@@ -409,7 +408,32 @@ public class TestBPOfferService {
|
|
|
}
|
|
|
}, 500, 10000);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ private void waitForBlockReport(
|
|
|
+ final DatanodeProtocolClientSideTranslatorPB mockNN1,
|
|
|
+ final DatanodeProtocolClientSideTranslatorPB mockNN2)
|
|
|
+ throws Exception {
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean get() {
|
|
|
+ return get(mockNN1) || get(mockNN2);
|
|
|
+ }
|
|
|
+
|
|
|
+ private Boolean get(DatanodeProtocolClientSideTranslatorPB mockNN) {
|
|
|
+ try {
|
|
|
+ Mockito.verify(mockNN).blockReport(
|
|
|
+ Mockito.<DatanodeRegistration>anyObject(),
|
|
|
+ Mockito.eq(FAKE_BPID),
|
|
|
+ Mockito.<StorageBlockReport[]>anyObject());
|
|
|
+ return true;
|
|
|
+ } catch (Throwable t) {
|
|
|
+ LOG.info("waiting on block report: " + t.getMessage());
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }, 500, 10000);
|
|
|
+ }
|
|
|
+
|
|
|
private ReceivedDeletedBlockInfo[] waitForBlockReceived(
|
|
|
ExtendedBlock fakeBlock,
|
|
|
DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
|