|
@@ -26,10 +26,13 @@ import java.net.InetSocketAddress;
|
|
|
import java.net.SocketTimeoutException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
+import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.SortedSet;
|
|
|
+import java.util.TreeSet;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
@@ -101,6 +104,9 @@ class BPServiceActor implements Runnable {
|
|
|
private final DataNode dn;
|
|
|
private final DNConf dnConf;
|
|
|
private long prevBlockReportId;
|
|
|
+ private final SortedSet<Integer> blockReportSizes =
|
|
|
+ Collections.synchronizedSortedSet(new TreeSet<Integer>());
|
|
|
+ private final int maxDataLength;
|
|
|
|
|
|
private final IncrementalBlockReportManager ibrManager;
|
|
|
|
|
@@ -122,6 +128,8 @@ class BPServiceActor implements Runnable {
|
|
|
prevBlockReportId = ThreadLocalRandom.current().nextLong();
|
|
|
scheduler = new Scheduler(dnConf.heartBeatInterval,
|
|
|
dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval);
|
|
|
+ // get the value of maxDataLength.
|
|
|
+ this.maxDataLength = dnConf.getMaxDataLength();
|
|
|
}
|
|
|
|
|
|
public DatanodeRegistration getBpRegistration() {
|
|
@@ -166,6 +174,8 @@ class BPServiceActor implements Runnable {
|
|
|
String.valueOf(getScheduler().getLastHearbeatTime()));
|
|
|
info.put("LastBlockReport",
|
|
|
String.valueOf(getScheduler().getLastBlockReportTime()));
|
|
|
+ info.put("maxBlockReportSize", String.valueOf(getMaxBlockReportSize()));
|
|
|
+ info.put("maxDataLength", String.valueOf(maxDataLength));
|
|
|
return info;
|
|
|
}
|
|
|
|
|
@@ -305,6 +315,14 @@ class BPServiceActor implements Runnable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private int getMaxBlockReportSize() {
|
|
|
+ int maxBlockReportSize = 0;
|
|
|
+ if (!blockReportSizes.isEmpty()) {
|
|
|
+ maxBlockReportSize = blockReportSizes.last();
|
|
|
+ }
|
|
|
+ return maxBlockReportSize;
|
|
|
+ }
|
|
|
+
|
|
|
private long generateUniqueBlockReportId() {
|
|
|
// Initialize the block report ID the first time through.
|
|
|
// Note that 0 is used on the NN to indicate "uninitialized", so we should
|
|
@@ -353,12 +371,18 @@ class BPServiceActor implements Runnable {
|
|
|
boolean success = false;
|
|
|
long brSendStartTime = monotonicNow();
|
|
|
long reportId = generateUniqueBlockReportId();
|
|
|
+ boolean useBlocksBuffer =
|
|
|
+ bpRegistration.getNamespaceInfo().isCapabilitySupported(
|
|
|
+ NamespaceInfo.Capability.STORAGE_BLOCK_REPORT_BUFFERS);
|
|
|
+ blockReportSizes.clear();
|
|
|
try {
|
|
|
if (totalBlockCount < dnConf.blockReportSplitThreshold) {
|
|
|
// Below split threshold, send all reports in a single message.
|
|
|
DatanodeCommand cmd = bpNamenode.blockReport(
|
|
|
bpRegistration, bpos.getBlockPoolId(), reports,
|
|
|
new BlockReportContext(1, 0, reportId, fullBrLeaseId));
|
|
|
+ blockReportSizes.add(
|
|
|
+ calculateBlockReportPBSize(useBlocksBuffer, reports));
|
|
|
numRPCs = 1;
|
|
|
numReportsSent = reports.length;
|
|
|
if (cmd != null) {
|
|
@@ -372,6 +396,8 @@ class BPServiceActor implements Runnable {
|
|
|
bpRegistration, bpos.getBlockPoolId(), singleReport,
|
|
|
new BlockReportContext(reports.length, r, reportId,
|
|
|
fullBrLeaseId));
|
|
|
+ blockReportSizes.add(
|
|
|
+ calculateBlockReportPBSize(useBlocksBuffer, singleReport));
|
|
|
numReportsSent++;
|
|
|
numRPCs++;
|
|
|
if (cmd != null) {
|
|
@@ -437,7 +463,22 @@ class BPServiceActor implements Runnable {
|
|
|
}
|
|
|
return cmd;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ private int calculateBlockReportPBSize(
|
|
|
+ boolean useBlocksBuffer, StorageBlockReport[] reports) {
|
|
|
+ int reportSize = 0;
|
|
|
+
|
|
|
+ for (StorageBlockReport r : reports) {
|
|
|
+ if (useBlocksBuffer) {
|
|
|
+ reportSize += r.getBlocks().getBlocksBuffer().size();
|
|
|
+ } else {
|
|
|
+ // each block costs 10 bytes in PB because of uint64
|
|
|
+ reportSize += 10 * r.getBlocks().getBlockListAsLongs().length;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return reportSize;
|
|
|
+ }
|
|
|
+
|
|
|
HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
|
|
|
throws IOException {
|
|
|
scheduler.scheduleNextHeartbeat();
|