|
@@ -44,6 +44,8 @@ import java.util.ArrayList;
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
|
|
+import java.util.Set;
|
|
|
|
+import java.util.TreeSet;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
@@ -55,6 +57,7 @@ import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
|
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
|
import org.apache.hadoop.hdfs.protocol.Block;
|
|
import org.apache.hadoop.hdfs.protocol.Block;
|
|
|
|
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
|
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
|
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
|
@@ -109,8 +112,6 @@ public class TestBPOfferService {
|
|
private long firstLeaseId = 0;
|
|
private long firstLeaseId = 0;
|
|
private long secondLeaseId = 0;
|
|
private long secondLeaseId = 0;
|
|
private long nextFullBlockReportLeaseId = 1L;
|
|
private long nextFullBlockReportLeaseId = 1L;
|
|
- private int fullBlockReportCount = 0;
|
|
|
|
- private int incrBlockReportCount = 0;
|
|
|
|
|
|
|
|
static {
|
|
static {
|
|
GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
|
|
GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
|
|
@@ -233,14 +234,6 @@ public class TestBPOfferService {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private void setBlockReportCount(int count) {
|
|
|
|
- fullBlockReportCount = count;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private void setIncreaseBlockReportCount(int count) {
|
|
|
|
- incrBlockReportCount += count;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* Test that the BPOS can register to talk to two different NNs,
|
|
* Test that the BPOS can register to talk to two different NNs,
|
|
* sends block reports to both, etc.
|
|
* sends block reports to both, etc.
|
|
@@ -288,6 +281,7 @@ public class TestBPOfferService {
|
|
Thread addNewBlockThread = null;
|
|
Thread addNewBlockThread = null;
|
|
final AtomicInteger count = new AtomicInteger(0);
|
|
final AtomicInteger count = new AtomicInteger(0);
|
|
DataNodeFaultInjector prevDNFaultInjector = null;
|
|
DataNodeFaultInjector prevDNFaultInjector = null;
|
|
|
|
+ Set<Long> blocks = new TreeSet<>();
|
|
try {
|
|
try {
|
|
waitForBothActors(bpos);
|
|
waitForBothActors(bpos);
|
|
waitForInitialization(bpos);
|
|
waitForInitialization(bpos);
|
|
@@ -303,7 +297,7 @@ public class TestBPOfferService {
|
|
}
|
|
}
|
|
});
|
|
});
|
|
|
|
|
|
- countBlockReportItems(FAKE_BLOCK, mockNN1);
|
|
|
|
|
|
+ countBlockReportItems(FAKE_BLOCK, mockNN1, blocks);
|
|
addNewBlockThread = new Thread(() -> {
|
|
addNewBlockThread = new Thread(() -> {
|
|
for (int i = 0; i < totalTestBlocks; i++) {
|
|
for (int i = 0; i < totalTestBlocks; i++) {
|
|
SimulatedFSDataset fsDataset = (SimulatedFSDataset) mockFSDataset;
|
|
SimulatedFSDataset fsDataset = (SimulatedFSDataset) mockFSDataset;
|
|
@@ -334,14 +328,12 @@ public class TestBPOfferService {
|
|
addNewBlockThread = null;
|
|
addNewBlockThread = null;
|
|
// Verify FBR/IBR count is equal to generate number.
|
|
// Verify FBR/IBR count is equal to generate number.
|
|
try {
|
|
try {
|
|
- GenericTestUtils.waitFor(() ->
|
|
|
|
- (fullBlockReportCount == totalTestBlocks ||
|
|
|
|
- incrBlockReportCount == totalTestBlocks), 1000, 15000);
|
|
|
|
|
|
+ GenericTestUtils.waitFor(() -> blocks.size() == totalTestBlocks,
|
|
|
|
+ 1000, 15000);
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
- fail(String.format("Timed out wait for IBR counts FBRCount = %d,"
|
|
|
|
- + " IBRCount = %d; expected = %d. Exception: %s",
|
|
|
|
- fullBlockReportCount, incrBlockReportCount, totalTestBlocks,
|
|
|
|
- e.getMessage()));
|
|
|
|
|
|
+ fail(String.format("Timed out waiting for blocks count. "
|
|
|
|
+ + "reported = %d, expected = %d. Exception: %s",
|
|
|
|
+ blocks.size(), totalTestBlocks, e.getMessage()));
|
|
}
|
|
}
|
|
|
|
|
|
} finally {
|
|
} finally {
|
|
@@ -711,7 +703,8 @@ public class TestBPOfferService {
|
|
* which assume no deleting blocks here.
|
|
* which assume no deleting blocks here.
|
|
*/
|
|
*/
|
|
private void countBlockReportItems(final ExtendedBlock fakeBlock,
|
|
private void countBlockReportItems(final ExtendedBlock fakeBlock,
|
|
- final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
|
|
|
|
|
|
+ final DatanodeProtocolClientSideTranslatorPB mockNN,
|
|
|
|
+ final Set<Long> blocks) throws Exception {
|
|
final String fakeBlockPoolId = fakeBlock.getBlockPoolId();
|
|
final String fakeBlockPoolId = fakeBlock.getBlockPoolId();
|
|
final ArgumentCaptor<StorageBlockReport[]> captor =
|
|
final ArgumentCaptor<StorageBlockReport[]> captor =
|
|
ArgumentCaptor.forClass(StorageBlockReport[].class);
|
|
ArgumentCaptor.forClass(StorageBlockReport[].class);
|
|
@@ -720,7 +713,9 @@ public class TestBPOfferService {
|
|
Mockito.doAnswer((Answer<Object>) invocation -> {
|
|
Mockito.doAnswer((Answer<Object>) invocation -> {
|
|
Object[] arguments = invocation.getArguments();
|
|
Object[] arguments = invocation.getArguments();
|
|
StorageBlockReport[] list = (StorageBlockReport[])arguments[2];
|
|
StorageBlockReport[] list = (StorageBlockReport[])arguments[2];
|
|
- setBlockReportCount(list[0].getBlocks().getNumberOfBlocks());
|
|
|
|
|
|
+ for (BlockReportReplica brr : list[0].getBlocks()) {
|
|
|
|
+ blocks.add(brr.getBlockId());
|
|
|
|
+ }
|
|
return null;
|
|
return null;
|
|
}).when(mockNN).blockReport(
|
|
}).when(mockNN).blockReport(
|
|
Mockito.any(),
|
|
Mockito.any(),
|
|
@@ -734,7 +729,9 @@ public class TestBPOfferService {
|
|
Object[] arguments = invocation.getArguments();
|
|
Object[] arguments = invocation.getArguments();
|
|
StorageReceivedDeletedBlocks[] list =
|
|
StorageReceivedDeletedBlocks[] list =
|
|
(StorageReceivedDeletedBlocks[])arguments[2];
|
|
(StorageReceivedDeletedBlocks[])arguments[2];
|
|
- setIncreaseBlockReportCount(list[0].getBlocks().length);
|
|
|
|
|
|
+ for (ReceivedDeletedBlockInfo rdbi : list[0].getBlocks()) {
|
|
|
|
+ blocks.add(rdbi.getBlock().getBlockId());
|
|
|
|
+ }
|
|
return null;
|
|
return null;
|
|
}).when(mockNN).blockReceivedAndDeleted(
|
|
}).when(mockNN).blockReceivedAndDeleted(
|
|
Mockito.any(),
|
|
Mockito.any(),
|
|
@@ -1233,4 +1230,4 @@ public class TestBPOfferService {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-}
|
|
|
|
|
|
+}
|