Browse Source

HADOOP-3250. Extend FileSystem API to allow appending to files.
Contributed by Tsz Wo (Nicholas), SZE.



git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@662537 13f79535-47bb-0310-9956-ffa450edef68

Christopher Douglas 17 years ago
parent
commit
3f226ec5c2

+ 3 - 0
CHANGES.txt

@@ -114,6 +114,9 @@ Trunk (unreleased changes)
 
 
     HADOOP-3246. Add FTPFileSystem.  (Ankur Goel via cutting)
     HADOOP-3246. Add FTPFileSystem.  (Ankur Goel via cutting)
 
 
+    HADOOP-3250. Extend FileSystem API to allow appending to files.
+    (Tsz Wo (Nicholas), SZE via cdouglas)
+
   IMPROVEMENTS
   IMPROVEMENTS
    
    
     HADOOP-2928. Remove deprecated FileSystem.getContentLength().
     HADOOP-2928. Remove deprecated FileSystem.getContentLength().

+ 6 - 0
src/java/org/apache/hadoop/dfs/DistributedFileSystem.java

@@ -128,6 +128,12 @@ public class DistributedFileSystem extends FileSystem {
           dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics));
           dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics));
   }
   }
 
 
+  /** This optional operation is not yet supported. */
+  public FSDataOutputStream append(Path f, int bufferSize,
+      Progressable progress) throws IOException {
+    throw new IOException("Not supported");
+  }
+
   public FSDataOutputStream create(Path f, FsPermission permission,
   public FSDataOutputStream create(Path f, FsPermission permission,
     boolean overwrite,
     boolean overwrite,
     int bufferSize, short replication, long blockSize,
     int bufferSize, short replication, long blockSize,

+ 6 - 0
src/java/org/apache/hadoop/dfs/HftpFileSystem.java

@@ -228,6 +228,12 @@ public class HftpFileSystem extends FileSystem {
   @Override
   @Override
   public void setWorkingDirectory(Path f) { }
   public void setWorkingDirectory(Path f) { }
 
 
+  /** This optional operation is not yet supported. */
+  public FSDataOutputStream append(Path f, int bufferSize,
+      Progressable progress) throws IOException {
+    throw new IOException("Not supported");
+  }
+
   @Override
   @Override
   public FSDataOutputStream create(Path f, FsPermission permission,
   public FSDataOutputStream create(Path f, FsPermission permission,
                                    boolean overwrite, int bufferSize,
                                    boolean overwrite, int bufferSize,

+ 30 - 0
src/java/org/apache/hadoop/fs/FileSystem.java

@@ -521,6 +521,36 @@ public abstract class FileSystem extends Configured implements Closeable {
     }
     }
   }
   }
 
 
+  /**
+   * Append to an existing file (optional operation).
+   * Same as append(f, getConf().getInt("io.file.buffer.size", 4096), null)
+   * @param f the existing file to be appended.
+   * @throws IOException
+   */
+  public FSDataOutputStream append(Path f) throws IOException {
+    return append(f, getConf().getInt("io.file.buffer.size", 4096), null);
+  }
+  /**
+   * Append to an existing file (optional operation).
+   * Same as append(f, bufferSize, null).
+   * @param f the existing file to be appended.
+   * @param bufferSize the size of the buffer to be used.
+   * @throws IOException
+   */
+  public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
+    return append(f, bufferSize, null);
+  }
+
+  /**
+   * Append to an existing file (optional operation).
+   * @param f the existing file to be appended.
+   * @param bufferSize the size of the buffer to be used.
+   * @param progress for reporting progress if it is not null.
+   * @throws IOException
+   */
+  public abstract FSDataOutputStream append(Path f, int bufferSize,
+      Progressable progress) throws IOException;
+  
   /**
   /**
    * Get replication.
    * Get replication.
    * 
    * 

+ 7 - 1
src/java/org/apache/hadoop/fs/FilterFileSystem.java

@@ -104,7 +104,13 @@ public class FilterFileSystem extends FileSystem {
   public FSDataInputStream open(Path f, int bufferSize) throws IOException {
   public FSDataInputStream open(Path f, int bufferSize) throws IOException {
     return fs.open(f, bufferSize);
     return fs.open(f, bufferSize);
   }
   }
-  
+
+  /** {@inheritDoc} */
+  public FSDataOutputStream append(Path f, int bufferSize,
+      Progressable progress) throws IOException {
+    return fs.append(f, bufferSize, progress);
+  }
+
   /** {@inheritDoc} */
   /** {@inheritDoc} */
   @Override
   @Override
   public FSDataOutputStream create(Path f, FsPermission permission,
   public FSDataOutputStream create(Path f, FsPermission permission,

+ 6 - 0
src/java/org/apache/hadoop/fs/InMemoryFileSystem.java

@@ -199,6 +199,12 @@ public class InMemoryFileSystem extends ChecksumFileSystem {
       }
       }
     }
     }
   
   
