Browse Source

HADOOP-9955. RPC idle connection closing is extremely inefficient (daryn)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1542113 13f79535-47bb-0310-9956-ffa450edef68
Daryn Sharp 11 năm trước cách đây
mục cha
commit
3ce1d25750

+ 2 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -22,6 +22,8 @@ Release 0.23.10 - UNRELEASED
 
     HADOOP-9956. RPC listener inefficiently assigns connections to readers (daryn)
 
+    HADOOP-9955. RPC idle connection closing is extremely inefficient (daryn)
+
   BUG FIXES
 
     HADOOP-9757. Har metadata cache can grow without limit (Cristina Abad via daryn)

+ 6 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java

@@ -129,5 +129,11 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
   public static final String  IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY = "ipc.client.fallback-to-simple-auth-allowed";
   public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false;
 
+  /** How often the server scans for idle connections */
+  public static final String IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_KEY =
+      "ipc.client.connection.idle-scan-interval.ms";
+  /** Default value for IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_KEY */
+  public static final int IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_DEFAULT =
+      10000;
 }
 

+ 166 - 125
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -48,11 +48,13 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslException;
@@ -227,15 +229,6 @@ public abstract class Server {
   private int readThreads;                        // number of read threads
   private int readerPendingConnectionQueue;       // number of connections to queue per read thread
   private Class<? extends Writable> paramClass;   // class of call parameters
-  private int maxIdleTime;                        // the maximum idle time after 
-                                                  // which a client may be disconnected
-  private int thresholdIdleConnections;           // the number of idle connections
-                                                  // after which we will start
-                                                  // cleaning up idle 
-                                                  // connections
-  int maxConnectionsToNuke;                       // the max number of 
-                                                  // connections to nuke
-                                                  //during a cleanup
   
   protected RpcMetrics rpcMetrics;
   protected RpcDetailedMetrics rpcDetailedMetrics;
@@ -253,13 +246,10 @@ public abstract class Server {
   volatile private boolean running = true;         // true while server runs
   private BlockingQueue<Call> callQueue; // queued calls
 
-  private List<Connection> connectionList = 
-    Collections.synchronizedList(new LinkedList<Connection>());
-  //maintain a list
-  //of client connections
+  // maintains the set of client connections and handles idle timeouts
+  private ConnectionManager connectionManager;
   private Listener listener = null;
   private Responder responder = null;
-  private int numConnections = 0;
   private Handler[] handlers = null;
 
   /**
@@ -376,11 +366,6 @@ public abstract class Server {
     private Reader[] readers = null;
     private int currentReader = 0;
     private InetSocketAddress address; //the address we bind at
-    private Random rand = new Random();
-    private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
-                                         //-tion (for idle connections) ran
-    private long cleanupInterval = 10000; //the minimum interval between 
-                                          //two cleanup runs
     private int backlogLength = conf.getInt(
         CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
         CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
@@ -489,58 +474,12 @@ public abstract class Server {
         }
       }
     }
-    /** cleanup connections from connectionList. Choose a random range
-     * to scan and also have a limit on the number of the connections
-     * that will be cleanedup per run. The criteria for cleanup is the time
-     * for which the connection was idle. If 'force' is true then all 
-     * connections will be looked at for the cleanup.
-     */
-    private void cleanupConnections(boolean force) {
-      if (force || numConnections > thresholdIdleConnections) {
-        long currentTime = System.currentTimeMillis();
-        if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
-          return;
-        }
-        int start = 0;
-        int end = numConnections - 1;
-        if (!force) {
-          start = rand.nextInt() % numConnections;
-          end = rand.nextInt() % numConnections;
-          int temp;
-          if (end < start) {
-            temp = start;
-            start = end;
-            end = temp;
-          }
-        }
-        int i = start;
-        int numNuked = 0;
-        while (i <= end) {
-          Connection c;
-          synchronized (connectionList) {
-            try {
-              c = connectionList.get(i);
-            } catch (Exception e) {return;}
-          }
-          if (c.timedOut(currentTime)) {
-            if (LOG.isDebugEnabled())
-              LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
-            closeConnection(c);
-            numNuked++;
-            end--;
-            c = null;
-            if (!force && numNuked == maxConnectionsToNuke) break;
-          }
-          else i++;
-        }
-        lastCleanupRunTime = System.currentTimeMillis();
-      }
-    }
 
     @Override
     public void run() {
       LOG.info(getName() + ": starting");
       SERVER.set(Server.this);
+      connectionManager.startIdleScan();
       while (running) {
         SelectionKey key = null;
         try {
@@ -564,12 +503,11 @@ public abstract class Server {
           // some thread(s) a chance to finish
           LOG.warn("Out of Memory in server select", e);
           closeCurrentConnection(key, e);
-          cleanupConnections(true);
+          connectionManager.closeIdle(true);
           try { Thread.sleep(60000); } catch (Exception ie) {}
         } catch (Exception e) {
           closeCurrentConnection(key, e);
         }
-        cleanupConnections(false);
       }
       LOG.info("Stopping " + this.getName());
 
@@ -582,10 +520,9 @@ public abstract class Server {
         selector= null;
         acceptChannel= null;
         
-        // clean up all connections
-        while (!connectionList.isEmpty()) {
-          closeConnection(connectionList.remove(0));
-        }
+        // close all connections
+        connectionManager.stopIdleScan();
+        connectionManager.closeAll();
       }
     }
 
@@ -593,8 +530,6 @@ public abstract class Server {
       if (key != null) {
         Connection c = (Connection)key.attachment();
         if (c != null) {
-          if (LOG.isDebugEnabled())
-            LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
           closeConnection(c);
           c = null;
         }
@@ -605,8 +540,7 @@ public abstract class Server {
       return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
     }
     
-    void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
-      Connection c = null;
+    void doAccept(SelectionKey key) throws InterruptedException, IOException,  OutOfMemoryError {
       ServerSocketChannel server = (ServerSocketChannel) key.channel();
       SocketChannel channel;
       while ((channel = server.accept()) != null) {
@@ -615,25 +549,9 @@ public abstract class Server {
         channel.socket().setTcpNoDelay(tcpNoDelay);
         
         Reader reader = getReader();
-        try {
-          c = new Connection(channel, System.currentTimeMillis());
-          synchronized (connectionList) {
-            connectionList.add(numConnections, c);
-            numConnections++;
-          }
-          reader.addConnection(c);
-          if (LOG.isDebugEnabled())
-            LOG.debug("Server connection from " + c.toString() +
-                "; # active connections: " + numConnections +
-                "; # queued calls: " + callQueue.size());          
-        } catch (InterruptedException ie) {
-          if (running) {
-            LOG.info(
-                getName() + ": disconnecting client " + c.getHostAddress() +
-                " due to unexpected interrupt");
-          }
-          closeConnection(c);
-        }
+        Connection c = connectionManager.register(channel);
+        key.attach(c);  // so closeCurrentConnection can get the object
+        reader.addConnection(c);
       }
     }
 
@@ -657,10 +575,6 @@ public abstract class Server {
         count = -1; //so that the (count < 0) block is executed
       }
       if (count < 0) {
-        if (LOG.isDebugEnabled())
-          LOG.debug(getName() + ": disconnecting client " + 
-                    c + ". Number of active connections: "+
-                    numConnections);
         closeConnection(c);
         c = null;
       }
@@ -1061,12 +975,6 @@ public abstract class Server {
       rpcCount++;
     }
     
-    private boolean timedOut(long currentTime) {
-      if (isIdle() && currentTime -  lastContact > maxIdleTime)
-        return true;
-      return false;
-    }
-    
     private UserGroupInformation getAuthorizedUgi(String authorizedId)
         throws IOException {
       if (authMethod == SaslRpcServer.AuthMethod.DIGEST) {
@@ -1510,7 +1418,7 @@ public abstract class Server {
       return true;
     }
     
-    private synchronized void close() throws IOException {
+    private synchronized void close() {
       disposeSasl();
       data = null;
       dataLengthBuffer = null;
@@ -1683,15 +1591,6 @@ public abstract class Server {
         CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY,
         CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT);
     this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueSize); 
-    this.maxIdleTime = 2 * conf.getInt(
-        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
-        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT);
-    this.maxConnectionsToNuke = conf.getInt(
-        CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_KEY,
-        CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_DEFAULT);
-    this.thresholdIdleConnections = conf.getInt(
-        CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_KEY,
-        CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_DEFAULT);
     this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
     this.authorize = 
       conf.getBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, 
@@ -1709,6 +1608,7 @@ public abstract class Server {
 
     // Create the responder here
     responder = new Responder();
+    connectionManager = new ConnectionManager();
     
     if (isSecurityEnabled) {
       SaslRpcServer.init(conf);
@@ -1716,14 +1616,7 @@ public abstract class Server {
   }
 
   private void closeConnection(Connection connection) {
-    synchronized (connectionList) {
-      if (connectionList.remove(connection))
-        numConnections--;
-    }
-    try {
-      connection.close();
-    } catch (IOException e) {
-    }
+    connectionManager.close(connection);
   }
   
   /**
@@ -1911,7 +1804,7 @@ public abstract class Server {
    * @return the number of open rpc connections
    */
   public int getNumOpenConnections() {
-    return numConnections;
+    return connectionManager.size();
   }
   
   /**
@@ -2021,4 +1914,152 @@ public abstract class Server {
     int nBytes = initialRemaining - buf.remaining(); 
     return (nBytes > 0) ? nBytes : ret;
   }      
+
+  
+  private class ConnectionManager {  // tracks open client connections and closes idle ones from a timer
+    final private AtomicInteger count = new AtomicInteger();  // number of entries in connections; updated only by add()/remove()
+    final private Set<Connection> connections;
+
+    final private Timer idleScanTimer;
+    final private int idleScanThreshold;  // a non-scanAll scan stops once size() drops below this
+    final private int idleScanInterval;   // delay (ms) between the end of one scan and the next
+    final private int maxIdleTime;        // twice the configured client max idle time
+    final private int maxIdleToClose;     // max connections closed by one non-scanAll scan
+    
+    ConnectionManager() {
+      this.idleScanTimer = new Timer(
+          "IPC Server idle connection scanner for port " + getPort(), true);  // true: run as a daemon thread
+      this.idleScanThreshold = conf.getInt(
+          CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_KEY,
+          CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_DEFAULT);
+      this.idleScanInterval = conf.getInt(
+          CommonConfigurationKeys.IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_KEY,
+          CommonConfigurationKeys.IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_DEFAULT);
+      this.maxIdleTime = 2 * conf.getInt(
+          CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
+          CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT);
+      this.maxIdleToClose = conf.getInt(
+          CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_KEY,
+          CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_DEFAULT);
+      // create a set with concurrency -and- a thread-safe iterator, add 2
+      // for listener and idle closer threads
+      this.connections = Collections.newSetFromMap(
+          new ConcurrentHashMap<Connection,Boolean>(
+              maxQueueSize, 0.75f, readThreads+2));  // (initial capacity, load factor, concurrency level)
+    }
+
+    private boolean add(Connection connection) {  // returns true only if the connection was not already tracked
+      boolean added = connections.add(connection);
+      if (added) {
+        count.getAndIncrement();
+      }
+      return added;
+    }
+    
+    private boolean remove(Connection connection) {  // returns true only if the connection was still tracked
+      boolean removed = connections.remove(connection);
+      if (removed) {
+        count.getAndDecrement();
+      }
+      return removed;
+    }
+    
+    int size() {  // current value of the connection counter
+      return count.get();
+    }
+
+    Connection[] toArray() {  // copy of the currently tracked connections
+      return connections.toArray(new Connection[0]);
+    }
+
+    Connection register(SocketChannel channel) {  // wraps the channel in a Connection stamped with the current time and tracks it
+      Connection connection = new Connection(channel, System.currentTimeMillis());
+      add(connection);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Server connection from " + connection +
+            "; # active connections: " + size() +
+            "; # queued calls: " + callQueue.size());
+      }      
+      return connection;
+    }
+    
+    boolean close(Connection connection) {  // returns whether this call removed (and therefore closed) the connection
+      boolean exists = remove(connection);
+      if (exists) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(Thread.currentThread().getName() +
+              ": disconnecting client " + connection +
+              ". Number of active connections: "+ size());
+        }
+        // only close if actually removed to avoid double-closing due
+        // to possible races
+        connection.close();
+      }
+      return exists;
+    }
+    
+    // synch'ed to avoid explicit invocation upon OOM from colliding with
+    // timer task firing
+    synchronized void closeIdle(boolean scanAll) {
+      long minLastContact = System.currentTimeMillis() - maxIdleTime;  // last contact earlier than this means idle too long
+      // concurrent iterator might miss new connections added
+      // during the iteration, but that's ok because they won't
+      // be idle yet anyway and will be caught on next scan
+      int closed = 0;
+      for (Connection connection : connections) {
+        // stop if connections dropped below threshold unless scanning all
+        if (!scanAll && size() < idleScanThreshold) {
+          break;
+        }
+        // stop if not scanning all and max connections are closed
+        if (connection.isIdle() &&
+            connection.getLastContact() < minLastContact &&
+            close(connection) &&  // short-circuit: closed is only bumped after a successful close
+            !scanAll && (++closed == maxIdleToClose)) {
+          break;
+        }
+      }
+    }
+    
+    void closeAll() {
+      // use a copy of the connections to be absolutely sure the concurrent
+      // iterator doesn't miss a connection
+      for (Connection connection : toArray()) {
+        close(connection);
+      }
+    }
+    
+    void startIdleScan() {  // schedules the first idle scan
+      scheduleIdleScanTask();
+    }
+    
+    void stopIdleScan() {  // cancels the timer, discarding any scheduled scan
+      idleScanTimer.cancel();
+    }
+    
+    private void scheduleIdleScanTask() {  // queues a single one-shot scan; does nothing once the server has stopped running
+      if (!running) {
+        return;
+      }
+      TimerTask idleScanTask = new TimerTask(){
+        @Override
+        public void run() {
+          if (!running) {
+            return;
+          }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(Thread.currentThread().getName()+": task running");
+          }
+          try {
+            closeIdle(false);
+          } finally {
+            // explicitly reschedule so next execution occurs relative
+            // to the end of this scan, not the beginning
+            scheduleIdleScanTask();
+          }
+        }
+      };
+      idleScanTimer.schedule(idleScanTask, idleScanInterval);  // one-shot: delay only, no period
+    }
+  }
 }

+ 121 - 2
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java

@@ -39,6 +39,8 @@ import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.net.SocketFactory;
 
@@ -48,7 +50,9 @@ import static org.mockito.Mockito.*;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.junit.Assume;
+import org.junit.Before;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -61,7 +65,7 @@ public class TestIPC {
   public static final Log LOG =
     LogFactory.getLog(TestIPC.class);
   
-  final private static Configuration conf = new Configuration();
+  private static Configuration conf;
   final static private int PING_INTERVAL = 1000;
   final static private int MIN_SLEEP_TIME = 1000;
 
@@ -71,7 +75,9 @@ public class TestIPC {
    **/
   static boolean WRITABLE_FAULTS_ENABLED = true;
   
-  static {
+  @Before
+  public void setupConf() {
+    conf = new Configuration();
     Client.setPingInterval(conf, PING_INTERVAL);
   }
 
@@ -83,6 +89,10 @@ public class TestIPC {
   private static final File FD_DIR = new File("/proc/self/fd");
 
   private static class TestServer extends Server {
+    // Tests can set callListener to run a piece of code each time the server
+    // receives a call.  This code executes on the server thread, so it has
+    // visibility of that thread's thread-local storage.
+    private Runnable callListener;
     private boolean sleep;
     private Class<? extends Writable> responseClass;
 
@@ -108,6 +118,9 @@ public class TestIPC {
           Thread.sleep(RANDOM.nextInt(PING_INTERVAL) + MIN_SLEEP_TIME);
         } catch (InterruptedException e) {}
       }
+      if (callListener != null) {
+        callListener.run();
+      }
       if (responseClass != null) {
         try {
           return responseClass.newInstance();
@@ -567,6 +580,112 @@ public class TestIPC {
     }
   }
 
+  @Test(timeout=30000)
+  public void testConnectionIdleTimeouts() throws Exception {
+    final int maxIdle = 1000;
+    final int cleanupInterval = maxIdle*3/4; // stagger cleanups
+    final int killMax = 3;
+    final int clients = 1 + killMax*2; // 1 to block, 2 batches to kill
+    // threshold 0 so a scan never stops early; at most killMax idle connections are closed per scan
+    conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, maxIdle);
+    conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_KEY, 0);
+    conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_KEY, killMax);
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_KEY, cleanupInterval);
+    // the first call parks on firstCallBarrier; every other call parks on callBarrier
+    final CyclicBarrier firstCallBarrier = new CyclicBarrier(2);
+    final CyclicBarrier callBarrier = new CyclicBarrier(clients);
+    final CountDownLatch allCallLatch = new CountDownLatch(clients);
+    final AtomicBoolean error = new AtomicBoolean();
+    
+    final TestServer server = new TestServer(clients, false);
+    Thread[] threads = new Thread[clients];
+    try {
+      server.callListener = new Runnable(){
+        AtomicBoolean first = new AtomicBoolean(true);
+        @Override
+        public void run() {
+          try {
+            allCallLatch.countDown();
+            // block first call
+            if (first.compareAndSet(true, false)) {
+              firstCallBarrier.await();
+            } else {
+              callBarrier.await();
+            }
+          } catch (Throwable t) {
+            LOG.error(t);
+            error.set(true); 
+          } 
+        }
+      };
+      server.start();
+
+      // start client
+      final CountDownLatch callReturned = new CountDownLatch(clients-1);
+      final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+      final Configuration clientConf = new Configuration();
+      clientConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 10000);
+      for (int i=0; i < clients; i++) {
+        threads[i] = new Thread(new Runnable(){
+          @Override
+          public void run() {
+            Client client = new Client(LongWritable.class, clientConf);
+            try {
+              client.call(new LongWritable(Thread.currentThread().getId()),
+                  addr, null, null, 0, clientConf);
+              callReturned.countDown();
+              Thread.sleep(10000);
+            } catch (IOException e) {
+              LOG.error(e);
+            } catch (InterruptedException e) {
+            }
+          }
+        });
+        threads[i].start();
+      }
+      
+      // all calls blocked in handler so all connections made
+      allCallLatch.await();
+      assertFalse(error.get());
+      assertEquals(clients, server.getNumOpenConnections());
+      
+      // wake up blocked calls and wait for client call to return, no
+      // connections should have closed
+      callBarrier.await();
+      callReturned.await();
+      assertEquals(clients, server.getNumOpenConnections());
+      
+      // server won't close till maxIdle*2, so give scanning thread time to
+      // be almost ready to close idle connection.  after which it should
+      // close max connections on every cleanupInterval
+      Thread.sleep(maxIdle*2-cleanupInterval);
+      for (int i=clients; i > 1; i -= killMax) {
+        Thread.sleep(cleanupInterval);
+        assertFalse(error.get());
+        assertEquals(i, server.getNumOpenConnections());
+      }
+
+      // connection for the first blocked call should still be open
+      Thread.sleep(cleanupInterval);
+      assertFalse(error.get());
+      assertEquals(1, server.getNumOpenConnections());
+     
+      // wake up call and ensure connection times out
+      firstCallBarrier.await();
+      Thread.sleep(maxIdle*2);
+      assertFalse(error.get());
+      assertEquals(0, server.getNumOpenConnections());
+    } finally {
+      for (Thread t : threads) {
+        if (t != null) {
+          t.interrupt();
+          t.join();
+        }
+      }
+      server.stop();  // stop once, after every client thread has been interrupted and joined
+    }
+  }
+  
   /**
    * Check that reader queueing works
    * @throws BrokenBarrierException