@@ -21,12 +21,14 @@ import static org.apache.hadoop.util.ExitUtil.terminate;

 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
+import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
@@ -76,7 +78,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
   /**
    * Pseudorandom number source
    */
-  private final Random random = new Random();
+  private static final Random random = new Random();

   /**
    * The interval at which we scan the namesystem for caching changes.
@@ -87,17 +89,17 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
    * The CacheReplicationMonitor (CRM) lock. Used to synchronize starting and
    * waiting for rescan operations.
    */
-  private final ReentrantLock lock = new ReentrantLock();
+  private final ReentrantLock lock;

   /**
    * Notifies the scan thread that an immediate rescan is needed.
    */
-  private final Condition doRescan = lock.newCondition();
+  private final Condition doRescan;

   /**
    * Notifies waiting threads that a rescan has finished.
    */
-  private final Condition scanFinished = lock.newCondition();
+  private final Condition scanFinished;

   /**
    * Whether there are pending CacheManager operations that necessitate a
@@ -121,11 +123,6 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
    */
   private boolean shutdown = false;

-  /**
-   * The monotonic time at which the current scan started.
-   */
-  private long startTimeMs;
-
   /**
    * Mark status of the current scan.
    */
@@ -142,24 +139,27 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
   private long scannedBlocks;

   public CacheReplicationMonitor(FSNamesystem namesystem,
-      CacheManager cacheManager, long intervalMs) {
+      CacheManager cacheManager, long intervalMs, ReentrantLock lock) {
     this.namesystem = namesystem;
     this.blockManager = namesystem.getBlockManager();
     this.cacheManager = cacheManager;
     this.cachedBlocks = cacheManager.getCachedBlocks();
     this.intervalMs = intervalMs;
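+    // The lock is supplied by the caller rather than created here; both
+    // rescan conditions must be obtained from that same shared lock.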
+    this.lock = lock;
+    this.doRescan = this.lock.newCondition();
+    this.scanFinished = this.lock.newCondition();
   }

   @Override
   public void run() {
-    startTimeMs = 0;
+    long startTimeMs = 0;
+    Thread.currentThread().setName("CacheReplicationMonitor(" +
+        System.identityHashCode(this) + ")");
     LOG.info("Starting CacheReplicationMonitor with interval " +
         intervalMs + " milliseconds");
     try {
       long curTimeMs = Time.monotonicNow();
       while (true) {
-        // Not all of the variables accessed here need the CRM lock, but take
-        // it anyway for simplicity
         lock.lock();
         try {
           while (true) {
@@ -180,12 +180,6 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
             doRescan.await(delta, TimeUnit.MILLISECONDS);
             curTimeMs = Time.monotonicNow();
           }
-        } finally {
-          lock.unlock();
-        }
-        // Mark scan as started, clear needsRescan
-        lock.lock();
-        try {
           isScanning = true;
           needsRescan = false;
         } finally {
@@ -195,7 +189,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
         mark = !mark;
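+        // 'mark' now identifies the current scan: every CachedBlock reached
+        // during this scan is stamped with it, and blocks still carrying the
+        // old mark afterwards were not needed by any directive.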
         rescan();
         curTimeMs = Time.monotonicNow();
-        // Retake the CRM lock to update synchronization-related variables
+        // Update synchronization-related variables.
         lock.lock();
         try {
           isScanning = false;
@@ -208,32 +202,15 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
             scannedBlocks + " block(s) in " + (curTimeMs - startTimeMs) + " " +
             "millisecond(s).");
       }
+    } catch (InterruptedException e) {
+      LOG.info("Shutting down CacheReplicationMonitor.");
+      return;
     } catch (Throwable t) {
       LOG.fatal("Thread exiting", t);
       terminate(1, t);
     }
   }

-  /**
-   * Similar to {@link CacheReplicationMonitor#waitForRescan()}, except it only
-   * waits if there are pending operations that necessitate a rescan as
-   * indicated by {@link #setNeedsRescan()}.
-   * <p>
-   * Note that this call may release the FSN lock, so operations before and
-   * after are not necessarily atomic.
-   */
-  public void waitForRescanIfNeeded() {
-    lock.lock();
-    try {
-      if (!needsRescan) {
-        return;
-      }
-    } finally {
-      lock.unlock();
-    }
-    waitForRescan();
-  }
-
   /**
    * Waits for a rescan to complete. This doesn't guarantee consistency with
    * pending operations, only relative recency, since it will not force a new
@@ -242,49 +219,27 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
    * Note that this call will release the FSN lock, so operations before and
    * after are not atomic.
    */
-  public void waitForRescan() {
-    // Drop the FSN lock temporarily and retake it after we finish waiting
-    // Need to handle both the read lock and the write lock
-    boolean retakeWriteLock = false;
-    if (namesystem.hasWriteLock()) {
-      namesystem.writeUnlock();
-      retakeWriteLock = true;
-    } else if (namesystem.hasReadLock()) {
-      namesystem.readUnlock();
-    } else {
-      // Expected to have at least one of the locks
-      Preconditions.checkState(false,
-          "Need to be holding either the read or write lock");
+  public void waitForRescanIfNeeded() {
+    Preconditions.checkArgument(!namesystem.hasWriteLock(),
+        "Must not hold the FSN write lock when waiting for a rescan.");
+    Preconditions.checkArgument(lock.isHeldByCurrentThread(),
+        "Must hold the CRM lock when waiting for a rescan.");
+    if (!needsRescan) {
+      return;
     }
-    // try/finally for retaking FSN lock
-    try {
-      lock.lock();
-      // try/finally for releasing CRM lock
+    // If no scan is already ongoing, mark the CRM as dirty and kick
+    if (!isScanning) {
+      doRescan.signal();
+    }
+    // Wait until the scan finishes and the count advances
+    final long startCount = scanCount;
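+    // The predicate is re-checked in a loop because Condition.await() can
+    // wake up spuriously; scanCount only advances once a full rescan ends.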
+    while ((!shutdown) && (startCount >= scanCount)) {
       try {
-        // If no scan is already ongoing, mark the CRM as dirty and kick
-        if (!isScanning) {
-          needsRescan = true;
-          doRescan.signal();
-        }
-        // Wait until the scan finishes and the count advances
-        final long startCount = scanCount;
-        while (startCount >= scanCount) {
-          try {
-            scanFinished.await();
-          } catch (InterruptedException e) {
-            LOG.warn("Interrupted while waiting for CacheReplicationMonitor"
-                + " rescan", e);
-            break;
-          }
-        }
-      } finally {
-        lock.unlock();
-      }
-    } finally {
-      if (retakeWriteLock) {
-        namesystem.writeLock();
-      } else {
-        namesystem.readLock();
+        scanFinished.await();
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while waiting for CacheReplicationMonitor"
+            + " rescan", e);
+        break;
       }
     }
   }
@@ -294,42 +249,43 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
    * changes that require a rescan.
    */
   public void setNeedsRescan() {
-    lock.lock();
-    try {
-      this.needsRescan = true;
-    } finally {
-      lock.unlock();
-    }
+    Preconditions.checkArgument(lock.isHeldByCurrentThread(),
+        "Must hold the CRM lock when setting the needsRescan bit.");
+    this.needsRescan = true;
   }

   /**
-   * Shut down and join the monitor thread.
+   * Shut down the monitor thread.
    */
   @Override
   public void close() throws IOException {
+    Preconditions.checkArgument(namesystem.hasWriteLock());
     lock.lock();
     try {
       if (shutdown) return;
+      // Since we hold both the FSN write lock and the CRM lock here,
+      // we know that the CRM thread cannot be currently modifying
+      // the cache manager state while we're closing it.
+      // Since the CRM thread checks the value of 'shutdown' after waiting
+      // for a lock, we know that the thread will not modify the cache
+      // manager state after this point.
       shutdown = true;
       doRescan.signalAll();
       scanFinished.signalAll();
     } finally {
       lock.unlock();
     }
-    try {
-      if (this.isAlive()) {
-        this.join(60000);
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-    }
   }

-  private void rescan() {
+  private void rescan() throws InterruptedException {
     scannedDirectives = 0;
     scannedBlocks = 0;
     namesystem.writeLock();
     try {
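+      // close() sets 'shutdown' while holding both the FSN write lock and
+      // the CRM lock, so re-checking it here under the FSN write lock
+      // guarantees that no rescan runs after shutdown has begun.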
+      if (shutdown) {
+        throw new InterruptedException("CacheReplicationMonitor was " +
+            "shut down.");
+      }
       resetStatistics();
       rescanCacheDirectives();
       rescanCachedBlockMap();
@@ -356,8 +312,6 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
     FSDirectory fsDir = namesystem.getFSDirectory();
     final long now = new Date().getTime();
     for (CacheDirective directive : cacheManager.getCacheDirectives()) {
-      // Reset the directive's statistics
-      directive.resetStatistics();
       // Skip processing this entry if it has expired
       if (LOG.isTraceEnabled()) {
         LOG.trace("Directive expiry is at " + directive.getExpiryTime());
@@ -460,14 +414,21 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
           directive.getReplication()) * blockInfo.getNumBytes();
       cachedTotal += cachedByBlock;

-      if (mark != ocblock.getMark()) {
-        // Mark hasn't been set in this scan, so update replication and mark.
+      if ((mark != ocblock.getMark()) ||
+          (ocblock.getReplication() < directive.getReplication())) {
+        //
+        // Overwrite the block's replication and mark in two cases:
+        //
+        // 1. If the mark on the CachedBlock is different from the mark for
+        //    this scan, that means the block hasn't been updated during this
+        //    scan, and we should overwrite whatever is there, since it is no
+        //    longer valid.
+        //
+        // 2. If the replication in the CachedBlock is less than what the
+        //    directive asks for, we want to increase the block's replication
+        //    field to what the directive asks for.
+        //
         ocblock.setReplicationAndMark(directive.getReplication(), mark);
-      } else {
-        // Mark already set in this scan. Set replication to highest value in
-        // any CacheDirective that covers this file.
-        ocblock.setReplicationAndMark((short)Math.max(
-            directive.getReplication(), ocblock.getReplication()), mark);
       }
     }
   }
@@ -483,6 +444,39 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
     }
   }

+  private String findReasonForNotCaching(CachedBlock cblock,
+      BlockInfo blockInfo) {
+    if (blockInfo == null) {
+      // Somehow, a cache report with the block arrived, but the block
+      // reports from the DataNode haven't (yet?) described such a block.
+      // Alternately, the NameNode might have invalidated the block, but the
+      // DataNode hasn't caught up.  In any case, we want to tell the DN
+      // to uncache this.
+      return "not tracked by the BlockManager";
+    } else if (!blockInfo.isComplete()) {
+      // When a cached block changes state from complete to some other state
+      // on the DataNode (perhaps because of append), it will begin the
+      // uncaching process.  However, the uncaching process is not
+      // instantaneous, especially if clients have pinned the block.  So
+      // there may be a period of time when incomplete blocks remain cached
+      // on the DataNodes.
+      return "not complete";
+    } else if (cblock.getReplication() == 0) {
+      // Since 0 is not a valid value for a cache directive's replication
+      // field, seeing a replication of 0 on a CachedBlock means that it
+      // has never been reached by any sweep.
+      return "not needed by any directives";
+    } else if (cblock.getMark() != mark) {
+      // Although the block was needed in the past, we didn't reach it during
+      // the current sweep.  Therefore, it doesn't need to be cached any more.
+      // Need to set the replication to 0 so it doesn't flip back to cached
+      // when the mark flips on the next scan.
+      cblock.setReplicationAndMark((short)0, mark);
+      return "no longer needed by any directives";
+    }
+    return null;
+  }
+
   /**
    * Scan through the cached block map.
    * Any blocks which are under-replicated should be assigned new Datanodes.
@@ -508,11 +502,17 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
           iter.remove();
         }
       }
-      // If the block's mark doesn't match with the mark of this scan, that
-      // means that this block couldn't be reached during this scan. That means
-      // it doesn't need to be cached any more.
-      int neededCached = (cblock.getMark() != mark) ?
-          0 : cblock.getReplication();
+      BlockInfo blockInfo = blockManager.
+          getStoredBlock(new Block(cblock.getBlockId()));
+      String reason = findReasonForNotCaching(cblock, blockInfo);
+      int neededCached = 0;
+      if (reason != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("not caching " + cblock + " because it is " + reason);
+        }
+      } else {
+        neededCached = cblock.getReplication();
+      }
       int numCached = cached.size();
       if (numCached >= neededCached) {
         // If we have enough replicas, drop all pending cached.
@@ -566,9 +566,6 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
   private void addNewPendingUncached(int neededUncached,
       CachedBlock cachedBlock, List<DatanodeDescriptor> cached,
       List<DatanodeDescriptor> pendingUncached) {
-    if (!cacheManager.isActive()) {
-      return;
-    }
     // Figure out which replicas can be uncached.
     LinkedList<DatanodeDescriptor> possibilities =
         new LinkedList<DatanodeDescriptor>();
@@ -601,19 +598,18 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
    * @param pendingCached A list of DataNodes that will soon cache the
    *                      block.
    */
-  private void addNewPendingCached(int neededCached,
+  private void addNewPendingCached(final int neededCached,
       CachedBlock cachedBlock, List<DatanodeDescriptor> cached,
       List<DatanodeDescriptor> pendingCached) {
-    if (!cacheManager.isActive()) {
-      return;
-    }
     // To figure out which replicas can be cached, we consult the
     // blocksMap.  We don't want to try to cache a corrupt replica, though.
     BlockInfo blockInfo = blockManager.
         getStoredBlock(new Block(cachedBlock.getBlockId()));
     if (blockInfo == null) {
-      LOG.debug("Not caching block " + cachedBlock + " because it " +
-          "was deleted from all DataNodes.");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not caching block " + cachedBlock + " because there " +
+            "is no record of it on the NameNode.");
+      }
       return;
     }
     if (!blockInfo.isComplete()) {
@@ -623,35 +619,156 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
       }
       return;
     }
-    List<DatanodeDescriptor> possibilities = new LinkedList<DatanodeDescriptor>();
+    // Filter the list of replicas to only the valid targets
+    List<DatanodeDescriptor> possibilities =
+        new LinkedList<DatanodeDescriptor>();
     int numReplicas = blockInfo.getCapacity();
     Collection<DatanodeDescriptor> corrupt =
         blockManager.getCorruptReplicas(blockInfo);
+    int outOfCapacity = 0;
    for (int i = 0; i < numReplicas; i++) {
       DatanodeDescriptor datanode = blockInfo.getDatanode(i);
-      if ((datanode != null) &&
-          ((!pendingCached.contains(datanode)) &&
-          ((corrupt == null) || (!corrupt.contains(datanode))))) {
-        possibilities.add(datanode);
+      if (datanode == null) {
+        continue;
       }
-    }
-    while (neededCached > 0) {
-      if (possibilities.isEmpty()) {
-        LOG.warn("We need " + neededCached + " more replica(s) than " +
-            "actually exist to provide a cache replication of " +
-            cachedBlock.getReplication() + " for " + cachedBlock);
-        return;
+      if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
+        continue;
       }
-      DatanodeDescriptor datanode =
-          possibilities.remove(random.nextInt(possibilities.size()));
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("AddNewPendingCached: datanode " + datanode +
-            " will now cache block " + cachedBlock);
+      if (corrupt != null && corrupt.contains(datanode)) {
+        continue;
+      }
+      if (pendingCached.contains(datanode) || cached.contains(datanode)) {
+        continue;
+      }
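+      // Compute an effective capacity: the remaining cache space reported
+      // by the DataNode, minus bytes it has been told to cache but has not
+      // yet cached, plus bytes it has been told to uncache.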
+      long pendingCapacity = datanode.getCacheRemaining();
+      // Subtract pending cached blocks from effective capacity
+      Iterator<CachedBlock> it = datanode.getPendingCached().iterator();
+      while (it.hasNext()) {
+        CachedBlock cBlock = it.next();
+        BlockInfo info =
+            blockManager.getStoredBlock(new Block(cBlock.getBlockId()));
+        if (info != null) {
+          pendingCapacity -= info.getNumBytes();
+        }
+      }
+      it = datanode.getPendingUncached().iterator();
+      // Add pending uncached blocks back to effective capacity
+      while (it.hasNext()) {
+        CachedBlock cBlock = it.next();
+        BlockInfo info =
+            blockManager.getStoredBlock(new Block(cBlock.getBlockId()));
+        if (info != null) {
+          pendingCapacity += info.getNumBytes();
+        }
       }
+      if (pendingCapacity < blockInfo.getNumBytes()) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Datanode " + datanode + " is not a valid possibility for"
+              + " block " + blockInfo.getBlockId() + " of size "
+              + blockInfo.getNumBytes() + " bytes, only has "
+              + datanode.getCacheRemaining() + " bytes of cache remaining.");
+        }
+        outOfCapacity++;
+        continue;
+      }
+      possibilities.add(datanode);
+    }
+    List<DatanodeDescriptor> chosen = chooseDatanodesForCaching(possibilities,
+        neededCached, blockManager.getDatanodeManager().getStaleInterval());
+    for (DatanodeDescriptor datanode : chosen) {
       pendingCached.add(datanode);
       boolean added = datanode.getPendingCached().add(cachedBlock);
       assert added;
-      neededCached--;
     }
+    // We were unable to satisfy the requested replication factor
+    if (neededCached > chosen.size()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "Only have " +
+            (cachedBlock.getReplication() - neededCached + chosen.size()) +
+            " of " + cachedBlock.getReplication() + " cached replicas for " +
+            cachedBlock + " (" + outOfCapacity + " nodes have insufficient " +
+            "capacity).");
+      }
+    }
+  }
+
+  /**
+   * Chooses datanode locations for caching from a list of valid possibilities.
+   * Non-stale nodes are chosen before stale nodes.
+   *
+   * @param possibilities List of candidate datanodes
+   * @param neededCached Number of replicas needed
+   * @param staleInterval Age of a stale datanode
+   * @return A list of chosen datanodes
+   */
+  private static List<DatanodeDescriptor> chooseDatanodesForCaching(
+      final List<DatanodeDescriptor> possibilities, final int neededCached,
+      final long staleInterval) {
+    // Make a copy that we can modify
+    List<DatanodeDescriptor> targets =
+        new ArrayList<DatanodeDescriptor>(possibilities);
+    // Selected targets
+    List<DatanodeDescriptor> chosen = new LinkedList<DatanodeDescriptor>();
+
+    // Filter out stale datanodes
+    List<DatanodeDescriptor> stale = new LinkedList<DatanodeDescriptor>();
+    Iterator<DatanodeDescriptor> it = targets.iterator();
+    while (it.hasNext()) {
+      DatanodeDescriptor d = it.next();
+      if (d.isStale(staleInterval)) {
+        it.remove();
+        stale.add(d);
+      }
+    }
+    // Select targets
+    while (chosen.size() < neededCached) {
+      // Try to use stale nodes if we're out of non-stale nodes, else we're done
+      if (targets.isEmpty()) {
+        if (!stale.isEmpty()) {
+          targets = stale;
+        } else {
+          break;
+        }
+      }
+      // Select a random target
+      DatanodeDescriptor target =
+          chooseRandomDatanodeByRemainingCapacity(targets);
+      chosen.add(target);
+      targets.remove(target);
+    }
+    return chosen;
+  }
+
+  /**
+   * Choose a single datanode from the provided list of possible
+   * targets, weighted by the percentage of free space remaining on the node.
+   *
+   * @return The chosen datanode
+   */
+  private static DatanodeDescriptor chooseRandomDatanodeByRemainingCapacity(
+      final List<DatanodeDescriptor> targets) {
+    // Use a weighted probability to choose the target datanode
+    float total = 0;
+    for (DatanodeDescriptor d : targets) {
+      total += d.getCacheRemainingPercent();
+    }
+    // Give each datanode a portion of keyspace equal to its relative weight
+    // [0, w1) selects d1, [w1, w1 + w2) selects d2, etc.
+    TreeMap<Integer, DatanodeDescriptor> lottery =
+        new TreeMap<Integer, DatanodeDescriptor>();
+    int offset = 0;
+    for (DatanodeDescriptor d : targets) {
+      // Since we're using floats, be paranoid about negative values
+      int weight =
+          Math.max(1, (int)((d.getCacheRemainingPercent() / total) * 1000000));
+      offset += weight;
+      lottery.put(offset, d);
+    }
+    // Choose a number from [0, offset), which is the total amount of weight,
+    // to select the winner
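+    // (TreeMap.higherEntry returns the entry with the least key strictly
+    // greater than the given value, i.e. the datanode whose cumulative
+    // weight interval contains the drawn number.)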
+    DatanodeDescriptor winner =
+        lottery.higherEntry(random.nextInt(offset)).getValue();
+    return winner;
   }
 }