|
@@ -58,6 +58,7 @@ import java.io.InputStreamReader;
|
|
|
import java.net.URI;
|
|
|
import java.text.DateFormat;
|
|
|
import java.util.*;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
@InterfaceAudience.Private
|
|
|
public class Mover {
|
|
@@ -107,10 +108,12 @@ public class Mover {
|
|
|
private final Dispatcher dispatcher;
|
|
|
private final StorageMap storages;
|
|
|
private final List<Path> targetPaths;
|
|
|
+ private final int retryMaxAttempts;
|
|
|
+ private final AtomicInteger retryCount;
|
|
|
|
|
|
private final BlockStoragePolicy[] blockStoragePolicies;
|
|
|
|
|
|
- Mover(NameNodeConnector nnc, Configuration conf) {
|
|
|
+ Mover(NameNodeConnector nnc, Configuration conf, AtomicInteger retryCount) {
|
|
|
final long movedWinWidth = conf.getLong(
|
|
|
DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY,
|
|
|
DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_DEFAULT);
|
|
@@ -120,7 +123,10 @@ public class Mover {
|
|
|
final int maxConcurrentMovesPerNode = conf.getInt(
|
|
|
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
|
|
|
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
|
|
|
-
|
|
|
+ this.retryMaxAttempts = conf.getInt(
|
|
|
+ DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY,
|
|
|
+ DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_DEFAULT);
|
|
|
+ this.retryCount = retryCount;
|
|
|
this.dispatcher = new Dispatcher(nnc, Collections.<String> emptySet(),
|
|
|
Collections.<String> emptySet(), movedWinWidth, moverThreads, 0,
|
|
|
maxConcurrentMovesPerNode, conf);
|
|
@@ -255,14 +261,27 @@ public class Mover {
|
|
|
* @return whether there is still remaining migration work for the next
|
|
|
* round
|
|
|
*/
|
|
|
- private boolean processNamespace() {
|
|
|
+ private boolean processNamespace() throws IOException {
|
|
|
getSnapshottableDirs();
|
|
|
boolean hasRemaining = false;
|
|
|
for (Path target : targetPaths) {
|
|
|
hasRemaining |= processPath(target.toUri().getPath());
|
|
|
}
|
|
|
// wait for pending move to finish and retry the failed migration
|
|
|
- hasRemaining |= Dispatcher.waitForMoveCompletion(storages.targets.values());
|
|
|
+ boolean hasFailed = Dispatcher.waitForMoveCompletion(storages.targets
|
|
|
+ .values());
|
|
|
+ if (hasFailed) {
|
|
|
+ if (retryCount.get() == retryMaxAttempts) {
|
|
|
+ throw new IOException("Failed to move some block's after "
|
|
|
+ + retryMaxAttempts + " retries.");
|
|
|
+ } else {
|
|
|
+ retryCount.incrementAndGet();
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ // Reset retry count if no failure.
|
|
|
+ retryCount.set(0);
|
|
|
+ }
|
|
|
+ hasRemaining |= hasFailed;
|
|
|
return hasRemaining;
|
|
|
}
|
|
|
|
|
@@ -528,6 +547,7 @@ public class Mover {
|
|
|
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 2000 +
|
|
|
conf.getLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
|
|
|
DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000;
|
|
|
+ AtomicInteger retryCount = new AtomicInteger(0);
|
|
|
LOG.info("namenodes = " + namenodes);
|
|
|
|
|
|
List<NameNodeConnector> connectors = Collections.emptyList();
|
|
@@ -541,7 +561,7 @@ public class Mover {
|
|
|
Iterator<NameNodeConnector> iter = connectors.iterator();
|
|
|
while (iter.hasNext()) {
|
|
|
NameNodeConnector nnc = iter.next();
|
|
|
- final Mover m = new Mover(nnc, conf);
|
|
|
+ final Mover m = new Mover(nnc, conf, retryCount);
|
|
|
final ExitStatus r = m.run();
|
|
|
|
|
|
if (r == ExitStatus.SUCCESS) {
|