|
@@ -65,7 +65,8 @@ public class DirectoryScanner implements Runnable {
|
|
|
LoggerFactory.getLogger(DirectoryScanner.class);
|
|
|
|
|
|
private static final int DEFAULT_MAP_SIZE = 32768;
|
|
|
- private static final int RECONCILE_BLOCKS_BATCH_SIZE = 1000;
|
|
|
+ private final int reconcileBlocksBatchSize;
|
|
|
+ private final long reconcileBlocksBatchInterval;
|
|
|
private final FsDatasetSpi<?> dataset;
|
|
|
private final ExecutorService reportCompileThreadPool;
|
|
|
private final ScheduledExecutorService masterThread;
|
|
@@ -315,6 +316,41 @@ public class DirectoryScanner implements Runnable {
|
|
|
|
|
|
masterThread =
|
|
|
new ScheduledThreadPoolExecutor(1, new Daemon.DaemonFactory());
|
|
|
+
|
|
|
+ int reconcileBatchSize =
|
|
|
+ conf.getInt(DFSConfigKeys.
|
|
|
+ DFS_DATANODE_RECONCILE_BLOCKS_BATCH_SIZE,
|
|
|
+ DFSConfigKeys.
|
|
|
+ DFS_DATANODE_RECONCILE_BLOCKS_BATCH_SIZE_DEFAULT);
|
|
|
+
|
|
|
+ if (reconcileBatchSize <= 0) {
|
|
|
+ LOG.warn("Invalid value configured for " +
|
|
|
+ "dfs.datanode.reconcile.blocks.batch.size, " +
|
|
|
+ "should be greater than 0, Using default.");
|
|
|
+ reconcileBatchSize =
|
|
|
+ DFSConfigKeys.
|
|
|
+ DFS_DATANODE_RECONCILE_BLOCKS_BATCH_SIZE_DEFAULT;
|
|
|
+ }
|
|
|
+
|
|
|
+ reconcileBlocksBatchSize = reconcileBatchSize;
|
|
|
+
|
|
|
+ long reconcileBatchInterval =
|
|
|
+ conf.getTimeDuration(DFSConfigKeys.
|
|
|
+ DFS_DATANODE_RECONCILE_BLOCKS_BATCH_INTERVAL,
|
|
|
+ DFSConfigKeys.
|
|
|
+ DFS_DATANODE_RECONCILE_BLOCKS_BATCH_INTERVAL_DEFAULT,
|
|
|
+ TimeUnit.MILLISECONDS);
|
|
|
+
|
|
|
+ if (reconcileBatchInterval <= 0) {
|
|
|
+ LOG.warn("Invalid value configured for " +
|
|
|
+ "dfs.datanode.reconcile.blocks.batch.interval, " +
|
|
|
+ "should be greater than 0, Using default.");
|
|
|
+ reconcileBatchInterval =
|
|
|
+ DFSConfigKeys.
|
|
|
+ DFS_DATANODE_RECONCILE_BLOCKS_BATCH_INTERVAL_DEFAULT;
|
|
|
+ }
|
|
|
+
|
|
|
+ reconcileBlocksBatchInterval = reconcileBatchInterval;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -428,16 +464,16 @@ public class DirectoryScanner implements Runnable {
|
|
|
LOG.debug("reconcile start DirectoryScanning");
|
|
|
scan();
|
|
|
|
|
|
- // HDFS-14476: run checkAndUpadte with batch to avoid holding the lock too
|
|
|
+ // HDFS-14476: run checkAndUpdate with batch to avoid holding the lock too
|
|
|
// long
|
|
|
int loopCount = 0;
|
|
|
synchronized (diffs) {
|
|
|
for (final Map.Entry<String, ScanInfo> entry : diffs.getEntries()) {
|
|
|
dataset.checkAndUpdate(entry.getKey(), entry.getValue());
|
|
|
|
|
|
- if (loopCount % RECONCILE_BLOCKS_BATCH_SIZE == 0) {
|
|
|
+ if (loopCount % reconcileBlocksBatchSize == 0) {
|
|
|
try {
|
|
|
- Thread.sleep(2000);
|
|
|
+ Thread.sleep(reconcileBlocksBatchInterval);
|
|
|
} catch (InterruptedException e) {
|
|
|
// do nothing
|
|
|
}
|