Explorar o código

HADOOP-4552. Fix a deadlock in RPC server. (Raghu Angadi)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.19@713112 13f79535-47bb-0310-9956-ffa450edef68
Raghu Angadi %!s(int64=16) %!d(string=hai) anos
pai
achega
d7c59551b6
Modificáronse 2 ficheiros con 19 adicións e 13 borrados
  1. 2 0
      CHANGES.txt
  2. 17 13
      src/core/org/apache/hadoop/ipc/Server.java

+ 2 - 0
CHANGES.txt

@@ -971,6 +971,8 @@ Release 0.19.0 - Unreleased
     HADOOP-4595. Fixes two race conditions - one to do with updating free slot count,
     and another to do with starting the MapEventsFetcher thread. (ddas)
 
+    HADOOP-4552. Fix a deadlock in RPC server. (Raghu Angadi) 
+
 Release 0.18.3 - Unreleased
 
   BUG FIXES

+ 17 - 13
src/core/org/apache/hadoop/ipc/Server.java

@@ -40,6 +40,7 @@ import java.net.Socket;
 import java.net.SocketException;
 import java.net.UnknownHostException;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
@@ -475,17 +476,28 @@ public abstract class Server {
           // long time, discard them.
           //
           LOG.debug("Checking for old call responses.");
+          ArrayList<Call> calls;
+          
+          // get the list of channels from list of keys.
           synchronized (writeSelector.keys()) {
+            calls = new ArrayList<Call>(writeSelector.keys().size());
             iter = writeSelector.keys().iterator();
             while (iter.hasNext()) {
               SelectionKey key = iter.next();
-              try {
-                doPurge(key, now);
-              } catch (IOException e) {
-                LOG.warn("Error in purging old calls " + e);
+              Call call = (Call)key.attachment();
+              if (call != null && key.channel() == call.connection.channel) { 
+                calls.add(call);
               }
             }
           }
+          
+          for(Call call : calls) {
+            try {
+              doPurge(call, now);
+            } catch (IOException e) {
+              LOG.warn("Error in purging old calls " + e);
+            }
+          }
         } catch (OutOfMemoryError e) {
           //
           // we can run out of memory if we have too many threads
@@ -531,15 +543,7 @@ public abstract class Server {
     // Remove calls that have been pending in the responseQueue 
     // for a long time.
     //
-    private void doPurge(SelectionKey key, long now) throws IOException {
-      Call call = (Call)key.attachment();
-      if (call == null) {
-        return;
-      }
-      if (key.channel() != call.connection.channel) {
-        LOG.info("doPurge: bad channel");
-        return;
-      }
+    private void doPurge(Call call, long now) throws IOException {
       LinkedList<Call> responseQueue = call.connection.responseQueue;
       synchronized (responseQueue) {
         Iterator<Call> iter = responseQueue.listIterator(0);