
HDFS-744. Support hsync in HDFS. Contributed by Lars Hofhansl

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1344419 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze, 13 years ago
Commit 83cf475050
15 files changed, 421 insertions(+), 44 deletions(-)
  1. + 9 - 1    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java
  2. + 51 - 2   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
  3. + 20 - 2   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
  4. + 14 - 0   hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
  5. + 2 - 0    hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  6. + 47 - 17  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
  7. + 12 - 4   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
  8. + 7 - 1    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
  9. + 43 - 10  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
  10. + 2 - 1   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
  11. + 12 - 0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
  12. + 1 - 0   hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
  13. + 1 - 1   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java
  14. + 8 - 5   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
  15. + 192 - 0 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestHSync.java

+ 9 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java

@@ -44,6 +44,9 @@ import org.apache.hadoop.classification.InterfaceStability;
  * else append to an existing file.</li>
  * <li> CREATE|OVERWRITE - to create a file if it does not exist, 
  * else overwrite an existing file.</li>
+ * <li> SYNC_BLOCK - to force closed blocks to the disk device.
+ * In addition {@link Syncable#hsync()} should be called after each write,
+ * if true synchronous behavior is required.</li>
  * </ol>
  * 
  * Following combination is not valid and will result in 
@@ -71,7 +74,12 @@ public enum CreateFlag {
   /**
    * Append to a file. See javadoc for more description.
    */
-  APPEND((short) 0x04);
+  APPEND((short) 0x04),
+
+  /**
+   * Force closed blocks to disk. Similar to POSIX O_SYNC. See javadoc for description.
+   */
+  SYNC_BLOCK((short) 0x08);
 
 
   private final short mode;


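To see the new flag from the client side, here is a minimal sketch; the path, buffer size, replication, and block size are illustrative, not part of the patch:

import java.util.EnumSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class SyncBlockSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    // SYNC_BLOCK forces each closed block to the disk device; for truly
    // synchronous behavior, also call hsync() after each write.
    FSDataOutputStream out = fs.create(new Path("/tmp/synced"), // illustrative path
        FsPermission.getDefault(),
        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK),
        4096,               // buffer size
        (short) 3,          // replication
        64L * 1024 * 1024,  // block size
        null);              // no progress callback
    out.write(new byte[] {1, 2, 3});
    out.hsync();  // force the current block to the disk device on each replica
    out.close();  // with SYNC_BLOCK, close() also forces the final block
  }
}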
+ 51 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

@@ -830,6 +830,30 @@ public abstract class FileSystem extends Configured implements Closeable {
       long blockSize,
       Progressable progress) throws IOException;
   
+  /**
+   * Create an FSDataOutputStream at the indicated Path with write-progress
+   * reporting.
+   * @param f the file name to open
+   * @param permission
+   * @param flags {@link CreateFlag}s to use for this stream.
+   * @param bufferSize the size of the buffer to be used.
+   * @param replication required block replication for the file.
+   * @param blockSize
+   * @param progress
+   * @throws IOException
+   * @see #setPermission(Path, FsPermission)
+   */
+  public FSDataOutputStream create(Path f,
+      FsPermission permission,
+      EnumSet<CreateFlag> flags,
+      int bufferSize,
+      short replication,
+      long blockSize,
+      Progressable progress) throws IOException {
+    // only DFS support this
+    return create(f, permission, flags.contains(CreateFlag.OVERWRITE), bufferSize, replication, blockSize, progress);
+  }
+  
   
   
   /*.
    * This create has been added to support the FileContext that processes
@@ -954,10 +978,35 @@ public abstract class FileSystem extends Configured implements Closeable {
    public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
        boolean overwrite, int bufferSize, short replication, long blockSize,
        Progressable progress) throws IOException {
-     throw new IOException("createNonRecursive unsupported for this filesystem "
-         + this.getClass());
+     return createNonRecursive(f, permission,
+         overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
+             : EnumSet.of(CreateFlag.CREATE), bufferSize,
+             replication, blockSize, progress);
    }
 
 
+   /**
+    * Opens an FSDataOutputStream at the indicated Path with write-progress
+    * reporting. Same as create(), except fails if parent directory doesn't
+    * already exist.
+    * @param f the file name to open
+    * @param permission
+    * @param flags {@link CreateFlag}s to use for this stream.
+    * @param bufferSize the size of the buffer to be used.
+    * @param replication required block replication for the file.
+    * @param blockSize
+    * @param progress
+    * @throws IOException
+    * @see #setPermission(Path, FsPermission)
+    * @deprecated API only for 0.20-append
+    */
+    @Deprecated
+    public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
+        EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
+        Progressable progress) throws IOException {
+      throw new IOException("createNonRecursive unsupported for this filesystem "
+          + this.getClass());
+    }
+
   /**
    * Creates the given Path as a brand-new zero-length file.  If
    * create fails, or if it already existed, return false.
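Since the default implementation above maps the flag set back onto the boolean overload, the two spellings are interchangeable for any FileSystem that does not override the new method; a sketch, assuming f, perm, and blockSize are in scope:

// Equivalent under the default mapping in FileSystem.create(...):
fs.create(f, perm, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
    4096, (short) 3, blockSize, null);
fs.create(f, perm, true /* overwrite */, 4096, (short) 3, blockSize, null);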

+ 20 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java

@@ -807,7 +807,7 @@ public class SequenceFile {
   }
   
   /** Write key/value pairs to a sequence-format file. */
-  public static class Writer implements java.io.Closeable {
+  public static class Writer implements java.io.Closeable, Syncable {
     private Configuration conf;
     FSDataOutputStream out;
     boolean ownOutputStream = true;
@@ -1193,13 +1193,31 @@ public class SequenceFile {
       }
     }
 
-    /** flush all currently written data to the file system */
+    /**
+     * flush all currently written data to the file system
+     * @deprecated Use {@link #hsync()} or {@link #hflush()} instead
+     */
+    @Deprecated
     public void syncFs() throws IOException {
       if (out != null) {
         out.hflush();  // flush contents to file system
       }
     }
 
+    @Override
+    public void hsync() throws IOException {
+      if (out != null) {
+        out.hsync();
+      }
+    }
+
+    @Override
+    public void hflush() throws IOException {
+      if (out != null) {
+        out.hflush();
+      }
+    }
+    
     /** Returns the configuration of this file. */
     Configuration getConf() { return conf; }
     
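With Writer now implementing Syncable, callers can move from the deprecated syncFs() to hflush()/hsync(); a hedged sketch, where conf, the output stream, and the org.apache.hadoop.io.Text key/value classes are assumptions for illustration:

SequenceFile.Writer w = SequenceFile.createWriter(conf,
    SequenceFile.Writer.stream(out),           // an FSDataOutputStream, e.g. created with SYNC_BLOCK
    SequenceFile.Writer.keyClass(Text.class),  // illustrative key/value classes
    SequenceFile.Writer.valueClass(Text.class));
w.append(new Text("k"), new Text("v"));
w.hflush();  // flush to the file system, replacing the deprecated syncFs()
w.hsync();   // additionally force the underlying stream's current block to disk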

+ 14 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java

@@ -74,6 +74,11 @@ public class TestFilterFileSystem {
         Progressable progress) throws IOException {
       return null;
     }
+    public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
+            EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
+            Progressable progress) throws IOException {
+      return null;
+    }
     public boolean mkdirs(Path f) { return false; }
     public FSDataInputStream open(Path f) { return null; }
     public FSDataOutputStream create(Path f) { return null; }
@@ -123,6 +128,15 @@ public class TestFilterFileSystem {
         Progressable progress) {
       return null;
     }
+    public FSDataOutputStream create(Path f,
+        FsPermission permission,
+        EnumSet<CreateFlag> flags,
+        int bufferSize,
+        short replication,
+        long blockSize,
+        Progressable progress) throws IOException {
+      return null;
+    }
     public String getName() { return null; }
     public boolean delete(Path f) { return false; }
     public short getReplication(Path src) { return 0 ; }

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -13,6 +13,8 @@ Trunk (unreleased changes)
 
 
     HDFS-3125. Add JournalService to enable Journal Daemon. (suresh)
 
+    HDFS-744. Support hsync in HDFS. (Lars Hofhansl via szetszwo)
+
   IMPROVEMENTS
 
     HDFS-1620. Rename HdfsConstants -> HdfsServerConstants, FSConstants ->

+ 47 - 17
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

@@ -129,11 +129,13 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
   private long initialFileSize = 0; // at time of file open
   private Progressable progress;
   private final short blockReplication; // replication factor of file
+  private boolean shouldSyncBlock = false; // force blocks to disk upon close
   
   
   private class Packet {
     long    seqno;               // sequencenumber of buffer in block
     long    offsetInBlock;       // offset in block
-    boolean lastPacketInBlock;   // is this the last packet in block?
+    private boolean lastPacketInBlock;   // is this the last packet in block?
+    boolean syncBlock;          // this packet forces the current block to disk
     int     numChunks;           // number of chunks currently in packet
     int     maxChunks;           // max chunks in packet
 
@@ -245,7 +247,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
       buffer.mark();
 
 
       PacketHeader header = new PacketHeader(
-        pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen);
+        pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
       header.putInBuffer(buffer);
       
       buffer.reset();
@@ -1249,6 +1251,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
       long blockSize, Progressable progress, int buffersize,
       DataChecksum checksum) throws IOException {
     this(dfsClient, src, blockSize, progress, checksum, replication);
+    this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
 
 
     computePacketChunkSize(dfsClient.getConf().writePacketSize,
         checksum.getBytesPerChecksum());
@@ -1431,6 +1434,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
         currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0, 
             bytesCurBlock);
         currentPacket.lastPacketInBlock = true;
+        currentPacket.syncBlock = shouldSyncBlock;
         waitAndQueueCurrentPacket();
         bytesCurBlock = 0;
         lastFlushOffset = 0;
@@ -1450,6 +1454,24 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
    */
   @Override
   public void hflush() throws IOException {
+    flushOrSync(false);
+  }
+
+  /**
+   * The expected semantics is all data have flushed out to all replicas 
+   * and all replicas have done posix fsync equivalent - ie the OS has 
+   * flushed it to the disk device (but the disk may have it in its cache).
+   * 
+   * Note that only the current block is flushed to the disk device.
+   * To guarantee durable sync across block boundaries the stream should
+   * be created with {@link CreateFlag#SYNC_BLOCK}.
+   */
+  @Override
+  public void hsync() throws IOException {
+    flushOrSync(true);
+  }
+
+  private void flushOrSync(boolean isSync) throws IOException {
     dfsClient.checkOpen();
     isClosed();
     try {
@@ -1477,7 +1499,13 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
           assert bytesCurBlock > lastFlushOffset;
           // record the valid offset of this flush
           lastFlushOffset = bytesCurBlock;
-          waitAndQueueCurrentPacket();
+          if (isSync && currentPacket == null) {
+            // Nothing to send right now,
+            // but sync was requested.
+            // Send an empty packet
+            currentPacket = new Packet(packetSize, chunksPerPacket,
+                bytesCurBlock);
+          }
         } else {
           // We already flushed up to this offset.
           // This means that we haven't written anything since the last flush
@@ -1487,8 +1515,21 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
           assert oldCurrentPacket == null :
             "Empty flush should not occur with a currentPacket";
 
-          // just discard the current packet since it is already been sent.
-          currentPacket = null;
+          if (isSync && bytesCurBlock > 0) {
+            // Nothing to send right now,
+            // and the block was partially written,
+            // and sync was requested.
+            // So send an empty sync packet.
+            currentPacket = new Packet(packetSize, chunksPerPacket,
+                bytesCurBlock);
+          } else {
+            // just discard the current packet since it is already been sent.
+            currentPacket = null;
+          }
+        }
+        if (currentPacket != null) {
+          currentPacket.syncBlock = isSync;
+          waitAndQueueCurrentPacket();          
         }
         // Restore state of stream. Record the last flush offset 
         // of the last full chunk that was flushed.
@@ -1539,18 +1580,6 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
     }
   }
 
-  /**
-   * The expected semantics is all data have flushed out to all replicas 
-   * and all replicas have done posix fsync equivalent - ie the OS has 
-   * flushed it to the disk device (but the disk may have it in its cache).
-   * 
-   * Right now by default it is implemented as hflush
-   */
-  @Override
-  public synchronized void hsync() throws IOException {
-    hflush();
-  }
-
   /**
    * @deprecated use {@link HdfsDataOutputStream#getCurrentBlockReplication()}.
    */
@@ -1675,6 +1704,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
         currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0, 
             bytesCurBlock);
         currentPacket.lastPacketInBlock = true;
+        currentPacket.syncBlock = shouldSyncBlock;
       }
 
       flushInternal();             // flush all data to Datanodes
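Taken together, the two calls now offer distinct guarantees; a short sketch of the contract, where out is an open stream on a DistributedFileSystem and data is illustrative:

out.write(data);
out.hflush();  // data has reached all replicas and is visible to new readers,
               // but may still sit in the datanodes' OS buffers
out.hsync();   // additionally forces the current block to the disk device on
               // each replica; if nothing was written since the last flush,
               // an empty packet carrying the sync flag is sent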

+ 12 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -223,12 +223,19 @@ public class DistributedFileSystem extends FileSystem {
 
 
   @Override
   public HdfsDataOutputStream create(Path f, FsPermission permission,
-    boolean overwrite, int bufferSize, short replication, long blockSize,
+      boolean overwrite, int bufferSize, short replication, long blockSize,
+      Progressable progress) throws IOException {
+    return create(f, permission,
+        overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
+            : EnumSet.of(CreateFlag.CREATE), bufferSize, replication,
+        blockSize, progress);
+  }
+  
+  @Override
+  public HdfsDataOutputStream create(Path f, FsPermission permission,
+    EnumSet<CreateFlag> cflags, int bufferSize, short replication, long blockSize,
     Progressable progress) throws IOException {
     statistics.incrementWriteOps(1);
-    final EnumSet<CreateFlag> cflags = overwrite?
-        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
-        : EnumSet.of(CreateFlag.CREATE);
     final DFSOutputStream out = dfs.create(getPathName(f), permission, cflags,
         replication, blockSize, progress, bufferSize);
     return new HdfsDataOutputStream(out, statistics);
@@ -249,6 +256,7 @@ public class DistributedFileSystem extends FileSystem {
   /**
    * Same as create(), except fails if parent directory doesn't already exist.
    */
+  @Override
   public HdfsDataOutputStream createNonRecursive(Path f, FsPermission permission,
       EnumSet<CreateFlag> flag, int bufferSize, short replication,
       long blockSize, Progressable progress) throws IOException {
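A hedged sketch of the flag-based createNonRecursive (originally a 0.20-append style API); the path and sizes are illustrative:

// Unlike create(), this fails if /existing-dir is not already present.
FSDataOutputStream out = fs.createNonRecursive(new Path("/existing-dir/file"),
    FsPermission.getDefault(),
    EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK),
    4096, (short) 3, 64L * 1024 * 1024, null);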

+ 7 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java

@@ -40,6 +40,7 @@ public class PacketHeader {
       .setSeqno(0)
       .setLastPacketInBlock(false)
       .setDataLen(0)
+      .setSyncBlock(false)
       .build().getSerializedSize();
   public static final int PKT_HEADER_LEN =
     6 + PROTO_SIZE;
@@ -51,13 +52,14 @@ public class PacketHeader {
   }
 
   public PacketHeader(int packetLen, long offsetInBlock, long seqno,
-                      boolean lastPacketInBlock, int dataLen) {
+                      boolean lastPacketInBlock, int dataLen, boolean syncBlock) {
     this.packetLen = packetLen;
     proto = PacketHeaderProto.newBuilder()
       .setOffsetInBlock(offsetInBlock)
       .setSeqno(seqno)
       .setLastPacketInBlock(lastPacketInBlock)
       .setDataLen(dataLen)
+      .setSyncBlock(syncBlock)
       .build();
   }
 
@@ -81,6 +83,10 @@ public class PacketHeader {
     return packetLen;
   }
 
+  public boolean getSyncBlock() {
+    return proto.getSyncBlock();
+  }
+
   @Override
   public String toString() {
     return "PacketHeader with packetLen=" + packetLen +

+ 43 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -110,6 +111,8 @@ class BlockReceiver implements Closeable {
   private final BlockConstructionStage stage;
   private final boolean isTransfer;
 
+  private boolean syncOnClose;
+
   BlockReceiver(final ExtendedBlock block, final DataInputStream in,
       final String inAddr, final String myAddr,
       final BlockConstructionStage stage, 
@@ -245,14 +248,18 @@ class BlockReceiver implements Closeable {
    * close files.
    */
   public void close() throws IOException {
-
     IOException ioe = null;
+    if (syncOnClose && (out != null || checksumOut != null)) {
+      datanode.metrics.incrFsyncCount();      
+    }
     // close checksum file
     try {
       if (checksumOut != null) {
         checksumOut.flush();
-        if (datanode.getDnConf().syncOnClose && (cout instanceof FileOutputStream)) {
+        if (syncOnClose && (cout instanceof FileOutputStream)) {
+          long start = Util.now();
           ((FileOutputStream)cout).getChannel().force(true);
+          datanode.metrics.addFsync(Util.now() - start);
         }
         checksumOut.close();
         checksumOut = null;
@@ -267,8 +274,10 @@ class BlockReceiver implements Closeable {
     try {
       if (out != null) {
         out.flush();
-        if (datanode.getDnConf().syncOnClose && (out instanceof FileOutputStream)) {
+        if (syncOnClose && (out instanceof FileOutputStream)) {
+          long start = Util.now();
           ((FileOutputStream)out).getChannel().force(true);
+          datanode.metrics.addFsync(Util.now() - start);
         }
         out.close();
         out = null;
@@ -290,12 +299,25 @@ class BlockReceiver implements Closeable {
    * Flush block data and metadata files to disk.
    * @throws IOException
    */
-  void flush() throws IOException {
+  void flushOrSync(boolean isSync) throws IOException {
+    if (isSync && (out != null || checksumOut != null)) {
+      datanode.metrics.incrFsyncCount();      
+    }
     if (checksumOut != null) {
       checksumOut.flush();
+      if (isSync && (cout instanceof FileOutputStream)) {
+        long start = Util.now();
+        ((FileOutputStream)cout).getChannel().force(true);
+        datanode.metrics.addFsync(Util.now() - start);
+      }
     }
     if (out != null) {
       out.flush();
+      if (isSync && (out instanceof FileOutputStream)) {
+        long start = Util.now();
+        ((FileOutputStream)out).getChannel().force(true);
+        datanode.metrics.addFsync(Util.now() - start);
+      }
     }
   }
 
@@ -533,7 +555,9 @@ class BlockReceiver implements Closeable {
       header.getOffsetInBlock(),
       header.getSeqno(),
       header.isLastPacketInBlock(),
-      header.getDataLen(), endOfHeader);
+      header.getDataLen(),
+      header.getSyncBlock(),
+      endOfHeader);
   }
 
   /**
@@ -549,15 +573,19 @@ class BlockReceiver implements Closeable {
    * returns the number of data bytes that the packet has.
    */
   private int receivePacket(long offsetInBlock, long seqno,
-      boolean lastPacketInBlock, int len, int endOfHeader) throws IOException {
+      boolean lastPacketInBlock, int len, boolean syncBlock,
+      int endOfHeader) throws IOException {
     if (LOG.isDebugEnabled()){
       LOG.debug("Receiving one packet for block " + block +
                 " of length " + len +
                 " seqno " + seqno +
                 " offsetInBlock " + offsetInBlock +
+                " syncBlock " + syncBlock +
                 " lastPacketInBlock " + lastPacketInBlock);
     }
-    
+    // make sure the block gets sync'ed upon close
+    this.syncOnClose |= syncBlock && lastPacketInBlock;
+
     // update received bytes
     long firstByteInBlock = offsetInBlock;
     offsetInBlock += len;
@@ -587,6 +615,10 @@ class BlockReceiver implements Closeable {
       if(LOG.isDebugEnabled()) {
         LOG.debug("Receiving an empty packet or the end of the block " + block);
       }
+      // flush unless close() would flush anyway
+      if (syncBlock && !lastPacketInBlock) {
+        flushOrSync(true);
+      }
     } else {
       int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
                                                           checksumSize;
@@ -677,8 +709,8 @@ class BlockReceiver implements Closeable {
             );
             checksumOut.write(pktBuf, checksumOff, checksumLen);
           }
-          /// flush entire packet
-          flush();
+          /// flush entire packet, sync unless close() will sync
+          flushOrSync(syncBlock && !lastPacketInBlock);
           
           
           replicaInfo.setLastChecksumAndDataLen(
             offsetInBlock, lastChunkChecksum
@@ -730,6 +762,7 @@ class BlockReceiver implements Closeable {
       String mirrAddr, DataTransferThrottler throttlerArg,
       DatanodeInfo[] downstreams) throws IOException {
 
+      syncOnClose = datanode.getDnConf().syncOnClose;
       boolean responderClosed = false;
       mirrorOut = mirrOut;
       mirrorAddr = mirrAddr;
@@ -768,7 +801,7 @@ class BlockReceiver implements Closeable {
           datanode.data.convertTemporaryToRbw(block);
         } else {
           // for isDatnode or TRANSFER_FINALIZED
-          // Finalize the block. Does this fsync()?
+          // Finalize the block.
           datanode.data.finalizeBlock(block);
         }
         datanode.metrics.incrBlocksWritten();
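Both close() and flushOrSync() reduce the datanode-side sync to the same idiom; a condensed sketch of the pattern the patch applies to the data stream (and symmetrically to the checksum stream cout):

out.flush();  // drain the buffered stream first
if (out instanceof FileOutputStream) {
  long start = Util.now();
  ((FileOutputStream) out).getChannel().force(true);  // POSIX fsync equivalent
  datanode.metrics.addFsync(Util.now() - start);      // record sync latency
}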

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java

@@ -701,8 +701,9 @@ class BlockSender implements java.io.Closeable {
    */
   private void writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) {
     pkt.clear();
+    // both syncBlock and syncPacket are false
     PacketHeader header = new PacketHeader(packetLen, offset, seqno,
-        (dataLen == 0), dataLen);
+        (dataLen == 0), dataLen, false);
     header.putInBuffer(pkt);
   }
   
   

+ 12 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java

@@ -61,6 +61,8 @@ public class DataNodeMetrics {
   @Metric MutableCounterLong writesFromLocalClient;
   @Metric MutableCounterLong writesFromRemoteClient;
   @Metric MutableCounterLong blocksGetLocalPathInfo;
+
+  @Metric MutableCounterLong fsyncCount;
   
   
   @Metric MutableCounterLong volumeFailures;
 
@@ -72,6 +74,8 @@ public class DataNodeMetrics {
   @Metric MutableRate heartbeats;
   @Metric MutableRate blockReports;
 
+  @Metric MutableRate fsync;
+
   final MetricsRegistry registry = new MetricsRegistry("datanode");
   final String name;
 
@@ -151,6 +155,14 @@ public class DataNodeMetrics {
     blocksRead.incr();
   }
 
+  public void incrFsyncCount() {
+    fsyncCount.incr();
+  }
+
+  public void addFsync(long latency) {
+    fsync.add(latency);
+  }
+
   public void shutdown() {
     DefaultMetricsSystem.shutdown();
   }
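These are the counters TestHSync (below) asserts on; a minimal sketch of reading them from a test with the existing MetricsAsserts helpers, where dn is a running DataNode:

import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;

// The @Metric fsyncCount field is published under the name "FsyncCount".
assertCounter("FsyncCount", 1L, getMetrics(dn.getMetrics().name()));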

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto

@@ -113,6 +113,7 @@ message PacketHeaderProto {
   required sfixed64 seqno = 2;
   required bool lastPacketInBlock = 3;
   required sfixed32 dataLen = 4;
+  optional bool syncBlock = 5 [default = false];
 }
 
 enum Status {
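Because the new field is optional with a default, headers from senders that predate this change still parse and the flag simply reads as false; a sketch with the generated builder (the same calls PacketHeader uses above):

PacketHeaderProto p = PacketHeaderProto.newBuilder()
    .setOffsetInBlock(0)
    .setSeqno(1)
    .setLastPacketInBlock(false)
    .setDataLen(512)
    .setSyncBlock(true)  // omitted entirely by pre-hsync senders
    .build();
boolean sync = p.getSyncBlock();  // false when the sender left the field unset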

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java

@@ -139,7 +139,7 @@ public class AppendTestUtil {
   /**
    *  create a buffer that contains the entire test file data.
    */
-  static byte[] initBuffer(int size) {
+  public static byte[] initBuffer(int size) {
     if (seed == -1)
       seed = nextLong();
     return randomBytes(seed, size);

+ 8 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java

@@ -159,7 +159,8 @@ public class TestDataTransferProtocol extends TestCase {
       block.getNumBytes(), // OffsetInBlock
       100,                 // sequencenumber
       true,                // lastPacketInBlock
-      0);                  // chunk length
+      0,                   // chunk length
+      false);               // sync block
     hdr.write(sendOut);
     sendOut.writeInt(0);           // zero checksum
 
@@ -402,7 +403,8 @@ public class TestDataTransferProtocol extends TestCase {
       0,     // offset in block,
       100,   // seqno
       false, // last packet
-      -1 - random.nextInt(oneMil)); // bad datalen
+      -1 - random.nextInt(oneMil), // bad datalen
+      false);
     hdr.write(sendOut);
 
     sendResponse(Status.SUCCESS, "", null, recvOut);
@@ -424,7 +426,8 @@ public class TestDataTransferProtocol extends TestCase {
       0,     // OffsetInBlock
       100,   // sequencenumber
       true,  // lastPacketInBlock
-      0);    // chunk length
+      0,     // chunk length
+      false);    
     hdr.write(sendOut);
     sendOut.writeInt(0);           // zero checksum
     sendOut.flush();
@@ -508,8 +511,8 @@ public class TestDataTransferProtocol extends TestCase {
       1024,                // OffsetInBlock
       100,                 // sequencenumber
       false,               // lastPacketInBlock
-      4096);               // chunk length
-
+      4096,                // chunk length
+      false);
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     hdr.write(new DataOutputStream(baos));
 

+ 192 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestHSync.java

@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.apache.hadoop.test.MetricsAsserts.*;
+
+import java.util.EnumSet;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.AppendTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.RandomDatum;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.junit.Test;
+
+public class TestHSync {
+  
+  private void checkSyncMetric(MiniDFSCluster cluster, int dn, long value) {
+    DataNode datanode = cluster.getDataNodes().get(dn);
+    assertCounter("FsyncCount", value, getMetrics(datanode.getMetrics().name()));    
+  }
+  private void checkSyncMetric(MiniDFSCluster cluster, long value) {
+    checkSyncMetric(cluster, 0, value);
+  }
+  /** Test basic hsync cases */
+  @Test
+  public void testHSync() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    final FileSystem fs = cluster.getFileSystem();
+
+    final Path p = new Path("/testHSync/foo");
+    final int len = 1 << 16;
+    FSDataOutputStream out = fs.create(p, FsPermission.getDefault(),
+        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK),
+        4096, (short) 1, len, null);
+    out.hflush();
+    // hflush does not sync
+    checkSyncMetric(cluster, 0);
+    out.hsync();
+    // hsync on empty file does nothing
+    checkSyncMetric(cluster, 0);
+    out.write(1);
+    checkSyncMetric(cluster, 0);
+    out.hsync();
+    checkSyncMetric(cluster, 1);
+    // avoiding repeated hsyncs is a potential future optimization
+    out.hsync();
+    checkSyncMetric(cluster, 2);
+    out.hflush();
+    // hflush still does not sync
+    checkSyncMetric(cluster, 2);
+    out.close();
+    // close is sync'ing
+    checkSyncMetric(cluster, 3);
+
+    // same with a file created without SYNC_BLOCK
+    out = fs.create(p, FsPermission.getDefault(),
+        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
+        4096, (short) 1, len, null);
+    out.hsync();
+    checkSyncMetric(cluster, 3);
+    out.write(1);
+    checkSyncMetric(cluster, 3);
+    out.hsync();
+    checkSyncMetric(cluster, 4);
+    // repeated hsyncs
+    out.hsync();
+    checkSyncMetric(cluster, 5);
+    out.close();
+    // close does not sync (not opened with SYNC_BLOCK)
+    checkSyncMetric(cluster, 5);
+    cluster.shutdown();
+  }
+
+  /** Test hsync on an exact block boundary */
+  @Test
+  public void testHSyncBlockBoundary() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    final FileSystem fs = cluster.getFileSystem();
+    
+    final Path p = new Path("/testHSyncBlockBoundary/foo");
+    final int len = 1 << 16;
+    final byte[] fileContents = AppendTestUtil.initBuffer(len);
+    FSDataOutputStream out = fs.create(p, FsPermission.getDefault(),
+        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK),
+        4096, (short) 1, len, null);
+    // fill exactly one block (tests the SYNC_BLOCK case) and flush
+    out.write(fileContents, 0, len);
+    out.hflush();
+    // the full block should have caused a sync
+    checkSyncMetric(cluster, 1);
+    out.hsync();
+    // first on block again
+    checkSyncMetric(cluster, 1);
+    // write one more byte and sync again
+    out.write(1);
+    out.hsync();
+    checkSyncMetric(cluster, 2);
+    out.close();
+    checkSyncMetric(cluster, 3);
+    cluster.shutdown();
+  }
+
+  /** Test hsync via SequenceFiles */
+  @Test
+  public void testSequenceFileSync() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+
+    final FileSystem fs = cluster.getFileSystem();
+    final Path p = new Path("/testSequenceFileSync/foo");
+    final int len = 1 << 16;
+    FSDataOutputStream out = fs.create(p, FsPermission.getDefault(),
+        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK),
+        4096, (short) 1, len, null);
+    Writer w = SequenceFile.createWriter(new Configuration(),
+        Writer.stream(out),
+        Writer.keyClass(RandomDatum.class),
+        Writer.valueClass(RandomDatum.class),
+        Writer.compression(CompressionType.NONE, new DefaultCodec()));
+    w.hflush();
+    checkSyncMetric(cluster, 0);
+    w.hsync();
+    checkSyncMetric(cluster, 1);
+    int seed = new Random().nextInt();
+    RandomDatum.Generator generator = new RandomDatum.Generator(seed);
+    generator.next();
+    w.append(generator.getKey(), generator.getValue());
+    w.hsync();
+    checkSyncMetric(cluster, 2);
+    w.close();
+    checkSyncMetric(cluster, 2);
+    out.close();
+    checkSyncMetric(cluster, 3);
+    cluster.shutdown();
+  }
+
+  /** Test that syncBlock is correctly performed at replicas */
+  @Test
+  public void testHSyncWithReplication() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    final FileSystem fs = cluster.getFileSystem();
+
+    final Path p = new Path("/testHSyncWithReplication/foo");
+    final int len = 1 << 16;
+    FSDataOutputStream out = fs.create(p, FsPermission.getDefault(),
+        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK),
+        4096, (short) 3, len, null);
+    out.write(1);
+    out.hflush();
+    checkSyncMetric(cluster, 0, 0);
+    checkSyncMetric(cluster, 1, 0);
+    checkSyncMetric(cluster, 2, 0);
+    out.hsync();
+    checkSyncMetric(cluster, 0, 1);
+    checkSyncMetric(cluster, 1, 1);
+    checkSyncMetric(cluster, 2, 1);
+    out.hsync();
+    checkSyncMetric(cluster, 0, 2);
+    checkSyncMetric(cluster, 1, 2);
+    checkSyncMetric(cluster, 2, 2);
+    cluster.shutdown();
+  }
+}