
HADOOP-312. Close idle IPC connections. Contributed by Devaraj.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@440895 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent commit e38c641d8e

+ 6 - 0
CHANGES.txt

@@ -137,6 +137,12 @@ Trunk (unreleased changes)
     renewLease() calls when no files are being written.
     (Konstantin Shvachko via cutting)
 
+35. HADOOP-312.  Close idle IPC connections.  All IPC connections were
+    cached forever.  Now, after a connection has been idle for more
+    than a configurable amount of time (one second by default), the
+    connection is closed, conserving resources on both client and
+    server. (Devaraj Das via cutting)
+
 
 Release 0.5.0 - 2006-08-04
 

+ 23 - 0
conf/hadoop-default.xml

@@ -475,5 +475,28 @@ creations/deletions), or "all".</description>
   </description>
 </property>
 
+<property>
+  <name>ipc.client.connection.maxidletime</name>
+  <value>1000</value>
+  <description>The maximum time (in msec) a connection is allowed to stay idle
+               before the client brings down the connection to the server.
+  </description>
+</property>
+
+<property>
+  <name>ipc.client.connect.max.retries</name>
+  <value>10</value>
+  <description>Indicates the number of retries a client will make to establish
+               a server connection.
+  </description>
+</property>
+
+<property>
+  <name>ipc.server.listen.queue.size</name>
+  <value>128</value>
+  <description>Indicates the length of the listen queue for servers accepting
+               client connections.
+  </description>
+</property>
 
 </configuration>
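
All three settings are plain integers that the IPC code reads back with Configuration.getInt, as the Client.java and Server.java hunks below show. As a hedged sketch (not part of this commit), a caller could override them before constructing a Client; only the property keys and their defaults come from this diff, while the class name, the chosen values, and the use of ObjectWritable as the call value class are assumptions for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.ipc.Client;

// Hedged sketch, not part of this commit: only the property keys and defaults
// are taken from the hadoop-default.xml hunk above.
public class IpcTuningExample {
  public static Client newTunedClient() {
    Configuration conf = new Configuration();
    conf.set("ipc.client.connection.maxidletime", "5000"); // cull connections idle > 5s (default 1s)
    conf.set("ipc.client.connect.max.retries", "3");       // give up after 3 connect attempts (default 10)
    conf.set("ipc.server.listen.queue.size", "256");       // server accept backlog (default 128)
    // Client.java below reads maxidletime and max.retries in its constructor via conf.getInt().
    return new Client(ObjectWritable.class, conf);
  }
}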

+ 169 - 16
src/java/org/apache/hadoop/ipc/Client.java

@@ -30,6 +30,9 @@ import java.io.FilterInputStream;
 import java.io.FilterOutputStream;
 
 import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Collection;
+import java.util.Random;
 
 import org.apache.commons.logging.*;
 
@@ -50,7 +53,6 @@ import org.apache.hadoop.io.DataOutputBuffer;
 public class Client {
   public static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.ipc.Client");
-
   private Hashtable connections = new Hashtable();
 
   private Class valueClass;                       // class of call values
@@ -58,6 +60,9 @@ public class Client {
   private int counter;                            // counter for call ids
   private boolean running = true;                 // true while client runs
   private Configuration conf;
+  private int maxIdleTime; //connections will be culled if they have been
+                           //idle for more than maxIdleTime msecs
+  private int maxRetries; //the max. no. of retries for socket connections
 
   /** A call waiting for a value. */
   private class Call {
@@ -101,16 +106,53 @@ public class Client {
    * socket: responses may be delivered out of order. */
   private class Connection extends Thread {
     private InetSocketAddress address;            // address of server
-    private Socket socket;                        // connected socket
+    private Socket socket = null;                 // connected socket
     private DataInputStream in;                   
     private DataOutputStream out;
     private Hashtable calls = new Hashtable();    // currently active calls
     private Call readingCall;
     private Call writingCall;
+    private int inUse = 0;
+    private long lastActivity = 0;
+    private boolean shouldCloseConnection = false;
 
     public Connection(InetSocketAddress address) throws IOException {
       this.address = address;
-      this.socket = new Socket(address.getAddress(), address.getPort());
+      this.setName("Client connection to "
+                   + address.getAddress().getHostAddress()
+                   + ":" + address.getPort());
+      this.setDaemon(true);
+    }
+
+    public synchronized void setupIOstreams() throws IOException {
+      if (socket != null) {
+        notify();
+        return;
+      }
+      short failures = 0;
+      while (true) {
+        try {
+          this.socket = new Socket(address.getAddress(), address.getPort());
+          break;
+        } catch (IOException ie) { //SocketTimeoutException is also caught 
+          if (failures == maxRetries) {
+            //reset inUse so that the culler gets a chance to throw this
+            //connection object out of the table. We don't want to increment
+            //inUse to infinity (every time getConnection is called, inUse is
+            //incremented)!
+            inUse = 0;
+            throw ie;
+          }
+          failures++;
+          LOG.info("Retrying connect to server: " + address + 
+                   ". Already tried " + failures + " time(s).");
+          try { 
+            Thread.sleep(1000);
+          } catch (InterruptedException iex){
+          }
+        }
+      }
+
       socket.setSoTimeout(timeout);
       this.in = new DataInputStream
         (new BufferedInputStream
@@ -133,17 +175,61 @@ public class Client {
                }
              }
            }));
-      this.setDaemon(true);
-      this.setName("Client connection to "
-                   + address.getAddress().getHostAddress()
-                   + ":" + address.getPort());
+      notify();
+    }
+
+    private synchronized boolean waitForWork() {
+      //wait till someone signals us to start reading RPC response or
+      //close the connection. If we are idle long enough (blocked in wait),
+      //the ConnectionCuller thread will wake us up and ask us to close the
+      //connection. 
+      //We need to wait when inUse is 0 or socket is null (it may be null if
+      //the Connection object has been created but the socket connection
+      //has not been setup yet). We stop waiting if we have been asked to close
+      //connection
+      while ((inUse == 0 || socket == null) && !shouldCloseConnection) {
+        try {
+          wait();
+        } catch (InterruptedException e) {}
+      }
+      return !shouldCloseConnection;
+    }
+
+    private synchronized void incrementRef() {
+      inUse++;
+    }
+
+    private synchronized void decrementRef() {
+      lastActivity = System.currentTimeMillis();
+      inUse--;
+    }
+
+    public synchronized boolean isIdle() {
+      //check whether the connection is in use or just created
+      if (inUse != 0) return false;
+      long currTime = System.currentTimeMillis();
+      if (currTime - lastActivity > maxIdleTime)
+        return true;
+      return false;
+    }
+
+    public InetSocketAddress getRemoteAddress() {
+      return address;
+    }
+
+    public void setCloseConnection() {
+      shouldCloseConnection = true;
     }
 
     public void run() {
-      LOG.info(getName() + ": starting");
+      if (LOG.isDebugEnabled())
+        LOG.debug(getName() + ": starting");
       try {
         while (running) {
           int id;
+          //wait here for work - read connection or close connection
+          if (waitForWork() == false)
+            break;
           try {
             id = in.readInt();                    // try to read an id
           } catch (SocketTimeoutException e) {
@@ -174,12 +260,25 @@ public class Client {
             call.setResult(value, null);
           }
           call.callComplete();                   // deliver result to caller
+          //received the response. So decrement the ref count
+          decrementRef();
         }
       } catch (EOFException eof) {
           // This is what happens when the remote side goes down
       } catch (Exception e) {
         LOG.info(getName() + " caught: " + e, e);
       } finally {
+        //If there was no exception thrown in this method, then the only
+        //way we reached here is by breaking out of the while loop (after
+        //waitForWork). And if we took that route to reach here, we have 
+        //already removed the connection object in the ConnectionCuller thread.
+        //We don't want to remove this again as some other thread might have
+        //actually put a new Connection object in the table in the meantime.
+        synchronized (connections) {
+          if (connections.get(address) == this) {
+            connections.remove(address);
+          }
+        }
         close();
       }
     }
@@ -213,22 +312,27 @@ public class Client {
         }
         error = false;
       } finally {
-        if (error)
+        if (error) {
+          synchronized (connections) {
+            if (connections.get(address) == this)
+              connections.remove(address);
+          }
           close();                                // close on error
+        }
       }
     }  
 
-    /** Close the connection and remove it from the pool. */
+    /** Close the connection. */
     public void close() {
-      LOG.info(getName() + ": closing");
-      synchronized (connections) {
-        connections.remove(address);              // remove connection
-      }
+      //socket may be null if the connection could not be established to the
+      //server in question, and the culler asked us to close the connection
+      if (socket == null) return;
       try {
         socket.close();                           // close socket
       } catch (IOException e) {}
+      if (LOG.isDebugEnabled())
+        LOG.debug(getName() + ": closing");
     }
-
   }
 
   /** Call implementation used for parallel calls. */
@@ -268,14 +372,57 @@ public class Client {
     }
   }
 
+  private class ConnectionCuller extends Thread {
+
+    public static final int MIN_SLEEP_TIME = 1000;
+
+    public void run() {
+
+      LOG.info(getName() + ": starting");
+
+      while (running) {
+        try {
+          Thread.sleep(MIN_SLEEP_TIME);
+        } catch (InterruptedException ie) {}
+
+        synchronized (connections) {
+          Iterator i = connections.values().iterator();
+          while (i.hasNext()) {
+            Connection c = (Connection)i.next();
+            if (c.isIdle()) { 
+            //We don't actually close the socket here (i.e., don't invoke
+            //the close() method). We leave that work to the response receiver
+            //thread. The reason is that we are holding the lock on the
+            //connections table here, and we don't want to slow down the entire
+            //system if we happen to talk to a slow server.
+              i.remove();
+              synchronized (c) {
+                c.setCloseConnection();
+                c.notify();
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
   /** Construct an IPC client whose values are of the given {@link Writable}
    * class. */
   public Client(Class valueClass, Configuration conf) {
     this.valueClass = valueClass;
     this.timeout = conf.getInt("ipc.client.timeout",10000);
+    this.maxIdleTime = conf.getInt("ipc.client.connection.maxidletime",1000);
+    this.maxRetries = conf.getInt("ipc.client.connect.max.retries", 10);
     this.conf = conf;
-  }
 
+    Thread t = new ConnectionCuller();
+    t.setDaemon(true);
+    t.setName(valueClass.getName()
+              +" ConnectionCuller maxidletime="+maxIdleTime+"ms");
+    t.start();
+  }
+ 
   /** Stop all threads related to this client.  No further calls may be made
    * using this client. */
   public void stop() {
@@ -357,7 +504,13 @@ public class Client {
         connections.put(address, connection);
         connection.start();
       }
+      connection.incrementRef();
     }
+    //we don't invoke the method below inside "synchronized (connections)"
+    //block above. The reason is that if the server happens to be slow,
+    //it will take longer to establish a connection and that will slow the
+    //entire system down.
+    connection.setupIOstreams();
     return connection;
   }
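
Most of the Client.java change above is a small lifecycle protocol: getConnection increments inUse, the response loop decrements it and stamps lastActivity when a call completes, waitForWork blocks while the connection is unused, and the ConnectionCuller thread removes entries that have been idle longer than ipc.client.connection.maxidletime, leaving the actual socket close to the connection's own thread so the connections lock is never held during I/O. The standalone Java sketch below illustrates that pattern in isolation; every name in it is invented for the example, and it deliberately omits the RPC call handling and the connect retry loop.

import java.net.InetSocketAddress;
import java.util.Hashtable;
import java.util.Iterator;

// Hedged sketch of the idle-culling pattern, not the Hadoop code itself.
public class IdleCullingCache {
  private final Hashtable<InetSocketAddress, Entry> connections =
      new Hashtable<InetSocketAddress, Entry>();
  private final long maxIdleTime;          // msecs an entry may sit unused
  private volatile boolean running = true;

  static class Entry {
    int inUse = 0;                         // outstanding calls on this connection
    long lastActivity = System.currentTimeMillis();
    boolean shouldClose = false;           // set by the culler; the owning reader
                                           // thread (not shown) would see this,
                                           // leave its wait() and close the socket
    synchronized boolean isIdle(long maxIdleTime) {
      return inUse == 0 && System.currentTimeMillis() - lastActivity > maxIdleTime;
    }
  }

  public IdleCullingCache(long maxIdleTime) {
    this.maxIdleTime = maxIdleTime;
    Thread culler = new Thread(new Runnable() {
      public void run() { cullLoop(); }
    }, "connection culler");
    culler.setDaemon(true);                // never keeps the JVM alive, like the real culler
    culler.start();
  }

  /** Check out an entry; callers must call release() when their call completes. */
  public Entry acquire(InetSocketAddress address) {
    synchronized (connections) {
      Entry e = connections.get(address);
      if (e == null) {
        e = new Entry();
        connections.put(address, e);
      }
      synchronized (e) { e.inUse++; }
      return e;
    }
  }

  /** Return an entry; the idle clock starts ticking from now. */
  public void release(Entry e) {
    synchronized (e) {
      e.inUse--;
      e.lastActivity = System.currentTimeMillis();
    }
  }

  public void stop() { running = false; }

  private void cullLoop() {
    while (running) {
      try { Thread.sleep(1000); } catch (InterruptedException ignored) {}
      synchronized (connections) {
        Iterator<Entry> it = connections.values().iterator();
        while (it.hasNext()) {
          Entry e = it.next();
          if (e.isIdle(maxIdleTime)) {
            it.remove();                   // drop from the cache first...
            synchronized (e) {
              e.shouldClose = true;        // ...then ask the owner to close the socket,
              e.notify();                  // so the cache lock is never held during I/O
            }
          }
        }
      }
    }
  }
}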
 

+ 10 - 5
src/java/org/apache/hadoop/ipc/Server.java

@@ -122,6 +122,7 @@ public abstract class Server {
                                          //-tion (for idle connections) ran
     private long cleanupInterval = 10000; //the minimum interval between 
                                           //two cleanup runs
+    private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
     
     public Listener() throws IOException {
       address = new InetSocketAddress(port);
@@ -130,7 +131,7 @@ public abstract class Server {
       acceptChannel.configureBlocking(false);
 
       // Bind the server socket to the local host and port
-      acceptChannel.socket().bind(address);
+      acceptChannel.socket().bind(address, backlogLength);
       // create a selector;
       selector= Selector.open();
 
@@ -178,7 +179,8 @@ public abstract class Server {
                 numConnections--;
             }
             try {
-              LOG.info(getName() + ": disconnecting client " + c.getHostAddress());
+              if (LOG.isDebugEnabled())
+                LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
               c.close();
             } catch (Exception e) {}
             numNuked++;
@@ -252,7 +254,8 @@ public abstract class Server {
               numConnections--;
           }
           try {
-            LOG.info(getName() + ": disconnecting client " + c.getHostAddress());
+            if (LOG.isDebugEnabled())
+              LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
             c.close();
           } catch (Exception ex) {}
           c = null;
@@ -272,7 +275,8 @@ public abstract class Server {
         connectionList.add(numConnections, c);
         numConnections++;
       }
-      LOG.info("Server connection on port " + port + " from " + 
+      if (LOG.isDebugEnabled())
+        LOG.debug("Server connection on port " + port + " from " + 
                 c.getHostAddress() +
                 ": starting. Number of active connections: " + numConnections);
     }
@@ -298,7 +302,8 @@ public abstract class Server {
             numConnections--;
         }
         try {
-          LOG.info(getName() + ": disconnecting client " + 
+          if (LOG.isDebugEnabled())
+            LOG.debug(getName() + ": disconnecting client " + 
                   c.getHostAddress() + ". Number of active connections: "+
                   numConnections);
           c.close();
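
The Server.java change is narrower: the listening channel is now bound with an explicit accept backlog read from ipc.server.listen.queue.size, so bursts of incoming connections queue in the kernel instead of being refused, and the per-connection log lines move to debug level. Below is a minimal sketch of the same bind call with the channel setup reduced to its essentials; only the property key and the two-argument bind come from the diff, the class and method names are assumptions.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

// Hedged sketch: a non-blocking listener bound with an explicit accept backlog,
// mirroring acceptChannel.socket().bind(address, backlogLength) in the hunk above.
public class BacklogBindExample {
  public static ServerSocketChannel listen(int port, int backlogLength) throws IOException {
    ServerSocketChannel acceptChannel = ServerSocketChannel.open();
    acceptChannel.configureBlocking(false);   // same non-blocking accept style as Server.java
    // The second bind() argument is the listen(2) backlog; Hadoop reads it from
    // ipc.server.listen.queue.size (default 128 in this commit).
    acceptChannel.socket().bind(new InetSocketAddress(port), backlogLength);
    return acceptChannel;
  }
}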