|
@@ -18,6 +18,11 @@
|
|
|
|
|
|
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
|
|
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS;
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS_DEFAULT;
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS;
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT;
|
|
|
+
|
|
|
import com.google.common.base.Preconditions;
|
|
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
|
|
|
@@ -33,10 +38,12 @@ import java.util.concurrent.Executor;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
import java.util.concurrent.ThreadFactory;
|
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
|
+import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
|
import org.apache.commons.io.IOUtils;
|
|
|
+import org.apache.commons.lang.time.DurationFormatUtils;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
|
import org.apache.hadoop.fs.ChecksumException;
|
|
@@ -45,6 +52,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
|
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|
|
import org.apache.hadoop.io.nativeio.NativeIO;
|
|
|
+import org.apache.hadoop.util.Time;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
@@ -116,6 +124,12 @@ public class FsDatasetCache {
|
|
|
|
|
|
private final ThreadPoolExecutor uncachingExecutor;
|
|
|
|
|
|
+ private final ScheduledThreadPoolExecutor deferredUncachingExecutor;
|
|
|
+
|
|
|
+ private final long revocationMs;
|
|
|
+
|
|
|
+ private final long revocationPollingMs;
|
|
|
+
|
|
|
/**
|
|
|
* The approximate amount of cache space in use.
|
|
|
*
|
|
@@ -217,6 +231,24 @@ public class FsDatasetCache {
|
|
|
new LinkedBlockingQueue<Runnable>(),
|
|
|
workerFactory);
|
|
|
this.uncachingExecutor.allowCoreThreadTimeOut(true);
|
|
|
+ this.deferredUncachingExecutor = new ScheduledThreadPoolExecutor(
|
|
|
+ 1, workerFactory);
|
|
|
+ this.revocationMs = dataset.datanode.getConf().getLong(
|
|
|
+ DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS,
|
|
|
+ DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS_DEFAULT);
|
|
|
+ long confRevocationPollingMs = dataset.datanode.getConf().getLong(
|
|
|
+ DFS_DATANODE_CACHE_REVOCATION_POLLING_MS,
|
|
|
+ DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT);
|
|
|
+ long minRevocationPollingMs = revocationMs / 2;
|
|
|
+ if (minRevocationPollingMs < confRevocationPollingMs) {
|
|
|
+ throw new RuntimeException("configured value " +
|
|
|
+          confRevocationPollingMs + " for " +
|
|
|
+ DFS_DATANODE_CACHE_REVOCATION_POLLING_MS +
|
|
|
+ " is too high. It must not be more than half of the " +
|
|
|
+ "value of " + DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS +
|
|
|
+ ". Reconfigure this to " + minRevocationPollingMs);
|
|
|
+ }
|
|
|
+ this.revocationPollingMs = confRevocationPollingMs;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -262,13 +294,11 @@ public class FsDatasetCache {
|
|
|
synchronized void uncacheBlock(String bpid, long blockId) {
|
|
|
ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
|
|
|
Value prevValue = mappableBlockMap.get(key);
|
|
|
+ boolean deferred = false;
|
|
|
|
|
|
if (!dataset.datanode.getShortCircuitRegistry().
|
|
|
processBlockMunlockRequest(key)) {
|
|
|
- // TODO: we probably want to forcibly uncache the block (and close the
|
|
|
- // shm) after a certain timeout has elapsed.
|
|
|
- LOG.debug("{} is anchored, and can't be uncached now.", key);
|
|
|
- return;
|
|
|
+ deferred = true;
|
|
|
}
|
|
|
if (prevValue == null) {
|
|
|
LOG.debug("Block with id {}, pool {} does not need to be uncached, "
|
|
@@ -285,12 +315,19 @@ public class FsDatasetCache {
|
|
|
new Value(prevValue.mappableBlock, State.CACHING_CANCELLED));
|
|
|
break;
|
|
|
case CACHED:
|
|
|
- LOG.debug(
|
|
|
- "Block with id {}, pool {} has been scheduled for uncaching" + ".",
|
|
|
- blockId, bpid);
|
|
|
mappableBlockMap.put(key,
|
|
|
new Value(prevValue.mappableBlock, State.UNCACHING));
|
|
|
- uncachingExecutor.execute(new UncachingTask(key));
|
|
|
+ if (deferred) {
|
|
|
+ LOG.debug("{} is anchored, and can't be uncached now. Scheduling it " +
|
|
|
+ "for uncaching in {} ",
|
|
|
+ key, DurationFormatUtils.formatDurationHMS(revocationPollingMs));
|
|
|
+ deferredUncachingExecutor.schedule(
|
|
|
+ new UncachingTask(key, revocationMs),
|
|
|
+ revocationPollingMs, TimeUnit.MILLISECONDS);
|
|
|
+ } else {
|
|
|
+ LOG.debug("{} has been scheduled for immediate uncaching.", key);
|
|
|
+ uncachingExecutor.execute(new UncachingTask(key, 0));
|
|
|
+ }
|
|
|
break;
|
|
|
default:
|
|
|
LOG.debug("Block with id {}, pool {} does not need to be uncached, "
|
|
@@ -403,22 +440,62 @@ public class FsDatasetCache {
|
|
|
|
|
|
private class UncachingTask implements Runnable {
|
|
|
private final ExtendedBlockId key;
|
|
|
+ private final long revocationTimeMs;
|
|
|
|
|
|
- UncachingTask(ExtendedBlockId key) {
|
|
|
+ UncachingTask(ExtendedBlockId key, long revocationDelayMs) {
|
|
|
this.key = key;
|
|
|
+ if (revocationDelayMs == 0) {
|
|
|
+ this.revocationTimeMs = 0;
|
|
|
+ } else {
|
|
|
+ this.revocationTimeMs = revocationDelayMs + Time.monotonicNow();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean shouldDefer() {
|
|
|
+ /* If revocationTimeMs == 0, this is an immediate uncache request.
|
|
|
+ * No clients were anchored at the time we made the request. */
|
|
|
+ if (revocationTimeMs == 0) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ /* Let's check if any clients still have this block anchored. */
|
|
|
+ boolean anchored =
|
|
|
+ !dataset.datanode.getShortCircuitRegistry().
|
|
|
+ processBlockMunlockRequest(key);
|
|
|
+ if (!anchored) {
|
|
|
+ LOG.debug("Uncaching {} now that it is no longer in use " +
|
|
|
+ "by any clients.", key);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ long delta = revocationTimeMs - Time.monotonicNow();
|
|
|
+ if (delta < 0) {
|
|
|
+ LOG.warn("Forcibly uncaching {} after {} " +
|
|
|
+ "because client(s) {} refused to stop using it.", key,
|
|
|
+          DurationFormatUtils.formatDurationHMS(revocationMs),
|
|
|
+ dataset.datanode.getShortCircuitRegistry().getClientNames(key));
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ LOG.info("Replica {} still can't be uncached because some " +
|
|
|
+ "clients continue to use it. Will wait for {}", key,
|
|
|
+ DurationFormatUtils.formatDurationHMS(delta));
|
|
|
+ return true;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void run() {
|
|
|
Value value;
|
|
|
-
|
|
|
+
|
|
|
+ if (shouldDefer()) {
|
|
|
+ deferredUncachingExecutor.schedule(
|
|
|
+ this, revocationPollingMs, TimeUnit.MILLISECONDS);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
synchronized (FsDatasetCache.this) {
|
|
|
value = mappableBlockMap.get(key);
|
|
|
}
|
|
|
Preconditions.checkNotNull(value);
|
|
|
Preconditions.checkArgument(value.state == State.UNCACHING);
|
|
|
- // TODO: we will eventually need to do revocation here if any clients
|
|
|
- // are reading via mmap with checksums enabled. See HDFS-5182.
|
|
|
+
|
|
|
IOUtils.closeQuietly(value.mappableBlock);
|
|
|
synchronized (FsDatasetCache.this) {
|
|
|
mappableBlockMap.remove(key);
|
|
@@ -427,7 +504,13 @@ public class FsDatasetCache {
|
|
|
usedBytesCount.release(value.mappableBlock.getLength());
|
|
|
numBlocksCached.addAndGet(-1);
|
|
|
dataset.datanode.getMetrics().incrBlocksUncached(1);
|
|
|
- LOG.debug("Uncaching of {} completed. usedBytes = {}", key, newUsedBytes);
|
|
|
+      if (revocationTimeMs == 0) {
|
|
|
+ LOG.debug("Uncaching of {} completed. usedBytes = {}",
|
|
|
+ key, newUsedBytes);
|
|
|
+ } else {
|
|
|
+ LOG.debug("Deferred uncaching of {} completed. usedBytes = {}",
|
|
|
+ key, newUsedBytes);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|