|
@@ -18,10 +18,15 @@
|
|
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Collections;
|
|
|
import java.util.Iterator;
|
|
|
+import java.util.LinkedHashMap;
|
|
|
import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+
|
|
|
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
+import org.apache.hadoop.util.Time;
|
|
|
|
|
|
/**
|
|
|
* Keep prioritized queues of under replicated blocks.
|
|
@@ -82,6 +87,9 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
|
|
|
|
|
|
/** The number of corrupt blocks with replication factor 1 */
|
|
|
private int corruptReplOneBlocks = 0;
|
|
|
+ /** Keep timestamp when a block is put into the queue. */
|
|
|
+ private final Map<BlockInfo, Long> timestampsMap =
|
|
|
+ Collections.synchronizedMap(new LinkedHashMap<BlockInfo, Long>());
|
|
|
|
|
|
/** Create an object. */
|
|
|
UnderReplicatedBlocks() {
|
|
@@ -91,12 +99,13 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Empty the queues.
|
|
|
+ * Empty the queues and timestamps.
|
|
|
*/
|
|
|
void clear() {
|
|
|
for (int i = 0; i < LEVEL; i++) {
|
|
|
priorityQueues.get(i).clear();
|
|
|
}
|
|
|
+ timestampsMap.clear();
|
|
|
}
|
|
|
|
|
|
/** Return the total number of under replication blocks */
|
|
@@ -119,6 +128,20 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
|
|
|
return size;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Return the smallest timestamp of the under-replicated/corrupt blocks.
|
|
|
+ * If there are no under-replicated or corrupt blocks, return 0.
|
|
|
+ */
|
|
|
+ long getTimeOfTheOldestBlockToBeReplicated() {
|
|
|
+ synchronized (timestampsMap) {
|
|
|
+ if (timestampsMap.isEmpty()) {
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+ // Since we are using LinkedHashMap, the first value is the smallest.
|
|
|
+ return timestampsMap.entrySet().iterator().next().getValue();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/** Return the number of corrupt blocks */
|
|
|
synchronized int getCorruptBlockSize() {
|
|
|
return priorityQueues.get(QUEUE_WITH_CORRUPT_BLOCKS).size();
|
|
@@ -197,7 +220,7 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
|
|
|
+ " has only {} replicas and need {} replicas so is added to" +
|
|
|
" neededReplications at priority level {}", block, curReplicas,
|
|
|
expectedReplicas, priLevel);
|
|
|
-
|
|
|
+ timestampsMap.put(block, Time.now());
|
|
|
return true;
|
|
|
}
|
|
|
return false;
|
|
@@ -242,8 +265,9 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
|
|
|
if(priLevel >= 0 && priLevel < LEVEL
|
|
|
&& priorityQueues.get(priLevel).remove(block)) {
|
|
|
NameNode.blockStateChangeLog.debug(
|
|
|
- "BLOCK* NameSystem.UnderReplicationBlock.remove: Removing block {}" +
|
|
|
- " from priority queue {}", block, priLevel);
|
|
|
+ "BLOCK* NameSystem.UnderReplicationBlock.remove: Removing block {}" +
|
|
|
+ " from priority queue {}", block, priLevel);
|
|
|
+ timestampsMap.remove(block);
|
|
|
return true;
|
|
|
} else {
|
|
|
// Try to remove the block from all queues if the block was
|
|
@@ -253,6 +277,7 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
|
|
|
NameNode.blockStateChangeLog.debug(
|
|
|
"BLOCK* NameSystem.UnderReplicationBlock.remove: Removing block" +
|
|
|
" {} from priority queue {}", block, priLevel);
|
|
|
+ timestampsMap.remove(block);
|
|
|
return true;
|
|
|
}
|
|
|
}
|