Browse Source

HDFS-524. Further DataTransferProtocol code refactoring.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@801057 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 15 years ago
parent
commit
f82cc23f5b

+ 10 - 8
CHANGES.txt

@@ -18,6 +18,9 @@ Trunk (unreleased changes)
     HDFS-381. Remove blocks from DataNode maps when corresponding file
     HDFS-381. Remove blocks from DataNode maps when corresponding file
     is deleted. (Suresh Srinivas via rangadi)
     is deleted. (Suresh Srinivas via rangadi)
 
 
+    HDFS-377. Separate codes which implement DataTransferProtocol.
+    (szetszwo)
+
     HDFS-396. NameNode image and edits directories are specified as URIs.
     HDFS-396. NameNode image and edits directories are specified as URIs.
     (Luca Telloli via rangadi)
     (Luca Telloli via rangadi)
 
 
@@ -47,9 +50,14 @@ Trunk (unreleased changes)
     only by the run-test-*-faul-inject targets.  (Konstantin Boudnik via
     only by the run-test-*-faul-inject targets.  (Konstantin Boudnik via
     szetszwo)
     szetszwo)
 
 
+    HDFS-446. Improvements to Offline Image Viewer. (Jakob Homan via shv)
+
     HADOOP-6160. Fix releaseaudit target to run on specific directories.
     HADOOP-6160. Fix releaseaudit target to run on specific directories.
     (gkesavan)
     (gkesavan)
 
 
+    HDFS-501. Use enum to define the constants in DataTransferProtocol.
+    (szetszwo)
+
     HDFS-508. Factor out BlockInfo from BlocksMap. (shv)
     HDFS-508. Factor out BlockInfo from BlocksMap. (shv)
 
 
     HDFS-510. Rename DatanodeBlockInfo to be ReplicaInfo.
     HDFS-510. Rename DatanodeBlockInfo to be ReplicaInfo.
@@ -68,11 +76,11 @@ Trunk (unreleased changes)
     HDFS-504. Update the modification time of a file when the file 
     HDFS-504. Update the modification time of a file when the file 
     is closed. (Chun Zhang via dhruba)
     is closed. (Chun Zhang via dhruba)
 
 
-    HDFS-446. Improvements to Offline Image Viewer. (Jakob Homan via shv)
-
     HDFS-498. Add development guide and documentation for the fault injection
     HDFS-498. Add development guide and documentation for the fault injection
     framework.  (Konstantin Boudnik via szetszwo)
     framework.  (Konstantin Boudnik via szetszwo)
 
 
+    HDFS-524. Further DataTransferProtocol code refactoring.  (szetszwo)
+
   BUG FIXES
   BUG FIXES
     HDFS-76. Better error message to users when commands fail because of 
     HDFS-76. Better error message to users when commands fail because of 
     lack of quota. Allow quota to be set even if the limit is lower than
     lack of quota. Allow quota to be set even if the limit is lower than
