|
@@ -18,6 +18,8 @@
|
|
package org.apache.hadoop.hdfs.server.datanode;
|
|
package org.apache.hadoop.hdfs.server.datanode;
|
|
|
|
|
|
import com.google.common.base.Preconditions;
|
|
import com.google.common.base.Preconditions;
|
|
|
|
+
|
|
|
|
+import org.apache.hadoop.classification.InterfaceAudience;
|
|
import org.apache.hadoop.hdfs.DFSUtilClient;
|
|
import org.apache.hadoop.hdfs.DFSUtilClient;
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
|
@@ -30,6 +32,8 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
|
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
|
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
|
|
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
|
|
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
|
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
|
|
|
+import org.apache.hadoop.hdfs.server.datanode.erasurecode.StripedBlockChecksumReconstructor;
|
|
|
|
+import org.apache.hadoop.hdfs.server.datanode.erasurecode.StripedReconstructionInfo;
|
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
|
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
|
|
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
|
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
|
import org.apache.hadoop.io.DataOutputBuffer;
|
|
import org.apache.hadoop.io.DataOutputBuffer;
|
|
@@ -46,11 +50,14 @@ import java.io.DataOutputStream;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.io.InputStream;
|
|
import java.io.InputStream;
|
|
import java.security.MessageDigest;
|
|
import java.security.MessageDigest;
|
|
|
|
+import java.util.HashMap;
|
|
|
|
+import java.util.Map;
|
|
|
|
|
|
/**
|
|
/**
|
|
* Utilities for Block checksum computing, for both replicated and striped
|
|
* Utilities for Block checksum computing, for both replicated and striped
|
|
* blocks.
|
|
* blocks.
|
|
*/
|
|
*/
|
|
|
|
+@InterfaceAudience.Private
|
|
final class BlockChecksumHelper {
|
|
final class BlockChecksumHelper {
|
|
|
|
|
|
static final Logger LOG = LoggerFactory.getLogger(BlockChecksumHelper.class);
|
|
static final Logger LOG = LoggerFactory.getLogger(BlockChecksumHelper.class);
|
|
@@ -327,6 +334,7 @@ final class BlockChecksumHelper {
|
|
private final ErasureCodingPolicy ecPolicy;
|
|
private final ErasureCodingPolicy ecPolicy;
|
|
private final DatanodeInfo[] datanodes;
|
|
private final DatanodeInfo[] datanodes;
|
|
private final Token<BlockTokenIdentifier>[] blockTokens;
|
|
private final Token<BlockTokenIdentifier>[] blockTokens;
|
|
|
|
+ private final byte[] blockIndices;
|
|
|
|
|
|
private final DataOutputBuffer md5writer = new DataOutputBuffer();
|
|
private final DataOutputBuffer md5writer = new DataOutputBuffer();
|
|
|
|
|
|
@@ -338,17 +346,61 @@ final class BlockChecksumHelper {
|
|
this.ecPolicy = stripedBlockInfo.getErasureCodingPolicy();
|
|
this.ecPolicy = stripedBlockInfo.getErasureCodingPolicy();
|
|
this.datanodes = stripedBlockInfo.getDatanodes();
|
|
this.datanodes = stripedBlockInfo.getDatanodes();
|
|
this.blockTokens = stripedBlockInfo.getBlockTokens();
|
|
this.blockTokens = stripedBlockInfo.getBlockTokens();
|
|
|
|
+ this.blockIndices = stripedBlockInfo.getBlockIndices();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static class LiveBlockInfo {
|
|
|
|
+ private final DatanodeInfo dn;
|
|
|
|
+ private final Token<BlockTokenIdentifier> token;
|
|
|
|
+
|
|
|
|
+ LiveBlockInfo(DatanodeInfo dn, Token<BlockTokenIdentifier> token) {
|
|
|
|
+ this.dn = dn;
|
|
|
|
+ this.token = token;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ DatanodeInfo getDn() {
|
|
|
|
+ return dn;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ Token<BlockTokenIdentifier> getToken() {
|
|
|
|
+ return token;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
void compute() throws IOException {
|
|
void compute() throws IOException {
|
|
- for (int idx = 0; idx < ecPolicy.getNumDataUnits(); idx++) {
|
|
|
|
- ExtendedBlock block =
|
|
|
|
- StripedBlockUtil.constructInternalBlock(blockGroup,
|
|
|
|
- ecPolicy.getCellSize(), ecPolicy.getNumDataUnits(), idx);
|
|
|
|
- DatanodeInfo targetDatanode = datanodes[idx];
|
|
|
|
- Token<BlockTokenIdentifier> blockToken = blockTokens[idx];
|
|
|
|
- checksumBlock(block, idx, blockToken, targetDatanode);
|
|
|
|
|
|
+ assert datanodes.length == blockIndices.length;
|
|
|
|
+
|
|
|
|
+ Map<Byte, LiveBlockInfo> liveDns = new HashMap<>(datanodes.length);
|
|
|
|
+ int blkIndxLen = blockIndices.length;
|
|
|
|
+ int numDataUnits = ecPolicy.getNumDataUnits();
|
|
|
|
+ // Prepare live datanode list. Missing data blocks will be reconstructed
|
|
|
|
+ // and recalculate checksum.
|
|
|
|
+ for (int idx = 0; idx < blkIndxLen; idx++) {
|
|
|
|
+ liveDns.put(blockIndices[idx],
|
|
|
|
+ new LiveBlockInfo(datanodes[idx], blockTokens[idx]));
|
|
|
|
+ }
|
|
|
|
+ for (int idx = 0; idx < numDataUnits && idx < blkIndxLen; idx++) {
|
|
|
|
+ try {
|
|
|
|
+ LiveBlockInfo liveBlkInfo = liveDns.get((byte) idx);
|
|
|
|
+ if (liveBlkInfo == null) {
|
|
|
|
+ // reconstruct block and calculate checksum for missing node
|
|
|
|
+ recalculateChecksum(idx);
|
|
|
|
+ } else {
|
|
|
|
+ try {
|
|
|
|
+ ExtendedBlock block = StripedBlockUtil.constructInternalBlock(
|
|
|
|
+ blockGroup, ecPolicy.getCellSize(), numDataUnits, idx);
|
|
|
|
+ checksumBlock(block, idx, liveBlkInfo.getToken(),
|
|
|
|
+ liveBlkInfo.getDn());
|
|
|
|
+ } catch (IOException ioe) {
|
|
|
|
+ LOG.warn("Exception while reading checksum", ioe);
|
|
|
|
+ // reconstruct block and calculate checksum for the failed node
|
|
|
|
+ recalculateChecksum(idx);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ LOG.warn("Failed to get the checksum", e);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
MD5Hash md5out = MD5Hash.digest(md5writer.getData());
|
|
MD5Hash md5out = MD5Hash.digest(md5writer.getData());
|
|
@@ -379,52 +431,90 @@ final class BlockChecksumHelper {
|
|
DataTransferProtos.OpBlockChecksumResponseProto checksumData =
|
|
DataTransferProtos.OpBlockChecksumResponseProto checksumData =
|
|
reply.getChecksumResponse();
|
|
reply.getChecksumResponse();
|
|
|
|
|
|
- //read byte-per-checksum
|
|
|
|
- final int bpc = checksumData.getBytesPerCrc();
|
|
|
|
- if (blockIdx == 0) { //first block
|
|
|
|
- setBytesPerCRC(bpc);
|
|
|
|
- } else if (bpc != getBytesPerCRC()) {
|
|
|
|
- throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
|
|
|
|
- + " but bytesPerCRC=" + getBytesPerCRC());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- //read crc-per-block
|
|
|
|
- final long cpb = checksumData.getCrcPerBlock();
|
|
|
|
- if (blockIdx == 0) {
|
|
|
|
- setCrcPerBlock(cpb);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- //read md5
|
|
|
|
- final MD5Hash md5 = new MD5Hash(
|
|
|
|
- checksumData.getMd5().toByteArray());
|
|
|
|
- md5.write(md5writer);
|
|
|
|
-
|
|
|
|
// read crc-type
|
|
// read crc-type
|
|
final DataChecksum.Type ct;
|
|
final DataChecksum.Type ct;
|
|
if (checksumData.hasCrcType()) {
|
|
if (checksumData.hasCrcType()) {
|
|
ct = PBHelperClient.convert(checksumData.getCrcType());
|
|
ct = PBHelperClient.convert(checksumData.getCrcType());
|
|
} else {
|
|
} else {
|
|
- LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
|
|
|
|
- "inferring checksum by reading first byte");
|
|
|
|
|
|
+ LOG.debug("Retrieving checksum from an earlier-version DataNode: "
|
|
|
|
+ + "inferring checksum by reading first byte");
|
|
ct = DataChecksum.Type.DEFAULT;
|
|
ct = DataChecksum.Type.DEFAULT;
|
|
}
|
|
}
|
|
|
|
|
|
- if (blockIdx == 0) { // first block
|
|
|
|
- setCrcType(ct);
|
|
|
|
- } else if (getCrcType() != DataChecksum.Type.MIXED &&
|
|
|
|
- getCrcType() != ct) {
|
|
|
|
- // if crc types are mixed in a file
|
|
|
|
- setCrcType(DataChecksum.Type.MIXED);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
|
|
+ setOrVerifyChecksumProperties(blockIdx, checksumData.getBytesPerCrc(),
|
|
|
|
+ checksumData.getCrcPerBlock(), ct);
|
|
|
|
+ //read md5
|
|
|
|
+ final MD5Hash md5 = new MD5Hash(checksumData.getMd5().toByteArray());
|
|
|
|
+ md5.write(md5writer);
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
- if (blockIdx == 0) {
|
|
|
|
- LOG.debug("set bytesPerCRC=" + getBytesPerCRC()
|
|
|
|
- + ", crcPerBlock=" + getCrcPerBlock());
|
|
|
|
- }
|
|
|
|
LOG.debug("got reply from " + targetDatanode + ": md5=" + md5);
|
|
LOG.debug("got reply from " + targetDatanode + ": md5=" + md5);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Reconstruct this data block and recalculate checksum.
|
|
|
|
+ *
|
|
|
|
+ * @param errBlkIndex
|
|
|
|
+ * error index to be reconstrcuted and recalculate checksum.
|
|
|
|
+ * @throws IOException
|
|
|
|
+ */
|
|
|
|
+ private void recalculateChecksum(int errBlkIndex) throws IOException {
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ LOG.debug("Recalculate checksum for the missing/failed block index "
|
|
|
|
+ + errBlkIndex);
|
|
|
|
+ }
|
|
|
|
+ byte[] errIndices = new byte[1];
|
|
|
|
+ errIndices[0] = (byte) errBlkIndex;
|
|
|
|
+ StripedReconstructionInfo stripedReconInfo =
|
|
|
|
+ new StripedReconstructionInfo(
|
|
|
|
+ blockGroup, ecPolicy, blockIndices, datanodes, errIndices);
|
|
|
|
+ final StripedBlockChecksumReconstructor checksumRecon =
|
|
|
|
+ new StripedBlockChecksumReconstructor(
|
|
|
|
+ getDatanode().getErasureCodingWorker(), stripedReconInfo,
|
|
|
|
+ md5writer);
|
|
|
|
+ checksumRecon.reconstruct();
|
|
|
|
+
|
|
|
|
+ DataChecksum checksum = checksumRecon.getChecksum();
|
|
|
|
+ long crcPerBlock = checksum.getChecksumSize() <= 0 ? 0
|
|
|
|
+ : checksumRecon.getChecksumDataLen() / checksum.getChecksumSize();
|
|
|
|
+ setOrVerifyChecksumProperties(errBlkIndex, checksum.getBytesPerChecksum(),
|
|
|
|
+ crcPerBlock, checksum.getChecksumType());
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ LOG.debug("Recalculated checksum for the block index " + errBlkIndex
|
|
|
|
+ + ": md5=" + checksumRecon.getMD5());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void setOrVerifyChecksumProperties(int blockIdx, int bpc,
|
|
|
|
+ final long cpb, DataChecksum.Type ct) throws IOException {
|
|
|
|
+ //read byte-per-checksum
|
|
|
|
+ if (blockIdx == 0) { //first block
|
|
|
|
+ setBytesPerCRC(bpc);
|
|
|
|
+ } else if (bpc != getBytesPerCRC()) {
|
|
|
|
+ throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
|
|
|
|
+ + " but bytesPerCRC=" + getBytesPerCRC());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ //read crc-per-block
|
|
|
|
+ if (blockIdx == 0) {
|
|
|
|
+ setCrcPerBlock(cpb);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (blockIdx == 0) { // first block
|
|
|
|
+ setCrcType(ct);
|
|
|
|
+ } else if (getCrcType() != DataChecksum.Type.MIXED &&
|
|
|
|
+ getCrcType() != ct) {
|
|
|
|
+ // if crc types are mixed in a file
|
|
|
|
+ setCrcType(DataChecksum.Type.MIXED);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ if (blockIdx == 0) {
|
|
|
|
+ LOG.debug("set bytesPerCRC=" + getBytesPerCRC()
|
|
|
|
+ + ", crcPerBlock=" + getCrcPerBlock());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|