HDFS-3357. DataXceiver reads from client socket with incorrect/no timeout. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1334115 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon, 13 years ago
Parent commit: e63d0b2667

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -476,6 +476,9 @@ Release 2.0.0 - UNRELEASED
     HDFS-3350. In INode, add final to compareTo(..), equals(..) and hashCode(),
     and remove synchronized from updatePermissionStatus(..).  (szetszwo)
 
+    HDFS-3357. DataXceiver reads from client socket with incorrect/no timeout
+    (todd)
+
   BREAKDOWN OF HDFS-1623 SUBTASKS
 
     HDFS-2179. Add fencing framework and mechanisms for NameNode HA. (todd)

+ 18 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -60,6 +60,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.SocketInputWrapper;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
@@ -83,13 +84,24 @@ class DataXceiver extends Receiver implements Runnable {
   private final DataXceiverServer dataXceiverServer;
 
   private long opStartTime; //the start time of receiving an Op
+  private final SocketInputWrapper socketInputWrapper;
   
-  public DataXceiver(Socket s, DataNode datanode, 
+  public static DataXceiver create(Socket s, DataNode dn,
+      DataXceiverServer dataXceiverServer) throws IOException {
+    
+    SocketInputWrapper iw = NetUtils.getInputStream(s);
+    return new DataXceiver(s, iw, dn, dataXceiverServer);
+  }
+  
+  private DataXceiver(Socket s, 
+      SocketInputWrapper socketInput,
+      DataNode datanode, 
       DataXceiverServer dataXceiverServer) throws IOException {
     super(new DataInputStream(new BufferedInputStream(
-        NetUtils.getInputStream(s), HdfsConstants.SMALL_BUFFER_SIZE)));
+        socketInput, HdfsConstants.SMALL_BUFFER_SIZE)));
 
     this.s = s;
+    this.socketInputWrapper = socketInput;
     this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
     this.datanode = datanode;
     this.dnConf = datanode.getDnConf();
@@ -128,8 +140,6 @@ class DataXceiver extends Receiver implements Runnable {
     Op op = null;
     dataXceiverServer.childSockets.add(s);
     try {
-      int stdTimeout = s.getSoTimeout();
-
       // We process requests in a loop, and stay around for a short timeout.
       // This optimistic behaviour allows the other end to reuse connections.
       // Setting keepalive timeout to 0 disables this behavior.
@@ -139,7 +149,9 @@ class DataXceiver extends Receiver implements Runnable {
         try {
           if (opsProcessed != 0) {
             assert dnConf.socketKeepaliveTimeout > 0;
-            s.setSoTimeout(dnConf.socketKeepaliveTimeout);
+            socketInputWrapper.setTimeout(dnConf.socketKeepaliveTimeout);
+          } else {
+            socketInputWrapper.setTimeout(dnConf.socketTimeout);
           }
           op = readOp();
         } catch (InterruptedIOException ignored) {
@@ -160,7 +172,7 @@ class DataXceiver extends Receiver implements Runnable {
 
         // restore normal timeout
         if (opsProcessed != 0) {
-          s.setSoTimeout(stdTimeout);
+          s.setSoTimeout(dnConf.socketTimeout);
         }
 
         opStartTime = now();
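
Why the change works: streams obtained through NetUtils.getInputStream(s) are backed by a SocketChannel, so a later Socket.setSoTimeout(...) call does not bound reads on the already-created stream — the old s.setSoTimeout(...) dance never actually limited how long readOp() could block. The patch instead keeps a handle to the SocketInputWrapper and sets the timeout on it before each op. The following self-contained sketch (a hypothetical class, not Hadoop's actual SocketInputWrapper) shows the underlying idea: a channel-backed stream whose read deadline is enforced with a Selector and can be changed between operations.

import java.io.IOException;
import java.io.InputStream;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

// Hypothetical stand-in for SocketInputWrapper: a channel-backed stream
// whose read timeout can be adjusted between operations.
class TimedChannelInputStream extends InputStream {
  private final SocketChannel ch;
  private final Selector selector;
  private volatile long timeoutMs;

  TimedChannelInputStream(SocketChannel ch, long timeoutMs) throws IOException {
    ch.configureBlocking(false);
    this.ch = ch;
    this.selector = Selector.open();
    ch.register(selector, SelectionKey.OP_READ);
    this.timeoutMs = timeoutMs;
  }

  // What the xceiver does before each op: a long timeout for the first
  // request, the short keepalive timeout while waiting for a follow-up.
  void setTimeout(long ms) {
    this.timeoutMs = ms;
  }

  @Override
  public int read() throws IOException {
    ByteBuffer buf = ByteBuffer.allocate(1);
    while (true) {
      int n = ch.read(buf);
      if (n > 0) {
        return buf.get(0) & 0xff;    // one byte read successfully
      } else if (n < 0) {
        return -1;                   // peer closed the connection
      }
      // No data available: wait up to the *current* timeout for readability.
      if (selector.select(timeoutMs) == 0) {
        throw new SocketTimeoutException("read timed out after " + timeoutMs + " ms");
      }
      selector.selectedKeys().clear();
    }
  }
}

With such a wrapper, the loop above can use the long dnConf.socketTimeout for the first op and the short dnConf.socketKeepaliveTimeout while idling — something plain Socket.setSoTimeout(...) could not guarantee once the channel-backed stream existed.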

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java

@@ -135,6 +135,7 @@ class DataXceiverServer implements Runnable {
       try {
         s = ss.accept();
         s.setTcpNoDelay(true);
+        // Timeouts are set within DataXceiver.run()
 
         // Make sure the xceiver count is not exceeded
         int curXceiverCount = datanode.getXceiverCount();
@@ -144,7 +145,8 @@ class DataXceiverServer implements Runnable {
               + maxXceiverCount);
         }
 
-        new Daemon(datanode.threadGroup, new DataXceiver(s, datanode, this))
+        new Daemon(datanode.threadGroup,
+            DataXceiver.create(s, datanode, this))
             .start();
       } catch (SocketTimeoutException ignored) {
         // wake up to see if should continue to run
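
The switch from a public constructor to DataXceiver.create(...) above is forced by Java's rule that super(...) must be the first statement in a constructor: the same SocketInputWrapper instance has to be both buffered into the Receiver superclass and retained in a field for later setTimeout(...) calls, so it must exist before the constructor body runs. A compact sketch of the idiom, with hypothetical names:

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;

// Stands in for Receiver: the superclass consumes the stream up front.
abstract class StreamReceiver {
  protected final DataInputStream in;
  protected StreamReceiver(DataInputStream in) {
    this.in = in;
  }
}

class XceiverSketch extends StreamReceiver {
  private final InputStream wrapper;   // retained for later timeout changes

  // The factory derives the wrapper from the socket exactly once, then
  // threads the same instance into the private constructor.
  static XceiverSketch create(Socket s) throws IOException {
    InputStream w = s.getInputStream();  // stands in for NetUtils.getInputStream(s)
    return new XceiverSketch(w);
  }

  private XceiverSketch(InputStream w) {
    // super(...) must come first, so w had to be created by the caller.
    super(new DataInputStream(new BufferedInputStream(w)));
    this.wrapper = w;
  }
}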

+ 159 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java

@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
+import static org.junit.Assert.*;
+
+import java.io.PrintWriter;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestDataTransferKeepalive {
+  Configuration conf = new HdfsConfiguration();
+  private MiniDFSCluster cluster;
+  private FileSystem fs;
+  private InetSocketAddress dnAddr;
+  private DataNode dn;
+  private DFSClient dfsClient;
+  private static Path TEST_FILE = new Path("/test");
+  
+  private static final int KEEPALIVE_TIMEOUT = 1000;
+  private static final int WRITE_TIMEOUT = 3000;
+  
+  @Before
+  public void setup() throws Exception {
+    conf.setInt(DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY,
+        KEEPALIVE_TIMEOUT);
+    
+    cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(1).build();
+    fs = cluster.getFileSystem();
+    dfsClient = ((DistributedFileSystem)fs).dfs;
+
+    String poolId = cluster.getNamesystem().getBlockPoolId();
+    dn = cluster.getDataNodes().get(0);
+    DatanodeRegistration dnReg = DataNodeTestUtils.getDNRegistrationForBP(
+        dn, poolId);
+    dnAddr = NetUtils.createSocketAddr(dnReg.getXferAddr());
+  }
+  
+  @After
+  public void teardown() {
+    cluster.shutdown();
+  }
+  
+  /**
+   * Regression test for HDFS-3357. Check that the datanode is respecting
+   * its configured keepalive timeout.
+   */
+  @Test(timeout=30000)
+  public void testKeepaliveTimeouts() throws Exception {
+    DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L);
+
+    // Clients that write aren't currently re-used.
+    assertEquals(0, dfsClient.socketCache.size());
+    assertXceiverCount(0);
+
+    // Read the file, so we should get a
+    // cached socket, and should have an xceiver on the other side.
+    DFSTestUtil.readFile(fs, TEST_FILE);
+    assertEquals(1, dfsClient.socketCache.size());
+    assertXceiverCount(1);
+
+    // Sleep for a bit longer than the keepalive timeout
+    // and make sure the xceiver died.
+    Thread.sleep(KEEPALIVE_TIMEOUT * 2);
+    assertXceiverCount(0);
+    
+    // The socket is still in the cache, because we don't
+    // notice that it's closed until we try to read
+    // from it again.
+    assertEquals(1, dfsClient.socketCache.size());
+    
+    // Take it out of the cache - reading should
+    // give an EOF.
+    Socket s = dfsClient.socketCache.get(dnAddr);
+    assertNotNull(s);
+    assertEquals(-1, NetUtils.getInputStream(s).read());
+  }
+
+  /**
+   * Test for the case where the client begins to read a long block, but doesn't
+   * read bytes off the stream quickly. The datanode should time out sending the
+   * chunks and the transceiver should die, even if it has a long keepalive.
+   */
+  @Test(timeout=30000)
+  public void testSlowReader() throws Exception {
+    // Restart the DN with a shorter write timeout.
+    DataNodeProperties props = cluster.stopDataNode(0);
+    props.conf.setInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
+        WRITE_TIMEOUT);
+    props.conf.setInt(DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY,
+        120000);
+    assertTrue(cluster.restartDataNode(props, true));
+    // Wait for heartbeats to avoid a startup race where we
+    // try to write the block while the DN is still starting.
+    cluster.triggerHeartbeats();
+    
+    dn = cluster.getDataNodes().get(0);
+    
+    DFSTestUtil.createFile(fs, TEST_FILE, 1024*1024*8L, (short)1, 0L);
+    FSDataInputStream stm = fs.open(TEST_FILE);
+    try {
+      stm.read();
+      assertXceiverCount(1);
+
+      Thread.sleep(WRITE_TIMEOUT + 1000);
+      // DN should time out in sendChunks, and this should force
+      // the xceiver to exit.
+      assertXceiverCount(0);
+    } finally {
+      IOUtils.closeStream(stm);
+    }
+  }
+
+  private void assertXceiverCount(int expected) {
+    // Subtract 1, since the DataXceiverServer
+    // counts as one
+    int count = dn.getXceiverCount() - 1;
+    if (count != expected) {
+      ReflectionUtils.printThreadInfo(
+          new PrintWriter(System.err),
+          "Thread dumps");
+      fail("Expected " + expected + " xceivers, found " +
+          count);
+    }
+  }
+}
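
A note on the test's timing choices (design observations, not part of the patch): the sleeps are sized with headroom relative to the timeouts they exercise — KEEPALIVE_TIMEOUT * 2 after the read, and WRITE_TIMEOUT + 1000 ms for the slow reader — so the xceiver has comfortably expired before the count is checked, while the @Test(timeout=30000) annotations act as a safety net if a regression makes a read block forever, which is exactly the failure mode HDFS-3357 fixes. assertXceiverCount subtracts one because, per its comment, DataNode.getXceiverCount() counts the always-running DataXceiverServer acceptor thread alongside the per-connection xceivers.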