|
@@ -22,7 +22,10 @@ import static org.apache.hadoop.util.Time.now;
|
|
|
import java.io.IOException;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.net.SocketTimeoutException;
|
|
|
-import java.util.*;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Collection;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
|
|
|
import com.google.common.base.Joiner;
|
|
|
import org.apache.commons.logging.Log;
|
|
@@ -458,7 +461,7 @@ class BPServiceActor implements Runnable {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
- ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
|
|
|
+ final ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
|
|
|
|
|
|
// Flush any block information that precedes the block report. Otherwise
|
|
|
// we have a chance that we will miss the delHint information
|
|
@@ -485,40 +488,54 @@ class BPServiceActor implements Runnable {
|
|
|
}
|
|
|
|
|
|
// Send the reports to the NN.
|
|
|
- int numReportsSent;
|
|
|
+ int numReportsSent = 0;
|
|
|
+ int numRPCs = 0;
|
|
|
+ boolean success = false;
|
|
|
long brSendStartTime = now();
|
|
|
- if (totalBlockCount < dnConf.blockReportSplitThreshold) {
|
|
|
- // Below split threshold, send all reports in a single message.
|
|
|
- numReportsSent = 1;
|
|
|
- DatanodeCommand cmd =
|
|
|
- bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), reports);
|
|
|
- if (cmd != null) {
|
|
|
- cmds.add(cmd);
|
|
|
- }
|
|
|
- } else {
|
|
|
- // Send one block report per message.
|
|
|
- numReportsSent = i;
|
|
|
- for (StorageBlockReport report : reports) {
|
|
|
- StorageBlockReport singleReport[] = { report };
|
|
|
+ try {
|
|
|
+ if (totalBlockCount < dnConf.blockReportSplitThreshold) {
|
|
|
+ // Below split threshold, send all reports in a single message.
|
|
|
DatanodeCommand cmd = bpNamenode.blockReport(
|
|
|
- bpRegistration, bpos.getBlockPoolId(), singleReport);
|
|
|
+ bpRegistration, bpos.getBlockPoolId(), reports);
|
|
|
+ numRPCs = 1;
|
|
|
+ numReportsSent = reports.length;
|
|
|
if (cmd != null) {
|
|
|
cmds.add(cmd);
|
|
|
}
|
|
|
+ } else {
|
|
|
+ // Send one block report per message.
|
|
|
+ for (StorageBlockReport report : reports) {
|
|
|
+ StorageBlockReport singleReport[] = { report };
|
|
|
+ DatanodeCommand cmd = bpNamenode.blockReport(
|
|
|
+ bpRegistration, bpos.getBlockPoolId(), singleReport);
|
|
|
+ numReportsSent++;
|
|
|
+ numRPCs++;
|
|
|
+ if (cmd != null) {
|
|
|
+ cmds.add(cmd);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
+ success = true;
|
|
|
+ } finally {
|
|
|
+ // Log the block report processing stats from Datanode perspective
|
|
|
+ long brSendCost = now() - brSendStartTime;
|
|
|
+ long brCreateCost = brSendStartTime - brCreateStartTime;
|
|
|
+ dn.getMetrics().addBlockReport(brSendCost);
|
|
|
+ final int nCmds = cmds.size();
|
|
|
+ LOG.info((success ? "S" : "Uns") +
|
|
|
+ "uccessfully sent " + numReportsSent +
|
|
|
+ " of " + reports.length +
|
|
|
+ " blockreports for " + totalBlockCount +
|
|
|
+ " total blocks using " + numRPCs +
|
|
|
+ " RPCs. This took " + brCreateCost +
|
|
|
+ " msec to generate and " + brSendCost +
|
|
|
+ " msecs for RPC and NN processing." +
|
|
|
+ " Got back " +
|
|
|
+ ((nCmds == 0) ? "no commands" :
|
|
|
+ ((nCmds == 1) ? "one command: " + cmds.get(0) :
|
|
|
+ (nCmds + " commands: " + Joiner.on("; ").join(cmds)))) +
|
|
|
+ ".");
|
|
|
}
|
|
|
-
|
|
|
- // Log the block report processing stats from Datanode perspective
|
|
|
- long brSendCost = now() - brSendStartTime;
|
|
|
- long brCreateCost = brSendStartTime - brCreateStartTime;
|
|
|
- dn.getMetrics().addBlockReport(brSendCost);
|
|
|
- LOG.info("Sent " + numReportsSent + " blockreports " + totalBlockCount +
|
|
|
- " blocks total. Took " + brCreateCost +
|
|
|
- " msec to generate and " + brSendCost +
|
|
|
- " msecs for RPC and NN processing. " +
|
|
|
- " Got back commands " +
|
|
|
- (cmds.size() == 0 ? "none" : Joiner.on("; ").join(cmds)));
|
|
|
-
|
|
|
scheduleNextBlockReport(startTime);
|
|
|
return cmds.size() == 0 ? null : cmds;
|
|
|
}
|
|
@@ -968,7 +985,6 @@ class BPServiceActor implements Runnable {
|
|
|
|
|
|
/**
|
|
|
* Add pending incremental block report for a single block.
|
|
|
- * @param blockID
|
|
|
* @param blockInfo
|
|
|
*/
|
|
|
void putBlockInfo(ReceivedDeletedBlockInfo blockInfo) {
|