|
@@ -313,30 +313,32 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|
|
FSDirectory fsDir = namesystem.getFSDirectory();
|
|
|
final long now = new Date().getTime();
|
|
|
for (CacheDirective directive : cacheManager.getCacheDirectives()) {
|
|
|
+ scannedDirectives++;
|
|
|
// Skip processing this entry if it has expired
|
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
- LOG.trace("Directive expiry is at " + directive.getExpiryTime());
|
|
|
- }
|
|
|
if (directive.getExpiryTime() > 0 && directive.getExpiryTime() <= now) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Skipping directive id " + directive.getId()
|
|
|
- + " because it has expired (" + directive.getExpiryTime() + "<="
|
|
|
- + now + ")");
|
|
|
+ LOG.debug("Directive " + directive.getId() + ": the directive " +
|
|
|
+ "expired at " + directive.getExpiryTime() + " (now = " +
|
|
|
+ now + ")");
|
|
|
}
|
|
|
continue;
|
|
|
}
|
|
|
- scannedDirectives++;
|
|
|
String path = directive.getPath();
|
|
|
INode node;
|
|
|
try {
|
|
|
node = fsDir.getINode(path);
|
|
|
} catch (UnresolvedLinkException e) {
|
|
|
// We don't cache through symlinks
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Directive " + directive.getId() +
|
|
|
+ ": got UnresolvedLinkException while resolving path " + path);
|
|
|
+ }
|
|
|
continue;
|
|
|
}
|
|
|
if (node == null) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("No inode found at " + path);
|
|
|
+ LOG.debug("Directive " + directive.getId() +
|
|
|
+ ": No inode found at " + path);
|
|
|
}
|
|
|
} else if (node.isDirectory()) {
|
|
|
INodeDirectory dir = node.asDirectory();
|
|
@@ -351,8 +353,8 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|
|
rescanFile(directive, node.asFile());
|
|
|
} else {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Ignoring non-directory, non-file inode " + node +
|
|
|
- " found at " + path);
|
|
|
+ LOG.debug("Directive " + directive.getId() +
|
|
|
+ ": ignoring non-directive, non-file inode " + node);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -380,8 +382,8 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|
|
CachePool pool = directive.getPool();
|
|
|
if (pool.getBytesNeeded() > pool.getLimit()) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug(String.format("Skipping directive id %d file %s because "
|
|
|
- + "limit of pool %s would be exceeded (%d > %d)",
|
|
|
+ LOG.debug(String.format("Directive %d: not scanning file %s because " +
|
|
|
+ "bytesNeeded for pool %s is %d, but the pool's limit is %d",
|
|
|
directive.getId(),
|
|
|
file.getFullPathName(),
|
|
|
pool.getPoolName(),
|
|
@@ -395,6 +397,11 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|
|
for (BlockInfo blockInfo : blockInfos) {
|
|
|
if (!blockInfo.getBlockUCState().equals(BlockUCState.COMPLETE)) {
|
|
|
// We don't try to cache blocks that are under construction.
|
|
|
+ if (LOG.isTraceEnabled()) {
|
|
|
+ LOG.trace("Directive " + directive.getId() + ": can't cache " +
|
|
|
+ "block " + blockInfo + " because it is in state " +
|
|
|
+ blockInfo.getBlockUCState() + ", not COMPLETE.");
|
|
|
+ }
|
|
|
continue;
|
|
|
}
|
|
|
Block block = new Block(blockInfo.getBlockId());
|
|
@@ -403,6 +410,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|
|
CachedBlock ocblock = cachedBlocks.get(ncblock);
|
|
|
if (ocblock == null) {
|
|
|
cachedBlocks.put(ncblock);
|
|
|
+ ocblock = ncblock;
|
|
|
} else {
|
|
|
// Update bytesUsed using the current replication levels.
|
|
|
// Assumptions: we assume that all the blocks are the same length
|
|
@@ -433,14 +441,18 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|
|
ocblock.setReplicationAndMark(directive.getReplication(), mark);
|
|
|
}
|
|
|
}
|
|
|
+ if (LOG.isTraceEnabled()) {
|
|
|
+ LOG.trace("Directive " + directive.getId() + ": setting replication " +
|
|
|
+ "for block " + blockInfo + " to " + ocblock.getReplication());
|
|
|
+ }
|
|
|
}
|
|
|
// Increment the "cached" statistics
|
|
|
directive.addBytesCached(cachedTotal);
|
|
|
if (cachedTotal == neededTotal) {
|
|
|
directive.addFilesCached(1);
|
|
|
}
|
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
- LOG.trace("Directive " + directive.getId() + " is caching " +
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Directive " + directive.getId() + ": caching " +
|
|
|
file.getFullPathName() + ": " + cachedTotal + "/" + neededTotal +
|
|
|
" bytes");
|
|
|
}
|
|
@@ -500,6 +512,11 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|
|
iter.hasNext(); ) {
|
|
|
DatanodeDescriptor datanode = iter.next();
|
|
|
if (!cblock.isInList(datanode.getCached())) {
|
|
|
+ if (LOG.isTraceEnabled()) {
|
|
|
+ LOG.trace("Block " + cblock.getBlockId() + ": removing from " +
|
|
|
+ "PENDING_UNCACHED for node " + datanode.getDatanodeUuid() +
|
|
|
+ "because the DataNode uncached it.");
|
|
|
+ }
|
|
|
datanode.getPendingUncached().remove(cblock);
|
|
|
iter.remove();
|
|
|
}
|
|
@@ -509,8 +526,9 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|
|
String reason = findReasonForNotCaching(cblock, blockInfo);
|
|
|
int neededCached = 0;
|
|
|
if (reason != null) {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("not caching " + cblock + " because it is " + reason);
|
|
|
+ if (LOG.isTraceEnabled()) {
|
|
|
+ LOG.trace("Block " + cblock.getBlockId() + ": can't cache " +
|
|
|
+ "block because it is " + reason);
|
|
|
}
|
|
|
} else {
|
|
|
neededCached = cblock.getReplication();
|
|
@@ -523,6 +541,12 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|
|
DatanodeDescriptor datanode = iter.next();
|
|
|
datanode.getPendingCached().remove(cblock);
|
|
|
iter.remove();
|
|
|
+ if (LOG.isTraceEnabled()) {
|
|
|
+ LOG.trace("Block " + cblock.getBlockId() + ": removing from " +
|
|
|
+ "PENDING_CACHED for node " + datanode.getDatanodeUuid() +
|
|
|
+ "because we already have " + numCached + " cached " +
|
|
|
+ "replicas and we only need " + neededCached);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
if (numCached < neededCached) {
|
|
@@ -532,6 +556,12 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|
|
DatanodeDescriptor datanode = iter.next();
|
|
|
datanode.getPendingUncached().remove(cblock);
|
|
|
iter.remove();
|
|
|
+ if (LOG.isTraceEnabled()) {
|
|
|
+ LOG.trace("Block " + cblock.getBlockId() + ": removing from " +
|
|
|
+ "PENDING_UNCACHED for node " + datanode.getDatanodeUuid() +
|
|
|
+ "because we only have " + numCached + " cached replicas " +
|
|
|
+ "and we need " + neededCached);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
int neededUncached = numCached -
|
|
@@ -551,6 +581,11 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|
|
pendingUncached.isEmpty() &&
|
|
|
pendingCached.isEmpty()) {
|
|
|
// we have nothing more to do with this block.
|
|
|
+ if (LOG.isTraceEnabled()) {
|
|
|
+ LOG.trace("Block " + cblock.getBlockId() + ": removing from " +
|
|
|
+ "cachedBlocks, since neededCached == 0, and " +
|
|
|
+ "pendingUncached and pendingCached are empty.");
|
|
|
+ }
|
|
|
cbIter.remove();
|
|
|
}
|
|
|
}
|
|
@@ -609,15 +644,16 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|
|
getStoredBlock(new Block(cachedBlock.getBlockId()));
|
|
|
if (blockInfo == null) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Not caching block " + cachedBlock + " because there " +
|
|
|
- "is no record of it on the NameNode.");
|
|
|
+ LOG.debug("Block " + cachedBlock.getBlockId() + ": can't add new " +
|
|
|
+ "cached replicas, because there is no record of this block " +
|
|
|
+ "on the NameNode.");
|
|
|
}
|
|
|
return;
|
|
|
}
|
|
|
if (!blockInfo.isComplete()) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Not caching block " + cachedBlock + " because it " +
|
|
|
- "is not yet complete.");
|
|
|
+ LOG.debug("Block " + cachedBlock.getBlockId() + ": can't cache this " +
|
|
|
+ "block, because it is not yet complete.");
|
|
|
}
|
|
|
return;
|
|
|
}
|
|
@@ -665,10 +701,11 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|
|
}
|
|
|
if (pendingCapacity < blockInfo.getNumBytes()) {
|
|
|
if (LOG.isTraceEnabled()) {
|
|
|
- LOG.trace("Datanode " + datanode + " is not a valid possibility for"
|
|
|
- + " block " + blockInfo.getBlockId() + " of size "
|
|
|
- + blockInfo.getNumBytes() + " bytes, only has "
|
|
|
- + datanode.getCacheRemaining() + " bytes of cache remaining.");
|
|
|
+ LOG.trace("Block " + blockInfo.getBlockId() + ": DataNode " +
|
|
|
+ datanode.getDatanodeUuid() + " is not a valid possibility " +
|
|
|
+ "because the block has size " + blockInfo.getNumBytes() + ", but " +
|
|
|
+ "the DataNode only has " + datanode.getCacheRemaining() + " " +
|
|
|
+ "bytes of cache remaining.");
|
|
|
}
|
|
|
outOfCapacity++;
|
|
|
continue;
|
|
@@ -678,6 +715,10 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|
|
List<DatanodeDescriptor> chosen = chooseDatanodesForCaching(possibilities,
|
|
|
neededCached, blockManager.getDatanodeManager().getStaleInterval());
|
|
|
for (DatanodeDescriptor datanode : chosen) {
|
|
|
+ if (LOG.isTraceEnabled()) {
|
|
|
+ LOG.trace("Block " + blockInfo.getBlockId() + ": added to " +
|
|
|
+ "PENDING_CACHED on DataNode " + datanode.getDatanodeUuid());
|
|
|
+ }
|
|
|
pendingCached.add(datanode);
|
|
|
boolean added = datanode.getPendingCached().add(cachedBlock);
|
|
|
assert added;
|
|
@@ -685,12 +726,10 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|
|
// We were unable to satisfy the requested replication factor
|
|
|
if (neededCached > chosen.size()) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug(
|
|
|
- "Only have " +
|
|
|
+ LOG.debug("Block " + blockInfo.getBlockId() + ": we only have " +
|
|
|
(cachedBlock.getReplication() - neededCached + chosen.size()) +
|
|
|
- " of " + cachedBlock.getReplication() + " cached replicas for " +
|
|
|
- cachedBlock + " (" + outOfCapacity + " nodes have insufficient " +
|
|
|
- "capacity).");
|
|
|
+ " of " + cachedBlock.getReplication() + " cached replicas. " +
|
|
|
+ outOfCapacity + " DataNodes have insufficient cache capacity.");
|
|
|
}
|
|
|
}
|
|
|
}
|