
HDFS-589. Change block write protocol to support pipeline recovery. Contributed by Hairong Kuang.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-265@814936 13f79535-47bb-0310-9956-ffa450edef68
Hairong Kuang 15 years ago
parent
commit
825165d09a

+ 5 - 2
CHANGES.txt

@@ -4,6 +4,8 @@ Append branch (unreleased changes)
 
   INCOMPATIBLE CHANGES
 
+    HDFS-544. Add a "rbw" subdir to DataNode data directory. (hairong)
+
     HDFS-576. Block report includes under-construction replicas. (shv)
 
   NEW FEATURES
@@ -13,8 +15,6 @@ Append branch (unreleased changes)
     HDFS-517. Introduce BlockInfoUnderConstruction to reflect block replica
     states while writing. (shv)
 
-    HDFS-544. Add a "rbw" subdir to DataNode data directory. (hairong)
-
     HDFS-565. Introduce block committing logic during new block allocation
     and file close. (shv)
 
@@ -41,6 +41,9 @@ Append branch (unreleased changes)
     HDFS-603. Add a new interface, Replica, which is going to replace the use
     of Block in datanode.  (szetszwo)
 
+    HDFS-589. Change block write protocol to support pipeline recovery. 
+    (hairong)  
+
   BUG FIXES
 
     HDFS-547. TestHDFSFileSystemContract#testOutputStreamClosedTwice

+ 98 - 64
src/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -86,6 +86,7 @@ import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -2239,7 +2240,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     private final LinkedList<Packet> dataQueue = new LinkedList<Packet>();
     private final LinkedList<Packet> ackQueue = new LinkedList<Packet>();
     private Packet currentPacket = null;
-    private DataStreamer streamer = new DataStreamer();
+    private DataStreamer streamer;
     private long currentSeqno = 0;
     private long bytesCurBlock = 0; // bytes written in current block
     private int packetSize = 0; // write packet size, including the header.
@@ -2346,6 +2347,11 @@ public class DFSClient implements FSConstants, java.io.Closeable {
         buffer.reset();
         return buffer;
       }
+      
+      // get the offset of the packet's last byte in the block
+      long getLastByteOffsetBlock() {
+        return offsetInBlock + dataPos - dataStart;
+      }
     }
   
     //
@@ -2360,7 +2366,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
       private static final int MAX_RECOVERY_ERROR_COUNT = 5; // try block recovery 5 times
       private int recoveryErrorCount = 0; // number of times block recovery failed
       private volatile boolean streamerClosed = false;
-      private Block block;
+      private Block block; // its length is the number of bytes acked
       private AccessToken accessToken;
       private DataOutputStream blockStream;
       private DataInputStream blockReplyStream;
@@ -2368,7 +2374,73 @@ public class DFSClient implements FSConstants, java.io.Closeable {
       private volatile DatanodeInfo[] nodes = null; // list of targets for current block
       volatile boolean hasError = false;
       volatile int errorIndex = 0;
-  
+      private BlockConstructionStage stage;  // block construction stage
+      private long bytesSent = 0; // number of bytes that have been sent
+
+      /**
+       * Default constructor for file create
+       */
+      private DataStreamer() {
+        stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
+      }
+      
+      /**
+       * Construct a data streamer for append
+       * @param lastBlock last block of the file to be appended
+       * @param stat status of the file to be appended
+       * @param bytesPerChecksum number of bytes per checksum
+       * @throws IOException if error occurs
+       */
+      private DataStreamer(LocatedBlock lastBlock, FileStatus stat,
+          int bytesPerChecksum) throws IOException {
+        stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
+        block = lastBlock.getBlock();
+        bytesSent = block.getNumBytes();
+        accessToken = lastBlock.getAccessToken();
+        long usedInLastBlock = stat.getLen() % blockSize;
+        int freeInLastBlock = (int)(blockSize - usedInLastBlock);
+
+        // calculate the amount of free space in the pre-existing 
+        // last crc chunk
+        int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
+        int freeInCksum = bytesPerChecksum - usedInCksum;
+
+        // if the last block is already full (the file length is a
+        // multiple of the block size), there is nothing to append to
+        if (freeInLastBlock == blockSize) {
+          throw new IOException("The last block for file " + 
+              src + " is full.");
+        }
+
+        if (usedInCksum > 0 && freeInCksum > 0) {
+          // if there is space in the last partial chunk, then 
+          // setup in such a way that the next packet will have only 
+          // one chunk that fills up the partial chunk.
+          //
+          computePacketChunkSize(0, freeInCksum);
+          resetChecksumChunk(freeInCksum);
+          appendChunk = true;
+        } else {
+          // if the remaining space in the block is smaller than
+          // the expected size of a packet, then create a
+          // smaller packet.
+          //
+          computePacketChunkSize(Math.min(writePacketSize, freeInLastBlock), 
+              bytesPerChecksum);
+        }
+
+        // setup pipeline to append to the last block XXX retries??
+        nodes = lastBlock.getLocations();
+        errorIndex = -1;   // no errors yet.
+        if (nodes.length < 1) {
+          throw new IOException("Unable to retrieve block locations" +
+              " for last block " + block + " of file " + src);
+        }
+        processDatanodeError(true, true);
+      }
+
       /*
        * streamer thread is the only thread that opens streams to datanode, 
        * and closes them. Any error recovery is also done by this thread.
@@ -2419,6 +2491,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
                   " block " + block);
               response = new ResponseProcessor(nodes);
               response.start();
+              stage = BlockConstructionStage.DATA_STREAMING;
             }
 
             if (offsetInBlock >= blockSize) {
@@ -2441,7 +2514,13 @@ public class DFSClient implements FSConstants, java.io.Closeable {
             // write out data to remote datanode
             blockStream.write(buf.array(), buf.position(), buf.remaining());
             blockStream.flush();
-
+            
+            // update bytesSent
+            long tmpBytesSent = one.getLastByteOffsetBlock();
+            if (bytesSent < tmpBytesSent) {
+              bytesSent = tmpBytesSent;
+            }
+            
             if (one.lastPacketInBlock) {
               synchronized (dataQueue) {
                 while (!streamerClosed && !hasError && ackQueue.size() != 0 && clientRunning) {
@@ -2450,21 +2529,13 @@ public class DFSClient implements FSConstants, java.io.Closeable {
                   } catch (InterruptedException  e) {
                   }
                 }
-                // update block length
-                block.setNumBytes(offsetInBlock + one.dataPos - one.dataStart);
               }
               
               if (ackQueue.isEmpty()) { // done receiving all acks
-                if (response != null) {
-                  response.close(); // notify responder to close
-                }
                 // indicate end-of-block
                 blockStream.writeInt(0);
                 blockStream.flush();
               }
-            } else {
-              // update block length
-              block.setNumBytes(offsetInBlock + one.dataPos - one.dataStart);
             }
             if (LOG.isDebugEnabled()) {
               LOG.debug("DataStreamer block " + block +
@@ -2494,6 +2565,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
             closeResponder();
             closeStream();
             nodes = null;
+            stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
           }
           if (progress != null) { progress.progress(); }
 
@@ -2588,12 +2660,12 @@ public class DFSClient implements FSConstants, java.io.Closeable {
               // verify seqno from datanode
               long seqno = blockReplyStream.readLong();
               LOG.debug("DFSClient received ack for seqno " + seqno);
+              Packet one = null;
               if (seqno == -1) {
                 continue;
               } else if (seqno == -2) {
                // do nothing
               } else {
-                Packet one = null;
                 synchronized (dataQueue) {
                   one = ackQueue.getFirst();
                 }
@@ -2618,6 +2690,11 @@ public class DFSClient implements FSConstants, java.io.Closeable {
                 }
               }
 
+              if (one != null) {
+                // update bytesAcked
+                block.setNumBytes(one.getLastByteOffsetBlock());
+              }
+              
               synchronized (dataQueue) {
                 ackQueue.removeFirst();
                 dataQueue.notifyAll();
@@ -2813,6 +2890,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
           long startTime = System.currentTimeMillis();
           lb = locateFollowingBlock(startTime);
           block = lb.getBlock();
+          block.setNumBytes(0);
           accessToken = lb.getAccessToken();
           nodes = lb.getLocations();
 
@@ -2882,9 +2960,11 @@ public class DFSClient implements FSConstants, java.io.Closeable {
               DataNode.SMALL_BUFFER_SIZE));
           blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
 
+          // send the write request; newGS uses a dummy value of 0 for now
           DataTransferProtocol.Sender.opWriteBlock(out,
-              block.getBlockId(), block.getGenerationStamp(), nodes.length,
-              recoveryFlag, client, null, nodes, accessToken);
+              block.getBlockId(), block.getGenerationStamp(),
+              nodes.length, recoveryFlag?stage.getRecoveryStage():stage, 0,
+              block.getNumBytes(), bytesSent, client, null, nodes, accessToken);
           checksum.writeHeader(out);
           out.flush();
 
@@ -2973,54 +3053,6 @@ public class DFSClient implements FSConstants, java.io.Closeable {
         } 
       }
 
-      void initAppend(LocatedBlock lastBlock, FileStatus stat,
-          int bytesPerChecksum) throws IOException {
-        block = lastBlock.getBlock();
-        accessToken = lastBlock.getAccessToken();
-        long usedInLastBlock = stat.getLen() % blockSize;
-        int freeInLastBlock = (int)(blockSize - usedInLastBlock);
-
-        // calculate the amount of free space in the pre-existing 
-        // last crc chunk
-        int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
-        int freeInCksum = bytesPerChecksum - usedInCksum;
-
-        // if there is space in the last block, then we have to 
-        // append to that block
-        if (freeInLastBlock > blockSize) {
-          throw new IOException("The last block for file " + 
-              src + " is full.");
-        }
-
-        if (usedInCksum > 0 && freeInCksum > 0) {
-          // if there is space in the last partial chunk, then 
-          // setup in such a way that the next packet will have only 
-          // one chunk that fills up the partial chunk.
-          //
-          computePacketChunkSize(0, freeInCksum);
-          resetChecksumChunk(freeInCksum);
-          appendChunk = true;
-        } else {
-          // if the remaining space in the block is smaller than 
-          // that expected size of of a packet, then create 
-          // smaller size packet.
-          //
-          computePacketChunkSize(Math.min(writePacketSize, freeInLastBlock), 
-              bytesPerChecksum);
-        }
-
-        // setup pipeline to append to the last block XXX retries??
-        nodes = lastBlock.getLocations();
-        errorIndex = -1;   // no errors yet.
-        if (nodes.length < 1) {
-          throw new IOException("Unable to retrieve blocks locations " +
-              " for last block " + block +
-              "of file " + src);
-
-        }
-        processDatanodeError(true, true);
-      }
-
       Block getBlock() {
         return block;
       }
@@ -3105,6 +3137,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
                                        NSQuotaExceededException.class,
                                        DSQuotaExceededException.class);
       }
+      streamer = new DataStreamer();
       streamer.start();
     }
   
@@ -3124,9 +3157,10 @@ public class DFSClient implements FSConstants, java.io.Closeable {
       if (lastBlock != null) {
         // indicate that we are appending to an existing block
         bytesCurBlock = lastBlock.getBlockSize();
-        streamer.initAppend(lastBlock, stat, bytesPerChecksum);
+        streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum);
       } else {
         computePacketChunkSize(writePacketSize, bytesPerChecksum);
+        streamer = new DataStreamer();
       }
       streamer.start();
     }

+ 73 - 13
src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java

@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.security.AccessToken;
 
 /**
@@ -38,12 +39,12 @@ public interface DataTransferProtocol {
    * when protocol changes. It is not very obvious. 
    */
   /*
-   * Version 16:
-   *    Datanode now needs to send back a status code together 
-   *    with firstBadLink during pipeline setup for dfs write
-   *    (only for DFSClients, not for other datanodes).
+   * Version 17:
+   *    Change the block write protocol to support pipeline recovery.
+   *    Additional fields, namely the block construction stage, new GS,
+   *    minBytesRcvd, and maxBytesRcvd, are included.
    */
-  public static final int DATA_TRANSFER_VERSION = 16;
+  public static final int DATA_TRANSFER_VERSION = 17;
 
   /** Operation */
   public enum Op {
@@ -119,6 +120,55 @@ public interface DataTransferProtocol {
     }
   };
   
+  public enum BlockConstructionStage {
+    /** The enum constants are always listed as a regular stage
+     * followed by its recovery stage.
+     * Changing this order will break getRecoveryStage.
+     */
+    // pipeline set up for block append
+    PIPELINE_SETUP_APPEND,
+    // pipeline set up for failed PIPELINE_SETUP_APPEND recovery
+    PIPELINE_SETUP_APPEND_RECOVERY,
+    // data streaming
+    DATA_STREAMING,
+    // pipeline setup for failed data streaming recovery
+    PIPELINE_SETUP_STREAMING_RECOVERY,
+    // close the block and pipeline
+    PIPELINE_CLOSE,
+    // Recover a failed PIPELINE_CLOSE
+    PIPELINE_CLOSE_RECOVERY,
+    // pipeline set up for block creation
+    PIPELINE_SETUP_CREATE;
+    
+    final static private byte RECOVERY_BIT = (byte)1;
+    
+    /**
+     * get the recovery stage of this stage
+     */
+    public BlockConstructionStage getRecoveryStage() {
+      if (this == PIPELINE_SETUP_CREATE) {
+        throw new IllegalArgumentException("Unexpected blockStage " + this);
+      } else {
+        return values()[ordinal()|RECOVERY_BIT];
+      }
+    }
+    
+    private static BlockConstructionStage valueOf(byte code) {
+      return code < 0 || code >= values().length? null: values()[code];
+    }
+    
+    /** Read from in */
+    private static BlockConstructionStage readFields(DataInput in)
+    throws IOException {
+      return valueOf(in.readByte());
+    }
+
+    /** write to out */
+    private void write(DataOutput out) throws IOException {
+      out.writeByte(ordinal());
+    }
+  }    
+
   /** @deprecated Deprecated at 0.21.  Use Op.WRITE_BLOCK instead. */
   @Deprecated
   public static final byte OP_WRITE_BLOCK = Op.WRITE_BLOCK.code;
@@ -187,15 +237,19 @@ public interface DataTransferProtocol {
     
     /** Send OP_WRITE_BLOCK */
     public static void opWriteBlock(DataOutputStream out,
-        long blockId, long blockGs, int pipelineSize, boolean isRecovery,
-        String client, DatanodeInfo src, DatanodeInfo[] targets,
-        AccessToken accesstoken) throws IOException {
+        long blockId, long blockGs, int pipelineSize, 
+        BlockConstructionStage stage, long newGs, long minBytesRcvd,
+        long maxBytesRcvd, String client, DatanodeInfo src, 
+        DatanodeInfo[] targets, AccessToken accesstoken) throws IOException {
       op(out, Op.WRITE_BLOCK);
 
       out.writeLong(blockId);
       out.writeLong(blockGs);
       out.writeInt(pipelineSize);
-      out.writeBoolean(isRecovery);
+      stage.write(out);
+      WritableUtils.writeVLong(out, newGs);
+      WritableUtils.writeVLong(out, minBytesRcvd);
+      WritableUtils.writeVLong(out, maxBytesRcvd);
       Text.writeString(out, client);
 
       out.writeBoolean(src != null);
@@ -307,7 +361,11 @@ public interface DataTransferProtocol {
       final long blockId = in.readLong();          
       final long blockGs = in.readLong();
       final int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
-      final boolean isRecovery = in.readBoolean(); // is this part of recovery?
+      final BlockConstructionStage stage = 
+        BlockConstructionStage.readFields(in);
+      final long newGs = WritableUtils.readVLong(in);
+      final long minBytesRcvd = WritableUtils.readVLong(in);
+      final long maxBytesRcvd = WritableUtils.readVLong(in);
       final String client = Text.readString(in); // working on behalf of this client
       final DatanodeInfo src = in.readBoolean()? DatanodeInfo.read(in): null;
 
@@ -321,8 +379,8 @@ public interface DataTransferProtocol {
       }
       final AccessToken accesstoken = readAccessToken(in);
 
-      opWriteBlock(in, blockId, blockGs, pipelineSize, isRecovery,
-          client, src, targets, accesstoken);
+      opWriteBlock(in, blockId, blockGs, pipelineSize, stage,
+          newGs, minBytesRcvd, maxBytesRcvd, client, src, targets, accesstoken);
     }
 
     /**
@@ -330,7 +388,9 @@ public interface DataTransferProtocol {
      * Write a block.
      */
     protected abstract void opWriteBlock(DataInputStream in,
-        long blockId, long blockGs, int pipelineSize, boolean isRecovery,
+        long blockId, long blockGs,
+        int pipelineSize, BlockConstructionStage stage,
+        long newGs, long minBytesRcvd, long maxBytesRcvd,
         String client, DatanodeInfo src, DatanodeInfo[] targets,
         AccessToken accesstoken) throws IOException;
 

+ 35 - 11
src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
@@ -80,31 +81,49 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
   final private ReplicaInPipelineInterface replicaInfo;
 
   BlockReceiver(Block block, DataInputStream in, String inAddr,
-                String myAddr, boolean isRecovery, String clientName, 
-                DatanodeInfo srcDataNode, DataNode datanode) throws IOException {
+                String myAddr, BlockConstructionStage stage, 
+                long newGs, long minBytesRcvd, long maxBytesRcvd, 
+                String clientName, DatanodeInfo srcDataNode, DataNode datanode)
+                throws IOException {
     try{
       this.block = block;
       this.in = in;
       this.inAddr = inAddr;
       this.myAddr = myAddr;
-      this.isRecovery = isRecovery;
       this.clientName = clientName;
       this.srcDataNode = srcDataNode;
       this.datanode = datanode;
-      this.checksum = DataChecksum.newDataChecksum(in);
-      this.bytesPerChecksum = checksum.getBytesPerChecksum();
-      this.checksumSize = checksum.getChecksumSize();
       this.finalized = datanode.data.isValidBlock(block);
       //
       // Open local disk out
       //
       if (clientName.length() == 0) { //replication or move
         replicaInfo = datanode.data.writeToTemporary(block);
-      } else if (finalized && isRecovery) { // client append
-        replicaInfo = datanode.data.append(block);
-        this.finalized = false;
-      } else { // client write
-        replicaInfo = datanode.data.writeToRbw(block, isRecovery);
+      } else {
+        switch (stage) {
+        case PIPELINE_SETUP_CREATE:
+          isRecovery = false;
+          replicaInfo = datanode.data.writeToRbw(block, isRecovery);
+          break;
+        case PIPELINE_SETUP_STREAMING_RECOVERY:
+          isRecovery = true;
+          if (finalized) {
+            replicaInfo = datanode.data.append(block);
+            finalized = false;
+          } else {
+            replicaInfo = datanode.data.writeToRbw(block, isRecovery);
+          }
+          break;
+        case PIPELINE_SETUP_APPEND:
+        case PIPELINE_SETUP_APPEND_RECOVERY:
+          isRecovery = true;
+          replicaInfo = datanode.data.append(block);
+          finalized = false;
+          break;
+        case PIPELINE_CLOSE_RECOVERY:
+        default: throw new IOException("Unsupported stage " + stage + 
+              " while receiving block " + block + " from " + inAddr);
+        }
       }
       streams = replicaInfo.createStreams();
       if (streams != null) {
@@ -117,6 +136,11 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
         if (datanode.blockScanner != null && isRecovery) {
           datanode.blockScanner.deleteBlock(block);
         }
+        
+        // read checksum meta information
+        this.checksum = DataChecksum.newDataChecksum(in);
+        this.bytesPerChecksum = checksum.getBytesPerChecksum();
+        this.checksumSize = checksum.getChecksumSize();
       }
     } catch (BlockAlreadyExistsException bae) {
       throw bae;
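The switch above is the core of the BlockReceiver change: the construction stage, rather than a single isRecovery flag, now decides how the datanode opens the on-disk replica (note also that the checksum header is now read only after the replica is set up). A compact paraphrase of that decision follows; Stage and DiskAction are illustrative stand-ins, not HDFS types:

    public class ReplicaSetupSketch {
      enum Stage { PIPELINE_SETUP_CREATE, PIPELINE_SETUP_STREAMING_RECOVERY,
                   PIPELINE_SETUP_APPEND, PIPELINE_SETUP_APPEND_RECOVERY }
      enum DiskAction { CREATE_RBW, REOPEN_RBW, REOPEN_FINALIZED_FOR_APPEND }

      static DiskAction actionFor(Stage stage, boolean finalized) {
        switch (stage) {
        case PIPELINE_SETUP_CREATE:
          return DiskAction.CREATE_RBW;                   // writeToRbw(block, false)
        case PIPELINE_SETUP_STREAMING_RECOVERY:
          return finalized ? DiskAction.REOPEN_FINALIZED_FOR_APPEND // append(block)
                           : DiskAction.REOPEN_RBW;       // writeToRbw(block, true)
        case PIPELINE_SETUP_APPEND:
        case PIPELINE_SETUP_APPEND_RECOVERY:
          return DiskAction.REOPEN_FINALIZED_FOR_APPEND;  // append(block)
        default: // PIPELINE_CLOSE_RECOVERY and anything else is rejected
          throw new IllegalArgumentException("Unsupported stage " + stage);
        }
      }

      public static void main(String[] args) {
        // recovering a streaming pipeline on a finalized replica reopens it for append
        System.out.println(actionFor(Stage.PIPELINE_SETUP_STREAMING_RECOVERY, true));
      }
    }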

+ 3 - 1
src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
@@ -1249,7 +1250,8 @@ public class DataNode extends Configured
               EnumSet.of(AccessTokenHandler.AccessMode.WRITE));
         }
         DataTransferProtocol.Sender.opWriteBlock(out,
-            b.getBlockId(), b.getGenerationStamp(), 0, false, "",
+            b.getBlockId(), b.getGenerationStamp(), 0, 
+            BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0, 0, "",
             srcNode, targets, accessToken);
 
         // send data & checksum

+ 9 - 5
src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.MetaDataInputStream;
 import org.apache.hadoop.io.IOUtils;
@@ -208,7 +209,8 @@ class DataXceiver extends DataTransferProtocol.Receiver
    */
   @Override
   protected void opWriteBlock(DataInputStream in, long blockId, long blockGs,
-      int pipelineSize, boolean isRecovery,
+      int pipelineSize, BlockConstructionStage stage,
+      long newGs, long minBytesRcvd, long maxBytesRcvd,
       String client, DatanodeInfo srcDataNode, DatanodeInfo[] targets,
       AccessToken accessToken) throws IOException {
 
@@ -254,7 +256,8 @@ class DataXceiver extends DataTransferProtocol.Receiver
       blockReceiver = new BlockReceiver(block, in, 
           s.getRemoteSocketAddress().toString(),
           s.getLocalSocketAddress().toString(),
-          isRecovery, client, srcDataNode, datanode);
+          stage, newGs, minBytesRcvd, maxBytesRcvd,
+          client, srcDataNode, datanode);
 
       //
       // Open network conn to backup machine, if 
@@ -282,8 +285,9 @@ class DataXceiver extends DataTransferProtocol.Receiver
 
           // Write header: Copied from DFSClient.java!
           DataTransferProtocol.Sender.opWriteBlock(mirrorOut,
-              block.getBlockId(), block.getGenerationStamp(), pipelineSize,
-              isRecovery, client, srcDataNode, targets, accessToken);
+              blockId, blockGs, 
+              pipelineSize, stage, newGs, minBytesRcvd, maxBytesRcvd, client, 
+              srcDataNode, targets, accessToken);
 
           blockReceiver.writeChecksumHeader(mirrorOut);
           mirrorOut.flush();
@@ -569,7 +573,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
       blockReceiver = new BlockReceiver(
           block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
           proxySock.getLocalSocketAddress().toString(),
-          false, "", null, datanode);
+          null, 0, 0, 0, "", null, datanode);
 
       // receive a block
       blockReceiver.receiveBlock(null, null, null, null, 

+ 9 - 1
src/test/findbugsExcludeFile.xml

@@ -208,7 +208,7 @@
      </Match>
 
      <!--
-       CreateBlockWriteStreams and getTmpInputStreams are pretty much like a stream constructor.
+       getTmpInputStreams is pretty much like a stream constructor.
        The newly created streams are not supposed to be closed in the constructor. So ignore
        the OBL warning.
      -->
@@ -218,4 +218,12 @@
        <Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
      </Match>
 
+     <!--
+      ResponseProcessor is a thread that is designed to catch RuntimeException.
+     -->
+     <Match>
+       <Class name="org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$DataStreamer$ResponseProcessor" />
+       <Method name="run" />
+       <Bug pattern="REC_CATCH_EXCEPTION" />
+     </Match>
  </FindBugsFilter>

+ 155 - 47
src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java

@@ -41,9 +41,12 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient.DFSOutputStream;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.io.IOUtils;
@@ -51,6 +54,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessToken;
 import org.apache.hadoop.util.DataChecksum;
+import org.junit.Test;
 
 /**
  * This tests data transfer protocol handling in the Datanode. It sends
@@ -94,6 +98,7 @@ public class TestDataTransferProtocol extends TestCase {
       
       DataInputStream in = new DataInputStream(sock.getInputStream());
       out.write(sendBuf.toByteArray());
+      out.flush();
       try {
         in.readFully(retBuf);
       } catch (EOFException eof) {
@@ -137,7 +142,139 @@ public class TestDataTransferProtocol extends TestCase {
     in.readFully(arr);
   }
   
-  public void testDataTransferProtocol() throws IOException {
+  private void writeZeroLengthPacket(Block block, String description)
+  throws IOException {
+    sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
+    sendOut.writeInt(512);         // bytes per checksum
+    sendOut.writeInt(8);           // size of packet
+    sendOut.writeLong(block.getNumBytes());          // OffsetInBlock
+    sendOut.writeLong(100);        // sequencenumber
+    sendOut.writeBoolean(true);    // lastPacketInBlock
+
+    sendOut.writeInt(0);           // chunk length
+    sendOut.writeInt(0);           // zero checksum
+    
+    // mark the end of block
+    sendOut.writeInt(0);
+    
+    // expected reply: a successful zero-length block write
+    SUCCESS.write(recvOut);
+    Text.writeString(recvOut, ""); // first bad node
+    recvOut.writeLong(100);        // sequencenumber
+    SUCCESS.write(recvOut);
+    sendRecvData(description, false);
+  }
+  
+  private void testWrite(Block block, BlockConstructionStage stage, 
+      String description, boolean eofExpected) throws IOException {
+    sendBuf.reset();
+    recvBuf.reset();
+    DataTransferProtocol.Sender.opWriteBlock(sendOut, 
+        block.getBlockId(), block.getGenerationStamp(), 0,
+        stage, 0L, 0L, 0L, "cl", null,
+        new DatanodeInfo[1], AccessToken.DUMMY_TOKEN);
+    if (eofExpected) {
+      ERROR.write(recvOut);
+      sendRecvData(description, true);
+    } else {
+      writeZeroLengthPacket(block, description);
+    }
+  }
+  
+  @Test public void testOpWrite() throws IOException {
+    int numDataNodes = 1;
+    Configuration conf = new Configuration();
+    conf.setBoolean("dfs.support.append", true);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+    try {
+      cluster.waitActive();
+      datanode = cluster.getDataNodes().get(0).dnRegistration;
+      dnAddr = NetUtils.createSocketAddr(datanode.getName());
+      FileSystem fileSys = cluster.getFileSystem();
+
+      /* Test writing to finalized replicas */
+      Path file = new Path("dataprotocol.dat");    
+      DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
+      // get the first block of the file
+      Block firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
+      // test PIPELINE_SETUP_CREATE on a finalized block
+      testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_CREATE,
+          "Cannot create an existing block", true);
+      // test DATA_STREAMING on a finalized block
+      testWrite(firstBlock, BlockConstructionStage.DATA_STREAMING,
+          "Unexpected stage", true);
+      // test PIPELINE_SETUP_STREAMING_RECOVERY on an existing block
+      testWrite(firstBlock, 
+          BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY,
+          "Successful for now", false);
+      // test PIPELINE_SETUP_APPEND on an existing block
+      testWrite(firstBlock, 
+          BlockConstructionStage.PIPELINE_SETUP_APPEND,
+          "Append to a finalized replica", false);
+      // test PIPELINE_SETUP_APPEND_RECOVERY on an existing block
+      testWrite(firstBlock, 
+          BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY,
+          "Recover appending to a finalized replica", false);
+
+      /* Test writing to a new block */
+      long newBlockId = firstBlock.getBlockId() + 1;
+      Block newBlock = new Block(newBlockId, 0, 
+          firstBlock.getGenerationStamp());
+
+      // test PIPELINE_SETUP_CREATE on a new block
+      testWrite(newBlock, BlockConstructionStage.PIPELINE_SETUP_CREATE,
+          "Create a new block", false);
+      // test PIPELINE_SETUP_STREAMING_RECOVERY on a new block
+      newBlock.setBlockId(newBlock.getBlockId()+1);
+      testWrite(newBlock, 
+          BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY,
+          "Recover a new block", false);
+      // test PIPELINE_SETUP_APPEND on a new block
+      newBlock.setBlockId(newBlock.getBlockId()+1);
+      testWrite(newBlock, 
+          BlockConstructionStage.PIPELINE_SETUP_APPEND,
+          "Cannot append to a new block", true);
+      // test PIPELINE_SETUP_APPEND_RECOVERY on a new block
+      newBlock.setBlockId(newBlock.getBlockId()+1);
+      testWrite(newBlock, 
+          BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY,
+          "Cannot append to a new block", true);
+
+      /* Test writing to RBW replicas */
+      // change first block to a RBW
+      DFSOutputStream out = (DFSOutputStream)(fileSys.append(file).
+          getWrappedStream()); 
+      out.write(1);
+      out.hflush();
+      FSDataInputStream in = fileSys.open(file);
+      firstBlock = DFSTestUtil.getAllBlocks(in).get(0).getBlock();
+      
+      try {
+        // test PIPELINE_SETUP_CREATE on a RBW block
+        testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_CREATE,
+            "Cannot create a RBW block", true);
+        // test PIPELINE_SETUP_APPEND on a RBW replica
+        testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_APPEND,
+            "Cannot append to a RBW replica", true);
+        // test PIPELINE_SETUP_APPEND_RECOVERY on a RBW replica
+        testWrite(firstBlock, 
+            BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY,
+            "Cannot append to a RBW replica", true);
+        // test PIPELINE_SETUP_STREAMING_RECOVERY on a RBW block
+        testWrite(firstBlock, 
+            BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY,
+            "Recover a RBW replica", false);
+      } finally {
+        IOUtils.closeStream(in);
+        IOUtils.closeStream(out);
+      }
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
+  
+  @Test public void testDataTransferProtocol() throws IOException {
     Random random = new Random();
     int oneMil = 1024*1024;
     Path file = new Path("dataprotocol.dat");
@@ -146,6 +283,7 @@ public class TestDataTransferProtocol extends TestCase {
     Configuration conf = new Configuration();
     conf.setInt("dfs.replication", numDataNodes); 
     MiniDFSCluster cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+    try {
     cluster.waitActive();
     DFSClient dfsClient = new DFSClient(
                  new InetSocketAddress("localhost", cluster.getNameNodePort()),
@@ -178,16 +316,10 @@ public class TestDataTransferProtocol extends TestCase {
     
     /* Test OP_WRITE_BLOCK */
     sendBuf.reset();
-    sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
-    WRITE_BLOCK.write(sendOut);
-    sendOut.writeLong(newBlockId); // block id
-    sendOut.writeLong(0);          // generation stamp
-    sendOut.writeInt(0);           // targets in pipeline 
-    sendOut.writeBoolean(false);   // recoveryFlag
-    Text.writeString(sendOut, "cl");// clientID
-    sendOut.writeBoolean(false); // no src node info
-    sendOut.writeInt(0);           // number of downstream targets
-    AccessToken.DUMMY_TOKEN.write(sendOut);
+    DataTransferProtocol.Sender.opWriteBlock(sendOut, 
+        newBlockId, 0L, 0,
+        BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
+        new DatanodeInfo[1], AccessToken.DUMMY_TOKEN);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     
     // bad bytes per checksum
@@ -198,32 +330,10 @@ public class TestDataTransferProtocol extends TestCase {
 
     sendBuf.reset();
     recvBuf.reset();
-    sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
-    WRITE_BLOCK.write(sendOut);
-    sendOut.writeLong(newBlockId);
-    sendOut.writeLong(0);          // generation stamp
-    sendOut.writeInt(0);           // targets in pipeline 
-    sendOut.writeBoolean(false);   // recoveryFlag
-    Text.writeString(sendOut, "cl");// clientID
-    sendOut.writeBoolean(false); // no src node info
-
-    // bad number of targets
-    sendOut.writeInt(-1-random.nextInt(oneMil));
-    ERROR.write(recvOut);
-    sendRecvData("bad targets len while writing block " + newBlockId, true);
-
-    sendBuf.reset();
-    recvBuf.reset();
-    sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
-    WRITE_BLOCK.write(sendOut);
-    sendOut.writeLong(++newBlockId);
-    sendOut.writeLong(0);          // generation stamp
-    sendOut.writeInt(0);           // targets in pipeline 
-    sendOut.writeBoolean(false);   // recoveryFlag
-    Text.writeString(sendOut, "cl");// clientID
-    sendOut.writeBoolean(false); // no src node info
-    sendOut.writeInt(0);
-    AccessToken.DUMMY_TOKEN.write(sendOut);
+    DataTransferProtocol.Sender.opWriteBlock(sendOut,
+        ++newBlockId, 0L, 0,
+        BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
+        new DatanodeInfo[1], AccessToken.DUMMY_TOKEN);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     sendOut.writeInt(512);
     sendOut.writeInt(4);           // size of packet
@@ -243,16 +353,10 @@ public class TestDataTransferProtocol extends TestCase {
     // test for writing a valid zero size block
     sendBuf.reset();
     recvBuf.reset();
-    sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
-    WRITE_BLOCK.write(sendOut);
-    sendOut.writeLong(++newBlockId);
-    sendOut.writeLong(0);          // generation stamp
-    sendOut.writeInt(0);           // targets in pipeline 
-    sendOut.writeBoolean(false);   // recoveryFlag
-    Text.writeString(sendOut, "cl");// clientID
-    sendOut.writeBoolean(false); // no src node info
-    sendOut.writeInt(0);
-    AccessToken.DUMMY_TOKEN.write(sendOut);
+    DataTransferProtocol.Sender.opWriteBlock(sendOut, 
+        ++newBlockId, 0L, 0,
+        BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
+        new DatanodeInfo[1], AccessToken.DUMMY_TOKEN);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     sendOut.writeInt(512);         // bytes per checksum
     sendOut.writeInt(8);           // size of packet
@@ -262,6 +366,7 @@ public class TestDataTransferProtocol extends TestCase {
 
     sendOut.writeInt(0);           // chunk length
     sendOut.writeInt(0);           // zero checksum
+    sendOut.flush();
     //ok finally write a block with 0 len
     SUCCESS.write(recvOut);
     Text.writeString(recvOut, ""); // first bad node
@@ -353,5 +458,8 @@ public class TestDataTransferProtocol extends TestCase {
     Text.writeString(sendOut, "cl");
     AccessToken.DUMMY_TOKEN.write(sendOut);
     readFile(fileSys, file, fileLen);
+    } finally {
+      cluster.shutdown();
+    }
   }
 }