|
@@ -460,14 +460,21 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|
|
directive.getReplication()) * blockInfo.getNumBytes();
|
|
|
cachedTotal += cachedByBlock;
|
|
|
|
|
|
- if (mark != ocblock.getMark()) {
|
|
|
- // Mark hasn't been set in this scan, so update replication and mark.
|
|
|
+ if ((mark != ocblock.getMark()) ||
|
|
|
+ (ocblock.getReplication() < directive.getReplication())) {
|
|
|
+ //
|
|
|
+ // Overwrite the block's replication and mark in two cases:
|
|
|
+ //
|
|
|
+ // 1. If the mark on the CachedBlock is different from the mark for
|
|
|
+ // this scan, that means the block hasn't been updated during this
|
|
|
+ // scan, and we should overwrite whatever is there, since it is no
|
|
|
+ // longer valid.
|
|
|
+ //
|
|
|
+ // 2. If the replication in the CachedBlock is less than what the
|
|
|
+ // directive asks for, we want to increase the block's replication
|
|
|
+ // field to what the directive asks for.
|
|
|
+ //
|
|
|
ocblock.setReplicationAndMark(directive.getReplication(), mark);
|
|
|
- } else {
|
|
|
- // Mark already set in this scan. Set replication to highest value in
|
|
|
- // any CacheDirective that covers this file.
|
|
|
- ocblock.setReplicationAndMark((short)Math.max(
|
|
|
- directive.getReplication(), ocblock.getReplication()), mark);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -483,6 +490,36 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private String findReasonForNotCaching(CachedBlock cblock,
|
|
|
+ BlockInfo blockInfo) {
|
|
|
+ if (blockInfo == null) {
|
|
|
+ // Somehow, a cache report with the block arrived, but the block
|
|
|
+ // reports from the DataNode haven't (yet?) described such a block.
|
|
|
+ // Alternately, the NameNode might have invalidated the block, but the
|
|
|
+ // DataNode hasn't caught up. In any case, we want to tell the DN
|
|
|
+ // to uncache this.
|
|
|
+ return "not tracked by the BlockManager";
|
|
|
+ } else if (!blockInfo.isComplete()) {
|
|
|
+ // When a cached block changes state from complete to some other state
|
|
|
+ // on the DataNode (perhaps because of append), it will begin the
|
|
|
+ // uncaching process. However, the uncaching process is not
|
|
|
+ // instantaneous, especially if clients have pinned the block. So
|
|
|
+ // there may be a period of time when incomplete blocks remain cached
|
|
|
+ // on the DataNodes.
|
|
|
+ return "not complete";
|
|
|
+ } else if (cblock.getReplication() == 0) {
|
|
|
+ // Since 0 is not a valid value for a cache directive's replication
|
|
|
+ // field, seeing a replication of 0 on a CacheBlock means that it
|
|
|
+ // has never been reached by any sweep.
|
|
|
+ return "not needed by any directives";
|
|
|
+ } else if (cblock.getMark() != mark) {
|
|
|
+ // Although the block was needed in the past, we didn't reach it during
|
|
|
+ // the current sweep. Therefore, it doesn't need to be cached any more.
|
|
|
+ return "no longer needed by any directives";
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Scan through the cached block map.
|
|
|
* Any blocks which are under-replicated should be assigned new Datanodes.
|
|
@@ -508,11 +545,17 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|
|
iter.remove();
|
|
|
}
|
|
|
}
|
|
|
- // If the block's mark doesn't match with the mark of this scan, that
|
|
|
- // means that this block couldn't be reached during this scan. That means
|
|
|
- // it doesn't need to be cached any more.
|
|
|
- int neededCached = (cblock.getMark() != mark) ?
|
|
|
- 0 : cblock.getReplication();
|
|
|
+ BlockInfo blockInfo = blockManager.
|
|
|
+ getStoredBlock(new Block(cblock.getBlockId()));
|
|
|
+ String reason = findReasonForNotCaching(cblock, blockInfo);
|
|
|
+ int neededCached = 0;
|
|
|
+ if (reason != null) {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("not caching " + cblock + " because it is " + reason);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ neededCached = cblock.getReplication();
|
|
|
+ }
|
|
|
int numCached = cached.size();
|
|
|
if (numCached >= neededCached) {
|
|
|
// If we have enough replicas, drop all pending cached.
|
|
@@ -612,8 +655,10 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|
|
BlockInfo blockInfo = blockManager.
|
|
|
getStoredBlock(new Block(cachedBlock.getBlockId()));
|
|
|
if (blockInfo == null) {
|
|
|
- LOG.debug("Not caching block " + cachedBlock + " because it " +
|
|
|
- "was deleted from all DataNodes.");
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Not caching block " + cachedBlock + " because there " +
|
|
|
+ "is no record of it on the NameNode.");
|
|
|
+ }
|
|
|
return;
|
|
|
}
|
|
|
if (!blockInfo.isComplete()) {
|