
svn merge -c 1408450 from branch-1 for HDFS-1539. A config option for the datanode to fsync a block file when a block is completely written.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.1@1408452 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze, 12 years ago
Commit: edb1eacf51

+ 3 - 0
CHANGES.txt

@@ -17,6 +17,9 @@ Release 1.1.1 - Unreleased
     and the related JIRAs: HDFS-278, HDFS-1840, HDFS-1870, HDFS-1890, HDFS-2810,
     HDFS-3646 and HDFS-2240. (szetszwo)
 
+    HDFS-1539. A config option for the datanode to fsync a block file
+    when a block is completely written. (dhruba via szetszwo)
+
   BUG FIXES
 
     HADOOP-8878. Uppercase namenode hostname causes hadoop dfs calls with

+ 2 - 0
src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -113,6 +113,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_LIST_LIMIT_DEFAULT = 1000;
   public static final String  DFS_DATANODE_USE_DN_HOSTNAME = "dfs.datanode.use.datanode.hostname";
   public static final boolean DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT = false;
+  public static final String  DFS_DATANODE_SYNCONCLOSE_KEY = "dfs.datanode.synconclose";
+  public static final boolean DFS_DATANODE_SYNCONCLOSE_DEFAULT = false;
 
   //Delegation token related keys
   public static final String  DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY = "dfs.namenode.delegation.key.update-interval";
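The two constants above define the new switch, which defaults to off. As a minimal sketch (not part of this commit), a client or test can enable it programmatically; this is equivalent to setting dfs.datanode.synconclose to true in hdfs-site.xml:

    Configuration conf = new Configuration();
    // Defaults to DFS_DATANODE_SYNCONCLOSE_DEFAULT (false), so existing
    // deployments are unaffected unless the key is set explicitly.
    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY, true);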

+ 9 - 1
src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -61,6 +61,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
   private DataInputStream in = null; // from where data are read
   private DataChecksum checksum; // from where chunks of a block can be read
   private OutputStream out = null; // to block file at local disk
+  private OutputStream cout = null; // output stream for checksum file
   private FileDescriptor outFd;
   private DataOutputStream checksumOut = null; // to crc file at local disk
   private int bytesPerChecksum;
@@ -114,6 +115,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
       this.finalized = false;
       if (streams != null) {
         this.out = streams.dataOut;
+        this.cout = streams.checksumOut;
         if (out instanceof FileOutputStream) {
           this.outFd = ((FileOutputStream) out).getFD();
         } else {
@@ -121,7 +123,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
               + out.getClass());
         }
         this.checksumOut = new DataOutputStream(new BufferedOutputStream(
-                                                  streams.checksumOut, 
+                                                  streams.checksumOut,
                                                   SMALL_BUFFER_SIZE));
         // If this block is for appends, then remove it from periodic
         // validation.
@@ -159,6 +161,9 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
     try {
       if (checksumOut != null) {
         checksumOut.flush();
+        if (datanode.syncOnClose && (cout instanceof FileOutputStream)) {
+          ((FileOutputStream)cout).getChannel().force(true);
+        }
         checksumOut.close();
         checksumOut = null;
       }
@@ -169,6 +174,9 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
     try {
       if (out != null) {
         out.flush();
+        if (datanode.syncOnClose && (out instanceof FileOutputStream)) {
+          ((FileOutputStream)out).getChannel().force(true);
+        }
         out.close();
         out = null;
       }
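For context: flush() only drains user-space buffers (it is a no-op on a bare FileOutputStream) and gives no guarantee that bytes reach the disk. The force(true) calls added above perform an fsync, pushing file data and metadata to stable storage before the stream is closed. A standalone sketch of the pattern, assuming nothing beyond java.io:

    import java.io.FileOutputStream;
    import java.io.IOException;

    public class SyncOnCloseSketch {
      public static void main(String[] args) throws IOException {
        FileOutputStream out = new FileOutputStream("block.dat");
        try {
          out.write(new byte[] {1, 2, 3});
          out.flush();                  // no-op here; matters when the stream is wrapped in a buffer
          out.getChannel().force(true); // fsync: push data and metadata to stable storage
        } finally {
          out.close();
        }
      }
    }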

+ 7 - 0
src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -237,6 +237,8 @@ public class DataNode extends Configured
   boolean isBlockTokenEnabled;
   BlockTokenSecretManager blockTokenSecretManager;
   boolean isBlockTokenInitialized = false;
+  boolean syncOnClose;
+
   final String userWithLocalPathAccess;
   private boolean connectToDnViaHostname;
   private boolean relaxedVersionCheck;
@@ -453,6 +455,11 @@ public class DataNode extends Configured
         "dfs.blockreport.intervalMsec." + " Setting initial delay to 0 msec:");
     }
     this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;
+    
+    // do we need to sync block file contents to disk when the block file is closed?
+    this.syncOnClose = conf.getBoolean(DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY, 
+                                       DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_DEFAULT);
+
     DataNode.nameNodeAddr = nameNodeAddr;
 
     //initialize periodic block scanner
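The flag is read once when the datanode starts, so changing it requires a datanode restart. The design trade-off is durability against latency: with the option on, every block close pays an extra fsync. A rough, hypothetical micro-benchmark of that per-close cost (file names and size are arbitrary):

    import java.io.FileOutputStream;
    import java.io.IOException;

    public class SyncCostSketch {
      static long writeAndCloseNanos(String path, boolean syncOnClose) throws IOException {
        long start = System.nanoTime();
        FileOutputStream out = new FileOutputStream(path);
        try {
          out.write(new byte[64 * 1024]); // stand-in for a block's worth of data
          if (syncOnClose) {
            out.getChannel().force(true); // the extra work dfs.datanode.synconclose=true adds
          }
        } finally {
          out.close();
        }
        return System.nanoTime() - start;
      }

      public static void main(String[] args) throws IOException {
        System.out.println("sync off: " + writeAndCloseNanos("nosync.dat", false) + " ns");
        System.out.println("sync on:  " + writeAndCloseNanos("sync.dat", true) + " ns");
      }
    }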

+ 33 - 0
src/test/org/apache/hadoop/hdfs/TestFileCreation.java

@@ -818,6 +818,39 @@ public class TestFileCreation {
     }
   }
 
+  /**
+   * Test creating a file whose data gets synced to disk when closed.
+   */
+  public void testFileCreationSyncOnClose() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY, true);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+
+    try {
+      FileSystem fs = cluster.getFileSystem();
+      
+      Path[] p = {new Path("/foo"), new Path("/bar")};
+      
+      //write 2 files at the same time
+      FSDataOutputStream[] out = {fs.create(p[0]), fs.create(p[1])};
+      int i = 0;
+      for(; i < 100; i++) {
+        out[0].write(i);
+        out[1].write(i);
+      }
+      out[0].close();
+      for(; i < 200; i++) {out[1].write(i);}
+      out[1].close();
+
+      //verify
+      FSDataInputStream[] in = {fs.open(p[0]), fs.open(p[1])};  
+      for(i = 0; i < 100; i++) {assertEquals(i, in[0].read());}
+      for(i = 0; i < 200; i++) {assertEquals(i, in[1].read());}
+    } finally {
+      if (cluster != null) {cluster.shutdown();}
+    }
+  }
+  
   /**
    * Create a file, write something, fsync but not close.
    * Then change lease period and wait for lease recovery.