Browse Source

HDFS-8429. Avoid stuck threads if there is an error in DomainSocketWatcher that stops the thread. (zhouyingchao via cmccabe)

(cherry picked from commit 246cefa089156a50bf086b8b1e4d4324d66dc58c)
Colin Patrick Mccabe 10 years ago
parent
commit
2a56adc550

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -303,6 +303,9 @@ Release 2.8.0 - UNRELEASED
     HADOOP-12035. shellcheck plugin displays a wrong version potentially
     HADOOP-12035. shellcheck plugin displays a wrong version potentially
     (Kengo Seki via aw)
     (Kengo Seki via aw)
 
 
+    HDFS-8429. Avoid stuck threads if there is an error in DomainSocketWatcher
+    that stops the thread.  (zhouyingchao via cmccabe)
+
 Release 2.7.1 - UNRELEASED
 Release 2.7.1 - UNRELEASED
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 20 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java

@@ -470,6 +470,7 @@ public final class DomainSocketWatcher implements Closeable {
               // Handle pending additions (before pending removes).
               // Handle pending additions (before pending removes).
               for (Iterator<Entry> iter = toAdd.iterator(); iter.hasNext(); ) {
               for (Iterator<Entry> iter = toAdd.iterator(); iter.hasNext(); ) {
                 Entry entry = iter.next();
                 Entry entry = iter.next();
+                iter.remove();
                 DomainSocket sock = entry.getDomainSocket();
                 DomainSocket sock = entry.getDomainSocket();
                 Entry prevEntry = entries.put(sock.fd, entry);
                 Entry prevEntry = entries.put(sock.fd, entry);
                 Preconditions.checkState(prevEntry == null,
                 Preconditions.checkState(prevEntry == null,
@@ -479,7 +480,6 @@ public final class DomainSocketWatcher implements Closeable {
                   LOG.trace(this + ": adding fd " + sock.fd);
                   LOG.trace(this + ": adding fd " + sock.fd);
                 }
                 }
                 fdSet.add(sock.fd);
                 fdSet.add(sock.fd);
-                iter.remove();
               }
               }
               // Handle pending removals
               // Handle pending removals
               while (true) {
               while (true) {
@@ -525,6 +525,25 @@ public final class DomainSocketWatcher implements Closeable {
           }
           }
           entries.clear();
           entries.clear();
           fdSet.close();
           fdSet.close();
+          closed = true;
+          if (!(toAdd.isEmpty() && toRemove.isEmpty())) {
+            // Items in toAdd might not be added to entries, handle it here
+            for (Iterator<Entry> iter = toAdd.iterator(); iter.hasNext();) {
+              Entry entry = iter.next();
+              entry.getDomainSocket().refCount.unreference();
+              entry.getHandler().handle(entry.getDomainSocket());
+              IOUtils.cleanup(LOG, entry.getDomainSocket());
+              iter.remove();
+            }
+            // Items in toRemove might not be really removed, handle it here
+            while (true) {
+              Map.Entry<Integer, DomainSocket> entry = toRemove.firstEntry();
+              if (entry == null)
+                break;
+              sendCallback("close", entries, fdSet, entry.getValue().fd);
+            }
+          }
+          processedCond.signalAll();
         } finally {
         } finally {
           lock.unlock();
           lock.unlock();
         }
         }

+ 1 - 1
hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c

@@ -212,7 +212,7 @@ done:
   free(carr);
   free(carr);
   if (jthr) {
   if (jthr) {
     (*env)->DeleteLocalRef(env, jarr);
     (*env)->DeleteLocalRef(env, jarr);
-    jarr = NULL;
+    (*env)->Throw(env, jthr);
   }
   }
   return jarr;
   return jarr;
 }
 }

+ 75 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java

@@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.Random;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantLock;
 
 
@@ -181,6 +182,80 @@ public class TestDomainSocketWatcher {
     watcher.close();
     watcher.close();
   }
   }
 
 
+  @Test(timeout = 300000)
+  public void testStressInterruption() throws Exception {
+    final int SOCKET_NUM = 250;
+    final ReentrantLock lock = new ReentrantLock();
+    final DomainSocketWatcher watcher = newDomainSocketWatcher(10);
+    final ArrayList<DomainSocket[]> pairs = new ArrayList<DomainSocket[]>();
+    final AtomicInteger handled = new AtomicInteger(0);
+
+    final Thread adderThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          for (int i = 0; i < SOCKET_NUM; i++) {
+            DomainSocket pair[] = DomainSocket.socketpair();
+            watcher.add(pair[1], new DomainSocketWatcher.Handler() {
+              @Override
+              public boolean handle(DomainSocket sock) {
+                handled.incrementAndGet();
+                return true;
+              }
+            });
+            lock.lock();
+            try {
+              pairs.add(pair);
+            } finally {
+              lock.unlock();
+            }
+            TimeUnit.MILLISECONDS.sleep(1);
+          }
+        } catch (Throwable e) {
+          LOG.error(e);
+          throw new RuntimeException(e);
+        }
+      }
+    });
+
+    final Thread removerThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        final Random random = new Random();
+        try {
+          while (handled.get() != SOCKET_NUM) {
+            lock.lock();
+            try {
+              if (!pairs.isEmpty()) {
+                int idx = random.nextInt(pairs.size());
+                DomainSocket pair[] = pairs.remove(idx);
+                if (random.nextBoolean()) {
+                  pair[0].close();
+                } else {
+                  watcher.remove(pair[1]);
+                }
+                TimeUnit.MILLISECONDS.sleep(1);
+              }
+            } finally {
+              lock.unlock();
+            }
+          }
+        } catch (Throwable e) {
+          LOG.error(e);
+          throw new RuntimeException(e);
+        }
+      }
+    });
+
+    adderThread.start();
+    removerThread.start();
+    TimeUnit.MILLISECONDS.sleep(100);
+    watcher.watcherThread.interrupt();
+    Uninterruptibles.joinUninterruptibly(adderThread);
+    Uninterruptibles.joinUninterruptibly(removerThread);
+    Uninterruptibles.joinUninterruptibly(watcher.watcherThread);
+  }
+
   /**
   /**
    * Creates a new DomainSocketWatcher and tracks its thread for termination due
    * Creates a new DomainSocketWatcher and tracks its thread for termination due
    * to an unexpected exception.  At the end of each test, if there was an
    * to an unexpected exception.  At the end of each test, if there was an