فهرست منبع

HADOOP-11295. RPC Server Reader thread can't shutdown if RPCCallQueue is full. Contributed by Ming Ma.

(cherry picked from commit 685af8a3d0504724fe588daf3722519fedc45b01)
(cherry picked from commit 6c01e586198a3c3ebaa7561778c124ae62553246)
Kihwal Lee 10 سال پیش
والد
کامیت
4ec7b6174d

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -39,6 +39,9 @@ Release 2.6.1 - UNRELEASED
     HADOOP-11482. Use correct UGI when KMSClientProvider is called by a proxy
     user. Contributed by Arun Suresh.
 
+    HADOOP-11295. RPC Server Reader thread can't shutdown if RPCCallQueue is
+    full. (Ming Ma via kihwal)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

+ 2 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -662,7 +662,8 @@ public abstract class Server {
         assert !running;
         readSelector.wakeup();
         try {
-          join();
+          super.interrupt();
+          super.join();
         } catch (InterruptedException ie) {
           Thread.currentThread().interrupt();
         }

+ 68 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

@@ -38,9 +38,16 @@ import java.lang.reflect.Proxy;
 import java.net.ConnectException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -1009,6 +1016,67 @@ public class TestRPC {
     }
   }
 
+  /**
+   *  Verify the RPC server can shutdown properly when callQueue is full.
+   */
+  @Test (timeout=30000)
+  public void testRPCServerShutdown() throws Exception {
+    final int numClients = 3;
+    final List<Future<Void>> res = new ArrayList<Future<Void>>();
+    final ExecutorService executorService =
+        Executors.newFixedThreadPool(numClients);
+    final Configuration conf = new Configuration();
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+    final Server server = new RPC.Builder(conf)
+        .setProtocol(TestProtocol.class).setInstance(new TestImpl())
+        .setBindAddress(ADDRESS).setPort(0)
+        .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true)
+        .build();
+    server.start();
+
+    final TestProtocol proxy =
+        RPC.getProxy(TestProtocol.class, TestProtocol.versionID,
+            NetUtils.getConnectAddress(server), conf);
+    try {
+      // start a sleep RPC call to consume the only handler thread.
+      // Start another sleep RPC call to make callQueue full.
+      // Start another sleep RPC call to make reader thread block on CallQueue.
+      for (int i = 0; i < numClients; i++) {
+        res.add(executorService.submit(
+            new Callable<Void>() {
+              @Override
+              public Void call() throws IOException, InterruptedException {
+                proxy.sleep(100000);
+                return null;
+              }
+            }));
+      }
+      while (server.getCallQueueLen() != 1
+          && countThreads(CallQueueManager.class.getName()) != 1
+          && countThreads(TestProtocol.class.getName()) != 1) {
+        Thread.sleep(100);
+      }
+    } finally {
+      try {
+        server.stop();
+        assertEquals("Not enough clients", numClients, res.size());
+        for (Future<Void> f : res) {
+          try {
+            f.get();
+            fail("Future get should not return");
+          } catch (ExecutionException e) {
+            assertTrue("Unexpected exception: " + e,
+                e.getCause() instanceof IOException);
+            LOG.info("Expected exception", e.getCause());
+          }
+        }
+      } finally {
+        RPC.stopProxy(proxy);
+        executorService.shutdown();
+      }
+    }
+  }
+
   public static void main(String[] args) throws IOException {
     new TestRPC().testCallsInternal(conf);