Bläddra i källkod

HDFS-377. Separate codes which implement DataTransferProtocol.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@787537 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 16 år sedan
förälder
incheckning
27797be59d

+ 3 - 0
CHANGES.txt

@@ -14,3 +14,6 @@ Trunk (unreleased changes)
 
     HADOOP-4687. HDFS is split from Hadoop Core. It is a subproject under 
     Hadoop (Owen O'Malley)
+
+    HDFS-377. Separate codes which implement DataTransferProtocol.
+    (szetszwo)

+ 10 - 33
src/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -616,13 +616,9 @@ public class DFSClient implements FSConstants, java.io.Closeable {
                 + DataTransferProtocol.OP_BLOCK_CHECKSUM +
                 ", block=" + block);
           }
-          out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
-          out.write(DataTransferProtocol.OP_BLOCK_CHECKSUM);
-          out.writeLong(block.getBlockId());
-          out.writeLong(block.getGenerationStamp());
-          lb.getAccessToken().write(out);
-          out.flush();
-         
+          DataTransferProtocol.Sender.opBlockChecksum(out, block.getBlockId(),
+              block.getGenerationStamp(), lb.getAccessToken());
+
           final short reply = in.readShort();
           if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
             if (reply == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN
@@ -1307,19 +1303,10 @@ public class DFSClient implements FSConstants, java.io.Closeable {
                                        String clientName)
                                        throws IOException {
       // in and out will be closed when sock is closed (by the caller)
-      DataOutputStream out = new DataOutputStream(
-        new BufferedOutputStream(NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT)));
-
-      //write the header.
-      out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
-      out.write( DataTransferProtocol.OP_READ_BLOCK );
-      out.writeLong( blockId );
-      out.writeLong( genStamp );
-      out.writeLong( startOffset );
-      out.writeLong( len );
-      Text.writeString(out, clientName);
-      accessToken.write(out);
-      out.flush();
+      DataTransferProtocol.Sender.opReadBlock(
+          new DataOutputStream(new BufferedOutputStream(
+              NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT))),
+          blockId, genStamp, startOffset, len, clientName, accessToken);
       
       //
       // Get bytes in block, set streams
@@ -2731,19 +2718,9 @@ public class DFSClient implements FSConstants, java.io.Closeable {
               DataNode.SMALL_BUFFER_SIZE));
           blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
 
-          out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
-          out.write(DataTransferProtocol.OP_WRITE_BLOCK);
-          out.writeLong(block.getBlockId());
-          out.writeLong(block.getGenerationStamp());
-          out.writeInt(nodes.length);
-          out.writeBoolean(recoveryFlag); // recovery flag
-          Text.writeString(out, client);
-          out.writeBoolean(false); // Not sending src node information
-          out.writeInt(nodes.length - 1);
-          for (int i = 1; i < nodes.length; i++) {
-            nodes[i].write(out);
-          }
-          accessToken.write(out);
+          DataTransferProtocol.Sender.opWriteBlock(out,
+              block.getBlockId(), block.getGenerationStamp(), nodes.length,
+              recoveryFlag, client, null, nodes, accessToken);
           checksum.writeHeader(out);
           out.flush();
 

+ 198 - 3
src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java

