
HDFS-3591 Backport HDFS-3357 to branch-0.23.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1359209 13f79535-47bb-0310-9956-ffa450edef68
Daryn Sharp 13 years ago
parent
commit
fa9ed6be32
16 changed files with 488 additions and 97 deletions
  1. + 3 - 0
      hadoop-common-project/hadoop-common/CHANGES.txt
  2. + 23 - 32
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java
  3. + 4 - 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java
  4. + 5 - 6
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputStream.java
  5. + 88 - 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputWrapper.java
  6. + 87 - 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java
  7. + 60 - 45
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestSocketIOWithTimeout.java
  8. + 0 - 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
  9. + 3 - 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  10. + 4 - 7
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
  11. + 18 - 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
  12. + 18 - 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
  13. + 3 - 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
  14. + 7 - 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
  15. + 159 - 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
  16. + 6 - 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -20,6 +20,9 @@ Release 0.23.3 - UNRELEASED
 
     HADOOP-8450. Remove src/test/system. (eli)
 
+    HADOOP-8350. Improve NetUtils.getInputStream to return a stream which has
+    a tunable timeout. (todd)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 23 - 32
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java

@@ -371,53 +371,44 @@ public class NetUtils {
   }
   
   /**
-   * Same as getInputStream(socket, socket.getSoTimeout()).<br><br>
+   * Same as <code>getInputStream(socket, socket.getSoTimeout())</code>.
+   * <br><br>
    * 
-   * From documentation for {@link #getInputStream(Socket, long)}:<br>
-   * Returns InputStream for the socket. If the socket has an associated
-   * SocketChannel then it returns a 
-   * {@link SocketInputStream} with the given timeout. If the socket does not
-   * have a channel, {@link Socket#getInputStream()} is returned. In the later
-   * case, the timeout argument is ignored and the timeout set with 
-   * {@link Socket#setSoTimeout(int)} applies for reads.<br><br>
-   *
-   * Any socket created using socket factories returned by {@link NetUtils},
-   * must use this interface instead of {@link Socket#getInputStream()}.
-   *     
    * @see #getInputStream(Socket, long)
-   * 
-   * @param socket
-   * @return InputStream for reading from the socket.
-   * @throws IOException
    */
-  public static InputStream getInputStream(Socket socket) 
+  public static SocketInputWrapper getInputStream(Socket socket) 
                                            throws IOException {
     return getInputStream(socket, socket.getSoTimeout());
   }
-  
+
   /**
-   * Returns InputStream for the socket. If the socket has an associated
-   * SocketChannel then it returns a 
-   * {@link SocketInputStream} with the given timeout. If the socket does not
-   * have a channel, {@link Socket#getInputStream()} is returned. In the later
-   * case, the timeout argument is ignored and the timeout set with 
-   * {@link Socket#setSoTimeout(int)} applies for reads.<br><br>
+   * Return a {@link SocketInputWrapper} for the socket and set the given
+   * timeout. If the socket does not have an associated channel, then its socket
+   * timeout will be set to the specified value. Otherwise, a
+   * {@link SocketInputStream} will be created which reads with the configured
+   * timeout.
    * 
-   * Any socket created using socket factories returned by {@link NetUtils},
+   * Any socket created using socket factories returned by {@link NetUtils},
    * must use this interface instead of {@link Socket#getInputStream()}.
-   *     
+   * 
+   * In general, this should be called only once on each socket: see the note
+   * in {@link SocketInputWrapper#setTimeout(long)} for more information.
+   *
    * @see Socket#getChannel()
    * 
    * @param socket
-   * @param timeout timeout in milliseconds. This may not always apply. zero
-   *        for waiting as long as necessary.
-   * @return InputStream for reading from the socket.
+   * @param timeout timeout in milliseconds. zero for waiting as
+   *                long as necessary.
+   * @return SocketInputWrapper for reading from the socket.
    * @throws IOException
    */
-  public static InputStream getInputStream(Socket socket, long timeout) 
+  public static SocketInputWrapper getInputStream(Socket socket, long timeout) 
                                            throws IOException {
-    return (socket.getChannel() == null) ? 
-          socket.getInputStream() : new SocketInputStream(socket, timeout);
+    InputStream stm = (socket.getChannel() == null) ? 
+          socket.getInputStream() : new SocketInputStream(socket);
+    SocketInputWrapper w = new SocketInputWrapper(socket, stm);
+    w.setTimeout(timeout);
+    return w;
   }
   
   /**

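As a usage illustration of the new return type (a hypothetical caller, not part of this patch; class and variable names are invented for the sketch): the wrapper is obtained once per socket and its read timeout is retuned in place, per the "call only once on each socket" note in the javadoc above.

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.net.Socket;

    import org.apache.hadoop.net.NetUtils;
    import org.apache.hadoop.net.SocketInputWrapper;

    public class TunableTimeoutExample {
      static int readWithTunableTimeout(Socket s) throws IOException {
        // Fetch the stream once; start with a 10 second read timeout.
        SocketInputWrapper in = NetUtils.getInputStream(s, 10000);
        DataInputStream dis = new DataInputStream(in);
        int first = dis.read(); // throws SocketTimeoutException after ~10s idle

        // Retune the same stream rather than calling getInputStream() again.
        in.setTimeout(60000);
        return first + dis.read(); // subsequent reads wait up to ~60s
      }
    }
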
+ 4 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java

@@ -247,6 +247,10 @@ abstract class SocketIOWithTimeout {
                                                               ops)); 
     }
   }
+
+  public void setTimeout(long timeoutMs) {
+    this.timeout = timeoutMs;
+  }
     
   private static String timeoutExceptionString(SelectableChannel channel,
                                                long timeout, int ops) {

+ 5 - 6
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputStream.java

@@ -28,9 +28,6 @@ import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
 
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
 /**
  * This implements an input stream that can have a timeout while reading.
  * This sets non-blocking flag on the socket channel.
@@ -40,9 +37,7 @@ import org.apache.hadoop.classification.InterfaceStability;
  * IllegalBlockingModeException. 
  * Please use {@link SocketOutputStream} for writing.
  */
-@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
-@InterfaceStability.Unstable
-public class SocketInputStream extends InputStream
+class SocketInputStream extends InputStream
                                implements ReadableByteChannel {
 
   private Reader reader;
@@ -171,4 +166,8 @@ public class SocketInputStream extends InputStream
   public void waitForReadable() throws IOException {
     reader.waitForIO(SelectionKey.OP_READ);
   }
+
+  public void setTimeout(long timeoutMs) {
+    reader.setTimeout(timeoutMs);
+  }
 }

+ 88 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputWrapper.java

@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.net;
+
+import java.io.FilterInputStream;
+
+import java.io.InputStream;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.channels.ReadableByteChannel;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A wrapper stream around a socket which allows setting of its timeout. If the
+ * socket has a channel, this uses non-blocking IO via the package-private
+ * {@link SocketInputStream} implementation. Otherwise, timeouts are managed by
+ * setting the underlying socket timeout itself.
+ */
+@InterfaceAudience.LimitedPrivate("HDFS")
+@InterfaceStability.Unstable
+public class SocketInputWrapper extends FilterInputStream {
+  private final Socket socket;
+  private final boolean hasChannel;
+
+  SocketInputWrapper(Socket s, InputStream is) {
+    super(is);
+    this.socket = s;
+    this.hasChannel = s.getChannel() != null;
+    if (hasChannel) {
+      Preconditions.checkArgument(is instanceof SocketInputStream,
+          "Expected a SocketInputStream when there is a channel. " +
+          "Got: %s", is);
+    }
+  }
+
+  /**
+   * Set the timeout for reads from this stream.
+   * 
+   * Note: the behavior here can differ subtly depending on whether the
+   * underlying socket has an associated Channel. In particular, if there is no
+   * channel, then this call will affect the socket timeout for <em>all</em>
+   * readers of this socket. If there is a channel, then this call will affect
+   * the timeout only for <em>this</em> stream. As such, it is recommended to
+   * only create one {@link SocketInputWrapper} instance per socket.
+   * 
+   * @param timeoutMs
+   *          the new timeout, 0 for no timeout
+   * @throws SocketException
+   *           if the timeout cannot be set
+   */
+  public void setTimeout(long timeoutMs) throws SocketException {
+    if (hasChannel) {
+      ((SocketInputStream)in).setTimeout(timeoutMs);
+    } else {
+      socket.setSoTimeout((int)timeoutMs);
+    }
+  }
+
+  /**
+   * @return an underlying ReadableByteChannel implementation.
+   * @throws IllegalStateException if this socket does not have a channel
+   */
+  public ReadableByteChannel getReadableByteChannel() {
+    Preconditions.checkState(hasChannel,
+        "Socket %s does not have a channel",
+        this.socket);
+    return (SocketInputStream)in;
+  }
+}

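A short sketch of the two behaviors the setTimeout() javadoc describes (hypothetical sockets; only the semantics are taken from the class above): with a channel, the timeout is private to this wrapper; without one, it falls through to Socket#setSoTimeout and is shared by every reader of the socket.

    import java.net.Socket;

    import org.apache.hadoop.net.NetUtils;
    import org.apache.hadoop.net.SocketInputWrapper;

    public class SetTimeoutSemanticsSketch {
      static void demo(Socket channelSocket, Socket plainSocket) throws Exception {
        // Channel case: the 50ms timeout lives in the wrapped
        // SocketInputStream's selector loop; soTimeout is untouched.
        SocketInputWrapper a = NetUtils.getInputStream(channelSocket, 1000);
        a.setTimeout(50);

        // No-channel case: setTimeout delegates to Socket#setSoTimeout,
        // so every reader of plainSocket now shares the 50ms timeout.
        SocketInputWrapper b = NetUtils.getInputStream(plainSocket, 1000);
        b.setTimeout(50);
        assert plainSocket.getSoTimeout() == 50;
      }
    }

This asymmetry is why the javadoc recommends creating only one SocketInputWrapper per socket.
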
+ 87 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java

@@ -25,11 +25,14 @@ import java.net.ConnectException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.NetworkInterface;
+import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketException;
+import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.net.UnknownHostException;
 import java.util.Enumeration;
+import java.util.concurrent.TimeUnit;
 
 import junit.framework.AssertionFailedError;
 
@@ -37,7 +40,11 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.NetUtilsTestResolver;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -50,6 +57,13 @@ public class TestNetUtils {
   private static final int LOCAL_PORT = 8080;
   private static final String LOCAL_PORT_NAME = Integer.toString(LOCAL_PORT);
 
+  /**
+   * Some slop around expected times when making sure timeouts behave
+   * as expected. We assume that they will be accurate to within
+   * this threshold.
+   */
+  static final long TIME_FUDGE_MILLIS = 200;
+
   /**
    * Test that we can't accidentally connect back to the connecting socket due
    * to a quirk in the TCP spec.
@@ -81,6 +95,79 @@ public class TestNetUtils {
     }
   }
   
+  @Test
+  public void testSocketReadTimeoutWithChannel() throws Exception {
+    doSocketReadTimeoutTest(true);
+  }
+  
+  @Test
+  public void testSocketReadTimeoutWithoutChannel() throws Exception {
+    doSocketReadTimeoutTest(false);
+  }
+
+  
+  private void doSocketReadTimeoutTest(boolean withChannel)
+      throws IOException {
+    // Binding a ServerSocket is enough to accept connections.
+    // Rely on the backlog to accept for us.
+    ServerSocket ss = new ServerSocket(0);
+    
+    Socket s;
+    if (withChannel) {
+      s = NetUtils.getDefaultSocketFactory(new Configuration())
+          .createSocket();
+      Assume.assumeNotNull(s.getChannel());
+    } else {
+      s = new Socket();
+      assertNull(s.getChannel());
+    }
+    
+    SocketInputWrapper stm = null;
+    try {
+      NetUtils.connect(s, ss.getLocalSocketAddress(), 1000);
+
+      stm = NetUtils.getInputStream(s, 1000);
+      assertReadTimeout(stm, 1000);
+
+      // Change timeout, make sure it applies.
+      stm.setTimeout(1);
+      assertReadTimeout(stm, 1);
+      
+      // If there is a channel, then setting the socket timeout
+      // should not matter. If there is not a channel, it will
+      // take effect.
+      s.setSoTimeout(1000);
+      if (withChannel) {
+        assertReadTimeout(stm, 1);
+      } else {
+        assertReadTimeout(stm, 1000);        
+      }
+    } finally {
+      IOUtils.closeStream(stm);
+      IOUtils.closeSocket(s);
+      ss.close();
+    }
+  }
+  
+  private void assertReadTimeout(SocketInputWrapper stm, int timeoutMillis)
+      throws IOException {
+    long st = System.nanoTime();
+    try {
+      stm.read();
+      fail("Didn't time out");
+    } catch (SocketTimeoutException ste) {
+      assertTimeSince(st, timeoutMillis);
+    }
+  }
+
+  private void assertTimeSince(long startNanos, int expectedMillis) {
+    long durationNano = System.nanoTime() - startNanos;
+    long millis = TimeUnit.MILLISECONDS.convert(
+        durationNano, TimeUnit.NANOSECONDS);
+    assertTrue("Expected " + expectedMillis + "ms, but took " + millis,
+        Math.abs(millis - expectedMillis) < TIME_FUDGE_MILLIS);
+  }
+  
   /**
   * Test for {@link NetUtils#getLocalInetAddress(String)}
   * @throws UnknownHostException

+ 60 - 45
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestSocketIOWithTimeout.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.net;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InterruptedIOException;
 import java.io.OutputStream;
 import java.net.SocketTimeoutException;
 import java.nio.channels.Pipe;
@@ -26,8 +27,13 @@ import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.MultithreadedTestUtil;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
 
-import junit.framework.TestCase;
+import org.junit.Test;
+import static org.junit.Assert.*;
 
 /**
 * This tests timing out from SocketInputStream and
@@ -36,14 +42,17 @@ import junit.framework.TestCase;
  * Normal read and write using these streams are tested by pretty much
  * every DFS unit test.
  */
-public class TestSocketIOWithTimeout extends TestCase {
+public class TestSocketIOWithTimeout {
 
   static Log LOG = LogFactory.getLog(TestSocketIOWithTimeout.class);
   
   private static int TIMEOUT = 1*1000; 
   private static String TEST_STRING = "1234567890";
+
+  private MultithreadedTestUtil.TestContext ctx = new TestContext();
   
-  private void doIO(InputStream in, OutputStream out) throws IOException {
+  private void doIO(InputStream in, OutputStream out,
+      int expectedTimeout) throws IOException {
     /* Keep on writing or reading until we get SocketTimeoutException.
      * It expects this exception to occur within 100 millis of TIMEOUT.
      */
@@ -61,34 +70,15 @@ public class TestSocketIOWithTimeout extends TestCase {
         long diff = System.currentTimeMillis() - start;
         LOG.info("Got SocketTimeoutException as expected after " + 
                  diff + " millis : " + e.getMessage());
-        assertTrue(Math.abs(TIMEOUT - diff) <= 200);
+        assertTrue(Math.abs(expectedTimeout - diff) <=
+          TestNetUtils.TIME_FUDGE_MILLIS);
         break;
       }
     }
   }
   
-  /**
-   * Just reads one byte from the input stream.
-   */
-  static class ReadRunnable implements Runnable {
-    private InputStream in;
-
-    public ReadRunnable(InputStream in) {
-      this.in = in;
-    }
-    public void run() {
-      try {
-        in.read();
-      } catch (IOException e) {
-        LOG.info("Got expection while reading as expected : " + 
-                 e.getMessage());
-        return;
-      }
-      assertTrue(false);
-    }
-  }
-  
-  public void testSocketIOWithTimeout() throws IOException {
+  @Test
+  public void testSocketIOWithTimeout() throws Exception {
     
     // first open pipe:
     Pipe pipe = Pipe.open();
@@ -96,7 +86,7 @@ public class TestSocketIOWithTimeout extends TestCase {
     Pipe.SinkChannel sink = pipe.sink();
     
     try {
-      InputStream in = new SocketInputStream(source, TIMEOUT);
+      final InputStream in = new SocketInputStream(source, TIMEOUT);
       OutputStream out = new SocketOutputStream(sink, TIMEOUT);
       
       byte[] writeBytes = TEST_STRING.getBytes();
@@ -105,37 +95,62 @@ public class TestSocketIOWithTimeout extends TestCase {
       
       out.write(writeBytes);
       out.write(byteWithHighBit);
-      doIO(null, out);
+      doIO(null, out, TIMEOUT);
       
       in.read(readBytes);
       assertTrue(Arrays.equals(writeBytes, readBytes));
       assertEquals(byteWithHighBit & 0xff, in.read());
-      doIO(in, null);
+      doIO(in, null, TIMEOUT);
+      
+      // Change timeout on the read side.
+      ((SocketInputStream)in).setTimeout(TIMEOUT * 2);
+      doIO(in, null, TIMEOUT * 2);
+      
       
       /*
        * Verify that it handles interrupted threads properly.
-       * Use a large timeout and expect the thread to return quickly.
+       * Use a large timeout and expect the thread to return quickly
+       * upon interruption.
        */
-      in = new SocketInputStream(source, 0);
-      Thread thread = new Thread(new ReadRunnable(in));
-      thread.start();
-      
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException ignored) {}
-      
+      ((SocketInputStream)in).setTimeout(0);
+      TestingThread thread = new TestingThread(ctx) {
+        @Override
+        public void doWork() throws Exception {
+          try {
+            in.read();
+            fail("Did not fail with interrupt");
+          } catch (InterruptedIOException ste) {
+            LOG.info("Got exception while reading as expected : " + 
+                ste.getMessage());
+          }
+        }
+      };
+      ctx.addThread(thread);
+      ctx.startThreads();
+      // If the thread is interrupted before it calls read()
+      // then it throws ClosedByInterruptException due to
+      // some Java quirk. Waiting for it to call read()
+      // gets it into select(), so we get the expected
+      // InterruptedIOException.
+      Thread.sleep(1000);
       thread.interrupt();
+      ctx.stop();
+
+      //make sure the channels are still open
+      assertTrue(source.isOpen());
+      assertTrue(sink.isOpen());
       
+      // Nevertheless, the output stream is closed, because
+      // a partial write may have succeeded (see comment in
+      // SocketOutputStream#write(byte[], int, int))
       try {
-        thread.join();
-      } catch (InterruptedException e) {
-        throw new IOException("Unexpected InterruptedException : " + e);
+        out.write(1);
+        fail("Did not throw");
+      } catch (IOException ioe) {
+        GenericTestUtils.assertExceptionContains(
+            "stream is closed", ioe);
       }
       
-      //make sure the channels are still open
-      assertTrue(source.isOpen());
-      assertTrue(sink.isOpen());
-
       out.close();
       assertFalse(sink.isOpen());
       

+ 0 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/test/GenericTestUtils.java → hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java


+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -47,6 +47,9 @@ Release 0.23.3 - UNRELEASED
     HDFS-3442. Incorrect count for Missing Replicas in FSCK report. (Andrew
     Wang via atm)
 
+    HDFS-3357. DataXceiver reads from client socket with incorrect/no timeout
+    (todd)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 4 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java

@@ -46,7 +46,7 @@ import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.util.DirectBufferPool;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.SocketInputStream;
+import org.apache.hadoop.net.SocketInputWrapper;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
@@ -428,11 +428,8 @@ public class RemoteBlockReader2  implements BlockReader {
     //
     // Get bytes in block, set streams
     //
-    Preconditions.checkArgument(sock.getChannel() != null,
-        "Socket %s does not have an associated Channel.",
-        sock);
-    SocketInputStream sin =
-      (SocketInputStream)NetUtils.getInputStream(sock);
+    SocketInputWrapper sin = NetUtils.getInputStream(sock);
+    ReadableByteChannel ch = sin.getReadableByteChannel();
     DataInputStream in = new DataInputStream(sin);
 
     BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
@@ -455,7 +452,7 @@ public class RemoteBlockReader2  implements BlockReader {
     }
 
     return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(),
-        sin, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock);
+        ch, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock);
   }
 
   static void checkSuccess(

+ 18 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java

@@ -106,6 +106,24 @@ class BPOfferService implements Runnable {
     this.dnConf = dn.getDnConf();
   }
 
+  /**
+   * Run an immediate heartbeat from all actors. Used by tests.
+   */
+  @VisibleForTesting
+  void triggerHeartbeatForTests() throws IOException {
+    synchronized(receivedBlockList) {
+      lastHeartbeat = 0;
+      receivedBlockList.notifyAll();
+      while (lastHeartbeat == 0) {
+        try {
+          receivedBlockList.wait(100);
+        } catch (InterruptedException e) {
+          return;
+        }
+      }
+    }
+  }
+  
   /**
    * returns true if BP thread has completed initialization of storage
    * and has registered with the corresponding namenode

+ 18 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -60,6 +60,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.SocketInputWrapper;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
@@ -83,13 +84,24 @@ class DataXceiver extends Receiver implements Runnable {
   private final DataXceiverServer dataXceiverServer;
 
   private long opStartTime; //the start time of receiving an Op
+  private final SocketInputWrapper socketInputWrapper;
   
-  public DataXceiver(Socket s, DataNode datanode, 
+  public static DataXceiver create(Socket s, DataNode dn,
+      DataXceiverServer dataXceiverServer) throws IOException {
+    
+    SocketInputWrapper iw = NetUtils.getInputStream(s);
+    return new DataXceiver(s, iw, dn, dataXceiverServer);
+  }
+  
+  private DataXceiver(Socket s, 
+      SocketInputWrapper socketInput,
+      DataNode datanode, 
       DataXceiverServer dataXceiverServer) throws IOException {
     super(new DataInputStream(new BufferedInputStream(
-        NetUtils.getInputStream(s), HdfsConstants.SMALL_BUFFER_SIZE)));
+        socketInput, HdfsConstants.SMALL_BUFFER_SIZE)));
 
     this.s = s;
+    this.socketInputWrapper = socketInput;
     this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
     this.datanode = datanode;
     this.dnConf = datanode.getDnConf();
@@ -128,8 +140,6 @@ class DataXceiver extends Receiver implements Runnable {
     Op op = null;
     dataXceiverServer.childSockets.add(s);
     try {
-      int stdTimeout = s.getSoTimeout();
-
       // We process requests in a loop, and stay around for a short timeout.
       // This optimistic behaviour allows the other end to reuse connections.
        // Setting keepalive timeout to 0 disables this behavior.
@@ -139,7 +149,9 @@ class DataXceiver extends Receiver implements Runnable {
         try {
           if (opsProcessed != 0) {
             assert dnConf.socketKeepaliveTimeout > 0;
-            s.setSoTimeout(dnConf.socketKeepaliveTimeout);
+            socketInputWrapper.setTimeout(dnConf.socketKeepaliveTimeout);
+          } else {
+            socketInputWrapper.setTimeout(dnConf.socketTimeout);
           }
           op = readOp();
         } catch (InterruptedIOException ignored) {
@@ -160,7 +172,7 @@ class DataXceiver extends Receiver implements Runnable {
 
         // restore normal timeout
         if (opsProcessed != 0) {
-          s.setSoTimeout(stdTimeout);
+          s.setSoTimeout(dnConf.socketTimeout);
         }
 
         opStartTime = now();

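Condensed, the timeout choreography the reworked run() loop follows is shown in the hypothetical standalone sketch below (OpReader, processOp, and the int opcode are illustrative stand-ins; only the setTimeout/setSoTimeout pattern mirrors the patch).

    import java.io.IOException;
    import java.net.Socket;
    import java.net.SocketTimeoutException;

    import org.apache.hadoop.net.SocketInputWrapper;

    public class KeepaliveLoopSketch {
      interface OpReader { int readOp() throws IOException; }

      static void processOp(int op) { /* dispatch stand-in */ }

      static void requestLoop(SocketInputWrapper in, Socket s, OpReader reader,
          int socketTimeout, int keepaliveTimeout) throws IOException {
        int opsProcessed = 0;
        while (true) {
          // First op waits the full socket timeout; later ops get only the
          // short keepalive window, so idle reused connections exit quickly.
          in.setTimeout(opsProcessed == 0 ? socketTimeout : keepaliveTimeout);
          int op;
          try {
            op = reader.readOp();
          } catch (SocketTimeoutException e) {
            break; // keepalive expired with no new request: end the xceiver
          }
          // A request arrived: restore the normal timeout for its processing.
          if (opsProcessed != 0) {
            s.setSoTimeout(socketTimeout);
          }
          processOp(op);
          opsProcessed++;
        }
      }
    }
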
+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java

@@ -135,6 +135,7 @@ class DataXceiverServer implements Runnable {
       try {
         s = ss.accept();
         s.setTcpNoDelay(true);
+        // Timeouts are set within DataXceiver.run()
 
         // Make sure the xceiver count is not exceeded
         int curXceiverCount = datanode.getXceiverCount();
@@ -144,7 +145,8 @@ class DataXceiverServer implements Runnable {
               + maxXceiverCount);
         }
 
-        new Daemon(datanode.threadGroup, new DataXceiver(s, datanode, this))
+        new Daemon(datanode.threadGroup,
+            DataXceiver.create(s, datanode, this))
             .start();
       } catch (SocketTimeoutException ignored) {
         // wake up to see if should continue to run

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java

@@ -1495,6 +1495,13 @@ public class MiniDFSCluster {
     return FSNamesystem.getNamespaceEditsDirs(nameNodes[nnIndex].conf);
   }
 
+  public void triggerHeartbeats()
+      throws IOException {
+    for (DataNode dn : getDataNodes()) {
+      DataNodeTestUtils.triggerHeartbeat(dn);
+    }
+  }
+  
   /** Wait until the given namenode gets registration from all the datanodes */
   public void waitActive(int nnIndex) throws IOException {
     if (nameNodes.length == 0 || nameNodes[nnIndex] == null) {

+ 159 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java

@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
+import static org.junit.Assert.*;
+
+import java.io.PrintWriter;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestDataTransferKeepalive {
+  Configuration conf = new HdfsConfiguration();
+  private MiniDFSCluster cluster;
+  private FileSystem fs;
+  private InetSocketAddress dnAddr;
+  private DataNode dn;
+  private DFSClient dfsClient;
+  private static Path TEST_FILE = new Path("/test");
+  
+  private static final int KEEPALIVE_TIMEOUT = 1000;
+  private static final int WRITE_TIMEOUT = 3000;
+  
+  @Before
+  public void setup() throws Exception {
+    conf.setInt(DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY,
+        KEEPALIVE_TIMEOUT);
+    
+    cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(1).build();
+    fs = cluster.getFileSystem();
+    dfsClient = ((DistributedFileSystem)fs).dfs;
+
+    String poolId = cluster.getNamesystem().getBlockPoolId();
+    dn = cluster.getDataNodes().get(0);
+    DatanodeRegistration dnReg = DataNodeTestUtils.getDNRegistrationForBP(
+        dn, poolId);
+    dnAddr = NetUtils.createSocketAddr(dnReg.getName());
+  }
+  
+  @After
+  public void teardown() {
+    cluster.shutdown();
+  }
+  
+  /**
+   * Regression test for HDFS-3357. Check that the datanode is respecting
+   * its configured keepalive timeout.
+   */
+  @Test(timeout=30000)
+  public void testKeepaliveTimeouts() throws Exception {
+    DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L);
+
+    // Clients that write aren't currently re-used.
+    assertEquals(0, dfsClient.socketCache.size());
+    assertXceiverCount(0);
+
+    // Reads the file, so we should get a
+    // cached socket, and should have an xceiver on the other side.
+    DFSTestUtil.readFile(fs, TEST_FILE);
+    assertEquals(1, dfsClient.socketCache.size());
+    assertXceiverCount(1);
+
+    // Sleep for a bit longer than the keepalive timeout
+    // and make sure the xceiver died.
+    Thread.sleep(KEEPALIVE_TIMEOUT * 2);
+    assertXceiverCount(0);
+    
+    // The socket is still in the cache, because we don't
+    // notice that it's closed until we try to read
+    // from it again.
+    assertEquals(1, dfsClient.socketCache.size());
+    
+    // Take it out of the cache - reading should
+    // give an EOF.
+    Socket s = dfsClient.socketCache.get(dnAddr);
+    assertNotNull(s);
+    assertEquals(-1, NetUtils.getInputStream(s).read());
+  }
+
+  /**
+   * Test for the case where the client begins to read a long block, but doesn't
+   * read bytes off the stream quickly. The datanode should time out sending the
+   * chunks and the transceiver should die, even if it has a long keepalive.
+   */
+  @Test(timeout=30000)
+  public void testSlowReader() throws Exception {
+    // Restart the DN with a shorter write timeout.
+    DataNodeProperties props = cluster.stopDataNode(0);
+    props.conf.setInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
+        WRITE_TIMEOUT);
+    props.conf.setInt(DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY,
+        120000);
+    assertTrue(cluster.restartDataNode(props, true));
+    // Wait for heartbeats to avoid a startup race where we
+    // try to write the block while the DN is still starting.
+    cluster.triggerHeartbeats();
+    
+    dn = cluster.getDataNodes().get(0);
+    
+    DFSTestUtil.createFile(fs, TEST_FILE, 1024*1024*8L, (short)1, 0L);
+    FSDataInputStream stm = fs.open(TEST_FILE);
+    try {
+      stm.read();
+      assertXceiverCount(1);
+
+      Thread.sleep(WRITE_TIMEOUT + 1000);
+      // DN should time out in sendChunks, and this should force
+      // the xceiver to exit.
+      assertXceiverCount(0);
+    } finally {
+      IOUtils.closeStream(stm);
+    }
+  }
+
+  private void assertXceiverCount(int expected) {
+    // Subtract 1, since the DataXceiverServer
+    // counts as one
+    int count = dn.getXceiverCount() - 1;
+    if (count != expected) {
+      ReflectionUtils.printThreadInfo(
+          new PrintWriter(System.err),
+          "Thread dumps");
+      fail("Expected " + expected + " xceivers, found " +
+          count);
+    }
+  }
+}

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java

@@ -36,6 +36,12 @@ public class DataNodeTestUtils {
     return dn.getDNRegistrationByMachineName(mName);
   }
   
+  public static void triggerHeartbeat(DataNode dn) throws IOException {
+    for (BPOfferService bpos : dn.getAllBpOs()) {
+      bpos.triggerHeartbeatForTests();
+    } 
+  }
+  
   public static DatanodeRegistration 
   getDNRegistrationForBP(DataNode dn, String bpid) throws IOException {
     return dn.getDNRegistrationForBP(bpid);