浏览代码

HADOOP-2758. Reduce buffer copies in DataNode when data is read from
HDFS, without negatively affecting read throughput. (rangadi)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@633285 13f79535-47bb-0310-9956-ffa450edef68

Raghu Angadi 17 年之前
父节点
当前提交
aa576da58b

+ 3 - 0
CHANGES.txt

@@ -65,6 +65,9 @@ Trunk (unreleased changes)
     repetitive calls to get the current time and late checking to see if
     repetitive calls to get the current time and late checking to see if
     we want speculation on at all. (omalley)
     we want speculation on at all. (omalley)
 
 
+    HADOOP-2758. Reduce buffer copies in DataNode when data is read from
+    HDFS, without negatively affecting read throughput. (rangadi)
+
   BUG FIXES
   BUG FIXES
 
 
     HADOOP-2195. '-mkdir' behaviour is now closer to Linux shell in case of
     HADOOP-2195. '-mkdir' behaviour is now closer to Linux shell in case of

+ 1 - 1
src/java/org/apache/hadoop/dfs/Balancer.java

@@ -341,7 +341,7 @@ public class Balancer implements Tool {
     
     
     /* Send a block copy request to the outputstream*/
     /* Send a block copy request to the outputstream*/
     private void sendRequest(DataOutputStream out) throws IOException {
     private void sendRequest(DataOutputStream out) throws IOException {
-      out.writeShort(FSConstants.DATA_TRANFER_VERSION);
+      out.writeShort(FSConstants.DATA_TRANSFER_VERSION);
       out.writeByte(FSConstants.OP_COPY_BLOCK);
       out.writeByte(FSConstants.OP_COPY_BLOCK);
       out.writeLong(block.getBlock().getBlockId());
       out.writeLong(block.getBlock().getBlockId());
       Text.writeString(out, source.getStorageID());
       Text.writeString(out, source.getStorageID());

+ 1 - 1
src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java

@@ -853,7 +853,7 @@ class BlockCrcUpgradeUtils {
         DataInputStream in = new DataInputStream(dnSock.getInputStream());
         DataInputStream in = new DataInputStream(dnSock.getInputStream());
 
 
         // Write the header:
         // Write the header:
-        out.writeShort( DataNode.DATA_TRANFER_VERSION );
+        out.writeShort( DataNode.DATA_TRANSFER_VERSION );
         out.writeByte( DataNode.OP_READ_METADATA );
         out.writeByte( DataNode.OP_READ_METADATA );
         out.writeLong( blockInfo.block.getBlockId() );
         out.writeLong( blockInfo.block.getBlockId() );
 
 

+ 65 - 27
src/java/org/apache/hadoop/dfs/DFSClient.java

@@ -638,6 +638,7 @@ class DFSClient implements FSConstants {
     private DataChecksum checksum;
     private DataChecksum checksum;
     private long lastChunkOffset = -1;
     private long lastChunkOffset = -1;
     private long lastChunkLen = -1;
     private long lastChunkLen = -1;
+    private long lastSeqNo = -1;
 
 
     private long startOffset;
     private long startOffset;
     private long firstChunkOffset;
     private long firstChunkOffset;
@@ -646,6 +647,9 @@ class DFSClient implements FSConstants {
     private boolean gotEOS = false;
     private boolean gotEOS = false;
     
     
     byte[] skipBuf = null;
     byte[] skipBuf = null;
+    ByteBuffer checksumBytes = null;
+    int dataLeft = 0;
+    boolean isLastPacket = false;
     
     
     /* FSInputChecker interface */
     /* FSInputChecker interface */
     
     
@@ -722,6 +726,22 @@ class DFSClient implements FSConstants {
                                  "since seek is not required");
                                  "since seek is not required");
     }
     }
     
     
+    /**
+     * Makes sure that checksumBytes has enough capacity 
+     * and limit is set to the number of checksum bytes needed 
+     * to be read.
+     */
+    private void adjustChecksumBytes(int dataLen) {
+      int requiredSize = 
+        ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize;
+      if (checksumBytes == null || requiredSize > checksumBytes.capacity()) {
+        checksumBytes =  ByteBuffer.wrap(new byte[requiredSize]);
+      } else {
+        checksumBytes.clear();
+      }
+      checksumBytes.limit(requiredSize);
+    }
+    
     @Override
     @Override
     protected synchronized int readChunk(long pos, byte[] buf, int offset, 
     protected synchronized int readChunk(long pos, byte[] buf, int offset, 
                                          int len, byte[] checksumBuf) 
                                          int len, byte[] checksumBuf) 
@@ -748,42 +768,60 @@ class DFSClient implements FSConstants {
                               firstChunkOffset + " != " + chunkOffset);
                               firstChunkOffset + " != " + chunkOffset);
       }
       }
 
 
