
HADOOP-2655. Copy on write for data and metadata files in the
presence of snapshots. Needed for supporting appends to HDFS
files. (dhruba)



git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@631019 13f79535-47bb-0310-9956-ffa450edef68

Dhruba Borthakur 17 years ago
parent
commit 59612d7e76

+ 6 - 0
CHANGES.txt

@@ -26,6 +26,12 @@ Trunk (unreleased changes)
 
     HADOOP-2178.  Job History on DFS. (Amareshwari Sri Ramadasu via ddas)
 
+  IMPROVEMENTS
+
+    HADOOP-2655. Copy on write for data and metadata files in the 
+    presence of snapshots. Needed for supporting appends to HDFS
+    files. (dhruba) 
+
   OPTIMIZATIONS
 
   BUG FIXES
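
To make the mechanism summarized above concrete: a snapshot keeps hard links to block and metadata files, so before a block can be modified it must be "detached" by copying it and renaming the copy over the original, which leaves the snapshot's link pointing at the old contents. Below is a minimal standalone sketch of that idea, assuming Java 11 NIO and a POSIX filesystem; none of the file names here come from the patch.

// Minimal standalone sketch (not from the patch) of the copy-on-write idea,
// assuming Java 11+ NIO and a POSIX filesystem: a "snapshot" shares the
// block file's inode via a hard link, so the file is copied and the copy
// renamed over the original before any modification.
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public class CopyOnWriteSketch {
  public static void main(String[] args) throws IOException {
    Path block = Files.writeString(Paths.get("blk_0001"), "original data");
    Path snapshot = Paths.get("snapshot_blk_0001");
    Files.createLink(snapshot, block);          // snapshot shares the inode

    // "Detach": copy into a temporary file, then rename it over the original.
    Path tmp = Paths.get("blk_0001.detach");
    Files.copy(block, tmp, StandardCopyOption.REPLACE_EXISTING);
    Files.move(tmp, block, StandardCopyOption.REPLACE_EXISTING);

    // Writing the block now leaves the snapshot's contents intact.
    Files.writeString(block, "original data plus appended bytes");
    System.out.println("snapshot still reads: " + Files.readString(snapshot));
  }
}

Running the sketch prints the original data for the snapshot even after the live copy changes, which is the guarantee the detach logic in this change provides to existing snapshots.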

+ 127 - 0
src/java/org/apache/hadoop/dfs/DatanodeBlockInfo.java

@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import org.apache.hadoop.dfs.FSDataset.FSVolume;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FileUtil.HardLink;
+import org.apache.hadoop.io.IOUtils;
+
+/**
+ * This class is used by the datanode to maintain the map from a block 
+ * to its metadata.
+ */
+class DatanodeBlockInfo {
+
+  private FSVolume volume;       // volume where the block belongs
+  private File     file;         // block file
+  private boolean detached;      // copy-on-write done for block
+
+  DatanodeBlockInfo(FSVolume vol, File file) {
+    this.volume = vol;
+    this.file = file;
+    detached = false;
+  }
+
+  DatanodeBlockInfo(FSVolume vol) {
+    this.volume = vol;
+    this.file = null;
+    detached = false;
+  }
+
+  FSVolume getVolume() {
+    return volume;
+  }
+
+  File getFile() {
+    return file;
+  }
+
+  /**
+   * Is this block already detached?
+   */
+  boolean isDetached() {
+    return detached;
+  }
+
+  /**
+   *  Block has been successfully detached
+   */
+  void setDetached() {
+    detached = true;
+  }
+
+  /**
+   * Copy specified file into a temporary file. Then rename the
+   * temporary file to the original name. This will cause any
+   * hardlinks to the original file to be removed. The temporary
+   * files are created in the detachDir. The temporary files will
+   * be recovered (especially on Windows) on datanode restart.
+   */
+  private void detachFile(File file, Block b) throws IOException {
+    File tmpFile = volume.createDetachFile(b, file.getName());
+    try {
+      IOUtils.copyBytes(new FileInputStream(file),
+                        new FileOutputStream(tmpFile),
+                        16*1024, true);
+      if (file.length() != tmpFile.length()) {
+        throw new IOException("Copy of file " + file + " size " + file.length()+
+                              " into file " + tmpFile +
+                              " resulted in a size of " + tmpFile.length());
+      }
+      FileUtil.replaceFile(tmpFile, file);
+    } catch (IOException e) {
+      boolean done = tmpFile.delete();
+      if (!done) {
+        DataNode.LOG.info("detachFile failed to delete temporary file " +
+                          tmpFile);
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Returns true if this block was copied, otherwise returns false.
+   */
+  boolean detachBlock(Block block, int numLinks) throws IOException {
+    if (isDetached()) {
+      return false;
+    }
+    if (file == null || volume == null) {
+      throw new IOException("detachBlock:Block not found. " + block);
+    }
+    File meta = FSDataset.getMetaFile(file);
+    if (meta == null) {
+      throw new IOException("Meta file not found for block " + block);
+    }
+
+    if (HardLink.getLinkCount(file) > numLinks) {
+      DataNode.LOG.info("CopyOnWrite for block " + block);
+      detachFile(file, block);
+    }
+    if (HardLink.getLinkCount(meta) > numLinks) {
+      detachFile(meta, block);
+    }
+    setDetached();
+    return true;
+  }
+}

+ 123 - 73
src/java/org/apache/hadoop/dfs/FSDataset.java

@@ -159,7 +159,7 @@ class FSDataset implements FSConstants, FSDatasetInterface {
     }
 
 
-    void getVolumeMap(HashMap<Block, FSVolume> volumeMap, FSVolume volume) {
+    void getVolumeMap(HashMap<Block, DatanodeBlockInfo> volumeMap, FSVolume volume) {
       if (children != null) {
         for (int i = 0; i < children.length; i++) {
           children[i].getVolumeMap(volumeMap, volume);
@@ -170,25 +170,12 @@ class FSDataset implements FSConstants, FSDatasetInterface {
       for (int i = 0; i < blockFiles.length; i++) {
         //We are not enforcing presense of metadata file
         if (Block.isBlockFilename(blockFiles[i])) {
-          volumeMap.put(new Block(blockFiles[i], blockFiles[i].length()), volume);
+          volumeMap.put(new Block(blockFiles[i], blockFiles[i].length()), 
+                        new DatanodeBlockInfo(volume, blockFiles[i]));
         }
       }
     }
         
-    void getBlockMap(HashMap<Block, File> blockMap) {
-      if (children != null) {
-        for (int i = 0; i < children.length; i++) {
-          children[i].getBlockMap(blockMap);
-        }
-      }
-
-      File blockFiles[] = dir.listFiles();
-      for (int i = 0; i < blockFiles.length; i++) {
-        if (Block.isBlockFilename(blockFiles[i])) {
-          blockMap.put(new Block(blockFiles[i], blockFiles[i].length()), blockFiles[i]);
-        }
-      }
-    }
     /**
      * check if a data diretory is healthy
      * @throws DiskErrorException
@@ -270,6 +257,7 @@ class FSDataset implements FSConstants, FSDatasetInterface {
 
     private FSDir dataDir;
     private File tmpDir;
+    private File detachDir; // copy on write for blocks in snapshot
     private DF usage;
     private DU dfsUsage;
     private long reserved;
@@ -281,6 +269,11 @@ class FSDataset implements FSConstants, FSDatasetInterface {
       this.usableDiskPct = conf.getFloat("dfs.datanode.du.pct",
                                          (float) USABLE_DISK_PCT_DEFAULT);
       File parent = currentDir.getParentFile();
+
+      this.detachDir = new File(parent, "detach");
+      if (detachDir.exists()) {
+        recoverDetachedBlocks(currentDir, detachDir);
+      }
       this.dataDir = new FSDir(currentDir);
       this.tmpDir = new File(parent, "tmp");
       if (tmpDir.exists()) {
@@ -291,6 +284,11 @@ class FSDataset implements FSConstants, FSDatasetInterface {
           throw new IOException("Mkdirs failed to create " + tmpDir.toString());
         }
       }
+      if (!detachDir.mkdirs()) {
+        if (!detachDir.isDirectory()) {
+          throw new IOException("Mkdirs failed to create " + detachDir.toString());
+        }
+      }
       this.usage = new DF(parent, conf);
       this.dfsUsage = new DU(parent, conf);
     }
@@ -324,22 +322,33 @@ class FSDataset implements FSConstants, FSDatasetInterface {
       return dataDir.dir;
     }
     
+    /**
+     * Temporary files. They get deleted when the datanode restarts
+     */
     File createTmpFile(Block b) throws IOException {
       File f = new File(tmpDir, b.getBlockName());
-      try {
-        if (f.exists()) {
-          throw new IOException("Unexpected problem in creating temporary file for "+
-                                b + ".  File " + f + " should not be present, but is.");
-        }
-        // Create the zero-length temp file
-        //
-        if (!f.createNewFile()) {
-          throw new IOException("Unexpected problem in creating temporary file for "+
-                                b + ".  File " + f + " should be creatable, but is already present.");
-        }
-      } catch (IOException ie) {
-        System.out.println("Exception! " + ie);
-        throw ie;
+      return createTmpFile(b, f);
+    }
+
+    /**
+     * Files used for copy-on-write. They need recovery when datanode
+     * restarts.
+     */
+    File createDetachFile(Block b, String filename) throws IOException {
+      File f = new File(detachDir, filename);
+      return createTmpFile(b, f);
+    }
+
+    private File createTmpFile(Block b, File f) throws IOException {
+      if (f.exists()) {
+        throw new IOException("Unexpected problem in creating temporary file for "+
+                              b + ".  File " + f + " should not be present, but is.");
+      }
+      // Create the zero-length temp file
+      //
+      if (!f.createNewFile()) {
+        throw new IOException("Unexpected problem in creating temporary file for "+
+                              b + ".  File " + f + " should be creatable, but is already present.");
       }
       return f;
     }
@@ -360,14 +369,10 @@ class FSDataset implements FSConstants, FSDatasetInterface {
       dataDir.getBlockInfo(blockSet);
     }
       
-    void getVolumeMap(HashMap<Block, FSVolume> volumeMap) {
+    void getVolumeMap(HashMap<Block, DatanodeBlockInfo> volumeMap) {
       dataDir.getVolumeMap(volumeMap, this);
     }
       
-    void getBlockMap(HashMap<Block, File> blockMap) {
-      dataDir.getBlockMap(blockMap);
-    }
-      
     void clearPath(File f) {
       dataDir.clearPath(f);
     }
@@ -375,6 +380,42 @@ class FSDataset implements FSConstants, FSDatasetInterface {
     public String toString() {
       return dataDir.dir.getAbsolutePath();
     }
+
+    /**
+     * Recover detached files on datanode restart. If a detached block
+     * does not exist in the original directory, then it is moved to the
+     * original directory.
+     */
+    private void recoverDetachedBlocks(File dataDir, File dir) 
+                                           throws IOException {
+      File contents[] = dir.listFiles();
+      if (contents == null) {
+        return;
+      }
+      for (int i = 0; i < contents.length; i++) {
+        if (!contents[i].isFile()) {
+          throw new IOException ("Found " + contents[i] + " in " + dir +
+                                 " but it is not a file.");
+        }
+
+        //
+        // If the original block file still exists, then no recovery
+        // is needed.
+        //
+        File blk = new File(dataDir, contents[i].getName());
+        if (!blk.exists()) {
+          if (!contents[i].renameTo(blk)) {
+            throw new IOException("Unable to recover detached file " +
+                                  contents[i]);
+          }
+          continue;
+        }
+        if (!contents[i].delete()) {
+            throw new IOException("Unable to cleanup detached file " +
+                                  contents[i]);
+        }
+      }
+    }
   }
     
   static class FSVolumeSet {
@@ -427,18 +468,12 @@ class FSDataset implements FSConstants, FSDatasetInterface {
       }
     }
       
-    synchronized void getVolumeMap(HashMap<Block, FSVolume> volumeMap) {
+    synchronized void getVolumeMap(HashMap<Block, DatanodeBlockInfo> volumeMap) {
       for (int idx = 0; idx < volumes.length; idx++) {
         volumes[idx].getVolumeMap(volumeMap);
       }
     }
       
-    synchronized void getBlockMap(HashMap<Block, File> blockMap) {
-      for (int idx = 0; idx < volumes.length; idx++) {
-        volumes[idx].getBlockMap(blockMap);
-      }
-    }
-      
     synchronized void checkDirs() throws DiskErrorException {
       for (int idx = 0; idx < volumes.length; idx++) {
         volumes[idx].checkDirs();
@@ -465,7 +500,7 @@ class FSDataset implements FSConstants, FSDatasetInterface {
   public static final String METADATA_EXTENSION = ".meta";
   public static final short METADATA_VERSION = 1;
     
-  protected static File getMetaFile( File f ) {
+  static File getMetaFile( File f ) {
     return new File( f.getAbsolutePath() + METADATA_EXTENSION );
   }
 
@@ -505,8 +540,7 @@ class FSDataset implements FSConstants, FSDatasetInterface {
   FSVolumeSet volumes;
   private HashMap<Block,ActiveFile> ongoingCreates = new HashMap<Block,ActiveFile>();
   private int maxBlocksPerDir = 0;
-  private HashMap<Block,FSVolume> volumeMap = null;
-  private HashMap<Block,File> blockMap = null;
+  private HashMap<Block,DatanodeBlockInfo> volumeMap = null;
   static  Random random = new Random();
   
   long blockWriteTimeout = 3600 * 1000;
@@ -521,10 +555,8 @@ class FSDataset implements FSConstants, FSDatasetInterface {
       volArray[idx] = new FSVolume(storage.getStorageDir(idx).getCurrentDir(), conf);
     }
     volumes = new FSVolumeSet(volArray);
-    volumeMap = new HashMap<Block,FSVolume>();
+    volumeMap = new HashMap<Block, DatanodeBlockInfo>();
     volumes.getVolumeMap(volumeMap);
-    blockMap = new HashMap<Block,File>();
-    volumes.getBlockMap(blockMap);
     blockWriteTimeout = Math.max(
          conf.getInt("dfs.datanode.block.write.timeout.sec", 3600), 1) * 1000;
     registerMBean(storage.getStorageID());
@@ -591,7 +623,25 @@ class FSDataset implements FSConstants, FSDatasetInterface {
           new FileOutputStream( new RandomAccessFile( getMetaFile( f ) , "rw" ).getFD() ));
 
   }
-  
+
+  /**
+   * Make a copy of the block if this block is linked to an existing
+   * snapshot. This ensures that modifying this block does not modify
+   * data in any existing snapshots.
+   * @param b Block
+   * @param numLinks Detach if the number of links exceed this value
+   * @throws IOException
+   * @return - true if the specified block was detached
+   */
+  boolean detachBlock(Block block, int numLinks) throws IOException {
+    DatanodeBlockInfo info = null;
+
+    synchronized (this) {
+      info = volumeMap.get(block);
+    }
+    return info.detachBlock(block, numLinks);
+  }
+
   /**
    * Start writing to a block file
    * If isRecovery is true and the block pre-exists, then we kill all
@@ -653,12 +703,11 @@ class FSDataset implements FSConstants, FSDatasetInterface {
       }
       FSVolume v = null;
       if (!isRecovery) {
-        synchronized (volumes) {
-          v = volumes.getNextVolume(blockSize);
-          // create temporary file to hold block in the designated volume
-          f = createTmpFile(v, b);
-        }
-        volumeMap.put(b, v);
+        v = volumes.getNextVolume(blockSize);
+        // create temporary file to hold block in the designated volume
+        // Do not insert temporary file into volume map.
+        f = createTmpFile(v, b);
+        volumeMap.put(b, new DatanodeBlockInfo(v));
       }
       ongoingCreates.put(b, new ActiveFile(f, threads));
     }
@@ -704,18 +753,14 @@ class FSDataset implements FSConstants, FSDatasetInterface {
     file.getChannel().position(ckOffset);
   }
 
-  File createTmpFile( FSVolume vol, Block blk ) throws IOException {
+  synchronized File createTmpFile( FSVolume vol, Block blk ) throws IOException {
     if ( vol == null ) {
-      synchronized ( this ) {
-        vol = volumeMap.get( blk );
-        if ( vol == null ) {
-          throw new IOException("Could not find volume for block " + blk);
-        }
+      vol = volumeMap.get( blk ).getVolume();
+      if ( vol == null ) {
+        throw new IOException("Could not find volume for block " + blk);
       }
     }
-    synchronized ( volumes ) {
-      return vol.createTmpFile(blk);
-    }
+    return vol.createTmpFile(blk);
   }
   
   //
@@ -734,13 +779,15 @@ class FSDataset implements FSConstants, FSDatasetInterface {
     if (f == null || !f.exists()) {
       throw new IOException("No temporary file " + f + " for block " + b);
     }
-    FSVolume v = volumeMap.get(b);
+    FSVolume v = volumeMap.get(b).getVolume();
+    if (v == null) {
+      throw new IOException("No volume for temporary file " + f + 
+                            " for block " + b);
+    }
         
     File dest = null;
-    synchronized (volumes) {
-      dest = v.addBlock(b, f);
-    }
-    blockMap.put(b, dest);
+    dest = v.addBlock(b, f);
+    volumeMap.put(b, new DatanodeBlockInfo(v, dest));
     ongoingCreates.remove(b);
   }
 
@@ -788,7 +835,7 @@ class FSDataset implements FSConstants, FSDatasetInterface {
       FSVolume v;
       synchronized (this) {
         f = getFile(invalidBlks[i]);
-        v = volumeMap.get(invalidBlks[i]);
+        v = volumeMap.get(invalidBlks[i]).getVolume();
         if (f == null) {
           DataNode.LOG.warn("Unexpected error trying to delete block "
                             + invalidBlks[i] + 
@@ -814,7 +861,6 @@ class FSDataset implements FSConstants, FSDatasetInterface {
           continue;
         }
         v.clearPath(parent);
-        blockMap.remove(invalidBlks[i]);
         volumeMap.remove(invalidBlks[i]);
       }
       File metaFile = getMetaFile( f );
@@ -844,7 +890,11 @@ class FSDataset implements FSConstants, FSDatasetInterface {
    * Turn the block identifier into a filename.
    */
   synchronized File getFile(Block b) {
-    return blockMap.get(b);
+    DatanodeBlockInfo info = volumeMap.get(b);
+    if (info != null) {
+      return info.getFile();
+    }
+    return null;
   }
 
   /**
@@ -896,6 +946,6 @@ class FSDataset implements FSConstants, FSDatasetInterface {
   }
   
   public long getBlockSize(Block b) {
-    return blockMap.get(b).length();
+    return getFile(b).length();
   }
 }
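
The new FSDataset.detachBlock() above is the entry point the rest of the datanode is expected to call. As a hedged usage sketch (a hypothetical call site in the same package; the only real caller in this patch is the new test at the end of this change), an append path would detach a block before reopening it, passing numLinks = 1 so that any additional hard link, i.e. a snapshot, forces the copy:

package org.apache.hadoop.dfs;

import java.io.IOException;

// Hypothetical helper, for illustration only; not part of the patch.
class AppendPrepareSketch {
  static void prepareBlockForWrite(FSDataset dataset, Block block)
      throws IOException {
    // detachBlock() copies the block and meta files only when their hard
    // link count exceeds numLinks; 1 means "just the live replica".
    boolean copied = dataset.detachBlock(block, 1);
    if (copied) {
      DataNode.LOG.info("Copied block " + block + " before write");
    }
  }
}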

+ 93 - 3
src/java/org/apache/hadoop/fs/FileUtil.java

@@ -405,31 +405,44 @@ public class FileUtil {
   public static class HardLink { 
     enum OSType {
       OS_TYPE_UNIX, 
-      OS_TYPE_WINXP; 
+      OS_TYPE_WINXP,
+      OS_TYPE_SOLARIS; 
     }
   
     private static String[] hardLinkCommand;
+    private static String[] getLinkCountCommand;
+    private static String osName = System.getProperty("os.name");
     
     static {
       switch(getOSType()) {
       case OS_TYPE_WINXP:
         hardLinkCommand = new String[] {"fsutil","hardlink","create", null, null};
+        getLinkCountCommand = new String[] {"stat","-c%h"};
+        break;
+      case OS_TYPE_SOLARIS:
+        hardLinkCommand = new String[] {"ln", null, null};
+        getLinkCountCommand = new String[] {"ls","-l"};
         break;
       case OS_TYPE_UNIX:
       default:
         hardLinkCommand = new String[] {"ln", null, null};
+        getLinkCountCommand = new String[] {"stat","-c%h"};
       }
     }
 
-    static OSType getOSType() {
-      String osName = System.getProperty("os.name");
+    static private OSType getOSType() {
       if (osName.indexOf("Windows") >= 0 && 
           (osName.indexOf("XpP") >= 0 || osName.indexOf("2003") >= 0))
         return OSType.OS_TYPE_WINXP;
+      else if (osName.indexOf("SunOS") >= 0)
+         return OSType.OS_TYPE_SOLARIS;
       else
         return OSType.OS_TYPE_UNIX;
     }
     
+    /**
+     * Creates a hardlink 
+     */
     public static void createHardLink(File target, 
                                       File linkName) throws IOException {
       int len = hardLinkCommand.length;
@@ -453,6 +466,55 @@ public class FileUtil {
         process.destroy();
       }
     }
+
+    /**
+     * Retrieves the number of links to the specified file.
+     */
+    public static int getLinkCount(File fileName) throws IOException {
+      int len = getLinkCountCommand.length;
+      String[] cmd = new String[len + 1];
+      for (int i = 0; i < len; i++) {
+        cmd[i] = getLinkCountCommand[i];
+      }
+      cmd[len] = fileName.toString();
+      String inpMsg = "";
+      String errMsg = "";
+      int exitValue = -1;
+      BufferedReader in = null;
+      BufferedReader err = null;
+
+      // execute shell command
+      Process process = Runtime.getRuntime().exec(cmd);
+      try {
+        exitValue = process.waitFor();
+        in = new BufferedReader(new InputStreamReader(
+                                    process.getInputStream()));
+        inpMsg = in.readLine();
+        if (inpMsg == null)  inpMsg = "";
+        
+        err = new BufferedReader(new InputStreamReader(
+                                     process.getErrorStream()));
+        errMsg = err.readLine();
+        if (errMsg == null)  errMsg = "";
+        if (exitValue != 0) {
+          throw new IOException(inpMsg + errMsg);
+        }
+        if (getOSType() == OSType.OS_TYPE_SOLARIS) {
+          String[] result = inpMsg.split("\\s+");
+          return Integer.parseInt(result[1]);
+        } else {
+          return Integer.parseInt(inpMsg);
+        }
+      } catch (NumberFormatException e) {
+        throw new IOException(StringUtils.stringifyException(e) + inpMsg + errMsg);
+      } catch (InterruptedException e) {
+        throw new IOException(StringUtils.stringifyException(e) + inpMsg + errMsg);
+      } finally {
+        process.destroy();
+        if (in != null) in.close();
+        if (err != null) err.close();
+      }
+    }
   }
 
   /**
@@ -510,4 +572,32 @@ public class FileUtil {
     }
     return tmp;
   }
+
+  /**
+   * Move the src file to the name specified by target.
+   * @param src the source file
+   * @param target the target file
+   * @exception IOException If this operation fails
+   */
+  public static void replaceFile(File src, File target) throws IOException {
+    /* renameTo() has two limitations on Windows platform.
+     * src.renameTo(target) fails if
+     * 1) If target already exists OR
+     * 2) If target is already open for reading/writing.
+     */
+    if (!src.renameTo(target)) {
+      int retries = 5;
+      while (target.exists() && !target.delete() && retries-- >= 0) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          throw new IOException("replaceFile interrupted.");
+        }
+      }
+      if (!src.renameTo(target)) {
+        throw new IOException("Unable to rename " + src +
+                              " to " + target);
+      }
+    }
+  }
 }
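
For reference, the link count that HardLink.getLinkCount() obtains by shelling out to stat (or parsing ls -l on Solaris) is the same value NIO exposes through the unix:nlink attribute. A small sketch under that assumption (Java 7+ on a POSIX filesystem; this is an alternative for illustration, not how the patch itself does it):

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class LinkCountSketch {
  // Same semantics as FileUtil.HardLink.getLinkCount(), but read from the
  // "unix" attribute view instead of parsing external command output.
  static int getLinkCount(File f) throws IOException {
    Number nlink = (Number) Files.getAttribute(f.toPath(), "unix:nlink");
    return nlink.intValue();
  }

  public static void main(String[] args) throws IOException {
    File original = File.createTempFile("blk", ".dat");
    File link = new File(original.getPath() + ".link");
    Files.createLink(link.toPath(), original.toPath());
    System.out.println(getLinkCount(original));   // prints 2: file + hard link
    link.delete();
    original.delete();
  }
}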

+ 11 - 0
src/test/org/apache/hadoop/dfs/MiniDFSCluster.java

@@ -607,4 +607,15 @@ public class MiniDFSCluster {
   void setLeasePeriod(long soft, long hard) {
     nameNode.namesystem.setLeasePeriod(soft, hard);
   }
+
+  /**
+   * Returns the current set of datanodes
+   */
+  DataNode[] listDataNodes() {
+    DataNode[] list = new DataNode[dataNodes.size()];
+    for (int i = 0; i < dataNodes.size(); i++) {
+      list[i] = dataNodes.get(i).datanode;
+    }
+    return list;
+  }
 }

+ 134 - 0
src/test/org/apache/hadoop/dfs/TestFileAppend.java

@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.net.*;
+import java.util.Random;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil.HardLink;
+
+/**
+ * This class tests the building blocks that are needed to
+ * support HDFS appends.
+ */
+public class TestFileAppend extends TestCase {
+  static final long seed = 0xDEADBEEFL;
+  static final int blockSize = 1024;
+  static final int numBlocks = 10;
+  static final int fileSize = numBlocks * blockSize + 1;
+  boolean simulatedStorage = false;
+
+  /*
+   * creates a file but does not close it
+   */ 
+  private FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
+    throws IOException {
+    FSDataOutputStream stm = fileSys.create(name, true,
+                                            fileSys.getConf().getInt("io.file.buffer.size", 4096),
+                                            (short)repl, (long)blockSize);
+    return stm;
+  }
+
+  //
+  // writes to file but does not close it
+  //
+  private void writeFile(FSDataOutputStream stm) throws IOException {
+    byte[] buffer = new byte[fileSize];
+    Random rand = new Random(seed);
+    rand.nextBytes(buffer);
+    stm.write(buffer);
+  }
+
+  /**
+   * Test that copy on write for blocks works correctly
+   */
+  public void testCopyOnWrite() throws IOException {
+    Configuration conf = new Configuration();
+    if (simulatedStorage) {
+      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    }
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+    FileSystem fs = cluster.getFileSystem();
+    InetSocketAddress addr = new InetSocketAddress("localhost",
+                                                   cluster.getNameNodePort());
+    DFSClient client = new DFSClient(addr, conf);
+    try {
+
+      // create a new file, write to it and close it.
+      //
+      Path file1 = new Path("/filestatus.dat");
+      FSDataOutputStream stm = createFile(fs, file1, 1);
+      writeFile(stm);
+      stm.close();
+
+      // Get a handle to the datanode
+      DataNode[] dn = cluster.listDataNodes();
+      assertTrue("There should be only one datanode but found " + dn.length,
+                  dn.length == 1);
+
+      LocatedBlocks locations = client.namenode.getBlockLocations(
+                                  file1.toString(), 0, Long.MAX_VALUE);
+      List<LocatedBlock> blocks = locations.getLocatedBlocks();
+      FSDataset dataset = (FSDataset) dn[0].data;
+
+      //
+      // Create hard links for a few of the blocks
+      //
+      for (int i = 0; i < blocks.size(); i = i + 2) {
+        Block b = (Block) blocks.get(i).getBlock();
+        FSDataset fsd = (FSDataset) dataset;
+        File f = fsd.getFile(b);
+        File link = new File(f.toString() + ".link");
+        System.out.println("Creating hardlink for File " + f + 
+                           " to " + link);
+        HardLink.createHardLink(f, link);
+      }
+
+      //
+      // Detach all blocks. This should remove hardlinks (if any)
+      //
+      for (int i = 0; i < blocks.size(); i++) {
+        Block b = (Block) blocks.get(i).getBlock();
+        System.out.println("testCopyOnWrite detaching block " + b);
+        assertTrue("Detaching block " + b + " should have returned true",
+                   dataset.detachBlock(b, 1) == true);
+      }
+
+      // Since the blocks were already detached earlier, these calls should
+      // return false
+      //
+      for (int i = 0; i < blocks.size(); i++) {
+        Block b = (Block) blocks.get(i).getBlock();
+        System.out.println("testCopyOnWrite detaching block " + b);
+        assertTrue("Detaching block " + b + " should have returned false",
+                   dataset.detachBlock(b, 1) == false);
+      }
+
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+}