+    /** This optional operation is not yet supported. */
+    public FSDataOutputStream append(Path f, int bufferSize,
+        Progressable progress) throws IOException {
+      throw new IOException("Not supported");
+    }
+
     /**
     /**
      * @param permission Currently ignored.
      * @param permission Currently ignored.
      */
      */

+ 18 - 5
src/java/org/apache/hadoop/fs/RawLocalFileSystem.java

@@ -183,8 +183,8 @@ public class RawLocalFileSystem extends FileSystem {
   class LocalFSFileOutputStream extends OutputStream {
   class LocalFSFileOutputStream extends OutputStream {
     FileOutputStream fos;
     FileOutputStream fos;
     
     
-    public LocalFSFileOutputStream(Path f) throws IOException {
-      this.fos = new FileOutputStream(pathToFile(f));
+    private LocalFSFileOutputStream(Path f, boolean append) throws IOException {
+      this.fos = new FileOutputStream(pathToFile(f), append);
     }
     }
     
     
     /*
     /*
@@ -209,6 +209,20 @@ public class RawLocalFileSystem extends FileSystem {
     }
     }
   }
   }
   
   
+  /** {@inheritDoc} */
+  public FSDataOutputStream append(Path f, int bufferSize,
+      Progressable progress) throws IOException {
+    if (!exists(f)) {
+      throw new FileNotFoundException("File " + f + " not found.");
+    }
+    if (getFileStatus(f).isDir()) {
+      throw new IOException("Cannot append to a diretory (=" + f + " ).");
+    }
+    return new FSDataOutputStream(new BufferedOutputStream(
+        new LocalFSFileOutputStream(f, true), bufferSize), statistics);
+  }
+
+  /** {@inheritDoc} */
   public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
   public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
                                    short replication, long blockSize, Progressable progress)
                                    short replication, long blockSize, Progressable progress)
     throws IOException {
     throws IOException {
@@ -219,9 +233,8 @@ public class RawLocalFileSystem extends FileSystem {
     if (parent != null && !mkdirs(parent)) {
     if (parent != null && !mkdirs(parent)) {
       throw new IOException("Mkdirs failed to create " + parent.toString());
       throw new IOException("Mkdirs failed to create " + parent.toString());
     }
     }
-    return new FSDataOutputStream(
-        new BufferedOutputStream(new LocalFSFileOutputStream(f), bufferSize),
-        statistics);
+    return new FSDataOutputStream(new BufferedOutputStream(
+        new LocalFSFileOutputStream(f, false), bufferSize), statistics);
   }
   }
 
 
   /** {@inheritDoc} */
   /** {@inheritDoc} */

+ 6 - 0
src/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java

@@ -243,6 +243,12 @@ public class FTPFileSystem extends FileSystem {
     return fos;
     return fos;
   }
   }
 
 
+  /** This optional operation is not yet supported. */
+  public FSDataOutputStream append(Path f, int bufferSize,
+      Progressable progress) throws IOException {
+    throw new IOException("Not supported");
+  }
+  
   /**
   /**
    * Convenience method, so that we don't open a new connection when using this
    * Convenience method, so that we don't open a new connection when using this
    * method from within another method. Otherwise every API invocation incurs
    * method from within another method. Otherwise every API invocation incurs

+ 6 - 0
src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java

@@ -183,6 +183,12 @@ public class KosmosFileSystem extends FileSystem {
         }
         }
     }
     }
     
     
+    /** This optional operation is not yet supported. */
+    public FSDataOutputStream append(Path f, int bufferSize,
+        Progressable progress) throws IOException {
+      throw new IOException("Not supported");
+    }
+
     public FSDataOutputStream create(Path file, FsPermission permission,
     public FSDataOutputStream create(Path file, FsPermission permission,
                                      boolean overwrite, int bufferSize,
                                      boolean overwrite, int bufferSize,
 				     short replication, long blockSize, Progressable progress)
 				     short replication, long blockSize, Progressable progress)

+ 6 - 0
src/java/org/apache/hadoop/fs/s3/S3FileSystem.java

@@ -179,6 +179,12 @@ public class S3FileSystem extends FileSystem {
     return ret.toArray(new FileStatus[0]);
     return ret.toArray(new FileStatus[0]);
   }
   }
 
 
