Browse Source

HDFS-536. Support hflush at DFSClient. Contributed by Hairong Kuang.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-265@803969 13f79535-47bb-0310-9956-ffa450edef68
Hairong Kuang 16 years ago
parent
commit
191e0c47e6

+ 2 - 0
CHANGES.txt

@@ -13,6 +13,8 @@ Trunk (unreleased changes)
 
     HDFS-461. Tool to analyze file size distribution in HDFS. (shv)
 
+    HDFS-536. Support hflush at DFSClient. (hairong)
+
   IMPROVEMENTS
 
     HDFS-381. Remove blocks from DataNode maps when corresponding file

+ 36 - 18
src/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -2414,11 +2414,29 @@ public class DFSClient implements FSConstants, java.io.Closeable {
 
             // write out data to remote datanode
             blockStream.write(buf.array(), buf.position(), buf.remaining());
+            blockStream.flush();
 
             if (one.lastPacketInBlock) {
-              blockStream.writeInt(0); // indicate end-of-block 
+              synchronized (dataQueue) {
+                while (!streamerClosed && !hasError && ackQueue.size() != 0 && clientRunning) {
+                  try {
+                    dataQueue.wait(1000);   // wait for acks to arrive from datanodes
+                  } catch (InterruptedException  e) {
+                  }
+                }
+              }
+              if (streamerClosed || hasError || !clientRunning) {
+                continue;
+              }
+
+              // done receiving all acks
+              if (response != null) {
+                response.close(); // notify responder to close
+              }
+              // indicate end-of-block
+              blockStream.writeInt(0);
+              blockStream.flush();
             }
-            blockStream.flush();
             if (LOG.isDebugEnabled()) {
               LOG.debug("DataStreamer block " + block +
                   " wrote packet seqno:" + one.seqno +
@@ -2442,18 +2460,6 @@ public class DFSClient implements FSConstants, java.io.Closeable {
 
           // Is this block full?
           if (one.lastPacketInBlock) {
-            synchronized (dataQueue) {
-              while (!streamerClosed && !hasError && ackQueue.size() != 0 && clientRunning) {
-                try {
-                  dataQueue.wait(1000);   // wait for acks to arrive from datanodes
-                } catch (InterruptedException  e) {
-                }
-              }
-            }
-            if (streamerClosed || hasError || !clientRunning) {
-              continue;
-            }
-
             LOG.debug("Closing old block " + block);
             this.setName("DataStreamer for file " + src);
             closeResponder();
@@ -2473,7 +2479,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
       }
 
       private void closeInternal() {
-        closeResponder();
+        closeResponder();       // close and join
         closeStream();
         streamerClosed = true;
         closed = true;
@@ -3189,11 +3195,23 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     }
   
     /**
-     * All data is written out to datanodes. It is not guaranteed 
-     * that data has been flushed to persistent store on the 
-     * datanode. Block allocations are persisted on namenode.
+     * @deprecated As of HDFS 0.21.0, replaced by hflush
+     * @see #hflush()
      */
+    @Deprecated
     public synchronized void sync() throws IOException {
+      hflush();
+    }
+    
+    /**
+     * All data is flushed out to datanodes.
+     * It is a synchronous operation. When it returns,
+     * it gurantees that flushed data become visible to new readers. 
+     * It is not guaranteed that data has been flushed to 
+     * persistent store on the datanode. 
+     * Block allocations are persisted on namenode.
+     */
+    public synchronized void hflush() throws IOException {
       checkOpen();
       isClosed();
       try {

+ 1 - 1
src/test/hdfs/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java

@@ -49,7 +49,7 @@ public class TestBlocksScheduledCounter extends TestCase {
       out.write(i);
     }
     // flush to make sure a block is allocated.
-    ((DFSOutputStream)(out.getWrappedStream())).sync();
+    ((DFSOutputStream)(out.getWrappedStream())).hflush();
     
     ArrayList<DatanodeDescriptor> dnList = new ArrayList<DatanodeDescriptor>();
     cluster.getNamesystem().DFSNodesStatus(dnList, dnList);