|
@@ -42,7 +42,7 @@ import java.util.concurrent.ThreadFactory;
|
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
-import java.util.concurrent.atomic.AtomicLong;
|
|
|
+import java.util.concurrent.atomic.LongAdder;
|
|
|
|
|
|
import org.apache.commons.io.IOUtils;
|
|
|
import org.apache.commons.lang3.time.DurationFormatUtils;
|
|
@@ -120,7 +120,7 @@ public class FsDatasetCache {
|
|
|
private final HashMap<ExtendedBlockId, Value> mappableBlockMap =
|
|
|
new HashMap<ExtendedBlockId, Value>();
|
|
|
|
|
|
- private final AtomicLong numBlocksCached = new AtomicLong(0);
|
|
|
+ private final LongAdder numBlocksCached = new LongAdder();
|
|
|
|
|
|
private final FsDatasetImpl dataset;
|
|
|
|
|
@@ -143,11 +143,11 @@ public class FsDatasetCache {
|
|
|
/**
|
|
|
* Number of cache commands that could not be completed successfully
|
|
|
*/
|
|
|
- final AtomicLong numBlocksFailedToCache = new AtomicLong(0);
|
|
|
+ final LongAdder numBlocksFailedToCache = new LongAdder();
|
|
|
/**
|
|
|
* Number of uncache commands that could not be completed successfully
|
|
|
*/
|
|
|
- final AtomicLong numBlocksFailedToUncache = new AtomicLong(0);
|
|
|
+ final LongAdder numBlocksFailedToUncache = new LongAdder();
|
|
|
|
|
|
public FsDatasetCache(FsDatasetImpl dataset) throws IOException {
|
|
|
this.dataset = dataset;
|
|
@@ -204,7 +204,7 @@ public class FsDatasetCache {
|
|
|
for (Map.Entry<ExtendedBlockId, MappableBlock> entry : entrySet) {
|
|
|
mappableBlockMap.put(entry.getKey(),
|
|
|
new Value(keyToMappableBlock.get(entry.getKey()), State.CACHED));
|
|
|
- numBlocksCached.addAndGet(1);
|
|
|
+ numBlocksCached.increment();
|
|
|
dataset.datanode.getMetrics().incrBlocksCached(1);
|
|
|
}
|
|
|
}
|
|
@@ -278,7 +278,7 @@ public class FsDatasetCache {
|
|
|
LOG.debug("Block with id {}, pool {} already exists in the "
|
|
|
+ "FsDatasetCache with state {}", blockId, bpid, prevValue.state
|
|
|
);
|
|
|
- numBlocksFailedToCache.incrementAndGet();
|
|
|
+ numBlocksFailedToCache.increment();
|
|
|
return;
|
|
|
}
|
|
|
mappableBlockMap.put(key, new Value(null, State.CACHING));
|
|
@@ -301,7 +301,7 @@ public class FsDatasetCache {
|
|
|
LOG.debug("Block with id {}, pool {} does not need to be uncached, "
|
|
|
+ "because it is not currently in the mappableBlockMap.", blockId,
|
|
|
bpid);
|
|
|
- numBlocksFailedToUncache.incrementAndGet();
|
|
|
+ numBlocksFailedToUncache.increment();
|
|
|
return;
|
|
|
}
|
|
|
switch (prevValue.state) {
|
|
@@ -331,7 +331,7 @@ public class FsDatasetCache {
|
|
|
default:
|
|
|
LOG.debug("Block with id {}, pool {} does not need to be uncached, "
|
|
|
+ "because it is in state {}.", blockId, bpid, prevValue.state);
|
|
|
- numBlocksFailedToUncache.incrementAndGet();
|
|
|
+ numBlocksFailedToUncache.increment();
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
@@ -469,7 +469,7 @@ public class FsDatasetCache {
|
|
|
dataset.datanode.
|
|
|
getShortCircuitRegistry().processBlockMlockEvent(key);
|
|
|
}
|
|
|
- numBlocksCached.addAndGet(1);
|
|
|
+ numBlocksCached.increment();
|
|
|
dataset.datanode.getMetrics().incrBlocksCached(1);
|
|
|
success = true;
|
|
|
} finally {
|
|
@@ -482,7 +482,7 @@ public class FsDatasetCache {
|
|
|
LOG.debug("Caching of {} was aborted. We are now caching only {} "
|
|
|
+ "bytes in total.", key, cacheLoader.getCacheUsed());
|
|
|
IOUtils.closeQuietly(mappableBlock);
|
|
|
- numBlocksFailedToCache.incrementAndGet();
|
|
|
+ numBlocksFailedToCache.increment();
|
|
|
|
|
|
synchronized (FsDatasetCache.this) {
|
|
|
mappableBlockMap.remove(key);
|
|
@@ -561,7 +561,7 @@ public class FsDatasetCache {
|
|
|
}
|
|
|
long newUsedBytes = cacheLoader.
|
|
|
release(key, value.mappableBlock.getLength());
|
|
|
- numBlocksCached.addAndGet(-1);
|
|
|
+ numBlocksCached.decrement();
|
|
|
dataset.datanode.getMetrics().incrBlocksUncached(1);
|
|
|
if (revocationTimeMs != 0) {
|
|
|
LOG.debug("Uncaching of {} completed. usedBytes = {}",
|
|
@@ -607,15 +607,15 @@ public class FsDatasetCache {
|
|
|
}
|
|
|
|
|
|
public long getNumBlocksFailedToCache() {
|
|
|
- return numBlocksFailedToCache.get();
|
|
|
+ return numBlocksFailedToCache.longValue();
|
|
|
}
|
|
|
|
|
|
public long getNumBlocksFailedToUncache() {
|
|
|
- return numBlocksFailedToUncache.get();
|
|
|
+ return numBlocksFailedToUncache.longValue();
|
|
|
}
|
|
|
|
|
|
public long getNumBlocksCached() {
|
|
|
- return numBlocksCached.get();
|
|
|
+ return numBlocksCached.longValue();
|
|
|
}
|
|
|
|
|
|
public synchronized boolean isCached(String bpid, long blockId) {
|