Browse Source

HADOOP-9955. RPC idle connection closing is extremely inefficient (daryn)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1542111 13f79535-47bb-0310-9956-ffa450edef68
Daryn Sharp 11 years ago
parent
commit
1da81363fa

+ 2 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -2313,6 +2313,8 @@ Release 0.23.10 - UNRELEASED
 
     HADOOP-9956. RPC listener inefficiently assigns connections to readers (daryn)
 
+    HADOOP-9955. RPC idle connection closing is extremely inefficient (daryn)
+
   BUG FIXES
 
 Release 0.23.9 - 2013-07-08

+ 6 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java

@@ -226,4 +226,10 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
   public static final String  IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY = "ipc.client.fallback-to-simple-auth-allowed";
   public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false;
 
+  /** How often the server scans for idle connections */
+  public static final String IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_KEY =
+      "ipc.client.connection.idle-scan-interval.ms";
+  /** Default value for IPC_SERVER_CONNECTION_IDLE_SCAN_INTERVAL_KEY */
+  public static final int IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_DEFAULT =
+      10000;
 }

+ 166 - 124
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -51,11 +51,13 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslException;
@@ -347,16 +349,6 @@ public abstract class Server {
   private int readThreads;                        // number of read threads
   private int readerPendingConnectionQueue;         // number of connections to queue per read thread
   private Class<? extends Writable> rpcRequestClass;   // class used for deserializing the rpc request
-  private int maxIdleTime;                        // the maximum idle time after 
-                                                  // which a client may be disconnected
-  private int thresholdIdleConnections;           // the number of idle connections
-                                                  // after which we will start
-                                                  // cleaning up idle 
-                                                  // connections
-  int maxConnectionsToNuke;                       // the max number of 
-                                                  // connections to nuke
-                                                  //during a cleanup
-  
   protected RpcMetrics rpcMetrics;
   protected RpcDetailedMetrics rpcDetailedMetrics;
   
@@ -374,13 +366,10 @@ public abstract class Server {
   volatile private boolean running = true;         // true while server runs
   private BlockingQueue<Call> callQueue; // queued calls
 
-  private List<Connection> connectionList = 
-    Collections.synchronizedList(new LinkedList<Connection>());
-  //maintain a list
-  //of client connections
+  // maintains the set of client connections and handles idle timeouts
+  private ConnectionManager connectionManager;
   private Listener listener = null;
   private Responder responder = null;
-  private int numConnections = 0;
   private Handler[] handlers = null;
 
   /**
@@ -450,8 +439,8 @@ public abstract class Server {
   }
 
   @VisibleForTesting
-  List<Connection> getConnections() {
-    return connectionList;
+  Connection[] getConnections() {
+    return connectionManager.toArray();
   }
 
   /**
@@ -519,11 +508,6 @@ public abstract class Server {
     private Reader[] readers = null;
     private int currentReader = 0;
     private InetSocketAddress address; //the address we bind at
-    private Random rand = new Random();
-    private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
-                                         //-tion (for idle connections) ran
-    private long cleanupInterval = 10000; //the minimum interval between 
-                                          //two cleanup runs
     private int backlogLength = conf.getInt(
         CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
         CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
@@ -633,58 +617,12 @@ public abstract class Server {
         }
       }
     }
-    /** cleanup connections from connectionList. Choose a random range
-     * to scan and also have a limit on the number of the connections
-     * that will be cleanedup per run. The criteria for cleanup is the time
-     * for which the connection was idle. If 'force' is true then all 
-     * connections will be looked at for the cleanup.
-     */
-    private void cleanupConnections(boolean force) {
-      if (force || numConnections > thresholdIdleConnections) {
-        long currentTime = Time.now();
-        if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
-          return;
-        }
-        int start = 0;
-        int end = numConnections - 1;
-        if (!force) {
-          start = rand.nextInt() % numConnections;
-          end = rand.nextInt() % numConnections;
-          int temp;
-          if (end < start) {
-            temp = start;
-            start = end;
-            end = temp;
-          }
-        }
-        int i = start;
-        int numNuked = 0;
-        while (i <= end) {
-          Connection c;
-          synchronized (connectionList) {
-            try {
-              c = connectionList.get(i);
-            } catch (Exception e) {return;}
-          }
-          if (c.timedOut(currentTime)) {
-            if (LOG.isDebugEnabled())
-              LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
-            closeConnection(c);
-            numNuked++;
-            end--;
-            c = null;
-            if (!force && numNuked == maxConnectionsToNuke) break;
-          }
-          else i++;
-        }
-        lastCleanupRunTime = Time.now();
-      }
-    }
 
     @Override
     public void run() {
       LOG.info(getName() + ": starting");
       SERVER.set(Server.this);
+      connectionManager.startIdleScan();
       while (running) {
         SelectionKey key = null;
         try {
@@ -708,12 +646,11 @@ public abstract class Server {
           // some thread(s) a chance to finish
           LOG.warn("Out of Memory in server select", e);
           closeCurrentConnection(key, e);
-          cleanupConnections(true);
+          connectionManager.closeIdle(true);
           try { Thread.sleep(60000); } catch (Exception ie) {}
         } catch (Exception e) {
           closeCurrentConnection(key, e);
         }
-        cleanupConnections(false);
       }
       LOG.info("Stopping " + this.getName());
 
@@ -726,10 +663,9 @@ public abstract class Server {
         selector= null;
         acceptChannel= null;
         
-        // clean up all connections
-        while (!connectionList.isEmpty()) {
-          closeConnection(connectionList.remove(0));
-        }
+        // close all connections
+        connectionManager.stopIdleScan();
+        connectionManager.closeAll();
       }
     }
 
@@ -737,8 +673,6 @@ public abstract class Server {
       if (key != null) {
         Connection c = (Connection)key.attachment();
         if (c != null) {
-          if (LOG.isDebugEnabled())
-            LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
           closeConnection(c);
           c = null;
         }
@@ -749,8 +683,7 @@ public abstract class Server {
       return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
     }
     
-    void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
-      Connection c = null;
+    void doAccept(SelectionKey key) throws InterruptedException, IOException,  OutOfMemoryError {
       ServerSocketChannel server = (ServerSocketChannel) key.channel();
       SocketChannel channel;
       while ((channel = server.accept()) != null) {
@@ -760,25 +693,9 @@ public abstract class Server {
         channel.socket().setKeepAlive(true);
         
         Reader reader = getReader();
-        try {
-          c = new Connection(channel, Time.now());
-          synchronized (connectionList) {
-            connectionList.add(numConnections, c);
-            numConnections++;
-          }
-          reader.addConnection(c);
-          if (LOG.isDebugEnabled())
-            LOG.debug("Server connection from " + c.toString() +
-                "; # active connections: " + numConnections +
-                "; # queued calls: " + callQueue.size());          
-        } catch (InterruptedException ie) {
-          if (running) {
-            LOG.info(
-                getName() + ": disconnecting client " + c.getHostAddress() +
-                " due to unexpected interrupt");
-          }
-          closeConnection(c);
-        }
+        Connection c = connectionManager.register(channel);
+        key.attach(c);  // so closeCurrentConnection can get the object
+        reader.addConnection(c);
       }
     }
 
@@ -806,10 +723,6 @@ public abstract class Server {
         count = -1; //so that the (count < 0) block is executed
       }
       if (count < 0) {
-        if (LOG.isDebugEnabled())
-          LOG.debug(getName() + ": disconnecting client " + 
-                    c + ". Number of active connections: "+
-                    numConnections);
         closeConnection(c);
         c = null;
       }
@@ -1250,12 +1163,6 @@ public abstract class Server {
       rpcCount++;
     }
     
-    private boolean timedOut(long currentTime) {
-      if (isIdle() && currentTime -  lastContact > maxIdleTime)
-        return true;
-      return false;
-    }
-    
     private UserGroupInformation getAuthorizedUgi(String authorizedId)
         throws InvalidToken, AccessControlException {
       if (authMethod == AuthMethod.TOKEN) {
@@ -2190,15 +2097,6 @@ public abstract class Server {
         CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY,
         CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT);
     this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueSize); 
-    this.maxIdleTime = 2 * conf.getInt(
-        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
-        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT);
-    this.maxConnectionsToNuke = conf.getInt(
-        CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_KEY,
-        CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_DEFAULT);
-    this.thresholdIdleConnections = conf.getInt(
-        CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_KEY,
-        CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_DEFAULT);
     this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
     this.authorize = 
       conf.getBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, 
@@ -2219,6 +2117,7 @@ public abstract class Server {
 
     // Create the responder here
     responder = new Responder();
+    connectionManager = new ConnectionManager();
     
     if (secretManager != null) {
       SaslRpcServer.init(conf);
@@ -2277,11 +2176,7 @@ public abstract class Server {
   }
   
   private void closeConnection(Connection connection) {
-    synchronized (connectionList) {
-      if (connectionList.remove(connection))
-        numConnections--;
-    }
-    connection.close();
+    connectionManager.close(connection);
   }
   
   /**
@@ -2536,7 +2431,7 @@ public abstract class Server {
    * @return the number of open rpc connections
    */
   public int getNumOpenConnections() {
-    return numConnections;
+    return connectionManager.size();
   }
   
   /**
@@ -2646,4 +2541,151 @@ public abstract class Server {
     int nBytes = initialRemaining - buf.remaining(); 
     return (nBytes > 0) ? nBytes : ret;
   }
+  
+  private class ConnectionManager {
+    final private AtomicInteger count = new AtomicInteger();    
+    final private Set<Connection> connections;
+
+    final private Timer idleScanTimer;
+    final private int idleScanThreshold;
+    final private int idleScanInterval;
+    final private int maxIdleTime;
+    final private int maxIdleToClose;
+    
+    ConnectionManager() {
+      this.idleScanTimer = new Timer(
+          "IPC Server idle connection scanner for port " + getPort(), true);
+      this.idleScanThreshold = conf.getInt(
+          CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_KEY,
+          CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_DEFAULT);
+      this.idleScanInterval = conf.getInt(
+          CommonConfigurationKeys.IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_KEY,
+          CommonConfigurationKeys.IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_DEFAULT);
+      this.maxIdleTime = 2 * conf.getInt(
+          CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
+          CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT);
+      this.maxIdleToClose = conf.getInt(
+          CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_KEY,
+          CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_DEFAULT);
+      // create a set with concurrency -and- a thread-safe iterator, add 2
+      // for listener and idle closer threads
+      this.connections = Collections.newSetFromMap(
+          new ConcurrentHashMap<Connection,Boolean>(
+              maxQueueSize, 0.75f, readThreads+2));
+    }
+
+    private boolean add(Connection connection) {
+      boolean added = connections.add(connection);
+      if (added) {
+        count.getAndIncrement();
+      }
+      return added;
+    }
+    
+    private boolean remove(Connection connection) {
+      boolean removed = connections.remove(connection);
+      if (removed) {
+        count.getAndDecrement();
+      }
+      return removed;
+    }
+    
+    int size() {
+      return count.get();
+    }
+
+    Connection[] toArray() {
+      return connections.toArray(new Connection[0]);
+    }
+
+    Connection register(SocketChannel channel) {
+      Connection connection = new Connection(channel, Time.now());
+      add(connection);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Server connection from " + connection +
+            "; # active connections: " + size() +
+            "; # queued calls: " + callQueue.size());
+      }      
+      return connection;
+    }
+    
+    boolean close(Connection connection) {
+      boolean exists = remove(connection);
+      if (exists) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(Thread.currentThread().getName() +
+              ": disconnecting client " + connection +
+              ". Number of active connections: "+ size());
+        }
+        // only close if actually removed to avoid double-closing due
+        // to possible races
+        connection.close();
+      }
+      return exists;
+    }
+    
+    // synch'ed to avoid explicit invocation upon OOM from colliding with
+    // timer task firing
+    synchronized void closeIdle(boolean scanAll) {
+      long minLastContact = Time.now() - maxIdleTime;
+      // concurrent iterator might miss new connections added
+      // during the iteration, but that's ok because they won't
+      // be idle yet anyway and will be caught on next scan
+      int closed = 0;
+      for (Connection connection : connections) {
+        // stop if connections dropped below threshold unless scanning all
+        if (!scanAll && size() < idleScanThreshold) {
+          break;
+        }
+        // stop if not scanning all and max connections are closed
+        if (connection.isIdle() &&
+            connection.getLastContact() < minLastContact &&
+            close(connection) &&
+            !scanAll && (++closed == maxIdleToClose)) {
+          break;
+        }
+      }
+    }
+    
+    void closeAll() {
+      // use a copy of the connections to be absolutely sure the concurrent
+      // iterator doesn't miss a connection
+      for (Connection connection : toArray()) {
+        close(connection);
+      }
+    }
+    
+    void startIdleScan() {
+      scheduleIdleScanTask();
+    }
+    
+    void stopIdleScan() {
+      idleScanTimer.cancel();
+    }
+    
+    private void scheduleIdleScanTask() {
+      if (!running) {
+        return;
+      }
+      TimerTask idleScanTask = new TimerTask(){
+        @Override
+        public void run() {
+          if (!running) {
+            return;
+          }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(Thread.currentThread().getName()+": task running");
+          }
+          try {
+            closeIdle(false);
+          } finally {
+            // explicitly reschedule so next execution occurs relative
+            // to the end of this scan, not the beginning
+            scheduleIdleScanTask();
+          }
+        }
+      };
+      idleScanTimer.schedule(idleScanTask, idleScanInterval);
+    }
+  }
 }

+ 117 - 3
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java

@@ -46,12 +46,15 @@ import java.util.List;
 import java.util.Random;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.net.SocketFactory;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -68,8 +71,10 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
 import org.apache.hadoop.net.ConnectTimeoutException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.Assume;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
@@ -83,7 +88,7 @@ public class TestIPC {
   public static final Log LOG =
     LogFactory.getLog(TestIPC.class);
   
-  final private static Configuration conf = new Configuration();
+  private static Configuration conf;
   final static private int PING_INTERVAL = 1000;
   final static private int MIN_SLEEP_TIME = 1000;
   /**
@@ -93,7 +98,9 @@ public class TestIPC {
   static boolean WRITABLE_FAULTS_ENABLED = true;
   static int WRITABLE_FAULTS_SLEEP = 0;
   
-  static {
+  @Before
+  public void setupConf() {
+    conf = new Configuration();
     Client.setPingInterval(conf, PING_INTERVAL);
   }
 
@@ -759,6 +766,113 @@ public class TestIPC {
     server.stop();
   }
 
+  @Test(timeout=30000)
+  public void testConnectionIdleTimeouts() throws Exception {
+    ((Log4JLogger)Server.LOG).getLogger().setLevel(Level.DEBUG);
+    final int maxIdle = 1000;
+    final int cleanupInterval = maxIdle*3/4; // stagger cleanups
+    final int killMax = 3;
+    final int clients = 1 + killMax*2; // 1 to block, 2 batches to kill
+    
+    conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, maxIdle);
+    conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_KEY, 0);
+    conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_KEY, killMax);
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_KEY, cleanupInterval);
+    
+    final CyclicBarrier firstCallBarrier = new CyclicBarrier(2);
+    final CyclicBarrier callBarrier = new CyclicBarrier(clients);
+    final CountDownLatch allCallLatch = new CountDownLatch(clients);
+    final AtomicBoolean error = new AtomicBoolean();
+    
+    final TestServer server = new TestServer(clients, false);
+    Thread[] threads = new Thread[clients];
+    try {
+      server.callListener = new Runnable(){
+        AtomicBoolean first = new AtomicBoolean(true);
+        @Override
+        public void run() {
+          try {
+            allCallLatch.countDown();
+            // block first call
+            if (first.compareAndSet(true, false)) {
+              firstCallBarrier.await();
+            } else {
+              callBarrier.await();
+            }
+          } catch (Throwable t) {
+            LOG.error(t);
+            error.set(true); 
+          } 
+        }
+      };
+      server.start();
+
+      // start client
+      final CountDownLatch callReturned = new CountDownLatch(clients-1);
+      final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+      final Configuration clientConf = new Configuration();
+      clientConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 10000);
+      for (int i=0; i < clients; i++) {
+        threads[i] = new Thread(new Runnable(){
+          @Override
+          public void run() {
+            Client client = new Client(LongWritable.class, clientConf);
+            try {
+              client.call(new LongWritable(Thread.currentThread().getId()),
+                  addr, null, null, 0, clientConf);
+              callReturned.countDown();
+              Thread.sleep(10000);
+            } catch (IOException e) {
+              LOG.error(e);
+            } catch (InterruptedException e) {
+            }
+          }
+        });
+        threads[i].start();
+      }
+      
+      // all calls blocked in handler so all connections made
+      allCallLatch.await();
+      assertFalse(error.get());
+      assertEquals(clients, server.getNumOpenConnections());
+      
+      // wake up blocked calls and wait for client call to return, no
+      // connections should have closed
+      callBarrier.await();
+      callReturned.await();
+      assertEquals(clients, server.getNumOpenConnections());
+      
+      // server won't close till maxIdle*2, so give scanning thread time to
+      // be almost ready to close idle connection.  after which it should
+      // close max connections on every cleanupInterval
+      Thread.sleep(maxIdle*2-cleanupInterval);
+      for (int i=clients; i > 1; i -= killMax) {
+        Thread.sleep(cleanupInterval);
+        assertFalse(error.get());
+        assertEquals(i, server.getNumOpenConnections());
+      }
+
+      // connection for the first blocked call should still be open
+      Thread.sleep(cleanupInterval);
+      assertFalse(error.get());
+      assertEquals(1, server.getNumOpenConnections());
+     
+      // wake up call and ensure connection times out
+      firstCallBarrier.await();
+      Thread.sleep(maxIdle*2);
+      assertFalse(error.get());
+      assertEquals(0, server.getNumOpenConnections());
+    } finally {
+      for (Thread t : threads) {
+        if (t != null) {
+          t.interrupt();
+          t.join();
+        }
+        server.stop();
+      }
+    }
+  }
+  
   /**
    * Make a call from a client and verify if header info is changed in server side
    */
@@ -768,7 +882,7 @@ public class TestIPC {
 
     client.call(new LongWritable(RANDOM.nextLong()),
         addr, null, null, MIN_SLEEP_TIME, serviceClass, conf);
-    Connection connection = server.getConnections().get(0);
+    Connection connection = server.getConnections()[0];
     int serviceClass2 = connection.getServiceClass();
     assertFalse(noChanged ^ serviceClass == serviceClass2);
     client.stop();