|
@@ -121,7 +121,6 @@ public final class ErasureCodingWorker {
|
|
|
public void processErasureCodingTasks(
|
|
|
Collection<BlockECReconstructionInfo> ecTasks) {
|
|
|
for (BlockECReconstructionInfo reconInfo : ecTasks) {
|
|
|
- int xmitsSubmitted = 0;
|
|
|
try {
|
|
|
StripedReconstructionInfo stripedReconInfo =
|
|
|
new StripedReconstructionInfo(
|
|
@@ -134,20 +133,19 @@ public final class ErasureCodingWorker {
|
|
|
final StripedBlockReconstructor task =
|
|
|
new StripedBlockReconstructor(this, stripedReconInfo);
|
|
|
if (task.hasValidTargets()) {
|
|
|
+ stripedReconstructionPool.submit(task);
|
|
|
// See HDFS-12044. We increase xmitsInProgress even the task is only
|
|
|
// enqueued, so that
|
|
|
// 1) NN will not send more tasks than what DN can execute and
|
|
|
// 2) DN will not throw away reconstruction tasks, and instead keeps
|
|
|
// an unbounded number of tasks in the executor's task queue.
|
|
|
- xmitsSubmitted = Math.max((int)(task.getXmits() * xmitWeight), 1);
|
|
|
+ int xmitsSubmitted = Math.max((int)(task.getXmits() * xmitWeight), 1);
|
|
|
getDatanode().incrementXmitsInProcess(xmitsSubmitted);
|
|
|
- stripedReconstructionPool.submit(task);
|
|
|
} else {
|
|
|
LOG.warn("No missing internal block. Skip reconstruction for task:{}",
|
|
|
reconInfo);
|
|
|
}
|
|
|
} catch (Throwable e) {
|
|
|
- getDatanode().decrementXmitsInProgress(xmitsSubmitted);
|
|
|
LOG.warn("Failed to reconstruct striped block {}",
|
|
|
reconInfo.getExtendedBlock().getLocalBlock(), e);
|
|
|
}
|