
HADOOP-3164. Reduce DataNode CPU usage by using FileChannel.transferTo().
On Linux the DataNode uses about one fifth as much CPU while serving data.
Results may vary on other platforms.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@651465 13f79535-47bb-0310-9956-ffa450edef68

Raghu Angadi 17 years ago
parent
commit
0af1ef912f
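
The heart of this change is replacing the read-into-a-buffer-then-write loop with FileChannel.transferTo(), which lets the kernel move file bytes to the socket without copying them through user space. A minimal, self-contained sketch of the idea (illustrative only; this is not the DataNode code and the names are made up):

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.nio.channels.FileChannel;
    import java.nio.channels.SocketChannel;

    class TransferToSketch {
      // Sends 'count' bytes of 'file' starting at 'position' to the socket,
      // letting the kernel copy the data (zero-copy on Linux).
      static void sendFileRegion(FileInputStream file, SocketChannel sock,
                                 long position, long count) throws IOException {
        FileChannel ch = file.getChannel();
        while (count > 0) {
          // transferTo() may send fewer bytes than requested; loop until done.
          long sent = ch.transferTo(position, count, sock);
          if (sent <= 0) {
            break; // socket not writable or EOF; real code waits or fails
          }
          position += sent;
          count -= sent;
        }
      }
    }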

+ 4 - 0
CHANGES.txt

@@ -49,6 +49,10 @@ Trunk (unreleased changes)
     HADOOP-1979. Speed up fsck by adding a buffered stream. (Lohit
     Vijaya Renu via omalley)
 
+    HADOOP-3164. Reduce DataNode CPU usage by using FileChannel.transferTo().
+    On Linux the DataNode uses about one fifth as much CPU while serving
+    data. Results may vary on other platforms.
+
   BUG FIXES
 
     HADOOP-2905. 'fsck -move' triggers NPE in NameNode. 

+ 1 - 1
src/java/org/apache/hadoop/dfs/DataBlockScanner.java

@@ -382,7 +382,7 @@ public class DataBlockScanner implements Runnable {
         DataOutputStream out = 
                 new DataOutputStream(new IOUtils.NullOutputStream());
         
-        blockSender.sendBlock(out, throttler);
+        blockSender.sendBlock(out, null, throttler);
 
         LOG.info((second ? "Second " : "") +
                  "Verification succeeded for " + block);

+ 111 - 40
src/java/org/apache/hadoop/dfs/DataNode.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.SocketOutputStream;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
@@ -39,6 +40,7 @@ import org.apache.hadoop.dfs.datanode.metrics.DataNodeMetrics;
 import java.io.*;
 import java.net.*;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.util.*;
@@ -89,6 +91,13 @@ public class DataNode implements FSConstants, Runnable {
     return NetUtils.createSocketAddr(target);
   }
 
+  /**
+   * Minimum buffer used while sending data to clients. Used only if
+   * transferTo() is enabled. 64KB is not especially large; the buffer could
+   * be bigger, but it is unclear how much more that would help.
+   */
+  private static final int MIN_BUFFER_WITH_TRANSFERTO = 64*1024;
+  
   DatanodeProtocol namenode = null;
   FSDatasetInterface data = null;
   DatanodeRegistration dnRegistration = null;
@@ -120,6 +129,7 @@ public class DataNode implements FSConstants, Runnable {
   int defaultBytesPerChecksum = 512;
   private int socketTimeout;
   private int socketWriteTimeout = 0;  
+  private boolean transferToAllowed = true;
   
   private DataBlockScanner blockScanner;
   private Daemon blockScannerThread;
@@ -208,7 +218,10 @@ public class DataNode implements FSConstants, Runnable {
                                       FSConstants.READ_TIMEOUT);
     this.socketWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
                                           FSConstants.WRITE_TIMEOUT);
-    
+    /* Based on results on different platforms, we might need to set the 
+     * default to false on some of them. */
+    this.transferToAllowed = conf.getBoolean("dfs.datanode.transferTo.allowed", 
+                                             true);
     String address = 
       NetUtils.getServerAddress(conf,
                                 "dfs.datanode.bindAddress", 
@@ -1022,8 +1035,9 @@ public class DataNode implements FSConstants, Runnable {
       long length = in.readLong();
 
       // send the block
-      DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
-        NetUtils.getOutputStream(s, socketWriteTimeout), SMALL_BUFFER_SIZE));
+      OutputStream baseStream = NetUtils.getOutputStream(s,socketWriteTimeout);
+      DataOutputStream out = new DataOutputStream(
+                   new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
       
       BlockSender blockSender = null;
       try {
@@ -1036,7 +1050,7 @@ public class DataNode implements FSConstants, Runnable {
         }
 
         out.writeShort(DataNode.OP_STATUS_SUCCESS); // send op status
-        long read = blockSender.sendBlock(out, null); // send data
+        long read = blockSender.sendBlock(out, baseStream, null); // send data
 
         if (blockSender.isBlockReadFully()) {
           // See if client verification succeeded. 
@@ -1306,9 +1320,10 @@ public class DataNode implements FSConstants, Runnable {
         targetSock.connect(targetAddr, socketTimeout);
         targetSock.setSoTimeout(socketTimeout);
 
-        targetOut = new DataOutputStream(new BufferedOutputStream(
-                      NetUtils.getOutputStream(targetSock, socketWriteTimeout),
-                      SMALL_BUFFER_SIZE));
+        OutputStream baseStream = NetUtils.getOutputStream(targetSock, 
+                                                            socketWriteTimeout);
+        targetOut = new DataOutputStream(
+                       new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
 
         /* send request to the target */
        // first write the header info
@@ -1318,7 +1333,8 @@ public class DataNode implements FSConstants, Runnable {
         Text.writeString( targetOut, source); // del hint
 
         // then send data
-        long read = blockSender.sendBlock(targetOut, balancingThrottler);
+        long read = blockSender.sendBlock(targetOut, baseStream, 
+                                          balancingThrottler);
 
         myMetrics.bytesRead.inc((int) read);
         myMetrics.blocksRead.inc();
@@ -1567,6 +1583,7 @@ public class DataNode implements FSConstants, Runnable {
   class BlockSender implements java.io.Closeable {
     private Block block; // the block to read from
     private InputStream blockIn; // data stream
+    private long blockInPosition = -1; // updated while using transferTo().
     private DataInputStream checksumIn; // checksum datastream
     private DataChecksum checksum; // checksum stream
     private long offset; // starting position to read
@@ -1581,7 +1598,6 @@ public class DataNode implements FSConstants, Runnable {
     private boolean blockReadFully; //set when the whole block is read
     private boolean verifyChecksum; //if true, check is verified while reading
     private Throttler throttler;
-    private OutputStream out;
     
     static final int PKT_HEADER_LEN = ( 4 + /* PacketLen */
                                         8 + /* offset in block */
@@ -1705,8 +1721,14 @@ public class DataNode implements FSConstants, Runnable {
 
     /**
     * Sends up to maxChunks chunks of data.
+     * 
+     * When blockInPosition is >= 0, assumes 'out' is a 
+     * {@link SocketOutputStream} and tries 
+     * {@link SocketOutputStream#transferToFully(FileChannel, long, int)} to
+     * send data (and updates blockInPosition).
      */
-    private int sendChunks(ByteBuffer pkt, int maxChunks) throws IOException {
+    private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out) 
+                           throws IOException {
       // Sends multiple chunks in one packet with a single write().
 
       int len = Math.min((int) (endOffset - offset),
@@ -1750,28 +1772,44 @@ public class DataNode implements FSConstants, Runnable {
       }
       
       int dataOff = checksumOff + checksumLen;
-      IOUtils.readFully(blockIn, buf, dataOff, len);
       
-      if (verifyChecksum) {
-        int dOff = dataOff;
-        int cOff = checksumOff;
-        int dLeft = len;
+      if (blockInPosition >= 0) {
+        //use transferTo(). Checks on out and blockIn are already done. 
         
-        for (int i=0; i<numChunks; i++) {
-          checksum.reset();
-          int dLen = Math.min(dLeft, bytesPerChecksum);
-          checksum.update(buf, dOff, dLen);
-          if (!checksum.compare(buf, cOff)) {
-            throw new ChecksumException("Checksum failed at " + 
-                                        (offset + len - dLeft), len);
+        SocketOutputStream sockOut = (SocketOutputStream)out;
+        //first write the packet
+        sockOut.write(buf, 0, dataOff);
+        // no need to flush, since we know out is not a buffered stream. 
+
+        sockOut.transferToFully(((FileInputStream)blockIn).getChannel(), 
+                                blockInPosition, len);
+        
+        blockInPosition += len;
+      } else {
+        //normal transfer
+        IOUtils.readFully(blockIn, buf, dataOff, len);
+
+        if (verifyChecksum) {
+          int dOff = dataOff;
+          int cOff = checksumOff;
+          int dLeft = len;
+
+          for (int i=0; i<numChunks; i++) {
+            checksum.reset();
+            int dLen = Math.min(dLeft, bytesPerChecksum);
+            checksum.update(buf, dOff, dLen);
+            if (!checksum.compare(buf, cOff)) {
+              throw new ChecksumException("Checksum failed at " + 
+                                          (offset + len - dLeft), len);
+            }
+            dLeft -= dLen;
+            dOff += dLen;
+            cOff += checksumSize;
           }
-          dLeft -= dLen;
-          dOff += dLen;
-          cOff += checksumSize;
         }
-      }
 
-      out.write(buf, 0, dataOff + len);
+        out.write(buf, 0, dataOff + len);
+      }
 
       if (throttler != null) { // rebalancing so throttle
         throttler.throttle(packetLen);
@@ -1785,32 +1823,64 @@ public class DataNode implements FSConstants, Runnable {
      * either a client or to another datanode. 
      * 
      * @param out  stream to which the block is written to
-     * returns total bytes reads, including crc.
+     * @param baseStream optional. If non-null, <code>out</code> is assumed to 
+     *        be a wrapper over this stream. This enables optimizations for
+     *        sending the data, e.g. 
+     *        {@link SocketOutputStream#transferToFully(FileChannel, 
+     *        long, int)}.
+     * @param throttler for sending data.
+     * @return total bytes read, including crc.
      */
-    long sendBlock(DataOutputStream out, Throttler throttler)
-        throws IOException {
+    long sendBlock(DataOutputStream out, OutputStream baseStream, 
+                   Throttler throttler) throws IOException {
       if( out == null ) {
         throw new IOException( "out stream is null" );
       }
-      this.out = out;
       this.throttler = throttler;
 
       long initialOffset = offset;
       long totalRead = 0;
+      OutputStream streamForSendChunks = out;
+      
       try {
         checksum.writeHeader(out);
         if ( chunkOffsetOK ) {
           out.writeLong( offset );
         }
-        //set up sendBuf:
-        int maxChunksPerPacket = Math.max(1,
-                      (BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
-        ByteBuffer pktBuf = ByteBuffer.allocate(PKT_HEADER_LEN + 
-                      (bytesPerChecksum + checksumSize) * maxChunksPerPacket);
+        out.flush();
+        
+        int maxChunksPerPacket;
+        int pktSize;
+        
+        if (transferToAllowed && !verifyChecksum && 
+            baseStream instanceof SocketOutputStream && 
+            blockIn instanceof FileInputStream) {
+          
+          FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
+          
+          // blockInPosition also indicates sendChunks() uses transferTo.
+          blockInPosition = fileChannel.position();
+          streamForSendChunks = baseStream;
+          
+          // ensure a minimum buffer size.
+          maxChunksPerPacket = (Math.max(BUFFER_SIZE, 
+                                         MIN_BUFFER_WITH_TRANSFERTO)
+                                + bytesPerChecksum - 1)/bytesPerChecksum;
+          
+          // allocate a smaller buffer while using transferTo(). 
+          pktSize = PKT_HEADER_LEN + checksumSize * maxChunksPerPacket;
+        } else {
+          maxChunksPerPacket = Math.max(1,
+                   (BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
+          pktSize = PKT_HEADER_LEN + 
+                    (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
+        }
 
+        ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
 
         while (endOffset > offset) {
-          long len = sendChunks(pktBuf, maxChunksPerPacket);
+          long len = sendChunks(pktBuf, maxChunksPerPacket, 
+                                streamForSendChunks);
           offset += len;
           totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum*
                               checksumSize);
@@ -2606,8 +2676,9 @@ public class DataNode implements FSConstants, Runnable {
 
         long writeTimeout = socketWriteTimeout + 
                             WRITE_TIMEOUT_EXTENSION * (targets.length-1);
-        out = new DataOutputStream(new BufferedOutputStream(
-          NetUtils.getOutputStream(sock, writeTimeout), SMALL_BUFFER_SIZE));
+        OutputStream baseStream = NetUtils.getOutputStream(sock, writeTimeout);
+        out = new DataOutputStream(new BufferedOutputStream(baseStream, 
+                                                            SMALL_BUFFER_SIZE));
 
         blockSender = new BlockSender(b, 0, -1, false, false, false);
 
@@ -2626,7 +2697,7 @@ public class DataNode implements FSConstants, Runnable {
           targets[i].write(out);
         }
         // send data & checksum
-        blockSender.sendBlock(out, null);
+        blockSender.sendBlock(out, baseStream, null);
 
         // no response necessary
         LOG.info(dnRegistration + ":Transmitted block " + b + " to " + curTarget);
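
The fast path can be disabled through the configuration key read in the constructor above. A minimal snippet (this is what the updated TestPread at the bottom of this change does):

    Configuration conf = new Configuration();
    // Fall back to the buffered read/write path on platforms where
    // transferTo() misbehaves; the patch defaults this key to true.
    conf.setBoolean("dfs.datanode.transferTo.allowed", false);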

+ 34 - 11
src/java/org/apache/hadoop/net/SocketIOWithTimeout.java

@@ -159,17 +159,7 @@ abstract class SocketIOWithTimeout {
       } 
 
       if (count == 0) {
-        
-        String waitingFor = ""+ops;
-        if (ops == SelectionKey.OP_READ) {
-          waitingFor = "read";
-        } else if (ops == SelectionKey.OP_WRITE) {
-          waitingFor = "write";
-        }
-        
-        throw new SocketTimeoutException(timeout + " millis timeout while " +
-                                         "waiting for channel to be ready for "
-                                         + waitingFor + ". ch : " + channel);
+        throw new SocketTimeoutException(timeoutExceptionString(ops));
       }
       // otherwise the socket should be ready for io.
     }
@@ -177,6 +167,39 @@ abstract class SocketIOWithTimeout {
     return 0; // does not reach here.
   }
   
+  /**
+   * This is similar to {@link #doIO(ByteBuffer, int)} except that it
+   * does not perform any I/O. It just waits for the channel to be ready
+   * for I/O as specified in ops.
+   * 
+   * @param ops Selection Ops used for waiting
+   * 
+   * @throws SocketTimeoutException 
+   *         if select on the channel times out.
+   * @throws IOException
+   *         if any other I/O error occurs. 
+   */
+  void waitForIO(int ops) throws IOException {
+    
+    if (selector.select(channel, ops, timeout) == 0) {
+      throw new SocketTimeoutException(timeoutExceptionString(ops)); 
+    }
+  }
+  
+  private String timeoutExceptionString(int ops) {
+    
+    String waitingFor = "" + ops;
+    if (ops == SelectionKey.OP_READ) {
+      waitingFor = "read";
+    } else if (ops == SelectionKey.OP_WRITE) {
+      waitingFor = "write";
+    }
+    
+    return timeout + " millis timeout while " +
+           "waiting for channel to be ready for " + 
+           waitingFor + ". ch : " + channel;    
+  }
+  
   /**
    * This maintains a pool of selectors. These selectors are closed
    * once they are idle (unused) for a few seconds.

+ 14 - 0
src/java/org/apache/hadoop/net/SocketInputStream.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.net;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.Socket;
+import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.ReadableByteChannel;
@@ -148,4 +149,17 @@ public class SocketInputStream extends InputStream
   public int read(ByteBuffer dst) throws IOException {
     return reader.doIO(dst, SelectionKey.OP_READ);
   }
+  
+  /**
+   * Waits for the underlying channel to be ready for reading.
+   * The timeout specified for this stream applies to this wait.
+   * 
+   * @throws SocketTimeoutException 
+   *         if select on the channel times out.
+   * @throws IOException
+   *         if any other I/O error occurs. 
+   */
+  public void waitForReadable() throws IOException {
+    reader.waitForIO(SelectionKey.OP_READ);
+  }
 }

+ 73 - 0
src/java/org/apache/hadoop/net/SocketOutputStream.java

@@ -18,9 +18,11 @@
 
 package org.apache.hadoop.net;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.Socket;
+import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.SelectableChannel;
@@ -143,4 +145,75 @@ public class SocketOutputStream extends OutputStream
   public int write(ByteBuffer src) throws IOException {
     return writer.doIO(src, SelectionKey.OP_WRITE);
   }
+  
+  /**
+   * Waits for the underlying channel to be ready for writing.
+   * The timeout specified for this stream applies to this wait.
+   *
+   * @throws SocketTimeoutException 
+   *         if select on the channel times out.
+   * @throws IOException
+   *         if any other I/O error occurs. 
+   */
+  public void waitForWritable() throws IOException {
+    writer.waitForIO(SelectionKey.OP_WRITE);
+  }
+  
+  /**
+   * Transfers data from FileChannel using 
+   * {@link FileChannel#transferTo(long, long, WritableByteChannel)}. 
+   * 
+   * Similar to readFully(), this waits until the requested amount of 
+   * data is transferred.
+   * 
+   * @param fileCh FileChannel to transfer data from.
+   * @param position position within the channel where the transfer begins
+   * @param count number of bytes to transfer.
+   * 
+   * @throws EOFException 
+   *         If end of input file is reached before the requested number of 
+   *         bytes are transferred.
+   *
+   * @throws SocketTimeoutException 
+   *         If this channel blocks transfer longer than timeout for 
+   *         this stream.
+   *          
+   * @throws IOException Includes any exception thrown by 
+   *         {@link FileChannel#transferTo(long, long, WritableByteChannel)}. 
+   */
+  public void transferToFully(FileChannel fileCh, long position, int count) 
+                              throws IOException {
+    
+    while (count > 0) {
+      /* 
+       * Ideally we should wait after transferTo returns 0. But because of
+       * a bug in the JRE on Linux (http://bugs.sun.com/view_bug.do?bug_id=5103988),
+       * which throws an exception instead of returning 0, we wait for the
+       * channel to be writable before writing to it. If you ever see an
+       * IOException with the message "Resource temporarily unavailable" 
+       * thrown here, please let us know.
+       * 
+       * Once we move to Java SE 7, the wait should be moved to the correct
+       * place.
+       */
+      waitForWritable();
+      int nTransferred = (int) fileCh.transferTo(position, count, getChannel());
+      
+      if (nTransferred == 0) {
+        //check if end of file is reached.
+        if (position >= fileCh.size()) {
+          throw new EOFException("EOF reached. File size is " + fileCh.size() +
+                                 " and " + count + " more bytes left to be " +
+                                 "transferred.");
+        }
+        //otherwise assume the socket is full.
+        //waitForWritable(); // see comment above.
+      } else if (nTransferred < 0) {
+        throw new IOException("Unexpected return of " + nTransferred + 
+                              " from transferTo()");
+      } else {
+        position += nTransferred;
+        count -= nTransferred;
+      }
+    }
+  }  
 }
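
For reference, the BlockSender change above invokes this method as follows (condensed from the sendChunks() hunk, variable names as in the patch):

    SocketOutputStream sockOut = (SocketOutputStream)out;
    sockOut.write(buf, 0, dataOff);  // packet header and checksums first
    sockOut.transferToFully(((FileInputStream)blockIn).getChannel(),
                            blockInPosition, len);  // then the block data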

+ 8 - 0
src/test/org/apache/hadoop/dfs/TestPread.java

@@ -161,12 +161,20 @@ public class TestPread extends TestCase {
    * Tests positional read in DFS.
    */
   public void testPreadDFS() throws IOException {
+    dfsPreadTest(false); //normal pread
+    dfsPreadTest(true); //trigger read code path without transferTo.
+  }
+  
+  private void dfsPreadTest(boolean disableTransferTo) throws IOException {
     Configuration conf = new Configuration();
     conf.setLong("dfs.block.size", 4096);
     conf.setLong("dfs.read.prefetch.size", 4096);
     if (simulatedStorage) {
       conf.setBoolean("dfs.datanode.simulateddatastorage", true);
     }
+    if (disableTransferTo) {
+      conf.setBoolean("dfs.datanode.transferTo.allowed", false);
+    }
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
     FileSystem fileSys = cluster.getFileSystem();
     try {