+  /** This optional operation is not yet supported. */
+  public FSDataOutputStream append(Path f, int bufferSize,
+      Progressable progress) throws IOException {
+    throw new IOException("Not supported");
+  }
+
   /**
   /**
    * @param permission Currently ignored.
    * @param permission Currently ignored.
    */
    */

+ 65 - 1
src/test/org/apache/hadoop/fs/TestLocalFileSystem.java

@@ -29,12 +29,26 @@ public class TestLocalFileSystem extends TestCase {
     = System.getProperty("test.build.data","build/test/data/work-dir/localfs");
     = System.getProperty("test.build.data","build/test/data/work-dir/localfs");
 
 
 
 
-  private void writeFile(FileSystem fs, Path name) throws IOException {
+  static void writeFile(FileSystem fs, Path name) throws IOException {
     FSDataOutputStream stm = fs.create(name);
     FSDataOutputStream stm = fs.create(name);
     stm.writeBytes("42\n");
     stm.writeBytes("42\n");
     stm.close();
     stm.close();
   }
   }
   
   
+  static String readFile(FileSystem fs, Path name) throws IOException {
+    byte[] b = new byte[1024];
+    int offset = 0;
+    FSDataInputStream in = fs.open(name);
+    for(int remaining, n;
+        (remaining = b.length - offset) > 0 && (n = in.read(b, offset, remaining)) != -1;
+        offset += n); 
+    in.close();
+
+    String s = new String(b, 0, offset);
+    System.out.println("s=" + s);
+    return s;
+  }
+
   private void cleanupFile(FileSystem fs, Path name) throws IOException {
   private void cleanupFile(FileSystem fs, Path name) throws IOException {
     assertTrue(fs.exists(name));
     assertTrue(fs.exists(name));
     fs.delete(name, true);
     fs.delete(name, true);
@@ -140,4 +154,54 @@ public class TestLocalFileSystem extends TestCase {
     cleanupFile(fs, path);
     cleanupFile(fs, path);
   }
   }
 
 
+  public void testAppend() throws IOException {
+    Configuration conf = new Configuration();
+    final String dir = TEST_ROOT_DIR + "/append";
+    LocalFileSystem fs = FileSystem.getLocal(conf);
+
+    //normal case
+    {
+      Path f = new Path(dir, "f");
+      FSDataOutputStream out = fs.create(f);
+      out.writeBytes("something");
+      out.close();
+      assertEquals("something", readFile(fs, f));
+  
+      out = fs.append(f);
+      out.writeBytes(" more");
+      out.close();
+      assertEquals("something more", readFile(fs, f));
+  
+      out = fs.append(f);
+      out.writeBytes(" and more");
+      out.close();
+      assertEquals("something more and more", readFile(fs, f));
+
+      cleanupFile(fs, f);
+    }
+
+    //file not found case
+    {
+      Path f = new Path(dir, "fileNotFound");
+      try {
+        fs.append(f);
+        assertTrue(false);
+      }
+      catch(FileNotFoundException e) {
+        System.out.println("good: " + e);
+      }
+    }
+
+    //append to dir case
+    {
+      Path f = new Path(dir);
+      try {
+        fs.append(f);
+        assertTrue(false);
+      }
+      catch(IOException e) {
+        System.out.println("good: " + e);
+      }
+    }
+  }
 }
 }