
HDFS-3177. Update DFSClient and DataXceiver to handle different checksum types in file checksum computation. Contributed by Kihwal Lee

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1376928 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 12 years ago
parent
current commit
c46de830da

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -421,6 +421,9 @@ Branch-2 ( Unreleased changes )
 
     HDFS-3832. Remove protocol methods related to DistributedUpgrade. (suresh)
 
+    HDFS-3177. Update DFSClient and DataXceiver to handle different checksum
+    types in file checksum computation.  (Kihwal Lee via szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-2982. Startup performance suffers when there are many edit log

+ 27 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -89,7 +89,9 @@ import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.FsStatus;
 import org.apache.hadoop.fs.HdfsBlockLocation;
 import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum;
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
+import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
@@ -1603,7 +1605,8 @@ public class DFSClient implements java.io.Closeable {
     }
     List<LocatedBlock> locatedblocks = blockLocations.getLocatedBlocks();
     final DataOutputBuffer md5out = new DataOutputBuffer();
-    int bytesPerCRC = 0;
+    int bytesPerCRC = -1;
+    DataChecksum.Type crcType = DataChecksum.Type.DEFAULT;
     long crcPerBlock = 0;
     boolean refetchBlocks = false;
     int lastRetriedIndex = -1;
@@ -1707,6 +1710,17 @@ public class DFSClient implements java.io.Closeable {
               checksumData.getMd5().toByteArray());
           md5.write(md5out);
           
+          // read crc-type
+          final DataChecksum.Type ct = HdfsProtoUtil.
+              fromProto(checksumData.getCrcType());
+          if (i == 0) { // first block
+            crcType = ct;
+          } else if (crcType != DataChecksum.Type.MIXED
+              && crcType != ct) {
+            // if crc types are mixed in a file
+            crcType = DataChecksum.Type.MIXED;
+          }
+
           done = true;
 
           if (LOG.isDebugEnabled()) {
@@ -1732,7 +1746,18 @@ public class DFSClient implements java.io.Closeable {
 
     //compute file MD5
     final MD5Hash fileMD5 = MD5Hash.digest(md5out.getData()); 
-    return new MD5MD5CRC32FileChecksum(bytesPerCRC, crcPerBlock, fileMD5);
+    switch (crcType) {
+      case CRC32:
+        return new MD5MD5CRC32GzipFileChecksum(bytesPerCRC,
+            crcPerBlock, fileMD5);
+      case CRC32C:
+        return new MD5MD5CRC32CastagnoliFileChecksum(bytesPerCRC,
+            crcPerBlock, fileMD5);
+      default:
+        // we should never get here since the validity was checked
+        // when getCrcType() was called above.
+        return null;
+    }
   }
 
   /**
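
For reference, here is a minimal standalone sketch of the logic the DFSClient hunk above adds: the checksum type of the first block becomes the file's type, any later mismatch collapses it to MIXED, and the final type selects the concrete MD5MD5CRC32* checksum class. The class and method names below (ChecksumTypeMergeSketch, mergeTypes, toFileChecksum) are illustrative only and are not part of the patch.

    import java.util.List;

    import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum;
    import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
    import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
    import org.apache.hadoop.io.MD5Hash;
    import org.apache.hadoop.util.DataChecksum;

    public class ChecksumTypeMergeSketch {

      // Collapse per-block checksum types into a single file-level type:
      // the first block decides, and any later mismatch degrades to MIXED.
      static DataChecksum.Type mergeTypes(List<DataChecksum.Type> perBlock) {
        DataChecksum.Type crcType = DataChecksum.Type.DEFAULT;
        for (int i = 0; i < perBlock.size(); i++) {
          final DataChecksum.Type ct = perBlock.get(i);
          if (i == 0) {                        // first block
            crcType = ct;
          } else if (crcType != DataChecksum.Type.MIXED && crcType != ct) {
            crcType = DataChecksum.Type.MIXED; // types differ within the file
          }
        }
        return crcType;
      }

      // Pick the concrete checksum class, mirroring the switch in DFSClient.
      static MD5MD5CRC32FileChecksum toFileChecksum(DataChecksum.Type crcType,
          int bytesPerCRC, long crcPerBlock, MD5Hash fileMD5) {
        switch (crcType) {
          case CRC32:
            return new MD5MD5CRC32GzipFileChecksum(bytesPerCRC, crcPerBlock, fileMD5);
          case CRC32C:
            return new MD5MD5CRC32CastagnoliFileChecksum(bytesPerCRC, crcPerBlock, fileMD5);
          default:
            // MIXED (or any other type) has no single-algorithm representation.
            return null;
        }
      }
    }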

+ 10 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.util.ExactSizeInputStream;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.security.token.Token;
 
 import com.google.common.collect.Lists;
@@ -155,6 +156,14 @@ public abstract class HdfsProtoUtil {
     return ret;
   }
 
+  public static DataChecksum.Type fromProto(HdfsProtos.ChecksumTypeProto type) {
+    return DataChecksum.Type.valueOf(type.name());
+  }
+
+  public static HdfsProtos.ChecksumTypeProto toProto(DataChecksum.Type type) {
+    return HdfsProtos.ChecksumTypeProto.valueOf(type.name());
+  }
+
   public static InputStream vintPrefixed(final InputStream input)
   throws IOException {
     final int firstByte = input.read();
@@ -167,4 +176,4 @@ public abstract class HdfsProtoUtil {
   
     return new ExactSizeInputStream(input, size);
   }
-}
+}
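
The two helpers added above convert between the Java enum and the protobuf enum purely by constant name, so DataChecksum.Type and HdfsProtos.ChecksumTypeProto must keep their constant names in sync. A small round-trip example, assuming ChecksumTypeProto defines a CRC32C constant (which the DataXceiver change below relies on):

    import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
    import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
    import org.apache.hadoop.util.DataChecksum;

    public class ChecksumTypeRoundTrip {
      public static void main(String[] args) {
        // Java -> wire
        HdfsProtos.ChecksumTypeProto wire =
            HdfsProtoUtil.toProto(DataChecksum.Type.CRC32C);
        // wire -> Java
        DataChecksum.Type back = HdfsProtoUtil.fromProto(wire);
        System.out.println(wire.name() + " -> " + back);  // prints: CRC32C -> CRC32C
      }
    }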

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -609,6 +609,7 @@ class DataXceiver extends Receiver implements Runnable {
           .setBytesPerCrc(bytesPerCRC)
           .setCrcPerBlock(crcPerBlock)
           .setMd5(ByteString.copyFrom(md5.getDigest()))
+          .setCrcType(HdfsProtoUtil.toProto(checksum.getChecksumType()))
           )
         .build()
         .writeDelimitedTo(out);

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto

@@ -185,4 +185,5 @@ message OpBlockChecksumResponseProto {
   required uint32 bytesPerCrc = 1;
   required uint64 crcPerBlock = 2;
   required bytes md5 = 3;
+  optional ChecksumTypeProto crcType = 4 [default = CRC32];
 }
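
Because crcType is optional and carries a declared default, responses from DataNodes that predate this change still parse on new clients: hasCrcType() returns false, but getCrcType() falls back to CRC32, matching the old fixed behavior. A rough sketch of that fallback; the generated class location and the placeholder field values here are assumptions, not taken from the patch:

    import com.google.protobuf.ByteString;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;

    public class CrcTypeDefaultSketch {
      public static void main(String[] args) {
        // Build a response the "old" way, without setting crcType,
        // as a pre-HDFS-3177 DataNode would send it.
        OpBlockChecksumResponseProto legacy = OpBlockChecksumResponseProto.newBuilder()
            .setBytesPerCrc(512)                             // placeholder values
            .setCrcPerBlock(1)
            .setMd5(ByteString.copyFrom(new byte[16]))
            .build();

        System.out.println(legacy.hasCrcType());  // false: field was never set
        System.out.println(legacy.getCrcType());  // CRC32: the declared default
      }
    }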

+ 10 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.VolumeId;
@@ -708,9 +709,16 @@ public class TestDistributedFileSystem {
       out2.close();
 
       // the two checksums must be different.
-      FileChecksum sum1 = dfs.getFileChecksum(path1);
-      FileChecksum sum2 = dfs.getFileChecksum(path2);
+      MD5MD5CRC32FileChecksum sum1 =
+          (MD5MD5CRC32FileChecksum)dfs.getFileChecksum(path1);
+      MD5MD5CRC32FileChecksum sum2 =
+          (MD5MD5CRC32FileChecksum)dfs.getFileChecksum(path2);
       assertFalse(sum1.equals(sum2));
+
+      // check the individual params
+      assertEquals(DataChecksum.Type.CRC32C, sum1.getCrcType());
+      assertEquals(DataChecksum.Type.CRC32,  sum2.getCrcType());
+
     } finally {
       if (cluster != null) {
         cluster.getFileSystem().delete(testBasePath, true);