Browse Source

reverting the patch to hadoop-2910

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@643521 13f79535-47bb-0310-9956-ffa450edef68
Hairong Kuang 17 years ago
parent
commit
2ccec36eee
3 changed files with 20 additions and 18 deletions
  1. 0 3
      CHANGES.txt
  2. 1 1
      src/java/org/apache/hadoop/ipc/Client.java
  3. 19 14
      src/java/org/apache/hadoop/ipc/Server.java

+ 0 - 3
CHANGES.txt

@@ -148,9 +148,6 @@ Trunk (unreleased changes)
     HADOOP-2239. Add HsftpFileSystem to permit transferring files over ssl.
     (cdouglas)
 
-    HADOOP-2910. Throttle IPC Client/Server during bursts of 
-    requests or server slowdown. (Hairong Kuang via dhruba)
-
     HADOOP-2848. [HOD]hod -o list and deallocate works even after deleting
     the cluster directory. (Hemanth Yamijala via ddas)
 

+ 1 - 1
src/java/org/apache/hadoop/ipc/Client.java

@@ -171,7 +171,7 @@ public class Client {
         try {
           this.socket = socketFactory.createSocket();
           this.socket.setTcpNoDelay(tcpNoDelay);
-          this.socket.connect(remoteId.getAddress());
+          this.socket.connect(remoteId.getAddress(), FSConstants.READ_TIMEOUT);
           break;
         } catch (IOException ie) { //SocketTimeoutException is also caught 
           if (failures == maxRetries) {

+ 19 - 14
src/java/org/apache/hadoop/ipc/Server.java

@@ -45,8 +45,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Iterator;
 import java.util.Random;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -148,7 +146,7 @@ public abstract class Server {
   private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
 
   volatile private boolean running = true;         // true while server runs
-  private BlockingQueue<Call> callQueue; // queued calls
+  private LinkedList<Call> callQueue = new LinkedList<Call>(); // queued calls
 
   private List<Connection> connectionList = 
     Collections.synchronizedList(new LinkedList<Connection>());
@@ -323,11 +321,6 @@ public abstract class Server {
           closeCurrentConnection(key, e);
           cleanupConnections(true);
           try { Thread.sleep(60000); } catch (Exception ie) {}
-        } catch (InterruptedException e) {
-          if (running) {                          // unexpected -- log it
-            LOG.info(getName() + " caught: " +
-                     StringUtils.stringifyException(e));
-          }
         } catch (Exception e) {
           closeCurrentConnection(key, e);
         }
@@ -386,7 +379,7 @@ public abstract class Server {
                   "; # queued calls: " + callQueue.size());
     }
 
-    void doRead(SelectionKey key) throws InterruptedException {
+    void doRead(SelectionKey key) {
       int count = 0;
       Connection c = (Connection)key.attachment();
       if (c == null) {
@@ -396,8 +389,6 @@ public abstract class Server {
       
       try {
         count = c.readAndProcess();
-      } catch (InterruptedException ieo) {
-        throw ieo;
       } catch (Exception e) {
         LOG.debug(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
         count = -1; //so that the (count < 0) block is executed
@@ -829,7 +820,15 @@ public abstract class Server {
       param.readFields(dis);        
         
       Call call = new Call(id, param, this);
-      callQueue.put(call);              // queue the call; maybe blocked here
+      synchronized (callQueue) {
+        if (callQueue.size() >= maxQueueSize) {
+          Call oldCall = callQueue.removeFirst();
+          LOG.warn("Call queue overflow discarding oldest call " + oldCall);
+        }
+        callQueue.addLast(call);              // queue the call
+        callQueue.notify();                   // wake up a waiting handler
+      }
+        
     }
 
     private synchronized void close() throws IOException {
@@ -859,7 +858,14 @@ public abstract class Server {
       ByteArrayOutputStream buf = new ByteArrayOutputStream(10240);
       while (running) {
         try {
-          Call call = callQueue.take(); // pop the queue; maybe blocked here
+          Call call;
+          synchronized (callQueue) {
+            while (running && callQueue.size()==0) { // wait for a call
+              callQueue.wait(timeout);
+            }
+            if (!running) break;
+            call = callQueue.removeFirst(); // pop the queue
+          }
 
           // throw the message away if it is too old
           if (System.currentTimeMillis() - call.receivedTime > 
@@ -944,7 +950,6 @@ public abstract class Server {
     this.socketSendBufferSize = 0;
     maxCallStartAge = (long) (timeout * MAX_CALL_QUEUE_TIME);
     maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
-    this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueSize); 
     this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000);
     this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
     this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);