@@ -17,11 +17,15 @@
  */
 package org.apache.hadoop.hdfs.protocol;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.AccessToken;
 
 /**
- * 
- * The Client transfers data to/from datanode using a streaming protocol.
- *
+ * Transfer data to/from datanode using a streaming protocol.
  */
 public interface DataTransferProtocol {
   
@@ -57,5 +61,196 @@ public interface DataTransferProtocol {
   public static final int OP_STATUS_CHECKSUM_OK = 6;
 
 
+  /** Sender */
+  public static class Sender {
+    /** Initialize a operation. */
+    public static void op(DataOutputStream out, int op) throws IOException {
+      out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
+      out.write(op);
+    }
+
+    /** Send OP_READ_BLOCK */
+    public static void opReadBlock(DataOutputStream out,
+        long blockId, long blockGs, long blockOffset, long blockLen,
+        String clientName, AccessToken accessToken) throws IOException {
+      op(out, OP_READ_BLOCK);
+
+      out.writeLong(blockId);
+      out.writeLong(blockGs);
+      out.writeLong(blockOffset);
+      out.writeLong(blockLen);
+      Text.writeString(out, clientName);
+      accessToken.write(out);
+      out.flush();
+    }
+    
+    /** Send OP_WRITE_BLOCK */
+    public static void opWriteBlock(DataOutputStream out,
+        long blockId, long blockGs, int pipelineSize, boolean isRecovery,
+        String client, DatanodeInfo src, DatanodeInfo[] targets,
+        AccessToken accesstoken) throws IOException {
+      op(out, OP_WRITE_BLOCK);
+
+      out.writeLong(blockId);
+      out.writeLong(blockGs);
+      out.writeInt(pipelineSize);
+      out.writeBoolean(isRecovery);
+      Text.writeString(out, client);
+
+      out.writeBoolean(src != null);
+      if (src != null) {
+        src.write(out);
+      }
+      out.writeInt(targets.length - 1);
+      for (int i = 1; i < targets.length; i++) {
+        targets[i].write(out);
+      }
+
+      accesstoken.write(out);
+    }
+    
+    /** Send OP_REPLACE_BLOCK */
+    public static void opReplaceBlock(DataOutputStream out,
+        long blockId, long blockGs, String storageId, DatanodeInfo src,
+        AccessToken accesstoken) throws IOException {
+      op(out, OP_REPLACE_BLOCK);
+
+      out.writeLong(blockId);
+      out.writeLong(blockGs);
+      Text.writeString(out, storageId);
+      src.write(out);
+      accesstoken.write(out);
+      out.flush();
+    }
+
+    /** Send OP_COPY_BLOCK */
+    public static void opCopyBlock(DataOutputStream out,
+        long blockId, long blockGs, AccessToken accesstoken) throws IOException {
+      op(out, OP_COPY_BLOCK);
+
+      out.writeLong(blockId);
+      out.writeLong(blockGs);
+      accesstoken.write(out);
+      out.flush();
+    }
+
+    /** Send OP_BLOCK_CHECKSUM */
+    public static void opBlockChecksum(DataOutputStream out,
+        long blockId, long blockGs, AccessToken accesstoken) throws IOException {
+      op(out, OP_BLOCK_CHECKSUM);
+
+      out.writeLong(blockId);
+      out.writeLong(blockGs);
+      accesstoken.write(out);
+      out.flush();
+    }
+  }
+
+  /** Receiver */
+  public static abstract class Receiver {
+    /** Initialize a operation. */
+    public final byte op(DataInputStream in) throws IOException {
+      final short version = in.readShort();
+      if (version != DATA_TRANSFER_VERSION) {
+        throw new IOException( "Version Mismatch" );
+      }
+      return in.readByte();
+    }
+
+    /** Receive OP_READ_BLOCK */
+    public final void opReadBlock(DataInputStream in) throws IOException {
+      final long blockId = in.readLong();          
+      final long blockGs = in.readLong();
+      final long offset = in.readLong();
+      final long length = in.readLong();
+      final String client = Text.readString(in);
+      final AccessToken accesstoken = readAccessToken(in);
+
+      opReadBlock(in, blockId, blockGs, offset, length, client, accesstoken);
+    }
+
+    /** Abstract OP_READ_BLOCK method. */
+    public abstract void opReadBlock(DataInputStream in,
+        long blockId, long blockGs, long offset, long length,
+        String client, AccessToken accesstoken) throws IOException;
+    
+    /** Receive OP_WRITE_BLOCK */
+    public final void opWriteBlock(DataInputStream in) throws IOException {
+      final long blockId = in.readLong();          
+      final long blockGs = in.readLong();
+      final int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
+      final boolean isRecovery = in.readBoolean(); // is this part of recovery?
+      final String client = Text.readString(in); // working on behalf of this client
+      final DatanodeInfo src = in.readBoolean()? DatanodeInfo.read(in): null;
+
+      final int nTargets = in.readInt();
+      if (nTargets < 0) {
+        throw new IOException("Mislabelled incoming datastream.");
+      }
+      final DatanodeInfo targets[] = new DatanodeInfo[nTargets];
+      for (int i = 0; i < targets.length; i++) {
+        targets[i] = DatanodeInfo.read(in);
+      }
+      final AccessToken accesstoken = readAccessToken(in);
+
+      opWriteBlock(in, blockId, blockGs, pipelineSize, isRecovery,
+          client, src, targets, accesstoken);
+    }
+
+    /** Abstract OP_WRITE_BLOCK method. */
+    public abstract void opWriteBlock(DataInputStream in,
+        long blockId, long blockGs, int pipelineSize, boolean isRecovery,
+        String client, DatanodeInfo src, DatanodeInfo[] targets,
+        AccessToken accesstoken) throws IOException;
+
+    /** Receive OP_REPLACE_BLOCK */
+    public final void opReplaceBlock(DataInputStream in) throws IOException {
+      final long blockId = in.readLong();          
+      final long blockGs = in.readLong();
+      final String sourceId = Text.readString(in); // read del hint
+      final DatanodeInfo src = DatanodeInfo.read(in); // read proxy source
+      final AccessToken accesstoken = readAccessToken(in);
+
+      opReplaceBlock(in, blockId, blockGs, sourceId, src, accesstoken);
+    }
+
+    /** Abstract OP_REPLACE_BLOCK method. */
+    public abstract void opReplaceBlock(DataInputStream in,
+        long blockId, long blockGs, String sourceId, DatanodeInfo src,
+        AccessToken accesstoken) throws IOException;
+
+    /** Receive OP_COPY_BLOCK */
+    public final void opCopyBlock(DataInputStream in) throws IOException {
+      final long blockId = in.readLong();          
+      final long blockGs = in.readLong();
+      final AccessToken accesstoken = readAccessToken(in);
+
+      opCopyBlock(in, blockId, blockGs, accesstoken);
+    }
+
+    /** Abstract OP_COPY_BLOCK method. */
+    public abstract void opCopyBlock(DataInputStream in,
+        long blockId, long blockGs, AccessToken accesstoken) throws IOException;
+
+    /** Receive OP_BLOCK_CHECKSUM */
+    public final void opBlockChecksum(DataInputStream in) throws IOException {
+      final long blockId = in.readLong();          
+      final long blockGs = in.readLong();
+      final AccessToken accesstoken = readAccessToken(in);
+
+      opBlockChecksum(in, blockId, blockGs, accesstoken);
+    }
+
+    /** Abstract OP_BLOCK_CHECKSUM method. */
+    public abstract void opBlockChecksum(DataInputStream in,
+        long blockId, long blockGs, AccessToken accesstoken) throws IOException;
 
+    /** Read an AccessToken */
+    static private AccessToken readAccessToken(DataInputStream in
+        ) throws IOException {
+      final AccessToken t = new AccessToken();
+      t.readFields(in);
+      return t; 
+    }
+  }
 }

+ 7 - 0
src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java

@@ -343,6 +343,13 @@ public class DatanodeInfo extends DatanodeID implements Node {
     setAdminState(WritableUtils.readEnum(in, AdminStates.class));
   }
 
+  /** Read a DatanodeInfo */
+  public static DatanodeInfo read(DataInput in) throws IOException {
+    final DatanodeInfo d = new DatanodeInfo();
+    d.readFields(in);
+    return d;
+  }
+
   @Override
   public int hashCode() {
     // Super implementation is sufficient

+ 3 - 12
src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java

@@ -19,9 +19,7 @@ package org.apache.hadoop.hdfs.server.balancer;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
-import java.io.DataInput;
 import java.io.DataInputStream;
-import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -67,8 +65,6 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
@@ -367,20 +363,15 @@ public class Balancer implements Tool {
     
     /* Send a block replace request to the output stream*/
     private void sendRequest(DataOutputStream out) throws IOException {
-      out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
-      out.writeByte(DataTransferProtocol.OP_REPLACE_BLOCK);
-      out.writeLong(block.getBlock().getBlockId());
-      out.writeLong(block.getBlock().getGenerationStamp());
-      Text.writeString(out, source.getStorageID());
-      proxySource.getDatanode().write(out);
       AccessToken accessToken = AccessToken.DUMMY_TOKEN;
       if (isAccessTokenEnabled) {
         accessToken = accessTokenHandler.generateToken(null, block.getBlock()
             .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.REPLACE,
             AccessTokenHandler.AccessMode.COPY));
       }
-      accessToken.write(out);
-      out.flush();
+      DataTransferProtocol.Sender.opReplaceBlock(out,
+          block.getBlock().getBlockId(), block.getBlock().getGenerationStamp(),
+          source.getStorageID(), proxySource.getDatanode(), accessToken);
     }
     
     /* Receive a block copy response from the input stream */ 

+ 8 - 20
src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -56,11 +56,11 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets;
@@ -72,13 +72,12 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
-import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
@@ -94,8 +93,8 @@ import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.ServicePlugin;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.ServicePlugin;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
@@ -1216,26 +1215,15 @@ public class DataNode extends Configured
         //
         // Header info
         //
-        out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
-        out.writeByte(DataTransferProtocol.OP_WRITE_BLOCK);
-        out.writeLong(b.getBlockId());
-        out.writeLong(b.getGenerationStamp());
-        out.writeInt(0);           // no pipelining
-        out.writeBoolean(false);   // not part of recovery
-        Text.writeString(out, ""); // client
-        out.writeBoolean(true); // sending src node information
-        srcNode.write(out); // Write src node DatanodeInfo
-        // write targets
-        out.writeInt(targets.length - 1);
-        for (int i = 1; i < targets.length; i++) {
-          targets[i].write(out);
-        }
         AccessToken accessToken = AccessToken.DUMMY_TOKEN;
         if (isAccessTokenEnabled) {
           accessToken = accessTokenHandler.generateToken(null, b.getBlockId(),
               EnumSet.of(AccessTokenHandler.AccessMode.WRITE));
         }
-        accessToken.write(out);
+        DataTransferProtocol.Sender.opWriteBlock(out,
+            b.getBlockId(), b.getGenerationStamp(), 0, false, "",
+            srcNode, targets, accessToken);
+
         // send data & checksum
         blockSender.sendBlock(out, baseStream, null);
 

+ 48 - 106
src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -47,7 +47,8 @@ import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FOR
 /**
  * Thread for processing incoming/outgoing data stream.
  */
-class DataXceiver implements Runnable, FSConstants {
+class DataXceiver extends DataTransferProtocol.Receiver
+    implements Runnable, FSConstants {
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
@@ -78,12 +79,8 @@ class DataXceiver implements Runnable, FSConstants {
       in = new DataInputStream(
           new BufferedInputStream(NetUtils.getInputStream(s), 
                                   SMALL_BUFFER_SIZE));
-      short version = in.readShort();
-      if ( version != DataTransferProtocol.DATA_TRANSFER_VERSION ) {
-        throw new IOException( "Version Mismatch" );
-      }
+      final byte op = op(in);
       boolean local = s.getInetAddress().equals(s.getLocalAddress());
-      byte op = in.readByte();
       // Make sure the xciver count is not exceeded
       int curXceiverCount = datanode.getXceiverCount();
       if (curXceiverCount > dataXceiverServer.maxXceiverCount) {
@@ -94,7 +91,7 @@ class DataXceiver implements Runnable, FSConstants {
       long startTime = DataNode.now();
       switch ( op ) {
       case DataTransferProtocol.OP_READ_BLOCK:
-        readBlock( in );
+        opReadBlock(in);
         datanode.myMetrics.readBlockOp.inc(DataNode.now() - startTime);
         if (local)
           datanode.myMetrics.readsFromLocalClient.inc();
@@ -102,7 +99,7 @@ class DataXceiver implements Runnable, FSConstants {
           datanode.myMetrics.readsFromRemoteClient.inc();
         break;
       case DataTransferProtocol.OP_WRITE_BLOCK:
-        writeBlock( in );
+        opWriteBlock(in);
         datanode.myMetrics.writeBlockOp.inc(DataNode.now() - startTime);
         if (local)
           datanode.myMetrics.writesFromLocalClient.inc();
@@ -110,16 +107,16 @@ class DataXceiver implements Runnable, FSConstants {
           datanode.myMetrics.writesFromRemoteClient.inc();
         break;
       case DataTransferProtocol.OP_REPLACE_BLOCK: // for balancing purpose; send to a destination
-        replaceBlock(in);
+        opReplaceBlock(in);
         datanode.myMetrics.replaceBlockOp.inc(DataNode.now() - startTime);
         break;
       case DataTransferProtocol.OP_COPY_BLOCK:
             // for balancing purpose; send to a proxy source
-        copyBlock(in);
+        opCopyBlock(in);
         datanode.myMetrics.copyBlockOp.inc(DataNode.now() - startTime);
         break;
       case DataTransferProtocol.OP_BLOCK_CHECKSUM: //get the checksum of a block
-        getBlockChecksum(in);
+        opBlockChecksum(in);
         datanode.myMetrics.blockChecksumOp.inc(DataNode.now() - startTime);
         break;
       default:
@@ -138,21 +135,12 @@ class DataXceiver implements Runnable, FSConstants {
 
   /**
    * Read a block from the disk.
-   * @param in The stream to read from
-   * @throws IOException
    */
-  private void readBlock(DataInputStream in) throws IOException {
-    //
-    // Read in the header
-    //
-    long blockId = in.readLong();          
-    Block block = new Block( blockId, 0 , in.readLong());
-
-    long startOffset = in.readLong();
-    long length = in.readLong();
-    String clientName = Text.readString(in);
-    AccessToken accessToken = new AccessToken();
-    accessToken.readFields(in);
+  @Override
+  public void opReadBlock(DataInputStream in,
+      long blockId, long blockGs, long startOffset, long length,
+      String clientName, AccessToken accessToken) throws IOException {
+    final Block block = new Block(blockId, 0 , blockGs);
     OutputStream baseStream = NetUtils.getOutputStream(s, 
         datanode.socketWriteTimeout);
     DataOutputStream out = new DataOutputStream(
@@ -224,42 +212,24 @@ class DataXceiver implements Runnable, FSConstants {
 
   /**
    * Write a block to disk.
-   * 
-   * @param in The stream to read from
-   * @throws IOException
    */
-  private void writeBlock(DataInputStream in) throws IOException {
-    DatanodeInfo srcDataNode = null;
-    LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
-              " tcp no delay " + s.getTcpNoDelay());
-    //
-    // Read in the header
-    //
-    Block block = new Block(in.readLong(), 
-        dataXceiverServer.estimateBlockSize, in.readLong());
+  @Override
+  public void opWriteBlock(DataInputStream in, long blockId, long blockGs,
+      int pipelineSize, boolean isRecovery,
+      String client, DatanodeInfo srcDataNode, DatanodeInfo[] targets,
+      AccessToken accessToken) throws IOException {
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
+                " tcp no delay " + s.getTcpNoDelay());
+    }
+
+    final Block block = new Block(blockId, dataXceiverServer.estimateBlockSize,
+        blockGs);
     LOG.info("Receiving block " + block + 
              " src: " + remoteAddress +
              " dest: " + localAddress);
-    int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
-    boolean isRecovery = in.readBoolean(); // is this part of recovery?
-    String client = Text.readString(in); // working on behalf of this client
-    boolean hasSrcDataNode = in.readBoolean(); // is src node info present
-    if (hasSrcDataNode) {
-      srcDataNode = new DatanodeInfo();
-      srcDataNode.readFields(in);
-    }
-    int numTargets = in.readInt();
-    if (numTargets < 0) {
-      throw new IOException("Mislabelled incoming datastream.");
-    }
-    DatanodeInfo targets[] = new DatanodeInfo[numTargets];
-    for (int i = 0; i < targets.length; i++) {
-      DatanodeInfo tmp = new DatanodeInfo();
-      tmp.readFields(in);
-      targets[i] = tmp;
-    }
-    AccessToken accessToken = new AccessToken();
-    accessToken.readFields(in);
+
     DataOutputStream replyOut = null;   // stream to prev target
     replyOut = new DataOutputStream(
                    NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
@@ -302,9 +272,9 @@ class DataXceiver implements Runnable, FSConstants {
         mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
         mirrorSock = datanode.newSocket();
         try {
-          int timeoutValue = numTargets * datanode.socketTimeout;
+          int timeoutValue = targets.length * datanode.socketTimeout;
           int writeTimeout = datanode.socketWriteTimeout + 
-                             (HdfsConstants.WRITE_TIMEOUT_EXTENSION * numTargets);
+                      (HdfsConstants.WRITE_TIMEOUT_EXTENSION * targets.length);
           NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
           mirrorSock.setSoTimeout(timeoutValue);
           mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
@@ -315,22 +285,9 @@ class DataXceiver implements Runnable, FSConstants {
           mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
 
           // Write header: Copied from DFSClient.java!
-          mirrorOut.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
-          mirrorOut.write( DataTransferProtocol.OP_WRITE_BLOCK );
-          mirrorOut.writeLong( block.getBlockId() );
-          mirrorOut.writeLong( block.getGenerationStamp() );
-          mirrorOut.writeInt( pipelineSize );
-          mirrorOut.writeBoolean( isRecovery );
-          Text.writeString( mirrorOut, client );
-          mirrorOut.writeBoolean(hasSrcDataNode);
-          if (hasSrcDataNode) { // pass src node information
-            srcDataNode.write(mirrorOut);
-          }
-          mirrorOut.writeInt( targets.length - 1 );
-          for ( int i = 1; i < targets.length; i++ ) {
-            targets[i].write( mirrorOut );
-          }
-          accessToken.write(mirrorOut);
+          DataTransferProtocol.Sender.opWriteBlock(mirrorOut,
+              block.getBlockId(), block.getGenerationStamp(), pipelineSize,
+              isRecovery, client, srcDataNode, targets, accessToken);
 
           blockReceiver.writeChecksumHeader(mirrorOut);
           mirrorOut.flush();
@@ -414,12 +371,11 @@ class DataXceiver implements Runnable, FSConstants {
 
   /**
    * Get block checksum (MD5 of CRC32).
-   * @param in
    */
-  void getBlockChecksum(DataInputStream in) throws IOException {
-    final Block block = new Block(in.readLong(), 0 , in.readLong());
-    AccessToken accessToken = new AccessToken();
-    accessToken.readFields(in);
+  @Override
+  public void opBlockChecksum(DataInputStream in,
+      long blockId, long blockGs, AccessToken accessToken) throws IOException {
+    final Block block = new Block(blockId, 0 , blockGs);
     DataOutputStream out = new DataOutputStream(NetUtils.getOutputStream(s,
         datanode.socketWriteTimeout));
     if (datanode.isAccessTokenEnabled
@@ -471,16 +427,12 @@ class DataXceiver implements Runnable, FSConstants {
 
   /**
    * Read a block from the disk and then sends it to a destination.
-   * 
-   * @param in The stream to read from
-   * @throws IOException
    */
-  private void copyBlock(DataInputStream in) throws IOException {
+  @Override
+  public void opCopyBlock(DataInputStream in,
+      long blockId, long blockGs, AccessToken accessToken) throws IOException {
     // Read in the header
-    long blockId = in.readLong(); // read block id
-    Block block = new Block(blockId, 0, in.readLong());
-    AccessToken accessToken = new AccessToken();
-    accessToken.readFields(in);
+    Block block = new Block(blockId, 0, blockGs);
     if (datanode.isAccessTokenEnabled
         && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
             AccessTokenHandler.AccessMode.COPY)) {
@@ -545,20 +497,14 @@ class DataXceiver implements Runnable, FSConstants {
   /**
    * Receive a block and write it to disk, it then notifies the namenode to
    * remove the copy from the source.
-   * 
-   * @param in The stream to read from
-   * @throws IOException
    */
-  private void replaceBlock(DataInputStream in) throws IOException {
+  @Override
+  public void opReplaceBlock(DataInputStream in,
+      long blockId, long blockGs, String sourceID, DatanodeInfo proxySource,
+      AccessToken accessToken) throws IOException {
     /* read header */
-    long blockId = in.readLong();
-    Block block = new Block(blockId, dataXceiverServer.estimateBlockSize,
-        in.readLong()); // block id & generation stamp
-    String sourceID = Text.readString(in); // read del hint
-    DatanodeInfo proxySource = new DatanodeInfo(); // read proxy source
-    proxySource.readFields(in);
-    AccessToken accessToken = new AccessToken();
-    accessToken.readFields(in);
+    final Block block = new Block(blockId, dataXceiverServer.estimateBlockSize,
+        blockGs);
     if (datanode.isAccessTokenEnabled
         && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
             AccessTokenHandler.AccessMode.REPLACE)) {
@@ -597,12 +543,8 @@ class DataXceiver implements Runnable, FSConstants {
                      new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
 
       /* send request to the proxy */
-      proxyOut.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); // transfer version
-      proxyOut.writeByte(DataTransferProtocol.OP_COPY_BLOCK); // op code
-      proxyOut.writeLong(block.getBlockId()); // block id
-      proxyOut.writeLong(block.getGenerationStamp()); // block id
-      accessToken.write(proxyOut);
-      proxyOut.flush();
+      DataTransferProtocol.Sender.opCopyBlock(proxyOut, block.getBlockId(),
+          block.getGenerationStamp(), accessToken);
 
       // receive the response from the proxy
       proxyReply = new DataInputStream(new BufferedInputStream(