@@ -81,9 +89,6 @@ Trunk (unreleased changes)
     HADOOP-4687. HDFS is split from Hadoop Core. It is a subproject under 
     HADOOP-4687. HDFS is split from Hadoop Core. It is a subproject under 
     Hadoop (Owen O'Malley)
     Hadoop (Owen O'Malley)
 
 
-    HDFS-377. Separate codes which implement DataTransferProtocol.
-    (szetszwo)
-
     HADOOP-6096. Fix Eclipse project and classpath files following project
     HADOOP-6096. Fix Eclipse project and classpath files following project
     split. (tomwhite)
     split. (tomwhite)
 
 
@@ -120,9 +125,6 @@ Trunk (unreleased changes)
     HDFS-484. Fix bin-package and package target to package jar files.
     HDFS-484. Fix bin-package and package target to package jar files.
     (gkesavan)
     (gkesavan)
 
 
-    HDFS-501. Use enum to define the constants in DataTransferProtocol.
-    (szetszwo)
-
     HDFS-490. Eliminate the deprecated warnings introduced by H-5438.
     HDFS-490. Eliminate the deprecated warnings introduced by H-5438.
     (He Yongqiang via szetszwo)
     (He Yongqiang via szetszwo)
 
 

+ 56 - 17
src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java

@@ -249,8 +249,8 @@ public interface DataTransferProtocol {
 
 
   /** Receiver */
   /** Receiver */
   public static abstract class Receiver {
   public static abstract class Receiver {
-    /** Initialize a operation. */
-    public final Op op(DataInputStream in) throws IOException {
+    /** Read an Op.  It also checks protocol version. */
+    protected final Op readOp(DataInputStream in) throws IOException {
       final short version = in.readShort();
       final short version = in.readShort();
       if (version != DATA_TRANSFER_VERSION) {
       if (version != DATA_TRANSFER_VERSION) {
         throw new IOException( "Version Mismatch" );
         throw new IOException( "Version Mismatch" );
@@ -258,8 +258,32 @@ public interface DataTransferProtocol {
       return Op.read(in);
       return Op.read(in);
     }
     }
 
 
+    /** Process op by the corresponding method. */
+    protected final void processOp(Op op, DataInputStream in
+        ) throws IOException {
+      switch(op) {
+      case READ_BLOCK:
+        opReadBlock(in);
+        break;
+      case WRITE_BLOCK:
+        opWriteBlock(in);
+        break;
+      case REPLACE_BLOCK:
+        opReplaceBlock(in);
+        break;
+      case COPY_BLOCK:
+        opCopyBlock(in);
+        break;
+      case BLOCK_CHECKSUM:
+        opBlockChecksum(in);
+        break;
+      default:
+        throw new IOException("Unknown op " + op + " in data stream");
+      }
+    }
+
     /** Receive OP_READ_BLOCK */
     /** Receive OP_READ_BLOCK */
-    public final void opReadBlock(DataInputStream in) throws IOException {
+    private void opReadBlock(DataInputStream in) throws IOException {
       final long blockId = in.readLong();          
       final long blockId = in.readLong();          
       final long blockGs = in.readLong();
       final long blockGs = in.readLong();
       final long offset = in.readLong();
       final long offset = in.readLong();
@@ -270,13 +294,16 @@ public interface DataTransferProtocol {
       opReadBlock(in, blockId, blockGs, offset, length, client, accesstoken);
       opReadBlock(in, blockId, blockGs, offset, length, client, accesstoken);
     }
     }
 
 
-    /** Abstract OP_READ_BLOCK method. */
-    public abstract void opReadBlock(DataInputStream in,
+    /**
+     * Abstract OP_READ_BLOCK method.
+     * Read a block.
+     */
+    protected abstract void opReadBlock(DataInputStream in,
         long blockId, long blockGs, long offset, long length,
         long blockId, long blockGs, long offset, long length,
         String client, AccessToken accesstoken) throws IOException;
         String client, AccessToken accesstoken) throws IOException;
     
     
     /** Receive OP_WRITE_BLOCK */
     /** Receive OP_WRITE_BLOCK */
-    public final void opWriteBlock(DataInputStream in) throws IOException {
+    private void opWriteBlock(DataInputStream in) throws IOException {
       final long blockId = in.readLong();          
       final long blockId = in.readLong();          
       final long blockGs = in.readLong();
       final long blockGs = in.readLong();
       final int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
       final int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
@@ -298,14 +325,17 @@ public interface DataTransferProtocol {
           client, src, targets, accesstoken);
           client, src, targets, accesstoken);
     }
     }
 
 
-    /** Abstract OP_WRITE_BLOCK method. */
-    public abstract void opWriteBlock(DataInputStream in,
+    /**
+     * Abstract OP_WRITE_BLOCK method. 
+     * Write a block.
+     */
+    protected abstract void opWriteBlock(DataInputStream in,
         long blockId, long blockGs, int pipelineSize, boolean isRecovery,
         long blockId, long blockGs, int pipelineSize, boolean isRecovery,
         String client, DatanodeInfo src, DatanodeInfo[] targets,
         String client, DatanodeInfo src, DatanodeInfo[] targets,
         AccessToken accesstoken) throws IOException;
         AccessToken accesstoken) throws IOException;
 
 
     /** Receive OP_REPLACE_BLOCK */
     /** Receive OP_REPLACE_BLOCK */
-    public final void opReplaceBlock(DataInputStream in) throws IOException {
+    private void opReplaceBlock(DataInputStream in) throws IOException {
       final long blockId = in.readLong();          
       final long blockId = in.readLong();          
       final long blockGs = in.readLong();
       final long blockGs = in.readLong();
       final String sourceId = Text.readString(in); // read del hint
       final String sourceId = Text.readString(in); // read del hint
@@ -315,13 +345,16 @@ public interface DataTransferProtocol {
       opReplaceBlock(in, blockId, blockGs, sourceId, src, accesstoken);
       opReplaceBlock(in, blockId, blockGs, sourceId, src, accesstoken);
     }
     }
 
 
-    /** Abstract OP_REPLACE_BLOCK method. */
-    public abstract void opReplaceBlock(DataInputStream in,
+    /**
+     * Abstract OP_REPLACE_BLOCK method.
+     * It is used for balancing purpose; send to a destination
+     */
+    protected abstract void opReplaceBlock(DataInputStream in,
         long blockId, long blockGs, String sourceId, DatanodeInfo src,
         long blockId, long blockGs, String sourceId, DatanodeInfo src,
         AccessToken accesstoken) throws IOException;
         AccessToken accesstoken) throws IOException;
 
 
     /** Receive OP_COPY_BLOCK */
     /** Receive OP_COPY_BLOCK */
-    public final void opCopyBlock(DataInputStream in) throws IOException {
+    private void opCopyBlock(DataInputStream in) throws IOException {
       final long blockId = in.readLong();          
       final long blockId = in.readLong();          
       final long blockGs = in.readLong();
       final long blockGs = in.readLong();
       final AccessToken accesstoken = readAccessToken(in);
       final AccessToken accesstoken = readAccessToken(in);
@@ -329,12 +362,15 @@ public interface DataTransferProtocol {
       opCopyBlock(in, blockId, blockGs, accesstoken);
       opCopyBlock(in, blockId, blockGs, accesstoken);
     }
     }
 
 
-    /** Abstract OP_COPY_BLOCK method. */
-    public abstract void opCopyBlock(DataInputStream in,
+    /**
+     * Abstract OP_COPY_BLOCK method.
+     * It is used for balancing purpose; send to a proxy source.
+     */
+    protected abstract void opCopyBlock(DataInputStream in,
         long blockId, long blockGs, AccessToken accesstoken) throws IOException;
         long blockId, long blockGs, AccessToken accesstoken) throws IOException;
 
 
     /** Receive OP_BLOCK_CHECKSUM */
     /** Receive OP_BLOCK_CHECKSUM */
-    public final void opBlockChecksum(DataInputStream in) throws IOException {
+    private void opBlockChecksum(DataInputStream in) throws IOException {
       final long blockId = in.readLong();          
       final long blockId = in.readLong();          
       final long blockGs = in.readLong();
       final long blockGs = in.readLong();
       final AccessToken accesstoken = readAccessToken(in);
       final AccessToken accesstoken = readAccessToken(in);
@@ -342,8 +378,11 @@ public interface DataTransferProtocol {
       opBlockChecksum(in, blockId, blockGs, accesstoken);
       opBlockChecksum(in, blockId, blockGs, accesstoken);
     }
     }
 
 
-    /** Abstract OP_BLOCK_CHECKSUM method. */
-    public abstract void opBlockChecksum(DataInputStream in,
+    /**
+     * Abstract OP_BLOCK_CHECKSUM method.
+     * Get the checksum of a block 
+     */
+    protected abstract void opBlockChecksum(DataInputStream in,
         long blockId, long blockGs, AccessToken accesstoken) throws IOException;
         long blockId, long blockGs, AccessToken accesstoken) throws IOException;
 
 
     /** Read an AccessToken */
     /** Read an AccessToken */

+ 59 - 51
src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -43,6 +43,8 @@ import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.MetaDataInputSt
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessToken;
 import org.apache.hadoop.security.AccessToken;
 import org.apache.hadoop.security.AccessTokenHandler;
 import org.apache.hadoop.security.AccessTokenHandler;
@@ -57,22 +59,29 @@ class DataXceiver extends DataTransferProtocol.Receiver
   public static final Log LOG = DataNode.LOG;
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
   
-  Socket s;
-  final String remoteAddress; // address of remote side
-  final String localAddress;  // local address of this daemon
-  DataNode datanode;
-  DataXceiverServer dataXceiverServer;
+  private final Socket s;
+  private final boolean isLocal; //is a local connection?
+  private final String remoteAddress; // address of remote side
+  private final String localAddress;  // local address of this daemon
+  private final DataNode datanode;
+  private final DataXceiverServer dataXceiverServer;
+
+  private long opStartTime; //the start time of receiving an Op
   
   
   public DataXceiver(Socket s, DataNode datanode, 
   public DataXceiver(Socket s, DataNode datanode, 
       DataXceiverServer dataXceiverServer) {
       DataXceiverServer dataXceiverServer) {
-    
     this.s = s;
     this.s = s;
+    this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
     this.datanode = datanode;
     this.datanode = datanode;
     this.dataXceiverServer = dataXceiverServer;
     this.dataXceiverServer = dataXceiverServer;
     dataXceiverServer.childSockets.put(s, s);
     dataXceiverServer.childSockets.put(s, s);
     remoteAddress = s.getRemoteSocketAddress().toString();
     remoteAddress = s.getRemoteSocketAddress().toString();
     localAddress = s.getLocalSocketAddress().toString();
     localAddress = s.getLocalSocketAddress().toString();
-    LOG.debug("Number of active connections is: " + datanode.getXceiverCount());
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Number of active connections is: "
+          + datanode.getXceiverCount());
+    }
   }
   }
 
 
   /**
   /**
@@ -84,8 +93,8 @@ class DataXceiver extends DataTransferProtocol.Receiver
       in = new DataInputStream(
       in = new DataInputStream(
           new BufferedInputStream(NetUtils.getInputStream(s), 
           new BufferedInputStream(NetUtils.getInputStream(s), 
                                   SMALL_BUFFER_SIZE));
                                   SMALL_BUFFER_SIZE));
-      final DataTransferProtocol.Op op = op(in);
-      boolean local = s.getInetAddress().equals(s.getLocalAddress());
+      final DataTransferProtocol.Op op = readOp(in);
+
       // Make sure the xciver count is not exceeded
       // Make sure the xciver count is not exceeded
       int curXceiverCount = datanode.getXceiverCount();
       int curXceiverCount = datanode.getXceiverCount();
       if (curXceiverCount > dataXceiverServer.maxXceiverCount) {
       if (curXceiverCount > dataXceiverServer.maxXceiverCount) {
@@ -93,45 +102,16 @@ class DataXceiver extends DataTransferProtocol.Receiver
                               + " exceeds the limit of concurrent xcievers "
                               + " exceeds the limit of concurrent xcievers "
                               + dataXceiverServer.maxXceiverCount);
                               + dataXceiverServer.maxXceiverCount);
       }
       }
-      long startTime = DataNode.now();
-      switch ( op ) {
-      case READ_BLOCK:
-        opReadBlock(in);
-        datanode.myMetrics.readBlockOp.inc(DataNode.now() - startTime);
-        if (local)
-          datanode.myMetrics.readsFromLocalClient.inc();
-        else
-          datanode.myMetrics.readsFromRemoteClient.inc();
-        break;
-      case WRITE_BLOCK:
-        opWriteBlock(in);
-        datanode.myMetrics.writeBlockOp.inc(DataNode.now() - startTime);
-        if (local)
-          datanode.myMetrics.writesFromLocalClient.inc();
-        else
-          datanode.myMetrics.writesFromRemoteClient.inc();
-        break;
-      case REPLACE_BLOCK: // for balancing purpose; send to a destination
-        opReplaceBlock(in);
-        datanode.myMetrics.replaceBlockOp.inc(DataNode.now() - startTime);
-        break;
-      case COPY_BLOCK:
-            // for balancing purpose; send to a proxy source
-        opCopyBlock(in);
-        datanode.myMetrics.copyBlockOp.inc(DataNode.now() - startTime);
-        break;
-      case BLOCK_CHECKSUM: //get the checksum of a block
-        opBlockChecksum(in);
-        datanode.myMetrics.blockChecksumOp.inc(DataNode.now() - startTime);
-        break;
-      default:
-        throw new IOException("Unknown opcode " + op + " in data stream");
-      }
+
+      opStartTime = DataNode.now();
+      processOp(op, in);
     } catch (Throwable t) {
     } catch (Throwable t) {
       LOG.error(datanode.dnRegistration + ":DataXceiver",t);
       LOG.error(datanode.dnRegistration + ":DataXceiver",t);
     } finally {
     } finally {
-      LOG.debug(datanode.dnRegistration + ":Number of active connections is: "
-                               + datanode.getXceiverCount());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(datanode.dnRegistration + ":Number of active connections is: "
+            + datanode.getXceiverCount());
+      }
       IOUtils.closeStream(in);
       IOUtils.closeStream(in);
       IOUtils.closeSocket(s);
       IOUtils.closeSocket(s);
       dataXceiverServer.childSockets.remove(s);
       dataXceiverServer.childSockets.remove(s);
@@ -142,7 +122,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
    * Read a block from the disk.
    * Read a block from the disk.
    */
    */
   @Override
   @Override
-  public void opReadBlock(DataInputStream in,
+  protected void opReadBlock(DataInputStream in,
       long blockId, long blockGs, long startOffset, long length,
       long blockId, long blockGs, long startOffset, long length,
       String clientName, AccessToken accessToken) throws IOException {
       String clientName, AccessToken accessToken) throws IOException {
     final Block block = new Block(blockId, 0 , blockGs);
     final Block block = new Block(blockId, 0 , blockGs);
@@ -213,13 +193,18 @@ class DataXceiver extends DataTransferProtocol.Receiver
       IOUtils.closeStream(out);
       IOUtils.closeStream(out);
       IOUtils.closeStream(blockSender);
       IOUtils.closeStream(blockSender);
     }
     }
+
+    //update metrics
+    updateDuration(datanode.myMetrics.readBlockOp);
+    updateCounter(datanode.myMetrics.readsFromLocalClient,
+                  datanode.myMetrics.readsFromRemoteClient);
   }
   }
 
 
   /**
   /**
    * Write a block to disk.
    * Write a block to disk.
    */
    */
   @Override
   @Override
-  public void opWriteBlock(DataInputStream in, long blockId, long blockGs,
+  protected void opWriteBlock(DataInputStream in, long blockId, long blockGs,
       int pipelineSize, boolean isRecovery,
       int pipelineSize, boolean isRecovery,
       String client, DatanodeInfo srcDataNode, DatanodeInfo[] targets,
       String client, DatanodeInfo srcDataNode, DatanodeInfo[] targets,
       AccessToken accessToken) throws IOException {
       AccessToken accessToken) throws IOException {
@@ -377,13 +362,18 @@ class DataXceiver extends DataTransferProtocol.Receiver
       IOUtils.closeSocket(mirrorSock);
       IOUtils.closeSocket(mirrorSock);
       IOUtils.closeStream(blockReceiver);
       IOUtils.closeStream(blockReceiver);
     }
     }
+
+    //update metrics
+    updateDuration(datanode.myMetrics.writeBlockOp);
+    updateCounter(datanode.myMetrics.writesFromLocalClient,
+                  datanode.myMetrics.writesFromRemoteClient);
   }
   }
 
 
   /**
   /**
    * Get block checksum (MD5 of CRC32).
    * Get block checksum (MD5 of CRC32).
    */
    */
   @Override
   @Override
-  public void opBlockChecksum(DataInputStream in,
+  protected void opBlockChecksum(DataInputStream in,
       long blockId, long blockGs, AccessToken accessToken) throws IOException {
       long blockId, long blockGs, AccessToken accessToken) throws IOException {
     final Block block = new Block(blockId, 0 , blockGs);
     final Block block = new Block(blockId, 0 , blockGs);
     DataOutputStream out = new DataOutputStream(NetUtils.getOutputStream(s,
     DataOutputStream out = new DataOutputStream(NetUtils.getOutputStream(s,
@@ -433,13 +423,16 @@ class DataXceiver extends DataTransferProtocol.Receiver
       IOUtils.closeStream(checksumIn);
       IOUtils.closeStream(checksumIn);
       IOUtils.closeStream(metadataIn);
       IOUtils.closeStream(metadataIn);
     }
     }
+
+    //update metrics
+    updateDuration(datanode.myMetrics.blockChecksumOp);
   }
   }
 
 
   /**
   /**
    * Read a block from the disk and then sends it to a destination.
    * Read a block from the disk and then sends it to a destination.
    */
    */
   @Override
   @Override
-  public void opCopyBlock(DataInputStream in,
+  protected void opCopyBlock(DataInputStream in,
       long blockId, long blockGs, AccessToken accessToken) throws IOException {
       long blockId, long blockGs, AccessToken accessToken) throws IOException {
     // Read in the header
     // Read in the header
     Block block = new Block(blockId, 0, blockGs);
     Block block = new Block(blockId, 0, blockGs);
@@ -499,6 +492,9 @@ class DataXceiver extends DataTransferProtocol.Receiver
       IOUtils.closeStream(reply);
       IOUtils.closeStream(reply);
       IOUtils.closeStream(blockSender);
       IOUtils.closeStream(blockSender);
     }
     }
+
+    //update metrics    
+    updateDuration(datanode.myMetrics.copyBlockOp);
   }
   }
 
 
   /**
   /**
@@ -506,7 +502,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
    * remove the copy from the source.
    * remove the copy from the source.
    */
    */
   @Override
   @Override
-  public void opReplaceBlock(DataInputStream in,
+  protected void opReplaceBlock(DataInputStream in,
       long blockId, long blockGs, String sourceID, DatanodeInfo proxySource,
       long blockId, long blockGs, String sourceID, DatanodeInfo proxySource,
       AccessToken accessToken) throws IOException {
       AccessToken accessToken) throws IOException {
     /* read header */
     /* read header */
@@ -606,8 +602,20 @@ class DataXceiver extends DataTransferProtocol.Receiver
       IOUtils.closeStream(blockReceiver);
       IOUtils.closeStream(blockReceiver);
       IOUtils.closeStream(proxyReply);
       IOUtils.closeStream(proxyReply);
     }
     }
+
+    //update metrics
+    updateDuration(datanode.myMetrics.replaceBlockOp);
   }
   }
-  
+
+  private void updateDuration(MetricsTimeVaryingRate mtvr) {
+    mtvr.inc(DataNode.now() - opStartTime);
+  }
+
+  private void updateCounter(MetricsTimeVaryingInt localCounter,
+      MetricsTimeVaryingInt remoteCounter) {
+    (isLocal? localCounter: remoteCounter).inc();
+  }
+
   /**
   /**
    * Utility function for sending a response.
    * Utility function for sending a response.
    * @param s socket to write to
    * @param s socket to write to