
HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-347@1431097 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon committed 12 years ago
commit c9db06f2e4
31 changed files with 1348 additions and 780 deletions
  1. + 3 - 1      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputStream.java
  2. + 4 - 0      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketOutputStream.java
  3. + 6 - 0      hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt
  4. + 12 - 19    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
  5. + 25 - 44    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
  6. + 1 - 16     hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
  7. + 2 - 2      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
  8. + 39 - 54    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
  9. + 55 - 74    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
  10. + 28 - 52   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
  11. + 40 - 68   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
  12. + 121 - 0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java
  13. + 136 - 0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java
  14. + 125 - 0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java
  15. + 108 - 0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java
  16. + 60 - 0    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/PeerServer.java
  17. + 156 - 0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java
  18. + 7 - 2     hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
  19. + 9 - 13    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
  20. + 43 - 49   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
  21. + 28 - 32   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
  22. + 5 - 3     hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
  23. + 5 - 4     hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
  24. + 4 - 4     hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
  25. + 29 - 160  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
  26. + 9 - 8     hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
  27. + 62 - 0    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java
  28. + 218 - 0   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java
  29. + 0 - 171   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSocketCache.java
  30. + 4 - 2     hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
  31. + 4 - 2     hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java

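At a high level, this patch replaces the raw Socket (plus encryption key and IOStreamPair) that used to be threaded through the block-reader code with a single Peer object. The sketch below, based on the new signatures in this diff, shows the resulting call shape on the client side; the variable names (addr, localAddr, encryptionKey, and so on) are illustrative placeholders, not taken verbatim from the patch.

    // Illustrative sketch of the new client-side call pattern (names are
    // placeholders; see the DFSInputStream and BlockReaderFactory hunks below).
    Socket sock = socketFactory.createSocket();
    NetUtils.connect(sock, addr, localAddr, socketTimeout);

    // Wrap the connected Socket (and an optional DataEncryptionKey) in a Peer.
    Peer peer = TcpPeerServer.peerFromSocketAndKey(sock, encryptionKey);

    // The factory now takes a Peer and a DatanodeID instead of a raw Socket,
    // buffer size, encryption key, and IOStreamPair.
    BlockReader reader = BlockReaderFactory.newBlockReader(
        conf, file, block, blockToken, startOffset, len,
        verifyChecksum, clientName, peer, datanodeID);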
+ 3 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputStream.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.net;
 
 import java.io.IOException;
+import org.apache.hadoop.classification.InterfaceAudience;
 import java.io.InputStream;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
@@ -37,7 +38,8 @@ import java.nio.channels.SelectionKey;
  * IllegalBlockingModeException. 
  * Please use {@link SocketOutputStream} for writing.
  */
-class SocketInputStream extends InputStream
+@InterfaceAudience.LimitedPrivate("HDFS")
+public class SocketInputStream extends InputStream
                                implements ReadableByteChannel {
 
   private Reader reader;

+ 4 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketOutputStream.java

@@ -260,4 +260,8 @@ public class SocketOutputStream extends OutputStream
       throws IOException {
     transferToFully(fileCh, position, count, null, null);
   }
+
+  public void setTimeout(int timeoutMs) {
+    writer.setTimeout(timeoutMs);
+  }
 }

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt

@@ -0,0 +1,6 @@
+CHANGES for HDFS-347 branch.
+These will be integrated to trunk CHANGES.txt after merge
+
+
+HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes.
+(Colin Patrick McCabe via todd)

+ 12 - 19
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java

@@ -18,10 +18,8 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
-import java.net.Socket;
 
 import org.apache.hadoop.fs.ByteBufferReadable;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 
 /**
  * A BlockReader is responsible for reading a single block
@@ -43,7 +41,18 @@ public interface BlockReader extends ByteBufferReadable {
    */
   long skip(long n) throws IOException;
 
-  void close() throws IOException;
+  /**
+   * Close the block reader.
+   *
+   * @param peerCache      The PeerCache to put the Peer we're using back
+   *                       into, or null if we should simply close the Peer
+   *                       we're using (along with its Socket).
+   *                       Some block readers, like BlockReaderLocal, may
+   *                       not make use of this parameter.
+   *
+   * @throws IOException
+   */
+  void close(PeerCache peerCache) throws IOException;
 
   /**
    * Read exactly the given amount of data, throwing an exception
@@ -60,20 +69,4 @@ public interface BlockReader extends ByteBufferReadable {
    * filled or the next call will return EOF.
    */
   int readAll(byte[] buf, int offset, int len) throws IOException;
-
-  /**
-   * Take the socket used to talk to the DN.
-   */
-  Socket takeSocket();
-
-  /**
-   * Whether the BlockReader has reached the end of its input stream
-   * and successfully sent a status code back to the datanode.
-   */
-  boolean hasSentStatusCode();
-
-  /**
-   * @return a reference to the streams this block reader is using.
-   */
-  IOStreamPair getStreams();
 }

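The new close(PeerCache) contract moves the decision about connection reuse to the caller: a non-null PeerCache lets a reusable Peer go back into the cache, while null simply closes the Peer and its Socket. A minimal caller sketch, using hypothetical helper and variable names, looks like this:

    // Hypothetical caller sketch for the close(PeerCache) contract.
    BlockReader reader = getBlockReader(block, startOffset, len);  // illustrative helper
    try {
      int nread = reader.readAll(buf, 0, len);
    } finally {
      // Pass the PeerCache so a reusable Peer is returned to the cache;
      // pass null to close the Peer (and its Socket) outright.
      reader.close(peerCache);
    }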
+ 25 - 44
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java

@@ -24,6 +24,8 @@ import java.net.Socket;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSClient.Conf;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
@@ -39,72 +41,51 @@ import org.apache.hadoop.security.token.Token;
  */
 @InterfaceAudience.Private
 public class BlockReaderFactory {
-  /**
-   * @see #newBlockReader(Conf, Socket, String, ExtendedBlock, Token, long, long, int, boolean, String)
-   */
-  public static BlockReader newBlockReader(
-      Configuration conf,
-      Socket sock, String file,
-      ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, 
-      long startOffset, long len, DataEncryptionKey encryptionKey)
-          throws IOException {
-    int bufferSize = conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
-        DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT);
-    return newBlockReader(new Conf(conf),
-        sock, file, block, blockToken, startOffset,
-        len, bufferSize, true, "", encryptionKey, null);
-  }
-
   /**
    * Create a new BlockReader specifically to satisfy a read.
    * This method also sends the OP_READ_BLOCK request.
    * 
    * @param conf the DFSClient configuration
-   * @param sock  An established Socket to the DN. The BlockReader will not close it normally
    * @param file  File location
    * @param block  The block object
    * @param blockToken  The block token for security
    * @param startOffset  The read offset, relative to block head
-   * @param len  The number of bytes to read
+   * @param len  The number of bytes to read, or -1 to read as many as
+   *             possible.
    * @param bufferSize  The IO buffer size (not the client buffer size)
+   *                    Ignored except on the legacy BlockReader.
    * @param verifyChecksum  Whether to verify checksum
-   * @param clientName  Client name
+   * @param clientName  Client name.  Used for log messages.
+   * @param peer  The peer
+   * @param datanodeID  The datanode that the Peer is connected to
    * @return New BlockReader instance, or null on error.
    */
   @SuppressWarnings("deprecation")
   public static BlockReader newBlockReader(
-                                     Conf conf,
-                                     Socket sock, String file,
+                                     Configuration conf,
+                                     String file,
                                      ExtendedBlock block, 
                                      Token<BlockTokenIdentifier> blockToken,
                                      long startOffset, long len,
-                                     int bufferSize, boolean verifyChecksum,
+                                     boolean verifyChecksum,
                                      String clientName,
-                                     DataEncryptionKey encryptionKey,
-                                     IOStreamPair ioStreams)
+                                     Peer peer,
+                                     DatanodeID datanodeID)
                                      throws IOException {
-    
-    if (conf.useLegacyBlockReader) {
-      if (encryptionKey != null) {
-        throw new RuntimeException("Encryption is not supported with the legacy block reader.");
-      }
-      return RemoteBlockReader.newBlockReader(
-          sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName);
+    peer.setReadTimeout(conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
+        HdfsServerConstants.READ_TIMEOUT));
+    peer.setWriteTimeout(HdfsServerConstants.WRITE_TIMEOUT);
+    if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER,
+        DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT)) {
+      return RemoteBlockReader.newBlockReader(file,
+          block, blockToken, startOffset, len,
+          conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
+              DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT),
+          verifyChecksum, clientName, peer, datanodeID);
     } else {
-      if (ioStreams == null) {
-        ioStreams = new IOStreamPair(NetUtils.getInputStream(sock),
-            NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT));
-        if (encryptionKey != null) {
-          IOStreamPair encryptedStreams =
-              DataTransferEncryptor.getEncryptedStreams(
-                  ioStreams.out, ioStreams.in, encryptionKey);
-          ioStreams = encryptedStreams;
-        }
-      }
-      
       return RemoteBlockReader2.newBlockReader(
-          sock, file, block, blockToken, startOffset, len, bufferSize,
-          verifyChecksum, clientName, encryptionKey, ioStreams);
+          file, block, blockToken, startOffset, len,
+          verifyChecksum, clientName, peer, datanodeID);
     }
   }
   

+ 1 - 16
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java

@@ -649,7 +649,7 @@ class BlockReaderLocal implements BlockReader {
   }
 
   @Override
-  public synchronized void close() throws IOException {
+  public synchronized void close(PeerCache peerCache) throws IOException {
     dataIn.close();
     if (checksumIn != null) {
       checksumIn.close();
@@ -675,19 +675,4 @@ class BlockReaderLocal implements BlockReader {
   public void readFully(byte[] buf, int off, int len) throws IOException {
     BlockReaderUtil.readFully(this, buf, off, len);
   }
-
-  @Override
-  public Socket takeSocket() {
-    return null;
-  }
-
-  @Override
-  public boolean hasSentStatusCode() {
-    return false;
-  }
-
-  @Override
-  public IOStreamPair getStreams() {
-    return null;
-  }
 }

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -191,7 +191,7 @@ public class DFSClient implements java.io.Closeable {
   final FileSystem.Statistics stats;
   final int hdfsTimeout;    // timeout value for a DFS operation.
   private final String authority;
-  final SocketCache socketCache;
+  final PeerCache peerCache;
   final Conf dfsClientConf;
   private Random r = new Random();
   private SocketAddress[] localInterfaceAddrs;
@@ -432,7 +432,7 @@ public class DFSClient implements java.io.Closeable {
       Joiner.on(',').join(localInterfaceAddrs) + "]");
     }
     
-    this.socketCache = SocketCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
+    this.peerCache = PeerCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
   }
 
   /**

+ 39 - 54
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java

@@ -32,12 +32,15 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.hdfs.SocketCache.SocketAndStreams;
+import org.apache.hadoop.hdfs.net.EncryptedPeer;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -46,6 +49,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.ipc.RPC;
@@ -60,7 +64,7 @@ import org.apache.hadoop.security.token.Token;
  ****************************************************************/
 @InterfaceAudience.Private
 public class DFSInputStream extends FSInputStream implements ByteBufferReadable {
-  private final SocketCache socketCache;
+  private final PeerCache peerCache;
 
   private final DFSClient dfsClient;
   private boolean closed = false;
@@ -110,7 +114,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
     this.verifyChecksum = verifyChecksum;
     this.buffersize = buffersize;
     this.src = src;
-    this.socketCache = dfsClient.socketCache;
+    this.peerCache = dfsClient.peerCache;
     prefetchSize = dfsClient.getConf().prefetchSize;
     timeWindow = dfsClient.getConf().timeWindow;
     nCachedConnRetry = dfsClient.getConf().nCachedConnRetry;
@@ -424,7 +428,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
 
     // Will be getting a new BlockReader.
     if (blockReader != null) {
-      closeBlockReader(blockReader);
+      blockReader.close(peerCache);
       blockReader = null;
     }
 
@@ -506,7 +510,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
     dfsClient.checkOpen();
 
     if (blockReader != null) {
-      closeBlockReader(blockReader);
+      blockReader.close(peerCache);
       blockReader = null;
     }
     super.close();
@@ -833,7 +837,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
         }
       } finally {
         if (reader != null) {
-          closeBlockReader(reader);
+          reader.close(peerCache);
         }
       }
       // Put chosen node into dead list, continue
@@ -841,16 +845,25 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
     }
   }
 
-  /**
-   * Close the given BlockReader and cache its socket.
-   */
-  private void closeBlockReader(BlockReader reader) throws IOException {
-    if (reader.hasSentStatusCode()) {
-      IOStreamPair ioStreams = reader.getStreams();
-      Socket oldSock = reader.takeSocket();
-      socketCache.put(oldSock, ioStreams);
+  private Peer newPeer(InetSocketAddress addr) throws IOException {
+    Peer peer = null;
+    boolean success = false;
+    Socket sock = null;
+    try {
+      sock = dfsClient.socketFactory.createSocket();
+      NetUtils.connect(sock, addr,
+        dfsClient.getRandomLocalInterfaceAddr(),
+        dfsClient.getConf().socketTimeout);
+      peer = TcpPeerServer.peerFromSocketAndKey(sock, 
+          dfsClient.getDataEncryptionKey());
+      success = true;
+      return peer;
+    } finally {
+      if (!success) {
+        IOUtils.closeQuietly(peer);
+        IOUtils.closeQuietly(sock);
+      }
     }
-    reader.close();
   }
 
   /**
@@ -896,62 +909,34 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
     // Allow retry since there is no way of knowing whether the cached socket
     // is good until we actually use it.
     for (int retries = 0; retries <= nCachedConnRetry && fromCache; ++retries) {
-      SocketAndStreams sockAndStreams = null;
+      Peer peer = null;
       // Don't use the cache on the last attempt - it's possible that there
       // are arbitrarily many unusable sockets in the cache, but we don't
       // want to fail the read.
       if (retries < nCachedConnRetry) {
-        sockAndStreams = socketCache.get(dnAddr);
+        peer = peerCache.get(chosenNode);
       }
-      Socket sock;
-      if (sockAndStreams == null) {
+      if (peer == null) {
+        peer = newPeer(dnAddr);
         fromCache = false;
-
-        sock = dfsClient.socketFactory.createSocket();
-        
-        // TCP_NODELAY is crucial here because of bad interactions between
-        // Nagle's Algorithm and Delayed ACKs. With connection keepalive
-        // between the client and DN, the conversation looks like:
-        //   1. Client -> DN: Read block X
-        //   2. DN -> Client: data for block X
-        //   3. Client -> DN: Status OK (successful read)
-        //   4. Client -> DN: Read block Y
-        // The fact that step #3 and #4 are both in the client->DN direction
-        // triggers Nagling. If the DN is using delayed ACKs, this results
-        // in a delay of 40ms or more.
-        //
-        // TCP_NODELAY disables nagling and thus avoids this performance
-        // disaster.
-        sock.setTcpNoDelay(true);
-
-        NetUtils.connect(sock, dnAddr,
-            dfsClient.getRandomLocalInterfaceAddr(),
-            dfsClient.getConf().socketTimeout);
-        sock.setSoTimeout(dfsClient.getConf().socketTimeout);
-      } else {
-        sock = sockAndStreams.sock;
       }
 
       try {
         // The OP_READ_BLOCK request is sent as we make the BlockReader
         BlockReader reader =
-            BlockReaderFactory.newBlockReader(dfsClient.getConf(),
-                                       sock, file, block,
+            BlockReaderFactory.newBlockReader(dfsClient.conf,
+                                       file, block,
                                        blockToken,
                                        startOffset, len,
-                                       bufferSize, verifyChecksum,
+                                       verifyChecksum,
                                        clientName,
-                                       dfsClient.getDataEncryptionKey(),
-                                       sockAndStreams == null ? null : sockAndStreams.ioStreams);
+                                       peer,
+                                       chosenNode);
         return reader;
       } catch (IOException ex) {
         // Our socket is no good.
-        DFSClient.LOG.debug("Error making BlockReader. Closing stale " + sock, ex);
-        if (sockAndStreams != null) {
-          sockAndStreams.close();
-        } else {
-          sock.close();
-        }
+        DFSClient.LOG.debug("Error making BlockReader. Closing stale " + peer, ex);
+        IOUtils.closeQuietly(peer);
         err = ex;
       }
     }

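The comment deleted above explains why TCP_NODELAY matters on this path: the client's read-status response and its next read request travel back-to-back in the same direction, so Nagle's algorithm combined with delayed ACKs can add a stall of 40 ms or more per request. Disabling Nagle on the client socket avoids it; the fragment below is a minimal sketch of that idea and is not code from this patch.

    // Minimal sketch (not from this patch): disable Nagle's algorithm so the
    // read-status response and the following read request are not delayed by
    // the Nagle / delayed-ACK interaction described above.
    Socket sock = new Socket();
    sock.setTcpNoDelay(true);
    NetUtils.connect(sock, dnAddr, localAddr, socketTimeout);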
+ 55 - 74
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java → hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java

@@ -18,69 +18,55 @@
 
 package org.apache.hadoop.hdfs;
 
-import java.io.Closeable;
-import java.net.Socket;
-import java.net.SocketAddress;
-
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 
-import java.io.IOException;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.LinkedListMultimap;
 import org.apache.commons.logging.Log;
-import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 
 /**
  * A cache of input stream sockets to Data Node.
  */
-class SocketCache {
-  private static final Log LOG = LogFactory.getLog(SocketCache.class);
-
-  @InterfaceAudience.Private
-  static class SocketAndStreams implements Closeable {
-    public final Socket sock;
-    public final IOStreamPair ioStreams;
-    long createTime;
-    
-    public SocketAndStreams(Socket s, IOStreamPair ioStreams) {
-      this.sock = s;
-      this.ioStreams = ioStreams;
-      this.createTime = Time.monotonicNow();
+class PeerCache {
+  private static final Log LOG = LogFactory.getLog(PeerCache.class);
+  
+  private static class Value {
+    private final Peer peer;
+    private final long time;
+
+    Value(Peer peer, long time) {
+      this.peer = peer;
+      this.time = time;
     }
-    
-    @Override
-    public void close() {
-      if (ioStreams != null) { 
-        IOUtils.closeStream(ioStreams.in);
-        IOUtils.closeStream(ioStreams.out);
-      }
-      IOUtils.closeSocket(sock);
+
+    Peer getPeer() {
+      return peer;
     }
 
-    public long getCreateTime() {
-      return this.createTime;
+    long getTime() {
+      return time;
     }
   }
 
   private Daemon daemon;
   /** A map for per user per datanode. */
-  private static LinkedListMultimap<SocketAddress, SocketAndStreams> multimap =
+  private static LinkedListMultimap<DatanodeID, Value> multimap =
     LinkedListMultimap.create();
   private static int capacity;
   private static long expiryPeriod;
-  private static SocketCache scInstance = new SocketCache();
+  private static PeerCache instance = new PeerCache();
   private static boolean isInitedOnce = false;
  
-  public static synchronized SocketCache getInstance(int c, long e) {
+  public static synchronized PeerCache getInstance(int c, long e) {
     // capacity is only initialized once
     if (isInitedOnce == false) {
       capacity = c;
@@ -102,7 +88,7 @@ class SocketCache {
       }
     }
 
-    return scInstance;
+    return instance;
   }
 
   private boolean isDaemonStarted() {
@@ -119,44 +105,45 @@ class SocketCache {
       @Override
       public void run() {
         try {
-          SocketCache.this.run();
+          PeerCache.this.run();
         } catch(InterruptedException e) {
           //noop
         } finally {
-          SocketCache.this.clear();
+          PeerCache.this.clear();
         }
       }
 
       @Override
       public String toString() {
-        return String.valueOf(SocketCache.this);
+        return String.valueOf(PeerCache.this);
       }
     });
     daemon.start();
   }
 
   /**
-   * Get a cached socket to the given address.
-   * @param remote  Remote address the socket is connected to.
-   * @return  A socket with unknown state, possibly closed underneath. Or null.
+   * Get a cached peer connected to the given DataNode.
+   * @param dnId         The DataNode to get a Peer for.
+   * @return             An open Peer connected to the DN, or null if none
+   *                     was found. 
    */
-  public synchronized SocketAndStreams get(SocketAddress remote) {
+  public synchronized Peer get(DatanodeID dnId) {
 
     if (capacity <= 0) { // disabled
       return null;
     }
 
-    List<SocketAndStreams> sockStreamList = multimap.get(remote);
+    List<Value> sockStreamList = multimap.get(dnId);
     if (sockStreamList == null) {
       return null;
     }
 
-    Iterator<SocketAndStreams> iter = sockStreamList.iterator();
+    Iterator<Value> iter = sockStreamList.iterator();
     while (iter.hasNext()) {
-      SocketAndStreams candidate = iter.next();
+      Value candidate = iter.next();
       iter.remove();
-      if (!candidate.sock.isClosed()) {
-        return candidate;
+      if (!candidate.getPeer().isClosed()) {
+        return candidate.getPeer();
       }
     }
     return null;
@@ -166,30 +153,22 @@ class SocketCache {
    * Give an unused socket to the cache.
    * @param sock socket not used by anyone.
    */
-  public synchronized void put(Socket sock, IOStreamPair ioStreams) {
-
-    Preconditions.checkNotNull(sock);
-    SocketAndStreams s = new SocketAndStreams(sock, ioStreams);
+  public synchronized void put(DatanodeID dnId, Peer peer) {
+    Preconditions.checkNotNull(dnId);
+    Preconditions.checkNotNull(peer);
+    if (peer.isClosed()) return;
     if (capacity <= 0) {
       // Cache disabled.
-      s.close();
+      IOUtils.cleanup(LOG, peer);
       return;
     }
  
     startExpiryDaemon();
 
-    SocketAddress remoteAddr = sock.getRemoteSocketAddress();
-    if (remoteAddr == null) {
-      LOG.warn("Cannot cache (unconnected) socket with no remote address: " +
-               sock);
-      IOUtils.closeSocket(sock);
-      return;
-    }
-
     if (capacity == multimap.size()) {
       evictOldest();
     }
-    multimap.put(remoteAddr, s);
+    multimap.put(dnId, new Value(peer, Time.monotonicNow()));
   }
 
   public synchronized int size() {
@@ -201,18 +180,17 @@ class SocketCache {
    */
   private synchronized void evictExpired(long expiryPeriod) {
     while (multimap.size() != 0) {
-      Iterator<Entry<SocketAddress, SocketAndStreams>> iter =
+      Iterator<Entry<DatanodeID, Value>> iter =
         multimap.entries().iterator();
-      Entry<SocketAddress, SocketAndStreams> entry = iter.next();
+      Entry<DatanodeID, Value> entry = iter.next();
       // if oldest socket expired, remove it
       if (entry == null || 
-        Time.monotonicNow() - entry.getValue().getCreateTime() < 
+        Time.monotonicNow() - entry.getValue().getTime() <
         expiryPeriod) {
         break;
       }
+      IOUtils.cleanup(LOG, entry.getValue().getPeer());
       iter.remove();
-      SocketAndStreams s = entry.getValue();
-      s.close();
     }
   }
 
@@ -220,16 +198,18 @@ class SocketCache {
    * Evict the oldest entry in the cache.
    */
   private synchronized void evictOldest() {
-    Iterator<Entry<SocketAddress, SocketAndStreams>> iter =
+    // We can get the oldest element immediately, because of an interesting
+    // property of LinkedListMultimap: its iterator traverses entries in the
+    // order that they were added.
+    Iterator<Entry<DatanodeID, Value>> iter =
       multimap.entries().iterator();
     if (!iter.hasNext()) {
       throw new IllegalStateException("Cannot evict from empty cache! " +
         "capacity: " + capacity);
     }
-    Entry<SocketAddress, SocketAndStreams> entry = iter.next();
+    Entry<DatanodeID, Value> entry = iter.next();
+    IOUtils.cleanup(LOG, entry.getValue().getPeer());
     iter.remove();
-    SocketAndStreams s = entry.getValue();
-    s.close();
   }
 
   /**
@@ -253,9 +233,10 @@ class SocketCache {
   /**
    * Empty the cache, and close all sockets.
    */
-  private synchronized void clear() {
-    for (SocketAndStreams sockAndStream : multimap.values()) {
-      sockAndStream.close();
+  @VisibleForTesting
+  synchronized void clear() {
+    for (Value value : multimap.values()) {
+      IOUtils.cleanup(LOG, value.getPeer());
     }
     multimap.clear();
   }

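The evictOldest() comment relies on a specific Guava property: LinkedListMultimap's entries() view iterates in insertion order, so the first entry returned is always the oldest. A tiny standalone illustration of that property (not part of the patch; keys and values are placeholders):

    // Requires com.google.common.collect.LinkedListMultimap, java.util.Iterator,
    // and java.util.Map.Entry.
    LinkedListMultimap<String, String> cache = LinkedListMultimap.create();
    cache.put("dn1", "peerA");
    cache.put("dn2", "peerB");
    cache.put("dn1", "peerC");

    Iterator<Entry<String, String>> iter = cache.entries().iterator();
    Entry<String, String> oldest = iter.next();  // ("dn1", "peerA"): first inserted
    iter.remove();                               // evicts the oldest entry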
+ 28 - 52
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java

@@ -31,6 +31,8 @@ import java.nio.ByteBuffer;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FSInputChecker;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
@@ -55,8 +57,8 @@ import org.apache.hadoop.util.DataChecksum;
 @InterfaceAudience.Private
 @Deprecated
 public class RemoteBlockReader extends FSInputChecker implements BlockReader {
-
-  Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
+  private final Peer peer;
+  private final DatanodeID datanodeID;
   private final DataInputStream in;
   private DataChecksum checksum;
 
@@ -126,9 +128,9 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
     // if eos was set in the previous read, send a status code to the DN
     if (eos && !eosBefore && nRead >= 0) {
       if (needChecksum()) {
-        sendReadResult(dnSock, Status.CHECKSUM_OK);
+        sendReadResult(peer, Status.CHECKSUM_OK);
       } else {
-        sendReadResult(dnSock, Status.SUCCESS);
+        sendReadResult(peer, Status.SUCCESS);
       }
     }
     return nRead;
@@ -322,7 +324,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
   
   private RemoteBlockReader(String file, String bpid, long blockId,
       DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
-      long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) {
+      long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
+      DatanodeID datanodeID) {
     // Path is used only for printing block and file information in debug
     super(new Path("/blk_" + blockId + ":" + bpid + ":of:"+ file)/*too non path-like?*/,
           1, verifyChecksum,
@@ -330,7 +333,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
           checksum.getBytesPerChecksum(),
           checksum.getChecksumSize());
     
-    this.dnSock = dnSock;
+    this.peer = peer;
+    this.datanodeID = datanodeID;
     this.in = in;
     this.checksum = checksum;
     this.startOffset = Math.max( startOffset, 0 );
@@ -349,13 +353,6 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
     checksumSize = this.checksum.getChecksumSize();
   }
 
-  public static RemoteBlockReader newBlockReader(Socket sock, String file,
-      ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, 
-      long startOffset, long len, int bufferSize) throws IOException {
-    return newBlockReader(sock, file, block, blockToken, startOffset,
-        len, bufferSize, true, "");
-  }
-
   /**
    * Create a new BlockReader specifically to satisfy a read.
    * This method also sends the OP_READ_BLOCK request.
@@ -371,16 +368,17 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
    * @param clientName  Client name
    * @return New BlockReader instance, or null on error.
    */
-  public static RemoteBlockReader newBlockReader( Socket sock, String file,
+  public static RemoteBlockReader newBlockReader(String file,
                                      ExtendedBlock block, 
                                      Token<BlockTokenIdentifier> blockToken,
                                      long startOffset, long len,
                                      int bufferSize, boolean verifyChecksum,
-                                     String clientName)
+                                     String clientName, Peer peer,
+                                     DatanodeID datanodeID)
                                      throws IOException {
     // in and out will be closed when sock is closed (by the caller)
-    final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
-          NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT)));
+    final DataOutputStream out =
+        new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
     new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
     
     //
@@ -388,12 +386,11 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
     //
 
     DataInputStream in = new DataInputStream(
-        new BufferedInputStream(NetUtils.getInputStream(sock), 
-                                bufferSize));
+        new BufferedInputStream(peer.getInputStream(), bufferSize));
     
     BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
         vintPrefixed(in));
-    RemoteBlockReader2.checkSuccess(status, sock, block, file);
+    RemoteBlockReader2.checkSuccess(status, peer, block, file);
     ReadOpChecksumInfoProto checksumInfo =
       status.getReadOpChecksumInfo();
     DataChecksum checksum = DataTransferProtoUtil.fromProto(
@@ -411,15 +408,18 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
     }
 
     return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(),
-        in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock);
+        in, checksum, verifyChecksum, startOffset, firstChunkOffset, len,
+        peer, datanodeID);
   }
 
   @Override
-  public synchronized void close() throws IOException {
+  public synchronized void close(PeerCache peerCache) throws IOException {
     startOffset = -1;
     checksum = null;
-    if (dnSock != null) {
-      dnSock.close();
+    if (peerCache != null && sentStatusCode) {
+      peerCache.put(datanodeID, peer);
+    } else {
+      peer.close();
     }
 
     // in will be closed when its Socket is closed.
@@ -436,37 +436,21 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
     return readFully(this, buf, offset, len);
   }
 
-  @Override
-  public Socket takeSocket() {
-    assert hasSentStatusCode() :
-      "BlockReader shouldn't give back sockets mid-read";
-    Socket res = dnSock;
-    dnSock = null;
-    return res;
-  }
-
-  @Override
-  public boolean hasSentStatusCode() {
-    return sentStatusCode;
-  }
-
   /**
    * When the reader reaches end of the read, it sends a status response
    * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
    * closing our connection (which we will re-open), but won't affect
    * data correctness.
    */
-  void sendReadResult(Socket sock, Status statusCode) {
-    assert !sentStatusCode : "already sent status code to " + sock;
+  void sendReadResult(Peer peer, Status statusCode) {
+    assert !sentStatusCode : "already sent status code to " + peer;
     try {
-      RemoteBlockReader2.writeReadResult(
-          NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT),
-          statusCode);
+      RemoteBlockReader2.writeReadResult(peer.getOutputStream(), statusCode);
       sentStatusCode = true;
     } catch (IOException e) {
       // It's ok not to be able to send this. But something is probably wrong.
       LOG.info("Could not send read status (" + statusCode + ") to datanode " +
-               sock.getInetAddress() + ": " + e.getMessage());
+               peer.getRemoteAddressString() + ": " + e.getMessage());
     }
   }
   
@@ -486,12 +470,4 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
   public int read(ByteBuffer buf) throws IOException {
     throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
   }
-
-  @Override
-  public IOStreamPair getStreams() {
-    // This class doesn't support encryption, which is the only thing this
-    // method is used for. See HDFS-3637.
-    return null;
-  }
-
 }

+ 40 - 68
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java

@@ -25,16 +25,16 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
-import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -45,10 +45,11 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.net.SocketInputWrapper;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This is a wrapper around connection to datanode
  * and understands checksum, offset etc.
@@ -80,9 +81,8 @@ public class RemoteBlockReader2  implements BlockReader {
 
   static final Log LOG = LogFactory.getLog(RemoteBlockReader2.class);
   
-  Socket dnSock;
-  // for now just sending the status code (e.g. checksumOk) after the read.
-  private IOStreamPair ioStreams;
+  final private Peer peer;
+  final private DatanodeID datanodeID;
   private final ReadableByteChannel in;
   private DataChecksum checksum;
   
@@ -115,6 +115,11 @@ public class RemoteBlockReader2  implements BlockReader {
   /** Amount of unread data in the current received packet */
   int dataLeft = 0;
   
+  @VisibleForTesting
+  public Peer getPeer() {
+    return peer;
+  }
+  
   @Override
   public synchronized int read(byte[] buf, int off, int len) 
                                throws IOException {
@@ -247,13 +252,13 @@ public class RemoteBlockReader2  implements BlockReader {
   }
 
   protected RemoteBlockReader2(String file, String bpid, long blockId,
-      ReadableByteChannel in, DataChecksum checksum, boolean verifyChecksum,
-      long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock,
-      IOStreamPair ioStreams) {
+      DataChecksum checksum, boolean verifyChecksum,
+      long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
+      DatanodeID datanodeID) {
     // Path is used only for printing block and file information in debug
-    this.dnSock = dnSock;
-    this.ioStreams = ioStreams;
-    this.in = in;
+    this.peer = peer;
+    this.datanodeID = datanodeID;
+    this.in = peer.getInputStreamChannel();
     this.checksum = checksum;
     this.verifyChecksum = verifyChecksum;
     this.startOffset = Math.max( startOffset, 0 );
@@ -270,39 +275,19 @@ public class RemoteBlockReader2  implements BlockReader {
 
 
   @Override
-  public synchronized void close() throws IOException {
+  public synchronized void close(PeerCache peerCache) throws IOException {
     packetReceiver.close();
-    
     startOffset = -1;
     checksum = null;
-    if (dnSock != null) {
-      dnSock.close();
+    if (peerCache != null && sentStatusCode) {
+      peerCache.put(datanodeID, peer);
+    } else {
+      peer.close();
     }
 
     // in will be closed when its Socket is closed.
   }
   
-  /**
-   * Take the socket used to talk to the DN.
-   */
-  @Override
-  public Socket takeSocket() {
-    assert hasSentStatusCode() :
-      "BlockReader shouldn't give back sockets mid-read";
-    Socket res = dnSock;
-    dnSock = null;
-    return res;
-  }
-
-  /**
-   * Whether the BlockReader has reached the end of its input stream
-   * and successfully sent a status code back to the datanode.
-   */
-  @Override
-  public boolean hasSentStatusCode() {
-    return sentStatusCode;
-  }
-
   /**
    * When the reader reaches end of the read, it sends a status response
    * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
@@ -310,14 +295,14 @@ public class RemoteBlockReader2  implements BlockReader {
    * data correctness.
    */
   void sendReadResult(Status statusCode) {
-    assert !sentStatusCode : "already sent status code to " + dnSock;
+    assert !sentStatusCode : "already sent status code to " + peer;
     try {
-      writeReadResult(ioStreams.out, statusCode);
+      writeReadResult(peer.getOutputStream(), statusCode);
       sentStatusCode = true;
     } catch (IOException e) {
       // It's ok not to be able to send this. But something is probably wrong.
       LOG.info("Could not send read status (" + statusCode + ") to datanode " +
-               dnSock.getInetAddress() + ": " + e.getMessage());
+               peer.getRemoteAddressString() + ": " + e.getMessage());
     }
   }
 
@@ -368,41 +353,33 @@ public class RemoteBlockReader2  implements BlockReader {
    * @param blockToken  The block token for security
    * @param startOffset  The read offset, relative to block head
    * @param len  The number of bytes to read
-   * @param bufferSize  The IO buffer size (not the client buffer size)
    * @param verifyChecksum  Whether to verify checksum
    * @param clientName  Client name
+   * @param peer  The Peer to use
+   * @param datanodeID  The DatanodeID this peer is connected to
    * @return New BlockReader instance, or null on error.
    */
-  public static BlockReader newBlockReader(Socket sock, String file,
+  public static BlockReader newBlockReader(String file,
                                      ExtendedBlock block,
                                      Token<BlockTokenIdentifier> blockToken,
                                      long startOffset, long len,
-                                     int bufferSize, boolean verifyChecksum,
+                                     boolean verifyChecksum,
                                      String clientName,
-                                     DataEncryptionKey encryptionKey,
-                                     IOStreamPair ioStreams)
+                                     Peer peer, DatanodeID datanodeID)
                                      throws IOException {
-    
-    ReadableByteChannel ch;
-    if (ioStreams.in instanceof SocketInputWrapper) {
-      ch = ((SocketInputWrapper)ioStreams.in).getReadableByteChannel();
-    } else {
-      ch = (ReadableByteChannel) ioStreams.in;
-    }
-    
     // in and out will be closed when sock is closed (by the caller)
     final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
-          ioStreams.out));
+          peer.getOutputStream()));
     new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
 
     //
     // Get bytes in block
     //
-    DataInputStream in = new DataInputStream(ioStreams.in);
+    DataInputStream in = new DataInputStream(peer.getInputStream());
 
     BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
         vintPrefixed(in));
-    checkSuccess(status, sock, block, file);
+    checkSuccess(status, peer, block, file);
     ReadOpChecksumInfoProto checksumInfo =
       status.getReadOpChecksumInfo();
     DataChecksum checksum = DataTransferProtoUtil.fromProto(
@@ -420,34 +397,29 @@ public class RemoteBlockReader2  implements BlockReader {
     }
 
     return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(),
-        ch, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock,
-        ioStreams);
+        checksum, verifyChecksum, startOffset, firstChunkOffset, len, peer,
+        datanodeID);
   }
 
   static void checkSuccess(
-      BlockOpResponseProto status, Socket sock,
+      BlockOpResponseProto status, Peer peer,
       ExtendedBlock block, String file)
       throws IOException {
     if (status.getStatus() != Status.SUCCESS) {
       if (status.getStatus() == Status.ERROR_ACCESS_TOKEN) {
         throw new InvalidBlockTokenException(
             "Got access token error for OP_READ_BLOCK, self="
-                + sock.getLocalSocketAddress() + ", remote="
-                + sock.getRemoteSocketAddress() + ", for file " + file
+                + peer.getLocalAddressString() + ", remote="
+                + peer.getRemoteAddressString() + ", for file " + file
                 + ", for pool " + block.getBlockPoolId() + " block " 
                 + block.getBlockId() + "_" + block.getGenerationStamp());
       } else {
         throw new IOException("Got error for OP_READ_BLOCK, self="
-            + sock.getLocalSocketAddress() + ", remote="
-            + sock.getRemoteSocketAddress() + ", for file " + file
+            + peer.getLocalAddressString() + ", remote="
+            + peer.getRemoteAddressString() + ", for file " + file
             + ", for pool " + block.getBlockPoolId() + " block " 
             + block.getBlockId() + "_" + block.getGenerationStamp());
       }
     }
   }
-
-  @Override
-  public IOStreamPair getStreams() {
-    return ioStreams;
-  }
 }

+ 121 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java

@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.net;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.channels.ReadableByteChannel;
+
+/**
+ * Represents a peer that we communicate with by using a basic Socket
+ * that has no associated Channel.
+ *
+ */
+class BasicInetPeer implements Peer {
+  private final Socket socket;
+  private final OutputStream out;
+  private final InputStream in;
+  private final boolean isLocal;
+
+  public BasicInetPeer(Socket socket) throws IOException {
+    this.socket = socket;
+    this.out = socket.getOutputStream();
+    this.in = socket.getInputStream();
+    this.isLocal = socket.getInetAddress().equals(socket.getLocalAddress());
+  }
+
+  @Override
+  public ReadableByteChannel getInputStreamChannel() {
+    /*
+     * This Socket has no channel, so there's nothing to return here.
+     */
+    return null;
+  }
+
+  @Override
+  public void setReadTimeout(int timeoutMs) throws IOException {
+    socket.setSoTimeout(timeoutMs);
+  }
+
+  @Override
+  public int getReceiveBufferSize() throws IOException {
+    return socket.getReceiveBufferSize();
+  }
+
+  @Override
+  public boolean getTcpNoDelay() throws IOException {
+    return socket.getTcpNoDelay();
+  }
+
+  @Override
+  public void setWriteTimeout(int timeoutMs) {
+   /* 
+    * We can't implement write timeouts. :(
+    * 
+    * Java provides no facility to set a blocking write timeout on a Socket.
+    * You can simulate a blocking write with a timeout by using
+    * non-blocking I/O.  However, we can't use nio here, because this Socket
+    * doesn't have an associated Channel.
+    * 
+    * See http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4031100 for
+    * more details.
+    */
+  }
+
+  @Override
+  public boolean isClosed() {
+    return socket.isClosed();
+  }
+
+  @Override
+  public void close() throws IOException {
+    socket.close();
+  }
+
+  @Override
+  public String getRemoteAddressString() {
+    return socket.getRemoteSocketAddress().toString();
+  }
+
+  @Override
+  public String getLocalAddressString() {
+    return socket.getLocalSocketAddress().toString();
+  }
+  
+  @Override
+  public InputStream getInputStream() throws IOException {
+    return in;
+  }
+
+  @Override
+  public OutputStream getOutputStream() throws IOException {
+    return out;
+  }
+
+  @Override
+  public boolean isLocal() {
+    return isLocal;
+  }
+
+  @Override
+  public String toString() {
+    return "BasicInetPeer(" + socket.toString() + ")";
+  }
+}

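The setWriteTimeout() comment above points at the core limitation of BasicInetPeer: a plain blocking Socket offers no write timeout, whereas a Socket with a channel can simulate one using non-blocking I/O (the approach NioInetPeer, via SocketOutputStream, builds on). The method below is a minimal, self-contained sketch of that simulation under those assumptions; it is illustrative only and not code from Hadoop.

    import java.io.IOException;
    import java.net.SocketTimeoutException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;

    class TimedWriteSketch {
      /** Illustrative only: a write with a timeout built on non-blocking I/O. */
      static void timedWrite(SocketChannel ch, ByteBuffer buf, int timeoutMs)
          throws IOException {
        ch.configureBlocking(false);
        try (Selector selector = Selector.open()) {
          ch.register(selector, SelectionKey.OP_WRITE);
          long deadline = System.currentTimeMillis() + timeoutMs;
          while (buf.hasRemaining()) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0 || selector.select(remaining) == 0) {
              throw new SocketTimeoutException("write timed out");
            }
            selector.selectedKeys().clear();
            ch.write(buf);  // write as much as the send buffer allows, then loop
          }
        }
      }
    }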
+ 136 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java

@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.net;
+
+import java.io.IOException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.ReadableByteChannel;
+
+/**
+ * Represents a peer that we communicate with by using an encrypted
+ * communications medium.
+ */
+@InterfaceAudience.Private
+public class EncryptedPeer implements Peer {
+  private final Peer enclosedPeer;
+
+  /**
+   * An encrypted InputStream.
+   */
+  private final InputStream in;
+  
+  /**
+   * An encrypted OutputStream.
+   */
+  private final OutputStream out;
+  
+  /**
+   * An encrypted ReadableByteChannel.
+   */
+  private final ReadableByteChannel channel;
+
+  public EncryptedPeer(Peer enclosedPeer, DataEncryptionKey key)
+      throws IOException {
+    this.enclosedPeer = enclosedPeer;
+    IOStreamPair ios = DataTransferEncryptor.getEncryptedStreams(
+        enclosedPeer.getOutputStream(), enclosedPeer.getInputStream(), key);
+    this.in = ios.in;
+    this.out = ios.out;
+    this.channel = ios.in instanceof ReadableByteChannel ? 
+        (ReadableByteChannel)ios.in : null;
+  }
+
+  @Override
+  public ReadableByteChannel getInputStreamChannel() {
+    return channel;
+  }
+
+  @Override
+  public void setReadTimeout(int timeoutMs) throws IOException {
+    enclosedPeer.setReadTimeout(timeoutMs);
+  }
+
+  @Override
+  public int getReceiveBufferSize() throws IOException {
+    return enclosedPeer.getReceiveBufferSize();
+  }
+
+  @Override
+  public boolean getTcpNoDelay() throws IOException {
+    return enclosedPeer.getTcpNoDelay();
+  }
+
+  @Override
+  public void setWriteTimeout(int timeoutMs) throws IOException {
+    enclosedPeer.setWriteTimeout(timeoutMs);
+  }
+
+  @Override
+  public boolean isClosed() {
+    return enclosedPeer.isClosed();
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      in.close();
+    } finally {
+      try {
+        out.close();
+      } finally {
+        enclosedPeer.close();
+      }
+    }
+  }
+
+  @Override
+  public String getRemoteAddressString() {
+    return enclosedPeer.getRemoteAddressString();
+  }
+
+  @Override
+  public String getLocalAddressString() {
+    return enclosedPeer.getLocalAddressString();
+  }
+
+  @Override
+  public InputStream getInputStream() throws IOException {
+    return in;
+  }
+
+  @Override
+  public OutputStream getOutputStream() throws IOException {
+    return out;
+  }
+
+  @Override
+  public boolean isLocal() {
+    return enclosedPeer.isLocal();
+  }
+
+  @Override
+  public String toString() {
+    return "EncryptedPeer(" + enclosedPeer + ")";
+  }
+}

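EncryptedPeer is a decorator: it keeps the underlying Peer for connection-level operations (timeouts, addresses, close) and swaps in the streams negotiated by DataTransferEncryptor. In this patch the factory method TcpPeerServer.peerFromSocketAndKey, used by DFSInputStream above, presumably performs an equivalent wrapping; the fragment below is only a hedged sketch of the idea, with illustrative names.

    // Illustrative wrapping sketch; "sock" and "key" are placeholder names.
    // BasicInetPeer is package-private in this patch, so real callers go
    // through TcpPeerServer.peerFromSocketAndKey instead.
    Peer peer = new BasicInetPeer(sock);
    if (key != null) {
      peer = new EncryptedPeer(peer, key);  // decorate with encrypted streams
    }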
+ 125 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java

@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.net;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.channels.ReadableByteChannel;
+
+import org.apache.hadoop.net.SocketInputStream;
+import org.apache.hadoop.net.SocketOutputStream;
+
+/**
+ * Represents a peer that we communicate with by using non-blocking I/O 
+ * on a Socket.
+ */
+class NioInetPeer implements Peer {
+  private final Socket socket;
+
+  /**
+   * An InputStream which simulates blocking I/O with timeouts using NIO.
+   */
+  private final SocketInputStream in;
+  
+  /**
+   * An OutputStream which simulates blocking I/O with timeouts using NIO.
+   */
+  private final SocketOutputStream out;
+
+  private final boolean isLocal;
+
+  NioInetPeer(Socket socket) throws IOException {
+    this.socket = socket;
+    this.in = new SocketInputStream(socket.getChannel(), 0);
+    this.out = new SocketOutputStream(socket.getChannel(), 0);
+    this.isLocal = socket.getInetAddress().equals(socket.getLocalAddress());
+  }
+
+  @Override
+  public ReadableByteChannel getInputStreamChannel() {
+    return in;
+  }
+
+  @Override
+  public void setReadTimeout(int timeoutMs) throws IOException {
+    in.setTimeout(timeoutMs);
+  }
+
+  @Override
+  public int getReceiveBufferSize() throws IOException {
+    return socket.getReceiveBufferSize();
+  }
+
+  @Override
+  public boolean getTcpNoDelay() throws IOException {
+    return socket.getTcpNoDelay();
+  }
+
+  @Override
+  public void setWriteTimeout(int timeoutMs) throws IOException {
+    out.setTimeout(timeoutMs);
+  }
+
+  @Override
+  public boolean isClosed() {
+    return socket.isClosed();
+  }
+
+  @Override
+  public void close() throws IOException {
+    // We always close the outermost streams -- in this case, 'in' and 'out'.
+    // Closing either one of these will also close the Socket.
+    try {
+      in.close();
+    } finally {
+      out.close();
+    }
+  }
+
+  @Override
+  public String getRemoteAddressString() {
+    return socket.getRemoteSocketAddress().toString();
+  }
+
+  @Override
+  public String getLocalAddressString() {
+    return socket.getLocalSocketAddress().toString();
+  }
+
+  @Override
+  public InputStream getInputStream() throws IOException {
+    return in;
+  }
+
+  @Override
+  public OutputStream getOutputStream() throws IOException {
+    return out;
+  }
+
+  @Override
+  public boolean isLocal() {
+    return isLocal;
+  }
+
+  @Override
+  public String toString() {
+    return "NioInetPeer(" + socket.toString() + ")";
+  }
+}
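
As a quick orientation for how this class gets used, the sketch below (not part of the patch; host, port, and timeout values are made up) opens a channel-backed socket so that TcpPeerServer.peerFromSocket(), introduced further down in this change, wraps it in a NioInetPeer, and then sets the NIO-simulated read and write timeouts:

    import java.net.InetSocketAddress;
    import java.nio.channels.SocketChannel;
    import org.apache.hadoop.hdfs.net.Peer;
    import org.apache.hadoop.hdfs.net.TcpPeerServer;

    public class NioInetPeerSketch {
      public static void main(String[] args) throws Exception {
        // A socket opened through a SocketChannel has a non-null channel,
        // so peerFromSocket() selects NioInetPeer rather than BasicInetPeer.
        SocketChannel ch =
            SocketChannel.open(new InetSocketAddress("127.0.0.1", 50010));
        Peer peer = TcpPeerServer.peerFromSocket(ch.socket());
        try {
          // Both timeouts are honored here, via SocketInputStream and
          // SocketOutputStream.
          peer.setReadTimeout(60000);
          peer.setWriteTimeout(60000);
          System.out.println("connected to " + peer.getRemoteAddressString());
        } finally {
          peer.close(); // closes 'in', 'out', and the underlying socket
        }
      }
    }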

+ 108 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java

@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.net;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.ReadableByteChannel;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Represents a connection to a peer.
+ */
+@InterfaceAudience.Private
+public interface Peer extends Closeable {
+  /**
+   * @return                The input stream channel associated with this
+   *                        peer, or null if it has none.
+   */
+  public ReadableByteChannel getInputStreamChannel();
+
+  /**
+   * Set the read timeout on this peer.
+   *
+   * @param timeoutMs       The timeout in milliseconds.
+   */
+  public void setReadTimeout(int timeoutMs) throws IOException;
+
+  /**
+   * @return                The receive buffer size.
+   */
+  public int getReceiveBufferSize() throws IOException;
+
+  /**
+   * @return                True if TCP_NODELAY is turned on.
+   */
+  public boolean getTcpNoDelay() throws IOException;
+
+  /**
+   * Set the write timeout on this peer.
+   *
+   * Note: this is not honored for BasicInetPeer.
+   * See {@link BasicInetPeer#setWriteTimeout} for details.
+   * 
+   * @param timeoutMs       The timeout in milliseconds.
+   */
+  public void setWriteTimeout(int timeoutMs) throws IOException;
+
+  /**
+   * @return                true only if the peer is closed.
+   */
+  public boolean isClosed();
+  
+  /**
+   * Close the peer.
+   *
+   * It's safe to re-close a Peer that is already closed.
+   */
+  public void close() throws IOException;
+
+  /**
+   * @return               A string representing the remote end of our 
+   *                       connection to the peer.
+   */
+  public String getRemoteAddressString();
+
+  /**
+   * @return               A string representing the local end of our 
+   *                       connection to the peer.
+   */
+  public String getLocalAddressString();
+  
+  /**
+   * @return               An InputStream associated with the Peer.
+   *                       This InputStream will be valid until you close
+   *                       this peer with Peer#close.
+   */
+  public InputStream getInputStream() throws IOException;
+  
+  /**
+   * @return               An OutputStream associated with the Peer.
+   *                       This OutputStream will be valid until you close
+   *                       this peer with Peer#close.
+   */
+  public OutputStream getOutputStream() throws IOException;
+
+  /**
+   * @return               True if the peer resides on the same
+   *                       computer as we.
+   */
+  public boolean isLocal();
+}
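
Code that consumes this interface does not need to know which implementation it got; as a minimal, hypothetical sketch (the length-prefixed message framing here is invented for illustration, not an HDFS protocol), a reader that works identically against BasicInetPeer, NioInetPeer, or an EncryptedPeer wrapper might look like:

    import java.io.DataInputStream;
    import java.io.IOException;
    import org.apache.hadoop.hdfs.net.Peer;

    public class PeerReadSketch {
      // Read one 4-byte length-prefixed message from any kind of Peer.
      static byte[] readMessage(Peer peer, int timeoutMs) throws IOException {
        peer.setReadTimeout(timeoutMs);
        DataInputStream in = new DataInputStream(peer.getInputStream());
        int len = in.readInt();
        byte[] buf = new byte[len];
        in.readFully(buf);
        return buf;
      }
    }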

+ 60 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/PeerServer.java

@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.net;
+
+import java.io.Closeable;
+import org.apache.hadoop.classification.InterfaceAudience;
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+
+@InterfaceAudience.Private
+public interface PeerServer extends Closeable {
+  /**
+   * Set the receive buffer size of the PeerServer.
+   * 
+   * @param size     The receive buffer size.
+   */
+  public void setReceiveBufferSize(int size) throws IOException;
+
+  /**
+   * Listens for a connection to be made to this server and accepts 
+   * it. The method blocks until a connection is made.
+   *
+   * @exception IOException  if an I/O error occurs when waiting for a
+   *               connection.
+   * @exception SecurityException  if a security manager exists and its  
+   *             <code>checkAccept</code> method doesn't allow the operation.
+   * @exception SocketTimeoutException if a timeout was previously set and
+   *             the timeout has been reached.
+   */
+  public Peer accept() throws IOException, SocketTimeoutException;
+
+  /**
+   * @return                 A string representation of the address we're
+   *                         listening on.
+   */
+  public String getListeningString();
+
+  /**
+   * Free the resources associated with this peer server.
+   * This normally includes sockets, etc.
+   *
+   * @throws IOException     If there is an error closing the PeerServer
+   */
+  public void close() throws IOException;
+}
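
The intended server-side usage mirrors what DataXceiverServer does later in this patch; stripped of thread dispatch and error handling, and with an illustrative buffer size, an accept loop over this interface is roughly:

    import java.io.IOException;
    import java.net.SocketTimeoutException;
    import org.apache.hadoop.hdfs.net.Peer;
    import org.apache.hadoop.hdfs.net.PeerServer;

    public class AcceptLoopSketch {
      static void serve(PeerServer server) throws IOException {
        server.setReceiveBufferSize(128 * 1024); // size chosen for illustration
        while (true) {
          try {
            Peer peer = server.accept(); // blocks until a client connects
            // ... hand 'peer' off to a worker (a DataXceiver in this patch) ...
            peer.close();
          } catch (SocketTimeoutException ignored) {
            // An accept timeout is the cue to re-check whether to keep
            // running; here we simply stop.
            break;
          }
        }
        server.close();
      }
    }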

+ 156 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java

@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.net;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.Server;
+
+@InterfaceAudience.Private
+public class TcpPeerServer implements PeerServer {
+  static Log LOG = LogFactory.getLog(TcpPeerServer.class);
+
+  private final ServerSocket serverSocket;
+
+  public static Peer peerFromSocket(Socket socket)
+      throws IOException {
+    Peer peer = null;
+    boolean success = false;
+    try {
+      // TCP_NODELAY is crucial here because of bad interactions between
+      // Nagle's Algorithm and Delayed ACKs. With connection keepalive
+      // between the client and DN, the conversation looks like:
+      //   1. Client -> DN: Read block X
+      //   2. DN -> Client: data for block X
+      //   3. Client -> DN: Status OK (successful read)
+      //   4. Client -> DN: Read block Y
+      // The fact that step #3 and #4 are both in the client->DN direction
+      // triggers Nagling. If the DN is using delayed ACKs, this results
+      // in a delay of 40ms or more.
+      //
+      // TCP_NODELAY disables nagling and thus avoids this performance
+      // disaster.
+      socket.setTcpNoDelay(true);
+      SocketChannel channel = socket.getChannel();
+      if (channel == null) {
+        peer = new BasicInetPeer(socket);
+      } else {
+        peer = new NioInetPeer(socket);
+      }
+      success = true;
+      return peer;
+    } finally {
+      if (!success) {
+        if (peer != null) peer.close();
+        socket.close();
+      }
+    }
+  }
+
+  public static Peer peerFromSocketAndKey(Socket s,
+        DataEncryptionKey key) throws IOException {
+    Peer peer = null;
+    boolean success = false;
+    try {
+      peer = peerFromSocket(s); 
+      if (key != null) {
+        peer = new EncryptedPeer(peer, key);
+      }
+      success = true;
+      return peer;
+    } finally {
+      if (!success) {
+        IOUtils.cleanup(null, peer);
+      }
+    }
+  }
+
+  /**
+   * Create a non-secure TcpPeerServer.
+   *
+   * @param socketWriteTimeout    The Socket write timeout in ms.
+   * @param bindAddr              The address to bind to.
+   * @throws IOException
+   */
+  public TcpPeerServer(int socketWriteTimeout,
+        InetSocketAddress bindAddr) throws IOException {
+    this.serverSocket = (socketWriteTimeout > 0) ?
+          ServerSocketChannel.open().socket() : new ServerSocket();
+    Server.bind(serverSocket, bindAddr, 0);
+  }
+
+  /**
+   * Create a secure TcpPeerServer.
+   *
+   * @param secureResources   Security resources.
+   */
+  public TcpPeerServer(SecureResources secureResources) {
+    this.serverSocket = secureResources.getStreamingSocket();
+  }
+  
+  /**
+   * @return     the socket address on which this TcpPeerServer is listening.
+   */
+  public InetSocketAddress getStreamingAddr() {
+    return new InetSocketAddress(
+        serverSocket.getInetAddress().getHostAddress(),
+        serverSocket.getLocalPort());
+  }
+
+  @Override
+  public void setReceiveBufferSize(int size) throws IOException {
+    this.serverSocket.setReceiveBufferSize(size);
+  }
+
+  @Override
+  public Peer accept() throws IOException, SocketTimeoutException {
+    Peer peer = peerFromSocket(serverSocket.accept());
+    return peer;
+  }
+
+  @Override
+  public String getListeningString() {
+    return serverSocket.getLocalSocketAddress().toString();
+  }
+  
+  @Override
+  public void close() throws IOException {
+    try {
+      serverSocket.close();
+    } catch(IOException e) {
+      LOG.error("error closing TcpPeerServer: ", e);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "TcpPeerServer(" + getListeningString() + ")";
+  }
+}
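
Putting the two halves together, a rough end-to-end sketch (loopback address, unauthenticated, and with a null encryption key, so none of this is meant as the real DataNode wiring) looks like:

    import java.net.InetSocketAddress;
    import java.net.Socket;
    import org.apache.hadoop.hdfs.net.Peer;
    import org.apache.hadoop.hdfs.net.TcpPeerServer;
    import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;

    public class TcpPeerServerSketch {
      public static void main(String[] args) throws Exception {
        // Server side: a write timeout of 0 makes the constructor fall back
        // to a plain ServerSocket instead of a channel-backed one.
        TcpPeerServer server =
            new TcpPeerServer(0, new InetSocketAddress("127.0.0.1", 0));
        System.out.println("listening on " + server.getListeningString());

        // Client side: wrap an outgoing socket; a null key means no
        // EncryptedPeer wrapper is added.
        Socket s = new Socket("127.0.0.1", server.getStreamingAddr().getPort());
        DataEncryptionKey key = null; // a real key would come from the NameNode
        Peer client = TcpPeerServer.peerFromSocketAndKey(s, key);

        Peer accepted = server.accept();
        accepted.close();
        client.close();
        server.close();
      }
    }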

+ 7 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java

@@ -45,6 +45,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.BlockReaderFactory;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -206,9 +208,12 @@ public class JspHelper {
       // Use the block name for file name. 
     String file = BlockReaderFactory.getFileName(addr, poolId, blockId);
     BlockReader blockReader = BlockReaderFactory.newBlockReader(
-        conf, s, file,
+        conf, file,
         new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
-        offsetIntoBlock, amtToRead, encryptionKey);
+        offsetIntoBlock, amtToRead,  true,
+        "JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey),
+        new DatanodeID(addr.getAddress().toString(),              
+            addr.getHostName(), poolId, addr.getPort(), 0, 0));
         
     byte[] buf = new byte[(int)amtToRead];
     int readOffset = 0;

+ 9 - 13
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -90,6 +90,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
@@ -522,24 +523,19 @@ public class DataNode extends Configured
   
   private void initDataXceiver(Configuration conf) throws IOException {
     // find free port or use privileged port provided
-    ServerSocket ss;
-    if (secureResources == null) {
-      InetSocketAddress addr = DataNode.getStreamingAddr(conf);
-      ss = (dnConf.socketWriteTimeout > 0) ? 
-          ServerSocketChannel.open().socket() : new ServerSocket();
-          Server.bind(ss, addr, 0);
+    TcpPeerServer tcpPeerServer;
+    if (secureResources != null) {
+      tcpPeerServer = new TcpPeerServer(secureResources);
     } else {
-      ss = secureResources.getStreamingSocket();
+      tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout,
+          DataNode.getStreamingAddr(conf));
     }
-    ss.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); 
-
-    streamingAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
-                                     ss.getLocalPort());
-
+    tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
+    streamingAddr = tcpPeerServer.getStreamingAddr();
     LOG.info("Opened streaming server at " + streamingAddr);
     this.threadGroup = new ThreadGroup("dataXceiverServer");
     this.dataXceiverServer = new Daemon(threadGroup, 
-        new DataXceiverServer(ss, conf, this));
+        new DataXceiverServer(tcpPeerServer, conf, this));
     this.threadGroup.setDaemon(true); // auto destroy when empty
   }
   

+ 43 - 49
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -39,6 +39,7 @@ import java.nio.channels.ClosedChannelException;
 import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
+import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -64,7 +65,6 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.SocketInputWrapper;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
@@ -79,8 +79,7 @@ class DataXceiver extends Receiver implements Runnable {
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
-  private final Socket s;
-  private final boolean isLocal; //is a local connection?
+  private final Peer peer;
   private final String remoteAddress; // address of remote side
   private final String localAddress;  // local address of this daemon
   private final DataNode datanode;
@@ -88,7 +87,7 @@ class DataXceiver extends Receiver implements Runnable {
   private final DataXceiverServer dataXceiverServer;
   private final boolean connectToDnViaHostname;
   private long opStartTime; //the start time of receiving an Op
-  private final SocketInputWrapper socketIn;
+  private final InputStream socketIn;
   private OutputStream socketOut;
 
   /**
@@ -97,25 +96,23 @@ class DataXceiver extends Receiver implements Runnable {
    */
   private String previousOpClientName;
   
-  public static DataXceiver create(Socket s, DataNode dn,
+  public static DataXceiver create(Peer peer, DataNode dn,
       DataXceiverServer dataXceiverServer) throws IOException {
-    return new DataXceiver(s, dn, dataXceiverServer);
+    return new DataXceiver(peer, dn, dataXceiverServer);
   }
   
-  private DataXceiver(Socket s, 
-      DataNode datanode, 
+  private DataXceiver(Peer peer, DataNode datanode,
       DataXceiverServer dataXceiverServer) throws IOException {
 
-    this.s = s;
+    this.peer = peer;
     this.dnConf = datanode.getDnConf();
-    this.socketIn = NetUtils.getInputStream(s);
-    this.socketOut = NetUtils.getOutputStream(s, dnConf.socketWriteTimeout);
-    this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
+    this.socketIn = peer.getInputStream();
+    this.socketOut = peer.getOutputStream();
     this.datanode = datanode;
     this.dataXceiverServer = dataXceiverServer;
     this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname;
-    remoteAddress = s.getRemoteSocketAddress().toString();
-    localAddress = s.getLocalSocketAddress().toString();
+    remoteAddress = peer.getRemoteAddressString();
+    localAddress = peer.getLocalAddressString();
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Number of active connections is: "
@@ -155,11 +152,10 @@ class DataXceiver extends Receiver implements Runnable {
   public void run() {
     int opsProcessed = 0;
     Op op = null;
-    
-    dataXceiverServer.childSockets.add(s);
-    
+
+    dataXceiverServer.addPeer(peer);
     try {
-      
+      peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
       InputStream input = socketIn;
       if (dnConf.encryptDataTransfer) {
         IOStreamPair encryptedStreams = null;
@@ -169,8 +165,9 @@ class DataXceiver extends Receiver implements Runnable {
               dnConf.encryptionAlgorithm);
         } catch (InvalidMagicNumberException imne) {
           LOG.info("Failed to read expected encryption handshake from client " +
-              "at " + s.getInetAddress() + ". Perhaps the client is running an " +
-              "older version of Hadoop which does not support encryption");
+              "at " + peer.getRemoteAddressString() + ". Perhaps the client " +
+              "is running an older version of Hadoop which does not support " +
+              "encryption");
           return;
         }
         input = encryptedStreams.in;
@@ -189,9 +186,9 @@ class DataXceiver extends Receiver implements Runnable {
         try {
           if (opsProcessed != 0) {
             assert dnConf.socketKeepaliveTimeout > 0;
-            socketIn.setTimeout(dnConf.socketKeepaliveTimeout);
+            peer.setReadTimeout(dnConf.socketKeepaliveTimeout);
           } else {
-            socketIn.setTimeout(dnConf.socketTimeout);
+            peer.setReadTimeout(dnConf.socketTimeout);
           }
           op = readOp();
         } catch (InterruptedIOException ignored) {
@@ -202,7 +199,7 @@ class DataXceiver extends Receiver implements Runnable {
           if (opsProcessed > 0 &&
               (err instanceof EOFException || err instanceof ClosedChannelException)) {
             if (LOG.isDebugEnabled()) {
-              LOG.debug("Cached " + s.toString() + " closing after " + opsProcessed + " ops");
+              LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops");
             }
           } else {
             throw err;
@@ -212,13 +209,13 @@ class DataXceiver extends Receiver implements Runnable {
 
         // restore normal timeout
         if (opsProcessed != 0) {
-          s.setSoTimeout(dnConf.socketTimeout);
+          peer.setReadTimeout(dnConf.socketTimeout);
         }
 
         opStartTime = now();
         processOp(op);
         ++opsProcessed;
-      } while (!s.isClosed() && dnConf.socketKeepaliveTimeout > 0);
+      } while (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0);
     } catch (Throwable t) {
       LOG.error(datanode.getDisplayName() + ":DataXceiver error processing " +
                 ((op == null) ? "unknown" : op.name()) + " operation " +
@@ -230,9 +227,8 @@ class DataXceiver extends Receiver implements Runnable {
             + datanode.getXceiverCount());
       }
       updateCurrentThreadName("Cleaning up");
+      dataXceiverServer.closePeer(peer);
       IOUtils.closeStream(in);
-      IOUtils.closeSocket(s);
-      dataXceiverServer.childSockets.remove(s);
     }
   }
 
@@ -286,8 +282,9 @@ class DataXceiver extends Receiver implements Runnable {
           ClientReadStatusProto stat = ClientReadStatusProto.parseFrom(
               HdfsProtoUtil.vintPrefixed(in));
           if (!stat.hasStatus()) {
-            LOG.warn("Client " + s.getInetAddress() + " did not send a valid status " +
-                     "code after reading. Will close connection.");
+            LOG.warn("Client " + peer.getRemoteAddressString() +
+                " did not send a valid status code after reading. " +
+                "Will close connection.");
             IOUtils.closeStream(out);
           }
         } catch (IOException ioe) {
@@ -320,7 +317,7 @@ class DataXceiver extends Receiver implements Runnable {
 
     //update metrics
     datanode.metrics.addReadBlockOp(elapsed());
-    datanode.metrics.incrReadsFromClient(isLocal);
+    datanode.metrics.incrReadsFromClient(peer.isLocal());
   }
 
   @Override
@@ -358,8 +355,8 @@ class DataXceiver extends Receiver implements Runnable {
       LOG.debug("isDatanode=" + isDatanode
           + ", isClient=" + isClient
           + ", isTransfer=" + isTransfer);
-      LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
-                " tcp no delay " + s.getTcpNoDelay());
+      LOG.debug("writeBlock receive buf size " + peer.getReceiveBufferSize() +
+                " tcp no delay " + peer.getTcpNoDelay());
     }
 
     // We later mutate block's generation stamp and length, but we need to
@@ -390,8 +387,8 @@ class DataXceiver extends Receiver implements Runnable {
           stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
         // open a block receiver
         blockReceiver = new BlockReceiver(block, in, 
-            s.getRemoteSocketAddress().toString(),
-            s.getLocalSocketAddress().toString(),
+            peer.getRemoteAddressString(),
+            peer.getLocalAddressString(),
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
             clientname, srcDataNode, datanode, requestedChecksum);
       } else {
@@ -546,7 +543,7 @@ class DataXceiver extends Receiver implements Runnable {
 
     //update metrics
     datanode.metrics.addWriteBlockOp(elapsed());
-    datanode.metrics.incrWritesFromClient(isLocal);
+    datanode.metrics.incrWritesFromClient(peer.isLocal());
   }
 
   @Override
@@ -554,7 +551,7 @@ class DataXceiver extends Receiver implements Runnable {
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,
       final DatanodeInfo[] targets) throws IOException {
-    checkAccess(null, true, blk, blockToken,
+    checkAccess(socketOut, true, blk, blockToken,
         Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY);
     previousOpClientName = clientName;
     updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);
@@ -641,8 +638,9 @@ class DataXceiver extends Receiver implements Runnable {
     }
 
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
-      String msg = "Not able to copy block " + block.getBlockId() + " to " 
-      + s.getRemoteSocketAddress() + " because threads quota is exceeded."; 
+      String msg = "Not able to copy block " + block.getBlockId() + " " +
+          "to " + peer.getRemoteAddressString() + " because threads " +
+          "quota is exceeded.";
       LOG.info(msg);
       sendResponse(ERROR, msg);
       return;
@@ -671,7 +669,7 @@ class DataXceiver extends Receiver implements Runnable {
       datanode.metrics.incrBytesRead((int) read);
       datanode.metrics.incrBlocksRead();
       
-      LOG.info("Copied " + block + " to " + s.getRemoteSocketAddress());
+      LOG.info("Copied " + block + " to " + peer.getRemoteAddressString());
     } catch (IOException ioe) {
       isOpSuccess = false;
       LOG.info("opCopyBlock " + block + " received exception " + ioe);
@@ -716,8 +714,9 @@ class DataXceiver extends Receiver implements Runnable {
     }
 
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
-      String msg = "Not able to receive block " + block.getBlockId() + " from " 
-          + s.getRemoteSocketAddress() + " because threads quota is exceeded."; 
+      String msg = "Not able to receive block " + block.getBlockId() +
+          " from " + peer.getRemoteAddressString() + " because threads " +
+          "quota is exceeded.";
       LOG.warn(msg);
       sendResponse(ERROR, msg);
       return;
@@ -794,7 +793,7 @@ class DataXceiver extends Receiver implements Runnable {
       // notify name node
       datanode.notifyNamenodeReceivedBlock(block, delHint);
 
-      LOG.info("Moved " + block + " from " + s.getRemoteSocketAddress());
+      LOG.info("Moved " + block + " from " + peer.getRemoteAddressString());
       
     } catch (IOException ioe) {
       opStatus = ERROR;
@@ -817,7 +816,7 @@ class DataXceiver extends Receiver implements Runnable {
       try {
         sendResponse(opStatus, errMsg);
       } catch (IOException ioe) {
-        LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
+        LOG.warn("Error writing reply back to " + peer.getRemoteAddressString());
       }
       IOUtils.closeStream(proxyOut);
       IOUtils.closeStream(blockReceiver);
@@ -871,7 +870,7 @@ class DataXceiver extends Receiver implements Runnable {
   }
   
 
-  private void checkAccess(DataOutputStream out, final boolean reply, 
+  private void checkAccess(OutputStream out, final boolean reply, 
       final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> t,
       final Op op,
@@ -886,11 +885,6 @@ class DataXceiver extends Receiver implements Runnable {
       } catch(InvalidToken e) {
         try {
           if (reply) {
-            if (out == null) {
-              out = new DataOutputStream(
-                  NetUtils.getOutputStream(s, dnConf.socketWriteTimeout));
-            }
-            
             BlockOpResponseProto.Builder resp = BlockOpResponseProto.newBuilder()
               .setStatus(ERROR_ACCESS_TOKEN);
             if (mode == BlockTokenSecretManager.AccessMode.WRITE) {

+ 28 - 32
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java

@@ -18,18 +18,16 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.nio.channels.AsynchronousCloseException;
-import java.util.Collections;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.net.PeerServer;
 import org.apache.hadoop.hdfs.server.balancer.Balancer;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
@@ -45,11 +43,9 @@ import org.apache.hadoop.util.Daemon;
 class DataXceiverServer implements Runnable {
   public static final Log LOG = DataNode.LOG;
   
-  ServerSocket ss;
-  DataNode datanode;
-  // Record all sockets opened for data transfer
-  Set<Socket> childSockets = Collections.synchronizedSet(
-                                       new HashSet<Socket>());
+  private final PeerServer peerServer;
+  private final DataNode datanode;
+  private final Set<Peer> peers = new HashSet<Peer>();
   
   /**
    * Maximal number of concurrent xceivers per node.
@@ -109,10 +105,10 @@ class DataXceiverServer implements Runnable {
   long estimateBlockSize;
   
   
-  DataXceiverServer(ServerSocket ss, Configuration conf, 
+  DataXceiverServer(PeerServer peerServer, Configuration conf,
       DataNode datanode) {
     
-    this.ss = ss;
+    this.peerServer = peerServer;
     this.datanode = datanode;
     
     this.maxXceiverCount = 
@@ -130,12 +126,10 @@ class DataXceiverServer implements Runnable {
 
   @Override
   public void run() {
+    Peer peer = null;
     while (datanode.shouldRun) {
-      Socket s = null;
       try {
-        s = ss.accept();
-        s.setTcpNoDelay(true);
-        // Timeouts are set within DataXceiver.run()
+        peer = peerServer.accept();
 
         // Make sure the xceiver count is not exceeded
         int curXceiverCount = datanode.getXceiverCount();
@@ -146,7 +140,7 @@ class DataXceiverServer implements Runnable {
         }
 
         new Daemon(datanode.threadGroup,
-            DataXceiver.create(s, datanode, this))
+            DataXceiver.create(peer, datanode, this))
             .start();
       } catch (SocketTimeoutException ignored) {
         // wake up to see if should continue to run
@@ -157,10 +151,10 @@ class DataXceiverServer implements Runnable {
           LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace);
         }
       } catch (IOException ie) {
-        IOUtils.closeSocket(s);
+        IOUtils.cleanup(null, peer);
         LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ie);
       } catch (OutOfMemoryError ie) {
-        IOUtils.closeSocket(s);
+        IOUtils.cleanup(null, peer);
         // DataNode can run out of memory if there is too many transfers.
         // Log the event, Sleep for 30 seconds, other transfers may complete by
         // then.
@@ -176,33 +170,35 @@ class DataXceiverServer implements Runnable {
         datanode.shouldRun = false;
       }
     }
+    synchronized (this) {
+      for (Peer p : peers) {
+        IOUtils.cleanup(LOG, p);
+      }
+    }
     try {
-      ss.close();
+      peerServer.close();
     } catch (IOException ie) {
       LOG.warn(datanode.getDisplayName()
           + " :DataXceiverServer: close exception", ie);
     }
   }
-  
+
   void kill() {
     assert datanode.shouldRun == false :
       "shouldRun should be set to false before killing";
     try {
-      this.ss.close();
+      this.peerServer.close();
     } catch (IOException ie) {
       LOG.warn(datanode.getDisplayName() + ":DataXceiverServer.kill(): ", ie);
     }
+  }
+  
+  synchronized void addPeer(Peer peer) {
+    peers.add(peer);
+  }
 
-    // close all the sockets that were accepted earlier
-    synchronized (childSockets) {
-      for (Iterator<Socket> it = childSockets.iterator();
-           it.hasNext();) {
-        Socket thissock = it.next();
-        try {
-          thissock.close();
-        } catch (IOException e) {
-        }
-      }
-    }
+  synchronized void closePeer(Peer peer) {
+    peers.remove(peer);
+    IOUtils.cleanup(null, peer);
   }
 }
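
The addPeer()/closePeer() pair replaces the old childSockets set; the lifecycle that DataXceiver.run() follows in the hunks above is, in outline (a simplified sketch, with the per-operation loop elided):

    package org.apache.hadoop.hdfs.server.datanode;

    import org.apache.hadoop.hdfs.net.Peer;
    import org.apache.hadoop.hdfs.net.PeerServer;

    class PeerLifecycleSketch {
      // Register each accepted Peer before serving it, and let closePeer()
      // both deregister and close it, so DataXceiverServer.run() only has to
      // sweep whatever is still registered when the DataNode shuts down.
      static void handleOne(PeerServer server, DataXceiverServer dxs)
          throws Exception {
        Peer peer = server.accept();
        dxs.addPeer(peer);
        try {
          // ... read ops from peer.getInputStream(), write replies ...
        } finally {
          dxs.closePeer(peer);
        }
      }
    }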

+ 5 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java

@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.BlockReaderFactory;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -559,9 +560,10 @@ public class NamenodeFsck {
         String file = BlockReaderFactory.getFileName(targetAddr, block.getBlockPoolId(),
             block.getBlockId());
         blockReader = BlockReaderFactory.newBlockReader(
-            conf, s, file, block, lblock
-            .getBlockToken(), 0, -1,
-            namenode.getRpcServer().getDataEncryptionKey());
+            conf, file, block, lblock.getBlockToken(), 0, -1, true, "fsck",
+            TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer().
+                getDataEncryptionKey()),
+            chosenNode);
         
       }  catch (IOException ex) {
         // Put chosen node into dead list, continue

+ 5 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java

@@ -31,6 +31,7 @@ import java.util.Random;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -150,12 +151,12 @@ public class BlockReaderTestUtil {
     sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
 
     return BlockReaderFactory.newBlockReader(
-      new DFSClient.Conf(conf),
-      sock, targetAddr.toString()+ ":" + block.getBlockId(), block,
+      conf,
+      targetAddr.toString()+ ":" + block.getBlockId(), block,
       testBlock.getBlockToken(), 
       offset, lenToRead,
-      conf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
-      true, "", null, null);
+      true, "BlockReaderTestUtil", TcpPeerServer.peerFromSocket(sock),
+      nodes[0]);
   }
 
   /**

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java

@@ -61,7 +61,7 @@ public class TestClientBlockVerification {
         util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
     util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
     verify(reader).sendReadResult(Status.CHECKSUM_OK);
-    reader.close();
+    reader.close(null);
   }
 
   /**
@@ -76,7 +76,7 @@ public class TestClientBlockVerification {
     // We asked the blockreader for the whole file, and only read
     // half of it, so no CHECKSUM_OK
     verify(reader, never()).sendReadResult(Status.CHECKSUM_OK);
-    reader.close();
+    reader.close(null);
   }
 
   /**
@@ -92,7 +92,7 @@ public class TestClientBlockVerification {
     // And read half the file
     util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
     verify(reader).sendReadResult(Status.CHECKSUM_OK);
-    reader.close();
+    reader.close(null);
   }
 
   /**
@@ -111,7 +111,7 @@ public class TestClientBlockVerification {
             util.getBlockReader(testBlock, startOffset, length));
         util.readAndCheckEOS(reader, length, true);
         verify(reader).sendReadResult(Status.CHECKSUM_OK);
-        reader.close();
+        reader.close(null);
       }
     }
   }

+ 29 - 160
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java

@@ -18,28 +18,20 @@
 package org.apache.hadoop.hdfs;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.spy;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.security.PrivilegedExceptionAction;
+
+import junit.framework.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.security.token.Token;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Matchers;
 import org.mockito.Mockito;
@@ -55,59 +47,31 @@ public class TestConnCache {
 
   static final int BLOCK_SIZE = 4096;
   static final int FILE_SIZE = 3 * BLOCK_SIZE;
-  final static int CACHE_SIZE = 4;
-  final static long CACHE_EXPIRY_MS = 200;
-  static Configuration conf = null;
-  static MiniDFSCluster cluster = null;
-  static FileSystem fs = null;
-  static SocketCache cache;
-
-  static final Path testFile = new Path("/testConnCache.dat");
-  static byte authenticData[] = null;
-
-  static BlockReaderTestUtil util = null;
-
 
   /**
    * A mock Answer to remember the BlockReader used.
    *
    * It verifies that all invocation to DFSInputStream.getBlockReader()
-   * use the same socket.
+   * use the same peer.
    */
   private class MockGetBlockReader implements Answer<RemoteBlockReader2> {
     public RemoteBlockReader2 reader = null;
-    private Socket sock = null;
+    private Peer peer = null;
 
     @Override
     public RemoteBlockReader2 answer(InvocationOnMock invocation) throws Throwable {
       RemoteBlockReader2 prevReader = reader;
       reader = (RemoteBlockReader2) invocation.callRealMethod();
-      if (sock == null) {
-        sock = reader.dnSock;
+      if (peer == null) {
+        peer = reader.getPeer();
       } else if (prevReader != null) {
-        assertSame("DFSInputStream should use the same socket",
-                   sock, reader.dnSock);
+        Assert.assertSame("DFSInputStream should use the same peer",
+                   peer, reader.getPeer());
       }
       return reader;
     }
   }
 
-  @BeforeClass
-  public static void setupCluster() throws Exception {
-    final int REPLICATION_FACTOR = 1;
-
-    /* create a socket cache. There is only one socket cache per jvm */
-    cache = SocketCache.getInstance(CACHE_SIZE, CACHE_EXPIRY_MS);
-
-    util = new BlockReaderTestUtil(REPLICATION_FACTOR);
-    cluster = util.getCluster();
-    conf = util.getConf();
-    fs = cluster.getFileSystem();
-
-    authenticData = util.writeFile(testFile, FILE_SIZE / 1024);
-  }
-
-
   /**
    * (Optionally) seek to position, read and verify data.
    *
@@ -117,9 +81,10 @@ public class TestConnCache {
                      long pos,
                      byte[] buffer,
                      int offset,
-                     int length)
+                     int length,
+                     byte[] authenticData)
       throws IOException {
-    assertTrue("Test buffer too small", buffer.length >= offset + length);
+    Assert.assertTrue("Test buffer too small", buffer.length >= offset + length);
 
     if (pos >= 0)
       in.seek(pos);
@@ -129,7 +94,7 @@ public class TestConnCache {
 
     while (length > 0) {
       int cnt = in.read(buffer, offset, length);
-      assertTrue("Error in read", cnt > 0);
+      Assert.assertTrue("Error in read", cnt > 0);
       offset += cnt;
       length -= cnt;
     }
@@ -144,116 +109,23 @@ public class TestConnCache {
     }
   }
 
-  /**
-   * Test the SocketCache itself.
-   */
-  @Test
-  public void testSocketCache() throws Exception {
-    // Make a client
-    InetSocketAddress nnAddr =
-        new InetSocketAddress("localhost", cluster.getNameNodePort());
-    DFSClient client = new DFSClient(nnAddr, conf);
-
-    // Find out the DN addr
-    LocatedBlock block =
-        client.getNamenode().getBlockLocations(
-            testFile.toString(), 0, FILE_SIZE)
-        .getLocatedBlocks().get(0);
-    DataNode dn = util.getDataNode(block);
-    InetSocketAddress dnAddr = dn.getXferAddress();
-
-
-    // Make some sockets to the DN
-    Socket[] dnSockets = new Socket[CACHE_SIZE];
-    for (int i = 0; i < dnSockets.length; ++i) {
-      dnSockets[i] = client.socketFactory.createSocket(
-          dnAddr.getAddress(), dnAddr.getPort());
-    }
-
-
-    // Insert a socket to the NN
-    Socket nnSock = new Socket(nnAddr.getAddress(), nnAddr.getPort());
-    cache.put(nnSock, null);
-    assertSame("Read the write", nnSock, cache.get(nnAddr).sock);
-    cache.put(nnSock, null);
-
-    // Insert DN socks
-    for (Socket dnSock : dnSockets) {
-      cache.put(dnSock, null);
-    }
-
-    assertEquals("NN socket evicted", null, cache.get(nnAddr));
-    assertTrue("Evicted socket closed", nnSock.isClosed());
- 
-    // Lookup the DN socks
-    for (Socket dnSock : dnSockets) {
-      assertEquals("Retrieve cached sockets", dnSock, cache.get(dnAddr).sock);
-      dnSock.close();
-    }
-
-    assertEquals("Cache is empty", 0, cache.size());
-  }
-
-
-  /**
-   * Test the SocketCache expiry.
-   * Verify that socket cache entries expire after the set
-   * expiry time.
-   */
-  @Test
-  public void testSocketCacheExpiry() throws Exception {
-    // Make a client
-    InetSocketAddress nnAddr =
-        new InetSocketAddress("localhost", cluster.getNameNodePort());
-    DFSClient client = new DFSClient(nnAddr, conf);
-
-    // Find out the DN addr
-    LocatedBlock block =
-        client.getNamenode().getBlockLocations(
-            testFile.toString(), 0, FILE_SIZE)
-        .getLocatedBlocks().get(0);
-    DataNode dn = util.getDataNode(block);
-    InetSocketAddress dnAddr = dn.getXferAddress();
-
-
-    // Make some sockets to the DN and put in cache
-    Socket[] dnSockets = new Socket[CACHE_SIZE];
-    for (int i = 0; i < dnSockets.length; ++i) {
-      dnSockets[i] = client.socketFactory.createSocket(
-          dnAddr.getAddress(), dnAddr.getPort());
-      cache.put(dnSockets[i], null);
-    }
-
-    // Client side still has the sockets cached
-    assertEquals(CACHE_SIZE, client.socketCache.size());
-
-    //sleep for a second and see if it expired
-    Thread.sleep(CACHE_EXPIRY_MS + 1000);
-    
-    // Client side has no sockets cached
-    assertEquals(0, client.socketCache.size());
-
-    //sleep for another second and see if 
-    //the daemon thread runs fine on empty cache
-    Thread.sleep(CACHE_EXPIRY_MS + 1000);
-  }
-
-
   /**
    * Read a file served entirely from one DN. Seek around and read from
    * different offsets. And verify that they all use the same socket.
-   *
-   * @throws java.io.IOException
+   * @throws Exception 
    */
   @Test
   @SuppressWarnings("unchecked")
-  public void testReadFromOneDN() throws IOException {
-    LOG.info("Starting testReadFromOneDN()");
+  public void testReadFromOneDN() throws Exception {
+    BlockReaderTestUtil util = new BlockReaderTestUtil(1,
+        new HdfsConfiguration());
+    final Path testFile = new Path("/testConnCache.dat");
+    byte authenticData[] = util.writeFile(testFile, FILE_SIZE / 1024);
     DFSClient client = new DFSClient(
-        new InetSocketAddress("localhost", cluster.getNameNodePort()), conf);
-    DFSInputStream in = spy(client.open(testFile.toString()));
+        new InetSocketAddress("localhost",
+            util.getCluster().getNameNodePort()), util.getConf());
+    DFSInputStream in = Mockito.spy(client.open(testFile.toString()));
     LOG.info("opened " + testFile.toString());
-
     byte[] dataBuf = new byte[BLOCK_SIZE];
 
     MockGetBlockReader answer = new MockGetBlockReader();
@@ -270,18 +142,15 @@ public class TestConnCache {
                            Matchers.anyString());
 
     // Initial read
-    pread(in, 0, dataBuf, 0, dataBuf.length);
+    pread(in, 0, dataBuf, 0, dataBuf.length, authenticData);
     // Read again and verify that the socket is the same
-    pread(in, FILE_SIZE - dataBuf.length, dataBuf, 0, dataBuf.length);
-    pread(in, 1024, dataBuf, 0, dataBuf.length);
-    pread(in, -1, dataBuf, 0, dataBuf.length);            // No seek; just read
-    pread(in, 64, dataBuf, 0, dataBuf.length / 2);
+    pread(in, FILE_SIZE - dataBuf.length, dataBuf, 0, dataBuf.length,
+        authenticData);
+    pread(in, 1024, dataBuf, 0, dataBuf.length, authenticData);
+    // No seek; just read
+    pread(in, -1, dataBuf, 0, dataBuf.length, authenticData);
+    pread(in, 64, dataBuf, 0, dataBuf.length / 2, authenticData);
 
     in.close();
   }
-
-  @AfterClass
-  public static void teardownCluster() throws Exception {
-    util.shutdown();
-  }
 }

+ 9 - 8
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java

@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
+import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -92,13 +93,13 @@ public class TestDataTransferKeepalive {
     DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L);
 
     // Clients that write aren't currently re-used.
-    assertEquals(0, dfsClient.socketCache.size());
+    assertEquals(0, dfsClient.peerCache.size());
     assertXceiverCount(0);
 
     // Reads the file, so we should get a
     // cached socket, and should have an xceiver on the other side.
     DFSTestUtil.readFile(fs, TEST_FILE);
-    assertEquals(1, dfsClient.socketCache.size());
+    assertEquals(1, dfsClient.peerCache.size());
     assertXceiverCount(1);
 
     // Sleep for a bit longer than the keepalive timeout
@@ -109,13 +110,13 @@ public class TestDataTransferKeepalive {
     // The socket is still in the cache, because we don't
     // notice that it's closed until we try to read
     // from it again.
-    assertEquals(1, dfsClient.socketCache.size());
+    assertEquals(1, dfsClient.peerCache.size());
     
     // Take it out of the cache - reading should
     // give an EOF.
-    Socket s = dfsClient.socketCache.get(dnAddr).sock;
-    assertNotNull(s);
-    assertEquals(-1, NetUtils.getInputStream(s).read());
+    Peer peer = dfsClient.peerCache.get(dn.getDatanodeId());
+    assertNotNull(peer);
+    assertEquals(-1, peer.getInputStream().read());
   }
 
   /**
@@ -174,14 +175,14 @@ public class TestDataTransferKeepalive {
     }
     
     DFSClient client = ((DistributedFileSystem)fs).dfs;
-    assertEquals(5, client.socketCache.size());
+    assertEquals(5, client.peerCache.size());
     
     // Let all the xceivers timeout
     Thread.sleep(1500);
     assertXceiverCount(0);
 
     // Client side still has the sockets cached
-    assertEquals(5, client.socketCache.size());
+    assertEquals(5, client.peerCache.size());
 
     // Reading should not throw an exception.
     DFSTestUtil.readFile(fs, TEST_FILE);

+ 62 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java

@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+/**
+ * This class tests disabling client connection caching in a single node
+ * mini-cluster.
+ */
+public class TestDisableConnCache {
+  static final Log LOG = LogFactory.getLog(TestDisableConnCache.class);
+
+  static final int BLOCK_SIZE = 4096;
+  static final int FILE_SIZE = 3 * BLOCK_SIZE;
+  
+  /**
+   * Test that the socket cache can be disabled by setting the capacity to
+   * 0. Regression test for HDFS-3365.
+   * @throws Exception 
+   */
+  @Test
+  public void testDisableCache() throws Exception {
+    HdfsConfiguration confWithoutCache = new HdfsConfiguration();
+    // Configure a new instance with no peer caching, ensure that it doesn't
+    // cache anything
+    confWithoutCache.setInt(
+        DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 0);
+    BlockReaderTestUtil util = new BlockReaderTestUtil(1, confWithoutCache);
+    final Path testFile = new Path("/testConnCache.dat");
+    util.writeFile(testFile, FILE_SIZE / 1024);
+    FileSystem fsWithoutCache = FileSystem.newInstance(util.getConf());
+    try {
+      DFSTestUtil.readFile(fsWithoutCache, testFile);
+      assertEquals(0, ((DistributedFileSystem)fsWithoutCache).dfs.peerCache.size());
+    } finally {
+      fsWithoutCache.close();
+      util.shutdown();
+    }
+  }
+}
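
Outside of tests, the same knob applies to any client configuration; a minimal sketch of disabling peer caching for a client (the surrounding application code is assumed) is:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class DisablePeerCacheSketch {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        // Capacity 0 turns peer caching off entirely (the HDFS-3365 case
        // covered by the regression test above).
        conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 0);
        // A FileSystem created from 'conf' will then never retain idle peers.
      }
    }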

+ 218 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java

@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.ReadableByteChannel;
+import java.util.HashSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.junit.Test;
+
+public class TestPeerCache {
+  static final Log LOG = LogFactory.getLog(TestPeerCache.class);
+
+  private static final int CAPACITY = 3;
+  private static final int EXPIRY_PERIOD = 20;
+  private static PeerCache cache =
+      PeerCache.getInstance(CAPACITY, EXPIRY_PERIOD);
+
+  private static class FakePeer implements Peer {
+    private boolean closed = false;
+
+    private DatanodeID dnId;
+
+    public FakePeer(DatanodeID dnId) {
+      this.dnId = dnId;
+    }
+
+    @Override
+    public ReadableByteChannel getInputStreamChannel() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setReadTimeout(int timeoutMs) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int getReceiveBufferSize() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean getTcpNoDelay() throws IOException {
+      return false;
+    }
+
+    @Override
+    public void setWriteTimeout(int timeoutMs) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isClosed() {
+      return closed;
+    }
+  
+    @Override
+    public void close() throws IOException {
+      closed = true;
+    }
+
+    @Override
+    public String getRemoteAddressString() {
+      return dnId.getInfoAddr();
+    }
+
+    @Override
+    public String getLocalAddressString() {
+      return "127.0.0.1:123";
+    }
+
+    @Override
+    public InputStream getInputStream() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+  
+    @Override
+    public OutputStream getOutputStream() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+  
+    @Override
+    public boolean isLocal() {
+      return true;
+    }
+  
+    @Override
+    public String toString() {
+      return "FakePeer(dnId=" + dnId + ")";
+    }
+  }
+
+  @Test
+  public void testAddAndRetrieve() throws Exception {
+    DatanodeID dnId = new DatanodeID("192.168.0.1",
+          "fakehostname", "fake_storage_id",
+          100, 101, 102);
+    FakePeer peer = new FakePeer(dnId);
+    cache.put(dnId, peer);
+    assertTrue(!peer.isClosed());
+    assertEquals(1, cache.size());
+    assertEquals(peer, cache.get(dnId));
+    assertEquals(0, cache.size());
+    cache.clear();
+  }
+
+  @Test
+  public void testExpiry() throws Exception {
+    DatanodeID dnIds[] = new DatanodeID[CAPACITY];
+    FakePeer peers[] = new FakePeer[CAPACITY];
+    for (int i = 0; i < CAPACITY; ++i) {
+      dnIds[i] = new DatanodeID("192.168.0.1",
+          "fakehostname_" + i, "fake_storage_id",
+          100, 101, 102);
+      peers[i] = new FakePeer(dnIds[i]);
+    }
+    for (int i = 0; i < CAPACITY; ++i) {
+      cache.put(dnIds[i], peers[i]);
+    }
+    // Check that the peers are cached
+    assertEquals(CAPACITY, cache.size());
+
+    // Wait for the peers to expire
+    Thread.sleep(EXPIRY_PERIOD * 50);
+    assertEquals(0, cache.size());
+
+    // make sure that the peers were closed when they were expired
+    for (int i = 0; i < CAPACITY; ++i) {
+      assertTrue(peers[i].isClosed());
+    }
+
+    // sleep for another second and see if 
+    // the daemon thread runs fine on empty cache
+    Thread.sleep(EXPIRY_PERIOD * 50);
+    cache.clear();
+  }
+
+  @Test
+  public void testEviction() throws Exception {
+    DatanodeID dnIds[] = new DatanodeID[CAPACITY + 1];
+    FakePeer peers[] = new FakePeer[CAPACITY + 1];
+    for (int i = 0; i < dnIds.length; ++i) {
+      dnIds[i] = new DatanodeID("192.168.0.1",
+          "fakehostname_" + i, "fake_storage_id_" + i,
+          100, 101, 102);
+      peers[i] = new FakePeer(dnIds[i]);
+    }
+    for (int i = 0; i < CAPACITY; ++i) {
+      cache.put(dnIds[i], peers[i]);
+    }
+    // Check that the peers are cached
+    assertEquals(CAPACITY, cache.size());
+
+    // Add another entry and check that the first entry was evicted
+    cache.put(dnIds[CAPACITY], peers[CAPACITY]);
+    assertEquals(CAPACITY, cache.size());
+    assertSame(null, cache.get(dnIds[0]));
+
+    // Make sure that the other entries are still there
+    for (int i = 1; i < CAPACITY; ++i) {
+      Peer peer = cache.get(dnIds[i]);
+      assertSame(peers[i], peer);
+      assertTrue(!peer.isClosed());
+      peer.close();
+    }
+    assertEquals(1, cache.size());
+    cache.clear();
+  }
+
+  @Test
+  public void testMultiplePeersWithSameDnId() throws Exception {
+    DatanodeID dnId = new DatanodeID("192.168.0.1",
+          "fakehostname", "fake_storage_id",
+          100, 101, 102);
+    HashSet<FakePeer> peers = new HashSet<FakePeer>(CAPACITY);
+    for (int i = 0; i < CAPACITY; ++i) {
+      FakePeer peer = new FakePeer(dnId);
+      peers.add(peer);
+      cache.put(dnId, peer);
+    }
+    // Check that all of the peers ended up in the cache
+    assertEquals(CAPACITY, cache.size());
+    while (!peers.isEmpty()) {
+      Peer peer = cache.get(dnId);
+      assertTrue(peer != null);
+      assertTrue(!peer.isClosed());
+      peers.remove(peer);
+    }
+    assertEquals(0, cache.size());
+    cache.clear();
+  }
+}
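
The tests above exercise the new Peer-keyed cache purely through put(), get(), size() and clear(). A minimal usage sketch of that same API follows; the pre-built cache instance and the newPeerTo() helper are hypothetical stand-ins for however a caller constructs the cache and dials a DataNode, and are not part of this patch.

    // Illustrative sketch, not patch code: reuse a cached connection when
    // available, otherwise open a new one, and return it for reuse afterwards.
    Peer peer = cache.get(dnId);
    if (peer == null) {
      peer = newPeerTo(dnId);   // hypothetical helper that dials the DataNode
    }
    try {
      // ... read via peer.getInputStream(), write via peer.getOutputStream() ...
    } finally {
      if (!peer.isClosed()) {
        cache.put(dnId, peer);  // hand the still-healthy peer back to the cache
      }
    }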

+ 0 - 171
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSocketCache.java

@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.spy;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.security.PrivilegedExceptionAction;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.security.token.Token;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-/**
- * This class tests the client connection caching in a single node
- * mini-cluster.
- */
-public class TestSocketCache {
-  static final Log LOG = LogFactory.getLog(TestSocketCache.class);
-
-  static final int BLOCK_SIZE = 4096;
-  static final int FILE_SIZE = 3 * BLOCK_SIZE;
-  final static int CACHE_SIZE = 4;
-  final static long CACHE_EXPIRY_MS = 200;
-  static Configuration conf = null;
-  static MiniDFSCluster cluster = null;
-  static FileSystem fs = null;
-  static SocketCache cache;
-
-  static final Path testFile = new Path("/testConnCache.dat");
-  static byte authenticData[] = null;
-
-  static BlockReaderTestUtil util = null;
-
-
-  /**
-   * A mock Answer to remember the BlockReader used.
-   *
-   * It verifies that all invocations to DFSInputStream.getBlockReader()
-   * use the same socket.
-   */
-  private class MockGetBlockReader implements Answer<RemoteBlockReader2> {
-    public RemoteBlockReader2 reader = null;
-    private Socket sock = null;
-
-    @Override
-    public RemoteBlockReader2 answer(InvocationOnMock invocation) throws Throwable {
-      RemoteBlockReader2 prevReader = reader;
-      reader = (RemoteBlockReader2) invocation.callRealMethod();
-      if (sock == null) {
-        sock = reader.dnSock;
-      } else if (prevReader != null) {
-        assertSame("DFSInputStream should use the same socket",
-                   sock, reader.dnSock);
-      }
-      return reader;
-    }
-  }
-
-  @BeforeClass
-  public static void setupCluster() throws Exception {
-    final int REPLICATION_FACTOR = 1;
-
-    HdfsConfiguration confWithoutCache = new HdfsConfiguration();
-    confWithoutCache.setInt(
-        DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 0);
-    util = new BlockReaderTestUtil(REPLICATION_FACTOR, confWithoutCache);
-    cluster = util.getCluster();
-    conf = util.getConf();
-
-    authenticData = util.writeFile(testFile, FILE_SIZE / 1024);
-  }
-
-
-  /**
-   * (Optionally) seek to position, read and verify data.
-   *
-   * Seek to specified position if pos is non-negative.
-   */
-  private void pread(DFSInputStream in,
-                     long pos,
-                     byte[] buffer,
-                     int offset,
-                     int length)
-      throws IOException {
-    assertTrue("Test buffer too small", buffer.length >= offset + length);
-
-    if (pos >= 0)
-      in.seek(pos);
-
-    LOG.info("Reading from file of size " + in.getFileLength() +
-             " at offset " + in.getPos());
-
-    while (length > 0) {
-      int cnt = in.read(buffer, offset, length);
-      assertTrue("Error in read", cnt > 0);
-      offset += cnt;
-      length -= cnt;
-    }
-
-    // Verify
-    for (int i = 0; i < length; ++i) {
-      byte actual = buffer[i];
-      byte expect = authenticData[(int)pos + i];
-      assertEquals("Read data mismatch at file offset " + (pos + i) +
-                   ". Expects " + expect + "; got " + actual,
-                   actual, expect);
-    }
-  }
-
-  
-  /**
-   * Test that the socket cache can be disabled by setting the capacity to
-   * 0. Regression test for HDFS-3365.
-   */
-  @Test
-  public void testDisableCache() throws IOException {
-    LOG.info("Starting testDisableCache()");
-
-    // Configure a new instance with no caching, ensure that it doesn't
-    // cache anything
-
-    FileSystem fsWithoutCache = FileSystem.newInstance(conf);
-    try {
-      DFSTestUtil.readFile(fsWithoutCache, testFile);
-      assertEquals(0, ((DistributedFileSystem)fsWithoutCache).dfs.socketCache.size());
-    } finally {
-      fsWithoutCache.close();
-    }
-  }
-
-  @AfterClass
-  public static void teardownCluster() throws Exception {
-    util.shutdown();
-  }
-}

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java

@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -145,8 +146,9 @@ public class TestBlockTokenWithDFS {
       String file = BlockReaderFactory.getFileName(targetAddr, 
           "test-blockpoolid", block.getBlockId());
       blockReader = BlockReaderFactory.newBlockReader(
-          conf, s, file, block, 
-          lblock.getBlockToken(), 0, -1, null);
+          conf, file, block, lblock.getBlockToken(), 0, -1,
+          true, "TestBlockTokenWithDFS", TcpPeerServer.peerFromSocket(s),
+          nodes[0]);
 
     } catch (IOException ex) {
       if (ex instanceof InvalidBlockTokenException) {

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -280,8 +281,9 @@ public class TestDataNodeVolumeFailure {
     String file = BlockReaderFactory.getFileName(targetAddr, 
         "test-blockpoolid",
         block.getBlockId());
-    BlockReaderFactory.newBlockReader(conf, s, file, block, lblock
-        .getBlockToken(), 0, -1, null);
+    BlockReaderFactory.newBlockReader(conf, file, block,
+        lblock.getBlockToken(), 0, -1, true, "TestDataNodeVolumeFailure",
+        TcpPeerServer.peerFromSocket(s), datanode);
 
     // nothing - if it fails - it will throw an exception
   }
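
Both test call sites above follow the same pattern this patch introduces: an existing Socket is wrapped with TcpPeerServer.peerFromSocket() and handed to the new newBlockReader() overload together with the client name and target DatanodeID. A condensed sketch, assuming sock, conf, file, block, token and datanode are already in scope and that the factory returns a BlockReader:

    // Sketch of the updated call pattern; variable names are placeholders.
    Peer peer = TcpPeerServer.peerFromSocket(sock);
    BlockReader reader = BlockReaderFactory.newBlockReader(
        conf, file, block, token, 0, -1,
        true, "example-client", peer, datanode);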