瀏覽代碼

HADOOP-7140. IPC Reader threads do not stop when server stops. Backported by Ivan Mitic.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1@1495293 13f79535-47bb-0310-9956-ffa450edef68
Ivan Mitic 12 年之前
父節點
當前提交
df2cbb7745
共有 3 個文件被更改,包括 70 次插入8 次删除
  1. 3 0
      CHANGES.txt
  2. 20 8
      src/core/org/apache/hadoop/ipc/Server.java
  3. 47 0
      src/test/org/apache/hadoop/ipc/TestRPC.java

+ 3 - 0
CHANGES.txt

@@ -62,6 +62,9 @@ Release 1.3.0 - unreleased
     HADOOP-9624. TestFSMainOperationsLocalFileSystem failed when the Hadoop test
     root path has "X" in its name. (Xi Fang via cnauroth)
 
+    HADOOP-7140. IPC Reader threads do not stop when server stops
+    (Todd Lipcon, backported by ivanmi)
+
 Release 1.2.1 - Unreleased 
 
   INCOMPATIBLE CHANGES

+ 20 - 8
src/core/org/apache/hadoop/ipc/Server.java

@@ -329,7 +329,6 @@ public abstract class Server {
     private long cleanupInterval = 10000; //the minimum interval between 
                                           //two cleanup runs
     private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
-    private ExecutorService readPool; 
    
     public Listener() throws IOException {
       address = new InetSocketAddress(bindAddress, port);
@@ -343,12 +342,12 @@ public abstract class Server {
       // create a selector;
       selector= Selector.open();
       readers = new Reader[readThreads];
-      readPool = Executors.newFixedThreadPool(readThreads);
       for (int i = 0; i < readThreads; i++) {
         Selector readSelector = Selector.open();
-        Reader reader = new Reader(readSelector);
+        Reader reader = new Reader("Socket Reader #" + (i + 1) + " for port " + port,
+                                   readSelector);
         readers[i] = reader;
-        readPool.execute(reader);
+        reader.start();
       }
 
       // Register accepts on the server socket with the selector.
@@ -357,15 +356,16 @@ public abstract class Server {
       this.setDaemon(true);
     }
     
-    private class Reader implements Runnable {
+    private class Reader extends Thread {
       private volatile boolean adding = false;
       private Selector readSelector = null;
 
-      Reader(Selector readSelector) {
+      Reader(String name, Selector readSelector) {
+        super(name);
         this.readSelector = readSelector;
       }
       public void run() {
-        LOG.info("Starting SocketReader");
+        LOG.info("Starting " + getName());
         synchronized (this) {
           while (running) {
             SelectionKey key = null;
@@ -419,6 +419,16 @@ public abstract class Server {
         adding = false;
         this.notify();        
       }
+
+      void shutdown() {
+        assert !running;
+        readSelector.wakeup();
+        try {
+          join();
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+        }
+      }
     }
 
     /** cleanup connections from connectionList. Choose a random range
@@ -607,7 +617,9 @@ public abstract class Server {
           LOG.info(getName() + ":Exception in closing listener socket. " + e);
         }
       }
-      readPool.shutdown();
+      for (Reader r : readers) {
+        r.shutdown();
+      }
     }
 
     // The method that will return the next reader to work with

+ 47 - 0
src/test/org/apache/hadoop/ipc/TestRPC.java

@@ -23,6 +23,9 @@ import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
 import java.lang.reflect.Method;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
@@ -410,6 +413,50 @@ public class TestRPC extends TestCase {
       assertCounter("rpcAuthenticationSuccesses", 0, rb);
     }
   }
+
+  /**
+   * Count the number of threads that have a stack frame containing
+   * the given string
+   */
+  private static int countThreads(String search) {
+    ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+
+    int count = 0;
+    ThreadInfo[] infos = threadBean.getThreadInfo(threadBean.getAllThreadIds(), 20);
+    for (ThreadInfo info : infos) {
+      if (info == null) continue;
+      for (StackTraceElement elem : info.getStackTrace()) {
+        if (elem.getClassName().contains(search)) {
+          count++;
+          break;
+        }
+      }
+    }
+    return count;
+  }
+
+
+  /**
+   * Test that server.stop() properly stops all threads
+   */
+  public void testStopsAllThreads() throws Exception {
+    int threadsBefore = countThreads("Server$Listener$Reader");
+    assertEquals("Expect no Reader threads running before test",
+      0, threadsBefore);
+
+    final Server server = RPC.getServer(new TestImpl(), ADDRESS,
+        0, 5, true, conf);
+    server.start();
+    try {
+      int threadsRunning = countThreads("Server$Listener$Reader");
+      assertTrue(threadsRunning > 0);
+    } finally {
+      server.stop();
+    }
+    int threadsAfter = countThreads("Server$Listener$Reader");
+    assertEquals("Expect no Reader threads left running after test",
+      0, threadsAfter);
+  }
   
   public void testAuthorization() throws Exception {
     Configuration conf = new Configuration();