Sfoglia il codice sorgente

HADOOP-4185. Adds setVerifyChecksum() method to FileSystem. Contributed by Sharad Agarwal.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@718543 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 16 anni fa
parent
commit
d1d523c636

+ 3 - 0
CHANGES.txt

@@ -115,6 +115,9 @@ Trunk (unreleased changes)
     the map task and a counter for counting the number of spilled records in both
     map and reduce tasks.
 
+    HADOOP-4185. Adds setVerifyChecksum() method to FileSystem.
+    (Sharad Agarwal via ddas)
+
   OPTIMIZATIONS
 
     HADOOP-3293. Fixes FileInputFormat to do provide locations for splits

+ 12 - 4
src/core/org/apache/hadoop/fs/ChecksumFileSystem.java

@@ -39,6 +39,7 @@ import org.apache.hadoop.util.StringUtils;
 public abstract class ChecksumFileSystem extends FilterFileSystem {
   private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
   private int bytesPerChecksum = 512;
+  private boolean verifyChecksum = true;
 
   public static double getApproxChkSumLength(long size) {
     return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size;
@@ -54,6 +55,13 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
       bytesPerChecksum = conf.getInt("io.bytes.per.checksum", 512);
     }
   }
+  
+  /**
+   * Set whether to verify checksum. Initially true; the stored flag is what the set(...) calls in this class pass along.
+   */
+  public void setVerifyChecksum(boolean verifyChecksum) {
+    this.verifyChecksum = verifyChecksum;
+  }
 
   /** get the raw file system */
   public FileSystem getRawFileSystem() {
@@ -127,14 +135,14 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
         if (!Arrays.equals(version, CHECKSUM_VERSION))
           throw new IOException("Not a checksum file: "+sumFile);
         this.bytesPerSum = sums.readInt();
-        set(new CRC32(), bytesPerSum, 4);
+        set(fs.verifyChecksum, new CRC32(), bytesPerSum, 4);
       } catch (FileNotFoundException e) {         // quietly ignore
-        set(null, 1, 0);
+        set(fs.verifyChecksum, null, 1, 0);
       } catch (IOException e) {                   // loudly ignore
         LOG.warn("Problem opening checksum file: "+ file + 
                  ".  Ignoring exception: " + 
                  StringUtils.stringifyException(e));
-        set(null, 1, 0);
+        set(fs.verifyChecksum, null, 1, 0);
       }
     }
     
@@ -175,7 +183,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
       if( sums != null ) {
         sums.close();
       }
-      set(null, 1, 0);
+      set(fs.verifyChecksum, null, 1, 0);
     }
     
 

+ 4 - 3
src/core/org/apache/hadoop/fs/FSInputChecker.java

