浏览代码

HADOOP-10198. DomainSocket: add support for socketpair. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1554891 13f79535-47bb-0310-9956-ffa450edef68
Andrew Wang 11 年之前
父节点
当前提交
4cf8f575fc

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -112,6 +112,9 @@ Release 2.4.0 - UNRELEASED
     HADOOP-10169. Remove the unnecessary synchronized in JvmMetrics class.
     (Liang Xie via jing9) 
 
+    HADOOP-10198. DomainSocket: add support for socketpair.
+    (Colin Patrick McCabe via wang)
+
   OPTIMIZATIONS
 
     HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn)

+ 18 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java

@@ -276,6 +276,24 @@ public class DomainSocket implements Closeable {
     return new DomainSocket(path, fd);
   }
 
+  /**
+   * Create a pair of UNIX domain sockets which are connected to each other
+   * by calling socketpair(2).
+   *
+   * @return                An array of two UNIX domain sockets connected to
+   *                        each other.
+   * @throws IOException    on error.
+   */
+  public static DomainSocket[] socketpair() throws IOException {
+    int fds[] = socketpair0();
+    return new DomainSocket[] {
+      new DomainSocket("(anonymous0)", fds[0]),
+      new DomainSocket("(anonymous1)", fds[1])
+    };
+  }
+
+  private static native int[] socketpair0() throws IOException;
+
   private static native int accept0(int fd) throws IOException;
 
   /**

+ 44 - 0
hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocket.c

@@ -364,6 +364,50 @@ JNIEnv *env, jclass clazz, jstring path)
   return fd;
 }
 
+#define SOCKETPAIR_ARRAY_LEN 2
+
+JNIEXPORT jarray JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocket_socketpair0(
+JNIEnv *env, jclass clazz)
+{
+  jarray arr = NULL;
+  int idx, err, fds[SOCKETPAIR_ARRAY_LEN] = { -1, -1 };
+  jthrowable jthr = NULL;
+
+  arr = (*env)->NewIntArray(env, SOCKETPAIR_ARRAY_LEN);
+  jthr = (*env)->ExceptionOccurred(env);
+  if (jthr) {
+    (*env)->ExceptionClear(env);
+    goto done;
+  }
+  if (socketpair(PF_UNIX, SOCK_STREAM, 0, fds) < 0) {
+    err = errno;
+    jthr = newSocketException(env, err,
+            "socketpair(2) error: %s", terror(err));
+    goto done;
+  }
+  (*env)->SetIntArrayRegion(env, arr, 0, SOCKETPAIR_ARRAY_LEN, fds);
+  jthr = (*env)->ExceptionOccurred(env);
+  if (jthr) {
+    (*env)->ExceptionClear(env);
+    goto done;
+  }
+
+done:
+  if (jthr) {
+    (*env)->DeleteLocalRef(env, arr);
+    arr = NULL;
+    for (idx = 0; idx < SOCKETPAIR_ARRAY_LEN; idx++) {
+      if (fds[idx] >= 0) {
+        close(fds[idx]);
+        fds[idx] = -1;
+      }
+    }
+    (*env)->Throw(env, jthr);
+  }
+  return arr;
+}
+
 JNIEXPORT jint JNICALL
 Java_org_apache_hadoop_net_unix_DomainSocket_accept0(
 JNIEnv *env, jclass clazz, jint fd)

+ 32 - 8
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java

@@ -420,7 +420,8 @@ public class TestDomainSocket {
    * @throws IOException
    */
   void testClientServer1(final Class<? extends WriteStrategy> writeStrategyClass,
-      final Class<? extends ReadStrategy> readStrategyClass) throws Exception {
+      final Class<? extends ReadStrategy> readStrategyClass,
+      final DomainSocket preConnectedSockets[]) throws Exception {
     final String TEST_PATH = new File(sockDir.getDir(),
         "test_sock_client_server1").getAbsolutePath();
     final byte clientMsg1[] = new byte[] { 0x1, 0x2, 0x3, 0x4, 0x5, 0x6 };
@@ -428,13 +429,15 @@ public class TestDomainSocket {
     final byte clientMsg2 = 0x45;
     final ArrayBlockingQueue<Throwable> threadResults =
         new ArrayBlockingQueue<Throwable>(2);
-    final DomainSocket serv = DomainSocket.bindAndListen(TEST_PATH);
+    final DomainSocket serv = (preConnectedSockets != null) ?
+      null : DomainSocket.bindAndListen(TEST_PATH);
     Thread serverThread = new Thread() {
       public void run(){
         // Run server
         DomainSocket conn = null;
         try {
-          conn = serv.accept();
+          conn = preConnectedSockets != null ?
+                    preConnectedSockets[0] : serv.accept();
           byte in1[] = new byte[clientMsg1.length];
           ReadStrategy reader = readStrategyClass.newInstance();
           reader.init(conn);
@@ -459,7 +462,8 @@ public class TestDomainSocket {
     Thread clientThread = new Thread() {
       public void run(){
         try {
-          DomainSocket client = DomainSocket.connect(TEST_PATH);
+          DomainSocket client = preConnectedSockets != null ?
+                preConnectedSockets[1] : DomainSocket.connect(TEST_PATH);
           WriteStrategy writer = writeStrategyClass.newInstance();
           writer.init(client);
           writer.write(clientMsg1);
@@ -487,25 +491,45 @@ public class TestDomainSocket {
     }
     serverThread.join(120000);
     clientThread.join(120000);
-    serv.close();
+    if (serv != null) {
+      serv.close();
+    }
   }
 
   @Test(timeout=180000)
   public void testClientServerOutStreamInStream() throws Exception {
     testClientServer1(OutputStreamWriteStrategy.class,
-        InputStreamReadStrategy.class);
+        InputStreamReadStrategy.class, null);
+  }
+
+  @Test(timeout=180000)
+  public void testClientServerOutStreamInStreamWithSocketpair() throws Exception {
+    testClientServer1(OutputStreamWriteStrategy.class,
+        InputStreamReadStrategy.class, DomainSocket.socketpair());
   }
 
   @Test(timeout=180000)
   public void testClientServerOutStreamInDbb() throws Exception {
     testClientServer1(OutputStreamWriteStrategy.class,
-        DirectByteBufferReadStrategy.class);
+        DirectByteBufferReadStrategy.class, null);
+  }
+
+  @Test(timeout=180000)
+  public void testClientServerOutStreamInDbbWithSocketpair() throws Exception {
+    testClientServer1(OutputStreamWriteStrategy.class,
+        DirectByteBufferReadStrategy.class, DomainSocket.socketpair());
   }
 
   @Test(timeout=180000)
   public void testClientServerOutStreamInAbb() throws Exception {
     testClientServer1(OutputStreamWriteStrategy.class,
-        ArrayBackedByteBufferReadStrategy.class);
+        ArrayBackedByteBufferReadStrategy.class, null);
+  }
+
+  @Test(timeout=180000)
+  public void testClientServerOutStreamInAbbWithSocketpair() throws Exception {
+    testClientServer1(OutputStreamWriteStrategy.class,
+        ArrayBackedByteBufferReadStrategy.class, DomainSocket.socketpair());
   }
 
   static private class PassedFile {