|
@@ -40,6 +40,7 @@ import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.Map.Entry;
|
|
import java.util.Map.Entry;
|
|
import java.util.Set;
|
|
import java.util.Set;
|
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ExecutionException;
|
|
import java.util.concurrent.ExecutionException;
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.Executors;
|
|
import java.util.concurrent.Executors;
|
|
@@ -405,7 +406,8 @@ public class Dispatcher {
|
|
// Pinned block can't be moved. Add this block into failure list.
|
|
// Pinned block can't be moved. Add this block into failure list.
|
|
// Later in the next iteration mover will exclude these blocks from
|
|
// Later in the next iteration mover will exclude these blocks from
|
|
// pending moves.
|
|
// pending moves.
|
|
- target.getDDatanode().addBlockPinningFailures(this);
|
|
|
|
|
|
+ target.getDDatanode().addBlockPinningFailures(
|
|
|
|
+ this.reportedBlock.getBlock().getBlockId(), this.getSource());
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -643,7 +645,8 @@ public class Dispatcher {
|
|
/** blocks being moved but not confirmed yet */
|
|
/** blocks being moved but not confirmed yet */
|
|
private final List<PendingMove> pendings;
|
|
private final List<PendingMove> pendings;
|
|
private volatile boolean hasFailure = false;
|
|
private volatile boolean hasFailure = false;
|
|
- private Map<Long, Set<DatanodeInfo>> blockPinningFailures = new HashMap<>();
|
|
|
|
|
|
+ private final Map<Long, Set<DatanodeInfo>> blockPinningFailures =
|
|
|
|
+ new ConcurrentHashMap<>();
|
|
private volatile boolean hasSuccess = false;
|
|
private volatile boolean hasSuccess = false;
|
|
private ExecutorService moveExecutor;
|
|
private ExecutorService moveExecutor;
|
|
|
|
|
|
@@ -729,16 +732,17 @@ public class Dispatcher {
|
|
this.hasFailure = true;
|
|
this.hasFailure = true;
|
|
}
|
|
}
|
|
|
|
|
|
- void addBlockPinningFailures(PendingMove pendingBlock) {
|
|
|
|
- synchronized (blockPinningFailures) {
|
|
|
|
- long blockId = pendingBlock.reportedBlock.getBlock().getBlockId();
|
|
|
|
- Set<DatanodeInfo> pinnedLocations = blockPinningFailures.get(blockId);
|
|
|
|
|
|
+ private void addBlockPinningFailures(long blockId, DatanodeInfo source) {
|
|
|
|
+ blockPinningFailures.compute(blockId, (key, pinnedLocations) -> {
|
|
|
|
+ Set<DatanodeInfo> newPinnedLocations;
|
|
if (pinnedLocations == null) {
|
|
if (pinnedLocations == null) {
|
|
- pinnedLocations = new HashSet<>();
|
|
|
|
- blockPinningFailures.put(blockId, pinnedLocations);
|
|
|
|
|
|
+ newPinnedLocations = new HashSet<>();
|
|
|
|
+ } else {
|
|
|
|
+ newPinnedLocations = pinnedLocations;
|
|
}
|
|
}
|
|
- pinnedLocations.add(pendingBlock.getSource());
|
|
|
|
- }
|
|
|
|
|
|
+ newPinnedLocations.add(source);
|
|
|
|
+ return newPinnedLocations;
|
|
|
|
+ });
|
|
}
|
|
}
|
|
|
|
|
|
Map<Long, Set<DatanodeInfo>> getBlockPinningFailureList() {
|
|
Map<Long, Set<DatanodeInfo>> getBlockPinningFailureList() {
|