|
@@ -48,7 +48,6 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
|
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
|
|
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
|
|
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
|
|
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
|
|
-import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
|
|
|
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
|
@@ -59,7 +58,6 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
|
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
|
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
|
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
|
|
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
|
|
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
|
|
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
|
|
-import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
|
|
|
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
|
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
|
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
|
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
|
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
|
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
|
@@ -143,7 +141,7 @@ public class TestBPOfferService {
|
|
.when(mock).versionRequest();
|
|
.when(mock).versionRequest();
|
|
|
|
|
|
Mockito.doReturn(DFSTestUtil.getLocalDatanodeRegistration())
|
|
Mockito.doReturn(DFSTestUtil.getLocalDatanodeRegistration())
|
|
- .when(mock).registerDatanode(Mockito.any(DatanodeRegistration.class));
|
|
|
|
|
|
+ .when(mock).registerDatanode(Mockito.any());
|
|
|
|
|
|
Mockito.doAnswer(new HeartbeatAnswer(nnIdx))
|
|
Mockito.doAnswer(new HeartbeatAnswer(nnIdx))
|
|
.when(mock).sendHeartbeat(
|
|
.when(mock).sendHeartbeat(
|
|
@@ -200,11 +198,9 @@ public class TestBPOfferService {
|
|
waitForBothActors(bpos);
|
|
waitForBothActors(bpos);
|
|
|
|
|
|
// The DN should have register to both NNs.
|
|
// The DN should have register to both NNs.
|
|
- Mockito.verify(mockNN1).registerDatanode(
|
|
|
|
- Mockito.any(DatanodeRegistration.class));
|
|
|
|
- Mockito.verify(mockNN2).registerDatanode(
|
|
|
|
- Mockito.any(DatanodeRegistration.class));
|
|
|
|
-
|
|
|
|
|
|
+ Mockito.verify(mockNN1).registerDatanode(Mockito.any());
|
|
|
|
+ Mockito.verify(mockNN2).registerDatanode(Mockito.any());
|
|
|
|
+
|
|
// Should get block reports from both NNs
|
|
// Should get block reports from both NNs
|
|
waitForBlockReport(mockNN1);
|
|
waitForBlockReport(mockNN1);
|
|
waitForBlockReport(mockNN2);
|
|
waitForBlockReport(mockNN2);
|
|
@@ -267,10 +263,10 @@ public class TestBPOfferService {
|
|
Mockito.doReturn(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
|
|
Mockito.doReturn(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
|
|
FAKE_BPID, new Block[] { FAKE_BLOCK.getLocalBlock() }))
|
|
FAKE_BPID, new Block[] { FAKE_BLOCK.getLocalBlock() }))
|
|
.when(mockNN2).blockReport(
|
|
.when(mockNN2).blockReport(
|
|
- Mockito.<DatanodeRegistration>anyObject(),
|
|
|
|
|
|
+ Mockito.any(),
|
|
Mockito.eq(FAKE_BPID),
|
|
Mockito.eq(FAKE_BPID),
|
|
- Mockito.<StorageBlockReport[]>anyObject(),
|
|
|
|
- Mockito.<BlockReportContext>anyObject());
|
|
|
|
|
|
+ Mockito.any(),
|
|
|
|
+ Mockito.any());
|
|
|
|
|
|
bpos.start();
|
|
bpos.start();
|
|
try {
|
|
try {
|
|
@@ -287,8 +283,7 @@ public class TestBPOfferService {
|
|
|
|
|
|
// Should ignore the delete command from the standby
|
|
// Should ignore the delete command from the standby
|
|
Mockito.verify(mockFSDataset, Mockito.never())
|
|
Mockito.verify(mockFSDataset, Mockito.never())
|
|
- .invalidate(Mockito.eq(FAKE_BPID),
|
|
|
|
- (Block[]) Mockito.anyObject());
|
|
|
|
|
|
+ .invalidate(Mockito.eq(FAKE_BPID), Mockito.any());
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -487,10 +482,10 @@ public class TestBPOfferService {
|
|
public Boolean get() {
|
|
public Boolean get() {
|
|
try {
|
|
try {
|
|
Mockito.verify(mockNN).blockReport(
|
|
Mockito.verify(mockNN).blockReport(
|
|
- Mockito.<DatanodeRegistration>anyObject(),
|
|
|
|
|
|
+ Mockito.any(),
|
|
Mockito.eq(FAKE_BPID),
|
|
Mockito.eq(FAKE_BPID),
|
|
- Mockito.<StorageBlockReport[]>anyObject(),
|
|
|
|
- Mockito.<BlockReportContext>anyObject());
|
|
|
|
|
|
+ Mockito.any(),
|
|
|
|
+ Mockito.any());
|
|
return true;
|
|
return true;
|
|
} catch (Throwable t) {
|
|
} catch (Throwable t) {
|
|
LOG.info("waiting on block report: " + t.getMessage());
|
|
LOG.info("waiting on block report: " + t.getMessage());
|
|
@@ -513,10 +508,10 @@ public class TestBPOfferService {
|
|
private Boolean get(DatanodeProtocolClientSideTranslatorPB mockNN) {
|
|
private Boolean get(DatanodeProtocolClientSideTranslatorPB mockNN) {
|
|
try {
|
|
try {
|
|
Mockito.verify(mockNN).blockReport(
|
|
Mockito.verify(mockNN).blockReport(
|
|
- Mockito.<DatanodeRegistration>anyObject(),
|
|
|
|
|
|
+ Mockito.any(),
|
|
Mockito.eq(FAKE_BPID),
|
|
Mockito.eq(FAKE_BPID),
|
|
- Mockito.<StorageBlockReport[]>anyObject(),
|
|
|
|
- Mockito.<BlockReportContext>anyObject());
|
|
|
|
|
|
+ Mockito.any(),
|
|
|
|
+ Mockito.any());
|
|
return true;
|
|
return true;
|
|
} catch (Throwable t) {
|
|
} catch (Throwable t) {
|
|
LOG.info("waiting on block report: " + t.getMessage());
|
|
LOG.info("waiting on block report: " + t.getMessage());
|
|
@@ -538,9 +533,9 @@ public class TestBPOfferService {
|
|
public Boolean get() {
|
|
public Boolean get() {
|
|
try {
|
|
try {
|
|
Mockito.verify(mockNN).blockReceivedAndDeleted(
|
|
Mockito.verify(mockNN).blockReceivedAndDeleted(
|
|
- Mockito.<DatanodeRegistration>anyObject(),
|
|
|
|
- Mockito.eq(fakeBlockPoolId),
|
|
|
|
- captor.capture());
|
|
|
|
|
|
+ Mockito.any(),
|
|
|
|
+ Mockito.eq(fakeBlockPoolId),
|
|
|
|
+ captor.capture());
|
|
return true;
|
|
return true;
|
|
} catch (Throwable t) {
|
|
} catch (Throwable t) {
|
|
return false;
|
|
return false;
|
|
@@ -600,9 +595,9 @@ public class TestBPOfferService {
|
|
// Now mockNN1 is acting like active namenode and mockNN2 as Standby
|
|
// Now mockNN1 is acting like active namenode and mockNN2 as Standby
|
|
assertSame(mockNN1, bpos.getActiveNN());
|
|
assertSame(mockNN1, bpos.getActiveNN());
|
|
Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(0))
|
|
Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(0))
|
|
- .when(mockNN1).reportBadBlocks(Mockito.any(LocatedBlock[].class));
|
|
|
|
|
|
+ .when(mockNN1).reportBadBlocks(Mockito.any());
|
|
Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(1))
|
|
Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(1))
|
|
- .when(mockNN2).reportBadBlocks(Mockito.any(LocatedBlock[].class));
|
|
|
|
|
|
+ .when(mockNN2).reportBadBlocks(Mockito.any());
|
|
bpos.reportBadBlocks(FAKE_BLOCK, mockFSDataset.getVolume(FAKE_BLOCK)
|
|
bpos.reportBadBlocks(FAKE_BLOCK, mockFSDataset.getVolume(FAKE_BLOCK)
|
|
.getStorageID(), mockFSDataset.getVolume(FAKE_BLOCK)
|
|
.getStorageID(), mockFSDataset.getVolume(FAKE_BLOCK)
|
|
.getStorageType());
|
|
.getStorageType());
|
|
@@ -641,10 +636,10 @@ public class TestBPOfferService {
|
|
// Now mockNN1 is acting like active namenode and mockNN2 as Standby
|
|
// Now mockNN1 is acting like active namenode and mockNN2 as Standby
|
|
assertSame(mockNN1, bpos.getActiveNN());
|
|
assertSame(mockNN1, bpos.getActiveNN());
|
|
Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(0))
|
|
Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(0))
|
|
- .when(mockNN1).errorReport(Mockito.any(DatanodeRegistration.class),
|
|
|
|
|
|
+ .when(mockNN1).errorReport(Mockito.any(),
|
|
Mockito.anyInt(), Mockito.anyString());
|
|
Mockito.anyInt(), Mockito.anyString());
|
|
Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(1))
|
|
Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(1))
|
|
- .when(mockNN2).errorReport(Mockito.any(DatanodeRegistration.class),
|
|
|
|
|
|
+ .when(mockNN2).errorReport(Mockito.any(),
|
|
Mockito.anyInt(), Mockito.anyString());
|
|
Mockito.anyInt(), Mockito.anyString());
|
|
String errorString = "Can't send invalid block " + FAKE_BLOCK;
|
|
String errorString = "Can't send invalid block " + FAKE_BLOCK;
|
|
bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errorString);
|
|
bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errorString);
|
|
@@ -679,21 +674,14 @@ public class TestBPOfferService {
|
|
bpos.triggerHeartbeatForTests();
|
|
bpos.triggerHeartbeatForTests();
|
|
// Now mockNN1 is acting like active namenode and mockNN2 as Standby
|
|
// Now mockNN1 is acting like active namenode and mockNN2 as Standby
|
|
assertSame(mockNN1, bpos.getActiveNN());
|
|
assertSame(mockNN1, bpos.getActiveNN());
|
|
- Mockito.doAnswer(new Answer<Void>() {
|
|
|
|
- // Throw an IOException when this function is first called which will
|
|
|
|
- // in turn add that errorReport back to the bpThreadQueue and let it
|
|
|
|
- // process the next time.
|
|
|
|
- @Override
|
|
|
|
- public Void answer(InvocationOnMock invocation) throws Throwable {
|
|
|
|
- if (firstCallTime == 0) {
|
|
|
|
- firstCallTime = Time.now();
|
|
|
|
- throw new IOException();
|
|
|
|
- } else {
|
|
|
|
|
|
+ // Throw an IOException when this function is first called which will
|
|
|
|
+ // in turn add that errorReport back to the bpThreadQueue and let it
|
|
|
|
+ // process the next time.
|
|
|
|
+ Mockito.doThrow(new IOException("Throw IOException in the first call."))
|
|
|
|
+ .doAnswer((Answer<Void>) invocation -> {
|
|
secondCallTime = Time.now();
|
|
secondCallTime = Time.now();
|
|
return null;
|
|
return null;
|
|
- }
|
|
|
|
- }
|
|
|
|
- }).when(mockNN1).errorReport(Mockito.any(DatanodeRegistration.class),
|
|
|
|
|
|
+ }).when(mockNN1).errorReport(Mockito.any(DatanodeRegistration.class),
|
|
Mockito.anyInt(), Mockito.anyString());
|
|
Mockito.anyInt(), Mockito.anyString());
|
|
String errorString = "Can't send invalid block " + FAKE_BLOCK;
|
|
String errorString = "Can't send invalid block " + FAKE_BLOCK;
|
|
bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errorString);
|
|
bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errorString);
|
|
@@ -880,9 +868,9 @@ public class TestBPOfferService {
|
|
|
|
|
|
// The DN should have register to both NNs.
|
|
// The DN should have register to both NNs.
|
|
Mockito.verify(mockNN1)
|
|
Mockito.verify(mockNN1)
|
|
- .registerDatanode(Mockito.any(DatanodeRegistration.class));
|
|
|
|
|
|
+ .registerDatanode(Mockito.any());
|
|
Mockito.verify(mockNN2)
|
|
Mockito.verify(mockNN2)
|
|
- .registerDatanode(Mockito.any(DatanodeRegistration.class));
|
|
|
|
|
|
+ .registerDatanode(Mockito.any());
|
|
|
|
|
|
// Should get block reports from both NNs
|
|
// Should get block reports from both NNs
|
|
waitForBlockReport(mockNN1);
|
|
waitForBlockReport(mockNN1);
|
|
@@ -927,8 +915,7 @@ public class TestBPOfferService {
|
|
Thread.sleep(1000);
|
|
Thread.sleep(1000);
|
|
|
|
|
|
// verify new NN registered
|
|
// verify new NN registered
|
|
- Mockito.verify(mockNN3)
|
|
|
|
- .registerDatanode(Mockito.any(DatanodeRegistration.class));
|
|
|
|
|
|
+ Mockito.verify(mockNN3).registerDatanode(Mockito.any());
|
|
|
|
|
|
// When we receive a block, it should report it to both NNs
|
|
// When we receive a block, it should report it to both NNs
|
|
bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "", false);
|
|
bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "", false);
|