Prechádzať zdrojové kódy

HADOOP-255. Discard stale queued IPC calls. Contributed by Owen.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@453766 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 rokov pred
rodič
commit
5d4210425e
2 zmenil súbory, kde vykonal 46 pridanie a 9 odobranie
  1. 8 0
      CHANGES.txt
  2. 38 9
      src/java/org/apache/hadoop/ipc/Server.java

+ 8 - 0
CHANGES.txt

@@ -142,6 +142,14 @@ Trunk (unreleased changes)
 34. HADOOP-506.  Ignore heartbeats from stale task trackers.
    (Sanjay Dahiya via cutting)
 
+35. HADOOP-255.  Discard stale, queued IPC calls.  Do not process
+    calls whose clients will likely time out before they receive a
+    response.  When the queue is full, new calls are now received and
+    queued, and the oldest calls are discarded, so that, when servers
+    get bogged down, they no longer develop a backlog on the socket.
+    This should improve some DFS namenode failure modes.
+    (omalley via cutting)
+
 
 Release 0.6.2 - 2006-09-18
 

+ 38 - 9
src/java/org/apache/hadoop/ipc/Server.java

@@ -17,7 +17,6 @@
 package org.apache.hadoop.ipc;
 
 import java.io.IOException;
-import java.io.EOFException;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.BufferedOutputStream;
@@ -30,7 +29,6 @@ import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
-import java.nio.BufferUnderflowException;
 
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -57,6 +55,18 @@ import org.apache.hadoop.ipc.SocketChannelOutputStream;
  * @see Client
  */
 public abstract class Server {
+  /**
+   * How much time should be allocated for actually running the handler?
+   * Calls that are older than ipc.timeout * MAX_CALL_QUEUE_TIME
+   * are ignored when the handler takes them off the queue.
+   */
+  private static final float MAX_CALL_QUEUE_TIME = 0.6f;
+  
+  /**
+   * How many calls/handler are allowed in the queue.
+   */
+  private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100;
+  
   public static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.ipc.Server");
 
@@ -72,7 +82,6 @@ public abstract class Server {
   private String bindAddress; 
   private int port;                               // port we listen on
   private int handlerCount;                       // number of handler threads
-  private int maxQueuedCalls;                     // max number of queued calls
   private Class paramClass;                       // class of call parameters
   private int maxIdleTime;                        // the maximum idle time after 
                                                   // which a client may be disconnected
@@ -87,6 +96,8 @@ public abstract class Server {
   private Configuration conf;
 
   private int timeout;
+  private long maxCallStartAge;
+  private int maxQueueSize;
 
   private boolean running = true;                 // true while server runs
   private LinkedList callQueue = new LinkedList(); // queued calls
@@ -103,11 +114,17 @@ public abstract class Server {
     private int id;                               // the client's call id
     private Writable param;                       // the parameter passed
     private Connection connection;                // connection to client
+    private long receivedTime;                    // the time received
 
     public Call(int id, Writable param, Connection connection) {
       this.id = id;
       this.param = param;
       this.connection = connection;
+      this.receivedTime = System.currentTimeMillis();
+    }
+    
+    public String toString() {
+      return param.toString() + " from " + connection.toString();
     }
   }
 
@@ -348,6 +365,10 @@ public abstract class Server {
          this.channelOut = new SocketChannelOutputStream(channel, 4096)));
     }   
 
+    public String toString() {
+      return getHostAddress() + ":" + socket.getPort(); 
+    }
+    
     public String getHostAddress() {
       return socket.getInetAddress().getHostAddress();
     }
@@ -409,15 +430,13 @@ public abstract class Server {
         
       Call call = new Call(id, param, this);
       synchronized (callQueue) {
+        if (callQueue.size() >= maxQueueSize) {
+          callQueue.removeFirst();
+        }
         callQueue.addLast(call);              // queue the call
         callQueue.notify();                   // wake up a waiting handler
       }
         
-      while (running && callQueue.size() >= maxQueuedCalls) {
-        synchronized (callDequeued) {         // queue is full
-          callDequeued.wait(timeout);         // wait for a dequeue
-        }
-      }
     }
 
     private void close() throws IOException {
@@ -462,6 +481,15 @@ public abstract class Server {
             callDequeued.notify();
           }
 
+          // throw the message away if it is too old
+          if (System.currentTimeMillis() - call.receivedTime > 
+              maxCallStartAge) {
+            LOG.info("Call " + call.toString() + 
+                     " discarded for being too old (" +
+                     (System.currentTimeMillis() - call.receivedTime) + ")");
+            continue;
+          }
+          
           if (LOG.isDebugEnabled())
             LOG.debug(getName() + ": has #" + call.id + " from " +
                      call.connection.socket.getInetAddress().getHostAddress());
@@ -526,8 +554,9 @@ public abstract class Server {
     this.port = port;
     this.paramClass = paramClass;
     this.handlerCount = handlerCount;
-    this.maxQueuedCalls = handlerCount;
     this.timeout = conf.getInt("ipc.client.timeout",10000);
+    maxCallStartAge = (long) (timeout * MAX_CALL_QUEUE_TIME);
+    maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
     this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000);
     this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
     this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);