|
@@ -30,6 +30,7 @@ import java.util.Collections;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
@@ -48,10 +49,12 @@ import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
|
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
|
|
|
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
|
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
|
|
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
|
|
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
|
|
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
|
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
|
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
|
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
|
|
|
+import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
|
|
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
|
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
|
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
|
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
|
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
|
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
|
@@ -92,7 +95,10 @@ public class TestBPOfferService {
|
|
|
|
|
|
private DatanodeProtocolClientSideTranslatorPB mockNN1;
|
|
private DatanodeProtocolClientSideTranslatorPB mockNN1;
|
|
private DatanodeProtocolClientSideTranslatorPB mockNN2;
|
|
private DatanodeProtocolClientSideTranslatorPB mockNN2;
|
|
- private final NNHAStatusHeartbeat[] mockHaStatuses = new NNHAStatusHeartbeat[2];
|
|
|
|
|
|
+ private final NNHAStatusHeartbeat[] mockHaStatuses =
|
|
|
|
+ new NNHAStatusHeartbeat[2];
|
|
|
|
+ private final DatanodeCommand[][] datanodeCommands =
|
|
|
|
+ new DatanodeCommand[2][0];
|
|
private final int[] heartbeatCounts = new int[2];
|
|
private final int[] heartbeatCounts = new int[2];
|
|
private DataNode mockDn;
|
|
private DataNode mockDn;
|
|
private FsDatasetSpi<?> mockFSDataset;
|
|
private FsDatasetSpi<?> mockFSDataset;
|
|
@@ -147,6 +153,7 @@ public class TestBPOfferService {
|
|
Mockito.any(VolumeFailureSummary.class),
|
|
Mockito.any(VolumeFailureSummary.class),
|
|
Mockito.anyBoolean());
|
|
Mockito.anyBoolean());
|
|
mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
|
|
mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
|
|
|
|
+ datanodeCommands[nnIdx] = new DatanodeCommand[0];
|
|
return mock;
|
|
return mock;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -165,9 +172,12 @@ public class TestBPOfferService {
|
|
@Override
|
|
@Override
|
|
public HeartbeatResponse answer(InvocationOnMock invocation) throws Throwable {
|
|
public HeartbeatResponse answer(InvocationOnMock invocation) throws Throwable {
|
|
heartbeatCounts[nnIdx]++;
|
|
heartbeatCounts[nnIdx]++;
|
|
- return new HeartbeatResponse(new DatanodeCommand[0],
|
|
|
|
- mockHaStatuses[nnIdx], null,
|
|
|
|
|
|
+ HeartbeatResponse heartbeatResponse = new HeartbeatResponse(
|
|
|
|
+ datanodeCommands[nnIdx], mockHaStatuses[nnIdx], null,
|
|
ThreadLocalRandom.current().nextLong() | 1L);
|
|
ThreadLocalRandom.current().nextLong() | 1L);
|
|
|
|
+ //reset the command
|
|
|
|
+ datanodeCommands[nnIdx] = new DatanodeCommand[0];
|
|
|
|
+ return heartbeatResponse;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -709,4 +719,84 @@ public class TestBPOfferService {
|
|
bpos.join();
|
|
bpos.join();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ /*
|
|
|
|
+ * HDFS-9917 : Standby IBR accumulation when Standby was down.
|
|
|
|
+ */
|
|
|
|
+ @Test
|
|
|
|
+ public void testIBRClearanceForStandbyOnReRegister() throws Exception {
|
|
|
|
+ final BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
|
|
|
|
+ bpos.start();
|
|
|
|
+ try {
|
|
|
|
+ waitForInitialization(bpos);
|
|
|
|
+ // Should start with neither NN as active.
|
|
|
|
+ assertNull(bpos.getActiveNN());
|
|
|
|
+ // Have NN1 claim active at txid 1
|
|
|
|
+ mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1);
|
|
|
|
+ bpos.triggerHeartbeatForTests();
|
|
|
|
+ // Now mockNN1 is acting like active namenode and mockNN2 as Standby
|
|
|
|
+ assertSame(mockNN1, bpos.getActiveNN());
|
|
|
|
+ // Return nothing when active Active Namenode gets IBRs
|
|
|
|
+ Mockito.doNothing().when(mockNN1).blockReceivedAndDeleted(
|
|
|
|
+ Mockito.any(DatanodeRegistration.class), Mockito.anyString(), Mockito
|
|
|
|
+ .any(StorageReceivedDeletedBlocks[].class));
|
|
|
|
+
|
|
|
|
+ final IOException re = new IOException(
|
|
|
|
+ "Standby NN is currently not able to process IBR");
|
|
|
|
+
|
|
|
|
+ final AtomicBoolean ibrReported = new AtomicBoolean(false);
|
|
|
|
+ // throw exception for standby when first IBR is receieved
|
|
|
|
+ Mockito.doAnswer(new Answer<Void>() {
|
|
|
|
+ @Override
|
|
|
|
+ public Void answer(InvocationOnMock invocation) throws Throwable {
|
|
|
|
+ ibrReported.set(true);
|
|
|
|
+ throw re;
|
|
|
|
+ }
|
|
|
|
+ }).when(mockNN2).blockReceivedAndDeleted(
|
|
|
|
+ Mockito.any(DatanodeRegistration.class), Mockito.anyString(), Mockito
|
|
|
|
+ .any(StorageReceivedDeletedBlocks[].class));
|
|
|
|
+
|
|
|
|
+ DatanodeStorage storage = Mockito.mock(DatanodeStorage.class);
|
|
|
|
+ Mockito.doReturn(storage).when(mockFSDataset).getStorage("storage0");
|
|
|
|
+ // Add IBRs
|
|
|
|
+ bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "storage0", false);
|
|
|
|
+ // Send heartbeat so that the BpServiceActor can send IBR to
|
|
|
|
+ // namenode
|
|
|
|
+ bpos.triggerHeartbeatForTests();
|
|
|
|
+ // Wait till first IBR is received at standbyNN. Just for confirmation.
|
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
|
+ @Override
|
|
|
|
+ public Boolean get() {
|
|
|
|
+ return ibrReported.get();
|
|
|
|
+ }
|
|
|
|
+ }, 100, 1000);
|
|
|
|
+
|
|
|
|
+ // Send register command back to Datanode to reRegister().
|
|
|
|
+ // After reRegister IBRs should be cleared.
|
|
|
|
+ datanodeCommands[1] = new DatanodeCommand[] { new RegisterCommand() };
|
|
|
|
+ assertEquals(
|
|
|
|
+ "IBR size before reRegister should be non-0", 1, getStandbyIBRSize(
|
|
|
|
+ bpos));
|
|
|
|
+ bpos.triggerHeartbeatForTests();
|
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
|
+ @Override
|
|
|
|
+ public Boolean get() {
|
|
|
|
+ return getStandbyIBRSize(bpos) == 0;
|
|
|
|
+ }
|
|
|
|
+ }, 100, 1000);
|
|
|
|
+ } finally {
|
|
|
|
+ bpos.stop();
|
|
|
|
+ bpos.join();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private int getStandbyIBRSize(BPOfferService bpos) {
|
|
|
|
+ List<BPServiceActor> bpServiceActors = bpos.getBPServiceActors();
|
|
|
|
+ for (BPServiceActor bpServiceActor : bpServiceActors) {
|
|
|
|
+ if (bpServiceActor.state == HAServiceState.STANDBY) {
|
|
|
|
+ return bpServiceActor.getIbrManager().getPendingIBRSize();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return -1;
|
|
|
|
+ }
|
|
}
|
|
}
|