@@ -69,8 +69,7 @@ abstract public class FSInputChecker extends FSInputStream {
   protected FSInputChecker( Path file, int numOfRetries, 
       boolean verifyChecksum, Checksum sum, int chunkSize, int checksumSize ) {
     this(file, numOfRetries);
-    this.verifyChecksum = verifyChecksum;
-    set(sum, chunkSize, checksumSize);
+    set(verifyChecksum, sum, chunkSize, checksumSize);
   }
   
   /** Reads in next checksum chunk data into <code>buf</code> at <code>offset</code>
@@ -393,12 +392,14 @@ abstract public class FSInputChecker extends FSInputStream {
   
   /**
    * Set the checksum related parameters
+   * @param verifyChecksum whether to verify checksum
    * @param sum which type of checksum to use
    * @param maxChunkSize maximun chunk size
    * @param checksumSize checksum size
    */
-  final protected synchronized void set(
+  final protected synchronized void set(boolean verifyChecksum,
       Checksum sum, int maxChunkSize, int checksumSize ) {
+    this.verifyChecksum = verifyChecksum;
     this.sum = sum;
     this.buf = new byte[maxChunkSize];
     this.checksum = new byte[checksumSize];

+ 9 - 0
src/core/org/apache/hadoop/fs/FileSystem.java

@@ -1302,6 +1302,15 @@ public abstract class FileSystem extends Configured implements Closeable {
   public FileChecksum getFileChecksum(Path f) throws IOException {
     return null;
   }
+  
+  /**
+   * Set the verify checksum flag. This is only applicable if the
+   * corresponding FileSystem supports checksums. The default implementation is a no-op.
+   * @param verifyChecksum whether to verify checksum; ignored by this default implementation
+   */
+  public void setVerifyChecksum(boolean verifyChecksum) {
+    // intentionally empty: this base implementation ignores the flag
+  }
 
   /**
    * Return a list of file status objects that corresponds to the list of paths

+ 5 - 0
src/core/org/apache/hadoop/fs/FilterFileSystem.java

@@ -249,6 +249,11 @@ public class FilterFileSystem extends FileSystem {
   public FileChecksum getFileChecksum(Path f) throws IOException {
     return fs.getFileChecksum(f);
   }
+  
+  /** {@inheritDoc} This implementation forwards the flag to {@code fs}. */
+  public void setVerifyChecksum(boolean verifyChecksum) {
+    fs.setVerifyChecksum(verifyChecksum);
+  }
 
   @Override
   public Configuration getConf() {

+ 1 - 4
src/core/org/apache/hadoop/fs/FsShell.java

@@ -44,7 +44,6 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 
 /** Provide command line access to a FileSystem. */
 public class FsShell extends Configured implements Tool {
@@ -204,9 +203,7 @@ public class FsShell extends Configured implements Tool {
   private FileSystem getSrcFileSystem(Path src, boolean verifyChecksum
       ) throws IOException { 
     FileSystem srcFs = src.getFileSystem(getConf());
-    if (srcFs instanceof DistributedFileSystem) {
-      ((DistributedFileSystem)srcFs).setVerifyChecksum(verifyChecksum);
-    }
+    srcFs.setVerifyChecksum(verifyChecksum);
     return srcFs;
   }
 

+ 39 - 0
src/test/org/apache/hadoop/fs/TestChecksumFileSystem.java

@@ -66,4 +66,43 @@ public class TestChecksumFileSystem extends TestCase {
     inMemFs.delete(testPath, true);
     assertTrue("nothing in the namespace", inMemFs.listStatus(new Path("/")).length == 0);
   }
+  
+  public void testVerifyChecksum() throws Exception {
+    String TEST_ROOT_DIR
+    = System.getProperty("test.build.data","build/test/data/work-dir/localfs");
+    
+    Configuration conf = new Configuration();
+    LocalFileSystem localFs = FileSystem.getLocal(conf);
+    Path testPath = new Path(TEST_ROOT_DIR, "testPath");
+    Path testPath11 = new Path(TEST_ROOT_DIR, "testPath11");
+    FSDataOutputStream fout = localFs.create(testPath);
+    fout.write("testing".getBytes());
+    fout.close();
+    
+    fout = localFs.create(testPath11);
+    fout.write("testing you".getBytes());
+    fout.close();
+    
+    localFs.delete(localFs.getChecksumFile(testPath), true);
+    assertTrue("checksum deleted", !localFs.exists(localFs.getChecksumFile(testPath)));
+    
+    // put testPath11's checksum file in place of testPath's, so it belongs to different data
+    FileUtil.copy(localFs, localFs.getChecksumFile(testPath11), localFs, 
+        localFs.getChecksumFile(testPath),false,true,conf);
+    assertTrue("checksum exists", localFs.exists(localFs.getChecksumFile(testPath)));
+    
+    boolean errorRead = false;
+    try {
+      TestLocalFileSystem.readFile(localFs, testPath);
+    }catch(ChecksumException ie) {
+      errorRead = true;
+    }
+    assertTrue("error reading", errorRead);
+    
+    // with verification switched off, the same read must succeed and return the written text
+    localFs.setVerifyChecksum(false);
+    String str = TestLocalFileSystem.readFile(localFs, testPath);
+    assertTrue("read", "testing".equals(str));
+    // NOTE(review): verification stays disabled on localFs and the test files are not deleted here; confirm nothing else shares them
+  }
 }