Browse Source

HADOOP-6313. Integrate change 831416 from trunk. (Hairong Kuang via suresh)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.21@831434 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 15 years ago
parent
commit
c721b3d9a2

+ 3 - 0
CHANGES.txt

@@ -209,6 +209,9 @@ Trunk (unreleased changes)
     HADOOP-6205. Implementing aspects development and fault injeciton
     framework for Hadoop (cos)
 
+    HADOOP-6313. Implement Syncable interface in FSDataOutputStream to expose
+    flush APIs to application users. (Hairong Kuang via suresh)
+
   IMPROVEMENTS
 
     HADOOP-4565. Added CombineFileInputFormat to use data locality information

+ 20 - 1
src/java/org/apache/hadoop/fs/FSDataOutputStream.java

@@ -91,10 +91,29 @@ public class FSDataOutputStream extends DataOutputStream implements Syncable {
     return wrappedStream;
   }
 
-  /** {@inheritDoc} */
+  @Override  // Syncable
+  @Deprecated
   public void sync() throws IOException {
     if (wrappedStream instanceof Syncable) {
       ((Syncable)wrappedStream).sync();
     }
   }
+  
+  @Override  // Syncable
+  public void hflush() throws IOException {
+    if (wrappedStream instanceof Syncable) {
+      ((Syncable)wrappedStream).hflush();
+    } else {
+      wrappedStream.flush();
+    }
+  }
+  
+  @Override  // Syncable
+  public void hsync() throws IOException {
+    if (wrappedStream instanceof Syncable) {
+      ((Syncable)wrappedStream).hsync();
+    } else {
+      wrappedStream.flush();
+    }
+  }
 }

+ 2 - 7
src/java/org/apache/hadoop/fs/RawLocalFileSystem.java

@@ -180,7 +180,7 @@ public class RawLocalFileSystem extends FileSystem {
   /*********************************************************
    * For create()'s FSOutputStream.
    *********************************************************/
-  class LocalFSFileOutputStream extends OutputStream implements Syncable {
+  class LocalFSFileOutputStream extends OutputStream {
     private FileOutputStream fos;
     
     private LocalFSFileOutputStream(Path f, boolean append) throws IOException {
@@ -207,13 +207,8 @@ public class RawLocalFileSystem extends FileSystem {
         throw new FSError(e);                // assume native fs error
       }
     }
-
-    /** {@inheritDoc} */
-    public void sync() throws IOException {
-      fos.getFD().sync();      
-    }
   }
-  
+
   /** {@inheritDoc} */
   public FSDataOutputStream append(Path f, int bufferSize,
       Progressable progress) throws IOException {

+ 16 - 4
src/java/org/apache/hadoop/fs/Syncable.java

@@ -20,11 +20,23 @@ package org.apache.hadoop.fs;
 
 import java.io.IOException;
 
-/** This interface declare the sync() operation. */
+/** This interface for flush/sync operation. */
 public interface Syncable {
   /**
-   * Synchronize all buffer with the underlying devices.
-   * @throws IOException
+   * @deprecated As of HADOOP 0.21.0, replaced by hflush
+   * @see #hflush()
    */
-  public void sync() throws IOException;
+  @Deprecated  public void sync() throws IOException;
+  
+  /** Flush out the data in client's user buffer. After the return of
+   * this call, new readers will see the data.
+   * @throws IOException if any error occurs
+   */
+  public void hflush() throws IOException;
+  
+  /** Similar to posix fsync, flush out the data in client's user buffer 
+   * all the way to the disk device (but the disk may have it in its cache).
+   * @throws IOException if error occurs
+   */
+  public void hsync() throws IOException;
 }

+ 37 - 0
src/test/core/org/apache/hadoop/fs/TestLocalFileSystem.java

@@ -111,6 +111,43 @@ public class TestLocalFileSystem extends TestCase {
     }
   }
 
+  /**
+   * test Syncable interface on raw local file system
+   * @throws IOException
+   */
+  public void testSyncable() throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf).getRawFileSystem();
+    Path file = new Path(TEST_ROOT_DIR, "syncable");
+    FSDataOutputStream out = fs.create(file);;
+    final int bytesWritten = 1;
+    byte[] expectedBuf = new byte[] {'0', '1', '2', '3'};
+    try {
+      out.write(expectedBuf, 0, 1);
+      out.hflush();
+      verifyFile(fs, file, bytesWritten, expectedBuf);
+      out.write(expectedBuf, bytesWritten, expectedBuf.length-bytesWritten);
+      out.hsync();
+      verifyFile(fs, file, expectedBuf.length, expectedBuf);
+    } finally {
+      out.close();
+    }
+  }
+  
+  private void verifyFile(FileSystem fs, Path file, int bytesToVerify, 
+      byte[] expectedBytes) throws IOException {
+    FSDataInputStream in = fs.open(file);
+    try {
+      byte[] readBuf = new byte[bytesToVerify];
+      in.readFully(readBuf, 0, bytesToVerify);
+      for (int i=0; i<bytesToVerify; i++) {
+        assertEquals(expectedBytes[i], readBuf[i]);
+      }
+    } finally {
+      in.close();
+    }
+  }
+  
   public void testCopy() throws IOException {
     Configuration conf = new Configuration();
     LocalFileSystem fs = FileSystem.getLocal(conf);