|
@@ -318,8 +318,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
new Daemon(new StorageInfoDefragmenter());
|
|
new Daemon(new StorageInfoDefragmenter());
|
|
|
|
|
|
/** Block report thread for handling async reports. */
|
|
/** Block report thread for handling async reports. */
|
|
- private final BlockReportProcessingThread blockReportThread =
|
|
|
|
- new BlockReportProcessingThread();
|
|
|
|
|
|
+ private final BlockReportProcessingThread blockReportThread;
|
|
|
|
|
|
/** Store blocks -> datanodedescriptor(s) map of corrupt replicas */
|
|
/** Store blocks -> datanodedescriptor(s) map of corrupt replicas */
|
|
final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
|
|
final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
|
|
@@ -573,6 +572,11 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
|
|
|
bmSafeMode = new BlockManagerSafeMode(this, namesystem, haEnabled, conf);
|
|
bmSafeMode = new BlockManagerSafeMode(this, namesystem, haEnabled, conf);
|
|
|
|
|
|
|
|
+ int queueSize = conf.getInt(
|
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_BLOCKREPORT_QUEUE_SIZE_KEY,
|
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_BLOCKREPORT_QUEUE_SIZE_DEFAULT);
|
|
|
|
+ blockReportThread = new BlockReportProcessingThread(queueSize);
|
|
|
|
+
|
|
LOG.info("defaultReplication = {}", defaultReplication);
|
|
LOG.info("defaultReplication = {}", defaultReplication);
|
|
LOG.info("maxReplication = {}", maxReplication);
|
|
LOG.info("maxReplication = {}", maxReplication);
|
|
LOG.info("minReplication = {}", minReplication);
|
|
LOG.info("minReplication = {}", minReplication);
|
|
@@ -4931,11 +4935,11 @@ public class BlockManager implements BlockStatsMXBean {
|
|
private static final long MAX_LOCK_HOLD_MS = 4;
|
|
private static final long MAX_LOCK_HOLD_MS = 4;
|
|
private long lastFull = 0;
|
|
private long lastFull = 0;
|
|
|
|
|
|
- private final BlockingQueue<Runnable> queue =
|
|
|
|
- new ArrayBlockingQueue<Runnable>(1024);
|
|
|
|
|
|
+ private final BlockingQueue<Runnable> queue;
|
|
|
|
|
|
- BlockReportProcessingThread() {
|
|
|
|
|
|
+ BlockReportProcessingThread(int size) {
|
|
super("Block report processor");
|
|
super("Block report processor");
|
|
|
|
+ queue = new ArrayBlockingQueue<>(size);
|
|
setDaemon(true);
|
|
setDaemon(true);
|
|
}
|
|
}
|
|
|
|
|