@@ -38,16 +38,16 @@ import org.apache.hadoop.hdfs.util.LightWeightHashSet;
  */
 @InterfaceAudience.Private
 class InvalidateBlocks {
-  /** Mapping: StorageID -> Collection of Blocks */
-  private final Map<String, LightWeightHashSet<Block>> node2blocks =
-      new TreeMap<String, LightWeightHashSet<Block>>();
+  /** Mapping: DatanodeInfo -> Collection of Blocks */
+  private final Map<DatanodeInfo, LightWeightHashSet<Block>> node2blocks =
+      new TreeMap<DatanodeInfo, LightWeightHashSet<Block>>();
   /** The total number of blocks in the map. */
   private long numBlocks = 0L;
 
-  private final DatanodeManager datanodeManager;
+  private final int blockInvalidateLimit;
 
-  InvalidateBlocks(final DatanodeManager datanodeManager) {
-    this.datanodeManager = datanodeManager;
+  InvalidateBlocks(final int blockInvalidateLimit) {
+    this.blockInvalidateLimit = blockInvalidateLimit;
   }
 
   /** @return the number of blocks to be invalidated . */
@@ -60,12 +60,9 @@ class InvalidateBlocks {
    * invalidation. Blocks are compared including their generation stamps:
    * if a block is pending invalidation but with a different generation stamp,
    * returns false.
-   * @param storageID the storage to check
-   * @param the block to look for
-   *
    */
-  synchronized boolean contains(final String storageID, final Block block) {
-    final LightWeightHashSet<Block> s = node2blocks.get(storageID);
+  synchronized boolean contains(final DatanodeInfo dn, final Block block) {
+    final LightWeightHashSet<Block> s = node2blocks.get(dn);
     if (s == null) {
       return false; // no invalidate blocks for this storage ID
     }
@@ -80,10 +77,10 @@ class InvalidateBlocks {
    */
   synchronized void add(final Block block, final DatanodeInfo datanode,
       final boolean log) {
-    LightWeightHashSet<Block> set = node2blocks.get(datanode.getDatanodeUuid());
+    LightWeightHashSet<Block> set = node2blocks.get(datanode);
     if (set == null) {
       set = new LightWeightHashSet<Block>();
-      node2blocks.put(datanode.getDatanodeUuid(), set);
+      node2blocks.put(datanode, set);
     }
     if (set.add(block)) {
       numBlocks++;
@@ -95,20 +92,20 @@ class InvalidateBlocks {
   }
 
   /** Remove a storage from the invalidatesSet */
-  synchronized void remove(final String storageID) {
-    final LightWeightHashSet<Block> blocks = node2blocks.remove(storageID);
+  synchronized void remove(final DatanodeInfo dn) {
+    final LightWeightHashSet<Block> blocks = node2blocks.remove(dn);
     if (blocks != null) {
       numBlocks -= blocks.size();
     }
   }
 
   /** Remove the block from the specified storage. */
-  synchronized void remove(final String storageID, final Block block) {
-    final LightWeightHashSet<Block> v = node2blocks.get(storageID);
+  synchronized void remove(final DatanodeInfo dn, final Block block) {
+    final LightWeightHashSet<Block> v = node2blocks.get(dn);
     if (v != null && v.remove(block)) {
       numBlocks--;
       if (v.isEmpty()) {
-        node2blocks.remove(storageID);
+        node2blocks.remove(dn);
       }
     }
   }
@@ -122,34 +119,33 @@ class InvalidateBlocks {
       return;
     }
 
-    for(Map.Entry<String,LightWeightHashSet<Block>> entry : node2blocks.entrySet()) {
+    for(Map.Entry<DatanodeInfo, LightWeightHashSet<Block>> entry : node2blocks.entrySet()) {
       final LightWeightHashSet<Block> blocks = entry.getValue();
       if (blocks.size() > 0) {
-        out.println(datanodeManager.getDatanode(entry.getKey()));
+        out.println(entry.getKey());
         out.println(blocks);
       }
     }
   }
 
   /** @return a list of the storage IDs. */
-  synchronized List<String> getStorageIDs() {
-    return new ArrayList<String>(node2blocks.keySet());
+  synchronized List<DatanodeInfo> getDatanodes() {
+    return new ArrayList<DatanodeInfo>(node2blocks.keySet());
   }
 
-  synchronized List<Block> invalidateWork(
-      final String storageId, final DatanodeDescriptor dn) {
-    final LightWeightHashSet<Block> set = node2blocks.get(storageId);
+  synchronized List<Block> invalidateWork(final DatanodeDescriptor dn) {
+    final LightWeightHashSet<Block> set = node2blocks.get(dn);
     if (set == null) {
       return null;
     }
 
     // # blocks that can be sent in one message is limited
-    final int limit = datanodeManager.blockInvalidateLimit;
+    final int limit = blockInvalidateLimit;
     final List<Block> toInvalidate = set.pollN(limit);
 
     // If we send everything in this message, remove this node entry
     if (set.isEmpty()) {
-      remove(storageId);
+      remove(dn);
     }
 
     dn.addBlocksToBeInvalidated(toInvalidate);
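
As a rough companion sketch (not part of the patch), the snippet below models the data-structure change in plain Java: the pending-invalidation map is keyed by the datanode itself rather than by its storage-ID string, and the per-message limit is passed to the constructor instead of being read from a DatanodeManager. The Node and Block records and the PendingInvalidations class are illustrative stand-ins for DatanodeInfo, Block, and InvalidateBlocks, not the Hadoop types.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative stand-ins for DatanodeInfo and Block; not the Hadoop classes.
record Node(String hostname) {}
record Block(long blockId, long genStamp) {}

/** Simplified model of the patched class: keyed by node, limit injected at construction. */
class PendingInvalidations {
  private final Map<Node, Set<Block>> node2blocks = new HashMap<>();
  private final int blockInvalidateLimit;  // previously read from DatanodeManager
  private long numBlocks = 0;

  PendingInvalidations(int blockInvalidateLimit) {
    this.blockInvalidateLimit = blockInvalidateLimit;
  }

  /** Queue a block for deletion on the given node. */
  synchronized void add(Node node, Block block) {
    if (node2blocks.computeIfAbsent(node, n -> new HashSet<>()).add(block)) {
      numBlocks++;
    }
  }

  /** True if this exact block (id and generation stamp) is pending on the node. */
  synchronized boolean contains(Node node, Block block) {
    Set<Block> s = node2blocks.get(node);
    return s != null && s.contains(block);
  }

  /** Drain up to blockInvalidateLimit blocks for one node; drop the entry when empty. */
  synchronized List<Block> invalidateWork(Node node) {
    Set<Block> set = node2blocks.get(node);
    if (set == null) {
      return null;
    }
    List<Block> toInvalidate = new ArrayList<>();
    Iterator<Block> it = set.iterator();
    while (it.hasNext() && toInvalidate.size() < blockInvalidateLimit) {
      toInvalidate.add(it.next());
      it.remove();
    }
    numBlocks -= toInvalidate.size();
    if (set.isEmpty()) {
      node2blocks.remove(node);
    }
    return toInvalidate;
  }

  /** Total number of blocks still awaiting invalidation across all nodes. */
  synchronized long numBlocks() {
    return numBlocks;
  }
}

A caller would construct it with the configured limit (for example, new PendingInvalidations(1000)) and, on each heartbeat, call invalidateWork(node) to obtain the next batch for that datanode, mirroring how the patch now hands blockInvalidateLimit to InvalidateBlocks instead of reaching back into DatanodeManager.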