-      // The chunk is transmitted as one packet. Read packet headers.
-      int packetLen = in.readInt();
-      long offsetInBlock = in.readLong();
-      long seqno = in.readLong();
-      boolean lastPacketInBlock = in.readBoolean();
-      LOG.debug("DFSClient readChunk got seqno " + seqno +
-                " offsetInBlock " + offsetInBlock +
-                " lastPacketInBlock " + lastPacketInBlock +
-                " packetLen " + packetLen);
-
-      int chunkLen = in.readInt();
+      // Read next packet if the previous packet has been read completely.
+      if (dataLeft <= 0) {
+        //Read packet headers.
+        int packetLen = in.readInt();
+        long offsetInBlock = in.readLong();
+        long seqno = in.readLong();
+        boolean lastPacketInBlock = in.readBoolean();
       
       
-      // Sanity check the lengths
-      if ( chunkLen < 0 || chunkLen > bytesPerChecksum ||
-          ( lastChunkLen >= 0 && // prev packet exists
-              ( (chunkLen > 0 && lastChunkLen != bytesPerChecksum) ||
-                  chunkOffset != (lastChunkOffset + lastChunkLen) ) ) ) {
-        throw new IOException("BlockReader: error in chunk's offset " +
-                              "or length (" + chunkOffset + ":" +
-                              chunkLen + ")");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("DFSClient readChunk got seqno " + seqno +
+                    " offsetInBlock " + offsetInBlock +
+                    " lastPacketInBlock " + lastPacketInBlock +
+                    " packetLen " + packetLen);
+        }
+        
+        int dataLen = in.readInt();
+      
+        // Sanity check the lengths
+        if ( dataLen < 0 || 
+             ( (dataLen % bytesPerChecksum) != 0 && !lastPacketInBlock ) ||
+             (seqno != (lastSeqNo + 1)) ) {
+             throw new IOException("BlockReader: error in packet header" +
+                                   "(chunkOffset : " + chunkOffset + 
+                                   ", dataLen : " + dataLen +
+                                   ", seqno : " + seqno + 
+                                   " (last: " + lastSeqNo + "))");
+        }
+        
+        lastSeqNo = seqno;
+        isLastPacket = lastPacketInBlock;
+        dataLeft = dataLen;
+        adjustChecksumBytes(dataLen);
+        if (dataLen > 0) {
+          IOUtils.readFully(in, checksumBytes.array(), 0,
+                            checksumBytes.limit());
+        }
       }
       }
 
 
+      int chunkLen = Math.min(dataLeft, bytesPerChecksum);
+      
       if ( chunkLen > 0 ) {
       if ( chunkLen > 0 ) {
         // len should be >= chunkLen
         // len should be >= chunkLen
         IOUtils.readFully(in, buf, offset, chunkLen);
         IOUtils.readFully(in, buf, offset, chunkLen);
+        checksumBytes.get(checksumBuf, 0, checksumSize);
       }
       }
       
       
-      if ( checksumSize > 0 ) {
-        IOUtils.readFully(in, checksumBuf, 0, checksumSize);
-      }
-
+      dataLeft -= chunkLen;
       lastChunkOffset = chunkOffset;
       lastChunkOffset = chunkOffset;
       lastChunkLen = chunkLen;
       lastChunkLen = chunkLen;
       
       
-      if ( chunkLen == 0 ) {
+      if ((dataLeft == 0 && isLastPacket) || chunkLen == 0) {
         gotEOS = true;
         gotEOS = true;
+      }
+      if ( chunkLen == 0 ) {
         return -1;
         return -1;
       }
       }
       
       
@@ -827,7 +865,7 @@ class DFSClient implements FSConstants {
                        new BufferedOutputStream(sock.getOutputStream()));
                        new BufferedOutputStream(sock.getOutputStream()));
 
 
       //write the header.
       //write the header.
-      out.writeShort( DATA_TRANFER_VERSION );
+      out.writeShort( DATA_TRANSFER_VERSION );
       out.write( OP_READ_BLOCK );
       out.write( OP_READ_BLOCK );
       out.writeLong( blockId );
       out.writeLong( blockId );
       out.writeLong( startOffset );
       out.writeLong( startOffset );
@@ -2030,7 +2068,7 @@ class DFSClient implements FSConstants {
         DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream(), buffersize));
         DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream(), buffersize));
         blockReplyStream = new DataInputStream(s.getInputStream());
         blockReplyStream = new DataInputStream(s.getInputStream());
 
 
-        out.writeShort( DATA_TRANFER_VERSION );
+        out.writeShort( DATA_TRANSFER_VERSION );
         out.write( OP_WRITE_BLOCK );
         out.write( OP_WRITE_BLOCK );
         out.writeLong( block.getBlockId() );
         out.writeLong( block.getBlockId() );
         out.writeInt( nodes.length );
         out.writeInt( nodes.length );
@@ -2147,8 +2185,8 @@ class DFSClient implements FSConstants {
         }
         }
 
 
         currentPacket.writeInt(len);
         currentPacket.writeInt(len);
-        currentPacket.write(b, offset, len);
         currentPacket.write(checksum, 0, cklen);
         currentPacket.write(checksum, 0, cklen);
+        currentPacket.write(b, offset, len);
         currentPacket.numChunks++;
         currentPacket.numChunks++;
         bytesCurBlock += len;
         bytesCurBlock += len;
 
 

+ 211 - 73
src/java/org/apache/hadoop/dfs/DataNode.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.dfs.datanode.metrics.DataNodeMetrics;
 
 
 import java.io.*;
 import java.io.*;
 import java.net.*;
 import java.net.*;
+import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.*;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.Semaphore;
 import java.security.NoSuchAlgorithmException;
 import java.security.NoSuchAlgorithmException;
@@ -450,16 +451,6 @@ public class DataNode implements FSConstants, Runnable {
     }
     }
   }
   }
 
 
