|
@@ -121,6 +121,7 @@ public class Dispatcher {
|
|
|
|
|
|
private final long getBlocksSize;
|
|
private final long getBlocksSize;
|
|
private final long getBlocksMinBlockSize;
|
|
private final long getBlocksMinBlockSize;
|
|
|
|
+ private final long blockMoveTimeout;
|
|
|
|
|
|
private final int ioFileBufferSize;
|
|
private final int ioFileBufferSize;
|
|
|
|
|
|
@@ -327,6 +328,11 @@ public class Dispatcher {
|
|
getXferAddr(Dispatcher.this.connectToDnViaHostname)),
|
|
getXferAddr(Dispatcher.this.connectToDnViaHostname)),
|
|
HdfsConstants.READ_TIMEOUT);
|
|
HdfsConstants.READ_TIMEOUT);
|
|
|
|
|
|
|
|
+ // Set read timeout so that it doesn't hang forever against
|
|
|
|
+ // unresponsive nodes. Datanode normally sends IN_PROGRESS response
|
|
|
|
+ // twice within the client read timeout period (every 30 seconds by
|
|
|
|
+ // default). Here, we make it give up after 5 minutes of no response.
|
|
|
|
+ sock.setSoTimeout(HdfsConstants.READ_TIMEOUT * 5);
|
|
sock.setKeepAlive(true);
|
|
sock.setKeepAlive(true);
|
|
|
|
|
|
OutputStream unbufOut = sock.getOutputStream();
|
|
OutputStream unbufOut = sock.getOutputStream();
|
|
@@ -382,13 +388,26 @@ public class Dispatcher {
|
|
source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode);
|
|
source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode);
|
|
}
|
|
}
|
|
|
|
|
|
- /** Receive a block copy response from the input stream */
|
|
|
|
|
|
+ /**
+  * Decide whether the caller should stop waiting for the block move response.
+  * Returns true when {@code source.isIterationOver()} reports true, or when
+  * {@code blockMoveTimeout} is positive and more than that much time (in the
+  * unit of {@code Time.monotonicNow()}) has elapsed since {@code startTime}.
+  * A non-positive {@code blockMoveTimeout} disables the elapsed-time check.
+  *
+  * @param startTime the {@code Time.monotonicNow()} reading taken when the
+  *        wait began
+  * @return true if waiting should stop, false to keep waiting
+  */
|
|
|
|
+ private boolean stopWaitingForResponse(long startTime) {
|
|
|
|
+ return source.isIterationOver() ||
|
|
|
|
+ (blockMoveTimeout > 0 &&
|
|
|
|
+ (Time.monotonicNow() - startTime > blockMoveTimeout));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /** Receive a block copy response from the input stream */
|
|
private void receiveResponse(DataInputStream in) throws IOException {
|
|
private void receiveResponse(DataInputStream in) throws IOException {
|
|
|
|
+ long startTime = Time.monotonicNow();
|
|
BlockOpResponseProto response =
|
|
BlockOpResponseProto response =
|
|
BlockOpResponseProto.parseFrom(vintPrefixed(in));
|
|
BlockOpResponseProto.parseFrom(vintPrefixed(in));
|
|
while (response.getStatus() == Status.IN_PROGRESS) {
|
|
while (response.getStatus() == Status.IN_PROGRESS) {
|
|
// read intermediate responses
|
|
// read intermediate responses
|
|
response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
|
|
response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
|
|
|
|
+ // Stop waiting for slow block moves. Even if it stops waiting,
|
|
|
|
+ // the actual move may continue.
|
|
|
|
+ if (stopWaitingForResponse(startTime)) {
|
|
|
|
+ throw new IOException("Block move timed out");
|
|
|
|
+ }
|
|
}
|
|
}
|
|
String logInfo = "block move is failed";
|
|
String logInfo = "block move is failed";
|
|
DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);
|
|
DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);
|
|
@@ -626,6 +645,7 @@ public class Dispatcher {
|
|
|
|
|
|
private final List<Task> tasks = new ArrayList<Task>(2);
|
|
private final List<Task> tasks = new ArrayList<Task>(2);
|
|
private long blocksToReceive = 0L;
|
|
private long blocksToReceive = 0L;
|
|
|
|
+ private final long startTime = Time.monotonicNow();
|
|
/**
|
|
/**
|
|
* Source blocks point to the objects in {@link Dispatcher#globalBlocks}
|
|
* Source blocks point to the objects in {@link Dispatcher#globalBlocks}
|
|
* because we want to keep one copy of a block and be aware that the
|
|
* because we want to keep one copy of a block and be aware that the
|
|
@@ -637,6 +657,13 @@ public class Dispatcher {
|
|
dn.super(storageType, maxSize2Move);
|
|
dn.super(storageType, maxSize2Move);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Check if the iteration is over, i.e. whether the time elapsed since
+ * {@code startTime} (measured with {@code Time.monotonicNow()}) is strictly
+ * greater than {@code MAX_ITERATION_TIME}.
+ *
+ * @return true once more than {@code MAX_ITERATION_TIME} has elapsed since
+ *         {@code startTime}, false otherwise
|
|
|
|
+ */
|
|
|
|
+ public boolean isIterationOver() {
|
|
|
|
+ return (Time.monotonicNow()-startTime > MAX_ITERATION_TIME);
|
|
|
|
+ }
|
|
|
|
+
|
|
/** Add a task */
|
|
/** Add a task */
|
|
void addTask(Task task) {
|
|
void addTask(Task task) {
|
|
Preconditions.checkState(task.target != this,
|
|
Preconditions.checkState(task.target != this,
|
|
@@ -777,24 +804,15 @@ public class Dispatcher {
|
|
* elapsed time of the iteration has exceeded the max time limit.
|
|
* elapsed time of the iteration has exceeded the max time limit.
|
|
*/
|
|
*/
|
|
private void dispatchBlocks() {
|
|
private void dispatchBlocks() {
|
|
- final long startTime = Time.monotonicNow();
|
|
|
|
this.blocksToReceive = 2 * getScheduledSize();
|
|
this.blocksToReceive = 2 * getScheduledSize();
|
|
- boolean isTimeUp = false;
|
|
|
|
int noPendingMoveIteration = 0;
|
|
int noPendingMoveIteration = 0;
|
|
- while (!isTimeUp && getScheduledSize() > 0
|
|
|
|
|
|
+ while (getScheduledSize() > 0 && !isIterationOver()
|
|
&& (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
|
|
&& (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
|
|
if (LOG.isTraceEnabled()) {
|
|
if (LOG.isTraceEnabled()) {
|
|
LOG.trace(this + " blocksToReceive=" + blocksToReceive
|
|
LOG.trace(this + " blocksToReceive=" + blocksToReceive
|
|
+ ", scheduledSize=" + getScheduledSize()
|
|
+ ", scheduledSize=" + getScheduledSize()
|
|
+ ", srcBlocks#=" + srcBlocks.size());
|
|
+ ", srcBlocks#=" + srcBlocks.size());
|
|
}
|
|
}
|
|
- // check if time is up or not
|
|
|
|
- if (Time.monotonicNow() - startTime > MAX_ITERATION_TIME) {
|
|
|
|
- LOG.info("Time up (max time=" + MAX_ITERATION_TIME/1000
|
|
|
|
- + " seconds). Skipping " + this);
|
|
|
|
- isTimeUp = true;
|
|
|
|
- continue;
|
|
|
|
- }
|
|
|
|
final PendingMove p = chooseNextMove();
|
|
final PendingMove p = chooseNextMove();
|
|
if (p != null) {
|
|
if (p != null) {
|
|
// Reset no pending move counter
|
|
// Reset no pending move counter
|
|
@@ -841,6 +859,11 @@ public class Dispatcher {
|
|
} catch (InterruptedException ignored) {
|
|
} catch (InterruptedException ignored) {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ if (isIterationOver()) {
|
|
|
|
+ LOG.info("The maximum iteration time (" + MAX_ITERATION_TIME/1000
|
|
|
|
+ + " seconds) has been reached. Stopping " + this);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -860,13 +883,14 @@ public class Dispatcher {
|
|
int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration conf) {
|
|
int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration conf) {
|
|
this(nnc, includedNodes, excludedNodes, movedWinWidth,
|
|
this(nnc, includedNodes, excludedNodes, movedWinWidth,
|
|
moverThreads, dispatcherThreads, maxConcurrentMovesPerNode,
|
|
moverThreads, dispatcherThreads, maxConcurrentMovesPerNode,
|
|
- 0L, 0L, conf);
|
|
|
|
|
|
+ 0L, 0L, 0, conf);
|
|
}
|
|
}
|
|
|
|
|
|
Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
|
|
Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
|
|
Set<String> excludedNodes, long movedWinWidth, int moverThreads,
|
|
Set<String> excludedNodes, long movedWinWidth, int moverThreads,
|
|
int dispatcherThreads, int maxConcurrentMovesPerNode,
|
|
int dispatcherThreads, int maxConcurrentMovesPerNode,
|
|
- long getBlocksSize, long getBlocksMinBlockSize, Configuration conf) {
|
|
|
|
|
|
+ long getBlocksSize, long getBlocksMinBlockSize,
|
|
|
|
+ int blockMoveTimeout, Configuration conf) {
|
|
this.nnc = nnc;
|
|
this.nnc = nnc;
|
|
this.excludedNodes = excludedNodes;
|
|
this.excludedNodes = excludedNodes;
|
|
this.includedNodes = includedNodes;
|
|
this.includedNodes = includedNodes;
|
|
@@ -881,6 +905,7 @@ public class Dispatcher {
|
|
|
|
|
|
this.getBlocksSize = getBlocksSize;
|
|
this.getBlocksSize = getBlocksSize;
|
|
this.getBlocksMinBlockSize = getBlocksMinBlockSize;
|
|
this.getBlocksMinBlockSize = getBlocksMinBlockSize;
|
|
|
|
+ this.blockMoveTimeout = blockMoveTimeout;
|
|
|
|
|
|
this.saslClient = new SaslDataTransferClient(conf,
|
|
this.saslClient = new SaslDataTransferClient(conf,
|
|
DataTransferSaslUtil.getSaslPropertiesResolver(conf),
|
|
DataTransferSaslUtil.getSaslPropertiesResolver(conf),
|