HDFS-13056. Expose file-level composite CRCs in HDFS which are comparable across different instances/layouts. Contributed by Dennis Huo.

Xiao Chen 7 years ago
parent
commit
7c9cdad6d0
34 changed files with 2359 additions and 242 deletions
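
As an overview of the intended end-to-end use, here is a minimal sketch (not part of this change): the paths are placeholders, the property value "COMPOSITE_CRC" is assumed to match the ChecksumCombineMode enum added below, fs.defaultFS is assumed to point at an HDFS cluster, and FileChecksum equality is assumed to compare algorithm name and checksum bytes.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileChecksum;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CompositeCrcCompare {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Switch the client from the default MD5-of-MD5-of-CRCs aggregation to
        // the block-layout-agnostic composite CRC introduced by this change.
        conf.set("dfs.checksum.combine.mode", "COMPOSITE_CRC");
        FileSystem fs = FileSystem.get(conf);

        // One replicated file and one erasure-coded copy of the same bytes.
        FileChecksum replicated = fs.getFileChecksum(new Path("/replicated/file"));
        FileChecksum striped = fs.getFileChecksum(new Path("/ec/file"));

        // With COMPOSITE_CRC the two checksums should agree even though the
        // files use different block sizes and layouts, as long as the data and
        // the underlying chunk CRC type (CRC32 vs CRC32C) match.
        System.out.println(replicated + " vs " + striped
            + " -> equal=" + replicated.equals(striped));
      }
    }
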
  1. 82 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CompositeCrcFileChecksum.java
  2. 11 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
  3. 187 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CrcComposer.java
  4. 220 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CrcUtil.java
  5. 18 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
  6. 242 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestCrcComposer.java
  7. 232 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestCrcUtil.java
  8. 2 2
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/Hdfs.java
  9. 42 14
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
  10. 3 2
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
  11. 248 117
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
  12. 2 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
  13. 27 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
  14. 54 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockChecksumOptions.java
  15. 30 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockChecksumType.java
  16. 9 3
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
  17. 8 3
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
  18. 44 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
  19. 5 2
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
  20. 21 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
  21. 5 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
  22. 256 33
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
  23. 17 9
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
  24. 80 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumCompositeCrcReconstructor.java
  25. 74 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumMd5CrcReconstructor.java
  26. 45 21
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
  27. 1 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
  28. 11 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
  29. 29 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
  30. 99 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java
  31. 47 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksumCompositeCrc.java
  32. 14 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
  33. 144 29
      hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
  34. 50 0
      hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapperCompositeCrc.java

+ 82 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CompositeCrcFileChecksum.java

@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
+import org.apache.hadoop.util.CrcUtil;
+import org.apache.hadoop.util.DataChecksum;
+
+/** Composite CRC. */
+@InterfaceAudience.LimitedPrivate({"HDFS"})
+@InterfaceStability.Unstable
+public class CompositeCrcFileChecksum extends FileChecksum {
+  public static final int LENGTH = Integer.SIZE / Byte.SIZE;
+
+  private int crc;
+  private DataChecksum.Type crcType;
+  private int bytesPerCrc;
+
+  /** Create a CompositeCrcFileChecksum. */
+  public CompositeCrcFileChecksum(
+      int crc, DataChecksum.Type crcType, int bytesPerCrc) {
+    this.crc = crc;
+    this.crcType = crcType;
+    this.bytesPerCrc = bytesPerCrc;
+  }
+
+  @Override
+  public String getAlgorithmName() {
+    return "COMPOSITE-" + crcType.name();
+  }
+
+  @Override
+  public int getLength() {
+    return LENGTH;
+  }
+
+  @Override
+  public byte[] getBytes() {
+    return CrcUtil.intToBytes(crc);
+  }
+
+  @Override
+  public ChecksumOpt getChecksumOpt() {
+    return new ChecksumOpt(crcType, bytesPerCrc);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    crc = in.readInt();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(crc);
+  }
+
+  @Override
+  public String toString() {
+    return getAlgorithmName() + ":" + String.format("0x%08x", crc);
+  }
+}
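
For a quick sense of how the new checksum type renders, a small illustrative snippet (the CRC value below is an arbitrary placeholder, not derived from real data):

    import org.apache.hadoop.fs.CompositeCrcFileChecksum;
    import org.apache.hadoop.util.DataChecksum;

    public class CompositeCrcFileChecksumDemo {
      public static void main(String[] args) {
        CompositeCrcFileChecksum checksum = new CompositeCrcFileChecksum(
            0xCAFEBEEF, DataChecksum.Type.CRC32C, 512);
        System.out.println(checksum.getAlgorithmName()); // COMPOSITE-CRC32C
        System.out.println(checksum.getBytes().length);  // 4 (big-endian CRC bytes)
        System.out.println(checksum);                    // COMPOSITE-CRC32C:0xcafebeef
      }
    }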

+ 11 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java

@@ -504,4 +504,15 @@ public final class Options {
 
   }
 
+  /**
+   * Enum for indicating what mode to use when combining chunk and block
+   * checksums to define an aggregate FileChecksum. This should be considered
+   * a client-side runtime option rather than a persistent property of any
+   * stored metadata, which is why this is not part of ChecksumOpt, which
+   * deals with properties of files at rest.
+   */
+  public enum ChecksumCombineMode {
+    MD5MD5CRC,  // MD5 of block checksums, which are MD5 over chunk CRCs
+    COMPOSITE_CRC  // Block/chunk-independent composite CRC
+  }
 }

+ 187 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CrcComposer.java

@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+/**
+ * Encapsulates logic for composing multiple CRCs into one or more combined CRCs
+ * corresponding to concatenated underlying data ranges. Optimized for composing
+ * a large number of CRCs that correspond to underlying chunks of data all of
+ * the same size.
+ */
+@InterfaceAudience.LimitedPrivate({"Common", "HDFS", "MapReduce", "Yarn"})
+@InterfaceStability.Unstable
+public class CrcComposer {
+  private static final int CRC_SIZE_BYTES = 4;
+  private static final Logger LOG = LoggerFactory.getLogger(CrcComposer.class);
+
+  private final int crcPolynomial;
+  private final int precomputedMonomialForHint;
+  private final long bytesPerCrcHint;
+  private final long stripeLength;
+
+  private int curCompositeCrc = 0;
+  private long curPositionInStripe = 0;
+  private ByteArrayOutputStream digestOut = new ByteArrayOutputStream();
+
+  /**
+   * Returns a CrcComposer which will collapse all ingested CRCs into a single
+   * value.
+   */
+  public static CrcComposer newCrcComposer(
+      DataChecksum.Type type, long bytesPerCrcHint)
+      throws IOException {
+    return newStripedCrcComposer(type, bytesPerCrcHint, Long.MAX_VALUE);
+  }
+
+  /**
+   * Returns a CrcComposer which will collapse CRCs for every combined
+   * underlying data size which aligns with the specified stripe boundary. For
+   * example, if "update" is called with 20 CRCs and bytesPerCrc == 5, and
+   * stripeLength == 10, then every two (10 / 5) consecutive CRCs will be
+   * combined with each other, yielding a list of 10 CRC "stripes" in the
+   * final digest, each corresponding to 10 underlying data bytes. Using
+   * a stripeLength greater than the total underlying data size is equivalent
+   * to using a non-striped CrcComposer.
+   */
+  public static CrcComposer newStripedCrcComposer(
+      DataChecksum.Type type, long bytesPerCrcHint, long stripeLength)
+      throws IOException {
+    int polynomial = DataChecksum.getCrcPolynomialForType(type);
+    return new CrcComposer(
+        polynomial,
+        CrcUtil.getMonomial(bytesPerCrcHint, polynomial),
+        bytesPerCrcHint,
+        stripeLength);
+  }
+
+  CrcComposer(
+      int crcPolynomial,
+      int precomputedMonomialForHint,
+      long bytesPerCrcHint,
+      long stripeLength) {
+    LOG.debug(
+        "crcPolynomial=0x{}, precomputedMonomialForHint=0x{}, "
+        + "bytesPerCrcHint={}, stripeLength={}",
+        Integer.toString(crcPolynomial, 16),
+        Integer.toString(precomputedMonomialForHint, 16),
+        bytesPerCrcHint,
+        stripeLength);
+    this.crcPolynomial = crcPolynomial;
+    this.precomputedMonomialForHint = precomputedMonomialForHint;
+    this.bytesPerCrcHint = bytesPerCrcHint;
+    this.stripeLength = stripeLength;
+  }
+
+  /**
+   * Composes length / CRC_SIZE_BYTES more CRCs from crcBuffer, with
+   * each CRC expected to correspond to exactly {@code bytesPerCrc} underlying
+   * data bytes.
+   *
+   * @param length must be a multiple of the expected byte-size of a CRC.
+   */
+  public void update(
+      byte[] crcBuffer, int offset, int length, long bytesPerCrc)
+      throws IOException {
+    if (length % CRC_SIZE_BYTES != 0) {
+      throw new IOException(String.format(
+          "Trying to update CRC from byte array with length '%d' at offset "
+          + "'%d' which is not a multiple of %d!",
+          length, offset, CRC_SIZE_BYTES));
+    }
+    int limit = offset + length;
+    while (offset < limit) {
+      int crcB = CrcUtil.readInt(crcBuffer, offset);
+      update(crcB, bytesPerCrc);
+      offset += CRC_SIZE_BYTES;
+    }
+  }
+
+  /**
+   * Composes {@code numChecksumsToRead} additional CRCs into the current digest
+   * out of {@code checksumIn}, with each CRC expected to correspond to exactly
+   * {@code bytesPerCrc} underlying data bytes.
+   */
+  public void update(
+      DataInputStream checksumIn, long numChecksumsToRead, long bytesPerCrc)
+      throws IOException {
+    for (long i = 0; i < numChecksumsToRead; ++i) {
+      int crcB = checksumIn.readInt();
+      update(crcB, bytesPerCrc);
+    }
+  }
+
+  /**
+   * Updates with a single additional CRC which corresponds to an underlying
+   * data size of {@code bytesPerCrc}.
+   */
+  public void update(int crcB, long bytesPerCrc) throws IOException {
+    if (curCompositeCrc == 0) {
+      curCompositeCrc = crcB;
+    } else if (bytesPerCrc == bytesPerCrcHint) {
+      curCompositeCrc = CrcUtil.composeWithMonomial(
+          curCompositeCrc, crcB, precomputedMonomialForHint, crcPolynomial);
+    } else {
+      curCompositeCrc = CrcUtil.compose(
+          curCompositeCrc, crcB, bytesPerCrc, crcPolynomial);
+    }
+
+    curPositionInStripe += bytesPerCrc;
+
+    if (curPositionInStripe > stripeLength) {
+      throw new IOException(String.format(
+          "Current position in stripe '%d' after advancing by bytesPerCrc '%d' "
+          + "exceeds stripeLength '%d' without stripe alignment.",
+          curPositionInStripe, bytesPerCrc, stripeLength));
+    } else if (curPositionInStripe == stripeLength) {
+      // Hit a stripe boundary; flush the curCompositeCrc and reset for next
+      // stripe.
+      digestOut.write(CrcUtil.intToBytes(curCompositeCrc), 0, CRC_SIZE_BYTES);
+      curCompositeCrc = 0;
+      curPositionInStripe = 0;
+    }
+  }
+
+  /**
+   * Returns byte representation of composed CRCs; if no stripeLength was
+   * specified, the digest should be of length equal to exactly one CRC.
+   * Otherwise, the number of CRCs in the returned array equals the total sum
+   * of bytesPerCrc divided by stripeLength. If that sum is not a multiple of
+   * stripeLength, then the last CRC in the array corresponds to
+   * (total length % stripeLength) underlying data bytes.
+   */
+  public byte[] digest() {
+    if (curPositionInStripe > 0) {
+      digestOut.write(CrcUtil.intToBytes(curCompositeCrc), 0, CRC_SIZE_BYTES);
+      curCompositeCrc = 0;
+      curPositionInStripe = 0;
+    }
+    byte[] digestValue = digestOut.toByteArray();
+    digestOut.reset();
+    return digestValue;
+  }
+}
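
A minimal usage sketch for the composer above (not part of the patch; it mirrors the unit tests added later in this commit by composing per-chunk CRC32C values back into the CRC of the whole buffer):

    import java.util.Random;

    import org.apache.hadoop.util.CrcComposer;
    import org.apache.hadoop.util.CrcUtil;
    import org.apache.hadoop.util.DataChecksum;

    public class CrcComposerSketch {
      public static void main(String[] args) throws Exception {
        byte[] data = new byte[4096];
        new Random(42).nextBytes(data);
        int chunkSize = 512;

        DataChecksum crc32c =
            DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, chunkSize);

        // Feed one CRC per 512-byte chunk; with no stripeLength the composer
        // collapses them into a single CRC covering all 4096 bytes.
        CrcComposer composer =
            CrcComposer.newCrcComposer(DataChecksum.Type.CRC32C, chunkSize);
        for (int offset = 0; offset < data.length; offset += chunkSize) {
          crc32c.reset();
          crc32c.update(data, offset, chunkSize);
          composer.update((int) crc32c.getValue(), chunkSize);
        }
        int composed = CrcUtil.readInt(composer.digest(), 0);

        // The composed value should equal a CRC computed over the data in one go.
        crc32c.reset();
        crc32c.update(data, 0, data.length);
        System.out.println(composed == (int) crc32c.getValue()); // true
      }
    }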

+ 220 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CrcUtil.java

@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * This class provides utilities for working with CRCs.
+ */
+@InterfaceAudience.LimitedPrivate({"Common", "HDFS", "MapReduce", "Yarn"})
+@InterfaceStability.Unstable
+public final class CrcUtil {
+  public static final int MULTIPLICATIVE_IDENTITY = 0x80000000;
+  public static final int GZIP_POLYNOMIAL = 0xEDB88320;
+  public static final int CASTAGNOLI_POLYNOMIAL = 0x82F63B78;
+
+  /**
+   * Hide default constructor for a static utils class.
+   */
+  private CrcUtil() {
+  }
+
+  /**
+   * Compute x^({@code lengthBytes} * 8) mod {@code mod}, where {@code mod} is
+   * in "reversed" (little-endian) format such that {@code mod & 1} represents
+   * x^31 and has an implicit term x^32.
+   */
+  public static int getMonomial(long lengthBytes, int mod) {
+    if (lengthBytes == 0) {
+      return MULTIPLICATIVE_IDENTITY;
+    } else if (lengthBytes < 0) {
+      throw new IllegalArgumentException(
+          "lengthBytes must be positive, got " + lengthBytes);
+    }
+
+    // Decompose into
+    // x^degree == x ^ SUM(bit[i] * 2^i) == PRODUCT(x ^ (bit[i] * 2^i))
+    // Generate each x^(2^i) by squaring.
+    // Since 'degree' is in 'bits', but we only need to support byte
+    // granularity we can begin with x^8.
+    int multiplier = MULTIPLICATIVE_IDENTITY >>> 8;
+    int product = MULTIPLICATIVE_IDENTITY;
+    long degree = lengthBytes;
+    while (degree > 0) {
+      if ((degree & 1) != 0) {
+        product = (product == MULTIPLICATIVE_IDENTITY) ? multiplier :
+            galoisFieldMultiply(product, multiplier, mod);
+      }
+      multiplier = galoisFieldMultiply(multiplier, multiplier, mod);
+      degree >>= 1;
+    }
+    return product;
+  }
+
+  /**
+   * @param monomial Precomputed x^(lengthBInBytes * 8) mod {@code mod}
+   */
+  public static int composeWithMonomial(
+      int crcA, int crcB, int monomial, int mod) {
+    return galoisFieldMultiply(crcA, monomial, mod) ^ crcB;
+  }
+
+  /**
+   * @param lengthB length of content corresponding to {@code crcB}, in bytes.
+   */
+  public static int compose(int crcA, int crcB, long lengthB, int mod) {
+    int monomial = getMonomial(lengthB, mod);
+    return composeWithMonomial(crcA, crcB, monomial, mod);
+  }
+
+  /**
+   * @return 4-byte array holding the big-endian representation of
+   *     {@code value}.
+   */
+  public static byte[] intToBytes(int value) {
+    byte[] buf = new byte[4];
+    try {
+      writeInt(buf, 0, value);
+    } catch (IOException ioe) {
+      // Since this should only be able to occur from code bugs within this
+      // class rather than user input, we throw as a RuntimeException
+      // rather than requiring this method to declare throwing IOException
+      // for something the caller can't control.
+      throw new RuntimeException(ioe);
+    }
+    return buf;
+  }
+
+  /**
+   * Writes big-endian representation of {@code value} into {@code buf}
+   * starting at {@code offset}. buf.length must be greater than or
+   * equal to offset + 4.
+   */
+  public static void writeInt(byte[] buf, int offset, int value)
+      throws IOException {
+    if (offset + 4  > buf.length) {
+      throw new IOException(String.format(
+          "writeInt out of bounds: buf.length=%d, offset=%d",
+          buf.length, offset));
+    }
+    buf[offset + 0] = (byte)((value >>> 24) & 0xff);
+    buf[offset + 1] = (byte)((value >>> 16) & 0xff);
+    buf[offset + 2] = (byte)((value >>> 8) & 0xff);
+    buf[offset + 3] = (byte)(value & 0xff);
+  }
+
+  /**
+   * Reads 4-byte big-endian int value from {@code buf} starting at
+   * {@code offset}. buf.length must be greater than or equal to offset + 4.
+   */
+  public static int readInt(byte[] buf, int offset)
+      throws IOException {
+    if (offset + 4  > buf.length) {
+      throw new IOException(String.format(
+          "readInt out of bounds: buf.length=%d, offset=%d",
+          buf.length, offset));
+    }
+    int value = ((buf[offset + 0] & 0xff) << 24) |
+                ((buf[offset + 1] & 0xff) << 16) |
+                ((buf[offset + 2] & 0xff) << 8)  |
+                ((buf[offset + 3] & 0xff));
+    return value;
+  }
+
+  /**
+   * For use with debug statements; verifies bytes.length on creation,
+   * expecting it to represent exactly one CRC, and returns a hex
+   * formatted value.
+   */
+  public static String toSingleCrcString(final byte[] bytes)
+      throws IOException {
+    if (bytes.length != 4) {
+      throw new IOException((String.format(
+          "Unexpected byte[] length '%d' for single CRC. Contents: %s",
+          bytes.length, Arrays.toString(bytes))));
+    }
+    return String.format("0x%08x", readInt(bytes, 0));
+  }
+
+  /**
+   * For use with debug statements; verifies bytes.length on creation,
+   * expecting it to be divisible by CRC byte size, and returns a list of
+   * hex formatted values.
+   */
+  public static String toMultiCrcString(final byte[] bytes)
+      throws IOException {
+    if (bytes.length % 4 != 0) {
+      throw new IOException((String.format(
+          "Unexpected byte[] length '%d' not divisible by 4. Contents: %s",
+          bytes.length, Arrays.toString(bytes))));
+    }
+    StringBuilder sb = new StringBuilder();
+    sb.append('[');
+    for (int i = 0; i < bytes.length; i += 4) {
+      sb.append(String.format("0x%08x", readInt(bytes, i)));
+      if (i != bytes.length - 4) {
+        sb.append(", ");
+      }
+    }
+    sb.append(']');
+    return sb.toString();
+  }
+
+  /**
+   * Galois field multiplication of {@code p} and {@code q} with the
+   * generator polynomial {@code m} as the modulus.
+   *
+   * @param m The little-endian polynomial to use as the modulus when
+   *     multiplying p and q, with implicit "1" bit beyond the bottom bit.
+   */
+  private static int galoisFieldMultiply(int p, int q, int m) {
+    int summation = 0;
+
+    // Top bit is the x^0 place; each right-shift increments the degree of the
+    // current term.
+    int curTerm = MULTIPLICATIVE_IDENTITY;
+
+    // Iteratively multiply p by x mod m as we go to represent the q[i] term
+    // (of degree x^i) times p.
+    int px = p;
+
+    while (curTerm != 0) {
+      if ((q & curTerm) != 0) {
+        summation ^= px;
+      }
+
+      // Bottom bit represents highest degree since we're little-endian; before
+      // we multiply by "x" for the next term, check bottom bit to know whether
+      // the resulting px will thus have a term matching the implicit "1" term
+      // of "m" and thus will need to subtract "m" after mutiplying by "x".
+      boolean hasMaxDegree = ((px & 1) != 0);
+      px >>>= 1;
+      if (hasMaxDegree) {
+        px ^= m;
+      }
+      curTerm >>>= 1;
+    }
+    return summation;
+  }
+}
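
The identity these utilities provide, crc(A || B) == compose(crc(A), crc(B), length(B)), is what lets HDFS combine chunk and block CRCs without re-reading data. A brief sketch (not part of the patch), using DataChecksum for the raw CRC values and the getCrcPolynomialForType() helper added in the next hunk:

    import org.apache.hadoop.util.CrcUtil;
    import org.apache.hadoop.util.DataChecksum;

    public class CrcComposeIdentity {
      public static void main(String[] args) throws Exception {
        byte[] a = "hello ".getBytes("UTF-8");
        byte[] b = "world".getBytes("UTF-8");
        byte[] ab = "hello world".getBytes("UTF-8");

        DataChecksum crc32 = DataChecksum.newDataChecksum(
            DataChecksum.Type.CRC32, Integer.MAX_VALUE);
        int crcA = crcOf(crc32, a);
        int crcB = crcOf(crc32, b);
        int crcAB = crcOf(crc32, ab);

        // Composing crc(A) with crc(B) over |B| bytes reproduces crc(A || B),
        // the same property TestCrcUtil#doTestComposeCrc exercises below.
        int poly = DataChecksum.getCrcPolynomialForType(DataChecksum.Type.CRC32);
        System.out.println(
            crcAB == CrcUtil.compose(crcA, crcB, b.length, poly)); // true
      }

      private static int crcOf(DataChecksum checksum, byte[] data) {
        checksum.reset();
        checksum.update(data, 0, data.length);
        return (int) checksum.getValue();
      }
    }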

+ 18 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java

@@ -104,6 +104,24 @@ public class DataChecksum implements Checksum {
     }
   }
 
+  /**
+   * @return the int representation of the polynomial associated with the
+   *     CRC {@code type}, suitable for use with further CRC arithmetic.
+   * @throws IOException if there is no CRC polynomial applicable
+   *     to the given {@code type}.
+   */
+  public static int getCrcPolynomialForType(Type type) throws IOException {
+    switch (type) {
+    case CRC32:
+      return CrcUtil.GZIP_POLYNOMIAL;
+    case CRC32C:
+      return CrcUtil.CASTAGNOLI_POLYNOMIAL;
+    default:
+      throw new IOException(
+          "No CRC polynomial could be associated with type: " + type);
+    }
+  }
+
   public static DataChecksum newDataChecksum(Type type, int bytesPerChecksum ) {
     if ( bytesPerChecksum <= 0 ) {
       return null;

+ 242 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestCrcComposer.java

@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unittests for CrcComposer.
+ */
+public class TestCrcComposer {
+  @Rule
+  public Timeout globalTimeout = new Timeout(10000);
+
+  private Random rand = new Random(1234);
+
+  private DataChecksum.Type type = DataChecksum.Type.CRC32C;
+  private DataChecksum checksum = DataChecksum.newDataChecksum(
+      type, Integer.MAX_VALUE);
+  private int dataSize = 75;
+  private byte[] data = new byte[dataSize];
+  private int chunkSize = 10;
+  private int cellSize = 20;
+
+  private int fullCrc;
+  private int[] crcsByChunk;
+  private int[] crcsByCell;
+
+  private byte[] crcBytesByChunk;
+  private byte[] crcBytesByCell;
+
+  @Before
+  public void setup() throws IOException {
+    rand.nextBytes(data);
+    fullCrc = getRangeChecksum(data, 0, dataSize);
+
+    // 7 chunks of size chunkSize, 1 chunk of size (dataSize % chunkSize).
+    crcsByChunk = new int[8];
+    for (int i = 0; i < 7; ++i) {
+      crcsByChunk[i] = getRangeChecksum(data, i * chunkSize, chunkSize);
+    }
+    crcsByChunk[7] = getRangeChecksum(
+        data, (crcsByChunk.length - 1) * chunkSize, dataSize % chunkSize);
+
+    // 3 cells of size cellSize, 1 cell of size (dataSize % cellSize).
+    crcsByCell = new int[4];
+    for (int i = 0; i < 3; ++i) {
+      crcsByCell[i] = getRangeChecksum(data, i * cellSize, cellSize);
+    }
+    crcsByCell[3] = getRangeChecksum(
+        data, (crcsByCell.length - 1) * cellSize, dataSize % cellSize);
+
+    crcBytesByChunk = intArrayToByteArray(crcsByChunk);
+    crcBytesByCell = intArrayToByteArray(crcsByCell);
+  }
+
+  private int getRangeChecksum(byte[] buf, int offset, int length) {
+    checksum.reset();
+    checksum.update(buf, offset, length);
+    return (int) checksum.getValue();
+  }
+
+  private byte[] intArrayToByteArray(int[] values) throws IOException {
+    byte[] bytes = new byte[values.length * 4];
+    for (int i = 0; i < values.length; ++i) {
+      CrcUtil.writeInt(bytes, i * 4, values[i]);
+    }
+    return bytes;
+  }
+
+  @Test
+  public void testUnstripedIncorrectChunkSize() throws IOException {
+    CrcComposer digester = CrcComposer.newCrcComposer(type, chunkSize);
+
+    // If we incorrectly specify that all CRCs ingested correspond to chunkSize
+    // when the last CRC in the array actually corresponds to
+    // dataSize % chunkSize then we expect the resulting CRC to not be equal to
+    // the fullCrc.
+    digester.update(crcBytesByChunk, 0, crcBytesByChunk.length, chunkSize);
+    byte[] digest = digester.digest();
+    assertEquals(4, digest.length);
+    int calculatedCrc = CrcUtil.readInt(digest, 0);
+    assertNotEquals(fullCrc, calculatedCrc);
+  }
+
+  @Test
+  public void testUnstripedByteArray() throws IOException {
+    CrcComposer digester = CrcComposer.newCrcComposer(type, chunkSize);
+    digester.update(crcBytesByChunk, 0, crcBytesByChunk.length - 4, chunkSize);
+    digester.update(
+        crcBytesByChunk, crcBytesByChunk.length - 4, 4, dataSize % chunkSize);
+
+    byte[] digest = digester.digest();
+    assertEquals(4, digest.length);
+    int calculatedCrc = CrcUtil.readInt(digest, 0);
+    assertEquals(fullCrc, calculatedCrc);
+  }
+
+  @Test
+  public void testUnstripedDataInputStream() throws IOException {
+    CrcComposer digester = CrcComposer.newCrcComposer(type, chunkSize);
+    DataInputStream input =
+        new DataInputStream(new ByteArrayInputStream(crcBytesByChunk));
+    digester.update(input, crcsByChunk.length - 1, chunkSize);
+    digester.update(input, 1, dataSize % chunkSize);
+
+    byte[] digest = digester.digest();
+    assertEquals(4, digest.length);
+    int calculatedCrc = CrcUtil.readInt(digest, 0);
+    assertEquals(fullCrc, calculatedCrc);
+  }
+
+  @Test
+  public void testUnstripedSingleCrcs() throws IOException {
+    CrcComposer digester = CrcComposer.newCrcComposer(type, chunkSize);
+    for (int i = 0; i < crcsByChunk.length - 1; ++i) {
+      digester.update(crcsByChunk[i], chunkSize);
+    }
+    digester.update(crcsByChunk[crcsByChunk.length - 1], dataSize % chunkSize);
+
+    byte[] digest = digester.digest();
+    assertEquals(4, digest.length);
+    int calculatedCrc = CrcUtil.readInt(digest, 0);
+    assertEquals(fullCrc, calculatedCrc);
+  }
+
+  @Test
+  public void testStripedByteArray() throws IOException {
+    CrcComposer digester =
+        CrcComposer.newStripedCrcComposer(type, chunkSize, cellSize);
+    digester.update(crcBytesByChunk, 0, crcBytesByChunk.length - 4, chunkSize);
+    digester.update(
+        crcBytesByChunk, crcBytesByChunk.length - 4, 4, dataSize % chunkSize);
+
+    byte[] digest = digester.digest();
+    assertArrayEquals(crcBytesByCell, digest);
+  }
+
+  @Test
+  public void testStripedDataInputStream() throws IOException {
+    CrcComposer digester =
+        CrcComposer.newStripedCrcComposer(type, chunkSize, cellSize);
+    DataInputStream input =
+        new DataInputStream(new ByteArrayInputStream(crcBytesByChunk));
+    digester.update(input, crcsByChunk.length - 1, chunkSize);
+    digester.update(input, 1, dataSize % chunkSize);
+
+    byte[] digest = digester.digest();
+    assertArrayEquals(crcBytesByCell, digest);
+  }
+
+  @Test
+  public void testStripedSingleCrcs() throws IOException {
+    CrcComposer digester =
+        CrcComposer.newStripedCrcComposer(type, chunkSize, cellSize);
+    for (int i = 0; i < crcsByChunk.length - 1; ++i) {
+      digester.update(crcsByChunk[i], chunkSize);
+    }
+    digester.update(crcsByChunk[crcsByChunk.length - 1], dataSize % chunkSize);
+
+    byte[] digest = digester.digest();
+    assertArrayEquals(crcBytesByCell, digest);
+  }
+
+  @Test
+  public void testMultiStageMixed() throws IOException {
+    CrcComposer digester =
+        CrcComposer.newStripedCrcComposer(type, chunkSize, cellSize);
+
+    // First combine chunks into cells.
+    DataInputStream input =
+        new DataInputStream(new ByteArrayInputStream(crcBytesByChunk));
+    digester.update(input, crcsByChunk.length - 1, chunkSize);
+    digester.update(input, 1, dataSize % chunkSize);
+    byte[] digest = digester.digest();
+
+    // Second, individually combine cells into full crc.
+    digester =
+        CrcComposer.newCrcComposer(type, cellSize);
+    for (int i = 0; i < digest.length - 4; i += 4) {
+      int cellCrc = CrcUtil.readInt(digest, i);
+      digester.update(cellCrc, cellSize);
+    }
+    digester.update(digest, digest.length - 4, 4, dataSize % cellSize);
+    digest = digester.digest();
+    assertEquals(4, digest.length);
+    int calculatedCrc = CrcUtil.readInt(digest, 0);
+    assertEquals(fullCrc, calculatedCrc);
+  }
+
+  @Test
+  public void testUpdateMismatchesStripe() throws Exception {
+    CrcComposer digester =
+        CrcComposer.newStripedCrcComposer(type, chunkSize, cellSize);
+
+    digester.update(crcsByChunk[0], chunkSize);
+
+    // Going from chunkSize to chunkSize + cellSize will cross a cellSize
+    // boundary in a single CRC, which is not allowed, since we'd lack a
+    // CRC corresponding to the actual cellSize boundary.
+    LambdaTestUtils.intercept(
+        IOException.class,
+        "stripe",
+        () -> digester.update(crcsByChunk[1], cellSize));
+  }
+
+  @Test
+  public void testUpdateByteArrayLengthUnalignedWithCrcSize()
+      throws Exception {
+    CrcComposer digester = CrcComposer.newCrcComposer(type, chunkSize);
+
+    LambdaTestUtils.intercept(
+        IOException.class,
+        "length",
+        () -> digester.update(crcBytesByChunk, 0, 6, chunkSize));
+  }
+}

+ 232 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestCrcUtil.java

@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unittests for CrcUtil.
+ */
+public class TestCrcUtil {
+  @Rule
+  public Timeout globalTimeout = new Timeout(10000);
+
+  private Random rand = new Random(1234);
+
+  @Test
+  public void testComposeCrc32() throws IOException {
+    byte[] data = new byte[64 * 1024];
+    rand.nextBytes(data);
+    doTestComposeCrc(data, DataChecksum.Type.CRC32, 512, false);
+    doTestComposeCrc(data, DataChecksum.Type.CRC32, 511, false);
+    doTestComposeCrc(data, DataChecksum.Type.CRC32, 32 * 1024, false);
+    doTestComposeCrc(data, DataChecksum.Type.CRC32, 32 * 1024 - 1, false);
+  }
+
+  @Test
+  public void testComposeCrc32c() throws IOException {
+    byte[] data = new byte[64 * 1024];
+    rand.nextBytes(data);
+    doTestComposeCrc(data, DataChecksum.Type.CRC32C, 512, false);
+    doTestComposeCrc(data, DataChecksum.Type.CRC32C, 511, false);
+    doTestComposeCrc(data, DataChecksum.Type.CRC32C, 32 * 1024, false);
+    doTestComposeCrc(data, DataChecksum.Type.CRC32C, 32 * 1024 - 1, false);
+  }
+
+  @Test
+  public void testComposeCrc32WithMonomial() throws IOException {
+    byte[] data = new byte[64 * 1024];
+    rand.nextBytes(data);
+    doTestComposeCrc(data, DataChecksum.Type.CRC32, 512, true);
+    doTestComposeCrc(data, DataChecksum.Type.CRC32, 511, true);
+    doTestComposeCrc(data, DataChecksum.Type.CRC32, 32 * 1024, true);
+    doTestComposeCrc(data, DataChecksum.Type.CRC32, 32 * 1024 - 1, true);
+  }
+
+  @Test
+  public void testComposeCrc32cWithMonomial() throws IOException {
+    byte[] data = new byte[64 * 1024];
+    rand.nextBytes(data);
+    doTestComposeCrc(data, DataChecksum.Type.CRC32C, 512, true);
+    doTestComposeCrc(data, DataChecksum.Type.CRC32C, 511, true);
+    doTestComposeCrc(data, DataChecksum.Type.CRC32C, 32 * 1024, true);
+    doTestComposeCrc(data, DataChecksum.Type.CRC32C, 32 * 1024 - 1, true);
+  }
+
+  @Test
+  public void testComposeCrc32ZeroLength() throws IOException {
+    doTestComposeCrcZerolength(DataChecksum.Type.CRC32);
+  }
+
+  @Test
+  public void testComposeCrc32CZeroLength() throws IOException {
+    doTestComposeCrcZerolength(DataChecksum.Type.CRC32C);
+  }
+
+  /**
+   * Helper method to compare a DataChecksum-computed end-to-end CRC against
+   * a piecewise-computed CRC that uses CrcUtil.compose on "chunk CRCs"
+   * corresponding to every {@code chunkSize} bytes.
+   */
+  private static void doTestComposeCrc(
+      byte[] data, DataChecksum.Type type, int chunkSize, boolean useMonomial)
+      throws IOException {
+    int crcPolynomial = DataChecksum.getCrcPolynomialForType(type);
+
+    // Get full end-to-end CRC in a single shot first.
+    DataChecksum checksum = DataChecksum.newDataChecksum(
+        type, Integer.MAX_VALUE);
+    checksum.update(data, 0, data.length);
+    int fullCrc = (int) checksum.getValue();
+
+    // Now compute CRCs of each chunk individually first, and compose them in a
+    // second pass to compare to the end-to-end CRC.
+    int compositeCrc = 0;
+    int crcMonomial =
+        useMonomial ? CrcUtil.getMonomial(chunkSize, crcPolynomial) : 0;
+    for (int offset = 0;
+        offset + chunkSize <= data.length;
+        offset += chunkSize) {
+      checksum.reset();
+      checksum.update(data, offset, chunkSize);
+      int partialCrc = (int) checksum.getValue();
+      if (useMonomial) {
+        compositeCrc = CrcUtil.composeWithMonomial(
+            compositeCrc, partialCrc, crcMonomial, crcPolynomial);
+      } else {
+        compositeCrc = CrcUtil.compose(
+            compositeCrc, partialCrc, chunkSize, crcPolynomial);
+      }
+    }
+
+    // There may be a final partial chunk smaller than chunkSize.
+    int partialChunkSize = data.length % chunkSize;
+    if (partialChunkSize > 0) {
+      checksum.reset();
+      checksum.update(data, data.length - partialChunkSize, partialChunkSize);
+      int partialCrc = (int) checksum.getValue();
+      compositeCrc = CrcUtil.compose(
+          compositeCrc, partialCrc, partialChunkSize, crcPolynomial);
+    }
+    assertEquals(
+        String.format(
+            "Using CRC type '%s' with crcPolynomial '0x%08x' and chunkSize '%d'"
+            + ", expected '0x%08x', got '0x%08x'",
+            type, crcPolynomial, chunkSize, fullCrc, compositeCrc),
+        fullCrc,
+        compositeCrc);
+  }
+
+  /**
+   * Helper method for testing the behavior of composing a CRC with a
+   * zero-length second CRC.
+   */
+  private static void doTestComposeCrcZerolength(DataChecksum.Type type)
+      throws IOException {
+    // Without loss of generality, we can pick any integer as our fake crcA
+    // even if we don't happen to know the preimage.
+    int crcA = 0xCAFEBEEF;
+    int crcPolynomial = DataChecksum.getCrcPolynomialForType(type);
+    DataChecksum checksum = DataChecksum.newDataChecksum(
+        type, Integer.MAX_VALUE);
+    int crcB = (int) checksum.getValue();
+    assertEquals(crcA, CrcUtil.compose(crcA, crcB, 0, crcPolynomial));
+
+    int monomial = CrcUtil.getMonomial(0, crcPolynomial);
+    assertEquals(
+        crcA, CrcUtil.composeWithMonomial(crcA, crcB, monomial, crcPolynomial));
+  }
+
+  @Test
+  public void testIntSerialization() throws IOException {
+    byte[] bytes = CrcUtil.intToBytes(0xCAFEBEEF);
+    assertEquals(0xCAFEBEEF, CrcUtil.readInt(bytes, 0));
+
+    bytes = new byte[8];
+    CrcUtil.writeInt(bytes, 0, 0xCAFEBEEF);
+    assertEquals(0xCAFEBEEF, CrcUtil.readInt(bytes, 0));
+    CrcUtil.writeInt(bytes, 4, 0xABCDABCD);
+    assertEquals(0xABCDABCD, CrcUtil.readInt(bytes, 4));
+
+    // Assert big-endian format for general Java consistency.
+    assertEquals(0xBEEFABCD, CrcUtil.readInt(bytes, 2));
+  }
+
+  @Test
+  public void testToSingleCrcStringBadLength()
+      throws Exception {
+    LambdaTestUtils.intercept(
+        IOException.class,
+        "length",
+        () -> CrcUtil.toSingleCrcString(new byte[8]));
+  }
+
+  @Test
+  public void testToSingleCrcString() throws IOException {
+    byte[] buf = CrcUtil.intToBytes(0xcafebeef);
+    assertEquals(
+        "0xcafebeef", CrcUtil.toSingleCrcString(buf));
+  }
+
+  @Test
+  public void testToMultiCrcStringBadLength()
+      throws Exception {
+    LambdaTestUtils.intercept(
+        IOException.class,
+        "length",
+        () -> CrcUtil.toMultiCrcString(new byte[6]));
+  }
+
+  @Test
+  public void testToMultiCrcStringMultipleElements()
+      throws IOException {
+    byte[] buf = new byte[12];
+    CrcUtil.writeInt(buf, 0, 0xcafebeef);
+    CrcUtil.writeInt(buf, 4, 0xababcccc);
+    CrcUtil.writeInt(buf, 8, 0xddddefef);
+    assertEquals(
+        "[0xcafebeef, 0xababcccc, 0xddddefef]",
+        CrcUtil.toMultiCrcString(buf));
+  }
+
+  @Test
+  public void testToMultiCrcStringSingleElement()
+      throws IOException {
+    byte[] buf = new byte[4];
+    CrcUtil.writeInt(buf, 0, 0xcafebeef);
+    assertEquals(
+        "[0xcafebeef]",
+        CrcUtil.toMultiCrcString(buf));
+  }
+
+  @Test
+  public void testToMultiCrcStringNoElements()
+      throws IOException {
+    assertEquals(
+        "[]",
+        CrcUtil.toMultiCrcString(new byte[0]));
+  }
+}

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/Hdfs.java

@@ -130,9 +130,9 @@ public class Hdfs extends AbstractFileSystem {
   }
 
   @Override
-  public FileChecksum getFileChecksum(Path f) 
+  public FileChecksum getFileChecksum(Path f)
       throws IOException, UnresolvedLinkException {
-    return dfs.getFileChecksum(getUriPath(f), Long.MAX_VALUE);
+    return dfs.getFileChecksumWithCombineMode(getUriPath(f), Long.MAX_VALUE);
   }
 
   @Override

+ 42 - 14
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -66,6 +66,7 @@ import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -76,6 +77,7 @@ import org.apache.hadoop.fs.HdfsBlockLocation;
 import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Options.ChecksumCombineMode;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
@@ -1753,18 +1755,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     return encryptionKey;
   }
 
-  /**
-   * Get the checksum of the whole file or a range of the file. Note that the
-   * range always starts from the beginning of the file. The file can be
-   * in replicated form, or striped mode. It can be used to checksum and compare
-   * two replicated files, or two striped files, but not applicable for two
-   * files of different block layout forms.
-   * @param src The file path
-   * @param length the length of the range, i.e., the range is [0, length]
-   * @return The checksum
-   * @see DistributedFileSystem#getFileChecksum(Path)
-   */
-  public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
+  private FileChecksum getFileChecksumInternal(
+      String src, long length, ChecksumCombineMode combineMode)
       throws IOException {
     checkOpen();
     Preconditions.checkArgument(length >= 0);
@@ -1779,15 +1771,51 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
 
     maker = ecPolicy != null ?
         new FileChecksumHelper.StripedFileNonStripedChecksumComputer(src,
-            length, blockLocations, namenode, this, ecPolicy) :
+            length, blockLocations, namenode, this, ecPolicy, combineMode) :
         new FileChecksumHelper.ReplicatedFileChecksumComputer(src, length,
-            blockLocations, namenode, this);
+            blockLocations, namenode, this, combineMode);
 
     maker.compute();
 
     return maker.getFileChecksum();
   }
 
+  /**
+   * Get the checksum of the whole file or a range of the file. Note that the
+   * range always starts from the beginning of the file. The file can be
+   * in replicated form, or striped mode. Depending on the
+   * dfs.checksum.combine.mode, checksums may or may not be comparable between
+   * different block layout forms.
+   *
+   * @param src The file path
+   * @param length the length of the range, i.e., the range is [0, length]
+   * @return The checksum
+   * @see DistributedFileSystem#getFileChecksum(Path)
+   */
+  public FileChecksum getFileChecksumWithCombineMode(String src, long length)
+      throws IOException {
+    ChecksumCombineMode combineMode = getConf().getChecksumCombineMode();
+    return getFileChecksumInternal(src, length, combineMode);
+  }
+
+  /**
+   * Get the checksum of the whole file or a range of the file. Note that the
+   * range always starts from the beginning of the file. The file can be
+   * in replicated form, or striped mode. It can be used to checksum and compare
+   * two replicated files, or two striped files, but not applicable for two
+   * files of different block layout forms.
+   *
+   * @param src The file path
+   * @param length the length of the range, i.e., the range is [0, length]
+   * @return The checksum
+   * @see DistributedFileSystem#getFileChecksum(Path)
+   */
+  public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
+      throws IOException {
+    return (MD5MD5CRC32FileChecksum) getFileChecksumInternal(
+        src, length, ChecksumCombineMode.MD5MD5CRC);
+  }
+
   protected LocatedBlocks getBlockLocations(String src,
                                             long length) throws IOException {
     //get block locations for the file range
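
To make the split between the two entry points concrete, a hedged sketch (not part of the patch): it assumes fs.defaultFS points at an HDFS cluster, that DistributedFileSystem#getClient() exposes the underlying DFSClient, and the path is a placeholder.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileChecksum;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
    import org.apache.hadoop.hdfs.DFSClient;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class CombineModeEntryPoints {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("dfs.checksum.combine.mode", "COMPOSITE_CRC");
        DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
        DFSClient client = dfs.getClient();

        // Legacy entry point: always the MD5-of-MD5-of-CRCs form, regardless of
        // dfs.checksum.combine.mode, so existing callers are unaffected.
        MD5MD5CRC32FileChecksum legacy =
            client.getFileChecksum("/some/file", Long.MAX_VALUE);

        // New entry point, used by FileSystem#getFileChecksum via the
        // DistributedFileSystem change below: honors the configured combine
        // mode, so here it yields a CompositeCrcFileChecksum.
        FileChecksum combined =
            client.getFileChecksumWithCombineMode("/some/file", Long.MAX_VALUE);

        System.out.println(legacy.getAlgorithmName());   // MD5-based algorithm name
        System.out.println(combined.getAlgorithmName()); // e.g. COMPOSITE-CRC32C
      }
    }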

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -1681,7 +1681,8 @@ public class DistributedFileSystem extends FileSystem
     return new FileSystemLinkResolver<FileChecksum>() {
       @Override
       public FileChecksum doCall(final Path p) throws IOException {
-        return dfs.getFileChecksum(getPathName(p), Long.MAX_VALUE);
+        return dfs.getFileChecksumWithCombineMode(
+            getPathName(p), Long.MAX_VALUE);
       }
 
       @Override
@@ -1701,7 +1702,7 @@ public class DistributedFileSystem extends FileSystem
     return new FileSystemLinkResolver<FileChecksum>() {
       @Override
       public FileChecksum doCall(final Path p) throws IOException {
-        return dfs.getFileChecksum(getPathName(p), length);
+        return dfs.getFileChecksumWithCombineMode(getPathName(p), length);
       }
 
       @Override

+ 248 - 117
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java

@@ -17,9 +17,14 @@
  */
 package org.apache.hadoop.hdfs;
 
+import org.apache.hadoop.fs.CompositeCrcFileChecksum;
+import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum;
-import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
+import org.apache.hadoop.fs.Options.ChecksumCombineMode;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions;
+import org.apache.hadoop.hdfs.protocol.BlockChecksumType;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
@@ -41,6 +46,8 @@ import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.util.CrcComposer;
+import org.apache.hadoop.util.CrcUtil;
 import org.apache.hadoop.util.DataChecksum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,9 +74,11 @@ final class FileChecksumHelper {
     private final long length;
     private final DFSClient client;
     private final ClientProtocol namenode;
-    private final DataOutputBuffer md5out = new DataOutputBuffer();
+    private final ChecksumCombineMode combineMode;
+    private final BlockChecksumType blockChecksumType;
+    private final DataOutputBuffer blockChecksumBuf = new DataOutputBuffer();
 
-    private MD5MD5CRC32FileChecksum fileChecksum;
+    private FileChecksum fileChecksum;
     private LocatedBlocks blockLocations;
 
     private int timeout;
@@ -88,12 +97,24 @@ final class FileChecksumHelper {
     FileChecksumComputer(String src, long length,
                          LocatedBlocks blockLocations,
                          ClientProtocol namenode,
-                         DFSClient client) throws IOException {
+                         DFSClient client,
+                         ChecksumCombineMode combineMode) throws IOException {
       this.src = src;
       this.length = length;
       this.blockLocations = blockLocations;
       this.namenode = namenode;
       this.client = client;
+      this.combineMode = combineMode;
+      switch (combineMode) {
+      case MD5MD5CRC:
+        this.blockChecksumType = BlockChecksumType.MD5CRC;
+        break;
+      case COMPOSITE_CRC:
+        this.blockChecksumType = BlockChecksumType.COMPOSITE_CRC;
+        break;
+      default:
+        throw new IOException("Unknown ChecksumCombineMode: " + combineMode);
+      }
 
       this.remaining = length;
 
@@ -121,11 +142,19 @@ final class FileChecksumHelper {
       return namenode;
     }
 
-    DataOutputBuffer getMd5out() {
-      return md5out;
+    ChecksumCombineMode getCombineMode() {
+      return combineMode;
+    }
+
+    BlockChecksumType getBlockChecksumType() {
+      return blockChecksumType;
+    }
+
+    DataOutputBuffer getBlockChecksumBuf() {
+      return blockChecksumBuf;
     }
 
-    MD5MD5CRC32FileChecksum getFileChecksum() {
+    FileChecksum getFileChecksum() {
       return fileChecksum;
     }
 
@@ -226,17 +255,31 @@ final class FileChecksumHelper {
     }
 
     /**
-     * Compute and aggregate block checksums block by block.
+     * Compute block checksums block by block and append the raw bytes of the
+     * block checksums into getBlockChecksumBuf().
+     *
      * @throws IOException
      */
     abstract void checksumBlocks() throws IOException;
 
     /**
-     * Make final file checksum result given the computing process done.
+     * Make final file checksum result given the per-block or per-block-group
+     * checksums collected into getBlockChecksumBuf().
      */
-    MD5MD5CRC32FileChecksum makeFinalResult() {
+    FileChecksum makeFinalResult() throws IOException {
+      switch (combineMode) {
+      case MD5MD5CRC:
+        return makeMd5CrcResult();
+      case COMPOSITE_CRC:
+        return makeCompositeCrcResult();
+      default:
+        throw new IOException("Unknown ChecksumCombineMode: " + combineMode);
+      }
+    }
+
+    FileChecksum makeMd5CrcResult() {
       //compute file MD5
-      final MD5Hash fileMD5 = MD5Hash.digest(md5out.getData());
+      final MD5Hash fileMD5 = MD5Hash.digest(blockChecksumBuf.getData());
       switch (crcType) {
       case CRC32:
         return new MD5MD5CRC32GzipFileChecksum(bytesPerCRC,
@@ -250,6 +293,58 @@ final class FileChecksumHelper {
       }
     }
 
+    FileChecksum makeCompositeCrcResult() throws IOException {
+      long blockSizeHint = 0;
+      if (locatedBlocks.size() > 0) {
+        blockSizeHint = locatedBlocks.get(0).getBlockSize();
+      }
+      CrcComposer crcComposer =
+          CrcComposer.newCrcComposer(getCrcType(), blockSizeHint);
+      byte[] blockChecksumBytes = blockChecksumBuf.getData();
+
+      long sumBlockLengths = 0;
+      for (int i = 0; i < locatedBlocks.size() - 1; ++i) {
+        LocatedBlock block = locatedBlocks.get(i);
+        // For everything except the last LocatedBlock, we expect getBlockSize()
+        // to accurately reflect the number of file bytes digested in the block
+        // checksum.
+        sumBlockLengths += block.getBlockSize();
+        int blockCrc = CrcUtil.readInt(blockChecksumBytes, i * 4);
+
+        crcComposer.update(blockCrc, block.getBlockSize());
+        LOG.debug(
+            "Added blockCrc 0x{} for block index {} of size {}",
+            Integer.toString(blockCrc, 16), i, block.getBlockSize());
+      }
+
+      // NB: In some cases the located blocks have their block size adjusted
+      // explicitly based on the requested length, but not all cases;
+      // these numbers may or may not reflect actual sizes on disk.
+      long reportedLastBlockSize =
+          blockLocations.getLastLocatedBlock().getBlockSize();
+      long consumedLastBlockLength = reportedLastBlockSize;
+      if (length - sumBlockLengths < reportedLastBlockSize) {
+        LOG.warn(
+            "Last block length {} is less than reportedLastBlockSize {}",
+            length - sumBlockLengths, reportedLastBlockSize);
+        consumedLastBlockLength = length - sumBlockLengths;
+      }
+      // NB: blockChecksumBytes.length may be much longer than actual bytes
+      // written into the DataOutput.
+      int lastBlockCrc = CrcUtil.readInt(
+          blockChecksumBytes, 4 * (locatedBlocks.size() - 1));
+      crcComposer.update(lastBlockCrc, consumedLastBlockLength);
+      LOG.debug(
+          "Added lastBlockCrc 0x{} for block index {} of size {}",
+          Integer.toString(lastBlockCrc, 16),
+          locatedBlocks.size() - 1,
+          consumedLastBlockLength);
+
+      int compositeCrc = CrcUtil.readInt(crcComposer.digest(), 0);
+      return new CompositeCrcFileChecksum(
+          compositeCrc, getCrcType(), bytesPerCRC);
+    }
+
     /**
      * Create and return a sender given an IO stream pair.
      */
@@ -267,6 +362,117 @@ final class FileChecksumHelper {
         IOUtils.closeStream(pair.out);
       }
     }
+
+    /**
+     * Parses out various checksum properties like bytesPerCrc, crcPerBlock,
+     * and crcType from {@code checksumData} and either stores them as the
+     * authoritative value or compares them to a previously extracted value
+     * to check compatibility.
+     *
+     * @param checksumData response from the datanode
+     * @param locatedBlock the block corresponding to the response
+     * @param datanode the datanode which produced the response
+     * @param blockIdx the block or block-group index of the response
+     */
+    void extractChecksumProperties(
+        OpBlockChecksumResponseProto checksumData,
+        LocatedBlock locatedBlock,
+        DatanodeInfo datanode,
+        int blockIdx)
+        throws IOException {
+      //read byte-per-checksum
+      final int bpc = checksumData.getBytesPerCrc();
+      if (blockIdx == 0) { //first block
+        setBytesPerCRC(bpc);
+      } else if (bpc != getBytesPerCRC()) {
+        if (getBlockChecksumType() == BlockChecksumType.COMPOSITE_CRC) {
+          LOG.warn(
+              "Current bytesPerCRC={} doesn't match next bpc={}, but "
+              + "continuing anyway because we're using COMPOSITE_CRC. "
+              + "If trying to preserve CHECKSUMTYPE, only the current "
+              + "bytesPerCRC will be preserved.", getBytesPerCRC(), bpc);
+        } else {
+          throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
+              + " but bytesPerCRC=" + getBytesPerCRC());
+        }
+      }
+
+      //read crc-per-block
+      final long cpb = checksumData.getCrcPerBlock();
+      if (getLocatedBlocks().size() > 1 && blockIdx == 0) {
+        setCrcPerBlock(cpb);
+      }
+
+      // read crc-type
+      final DataChecksum.Type ct;
+      if (checksumData.hasCrcType()) {
+        ct = PBHelperClient.convert(checksumData.getCrcType());
+      } else {
+        LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
+            "inferring checksum by reading first byte");
+        ct = getClient().inferChecksumTypeByReading(locatedBlock, datanode);
+      }
+
+      if (blockIdx == 0) {
+        setCrcType(ct);
+      } else if (getCrcType() != DataChecksum.Type.MIXED &&
+          getCrcType() != ct) {
+        if (getBlockChecksumType() == BlockChecksumType.COMPOSITE_CRC) {
+          throw new IOException(
+              "DataChecksum.Type.MIXED is not supported for COMPOSITE_CRC");
+        } else {
+          // if crc types are mixed in a file
+          setCrcType(DataChecksum.Type.MIXED);
+        }
+      }
+
+      if (blockIdx == 0) {
+        LOG.debug("set bytesPerCRC={}, crcPerBlock={}",
+            getBytesPerCRC(), getCrcPerBlock());
+      }
+    }
+
+    /**
+     * Parses out the raw blockChecksum bytes from {@code checksumData}
+     * according to the blockChecksumType and populates the cumulative
+     * blockChecksumBuf with it.
+     *
+     * @return a debug-string representation of the parsed checksum if
+     *     debug is enabled, otherwise null.
+     */
+    String populateBlockChecksumBuf(OpBlockChecksumResponseProto checksumData)
+        throws IOException {
+      String blockChecksumForDebug = null;
+      switch (getBlockChecksumType()) {
+      case MD5CRC:
+        //read md5
+        final MD5Hash md5 = new MD5Hash(
+            checksumData.getBlockChecksum().toByteArray());
+        md5.write(getBlockChecksumBuf());
+        if (LOG.isDebugEnabled()) {
+          blockChecksumForDebug = md5.toString();
+        }
+        break;
+      case COMPOSITE_CRC:
+        BlockChecksumType returnedType = PBHelperClient.convert(
+            checksumData.getBlockChecksumOptions().getBlockChecksumType());
+        if (returnedType != BlockChecksumType.COMPOSITE_CRC) {
+          throw new IOException(String.format(
+              "Unexpected blockChecksumType '%s', expecting COMPOSITE_CRC",
+              returnedType));
+        }
+        byte[] crcBytes = checksumData.getBlockChecksum().toByteArray();
+        if (LOG.isDebugEnabled()) {
+          blockChecksumForDebug = CrcUtil.toSingleCrcString(crcBytes);
+        }
+        getBlockChecksumBuf().write(crcBytes);
+        break;
+      default:
+        throw new IOException(
+            "Unknown BlockChecksumType: " + getBlockChecksumType());
+      }
+      return blockChecksumForDebug;
+    }
   }
 
   /**
@@ -278,8 +484,10 @@ final class FileChecksumHelper {
     ReplicatedFileChecksumComputer(String src, long length,
                                    LocatedBlocks blockLocations,
                                    ClientProtocol namenode,
-                                   DFSClient client) throws IOException {
-      super(src, length, blockLocations, namenode, client);
+                                   DFSClient client,
+                                   ChecksumCombineMode combineMode)
+        throws IOException {
+      super(src, length, blockLocations, namenode, client, combineMode);
     }
 
     @Override
@@ -295,7 +503,8 @@ final class FileChecksumHelper {
         LocatedBlock locatedBlock = getLocatedBlocks().get(blockIdx);
 
         if (!checksumBlock(locatedBlock)) {
-          throw new IOException("Fail to get block MD5 for " + locatedBlock);
+          throw new PathIOException(
+              getSrc(), "Fail to get block MD5 for " + locatedBlock);
         }
       }
     }
@@ -368,9 +577,11 @@ final class FileChecksumHelper {
         LOG.debug("write to {}: {}, block={}", datanode,
             Op.BLOCK_CHECKSUM, block);
 
-        // get block MD5
-        createSender(pair).blockChecksum(block,
-            locatedBlock.getBlockToken());
+        // get block checksum
+        createSender(pair).blockChecksum(
+            block,
+            locatedBlock.getBlockToken(),
+            new BlockChecksumOptions(getBlockChecksumType()));
 
         final BlockOpResponseProto reply = BlockOpResponseProto.parseFrom(
             PBHelperClient.vintPrefixed(pair.in));
@@ -381,51 +592,11 @@ final class FileChecksumHelper {
 
         OpBlockChecksumResponseProto checksumData =
             reply.getChecksumResponse();
-
-        //read byte-per-checksum
-        final int bpc = checksumData.getBytesPerCrc();
-        if (blockIdx == 0) { //first block
-          setBytesPerCRC(bpc);
-        } else if (bpc != getBytesPerCRC()) {
-          throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
-              + " but bytesPerCRC=" + getBytesPerCRC());
-        }
-
-        //read crc-per-block
-        final long cpb = checksumData.getCrcPerBlock();
-        if (getLocatedBlocks().size() > 1 && blockIdx == 0) {
-          setCrcPerBlock(cpb);
-        }
-
-        //read md5
-        final MD5Hash md5 = new MD5Hash(checksumData.getMd5().toByteArray());
-        md5.write(getMd5out());
-
-        // read crc-type
-        final DataChecksum.Type ct;
-        if (checksumData.hasCrcType()) {
-          ct = PBHelperClient.convert(checksumData.getCrcType());
-        } else {
-          LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
-              "inferring checksum by reading first byte");
-          ct = getClient().inferChecksumTypeByReading(locatedBlock, datanode);
-        }
-
-        if (blockIdx == 0) { // first block
-          setCrcType(ct);
-        } else if (getCrcType() != DataChecksum.Type.MIXED
-            && getCrcType() != ct) {
-          // if crc types are mixed in a file
-          setCrcType(DataChecksum.Type.MIXED);
-        }
-
-        if (LOG.isDebugEnabled()) {
-          if (blockIdx == 0) {
-            LOG.debug("set bytesPerCRC=" + getBytesPerCRC()
-                + ", crcPerBlock=" + getCrcPerBlock());
-          }
-          LOG.debug("got reply from " + datanode + ": md5=" + md5);
-        }
+        extractChecksumProperties(
+            checksumData, locatedBlock, datanode, blockIdx);
+        String blockChecksumForDebug = populateBlockChecksumBuf(checksumData);
+        LOG.debug("got reply from {}: blockChecksum={}, blockChecksumType={}",
+            datanode, blockChecksumForDebug, getBlockChecksumType());
       }
     }
   }
@@ -442,9 +613,10 @@ final class FileChecksumHelper {
                                           LocatedBlocks blockLocations,
                                           ClientProtocol namenode,
                                           DFSClient client,
-                                          ErasureCodingPolicy ecPolicy)
+                                          ErasureCodingPolicy ecPolicy,
+                                          ChecksumCombineMode combineMode)
         throws IOException {
-      super(src, length, blockLocations, namenode, client);
+      super(src, length, blockLocations, namenode, client, combineMode);
 
       this.ecPolicy = ecPolicy;
     }
@@ -464,7 +636,8 @@ final class FileChecksumHelper {
         LocatedStripedBlock blockGroup = (LocatedStripedBlock) locatedBlock;
 
         if (!checksumBlockGroup(blockGroup)) {
-          throw new IOException("Fail to get block MD5 for " + locatedBlock);
+          throw new PathIOException(
+              getSrc(), "Fail to get block checksum for " + locatedBlock);
         }
       }
     }
@@ -519,16 +692,18 @@ final class FileChecksumHelper {
                              StripedBlockInfo stripedBlockInfo,
                              DatanodeInfo datanode,
                              long requestedNumBytes) throws IOException {
-
       try (IOStreamPair pair = getClient().connectToDN(datanode,
           getTimeout(), blockGroup.getBlockToken())) {
 
         LOG.debug("write to {}: {}, blockGroup={}",
             datanode, Op.BLOCK_GROUP_CHECKSUM, blockGroup);
 
-        // get block MD5
-        createSender(pair).blockGroupChecksum(stripedBlockInfo,
-            blockGroup.getBlockToken(), requestedNumBytes);
+        // get block group checksum
+        createSender(pair).blockGroupChecksum(
+            stripedBlockInfo,
+            blockGroup.getBlockToken(),
+            requestedNumBytes,
+            new BlockChecksumOptions(getBlockChecksumType()));
 
         BlockOpResponseProto reply = BlockOpResponseProto.parseFrom(
             PBHelperClient.vintPrefixed(pair.in));
@@ -538,54 +713,10 @@ final class FileChecksumHelper {
         DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
 
         OpBlockChecksumResponseProto checksumData = reply.getChecksumResponse();
-
-        //read byte-per-checksum
-        final int bpc = checksumData.getBytesPerCrc();
-        if (bgIdx == 0) { //first block
-          setBytesPerCRC(bpc);
-        } else {
-          if (bpc != getBytesPerCRC()) {
-            throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
-                + " but bytesPerCRC=" + getBytesPerCRC());
-          }
-        }
-
-        //read crc-per-block
-        final long cpb = checksumData.getCrcPerBlock();
-        if (getLocatedBlocks().size() > 1 && bgIdx == 0) { // first block
-          setCrcPerBlock(cpb);
-        }
-
-        //read md5
-        final MD5Hash md5 = new MD5Hash(
-            checksumData.getMd5().toByteArray());
-        md5.write(getMd5out());
-
-        // read crc-type
-        final DataChecksum.Type ct;
-        if (checksumData.hasCrcType()) {
-          ct = PBHelperClient.convert(checksumData.getCrcType());
-        } else {
-          LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
-              "inferring checksum by reading first byte");
-          ct = getClient().inferChecksumTypeByReading(blockGroup, datanode);
-        }
-
-        if (bgIdx == 0) {
-          setCrcType(ct);
-        } else if (getCrcType() != DataChecksum.Type.MIXED &&
-            getCrcType() != ct) {
-          // if crc types are mixed in a file
-          setCrcType(DataChecksum.Type.MIXED);
-        }
-
-        if (LOG.isDebugEnabled()) {
-          if (bgIdx == 0) {
-            LOG.debug("set bytesPerCRC=" + getBytesPerCRC()
-                + ", crcPerBlock=" + getCrcPerBlock());
-          }
-          LOG.debug("got reply from " + datanode + ": md5=" + md5);
-        }
+        extractChecksumProperties(checksumData, blockGroup, datanode, bgIdx);
+        String blockChecksumForDebug = populateBlockChecksumBuf(checksumData);
+        LOG.debug("got reply from {}: blockChecksum={}, blockChecksumType={}",
+            datanode, blockChecksumForDebug, getBlockChecksumType());
       }
     }
   }
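
The client-facing effect of the FileChecksumHelper changes can be summarized with a small sketch (not part of the patch): switching dfs.checksum.combine.mode to COMPOSITE_CRC makes getFileChecksum() return a CompositeCrcFileChecksum whose value depends only on file contents, not on block or chunk layout. Only the public FileSystem API and the configuration key introduced below are assumed.

// Sketch only, not patch code.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CompositeCrcChecksumExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // "MD5MD5CRC" remains the default; "COMPOSITE_CRC" opts into the new mode.
    conf.set("dfs.checksum.combine.mode", "COMPOSITE_CRC");
    FileSystem fs = FileSystem.get(conf);
    // For COMPOSITE_CRC this is a CompositeCrcFileChecksum; its value is
    // comparable across replicated and erasure-coded copies of the same data.
    FileChecksum checksum = fs.getFileChecksum(new Path(args[0]));
    System.out.println(checksum.getAlgorithmName());
  }
}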

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java

@@ -120,6 +120,8 @@ public interface HdfsClientConfigKeys {
   String  DFS_CHECKSUM_TYPE_DEFAULT = "CRC32C";
   String  DFS_BYTES_PER_CHECKSUM_KEY = "dfs.bytes-per-checksum";
   int     DFS_BYTES_PER_CHECKSUM_DEFAULT = 512;
+  String  DFS_CHECKSUM_COMBINE_MODE_KEY = "dfs.checksum.combine.mode";
+  String  DFS_CHECKSUM_COMBINE_MODE_DEFAULT = "MD5MD5CRC";
   String  DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY =
       "dfs.datanode.socket.write.timeout";
   String  DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC =

+ 27 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java

@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.Options.ChecksumCombineMode;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.ReplicaAccessorBuilder;
@@ -38,6 +39,8 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_COMBINE_MODE_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_COMBINE_MODE_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_TYPE_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
@@ -106,6 +109,7 @@ public class DfsClientConf {
   private final int datanodeSocketWriteTimeout;
   private final int ioBufferSize;
   private final ChecksumOpt defaultChecksumOpt;
+  private final ChecksumCombineMode checksumCombineMode;
   private final int writePacketSize;
   private final int writeMaxPackets;
   private final ByteArrayManager.Conf writeByteArrayManagerConf;
@@ -177,6 +181,7 @@ public class DfsClientConf {
         CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
         CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
     defaultChecksumOpt = getChecksumOptFromConf(conf);
+    checksumCombineMode = getChecksumCombineModeFromConf(conf);
     dataTransferTcpNoDelay = conf.getBoolean(
         DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY,
         DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT);
@@ -300,6 +305,21 @@ public class DfsClientConf {
     }
   }
 
+  private static ChecksumCombineMode getChecksumCombineModeFromConf(
+      Configuration conf) {
+    final String mode = conf.get(
+        DFS_CHECKSUM_COMBINE_MODE_KEY,
+        DFS_CHECKSUM_COMBINE_MODE_DEFAULT);
+    try {
+      return ChecksumCombineMode.valueOf(mode);
+    } catch(IllegalArgumentException iae) {
+      LOG.warn("Bad checksum combine mode: {}. Using default {}", mode,
+               DFS_CHECKSUM_COMBINE_MODE_DEFAULT);
+      return ChecksumCombineMode.valueOf(
+          DFS_CHECKSUM_COMBINE_MODE_DEFAULT);
+    }
+  }
+
   // Construct a checksum option from conf
   public static ChecksumOpt getChecksumOptFromConf(Configuration conf) {
     DataChecksum.Type type = getChecksumType(conf);
@@ -392,6 +412,13 @@ public class DfsClientConf {
     return defaultChecksumOpt;
   }
 
+  /**
+   * @return the checksumCombineMode
+   */
+  public ChecksumCombineMode getChecksumCombineMode() {
+    return checksumCombineMode;
+  }
+
   /**
    * @return the writePacketSize
    */

+ 54 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockChecksumOptions.java

@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Encapsulates various options related to how fine-grained data checksums are
+ * combined into block-level checksums.
+ */
+@InterfaceAudience.Private
+public class BlockChecksumOptions {
+  private final BlockChecksumType blockChecksumType;
+  private final long stripeLength;
+
+  public BlockChecksumOptions(
+      BlockChecksumType blockChecksumType, long stripeLength) {
+    this.blockChecksumType = blockChecksumType;
+    this.stripeLength = stripeLength;
+  }
+
+  public BlockChecksumOptions(BlockChecksumType blockChecksumType) {
+    this(blockChecksumType, 0);
+  }
+
+  public BlockChecksumType getBlockChecksumType() {
+    return blockChecksumType;
+  }
+
+  public long getStripeLength() {
+    return stripeLength;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("blockChecksumType=%s, stripedLength=%d",
+        blockChecksumType, stripeLength);
+  }
+}
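
For reference, a small sketch (not additional patch code) of how callers elsewhere in this patch construct these options: a zero stripeLength requests a single CRC over the whole requested range, while the block-group path sets stripeLength to the EC cell size so each internal block returns one 4-byte CRC per cell. The 1 MiB cell size below is only an illustrative value.

import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions;
import org.apache.hadoop.hdfs.protocol.BlockChecksumType;

public class BlockChecksumOptionsExample {
  public static void main(String[] args) {
    // Whole-block composite CRC: stripeLength defaults to 0.
    BlockChecksumOptions wholeBlock =
        new BlockChecksumOptions(BlockChecksumType.COMPOSITE_CRC);
    // Per-cell composite CRCs, as used for block groups.
    BlockChecksumOptions perCell = new BlockChecksumOptions(
        BlockChecksumType.COMPOSITE_CRC, 1024 * 1024);
    System.out.println(wholeBlock + " | " + perCell);
  }
}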

+ 30 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockChecksumType.java

@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Algorithms/types denoting how block-level checksums are computed using
+ * lower-level chunk checksums/CRCs.
+ */
+@InterfaceAudience.Private
+public enum BlockChecksumType {
+  MD5CRC,  // BlockChecksum obtained by taking the MD5 digest of chunk CRCs
+  COMPOSITE_CRC  // Chunk-independent CRC, optionally striped
+}

+ 9 - 3
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java

@@ -22,6 +22,7 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
@@ -214,11 +215,13 @@ public interface DataTransferProtocol {
    *
    * @param blk a block.
    * @param blockToken security token for accessing the block.
+   * @param blockChecksumOptions determines how the block-level checksum is
+   *     computed from underlying block metadata.
    * @throws IOException
    */
   void blockChecksum(ExtendedBlock blk,
-      Token<BlockTokenIdentifier> blockToken) throws IOException;
-
+      Token<BlockTokenIdentifier> blockToken,
+      BlockChecksumOptions blockChecksumOptions) throws IOException;
 
   /**
    * Get striped block group checksum (MD5 of CRC32).
@@ -227,9 +230,12 @@ public interface DataTransferProtocol {
    * @param blockToken security token for accessing the block.
    * @param requestedNumBytes requested number of bytes in the block group
    *                          to compute the checksum.
+   * @param blockChecksumOptions determines how the block-level checksum is
+   *     computed from underlying block metadata.
    * @throws IOException
    */
   void blockGroupChecksum(StripedBlockInfo stripedBlockInfo,
           Token<BlockTokenIdentifier> blockToken,
-          long requestedNumBytes) throws IOException;
+          long requestedNumBytes,
+          BlockChecksumOptions blockChecksumOptions) throws IOException;
 }

+ 8 - 3
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java

@@ -27,6 +27,7 @@ import java.util.Arrays;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
@@ -267,9 +268,11 @@ public class Sender implements DataTransferProtocol {
 
   @Override
   public void blockChecksum(final ExtendedBlock blk,
-      final Token<BlockTokenIdentifier> blockToken) throws IOException {
+      final Token<BlockTokenIdentifier> blockToken,
+      BlockChecksumOptions blockChecksumOptions) throws IOException {
     OpBlockChecksumProto proto = OpBlockChecksumProto.newBuilder()
         .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
+        .setBlockChecksumOptions(PBHelperClient.convert(blockChecksumOptions))
         .build();
 
     send(out, Op.BLOCK_CHECKSUM, proto);
@@ -277,8 +280,9 @@ public class Sender implements DataTransferProtocol {
 
   @Override
   public void blockGroupChecksum(StripedBlockInfo stripedBlockInfo,
-      Token<BlockTokenIdentifier> blockToken, long requestedNumBytes)
-          throws IOException {
+      Token<BlockTokenIdentifier> blockToken,
+      long requestedNumBytes,
+      BlockChecksumOptions blockChecksumOptions) throws IOException {
     OpBlockGroupChecksumProto proto = OpBlockGroupChecksumProto.newBuilder()
         .setHeader(DataTransferProtoUtil.buildBaseHeader(
             stripedBlockInfo.getBlock(), blockToken))
@@ -291,6 +295,7 @@ public class Sender implements DataTransferProtocol {
         .setEcPolicy(PBHelperClient.convertErasureCodingPolicy(
             stripedBlockInfo.getErasureCodingPolicy()))
         .setRequestedNumBytes(requestedNumBytes)
+        .setBlockChecksumOptions(PBHelperClient.convert(blockChecksumOptions))
         .build();
 
     send(out, Op.BLOCK_GROUP_CHECKSUM, proto);

+ 44 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java

@@ -61,6 +61,8 @@ import org.apache.hadoop.hdfs.inotify.EventBatch;
 import org.apache.hadoop.hdfs.inotify.EventBatchList;
 import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions;
+import org.apache.hadoop.hdfs.protocol.BlockChecksumType;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.BlockType;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
@@ -247,6 +249,48 @@ public class PBHelperClient {
     return HdfsProtos.ChecksumTypeProto.valueOf(type.id);
   }
 
+  public static HdfsProtos.BlockChecksumTypeProto convert(
+      BlockChecksumType type) {
+    switch(type) {
+    case MD5CRC:
+      return HdfsProtos.BlockChecksumTypeProto.MD5CRC;
+    case COMPOSITE_CRC:
+      return HdfsProtos.BlockChecksumTypeProto.COMPOSITE_CRC;
+    default:
+      throw new IllegalStateException(
+          "BUG: BlockChecksumType not found, type=" + type);
+    }
+  }
+
+  public static BlockChecksumType convert(
+      HdfsProtos.BlockChecksumTypeProto blockChecksumTypeProto) {
+    switch(blockChecksumTypeProto) {
+    case MD5CRC:
+      return BlockChecksumType.MD5CRC;
+    case COMPOSITE_CRC:
+      return BlockChecksumType.COMPOSITE_CRC;
+    default:
+      throw new IllegalStateException(
+          "BUG: BlockChecksumTypeProto not found, type="
+          + blockChecksumTypeProto);
+    }
+  }
+
+  public static HdfsProtos.BlockChecksumOptionsProto convert(
+      BlockChecksumOptions options) {
+    return HdfsProtos.BlockChecksumOptionsProto.newBuilder()
+        .setBlockChecksumType(convert(options.getBlockChecksumType()))
+        .setStripeLength(options.getStripeLength())
+        .build();
+  }
+
+  public static BlockChecksumOptions convert(
+      HdfsProtos.BlockChecksumOptionsProto options) {
+    return new BlockChecksumOptions(
+        convert(options.getBlockChecksumType()),
+        options.getStripeLength());
+  }
+
   public static ExtendedBlockProto convert(final ExtendedBlock b) {
     if (b == null) return null;
     return ExtendedBlockProto.newBuilder().

+ 5 - 2
hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto

@@ -148,8 +148,9 @@ message OpCopyBlockProto {
   required BaseHeaderProto header = 1;
 }
 
-message OpBlockChecksumProto { 
+message OpBlockChecksumProto {
   required BaseHeaderProto header = 1;
+  optional BlockChecksumOptionsProto blockChecksumOptions = 2;
 }
 
 message OpBlockGroupChecksumProto {
@@ -160,6 +161,7 @@ message OpBlockGroupChecksumProto {
   required ErasureCodingPolicyProto ecPolicy = 4;
   repeated uint32 blockIndices = 5;
   required uint64 requestedNumBytes = 6;
+  optional BlockChecksumOptionsProto blockChecksumOptions = 7;
 }
 
 /**
@@ -313,8 +315,9 @@ message DNTransferAckProto {
 message OpBlockChecksumResponseProto {
   required uint32 bytesPerCrc = 1;
   required uint64 crcPerBlock = 2;
-  required bytes md5 = 3;
+  required bytes blockChecksum = 3;
   optional ChecksumTypeProto crcType = 4;
+  optional BlockChecksumOptionsProto blockChecksumOptions = 5;
 }
 
 message OpCustomProto {

+ 21 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto

@@ -480,6 +480,27 @@ enum ChecksumTypeProto {
   CHECKSUM_CRC32C = 2;
 }
 
+enum BlockChecksumTypeProto {
+  MD5CRC = 1;  // BlockChecksum obtained by taking the MD5 digest of chunk CRCs
+  COMPOSITE_CRC = 2;  // Chunk-independent CRC, optionally striped
+}
+
+/**
+ * Algorithms/types denoting how block-level checksums are computed using
+ * lower-level chunk checksums/CRCs.
+ * These options should be kept in sync with
+ * org.apache.hadoop.hdfs.protocol.BlockChecksumOptions.
+ */
+message BlockChecksumOptionsProto {
+  optional BlockChecksumTypeProto blockChecksumType = 1 [default = MD5CRC];
+
+  // Only used if blockChecksumType specifies a striped format, such as
+  // COMPOSITE_CRC. If so, then the blockChecksum in the response is expected
+  // to be the concatenation of N crcs, where
+  // N == ((requestedLength - 1) / stripeLength) + 1
+  optional uint64 stripeLength = 2;
+}
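
As a worked example of the formula above (illustrative numbers, not part of the proto file): with a stripeLength of 1,048,576 bytes (one 1 MiB cell) and a requestedLength of 2,621,440 bytes (2.5 MiB), the response is expected to carry N = ((2,621,440 - 1) / 1,048,576) + 1 = 3 concatenated 4-byte CRCs: one per full cell plus one for the trailing half cell.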
+
 /**
  * HDFS Server Defaults
  */

+ 5 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java

@@ -301,8 +301,9 @@ public abstract class Receiver implements DataTransferProtocol {
     TraceScope traceScope = continueTraceSpan(proto.getHeader(),
         proto.getClass().getSimpleName());
     try {
-    blockChecksum(PBHelperClient.convert(proto.getHeader().getBlock()),
-        PBHelperClient.convert(proto.getHeader().getToken()));
+      blockChecksum(PBHelperClient.convert(proto.getHeader().getBlock()),
+          PBHelperClient.convert(proto.getHeader().getToken()),
+          PBHelperClient.convert(proto.getBlockChecksumOptions()));
     } finally {
       if (traceScope != null) traceScope.close();
     }
@@ -325,7 +326,8 @@ public abstract class Receiver implements DataTransferProtocol {
     try {
       blockGroupChecksum(stripedBlockInfo,
           PBHelperClient.convert(proto.getHeader().getToken()),
-          proto.getRequestedNumBytes());
+          proto.getRequestedNumBytes(),
+          PBHelperClient.convert(proto.getBlockChecksumOptions()));
     } finally {
       if (traceScope != null) {
         traceScope.close();

+ 256 - 33
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java

@@ -21,6 +21,8 @@ import com.google.common.base.Preconditions;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions;
+import org.apache.hadoop.hdfs.protocol.BlockChecksumType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -32,6 +34,8 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.erasurecode.StripedBlockChecksumCompositeCrcReconstructor;
+import org.apache.hadoop.hdfs.server.datanode.erasurecode.StripedBlockChecksumMd5CrcReconstructor;
 import org.apache.hadoop.hdfs.server.datanode.erasurecode.StripedBlockChecksumReconstructor;
 import org.apache.hadoop.hdfs.server.datanode.erasurecode.StripedReconstructionInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
@@ -40,6 +44,8 @@ import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.CrcComposer;
+import org.apache.hadoop.util.CrcUtil;
 import org.apache.hadoop.util.DataChecksum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,6 +76,7 @@ final class BlockChecksumHelper {
    */
   static abstract class AbstractBlockChecksumComputer {
     private final DataNode datanode;
+    private final BlockChecksumOptions blockChecksumOptions;
 
     private byte[] outBytes;
     private int bytesPerCRC = -1;
@@ -77,8 +84,11 @@ final class BlockChecksumHelper {
     private long crcPerBlock = -1;
     private int checksumSize = -1;
 
-    AbstractBlockChecksumComputer(DataNode datanode) throws IOException {
+    AbstractBlockChecksumComputer(
+        DataNode datanode,
+        BlockChecksumOptions blockChecksumOptions) throws IOException {
       this.datanode = datanode;
+      this.blockChecksumOptions = blockChecksumOptions;
     }
 
     abstract void compute() throws IOException;
@@ -92,6 +102,10 @@ final class BlockChecksumHelper {
       return datanode;
     }
 
+    BlockChecksumOptions getBlockChecksumOptions() {
+      return blockChecksumOptions;
+    }
+
     InputStream getBlockInputStream(ExtendedBlock block, long seekOffset)
         throws IOException {
       return datanode.data.getBlockInputStream(block, seekOffset);
@@ -155,8 +169,10 @@ final class BlockChecksumHelper {
     private DataChecksum checksum;
 
     BlockChecksumComputer(DataNode datanode,
-                          ExtendedBlock block) throws IOException {
-      super(datanode);
+                          ExtendedBlock block,
+                          BlockChecksumOptions blockChecksumOptions)
+        throws IOException {
+      super(datanode, blockChecksumOptions);
       this.block = block;
       this.requestLength = block.getNumBytes();
       Preconditions.checkArgument(requestLength >= 0);
@@ -268,8 +284,10 @@ final class BlockChecksumHelper {
   static class ReplicatedBlockChecksumComputer extends BlockChecksumComputer {
 
     ReplicatedBlockChecksumComputer(DataNode datanode,
-                                    ExtendedBlock block) throws IOException {
-      super(datanode, block);
+                                    ExtendedBlock block,
+                                    BlockChecksumOptions blockChecksumOptions)
+        throws IOException {
+      super(datanode, block, blockChecksumOptions);
     }
 
     @Override
@@ -277,22 +295,38 @@ final class BlockChecksumHelper {
       try {
         readHeader();
 
-        MD5Hash md5out;
-        if (isPartialBlk() && getCrcPerBlock() > 0) {
-          md5out = checksumPartialBlock();
-        } else {
-          md5out = checksumWholeBlock();
+        BlockChecksumType type =
+            getBlockChecksumOptions().getBlockChecksumType();
+        switch (type) {
+        case MD5CRC:
+          computeMd5Crc();
+          break;
+        case COMPOSITE_CRC:
+          computeCompositeCrc(getBlockChecksumOptions().getStripeLength());
+          break;
+        default:
+          throw new IOException(String.format(
+              "Unrecognized BlockChecksumType: %s", type));
         }
-        setOutBytes(md5out.getDigest());
-
-        LOG.debug("block={}, bytesPerCRC={}, crcPerBlock={}, md5out={}",
-            getBlock(), getBytesPerCRC(), getCrcPerBlock(), md5out);
       } finally {
         IOUtils.closeStream(getChecksumIn());
         IOUtils.closeStream(getMetadataIn());
       }
     }
 
+    private void computeMd5Crc() throws IOException {
+      MD5Hash md5out;
+      if (isPartialBlk() && getCrcPerBlock() > 0) {
+        md5out = checksumPartialBlock();
+      } else {
+        md5out = checksumWholeBlock();
+      }
+      setOutBytes(md5out.getDigest());
+
+      LOG.debug("block={}, bytesPerCRC={}, crcPerBlock={}, md5out={}",
+          getBlock(), getBytesPerCRC(), getCrcPerBlock(), md5out);
+    }
+
     private MD5Hash checksumWholeBlock() throws IOException {
       MD5Hash md5out = MD5Hash.digest(getChecksumIn());
       return md5out;
@@ -320,6 +354,56 @@ final class BlockChecksumHelper {
 
       return new MD5Hash(digester.digest());
     }
+
+    private void computeCompositeCrc(long stripeLength) throws IOException {
+      long checksumDataLength =
+          Math.min(getVisibleLength(), getRequestLength());
+      if (stripeLength <= 0 || stripeLength > checksumDataLength) {
+        stripeLength = checksumDataLength;
+      }
+
+      CrcComposer crcComposer = CrcComposer.newStripedCrcComposer(
+          getCrcType(), getBytesPerCRC(), stripeLength);
+      DataInputStream checksumIn = getChecksumIn();
+
+      // Whether or not we are getting the checksum for the entire block
+      // (which itself may not be a full block size and may have a final chunk
+      // smaller than getBytesPerCRC()), we begin with a number of full
+      // chunks, all of size getBytesPerCRC().
+      long numFullChunks = checksumDataLength / getBytesPerCRC();
+      crcComposer.update(checksumIn, numFullChunks, getBytesPerCRC());
+
+      // There may be a final partial chunk that is not full-sized. Unlike the
+      // MD5 case, we still consider this a "partial chunk" even if
+      // getRequestLength() == getVisibleLength(), since the CRC composition
+      // depends on the byte size of that final chunk, even if it already has a
+      // precomputed CRC stored in metadata. So there are two cases:
+      //   1. Reading only part of a block via getRequestLength(); we get the
+      //      crcPartialBlock() explicitly.
+      //   2. Reading full visible length; the partial chunk already has a CRC
+      //      stored in block metadata, so we just continue reading checksumIn.
+      long partialChunkSize = checksumDataLength % getBytesPerCRC();
+      if (partialChunkSize > 0) {
+        if (isPartialBlk()) {
+          byte[] partialChunkCrcBytes = crcPartialBlock();
+          crcComposer.update(
+              partialChunkCrcBytes, 0, partialChunkCrcBytes.length,
+              partialChunkSize);
+        } else {
+          int partialChunkCrc = checksumIn.readInt();
+          crcComposer.update(partialChunkCrc, partialChunkSize);
+        }
+      }
+
+      byte[] composedCrcs = crcComposer.digest();
+      setOutBytes(composedCrcs);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "block={}, getBytesPerCRC={}, crcPerBlock={}, compositeCrc={}",
+            getBlock(), getBytesPerCRC(), getCrcPerBlock(),
+            CrcUtil.toMultiCrcString(composedCrcs));
+      }
+    }
   }
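
The property computeCompositeCrc() relies on is that CRCs compose: combining each chunk's CRC over its byte length reproduces the CRC of the concatenated bytes, so the result is independent of chunk boundaries. A minimal sketch of that property, assuming the CrcComposer/CrcUtil signatures exactly as they appear in this patch:

// Sketch only; checks CRC composition over two arbitrary "chunks".
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

import org.apache.hadoop.util.CrcComposer;
import org.apache.hadoop.util.CrcUtil;
import org.apache.hadoop.util.DataChecksum;

public class CrcCompositionSketch {
  private static int crc32Of(byte[] data) {
    CRC32 crc = new CRC32();
    crc.update(data, 0, data.length);
    return (int) crc.getValue();
  }

  public static void main(String[] args) throws Exception {
    byte[] chunkA = "hello ".getBytes(StandardCharsets.UTF_8);
    byte[] chunkB = "world".getBytes(StandardCharsets.UTF_8);

    // Compose the two chunk-level CRCs over their respective byte lengths.
    CrcComposer composer =
        CrcComposer.newCrcComposer(DataChecksum.Type.CRC32, chunkA.length);
    composer.update(crc32Of(chunkA), chunkA.length);
    composer.update(crc32Of(chunkB), chunkB.length);
    int composite = CrcUtil.readInt(composer.digest(), 0);

    // Expected to match a single-pass CRC32 of "hello world".
    byte[] whole = "hello world".getBytes(StandardCharsets.UTF_8);
    System.out.println(composite == crc32Of(whole));
  }
}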
 
   /**
@@ -335,19 +419,29 @@ final class BlockChecksumHelper {
     private final byte[] blockIndices;
     private final long requestedNumBytes;
 
-    private final DataOutputBuffer md5writer = new DataOutputBuffer();
+    private final DataOutputBuffer blockChecksumBuf = new DataOutputBuffer();
+
+    // Keeps track of the positions within blockChecksumBuf where each data
+    // block's checksum begins; for fixed-size block checksums this is easily
+    // calculated as a multiple of the checksum size, but for striped block
+    // CRCs, it's less error-prone to simply keep track of exact byte offsets
+    // before each block checksum is populated into the buffer.
+    private final int[] blockChecksumPositions;
 
-    BlockGroupNonStripedChecksumComputer(DataNode datanode,
-                                         StripedBlockInfo stripedBlockInfo,
-                                         long requestedNumBytes)
+    BlockGroupNonStripedChecksumComputer(
+        DataNode datanode,
+        StripedBlockInfo stripedBlockInfo,
+        long requestedNumBytes,
+        BlockChecksumOptions blockChecksumOptions)
         throws IOException {
-      super(datanode);
+      super(datanode, blockChecksumOptions);
       this.blockGroup = stripedBlockInfo.getBlock();
       this.ecPolicy = stripedBlockInfo.getErasureCodingPolicy();
       this.datanodes = stripedBlockInfo.getDatanodes();
       this.blockTokens = stripedBlockInfo.getBlockTokens();
       this.blockIndices = stripedBlockInfo.getBlockIndices();
       this.requestedNumBytes = requestedNumBytes;
+      this.blockChecksumPositions = new int[this.ecPolicy.getNumDataUnits()];
     }
 
     private static class LiveBlockInfo {
@@ -383,6 +477,9 @@ final class BlockChecksumHelper {
       }
       long checksumLen = 0;
       for (int idx = 0; idx < numDataUnits && idx < blkIndxLen; idx++) {
+        // Before populating the blockChecksum at this index, record the byte
+        // offset where it will begin.
+        blockChecksumPositions[idx] = blockChecksumBuf.getLength();
         try {
           ExtendedBlock block = getInternalBlock(numDataUnits, idx);
 
@@ -409,8 +506,75 @@ final class BlockChecksumHelper {
         }
       }
 
-      MD5Hash md5out = MD5Hash.digest(md5writer.getData());
-      setOutBytes(md5out.getDigest());
+      BlockChecksumType type = getBlockChecksumOptions().getBlockChecksumType();
+      switch (type) {
+      case MD5CRC:
+        MD5Hash md5out = MD5Hash.digest(blockChecksumBuf.getData());
+        setOutBytes(md5out.getDigest());
+        break;
+      case COMPOSITE_CRC:
+        byte[] digest = reassembleNonStripedCompositeCrc(checksumLen);
+        setOutBytes(digest);
+        break;
+      default:
+        throw new IOException(String.format(
+            "Unrecognized BlockChecksumType: %s", type));
+      }
+    }
+
+    /**
+     * @param checksumLen The sum of bytes associated with the block checksum
+     *     data being digested into a block-group level checksum.
+     */
+    private byte[] reassembleNonStripedCompositeCrc(long checksumLen)
+        throws IOException {
+      int numDataUnits = ecPolicy.getNumDataUnits();
+      CrcComposer crcComposer = CrcComposer.newCrcComposer(
+          getCrcType(), ecPolicy.getCellSize());
+
+      // This should hold all the cell-granularity checksums of blk0
+      // followed by all cell checksums of blk1, etc. We must unstripe the
+      // cell checksums in order of logical file bytes. Also, note that the
+      // length of this array may not equal the number of actually valid
+      // bytes in the buffer (blockChecksumBuf.getLength()).
+      byte[] flatBlockChecksumData = blockChecksumBuf.getData();
+
+      // Initialize byte-level cursors to where each block's checksum begins
+      // inside the combined flattened buffer.
+      int[] blockChecksumCursors = new int[numDataUnits];
+      for (int idx = 0; idx < numDataUnits; ++idx) {
+        blockChecksumCursors[idx] = blockChecksumPositions[idx];
+      }
+
+      // Reassemble cell-level CRCs in the right order.
+      long numFullCells = checksumLen / ecPolicy.getCellSize();
+      for (long cellIndex = 0; cellIndex < numFullCells; ++cellIndex) {
+        int blockIndex = (int) (cellIndex % numDataUnits);
+        int checksumCursor = blockChecksumCursors[blockIndex];
+        int cellCrc = CrcUtil.readInt(
+            flatBlockChecksumData, checksumCursor);
+        blockChecksumCursors[blockIndex] += 4;
+        crcComposer.update(cellCrc, ecPolicy.getCellSize());
+      }
+      if (checksumLen % ecPolicy.getCellSize() != 0) {
+        // Final partial cell.
+        int blockIndex = (int) (numFullCells % numDataUnits);
+        int checksumCursor = blockChecksumCursors[blockIndex];
+        int cellCrc = CrcUtil.readInt(
+            flatBlockChecksumData, checksumCursor);
+        blockChecksumCursors[blockIndex] += 4;
+        crcComposer.update(cellCrc, checksumLen % ecPolicy.getCellSize());
+      }
+      byte[] digest = crcComposer.digest();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("flatBlockChecksumData.length={}, numDataUnits={}, "
+            + "checksumLen={}, digest={}",
+            flatBlockChecksumData.length,
+            numDataUnits,
+            checksumLen,
+            CrcUtil.toSingleCrcString(digest));
+      }
+      return digest;
     }
 
     private ExtendedBlock getInternalBlock(int numDataUnits, int idx) {
@@ -437,8 +601,26 @@ final class BlockChecksumHelper {
         LOG.debug("write to {}: {}, block={}",
             getDatanode(), Op.BLOCK_CHECKSUM, block);
 
-        // get block MD5
-        createSender(pair).blockChecksum(block, blockToken);
+        // get block checksum
+        // A BlockGroupChecksum of type COMPOSITE_CRC uses underlying
+        // BlockChecksums also of type COMPOSITE_CRC but with
+        // stripeLength == ecPolicy.getCellSize().
+        BlockChecksumOptions childOptions;
+        BlockChecksumType groupChecksumType =
+            getBlockChecksumOptions().getBlockChecksumType();
+        switch (groupChecksumType) {
+        case MD5CRC:
+          childOptions = getBlockChecksumOptions();
+          break;
+        case COMPOSITE_CRC:
+          childOptions = new BlockChecksumOptions(
+              BlockChecksumType.COMPOSITE_CRC, ecPolicy.getCellSize());
+          break;
+        default:
+          throw new IOException(
+              "Unknown BlockChecksumType: " + groupChecksumType);
+        }
+        createSender(pair).blockChecksum(block, blockToken, childOptions);
 
         final DataTransferProtos.BlockOpResponseProto reply =
             DataTransferProtos.BlockOpResponseProto.parseFrom(
@@ -463,10 +645,37 @@ final class BlockChecksumHelper {
 
         setOrVerifyChecksumProperties(blockIdx, checksumData.getBytesPerCrc(),
             checksumData.getCrcPerBlock(), ct);
-        //read md5
-        final MD5Hash md5 = new MD5Hash(checksumData.getMd5().toByteArray());
-        md5.write(md5writer);
-        LOG.debug("got reply from datanode:{}, md5={}", targetDatanode, md5);
+
+        switch (groupChecksumType) {
+        case MD5CRC:
+          //read md5
+          final MD5Hash md5 =
+              new MD5Hash(checksumData.getBlockChecksum().toByteArray());
+          md5.write(blockChecksumBuf);
+          LOG.debug("got reply from datanode:{}, md5={}",
+              targetDatanode, md5);
+          break;
+        case COMPOSITE_CRC:
+          BlockChecksumType returnedType = PBHelperClient.convert(
+              checksumData.getBlockChecksumOptions().getBlockChecksumType());
+          if (returnedType != BlockChecksumType.COMPOSITE_CRC) {
+            throw new IOException(String.format(
+                "Unexpected blockChecksumType '%s', expecting COMPOSITE_CRC",
+                returnedType));
+          }
+          byte[] checksumBytes =
+              checksumData.getBlockChecksum().toByteArray();
+          blockChecksumBuf.write(checksumBytes, 0, checksumBytes.length);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("got reply from datanode:{} for blockIdx:{}, checksum:{}",
+                targetDatanode, blockIdx,
+                CrcUtil.toMultiCrcString(checksumBytes));
+          }
+          break;
+        default:
+          throw new IOException(
+              "Unknown BlockChecksumType: " + groupChecksumType);
+        }
       }
     }
 
@@ -489,10 +698,16 @@ final class BlockChecksumHelper {
       StripedReconstructionInfo stripedReconInfo =
           new StripedReconstructionInfo(
               blockGroup, ecPolicy, blockIndices, datanodes, errIndices);
+      BlockChecksumType groupChecksumType =
+          getBlockChecksumOptions().getBlockChecksumType();
       final StripedBlockChecksumReconstructor checksumRecon =
-          new StripedBlockChecksumReconstructor(
+          groupChecksumType == BlockChecksumType.COMPOSITE_CRC ?
+          new StripedBlockChecksumCompositeCrcReconstructor(
               getDatanode().getErasureCodingWorker(), stripedReconInfo,
-              md5writer, blockLength);
+              blockChecksumBuf, blockLength) :
+          new StripedBlockChecksumMd5CrcReconstructor(
+              getDatanode().getErasureCodingWorker(), stripedReconInfo,
+              blockChecksumBuf, blockLength);
       checksumRecon.reconstruct();
 
       DataChecksum checksum = checksumRecon.getChecksum();
@@ -501,8 +716,8 @@ final class BlockChecksumHelper {
       setOrVerifyChecksumProperties(errBlkIndex,
           checksum.getBytesPerChecksum(), crcPerBlock,
           checksum.getChecksumType());
-      LOG.debug("Recalculated checksum for the block index:{}, md5={}",
-          errBlkIndex, checksumRecon.getMD5());
+      LOG.debug("Recalculated checksum for the block index:{}, checksum={}",
+          errBlkIndex, checksumRecon.getDigestObject());
     }
 
     private void setOrVerifyChecksumProperties(int blockIdx, int bpc,
@@ -524,8 +739,16 @@ final class BlockChecksumHelper {
         setCrcType(ct);
       } else if (getCrcType() != DataChecksum.Type.MIXED &&
           getCrcType() != ct) {
-        // if crc types are mixed in a file
-        setCrcType(DataChecksum.Type.MIXED);
+        BlockChecksumType groupChecksumType =
+            getBlockChecksumOptions().getBlockChecksumType();
+        if (groupChecksumType == BlockChecksumType.COMPOSITE_CRC) {
+          throw new IOException(String.format(
+              "BlockChecksumType COMPOSITE_CRC doesn't support MIXED "
+              + "underlying types; previous block was %s, next block is %s",
+              getCrcType(), ct));
+        } else {
+          setCrcType(DataChecksum.Type.MIXED);
+        }
       }
 
       if (blockIdx == 0) {

+ 17 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -968,15 +969,16 @@ class DataXceiver extends Receiver implements Runnable {
 
   @Override
   public void blockChecksum(ExtendedBlock block,
-                            Token<BlockTokenIdentifier> blockToken)
+      Token<BlockTokenIdentifier> blockToken,
+      BlockChecksumOptions blockChecksumOptions)
       throws IOException {
     updateCurrentThreadName("Getting checksum for block " + block);
     final DataOutputStream out = new DataOutputStream(
         getOutputStream());
     checkAccess(out, true, block, blockToken, Op.BLOCK_CHECKSUM,
         BlockTokenIdentifier.AccessMode.READ);
-    BlockChecksumComputer maker =
-        new ReplicatedBlockChecksumComputer(datanode, block);
+    BlockChecksumComputer maker = new ReplicatedBlockChecksumComputer(
+        datanode, block, blockChecksumOptions);
 
     try {
       maker.compute();
@@ -987,8 +989,10 @@ class DataXceiver extends Receiver implements Runnable {
           .setChecksumResponse(OpBlockChecksumResponseProto.newBuilder()
               .setBytesPerCrc(maker.getBytesPerCRC())
               .setCrcPerBlock(maker.getCrcPerBlock())
-              .setMd5(ByteString.copyFrom(maker.getOutBytes()))
-              .setCrcType(PBHelperClient.convert(maker.getCrcType())))
+              .setBlockChecksum(ByteString.copyFrom(maker.getOutBytes()))
+              .setCrcType(PBHelperClient.convert(maker.getCrcType()))
+              .setBlockChecksumOptions(
+                  PBHelperClient.convert(blockChecksumOptions)))
           .build()
           .writeDelimitedTo(out);
       out.flush();
@@ -1007,7 +1011,9 @@ class DataXceiver extends Receiver implements Runnable {
 
   @Override
   public void blockGroupChecksum(final StripedBlockInfo stripedBlockInfo,
-      final Token<BlockTokenIdentifier> blockToken, long requestedNumBytes)
+      final Token<BlockTokenIdentifier> blockToken,
+      long requestedNumBytes,
+      BlockChecksumOptions blockChecksumOptions)
       throws IOException {
     final ExtendedBlock block = stripedBlockInfo.getBlock();
     updateCurrentThreadName("Getting checksum for block group" +
@@ -1018,7 +1024,7 @@ class DataXceiver extends Receiver implements Runnable {
 
     AbstractBlockChecksumComputer maker =
         new BlockGroupNonStripedChecksumComputer(datanode, stripedBlockInfo,
-            requestedNumBytes);
+            requestedNumBytes, blockChecksumOptions);
 
     try {
       maker.compute();
@@ -1029,8 +1035,10 @@ class DataXceiver extends Receiver implements Runnable {
           .setChecksumResponse(OpBlockChecksumResponseProto.newBuilder()
               .setBytesPerCrc(maker.getBytesPerCRC())
               .setCrcPerBlock(maker.getCrcPerBlock())
-              .setMd5(ByteString.copyFrom(maker.getOutBytes()))
-              .setCrcType(PBHelperClient.convert(maker.getCrcType())))
+              .setBlockChecksum(ByteString.copyFrom(maker.getOutBytes()))
+              .setCrcType(PBHelperClient.convert(maker.getCrcType()))
+              .setBlockChecksumOptions(
+                  PBHelperClient.convert(blockChecksumOptions)))
           .build()
           .writeDelimitedTo(out);
       out.flush();

+ 80 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumCompositeCrcReconstructor.java

@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.erasurecode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.util.CrcComposer;
+
+/**
+ * Computes striped composite CRCs over reconstructed chunk CRCs.
+ */
+@InterfaceAudience.Private
+public class StripedBlockChecksumCompositeCrcReconstructor
+    extends StripedBlockChecksumReconstructor {
+  private final int ecPolicyCellSize;
+
+  private byte[] digestValue;
+  private CrcComposer digester;
+
+  public StripedBlockChecksumCompositeCrcReconstructor(
+      ErasureCodingWorker worker,
+      StripedReconstructionInfo stripedReconInfo,
+      DataOutputBuffer checksumWriter,
+      long requestedBlockLength) throws IOException {
+    super(worker, stripedReconInfo, checksumWriter, requestedBlockLength);
+    this.ecPolicyCellSize = stripedReconInfo.getEcPolicy().getCellSize();
+  }
+
+  @Override
+  public Object getDigestObject() {
+    return digestValue;
+  }
+
+  @Override
+  void prepareDigester() throws IOException {
+    digester = CrcComposer.newStripedCrcComposer(
+        getChecksum().getChecksumType(),
+        getChecksum().getBytesPerChecksum(),
+        ecPolicyCellSize);
+  }
+
+  @Override
+  void updateDigester(byte[] checksumBytes, int dataBytesPerChecksum)
+      throws IOException {
+    if (digester == null) {
+      throw new IOException(String.format(
+          "Called updatedDigester with checksumBytes.length=%d, "
+          + "dataBytesPerChecksum=%d but digester is null",
+          checksumBytes.length, dataBytesPerChecksum));
+    }
+    digester.update(
+        checksumBytes, 0, checksumBytes.length, dataBytesPerChecksum);
+  }
+
+  @Override
+  void commitDigest() throws IOException {
+    if (digester == null) {
+      throw new IOException("Called commitDigest() but digester is null");
+    }
+    digestValue = digester.digest();
+    getChecksumWriter().write(digestValue, 0, digestValue.length);
+  }
+}

+ 74 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumMd5CrcReconstructor.java

@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.erasurecode;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.MD5Hash;
+
+/**
+ * Computes running MD5-of-CRC over reconstructed chunk CRCs.
+ */
+@InterfaceAudience.Private
+public class StripedBlockChecksumMd5CrcReconstructor
+    extends StripedBlockChecksumReconstructor {
+  private MD5Hash md5;
+  private MessageDigest digester;
+
+  public StripedBlockChecksumMd5CrcReconstructor(ErasureCodingWorker worker,
+      StripedReconstructionInfo stripedReconInfo,
+      DataOutputBuffer checksumWriter,
+      long requestedBlockLength) throws IOException {
+    super(worker, stripedReconInfo, checksumWriter, requestedBlockLength);
+  }
+
+  @Override
+  public Object getDigestObject() {
+    return md5;
+  }
+
+  @Override
+  void prepareDigester() throws IOException {
+    digester = MD5Hash.getDigester();
+  }
+
+  @Override
+  void updateDigester(byte[] checksumBytes, int dataBytesPerChecksum)
+      throws IOException {
+    if (digester == null) {
+      throw new IOException(String.format(
+          "Called updatedDigester with checksumBytes.length=%d, "
+          + "dataBytesPerChecksum=%d but digester is null",
+          checksumBytes.length, dataBytesPerChecksum));
+    }
+    digester.update(checksumBytes, 0, checksumBytes.length);
+  }
+
+  @Override
+  void commitDigest() throws IOException {
+    if (digester == null) {
+      throw new IOException("Called commitDigest() but digester is null");
+    }
+    byte[] digest = digester.digest();
+    md5 = new MD5Hash(digest);
+    md5.write(getChecksumWriter());
+  }
+}

+ 45 - 21
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java

@@ -19,12 +19,10 @@ package org.apache.hadoop.hdfs.server.datanode.erasurecode;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.security.MessageDigest;
 import java.util.Arrays;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.MD5Hash;
 
 /**
  * StripedBlockChecksumReconstructor reconstruct one or more missed striped
@@ -33,18 +31,17 @@ import org.apache.hadoop.io.MD5Hash;
  * using the newly reconstructed block.
  */
 @InterfaceAudience.Private
-public class StripedBlockChecksumReconstructor extends StripedReconstructor {
-
+public abstract class StripedBlockChecksumReconstructor
+    extends StripedReconstructor {
   private ByteBuffer targetBuffer;
   private final byte[] targetIndices;
 
   private byte[] checksumBuf;
   private DataOutputBuffer checksumWriter;
-  private MD5Hash md5;
   private long checksumDataLen;
   private long requestedLen;
 
-  public StripedBlockChecksumReconstructor(ErasureCodingWorker worker,
+  protected StripedBlockChecksumReconstructor(ErasureCodingWorker worker,
       StripedReconstructionInfo stripedReconInfo,
       DataOutputBuffer checksumWriter,
       long requestedBlockLength) throws IOException {
@@ -72,8 +69,9 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor {
     checksumBuf = new byte[tmpLen];
   }
 
+  @Override
   public void reconstruct() throws IOException {
-    MessageDigest digester = MD5Hash.getDigester();
+    prepareDigester();
     long maxTargetLength = getMaxTargetLength();
     try {
       while (requestedLen > 0 && getPositionInBlock() < maxTargetLength) {
@@ -88,24 +86,54 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor {
         reconstructTargets(toReconstructLen);
 
         // step3: calculate checksum
-        checksumDataLen += checksumWithTargetOutput(targetBuffer.array(),
-            toReconstructLen, digester);
+        checksumDataLen += checksumWithTargetOutput(
+            targetBuffer.array(), toReconstructLen);
 
         updatePositionInBlock(toReconstructLen);
         requestedLen -= toReconstructLen;
         clearBuffers();
       }
 
-      byte[] digest = digester.digest();
-      md5 = new MD5Hash(digest);
-      md5.write(checksumWriter);
+      commitDigest();
     } finally {
       cleanup();
     }
   }
 
-  private long checksumWithTargetOutput(byte[] outputData, int toReconstructLen,
-      MessageDigest digester) throws IOException {
+  /**
+   * Should return a representation of a completed/reconstructed digest which
+   * is suitable for debug printing.
+   */
+  public abstract Object getDigestObject();
+
+  /**
+   * This will be called before starting reconstruction.
+   */
+  abstract void prepareDigester() throws IOException;
+
+  /**
+   * This will be called repeatedly with chunked checksums computed in-flight
+   * over reconstructed data.
+   *
+   * @param dataBytesPerChecksum the number of underlying data bytes
+   *     corresponding to each checksum inside {@code checksumBytes}.
+   */
+  abstract void updateDigester(byte[] checksumBytes, int dataBytesPerChecksum)
+      throws IOException;
+
+  /**
+   * This will be called when reconstruction of the entire requested length
+   * is complete and any final digests should be committed to
+   * implementation-specific output fields.
+   */
+  abstract void commitDigest() throws IOException;
+
+  protected DataOutputBuffer getChecksumWriter() {
+    return checksumWriter;
+  }
+
+  private long checksumWithTargetOutput(byte[] outputData, int toReconstructLen)
+      throws IOException {
     long checksumDataLength = 0;
     // Calculate partial block checksum. There are two cases.
     // case-1) length of data bytes which is fraction of bytesPerCRC
@@ -128,7 +156,7 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor {
         checksumBuf = new byte[checksumRemaining];
         getChecksum().calculateChunkedSums(outputData, dataOffset,
             remainingLen, checksumBuf, 0);
-        digester.update(checksumBuf, 0, checksumBuf.length);
+        updateDigester(checksumBuf, getChecksum().getBytesPerChecksum());
         checksumDataLength = checksumBuf.length;
         dataOffset = remainingLen;
       }
@@ -139,7 +167,7 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor {
         getChecksum().reset();
         getChecksum().update(outputData, dataOffset, partialLength);
         getChecksum().writeValue(partialCrc, 0, true);
-        digester.update(partialCrc);
+        updateDigester(partialCrc, partialLength);
         checksumDataLength += partialCrc.length;
       }
 
@@ -151,7 +179,7 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor {
         outputData.length, checksumBuf, 0);
 
     // updates digest using the checksum array of bytes
-    digester.update(checksumBuf, 0, checksumBuf.length);
+    updateDigester(checksumBuf, getChecksum().getBytesPerChecksum());
     return checksumBuf.length;
   }
 
@@ -176,10 +204,6 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor {
     targetBuffer.clear();
   }
 
-  public MD5Hash getMD5() {
-    return md5;
-  }
-
   public long getChecksumDataLen() {
     return checksumDataLen;
   }
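
The hooks introduced above make the reconstructor a template method: the base class drives the stripe-by-stripe loop and hands chunked checksum bytes to prepareDigester/updateDigester/commitDigest, so MD5CRC and COMPOSITE_CRC block checksums differ only in how those hooks accumulate state. A minimal dependency-free sketch of that shape (hypothetical names, not the HDFS classes):

import java.security.MessageDigest;

abstract class DigestTemplate {
  // Drives the digest lifecycle the way reconstruct() does in the real class.
  final void run(byte[][] chunkCrcs, int bytesPerCrc) throws Exception {
    prepare();                          // before reconstruction starts
    for (byte[] crcs : chunkCrcs) {
      update(crcs, bytesPerCrc);        // once per reconstructed chunk of CRCs
    }
    commit();                           // after the requested length is done
  }

  abstract void prepare() throws Exception;

  abstract void update(byte[] crcBytes, int bytesPerCrc) throws Exception;

  abstract void commit() throws Exception;
}

class Md5OfCrcsDigest extends DigestTemplate {
  private MessageDigest md5;
  byte[] result;

  @Override
  void prepare() throws Exception {
    md5 = MessageDigest.getInstance("MD5");
  }

  @Override
  void update(byte[] crcBytes, int bytesPerCrc) {
    md5.update(crcBytes);               // layout-sensitive: raw CRC bytes are hashed
  }

  @Override
  void commit() {
    result = md5.digest();
  }
}

A COMPOSITE_CRC-style subclass would instead keep a running CRC over the reconstructed stream by mathematically concatenating the per-chunk CRCs; the combine sketch after TestFileChecksumCompositeCrc below shows that operation.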

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java

@@ -79,6 +79,7 @@ class StripedBlockReconstructor extends StripedReconstructor
     }
   }
 
+  @Override
   void reconstruct() throws IOException {
     while (getPositionInBlock() < getMaxTargetLength()) {
       DataNodeFaultInjector.get().stripedBlockReconstruction();

+ 11 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -3540,6 +3540,17 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.checksum.combine.mode</name>
+  <value>MD5MD5CRC</value>
+  <description>
+    Defines how lower-level chunk/block checksums are combined into file-level
+    checksums; the original MD5MD5CRC mode yields checksums that are not
+    comparable between files with different block layouts, while COMPOSITE_CRC
+    checksums are comparable independently of block layout.
+  </description>
+</property>
+
 <property>
   <name>dfs.client.block.write.locateFollowingBlock.retries</name>
   <value>5</value>
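
To exercise the new dfs.checksum.combine.mode property from a client, it is enough to set it on the client configuration before requesting a file checksum. A minimal usage sketch (the paths and the default-filesystem URI are placeholders, and the two files are assumed to contain identical bytes but to have been written with different block sizes):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CompositeCrcChecksumExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Switch from the default MD5MD5CRC combine mode to COMPOSITE_CRC.
    conf.set("dfs.checksum.combine.mode", "COMPOSITE_CRC");

    try (FileSystem fs = FileSystem.get(conf)) {
      FileChecksum c1 = fs.getFileChecksum(new Path("/replicatedFile1"));
      FileChecksum c2 = fs.getFileChecksum(new Path("/replicatedFile2"));
      // With COMPOSITE_CRC these should match even though the block layouts
      // differ; with the default MD5MD5CRC mode they generally would not.
      System.out.println(c1 != null && c1.equals(c2));
    }
  }
}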

+ 29 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java

@@ -857,6 +857,20 @@ public class DFSTestUtil {
     }
   }
 
+  /* Write the given bytes to the given file using the specified blockSize */
+  public static void writeFile(
+      FileSystem fs, Path p, byte[] bytes, long blockSize)
+      throws IOException {
+    if (fs.exists(p)) {
+      fs.delete(p, true);
+    }
+    try (InputStream is = new ByteArrayInputStream(bytes);
+        FSDataOutputStream os = fs.create(
+            p, false, 4096, fs.getDefaultReplication(p), blockSize)) {
+      IOUtils.copyBytes(is, os, bytes.length);
+    }
+  }
+
   /* Write the given string to the given file */
   public static void writeFile(FileSystem fs, Path p, String s)
       throws IOException {
@@ -901,14 +915,27 @@ public class DFSTestUtil {
    */
   public static void appendFileNewBlock(DistributedFileSystem fs,
       Path p, int length) throws IOException {
-    assert fs.exists(p);
     assert length >= 0;
     byte[] toAppend = new byte[length];
     Random random = new Random();
     random.nextBytes(toAppend);
+    appendFileNewBlock(fs, p, toAppend);
+  }
+
+  /**
+   * Append specified bytes to a given file, starting with new block.
+   *
+   * @param fs The file system
+   * @param p Path of the file to append
+   * @param bytes The data to append
+   * @throws IOException
+   */
+  public static void appendFileNewBlock(DistributedFileSystem fs,
+      Path p, byte[] bytes) throws IOException {
+    assert fs.exists(p);
     try (FSDataOutputStream out = fs.append(p,
         EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null)) {
-      out.write(toAppend);
+      out.write(bytes);
     }
   }
 

+ 99 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -30,7 +31,9 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
@@ -74,6 +77,9 @@ public class TestFileChecksum {
   private String stripedFile2 = ecDir + "/stripedFileChecksum2";
   private String replicatedFile = "/replicatedFileChecksum";
 
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
   @Before
   public void setup() throws IOException {
     int numDNs = dataBlocks + parityBlocks + 2;
@@ -83,6 +89,7 @@ public class TestFileChecksum {
         false);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
     conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    customizeConf(conf);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
     Path ecPath = new Path(ecDir);
     cluster.getFileSystem().mkdir(ecPath, FsPermission.getDirDefault());
@@ -106,6 +113,39 @@ public class TestFileChecksum {
     }
   }
 
+  /**
+   * Subclasses may customize the conf to run the full set of tests under
+   * different conditions.
+   */
+  protected void customizeConf(Configuration preparedConf) {
+  }
+
+  /**
+   * Subclasses may override this method to indicate whether equivalent files
+   * in striped and replicated formats are expected to have the same
+   * overall FileChecksum.
+   */
+  protected boolean expectComparableStripedAndReplicatedFiles() {
+    return false;
+  }
+
+  /**
+   * Subclasses may override this method to indicate whether equivalent files
+   * in replicated formats with different block sizes are expected to have the
+   * same overall FileChecksum.
+   */
+  protected boolean expectComparableDifferentBlockSizeReplicatedFiles() {
+    return false;
+  }
+
+  /**
+   * Subclasses may override this method to indicate whether checksums are
+   * supported for files where different blocks have different bytesPerCRC.
+   */
+  protected boolean expectSupportForSingleFileMixedBytesPerChecksum() {
+    return false;
+  }
+
   @Test(timeout = 90000)
   public void testStripedFileChecksum1() throws Exception {
     int length = 0;
@@ -182,7 +222,30 @@ public class TestFileChecksum {
     FileChecksum replicatedFileChecksum = getFileChecksum(replicatedFile,
         10, false);
 
-    Assert.assertFalse(stripedFileChecksum1.equals(replicatedFileChecksum));
+    if (expectComparableStripedAndReplicatedFiles()) {
+      Assert.assertEquals(stripedFileChecksum1, replicatedFileChecksum);
+    } else {
+      Assert.assertNotEquals(stripedFileChecksum1, replicatedFileChecksum);
+    }
+  }
+
+  @Test(timeout = 90000)
+  public void testDifferentBlockSizeReplicatedFileChecksum() throws Exception {
+    byte[] fileData = StripedFileTestUtil.generateBytes(fileSize);
+    String replicatedFile1 = "/replicatedFile1";
+    String replicatedFile2 = "/replicatedFile2";
+    DFSTestUtil.writeFile(
+        fs, new Path(replicatedFile1), fileData, blockSize);
+    DFSTestUtil.writeFile(
+        fs, new Path(replicatedFile2), fileData, blockSize / 2);
+    FileChecksum checksum1 = getFileChecksum(replicatedFile1, -1, false);
+    FileChecksum checksum2 = getFileChecksum(replicatedFile2, -1, false);
+
+    if (expectComparableDifferentBlockSizeReplicatedFiles()) {
+      Assert.assertEquals(checksum1, checksum2);
+    } else {
+      Assert.assertNotEquals(checksum1, checksum2);
+    }
   }
 
   @Test(timeout = 90000)
@@ -471,6 +534,40 @@ public class TestFileChecksum {
         bytesPerCRC - 1);
   }
 
+  @Test(timeout = 90000)
+  public void testMixedBytesPerChecksum() throws Exception {
+    int fileLength = bytesPerCRC * 3;
+    byte[] fileData = StripedFileTestUtil.generateBytes(fileLength);
+    String replicatedFile1 = "/replicatedFile1";
+
+    // Split file into two parts.
+    byte[] fileDataPart1 = new byte[bytesPerCRC * 2];
+    System.arraycopy(fileData, 0, fileDataPart1, 0, fileDataPart1.length);
+    byte[] fileDataPart2 = new byte[fileData.length - fileDataPart1.length];
+    System.arraycopy(
+        fileData, fileDataPart1.length, fileDataPart2, 0, fileDataPart2.length);
+
+    DFSTestUtil.writeFile(fs, new Path(replicatedFile1), fileDataPart1);
+
+    // Modify bytesPerCRC for second part that we append as separate block.
+    conf.setInt(
+        HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, bytesPerCRC / 2);
+    DFSTestUtil.appendFileNewBlock(
+        ((DistributedFileSystem) FileSystem.newInstance(conf)),
+        new Path(replicatedFile1), fileDataPart2);
+
+    if (expectSupportForSingleFileMixedBytesPerChecksum()) {
+      String replicatedFile2 = "/replicatedFile2";
+      DFSTestUtil.writeFile(fs, new Path(replicatedFile2), fileData);
+      FileChecksum checksum1 = getFileChecksum(replicatedFile1, -1, false);
+      FileChecksum checksum2 = getFileChecksum(replicatedFile2, -1, false);
+      Assert.assertEquals(checksum1, checksum2);
+    } else {
+      exception.expect(IOException.class);
+      FileChecksum checksum = getFileChecksum(replicatedFile1, -1, false);
+    }
+  }
+
   private FileChecksum getFileChecksum(String filePath, int range,
                                        boolean killDn) throws Exception {
     int dnIdxToDie = -1;
@@ -537,4 +634,4 @@ public class TestFileChecksum {
 
     return -1;
   }
-}
+}

+ 47 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksumCompositeCrc.java

@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+
+/**
+ * End-to-end tests for COMPOSITE_CRC combine mode.
+ */
+public class TestFileChecksumCompositeCrc extends TestFileChecksum {
+  @Override
+  protected void customizeConf(Configuration conf) {
+    conf.set(
+        HdfsClientConfigKeys.DFS_CHECKSUM_COMBINE_MODE_KEY, "COMPOSITE_CRC");
+  }
+
+  @Override
+  protected boolean expectComparableStripedAndReplicatedFiles() {
+    return true;
+  }
+
+  @Override
+  protected boolean expectComparableDifferentBlockSizeReplicatedFiles() {
+    return true;
+  }
+
+  @Override
+  protected boolean expectSupportForSingleFileMixedBytesPerChecksum() {
+    return true;
+  }
+}
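
These overrides can all return true because a composite CRC is a genuine CRC of the whole byte stream: per-chunk and per-block CRCs can be concatenated mathematically, so block boundaries and bytes-per-CRC no longer leak into the file-level result. The sketch below is a rough Java port of the well-known zlib crc32_combine routine, shown only to illustrate that math; it uses CRC32 rather than HDFS's default CRC32C and is not the CrcUtil/CrcComposer code from this patch:

import java.util.zip.CRC32;

public final class CrcCombineSketch {
  private static long gf2MatrixTimes(long[] mat, long vec) {
    long sum = 0;
    int i = 0;
    while (vec != 0) {
      if ((vec & 1) != 0) {
        sum ^= mat[i];
      }
      vec >>>= 1;
      i++;
    }
    return sum;
  }

  private static void gf2MatrixSquare(long[] square, long[] mat) {
    for (int n = 0; n < 32; n++) {
      square[n] = gf2MatrixTimes(mat, mat[n]);
    }
  }

  /** Combine crc1 (over A) and crc2 (over B, of length len2) into crc(A||B). */
  static long crc32Combine(long crc1, long crc2, long len2) {
    if (len2 <= 0) {
      return crc1;
    }
    long[] even = new long[32];   // operator for an even power-of-two count of zero bits
    long[] odd = new long[32];    // operator for an odd power-of-two count of zero bits

    odd[0] = 0xEDB88320L;         // reflected CRC-32 polynomial
    long row = 1;
    for (int n = 1; n < 32; n++) {
      odd[n] = row;
      row <<= 1;
    }
    gf2MatrixSquare(even, odd);   // operator for two zero bits
    gf2MatrixSquare(odd, even);   // operator for four zero bits

    // Append len2 zero bytes to crc1, one bit of len2 at a time.
    do {
      gf2MatrixSquare(even, odd);
      if ((len2 & 1) != 0) {
        crc1 = gf2MatrixTimes(even, crc1);
      }
      len2 >>= 1;
      if (len2 == 0) {
        break;
      }
      gf2MatrixSquare(odd, even);
      if ((len2 & 1) != 0) {
        crc1 = gf2MatrixTimes(odd, crc1);
      }
      len2 >>= 1;
    } while (len2 != 0);

    return crc1 ^ crc2;
  }

  public static void main(String[] args) {
    byte[] a = "first block ".getBytes();
    byte[] b = "second block".getBytes();

    CRC32 crcA = new CRC32();
    crcA.update(a);
    CRC32 crcB = new CRC32();
    crcB.update(b);

    CRC32 crcAB = new CRC32();    // CRC computed directly over the concatenation
    crcAB.update(a);
    crcAB.update(b);

    long combined = crc32Combine(crcA.getValue(), crcB.getValue(), b.length);
    System.out.println(Long.toHexString(combined) + " == "
        + Long.toHexString(crcAB.getValue()));
  }
}

Running it should print two equal hex values: the CRC combined from the two parts and the CRC computed directly over the concatenated bytes, which is the layout independence the COMPOSITE_CRC tests rely on.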

+ 14 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java

@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockChecksumType;
 import org.apache.hadoop.hdfs.protocol.BlockType;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -682,6 +683,19 @@ public class TestPBHelper {
         HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32C);
   }
 
+  @Test
+  public void testBlockChecksumTypeProto() {
+    assertEquals(BlockChecksumType.MD5CRC,
+        PBHelperClient.convert(HdfsProtos.BlockChecksumTypeProto.MD5CRC));
+    assertEquals(BlockChecksumType.COMPOSITE_CRC,
+        PBHelperClient.convert(
+            HdfsProtos.BlockChecksumTypeProto.COMPOSITE_CRC));
+    assertEquals(PBHelperClient.convert(BlockChecksumType.MD5CRC),
+        HdfsProtos.BlockChecksumTypeProto.MD5CRC);
+    assertEquals(PBHelperClient.convert(BlockChecksumType.COMPOSITE_CRC),
+        HdfsProtos.BlockChecksumTypeProto.COMPOSITE_CRC);
+  }
+
   @Test
   public void testAclEntryProto() {
     // All fields populated.

+ 144 - 29
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java

@@ -73,20 +73,42 @@ public class TestCopyMapper {
   private static final String SOURCE_PATH = "/tmp/source";
   private static final String TARGET_PATH = "/tmp/target";
 
-  private static Configuration configuration;
-
   @BeforeClass
   public static void setup() throws Exception {
-    configuration = getConfigurationForCluster();
-    cluster = new MiniDFSCluster.Builder(configuration)
+    Configuration configuration = getConfigurationForCluster();
+    setCluster(new MiniDFSCluster.Builder(configuration)
                 .numDataNodes(1)
                 .format(true)
-                .build();
+                .build());
+  }
+
+  /**
+   * Subclasses may override this method to indicate whether copying files with
+   * non-default block sizes without setting BLOCKSIZE as a preserved attribute
+   * is expected to succeed with CRC checks enabled.
+   */
+  protected boolean expectDifferentBlockSizesMultipleBlocksToSucceed() {
+    return false;
+  }
+
+  /**
+   * Subclasses may override this method to indicate whether copying files with
+   * non-default bytes-per-crc without setting CHECKSUMTYPE as a preserved
+   * attribute is expected to succeed with CRC checks enabled.
+   */
+  protected boolean expectDifferentBytesPerCrcToSucceed() {
+    return false;
+  }
+
+  protected static void setCluster(MiniDFSCluster c) {
+    cluster = c;
   }
 
-  private static Configuration getConfigurationForCluster() throws IOException {
+  protected static Configuration getConfigurationForCluster()
+      throws IOException {
     Configuration configuration = new Configuration();
-    System.setProperty("test.build.data", "target/tmp/build/TEST_COPY_MAPPER/data");
+    System.setProperty(
+        "test.build.data", "target/tmp/build/TEST_COPY_MAPPER/data");
     configuration.set("hadoop.log.dir", "target/tmp");
     configuration.set("dfs.namenode.fs-limits.min-block-size", "0");
     LOG.debug("fs.default.name  == " + configuration.get("fs.default.name"));
@@ -136,7 +158,8 @@ public class TestCopyMapper {
     }
   }
 
-  private static void createSourceDataWithDifferentBlockSize() throws Exception {
+  private static void createSourceDataWithDifferentBlockSize()
+      throws Exception {
     mkdirs(SOURCE_PATH + "/1");
     mkdirs(SOURCE_PATH + "/2");
     mkdirs(SOURCE_PATH + "/2/3/4");
@@ -163,6 +186,21 @@ public class TestCopyMapper {
         512));
   }
 
+  private static void createSourceDataWithDifferentBytesPerCrc()
+      throws Exception {
+    mkdirs(SOURCE_PATH + "/1");
+    mkdirs(SOURCE_PATH + "/2");
+    mkdirs(SOURCE_PATH + "/2/3/4");
+    mkdirs(SOURCE_PATH + "/2/3");
+    mkdirs(SOURCE_PATH + "/5");
+    touchFile(SOURCE_PATH + "/5/6", false,
+        new ChecksumOpt(DataChecksum.Type.CRC32C, 32));
+    mkdirs(SOURCE_PATH + "/7");
+    mkdirs(SOURCE_PATH + "/7/8");
+    touchFile(SOURCE_PATH + "/7/8/9", false,
+        new ChecksumOpt(DataChecksum.Type.CRC32C, 64));
+  }
+
   private static void mkdirs(String path) throws Exception {
     FileSystem fileSystem = cluster.getFileSystem();
     final Path qualifiedPath = new Path(path).makeQualified(fileSystem.getUri(),
@@ -281,7 +319,7 @@ public class TestCopyMapper {
                   path)), context);
     }
 
-    verifyCopy(fs, false);
+    verifyCopy(fs, false, true);
     // verify that we only copied new appended data
     Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE * 2, stubContext
         .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
@@ -317,6 +355,11 @@ public class TestCopyMapper {
     EnumSet<DistCpOptions.FileAttribute> fileAttributes
             = EnumSet.of(DistCpOptions.FileAttribute.REPLICATION);
     if (preserveChecksum) {
+      // We created source files with both different checksum types and
+      // non-default block sizes; here we don't explicitly add BLOCKSIZE
+      // as a preserved attribute, but the current behavior is that
+      // preserving CHECKSUMTYPE also automatically implies preserving
+      // BLOCKSIZE.
       fileAttributes.add(DistCpOptions.FileAttribute.CHECKSUMTYPE);
     }
     configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
@@ -339,7 +382,7 @@ public class TestCopyMapper {
     }
 
     // Check that the maps worked.
-    verifyCopy(fs, preserveChecksum);
+    verifyCopy(fs, preserveChecksum, true);
     Assert.assertEquals(numFiles, stubContext.getReporter()
         .getCounter(CopyMapper.Counter.COPY).getValue());
     Assert.assertEquals(numDirs, stubContext.getReporter()
@@ -361,7 +404,8 @@ public class TestCopyMapper {
     }
   }
 
-  private void verifyCopy(FileSystem fs, boolean preserveChecksum)
+  private void verifyCopy(
+      FileSystem fs, boolean preserveChecksum, boolean preserveReplication)
       throws Exception {
     for (Path path : pathList) {
       final Path targetPath = new Path(path.toString().replaceAll(SOURCE_PATH,
@@ -370,8 +414,10 @@ public class TestCopyMapper {
       Assert.assertTrue(fs.isFile(targetPath) == fs.isFile(path));
       FileStatus sourceStatus = fs.getFileStatus(path);
       FileStatus targetStatus = fs.getFileStatus(targetPath);
-      Assert.assertEquals(sourceStatus.getReplication(),
-          targetStatus.getReplication());
+      if (preserveReplication) {
+        Assert.assertEquals(sourceStatus.getReplication(),
+            targetStatus.getReplication());
+      }
       if (preserveChecksum) {
         Assert.assertEquals(sourceStatus.getBlockSize(),
             targetStatus.getBlockSize());
@@ -505,7 +551,7 @@ public class TestCopyMapper {
         @Override
         public FileSystem run() {
           try {
-            return FileSystem.get(configuration);
+            return FileSystem.get(cluster.getConfiguration(0));
           } catch (IOException e) {
             LOG.error("Exception encountered ", e);
             Assert.fail("Test failed: " + e.getMessage());
@@ -574,7 +620,7 @@ public class TestCopyMapper {
         @Override
         public FileSystem run() {
           try {
-            return FileSystem.get(configuration);
+            return FileSystem.get(cluster.getConfiguration(0));
           } catch (IOException e) {
             LOG.error("Exception encountered ", e);
             Assert.fail("Test failed: " + e.getMessage());
@@ -649,7 +695,7 @@ public class TestCopyMapper {
         @Override
         public FileSystem run() {
           try {
-            return FileSystem.get(configuration);
+            return FileSystem.get(cluster.getConfiguration(0));
           } catch (IOException e) {
             LOG.error("Exception encountered ", e);
             Assert.fail("Test failed: " + e.getMessage());
@@ -730,7 +776,7 @@ public class TestCopyMapper {
         @Override
         public FileSystem run() {
           try {
-            return FileSystem.get(configuration);
+            return FileSystem.get(cluster.getConfiguration(0));
           } catch (IOException e) {
             LOG.error("Exception encountered ", e);
             Assert.fail("Test failed: " + e.getMessage());
@@ -887,7 +933,7 @@ public class TestCopyMapper {
         @Override
         public FileSystem run() {
           try {
-            return FileSystem.get(configuration);
+            return FileSystem.get(cluster.getConfiguration(0));
           } catch (IOException e) {
             LOG.error("Exception encountered when get FileSystem.", e);
             throw new RuntimeException(e);
@@ -938,12 +984,13 @@ public class TestCopyMapper {
   }
 
   @Test(timeout=40000)
-  public void testCopyFailOnBlockSizeDifference() throws Exception {
+  public void testCopyWithDifferentBlockSizes() throws Exception {
     try {
       deleteState();
       createSourceDataWithDifferentBlockSize();
 
       FileSystem fs = cluster.getFileSystem();
+
       CopyMapper copyMapper = new CopyMapper();
       StubContext stubContext = new StubContext(getConfiguration(), null, 0);
       Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
@@ -959,17 +1006,79 @@ public class TestCopyMapper {
 
       for (Path path : pathList) {
         final FileStatus fileStatus = fs.getFileStatus(path);
-        copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH),
-            path)), new CopyListingFileStatus(fileStatus), context);
+        copyMapper.map(
+            new Text(
+                DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+            new CopyListingFileStatus(fileStatus), context);
       }
 
-      Assert.fail("Copy should have failed because of block-size difference.");
+      if (expectDifferentBlockSizesMultipleBlocksToSucceed()) {
+        verifyCopy(fs, false, false);
+      } else {
+        Assert.fail(
+            "Copy should have failed because of block-size difference.");
+      }
+    } catch (Exception exception) {
+      if (expectDifferentBlockSizesMultipleBlocksToSucceed()) {
+        throw exception;
+      } else {
+        // Check that the exception suggests the use of -pb/-skipcrccheck.
+        // This could be refactored to use LambdaTestUtils if we add support
+        // for listing multiple different independent substrings to expect
+        // in the exception message and add support for LambdaTestUtils to
+        // inspect the transitive cause and/or suppressed exceptions as well.
+        Throwable cause = exception.getCause().getCause();
+        GenericTestUtils.assertExceptionContains("-pb", cause);
+        GenericTestUtils.assertExceptionContains("-skipcrccheck", cause);
+      }
     }
-    catch (IOException exception) {
-      // Check that the exception suggests the use of -pb/-skipcrccheck.
-      Throwable cause = exception.getCause().getCause();
-      GenericTestUtils.assertExceptionContains("-pb", cause);
-      GenericTestUtils.assertExceptionContains("-skipcrccheck", cause);
+  }
+
+  @Test(timeout=40000)
+  public void testCopyWithDifferentBytesPerCrc() throws Exception {
+    try {
+      deleteState();
+      createSourceDataWithDifferentBytesPerCrc();
+
+      FileSystem fs = cluster.getFileSystem();
+
+      CopyMapper copyMapper = new CopyMapper();
+      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+      Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
+          = stubContext.getContext();
+
+      Configuration configuration = context.getConfiguration();
+      EnumSet<DistCpOptions.FileAttribute> fileAttributes
+          = EnumSet.noneOf(DistCpOptions.FileAttribute.class);
+      configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
+          DistCpUtils.packAttributes(fileAttributes));
+
+      copyMapper.setup(context);
+
+      for (Path path : pathList) {
+        final FileStatus fileStatus = fs.getFileStatus(path);
+        copyMapper.map(
+            new Text(
+                DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+            new CopyListingFileStatus(fileStatus), context);
+      }
+
+      if (expectDifferentBytesPerCrcToSucceed()) {
+        verifyCopy(fs, false, false);
+      } else {
+        Assert.fail(
+            "Copy should have failed because of bytes-per-crc difference.");
+      }
+    } catch (Exception exception) {
+      if (expectDifferentBytesPerCrcToSucceed()) {
+        throw exception;
+      } else {
+        // This could be refactored to use LambdaTestUtils if we add support
+        // for LambdaTestUtils to inspect the transitive cause and/or
+        // suppressed exceptions as well.
+        Throwable cause = exception.getCause().getCause();
+        GenericTestUtils.assertExceptionContains("mismatch", cause);
+      }
     }
   }
 
@@ -980,6 +1089,7 @@ public class TestCopyMapper {
       createSourceData();
 
       FileSystem fs = cluster.getFileSystem();
+
       CopyMapper copyMapper = new CopyMapper();
       StubContext stubContext = new StubContext(getConfiguration(), null, 0);
       Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
@@ -1010,6 +1120,12 @@ public class TestCopyMapper {
         final FileStatus source = fs.getFileStatus(path);
         final FileStatus target = fs.getFileStatus(targetPath);
         if (!source.isDirectory() ) {
+          // The reason the checksum check succeeds despite block sizes not
+          // matching between the two is that when only one block is ever
+          // written (partial or complete), the crcPerBlock is not included
+          // in the FileChecksum algorithmName. If we had instead written
+          // a large enough file to exceed the blocksize, then the copy
+          // would not have succeeded.
           Assert.assertTrue(preserve ||
                   source.getBlockSize() != target.getBlockSize());
           Assert.assertTrue(preserve ||
@@ -1020,8 +1136,7 @@ public class TestCopyMapper {
                   source.getReplication() == target.getReplication());
         }
       }
-    }
-    catch (Exception e) {
+    } catch (Exception e) {
       Assert.assertTrue("Unexpected exception: " + e.getMessage(), false);
       e.printStackTrace();
     }

+ 50 - 0
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapperCompositeCrc.java

@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+
+import org.junit.BeforeClass;
+
+/**
+ * End-to-end tests for COMPOSITE_CRC combine mode.
+ */
+public class TestCopyMapperCompositeCrc extends TestCopyMapper {
+  @BeforeClass
+  public static void setup() throws Exception {
+    Configuration configuration = TestCopyMapper.getConfigurationForCluster();
+    configuration.set(
+        HdfsClientConfigKeys.DFS_CHECKSUM_COMBINE_MODE_KEY, "COMPOSITE_CRC");
+    TestCopyMapper.setCluster(new MiniDFSCluster.Builder(configuration)
+                .numDataNodes(1)
+                .format(true)
+                .build());
+  }
+
+  @Override
+  protected boolean expectDifferentBlockSizesMultipleBlocksToSucceed() {
+    return true;
+  }
+
+  @Override
+  protected boolean expectDifferentBytesPerCrcToSucceed() {
+    return true;
+  }
+}