|
@@ -140,6 +140,11 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|
|
*/
|
|
|
private long scannedBlocks;
|
|
|
|
|
|
+ /**
|
|
|
+ * Avoid to hold global lock for long times.
|
|
|
+ */
|
|
|
+ private long lastScanTimeMs;
|
|
|
+
|
|
|
public CacheReplicationMonitor(FSNamesystem namesystem,
|
|
|
CacheManager cacheManager, long intervalMs, ReentrantLock lock) {
|
|
|
this.namesystem = namesystem;
|
|
@@ -284,6 +289,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|
|
private void rescan() throws InterruptedException {
|
|
|
scannedDirectives = 0;
|
|
|
scannedBlocks = 0;
|
|
|
+ lastScanTimeMs = Time.monotonicNow();
|
|
|
try {
|
|
|
namesystem.writeLock();
|
|
|
try {
|
|
@@ -315,6 +321,19 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private void reacquireLock(long last) {
|
|
|
+ long now = Time.monotonicNow();
|
|
|
+ if (now - last > cacheManager.getMaxLockTimeMs()) {
|
|
|
+ try {
|
|
|
+ namesystem.writeUnlock();
|
|
|
+ Thread.sleep(cacheManager.getSleepTimeMs());
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ } finally {
|
|
|
+ namesystem.writeLock();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Scan all CacheDirectives. Use the information to figure out
|
|
|
* what cache replication factor each block should have.
|
|
@@ -447,6 +466,10 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|
|
if (cachedTotal == neededTotal) {
|
|
|
directive.addFilesCached(1);
|
|
|
}
|
|
|
+ if (cacheManager.isCheckLockTimeEnable()) {
|
|
|
+ reacquireLock(lastScanTimeMs);
|
|
|
+ lastScanTimeMs = Time.monotonicNow();
|
|
|
+ }
|
|
|
LOG.debug("Directive {}: caching {}: {}/{} bytes", directive.getId(),
|
|
|
file.getFullPathName(), cachedTotal, neededTotal);
|
|
|
}
|
|
@@ -518,6 +541,10 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ if (cacheManager.isCheckLockTimeEnable()) {
|
|
|
+ reacquireLock(lastScanTimeMs);
|
|
|
+ lastScanTimeMs = Time.monotonicNow();
|
|
|
+ }
|
|
|
for (Iterator<CachedBlock> cbIter = cachedBlocks.iterator();
|
|
|
cbIter.hasNext(); ) {
|
|
|
scannedBlocks++;
|
|
@@ -603,6 +630,10 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|
|
);
|
|
|
cbIter.remove();
|
|
|
}
|
|
|
+ if (cacheManager.isCheckLockTimeEnable()) {
|
|
|
+ reacquireLock(lastScanTimeMs);
|
|
|
+ lastScanTimeMs = Time.monotonicNow();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|