|
@@ -485,72 +485,54 @@ public class BlockManager implements BlockStatsMXBean {
|
|
public BlockManager(final Namesystem namesystem, boolean haEnabled,
|
|
public BlockManager(final Namesystem namesystem, boolean haEnabled,
|
|
final Configuration conf) throws IOException {
|
|
final Configuration conf) throws IOException {
|
|
this.namesystem = namesystem;
|
|
this.namesystem = namesystem;
|
|
- datanodeManager = new DatanodeManager(this, namesystem, conf);
|
|
|
|
- heartbeatManager = datanodeManager.getHeartbeatManager();
|
|
|
|
|
|
+ this.datanodeManager = new DatanodeManager(this, namesystem, conf);
|
|
|
|
+ this.heartbeatManager = datanodeManager.getHeartbeatManager();
|
|
this.blockIdManager = new BlockIdManager(this);
|
|
this.blockIdManager = new BlockIdManager(this);
|
|
- blocksPerPostpondedRescan = (int)Math.min(Integer.MAX_VALUE,
|
|
|
|
|
|
+ this.blocksPerPostpondedRescan = (int)Math.min(Integer.MAX_VALUE,
|
|
datanodeManager.getBlocksPerPostponedMisreplicatedBlocksRescan());
|
|
datanodeManager.getBlocksPerPostponedMisreplicatedBlocksRescan());
|
|
- rescannedMisreplicatedBlocks =
|
|
|
|
|
|
+ this.rescannedMisreplicatedBlocks =
|
|
new ArrayList<Block>(blocksPerPostpondedRescan);
|
|
new ArrayList<Block>(blocksPerPostpondedRescan);
|
|
- startupDelayBlockDeletionInMs = conf.getLong(
|
|
|
|
|
|
+ this.startupDelayBlockDeletionInMs = conf.getLong(
|
|
DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY,
|
|
DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY,
|
|
- DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_DEFAULT) * 1000L;
|
|
|
|
- deleteBlockLockTimeMs = conf.getLong(
|
|
|
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_DEFAULT)
|
|
|
|
+ * 1000L;
|
|
|
|
+ this.deleteBlockLockTimeMs = conf.getLong(
|
|
DFSConfigKeys.DFS_NAMENODE_BLOCK_DELETION_LOCK_THRESHOLD_MS,
|
|
DFSConfigKeys.DFS_NAMENODE_BLOCK_DELETION_LOCK_THRESHOLD_MS,
|
|
DFSConfigKeys.DFS_NAMENODE_BLOCK_DELETION_LOCK_THRESHOLD_MS_DEFAULT);
|
|
DFSConfigKeys.DFS_NAMENODE_BLOCK_DELETION_LOCK_THRESHOLD_MS_DEFAULT);
|
|
- deleteBlockUnlockIntervalTimeMs = conf.getLong(
|
|
|
|
|
|
+ this.deleteBlockUnlockIntervalTimeMs = conf.getLong(
|
|
DFSConfigKeys.DFS_NAMENODE_BLOCK_DELETION_UNLOCK_INTERVAL_MS,
|
|
DFSConfigKeys.DFS_NAMENODE_BLOCK_DELETION_UNLOCK_INTERVAL_MS,
|
|
DFSConfigKeys.DFS_NAMENODE_BLOCK_DELETION_UNLOCK_INTERVAL_MS_DEFAULT);
|
|
DFSConfigKeys.DFS_NAMENODE_BLOCK_DELETION_UNLOCK_INTERVAL_MS_DEFAULT);
|
|
- invalidateBlocks = new InvalidateBlocks(
|
|
|
|
|
|
+ this.invalidateBlocks = new InvalidateBlocks(
|
|
datanodeManager.getBlockInvalidateLimit(),
|
|
datanodeManager.getBlockInvalidateLimit(),
|
|
startupDelayBlockDeletionInMs,
|
|
startupDelayBlockDeletionInMs,
|
|
blockIdManager);
|
|
blockIdManager);
|
|
- markedDeleteQueue = new ConcurrentLinkedQueue<>();
|
|
|
|
|
|
+ this.markedDeleteQueue = new ConcurrentLinkedQueue<>();
|
|
// Compute the map capacity by allocating 2% of total memory
|
|
// Compute the map capacity by allocating 2% of total memory
|
|
- blocksMap = new BlocksMap(
|
|
|
|
|
|
+ this.blocksMap = new BlocksMap(
|
|
LightWeightGSet.computeCapacity(2.0, "BlocksMap"));
|
|
LightWeightGSet.computeCapacity(2.0, "BlocksMap"));
|
|
- placementPolicies = new BlockPlacementPolicies(
|
|
|
|
- conf, datanodeManager.getFSClusterStats(),
|
|
|
|
- datanodeManager.getNetworkTopology(),
|
|
|
|
- datanodeManager.getHost2DatanodeMap());
|
|
|
|
- storagePolicySuite = BlockStoragePolicySuite.createDefaultSuite(conf);
|
|
|
|
- pendingReconstruction = new PendingReconstructionBlocks(conf.getInt(
|
|
|
|
|
|
+ this.placementPolicies = new BlockPlacementPolicies(
|
|
|
|
+ conf, datanodeManager.getFSClusterStats(),
|
|
|
|
+ datanodeManager.getNetworkTopology(),
|
|
|
|
+ datanodeManager.getHost2DatanodeMap());
|
|
|
|
+ this.storagePolicySuite = BlockStoragePolicySuite.createDefaultSuite(conf);
|
|
|
|
+ this.pendingReconstruction = new PendingReconstructionBlocks(conf.getInt(
|
|
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
|
|
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
|
|
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT)
|
|
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT)
|
|
* 1000L);
|
|
* 1000L);
|
|
|
|
|
|
createSPSManager(conf);
|
|
createSPSManager(conf);
|
|
|
|
|
|
- blockTokenSecretManager = createBlockTokenSecretManager(conf);
|
|
|
|
-
|
|
|
|
- providedStorageMap = new ProvidedStorageMap(namesystem, this, conf);
|
|
|
|
|
|
+ this.blockTokenSecretManager = createBlockTokenSecretManager(conf);
|
|
|
|
+ this.providedStorageMap = new ProvidedStorageMap(namesystem, this, conf);
|
|
|
|
|
|
this.maxCorruptFilesReturned = conf.getInt(
|
|
this.maxCorruptFilesReturned = conf.getInt(
|
|
- DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY,
|
|
|
|
- DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED);
|
|
|
|
|
|
+ DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY,
|
|
|
|
+ DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED);
|
|
this.defaultReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
|
|
this.defaultReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
|
|
DFSConfigKeys.DFS_REPLICATION_DEFAULT);
|
|
DFSConfigKeys.DFS_REPLICATION_DEFAULT);
|
|
|
|
|
|
- final int maxR = conf.getInt(DFSConfigKeys.DFS_REPLICATION_MAX_KEY,
|
|
|
|
- DFSConfigKeys.DFS_REPLICATION_MAX_DEFAULT);
|
|
|
|
- final int minR = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
|
|
|
|
- DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
|
|
|
|
- if (minR <= 0)
|
|
|
|
- throw new IOException("Unexpected configuration parameters: "
|
|
|
|
- + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY
|
|
|
|
- + " = " + minR + " <= 0");
|
|
|
|
- if (maxR > Short.MAX_VALUE)
|
|
|
|
- throw new IOException("Unexpected configuration parameters: "
|
|
|
|
- + DFSConfigKeys.DFS_REPLICATION_MAX_KEY
|
|
|
|
- + " = " + maxR + " > " + Short.MAX_VALUE);
|
|
|
|
- if (minR > maxR)
|
|
|
|
- throw new IOException("Unexpected configuration parameters: "
|
|
|
|
- + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY
|
|
|
|
- + " = " + minR + " > "
|
|
|
|
- + DFSConfigKeys.DFS_REPLICATION_MAX_KEY
|
|
|
|
- + " = " + maxR);
|
|
|
|
- this.minReplication = (short)minR;
|
|
|
|
- this.maxReplication = (short)maxR;
|
|
|
|
|
|
+ this.minReplication = (short) initMinReplication(conf);
|
|
|
|
+ this.maxReplication = (short) initMaxReplication(conf);
|
|
|
|
|
|
this.maxReplicationStreams =
|
|
this.maxReplicationStreams =
|
|
conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
|
|
conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
|
|
@@ -582,6 +564,66 @@ public class BlockManager implements BlockStatsMXBean {
|
|
DFSConfigKeys.DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT,
|
|
DFSConfigKeys.DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT,
|
|
DFSConfigKeys.DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT_DEFAULT);
|
|
DFSConfigKeys.DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT_DEFAULT);
|
|
|
|
|
|
|
|
+ this.minReplicationToBeInMaintenance =
|
|
|
|
+ (short) initMinReplicationToBeInMaintenance(conf);
|
|
|
|
+ this.replQueueResetToHeadThreshold =
|
|
|
|
+ initReplQueueResetToHeadThreshold(conf);
|
|
|
|
+
|
|
|
|
+ long heartbeatIntervalSecs = conf.getTimeDuration(
|
|
|
|
+ DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
|
|
|
|
+ DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
|
|
|
|
+ long blockRecoveryTimeout = getBlockRecoveryTimeout(heartbeatIntervalSecs);
|
|
|
|
+ this.pendingRecoveryBlocks = new PendingRecoveryBlocks(blockRecoveryTimeout);
|
|
|
|
+
|
|
|
|
+ this.blockReportLeaseManager = new BlockReportLeaseManager(conf);
|
|
|
|
+
|
|
|
|
+ this.bmSafeMode = new BlockManagerSafeMode(this, namesystem, haEnabled,
|
|
|
|
+ conf);
|
|
|
|
+
|
|
|
|
+ int queueSize = conf.getInt(
|
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_BLOCKREPORT_QUEUE_SIZE_KEY,
|
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_BLOCKREPORT_QUEUE_SIZE_DEFAULT);
|
|
|
|
+ this.blockReportThread = new BlockReportProcessingThread(queueSize);
|
|
|
|
+
|
|
|
|
+ this.deleteCorruptReplicaImmediately =
|
|
|
|
+ conf.getBoolean(DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED,
|
|
|
|
+ DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED_DEFAULT);
|
|
|
|
+
|
|
|
|
+ printInitialConfigs();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private int initMinReplication(Configuration conf) throws IOException {
|
|
|
|
+ final int minR = conf.getInt(
|
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
|
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
|
|
|
|
+ if (minR <= 0) {
|
|
|
|
+ throw new IOException("Unexpected configuration parameters: "
|
|
|
|
+ + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY
|
|
|
|
+ + " = " + minR + " <= 0");
|
|
|
|
+ }
|
|
|
|
+ return minR;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private int initMaxReplication(Configuration conf) throws IOException {
|
|
|
|
+ final int maxR = conf.getInt(DFSConfigKeys.DFS_REPLICATION_MAX_KEY,
|
|
|
|
+ DFSConfigKeys.DFS_REPLICATION_MAX_DEFAULT);
|
|
|
|
+ if (maxR > Short.MAX_VALUE) {
|
|
|
|
+ throw new IOException("Unexpected configuration parameters: "
|
|
|
|
+ + DFSConfigKeys.DFS_REPLICATION_MAX_KEY
|
|
|
|
+ + " = " + maxR + " > " + Short.MAX_VALUE);
|
|
|
|
+ }
|
|
|
|
+ if (minReplication > maxR) {
|
|
|
|
+ throw new IOException("Unexpected configuration parameters: "
|
|
|
|
+ + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY
|
|
|
|
+ + " = " + minReplication + " > "
|
|
|
|
+ + DFSConfigKeys.DFS_REPLICATION_MAX_KEY
|
|
|
|
+ + " = " + maxR);
|
|
|
|
+ }
|
|
|
|
+ return maxR;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private int initMinReplicationToBeInMaintenance(Configuration conf)
|
|
|
|
+ throws IOException {
|
|
final int minMaintenanceR = conf.getInt(
|
|
final int minMaintenanceR = conf.getInt(
|
|
DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY,
|
|
DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY,
|
|
DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_DEFAULT);
|
|
DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_DEFAULT);
|
|
@@ -598,39 +640,25 @@ public class BlockManager implements BlockStatsMXBean {
|
|
+ DFSConfigKeys.DFS_REPLICATION_KEY
|
|
+ DFSConfigKeys.DFS_REPLICATION_KEY
|
|
+ " = " + defaultReplication);
|
|
+ " = " + defaultReplication);
|
|
}
|
|
}
|
|
- this.minReplicationToBeInMaintenance = (short)minMaintenanceR;
|
|
|
|
|
|
+ return minMaintenanceR;
|
|
|
|
+ }
|
|
|
|
|
|
- replQueueResetToHeadThreshold = conf.getInt(
|
|
|
|
|
|
+ private int initReplQueueResetToHeadThreshold(Configuration conf) {
|
|
|
|
+ int threshold = conf.getInt(
|
|
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS,
|
|
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS,
|
|
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS_DEFAULT);
|
|
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS_DEFAULT);
|
|
- if (replQueueResetToHeadThreshold < 0) {
|
|
|
|
|
|
+ if (threshold < 0) {
|
|
LOG.warn("{} is set to {} and it must be >= 0. Resetting to default {}",
|
|
LOG.warn("{} is set to {} and it must be >= 0. Resetting to default {}",
|
|
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS,
|
|
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS,
|
|
- replQueueResetToHeadThreshold, DFSConfigKeys.
|
|
|
|
|
|
+ threshold, DFSConfigKeys.
|
|
DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS_DEFAULT);
|
|
DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS_DEFAULT);
|
|
- replQueueResetToHeadThreshold = DFSConfigKeys.
|
|
|
|
|
|
+ threshold = DFSConfigKeys.
|
|
DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS_DEFAULT;
|
|
DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS_DEFAULT;
|
|
}
|
|
}
|
|
|
|
+ return threshold;
|
|
|
|
+ }
|
|
|
|
|
|
- long heartbeatIntervalSecs = conf.getTimeDuration(
|
|
|
|
- DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
|
|
|
|
- DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
|
|
|
|
- long blockRecoveryTimeout = getBlockRecoveryTimeout(heartbeatIntervalSecs);
|
|
|
|
- pendingRecoveryBlocks = new PendingRecoveryBlocks(blockRecoveryTimeout);
|
|
|
|
-
|
|
|
|
- this.blockReportLeaseManager = new BlockReportLeaseManager(conf);
|
|
|
|
-
|
|
|
|
- bmSafeMode = new BlockManagerSafeMode(this, namesystem, haEnabled, conf);
|
|
|
|
-
|
|
|
|
- int queueSize = conf.getInt(
|
|
|
|
- DFSConfigKeys.DFS_NAMENODE_BLOCKREPORT_QUEUE_SIZE_KEY,
|
|
|
|
- DFSConfigKeys.DFS_NAMENODE_BLOCKREPORT_QUEUE_SIZE_DEFAULT);
|
|
|
|
- blockReportThread = new BlockReportProcessingThread(queueSize);
|
|
|
|
-
|
|
|
|
- this.deleteCorruptReplicaImmediately =
|
|
|
|
- conf.getBoolean(DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED,
|
|
|
|
- DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED_DEFAULT);
|
|
|
|
-
|
|
|
|
|
|
+ private void printInitialConfigs() {
|
|
LOG.info("defaultReplication = {}", defaultReplication);
|
|
LOG.info("defaultReplication = {}", defaultReplication);
|
|
LOG.info("maxReplication = {}", maxReplication);
|
|
LOG.info("maxReplication = {}", maxReplication);
|
|
LOG.info("minReplication = {}", minReplication);
|
|
LOG.info("minReplication = {}", minReplication);
|