|
@@ -142,33 +142,27 @@ public class BlockRecoveryWorker {
|
|
|
ReplicaState.RWR.getValue()) {
|
|
|
syncList.add(new BlockRecord(id, proxyDN, info));
|
|
|
} else {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Block recovery: Ignored replica with invalid " +
|
|
|
- "original state: " + info + " from DataNode: " + id);
|
|
|
- }
|
|
|
+ LOG.debug("Block recovery: Ignored replica with invalid " +
|
|
|
+ "original state: {} from DataNode: {}", info, id);
|
|
|
}
|
|
|
} else {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- if (info == null) {
|
|
|
- LOG.debug("Block recovery: DataNode: " + id + " does not have "
|
|
|
- + "replica for block: " + block);
|
|
|
- } else {
|
|
|
- LOG.debug("Block recovery: Ignored replica with invalid "
|
|
|
- + "generation stamp or length: " + info + " from " +
|
|
|
- "DataNode: " + id);
|
|
|
- }
|
|
|
+ if (info == null) {
|
|
|
+ LOG.debug("Block recovery: DataNode: {} does not have " +
|
|
|
+ "replica for block: {}", id, block);
|
|
|
+ } else {
|
|
|
+ LOG.debug("Block recovery: Ignored replica with invalid "
|
|
|
+ + "generation stamp or length: {} from DataNode: {}", info, id);
|
|
|
}
|
|
|
}
|
|
|
} catch (RecoveryInProgressException ripE) {
|
|
|
InterDatanodeProtocol.LOG.warn(
|
|
|
- "Recovery for replica " + block + " on data-node " + id
|
|
|
- + " is already in progress. Recovery id = "
|
|
|
- + rBlock.getNewGenerationStamp() + " is aborted.", ripE);
|
|
|
+ "Recovery for replica {} on data-node {} is already in progress. " +
|
|
|
+ "Recovery id = {} is aborted.", block, id, rBlock.getNewGenerationStamp(), ripE);
|
|
|
return;
|
|
|
} catch (IOException e) {
|
|
|
++errorCount;
|
|
|
- InterDatanodeProtocol.LOG.warn("Failed to recover block (block="
|
|
|
- + block + ", datanode=" + id + ")", e);
|
|
|
+ InterDatanodeProtocol.LOG.warn("Failed to recover block (block={}, datanode={})",
|
|
|
+ block, id, e);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -206,11 +200,9 @@ public class BlockRecoveryWorker {
|
|
|
// or their replicas have 0 length.
|
|
|
// The block can be deleted.
|
|
|
if (syncList.isEmpty()) {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("syncBlock for block " + block + ", all datanodes don't " +
|
|
|
- "have the block or their replicas have 0 length. The block can " +
|
|
|
- "be deleted.");
|
|
|
- }
|
|
|
+ LOG.debug("syncBlock for block {}, all datanodes don't " +
|
|
|
+ "have the block or their replicas have 0 length. The block can " +
|
|
|
+ "be deleted.", block);
|
|
|
nn.commitBlockSynchronization(block, recoveryId, 0,
|
|
|
true, true, DatanodeID.EMPTY_ARRAY, null);
|
|
|
return;
|
|
@@ -249,12 +241,9 @@ public class BlockRecoveryWorker {
|
|
|
r.rInfo.getNumBytes() == finalizedLength) {
|
|
|
participatingList.add(r);
|
|
|
}
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("syncBlock replicaInfo: block=" + block +
|
|
|
- ", from datanode " + r.id + ", receivedState=" + rState.name() +
|
|
|
- ", receivedLength=" + r.rInfo.getNumBytes() +
|
|
|
- ", bestState=FINALIZED, finalizedLength=" + finalizedLength);
|
|
|
- }
|
|
|
+ LOG.debug("syncBlock replicaInfo: block={}, from datanode {}, receivedState={}, " +
|
|
|
+ "receivedLength={}, bestState=FINALIZED, finalizedLength={}",
|
|
|
+ block, r.id, rState.name(), r.rInfo.getNumBytes(), finalizedLength);
|
|
|
}
|
|
|
newBlock.setNumBytes(finalizedLength);
|
|
|
break;
|
|
@@ -267,12 +256,9 @@ public class BlockRecoveryWorker {
|
|
|
minLength = Math.min(minLength, r.rInfo.getNumBytes());
|
|
|
participatingList.add(r);
|
|
|
}
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("syncBlock replicaInfo: block=" + block +
|
|
|
- ", from datanode " + r.id + ", receivedState=" + rState.name() +
|
|
|
- ", receivedLength=" + r.rInfo.getNumBytes() + ", bestState=" +
|
|
|
- bestState.name());
|
|
|
- }
|
|
|
+ LOG.debug("syncBlock replicaInfo: block={}, from datanode {}, receivedState={}, " +
|
|
|
+ "receivedLength={}, bestState={}", block, r.id, rState.name(),
|
|
|
+ r.rInfo.getNumBytes(), bestState.name());
|
|
|
}
|
|
|
// recover() guarantees syncList will have at least one replica with RWR
|
|
|
// or better state.
|
|
@@ -325,11 +311,8 @@ public class BlockRecoveryWorker {
|
|
|
storages[i] = r.storageID;
|
|
|
}
|
|
|
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Datanode triggering commitBlockSynchronization, block=" +
|
|
|
- block + ", newGs=" + newBlock.getGenerationStamp() +
|
|
|
- ", newLength=" + newBlock.getNumBytes());
|
|
|
- }
|
|
|
+ LOG.debug("Datanode triggering commitBlockSynchronization, block={}, newGs={}, " +
|
|
|
+ "newLength={}", block, newBlock.getGenerationStamp(), newBlock.getNumBytes());
|
|
|
|
|
|
nn.commitBlockSynchronization(block,
|
|
|
newBlock.getGenerationStamp(), newBlock.getNumBytes(), true, false,
|
|
@@ -406,14 +389,15 @@ public class BlockRecoveryWorker {
|
|
|
//check generation stamps
|
|
|
for (int i = 0; i < locs.length; i++) {
|
|
|
DatanodeID id = locs[i];
|
|
|
+ ExtendedBlock internalBlk = null;
|
|
|
try {
|
|
|
DatanodeID bpReg = getDatanodeID(bpid);
|
|
|
+ internalBlk = new ExtendedBlock(block);
|
|
|
+ final long blockId = block.getBlockId() + blockIndices[i];
|
|
|
+ internalBlk.setBlockId(blockId);
|
|
|
InterDatanodeProtocol proxyDN = bpReg.equals(id) ?
|
|
|
datanode : DataNode.createInterDataNodeProtocolProxy(id, conf,
|
|
|
dnConf.socketTimeout, dnConf.connectToDnViaHostname);
|
|
|
- ExtendedBlock internalBlk = new ExtendedBlock(block);
|
|
|
- final long blockId = block.getBlockId() + blockIndices[i];
|
|
|
- internalBlk.setBlockId(blockId);
|
|
|
ReplicaRecoveryInfo info = callInitReplicaRecovery(proxyDN,
|
|
|
new RecoveringBlock(internalBlk, null, recoveryId));
|
|
|
|
|
@@ -427,27 +411,36 @@ public class BlockRecoveryWorker {
|
|
|
// simply choose the one with larger length.
|
|
|
// TODO: better usage of redundant replicas
|
|
|
syncBlocks.put(blockId, new BlockRecord(id, proxyDN, info));
|
|
|
+ } else {
|
|
|
+ LOG.debug("Block recovery: Ignored replica with invalid " +
|
|
|
+ "original state: {} from DataNode: {} by block: {}", info, id, block);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if (info == null) {
|
|
|
+ LOG.debug("Block recovery: DataNode: {} does not have " +
|
|
|
+ "replica for block: (block={}, internalBlk={})", id, block, internalBlk);
|
|
|
+ } else {
|
|
|
+ LOG.debug("Block recovery: Ignored replica with invalid "
|
|
|
+ + "generation stamp or length: {} from DataNode: {} by block: {}",
|
|
|
+ info, id, block);
|
|
|
}
|
|
|
}
|
|
|
} catch (RecoveryInProgressException ripE) {
|
|
|
InterDatanodeProtocol.LOG.warn(
|
|
|
- "Recovery for replica " + block + " on data-node " + id
|
|
|
- + " is already in progress. Recovery id = "
|
|
|
- + rBlock.getNewGenerationStamp() + " is aborted.", ripE);
|
|
|
+ "Recovery for replica (block={}, internalBlk={}) on data-node {} is already " +
|
|
|
+ "in progress. Recovery id = {} is aborted.", block, internalBlk, id,
|
|
|
+ rBlock.getNewGenerationStamp(), ripE);
|
|
|
return;
|
|
|
} catch (IOException e) {
|
|
|
- InterDatanodeProtocol.LOG.warn("Failed to recover block (block="
|
|
|
- + block + ", datanode=" + id + ")", e);
|
|
|
+ InterDatanodeProtocol.LOG.warn("Failed to recover block (block={}, internalBlk={}, " +
|
|
|
+ "datanode={})", block, internalBlk, id, e);
|
|
|
}
|
|
|
}
|
|
|
checkLocations(syncBlocks.size());
|
|
|
|
|
|
final long safeLength = getSafeLength(syncBlocks);
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Recovering block " + block
|
|
|
- + ", length=" + block.getNumBytes() + ", safeLength=" + safeLength
|
|
|
- + ", syncList=" + syncBlocks);
|
|
|
- }
|
|
|
+ LOG.debug("Recovering block {}, length={}, safeLength={}, syncList={}", block,
|
|
|
+ block.getNumBytes(), safeLength, syncBlocks);
|
|
|
|
|
|
// If some internal blocks reach the safe length, convert them to RUR
|
|
|
List<BlockRecord> rurList = new ArrayList<>(locs.length);
|
|
@@ -498,8 +491,8 @@ public class BlockRecoveryWorker {
|
|
|
r.updateReplicaUnderRecovery(bpid, recoveryId, r.rInfo.getBlockId(),
|
|
|
newSize);
|
|
|
} catch (IOException e) {
|
|
|
- InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
|
|
|
- + ", datanode=" + r.id + ")", e);
|
|
|
+ InterDatanodeProtocol.LOG.warn("Failed to updateBlock (block={}, internalBlk={}, " +
|
|
|
+ "datanode={})", block, r.rInfo, r.id, e);
|
|
|
failedList.add(r.id);
|
|
|
}
|
|
|
}
|
|
@@ -552,12 +545,9 @@ public class BlockRecoveryWorker {
|
|
|
ExtendedBlock block = rb.getBlock();
|
|
|
DatanodeInfo[] targets = rb.getLocations();
|
|
|
|
|
|
- LOG.info("BlockRecoveryWorker: " + who + " calls recoverBlock(" + block
|
|
|
- + ", targets=[" + Joiner.on(", ").join(targets) + "]"
|
|
|
- + ", newGenerationStamp=" + rb.getNewGenerationStamp()
|
|
|
- + ", newBlock=" + rb.getNewBlock()
|
|
|
- + ", isStriped=" + rb.isStriped()
|
|
|
- + ")");
|
|
|
+ LOG.info("BlockRecoveryWorker: {} calls recoverBlock({}, targets=[{}], newGenerationStamp={}"
|
|
|
+ + ", newBlock={}, isStriped={})", who, block, Joiner.on(", ").join(targets),
|
|
|
+ rb.getNewGenerationStamp(), rb.getNewBlock(), rb.isStriped());
|
|
|
}
|
|
|
|
|
|
/**
|