
HDFS-6057. DomainSocketWatcher.watcherThread should be marked as a daemon thread (cmccabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1574789 13f79535-47bb-0310-9956-ffa450edef68
Colin McCabe authored 11 years ago
commit 6f2dd3a25a
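
For readers unfamiliar with daemon threads: the JVM exits only once all non-daemon threads have finished, so a long-lived watcher thread that is never marked as a daemon keeps the process alive after the client is done. A minimal, self-contained sketch of the behavior this change relies on (illustrative names only, not the Hadoop code):

    public class DaemonThreadSketch {
      public static void main(String[] args) {
        Thread watcher = new Thread(() -> {
          // Simulates a watcher loop that polls forever.
          while (true) {
            try {
              Thread.sleep(1000);
            } catch (InterruptedException e) {
              return;
            }
          }
        });
        // Must be set before start(): daemon threads do not keep the JVM alive,
        // so the process can exit once main() returns.
        watcher.setDaemon(true);
        watcher.start();
        System.out.println("main() returning; the daemon watcher will not block exit");
      }
    }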

+ 17 - 15
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java

@@ -235,6 +235,7 @@ public final class DomainSocketWatcher implements Closeable {
     Preconditions.checkArgument(interruptCheckPeriodMs > 0);
     this.interruptCheckPeriodMs = interruptCheckPeriodMs;
     notificationSockets = DomainSocket.socketpair();
+    watcherThread.setDaemon(true);
     watcherThread.start();
   }
 
@@ -263,6 +264,16 @@ public final class DomainSocketWatcher implements Closeable {
     Uninterruptibles.joinUninterruptibly(watcherThread);
   }
 
+  @VisibleForTesting
+  public boolean isClosed() {
+    lock.lock();
+    try {
+      return closed;
+    } finally {
+      lock.unlock();
+    }
+  }
+
   /**
    * Add a socket.
    *
@@ -274,7 +285,11 @@ public final class DomainSocketWatcher implements Closeable {
   public void add(DomainSocket sock, Handler handler) {
     lock.lock();
     try {
-      checkNotClosed();
+      if (closed) {
+        handler.handle(sock);
+        IOUtils.cleanup(LOG, sock);
+        return;
+      }
       Entry entry = new Entry(sock, handler);
       try {
         sock.refCount.reference();
@@ -295,7 +310,6 @@ public final class DomainSocketWatcher implements Closeable {
         if (!toAdd.contains(entry)) {
           break;
         }
-        checkNotClosed();
       }
     } finally {
       lock.unlock();
@@ -310,7 +324,7 @@ public final class DomainSocketWatcher implements Closeable {
   public void remove(DomainSocket sock) {
     lock.lock();
     try {
-      checkNotClosed();
+      if (closed) return;
       toRemove.put(sock.fd, sock);
       kick();
       while (true) {
@@ -322,7 +336,6 @@ public final class DomainSocketWatcher implements Closeable {
         if (!toRemove.containsKey(sock.fd)) {
           break;
         }
-        checkNotClosed();
       }
     } finally {
       lock.unlock();
@@ -342,17 +355,6 @@ public final class DomainSocketWatcher implements Closeable {
     }
   }
 
-  /**
-   * Check that the DomainSocketWatcher is not closed.
-   * Must be called while holding the lock.
-   */
-  private void checkNotClosed() {
-    Preconditions.checkState(lock.isHeldByCurrentThread());
-    if (closed) {
-      throw new RuntimeException("DomainSocketWatcher is closed.");
-    }
-  }
-
   private void sendCallback(String caller, TreeMap<Integer, Entry> entries,
       FdSet fdSet, int fd) {
     if (LOG.isTraceEnabled()) {
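
The net effect of the DomainSocketWatcher changes: once the watcher is closed, add() no longer throws a RuntimeException; it invokes the handler and releases the socket, and remove() becomes a no-op, so callers that race with shutdown fail softly. A rough sketch of that pattern with made-up stand-in types (not the actual DomainSocketWatcher API):

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.concurrent.locks.ReentrantLock;

    // Stand-in types only; this is not the DomainSocketWatcher API.
    class Watcher implements Closeable {
      interface Handler { void handle(Closeable resource); }

      private final ReentrantLock lock = new ReentrantLock();
      private boolean closed = false;

      void add(Closeable resource, Handler handler) {
        lock.lock();
        try {
          if (closed) {
            // Rather than throwing, notify the handler and release the resource,
            // so callers racing with shutdown degrade gracefully.
            handler.handle(resource);
            try { resource.close(); } catch (IOException ignored) { }
            return;
          }
          // ... register the resource with the watcher loop here ...
        } finally {
          lock.unlock();
        }
      }

      @Override
      public void close() {
        lock.lock();
        try {
          closed = true;
        } finally {
          lock.unlock();
        }
      }
    }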

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -303,6 +303,9 @@ Release 2.4.0 - UNRELEASED
     HDFS-5898. Allow NFS gateway to login/relogin from its kerberos keytab.
     (Abin Shahab via atm)
 
+    HDFS-6057. DomainSocketWatcher.watcherThread should be marked as daemon
+    thread (cmccabe)
+
   BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
 
     HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)

+ 37 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/DfsClientShmManager.java

@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 import java.io.BufferedOutputStream;
+import java.io.Closeable;
 import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.FileInputStream;
@@ -59,7 +60,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
  * See {@link ShortCircuitRegistry} for more information on the communication protocol.
  */
 @InterfaceAudience.Private
-public class DfsClientShmManager {
+public class DfsClientShmManager implements Closeable {
   private static final Log LOG = LogFactory.getLog(DfsClientShmManager.class);
 
   /**
@@ -225,6 +226,12 @@ public class DfsClientShmManager {
     Slot allocSlot(DomainPeer peer, MutableBoolean usedPeer,
         String clientName, ExtendedBlockId blockId) throws IOException {
       while (true) {
+        if (closed) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": the DfsClientShmManager has been closed.");
+          }
+          return null;
+        }
         if (disabled) {
           if (LOG.isTraceEnabled()) {
             LOG.trace(this + ": shared memory segment access is disabled.");
@@ -374,6 +381,8 @@ public class DfsClientShmManager {
     }
   }
 
+  private boolean closed = false;
+
   private final ReentrantLock lock = new ReentrantLock();
 
   /**
@@ -409,6 +418,10 @@ public class DfsClientShmManager {
       String clientName) throws IOException {
     lock.lock();
     try {
+      if (closed) {
+        LOG.trace(this + ": the DfsClientShmManager is closed.");
+        return null;
+      }
       EndpointShmManager shmManager = datanodes.get(datanode);
       if (shmManager == null) {
         shmManager = new EndpointShmManager(datanode);
@@ -466,9 +479,32 @@ public class DfsClientShmManager {
     }
   }
 
+  /**
+   * Close the DfsClientShmManager.
+   */
+  @Override
+  public void close() throws IOException {
+    lock.lock();
+    try {
+      if (closed) return;
+      closed = true;
+    } finally {
+      lock.unlock();
+    }
+    // When closed, the domainSocketWatcher will issue callbacks that mark
+    // all the outstanding DfsClientShm segments as stale.
+    IOUtils.cleanup(LOG, domainSocketWatcher);
+  }
+
+
   @Override
   public String toString() {
     return String.format("ShortCircuitShmManager(%08x)",
         System.identityHashCode(this));
   }
+
+  @VisibleForTesting
+  public DomainSocketWatcher getDomainSocketWatcher() {
+    return domainSocketWatcher;
+  }
 }
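
The close() added to DfsClientShmManager follows a common shape: mark the manager closed under the lock, but close the underlying watcher only after releasing the lock, since that close can trigger callbacks that must not run while the lock is held; a second close() is a no-op. A generic sketch of that shape (the names here are illustrative, not the real classes):

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.concurrent.locks.ReentrantLock;

    class Manager implements Closeable {
      private final ReentrantLock lock = new ReentrantLock();
      private final Closeable watcher;  // stands in for the DomainSocketWatcher
      private boolean closed = false;

      Manager(Closeable watcher) {
        this.watcher = watcher;
      }

      @Override
      public void close() throws IOException {
        lock.lock();
        try {
          if (closed) return;  // idempotent: a second close() is a no-op
          closed = true;
        } finally {
          lock.unlock();
        }
        // Close the watcher only after dropping the lock, because its shutdown
        // callbacks must not run while this lock is held.
        watcher.close();       // the real code uses IOUtils.cleanup(LOG, ...)
      }
    }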

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java

@@ -887,6 +887,7 @@ public class ShortCircuitCache implements Closeable {
   /**
    * Close the cache and free all associated resources.
    */
+  @Override
   public void close() {
     try {
       lock.lock();
@@ -911,6 +912,7 @@ public class ShortCircuitCache implements Closeable {
     } finally {
       lock.unlock();
     }
+    IOUtils.cleanup(LOG, shmManager);
   }
 
   @VisibleForTesting // ONLY for testing
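
IOUtils.cleanup(LOG, ...) as used in the hunks above is Hadoop's null-tolerant close helper: it closes each non-null Closeable and logs any IOException instead of letting it propagate. It behaves roughly like the following standalone helper (the class name is made up for illustration):

    import java.io.Closeable;
    import java.io.IOException;

    final class CleanupSketch {
      // Closes every non-null argument, reporting rather than propagating
      // IOExceptions so one failing close() cannot skip the remaining ones.
      static void cleanup(Closeable... closeables) {
        for (Closeable c : closeables) {
          if (c == null) {
            continue;
          }
          try {
            c.close();
          } catch (IOException e) {
            System.err.println("Exception while closing " + c + ": " + e);
          }
        }
      }
    }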

+ 33 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java

@@ -376,4 +376,37 @@ public class TestBlockReaderFactory {
     Assert.assertEquals(null, cache.getDfsClientShmManager());
     cluster.shutdown();
   }
+  
+  /**
+   * Test shutting down the ShortCircuitCache while there are things in it.
+   */
+  @Test
+  public void testShortCircuitCacheShutdown() throws Exception {
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration conf = createShortCircuitConf(
+        "testShortCircuitCacheShutdown", sockDir);
+    conf.set(DFS_CLIENT_CONTEXT, "testShortCircuitCacheShutdown");
+    Configuration serverConf = new Configuration(conf);
+    DFSInputStream.tcpReadsDisabledForTesting = true;
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
+    cluster.waitActive();
+    final DistributedFileSystem fs =
+        (DistributedFileSystem)FileSystem.get(cluster.getURI(0), conf);
+    final String TEST_FILE = "/test_file";
+    final int TEST_FILE_LEN = 4000;
+    final int SEED = 0xFADEC;
+    DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
+        (short)1, SEED);
+    byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
+    byte expected[] = DFSTestUtil.
+        calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
+    Assert.assertTrue(Arrays.equals(contents, expected));
+    final ShortCircuitCache cache =
+        fs.dfs.getClientContext().getShortCircuitCache();
+    cache.close();
+    Assert.assertTrue(cache.getDfsClientShmManager().
+        getDomainSocketWatcher().isClosed());
+    cluster.shutdown();
+  }
 }