|
@@ -103,8 +103,13 @@ public class BlockRecoveryWorker {
|
|
|
protected void recover() throws IOException {
|
|
|
List<BlockRecord> syncList = new ArrayList<>(locs.length);
|
|
|
int errorCount = 0;
|
|
|
+ int candidateReplicaCnt = 0;
|
|
|
|
|
|
- //check generation stamps
|
|
|
+ // Check generation stamps, replica size and state. Replica must satisfy
|
|
|
+ // the following criteria to be included in syncList for recovery:
|
|
|
+ // - Valid generation stamp
|
|
|
+ // - Non-zero length
|
|
|
+ // - Original state is RWR or better
|
|
|
for(DatanodeID id : locs) {
|
|
|
try {
|
|
|
DatanodeID bpReg =datanode.getBPOfferService(bpid).bpRegistration;
|
|
@@ -115,7 +120,28 @@ public class BlockRecoveryWorker {
|
|
|
if (info != null &&
|
|
|
info.getGenerationStamp() >= block.getGenerationStamp() &&
|
|
|
info.getNumBytes() > 0) {
|
|
|
- syncList.add(new BlockRecord(id, proxyDN, info));
|
|
|
+ // Count the number of candidate replicas received.
|
|
|
+ ++candidateReplicaCnt;
|
|
|
+ if (info.getOriginalReplicaState().getValue() <=
|
|
|
+ ReplicaState.RWR.getValue()) {
|
|
|
+ syncList.add(new BlockRecord(id, proxyDN, info));
|
|
|
+ } else {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Block recovery: Ignored replica with invalid " +
|
|
|
+ "original state: " + info + " from DataNode: " + id);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ if (info == null) {
|
|
|
+ LOG.debug("Block recovery: DataNode: " + id + " does not have "
|
|
|
+ + "replica for block: " + block);
|
|
|
+ } else {
|
|
|
+ LOG.debug("Block recovery: Ignored replica with invalid "
|
|
|
+ + "generation stamp or length: " + info + " from " +
|
|
|
+ "DataNode: " + id);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
} catch (RecoveryInProgressException ripE) {
|
|
|
InterDatanodeProtocol.LOG.warn(
|
|
@@ -136,6 +162,15 @@ public class BlockRecoveryWorker {
|
|
|
+ ", datanodeids=" + Arrays.asList(locs));
|
|
|
}
|
|
|
|
|
|
+ // None of the replicas reported by DataNodes has the required original
|
|
|
+ // state, report the error.
|
|
|
+ if (candidateReplicaCnt > 0 && syncList.isEmpty()) {
|
|
|
+ throw new IOException("Found " + candidateReplicaCnt +
|
|
|
+ " replica(s) for block " + block + " but none is in " +
|
|
|
+ ReplicaState.RWR.name() + " or better state. datanodeids=" +
|
|
|
+ Arrays.asList(locs));
|
|
|
+ }
|
|
|
+
|
|
|
syncBlock(syncList);
|
|
|
}
|
|
|
|
|
@@ -157,6 +192,11 @@ public class BlockRecoveryWorker {
|
|
|
// or their replicas have 0 length.
|
|
|
// The block can be deleted.
|
|
|
if (syncList.isEmpty()) {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("syncBlock for block " + block + ", all datanodes don't " +
|
|
|
+ "have the block or their replicas have 0 length. The block can " +
|
|
|
+ "be deleted.");
|
|
|
+ }
|
|
|
nn.commitBlockSynchronization(block, recoveryId, 0,
|
|
|
true, true, DatanodeID.EMPTY_ARRAY, null);
|
|
|
return;
|
|
@@ -195,6 +235,12 @@ public class BlockRecoveryWorker {
|
|
|
r.rInfo.getNumBytes() == finalizedLength) {
|
|
|
participatingList.add(r);
|
|
|
}
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("syncBlock replicaInfo: block=" + block +
|
|
|
+ ", from datanode " + r.id + ", receivedState=" + rState.name() +
|
|
|
+ ", receivedLength=" + r.rInfo.getNumBytes() +
|
|
|
+ ", bestState=FINALIZED, finalizedLength=" + finalizedLength);
|
|
|
+ }
|
|
|
}
|
|
|
newBlock.setNumBytes(finalizedLength);
|
|
|
break;
|
|
@@ -207,7 +253,16 @@ public class BlockRecoveryWorker {
|
|
|
minLength = Math.min(minLength, r.rInfo.getNumBytes());
|
|
|
participatingList.add(r);
|
|
|
}
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("syncBlock replicaInfo: block=" + block +
|
|
|
+ ", from datanode " + r.id + ", receivedState=" + rState.name() +
|
|
|
+ ", receivedLength=" + r.rInfo.getNumBytes() + ", bestState=" +
|
|
|
+ bestState.name());
|
|
|
+ }
|
|
|
}
|
|
|
+ // recover() guarantees syncList will have at least one replica with RWR
|
|
|
+ // or better state.
|
|
|
+ assert minLength != Long.MAX_VALUE : "wrong minLength";
|
|
|
newBlock.setNumBytes(minLength);
|
|
|
break;
|
|
|
case RUR:
|
|
@@ -254,6 +309,13 @@ public class BlockRecoveryWorker {
|
|
|
datanodes[i] = r.id;
|
|
|
storages[i] = r.storageID;
|
|
|
}
|
|
|
+
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Datanode triggering commitBlockSynchronization, block=" +
|
|
|
+ block + ", newGs=" + newBlock.getGenerationStamp() +
|
|
|
+ ", newLength=" + newBlock.getNumBytes());
|
|
|
+ }
|
|
|
+
|
|
|
nn.commitBlockSynchronization(block,
|
|
|
newBlock.getGenerationStamp(), newBlock.getNumBytes(), true, false,
|
|
|
datanodes, storages);
|