-  private void enumerateThreadGroup(ThreadGroup tg) {
-    int count = tg.activeCount();
-    Thread[] info = new Thread[count];
-    int num = tg.enumerate(info);
-    for (int i = 0; i < num; i++) {
-      System.out.print(info[i].getName() + " ");
-    }
-    System.out.println("");
-  }
-
   /**
   /**
    * Shut down this instance of the datanode.
    * Shut down this instance of the datanode.
    * Returns only after shutdown is complete.
    * Returns only after shutdown is complete.
@@ -937,7 +928,7 @@ public class DataNode implements FSConstants, Runnable {
         in = new DataInputStream(
         in = new DataInputStream(
             new BufferedInputStream(s.getInputStream(), BUFFER_SIZE));
             new BufferedInputStream(s.getInputStream(), BUFFER_SIZE));
         short version = in.readShort();
         short version = in.readShort();
-        if ( version != DATA_TRANFER_VERSION ) {
+        if ( version != DATA_TRANSFER_VERSION ) {
           throw new IOException( "Version Mismatch" );
           throw new IOException( "Version Mismatch" );
         }
         }
         boolean local = s.getInetAddress().equals(s.getLocalAddress());
         boolean local = s.getInetAddress().equals(s.getLocalAddress());
@@ -1003,7 +994,7 @@ public class DataNode implements FSConstants, Runnable {
 
 
       // send the block
       // send the block
       DataOutputStream out = new DataOutputStream(
       DataOutputStream out = new DataOutputStream(
-          new BufferedOutputStream(s.getOutputStream(), BUFFER_SIZE));
+            new BufferedOutputStream(s.getOutputStream(), SMALL_BUFFER_SIZE));
       BlockSender blockSender = null;
       BlockSender blockSender = null;
       try {
       try {
         try {
         try {
@@ -1116,7 +1107,7 @@ public class DataNode implements FSConstants, Runnable {
             mirrorIn = new DataInputStream(mirrorSock.getInputStream());
             mirrorIn = new DataInputStream(mirrorSock.getInputStream());
 
 
             // Write header: Copied from DFSClient.java!
             // Write header: Copied from DFSClient.java!
-            mirrorOut.writeShort( DATA_TRANFER_VERSION );
+            mirrorOut.writeShort( DATA_TRANSFER_VERSION );
             mirrorOut.write( OP_WRITE_BLOCK );
             mirrorOut.write( OP_WRITE_BLOCK );
             mirrorOut.writeLong( block.getBlockId() );
             mirrorOut.writeLong( block.getBlockId() );
             mirrorOut.writeInt( pipelineSize );
             mirrorOut.writeInt( pipelineSize );
@@ -1269,11 +1260,11 @@ public class DataNode implements FSConstants, Runnable {
         targetSock.setSoTimeout(socketTimeout);
         targetSock.setSoTimeout(socketTimeout);
 
 
         targetOut = new DataOutputStream(new BufferedOutputStream(
         targetOut = new DataOutputStream(new BufferedOutputStream(
-            targetSock.getOutputStream(), BUFFER_SIZE));
+            targetSock.getOutputStream(), SMALL_BUFFER_SIZE));
 
 
         /* send request to the target */
         /* send request to the target */
         // first write header info
         // first write header info
-        targetOut.writeShort(DATA_TRANFER_VERSION); // transfer version
+        targetOut.writeShort(DATA_TRANSFER_VERSION); // transfer version
         targetOut.writeByte(OP_REPLACE_BLOCK); // op code
         targetOut.writeByte(OP_REPLACE_BLOCK); // op code
         targetOut.writeLong(block.getBlockId()); // block id
         targetOut.writeLong(block.getBlockId()); // block id
         Text.writeString( targetOut, source); // del hint
         Text.writeString( targetOut, source); // del hint
@@ -1445,15 +1436,94 @@ public class DataNode implements FSConstants, Runnable {
     }
     }
   }
   }
 
 
+  /* ********************************************************************
+  Protocol when a client reads data from Datanode (Cur Ver: 9):
+  
+  Client's Request :
+  =================
+   
+     Processed in DataXceiver:
+     +----------------------------------------------+
+     | Common Header   | 1 byte OP == OP_READ_BLOCK |
+     +----------------------------------------------+
+     
+     Processed in readBlock() :
+     +-------------------------------------------------------+
+     | 8 byte Block ID | 8 byte start offset | 8 byte length |
+     +-------------------------------------------------------+
+     
+     Client sends optional response only at the end of receiving data.
+       
+  DataNode Response :
+  ===================
+   
+    In readBlock() :
+    If there is an error while initializing BlockSender :
+       +---------------------------+
+       | 2 byte OP_STATUS_ERROR    | and connection will be closed.
+       +---------------------------+
+    Otherwise
+       +---------------------------+
+       | 2 byte OP_STATUS_SUCCESS  |
+       +---------------------------+
+       
+    Actual data, sent by BlockSender.sendBlock() :
+    
+      ChecksumHeader :
+      +--------------------------------------------------+
+      | 1 byte CHECKSUM_TYPE | 4 byte BYTES_PER_CHECKSUM |
+      +--------------------------------------------------+
+      Followed by actual data in the form of PACKETS: 
+      +------------------------------------+
+      | Sequence of data PACKETs ....      |
+      +------------------------------------+
+    
+    A "PACKET" is defined further below.
+    
+    The client reads data until it receives a packet with 
+    "LastPacketInBlock" set to true or with a zero length. If there is 
+    no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK:
+    
+    Client optional response at the end of data transmission :
+      +------------------------------+
+      | 2 byte OP_STATUS_CHECKSUM_OK |
+      +------------------------------+
+    
+    PACKET : Contains a packet header, checksum and data. Amount of data
+    ======== carried is set by BUFFER_SIZE.
+    
+      +-----------------------------------------------------+
+      | 4 byte packet length (excluding packet header)      |
+      +-----------------------------------------------------+
+      | 8 byte offset in the block | 8 byte sequence number |
+      +-----------------------------------------------------+
+      | 1 byte isLastPacketInBlock                          |
+      +-----------------------------------------------------+
+      | 4 byte Length of actual data                        |
+      +-----------------------------------------------------+
+      | x byte checksum data. x is defined below            |
+      +-----------------------------------------------------+
+      | actual data ......                                  |
+      +-----------------------------------------------------+
+      
+      x = (length of data + BYTES_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM *
+          CHECKSUM_SIZE
+          
+      CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32)
+      
+      The above packet format is used while writing data to DFS also.
+      Not all the fields might be used while reading.
+    
+   ************************************************************************ */
+  
   class BlockSender implements java.io.Closeable {
   class BlockSender implements java.io.Closeable {
     private Block block; // the block to read from
     private Block block; // the block to read from
-    private DataInputStream blockIn; // data strean
+    private InputStream blockIn; // data stream
     private DataInputStream checksumIn; // checksum datastream
     private DataInputStream checksumIn; // checksum datastream
     private DataChecksum checksum; // checksum stream
     private DataChecksum checksum; // checksum stream
     private long offset; // starting position to read
     private long offset; // starting position to read
     private long endOffset; // ending position
     private long endOffset; // ending position
     private long blockLength;
     private long blockLength;
-    private byte buf[]; // buffer to store data read from the block file & crc
     private int bytesPerChecksum; // chunk size
     private int bytesPerChecksum; // chunk size
     private int checksumSize; // checksum size
     private int checksumSize; // checksum size
     private boolean corruptChecksumOk; // if need to verify checksum
     private boolean corruptChecksumOk; // if need to verify checksum
@@ -1463,8 +1533,14 @@ public class DataNode implements FSConstants, Runnable {
     private boolean blockReadFully; //set when the whole block is read
     private boolean blockReadFully; //set when the whole block is read
     private boolean verifyChecksum; //if true, check is verified while reading
     private boolean verifyChecksum; //if true, check is verified while reading
     private Throttler throttler;
     private Throttler throttler;
-    private DataOutputStream out;
-
+    private OutputStream out;
+    
+    static final int PKT_HEADER_LEN = ( 4 + /* PacketLen */
+                                        8 + /* offset in block */
+                                        8 + /* seqno */
+                                        1 + /* isLastPacketInBlock */
+                                        4   /* data len */ );
+       
     BlockSender(Block block, long startOffset, long length,
     BlockSender(Block block, long startOffset, long length,
                 boolean corruptChecksumOk, boolean chunkOffsetOK,
                 boolean corruptChecksumOk, boolean chunkOffsetOK,
                 boolean verifyChecksum) throws IOException {
                 boolean verifyChecksum) throws IOException {
@@ -1511,7 +1587,7 @@ public class DataNode implements FSConstants, Runnable {
           throw new IOException(msg);
           throw new IOException(msg);
         }
         }
 
 
-        buf = new byte[bytesPerChecksum + checksumSize];
+        
         offset = (startOffset - (startOffset % bytesPerChecksum));
         offset = (startOffset - (startOffset % bytesPerChecksum));
         if (length >= 0) {
         if (length >= 0) {
           // Make sure endOffset points to end of a checksumed chunk.
           // Make sure endOffset points to end of a checksumed chunk.
@@ -1535,8 +1611,7 @@ public class DataNode implements FSConstants, Runnable {
         }
         }
         seqno = 0;
         seqno = 0;
 
 
-        InputStream blockInStream = data.getBlockInputStream(block, offset); // seek to offset
-        blockIn = new DataInputStream(new BufferedInputStream(blockInStream, BUFFER_SIZE));
+        blockIn = data.getBlockInputStream(block, offset); // seek to offset
       } catch (IOException ioe) {
       } catch (IOException ioe) {
         IOUtils.closeStream(this);
         IOUtils.closeStream(this);
         IOUtils.closeStream(blockIn);
         IOUtils.closeStream(blockIn);
@@ -1571,26 +1646,37 @@ public class DataNode implements FSConstants, Runnable {
       }
       }
     }
     }
 
 
-    
-    private int sendChunk()
-        throws IOException {
-      int len = (int) Math.min(endOffset - offset, bytesPerChecksum);
+    /**
+     * Sends up to maxChunks chunks of data.
+     */
+    private int sendChunks(ByteBuffer pkt, int maxChunks) throws IOException {
+      // Sends multiple chunks in one packet with a single write().
+
+      int len = Math.min((int) (endOffset - offset),
+                         bytesPerChecksum*maxChunks);
       if (len == 0) {
       if (len == 0) {
         return 0;
         return 0;
       }
       }
-      blockIn.readFully(buf, 0, len);
 
 
+      int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;
+      int packetLen = len + numChunks*checksumSize + 4;
+      pkt.clear();
+      
+      // write packet header
+      pkt.putInt(packetLen);
+      pkt.putLong(offset);
+      pkt.putLong(seqno);
+      pkt.put((byte)((offset + len >= endOffset) ? 1 : 0));
+                 //why no ByteBuf.putBoolean()?
+      pkt.putInt(len);
+      
+      int checksumOff = pkt.position();
+      int checksumLen = numChunks * checksumSize;
+      byte[] buf = pkt.array();
+      
       if (checksumSize > 0 && checksumIn != null) {
       if (checksumSize > 0 && checksumIn != null) {
         try {
         try {
-          checksumIn.readFully(buf, len, checksumSize);
-          
-          if (verifyChecksum) {
-            checksum.reset();
-            checksum.update(buf, 0, len);
-            if (!checksum.compare(buf, len)) {
-              throw new ChecksumException("Checksum failed at " + offset, len);
-            }
-          }
+          checksumIn.readFully(buf, checksumOff, checksumLen);
         } catch (IOException e) {
         } catch (IOException e) {
           LOG.warn(" Could not read or failed to veirfy checksum for data" +
           LOG.warn(" Could not read or failed to veirfy checksum for data" +
                    " at offset " + offset + " for block " + block + " got : "
                    " at offset " + offset + " for block " + block + " got : "
@@ -1599,28 +1685,39 @@ public class DataNode implements FSConstants, Runnable {
           checksumIn = null;
           checksumIn = null;
           if (corruptChecksumOk) {
           if (corruptChecksumOk) {
             // Just fill the array with zeros.
             // Just fill the array with zeros.
-            Arrays.fill(buf, len, len + checksumSize, (byte) 0);
+            Arrays.fill(buf, checksumOff, checksumLen, (byte) 0);
           } else {
           } else {
             throw e;
             throw e;
           }
           }
         }
         }
       }
       }
-      boolean lastPacketInBlock = false;
-      if (offset + len >= endOffset) {
-        lastPacketInBlock = true;
+      
+      int dataOff = checksumOff + checksumLen;
+      IOUtils.readFully(blockIn, buf, dataOff, len);
+      
+      if (verifyChecksum) {
+        int dOff = dataOff;
+        int cOff = checksumOff;
+        int dLeft = len;
+        
+        for (int i=0; i<numChunks; i++) {
+          checksum.reset();
+          int dLen = Math.min(dLeft, bytesPerChecksum);
+          checksum.update(buf, dOff, dLen);
+          if (!checksum.compare(buf, cOff)) {
+            throw new ChecksumException("Checksum failed at " + 
+                                        (offset + len - dLeft), len);
+          }
+          dLeft -= dLen;
+          dOff += dLen;
+          cOff += checksumSize;
+        }
       }
       }
 
 
-      // write packet header
-      out.writeInt(len + checksumSize + 4);
-      out.writeLong(offset);
-      out.writeLong(seqno);
-      out.writeBoolean(lastPacketInBlock);
-      
-      out.writeInt(len);
-      out.write(buf, 0, len + checksumSize);
+      out.write(buf, 0, dataOff + len);
 
 
       if (throttler != null) { // rebalancing so throttle
       if (throttler != null) { // rebalancing so throttle
-        throttler.throttle(len + checksumSize + 4);
+        throttler.throttle(packetLen);
       }
       }
 
 
       return len;
       return len;
@@ -1648,15 +1745,21 @@ public class DataNode implements FSConstants, Runnable {
         if ( chunkOffsetOK ) {
         if ( chunkOffsetOK ) {
           out.writeLong( offset );
           out.writeLong( offset );
         }
         }
+        //set up sendBuf:
+        int maxChunksPerPacket = Math.max(1,
+                      (BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
+        ByteBuffer pktBuf = ByteBuffer.allocate(PKT_HEADER_LEN + 
+                      (bytesPerChecksum + checksumSize) * maxChunksPerPacket);
+
 
 
         while (endOffset > offset) {
         while (endOffset > offset) {
-          // Write one data chunk per loop.
-          long len = sendChunk();
+          long len = sendChunks(pktBuf, maxChunksPerPacket);
           offset += len;
           offset += len;
-          totalRead += len + checksumSize;
+          totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum*
+                              checksumSize);
           seqno++;
           seqno++;
         }
         }
-        out.writeInt(0); // mark the end of block
+        out.writeInt(0); // mark the end of block        
         out.flush();
         out.flush();
       } finally {
       } finally {
         close();
         close();
@@ -1965,6 +2068,7 @@ public class DataNode implements FSConstants, Runnable {
     private int bytesPerChecksum;
     private int bytesPerChecksum;
     private int checksumSize;
     private int checksumSize;
     private byte buf[];
     private byte buf[];
+    private byte checksumBuf[];
     private long offsetInBlock;
     private long offsetInBlock;
     final private String inAddr;
     final private String inAddr;
     private String mirrorAddr;
     private String mirrorAddr;
@@ -1995,6 +2099,7 @@ public class DataNode implements FSConstants, Runnable {
         this.bytesPerChecksum = checksum.getBytesPerChecksum();
         this.bytesPerChecksum = checksum.getBytesPerChecksum();
         this.checksumSize = checksum.getChecksumSize();
         this.checksumSize = checksum.getChecksumSize();
         this.buf = new byte[bytesPerChecksum + checksumSize];
         this.buf = new byte[bytesPerChecksum + checksumSize];
+        this.checksumBuf = new byte[checksumSize];
         //
         //
         // Open local disk out
         // Open local disk out
         //
         //
@@ -2055,7 +2160,8 @@ public class DataNode implements FSConstants, Runnable {
     }
     }
 
 
     /* receive a chunk: write it to disk & mirror it to another stream */
     /* receive a chunk: write it to disk & mirror it to another stream */
-    private void receiveChunk( int len ) throws IOException {
+    private void receiveChunk( int len, byte[] checksumBuf, int checksumOff ) 
+                              throws IOException {
       if (len <= 0 || len > bytesPerChecksum) {
       if (len <= 0 || len > bytesPerChecksum) {
         throw new IOException("Got wrong length during writeBlock(" + block
         throw new IOException("Got wrong length during writeBlock(" + block
             + ") from " + inAddr + " at offset " + offsetInBlock + ": " + len
             + ") from " + inAddr + " at offset " + offsetInBlock + ": " + len
@@ -2071,7 +2177,7 @@ public class DataNode implements FSConstants, Runnable {
       lastLen = curLen;
       lastLen = curLen;
       curLen = len;
       curLen = len;
 
 
-      in.readFully(buf, 0, len + checksumSize);
+      in.readFully(buf, 0, len);
 
 
       /*
       /*
        * Verification is not included in the initial design. For now, it at
        * Verification is not included in the initial design. For now, it at
@@ -2080,7 +2186,7 @@ public class DataNode implements FSConstants, Runnable {
        */
        */
       checksum.update(buf, 0, len);
       checksum.update(buf, 0, len);
 
 
-      if (!checksum.compare(buf, len)) {
+      if (!checksum.compare(checksumBuf, checksumOff)) {
         throw new IOException("Unexpected checksum mismatch "
         throw new IOException("Unexpected checksum mismatch "
             + "while writing " + block + " from " + inAddr);
             + "while writing " + block + " from " + inAddr);
       }
       }
@@ -2097,7 +2203,8 @@ public class DataNode implements FSConstants, Runnable {
       if (mirrorOut != null) {
       if (mirrorOut != null) {
         try {
         try {
           mirrorOut.writeInt(len);
           mirrorOut.writeInt(len);
-          mirrorOut.write(buf, 0, len + checksumSize);
+          mirrorOut.write(checksumBuf, checksumOff, checksumSize);
+          mirrorOut.write(buf, 0, len);
         } catch (IOException ioe) {
         } catch (IOException ioe) {
           LOG.info(dnRegistration + ":Exception writing block " +
           LOG.info(dnRegistration + ":Exception writing block " +
                    block + " to mirror " + mirrorAddr + "\n" +
                    block + " to mirror " + mirrorAddr + "\n" +
@@ -2123,7 +2230,7 @@ public class DataNode implements FSConstants, Runnable {
         if (!finalized) {
         if (!finalized) {
           out.write(buf, 0, len);
           out.write(buf, 0, len);
           // Write checksum
           // Write checksum
-          checksumOut.write(buf, len, checksumSize);
+          checksumOut.write(checksumBuf, checksumOff, checksumSize);
           myMetrics.bytesWritten.inc(len);
           myMetrics.bytesWritten.inc(len);
         }
         }
       } catch (IOException iex) {
       } catch (IOException iex) {
@@ -2145,7 +2252,15 @@ public class DataNode implements FSConstants, Runnable {
      * Receive and process a packet. It contains many chunks.
      * Receive and process a packet. It contains many chunks.
      */
      */
     private void receivePacket(int packetSize) throws IOException {
     private void receivePacket(int packetSize) throws IOException {
-
+      /* TEMP: Currently this handles both interleaved 
+       * and non-interleaved DATA_CHUNKs inside the packet.
+       * Non-interleaved is required for HADOOP-2758 and in future.
+       * Interleaved will be removed once extra buffer copies are removed
+       * in write path (HADOOP-1702).
+       * 
+       * Format of Non-interleaved data packets is described in the 
+       * comment before BlockSender.
+       */
       offsetInBlock = in.readLong(); // get offset of packet in block
       offsetInBlock = in.readLong(); // get offset of packet in block
       long seqno = in.readLong();    // get seqno
       long seqno = in.readLong();    // get seqno
       boolean lastPacketInBlock = in.readBoolean();
       boolean lastPacketInBlock = in.readBoolean();
@@ -2157,9 +2272,6 @@ public class DataNode implements FSConstants, Runnable {
                 " lastPacketInBlock " + lastPacketInBlock);
                 " lastPacketInBlock " + lastPacketInBlock);
       setBlockPosition(offsetInBlock);
       setBlockPosition(offsetInBlock);
 
 
-      int len = in.readInt();
-      curPacketSize += 4;            // read an integer in previous line
-
       // send packet header to next datanode in pipeline
       // send packet header to next datanode in pipeline
       if (mirrorOut != null) {
       if (mirrorOut != null) {
         try {
         try {
@@ -2189,6 +2301,9 @@ public class DataNode implements FSConstants, Runnable {
         }
         }
       }
       }
 
 
+      int len = in.readInt();
+      curPacketSize += 4;            // read an integer in previous line
+
       if (len == 0) {
       if (len == 0) {
         LOG.info("Receiving empty packet for block " + block);
         LOG.info("Receiving empty packet for block " + block);
         if (mirrorOut != null) {
         if (mirrorOut != null) {
@@ -2198,15 +2313,37 @@ public class DataNode implements FSConstants, Runnable {
       }
       }
 
 
       while (len != 0) {
       while (len != 0) {
-        LOG.debug("Receiving one chunk for block " + block +
-                  " of size " + len);
-        receiveChunk( len );
-        curPacketSize += (len + checksumSize);
-        if (curPacketSize > packetSize) {
-          throw new IOException("Packet size for block " + block +
-                                " too long " + curPacketSize +
-                                " was expecting " + packetSize);
-        } 
+        int checksumOff = 0;    
+        if (len > 0) {
+          int checksumLen = (len + bytesPerChecksum - 1)/bytesPerChecksum*
+                            checksumSize;
+          if (checksumBuf.length < checksumLen) {
+            checksumBuf = new byte[checksumLen];
+          }
+          // read the checksum
+          in.readFully(checksumBuf, 0, checksumLen);
+        }
+        
+        while (len != 0) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Receiving one chunk for block " + block +
+                      " of size " + len);
+          }
+          
+          int toRecv = Math.min(len, bytesPerChecksum);
+          
+          receiveChunk(toRecv, checksumBuf, checksumOff);
+          
+          len -= toRecv;
+          checksumOff += checksumSize;       
+          curPacketSize += (toRecv + checksumSize);
+          if (curPacketSize > packetSize) {
+            throw new IOException("Packet size for block " + block +
+                                  " too long " + curPacketSize +
+                                  " was expecting " + packetSize);
+          } 
+        }
+        
         if (curPacketSize == packetSize) {
         if (curPacketSize == packetSize) {
           if (mirrorOut != null) {
           if (mirrorOut != null) {
             mirrorOut.flush();
             mirrorOut.flush();
@@ -2388,13 +2525,14 @@ public class DataNode implements FSConstants, Runnable {
         sock.setSoTimeout(targets.length * socketTimeout);
         sock.setSoTimeout(targets.length * socketTimeout);
 
 
         out = new DataOutputStream(new BufferedOutputStream(
         out = new DataOutputStream(new BufferedOutputStream(
-            sock.getOutputStream(), BUFFER_SIZE));
+                       sock.getOutputStream(), SMALL_BUFFER_SIZE));
+
         blockSender = new BlockSender(b, 0, -1, false, false, false);
         blockSender = new BlockSender(b, 0, -1, false, false, false);
 
 
         //
         //
         // Header info
         // Header info
         //
         //
-        out.writeShort(DATA_TRANFER_VERSION);
+        out.writeShort(DATA_TRANSFER_VERSION);
         out.writeByte(OP_WRITE_BLOCK);
         out.writeByte(OP_WRITE_BLOCK);
         out.writeLong(b.getBlockId());
         out.writeLong(b.getBlockId());
         out.writeInt(0);           // no pipelining
         out.writeInt(0);           // no pipelining

+ 8 - 14
src/java/org/apache/hadoop/dfs/FSConstants.java

@@ -100,21 +100,13 @@ public interface FSConstants {
    * This should change when serialization of DatanodeInfo, not just
    * This should change when serialization of DatanodeInfo, not just
    * when protocol changes. It is not very obvious. 
    * when protocol changes. It is not very obvious. 
    */
    */
-  /* Version 7: 
-   * Add two operations to data node
-   * OP_COPY_BLOCK: 
-   *   The command is for sending to a proxy source for the balancing purpose
-   *   The datanode then sends OP_REPLACE_BLOCK request to the destination
-   *   OP_COPY_BLOCK BlockID(long) SourceID (UTF8) Destination (DatanodeInfo)
-   *   return OP_STATUS_ERROR if any error occurs; OP_STATUS_SUCCESS otherwise
-   * OP_REPLACE_BLOCK: 
-   *   the command is for sending to a destination for the balancing purpose
-   *   The datanode then writes the block to disk and notifies namenode of this
-   *   received block together with a deletion hint: sourceID
-   *   OP_REPLACE_BLOCK BlockID(long) SourceID(UTF8) Block_Data_With_Crc
-   *   return OP_STATUS_ERROR if any error occurs; OP_STATUS_SUCCESS otherwise
+  /*
+   * Version 9:
+   *   While reading data from Datanode, each PACKET can consist
+   *   of non-interleaved data (checksums for a larger amount of data,
+   *   followed by the data).
    */
    */
-  public static final int DATA_TRANFER_VERSION = 8;
+  public static final int DATA_TRANSFER_VERSION = 9;
 
 
   // Return codes for file create
   // Return codes for file create
   public static final int OPERATION_FAILED = 0;
   public static final int OPERATION_FAILED = 0;
@@ -140,6 +132,8 @@ public interface FSConstants {
   public static int MAX_PATH_DEPTH = 1000;
   public static int MAX_PATH_DEPTH = 1000;
     
     
   public static final int BUFFER_SIZE = new Configuration().getInt("io.file.buffer.size", 4096);
   public static final int BUFFER_SIZE = new Configuration().getInt("io.file.buffer.size", 4096);
+  //Used for writing header etc.
+  static final int SMALL_BUFFER_SIZE = Math.min(BUFFER_SIZE/2, 512);
   //TODO mb@media-style.com: should be conf injected?
   //TODO mb@media-style.com: should be conf injected?
   public static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
   public static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
   public static final int DEFAULT_DATA_SOCKET_SIZE = 128 * 1024;
   public static final int DEFAULT_DATA_SOCKET_SIZE = 128 * 1024;

+ 1 - 1
src/test/org/apache/hadoop/dfs/TestBlockReplacement.java

@@ -215,7 +215,7 @@ public class TestBlockReplacement extends TestCase {
     sock.setSoTimeout(FSConstants.READ_TIMEOUT);
     sock.setSoTimeout(FSConstants.READ_TIMEOUT);
     // sendRequest
     // sendRequest
     DataOutputStream out = new DataOutputStream(sock.getOutputStream());
     DataOutputStream out = new DataOutputStream(sock.getOutputStream());
-    out.writeShort(FSConstants.DATA_TRANFER_VERSION);
+    out.writeShort(FSConstants.DATA_TRANSFER_VERSION);
     out.writeByte(FSConstants.OP_COPY_BLOCK);
     out.writeByte(FSConstants.OP_COPY_BLOCK);
     out.writeLong(block.getBlockId());
     out.writeLong(block.getBlockId());
     Text.writeString(out, source.getStorageID());
     Text.writeString(out, source.getStorageID());

+ 13 - 13
src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java

@@ -152,19 +152,19 @@ public class TestDataTransferProtocol extends TestCase {
     sendBuf.reset();
     sendBuf.reset();
     
     
     // bad version
     // bad version
-    recvOut.writeShort((short)(FSConstants.DATA_TRANFER_VERSION-1));
-    sendOut.writeShort((short)(FSConstants.DATA_TRANFER_VERSION-1));
+    recvOut.writeShort((short)(FSConstants.DATA_TRANSFER_VERSION-1));
+    sendOut.writeShort((short)(FSConstants.DATA_TRANSFER_VERSION-1));
     sendRecvData("Wrong Version", true);
     sendRecvData("Wrong Version", true);
 
 
     // bad ops
     // bad ops
     sendBuf.reset();
     sendBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)(FSConstants.OP_WRITE_BLOCK-1));
     sendOut.writeByte((byte)(FSConstants.OP_WRITE_BLOCK-1));
     sendRecvData("Wrong Op Code", true);
     sendRecvData("Wrong Op Code", true);
     
     
     /* Test OP_WRITE_BLOCK */
     /* Test OP_WRITE_BLOCK */
     sendBuf.reset();
     sendBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)FSConstants.OP_WRITE_BLOCK);
     sendOut.writeByte((byte)FSConstants.OP_WRITE_BLOCK);
     sendOut.writeLong(newBlockId); // block id
     sendOut.writeLong(newBlockId); // block id
     sendOut.writeInt(0);           // targets in pipeline 
     sendOut.writeInt(0);           // targets in pipeline 
@@ -181,7 +181,7 @@ public class TestDataTransferProtocol extends TestCase {
 
 
     sendBuf.reset();
     sendBuf.reset();
     recvBuf.reset();
     recvBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)FSConstants.OP_WRITE_BLOCK);
     sendOut.writeByte((byte)FSConstants.OP_WRITE_BLOCK);
     sendOut.writeLong(newBlockId);
     sendOut.writeLong(newBlockId);
     sendOut.writeInt(0);           // targets in pipeline 
     sendOut.writeInt(0);           // targets in pipeline 
@@ -195,7 +195,7 @@ public class TestDataTransferProtocol extends TestCase {
 
 
     sendBuf.reset();
     sendBuf.reset();
     recvBuf.reset();
     recvBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)FSConstants.OP_WRITE_BLOCK);
     sendOut.writeByte((byte)FSConstants.OP_WRITE_BLOCK);
     sendOut.writeLong(++newBlockId);
     sendOut.writeLong(++newBlockId);
     sendOut.writeInt(0);           // targets in pipeline 
     sendOut.writeInt(0);           // targets in pipeline 
@@ -220,7 +220,7 @@ public class TestDataTransferProtocol extends TestCase {
     // test for writing a valid zero size block
     // test for writing a valid zero size block
     sendBuf.reset();
     sendBuf.reset();
     recvBuf.reset();
     recvBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)FSConstants.OP_WRITE_BLOCK);
     sendOut.writeByte((byte)FSConstants.OP_WRITE_BLOCK);
     sendOut.writeLong(++newBlockId);
     sendOut.writeLong(++newBlockId);
     sendOut.writeInt(0);           // targets in pipeline 
     sendOut.writeInt(0);           // targets in pipeline 
@@ -247,7 +247,7 @@ public class TestDataTransferProtocol extends TestCase {
     // bad block id
     // bad block id
     sendBuf.reset();
     sendBuf.reset();
     recvBuf.reset();
     recvBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
     sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
     newBlockId = firstBlock.getBlockId()-1;
     newBlockId = firstBlock.getBlockId()-1;
     sendOut.writeLong(newBlockId);
     sendOut.writeLong(newBlockId);
@@ -258,7 +258,7 @@ public class TestDataTransferProtocol extends TestCase {
 
 
     // negative block start offset
     // negative block start offset
     sendBuf.reset();
     sendBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
     sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
     sendOut.writeLong(firstBlock.getBlockId());
     sendOut.writeLong(firstBlock.getBlockId());
     sendOut.writeLong(-1L);
     sendOut.writeLong(-1L);
@@ -268,7 +268,7 @@ public class TestDataTransferProtocol extends TestCase {
 
 
     // bad block start offset
     // bad block start offset
     sendBuf.reset();
     sendBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
     sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
     sendOut.writeLong(firstBlock.getBlockId());
     sendOut.writeLong(firstBlock.getBlockId());
     sendOut.writeLong(fileLen);
     sendOut.writeLong(fileLen);
@@ -280,7 +280,7 @@ public class TestDataTransferProtocol extends TestCase {
     recvBuf.reset();
     recvBuf.reset();
     recvOut.writeShort((short)FSConstants.OP_STATUS_SUCCESS);    
     recvOut.writeShort((short)FSConstants.OP_STATUS_SUCCESS);    
     sendBuf.reset();
     sendBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
     sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
     sendOut.writeLong(firstBlock.getBlockId());
     sendOut.writeLong(firstBlock.getBlockId());
     sendOut.writeLong(0);
     sendOut.writeLong(0);
@@ -292,7 +292,7 @@ public class TestDataTransferProtocol extends TestCase {
     recvBuf.reset();
     recvBuf.reset();
     recvOut.writeShort((short)FSConstants.OP_STATUS_ERROR);    
     recvOut.writeShort((short)FSConstants.OP_STATUS_ERROR);    
     sendBuf.reset();
     sendBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
     sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
     sendOut.writeLong(firstBlock.getBlockId());
     sendOut.writeLong(firstBlock.getBlockId());
     sendOut.writeLong(0);
     sendOut.writeLong(0);
@@ -302,7 +302,7 @@ public class TestDataTransferProtocol extends TestCase {
     
     
     //At the end of all this, read the file to make sure that succeeds finally.
     //At the end of all this, read the file to make sure that succeeds finally.
     sendBuf.reset();
     sendBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
     sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
     sendOut.writeLong(firstBlock.getBlockId());
     sendOut.writeLong(firstBlock.getBlockId());
     sendOut.writeLong(0);
     sendOut.writeLong(0);