|
@@ -104,21 +104,21 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|
|
private final Condition scanFinished;
|
|
|
|
|
|
/**
|
|
|
- * Whether there are pending CacheManager operations that necessitate a
|
|
|
- * CacheReplicationMonitor rescan. Protected by the CRM lock.
|
|
|
+ * The number of rescans completed. Used to wait for scans to finish.
|
|
|
+ * Protected by the CacheReplicationMonitor lock.
|
|
|
*/
|
|
|
- private boolean needsRescan = true;
|
|
|
+ private long completedScanCount = 0;
|
|
|
|
|
|
/**
|
|
|
- * Whether we are currently doing a rescan. Protected by the CRM lock.
|
|
|
+ * The scan we're currently performing, or -1 if no scan is in progress.
|
|
|
+ * Protected by the CacheReplicationMonitor lock.
|
|
|
*/
|
|
|
- private boolean isScanning = false;
|
|
|
+ private long curScanCount = -1;
|
|
|
|
|
|
/**
|
|
|
- * The number of rescans completed. Used to wait for scans to finish.
|
|
|
- * Protected by the CacheReplicationMonitor lock.
|
|
|
+ * The number of rescans we need to complete. Protected by the CRM lock.
|
|
|
*/
|
|
|
- private long scanCount = 0;
|
|
|
+ private long neededScanCount = 0;
|
|
|
|
|
|
/**
|
|
|
* True if this monitor should terminate. Protected by the CRM lock.
|
|
@@ -169,7 +169,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|
|
LOG.info("Shutting down CacheReplicationMonitor");
|
|
|
return;
|
|
|
}
|
|
|
- if (needsRescan) {
|
|
|
+ if (completedScanCount < neededScanCount) {
|
|
|
LOG.info("Rescanning because of pending operations");
|
|
|
break;
|
|
|
}
|
|
@@ -182,8 +182,6 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|
|
doRescan.await(delta, TimeUnit.MILLISECONDS);
|
|
|
curTimeMs = Time.monotonicNow();
|
|
|
}
|
|
|
- isScanning = true;
|
|
|
- needsRescan = false;
|
|
|
} finally {
|
|
|
lock.unlock();
|
|
|
}
|
|
@@ -194,8 +192,8 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|
|
// Update synchronization-related variables.
|
|
|
lock.lock();
|
|
|
try {
|
|
|
- isScanning = false;
|
|
|
- scanCount++;
|
|
|
+ completedScanCount = curScanCount;
|
|
|
+ curScanCount = -1;
|
|
|
scanFinished.signalAll();
|
|
|
} finally {
|
|
|
lock.unlock();
|
|
@@ -226,16 +224,15 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|
|
"Must not hold the FSN write lock when waiting for a rescan.");
|
|
|
Preconditions.checkArgument(lock.isHeldByCurrentThread(),
|
|
|
"Must hold the CRM lock when waiting for a rescan.");
|
|
|
- if (!needsRescan) {
|
|
|
+ if (neededScanCount <= completedScanCount) {
|
|
|
return;
|
|
|
}
|
|
|
// If no scan is already ongoing, mark the CRM as dirty and kick
|
|
|
- if (!isScanning) {
|
|
|
+ if (curScanCount < 0) {
|
|
|
doRescan.signal();
|
|
|
}
|
|
|
// Wait until the scan finishes and the count advances
|
|
|
- final long startCount = scanCount;
|
|
|
- while ((!shutdown) && (startCount >= scanCount)) {
|
|
|
+ while ((!shutdown) && (completedScanCount < neededScanCount)) {
|
|
|
try {
|
|
|
scanFinished.await();
|
|
|
} catch (InterruptedException e) {
|
|
@@ -253,7 +250,14 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|
|
public void setNeedsRescan() {
|
|
|
Preconditions.checkArgument(lock.isHeldByCurrentThread(),
|
|
|
"Must hold the CRM lock when setting the needsRescan bit.");
|
|
|
- this.needsRescan = true;
|
|
|
+ if (curScanCount >= 0) {
|
|
|
+ // If there is a scan in progress, we need to wait for the scan after
|
|
|
+ // that.
|
|
|
+ neededScanCount = curScanCount + 1;
|
|
|
+ } else {
|
|
|
+ // If there is no scan in progress, we need to wait for the next scan.
|
|
|
+ neededScanCount = completedScanCount + 1;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -284,10 +288,17 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|
|
scannedBlocks = 0;
|
|
|
namesystem.writeLock();
|
|
|
try {
|
|
|
+ lock.lock();
|
|
|
if (shutdown) {
|
|
|
throw new InterruptedException("CacheReplicationMonitor was " +
|
|
|
"shut down.");
|
|
|
}
|
|
|
+ curScanCount = completedScanCount + 1;
|
|
|
+ }
|
|
|
+ finally {
|
|
|
+ lock.unlock();
|
|
|
+ }
|
|
|
+ try {
|
|
|
resetStatistics();
|
|
|
rescanCacheDirectives();
|
|
|
rescanCachedBlockMap();
|