|
@@ -230,8 +230,11 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
return pendingReconstruction.getNumTimedOuts();
|
|
|
}
|
|
|
|
|
|
- /**replicationRecheckInterval is how often namenode checks for new replication work*/
|
|
|
- private final long replicationRecheckInterval;
|
|
|
+ /**
|
|
|
+ * redundancyRecheckInterval is how often namenode checks for new
|
|
|
+ * reconstruction work.
|
|
|
+ */
|
|
|
+ private final long redundancyRecheckIntervalMs;
|
|
|
|
|
|
/** How often to check and the limit for the storageinfo efficiency. */
|
|
|
private final long storageInfoDefragmentInterval;
|
|
@@ -244,8 +247,8 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
*/
|
|
|
final BlocksMap blocksMap;
|
|
|
|
|
|
- /** Replication thread. */
|
|
|
- final Daemon replicationThread = new Daemon(new ReplicationMonitor());
|
|
|
+ /** Redundancy thread. */
|
|
|
+ private final Daemon redundancyThread = new Daemon(new RedundancyMonitor());
|
|
|
|
|
|
/** StorageInfoDefragmenter thread. */
|
|
|
private final Daemon storageInfoDefragmenterThread =
|
|
@@ -435,10 +438,10 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
this.blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
|
|
|
this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
|
|
|
|
|
|
- this.replicationRecheckInterval =
|
|
|
- conf.getTimeDuration(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
|
|
|
- DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT,
|
|
|
- TimeUnit.SECONDS) * 1000L;
|
|
|
+ this.redundancyRecheckIntervalMs = conf.getTimeDuration(
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT,
|
|
|
+ TimeUnit.SECONDS) * 1000;
|
|
|
|
|
|
this.storageInfoDefragmentInterval =
|
|
|
conf.getLong(
|
|
@@ -493,7 +496,8 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
LOG.info("maxReplication = " + maxReplication);
|
|
|
LOG.info("minReplication = " + minReplication);
|
|
|
LOG.info("maxReplicationStreams = " + maxReplicationStreams);
|
|
|
- LOG.info("replicationRecheckInterval = " + replicationRecheckInterval);
|
|
|
+ LOG.info("redundancyRecheckInterval = " + redundancyRecheckIntervalMs +
|
|
|
+ "ms");
|
|
|
LOG.info("encryptDataTransfer = " + encryptDataTransfer);
|
|
|
LOG.info("maxNumBlocksToLog = " + maxNumBlocksToLog);
|
|
|
}
|
|
@@ -586,7 +590,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
return blockTokenSecretManager;
|
|
|
}
|
|
|
|
|
|
- /** Allow silent termination of replication monitor for testing */
|
|
|
+ /** Allow silent termination of redundancy monitor for testing. */
|
|
|
@VisibleForTesting
|
|
|
void enableRMTerminationForTesting() {
|
|
|
checkNSRunning = false;
|
|
@@ -604,8 +608,8 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
public void activate(Configuration conf, long blockTotal) {
|
|
|
pendingReconstruction.start();
|
|
|
datanodeManager.activate(conf);
|
|
|
- this.replicationThread.setName("ReplicationMonitor");
|
|
|
- this.replicationThread.start();
|
|
|
+ this.redundancyThread.setName("RedundancyMonitor");
|
|
|
+ this.redundancyThread.start();
|
|
|
storageInfoDefragmenterThread.setName("StorageInfoMonitor");
|
|
|
storageInfoDefragmenterThread.start();
|
|
|
this.blockReportThread.start();
|
|
@@ -616,10 +620,10 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
public void close() {
|
|
|
bmSafeMode.close();
|
|
|
try {
|
|
|
- replicationThread.interrupt();
|
|
|
+ redundancyThread.interrupt();
|
|
|
storageInfoDefragmenterThread.interrupt();
|
|
|
blockReportThread.interrupt();
|
|
|
- replicationThread.join(3000);
|
|
|
+ redundancyThread.join(3000);
|
|
|
storageInfoDefragmenterThread.join(3000);
|
|
|
blockReportThread.join(3000);
|
|
|
} catch (InterruptedException ie) {
|
|
@@ -880,7 +884,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
|
|
|
/**
|
|
|
* If IBR is not sent from expected locations yet, add the datanodes to
|
|
|
- * pendingReconstruction in order to keep ReplicationMonitor from scheduling
|
|
|
+ * pendingReconstruction in order to keep RedundancyMonitor from scheduling
|
|
|
* the block.
|
|
|
*/
|
|
|
public void addExpectedReplicasToPending(BlockInfo blk) {
|
|
@@ -1884,7 +1888,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum)) {
|
|
|
neededReconstruction.remove(block, priority);
|
|
|
rw.resetTargets();
|
|
|
- blockLog.debug("BLOCK* Removing {} from neededReplications as" +
|
|
|
+ blockLog.debug("BLOCK* Removing {} from neededReconstruction as" +
|
|
|
" it has enough replicas", block);
|
|
|
return false;
|
|
|
}
|
|
@@ -1910,8 +1914,8 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
// reconstructions that fail after an appropriate amount of time.
|
|
|
pendingReconstruction.increment(block,
|
|
|
DatanodeStorageInfo.toDatanodeDescriptors(targets));
|
|
|
- blockLog.debug("BLOCK* block {} is moved from neededReplications to "
|
|
|
- + "pendingReplications", block);
|
|
|
+ blockLog.debug("BLOCK* block {} is moved from neededReconstruction to "
|
|
|
+ + "pendingReconstruction", block);
|
|
|
|
|
|
int numEffectiveReplicas = numReplicas.liveReplicas() + pendingNum;
|
|
|
// remove from neededReconstruction
|
|
@@ -4298,32 +4302,32 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
/**
|
|
|
* Periodically calls computeBlockRecoveryWork().
|
|
|
*/
|
|
|
- private class ReplicationMonitor implements Runnable {
|
|
|
+ private class RedundancyMonitor implements Runnable {
|
|
|
|
|
|
@Override
|
|
|
public void run() {
|
|
|
while (namesystem.isRunning()) {
|
|
|
try {
|
|
|
- // Process replication work only when active NN is out of safe mode.
|
|
|
+ // Process recovery work only when active NN is out of safe mode.
|
|
|
if (isPopulatingReplQueues()) {
|
|
|
computeDatanodeWork();
|
|
|
processPendingReconstructions();
|
|
|
rescanPostponedMisreplicatedBlocks();
|
|
|
}
|
|
|
- Thread.sleep(replicationRecheckInterval);
|
|
|
+ TimeUnit.MILLISECONDS.sleep(redundancyRecheckIntervalMs);
|
|
|
} catch (Throwable t) {
|
|
|
if (!namesystem.isRunning()) {
|
|
|
- LOG.info("Stopping ReplicationMonitor.");
|
|
|
+ LOG.info("Stopping RedundancyMonitor.");
|
|
|
if (!(t instanceof InterruptedException)) {
|
|
|
- LOG.info("ReplicationMonitor received an exception"
|
|
|
+ LOG.info("RedundancyMonitor received an exception"
|
|
|
+ " while shutting down.", t);
|
|
|
}
|
|
|
break;
|
|
|
} else if (!checkNSRunning && t instanceof InterruptedException) {
|
|
|
- LOG.info("Stopping ReplicationMonitor for testing.");
|
|
|
+ LOG.info("Stopping RedundancyMonitor for testing.");
|
|
|
break;
|
|
|
}
|
|
|
- LOG.error("ReplicationMonitor thread received Runtime exception. ",
|
|
|
+ LOG.error("RedundancyMonitor thread received Runtime exception. ",
|
|
|
t);
|
|
|
terminate(1, t);
|
|
|
}
|
|
@@ -4692,6 +4696,14 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * @return redundancy thread.
|
|
|
+ */
|
|
|
+ @VisibleForTesting
|
|
|
+ Daemon getRedundancyThread() {
|
|
|
+ return redundancyThread;
|
|
|
+ }
|
|
|
+
|
|
|
public BlockIdManager getBlockIdManager() {
|
|
|
return blockIdManager;
|
|
|
}
|