
HADOOP-1700. Support appending to file in HDFS. (dhruba)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@679871 13f79535-47bb-0310-9956-ffa450edef68
Dhruba Borthakur 17 years ago
parent
commit
9354f2fe0f

+ 2 - 0
CHANGES.txt

@@ -32,6 +32,8 @@ Trunk (unreleased changes)
     HADOOP-372.  Add support for multiple input paths with a different
     InputFormat and Mapper for each path.  (Chris Smith via tomwhite)
 
+    HADOOP-1700.  Support appending to file in HDFS. (dhruba)
+
   NEW FEATURES
 
     HADOOP-3341. Allow streaming jobs to specify the field separator for map

+ 7 - 1
src/core/org/apache/hadoop/fs/FSInputChecker.java

@@ -243,7 +243,8 @@ abstract public class FSInputChecker extends FSInputStream {
         } 
         retry = false;
       } catch (ChecksumException ce) {
-          LOG.info("Found checksum error: "+StringUtils.stringifyException(ce));
+          LOG.info("Found checksum error: b[" + off + ", " + (off+read) + "]="
+              + StringUtils.byteToHexString(b, off, off + read), ce);
           if (retriesLeft == 0) {
             throw ce;
           }
@@ -282,6 +283,11 @@ abstract public class FSInputChecker extends FSInputStream {
   
   /* calculate checksum value */
   private long getChecksum() {
+    return checksum2long(checksum);
+  }
+
+  /** Convert a checksum byte array to a long */
+  static public long checksum2long(byte[] checksum) {
     long crc = 0L;
     for(int i=0; i<checksum.length; i++) {
       crc |= (0xffL&(long)checksum[i])<<((checksum.length-i-1)*8);

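The new checksum2long helper decodes a big-endian CRC byte array into a long, so the datanode can compare a stored checksum against a freshly computed one. A minimal standalone sketch of the same decoding (the demo class and sample bytes are made up; the method body mirrors the hunk above):

    public class Checksum2LongDemo {
      // Big-endian bytes -> long, mirroring FSInputChecker.checksum2long.
      static long checksum2long(byte[] checksum) {
        long crc = 0L;
        for (int i = 0; i < checksum.length; i++) {
          crc |= (0xffL & (long) checksum[i]) << ((checksum.length - i - 1) * 8);
        }
        return crc;
      }

      public static void main(String[] args) {
        byte[] crcBytes = { 0x01, 0x02, 0x03, 0x04 };   // a 4-byte CRC32 as stored on disk
        // Prints 1020304: byte 0 is the most significant byte.
        System.out.println(Long.toHexString(checksum2long(crcBytes)));
      }
    }
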
+ 32 - 13
src/core/org/apache/hadoop/fs/FSOutputSummer.java

@@ -87,18 +87,18 @@ abstract public class FSOutputSummer extends OutputStream {
     }
   }
   
-  /*
+  /**
    * Write a portion of an array, flushing to the underlying
    * stream at most once if necessary.
    */
-
   private int write1(byte b[], int off, int len) throws IOException {
     if(count==0 && len>=buf.length) {
       // local buffer is empty and user data has one chunk
       // checksum and output data
-      sum.update(b, off, buf.length);
-      writeChecksumChunk(b, off, buf.length, false);
-      return buf.length;
+      final int length = buf.length;
+      sum.update(b, off, length);
+      writeChecksumChunk(b, off, length, false);
+      return length;
     }
     
     // copy user data to local buffer
@@ -136,9 +136,9 @@ abstract public class FSOutputSummer extends OutputStream {
     }
   }
   
-  /* Generate checksum for the data chunk and output data chunk & checksum
+  /** Generate checksum for the data chunk and output data chunk & checksum
    * to the underlying output stream. If keep is true then keep the
-   * current ckecksum intact, do not reset it.
+   * current checksum intact, do not reset it.
    */
   private void writeChecksumChunk(byte b[], int off, int len, boolean keep)
   throws IOException {
@@ -146,12 +146,31 @@ abstract public class FSOutputSummer extends OutputStream {
     if (!keep) {
       sum.reset();
     }
-    
-    checksum[0] = (byte)((tempChecksum >>> 24) & 0xFF);
-    checksum[1] = (byte)((tempChecksum >>> 16) & 0xFF);
-    checksum[2] = (byte)((tempChecksum >>>  8) & 0xFF);
-    checksum[3] = (byte)((tempChecksum >>>  0) & 0xFF);
-
+    int2byte(tempChecksum, checksum);
     writeChunk(b, off, len, checksum);
   }
+
+  /**
+   * Converts a checksum integer value to a byte stream
+   */
+  static public byte[] convertToByteStream(Checksum sum, int checksumSize) {
+    return int2byte((int)sum.getValue(), new byte[checksumSize]);
+  }
+
+  static byte[] int2byte(int integer, byte[] bytes) {
+    bytes[0] = (byte)((integer >>> 24) & 0xFF);
+    bytes[1] = (byte)((integer >>> 16) & 0xFF);
+    bytes[2] = (byte)((integer >>>  8) & 0xFF);
+    bytes[3] = (byte)((integer >>>  0) & 0xFF);
+    return bytes;
+  }
+
+  /**
+   * Resets existing buffer with a new one of the specified size.
+   */
+  protected synchronized void resetChecksumChunk(int size) {
+    sum.reset();
+    this.buf = new byte[size];
+    this.count = 0;
+  }
 }

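int2byte writes the 32-bit CRC in big-endian order, the inverse of FSInputChecker.checksum2long above, and convertToByteStream is what the datanode later uses to persist a recomputed partial-chunk CRC. A hedged round-trip sketch (helper bodies are copied here only for illustration):

    import java.util.zip.CRC32;

    public class CrcRoundTripDemo {
      static byte[] int2byte(int integer, byte[] bytes) {
        bytes[0] = (byte) ((integer >>> 24) & 0xFF);
        bytes[1] = (byte) ((integer >>> 16) & 0xFF);
        bytes[2] = (byte) ((integer >>>  8) & 0xFF);
        bytes[3] = (byte) ((integer >>>  0) & 0xFF);
        return bytes;
      }

      static long checksum2long(byte[] checksum) {
        long crc = 0L;
        for (int i = 0; i < checksum.length; i++) {
          crc |= (0xffL & (long) checksum[i]) << ((checksum.length - i - 1) * 8);
        }
        return crc;
      }

      public static void main(String[] args) {
        CRC32 sum = new CRC32();
        sum.update("hello".getBytes(), 0, 5);
        byte[] onWire = int2byte((int) sum.getValue(), new byte[4]);  // writer side
        System.out.println(sum.getValue() == checksum2long(onWire));  // reader side: prints true
      }
    }
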
+ 15 - 6
src/core/org/apache/hadoop/util/StringUtils.java

@@ -132,15 +132,24 @@ public class StringUtils {
    * Given an array of bytes it will convert the bytes to a hex string
    * representation of the bytes
    * @param bytes
+   * @param start start index, inclusive
+   * @param end end index, exclusive
    * @return hex string representation of the byte array
    */
-  public static String byteToHexString(byte bytes[]) {
-    StringBuffer retString = new StringBuffer();
-    for (int i = 0; i < bytes.length; ++i) {
-      retString.append(Integer.toHexString(0x0100 + (bytes[i] & 0x00FF))
-                       .substring(1));
+  public static String byteToHexString(byte[] bytes, int start, int end) {
+    if (bytes == null) {
+      throw new IllegalArgumentException("bytes == null");
+    }
+    StringBuilder s = new StringBuilder(); 
+    for(int i = start; i < end; i++) {
+      s.append(String.format("%02x", bytes[i]));
     }
-    return retString.toString();
+    return s.toString();
+  }
+
+  /** Same as byteToHexString(bytes, 0, bytes.length). */
+  public static String byteToHexString(byte bytes[]) {
+    return byteToHexString(bytes, 0, bytes.length);
   }
 
   /**

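The ranged overload lets the checksum-error log in FSInputChecker print only the offending byte range. A small usage sketch (the demo class and sample bytes are illustrative; the loop is the same formatting the new method performs):

    public class HexDemo {
      public static void main(String[] args) {
        byte[] b = { 0x00, (byte) 0xab, 0x10, 0x7f };
        // Equivalent to StringUtils.byteToHexString(b, 1, 3): start inclusive, end exclusive.
        StringBuilder s = new StringBuilder();
        for (int i = 1; i < 3; i++) {
          s.append(String.format("%02x", b[i]));
        }
        System.out.println(s);   // prints "ab10"
      }
    }
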
+ 197 - 61
src/hdfs/org/apache/hadoop/hdfs/DFSClient.java

@@ -475,12 +475,45 @@ public class DFSClient implements FSConstants {
     FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
     LOG.debug(src + ": masked=" + masked);
     OutputStream result = new DFSOutputStream(src, masked,
-        overwrite, replication, blockSize, progress, buffersize);
+        overwrite, replication, blockSize, progress, buffersize,
+        conf.getInt("io.bytes.per.checksum", 512));
     synchronized (pendingCreates) {
       pendingCreates.put(src, result);
     }
     return result;
   }
+
+  /**
+   * Append to an existing HDFS file.  
+   * 
+   * @param src file name
+   * @param buffersize buffer size
+   * @param progress for reporting write-progress
+   * @return an output stream for writing into the file
+   * @throws IOException
+   * @see {@link ClientProtocol#append(String, String)}
+   */
+  OutputStream append(String src, int buffersize, Progressable progress
+      ) throws IOException {
+    checkOpen();
+    DFSFileInfo stat = null;
+    LocatedBlock lastBlock = null;
+    try {
+      stat = getFileInfo(src);
+      lastBlock = namenode.append(src, clientName);
+    } catch(RemoteException re) {
+      throw re.unwrapRemoteException(FileNotFoundException.class,
+                                     AccessControlException.class,
+                                     QuotaExceededException.class);
+    }
+    OutputStream result = new DFSOutputStream(src, buffersize, progress,
+        lastBlock, stat, conf.getInt("io.bytes.per.checksum", 512));
+    synchronized(pendingCreates) {
+      pendingCreates.put(src, result);
+    }
+    return result;
+  }
+
   /**
    * Set replication for an existing file.
    * 
@@ -1751,14 +1784,14 @@ public class DFSClient implements FSConstants {
     private DataOutputStream blockStream;
     private DataInputStream blockReplyStream;
     private Block block;
-    private long blockSize;
+    final private long blockSize;
     private DataChecksum checksum;
     private LinkedList<Packet> dataQueue = new LinkedList<Packet>();
     private LinkedList<Packet> ackQueue = new LinkedList<Packet>();
     private Packet currentPacket = null;
     private int maxPackets = 80; // each packet 64K, total 5MB
     // private int maxPackets = 1000; // each packet 64K, total 64MB
-    private DataStreamer streamer;
+    private DataStreamer streamer = new DataStreamer();
     private ResponseProcessor response = null;
     private long currentSeqno = 0;
     private long bytesCurBlock = 0; // bytes writen in current block
@@ -1773,6 +1806,7 @@ public class DFSClient implements FSConstants {
     private boolean persistBlocks = false; // persist blocks on namenode
     private int recoveryErrorCount = 0; // number of times block recovery failed
     private int maxRecoveryErrorCount = 5; // try block recovery 5 times
+    private volatile boolean appendChunk = false;   // appending to existing partial block
 
     private class Packet {
       ByteBuffer buffer;           // only one of buf and buffer is non-null
@@ -1781,6 +1815,7 @@ public class DFSClient implements FSConstants {
       long    offsetInBlock;       // offset in block
       boolean lastPacketInBlock;   // is this the last packet in block?
       int     numChunks;           // number of chunks currently in packet
+      int     maxChunks;           // max chunks in packet
       int     dataStart;
       int     dataPos;
       int     checksumStart;
@@ -1801,6 +1836,7 @@ public class DFSClient implements FSConstants {
         checksumPos = checksumStart;
         dataStart = checksumStart + chunksPerPkt * checksum.getChecksumSize();
         dataPos = dataStart;
+        maxChunks = chunksPerPkt;
       }
 
       void writeData(byte[] inarray, int off, int len) {
@@ -1875,7 +1911,7 @@ public class DFSClient implements FSConstants {
     // it. When all the packets for a block are sent out and acks for each
     // if them are received, the DataStreamer closes the current block.
     //
-    private class DataStreamer extends Thread {
+    private class DataStreamer extends Daemon {
 
       private volatile boolean closed = false;
   
@@ -1897,7 +1933,7 @@ public class DFSClient implements FSConstants {
           synchronized (dataQueue) {
 
             // process IO errors if any
-            boolean doSleep = processDatanodeError();
+            boolean doSleep = processDatanodeError(hasError);
 
             // wait for a packet to be sent.
             while ((!closed && !hasError && clientRunning 
@@ -2116,7 +2152,7 @@ public class DFSClient implements FSConstants {
     // threads and mark stream as closed. Returns true if we should
     // sleep for a while after returning from this call.
     //
-    private boolean processDatanodeError() {
+    private boolean processDatanodeError(boolean hasError) {
       if (!hasError) {
         return false;
       }
@@ -2125,12 +2161,11 @@ public class DFSClient implements FSConstants {
                  " waiting for responder to exit. ");
         return true;
       }
-      String msg = "Error Recovery for block " + block +
-                   " bad datanode[" + errorIndex + "]";
-      if (nodes != null) {
-        msg += " " + nodes[errorIndex].getName();
+      if (errorIndex >= 0) {
+        LOG.warn("Error Recovery for block " + block
+            + " bad datanode[" + errorIndex + "] "
+            + (nodes == null? "nodes == null": nodes[errorIndex].getName()));
       }
-      LOG.warn(msg);
 
       if (blockStream != null) {
         try {
@@ -2150,11 +2185,12 @@ public class DFSClient implements FSConstants {
 
       boolean success = false;
       while (!success && clientRunning) {
+        DatanodeInfo[] newnodes = null;
         if (nodes == null) {
           lastException = new IOException("Could not get block locations. " +
                                           "Aborting...");
           closed = true;
-          streamer.close();
+          if (streamer != null) streamer.close();
           return false;
         }
         StringBuilder pipelineMsg = new StringBuilder();
@@ -2164,32 +2200,33 @@ public class DFSClient implements FSConstants {
             pipelineMsg.append(", ");
           }
         }
-        String pipeline = pipelineMsg.toString();
-        if (nodes.length <= 1) {
-          lastException = new IOException("All datanodes " +
-                                          pipeline + " are bad. Aborting...");
-          closed = true;
-          streamer.close();
-          return false;
-        }
-        LOG.warn("Error Recovery for block " + block +
-                 " in pipeline " + pipeline + 
-                 ": bad datanode " + nodes[errorIndex].getName());
-
         // remove bad datanode from list of datanodes.
-        //
-        DatanodeInfo[] newnodes =  new DatanodeInfo[nodes.length-1];
-        for (int i = 0; i < errorIndex; i++) {
-          newnodes[i] = nodes[i];
-        }
-        for (int i = errorIndex; i < (nodes.length-1); i++) {
-          newnodes[i] = nodes[i+1];
+        // If errorIndex was not set (i.e. appends), then do not remove 
+        // any datanodes
+        // 
+        if (errorIndex < 0) {
+          newnodes = nodes;
+        } else {
+          if (nodes.length <= 1) {
+            lastException = new IOException("All datanodes " + pipelineMsg + 
+                                            " are bad. Aborting...");
+            closed = true;
+            if (streamer != null) streamer.close();
+            return false;
+          }
+          LOG.warn("Error Recovery for block " + block +
+                   " in pipeline " + pipelineMsg + 
+                   ": bad datanode " + nodes[errorIndex].getName());
+          newnodes =  new DatanodeInfo[nodes.length-1];
+          System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
+          System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
+              newnodes.length-errorIndex);
         }
 
         // Tell the primary datanode to do error recovery 
         // by stamping appropriate generation stamps.
         //
-        Block newBlock = null;
+        LocatedBlock newBlock = null;
         ClientDatanodeProtocol primary =  null;
         try {
           // Pick the "least" datanode as the primary datanode to avoid deadlock.
@@ -2206,7 +2243,7 @@ public class DFSClient implements FSConstants {
             LOG.warn(emsg);
             lastException = new IOException(emsg);
             closed = true;
-            streamer.close();
+            if (streamer != null) streamer.close();
             return false;       // abort with IOexception
           } 
           LOG.warn("Error Recovery for block " + block + " failed " +
@@ -2220,15 +2257,14 @@ public class DFSClient implements FSConstants {
         recoveryErrorCount = 0; // block recovery successful
 
         // If the block recovery generated a new generation stamp, use that
-        // from now on.
+        // from now on.  Also, setup new pipeline
         //
         if (newBlock != null) {
-          block = newBlock;
+          block = newBlock.getBlock();
+          nodes = newBlock.getLocations();
         }
 
-        // setup new pipeline
-        nodes = newnodes;
-        hasError = false;
+        this.hasError = false;
         errorIndex = 0;
         success = createBlockOutputStream(nodes, src, true);
       }
@@ -2265,17 +2301,10 @@ public class DFSClient implements FSConstants {
     }
 
     private Progressable progress;
-    /**
-     * Create a new output stream to the given DataNode.
-     * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
-     */
-    public DFSOutputStream(String src, FsPermission masked,
-                           boolean overwrite,
-                           short replication, long blockSize,
-                           Progressable progress,
-                           int buffersize
-                           ) throws IOException {
-      super(new CRC32(), conf.getInt("io.bytes.per.checksum", 512), 4);
+
+    private DFSOutputStream(String src, long blockSize, Progressable progress,
+        int bytesPerChecksum) throws IOException {
+      super(new CRC32(), bytesPerChecksum, 4);
       this.src = src;
       this.blockSize = blockSize;
       this.progress = progress;
@@ -2283,7 +2312,6 @@ public class DFSClient implements FSConstants {
         LOG.debug("Set non-null progress callback on DFSOutputStream "+src);
       }
       
-      int bytesPerChecksum = conf.getInt( "io.bytes.per.checksum", 512); 
       if ( bytesPerChecksum < 1 || blockSize % bytesPerChecksum != 0) {
         throw new IOException("io.bytes.per.checksum(" + bytesPerChecksum +
                               ") and blockSize(" + blockSize + 
@@ -2293,11 +2321,18 @@ public class DFSClient implements FSConstants {
       }
       checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 
                                               bytesPerChecksum);
-      int chunkSize = bytesPerChecksum + checksum.getChecksumSize();
-      chunksPerPacket = Math.max((writePacketSize - DataNode.PKT_HEADER_LEN - 
-                                  SIZE_OF_INTEGER + chunkSize-1)/chunkSize, 1);
-      packetSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER + 
-                   chunkSize * chunksPerPacket; 
+    }
+
+    /**
+     * Create a new output stream to the given DataNode.
+     * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
+     */
+    DFSOutputStream(String src, FsPermission masked, boolean overwrite,
+        short replication, long blockSize, Progressable progress,
+        int buffersize, int bytesPerChecksum) throws IOException {
+      this(src, blockSize, progress, bytesPerChecksum);
+
+      computePacketChunkSize(writePacketSize, bytesPerChecksum);
 
       try {
         namenode.create(
@@ -2306,11 +2341,89 @@ public class DFSClient implements FSConstants {
         throw re.unwrapRemoteException(AccessControlException.class,
                                        QuotaExceededException.class);
       }
-      streamer = new DataStreamer();
-      streamer.setDaemon(true);
       streamer.start();
     }
   
+    /**
+     * Create a new output stream to the given DataNode.
+     * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
+     */
+    DFSOutputStream(String src, int buffersize, Progressable progress,
+        LocatedBlock lastBlock, DFSFileInfo stat,
+        int bytesPerChecksum) throws IOException {
+      this(src, stat.getBlockSize(), progress, bytesPerChecksum);
+
+      //
+      // The last partial block of the file has to be filled.
+      //
+      if (lastBlock != null) {
+        block = lastBlock.getBlock();
+        long usedInLastBlock = stat.getLen() % blockSize;
+        int freeInLastBlock = (int)(blockSize - usedInLastBlock);
+
+        // calculate the amount of free space in the pre-existing 
+        // last crc chunk
+        int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
+        int freeInCksum = bytesPerChecksum - usedInCksum;
+
+        // if there is space in the last block, then we have to 
+        // append to that block
+        if (freeInLastBlock > blockSize) {
+          throw new IOException("The last block for file " + 
+                                src + " is full.");
+        }
+
+        // indicate that we are appending to an existing block
+        bytesCurBlock = lastBlock.getBlockSize();
+
+        if (usedInCksum > 0 && freeInCksum > 0) {
+          // if there is space in the last partial chunk, then 
+          // setup in such a way that the next packet will have only 
+          // one chunk that fills up the partial chunk.
+          //
+          computePacketChunkSize(0, freeInCksum);
+          resetChecksumChunk(freeInCksum);
+          this.appendChunk = true;
+        } else {
+          // if the remaining space in the block is smaller than 
+          // that expected size of of a packet, then create 
+          // smaller size packet.
+          //
+          computePacketChunkSize(Math.min(writePacketSize, freeInLastBlock), 
+                                 bytesPerChecksum);
+        }
+
+        // setup pipeline to append to the last block XXX retries??
+        nodes = lastBlock.getLocations();
+        errorIndex = -1;   // no errors yet.
+        if (nodes.length < 1) {
+          throw new IOException("Unable to retrieve blocks locations " +
+                                " for last block " + block +
+                                "of file " + src);
+                        
+        }
+        processDatanodeError(true);
+        streamer.start();
+      }
+      else {
+        computePacketChunkSize(writePacketSize, bytesPerChecksum);
+        streamer.start();
+      }
+    }
+
+    private void computePacketChunkSize(int psize, int csize) {
+      int chunkSize = csize + checksum.getChecksumSize();
+      int n = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
+      chunksPerPacket = Math.max((psize - n + chunkSize-1)/chunkSize, 1);
+      packetSize = n + chunkSize*chunksPerPacket;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("computePacketChunkSize: src=" + src +
+                  ", chunkSize=" + chunkSize +
+                  ", chunksPerPacket=" + chunksPerPacket +
+                  ", packetSize=" + packetSize);
+      }
+    }
+
     /**
      * Open a DataOutputStream to a DataNode so that it can be written to.
      * This happens when a file is created and each time a new block is allocated.
@@ -2508,8 +2621,14 @@ public class DFSClient implements FSConstants {
         if (currentPacket == null) {
           currentPacket = new Packet(packetSize, chunksPerPacket, 
                                      bytesCurBlock);
-          LOG.debug("DFSClient writeChunk allocating new packet " + 
-                    currentPacket.seqno);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("DFSClient writeChunk allocating new packet seqno=" + 
+                      currentPacket.seqno +
+                      ", src=" + src +
+                      ", packetSize=" + packetSize +
+                      ", chunksPerPacket=" + chunksPerPacket +
+                      ", bytesCurBlock=" + bytesCurBlock);
+          }
         }
 
         currentPacket.writeChecksum(checksum, 0, cklen);
@@ -2519,9 +2638,16 @@ public class DFSClient implements FSConstants {
 
         // If packet is full, enqueue it for transmission
         //
-        if (currentPacket.numChunks == chunksPerPacket ||
+        if (currentPacket.numChunks == currentPacket.maxChunks ||
             bytesCurBlock == blockSize) {
-          LOG.debug("DFSClient writeChunk packet full seqno " + currentPacket.seqno);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("DFSClient writeChunk packet full seqno=" +
+                      currentPacket.seqno +
+                      ", src=" + src +
+                      ", bytesCurBlock=" + bytesCurBlock +
+                      ", blockSize=" + blockSize +
+                      ", appendChunk=" + appendChunk);
+          }
           //
           // if we allocated a new packet because we encountered a block
           // boundary, reset bytesCurBlock.
@@ -2534,6 +2660,16 @@ public class DFSClient implements FSConstants {
           dataQueue.addLast(currentPacket);
           dataQueue.notifyAll();
           currentPacket = null;
+ 
+          // If this was the first write after reopening a file, then the above
+          // write filled up any partial chunk. Tell the summer to generate full 
+          // crc chunks from now on.
+          if (appendChunk) {
+            appendChunk = false;
+            resetChecksumChunk(bytesPerChecksum);
+          }
+          int psize = Math.min((int)(blockSize-bytesCurBlock), writePacketSize);
+          computePacketChunkSize(psize, bytesPerChecksum);
         }
       }
       //LOG.debug("DFSClient writeChunk done length " + len +

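The append constructor calls computePacketChunkSize twice: with a packet size of 0 when a partial CRC chunk exists (so the next packet carries exactly one short chunk that tops it up), and with min(writePacketSize, freeInLastBlock) otherwise. A standalone sketch of the same arithmetic; the header constants below are placeholders, not the real DataNode.PKT_HEADER_LEN value:

    public class PacketSizingDemo {
      static final int PKT_HEADER_LEN = 21;    // placeholder; the real value lives in DataNode
      static final int SIZE_OF_INTEGER = 4;

      // Mirrors DFSOutputStream.computePacketChunkSize(psize, csize).
      static void computePacketChunkSize(int psize, int csize, int checksumSize) {
        int chunkSize = csize + checksumSize;
        int n = PKT_HEADER_LEN + SIZE_OF_INTEGER;
        int chunksPerPacket = Math.max((psize - n + chunkSize - 1) / chunkSize, 1);
        int packetSize = n + chunkSize * chunksPerPacket;
        System.out.println("psize=" + psize + ", csize=" + csize
            + " -> chunksPerPacket=" + chunksPerPacket + ", packetSize=" + packetSize);
      }

      public static void main(String[] args) {
        int crcSize = 4;
        computePacketChunkSize(64 * 1024, 512, crcSize);  // normal write: many 512-byte chunks
        computePacketChunkSize(0, 100, crcSize);          // append into a partial chunk: one chunk only
      }
    }
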
+ 3 - 1
src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -157,7 +157,9 @@ public class DistributedFileSystem extends FileSystem {
   /** This optional operation is not yet supported. */
   public FSDataOutputStream append(Path f, int bufferSize,
       Progressable progress) throws IOException {
-    throw new IOException("Not supported");
+
+    return new FSDataOutputStream(
+        dfs.append(getPathName(f), bufferSize, progress), statistics);
   }
 
   public FSDataOutputStream create(Path f, FsPermission permission,

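With DistributedFileSystem.append wired up, clients reach this code through the generic FileSystem API. A hedged usage sketch (the path is hypothetical, and the cluster must be running a build that includes this change):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class AppendExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();      // picks up the usual site configuration
        FileSystem fs = FileSystem.get(conf);          // a DistributedFileSystem for an hdfs:// default FS
        Path file = new Path("/tmp/append-demo.txt");  // hypothetical path

        if (!fs.exists(file)) {
          fs.create(file).close();                     // append requires an existing file
        }
        FSDataOutputStream out = fs.append(file);      // resumes writing at the end of the last block
        try {
          out.write("one more line\n".getBytes("UTF-8"));
        } finally {
          out.close();                                 // completes the file and releases the lease
        }
      }
    }
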
+ 3 - 3
src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java

@@ -29,9 +29,9 @@ public interface ClientDatanodeProtocol extends VersionedProtocol {
   public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
 
   /**
-   * 1: start of time
+   * 2: recoverBlock returns the datanodes on which recovery was successful.
    */
-  public static final long versionID = 1L;
+  public static final long versionID = 2L;
 
   /** Start generation-stamp recovery for specified block
    * @param block the specified block
@@ -41,5 +41,5 @@ public interface ClientDatanodeProtocol extends VersionedProtocol {
    * not have any data and the block was deleted.
    * @throws IOException
    */
-  Block recoverBlock(Block block, DatanodeInfo[] targets) throws IOException;
+  LocatedBlock recoverBlock(Block block, DatanodeInfo[] targets) throws IOException;
 }

+ 14 - 3
src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java

@@ -39,10 +39,9 @@ public interface ClientProtocol extends VersionedProtocol {
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 35 : Quota-related RPCs are introduced: getQuota, clearQuota;
-   * Besides, getContentSummary also returns the quota of the directory.
+   * 36 : Added append(...).
    */
-  public static final long versionID = 35L;
+  public static final long versionID = 36L;
   
   ///////////////////////////////////////
   // File contents
@@ -108,6 +107,18 @@ public interface ClientProtocol extends VersionedProtocol {
                              long blockSize
                              ) throws IOException;
 
+  /**
+   * Append to the end of the file. 
+   * @param src path of the file being created.
+   * @param clientName name of the current client.
+   * @return information about the last partial block if any.
+   * @throws AccessControlException if permission to append file is 
+   * denied by the system. As usually on the client side the exception will 
+   * be wrapped into {@link org.apache.hadoop.ipc.RemoteException}.
+   * @throws IOException if other errors occur.
+   */
+  public LocatedBlock append(String src, String clientName) throws IOException;
+
   /**
    * Set replication for an existing file.
    * <p>

+ 126 - 12
src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.server.datanode;
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSOutputSummer;
+import org.apache.hadoop.fs.FSInputChecker;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.*;
@@ -65,6 +67,8 @@ import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.util.*;
 import java.util.concurrent.Semaphore;
+import java.util.zip.Checksum;
+import java.util.zip.CRC32;
 import java.security.NoSuchAlgorithmException;
 import java.security.SecureRandom;
 
@@ -2214,10 +2218,15 @@ public class DataNode extends Configured
               /* The receiver thread cancelled this thread. 
                * We could also check any other status updates from the 
                * receiver thread (e.g. if it is ok to write to replyOut). 
+               * It is prudent to not send any more status back to the client
+               * because this datanode has a problem. The upstream datanode
+               * will detect a timeout on heartbeats and will declare that
+               * this datanode is bad, and rightly so.
                */
               LOG.info("PacketResponder " + block +  " " + numTargets +
                        " : Thread is interrupted.");
               running = false;
+              continue;
             }
             
             if (!didRead) {
@@ -2321,6 +2330,7 @@ public class DataNode extends Configured
     private boolean isRecovery = false;
     private String clientName;
     DatanodeInfo srcDataNode = null;
+    private Checksum partialCrc = null;
 
     BlockReceiver(Block block, DataInputStream in, String inAddr,
                   boolean isRecovery, String clientName, 
@@ -2346,6 +2356,11 @@ public class DataNode extends Configured
           this.checksumOut = new DataOutputStream(new BufferedOutputStream(
                                                     streams.checksumOut, 
                                                     SMALL_BUFFER_SIZE));
+          // If this block is for appends, then remove it from periodic
+          // validation.
+          if (blockScanner != null && isRecovery) {
+            blockScanner.deleteBlock(block);
+          }
         }
       } catch(IOException ioe) {
         IOUtils.closeStream(this);
@@ -2360,6 +2375,7 @@ public class DataNode extends Configured
       // close checksum file
       try {
         if (checksumOut != null) {
+          checksumOut.flush();
           checksumOut.close();
           checksumOut = null;
         }
@@ -2369,6 +2385,7 @@ public class DataNode extends Configured
       // close block file
       try {
         if (out != null) {
+          out.flush();
           out.close();
           out = null;
         }
@@ -2498,7 +2515,7 @@ public class DataNode extends Configured
      * be more data from next packet in buf.<br><br>
      * 
      * It tries to read a full packet with single read call.
-     * Consecutinve packets are usually of the same length.
+     * Consecutive packets are usually of the same length.
      */
     private int readNextPacket() throws IOException {
       /* This dances around buf a little bit, mainly to read 
@@ -2669,7 +2686,25 @@ public class DataNode extends Configured
           if (!finalized) {
             //finally write to the disk :
             out.write(pktBuf, dataOff, len);
-            checksumOut.write(pktBuf, checksumOff, checksumLen);
+
+            // If this is a partial chunk, then verify that this is the only
+            // chunk in the packet. Calculate new crc for this chunk.
+            if (partialCrc != null) {
+              if (len > bytesPerChecksum) {
+                throw new IOException("Got wrong length during writeBlock(" + 
+                                      block + ") from " + inAddr + " " +
+                                      "A packet can have only one partial chunk."+
+                                      " len = " + len + 
+                                      " bytesPerChecksum " + bytesPerChecksum);
+              }
+              partialCrc.update(pktBuf, dataOff, len);
+              byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
+              checksumOut.write(buf);
+              LOG.debug("Writing out partial crc for data len " + len);
+              partialCrc = null;
+            } else {
+              checksumOut.write(pktBuf, checksumOff, checksumLen);
+            }
             myMetrics.bytesWritten.inc(len);
           }
         } catch (IOException iex) {
@@ -2799,12 +2834,6 @@ public class DataNode extends Configured
       if (data.getChannelPosition(block, streams) == offsetInBlock) {
         return;                   // nothing to do 
       }
-      if (offsetInBlock % bytesPerChecksum != 0) {
-        throw new IOException("setBlockPosition trying to set position to " +
-                              offsetInBlock +
-                              " which is not a multiple of bytesPerChecksum " +
-                               bytesPerChecksum);
-      }
       long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
                               offsetInBlock / bytesPerChecksum * checksumSize;
       if (out != null) {
@@ -2813,6 +2842,17 @@ public class DataNode extends Configured
       if (checksumOut != null) {
         checksumOut.flush();
       }
+
+      // If this is a partial chunk, then read in pre-existing checksum
+      if (offsetInBlock % bytesPerChecksum != 0) {
+        LOG.info("setBlockPosition trying to set position to " +
+                 offsetInBlock +
+                 " for block " + block +
+                 " which is not a multiple of bytesPerChecksum " +
+                 bytesPerChecksum);
+        computePartialChunkCrc(offsetInBlock, offsetInChecksum, bytesPerChecksum);
+      }
+
       LOG.info("Changing block file offset of block " + block + " from " + 
                data.getChannelPosition(block, streams) +
                " to " + offsetInBlock +
@@ -2821,6 +2861,58 @@ public class DataNode extends Configured
       // set the position of the block file
       data.setChannelPosition(block, streams, offsetInBlock, offsetInChecksum);
     }
+
+    /**
+     * reads in the partial crc chunk and computes checksum
+     * of pre-existing data in partial chunk.
+     */
+    private void computePartialChunkCrc(long blkoff, long ckoff, 
+                                        int bytesPerChecksum) throws IOException {
+
+      // find offset of the beginning of partial chunk.
+      //
+      int sizePartialChunk = (int) (blkoff % bytesPerChecksum);
+      int checksumSize = checksum.getChecksumSize();
+      blkoff = blkoff - sizePartialChunk;
+      LOG.info("computePartialChunkCrc sizePartialChunk " + 
+                sizePartialChunk +
+                " block " + block +
+                " offset in block " + blkoff +
+                " offset in metafile " + ckoff);
+
+      // create an input stream from the block file
+      // and read in partial crc chunk into temporary buffer
+      //
+      byte[] buf = new byte[sizePartialChunk];
+      byte[] crcbuf = new byte[checksumSize];
+      FSDataset.BlockInputStreams instr = null;
+      try { 
+        instr = data.getTmpInputStreams(block, blkoff, ckoff);
+        IOUtils.readFully(instr.dataIn, buf, 0, sizePartialChunk);
+  
+        // open meta file and read in the crc value computed earlier
+        IOUtils.readFully(instr.checksumIn, crcbuf, 0, crcbuf.length);
+      } finally {
+        IOUtils.closeStream(instr);
+      }
+
+      // compute crc of partial chunk from data read in the block file.
+      partialCrc = new CRC32();
+      partialCrc.update(buf, 0, sizePartialChunk);
+      LOG.info("Read in partial CRC chunk from disk for block " + block);
+
+      // paranoia! verify that the pre-computed crc matches what we
+      // recalculated just now
+      if (partialCrc.getValue() != FSInputChecker.checksum2long(crcbuf)) {
+        String msg = "Partial CRC " + partialCrc.getValue() +
+                     " does not match value computed the " +
+                     " last time file was closed " +
+                     FSInputChecker.checksum2long(crcbuf);
+        throw new IOException(msg);
+      }
+      //LOG.debug("Partial CRC matches 0x" + 
+      //            Long.toHexString(partialCrc.getValue()));
+    }
   }
 
   /**
@@ -3117,8 +3209,22 @@ public class DataNode extends Configured
       LOG.debug("block=" + block);
     }
     Block stored = data.getStoredBlock(block.getBlockId());
-    return stored == null?
-        null: new BlockMetaDataInfo(stored, blockScanner.getLastScanTime(stored));
+
+    if (stored == null) {
+      return null;
+    }
+    BlockMetaDataInfo info = new BlockMetaDataInfo(stored,
+                                 blockScanner.getLastScanTime(stored));
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getBlockMetaDataInfo successful block=" + stored +
+                " length " + stored.getNumBytes() +
+                " genstamp " + stored.getGenerationStamp());
+    }
+
+    // paranoia! verify that the contents of the stored block
+    // matches the block file on disk.
+    data.validateBlockMetadata(stored);
+    return info;
   }
 
   public Daemon recoverBlocks(final Block[] blocks, final DatanodeInfo[][] targets) {
@@ -3159,9 +3265,17 @@ public class DataNode extends Configured
 
   // ClientDataNodeProtocol implementation
   /** {@inheritDoc} */
-  public Block recoverBlock(Block block, DatanodeInfo[] targets
+  public LocatedBlock recoverBlock(Block block, DatanodeInfo[] targets
       ) throws IOException {
-    LOG.info("Client invoking recoverBlock for block " + block);
+    StringBuilder msg = new StringBuilder();
+    for (int i = 0; i < targets.length; i++) {
+      msg.append(targets[i].getName());
+      if (i < targets.length - 1) {
+        msg.append(",");
+      }
+    }
+    LOG.info("Client invoking recoverBlock for block " + block +
+             " on datanodes " + msg.toString());
     return LeaseManager.recoverBlock(block, targets, this, namenode, 
                                      getConf(), false);
   }

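The essence of computePartialChunkCrc is: re-read the bytes of the partial chunk, recompute their CRC, check it against the stored checksum, then keep extending that same CRC with the appended bytes so one checksum covers the whole chunk. A self-contained sketch of that pattern using in-memory buffers instead of the real block and meta files:

    import java.util.zip.CRC32;

    public class PartialCrcDemo {
      public static void main(String[] args) {
        byte[] oldData = "partial-chunk-data".getBytes();  // bytes already on disk for this chunk
        byte[] newData = "appended-bytes".getBytes();      // bytes arriving from the append

        // CRC stored in the meta file when the file was last closed.
        CRC32 stored = new CRC32();
        stored.update(oldData, 0, oldData.length);
        long storedValue = stored.getValue();

        // Paranoia check: recompute the partial-chunk CRC and compare to the stored value.
        CRC32 partialCrc = new CRC32();
        partialCrc.update(oldData, 0, oldData.length);
        if (partialCrc.getValue() != storedValue) {
          throw new IllegalStateException("Partial CRC does not match stored checksum");
        }

        // Extend the same CRC with the appended bytes; the result covers old + new data
        // and replaces the old checksum entry for this chunk.
        partialCrc.update(newData, 0, newData.length);
        System.out.println("combined CRC = 0x" + Long.toHexString(partialCrc.getValue()));
      }
    }
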
+ 136 - 9
src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java

@@ -99,12 +99,17 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
       if (numBlocks < maxBlocksPerDir) {
         File dest = new File(dir, b.getBlockName());
         File metaData = getMetaFile( src, b );
-        if ( ! metaData.renameTo( getMetaFile(dest, b) ) ||
+        File newmeta = getMetaFile(dest, b);
+        if ( ! metaData.renameTo( newmeta ) ||
             ! src.renameTo( dest ) ) {
           throw new IOException( "could not move files for " + b +
                                  " from tmp to " + 
                                  dest.getAbsolutePath() );
         }
+        if (DataNode.LOG.isDebugEnabled()) {
+          DataNode.LOG.debug("addBlock: Moved " + metaData + " to " + newmeta);
+          DataNode.LOG.debug("addBlock: Moved " + src + " to " + dest);
+        }
 
         numBlocks += 1;
         return dest;
@@ -360,7 +365,8 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
     }
     
     /**
-     * Temporary files. They get deleted when the datanode restarts
+     * Temporary files. They get moved to the real block directory either when
+     * the block is finalized or the datanode restarts.
      */
     File createTmpFile(Block b) throws IOException {
       File f = new File(tmpDir, b.getBlockName());
@@ -632,7 +638,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
   }
 
   /** {@inheritDoc} */
-  public Block getStoredBlock(long blkid) throws IOException {
+  public synchronized Block getStoredBlock(long blkid) throws IOException {
     Block b = new Block(blkid);
     File blockfile = findBlockFile(b);
     if (blockfile == null) {
@@ -659,7 +665,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
     return new MetaDataInputStream(new FileInputStream(checksumFile),
                                                     checksumFile.length());
   }
-    
+
   FSVolumeSet volumes;
   private HashMap<Block,ActiveFile> ongoingCreates = new HashMap<Block,ActiveFile>();
   private int maxBlocksPerDir = 0;
@@ -736,6 +742,31 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
     }
     return new FileInputStream(blockInFile.getFD());
   }
+
+  /**
+   * Returns handles to the block file and its metadata file
+   */
+  public synchronized BlockInputStreams getTmpInputStreams(Block b, 
+                          long blkOffset, long ckoff) throws IOException {
+
+    DatanodeBlockInfo info = volumeMap.get(b);
+    if (info == null) {
+      throw new IOException("Block " + b + " does not exist in volumeMap.");
+    }
+    FSVolume v = info.getVolume();
+    File blockFile = v.getTmpFile(b);
+    RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
+    if (blkOffset > 0) {
+      blockInFile.seek(blkOffset);
+    }
+    File metaFile = getMetaFile(blockFile, b);
+    RandomAccessFile metaInFile = new RandomAccessFile(metaFile, "r");
+    if (ckoff > 0) {
+      metaInFile.seek(ckoff);
+    }
+    return new BlockInputStreams(new FileInputStream(blockInFile.getFD()),
+                                new FileInputStream(metaInFile.getFD()));
+  }
     
   private BlockWriteStreams createBlockWriteStreams( File f , File metafile) throws IOException {
       return new BlockWriteStreams(new FileOutputStream(new RandomAccessFile( f , "rw" ).getFD()),
@@ -797,6 +828,9 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
     }
 
     File blockFile = findBlockFile(oldblock);
+    if (blockFile == null) {
+      throw new IOException("Block " + oldblock + " does not exist.");
+    }
     interruptOngoingCreates(oldblock);
     
     File oldMetaFile = getMetaFile(blockFile, oldblock);
@@ -833,6 +867,10 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
 
     updateBlockMap(ongoingCreates, oldblock, newblock);
     updateBlockMap(volumeMap, oldblock, newblock);
+
+    // paranoia! verify that the contents of the stored block 
+    // matches the block file on disk.
+    validateBlockMetadata(newblock);
   }
 
   static private void truncateBlock(File blockFile, File metaFile,
@@ -896,13 +934,12 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
       if (!isRecovery) {
         throw new IOException("Block " + b + " is valid, and cannot be written to.");
       }
-      // If the block was succesfully finalized because all packets
+      // If the block was successfully finalized because all packets
       // were successfully processed at the Datanode but the ack for
       // some of the packets were not received by the client. The client 
       // re-opens the connection and retries sending those packets.
-      // 
-      DataNode.LOG.info("Reopen Block " + b);
-      return null;
+      // The other reason is that an "append" is occurring to this block.
+      detachBlock(b, 1);
     }
     long blockSize = b.getNumBytes();
 
@@ -934,9 +971,50 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
       if (!isRecovery) {
         v = volumes.getNextVolume(blockSize);
         // create temporary file to hold block in the designated volume
-        // Do not insert temporary file into volume map.
         f = createTmpFile(v, b);
         volumeMap.put(b, new DatanodeBlockInfo(v));
+      } else if (f != null) {
+        DataNode.LOG.info("Reopen already-open Block for append " + b);
+        // create or reuse temporary file to hold block in the designated volume
+        v = volumeMap.get(b).getVolume();
+        volumeMap.put(b, new DatanodeBlockInfo(v));
+      } else {
+        // reopening block for appending to it.
+        DataNode.LOG.info("Reopen Block for append " + b);
+        v = volumeMap.get(b).getVolume();
+        f = createTmpFile(v, b);
+        File blkfile = getBlockFile(b);
+        File oldmeta = getMetaFile(b);
+        File newmeta = getMetaFile(f, b);
+
+        // rename meta file to tmp directory
+        DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
+        if (!oldmeta.renameTo(newmeta)) {
+          throw new IOException("Block " + b + " reopen failed. " +
+                                " Unable to move meta file  " + oldmeta +
+                                " to tmp dir " + newmeta);
+        }
+
+        // rename block file to tmp directory
+        DataNode.LOG.debug("Renaming " + blkfile + " to " + f);
+        if (!blkfile.renameTo(f)) {
+          if (!f.delete()) {
+            throw new IOException("Block " + b + " reopen failed. " +
+                                  " Unable to remove file " + f);
+          }
+          if (!blkfile.renameTo(f)) {
+            throw new IOException("Block " + b + " reopen failed. " +
+                                  " Unable to move block file " + blkfile +
+                                  " to tmp dir " + f);
+          }
+        }
+        volumeMap.put(b, new DatanodeBlockInfo(v));
+      }
+      if (f == null) {
+        DataNode.LOG.warn("Block " + b + " reopen failed " +
+                          " Unable to locate tmp file.");
+        throw new IOException("Block " + b + " reopen failed " +
+                              " Unable to locate tmp file.");
       }
       ongoingCreates.put(b, new ActiveFile(f, threads));
     }
@@ -957,6 +1035,8 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
     // block size, so clients can't go crazy
     //
     File metafile = getMetaFile(f, b);
+    DataNode.LOG.debug("writeTo blockfile is " + f + " of size " + f.length());
+    DataNode.LOG.debug("writeTo metafile is " + metafile + " of size " + metafile.length());
     return createBlockWriteStreams( f , metafile);
   }
 
@@ -1081,6 +1161,53 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
     return null;
   }
 
+  /** {@inheritDoc} */
+  public void validateBlockMetadata(Block b) throws IOException {
+    DatanodeBlockInfo info = volumeMap.get(b);
+    if (info == null) {
+      throw new IOException("Block " + b + " does not exist in volumeMap.");
+    }
+    FSVolume v = info.getVolume();
+    File tmp = v.getTmpFile(b);
+    File f = getFile(b);
+    if (f == null) {
+      f = tmp;
+    }
+    if (f == null) {
+      throw new IOException("Block " + b + 
+                            " block file " + f +
+                            " does not exist on disk.");
+    }
+    if (!f.exists()) {
+      throw new IOException("Block " + b + 
+                            " block file " + f +
+                            " does not exist on disk.");
+    }
+    if (b.getNumBytes() != f.length()) {
+      throw new IOException("Block " + b + 
+                            " length is " + b.getNumBytes()  +
+                            " does not match block file length " +
+                            f.length());
+    }
+    File meta = getMetaFile(f, b);
+    if (meta == null) {
+      throw new IOException("Block " + b + 
+                            " metafile does not exist.");
+    }
+    if (!meta.exists()) {
+      throw new IOException("Block " + b + 
+                            " metafile " + meta +
+                            " does not exist on disk.");
+    }
+    long stamp = parseGenerationStamp(f, meta);
+    if (stamp != b.getGenerationStamp()) {
+      throw new IOException("Block " + b + 
+                            " genstamp is " + b.getGenerationStamp()  +
+                            " does not match meta file stamp " +
+                            stamp);
+    }
+  }
+
   /**
    * We're informed that a block is no longer valid.  We
    * could lazily garbage-collect the block, but why bother?

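validateBlockMetadata is a pure consistency check: the block file must exist, its length must match Block.getNumBytes(), and the generation stamp parsed from the meta file name must match the block's. A stripped-down sketch of those checks over plain files (file names and the parsed stamp are stand-ins, not the real FSDataset lookup):

    import java.io.File;
    import java.io.IOException;

    public class BlockMetadataCheckSketch {
      static void validate(File blockFile, File metaFile,
                           long expectedLen, long expectedStamp,
                           long stampParsedFromMetaName) throws IOException {
        if (blockFile == null || !blockFile.exists()) {
          throw new IOException("block file does not exist on disk");
        }
        if (blockFile.length() != expectedLen) {
          throw new IOException("block length " + expectedLen +
                                " does not match file length " + blockFile.length());
        }
        if (metaFile == null || !metaFile.exists()) {
          throw new IOException("metafile does not exist on disk");
        }
        if (stampParsedFromMetaName != expectedStamp) {
          throw new IOException("genstamp " + expectedStamp +
                                " does not match meta file stamp " + stampParsedFromMetaName);
        }
      }

      public static void main(String[] args) throws IOException {
        File block = File.createTempFile("blk_1", "");
        File meta = File.createTempFile("blk_1_1001", ".meta");
        validate(block, meta, block.length(), 1001L, 1001L);   // empty temp file, matching stamp
        System.out.println("metadata consistent");
      }
    }
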
+ 42 - 0
src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 
+import java.io.Closeable;
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -28,6 +29,7 @@ import java.io.OutputStream;
 
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
 /**
@@ -115,6 +117,19 @@ public interface FSDatasetInterface extends FSDatasetMBean {
   public InputStream getBlockInputStream(Block b, long seekOffset)
             throws IOException;
 
+  /**
+   * Returns an input stream at specified offset of the specified block
+   * The block is still in the tmp directory and is not finalized
+   * @param b
+   * @param blkoff
+   * @param ckoff
+   * @return an input stream to read the contents of the specified block,
+   *  starting at the offset
+   * @throws IOException
+   */
+  public BlockInputStreams getTmpInputStreams(Block b, long blkoff, long ckoff)
+            throws IOException;
+
      /**
       * 
       * This class contains the output streams for the data and checksum
@@ -130,6 +145,26 @@ public interface FSDatasetInterface extends FSDatasetMBean {
       }
       
     }
+
+  /**
+   * This class contains the input streams for the data and checksum
+   * of a block
+   */
+  static class BlockInputStreams implements Closeable {
+    final InputStream dataIn;
+    final InputStream checksumIn;
+
+    BlockInputStreams(InputStream dataIn, InputStream checksumIn) {
+      this.dataIn = dataIn;
+      this.checksumIn = checksumIn;
+    }
+
+    /** {@inheritDoc} */
+    public void close() {
+      IOUtils.closeStream(dataIn);
+      IOUtils.closeStream(checksumIn);
+    }
+  }
     
   /**
    * Creates the block and returns output streams to write data and CRC
@@ -222,4 +257,11 @@ public interface FSDatasetInterface extends FSDatasetMBean {
   public void setChannelPosition(Block b, BlockWriteStreams stream, long dataOffset,
                                  long ckOffset) throws IOException;
 
+  /**
+   * Validate that the contents of the Block match
+   * the file on disk. Throws an IOException if a mismatch is found.
+   * @param b The block to be verified.
+   * @throws IOException
+   */
+  public void validateBlockMetadata(Block b) throws IOException;
 }

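BlockInputStreams pairs the data and checksum streams behind one Closeable, which is what lets computePartialChunkCrc release both with a single call in its finally block. A plain-JDK sketch of the same pattern (class name and stream contents are illustrative):

    import java.io.ByteArrayInputStream;
    import java.io.Closeable;
    import java.io.IOException;
    import java.io.InputStream;

    public class PairedInputStreams implements Closeable {
      final InputStream dataIn;
      final InputStream checksumIn;

      PairedInputStreams(InputStream dataIn, InputStream checksumIn) {
        this.dataIn = dataIn;
        this.checksumIn = checksumIn;
      }

      public void close() {           // best-effort close of both streams
        try { dataIn.close(); } catch (IOException ignored) {}
        try { checksumIn.close(); } catch (IOException ignored) {}
      }

      public static void main(String[] args) throws IOException {
        PairedInputStreams streams = null;
        try {
          streams = new PairedInputStreams(new ByteArrayInputStream(new byte[512]),
                                           new ByteArrayInputStream(new byte[4]));
          // ... read the partial data chunk and its stored CRC here ...
        } finally {
          if (streams != null) streams.close();   // same shape as computePartialChunkCrc's finally
        }
      }
    }
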
+ 114 - 51
src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -54,6 +54,7 @@ import org.apache.hadoop.io.IOUtils;
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileWriter;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.DataOutputStream;
@@ -912,7 +913,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
                  String holder, String clientMachine,
                  boolean overwrite, short replication, long blockSize
                 ) throws IOException {
-    startFileInternal(src, permissions, holder, clientMachine, overwrite,
+    startFileInternal(src, permissions, holder, clientMachine, overwrite, false,
                       replication, blockSize);
     getEditLog().logSync();
     if (auditLog.isInfoEnabled()) {
@@ -928,18 +929,26 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
                                               String holder, 
                                               String clientMachine, 
                                               boolean overwrite,
+                                              boolean append,
                                               short replication,
                                               long blockSize
                                               ) throws IOException {
-    NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: file "
-                                  +src+" for "+holder+" at "+clientMachine);
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: src=" + src
+          + ", holder=" + holder
+          + ", clientMachine=" + clientMachine
+          + ", replication=" + replication
+          + ", overwrite=" + overwrite
+          + ", append=" + append);
+    }
+
     if (isInSafeMode())
       throw new SafeModeException("Cannot create file" + src, safeMode);
     if (!DFSUtil.isValidName(src)) {
       throw new IOException("Invalid file name: " + src);
     }
     if (isPermissionEnabled) {
-      if (overwrite && dir.exists(src)) {
+      if (append || (overwrite && dir.exists(src))) {
         checkPathAccess(src, FsAction.WRITE);
       }
       else {
@@ -996,7 +1005,15 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
       } catch(IOException e) {
         throw new IOException("failed to create "+e.getMessage());
       }
-      if (!dir.isValidToCreate(src)) {
+      if (append) {
+        if (myFile == null) {
+          throw new FileNotFoundException("failed to append to non-existent file "
+              + src + " on client " + clientMachine);
+        } else if (myFile.isDirectory()) {
+          throw new IOException("failed to append to directory " + src 
+                                +" on client " + clientMachine);
+        }
+      } else if (!dir.isValidToCreate(src)) {
         if (overwrite) {
           delete(src, true);
         } else {
@@ -1009,71 +1026,116 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
       DatanodeDescriptor clientNode = 
         host2DataNodeMap.getDatanodeByHost(clientMachine);
 
-      //
-      // Now we can add the name to the filesystem. This file has no
-      // blocks associated with it.
-      //
-      checkFsObjectLimit();
+      if (append) {
+        //
+        // Replace current node with a INodeUnderConstruction.
+        // Recreate in-memory lease record.
+        //
+        INodeFile node = (INodeFile) myFile;
+        INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
+                                        node.getLocalNameBytes(),
+                                        node.getReplication(),
+                                        node.getModificationTime(),
+                                        node.getPreferredBlockSize(),
+                                        node.getBlocks(),
+                                        node.getPermissionStatus(),
+                                        holder,
+                                        clientMachine,
+                                        clientNode);
+        dir.replaceNode(src, node, cons);
+        leaseManager.addLease(cons.clientName, src);
 
-      // increment global generation stamp
-      long genstamp = nextGenerationStamp();
-      INodeFileUnderConstruction newNode = dir.addFile(src, permissions,
-          replication, blockSize, holder, clientMachine, clientNode, genstamp);
-      if (newNode == null) {
-        throw new IOException("DIR* NameSystem.startFile: " +
-                              "Unable to add file to namespace.");
+      } else {
+       // Now we can add the name to the filesystem. This file has no
+       // blocks associated with it.
+       //
+       checkFsObjectLimit();
+
+        // increment global generation stamp
+        long genstamp = nextGenerationStamp();
+        INodeFileUnderConstruction newNode = dir.addFile(src, permissions,
+            replication, blockSize, holder, clientMachine, clientNode, genstamp);
+        if (newNode == null) {
+          throw new IOException("DIR* NameSystem.startFile: " +
+                                "Unable to add file to namespace.");
+        }
+        leaseManager.addLease(newNode.clientName, src);
+        if (NameNode.stateChangeLog.isDebugEnabled()) {
+          NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
+                                     +"add "+src+" to namespace for "+holder);
+        }
       }
-      leaseManager.addLease(newNode.clientName, src);
     } catch (IOException ie) {
       NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: "
                                    +ie.getMessage());
       throw ie;
     }
-
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
-                                  +"add "+src+" to namespace for "+holder);
-    }
   }
 
-  /** append is not yet ready.  This method is for testing. */
-  void appendFileInternal(String src, String holder, String clientMachine
+  /**
+   * Append to an existing file in the namespace.
+   */
+  LocatedBlock appendFile(String src, String holder, String clientMachine
       ) throws IOException {
+
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: file "
           +src+" for "+holder+" at "+clientMachine);
     }
-    if (isInSafeMode())
-      throw new SafeModeException("Cannot append file" + src, safeMode);
-    if (!DFSUtil.isValidName(src)) {
-      throw new IOException("Invalid file name: " + src);
-    }
-    if (isPermissionEnabled) {
-      checkPathAccess(src, FsAction.WRITE);
-    }
-
-    try {
-      INodeFile f = dir.getFileINode(src);
-      //assume f != null && !f.isUnderConstruction() && lease does not exist
-      //TODO: remove the assumption 
+    startFileInternal(src, null, holder, clientMachine, false, true, 
+                      (short)maxReplication, (long)0);
+    getEditLog().logSync();
 
-      DatanodeDescriptor clientNode = host2DataNodeMap.getDatanodeByHost(
-          clientMachine);
-      INodeFileUnderConstruction newnode = f.toINodeFileUnderConstruction(
-          holder, clientMachine, clientNode);
+    //
+    // Create a LocatedBlock object for the last block of the file
+    // to be returned to the client. Return null if the file does not
+    // have a partial block at the end.
+    //
+    LocatedBlock lb = null;
+    synchronized (this) {
+      INodeFile file = dir.getFileINode(src);
+
+      Block[] blocks = file.getBlocks();
+      if (blocks != null && blocks.length > 0) {
+        Block last = blocks[blocks.length-1];
+        BlockInfo storedBlock = blocksMap.getStoredBlock(last);
+        if (file.getPreferredBlockSize() > storedBlock.getNumBytes()) {
+          long fileLength = file.computeContentSummary().getLength();
+          DatanodeDescriptor[] targets = new DatanodeDescriptor[blocksMap.numNodes(last)];
+          Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(last);
+          for (int i = 0; it != null && it.hasNext(); i++) {
+            targets[i] = it.next();
+          }
+          lb = new LocatedBlock(last, targets, 
+                                fileLength-storedBlock.getNumBytes());
 
-      dir.replaceNode(src, f, newnode);
-      leaseManager.addLease(newnode.clientName, src);
+          // Remove block from replication queue.
+          updateNeededReplications(last, 0, 0);
 
-    } catch (IOException ie) {
-      NameNode.stateChangeLog.warn("DIR* NameSystem.appendFile: ", ie);
-      throw ie;
+          // remove this block from the list of pending blocks to be deleted. 
+          // This reduces the possibility of triggering HADOOP-1349.
+          //
+          for(Collection<Block> v : recentInvalidateSets.values()) {
+            v.remove(last);
+          }
+        }
+      }
+    }
+    if (lb != null) {
+      if (NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: file "
+            +src+" for "+holder+" at "+clientMachine
+            +" block " + lb.getBlock()
+            +" block size " + lb.getBlock().getNumBytes());
+      }
     }
 
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: "
-          +"add "+src+" to namespace for "+holder);
+    if (auditLog.isInfoEnabled()) {
+      logAuditEvent(UserGroupInformation.getCurrentUGI(),
+                    Server.getRemoteIp(),
+                    "append", src, null, null);
     }
+    return lb;
   }
 
   /**
@@ -2284,7 +2346,8 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
 
         // block should belong to a file
         INodeFile fileINode = blocksMap.getINode(block);
-        if(fileINode == null) { // abandoned block 
+        // abandoned block or block reopened for append
+        if(fileINode == null || fileINode.isUnderConstruction()) { 
           neededReplicationsIterator.remove(); // remove from neededReplications
           replIndex--;
           continue;
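
For reference, the partial-last-block test in appendFile() above reduces to a small piece of offset arithmetic. The standalone sketch below is illustrative only (the class and method names are hypothetical, not part of this patch): a resume offset inside the tail block exists only when the last block is shorter than the preferred block size, and it equals the file length minus the last block's current length.

// Illustrative sketch, not part of the patch: mirrors the last-block check
// in FSNamesystem.appendFile() using plain types.
class AppendOffsetSketch {
  /**
   * @return the file offset at which an append resumes inside the partial
   *         last block, or -1 when the last block is already full.
   */
  static long resumeOffset(long fileLength, long lastBlockLength, long preferredBlockSize) {
    if (lastBlockLength >= preferredBlockSize) {
      return -1L;                           // no partial tail: appendFile returns a null LocatedBlock
    }
    return fileLength - lastBlockLength;    // start offset carried in the returned LocatedBlock
  }

  public static void main(String[] args) {
    // Example with the TestFileAppend2 geometry: 5 blocks of 1024 bytes plus 1 byte,
    // with a 1-byte partial last block.
    System.out.println(resumeOffset(5 * 1024 + 1, 1, 1024));   // prints 5120
  }
}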

+ 12 - 4
src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java

@@ -26,7 +26,9 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -425,7 +427,7 @@ public class LeaseManager {
   }
 
   /** Recover a block */
-  public static Block recoverBlock(Block block, DatanodeID[] datanodeids,
+  public static LocatedBlock recoverBlock(Block block, DatanodeID[] datanodeids,
       DataNode primary, DatanodeProtocol namenode, Configuration conf,
       boolean closeFile) throws IOException {
 
@@ -485,7 +487,7 @@ public class LeaseManager {
   }
 
   /** Block synchronization */
-  private static Block syncBlock(Block block, long minlength,
+  private static LocatedBlock syncBlock(Block block, long minlength,
       List<BlockRecord> syncList, DatanodeProtocol namenode,
       boolean closeFile) throws IOException {
     if (LOG.isDebugEnabled()) {
@@ -517,10 +519,16 @@ public class LeaseManager {
     }
 
     if (!successList.isEmpty()) {
+      DatanodeID[] nlist = successList.toArray(new DatanodeID[successList.size()]);
+
       namenode.commitBlockSynchronization(block,
           newblock.getGenerationStamp(), newblock.getNumBytes(), closeFile, false,
-          successList.toArray(new DatanodeID[successList.size()]));
-      return newblock; // success
+          nlist);
+      DatanodeInfo[] info = new DatanodeInfo[nlist.length];
+      for (int i = 0; i < nlist.length; i++) {
+        info[i] = new DatanodeInfo(nlist[i]);
+      }
+      return new LocatedBlock(newblock, info); // success
     }
     return null; // failed
   }

+ 14 - 6
src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -104,6 +104,7 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
   private Server server;
   private Thread emptier;
   private int handlerCount = 2;
+  private boolean supportAppends = true; // allow appending to hdfs files
     
   private InetSocketAddress nameNodeAddress = null;
     
@@ -144,6 +145,7 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
    */
   private void initialize(String address, Configuration conf) throws IOException {
     InetSocketAddress socAddr = NameNode.getAddress(address);
+    this.supportAppends = conf.getBoolean("dfs.support.append", true);
     this.handlerCount = conf.getInt("dfs.namenode.handler.count", 10);
     this.server = RPC.getServer(this, socAddr.getHostName(), socAddr.getPort(),
                                 handlerCount, false, conf);
@@ -285,8 +287,10 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
                              long blockSize
                              ) throws IOException {
     String clientMachine = getClientMachine();
-    stateChangeLog.debug("*DIR* NameNode.create: file "
+    if (stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*DIR* NameNode.create: file "
                          +src+" for "+clientName+" at "+clientMachine);
+    }
     if (!checkPathLength(src)) {
       throw new IOException("create: Pathname too long.  Limit " 
                             + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
@@ -299,17 +303,21 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
     myMetrics.numCreateFileOps.inc();
   }
 
-  /** Coming in a future release.... */
-  public void append(String src, String clientName) throws IOException {
+  /** {@inheritDoc} */
+  public LocatedBlock append(String src, String clientName) throws IOException {
     String clientMachine = getClientMachine();
     if (stateChangeLog.isDebugEnabled()) {
       stateChangeLog.debug("*DIR* NameNode.append: file "
           +src+" for "+clientName+" at "+clientMachine);
     }
-    //TODO: add namesystem.appendFile(...), which calls appendFileInternal(...)
-    namesystem.appendFileInternal(src, clientName, clientMachine);
+    if (!supportAppends) {
+      throw new IOException("Append to HDFS is not supported." +
+                            " Please refer to the dfs.support.append configuration parameter.");
+    }
 
-    //TODO: inc myMetrics;
+    LocatedBlock info = namesystem.appendFile(src, clientName, clientMachine);
+    myMetrics.numFilesAppended.inc();
+    return info;
   }
 
   /** {@inheritDoc} */
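
As a usage note for the new append RPC: the NameNode reads the dfs.support.append key during initialize() above, and clients reach the call through FileSystem.append(), as the tests later in this change do. A minimal client-side sketch follows; it assumes a reachable cluster and an already existing file at a hypothetical path /logs/events.dat.

// Minimal sketch (assumes a running HDFS cluster and an existing file).
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AppendExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // dfs.support.append is read by the NameNode at startup (default true in this
    // patch); it is set here only to name the key, not because the client needs it.
    conf.setBoolean("dfs.support.append", true);

    FileSystem fs = FileSystem.get(conf);
    Path file = new Path("/logs/events.dat");          // hypothetical existing file

    FSDataOutputStream out = fs.append(file);          // throws IOException if appends are disabled
    out.write("one more record\n".getBytes());
    out.close();
    fs.close();
  }
}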

+ 2 - 0
src/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java

@@ -47,6 +47,7 @@ public class NameNodeMetrics implements Updater {
     private NameNodeStatistics namenodeStats;
     
     public MetricsTimeVaryingInt numFilesCreated = new MetricsTimeVaryingInt("FilesCreated");
+    public MetricsTimeVaryingInt numFilesAppended = new MetricsTimeVaryingInt("FilesAppended");
     public MetricsTimeVaryingInt numGetBlockLocations = new MetricsTimeVaryingInt("GetBlockLocations");
     public MetricsTimeVaryingInt numFilesRenamed = new MetricsTimeVaryingInt("FilesRenamed");
     public MetricsTimeVaryingInt numGetListingOps = 
@@ -99,6 +100,7 @@ public class NameNodeMetrics implements Updater {
     public void doUpdates(MetricsContext unused) {
       synchronized (this) {
         numFilesCreated.pushMetric(metricsRecord);
+        numFilesAppended.pushMetric(metricsRecord);
         numGetBlockLocations.pushMetric(metricsRecord);
         numFilesRenamed.pushMetric(metricsRecord);
         numGetListingOps.pushMetric(metricsRecord);

+ 7 - 0
src/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeStatistics.java

@@ -210,4 +210,11 @@ public class NameNodeStatistics implements NameNodeStatisticsMBean {
   public int getNumFilesRenamed() {
     return myMetrics.numFilesRenamed.getPreviousIntervalValue();
   }
+
+  /**
+   * {@inheritDoc}
+   */
+  public int getNumFilesAppended() {
+    return myMetrics.numFilesAppended.getPreviousIntervalValue();
+  }
 }
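
The counter added in the two metrics files above follows the usual MetricsTimeVaryingInt lifecycle. The sketch below is illustrative only (the holder class is hypothetical); it simply gathers the three calls this change actually makes: inc() from NameNode.append(), pushMetric() from doUpdates(), and getPreviousIntervalValue() from the MBean getter.

// Illustrative only: the three MetricsTimeVaryingInt calls this change uses
// for "FilesAppended", collected in one hypothetical holder class.
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;

class FilesAppendedMetricSketch {
  private final MetricsTimeVaryingInt numFilesAppended =
      new MetricsTimeVaryingInt("FilesAppended");

  void onAppend() {
    numFilesAppended.inc();                              // bumped once per NameNode.append()
  }

  void publish(MetricsRecord record) {
    numFilesAppended.pushMetric(record);                 // emitted from doUpdates() each interval
  }

  int filesAppendedLastInterval() {
    return numFilesAppended.getPreviousIntervalValue();  // value exposed through the MBean getter
  }
}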

+ 15 - 0
src/test/org/apache/hadoop/hdfs/TestDatanodeDeath.java

@@ -32,12 +32,27 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+
+import org.apache.log4j.Level;
+import org.apache.commons.logging.impl.Log4JLogger;
 
 /**
  * This class tests that a file need not be closed before its
  * data can be read by another client.
  */
 public class TestDatanodeDeath extends TestCase {
+  {
+    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+  }
+
   static final long seed = 0xDEADBEEFL;
   static final int blockSize = 8192;
   static final int numBlocks = 2;

+ 423 - 0
src/test/org/apache/hadoop/hdfs/TestFileAppend2.java

@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AccessControlException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.log4j.Level;
+
+/**
+ * This class tests the building blocks that are needed to
+ * support HDFS appends.
+ */
+public class TestFileAppend2 extends TestCase {
+
+  {
+    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  static final long seed = 0xDEADBEEFL;
+  static final int blockSize = 1024;
+  static final int numBlocks = 5;
+  static final int fileSize = numBlocks * blockSize + 1;
+  boolean simulatedStorage = false;
+  byte[] fileContents = null;
+  Random rand = new Random(seed);
+
+  int numDatanodes = 5;
+  int numberOfFiles = 50;
+  int numThreads = 10;
+  int numAppendsPerThread = 20;
+/***
+  int numberOfFiles = 1;
+  int numThreads = 1;
+  int numAppendsPerThread = 2000;
+****/
+  Workload[] workload = null;
+  ArrayList<Path> testFiles = new ArrayList<Path>();
+  volatile static boolean globalStatus = true;
+
+  //
+  // create a buffer that contains the entire test file data.
+  //
+  private void initBuffer(int size) {
+    fileContents = new byte[size];
+    rand.nextBytes(fileContents);
+  }
+
+  /*
+   * creates a file but does not close it
+   */ 
+  private FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
+    throws IOException {
+    FSDataOutputStream stm = fileSys.create(name, true,
+                                            fileSys.getConf().getInt("io.file.buffer.size", 4096),
+                                            (short)repl, (long)blockSize);
+    return stm;
+  }
+
+  private void checkFile(FileSystem fs, Path name, int len) throws IOException {
+    FSDataInputStream stm = fs.open(name);
+    byte[] actual = new byte[len];
+    stm.readFully(0, actual);
+    checkData(actual, 0, fileContents, "Read 2");
+    stm.close();
+  }
+
+  private void checkFullFile(FileSystem fs, Path name) throws IOException {
+    checkFile(fs, name, fileSize);
+  }
+
+  private void checkData(byte[] actual, int from, byte[] expected, String message) {
+    for (int idx = 0; idx < actual.length; idx++) {
+      assertEquals(message+" byte "+(from+idx)+" differs. expected "+
+                   expected[from+idx]+" actual "+actual[idx],
+                   expected[from+idx], actual[idx]);
+      actual[idx] = 0;
+    }
+  }
+
+
+  /**
+   * Creates one file, writes a few bytes to it, and then closes it.
+   * Reopens the same file for appending, writes the remaining data, and closes it again.
+   * Verifies that all data exists in the file.
+   */ 
+  public void testSimpleAppend() throws IOException {
+    Configuration conf = new Configuration();
+    if (simulatedStorage) {
+      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    }
+    conf.setInt("dfs.datanode.handler.count", 50);
+    initBuffer(fileSize);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+    FileSystem fs = cluster.getFileSystem();
+    try {
+      { // test appending to a file.
+
+        // create a new file.
+        Path file1 = new Path("/simpleAppend.dat");
+        FSDataOutputStream stm = createFile(fs, file1, 1);
+        System.out.println("Created file simpleAppend.dat");
+  
+        // write to file
+        int mid = 186;   // io.bytes.per.checksum bytes
+        System.out.println("Writing " + mid + " bytes to file " + file1);
+        stm.write(fileContents, 0, mid);
+        stm.close();
+        System.out.println("Wrote and Closed first part of file.");
+  
+        // write to file
+        int mid2 = 607;   // io.bytes.per.checksum bytes
+        System.out.println("Writing " + mid + " bytes to file " + file1);
+        stm = fs.append(file1);
+        stm.write(fileContents, mid, mid2-mid);
+        stm.close();
+        System.out.println("Wrote and Closed second part of file.");
+  
+        // write the remainder of the file
+        stm = fs.append(file1);
+        System.out.println("Writing " + (fileSize - mid2) + " bytes to file " + file1);
+        stm.write(fileContents, mid2, fileSize - mid2);
+        System.out.println("Written second part of file");
+        stm.close();
+        System.out.println("Wrote and Closed second part of file.");
+  
+        // verify that entire file is good
+        checkFullFile(fs, file1);
+      }
+
+      { // test appending to a non-existent file.
+        FSDataOutputStream out = null;
+        try {
+          out = fs.append(new Path("/non-existing.dat"));
+          fail("Expected to have FileNotFoundException");
+        }
+        catch(java.io.FileNotFoundException fnfe) {
+          System.out.println("Good: got " + fnfe);
+          fnfe.printStackTrace(System.out);
+        }
+        finally {
+          IOUtils.closeStream(out);
+        }
+      }
+
+      { // test append permission.
+
+        //set root to all writable 
+        Path root = new Path("/");
+        fs.setPermission(root, new FsPermission((short)0777));
+        fs.close();
+
+        // login as a different user
+        final UserGroupInformation superuser = UserGroupInformation.getCurrentUGI();
+        String username = "testappenduser";
+        String group = "testappendgroup";
+        assertFalse(superuser.getUserName().equals(username));
+        assertFalse(Arrays.asList(superuser.getGroupNames()).contains(group));
+        UnixUserGroupInformation appenduser = UnixUserGroupInformation.createImmutable(
+            new String[]{username, group});
+        UnixUserGroupInformation.saveToConf(conf,
+            UnixUserGroupInformation.UGI_PROPERTY_NAME, appenduser);
+        fs = FileSystem.get(conf);
+
+        // create a file
+        Path dir = new Path(root, getClass().getSimpleName());
+        Path foo = new Path(dir, "foo.dat");
+        FSDataOutputStream out = null;
+        int offset = 0;
+        try {
+          out = fs.create(foo);
+          int len = 10 + rand.nextInt(100);
+          out.write(fileContents, offset, len);
+          offset += len;
+        }
+        finally {
+          IOUtils.closeStream(out);
+        }
+
+        // change dir and foo to minimal permissions.
+        fs.setPermission(dir, new FsPermission((short)0100));
+        fs.setPermission(foo, new FsPermission((short)0200));
+
+        // try append, should succeed
+        out = null;
+        try {
+          out = fs.append(foo);
+          int len = 10 + rand.nextInt(100);
+          out.write(fileContents, offset, len);
+          offset += len;
+        }
+        finally {
+          IOUtils.closeStream(out);
+        }
+
+        // give dir full permissions, but remove the owner's write permission on foo.
+        fs.setPermission(foo, new FsPermission((short)0577));
+        fs.setPermission(dir, new FsPermission((short)0777));
+
+        // try append, should fail
+        out = null;
+        try {
+          out = fs.append(foo);
+          fail("Expected to have AccessControlException");
+        }
+        catch(AccessControlException ace) {
+          System.out.println("Good: got " + ace);
+          ace.printStackTrace(System.out);
+        }
+        finally {
+          IOUtils.closeStream(out);
+        }
+      }
+    } catch (IOException e) {
+      System.out.println("Exception :" + e);
+      throw e; 
+    } catch (Throwable e) {
+      System.out.println("Throwable :" + e);
+      e.printStackTrace();
+      throw new IOException("Throwable : " + e);
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
+  //
+  // an object that does a bunch of appends to files
+  //
+  class Workload extends Thread {
+    private int id;
+    private MiniDFSCluster cluster;
+
+    Workload(MiniDFSCluster cluster, int threadIndex) {
+      id = threadIndex;
+      this.cluster = cluster;
+    }
+
+    // create a bunch of files. Write to them and then verify.
+    public void run() {
+      System.out.println("Workload " + id + " starting... ");
+      for (int i = 0; i < numAppendsPerThread; i++) {
+   
+        // pick a file at random and remove it from pool
+        Path testfile = null;
+        synchronized (testFiles) {
+          if (testFiles.size() == 0) {
+            System.out.println("Completed write to almost all files.");
+            return;  
+          }
+          int index = rand.nextInt(testFiles.size());
+          testfile = testFiles.remove(index);
+        }
+
+        long len = 0;
+        int sizeToAppend = 0;
+        try {
+          FileSystem fs = cluster.getFileSystem();
+
+          // add a random number of bytes to file
+          len = fs.getFileStatus(testfile).getLen();
+
+          // if file is already full, then pick another file
+          if (len >= fileSize) {
+            System.out.println("File " + testfile + " is full.");
+            continue;
+          }
+  
+          // do small size appends so that we can trigger multiple
+          // appends to the same file.
+          //
+          int left = (int)(fileSize - len)/3;
+          if (left <= 0) {
+            left = 1;
+          }
+          sizeToAppend = rand.nextInt(left);
+
+          System.out.println("Workload thread " + id +
+                             " appending " + sizeToAppend + " bytes " +
+                             " to file " + testfile +
+                             " of size " + len);
+          FSDataOutputStream stm = fs.append(testfile);
+          stm.write(fileContents, (int)len, sizeToAppend);
+          stm.close();
+
+          // wait for the file size to be reflected in the namenode metadata
+          while (fs.getFileStatus(testfile).getLen() != (len + sizeToAppend)) {
+            try {
+              System.out.println("Workload thread " + id +
+                                 " file " + testfile  +
+                                 " size " + fs.getFileStatus(testfile).getLen() +
+                                 " expected size " + (len + sizeToAppend) +
+                                 " waiting for namenode metadata update.");
+              Thread.sleep(5000);
+            } catch (InterruptedException e) { 
+            }
+          }
+
+          assertTrue("File " + testfile + " size is " + 
+                     fs.getFileStatus(testfile).getLen() +
+                     " but expected " + (len + sizeToAppend),
+                    fs.getFileStatus(testfile).getLen() == (len + sizeToAppend));
+
+          checkFile(fs, testfile, (int)(len + sizeToAppend));
+        } catch (Throwable e) {
+          globalStatus = false;
+          System.out.println("Workload exception " + id +
+                             " testfile " + testfile +
+                             " " + e);
+          e.printStackTrace();
+          fail("Workload exception " + id + " testfile " + testfile +
+               " expected size " + (len + sizeToAppend));
+        }
+
+        // Add testfile back to the pool of files.
+        synchronized (testFiles) {
+          testFiles.add(testfile);
+        }
+      }
+    }
+  }
+
+  /**
+   * Test that appends to files at random offsets.
+   */
+  public void testComplexAppend() throws IOException {
+    initBuffer(fileSize);
+    Configuration conf = new Configuration();
+    conf.setInt("heartbeat.recheck.interval", 2000);
+    conf.setInt("dfs.heartbeat.interval", 2);
+    conf.setInt("dfs.replication.pending.timeout.sec", 2);
+    conf.setInt("dfs.socket.timeout", 30000);
+    conf.setInt("dfs.datanode.socket.write.timeout", 30000);
+    conf.setInt("dfs.datanode.handler.count", 50);
+
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, 
+                                                true, null);
+    cluster.waitActive();
+    FileSystem fs = cluster.getFileSystem();
+
+    try {
+      // create a bunch of test files with random replication factors.
+      // Insert them into a linked list.
+      //
+      for (int i = 0; i < numberOfFiles; i++) {
+        short replication = (short)(rand.nextInt(numDatanodes) + 1);
+        Path testFile = new Path("/" + i + ".dat");
+        FSDataOutputStream stm = createFile(fs, testFile, replication);
+        stm.close();
+        testFiles.add(testFile);
+      }
+
+      // Create threads and make them run workload concurrently.
+      workload = new Workload[numThreads];
+      for (int i = 0; i < numThreads; i++) {
+        workload[i] = new Workload(cluster, i);
+        workload[i].start();
+      }
+
+      // wait for all transactions to get over
+      for (int i = 0; i < numThreads; i++) {
+        try {
+          System.out.println("Waiting for thread " + i + " to complete...");
+          workload[i].join();
+          System.out.println("Waiting for thread " + i + " complete.");
+        } catch (InterruptedException e) {
+          i--;      // retry
+        }
+      }
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+
+    // If any of the worker threads failed, mark this test as failed.
+    //
+    assertTrue("testComplexAppend Worker encountered exceptions.", globalStatus);
+  }
+}

+ 10 - 0
src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -394,6 +394,16 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
     return result;
   }
 
+  /** Not supported */
+  public BlockInputStreams getTmpInputStreams(Block b, long blkoff, long ckoff
+      ) throws IOException {
+    throw new IOException("Not supported");
+  }
+
+  /** No-op */
+  public void validateBlockMetadata(Block b) {
+  }
+
   /**
    * Returns metaData of block b as an input stream
    * @param b - the block for which the metadata is desired