|
@@ -17,6 +17,7 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.hdfs.server.datanode;
|
|
|
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
import java.util.Arrays;
|
|
@@ -33,6 +34,7 @@ import java.util.concurrent.ScheduledExecutorService;
|
|
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicLong;
|
|
|
import java.util.regex.Matcher;
|
|
|
import java.util.regex.Pattern;
|
|
|
|
|
@@ -47,6 +49,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
|
|
import org.apache.hadoop.util.Daemon;
|
|
|
+import org.apache.hadoop.util.StopWatch;
|
|
|
import org.apache.hadoop.util.Time;
|
|
|
|
|
|
/**
|
|
@@ -56,27 +59,59 @@ import org.apache.hadoop.util.Time;
|
|
|
@InterfaceAudience.Private
|
|
|
public class DirectoryScanner implements Runnable {
|
|
|
private static final Log LOG = LogFactory.getLog(DirectoryScanner.class);
|
|
|
+ private static final int MILLIS_PER_SECOND = 1000;
|
|
|
+ private static final String START_MESSAGE =
|
|
|
+ "Periodic Directory Tree Verification scan"
|
|
|
+ + " starting at %dms with interval of %dms";
|
|
|
+ private static final String START_MESSAGE_WITH_THROTTLE = START_MESSAGE
|
|
|
+ + " and throttle limit of %dms/s";
|
|
|
|
|
|
private final FsDatasetSpi<?> dataset;
|
|
|
private final ExecutorService reportCompileThreadPool;
|
|
|
private final ScheduledExecutorService masterThread;
|
|
|
private final long scanPeriodMsecs;
|
|
|
+ private final int throttleLimitMsPerSec;
|
|
|
private volatile boolean shouldRun = false;
|
|
|
private boolean retainDiffs = false;
|
|
|
private final DataNode datanode;
|
|
|
|
|
|
+ /**
|
|
|
+ * Total combined wall clock time (in milliseconds) spent by the report
|
|
|
+ * compiler threads executing. Used for testing purposes.
|
|
|
+ */
|
|
|
+ @VisibleForTesting
|
|
|
+ final AtomicLong timeRunningMs = new AtomicLong(0L);
|
|
|
+ /**
|
|
|
+ * Total combined wall clock time (in milliseconds) spent by the report
|
|
|
+ * compiler threads blocked by the throttle. Used for testing purposes.
|
|
|
+ */
|
|
|
+ @VisibleForTesting
|
|
|
+ final AtomicLong timeWaitingMs = new AtomicLong(0L);
|
|
|
+ /**
|
|
|
+ * The complete list of block differences indexed by block pool ID.
|
|
|
+ */
|
|
|
+ @VisibleForTesting
|
|
|
final ScanInfoPerBlockPool diffs = new ScanInfoPerBlockPool();
|
|
|
+ /**
|
|
|
+ * Statistics about the block differences in each blockpool, indexed by
|
|
|
+ * block pool ID.
|
|
|
+ */
|
|
|
+ @VisibleForTesting
|
|
|
final Map<String, Stats> stats = new HashMap<String, Stats>();
|
|
|
|
|
|
/**
|
|
|
- * Allow retaining diffs for unit test and analysis
|
|
|
- * @param b - defaults to false (off)
|
|
|
+ * Allow retaining diffs for unit test and analysis. Defaults to false (off)
|
|
|
+ * @param b whether to retain diffs
|
|
|
*/
|
|
|
+ @VisibleForTesting
|
|
|
void setRetainDiffs(boolean b) {
|
|
|
retainDiffs = b;
|
|
|
}
|
|
|
|
|
|
- /** Stats tracked for reporting and testing, per blockpool */
|
|
|
+ /**
|
|
|
+ * Stats tracked for reporting and testing, per blockpool
|
|
|
+ */
|
|
|
+ @VisibleForTesting
|
|
|
static class Stats {
|
|
|
final String bpid;
|
|
|
long totalBlocks = 0;
|
|
@@ -86,6 +121,10 @@ public class DirectoryScanner implements Runnable {
|
|
|
long mismatchBlocks = 0;
|
|
|
long duplicateBlocks = 0;
|
|
|
|
|
|
+ /**
|
|
|
+ * Create a new Stats object for the given blockpool ID.
|
|
|
+ * @param bpid blockpool ID
|
|
|
+ */
|
|
|
public Stats(String bpid) {
|
|
|
this.bpid = bpid;
|
|
|
}
|
|
@@ -99,18 +138,32 @@ public class DirectoryScanner implements Runnable {
|
|
|
+ ", mismatched blocks:" + mismatchBlocks;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Helper class for compiling block info reports from report compiler threads.
|
|
|
+ */
|
|
|
static class ScanInfoPerBlockPool extends
|
|
|
HashMap<String, LinkedList<ScanInfo>> {
|
|
|
|
|
|
private static final long serialVersionUID = 1L;
|
|
|
|
|
|
+ /**
|
|
|
+ * Create a new info list.
|
|
|
+ */
|
|
|
ScanInfoPerBlockPool() {super();}
|
|
|
-
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Create a new info list initialized to the given expected size.
|
|
|
+ * See {@link java.util.HashMap#HashMap(int)}.
|
|
|
+ *
|
|
|
+ * @param sz initial expected size
|
|
|
+ */
|
|
|
ScanInfoPerBlockPool(int sz) {super(sz);}
|
|
|
|
|
|
/**
|
|
|
* Merges {@code that} ScanInfoPerBlockPool into this one
|
|
|
+ *
|
|
|
+ * @param the ScanInfoPerBlockPool to merge
|
|
|
*/
|
|
|
public void addAll(ScanInfoPerBlockPool that) {
|
|
|
if (that == null) return;
|
|
@@ -132,6 +185,7 @@ public class DirectoryScanner implements Runnable {
|
|
|
/**
|
|
|
* Convert all the LinkedList values in this ScanInfoPerBlockPool map
|
|
|
* into sorted arrays, and return a new map of these arrays per blockpool
|
|
|
+ *
|
|
|
* @return a map of ScanInfo arrays per blockpool
|
|
|
*/
|
|
|
public Map<String, ScanInfo[]> toSortedArrays() {
|
|
@@ -208,6 +262,9 @@ public class DirectoryScanner implements Runnable {
|
|
|
* For example, the condensed version of /foo//bar is /foo/bar
|
|
|
* Unlike {@link File#getCanonicalPath()}, this will never perform I/O
|
|
|
* on the filesystem.
|
|
|
+ *
|
|
|
+ * @param path the path to condense
|
|
|
+ * @return the condensed path
|
|
|
*/
|
|
|
private static String getCondensedPath(String path) {
|
|
|
return CONDENSED_PATH_REGEX.matcher(path).
|
|
@@ -230,6 +287,15 @@ public class DirectoryScanner implements Runnable {
|
|
|
throw new RuntimeException(prefix + " is not a prefix of " + fullPath);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Create a ScanInfo object for a block. This constructor will examine
|
|
|
+ * the block data and meta-data files.
|
|
|
+ *
|
|
|
+ * @param blockId the block ID
|
|
|
+ * @param blockFile the path to the block data file
|
|
|
+ * @param metaFile the path to the block meta-data file
|
|
|
+ * @param vol the volume that contains the block
|
|
|
+ */
|
|
|
ScanInfo(long blockId, File blockFile, File metaFile, FsVolumeSpi vol) {
|
|
|
this.blockId = blockId;
|
|
|
String condensedVolPath = vol == null ? null :
|
|
@@ -248,15 +314,31 @@ public class DirectoryScanner implements Runnable {
|
|
|
this.volume = vol;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Returns the block data file.
|
|
|
+ *
|
|
|
+ * @return the block data file
|
|
|
+ */
|
|
|
File getBlockFile() {
|
|
|
return (blockSuffix == null) ? null :
|
|
|
new File(volume.getBasePath(), blockSuffix);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Return the length of the data block. The length returned is the length
|
|
|
+ * cached when this object was created.
|
|
|
+ *
|
|
|
+ * @return the length of the data block
|
|
|
+ */
|
|
|
long getBlockFileLength() {
|
|
|
return blockFileLength;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Returns the block meta data file or null if there isn't one.
|
|
|
+ *
|
|
|
+ * @return the block meta data file
|
|
|
+ */
|
|
|
File getMetaFile() {
|
|
|
if (metaSuffix == null) {
|
|
|
return null;
|
|
@@ -267,10 +349,20 @@ public class DirectoryScanner implements Runnable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Returns the block ID.
|
|
|
+ *
|
|
|
+ * @return the block ID
|
|
|
+ */
|
|
|
long getBlockId() {
|
|
|
return blockId;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Returns the volume that contains the block that this object describes.
|
|
|
+ *
|
|
|
+ * @return the volume
|
|
|
+ */
|
|
|
FsVolumeSpi getVolume() {
|
|
|
return volume;
|
|
|
}
|
|
@@ -309,12 +401,44 @@ public class DirectoryScanner implements Runnable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Create a new directory scanner, but don't cycle it running yet.
|
|
|
+ *
|
|
|
+ * @param datanode the parent datanode
|
|
|
+ * @param dataset the dataset to scan
|
|
|
+ * @param conf the Configuration object
|
|
|
+ */
|
|
|
DirectoryScanner(DataNode datanode, FsDatasetSpi<?> dataset, Configuration conf) {
|
|
|
this.datanode = datanode;
|
|
|
this.dataset = dataset;
|
|
|
int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
|
|
|
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT);
|
|
|
- scanPeriodMsecs = interval * 1000L; //msec
|
|
|
+ scanPeriodMsecs = interval * MILLIS_PER_SECOND; //msec
|
|
|
+
|
|
|
+ int throttle =
|
|
|
+ conf.getInt(
|
|
|
+ DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
|
|
|
+ DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
|
|
|
+
|
|
|
+ if ((throttle > MILLIS_PER_SECOND) || (throttle <= 0)) {
|
|
|
+ if (throttle > MILLIS_PER_SECOND) {
|
|
|
+ LOG.error(
|
|
|
+ DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY
|
|
|
+ + " set to value above 1000 ms/sec. Assuming default value of " +
|
|
|
+ DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
|
|
|
+ } else {
|
|
|
+ LOG.error(
|
|
|
+ DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY
|
|
|
+ + " set to value below 1 ms/sec. Assuming default value of " +
|
|
|
+ DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
|
|
|
+ }
|
|
|
+
|
|
|
+ throttleLimitMsPerSec =
|
|
|
+ DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT;
|
|
|
+ } else {
|
|
|
+ throttleLimitMsPerSec = throttle;
|
|
|
+ }
|
|
|
+
|
|
|
int threads =
|
|
|
conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
|
|
|
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT);
|
|
@@ -325,30 +449,50 @@ public class DirectoryScanner implements Runnable {
|
|
|
new Daemon.DaemonFactory());
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Start the scanner. The scanner will run every
|
|
|
+ * {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY} seconds.
|
|
|
+ */
|
|
|
void start() {
|
|
|
shouldRun = true;
|
|
|
long offset = ThreadLocalRandom.current().nextInt(
|
|
|
- (int) (scanPeriodMsecs/1000L)) * 1000L; //msec
|
|
|
+ (int) (scanPeriodMsecs/MILLIS_PER_SECOND)) * MILLIS_PER_SECOND; //msec
|
|
|
long firstScanTime = Time.now() + offset;
|
|
|
- LOG.info("Periodic Directory Tree Verification scan starting at "
|
|
|
- + firstScanTime + " with interval " + scanPeriodMsecs);
|
|
|
+ String logMsg;
|
|
|
+
|
|
|
+ if (throttleLimitMsPerSec < MILLIS_PER_SECOND) {
|
|
|
+ logMsg = String.format(START_MESSAGE_WITH_THROTTLE, firstScanTime,
|
|
|
+ scanPeriodMsecs, throttleLimitMsPerSec);
|
|
|
+ } else {
|
|
|
+ logMsg = String.format(START_MESSAGE, firstScanTime, scanPeriodMsecs);
|
|
|
+ }
|
|
|
+
|
|
|
+ LOG.info(logMsg);
|
|
|
masterThread.scheduleAtFixedRate(this, offset, scanPeriodMsecs,
|
|
|
TimeUnit.MILLISECONDS);
|
|
|
}
|
|
|
|
|
|
- // for unit test
|
|
|
+ /**
|
|
|
+ * Return whether the scanner has been started.
|
|
|
+ *
|
|
|
+ * @return whether the scanner has been started
|
|
|
+ */
|
|
|
+ @VisibleForTesting
|
|
|
boolean getRunStatus() {
|
|
|
return shouldRun;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Clear the current cache of diffs and statistics.
|
|
|
+ */
|
|
|
private void clear() {
|
|
|
diffs.clear();
|
|
|
stats.clear();
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Main program loop for DirectoryScanner
|
|
|
- * Runs "reconcile()" periodically under the masterThread.
|
|
|
+ * Main program loop for DirectoryScanner. Runs {@link reconcile()}
|
|
|
+ * and handles any exceptions.
|
|
|
*/
|
|
|
@Override
|
|
|
public void run() {
|
|
@@ -372,6 +516,12 @@ public class DirectoryScanner implements Runnable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Stops the directory scanner. This method will wait for 1 minute for the
|
|
|
+ * main thread to exit and an additional 1 minute for the report compilation
|
|
|
+ * threads to exit. If a thread does not exit in that time period, it is
|
|
|
+ * left running, and an error is logged.
|
|
|
+ */
|
|
|
void shutdown() {
|
|
|
if (!shouldRun) {
|
|
|
LOG.warn("DirectoryScanner: shutdown has been called, but periodic scanner not started");
|
|
@@ -380,7 +530,11 @@ public class DirectoryScanner implements Runnable {
|
|
|
}
|
|
|
shouldRun = false;
|
|
|
if (masterThread != null) masterThread.shutdown();
|
|
|
- if (reportCompileThreadPool != null) reportCompileThreadPool.shutdown();
|
|
|
+
|
|
|
+ if (reportCompileThreadPool != null) {
|
|
|
+ reportCompileThreadPool.shutdownNow();
|
|
|
+ }
|
|
|
+
|
|
|
if (masterThread != null) {
|
|
|
try {
|
|
|
masterThread.awaitTermination(1, TimeUnit.MINUTES);
|
|
@@ -403,6 +557,7 @@ public class DirectoryScanner implements Runnable {
|
|
|
/**
|
|
|
* Reconcile differences between disk and in-memory blocks
|
|
|
*/
|
|
|
+ @VisibleForTesting
|
|
|
void reconcile() throws IOException {
|
|
|
scan();
|
|
|
for (Entry<String, LinkedList<ScanInfo>> entry : diffs.entrySet()) {
|
|
@@ -421,7 +576,7 @@ public class DirectoryScanner implements Runnable {
|
|
|
* Scan for the differences between disk and in-memory blocks
|
|
|
* Scan only the "finalized blocks" lists of both disk and memory.
|
|
|
*/
|
|
|
- void scan() {
|
|
|
+ private void scan() {
|
|
|
clear();
|
|
|
Map<String, ScanInfo[]> diskReport = getDiskReport();
|
|
|
|
|
@@ -509,8 +664,13 @@ public class DirectoryScanner implements Runnable {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Block is found on the disk. In-memory block is missing or does not match
|
|
|
- * the block on the disk
|
|
|
+ * Add the ScanInfo object to the list of differences and adjust the stats
|
|
|
+ * accordingly. This method is called when a block is found on the disk,
|
|
|
+ * but the in-memory block is missing or does not match the block on the disk.
|
|
|
+ *
|
|
|
+ * @param diffRecord the list to which to add the info
|
|
|
+ * @param statsRecord the stats to update
|
|
|
+ * @param info the differing info
|
|
|
*/
|
|
|
private void addDifference(LinkedList<ScanInfo> diffRecord,
|
|
|
Stats statsRecord, ScanInfo info) {
|
|
@@ -519,7 +679,15 @@ public class DirectoryScanner implements Runnable {
|
|
|
diffRecord.add(info);
|
|
|
}
|
|
|
|
|
|
- /** Block is not found on the disk */
|
|
|
+ /**
|
|
|
+ * Add a new ScanInfo object to the list of differences and adjust the stats
|
|
|
+ * accordingly. This method is called when a block is not found on the disk.
|
|
|
+ *
|
|
|
+ * @param diffRecord the list to which to add the info
|
|
|
+ * @param statsRecord the stats to update
|
|
|
+ * @param blockId the id of the missing block
|
|
|
+ * @param vol the volume that contains the missing block
|
|
|
+ */
|
|
|
private void addDifference(LinkedList<ScanInfo> diffRecord,
|
|
|
Stats statsRecord, long blockId,
|
|
|
FsVolumeSpi vol) {
|
|
@@ -528,7 +696,13 @@ public class DirectoryScanner implements Runnable {
|
|
|
diffRecord.add(new ScanInfo(blockId, null, null, vol));
|
|
|
}
|
|
|
|
|
|
- /** Get lists of blocks on the disk sorted by blockId, per blockpool */
|
|
|
+ /**
|
|
|
+ * Get the lists of blocks on the disks in the dataset, sorted by blockId.
|
|
|
+ * The returned map contains one entry per blockpool, keyed by the blockpool
|
|
|
+ * ID.
|
|
|
+ *
|
|
|
+ * @return a map of sorted arrays of block information
|
|
|
+ */
|
|
|
private Map<String, ScanInfo[]> getDiskReport() {
|
|
|
ScanInfoPerBlockPool list = new ScanInfoPerBlockPool();
|
|
|
ScanInfoPerBlockPool[] dirReports = null;
|
|
@@ -555,6 +729,12 @@ public class DirectoryScanner implements Runnable {
|
|
|
compilersInProgress.entrySet()) {
|
|
|
try {
|
|
|
dirReports[report.getKey()] = report.getValue().get();
|
|
|
+
|
|
|
+ // If our compiler threads were interrupted, give up on this run
|
|
|
+ if (dirReports[report.getKey()] == null) {
|
|
|
+ dirReports = null;
|
|
|
+ break;
|
|
|
+ }
|
|
|
} catch (Exception ex) {
|
|
|
LOG.error("Error compiling report", ex);
|
|
|
// Propagate ex to DataBlockScanner to deal with
|
|
@@ -573,38 +753,102 @@ public class DirectoryScanner implements Runnable {
|
|
|
return list.toSortedArrays();
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Helper method to determine if a file name is consistent with a block.
|
|
|
+ * meta-data file
|
|
|
+ *
|
|
|
+ * @param blockId the block ID
|
|
|
+ * @param metaFile the file to check
|
|
|
+ * @return whether the file name is a block meta-data file name
|
|
|
+ */
|
|
|
private static boolean isBlockMetaFile(String blockId, String metaFile) {
|
|
|
return metaFile.startsWith(blockId)
|
|
|
&& metaFile.endsWith(Block.METADATA_EXTENSION);
|
|
|
}
|
|
|
|
|
|
- private static class ReportCompiler
|
|
|
- implements Callable<ScanInfoPerBlockPool> {
|
|
|
+ /**
|
|
|
+ * The ReportCompiler class encapsulates the process of searching a datanode's
|
|
|
+ * disks for block information. It operates by performing a DFS of the
|
|
|
+ * volume to discover block information.
|
|
|
+ *
|
|
|
+ * When the ReportCompiler discovers block information, it create a new
|
|
|
+ * ScanInfo object for it and adds that object to its report list. The report
|
|
|
+ * list is returned by the {@link #call()} method.
|
|
|
+ */
|
|
|
+ private class ReportCompiler implements Callable<ScanInfoPerBlockPool> {
|
|
|
private final FsVolumeSpi volume;
|
|
|
private final DataNode datanode;
|
|
|
+ // Variable for tracking time spent running for throttling purposes
|
|
|
+ private final StopWatch throttleTimer = new StopWatch();
|
|
|
+ // Variable for tracking time spent running and waiting for testing
|
|
|
+ // purposes
|
|
|
+ private final StopWatch perfTimer = new StopWatch();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * The associated thread. Used for testing purposes only.
|
|
|
+ */
|
|
|
+ @VisibleForTesting
|
|
|
+ Thread currentThread;
|
|
|
|
|
|
+ /**
|
|
|
+ * Create a report compiler for the given volume on the given datanode.
|
|
|
+ *
|
|
|
+ * @param datanode the target datanode
|
|
|
+ * @param volume the target volume
|
|
|
+ */
|
|
|
public ReportCompiler(DataNode datanode, FsVolumeSpi volume) {
|
|
|
this.datanode = datanode;
|
|
|
this.volume = volume;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Run this report compiler thread.
|
|
|
+ *
|
|
|
+ * @return the block info report list
|
|
|
+ * @throws IOException if the block pool isn't found
|
|
|
+ */
|
|
|
@Override
|
|
|
- public ScanInfoPerBlockPool call() throws Exception {
|
|
|
+ public ScanInfoPerBlockPool call() throws IOException {
|
|
|
+ currentThread = Thread.currentThread();
|
|
|
+
|
|
|
String[] bpList = volume.getBlockPoolList();
|
|
|
ScanInfoPerBlockPool result = new ScanInfoPerBlockPool(bpList.length);
|
|
|
for (String bpid : bpList) {
|
|
|
- LinkedList<ScanInfo> report = new LinkedList<ScanInfo>();
|
|
|
+ LinkedList<ScanInfo> report = new LinkedList<>();
|
|
|
File bpFinalizedDir = volume.getFinalizedDir(bpid);
|
|
|
- result.put(bpid,
|
|
|
- compileReport(volume, bpFinalizedDir, bpFinalizedDir, report));
|
|
|
+
|
|
|
+ perfTimer.start();
|
|
|
+ throttleTimer.start();
|
|
|
+
|
|
|
+ try {
|
|
|
+ result.put(bpid,
|
|
|
+ compileReport(volume, bpFinalizedDir, bpFinalizedDir, report));
|
|
|
+ } catch (InterruptedException ex) {
|
|
|
+ // Exit quickly and flag the scanner to do the same
|
|
|
+ result = null;
|
|
|
+ break;
|
|
|
+ }
|
|
|
}
|
|
|
return result;
|
|
|
}
|
|
|
|
|
|
- /** Compile list {@link ScanInfo} for the blocks in the directory <dir> */
|
|
|
+ /**
|
|
|
+ * Compile a list of {@link ScanInfo} for the blocks in the directory
|
|
|
+ * given by {@code dir}.
|
|
|
+ *
|
|
|
+ * @param vol the volume that contains the directory to scan
|
|
|
+ * @param bpFinalizedDir the root directory of the directory to scan
|
|
|
+ * @param dir the directory to scan
|
|
|
+ * @param report the list onto which blocks reports are placed
|
|
|
+ */
|
|
|
private LinkedList<ScanInfo> compileReport(FsVolumeSpi vol,
|
|
|
- File bpFinalizedDir, File dir, LinkedList<ScanInfo> report) {
|
|
|
+ File bpFinalizedDir, File dir, LinkedList<ScanInfo> report)
|
|
|
+ throws InterruptedException {
|
|
|
+
|
|
|
File[] files;
|
|
|
+
|
|
|
+ throttle();
|
|
|
+
|
|
|
try {
|
|
|
files = FileUtil.listFiles(dir);
|
|
|
} catch (IOException ioe) {
|
|
@@ -622,6 +866,12 @@ public class DirectoryScanner implements Runnable {
|
|
|
* blk_<blockid>_<genstamp>.meta
|
|
|
*/
|
|
|
for (int i = 0; i < files.length; i++) {
|
|
|
+ // Make sure this thread can make a timely exit. With a low throttle
|
|
|
+ // rate, completing a run can take a looooong time.
|
|
|
+ if (Thread.interrupted()) {
|
|
|
+ throw new InterruptedException();
|
|
|
+ }
|
|
|
+
|
|
|
if (files[i].isDirectory()) {
|
|
|
compileReport(vol, bpFinalizedDir, files[i], report);
|
|
|
continue;
|
|
@@ -668,5 +918,40 @@ public class DirectoryScanner implements Runnable {
|
|
|
+ " has to be upgraded to block ID-based layout");
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Called by the thread before each potential disk scan so that a pause
|
|
|
+ * can be optionally inserted to limit the number of scans per second.
|
|
|
+ * The limit is controlled by
|
|
|
+ * {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY}.
|
|
|
+ */
|
|
|
+ private void throttle() throws InterruptedException {
|
|
|
+ accumulateTimeRunning();
|
|
|
+
|
|
|
+ if ((throttleLimitMsPerSec < 1000) &&
|
|
|
+ (throttleTimer.now(TimeUnit.MILLISECONDS) > throttleLimitMsPerSec)) {
|
|
|
+
|
|
|
+ Thread.sleep(MILLIS_PER_SECOND - throttleLimitMsPerSec);
|
|
|
+ throttleTimer.reset().start();
|
|
|
+ }
|
|
|
+
|
|
|
+ accumulateTimeWaiting();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Helper method to measure time running.
|
|
|
+ */
|
|
|
+ private void accumulateTimeRunning() {
|
|
|
+ timeRunningMs.getAndAdd(perfTimer.now(TimeUnit.MILLISECONDS));
|
|
|
+ perfTimer.reset().start();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Helper method to measure time waiting.
|
|
|
+ */
|
|
|
+ private void accumulateTimeWaiting() {
|
|
|
+ timeWaitingMs.getAndAdd(perfTimer.now(TimeUnit.MILLISECONDS));
|
|
|
+ perfTimer.reset().start();
|
|
|
+ }
|
|
|
}
|
|
|
}
|