Browse source

HADOOP-8240. Add a new API to allow users to specify a checksum type on FileSystem.create(..). Contributed by Kihwal Lee

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1375380 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 12 years ago
parent
commit
f0bbf7da53
23 changed files with 404 additions and 65 deletions
  1. +3 -0    hadoop-common-project/hadoop-common/CHANGES.txt
  2. +21 -5   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java
  3. +10 -6   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java
  4. +4 -3    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegateToFileSystem.java
  5. +4 -2    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java
  6. +39 -3   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
  7. +4 -2    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
  8. +3 -2    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java
  9. +12 -1   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsServerDefaults.java
  10. +128 -1  hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
  11. +10 -1   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FtpConfigKeys.java
  12. +11 -1   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/local/LocalConfigKeys.java
  13. +3 -2    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ChRootedFs.java
  14. +4 -3    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java
  15. +4 -2    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
  16. +11 -0   hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
  17. +3 -2    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
  18. +41 -19  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
  19. +17 -7   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
  20. +1 -1    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
  21. +15 -1   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  22. +54 -0   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
  23. +2 -1    hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -33,6 +33,9 @@ Release 0.23.3 - UNRELEASED
     HADOOP-8700.  Use enum to define the checksum constants in DataChecksum.
     (szetszwo)
 
+    HADOOP-8240. Add a new API to allow users to specify a checksum type
+    on FileSystem.create(..).  (Kihwal Lee via szetszwo)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 21 - 5
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java

@@ -39,6 +39,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.Options.CreateOpts;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -46,6 +47,7 @@ import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
 
 /**
@@ -464,6 +466,7 @@ public abstract class AbstractFileSystem {
     short replication = -1;
     long blockSize = -1;
     int bytesPerChecksum = -1;
+    ChecksumOpt checksumOpt = null;
     FsPermission permission = null;
     Progressable progress = null;
     Boolean createParent = null;
@@ -493,6 +496,12 @@ public abstract class AbstractFileSystem {
               "BytesPerChecksum option is set multiple times");
         }
         bytesPerChecksum = ((CreateOpts.BytesPerChecksum) iOpt).getValue();
+      } else if (CreateOpts.ChecksumParam.class.isInstance(iOpt)) {
+        if (checksumOpt != null) {
+          throw new  HadoopIllegalArgumentException(
+              "CreateChecksumType option is set multiple times");
+        }
+        checksumOpt = ((CreateOpts.ChecksumParam) iOpt).getValue();
       } else if (CreateOpts.Perms.class.isInstance(iOpt)) {
         if (permission != null) {
           throw new HadoopIllegalArgumentException(
@@ -530,9 +539,16 @@ public abstract class AbstractFileSystem {
     if (blockSize == -1) {
       blockSize = ssDef.getBlockSize();
     }
-    if (bytesPerChecksum == -1) {
-      bytesPerChecksum = ssDef.getBytesPerChecksum();
-    }
+
+    // Create a checksum option honoring user input as much as possible.
+    // If bytesPerChecksum is specified, it will override the one set in
+    // checksumOpt. Any missing value will be filled in using the default.
+    ChecksumOpt defaultOpt = new ChecksumOpt(
+        ssDef.getChecksumType(),
+        ssDef.getBytesPerChecksum());
+    checksumOpt = ChecksumOpt.processChecksumOpt(defaultOpt,
+        checksumOpt, bytesPerChecksum);
+
     if (bufferSize == -1) {
       bufferSize = ssDef.getFileBufferSize();
     }
@@ -549,7 +565,7 @@ public abstract class AbstractFileSystem {
     }
 
     return this.createInternal(f, createFlag, permission, bufferSize,
-      replication, blockSize, progress, bytesPerChecksum, createParent);
+      replication, blockSize, progress, checksumOpt, createParent);
   }
 
   /**
@@ -560,7 +576,7 @@ public abstract class AbstractFileSystem {
   public abstract FSDataOutputStream createInternal(Path f,
       EnumSet<CreateFlag> flag, FsPermission absolutePermission,
       int bufferSize, short replication, long blockSize, Progressable progress,
-      int bytesPerChecksum, boolean createParent)
+      ChecksumOpt checksumOpt, boolean createParent)
       throws AccessControlException, FileAlreadyExistsException,
       FileNotFoundException, ParentNotDirectoryException,
       UnsupportedFileSystemException, UnresolvedLinkException, IOException;
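
As context for the hunk above, here is a minimal, hedged sketch (not part of the patch) of how a FileContext caller would pass the new ChecksumParam create option; the path is hypothetical. Any field left unset in the ChecksumOpt is filled in from the server defaults by processChecksumOpt, as shown above.

import java.util.EnumSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.DataChecksum;

public class ChecksumParamExample {
  public static void main(String[] args) throws Exception {
    FileContext fc = FileContext.getFileContext(new Configuration());
    // Request CRC32C with 512 bytes per checksum; the ChecksumParam
    // CreateOpt is unpacked by AbstractFileSystem.create as shown above.
    ChecksumOpt opt = new ChecksumOpt(DataChecksum.Type.CRC32C, 512);
    FSDataOutputStream out = fc.create(
        new Path("/tmp/example"),                    // hypothetical path
        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
        CreateOpts.checksumParam(opt),
        CreateOpts.createParent());
    out.writeBytes("hello");
    out.close();
  }
}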

+ 10 - 6
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java

@@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.PureJavaCrc32;
@@ -324,13 +325,17 @@ public abstract class ChecksumFs extends FilterFs {
       final EnumSet<CreateFlag> createFlag,
       final FsPermission absolutePermission, final int bufferSize,
       final short replication, final long blockSize, 
-      final Progressable progress, final int bytesPerChecksum,
+      final Progressable progress, final ChecksumOpt checksumOpt,
       final boolean createParent) throws IOException {
       super(new PureJavaCrc32(), fs.getBytesPerSum(), 4);
 
+      // checksumOpt is passed down to the raw fs. Unless the raw fs
+      // implements checksumming internally, checksumOpt will be ignored.
+      // If the raw fs does checksum internally, we would end up with
+      // two layers of checksumming, i.e. checksumming the checksum file.
       this.datas = fs.getRawFs().createInternal(file, createFlag,
           absolutePermission, bufferSize, replication, blockSize, progress,
-           bytesPerChecksum,  createParent);
+           checksumOpt,  createParent);
       
      // Now create the checksum file; adjust the buffer size
       int bytesPerSum = fs.getBytesPerSum();
@@ -338,7 +343,7 @@ public abstract class ChecksumFs extends FilterFs {
       this.sums = fs.getRawFs().createInternal(fs.getChecksumFile(file),
           EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
           absolutePermission, sumBufferSize, replication, blockSize, progress,
-          bytesPerChecksum, createParent);
+          checksumOpt, createParent);
       sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
       sums.writeInt(bytesPerSum);
     }
@@ -361,12 +366,11 @@ public abstract class ChecksumFs extends FilterFs {
   public FSDataOutputStream createInternal(Path f,
       EnumSet<CreateFlag> createFlag, FsPermission absolutePermission,
       int bufferSize, short replication, long blockSize, Progressable progress,
-      int bytesPerChecksum, boolean createParent) throws IOException {
-
+      ChecksumOpt checksumOpt, boolean createParent) throws IOException {
     final FSDataOutputStream out = new FSDataOutputStream(
         new ChecksumFSOutputSummer(this, f, createFlag, absolutePermission,
             bufferSize, replication, blockSize, progress,
-            bytesPerChecksum,  createParent), null);
+            checksumOpt,  createParent), null);
     return out;
   }
 

+ 4 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegateToFileSystem.java

@@ -28,6 +28,7 @@ import java.util.List;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.token.Token;
@@ -62,7 +63,7 @@ public abstract class DelegateToFileSystem extends AbstractFileSystem {
   public FSDataOutputStream createInternal (Path f,
       EnumSet<CreateFlag> flag, FsPermission absolutePermission, int bufferSize,
       short replication, long blockSize, Progressable progress,
-      int bytesPerChecksum, boolean createParent) throws IOException {
+      ChecksumOpt checksumOpt, boolean createParent) throws IOException {
     checkPath(f);
     
     // Default impl assumes that permissions do not matter
@@ -81,8 +82,8 @@ public abstract class DelegateToFileSystem extends AbstractFileSystem {
       }
       // parent does exist - go ahead with create of file.
     }
-    return fsImpl.primitiveCreate(f, absolutePermission, flag, 
-        bufferSize, replication, blockSize, progress, bytesPerChecksum);
+    return fsImpl.primitiveCreate(f, absolutePermission, flag,
+        bufferSize, replication, blockSize, progress, checksumOpt);
   }
 
   @Override

+ 4 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java

@@ -127,7 +127,8 @@ import org.apache.hadoop.util.ShutdownHookManager;
  *  <li> replication factor
  *  <li> block size
  *  <li> buffer size
- *  <li> bytesPerChecksum (if used).
+ *  <li> encryptDataTransfer 
+ *  <li> checksum option (checksumType and bytesPerChecksum)
  *  </ul>
  *
  * <p>
@@ -613,7 +614,8 @@ public final class FileContext {
    *          <li>BufferSize - buffersize used in FSDataOutputStream
    *          <li>Blocksize - block size for file blocks
    *          <li>ReplicationFactor - replication for blocks
-   *          <li>BytesPerChecksum - bytes per checksum
+   *          <li>ChecksumParam - checksum parameters. Server defaults
+   *          are used if not specified.
    *          </ul>
    *          </ul>
    * 

+ 39 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

@@ -44,6 +44,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.MultipleIOException;
@@ -53,6 +54,7 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ShutdownHookManager;
@@ -646,11 +648,14 @@ public abstract class FileSystem extends Configured implements Closeable {
    */
   public FsServerDefaults getServerDefaults() throws IOException {
     Configuration conf = getConf();
+    // CRC32 is chosen as the default because it is available in all
+    // releases that support checksums.
     return new FsServerDefaults(getDefaultBlockSize(), 
         conf.getInt("io.bytes.per.checksum", 512), 
         64 * 1024, 
         getDefaultReplication(),
-        conf.getInt("io.file.buffer.size", 4096));
+        conf.getInt("io.file.buffer.size", 4096),
+        DataChecksum.Type.CRC32);
   }
 
   /**
@@ -855,7 +860,38 @@ public abstract class FileSystem extends Configured implements Closeable {
       short replication,
       long blockSize,
       Progressable progress) throws IOException;
-  
+
+   /**
+    * Create an FSDataOutputStream at the indicated Path with a custom
+    * checksum option. This create method is the common method to be
+    * used to specify ChecksumOpt in both 0.23.x and 2.x.
+    *
+    * @param f the file name to open
+    * @param permission
+    * @param flags {@link CreateFlag}s to use for this stream.
+    * @param bufferSize the size of the buffer to be used.
+    * @param replication required block replication for the file.
+    * @param blockSize
+    * @param progress
+    * @param checksumOpt checksum parameter. If null, the values
+    *        found in conf will be used.
+    * @throws IOException
+    * @see #setPermission(Path, FsPermission)
+    */
+   public FSDataOutputStream create(Path f,
+       FsPermission permission,
+       EnumSet<CreateFlag> flags,
+       int bufferSize,
+       short replication,
+       long blockSize,
+       Progressable progress,
+       ChecksumOpt checksumOpt) throws IOException {
+     // Checksum options are ignored by default. File systems that
+     // implement checksums need to override this method. Full
+     // support is currently only available in HDFS.
+     return create(f, permission, flags.contains(CreateFlag.OVERWRITE), 
+         bufferSize, replication, blockSize, progress);
+   }
   
   /*.
    * This create has been added to support the FileContext that processes
@@ -868,7 +904,7 @@ public abstract class FileSystem extends Configured implements Closeable {
   protected FSDataOutputStream primitiveCreate(Path f,
      FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
      short replication, long blockSize, Progressable progress,
-     int bytesPerChecksum) throws IOException {
+     ChecksumOpt checksumOpt) throws IOException {
 
     boolean pathExists = exists(f);
     CreateFlag.validate(f, pathExists, flag);
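
For reference, a hedged usage sketch (not from the patch) of calling the new overload directly on a FileSystem; the path and sizes are illustrative. Per the default implementation above, file systems that do not override it silently drop the checksum option, while HDFS honors it.

import java.util.EnumSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.DataChecksum;

public class CreateWithChecksumOpt {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    // Passing null for checksumOpt means "use the values found in conf".
    FSDataOutputStream out = fs.create(
        new Path("/tmp/f"),                       // hypothetical path
        FsPermission.getDefault(),
        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
        4096,                                     // bufferSize
        (short) 3,                                // replication
        64 * 1024 * 1024L,                        // blockSize
        null,                                     // progress
        new ChecksumOpt(DataChecksum.Type.CRC32C, 512));
    out.close();
  }
}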

+ 4 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.util.Progressable;
 
 /****************************************************************
@@ -410,10 +411,11 @@ public class FilterFileSystem extends FileSystem {
   @Override
   protected FSDataOutputStream primitiveCreate(Path f,
       FsPermission absolutePermission, EnumSet<CreateFlag> flag,
-      int bufferSize, short replication, long blockSize, Progressable progress, int bytesPerChecksum)
+      int bufferSize, short replication, long blockSize,
+      Progressable progress, ChecksumOpt checksumOpt)
       throws IOException {
     return fs.primitiveCreate(f, absolutePermission, flag,
-        bufferSize, replication, blockSize, progress, bytesPerChecksum);
+        bufferSize, replication, blockSize, progress, checksumOpt);
   }
 
   @Override

+ 3 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Progressable;
@@ -81,11 +82,11 @@ public abstract class FilterFs extends AbstractFileSystem {
   public FSDataOutputStream createInternal(Path f,
     EnumSet<CreateFlag> flag, FsPermission absolutePermission, int bufferSize,
     short replication, long blockSize, Progressable progress,
-    int bytesPerChecksum, boolean createParent) 
+    ChecksumOpt checksumOpt, boolean createParent) 
       throws IOException, UnresolvedLinkException {
     checkPath(f);
     return myFs.createInternal(f, flag, absolutePermission, bufferSize,
-        replication, blockSize, progress, bytesPerChecksum, createParent);
+        replication, blockSize, progress, checksumOpt, createParent);
   }
 
   @Override

+ 12 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsServerDefaults.java

@@ -26,6 +26,8 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.DataChecksum;
 
 /****************************************************
  * Provides server default configuration values to clients.
@@ -48,17 +50,20 @@ public class FsServerDefaults implements Writable {
   private int writePacketSize;
   private short replication;
   private int fileBufferSize;
+  private DataChecksum.Type checksumType;
 
   public FsServerDefaults() {
   }
 
   public FsServerDefaults(long blockSize, int bytesPerChecksum,
-      int writePacketSize, short replication, int fileBufferSize) {
+      int writePacketSize, short replication, int fileBufferSize,
+      DataChecksum.Type checksumType) {
     this.blockSize = blockSize;
     this.bytesPerChecksum = bytesPerChecksum;
     this.writePacketSize = writePacketSize;
     this.replication = replication;
     this.fileBufferSize = fileBufferSize;
+    this.checksumType = checksumType;
   }
 
   public long getBlockSize() {
@@ -81,6 +86,10 @@ public class FsServerDefaults implements Writable {
     return fileBufferSize;
   }
 
+  public DataChecksum.Type getChecksumType() {
+    return checksumType;
+  }
+
   // /////////////////////////////////////////
   // Writable
   // /////////////////////////////////////////
@@ -91,6 +100,7 @@ public class FsServerDefaults implements Writable {
     out.writeInt(writePacketSize);
     out.writeShort(replication);
     out.writeInt(fileBufferSize);
+    WritableUtils.writeEnum(out, checksumType);
   }
 
   @InterfaceAudience.Private
@@ -100,5 +110,6 @@ public class FsServerDefaults implements Writable {
     writePacketSize = in.readInt();
     replication = in.readShort();
     fileBufferSize = in.readInt();
+    checksumType = WritableUtils.readEnum(in, DataChecksum.Type.class);
   }
 }
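
A hedged sketch (not part of the patch) of the Writable round-trip with the new field; note that checksumType is appended last in both write and readFields, so the earlier fields keep their wire positions.

import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.util.DataChecksum;

public class FsServerDefaultsRoundTrip {
  public static void main(String[] args) throws Exception {
    FsServerDefaults d = new FsServerDefaults(
        64 * 1024 * 1024L, 512, 64 * 1024, (short) 3, 4096,
        DataChecksum.Type.CRC32C);
    DataOutputBuffer out = new DataOutputBuffer();
    d.write(out);                                // serialize; enum written last
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    FsServerDefaults d2 = new FsServerDefaults();
    d2.readFields(in);                           // deserialize
    System.out.println(d2.getChecksumType());    // CRC32C
  }
}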

+ 128 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java

@@ -20,7 +20,9 @@ package org.apache.hadoop.fs;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 
 /**
  * This class contains options related to file system operations.
@@ -46,6 +48,10 @@ public final class Options {
     public static BytesPerChecksum bytesPerChecksum(short crc) {
       return new BytesPerChecksum(crc);
     }
+    public static ChecksumParam checksumParam(
+        ChecksumOpt csumOpt) {
+      return new ChecksumParam(csumOpt);
+    }
     public static Perms perms(FsPermission perm) {
       return new Perms(perm);
     }
@@ -91,7 +97,8 @@ public final class Options {
       }
       public int getValue() { return bufferSize; }
     }
-    
+
+    /** This is not needed if ChecksumParam is specified. **/
     public static class BytesPerChecksum extends CreateOpts {
       private final int bytesPerChecksum;
       protected BytesPerChecksum(short bpc) { 
@@ -103,6 +110,14 @@ public final class Options {
       }
       public int getValue() { return bytesPerChecksum; }
     }
+
+    public static class ChecksumParam extends CreateOpts {
+      private final ChecksumOpt checksumOpt;
+      protected ChecksumParam(ChecksumOpt csumOpt) {
+        checksumOpt = csumOpt;
+      }
+      public ChecksumOpt getValue() { return checksumOpt; }
+    }
     
     public static class Perms extends CreateOpts {
       private final FsPermission permissions;
@@ -206,4 +221,116 @@ public final class Options {
       return code;
     }
   }
+
+  /**
+   * This is used in FileSystem and FileContext to specify checksum options.
+   */
+  public static class ChecksumOpt {
+    private final int crcBlockSize;
+    private final DataChecksum.Type crcType;
+
+    /**
+     * Create an uninitialized one
+     */
+    public ChecksumOpt() {
+      crcBlockSize = -1;
+      crcType = DataChecksum.Type.DEFAULT;
+    }
+
+    /**
+     * Normal ctor
+     * @param type checksum type
+     * @param size bytes per checksum
+     */
+    public ChecksumOpt(DataChecksum.Type type, int size) {
+      crcBlockSize = size;
+      crcType = type;
+    }
+
+    public int getBytesPerChecksum() {
+      return crcBlockSize;
+    }
+
+    public DataChecksum.Type getChecksumType() {
+      return crcType;
+    }
+
+    /**
+     * Create a ChecksumOpt that disables checksums
+     */
+    public static ChecksumOpt createDisabled() {
+      return new ChecksumOpt(DataChecksum.Type.NULL, -1);
+    }
+
+    /**
+     * A helper method for processing user input and default value to 
+     * create a combined checksum option. This is a bit complicated because
+     * bytesPerChecksum is kept for backward compatibility.
+     *
+     * @param defaultOpt Default checksum option
+     * @param userOpt User-specified checksum option. Ignored if null.
+     * @param userBytesPerChecksum User-specified bytesPerChecksum
+     *                Ignored if not positive.
+     */
+    public static ChecksumOpt processChecksumOpt(ChecksumOpt defaultOpt, 
+        ChecksumOpt userOpt, int userBytesPerChecksum) {
+      // The following is done to avoid unnecessary creation of new objects.
+      // tri-state variable: 0 default, 1 userBytesPerChecksum, 2 userOpt
+      short whichSize;
+      // true default, false userOpt
+      boolean useDefaultType;
+      
+      //  bytesPerChecksum - order of preference
+      //    user specified value in bytesPerChecksum
+      //    user specified value in checksumOpt
+      //    default.
+      if (userBytesPerChecksum > 0) {
+        whichSize = 1; // userBytesPerChecksum
+      } else if (userOpt != null && userOpt.getBytesPerChecksum() > 0) {
+        whichSize = 2; // userOpt
+      } else {
+        whichSize = 0; // default
+      }
+
+      // checksum type - order of preference
+      //   user specified value in checksumOpt
+      //   default.
+      if (userOpt != null &&
+            userOpt.getChecksumType() != DataChecksum.Type.DEFAULT) {
+        useDefaultType = false;
+      } else {
+        useDefaultType = true;
+      }
+
+      // Short out the common and easy cases
+      if (whichSize == 0 && useDefaultType) {
+        return defaultOpt;
+      } else if (whichSize == 2 && !useDefaultType) {
+        return userOpt;
+      }
+
+      // Take care of the rest of combinations
+      DataChecksum.Type type = useDefaultType ? defaultOpt.getChecksumType() :
+          userOpt.getChecksumType();
+      if (whichSize == 0) {
+        return new ChecksumOpt(type, defaultOpt.getBytesPerChecksum());
+      } else if (whichSize == 1) {
+        return new ChecksumOpt(type, userBytesPerChecksum);
+      } else {
+        return new ChecksumOpt(type, userOpt.getBytesPerChecksum());
+      }
+    }
+
+    /**
+     * A helper method for processing user input and default value to 
+     * create a combined checksum option. 
+     *
+     * @param defaultOpt Default checksum option
+     * @param userOpt User-specified checksum option
+     */
+    public static ChecksumOpt processChecksumOpt(ChecksumOpt defaultOpt,
+        ChecksumOpt userOpt) {
+      return processChecksumOpt(defaultOpt, userOpt, -1);
+    }
+  }
 }
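
To make the precedence rules in processChecksumOpt concrete, here is a small hedged sketch (not from the patch): a legacy userBytesPerChecksum wins over the size in the user opt, the checksum type falls back to the default when the user opt says DEFAULT, and with no user input at all the default option is returned unchanged.

import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.util.DataChecksum;

public class ProcessChecksumOptDemo {
  public static void main(String[] args) {
    ChecksumOpt serverDefault = new ChecksumOpt(DataChecksum.Type.CRC32, 512);
    ChecksumOpt userOpt = new ChecksumOpt(DataChecksum.Type.CRC32C, 1024);

    // Legacy path: bytesPerChecksum=2048 overrides userOpt's 1024,
    // but the type still comes from userOpt.
    ChecksumOpt merged =
        ChecksumOpt.processChecksumOpt(serverDefault, userOpt, 2048);
    System.out.println(merged.getChecksumType());      // CRC32C
    System.out.println(merged.getBytesPerChecksum());  // 2048

    // No user input: the default option object is returned as-is.
    System.out.println(
        ChecksumOpt.processChecksumOpt(serverDefault, null)
            == serverDefault);                         // true
  }
}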

+ 10 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FtpConfigKeys.java

@@ -23,10 +23,16 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.util.DataChecksum;
 
 /** 
  * This class contains constants for configuration keys used
  * in the ftp file system.
+ *
+ * Note that the settings for unimplemented features are ignored. 
 * E.g. checksum-related settings are just placeholders. Even when
+ * wrapped with {@link ChecksumFileSystem}, these settings are not
+ * used. 
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -44,6 +50,8 @@ public class FtpConfigKeys extends CommonConfigurationKeys {
   public static final String  CLIENT_WRITE_PACKET_SIZE_KEY =
                                                 "ftp.client-write-packet-size";
   public static final int     CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
+  public static final DataChecksum.Type CHECKSUM_TYPE_DEFAULT =
+      DataChecksum.Type.CRC32;
   
   protected static FsServerDefaults getServerDefaults() throws IOException {
     return new FsServerDefaults(
@@ -51,7 +59,8 @@ public class FtpConfigKeys extends CommonConfigurationKeys {
         BYTES_PER_CHECKSUM_DEFAULT,
         CLIENT_WRITE_PACKET_SIZE_DEFAULT,
         REPLICATION_DEFAULT,
-        STREAM_BUFFER_SIZE_DEFAULT);
+        STREAM_BUFFER_SIZE_DEFAULT,
+        CHECKSUM_TYPE_DEFAULT);
   }
 }
   

+ 11 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/local/LocalConfigKeys.java

@@ -24,11 +24,18 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.util.DataChecksum;
 
 /** 
  * This class contains constants for configuration keys used
  * in the local file system, raw local fs and checksum fs.
+ *
+ * Note that the settings for unimplemented features are ignored. 
 * E.g. checksum-related settings are just placeholders. Even when
+ * wrapped with {@link ChecksumFileSystem}, these settings are not
+ * used.
  */
+
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class LocalConfigKeys extends CommonConfigurationKeys {
@@ -43,6 +50,8 @@ public class LocalConfigKeys extends CommonConfigurationKeys {
   public static final String CLIENT_WRITE_PACKET_SIZE_KEY =
                                                 "file.client-write-packet-size";
   public static final int CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
+  public static final DataChecksum.Type CHECKSUM_TYPE_DEFAULT =
+      DataChecksum.Type.CRC32;
 
   public static FsServerDefaults getServerDefaults() throws IOException {
     return new FsServerDefaults(
@@ -50,7 +59,8 @@ public class LocalConfigKeys extends CommonConfigurationKeys {
         BYTES_PER_CHECKSUM_DEFAULT,
         CLIENT_WRITE_PACKET_SIZE_DEFAULT,
         REPLICATION_DEFAULT,
-        STREAM_BUFFER_SIZE_DEFAULT);
+        STREAM_BUFFER_SIZE_DEFAULT,
+        CHECKSUM_TYPE_DEFAULT);
   }
 }
   

+ 3 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ChRootedFs.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -157,11 +158,11 @@ class ChRootedFs extends AbstractFileSystem {
   public FSDataOutputStream createInternal(final Path f,
       final EnumSet<CreateFlag> flag, final FsPermission absolutePermission,
       final int bufferSize, final short replication, final long blockSize,
-      final Progressable progress, final int bytesPerChecksum,
+      final Progressable progress, final ChecksumOpt checksumOpt,
       final boolean createParent) throws IOException, UnresolvedLinkException {
     return myFs.createInternal(fullPath(f), flag,
         absolutePermission, bufferSize,
-        replication, blockSize, progress, bytesPerChecksum, createParent);
+        replication, blockSize, progress, checksumOpt, createParent);
   }
 
   @Override

+ 4 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java

@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FsConstants;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
@@ -264,7 +265,7 @@ public class ViewFs extends AbstractFileSystem {
   public FSDataOutputStream createInternal(final Path f,
       final EnumSet<CreateFlag> flag, final FsPermission absolutePermission,
       final int bufferSize, final short replication, final long blockSize,
-      final Progressable progress, final int bytesPerChecksum,
+      final Progressable progress, final ChecksumOpt checksumOpt,
       final boolean createParent) throws AccessControlException,
       FileAlreadyExistsException, FileNotFoundException,
       ParentNotDirectoryException, UnsupportedFileSystemException,
@@ -282,7 +283,7 @@ public class ViewFs extends AbstractFileSystem {
     assert(res.remainingPath != null);
     return res.targetFileSystem.createInternal(res.remainingPath, flag,
         absolutePermission, bufferSize, replication,
-        blockSize, progress, bytesPerChecksum,
+        blockSize, progress, checksumOpt,
         createParent);
   }
 
@@ -631,7 +632,7 @@ public class ViewFs extends AbstractFileSystem {
     public FSDataOutputStream createInternal(final Path f,
         final EnumSet<CreateFlag> flag, final FsPermission absolutePermission,
         final int bufferSize, final short replication, final long blockSize,
-        final Progressable progress, final int bytesPerChecksum,
+        final Progressable progress, final ChecksumOpt checksumOpt,
         final boolean createParent) throws AccessControlException,
         FileAlreadyExistsException, FileNotFoundException,
         ParentNotDirectoryException, UnsupportedFileSystemException,

+ 4 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java

@@ -43,12 +43,14 @@ public class DataChecksum implements Checksum {
   public static final int CHECKSUM_NULL    = 0;
   public static final int CHECKSUM_CRC32   = 1;
   public static final int CHECKSUM_CRC32C  = 2;
- 
+  public static final int CHECKSUM_DEFAULT = 3; 
+
   /** The checksum types */
   public static enum Type {
     NULL  (CHECKSUM_NULL, 0),
     CRC32 (CHECKSUM_CRC32, 4),
-    CRC32C(CHECKSUM_CRC32C, 4);
+    CRC32C(CHECKSUM_CRC32C, 4),
+    DEFAULT(CHECKSUM_DEFAULT, 0); // This cannot be used to create DataChecksum
 
     public final int id;
     public final int size;
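
A hedged sketch of the enum's intended use: DEFAULT is only a sentinel meaning "fall back to the default type", and per the DFSClient hunk below, newDataChecksum is expected to return null for it rather than produce a usable checksum.

import org.apache.hadoop.util.DataChecksum;

public class ChecksumTypeDemo {
  public static void main(String[] args) {
    // A concrete type yields a working java.util.zip.Checksum.
    DataChecksum crc =
        DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
    byte[] data = "hello".getBytes();
    crc.update(data, 0, data.length);
    System.out.println(crc.getValue());

    // The DEFAULT sentinel cannot back a real checksum; callers such as
    // DFSClient.createChecksum treat the resulting null as an error.
    System.out.println(
        DataChecksum.newDataChecksum(DataChecksum.Type.DEFAULT, 512)); // null
  }
}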

+ 11 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java

@@ -32,6 +32,7 @@ import java.util.Iterator;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.Options.CreateOpts;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.security.Credentials;
@@ -124,6 +125,16 @@ public class TestFilterFileSystem {
         Progressable progress) {
       return null;
     }
+    public FSDataOutputStream create(Path f,
+        FsPermission permission,
+        EnumSet<CreateFlag> flags,
+        int bufferSize,
+        short replication,
+        long blockSize,
+        Progressable progress,
+        ChecksumOpt checksumOpt) throws IOException {
+      return null;
+    }
     public String getName() { return null; }
     public boolean delete(Path f) { return false; }
     public short getReplication(Path src) { return 0 ; }

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.hdfs.CorruptFileBlockIterator;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -93,10 +94,10 @@ public class Hdfs extends AbstractFileSystem {
   public FSDataOutputStream createInternal(Path f,
       EnumSet<CreateFlag> createFlag, FsPermission absolutePermission,
       int bufferSize, short replication, long blockSize, Progressable progress,
-      int bytesPerChecksum, boolean createParent) throws IOException {
+      ChecksumOpt checksumOpt, boolean createParent) throws IOException {
     return new FSDataOutputStream(dfs.primitiveCreate(getUriPath(f),
         absolutePermission, createFlag, createParent, replication, blockSize,
-        progress, bufferSize, bytesPerChecksum), getStatistics());
+        progress, bufferSize, checksumOpt), getStatistics());
   }
 
   @Override

+ 41 - 19
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -79,6 +79,7 @@ import org.apache.hadoop.fs.FsStatus;
 import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
@@ -171,8 +172,7 @@ public class DFSClient implements java.io.Closeable {
     final int maxBlockAcquireFailures;
     final int confTime;
     final int ioBufferSize;
-    final DataChecksum.Type checksumType;
-    final int bytesPerChecksum;
+    final ChecksumOpt defaultChecksumOpt;
     final int writePacketSize;
     final int socketTimeout;
     final int socketCacheCapacity;
@@ -197,9 +197,7 @@ public class DFSClient implements java.io.Closeable {
       ioBufferSize = conf.getInt(
           CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
           CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
-      checksumType = getChecksumType(conf);
-      bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY,
-          DFS_BYTES_PER_CHECKSUM_DEFAULT);
+      defaultChecksumOpt = getChecksumOptFromConf(conf);
       socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
           HdfsServerConstants.READ_TIMEOUT);
       /** dfs.write.packet.size is an internal config variable */
@@ -243,9 +241,32 @@ public class DFSClient implements java.io.Closeable {
       }
     }
 
-    private DataChecksum createChecksum() {
-      return DataChecksum.newDataChecksum(
-          checksumType, bytesPerChecksum);
+    // Construct a checksum option from conf
+    private ChecksumOpt getChecksumOptFromConf(Configuration conf) {
+      DataChecksum.Type type = getChecksumType(conf);
+      int bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY,
+          DFS_BYTES_PER_CHECKSUM_DEFAULT);
+      return new ChecksumOpt(type, bytesPerChecksum);
+    }
+
+    // create a DataChecksum with the default option.
+    private DataChecksum createChecksum() throws IOException {
+      return createChecksum(null);
+    }
+
+    private DataChecksum createChecksum(ChecksumOpt userOpt) 
+        throws IOException {
+      // Fill in any missing field with the default.
+      ChecksumOpt myOpt = ChecksumOpt.processChecksumOpt(
+          defaultChecksumOpt, userOpt);
+      DataChecksum dataChecksum = DataChecksum.newDataChecksum(
+          myOpt.getChecksumType(),
+          myOpt.getBytesPerChecksum());
+      if (dataChecksum == null) {
+        throw new IOException("Invalid checksum type specified: "
+            + myOpt.getChecksumType().name());
+      }
+      return dataChecksum;
     }
   }
  
@@ -926,12 +947,13 @@ public class DFSClient implements java.io.Closeable {
     return create(src, FsPermission.getDefault(),
         overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
             : EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress,
-        buffersize);
+        buffersize, null);
   }
 
   /**
    * Call {@link #create(String, FsPermission, EnumSet, boolean, short, 
-   * long, Progressable, int)} with <code>createParent</code> set to true.
+   * long, Progressable, int, ChecksumOpt)} with <code>createParent</code>
+   *  set to true.
    */
   public OutputStream create(String src, 
                              FsPermission permission,
@@ -939,10 +961,11 @@ public class DFSClient implements java.io.Closeable {
                              short replication,
                              long blockSize,
                              Progressable progress,
-                             int buffersize)
+                             int buffersize,
+                             ChecksumOpt checksumOpt)
       throws IOException {
     return create(src, permission, flag, true,
-        replication, blockSize, progress, buffersize);
+        replication, blockSize, progress, buffersize, checksumOpt);
   }
 
   /**
@@ -960,6 +983,7 @@ public class DFSClient implements java.io.Closeable {
    * @param blockSize maximum block size
    * @param progress interface for reporting client progress
    * @param buffersize underlying buffer size 
+   * @param checksumOpt checksum option
    * 
    * @return output stream
    * 
@@ -973,8 +997,8 @@ public class DFSClient implements java.io.Closeable {
                              short replication,
                              long blockSize,
                              Progressable progress,
-                             int buffersize)
-    throws IOException {
+                             int buffersize,
+                             ChecksumOpt checksumOpt) throws IOException {
     checkOpen();
     if (permission == null) {
       permission = FsPermission.getDefault();
@@ -985,7 +1009,7 @@ public class DFSClient implements java.io.Closeable {
     }
     final DFSOutputStream result = new DFSOutputStream(this, src, masked, flag,
         createParent, replication, blockSize, progress, buffersize,
-        dfsClientConf.createChecksum());
+        dfsClientConf.createChecksum(checksumOpt));
     beginFileLease(src, result);
     return result;
   }
@@ -1023,15 +1047,13 @@ public class DFSClient implements java.io.Closeable {
                              long blockSize,
                              Progressable progress,
                              int buffersize,
-                             int bytesPerChecksum)
+                             ChecksumOpt checksumOpt)
       throws IOException, UnresolvedLinkException {
     checkOpen();
     CreateFlag.validate(flag);
     DFSOutputStream result = primitiveAppend(src, flag, buffersize, progress);
     if (result == null) {
-      DataChecksum checksum = DataChecksum.newDataChecksum(
-          dfsClientConf.checksumType,
-          bytesPerChecksum);
+      DataChecksum checksum = dfsClientConf.createChecksum(checksumOpt);
       result = new DFSOutputStream(this, src, absPermission,
           flag, createParent, replication, blockSize, progress, buffersize,
           checksum);

+ 17 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.FsStatus;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
@@ -212,11 +213,19 @@ public class DistributedFileSystem extends FileSystem {
   public FSDataOutputStream create(Path f, FsPermission permission,
     boolean overwrite, int bufferSize, short replication, long blockSize,
     Progressable progress) throws IOException {
-    statistics.incrementWriteOps(1);
-    return new FSDataOutputStream(dfs.create(getPathName(f), permission,
+    return this.create(f, permission,
         overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
-            : EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress,
-        bufferSize), statistics);
+            : EnumSet.of(CreateFlag.CREATE), bufferSize, replication,
+        blockSize, progress, null);
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, FsPermission permission,
+    EnumSet<CreateFlag> cflags, int bufferSize, short replication, long blockSize,
+    Progressable progress, ChecksumOpt checksumOpt) throws IOException {
+    statistics.incrementWriteOps(1);
+    return new FSDataOutputStream(dfs.create(getPathName(f), permission, cflags,
+        replication, blockSize, progress, bufferSize, checksumOpt), statistics);
   }
   
   @SuppressWarnings("deprecation")
@@ -224,11 +233,11 @@ public class DistributedFileSystem extends FileSystem {
   protected FSDataOutputStream primitiveCreate(Path f,
     FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
     short replication, long blockSize, Progressable progress,
-    int bytesPerChecksum) throws IOException {
+    ChecksumOpt checksumOpt) throws IOException {
     statistics.incrementReadOps(1);
     return new FSDataOutputStream(dfs.primitiveCreate(getPathName(f),
         absolutePermission, flag, true, replication, blockSize,
-        progress, bufferSize, bytesPerChecksum),statistics);
+        progress, bufferSize, checksumOpt), statistics);
    } 
 
   /**
@@ -242,7 +251,8 @@ public class DistributedFileSystem extends FileSystem {
       flag.add(CreateFlag.CREATE);
     }
     return new FSDataOutputStream(dfs.create(getPathName(f), permission, flag,
-        false, replication, blockSize, progress, bufferSize), statistics);
+        false, replication, blockSize, progress,
+         bufferSize, null), statistics);
   }
 
   @Override

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java

@@ -194,7 +194,7 @@ public class DatanodeWebHdfsMethods {
             fullpath, permission.getFsPermission(), 
             overwrite.getValue() ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
                 : EnumSet.of(CreateFlag.CREATE),
-            replication.getValue(conf), blockSize.getValue(conf), null, b), null);
+            replication.getValue(conf), blockSize.getValue(conf), null, b, null), null);
         IOUtils.copyBytes(in, out, b);
         out.close();
         out = null;

+ 15 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -23,6 +23,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
@@ -169,6 +171,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
 import org.mortbay.util.ajax.JSON;
@@ -485,13 +488,24 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
                                               DFS_NAMENODE_UPGRADE_PERMISSION_DEFAULT);
     this.defaultPermission = PermissionStatus.createImmutable(
         fsOwner.getShortUserName(), supergroup, new FsPermission(filePermission));
+
+    // Get the checksum type from config
+    String checksumTypeStr = conf.get(DFS_CHECKSUM_TYPE_KEY, DFS_CHECKSUM_TYPE_DEFAULT);
+    DataChecksum.Type checksumType;
+    try {
+       checksumType = DataChecksum.Type.valueOf(checksumTypeStr);
+    } catch (IllegalArgumentException iae) {
+       throw new IOException("Invalid checksum type in "
+          + DFS_CHECKSUM_TYPE_KEY + ": " + checksumTypeStr);
+    }
     
     this.serverDefaults = new FsServerDefaults(
         conf.getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT),
         conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT),
         conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT),
         (short) conf.getInt(DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT),
-        conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT));
+        conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),
+        checksumType);
     
     this.maxFsObjects = conf.getLong(DFS_NAMENODE_MAX_OBJECTS_KEY, 
                                      DFS_NAMENODE_MAX_OBJECTS_DEFAULT);
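
A hedged sketch of configuring the server-side default checksum type; the key string and default value are assumptions matching the DFS_CHECKSUM_TYPE_KEY/DFS_CHECKSUM_TYPE_DEFAULT constants referenced above. An unparseable value makes Type.valueOf throw IllegalArgumentException, which FSNamesystem converts to an IOException at startup.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.DataChecksum;

public class ChecksumTypeConfigDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set("dfs.checksum.type", "CRC32C");   // key string assumed
    // Mirrors the parsing logic in FSNamesystem above.
    DataChecksum.Type t = DataChecksum.Type.valueOf(
        conf.get("dfs.checksum.type", "CRC32")); // fallback assumed
    System.out.println(t);                       // CRC32C
  }
}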

+ 54 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java

@@ -27,18 +27,22 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
+import java.util.EnumSet;
 import java.util.Random;
 
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.Level;
 import org.junit.Test;
@@ -568,4 +572,54 @@ public class TestDistributedFileSystem {
     testDFSClient();
     testFileChecksum();
   }
+
+  @Test
+  public void testCreateWithCustomChecksum() throws Exception {
+    Configuration conf = getTestConfiguration();
+    final long grace = 1000L;
+    MiniDFSCluster cluster = null;
+    Path testBasePath = new Path("/test/csum");
+    // create args 
+    Path path1 = new Path(testBasePath, "file_with_crc1");
+    Path path2 = new Path(testBasePath, "file_with_crc2");
+    ChecksumOpt opt1 = new ChecksumOpt(DataChecksum.Type.CRC32C, 512);
+    ChecksumOpt opt2 = new ChecksumOpt(DataChecksum.Type.CRC32, 512);
+
+    // common args
+    FsPermission perm = FsPermission.getDefault().applyUMask(
+        FsPermission.getUMask(conf));
+    EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.OVERWRITE,
+        CreateFlag.CREATE);
+    short repl = 1;
+
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      FileSystem dfs = cluster.getFileSystem();
+
+      dfs.mkdirs(testBasePath);
+
+      // create two files with different checksum types
+      FSDataOutputStream out1 = dfs.create(path1, perm, flags, 4096, repl,
+          131072L, null, opt1);
+      FSDataOutputStream out2 = dfs.create(path2, perm, flags, 4096, repl,
+          131072L, null, opt2);
+
+      for (int i = 0; i < 1024; i++) {
+        out1.write(i);
+        out2.write(i);
+      }
+      out1.close();
+      out2.close();
+
+      // the two checksums must be different.
+      FileChecksum sum1 = dfs.getFileChecksum(path1);
+      FileChecksum sum2 = dfs.getFileChecksum(path2);
+      assertFalse(sum1.equals(sum2));
+    } finally {
+      if (cluster != null) {
+        cluster.getFileSystem().delete(testBasePath, true);
+        cluster.shutdown();
+      }
+    }
+  }
 }

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java

@@ -52,6 +52,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.AbstractFileSystem;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -430,7 +431,7 @@ public class TestResourceLocalizationService {
         new FSDataOutputStream(new DataOutputBuffer(), null);
       doReturn(out).when(spylfs).createInternal(isA(Path.class),
           isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(),
-          anyLong(), isA(Progressable.class), anyInt(), anyBoolean());
+          anyLong(), isA(Progressable.class), isA(ChecksumOpt.class), anyBoolean());
       final LocalResource resource = getPrivateMockedResource(r);
       final LocalResourceRequest req = new LocalResourceRequest(resource);
       Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =