瀏覽代碼

HADOOP-9956. RPC listener inefficiently assigns connections to readers (daryn)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1541742 13f79535-47bb-0310-9956-ffa450edef68
Daryn Sharp 11 年之前
父節點
當前提交
d55bf175be

+ 2 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -20,6 +20,8 @@ Release 0.23.10 - UNRELEASED
     HADOOP-9476. Some test cases in TestUserGroupInformation fail if ran after
     testSetLoginUser. (Robert Parker via kihwal)
 
+    HADOOP-9956. RPC listener inefficiently assigns connections to readers (daryn)
+
   BUG FIXES
 
     HADOOP-9757. Har metadata cache can grow without limit (Cristina Abad via daryn)

+ 6 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java

@@ -67,6 +67,12 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
     "ipc.server.read.threadpool.size";
   /** Default value for IPC_SERVER_RPC_READ_THREADS_KEY */
   public static final int     IPC_SERVER_RPC_READ_THREADS_DEFAULT = 1;
+  /** Number of pending connections that may be queued per socket reader */
+  public static final String IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY =
+      "ipc.server.read.connection-queue.size";
+  /** Default value for IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY */
+  public static final int IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT =
+      100;
 
   /** How many calls per handler are allowed in the queue. */
   public static final String  IPC_SERVER_HANDLER_QUEUE_SIZE_KEY =

+ 29 - 29
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -225,6 +225,7 @@ public abstract class Server {
   private int port;                               // port we listen on
   private int handlerCount;                       // number of handler threads
   private int readThreads;                        // number of read threads
+  private int readerPendingConnectionQueue;       // number of connections to queue per read thread
   private Class<? extends Writable> paramClass;   // class of call parameters
   private int maxIdleTime;                        // the maximum idle time after 
                                                   // which a client may be disconnected
@@ -410,12 +411,14 @@ public abstract class Server {
     }
     
     private class Reader extends Thread {
-      private volatile boolean adding = false;
+      final private BlockingQueue<Connection> pendingConnections;
       private final Selector readSelector;
 
       Reader(String name) throws IOException {
         super(name);
 
+        this.pendingConnections =
+            new LinkedBlockingQueue<Connection>(readerPendingConnectionQueue);
         this.readSelector = Selector.open();
       }
       
@@ -436,10 +439,14 @@ public abstract class Server {
         while (running) {
           SelectionKey key = null;
           try {
+            // register only the connections that are queued right now, so a
+            // continuous stream of new connections cannot starve the select
+            int size = pendingConnections.size();
+            for (int i=size; i>0; i--) {
+              Connection conn = pendingConnections.take();
+              conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
+            }
             readSelector.select();
-            while (adding) {
-              this.wait(1000);
-            }              
 
             Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
             while (iter.hasNext()) {
@@ -463,26 +470,14 @@ public abstract class Server {
       }
 
       /**
-       * This gets reader into the state that waits for the new channel
-       * to be registered with readSelector. If it was waiting in select()
-       * the thread will be woken up, otherwise whenever select() is called
-       * it will return even if there is nothing to read and wait
-       * in while(adding) for finishAdd call
+       * Updating the readSelector while it's being used is not thread-safe,
+       * so the connection must be queued.  The reader will drain the queue
+       * and update its readSelector before performing the next select
        */
-      public void startAdd() {
-        adding = true;
+      public void addConnection(Connection conn) throws InterruptedException {
+        pendingConnections.put(conn);
         readSelector.wakeup();
       }
-      
-      public synchronized SelectionKey registerChannel(SocketChannel channel)
-                                                          throws IOException {
-          return channel.register(readSelector, SelectionKey.OP_READ);
-      }
-
-      public synchronized void finishAdd() {
-        adding = false;
-        this.notify();        
-      }
 
       void shutdown() {
         assert !running;
@@ -621,20 +616,23 @@ public abstract class Server {
         
         Reader reader = getReader();
         try {
-          reader.startAdd();
-          SelectionKey readKey = reader.registerChannel(channel);
-          c = new Connection(readKey, channel, System.currentTimeMillis());
-          readKey.attach(c);
+          c = new Connection(channel, System.currentTimeMillis());
           synchronized (connectionList) {
             connectionList.add(numConnections, c);
             numConnections++;
           }
+          reader.addConnection(c);
           if (LOG.isDebugEnabled())
             LOG.debug("Server connection from " + c.toString() +
                 "; # active connections: " + numConnections +
                 "; # queued calls: " + callQueue.size());          
-        } finally {
-          reader.finishAdd(); 
+        } catch (InterruptedException ie) {
+          if (running) {
+            LOG.info(
+                getName() + ": disconnecting client " + c.getHostAddress() +
+                " due to unexpected interrupt");
+          }
+          closeConnection(c);
         }
       }
     }
@@ -1001,8 +999,7 @@ public abstract class Server {
     
     private boolean useWrap = false;
     
-    public Connection(SelectionKey key, SocketChannel channel, 
-                      long lastContact) {
+    public Connection(SocketChannel channel, long lastContact) {
       this.channel = channel;
       this.lastContact = lastContact;
       this.data = null;
@@ -1682,6 +1679,9 @@ public abstract class Server {
           CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
           CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);
     }
+    this.readerPendingConnectionQueue = conf.getInt(
+        CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY,
+        CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT);
     this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueSize); 
     this.maxIdleTime = 2 * conf.getInt(
         CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,

+ 144 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java

@@ -37,6 +37,9 @@ import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.net.SocketFactory;
 
 import org.junit.Test;
@@ -44,6 +47,7 @@ import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.junit.Assume;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
@@ -540,6 +544,146 @@ public class TestIPC {
     client.call(new LongWritable(RANDOM.nextLong()),
         addr, null, null, 3*PING_INTERVAL+MIN_SLEEP_TIME, conf);
   }
+
+  private static class TestServerQueue extends Server {
+    final CountDownLatch firstCallLatch = new CountDownLatch(1);
+    final CountDownLatch callBlockLatch = new CountDownLatch(1);
+    
+    TestServerQueue(int expectedCalls, int readers, int callQ, int handlers,
+        Configuration conf) throws IOException {
+      super(ADDRESS, 0, LongWritable.class, handlers, readers, callQ, conf, null, null); 
+    }
+
+    @Override
+    public Writable call(Class<?> protocol, Writable param, long receiveTime)
+        throws IOException {
+      firstCallLatch.countDown();
+      try {
+        callBlockLatch.await();
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+      return param;
+    }
+  }
+
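+  /**
+   * Check that reader queueing works: once the reader connection queues
+   * fill, the listener should block instead of accepting more connections.
+   * @throws Exception on any failure while running the test
+   */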
+  @Test(timeout=60000)
+  public void testIpcWithReaderQueuing() throws Exception {
+    // 1 reader, 1 connectionQ slot, 1 callq
+    for (int i=0; i < 10; i++) {
+      checkBlocking(1, 1, 1);
+    }
+    // 4 readers, 5 connectionQ slots, 2 callq
+    for (int i=0; i < 10; i++) {
+      checkBlocking(4, 5, 2);
+    }
+  }
+  
+  // goal is to jam a handler with a connection, fill the callq with
+  // connections, in turn jamming the readers - then flood the server and
+  // ensure that the listener blocks when the reader connection queues fill
+  private void checkBlocking(int readers, int readerQ, int callQ) throws Exception {
+    int handlers = 1; // makes it easier
+    
+    final Configuration conf = new Configuration();
+    conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY, readerQ);
+
+    // send in enough clients to block up the handlers, callq, and readers
+    int initialClients = readers + callQ + handlers;
+    // max connections we should ever end up accepting at once
+    int maxAccept = initialClients + readers*readerQ + 1; // 1 = listener
+    // stress it with 2X the max
+    int clients = maxAccept*2;
+    
+    final AtomicInteger failures = new AtomicInteger(0);
+    final CountDownLatch callFinishedLatch = new CountDownLatch(clients);
+
+    // start server
+    final TestServerQueue server =
+        new TestServerQueue(clients, readers, callQ, handlers, conf);
+    final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+    server.start();
+
+    // instantiate the threads, will start in batches
+    Thread[] threads = new Thread[clients];
+    for (int i=0; i<clients; i++) {
+      threads[i] = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          Client client = new Client(LongWritable.class, conf);
+          try {
+            client.call(new LongWritable(Thread.currentThread().getId()),
+                addr, null, null, 60000, conf);
+          } catch (Throwable e) {
+            LOG.error(e);
+            failures.incrementAndGet();
+            return;
+          } finally {
+            callFinishedLatch.countDown();            
+            client.stop();
+          }
+        }
+      });
+    }
+    
+    // start enough clients to block up the handler, callq, and each reader;
+    // let the calls sequentially slot in to avoid some readers blocking
+    // and others not blocking in the race to fill the callq
+    for (int i=0; i < initialClients; i++) {
+      threads[i].start();
+      if (i==0) {
+        // let first reader block in a call
+        server.firstCallLatch.await();
+      } else if (i <= callQ) {
+        // let subsequent readers jam the callq, will happen immediately 
+        while (server.getCallQueueLen() != i) {
+          Thread.sleep(1);
+        }
+      } // additional threads block the readers trying to add to the callq
+    }
+
+    // wait till everything is slotted, should happen immediately
+    Thread.sleep(10);
+    if (server.getNumOpenConnections() < initialClients) {
+      LOG.info("(initial clients) need:"+initialClients+" connections have:"+server.getNumOpenConnections());
+      Thread.sleep(100);
+    }
+    LOG.info("ipc layer should be blocked");
+    assertEquals(callQ, server.getCallQueueLen());
+    assertEquals(initialClients, server.getNumOpenConnections());
+    
+    // now flood the server with the rest of the connections, the reader's
+    // connection queues should fill and then the listener should block
+    for (int i=initialClients; i<clients; i++) {
+      threads[i].start();
+    }
+    Thread.sleep(10);
+    if (server.getNumOpenConnections() < maxAccept) {
+      LOG.info("(max clients) need:"+maxAccept+" connections have:"+server.getNumOpenConnections());
+      Thread.sleep(100);
+    }
+    // check a few times to make sure we didn't go over
+    for (int i=0; i<4; i++) {
+      assertEquals(maxAccept, server.getNumOpenConnections());
+      Thread.sleep(100);
+    }
+    
+    // sanity check that no calls have finished
+    assertEquals(clients, callFinishedLatch.getCount());
+    LOG.info("releasing the calls");
+    server.callBlockLatch.countDown();
+    callFinishedLatch.await();
+    for (Thread t : threads) {
+      t.join();
+    }
+    assertEquals(0, failures.get());
+    server.stop();
+  }
   
   /**
    * Check that file descriptors aren't leaked by starting