
HDFS-731. Support new Syncable interface in HDFS. Contributed by Hairong Kuang.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@831436 13f79535-47bb-0310-9956-ffa450edef68
Hairong Kuang, 15 years ago · commit 620033054f

+ 2 - 0
CHANGES.txt

@@ -137,6 +137,8 @@ Release 0.21.0 - Unreleased
    HDFS-631. Rename configuration keys towards API standardization and
    backward compatibility. (Jitendra Nath Pandey via suresh)

+    HDFS-731. Support new Syncable interface in HDFS. (hairong)
+
  IMPROVEMENTS

    HDFS-381. Remove blocks from DataNode maps when corresponding file
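
For context, the Syncable interface that HDFS-731 implements lives in Hadoop common. The following is a sketch of its 0.21-era shape, inferred from the methods overridden in DFSClient.java below; treat the exact Javadoc wording as an assumption:

```java
package org.apache.hadoop.fs;

import java.io.IOException;

// Sketch of the Syncable contract implemented by this change; the method set
// is inferred from the @Override annotations in the DFSClient diff below.
public interface Syncable {
  /** @deprecated As of Hadoop 0.21.0, replaced by {@link #hflush()}. */
  @Deprecated
  void sync() throws IOException;

  /** Flush the client's buffered data out to all datanode replicas; flushed
   *  data is visible to new readers but not guaranteed to be on persistent
   *  storage. */
  void hflush() throws IOException;

  /** Like hflush(), but additionally intended to have each replica do the
   *  POSIX fsync equivalent; in this commit DFSClient implements it as
   *  hflush(). */
  void hsync() throws IOException;
}
```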

BIN
lib/hadoop-core-0.22.0-dev.jar

BIN
lib/hadoop-core-test-0.22.0-dev.jar

+ 18 - 5
src/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -3495,23 +3495,24 @@ public class DFSClient implements FSConstants, java.io.Closeable {
      }
    }
  
-    /**
-     * @deprecated As of HDFS 0.21.0, replaced by hflush
-     * @see #hflush()
-     */
+    @Override
    @Deprecated
    public synchronized void sync() throws IOException {
      hflush();
    }
    
    /**
-     * All data is flushed out to datanodes.
+     * Flushes out to all replicas of the block.
+     * The data is in the buffers of the DNs,
+     * but not necessarily in the DNs' OS buffers.
+     *
     * It is a synchronous operation. When it returns,
     * it guarantees that flushed data becomes visible to new readers.
     * It is not guaranteed that data has been flushed to
     * persistent store on the datanode.
     * Block allocations are persisted on the namenode.
     */
+    @Override
    public synchronized void hflush() throws IOException {
      checkOpen();
      isClosed();
@@ -3561,6 +3562,18 @@ public class DFSClient implements FSConstants, java.io.Closeable {
      }
    }

+    /**
+     * The expected semantics: all data has been flushed out to all replicas,
+     * and each replica has done the POSIX fsync equivalent, i.e. the OS has
+     * flushed it to the disk device (but the disk may still cache it).
+     *
+     * For now it is implemented as hflush by default.
+     */
+    @Override
+    public synchronized void hsync() throws IOException {
+      hflush();
+    }
+    
    /**
     * Waits till all existing data is flushed and confirmations
     * received from datanodes.
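
To illustrate the new contract from the client side, here is a minimal, hypothetical usage sketch (the path and setup are assumptions, not part of this commit): hflush() makes buffered data visible to new readers, while hsync(), as of this change, simply delegates to hflush().

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SyncableDemo {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    FSDataOutputStream out = fs.create(new Path("/tmp/syncable-demo.dat"));

    out.write("first record\n".getBytes());
    // Data reaches the buffers of all datanodes in the pipeline and becomes
    // visible to new readers; it is not yet on persistent store.
    out.hflush();

    out.write("second record\n".getBytes());
    // Intended to be a POSIX-fsync equivalent on every replica; in this
    // commit it is still implemented as hflush().
    out.hsync();

    out.close();
  }
}
```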

+ 1 - 1
src/test/hdfs/org/apache/hadoop/hdfs/FileAppendTest4.java

@@ -99,7 +99,7 @@ public class FileAppendTest4 {
          // append flushedBytes bytes to the file
          out = fs.append(p);
          out.write(contents, oldFileLen, flushedBytes1);
-          out.sync();
+          out.hflush();

          // write another flushedBytes2 bytes to the file
          out.write(contents, oldFileLen + flushedBytes1, flushedBytes2);

+ 1 - 1
src/test/hdfs/org/apache/hadoop/hdfs/TestAbandonBlock.java

@@ -47,7 +47,7 @@ public class TestAbandonBlock extends junit.framework.TestCase {
      for(int i = 0; i < 1024; i++) {
        fout.write(123);
      }
-      fout.sync();
+      fout.hflush();
  
      //try reading the block by someone
      final DFSClient dfsclient = new DFSClient(NameNode.getAddress(CONF), CONF);

+ 4 - 4
src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java

@@ -189,14 +189,14 @@ public class TestFileAppend extends TestCase {
      // write to file
      int mid = AppendTestUtil.FILE_SIZE /2;
      stm.write(fileContents, 0, mid);
-      stm.sync();
+      stm.hflush();
      System.out.println("Wrote and Flushed first part of file.");

      // write the remainder of the file
      stm.write(fileContents, mid, AppendTestUtil.FILE_SIZE - mid);
      System.out.println("Written second part of file");
-      stm.sync();
-      stm.sync();
+      stm.hflush();
+      stm.hflush();
      System.out.println("Wrote and Flushed second part of file.");

      // verify that full blocks are sane
@@ -244,7 +244,7 @@ public class TestFileAppend extends TestCase {
      int start = 0;
      for (start = 0; (start + 29) < AppendTestUtil.FILE_SIZE; ) {
        stm.write(fileContents, start, 29);
-        stm.sync();
+        stm.hflush();
        start += 29;
      }
      stm.write(fileContents, start, AppendTestUtil.FILE_SIZE -start);

+ 4 - 4
src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java

@@ -236,7 +236,7 @@ public class TestFileAppend3 extends junit.framework.TestCase {
    FSDataOutputStream out = fs.append(p);
    final int len2 = (int)BLOCK_SIZE/2;
    AppendTestUtil.write(out, len1, len2);
-    out.sync();
+    out.hflush();
    
    //c. Rename file to file.new.
    final Path pnew = new Path(p + ".new");
@@ -327,7 +327,7 @@ public class TestFileAppend3 extends junit.framework.TestCase {
    stm = fs.append(p);
    // Append to a partial CRC chunk
    stm.write(fileContents, 1, 1);
-    stm.sync();
+    stm.hflush();
    // The partial CRC chunk is not full yet; close the file
    stm.close();
    System.out.println("Append 1 byte and closed the file " + p);
@@ -341,11 +341,11 @@ public class TestFileAppend3 extends junit.framework.TestCase {
    // append to a partial CRC chunk
    stm.write(fileContents, 2, 1);
    // The partial chunk is not full yet; force sending a packet to the DN
-    stm.sync();
+    stm.hflush();
    System.out.println("Append and flush 1 byte");
    // The partial chunk is not full yet; force sending another packet to the DN
    stm.write(fileContents, 3, 2);
-    stm.sync();
+    stm.hflush();
    System.out.println("Append and flush 2 byte");

    // fill up the partial chunk and close the file

+ 4 - 4
src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java

@@ -512,7 +512,7 @@ public class TestFileCreation extends junit.framework.TestCase {

      // write two full blocks.
      writeFile(stm, numBlocks * blockSize);
-      stm.sync();
+      stm.hflush();

      // rename file while keeping it open.
      Path fileRenamed = new Path("/filestatusRenamed.dat");
@@ -880,7 +880,7 @@ public class TestFileCreation extends junit.framework.TestCase {
  }

  /**
-   * Create a file, write something, fsync but not close.
+   * Create a file, write something, hflush but do not close.
   * Then change the lease period and wait for lease recovery.
   * Finally, read the block directly from each Datanode and verify the content.
   */
@@ -905,7 +905,7 @@ public class TestFileCreation extends junit.framework.TestCase {
      final Path fpath = new Path(f);
      FSDataOutputStream out = TestFileCreation.createFile(dfs, fpath, DATANODE_NUM);
      out.write("something".getBytes());
-      out.sync();
+      out.hflush();

      // set the soft and hard limit to be 1 second so that the
      // namenode triggers lease recovery
@@ -991,7 +991,7 @@ public class TestFileCreation extends junit.framework.TestCase {
      final Path fpath = new Path(f);
      FSDataOutputStream out = TestFileCreation.createFile(dfs, fpath, DATANODE_NUM);
      out.write("something_dhruba".getBytes());
-      out.sync();    // ensure that block is allocated
+      out.hflush();    // ensure that the block is allocated

      // shutdown last datanode in pipeline.
      cluster.stopDataNode(2);

+ 1 - 1
src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreationClient.java

@@ -129,7 +129,7 @@ public class TestFileCreationClient extends junit.framework.TestCase {
        for(; running; i++) {
          System.out.println(getName() + " writes " + i);
          out.write(i);
-          out.sync();
+          out.hflush();
          sleep(100);
        }
      }

+ 2 - 2
src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreationDelete.java

@@ -59,7 +59,7 @@ public class TestFileCreationDelete extends junit.framework.TestCase {
       System.out.println("testFileCreationDeleteParent: "
       System.out.println("testFileCreationDeleteParent: "
           + "Created file " + file1);
           + "Created file " + file1);
       TestFileCreation.writeFile(stm1, 1000);
       TestFileCreation.writeFile(stm1, 1000);
-      stm1.sync();
+      stm1.hflush();
 
 
       // create file2.
       // create file2.
       Path file2 = new Path("/file2");
       Path file2 = new Path("/file2");
@@ -67,7 +67,7 @@ public class TestFileCreationDelete extends junit.framework.TestCase {
       System.out.println("testFileCreationDeleteParent: "
       System.out.println("testFileCreationDeleteParent: "
           + "Created file " + file2);
           + "Created file " + file2);
       TestFileCreation.writeFile(stm2, 1000);
       TestFileCreation.writeFile(stm2, 1000);
-      stm2.sync();
+      stm2.hflush();
 
 
       // rm dir
       // rm dir
       fs.delete(dir, true);
       fs.delete(dir, true);

+ 3 - 3
src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java

@@ -79,9 +79,9 @@ public class TestLeaseRecovery2 extends junit.framework.TestCase {
       System.out.println("size=" + size);
       System.out.println("size=" + size);
       stm.write(buffer, 0, size);
       stm.write(buffer, 0, size);
 
 
-      // sync file
-      AppendTestUtil.LOG.info("sync");
-      stm.sync();
+      // hflush file
+      AppendTestUtil.LOG.info("hflush");
+      stm.hflush();
       AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
       AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
       dfs.dfs.leasechecker.interruptAndJoin();
       dfs.dfs.leasechecker.interruptAndJoin();
 
 

+ 6 - 6
src/test/hdfs/org/apache/hadoop/hdfs/TestRenameWhileOpen.java

@@ -71,7 +71,7 @@ public class TestRenameWhileOpen extends junit.framework.TestCase {
       System.out.println("testFileCreationDeleteParent: "
       System.out.println("testFileCreationDeleteParent: "
           + "Created file " + file1);
           + "Created file " + file1);
       TestFileCreation.writeFile(stm1);
       TestFileCreation.writeFile(stm1);
-      stm1.sync();
+      stm1.hflush();
 
 
       // create file2.
       // create file2.
       Path dir2 = new Path("/user/dir2");
       Path dir2 = new Path("/user/dir2");
@@ -80,7 +80,7 @@ public class TestRenameWhileOpen extends junit.framework.TestCase {
       System.out.println("testFileCreationDeleteParent: "
       System.out.println("testFileCreationDeleteParent: "
           + "Created file " + file2);
           + "Created file " + file2);
       TestFileCreation.writeFile(stm2);
       TestFileCreation.writeFile(stm2);
-      stm2.sync();
+      stm2.hflush();
 
 
       // move dir1 while file1 is open
       // move dir1 while file1 is open
       Path dir3 = new Path("/user/dir3");
       Path dir3 = new Path("/user/dir3");
@@ -155,7 +155,7 @@ public class TestRenameWhileOpen extends junit.framework.TestCase {
       System.out.println("testFileCreationDeleteParent: "
       System.out.println("testFileCreationDeleteParent: "
           + "Created file " + file1);
           + "Created file " + file1);
       TestFileCreation.writeFile(stm1);
       TestFileCreation.writeFile(stm1);
-      stm1.sync();
+      stm1.hflush();
 
 
       // create file2.
       // create file2.
       Path dir2 = new Path("/user/dir2");
       Path dir2 = new Path("/user/dir2");
@@ -164,7 +164,7 @@ public class TestRenameWhileOpen extends junit.framework.TestCase {
       System.out.println("testFileCreationDeleteParent: "
       System.out.println("testFileCreationDeleteParent: "
           + "Created file " + file2);
           + "Created file " + file2);
       TestFileCreation.writeFile(stm2);
       TestFileCreation.writeFile(stm2);
-      stm2.sync();
+      stm2.hflush();
 
 
       // move dir1 while file1 is open
       // move dir1 while file1 is open
       Path dir3 = new Path("/user/dir3");
       Path dir3 = new Path("/user/dir3");
@@ -228,7 +228,7 @@ public class TestRenameWhileOpen extends junit.framework.TestCase {
       System.out.println("testFileCreationDeleteParent: " +
       System.out.println("testFileCreationDeleteParent: " +
                          "Created file " + file1);
                          "Created file " + file1);
       TestFileCreation.writeFile(stm1);
       TestFileCreation.writeFile(stm1);
-      stm1.sync();
+      stm1.hflush();
 
 
       Path dir2 = new Path("/user/dir2");
       Path dir2 = new Path("/user/dir2");
       fs.mkdirs(dir2);
       fs.mkdirs(dir2);
@@ -291,7 +291,7 @@ public class TestRenameWhileOpen extends junit.framework.TestCase {
       System.out.println("testFileCreationDeleteParent: "
       System.out.println("testFileCreationDeleteParent: "
           + "Created file " + file1);
           + "Created file " + file1);
       TestFileCreation.writeFile(stm1);
       TestFileCreation.writeFile(stm1);
-      stm1.sync();
+      stm1.hflush();
 
 
       Path dir2 = new Path("/user/dir2");
       Path dir2 = new Path("/user/dir2");
 
 

+ 1 - 1
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java

@@ -96,7 +96,7 @@ public class TestDatanodeRestart {
      new Random().nextBytes(writeBuf);
      out = fs.create(src);
      out.write(writeBuf);
-      out.sync();
+      out.hflush();
      DataNode dn = cluster.getDataNodes().get(0);
      for (FSVolume volume : ((FSDataset)dn.data).volumes.volumes) {
        File currentDir = volume.getDir().getParentFile();

+ 2 - 2
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestAccessTokenWithDFS.java

@@ -203,7 +203,7 @@ public class TestAccessTokenWithDFS extends TestCase {
      stm = fs.append(fileToAppend);
      int mid = rawData.length - 1;
      stm.write(rawData, 1, mid - 1);
-      stm.sync();
+      stm.hflush();

      /*
       * wait till token used in stm expires
@@ -255,7 +255,7 @@ public class TestAccessTokenWithDFS extends TestCase {
      // write a partial block
      int mid = rawData.length - 1;
      stm.write(rawData, 0, mid);
-      stm.sync();
+      stm.hflush();

      /*
       * wait till token used in stm expires