Przeglądaj źródła

HADOOP-19261. Support force close a DomainSocket for server service (#7057)

Sammi Chen 7 miesięcy temu
rodzic
commit
6fd4fea748

+ 47 - 24
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java

@@ -339,10 +339,13 @@ public class DomainSocket implements Closeable {
   private static native void shutdown0(int fd) throws IOException;
 
   /**
-   * Close the Socket.
+   * Close the Server Socket without check refCount.
+   * When Server Socket is blocked on accept(), its refCount is 1.
+   * close() call on Server Socket will be stuck in the while loop count check.
+   * @param force         if true, will not check refCount before close socket.
+   * @throws IOException  raised on errors performing I/O.
    */
-  @Override
-  public void close() throws IOException {
+  public void close(boolean force) throws IOException {
     // Set the closed bit on this DomainSocket
     int count;
     try {
@@ -351,41 +354,61 @@ public class DomainSocket implements Closeable {
       // Someone else already closed the DomainSocket.
       return;
     }
-    // Wait for all references to go away
-    boolean didShutdown = false;
+
     boolean interrupted = false;
-    while (count > 0) {
-      if (!didShutdown) {
+    if (force) {
+      try {
+        // Calling shutdown on the socket will interrupt blocking system
+        // calls like accept, write, and read that are going on in a
+        // different thread.
+        shutdown0(fd);
+      } catch (IOException e) {
+        LOG.error("shutdown error: ", e);
+      }
+    } else {
+      // Wait for all references to go away
+      boolean didShutdown = false;
+      while (count > 0) {
+        if (!didShutdown) {
+          try {
+            // Calling shutdown on the socket will interrupt blocking system
+            // calls like accept, write, and read that are going on in a
+            // different thread.
+            shutdown0(fd);
+          } catch (IOException e) {
+            LOG.error("shutdown error: ", e);
+          }
+          didShutdown = true;
+        }
         try {
-          // Calling shutdown on the socket will interrupt blocking system
-          // calls like accept, write, and read that are going on in a
-          // different thread.
-          shutdown0(fd);
-        } catch (IOException e) {
-          LOG.error("shutdown error: ", e);
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+          interrupted = true;
         }
-        didShutdown = true;
+        count = refCount.getReferenceCount();
       }
-      try {
-        Thread.sleep(10);
-      } catch (InterruptedException e) {
-        interrupted = true;
-      }
-      count = refCount.getReferenceCount();
     }
 
-    // At this point, nobody has a reference to the file descriptor, 
+    // At this point, nobody has a reference to the file descriptor,
     // and nobody will be able to get one in the future either.
     // We now call close(2) on the file descriptor.
-    // After this point, the file descriptor number will be reused by 
-    // something else.  Although this DomainSocket object continues to hold 
-    // the old file descriptor number (it's a final field), we never use it 
+    // After this point, the file descriptor number will be reused by
+    // something else.  Although this DomainSocket object continues to hold
+    // the old file descriptor number (it's a final field), we never use it
     // again because this DomainSocket is closed.
     close0(fd);
     if (interrupted) {
       Thread.currentThread().interrupt();
     }
   }
+
+  /**
+   * Close the Socket.
+   */
+  @Override
+  public void close() throws IOException {
+    close(false);
+  }
   
   /**
    * Call shutdown(SHUT_RDWR) on the UNIX domain socket.

+ 1 - 3
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TemporarySocketDirectory.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.net.unix;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
-import java.util.Random;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.FileUtil;
@@ -35,8 +34,7 @@ public class TemporarySocketDirectory implements Closeable {
 
   public TemporarySocketDirectory() {
     String tmp = System.getProperty("java.io.tmpdir", "/tmp");
-    dir = new File(tmp, "socks." + (System.currentTimeMillis() +
-        "." + (new Random().nextInt())));
+    dir = new File(tmp, "socks." + System.nanoTime());
     dir.mkdirs();
     FileUtil.setWritable(dir, true);
   }

+ 36 - 25
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java

@@ -130,7 +130,7 @@ public class TestDomainSocket {
     DomainSocket conn = DomainSocket.connect(serv.getPath());
     Thread.sleep(50);
     conn.close();
-    serv.close();
+    serv.close(true);
     future.get(2, TimeUnit.MINUTES);
   }
 
@@ -161,7 +161,7 @@ public class TestDomainSocket {
     };
     Future<Void> future = exeServ.submit(callable);
     Thread.sleep(500);
-    serv.close();
+    serv.close(true);
     future.get(2, TimeUnit.MINUTES);
   }
 
@@ -240,7 +240,7 @@ public class TestDomainSocket {
     Future<Void> clientFuture = exeServ.submit(clientCallable);
     Thread.sleep(500);
     clientConn.close();
-    serv.close();
+    serv.close(true);
     clientFuture.get(2, TimeUnit.MINUTES);
     serverFuture.get(2, TimeUnit.MINUTES);
   }
@@ -281,28 +281,39 @@ public class TestDomainSocket {
     final String TEST_PATH = new File(sockDir.getDir(),
         "test_sock_server_options").getAbsolutePath();
     DomainSocket serv = DomainSocket.bindAndListen(TEST_PATH);
-    try {
-      // Let's set a new receive buffer size
-      int bufSize = serv.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE);
-      int newBufSize = bufSize / 2;
-      serv.setAttribute(DomainSocket.RECEIVE_BUFFER_SIZE, newBufSize);
-      int nextBufSize = serv.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE);
-      Assert.assertEquals(newBufSize, nextBufSize);
-      // Let's set a server timeout
-      int newTimeout = 1000;
-      serv.setAttribute(DomainSocket.RECEIVE_TIMEOUT, newTimeout);
-      int nextTimeout = serv.getAttribute(DomainSocket.RECEIVE_TIMEOUT);
-      Assert.assertEquals(newTimeout, nextTimeout);
-      try {
-        serv.accept();
-        Assert.fail("expected the accept() to time out and fail");
-      } catch (SocketTimeoutException e) {
-        GenericTestUtils.assertExceptionContains("accept(2) error: ", e);
+    // Let's set a new receive buffer size
+    int bufSize = serv.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE);
+    int newBufSize = bufSize / 2;
+    serv.setAttribute(DomainSocket.RECEIVE_BUFFER_SIZE, newBufSize);
+    int nextBufSize = serv.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE);
+    Assert.assertEquals(newBufSize, nextBufSize);
+    // Let's set a server timeout
+    int newTimeout = 1000;
+    serv.setAttribute(DomainSocket.RECEIVE_TIMEOUT, newTimeout);
+    int nextTimeout = serv.getAttribute(DomainSocket.RECEIVE_TIMEOUT);
+    Assert.assertEquals(newTimeout, nextTimeout);
+
+    ExecutorService exeServ = Executors.newSingleThreadExecutor();
+    Callable<Void> callable = new Callable<Void>() {
+      public Void call() {
+        try {
+          serv.accept();
+          Assert.fail("expected the accept() to time out and fail");
+        } catch (SocketTimeoutException e) {
+          GenericTestUtils.assertExceptionContains("accept(2) error: ", e);
+        } catch (AsynchronousCloseException e) {
+          return null;
+        } catch (IOException e) {
+          throw new RuntimeException("unexpected IOException", e);
+        }
+        return null;
       }
-    } finally {
-      serv.close();
-      Assert.assertFalse(serv.isOpen());
-    }
+    };
+    Future<Void> future = exeServ.submit(callable);
+    Thread.sleep(500);
+    serv.close(true);
+    future.get();
+    Assert.assertFalse(serv.isOpen());
   }
   
   /**
@@ -656,7 +667,7 @@ public class TestDomainSocket {
     }
     serverThread.join(120000);
     clientThread.join(120000);
-    serv.close();
+    serv.close(true);
     for (PassedFile pf : passedFiles) {
       pf.cleanup();
     }