HADOOP-8240. Add a new API to allow users to specify a checksum type on FileSystem.create(..). Contributed by Kihwal Lee

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1374696 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze, 12 years ago
parent commit b0ea77303b
27 changed files with 490 additions and 71 deletions
  1. +3 -0    hadoop-common-project/hadoop-common/CHANGES.txt
  2. +21 -5   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java
  3. +10 -6   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java
  4. +4 -3    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegateToFileSystem.java
  5. +4 -2    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java
  6. +40 -6   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
  7. +4 -2    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
  8. +3 -2    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java
  9. +12 -1   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsServerDefaults.java
  10. +128 -1  hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
  11. +10 -1   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FtpConfigKeys.java
  12. +11 -2   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/local/LocalConfigKeys.java
  13. +3 -2    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ChRootedFs.java
  14. +4 -3    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java
  15. +3 -1    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
  16. +2 -1    hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestAfsCheckPath.java
  17. +16 -0   hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
  18. +70 -0   hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFsOptions.java
  19. +3 -2    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
  20. +41 -19  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
  21. +9 -7    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
  22. +7 -2    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
  23. +1 -1    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
  24. +15 -1   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  25. +10 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
  26. +54 -0   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
  27. +2 -1    hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
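
As a usage illustration (not part of the commit): a minimal sketch of the new FileSystem.create(..) overload added here. The class name, path, and literal sizes are hypothetical; note the base FileSystem implementation ignores the option, and full support is currently only in DFS.

    import java.util.EnumSet;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CreateFlag;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Options.ChecksumOpt;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.util.DataChecksum;

    public class ChecksumCreateExample {               // hypothetical class
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // Request CRC32C with 512 bytes per checksum for this file only;
        // passing null instead falls back to the server/conf defaults.
        ChecksumOpt opt = new ChecksumOpt(DataChecksum.Type.CRC32C, 512);
        FSDataOutputStream out = fs.create(new Path("/tmp/example"), // hypothetical path
            FsPermission.getDefault(),
            EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
            4096,                 // buffer size
            (short) 1,            // replication
            128L * 1024 * 1024,   // block size
            null,                 // no progress callback
            opt);
        out.write(0);
        out.close();
      }
    }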

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -222,6 +222,9 @@ Branch-2 ( Unreleased changes )
     HADOOP-7754. Expose file descriptors from Hadoop-wrapped local 
     FileSystems (todd and ahmed via tucu)
 
+    HADOOP-8240. Add a new API to allow users to specify a checksum type
+    on FileSystem.create(..).  (Kihwal Lee via szetszwo)
+
   IMPROVEMENTS
 
     HADOOP-8340. SNAPSHOT build versions should compare as less than their eventual

+ 21 - 5
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java

@@ -39,6 +39,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.Options.CreateOpts;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -46,6 +47,7 @@ import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
 
 /**
@@ -467,6 +469,7 @@ public abstract class AbstractFileSystem {
     short replication = -1;
     long blockSize = -1;
     int bytesPerChecksum = -1;
+    ChecksumOpt checksumOpt = null;
     FsPermission permission = null;
     Progressable progress = null;
     Boolean createParent = null;
@@ -496,6 +499,12 @@ public abstract class AbstractFileSystem {
               "BytesPerChecksum option is set multiple times");
         }
         bytesPerChecksum = ((CreateOpts.BytesPerChecksum) iOpt).getValue();
+      } else if (CreateOpts.ChecksumParam.class.isInstance(iOpt)) {
+        if (checksumOpt != null) {
+          throw new HadoopIllegalArgumentException(
+              "ChecksumParam option is set multiple times");
+        }
+        checksumOpt = ((CreateOpts.ChecksumParam) iOpt).getValue();
       } else if (CreateOpts.Perms.class.isInstance(iOpt)) {
         if (permission != null) {
           throw new HadoopIllegalArgumentException(
@@ -533,9 +542,16 @@ public abstract class AbstractFileSystem {
     if (blockSize == -1) {
       blockSize = ssDef.getBlockSize();
     }
-    if (bytesPerChecksum == -1) {
-      bytesPerChecksum = ssDef.getBytesPerChecksum();
-    }
+
+    // Create a checksum option honoring user input as much as possible.
+    // If bytesPerChecksum is specified, it will override the one set in
+    // checksumOpt. Any missing value will be filled in using the default.
+    ChecksumOpt defaultOpt = new ChecksumOpt(
+        ssDef.getChecksumType(),
+        ssDef.getBytesPerChecksum());
+    checksumOpt = ChecksumOpt.processChecksumOpt(defaultOpt,
+        checksumOpt, bytesPerChecksum);
+
     if (bufferSize == -1) {
       bufferSize = ssDef.getFileBufferSize();
     }
@@ -552,7 +568,7 @@ public abstract class AbstractFileSystem {
     }
 
     return this.createInternal(f, createFlag, permission, bufferSize,
-      replication, blockSize, progress, bytesPerChecksum, createParent);
+      replication, blockSize, progress, checksumOpt, createParent);
   }
 
   /**
@@ -563,7 +579,7 @@ public abstract class AbstractFileSystem {
   public abstract FSDataOutputStream createInternal(Path f,
       EnumSet<CreateFlag> flag, FsPermission absolutePermission,
       int bufferSize, short replication, long blockSize, Progressable progress,
-      int bytesPerChecksum, boolean createParent)
+      ChecksumOpt checksumOpt, boolean createParent)
       throws AccessControlException, FileAlreadyExistsException,
       FileNotFoundException, ParentNotDirectoryException,
       UnsupportedFileSystemException, UnresolvedLinkException, IOException;

+ 10 - 6
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java

@@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.PureJavaCrc32;
@@ -324,13 +325,17 @@ public abstract class ChecksumFs extends FilterFs {
       final EnumSet<CreateFlag> createFlag,
       final FsPermission absolutePermission, final int bufferSize,
       final short replication, final long blockSize, 
-      final Progressable progress, final int bytesPerChecksum,
+      final Progressable progress, final ChecksumOpt checksumOpt,
       final boolean createParent) throws IOException {
       super(new PureJavaCrc32(), fs.getBytesPerSum(), 4);
 
+      // checksumOpt is passed down to the raw fs. Unless the raw fs
+      // implements checksumming internally, checksumOpt will be ignored.
+      // If the raw fs does checksum internally, we will end up with
+      // two layers of checksumming, i.e. checksumming the checksum file.
       this.datas = fs.getRawFs().createInternal(file, createFlag,
           absolutePermission, bufferSize, replication, blockSize, progress,
-           bytesPerChecksum,  createParent);
+           checksumOpt,  createParent);
       
      // Now create the checksum file; adjust the buffer size
       int bytesPerSum = fs.getBytesPerSum();
@@ -338,7 +343,7 @@ public abstract class ChecksumFs extends FilterFs {
       this.sums = fs.getRawFs().createInternal(fs.getChecksumFile(file),
           EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
           absolutePermission, sumBufferSize, replication, blockSize, progress,
-          bytesPerChecksum, createParent);
+          checksumOpt, createParent);
       sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
       sums.writeInt(bytesPerSum);
     }
@@ -361,12 +366,11 @@ public abstract class ChecksumFs extends FilterFs {
   public FSDataOutputStream createInternal(Path f,
       EnumSet<CreateFlag> createFlag, FsPermission absolutePermission,
       int bufferSize, short replication, long blockSize, Progressable progress,
-      int bytesPerChecksum, boolean createParent) throws IOException {
-
+      ChecksumOpt checksumOpt, boolean createParent) throws IOException {
     final FSDataOutputStream out = new FSDataOutputStream(
         new ChecksumFSOutputSummer(this, f, createFlag, absolutePermission,
             bufferSize, replication, blockSize, progress,
-            bytesPerChecksum,  createParent), null);
+            checksumOpt,  createParent), null);
     return out;
   }
 

+ 4 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegateToFileSystem.java

@@ -28,6 +28,7 @@ import java.util.List;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.token.Token;
@@ -62,7 +63,7 @@ public abstract class DelegateToFileSystem extends AbstractFileSystem {
   public FSDataOutputStream createInternal (Path f,
       EnumSet<CreateFlag> flag, FsPermission absolutePermission, int bufferSize,
       short replication, long blockSize, Progressable progress,
-      int bytesPerChecksum, boolean createParent) throws IOException {
+      ChecksumOpt checksumOpt, boolean createParent) throws IOException {
     checkPath(f);
     
     // Default impl assumes that permissions do not matter
@@ -81,8 +82,8 @@ public abstract class DelegateToFileSystem extends AbstractFileSystem {
       }
       // parent does exist - go ahead with create of file.
     }
-    return fsImpl.primitiveCreate(f, absolutePermission, flag, 
-        bufferSize, replication, blockSize, progress, bytesPerChecksum);
+    return fsImpl.primitiveCreate(f, absolutePermission, flag,
+        bufferSize, replication, blockSize, progress, checksumOpt);
   }
 
   @Override

+ 4 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java

@@ -127,7 +127,8 @@ import org.apache.hadoop.util.ShutdownHookManager;
  *  <li> replication factor
  *  <li> block size
  *  <li> buffer size
- *  <li> bytesPerChecksum (if used).
+ *  <li> encryptDataTransfer
+ *  <li> checksum option (checksumType and bytesPerChecksum)
  *  </ul>
  *
  * <p>
@@ -613,7 +614,8 @@ public final class FileContext {
    *          <li>BufferSize - buffersize used in FSDataOutputStream
    *          <li>Blocksize - block size for file blocks
    *          <li>ReplicationFactor - replication for blocks
-   *          <li>BytesPerChecksum - bytes per checksum
+   *          <li>ChecksumParam - Checksum parameters. Server default is used
+   *          if not specified.
    *          </ul>
    *          </ul>
    * 
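
A corresponding sketch on the FileContext side (class name and path hypothetical): the new CreateOpts.checksumParam wraps a ChecksumOpt, and AbstractFileSystem.create fills any unset field from the server defaults via ChecksumOpt.processChecksumOpt, as shown above.

    import java.util.EnumSet;

    import org.apache.hadoop.fs.CreateFlag;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileContext;
    import org.apache.hadoop.fs.Options.ChecksumOpt;
    import org.apache.hadoop.fs.Options.CreateOpts;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.util.DataChecksum;

    public class FcChecksumExample {                   // hypothetical class
      public static void main(String[] args) throws Exception {
        FileContext fc = FileContext.getFileContext();
        // Per-file checksum choice; server defaults apply where unspecified.
        ChecksumOpt opt = new ChecksumOpt(DataChecksum.Type.CRC32C, 512);
        FSDataOutputStream out = fc.create(new Path("/tmp/example"), // hypothetical path
            EnumSet.of(CreateFlag.CREATE),
            CreateOpts.checksumParam(opt),
            CreateOpts.createParent());
        out.close();
      }
    }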

+ 40 - 6
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

@@ -45,6 +45,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.MultipleIOException;
@@ -54,6 +55,7 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ShutdownHookManager;
@@ -656,14 +658,17 @@ public abstract class FileSystem extends Configured implements Closeable {
   @Deprecated
   public FsServerDefaults getServerDefaults() throws IOException {
     Configuration conf = getConf();
+    // CRC32 is chosen as the default because it is available in all
+    // releases that support checksums.
+    // The client trash configuration is ignored.
     return new FsServerDefaults(getDefaultBlockSize(), 
         conf.getInt("io.bytes.per.checksum", 512), 
         64 * 1024, 
         getDefaultReplication(),
         conf.getInt("io.file.buffer.size", 4096),
         false,
-        // NB: ignoring the client trash configuration
-        CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT);
+        CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT,
+        DataChecksum.Type.CRC32);
   }
 
   /**
@@ -889,11 +894,40 @@ public abstract class FileSystem extends Configured implements Closeable {
       short replication,
       long blockSize,
       Progressable progress) throws IOException {
-    // only DFS support this
-    return create(f, permission, flags.contains(CreateFlag.OVERWRITE), bufferSize, replication, blockSize, progress);
+    return create(f, permission, flags, bufferSize, replication,
+        blockSize, progress, null);
   }
   
-  
+  /**
+   * Create an FSDataOutputStream at the indicated Path with a custom
+   * checksum option.
+   * @param f the file name to open
+   * @param permission file permission
+   * @param flags {@link CreateFlag}s to use for this stream.
+   * @param bufferSize the size of the buffer to be used.
+   * @param replication required block replication for the file.
+   * @param blockSize block size
+   * @param progress the progress reporter
+   * @param checksumOpt checksum parameter. If null, the values
+   *        found in conf will be used.
+   * @throws IOException
+   * @see #setPermission(Path, FsPermission)
+   */
+  public FSDataOutputStream create(Path f,
+      FsPermission permission,
+      EnumSet<CreateFlag> flags,
+      int bufferSize,
+      short replication,
+      long blockSize,
+      Progressable progress,
+      ChecksumOpt checksumOpt) throws IOException {
+    // Checksum options are ignored by default. The file systems that
+    // implement checksum need to override this method. The full
+    // support is currently only available in DFS.
+    return create(f, permission, flags.contains(CreateFlag.OVERWRITE), 
+        bufferSize, replication, blockSize, progress);
+  }
+
   /*.
    * This create has been added to support the FileContext that processes
    * the permission
@@ -905,7 +939,7 @@ public abstract class FileSystem extends Configured implements Closeable {
   protected FSDataOutputStream primitiveCreate(Path f,
      FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
      short replication, long blockSize, Progressable progress,
-     int bytesPerChecksum) throws IOException {
+     ChecksumOpt checksumOpt) throws IOException {
 
     boolean pathExists = exists(f);
     CreateFlag.validate(f, pathExists, flag);

+ 4 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.util.Progressable;
 
 /****************************************************************
@@ -410,10 +411,11 @@ public class FilterFileSystem extends FileSystem {
   @Override
   protected FSDataOutputStream primitiveCreate(Path f,
       FsPermission absolutePermission, EnumSet<CreateFlag> flag,
-      int bufferSize, short replication, long blockSize, Progressable progress, int bytesPerChecksum)
+      int bufferSize, short replication, long blockSize,
+      Progressable progress, ChecksumOpt checksumOpt)
       throws IOException {
     return fs.primitiveCreate(f, absolutePermission, flag,
-        bufferSize, replication, blockSize, progress, bytesPerChecksum);
+        bufferSize, replication, blockSize, progress, checksumOpt);
   }
 
   @Override

+ 3 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Progressable;
@@ -81,11 +82,11 @@ public abstract class FilterFs extends AbstractFileSystem {
   public FSDataOutputStream createInternal(Path f,
     EnumSet<CreateFlag> flag, FsPermission absolutePermission, int bufferSize,
     short replication, long blockSize, Progressable progress,
-    int bytesPerChecksum, boolean createParent) 
+    ChecksumOpt checksumOpt, boolean createParent) 
       throws IOException, UnresolvedLinkException {
     checkPath(f);
     return myFs.createInternal(f, flag, absolutePermission, bufferSize,
-        replication, blockSize, progress, bytesPerChecksum, createParent);
+        replication, blockSize, progress, checksumOpt, createParent);
   }
 
   @Override

+ 12 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsServerDefaults.java

@@ -26,6 +26,8 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.DataChecksum;
 
 /****************************************************
  * Provides server default configuration values to clients.
@@ -50,13 +52,15 @@ public class FsServerDefaults implements Writable {
   private int fileBufferSize;
   private boolean encryptDataTransfer;
   private long trashInterval;
+  private DataChecksum.Type checksumType;
 
   public FsServerDefaults() {
   }
 
   public FsServerDefaults(long blockSize, int bytesPerChecksum,
       int writePacketSize, short replication, int fileBufferSize,
-      boolean encryptDataTransfer, long trashInterval) {
+      boolean encryptDataTransfer, long trashInterval,
+      DataChecksum.Type checksumType) {
     this.blockSize = blockSize;
     this.bytesPerChecksum = bytesPerChecksum;
     this.writePacketSize = writePacketSize;
@@ -64,6 +68,7 @@ public class FsServerDefaults implements Writable {
     this.fileBufferSize = fileBufferSize;
     this.encryptDataTransfer = encryptDataTransfer;
     this.trashInterval = trashInterval;
+    this.checksumType = checksumType;
   }
 
   public long getBlockSize() {
@@ -94,6 +99,10 @@ public class FsServerDefaults implements Writable {
     return trashInterval;
   }
 
+  public DataChecksum.Type getChecksumType() {
+    return checksumType;
+  }
+
   // /////////////////////////////////////////
   // Writable
   // /////////////////////////////////////////
@@ -104,6 +113,7 @@ public class FsServerDefaults implements Writable {
     out.writeInt(writePacketSize);
     out.writeShort(replication);
     out.writeInt(fileBufferSize);
+    WritableUtils.writeEnum(out, checksumType);
   }
 
   @InterfaceAudience.Private
@@ -113,5 +123,6 @@ public class FsServerDefaults implements Writable {
     writePacketSize = in.readInt();
     replication = in.readShort();
     fileBufferSize = in.readInt();
+    checksumType = WritableUtils.readEnum(in, DataChecksum.Type.class);
   }
 }

+ 128 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java

@@ -20,7 +20,9 @@ package org.apache.hadoop.fs;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 
 /**
  * This class contains options related to file system operations.
@@ -46,6 +48,10 @@ public final class Options {
     public static BytesPerChecksum bytesPerChecksum(short crc) {
       return new BytesPerChecksum(crc);
     }
+    public static ChecksumParam checksumParam(
+        ChecksumOpt csumOpt) {
+      return new ChecksumParam(csumOpt);
+    }
     public static Perms perms(FsPermission perm) {
       return new Perms(perm);
     }
@@ -91,7 +97,8 @@ public final class Options {
       }
       public int getValue() { return bufferSize; }
     }
-    
+
+    /** This is not needed if ChecksumParam is specified. */
     public static class BytesPerChecksum extends CreateOpts {
       private final int bytesPerChecksum;
       protected BytesPerChecksum(short bpc) { 
@@ -103,6 +110,14 @@ public final class Options {
       }
       public int getValue() { return bytesPerChecksum; }
     }
+
+    public static class ChecksumParam extends CreateOpts {
+      private final ChecksumOpt checksumOpt;
+      protected ChecksumParam(ChecksumOpt csumOpt) {
+        checksumOpt = csumOpt;
+      }
+      public ChecksumOpt getValue() { return checksumOpt; }
+    }
     
     public static class Perms extends CreateOpts {
       private final FsPermission permissions;
@@ -206,4 +221,116 @@ public final class Options {
       return code;
     }
   }
+
+  /**
+   * This is used in FileSystem and FileContext to specify checksum options.
+   */
+  public static class ChecksumOpt {
+    private final int crcBlockSize;
+    private final DataChecksum.Type crcType;
+
+    /**
+     * Create an uninitialized one.
+     */
+    public ChecksumOpt() {
+      crcBlockSize = -1;
+      crcType = DataChecksum.Type.DEFAULT;
+    }
+
+    /**
+     * Normal ctor
+     * @param type checksum type
+     * @param size bytes per checksum
+     */
+    public ChecksumOpt(DataChecksum.Type type, int size) {
+      crcBlockSize = size;
+      crcType = type;
+    }
+
+    public int getBytesPerChecksum() {
+      return crcBlockSize;
+    }
+
+    public DataChecksum.Type getChecksumType() {
+      return crcType;
+    }
+
+    /**
+     * Create a ChecksumOpt that disables checksums.
+     */
+    public static ChecksumOpt createDisabled() {
+      return new ChecksumOpt(DataChecksum.Type.NULL, -1);
+    }
+
+    /**
+     * A helper method for processing user input and default value to 
+     * create a combined checksum option. This is a bit complicated because
+     * bytesPerChecksum is kept for backward compatibility.
+     *
+     * @param defaultOpt Default checksum option
+     * @param userOpt User-specified checksum option. Ignored if null.
+     * @param userBytesPerChecksum User-specified bytesPerChecksum.
+     *                Ignored if not positive.
+     */
+    public static ChecksumOpt processChecksumOpt(ChecksumOpt defaultOpt, 
+        ChecksumOpt userOpt, int userBytesPerChecksum) {
+      // The following is done to avoid unnecessary creation of new objects.
+      // tri-state variable: 0 default, 1 userBytesPerChecksum, 2 userOpt
+      short whichSize;
+      // true default, false userOpt
+      boolean useDefaultType;
+      
+      //  bytesPerChecksum - order of preference
+      //    user specified value in bytesPerChecksum
+      //    user specified value in checksumOpt
+      //    default.
+      if (userBytesPerChecksum > 0) {
+        whichSize = 1; // userBytesPerChecksum
+      } else if (userOpt != null && userOpt.getBytesPerChecksum() > 0) {
+        whichSize = 2; // userOpt
+      } else {
+        whichSize = 0; // default
+      }
+
+      // checksum type - order of preference
+      //   user specified value in checksumOpt
+      //   default.
+      if (userOpt != null &&
+            userOpt.getChecksumType() != DataChecksum.Type.DEFAULT) {
+        useDefaultType = false;
+      } else {
+        useDefaultType = true;
+      }
+
+      // Short out the common and easy cases
+      if (whichSize == 0 && useDefaultType) {
+        return defaultOpt;
+      } else if (whichSize == 2 && !useDefaultType) {
+        return userOpt;
+      }
+
+      // Take care of the rest of combinations
+      DataChecksum.Type type = useDefaultType ? defaultOpt.getChecksumType() :
+          userOpt.getChecksumType();
+      if (whichSize == 0) {
+        return new ChecksumOpt(type, defaultOpt.getBytesPerChecksum());
+      } else if (whichSize == 1) {
+        return new ChecksumOpt(type, userBytesPerChecksum);
+      } else {
+        return new ChecksumOpt(type, userOpt.getBytesPerChecksum());
+      }
+    }
+
+    /**
+     * A helper method for processing user input and default value to 
+     * create a combined checksum option. 
+     *
+     * @param defaultOpt Default checksum option
+     * @param userOpt User-specified checksum option
+     */
+    public static ChecksumOpt processChecksumOpt(ChecksumOpt defaultOpt,
+        ChecksumOpt userOpt) {
+      return processChecksumOpt(defaultOpt, userOpt, -1);
+    }
+  }
 }
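
Read against the logic above, a few calls make the precedence concrete (a sketch with imports assumed; TestFsOptions below exercises the same cases):

    ChecksumOpt def = new ChecksumOpt(DataChecksum.Type.CRC32, 512);

    // null userOpt: the default wins outright -> (CRC32, 512)
    ChecksumOpt a = ChecksumOpt.processChecksumOpt(def, null);

    // an explicit bytesPerChecksum beats the size inside userOpt -> (CRC32C, 1024)
    ChecksumOpt b = ChecksumOpt.processChecksumOpt(def,
        new ChecksumOpt(DataChecksum.Type.CRC32C, 2048), 1024);

    // Type.DEFAULT keeps the default type; the size comes from userOpt -> (CRC32, 4096)
    ChecksumOpt c = ChecksumOpt.processChecksumOpt(def,
        new ChecksumOpt(DataChecksum.Type.DEFAULT, 4096));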

+ 10 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FtpConfigKeys.java

@@ -23,10 +23,16 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.util.DataChecksum;
 
 /** 
  * This class contains constants for configuration keys used
  * in the ftp file system.
+ *
+ * Note that the settings for unimplemented features are ignored.
+ * E.g. checksum-related settings are just placeholders. Even when
+ * wrapped with {@link ChecksumFileSystem}, these settings are not
+ * used.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -46,6 +52,8 @@ public class FtpConfigKeys extends CommonConfigurationKeys {
   public static final int     CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
   public static final boolean ENCRYPT_DATA_TRANSFER_DEFAULT = false;
   public static final long    FS_TRASH_INTERVAL_DEFAULT = 0;
+  public static final DataChecksum.Type CHECKSUM_TYPE_DEFAULT =
+      DataChecksum.Type.CRC32;
   
   protected static FsServerDefaults getServerDefaults() throws IOException {
     return new FsServerDefaults(
@@ -55,7 +63,8 @@ public class FtpConfigKeys extends CommonConfigurationKeys {
         REPLICATION_DEFAULT,
         STREAM_BUFFER_SIZE_DEFAULT,
         ENCRYPT_DATA_TRANSFER_DEFAULT,
-        FS_TRASH_INTERVAL_DEFAULT);
+        FS_TRASH_INTERVAL_DEFAULT,
+        CHECKSUM_TYPE_DEFAULT);
   }
 }
   

+ 11 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/local/LocalConfigKeys.java

@@ -24,11 +24,18 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.util.DataChecksum;
 
 /** 
  * This class contains constants for configuration keys used
  * in the local file system, raw local fs and checksum fs.
+ *
+ * Note that the settings for unimplemented features are ignored.
+ * E.g. checksum-related settings are just placeholders. Even when
+ * wrapped with {@link ChecksumFileSystem}, these settings are not
+ * used.
  */
+
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class LocalConfigKeys extends CommonConfigurationKeys {
@@ -45,7 +52,8 @@ public class LocalConfigKeys extends CommonConfigurationKeys {
   public static final int CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
   public static final boolean ENCRYPT_DATA_TRANSFER_DEFAULT = false;
   public static final long FS_TRASH_INTERVAL_DEFAULT = 0;
-
+  public static final DataChecksum.Type CHECKSUM_TYPE_DEFAULT =
+      DataChecksum.Type.CRC32;
   public static FsServerDefaults getServerDefaults() throws IOException {
     return new FsServerDefaults(
         BLOCK_SIZE_DEFAULT,
@@ -54,7 +62,8 @@ public class LocalConfigKeys extends CommonConfigurationKeys {
         REPLICATION_DEFAULT,
         STREAM_BUFFER_SIZE_DEFAULT,
         ENCRYPT_DATA_TRANSFER_DEFAULT,
-        FS_TRASH_INTERVAL_DEFAULT);
+        FS_TRASH_INTERVAL_DEFAULT,
+        CHECKSUM_TYPE_DEFAULT);
   }
 }
   

+ 3 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ChRootedFs.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -159,11 +160,11 @@ class ChRootedFs extends AbstractFileSystem {
   public FSDataOutputStream createInternal(final Path f,
       final EnumSet<CreateFlag> flag, final FsPermission absolutePermission,
       final int bufferSize, final short replication, final long blockSize,
-      final Progressable progress, final int bytesPerChecksum,
+      final Progressable progress, final ChecksumOpt checksumOpt,
       final boolean createParent) throws IOException, UnresolvedLinkException {
     return myFs.createInternal(fullPath(f), flag,
         absolutePermission, bufferSize,
-        replication, blockSize, progress, bytesPerChecksum, createParent);
+        replication, blockSize, progress, checksumOpt, createParent);
   }
 
   @Override

+ 4 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java

@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FsConstants;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
@@ -265,7 +266,7 @@ public class ViewFs extends AbstractFileSystem {
   public FSDataOutputStream createInternal(final Path f,
       final EnumSet<CreateFlag> flag, final FsPermission absolutePermission,
       final int bufferSize, final short replication, final long blockSize,
-      final Progressable progress, final int bytesPerChecksum,
+      final Progressable progress, final ChecksumOpt checksumOpt,
       final boolean createParent) throws AccessControlException,
       FileAlreadyExistsException, FileNotFoundException,
       ParentNotDirectoryException, UnsupportedFileSystemException,
@@ -283,7 +284,7 @@ public class ViewFs extends AbstractFileSystem {
     assert(res.remainingPath != null);
     return res.targetFileSystem.createInternal(res.remainingPath, flag,
         absolutePermission, bufferSize, replication,
-        blockSize, progress, bytesPerChecksum,
+        blockSize, progress, checksumOpt,
         createParent);
   }
 
@@ -632,7 +633,7 @@ public class ViewFs extends AbstractFileSystem {
     public FSDataOutputStream createInternal(final Path f,
         final EnumSet<CreateFlag> flag, final FsPermission absolutePermission,
         final int bufferSize, final short replication, final long blockSize,
-        final Progressable progress, final int bytesPerChecksum,
+        final Progressable progress, final ChecksumOpt checksumOpt,
         final boolean createParent) throws AccessControlException,
         FileAlreadyExistsException, FileNotFoundException,
         ParentNotDirectoryException, UnsupportedFileSystemException,

+ 3 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java

@@ -43,12 +43,14 @@ public class DataChecksum implements Checksum {
   public static final int CHECKSUM_NULL    = 0;
   public static final int CHECKSUM_CRC32   = 1;
   public static final int CHECKSUM_CRC32C  = 2;
+  public static final int CHECKSUM_DEFAULT = 3;
  
   /** The checksum types */
   public static enum Type {
     NULL  (CHECKSUM_NULL, 0),
     CRC32 (CHECKSUM_CRC32, 4),
-    CRC32C(CHECKSUM_CRC32C, 4);
+    CRC32C(CHECKSUM_CRC32C, 4),
+    DEFAULT(CHECKSUM_DEFAULT, 0); // This cannot be used to create DataChecksum
 
     public final int id;
     public final int size;

+ 2 - 1
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestAfsCheckPath.java

@@ -24,6 +24,7 @@ import java.net.URISyntaxException;
 import java.util.EnumSet;
 
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.util.Progressable;
 import org.junit.Test;
@@ -76,7 +77,7 @@ public class TestAfsCheckPath {
     @Override
     public FSDataOutputStream createInternal(Path f, EnumSet<CreateFlag> flag,
         FsPermission absolutePermission, int bufferSize, short replication,
-        long blockSize, Progressable progress, int bytesPerChecksum,
+        long blockSize, Progressable progress, ChecksumOpt checksumOpt,
         boolean createParent) throws IOException {
       // deliberately empty
       return null;

+ 16 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java

@@ -32,6 +32,7 @@ import java.util.Iterator;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.Options.CreateOpts;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.security.Credentials;
@@ -80,6 +81,11 @@ public class TestFilterFileSystem {
             Progressable progress) throws IOException {
       return null;
     }
+    public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
+            EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
+            Progressable progress, ChecksumOpt checksumOpt) throws IOException {
+      return null;
+    }
     public boolean mkdirs(Path f) { return false; }
     public FSDataInputStream open(Path f) { return null; }
     public FSDataOutputStream create(Path f) { return null; }
@@ -138,6 +144,16 @@ public class TestFilterFileSystem {
         Progressable progress) throws IOException {
       return null;
     }
+    public FSDataOutputStream create(Path f,
+        FsPermission permission,
+        EnumSet<CreateFlag> flags,
+        int bufferSize,
+        short replication,
+        long blockSize,
+        Progressable progress,
+        ChecksumOpt checksumOpt) throws IOException {
+      return null;
+    }
     public String getName() { return null; }
     public boolean delete(Path f) { return false; }
     public short getReplication(Path src) { return 0 ; }

+ 70 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFsOptions.java

@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Options.ChecksumOpt;
+import org.apache.hadoop.util.DataChecksum;
+
+import org.junit.Test;
+
+public class TestFsOptions {
+
+  @Test
+  public void testProcessChecksumOpt() {
+    ChecksumOpt defaultOpt = new ChecksumOpt(DataChecksum.Type.CRC32, 512);
+    ChecksumOpt finalOpt;
+
+    // Pass a null userOpt
+    finalOpt = ChecksumOpt.processChecksumOpt(defaultOpt, null);
+    checkParams(defaultOpt, finalOpt);
+
+    // null with bpc
+    finalOpt = ChecksumOpt.processChecksumOpt(defaultOpt, null, 1024);
+    checkParams(DataChecksum.Type.CRC32, 1024, finalOpt);
+
+    ChecksumOpt myOpt = new ChecksumOpt();
+
+    // custom with unspecified parameters
+    finalOpt = ChecksumOpt.processChecksumOpt(defaultOpt, myOpt);
+    checkParams(defaultOpt, finalOpt);
+
+    myOpt = new ChecksumOpt(DataChecksum.Type.CRC32C, 2048);
+
+    // custom config
+    finalOpt = ChecksumOpt.processChecksumOpt(defaultOpt, myOpt);
+    checkParams(DataChecksum.Type.CRC32C, 2048, finalOpt);
+
+    // custom config + bpc
+    finalOpt = ChecksumOpt.processChecksumOpt(defaultOpt, myOpt, 4096);
+    checkParams(DataChecksum.Type.CRC32C, 4096, finalOpt);
+  }
+
+  private void checkParams(ChecksumOpt expected, ChecksumOpt obtained) {
+    assertEquals(expected.getChecksumType(), obtained.getChecksumType());
+    assertEquals(expected.getBytesPerChecksum(), obtained.getBytesPerChecksum());
+  }
+
+  private void checkParams(DataChecksum.Type type, int bpc, ChecksumOpt obtained) {
+    assertEquals(type, obtained.getChecksumType());
+    assertEquals(bpc, obtained.getBytesPerChecksum());
+  }
+}

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.hdfs.CorruptFileBlockIterator;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -93,10 +94,10 @@ public class Hdfs extends AbstractFileSystem {
   public HdfsDataOutputStream createInternal(Path f,
       EnumSet<CreateFlag> createFlag, FsPermission absolutePermission,
       int bufferSize, short replication, long blockSize, Progressable progress,
-      int bytesPerChecksum, boolean createParent) throws IOException {
+      ChecksumOpt checksumOpt, boolean createParent) throws IOException {
     return new HdfsDataOutputStream(dfs.primitiveCreate(getUriPath(f),
         absolutePermission, createFlag, createParent, replication, blockSize,
-        progress, bufferSize, bytesPerChecksum), getStatistics());
+        progress, bufferSize, checksumOpt), getStatistics());
   }
 
   @Override

+ 41 - 19
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -91,6 +91,7 @@ import org.apache.hadoop.fs.HdfsBlockLocation;
 import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
@@ -203,8 +204,7 @@ public class DFSClient implements java.io.Closeable {
     final int maxBlockAcquireFailures;
     final int confTime;
     final int ioBufferSize;
-    final DataChecksum.Type checksumType;
-    final int bytesPerChecksum;
+    final ChecksumOpt defaultChecksumOpt;
     final int writePacketSize;
     final int socketTimeout;
     final int socketCacheCapacity;
@@ -243,9 +243,7 @@ public class DFSClient implements java.io.Closeable {
       ioBufferSize = conf.getInt(
           CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
           CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
-      checksumType = getChecksumType(conf);
-      bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY,
-          DFS_BYTES_PER_CHECKSUM_DEFAULT);
+      defaultChecksumOpt = getChecksumOptFromConf(conf);
       socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
           HdfsServerConstants.READ_TIMEOUT);
       /** dfs.write.packet.size is an internal config variable */
@@ -300,9 +298,32 @@ public class DFSClient implements java.io.Closeable {
       }
     }
 
-    private DataChecksum createChecksum() {
-      return DataChecksum.newDataChecksum(
-          checksumType, bytesPerChecksum);
+    // Construct a checksum option from conf
+    private ChecksumOpt getChecksumOptFromConf(Configuration conf) {
+      DataChecksum.Type type = getChecksumType(conf);
+      int bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY,
+          DFS_BYTES_PER_CHECKSUM_DEFAULT);
+      return new ChecksumOpt(type, bytesPerChecksum);
+    }
+
+    // create a DataChecksum with the default option.
+    private DataChecksum createChecksum() throws IOException {
+      return createChecksum(null);
+    }
+
+    private DataChecksum createChecksum(ChecksumOpt userOpt) 
+        throws IOException {
+      // Fill in any missing field with the default.
+      ChecksumOpt myOpt = ChecksumOpt.processChecksumOpt(
+          defaultChecksumOpt, userOpt);
+      DataChecksum dataChecksum = DataChecksum.newDataChecksum(
+          myOpt.getChecksumType(),
+          myOpt.getBytesPerChecksum());
+      if (dataChecksum == null) {
+        throw new IOException("Invalid checksum type specified: "
+            + myOpt.getChecksumType().name());
+      }
+      return dataChecksum;
     }
   }
  
@@ -1143,12 +1164,13 @@ public class DFSClient implements java.io.Closeable {
     return create(src, FsPermission.getDefault(),
         overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
             : EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress,
-        buffersize);
+        buffersize, null);
   }
 
   /**
    * Call {@link #create(String, FsPermission, EnumSet, boolean, short, 
-   * long, Progressable, int)} with <code>createParent</code> set to true.
+   * long, Progressable, int, ChecksumOpt)} with <code>createParent</code>
+   *  set to true.
    */
   public DFSOutputStream create(String src, 
                              FsPermission permission,
@@ -1156,10 +1178,11 @@ public class DFSClient implements java.io.Closeable {
                              short replication,
                              long blockSize,
                              Progressable progress,
-                             int buffersize)
+                             int buffersize,
+                             ChecksumOpt checksumOpt)
       throws IOException {
     return create(src, permission, flag, true,
-        replication, blockSize, progress, buffersize);
+        replication, blockSize, progress, buffersize, checksumOpt);
   }
 
   /**
@@ -1177,6 +1200,7 @@ public class DFSClient implements java.io.Closeable {
    * @param blockSize maximum block size
    * @param progress interface for reporting client progress
    * @param buffersize underlying buffer size 
+   * @param checksumOpt checksum options
    * 
    * @return output stream
    * 
@@ -1190,8 +1214,8 @@ public class DFSClient implements java.io.Closeable {
                              short replication,
                              long blockSize,
                              Progressable progress,
-                             int buffersize)
-    throws IOException {
+                             int buffersize,
+                             ChecksumOpt checksumOpt) throws IOException {
     checkOpen();
     if (permission == null) {
       permission = FsPermission.getDefault();
@@ -1202,7 +1226,7 @@ public class DFSClient implements java.io.Closeable {
     }
     final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
         src, masked, flag, createParent, replication, blockSize, progress,
-        buffersize, dfsClientConf.createChecksum());
+        buffersize, dfsClientConf.createChecksum(checksumOpt));
     beginFileLease(src, result);
     return result;
   }
@@ -1240,15 +1264,13 @@ public class DFSClient implements java.io.Closeable {
                              long blockSize,
                              Progressable progress,
                              int buffersize,
-                             int bytesPerChecksum)
+                             ChecksumOpt checksumOpt)
       throws IOException, UnresolvedLinkException {
     checkOpen();
     CreateFlag.validate(flag);
     DFSOutputStream result = primitiveAppend(src, flag, buffersize, progress);
     if (result == null) {
-      DataChecksum checksum = DataChecksum.newDataChecksum(
-          dfsClientConf.checksumType,
-          bytesPerChecksum);
+      DataChecksum checksum = dfsClientConf.createChecksum(checksumOpt);
       result = DFSOutputStream.newStreamForCreate(this, src, absPermission,
           flag, createParent, replication, blockSize, progress, buffersize,
           checksum);

+ 9 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.FsStatus;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
@@ -258,19 +259,19 @@ public class DistributedFileSystem extends FileSystem {
   public HdfsDataOutputStream create(Path f, FsPermission permission,
       boolean overwrite, int bufferSize, short replication, long blockSize,
       Progressable progress) throws IOException {
-    return create(f, permission,
+    return this.create(f, permission,
         overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
             : EnumSet.of(CreateFlag.CREATE), bufferSize, replication,
-        blockSize, progress);
+        blockSize, progress, null);
   }
   
   @Override
   public HdfsDataOutputStream create(Path f, FsPermission permission,
     EnumSet<CreateFlag> cflags, int bufferSize, short replication, long blockSize,
-    Progressable progress) throws IOException {
+    Progressable progress, ChecksumOpt checksumOpt) throws IOException {
     statistics.incrementWriteOps(1);
     final DFSOutputStream out = dfs.create(getPathName(f), permission, cflags,
-        replication, blockSize, progress, bufferSize);
+        replication, blockSize, progress, bufferSize, checksumOpt);
     return new HdfsDataOutputStream(out, statistics);
   }
   
@@ -279,11 +280,11 @@ public class DistributedFileSystem extends FileSystem {
   protected HdfsDataOutputStream primitiveCreate(Path f,
     FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
     short replication, long blockSize, Progressable progress,
-    int bytesPerChecksum) throws IOException {
+    ChecksumOpt checksumOpt) throws IOException {
     statistics.incrementWriteOps(1);
     return new HdfsDataOutputStream(dfs.primitiveCreate(getPathName(f),
         absolutePermission, flag, true, replication, blockSize,
-        progress, bufferSize, bytesPerChecksum),statistics);
+        progress, bufferSize, checksumOpt),statistics);
    } 
 
   /**
@@ -298,7 +299,8 @@ public class DistributedFileSystem extends FileSystem {
       flag.add(CreateFlag.CREATE);
     }
     return new HdfsDataOutputStream(dfs.create(getPathName(f), permission, flag,
-        false, replication, blockSize, progress, bufferSize), statistics);
+        false, replication, blockSize, progress, 
+        bufferSize, null), statistics);
   }
 
   @Override

+ 7 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java

@@ -70,6 +70,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointSignatureProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ContentSummaryProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CorruptFileBlocksProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
@@ -134,6 +135,7 @@ import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.security.token.Token;
 
 import com.google.protobuf.ByteString;
@@ -1003,7 +1005,8 @@ public class PBHelper {
         fs.getWritePacketSize(), (short) fs.getReplication(),
         fs.getFileBufferSize(),
         fs.getEncryptDataTransfer(),
-        fs.getTrashInterval());
+        fs.getTrashInterval(),
+        DataChecksum.Type.valueOf(fs.getChecksumType().name()));
   }
   
   public static FsServerDefaultsProto convert(FsServerDefaults fs) {
@@ -1015,7 +1018,9 @@ public class PBHelper {
       .setReplication(fs.getReplication())
       .setFileBufferSize(fs.getFileBufferSize())
       .setEncryptDataTransfer(fs.getEncryptDataTransfer())
-      .setTrashInterval(fs.getTrashInterval()).build();
+      .setTrashInterval(fs.getTrashInterval())
+      .setChecksumType(ChecksumTypeProto.valueOf(fs.getChecksumType().name()))
+      .build();
   }
   
   public static FsPermissionProto convert(FsPermission p) {

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java

@@ -215,7 +215,7 @@ public class DatanodeWebHdfsMethods {
             fullpath, permission.getFsPermission(), 
             overwrite.getValue() ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
                 : EnumSet.of(CreateFlag.CREATE),
-            replication.getValue(conf), blockSize.getValue(conf), null, b), null);
+            replication.getValue(conf), blockSize.getValue(conf), null, b, null), null);
         IOUtils.copyBytes(in, out, b);
         out.close();
         out = null;

+ 15 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -25,6 +25,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
@@ -195,6 +197,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.VersionInfo;
 import org.mortbay.util.ajax.JSON;
@@ -476,6 +479,16 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
             "must not be specified if HA is not enabled.");
       }
 
+      // Get the checksum type from config
+      String checksumTypeStr = conf.get(DFS_CHECKSUM_TYPE_KEY, DFS_CHECKSUM_TYPE_DEFAULT);
+      DataChecksum.Type checksumType;
+      try {
+         checksumType = DataChecksum.Type.valueOf(checksumTypeStr);
+      } catch (IllegalArgumentException iae) {
+         throw new IOException("Invalid checksum type in "
+            + DFS_CHECKSUM_TYPE_KEY + ": " + checksumTypeStr);
+      }
+
       this.serverDefaults = new FsServerDefaults(
           conf.getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT),
           conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT),
@@ -483,7 +496,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
           (short) conf.getInt(DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT),
           conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),
           conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, DFS_ENCRYPT_DATA_TRANSFER_DEFAULT),
-          conf.getLong(FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT));
+          conf.getLong(FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT),
+          checksumType);
       
       this.maxFsObjects = conf.getLong(DFS_NAMENODE_MAX_OBJECTS_KEY, 
                                        DFS_NAMENODE_MAX_OBJECTS_DEFAULT);
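
Cluster-side, the namenode now reads the default checksum type from configuration and advertises it through FsServerDefaults. A hedged sketch of setting it (the literal key string behind DFS_CHECKSUM_TYPE_KEY is not shown in this diff, so it is referenced symbolically):

    // Valid values are DataChecksum.Type names such as CRC32 or CRC32C;
    // an unrecognized name makes FSNamesystem throw the IOException added above.
    Configuration conf = new Configuration();
    conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");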

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto

@@ -178,6 +178,15 @@ message HdfsFileStatusProto {
  optional LocatedBlocksProto locations = 12;  // supplied only if asked by client
 } 
 
+/**
+ * Checksum algorithms/types used in HDFS
+ */
+enum ChecksumTypeProto {
+  NULL = 0;
+  CRC32 = 1;
+  CRC32C = 2;
+}
+
 /**
  * HDFS Server Defaults
  */
@@ -189,6 +198,7 @@ message FsServerDefaultsProto {
   required uint32 fileBufferSize = 5;
   optional bool encryptDataTransfer = 6 [default = false];
   optional uint64 trashInterval = 7 [default = 0];
+  optional ChecksumTypeProto checksumType = 8 [default = CRC32];
 }
 
 

+ 54 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java

@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.Random;
 
 import org.apache.commons.lang.ArrayUtils;
@@ -36,16 +37,19 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.BlockStorageLocation;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.VolumeId;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Time;
 import org.apache.log4j.Level;
 import org.junit.Test;
@@ -664,4 +668,54 @@ public class TestDistributedFileSystem {
           (l.getVolumeIds()[0].isValid()) ^ (l.getVolumeIds()[1].isValid()));
     }
   }
+
+  @Test
+  public void testCreateWithCustomChecksum() throws Exception {
+    Configuration conf = getTestConfiguration();
+    final long grace = 1000L;
+    MiniDFSCluster cluster = null;
+    Path testBasePath = new Path("/test/csum");
+    // create args 
+    Path path1 = new Path(testBasePath, "file_with_crc1");
+    Path path2 = new Path(testBasePath, "file_with_crc2");
+    ChecksumOpt opt1 = new ChecksumOpt(DataChecksum.Type.CRC32C, 512);
+    ChecksumOpt opt2 = new ChecksumOpt(DataChecksum.Type.CRC32, 512);
+
+    // common args
+    FsPermission perm = FsPermission.getDefault().applyUMask(
+        FsPermission.getUMask(conf));
+    EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.OVERWRITE,
+        CreateFlag.CREATE);
+    short repl = 1;
+
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      FileSystem dfs = cluster.getFileSystem();
+
+      dfs.mkdirs(testBasePath);
+
+      // create two files with different checksum types
+      FSDataOutputStream out1 = dfs.create(path1, perm, flags, 4096, repl,
+          131072L, null, opt1);
+      FSDataOutputStream out2 = dfs.create(path2, perm, flags, 4096, repl,
+          131072L, null, opt2);
+
+      for (int i = 0; i < 1024; i++) {
+        out1.write(i);
+        out2.write(i);
+      }
+      out1.close();
+      out2.close();
+
+      // the two checksums must be different.
+      FileChecksum sum1 = dfs.getFileChecksum(path1);
+      FileChecksum sum2 = dfs.getFileChecksum(path2);
+      assertFalse(sum1.equals(sum2));
+    } finally {
+      if (cluster != null) {
+        cluster.getFileSystem().delete(testBasePath, true);
+        cluster.shutdown();
+      }
+    }
+  }
 }

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java

@@ -52,6 +52,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.AbstractFileSystem;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -430,7 +431,7 @@ public class TestResourceLocalizationService {
         new FSDataOutputStream(new DataOutputBuffer(), null);
       doReturn(out).when(spylfs).createInternal(isA(Path.class),
           isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(),
-          anyLong(), isA(Progressable.class), anyInt(), anyBoolean());
+          anyLong(), isA(Progressable.class), isA(ChecksumOpt.class), anyBoolean());
       final LocalResource resource = getPrivateMockedResource(r);
       final LocalResourceRequest req = new LocalResourceRequest(resource);
       Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =