Browse Source

HADOOP-1702. Reduce buffer copies when data is written to DFS.
DataNodes take 30% less CPU while writing data. (rangadi)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@656118 13f79535-47bb-0310-9956-ffa450edef68

Raghu Angadi 17 years ago
parent
commit
8b38c591f4

+ 3 - 0
CHANGES.txt

@@ -140,6 +140,9 @@ Trunk (unreleased changes)
 
     HADOOP-3369. Fast block processing during name-node startup. (shv)
 
+    HADOOP-1702. Reduce buffer copies when data is written to DFS. 
+    DataNodes take 30% less CPU while writing data. (rangadi)
+
   BUG FIXES
 
     HADOOP-2905. 'fsck -move' triggers NPE in NameNode. 

+ 122 - 128
src/java/org/apache/hadoop/dfs/DFSClient.java

@@ -39,6 +39,7 @@ import java.util.*;
 import java.util.zip.CRC32;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ConcurrentHashMap;
+import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 
 import javax.net.SocketFactory;
@@ -71,6 +72,7 @@ class DFSClient implements FSConstants {
   private short defaultReplication;
   private SocketFactory socketFactory;
   private int socketTimeout;
+  final int writePacketSize;
   private FileSystem.Statistics stats;
     
   /**
@@ -144,6 +146,8 @@ class DFSClient implements FSConstants {
     this.socketTimeout = conf.getInt("dfs.socket.timeout", 
                                      FSConstants.READ_TIMEOUT);
     this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
+    // dfs.write.packet.size is an internal config variable
+    this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
     
     try {
       this.ugi = UnixUserGroupInformation.login(conf, true);
@@ -1683,7 +1687,6 @@ class DFSClient implements FSConstants {
     private DataInputStream blockReplyStream;
     private Block block;
     private long blockSize;
-    private int buffersize;
     private DataChecksum checksum;
     private LinkedList<Packet> dataQueue = new LinkedList<Packet>();
     private LinkedList<Packet> ackQueue = new LinkedList<Packet>();
@@ -1694,10 +1697,8 @@ class DFSClient implements FSConstants {
     private ResponseProcessor response = null;
     private long currentSeqno = 0;
     private long bytesCurBlock = 0; // bytes writen in current block
-    private int packetSize = 0;
+    private int packetSize = 0; // write packet size, including the header.
     private int chunksPerPacket = 0;
-    private int chunksPerBlock = 0;
-    private int chunkSize = 0;
     private DatanodeInfo[] nodes = null; // list of targets for current block
     private volatile boolean hasError = false;
     private volatile int errorIndex = 0;
@@ -1707,56 +1708,95 @@ class DFSClient implements FSConstants {
     private boolean persistBlocks = false; // persist blocks on namenode
 
     private class Packet {
-      ByteBuffer buffer;
+      ByteBuffer buffer;           // only one of buf and buffer is non-null
+      byte[]  buf;
       long    seqno;               // sequencenumber of buffer in block
       long    offsetInBlock;       // offset in block
       boolean lastPacketInBlock;   // is this the last packet in block?
       int     numChunks;           // number of chunks currently in packet
-      int     flushOffsetBuffer;   // last full chunk that was flushed
-      long    flushOffsetBlock;    // block offset of last full chunk flushed
+      int     dataStart;
+      int     dataPos;
+      int     checksumStart;
+      int     checksumPos;      
   
       // create a new packet
-      Packet(int size, long offsetInBlock) {
-        buffer = ByteBuffer.allocate(size);
-        buffer.clear();
+      Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
         this.lastPacketInBlock = false;
         this.numChunks = 0;
         this.offsetInBlock = offsetInBlock;
         this.seqno = currentSeqno;
-        this.flushOffsetBuffer = 0;
-        this.flushOffsetBlock = 0;
         currentSeqno++;
+        
+        buffer = null;
+        buf = new byte[pktSize];
+        
+        checksumStart = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
+        checksumPos = checksumStart;
+        dataStart = checksumStart + chunksPerPkt * checksum.getChecksumSize();
+        dataPos = dataStart;
       }
 
-      // create a new Packet with the contents copied from the
-      // specified one. Shares the same buffer.
-      Packet(Packet old) {
-        this.buffer = old.buffer;
-        this.lastPacketInBlock = old.lastPacketInBlock;
-        this.numChunks = old.numChunks;
-        this.offsetInBlock = old.offsetInBlock;
-        this.seqno = old.seqno;
-        this.flushOffsetBuffer = old.flushOffsetBuffer;
-        this.flushOffsetBlock = old.flushOffsetBlock;
-      }
-
-      // writes len bytes from offset off in inarray into
-      // this packet.
-      // 
-      void write(byte[] inarray, int off, int len) {
-        buffer.put(inarray, off, len);
+      void writeData(byte[] inarray, int off, int len) {
+        if ( dataPos + len > buf.length) {
+          throw new BufferOverflowException();
+        }
+        System.arraycopy(inarray, off, buf, dataPos, len);
+        dataPos += len;
       }
   
-      // writes an integer into this packet. 
-      //
-      void  writeInt(int value) {
-       buffer.putInt(value);
+      void  writeChecksum(byte[] inarray, int off, int len) {
+        if (checksumPos + len > dataStart) {
+          throw new BufferOverflowException();
+        }
+        System.arraycopy(inarray, off, buf, checksumPos, len);
+        checksumPos += len;
       }
-
-      // sets the last flush offset of this packet.
-      void setFlushOffset(int bufoff, long blockOff) {
-        this.flushOffsetBuffer = bufoff;;
-        this.flushOffsetBlock = blockOff;
+      
+      /**
+       * Returns ByteBuffer that contains one full packet, including header.
+       */
+      ByteBuffer getBuffer() {
+        /* Once this is called, no more data can be added to the packet.
+         * setting 'buf' to null ensures that.
+         * This is called only when the packet is ready to be sent.
+         */
+        if (buffer != null) {
+          return buffer;
+        }
+        
+        //prepare the header and close any gap between checksum and data.
+        
+        int dataLen = dataPos - dataStart;
+        int checksumLen = checksumPos - checksumStart;
+        
+        if (checksumPos != dataStart) {
+          /* move the checksum to cover the gap.
+           * This can happen for the last packet.
+           */
+          System.arraycopy(buf, checksumStart, buf, 
+                           dataStart - checksumLen , checksumLen); 
+        }
+        
+        int pktLen = SIZE_OF_INTEGER + dataLen + checksumLen;
+        
+        //normally dataStart == checksumPos, i.e., offset is zero.
+        buffer = ByteBuffer.wrap(buf, dataStart - checksumPos,
+                                 DataNode.PKT_HEADER_LEN + pktLen);
+        buf = null;
+        buffer.mark();
+        
+        /* write the header and data length.
+         * The format is described in comment before DataNode.BlockSender
+         */
+        buffer.putInt(pktLen);  // pktSize
+        buffer.putLong(offsetInBlock); 
+        buffer.putLong(seqno);
+        buffer.put((byte) ((lastPacketInBlock) ? 1 : 0));
+        //end of pkt header
+        buffer.putInt(dataLen); // actual data length, excluding checksum.
+        
+        buffer.reset();
+        return buffer;
       }
     }
   
@@ -1807,8 +1847,6 @@ class DFSClient implements FSConstants {
             try {
               // get packet to be sent.
               one = dataQueue.getFirst();
-              int start = 0;
-              int len = one.buffer.limit();
               long offsetInBlock = one.offsetInBlock;
   
               // get new block from namenode.
@@ -1821,16 +1859,6 @@ class DFSClient implements FSConstants {
                 response.start();
               }
 
-              // If we are sending a sub-packet, then determine the offset 
-              // in block.
-              if (one.flushOffsetBuffer != 0) {
-                offsetInBlock += one.flushOffsetBlock;
-                len = len - one.flushOffsetBuffer;
-                start += one.flushOffsetBuffer;
-              }
-
-              // user bytes from 'position' to 'limit'.
-              byte[] arr = one.buffer.array();
               if (offsetInBlock >= blockSize) {
                 throw new IOException("BlockSize " + blockSize +
                                       " is smaller than data size. " +
@@ -1839,6 +1867,8 @@ class DFSClient implements FSConstants {
                                       " Aborting file " + src);
               }
 
+              ByteBuffer buf = one.getBuffer();
+              
               // move packet from dataQueue to ackQueue
               dataQueue.removeFirst();
               dataQueue.notifyAll();
@@ -1846,22 +1876,21 @@ class DFSClient implements FSConstants {
                 ackQueue.addLast(one);
                 ackQueue.notifyAll();
               } 
-  
+              
               // write out data to remote datanode
-              blockStream.writeInt(len); // size of this packet
-              blockStream.writeLong(offsetInBlock); // data offset in block
-              blockStream.writeLong(one.seqno); // sequence num of packet
-              blockStream.writeBoolean(one.lastPacketInBlock); 
-              blockStream.write(arr, start, len);
+              blockStream.write(buf.array(), buf.position(), buf.remaining());
+              
               if (one.lastPacketInBlock) {
                 blockStream.writeInt(0); // indicate end-of-block 
               }
               blockStream.flush();
-              LOG.debug("DataStreamer block " + block +
-                        " wrote packet seqno:" + one.seqno +
-                        " size:" + len + 
-                        " offsetInBlock:" + offsetInBlock +
-                        " lastPacketInBlock:" + one.lastPacketInBlock);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("DataStreamer block " + block +
+                          " wrote packet seqno:" + one.seqno +
+                          " size:" + buf.remaining() +
+                          " offsetInBlock:" + one.offsetInBlock + 
+                          " lastPacketInBlock:" + one.lastPacketInBlock);
+              }
             } catch (IOException e) {
               LOG.warn("DataStreamer Exception: " + e);
               hasError = true;
@@ -2138,7 +2167,6 @@ class DFSClient implements FSConstants {
       super(new CRC32(), conf.getInt("io.bytes.per.checksum", 512), 4);
       this.src = src;
       this.blockSize = blockSize;
-      this.buffersize = buffersize;
       this.progress = progress;
       if (progress != null) {
         LOG.debug("Set non-null progress callback on DFSOutputStream "+src);
@@ -2154,11 +2182,11 @@ class DFSClient implements FSConstants {
       }
       checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 
                                               bytesPerChecksum);
-      // A maximum of 128 chunks per packet, i.e. 64K packet size.
-      chunkSize = bytesPerChecksum + 2 * SIZE_OF_INTEGER; // user data & checksum
-      chunksPerBlock = (int)(blockSize / bytesPerChecksum);
-      chunksPerPacket = Math.min(chunksPerBlock, 128);
-      packetSize = chunkSize * chunksPerPacket;
+      int chunkSize = bytesPerChecksum + checksum.getChecksumSize();
+      chunksPerPacket = Math.max((writePacketSize - DataNode.PKT_HEADER_LEN - 
+                                  SIZE_OF_INTEGER + chunkSize-1)/chunkSize, 1);
+      packetSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER + 
+                   chunkSize * chunksPerPacket; 
 
       try {
         namenode.create(
@@ -2254,7 +2282,7 @@ class DFSClient implements FSConstants {
         //
         DataOutputStream out = new DataOutputStream(
             new BufferedOutputStream(NetUtils.getOutputStream(s, writeTimeout), 
-                                     buffersize));
+                                     DataNode.SMALL_BUFFER_SIZE));
         blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
 
         out.writeShort( DATA_TRANSFER_VERSION );
@@ -2351,12 +2379,6 @@ class DFSClient implements FSConstants {
                               this.checksum.getChecksumSize() + 
                               " but found to be " + checksum.length);
       }
-      if (len + cklen + SIZE_OF_INTEGER > chunkSize) {
-        throw new IOException("writeChunk() found data of size " +
-                              (len + cklen + 4) +
-                              " that cannot be larger than chukSize " + 
-                              chunkSize);
-      }
 
       synchronized (dataQueue) {
   
@@ -2370,30 +2392,30 @@ class DFSClient implements FSConstants {
         isClosed();
   
         if (currentPacket == null) {
-          currentPacket = new Packet(packetSize, bytesCurBlock);
+          currentPacket = new Packet(packetSize, chunksPerPacket, 
+                                     bytesCurBlock);
           LOG.debug("DFSClient writeChunk allocating new packet " + 
                     currentPacket.seqno);
         }
 
-        currentPacket.writeInt(len);
-        currentPacket.write(checksum, 0, cklen);
-        currentPacket.write(b, offset, len);
+        currentPacket.writeChecksum(checksum, 0, cklen);
+        currentPacket.writeData(b, offset, len);
         currentPacket.numChunks++;
         bytesCurBlock += len;
 
         // If packet is full, enqueue it for transmission
         //
         if (currentPacket.numChunks == chunksPerPacket ||
-            bytesCurBlock == chunksPerBlock * bytesPerChecksum) {
+            bytesCurBlock == blockSize) {
           LOG.debug("DFSClient writeChunk packet full seqno " + currentPacket.seqno);
-          currentPacket.buffer.flip();
           //
           // if we allocated a new packet because we encountered a block
           // boundary, reset bytesCurBlock.
           //
-          if (bytesCurBlock == chunksPerBlock * bytesPerChecksum) {
+          if (bytesCurBlock == blockSize) {
             currentPacket.lastPacketInBlock = true;
             bytesCurBlock = 0;
+            lastFlushOffset = -1;
           }
           dataQueue.addLast(currentPacket);
           dataQueue.notifyAll();
@@ -2410,66 +2432,38 @@ class DFSClient implements FSConstants {
      * datanode. Block allocations are persisted on namenode.
      */
     public synchronized void fsync() throws IOException {
-      Packet savePacket = null;
-      int position = 0;
-      long saveOffset = 0;
-
       try {
-        // Record the state of the current output stream.
-        // This state will be reverted after the flush successfully
-        // finishes. It is necessary to do this so that partial 
-        // checksum chunks are reused by writes that follow this 
-        // flush.
-        if (currentPacket != null) {
-          savePacket = new Packet(currentPacket);
-          position = savePacket.buffer.position();
-        }
-        saveOffset = bytesCurBlock;
+        /* Record current blockOffset. This might be changed inside
+         * flushBuffer() where a partial checksum chunk might be flushed.
+         * After the flush, reset the bytesCurBlock back to its previous value,
+         * any partial checksum chunk will be sent now and in next packet.
+         */
+        long saveOffset = bytesCurBlock;
 
         // flush checksum buffer, but keep checksum buffer intact
         flushBuffer(true);
 
-        LOG.debug("DFSClient flushInternal save position " +  
-                  position +
-                  " cur position " +
-                  ((currentPacket != null) ? currentPacket.buffer.position() : -1) +
-                  " limit " +
-                  ((currentPacket != null) ? currentPacket.buffer.limit() : -1) +
-                   " bytesCurBlock " + bytesCurBlock +
-                   " lastFlushOffset " + lastFlushOffset);
-
-        //
-        // Detect the condition that we have already flushed all
-        // outstanding data.
-        //
-        boolean skipFlush = (lastFlushOffset == bytesCurBlock && 
-                             savePacket != null && currentPacket != null &&
-                             savePacket.seqno == currentPacket.seqno);
+        LOG.debug("DFSClient flush() : saveOffset " + saveOffset +  
+                  " bytesCurBlock " + bytesCurBlock +
+                  " lastFlushOffset " + lastFlushOffset);
         
-        // Do the flush.
-        //
-        if (!skipFlush) {
+        // Flush only if we haven't already flushed till this offset.
+        if (lastFlushOffset != bytesCurBlock) {
 
           // record the valid offset of this flush
           lastFlushOffset = bytesCurBlock;
 
           // wait for all packets to be sent and acknowledged
           flushInternal();
+        } else {
+          // just discard the current packet since it is already been sent.
+          currentPacket = null;
         }
         
         // Restore state of stream. Record the last flush offset 
         // of the last full chunk that was flushed.
         //
         bytesCurBlock = saveOffset;
-        currentPacket = null;
-        if (savePacket != null) {
-          savePacket.buffer.limit(savePacket.buffer.capacity());
-          savePacket.buffer.position(position);
-          savePacket.setFlushOffset(position, 
-                                    savePacket.numChunks * 
-                                    checksum.getBytesPerChecksum());
-          currentPacket = savePacket;
-        }
 
         // If any new blocks were allocated since the last flush, 
         // then persist block locations on namenode. 
@@ -2501,7 +2495,6 @@ class DFSClient implements FSConstants {
           // If there is data in the current buffer, send it across
           //
           if (currentPacket != null) {
-            currentPacket.buffer.flip();
             dataQueue.addLast(currentPacket);
             dataQueue.notifyAll();
             currentPacket = null;
@@ -2594,12 +2587,11 @@ class DFSClient implements FSConstants {
           // packet with empty payload.
           synchronized (dataQueue) {
             if (currentPacket == null && bytesCurBlock != 0) {
-              currentPacket = new Packet(packetSize, bytesCurBlock);
-              currentPacket.writeInt(0); // one chunk with empty contents
+              currentPacket = new Packet(packetSize, chunksPerPacket,
+                                         bytesCurBlock);
             }
             if (currentPacket != null) { 
               currentPacket.lastPacketInBlock = true;
-              currentPacket.setFlushOffset(0, 0); // send whole packet
             }
           }
 
@@ -2649,7 +2641,9 @@ class DFSClient implements FSConstants {
 
     synchronized void setChunksPerPacket(int value) {
       chunksPerPacket = Math.min(chunksPerPacket, value);
-      packetSize = chunkSize * chunksPerPacket;
+      packetSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER +
+                   (checksum.getBytesPerChecksum() + 
+                    checksum.getChecksumSize()) * chunksPerPacket;
     }
 
     synchronized void setTestFilename(String newname) {

+ 250 - 206
src/java/org/apache/hadoop/dfs/DataNode.java

@@ -129,6 +129,7 @@ public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
   private int socketTimeout;
   private int socketWriteTimeout = 0;  
   private boolean transferToAllowed = true;
+  private int writePacketSize = 0;
   
   DataBlockScanner blockScanner;
   Daemon blockScannerThread;
@@ -221,6 +222,7 @@ public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
      * to false on some of them. */
     this.transferToAllowed = conf.getBoolean("dfs.datanode.transferTo.allowed", 
                                              true);
+    this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
     String address = 
       NetUtils.getServerAddress(conf,
                                 "dfs.datanode.bindAddress", 
@@ -991,7 +993,8 @@ public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
       DataInputStream in=null; 
       try {
         in = new DataInputStream(
-            new BufferedInputStream(NetUtils.getInputStream(s), BUFFER_SIZE));
+            new BufferedInputStream(NetUtils.getInputStream(s), 
+                                    SMALL_BUFFER_SIZE));
         short version = in.readShort();
         if ( version != DATA_TRANSFER_VERSION ) {
           throw new IOException( "Version Mismatch" );
@@ -1174,7 +1177,7 @@ public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
             mirrorOut = new DataOutputStream(
                new BufferedOutputStream(
                            NetUtils.getOutputStream(mirrorSock, writeTimeout),
-                           BUFFER_SIZE));
+                           SMALL_BUFFER_SIZE));
             mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
 
             // Write header: Copied from DFSClient.java!
@@ -1603,6 +1606,12 @@ public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
     
    ************************************************************************ */
   
+  /** Header size for a packet */
+  static final int PKT_HEADER_LEN = ( 4 + /* Packet payload length */
+                                      8 + /* offset in block */
+                                      8 + /* seqno */
+                                      1   /* isLastPacketInBlock */);
+  
   class BlockSender implements java.io.Closeable {
     private Block block; // the block to read from
     private InputStream blockIn; // data stream
@@ -1622,12 +1631,6 @@ public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
     private boolean verifyChecksum; //if true, check is verified while reading
     private Throttler throttler;
     
-    static final int PKT_HEADER_LEN = ( 4 + /* PacketLen */
-                                        8 + /* offset in block */
-                                        8 + /* seqno */
-                                        1 + /* isLastPacketInBlock */
-                                        4   /* data len */ );
-       
     BlockSender(Block block, long startOffset, long length,
                 boolean corruptChecksumOk, boolean chunkOffsetOK,
                 boolean verifyChecksum) throws IOException {
@@ -1873,7 +1876,7 @@ public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
         out.flush();
         
         int maxChunksPerPacket;
-        int pktSize;
+        int pktSize = PKT_HEADER_LEN + SIZE_OF_INTEGER;
         
         if (transferToAllowed && !verifyChecksum && 
             baseStream instanceof SocketOutputStream && 
@@ -1891,12 +1894,11 @@ public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
                                 + bytesPerChecksum - 1)/bytesPerChecksum;
           
           // allocate smaller buffer while using transferTo(). 
-          pktSize = PKT_HEADER_LEN + checksumSize * maxChunksPerPacket;
+          pktSize += checksumSize * maxChunksPerPacket;
         } else {
           maxChunksPerPacket = Math.max(1,
                    (BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
-          pktSize = PKT_HEADER_LEN + 
-                    (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
+          pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
         }
 
         ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
@@ -2200,39 +2202,6 @@ public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
     }
   }
 
-  // this class is a bufferoutputstream that exposes the number of
-  // bytes in the buffer.
-  static private class DFSBufferedOutputStream extends BufferedOutputStream {
-    OutputStream out;
-    DFSBufferedOutputStream(OutputStream out, int capacity) {
-      super(out, capacity);
-      this.out = out;
-    }
-
-    public synchronized void flush() throws IOException {
-      super.flush();
-    }
-
-    /**
-     * Returns true if the channel pointer is already set at the
-     * specified offset. Otherwise returns false.
-     */
-    synchronized boolean samePosition(FSDatasetInterface data, 
-                                      FSDataset.BlockWriteStreams streams,
-                                      Block block,
-                                      long offset) 
-                                      throws IOException {
-      if (data.getChannelPosition(block, streams) + count == offset) {
-        return true;
-      }
-      LOG.debug("samePosition is false. " +
-                " current position " + data.getChannelPosition(block, streams)+
-                " buffered size " + count +
-                " new offset " + offset);
-      return false;
-    }
-  }
-
   /* A class that receives a block and wites to its own disk, meanwhile
    * may copies it to another site. If a throttler is provided,
    * streaming throttling is also supported. 
@@ -2242,13 +2211,13 @@ public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
     private boolean finalized;
     private DataInputStream in = null; // from where data are read
     private DataChecksum checksum; // from where chunks of a block can be read
-    private DataOutputStream out = null; // to block file at local disk
+    private OutputStream out = null; // to block file at local disk
     private DataOutputStream checksumOut = null; // to crc file at local disk
-    private DFSBufferedOutputStream bufStream = null;
     private int bytesPerChecksum;
     private int checksumSize;
-    private byte buf[];
-    private byte checksumBuf[];
+    private ByteBuffer buf; // contains one full packet.
+    private int bufRead; //amount of valid data in the buf
+    private int maxPacketReadLen;
     private long offsetInBlock;
     final private String inAddr;
     private String mirrorAddr;
@@ -2272,19 +2241,16 @@ public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
         this.checksum = DataChecksum.newDataChecksum(in);
         this.bytesPerChecksum = checksum.getBytesPerChecksum();
         this.checksumSize = checksum.getChecksumSize();
-        this.buf = new byte[bytesPerChecksum + checksumSize];
-        this.checksumBuf = new byte[checksumSize];
         //
         // Open local disk out
         //
         streams = data.writeToBlock(block, isRecovery);
         this.finalized = data.isValidBlock(block);
         if (streams != null) {
-          this.bufStream = new DFSBufferedOutputStream(
-                                          streams.dataOut, BUFFER_SIZE);
-          this.out = new DataOutputStream(bufStream);
+          this.out = streams.dataOut;
           this.checksumOut = new DataOutputStream(new BufferedOutputStream(
-                                          streams.checksumOut, BUFFER_SIZE));
+                                                    streams.checksumOut, 
+                                                    SMALL_BUFFER_SIZE));
         }
       } catch(IOException ioe) {
         IOUtils.closeStream(this);
@@ -2351,174 +2317,249 @@ public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
       }
     }
     
-    /* receive a chunk: write it to disk & mirror it to another stream */
-    private void receiveChunk( int len, byte[] checksumBuf, int checksumOff ) 
-                              throws IOException {
-      if (len <= 0 || len > bytesPerChecksum) {
-        throw new IOException("Got wrong length during writeBlock(" + block
-            + ") from " + inAddr + " at offset " + offsetInBlock + ": " + len
-            + " expected <= " + bytesPerChecksum);
-      }
-
-      in.readFully(buf, 0, len);
+    /**
+     * Verify multiple CRC chunks. 
+     */
+    private void verifyChunks( byte[] dataBuf, int dataOff, int len, 
+                               byte[] checksumBuf, int checksumOff ) 
+                               throws IOException {
+      while (len > 0) {
+        int chunkLen = Math.min(len, bytesPerChecksum);
+        
+        checksum.update(dataBuf, dataOff, chunkLen);
 
-      /*
-       * Verification is not included in the initial design. For now, it at
-       * least catches some bugs. Later, we can include this after showing that
-       * it does not affect performance much.
-       */
-      checksum.update(buf, 0, len);
+        if (!checksum.compare(checksumBuf, checksumOff)) {
+          throw new IOException("Unexpected checksum mismatch " + 
+                                "while writing " + block + " from " + inAddr);
+        }
 
-      if (!checksum.compare(checksumBuf, checksumOff)) {
-        throw new IOException("Unexpected checksum mismatch "
-            + "while writing " + block + " from " + inAddr);
+        checksum.reset();
+        dataOff += chunkLen;
+        checksumOff += checksumSize;
+        len -= chunkLen;
       }
+    }
 
-      checksum.reset();
-      offsetInBlock += len;
-
-      // First write to remote node before writing locally.
-      if (mirrorOut != null) {
-        try {
-          mirrorOut.writeInt(len);
-          mirrorOut.write(checksumBuf, checksumOff, checksumSize);
-          mirrorOut.write(buf, 0, len);
-        } catch (IOException ioe) {
-          handleMirrorOutError(ioe);
-        }
+    /**
+     * Makes sure buf.position() is zero without modifying buf.remaining().
+     * It moves the data if position needs to be changed.
+     */
+    private void shiftBufData() {
+      if (bufRead != buf.limit()) {
+        throw new IllegalStateException("bufRead should be same as " +
+                                        "buf.limit()");
       }
-
-      try {
-        if (!finalized) {
-          out.write(buf, 0, len);
-          // Write checksum
-          checksumOut.write(checksumBuf, checksumOff, checksumSize);
-          myMetrics.bytesWritten.inc(len);
+      
+      //shift the remaining data on buf to the front
+      if (buf.position() > 0) {
+        int dataLeft = buf.remaining();
+        if (dataLeft > 0) {
+          byte[] b = buf.array();
+          System.arraycopy(b, buf.position(), b, 0, dataLeft);
         }
-      } catch (IOException iex) {
-        checkDiskError(iex);
-        throw iex;
+        buf.position(0);
+        bufRead = dataLeft;
+        buf.limit(bufRead);
       }
-
-      if (throttler != null) { // throttle I/O
-        throttler.throttle(len + checksumSize + 4);
+    }
+    
+    /**
+     * reads upto toRead byte to buf at buf.limit() and increments the limit.
+     * throws an IOException if read does not succeed.
+     */
+    private int readToBuf(int toRead) throws IOException {
+      if (toRead < 0) {
+        toRead = (maxPacketReadLen > 0 ? maxPacketReadLen : buf.capacity())
+                 - buf.limit();
+      }
+      
+      int nRead = in.read(buf.array(), buf.limit(), toRead);
+      
+      if (nRead < 0) {
+        throw new EOFException("while trying to read " + toRead + " bytes");
       }
+      bufRead = buf.limit() + nRead;
+      buf.limit(bufRead);
+      return nRead;
     }
-
-    /* 
-     * Receive and process a packet. It contains many chunks.
+    
+    
+    /**
+     * Reads (at least) one packet and returns the packet length.
+     * buf.position() points to the start of the packet and 
+     * buf.limit() point to the end of the packet. There could 
+     * be more data from next packet in buf.<br><br>
+     * 
+     * It tries to read a full packet with single read call.
+     * Consecutinve packets are usually of the same length.
      */
-    private void receivePacket(int packetSize) throws IOException {
-      /* TEMP: Currently this handles both interleaved 
-       * and non-interleaved DATA_CHUNKs in side the packet.
-       * non-interleaved is required for HADOOP-2758 and in future.
-       * iterleaved will be removed once extra buffer copies are removed
-       * in write path (HADOOP-1702).
-       * 
-       * Format of Non-interleaved data packets is described in the 
-       * comment before BlockSender.
+    private int readNextPacket() throws IOException {
+      /* This dances around buf a little bit, mainly to read 
+       * full packet with single read and to accept arbitarary size  
+       * for next packet at the same time.
        */
-      offsetInBlock = in.readLong(); // get offset of packet in block
-      long seqno = in.readLong();    // get seqno
-      boolean lastPacketInBlock = in.readBoolean();
-      int curPacketSize = 0;         
-      LOG.debug("Receiving one packet for block " + block +
-                " of size " + packetSize +
-                " seqno " + seqno +
-                " offsetInBlock " + offsetInBlock +
-                " lastPacketInBlock " + lastPacketInBlock);
+      if (buf == null) {
+        /* initialize buffer to the best guess size:
+         * 'chunksPerPacket' calculation here should match the same 
+         * calculation in DFSClient to make the guess accurate.
+         */
+        int chunkSize = bytesPerChecksum + checksumSize;
+        int chunksPerPacket = (writePacketSize - PKT_HEADER_LEN - 
+                               SIZE_OF_INTEGER + chunkSize - 1)/chunkSize;
+        buf = ByteBuffer.allocate(PKT_HEADER_LEN + SIZE_OF_INTEGER +
+                                  Math.max(chunksPerPacket, 1) * chunkSize);
+        buf.limit(0);
+      }
+      
+      // See if there is data left in the buffer :
+      if (bufRead > buf.limit()) {
+        buf.limit(bufRead);
+      }
+      
+      while (buf.remaining() < SIZE_OF_INTEGER) {
+        if (buf.position() > 0) {
+          shiftBufData();
+        }
+        readToBuf(-1);
+      }
+      
+      /* We mostly have the full packet or at least enough for an int
+       */
+      buf.mark();
+      int payloadLen = buf.getInt();
+      buf.reset();
+      
+      if (payloadLen == 0) {
+        //end of stream!
+        buf.limit(buf.position() + SIZE_OF_INTEGER);
+        return 0;
+      }
+      
+      // check corrupt values for pktLen, 100MB upper limit should be ok?
+      if (payloadLen < 0 || payloadLen > (100*1024*1024)) {
+        throw new IOException("Incorrect value for packet payload : " +
+                              payloadLen);
+      }
+      
+      int pktSize = payloadLen + PKT_HEADER_LEN;
+      
+      if (buf.remaining() < pktSize) {
+        //we need to read more data
+        int toRead = pktSize - buf.remaining();
+        
+        // first make sure buf has enough space.        
+        int spaceLeft = buf.capacity() - buf.limit();
+        if (toRead > spaceLeft && buf.position() > 0) {
+          shiftBufData();
+          spaceLeft = buf.capacity() - buf.limit();
+        }
+        if (toRead > spaceLeft) {
+          byte oldBuf[] = buf.array();
+          int toCopy = buf.limit();
+          buf = ByteBuffer.allocate(toCopy + toRead);
+          System.arraycopy(oldBuf, 0, buf.array(), 0, toCopy);
+          buf.limit(toCopy);
+        }
+        
+        //now read:
+        while (toRead > 0) {
+          toRead -= readToBuf(toRead);
+        }
+      }
+      
+      if (buf.remaining() > pktSize) {
+        buf.limit(buf.position() + pktSize);
+      }
+      
+      if (pktSize > maxPacketReadLen) {
+        maxPacketReadLen = pktSize;
+      }
+      
+      return payloadLen;
+    }
+    
+    /** 
+     * Receives and processes a packet. It can contain many chunks.
+     * returns size of the packet.
+     */
+    private int receivePacket() throws IOException {
+      
+      int payloadLen = readNextPacket();
+      
+      if (payloadLen <= 0) {
+        return payloadLen;
+      }
+      
+      buf.mark();
+      //read the header
+      buf.getInt(); // packet length
+      offsetInBlock = buf.getLong(); // get offset of packet in block
+      long seqno = buf.getLong();    // get seqno
+      boolean lastPacketInBlock = (buf.get() != 0);
+      
+      int endOfHeader = buf.position();
+      buf.reset();
+      
+      if (LOG.isDebugEnabled()){
+        LOG.debug("Receiving one packet for block " + block +
+                  " of length " + payloadLen +
+                  " seqno " + seqno +
+                  " offsetInBlock " + offsetInBlock +
+                  " lastPacketInBlock " + lastPacketInBlock);
+      }
+      
       setBlockPosition(offsetInBlock);
       
-      int len = in.readInt();
-      curPacketSize += 4;            // read an integer in previous line
-
-      // send packet header to next datanode in pipeline
+      //First write the packet to the mirror:
       if (mirrorOut != null) {
         try {
-          int mirrorPacketSize = packetSize;
-          if (len > bytesPerChecksum) {
-            /* 
-             * This is a packet with non-interleaved checksum. 
-             * But we are sending interleaving checksums to mirror, 
-             * which changes packet len. Adjust the packet size for mirror.
-             * 
-             * As mentioned above, this is mismatch is 
-             * temporary till HADOOP-1702.
-             */
-            
-            //find out how many chunks are in this patcket :
-            int chunksInPkt = (len + bytesPerChecksum - 1)/bytesPerChecksum;
-            
-            // we send 4 more bytes for for each of the extra 
-            // checksum chunks. so :
-            mirrorPacketSize += (chunksInPkt - 1) * 4;
-          }
-          mirrorOut.writeInt(mirrorPacketSize);
-          mirrorOut.writeLong(offsetInBlock);
-          mirrorOut.writeLong(seqno);
-          mirrorOut.writeBoolean(lastPacketInBlock);
+          mirrorOut.write(buf.array(), buf.position(), buf.remaining());
+          mirrorOut.flush();
         } catch (IOException e) {
           handleMirrorOutError(e);
         }
       }
 
+      buf.position(endOfHeader);        
+      int len = buf.getInt();
+      
+      if (len < 0) {
+        throw new IOException("Got wrong length during writeBlock(" + block + 
+                              ") from " + inAddr + " at offset " + 
+                              offsetInBlock + ": " + len); 
+      } 
+
       if (len == 0) {
-        LOG.info("Receiving empty packet for block " + block);
-        if (mirrorOut != null) {
-          try {
-            mirrorOut.writeInt(len);
-            mirrorOut.flush();
-          } catch (IOException e) {
-            handleMirrorOutError(e);
-          }
-        }
-      }
+        LOG.debug("Receiving empty packet for block " + block);
+      } else {
+        offsetInBlock += len;
 
-      while (len != 0) {
-        int checksumOff = 0;    
-        if (len > 0) {
-          int checksumLen = (len + bytesPerChecksum - 1)/bytesPerChecksum*
-                            checksumSize;
-          if (checksumBuf.length < checksumLen) {
-            checksumBuf = new byte[checksumLen];
-          }
-          // read the checksum
-          in.readFully(checksumBuf, 0, checksumLen);
-        }
-        
-        while (len != 0) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Receiving one chunk for block " + block +
-                      " of size " + len);
-          }
-          
-          int toRecv = Math.min(len, bytesPerChecksum);
-          
-          curPacketSize += (toRecv + checksumSize);
-          if (curPacketSize > packetSize) {
-            throw new IOException("Packet size for block " + block +
-                                  " too long " + curPacketSize +
-                                  " was expecting " + packetSize);
-          } 
-          
-          receiveChunk(toRecv, checksumBuf, checksumOff);
-          
-          len -= toRecv;
-          checksumOff += checksumSize;       
+        int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
+                                                              checksumSize;
+
+        if ( buf.remaining() != (checksumLen + len)) {
+          throw new IOException("Data remaining in packet does not match " +
+                                "sum of checksumLen and dataLen");
         }
-        
-        if (curPacketSize == packetSize) {
-          if (mirrorOut != null) {
-            try {
-              mirrorOut.flush();
-            } catch (IOException e) {
-              handleMirrorOutError(e);
-            }
+        int checksumOff = buf.position();
+        int dataOff = checksumOff + checksumLen;
+        byte pktBuf[] = buf.array();
+
+        buf.position(buf.limit()); // move to the end of the data.
+
+        verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
+
+        try {
+          if (!finalized) {
+            //finally write to the disk :
+            out.write(pktBuf, dataOff, len);
+            checksumOut.write(pktBuf, checksumOff, checksumLen);
+            myMetrics.bytesWritten.inc(len);
           }
-          break;
+        } catch (IOException iex) {
+          checkDiskError(iex);
+          throw iex;
         }
-        len = in.readInt();
-        curPacketSize += 4;
       }
 
       /// flush entire packet before sending ack
@@ -2529,6 +2570,12 @@ public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
         ((PacketResponder)responder.getRunnable()).enqueue(seqno,
                                         lastPacketInBlock); 
       }
+      
+      if (throttler != null) { // throttle I/O
+        throttler.throttle(payloadLen);
+      }
+      
+      return payloadLen;
     }
 
     public void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
@@ -2562,13 +2609,9 @@ public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
         }
 
         /* 
-         * Skim packet headers. A response is needed for every packet.
+         * Receive until packet length is zero.
          */
-        int len = in.readInt(); // get packet size
-        while (len != 0) {
-          receivePacket(len);
-          len = in.readInt(); // get packet size
-        }
+        while (receivePacket() > 0) {}
 
         // flush the mirror out
         if (mirrorOut != null) {
@@ -2637,8 +2680,9 @@ public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
         }
         return;
       }
-      if (bufStream.samePosition(data, streams, block, offsetInBlock)) {
-        return;
+
+      if (data.getChannelPosition(block, streams) == offsetInBlock) {
+        return;                   // nothing to do 
       }
       if (offsetInBlock % bytesPerChecksum != 0) {
         throw new IOException("setBlockPosition trying to set position to " +

+ 4 - 5
src/java/org/apache/hadoop/dfs/FSConstants.java

@@ -101,12 +101,11 @@ public interface FSConstants {
    * when protocol changes. It is not very obvious. 
    */
   /*
-   * Version 9:
-   *   While reading data from Datanode, each PACKET can consist
-   *   of non-interleaved data (check for for larger amount of data,
-   *   followed by data).
+   * Version 10:
+   *    DFSClient also sends non-interleaved checksum and data while writing
+   *    to DFS.
    */
-  public static final int DATA_TRANSFER_VERSION = 9;
+  public static final int DATA_TRANSFER_VERSION = 10;
 
   // Return codes for file create
   public static final int OPERATION_FAILED = 0;

+ 2 - 2
src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java

@@ -204,7 +204,7 @@ public class TestDataTransferProtocol extends TestCase {
     sendOut.writeInt(0);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     sendOut.writeInt((int)512);
-    sendOut.writeInt(20);          // size of packet
+    sendOut.writeInt(4);           // size of packet
     sendOut.writeLong(0);          // OffsetInBlock
     sendOut.writeLong(100);        // sequencenumber
     sendOut.writeBoolean(false);   // lastPacketInBlock
@@ -229,7 +229,7 @@ public class TestDataTransferProtocol extends TestCase {
     sendOut.writeInt(0);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     sendOut.writeInt((int)512);    // checksum size
-    sendOut.writeInt(20);          // size of packet
+    sendOut.writeInt(8);           // size of packet
     sendOut.writeLong(0);          // OffsetInBlock
     sendOut.writeLong(100);        // sequencenumber
     sendOut.writeBoolean(true);    // lastPacketInBlock