Browse Source

HDFS-4354. Create DomainSocket and DomainPeer and associated unit tests. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-347@1431102 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 12 years ago
parent
commit
d94621a0cd
17 changed files with 2615 additions and 1 deletions
  1. 1 0
      hadoop-common-project/hadoop-common/pom.xml
  2. 2 1
      hadoop-common-project/hadoop-common/src/CMakeLists.txt
  3. 619 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java
  4. 109 0
      hadoop-common-project/hadoop-common/src/main/native/src/exception.c
  5. 82 0
      hadoop-common-project/hadoop-common/src/main/native/src/exception.h
  6. 924 0
      hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocket.c
  7. 4 0
      hadoop-common-project/hadoop-common/src/main/native/src/org_apache_hadoop.h
  8. 58 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TemporarySocketDirectory.java
  9. 576 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java
  10. 3 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt
  11. 7 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java
  12. 117 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java
  13. 88 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java
  14. 6 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java
  15. 6 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java
  16. 7 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java
  17. 6 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java

+ 1 - 0
hadoop-common-project/hadoop-common/pom.xml

@@ -491,6 +491,7 @@
                     <javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Compressor</javahClassName>
                     <javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Decompressor</javahClassName>
                     <javahClassName>org.apache.hadoop.util.NativeCrc32</javahClassName>
+                    <javahClassName>org.apache.hadoop.net.unix.DomainSocket</javahClassName>
                   </javahClassNames>
                   <javahOutputDirectory>${project.build.directory}/native/javah</javahOutputDirectory>
                 </configuration>

+ 2 - 1
hadoop-common-project/hadoop-common/src/CMakeLists.txt

@@ -144,10 +144,10 @@ add_executable(test_bulk_crc32
     ${D}/util/bulk_crc32.c
     ${T}/util/test_bulk_crc32.c
 )
-set_property(SOURCE main.cpp PROPERTY INCLUDE_DIRECTORIES "\"-Werror\" \"-Wall\"")
 
 SET(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE)
 add_dual_library(hadoop
+    main/native/src/exception.c
     ${D}/io/compress/lz4/Lz4Compressor.c
     ${D}/io/compress/lz4/Lz4Decompressor.c
     ${D}/io/compress/lz4/lz4.c
@@ -157,6 +157,7 @@ add_dual_library(hadoop
     ${D}/io/nativeio/NativeIO.c
     ${D}/io/nativeio/errno_enum.c
     ${D}/io/nativeio/file_descriptor.c
+    ${D}/net/unix/DomainSocket.c
     ${D}/security/JniBasedUnixGroupsMapping.c
     ${D}/security/JniBasedUnixGroupsNetgroupMapping.c
     ${D}/security/getGroup.c

+ 619 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java

@@ -0,0 +1,619 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.net.unix;
+
+import java.io.Closeable;
+import org.apache.hadoop.classification.InterfaceAudience;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.SocketException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.lang.SystemUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.NativeCodeLoader;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * The implementation of UNIX domain sockets in Java.
+ * 
+ * See {@link DomainSocket} for more information about UNIX domain sockets.
+ */
+@InterfaceAudience.LimitedPrivate("HDFS")
+public class DomainSocket implements Closeable {
+  static {
+    if (SystemUtils.IS_OS_WINDOWS) {
+      loadingFailureReason = "UNIX Domain sockets are not available on Windows.";
+    } else if (!NativeCodeLoader.isNativeCodeLoaded()) {
+      loadingFailureReason = "libhadoop cannot be loaded.";
+    } else {
+      String problem = "DomainSocket#anchorNative got error: unknown";
+      try {
+        anchorNative();
+        problem = null;
+      } catch (Throwable t) {
+        problem = "DomainSocket#anchorNative got error: " + t.getMessage();
+      }
+      loadingFailureReason = problem;
+    }
+  }
+
+  static Log LOG = LogFactory.getLog(DomainSocket.class);
+
+  /**
+   * True only if we should validate the paths used in {@link DomainSocket#bind()}
+   */
+  private static boolean validateBindPaths = true;
+
+  /**
+   * The reason why DomainSocket is not available, or null if it is available.
+   */
+  private final static String loadingFailureReason;
+
+  /**
+   * Initialize the native library code.
+   */
+  private static native void anchorNative();
+
+  /**
+   * This function is designed to validate that the path chosen for a UNIX
+   * domain socket is secure.  A socket path is secure if it doesn't allow
+   * unprivileged users to perform a man-in-the-middle attack against it.
+   * For example, one way to perform a man-in-the-middle attack would be for
+   * a malicious user to move the server socket out of the way and create his
+   * own socket in the same place.  Not good.
+   * 
+   * Note that we only check the path once.  It's possible that the
+   * permissions on the path could change, perhaps to something more relaxed,
+   * immediately after the path passes our validation test-- hence creating a
+   * security hole.  However, the purpose of this check is to spot common
+   * misconfigurations.  System administrators do not commonly change
+   * permissions on these paths while the server is running.
+   *
+   * @param path             the path to validate
+   * @param skipComponents   the number of starting path components to skip 
+   *                         validation for (used only for testing)
+   */
+  @VisibleForTesting
+  native static void validateSocketPathSecurity0(String path,
+      int skipComponents) throws IOException;
+
+  /**
+   * Return true only if UNIX domain sockets are available.
+   */
+  public static String getLoadingFailureReason() {
+    return loadingFailureReason;
+  }
+
+  /**
+   * Disable validation of the server bind paths.
+   */
+  @VisibleForTesting
+  static void disableBindPathValidation() {
+    validateBindPaths = false;
+  }
+
+  /**
+   * Given a path and a port, compute the effective path by replacing
+   * occurrences of __PORT__ with the port.  This is mainly to make it 
+   * possible to run multiple DataNodes locally for testing purposes.
+   *
+   * @param path            The source path
+   * @param port            Port number to use
+   *
+   * @return                The effective path
+   */
+  public static String getEffectivePath(String path, int port) {
+    return path.replace("__PORT__", String.valueOf(port));
+  }
+
+  /**
+   * Status bits
+   * 
+   * Bit 30: 0 = DomainSocket open, 1 = DomainSocket closed
+   * Bits 29 to 0: the reference count.
+   */
+  private final AtomicInteger status;
+
+  /**
+   * Bit mask representing a closed domain socket. 
+   */
+  private static final int STATUS_CLOSED_MASK = 1 << 30;
+
+  /**
+   * The file descriptor associated with this UNIX domain socket.
+   */
+  private final int fd;
+
+  /**
+   * The path associated with this UNIX domain socket.
+   */
+  private final String path;
+
+  /**
+   * The InputStream associated with this socket.
+   */
+  private final DomainInputStream inputStream = new DomainInputStream();
+
+  /**
+   * The OutputStream associated with this socket.
+   */
+  private final DomainOutputStream outputStream = new DomainOutputStream();
+
+  /**
+   * The Channel associated with this socket.
+   */
+  private final DomainChannel channel = new DomainChannel();
+
+  private DomainSocket(String path, int fd) {
+    this.status = new AtomicInteger(0);
+    this.fd = fd;
+    this.path = path;
+  }
+
+  private static native int bind0(String path) throws IOException;
+
+  /**
+   * Create a new DomainSocket listening on the given path.
+   *
+   * @param path         The path to bind and listen on.
+   * @return             The new DomainSocket.
+   */
+  public static DomainSocket bindAndListen(String path) throws IOException {
+    if (loadingFailureReason != null) {
+      throw new UnsupportedOperationException(loadingFailureReason);
+    }
+    if (validateBindPaths) {
+      validateSocketPathSecurity0(path, 0);
+    }
+    int fd = bind0(path);
+    return new DomainSocket(path, fd);
+  }
+
+  private static native int accept0(int fd) throws IOException;
+
+  /**
+   * Accept a new UNIX domain connection.
+   *
+   * This method can only be used on sockets that were bound with bind().
+   *
+   * @return                              The new connection.
+   * @throws IOException                  If there was an I/O error
+   *                                      performing the accept-- such as the
+   *                                      socket being closed from under us.
+   * @throws SocketTimeoutException       If the accept timed out.
+   */
+  public DomainSocket accept() throws IOException {
+    fdRef();
+    try {
+      return new DomainSocket(path, accept0(fd));
+    } finally {
+      fdUnref();
+    }
+  }
+
+  private static native int connect0(String path);
+
+  /**
+   * Create a new DomainSocket connected to the given path.
+   *
+   * @param path         The path to connect to.
+   * @return             The new DomainSocket.
+   */
+  public static DomainSocket connect(String path) throws IOException {
+    if (loadingFailureReason != null) {
+      throw new UnsupportedOperationException(loadingFailureReason);
+    }
+    int fd = connect0(path);
+    return new DomainSocket(path, fd);
+  }
+
+  /**
+   * Increment the reference count of the underlying file descriptor.
+   *
+   * @throws SocketException  If the file descriptor is closed.
+   */
+  private void fdRef() throws SocketException {
+    int bits = status.incrementAndGet();
+    if ((bits & STATUS_CLOSED_MASK) != 0) {
+      status.decrementAndGet();
+      throw new SocketException("Socket is closed.");
+    }
+  }
+
+  /**
+   * Decrement the reference count of the underlying file descriptor.
+   */
+  private void fdUnref() {
+    int newCount = status.decrementAndGet();
+    assert newCount >= 0;
+  }
+
+  /**
+   * Return true if the file descriptor is currently open.
+   * 
+   * @return                 True if the file descriptor is currently open.
+   */
+  public boolean isOpen() {
+    return ((status.get() & STATUS_CLOSED_MASK) == 0);
+  }
+
+  /**
+   * @return                 The socket path.
+   */
+  public String getPath() {
+    return path;
+  }
+
+  /**
+   * @return                 The socket InputStream
+   */
+  public DomainInputStream getInputStream() {
+    return inputStream;
+  }
+
+  /**
+   * @return                 The socket OutputStream
+   */
+  public DomainOutputStream getOutputStream() {
+    return outputStream;
+  }
+
+  /**
+   * @return                 The socket Channel
+   */
+  public DomainChannel getChannel() {
+    return channel;
+  }
+
+  public static final int SND_BUF_SIZE = 1;
+  public static final int RCV_BUF_SIZE = 2;
+  public static final int SND_TIMEO = 3;
+  public static final int RCV_TIMEO = 4;
+
+  private static native void setAttribute0(int fd, int type, int val)
+      throws IOException;
+
+  public void setAttribute(int type, int size) throws IOException {
+    fdRef();
+    try {
+      setAttribute0(fd, type, size);
+    } finally {
+      fdUnref();
+    }
+  }
+
+  private native int getAttribute0(int fd, int type) throws IOException;
+
+  public int getAttribute(int type) throws IOException {
+    fdRef();
+    try {
+      return getAttribute0(fd, type);
+    } finally {
+      fdUnref();
+    }
+  }
+
+  private static native void close0(int fd) throws IOException;
+
+  private static native void closeFileDescriptor0(FileDescriptor fd)
+      throws IOException;
+
+  private static native void shutdown0(int fd) throws IOException;
+
+  /**
+   * Close the Socket.
+   */
+  @Override
+  public void close() throws IOException {
+    // Set the closed bit on this DomainSocket
+    int bits;
+    while (true) {
+      bits = status.get();
+      if ((bits & STATUS_CLOSED_MASK) != 0) {
+        return; // already closed
+      }
+      if (status.compareAndSet(bits, bits | STATUS_CLOSED_MASK)) {
+        break;
+      }
+    }
+    // Wait for all references to go away
+    boolean didShutdown = false;
+    boolean interrupted = false;
+    while ((bits & (~STATUS_CLOSED_MASK)) > 0) {
+      if (!didShutdown) {
+        try {
+          // Calling shutdown on the socket will interrupt blocking system
+          // calls like accept, write, and read that are going on in a
+          // different thread.
+          shutdown0(fd);
+        } catch (IOException e) {
+          LOG.error("shutdown error: ", e);
+        }
+        didShutdown = true;
+      }
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException e) {
+        interrupted = true;
+      }
+      bits = status.get();
+    }
+
+    // Close the file descriptor.  After this point, the file descriptor
+    // number will be reused by something else.  Although this DomainSocket
+    // object continues to hold the old file descriptor number (it's a final
+    // field), we never use it again because we look at the closed bit and
+    // realize that this DomainSocket is not usable.
+    close0(fd);
+    if (interrupted) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /*
+   * Clean up if the user forgets to close the socket.
+   */
+  protected void finalize() throws IOException {
+    close();
+  }
+
+  private native static void sendFileDescriptors0(int fd, FileDescriptor jfds[],
+      byte jbuf[], int offset, int length) throws IOException;
+
+  /**
+   * Send some FileDescriptor objects to the process on the other side of this
+   * socket.
+   * 
+   * @param jfds              The file descriptors to send.
+   * @param jbuf              Some bytes to send.  You must send at least
+   *                          one byte.
+   * @param offset            The offset in the jbuf array to start at.
+   * @param length            Length of the jbuf array to use.
+   */
+  public void sendFileDescriptors(FileDescriptor jfds[],
+      byte jbuf[], int offset, int length) throws IOException {
+    fdRef();
+    try {
+      sendFileDescriptors0(fd, jfds, jbuf, offset, length);
+    } finally {
+      fdUnref();
+    }
+  }
+
+  private static native int receiveFileDescriptors0(int fd, FileDescriptor[] jfds,
+      byte jbuf[], int offset, int length) throws IOException;
+
+  /**
+   * Receive some FileDescriptor objects from the process on the other side of
+   * this socket.
+   *
+   * @param jfds              (output parameter) Array of FileDescriptors.
+   *                          We will fill as many slots as possible with file
+   *                          descriptors passed from the remote process.  The
+   *                          other slots will contain NULL.
+   * @param jbuf              (output parameter) Buffer to read into.
+   *                          The UNIX domain sockets API requires you to read
+   *                          at least one byte from the remote process, even
+   *                          if all you care about is the file descriptors
+   *                          you will receive.
+   * @param offset            Offset into the byte buffer to load data
+   * @param length            Length of the byte buffer to use for data
+   *
+   * @return                  The number of bytes read.  This will be -1 if we
+   *                          reached EOF (similar to SocketInputStream);
+   *                          otherwise, it will be positive.
+   * @throws                  IOException if there was an I/O error.
+   */
+  public int receiveFileDescriptors(FileDescriptor[] jfds,
+      byte jbuf[], int offset, int length) throws IOException {
+    fdRef();
+    try {
+      return receiveFileDescriptors0(fd, jfds, jbuf, offset, length);
+    } finally {
+      fdUnref();
+    }
+  }
+
+  /**
+   * Receive some FileDescriptor objects from the process on the other side of
+   * this socket, and wrap them in FileInputStream objects.
+   *
+   * See {@link DomainSocket#recvFileInputStreams(ByteBuffer)}
+   */
+  public int recvFileInputStreams(FileInputStream[] fis, byte buf[],
+        int offset, int length) throws IOException {
+    FileDescriptor fds[] = new FileDescriptor[fis.length];
+    boolean success = false;
+    for (int i = 0; i < fis.length; i++) {
+      fis[i] = null;
+    }
+    fdRef();
+    try {
+      int ret = receiveFileDescriptors0(fd, fds, buf, offset, length);
+      for (int i = 0, j = 0; i < fds.length; i++) {
+        if (fds[i] != null) {
+          fis[j++] = new FileInputStream(fds[i]);
+          fds[i] = null;
+        }
+      }
+      success = true;
+      return ret;
+    } finally {
+      fdUnref();
+      if (!success) {
+        for (int i = 0; i < fds.length; i++) {
+          if (fds[i] != null) {
+            try {
+              closeFileDescriptor0(fds[i]);
+            } catch (Throwable t) {
+              LOG.warn(t);
+            }
+          } else if (fis[i] != null) {
+            try {
+              fis[i].close();
+            } catch (Throwable t) {
+              LOG.warn(t);
+            } finally {
+              fis[i] = null; }
+          }
+        }
+      }
+    }
+  }
+
+  private native static int readArray0(int fd, byte b[], int off, int len)
+      throws IOException;
+  
+  private native static int available0(int fd) throws IOException;
+
+  private static native void write0(int fd, int b) throws IOException;
+
+  private static native void writeArray0(int fd, byte b[], int offset, int length)
+      throws IOException;
+
+  private native static int readByteBufferDirect0(int fd, ByteBuffer dst,
+      int position, int remaining) throws IOException;
+
+  /**
+   * Input stream for UNIX domain sockets.
+   */
+  @InterfaceAudience.LimitedPrivate("HDFS")
+  public class DomainInputStream extends InputStream {
+    @Override
+    public int read() throws IOException {
+      fdRef();
+      try {
+        byte b[] = new byte[1];
+        int ret = DomainSocket.readArray0(DomainSocket.this.fd, b, 0, 1);
+        return (ret >= 0) ? b[0] : -1;
+      } finally {
+        fdUnref();
+      }
+    }
+    
+    @Override
+    public int read(byte b[], int off, int len) throws IOException {
+      fdRef();
+      try {
+        return DomainSocket.readArray0(DomainSocket.this.fd, b, off, len);
+      } finally {
+        fdUnref();
+      }
+    }
+
+    @Override
+    public int available() throws IOException {
+      fdRef();
+      try {
+        return DomainSocket.available0(DomainSocket.this.fd);
+      } finally {
+        fdUnref();
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      DomainSocket.this.close();
+    }
+  }
+
+  /**
+   * Output stream for UNIX domain sockets.
+   */
+  @InterfaceAudience.LimitedPrivate("HDFS")
+  public class DomainOutputStream extends OutputStream {
+    @Override
+    public void close() throws IOException {
+      DomainSocket.this.close();
+    }
+
+    @Override
+    public void write(int val) throws IOException {
+      fdRef();
+      try {
+        byte b[] = new byte[1];
+        b[0] = (byte)val;
+        DomainSocket.writeArray0(DomainSocket.this.fd, b, 0, 1);
+      } finally {
+        fdUnref();
+      }
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+      fdRef();
+      try {
+        DomainSocket.writeArray0(DomainSocket.this.fd, b, off, len);
+      } finally {
+        fdUnref();
+      }
+    }
+  }
+
+  @InterfaceAudience.LimitedPrivate("HDFS")
+  public class DomainChannel implements ReadableByteChannel {
+    @Override
+    public boolean isOpen() {
+      return DomainSocket.this.isOpen();
+    }
+
+    @Override
+    public void close() throws IOException {
+      DomainSocket.this.close();
+    }
+
+    @Override
+    public int read(ByteBuffer dst) throws IOException {
+      fdRef();
+      try {
+        int nread = 0;
+        if (dst.isDirect()) {
+          nread = DomainSocket.readByteBufferDirect0(DomainSocket.this.fd,
+              dst, dst.position(), dst.remaining());
+        } else if (dst.hasArray()) {
+          nread = DomainSocket.readArray0(DomainSocket.this.fd,
+              dst.array(), dst.position() + dst.arrayOffset(),
+              dst.remaining());
+        } else {
+          throw new AssertionError("we don't support " +
+              "using ByteBuffers that aren't either direct or backed by " +
+              "arrays");
+        }
+        if (nread > 0) {
+          dst.position(dst.position() + nread);
+        }
+        return nread;
+      } finally {
+        fdUnref();
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    return String.format("DomainSocket(fd=%d,path=%s)", fd, path);
+  }
+}

+ 109 - 0
hadoop-common-project/hadoop-common/src/main/native/src/exception.c

@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "exception.h"
+
+#include <jni.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+jthrowable newExceptionV(JNIEnv* env, const char *name,
+                         const char *fmt, va_list ap)
+{
+  int need;
+  char buf[1], *msg = NULL;
+  va_list ap2;
+  jstring jstr = NULL;
+  jthrowable jthr;
+  jclass clazz;
+  jmethodID excCtor;
+
+  va_copy(ap2, ap);
+  clazz = (*env)->FindClass(env, name);
+  if (!clazz) {
+    jthr = (*env)->ExceptionOccurred(env);
+    (*env)->ExceptionClear(env);
+    goto done;
+  }
+  excCtor = (*env)->GetMethodID(env,
+        clazz, "<init>", "(Ljava/lang/String;)V");
+  if (!excCtor) {
+    jthr = (*env)->ExceptionOccurred(env);
+    (*env)->ExceptionClear(env);
+    goto done;
+  }
+  need = vsnprintf(buf, sizeof(buf), fmt, ap);
+  if (need < 0) {
+    fmt = "vsnprintf error";
+    need = strlen(fmt);
+  }
+  msg = malloc(need + 1);
+  vsnprintf(msg, need + 1, fmt, ap2);
+  jstr = (*env)->NewStringUTF(env, msg);
+  if (!jstr) {
+    jthr = (*env)->ExceptionOccurred(env);
+    (*env)->ExceptionClear(env);
+    goto done;
+  }
+  jthr = (*env)->NewObject(env, clazz, excCtor, jstr);
+  if (!jthr) {
+    jthr = (*env)->ExceptionOccurred(env);
+    (*env)->ExceptionClear(env);
+    goto done;
+  }
+
+done:
+  free(msg);
+  va_end(ap2);
+  (*env)->DeleteLocalRef(env, jstr);
+  return jthr;
+}
+
+jthrowable newException(JNIEnv* env, const char *name, const char *fmt, ...)
+{
+  va_list ap;
+  jthrowable jthr;
+
+  va_start(ap, fmt);
+  jthr = newExceptionV(env, name, fmt, ap);
+  va_end(ap);
+  return jthr;
+}
+
+jthrowable newRuntimeException(JNIEnv* env, const char *fmt, ...)
+{
+  va_list ap;
+  jthrowable jthr;
+
+  va_start(ap, fmt);
+  jthr = newExceptionV(env, "java/lang/RuntimeException", fmt, ap);
+  va_end(ap);
+  return jthr;
+}
+
+jthrowable newIOException(JNIEnv* env, const char *fmt, ...)
+{
+  va_list ap;
+  jthrowable jthr;
+
+  va_start(ap, fmt);
+  jthr = newExceptionV(env, "java/io/IOException", fmt, ap);
+  va_end(ap);
+  return jthr;
+}

+ 82 - 0
hadoop-common-project/hadoop-common/src/main/native/src/exception.h

@@ -0,0 +1,82 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+#ifndef HADOOP_MAIN_NATIVE_SRC_EXCEPTION_H
+#define HADOOP_MAIN_NATIVE_SRC_EXCEPTION_H
+
+#include <jni.h> /* for jthrowable */
+#include <stdarg.h> /* for va_list */
+
+/**
+ * Create a new Exception.
+ *
+ * No exceptions will be pending on return.
+ *
+ * @param env           The JNI environment
+ * @param name          full name of the Java exception class
+ * @param fmt           printf-style format string
+ * @param ap            printf-style arguments
+ *
+ * @return              The RuntimeException
+ */
+jthrowable newExceptionV(JNIEnv* env, const char *name,
+                         const char *fmt, va_list ap);
+
+/**
+ * Create a new Exception.
+ *
+ * No exceptions will be pending on return.
+ *
+ * @param env           The JNI environment
+ * @param name          full name of the Java exception class
+ * @param fmt           printf-style format string
+ * @param ...           printf-style arguments
+ *
+ * @return              The RuntimeException
+ */
+jthrowable newException(JNIEnv* env, const char *name, const char *fmt, ...)
+    __attribute__((format(printf, 3, 4)));
+
+/**
+ * Create a new RuntimeException.
+ *
+ * No exceptions will be pending on return.
+ *
+ * @param env           The JNI environment
+ * @param fmt           printf-style format string
+ * @param ...           printf-style arguments
+ *
+ * @return              The RuntimeException
+ */
+jthrowable newRuntimeException(JNIEnv* env, const char *fmt, ...)
+    __attribute__((format(printf, 2, 3)));
+
+/**
+ * Create a new IOException.
+ *
+ * No exceptions will be pending on return.
+ *
+ * @param env           The JNI environment
+ * @param fmt           printf-style format string
+ * @param ...           printf-style arguments
+ *
+ * @return              The IOException, or another exception if we failed
+ *                      to create the NativeIOException.
+ */
+jthrowable newIOException(JNIEnv* env, const char *fmt, ...)
+    __attribute__((format(printf, 2, 3)));
+
+#endif

+ 924 - 0
hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocket.c

@@ -0,0 +1,924 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define _GNU_SOURCE
+
+#include "exception.h"
+#include "org/apache/hadoop/io/nativeio/file_descriptor.h"
+#include "org_apache_hadoop.h"
+#include "org_apache_hadoop_net_unix_DomainSocket.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <inttypes.h>
+#include <jni.h>
+#include <limits.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/ioctl.h> /* for FIONREAD */
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <unistd.h>
+
+#define SND_BUF_SIZE org_apache_hadoop_net_unix_DomainSocket_SND_BUF_SIZE
+#define RCV_BUF_SIZE org_apache_hadoop_net_unix_DomainSocket_RCV_BUF_SIZE
+#define SND_TIMEO org_apache_hadoop_net_unix_DomainSocket_SND_TIMEO
+#define RCV_TIMEO org_apache_hadoop_net_unix_DomainSocket_RCV_TIMEO
+
+#define DEFAULT_RCV_TIMEO 120000
+#define DEFAULT_SND_TIMEO 120000
+#define LISTEN_BACKLOG 128
+
+/**
+ * Can't pass more than this number of file descriptors in a single message.
+ */
+#define MAX_PASSED_FDS 16
+
+static jthrowable setAttribute0(JNIEnv *env, jint fd, jint type, jint val);
+
+/**
+ * Convert an errno to a socket exception name.
+ *
+ * Note: we assume that all of these exceptions have a one-argument constructor
+ * that takes a string.
+ *
+ * @return               The exception class name
+ */
+static const char *errnoToSocketExceptionName(int errnum)
+{
+  switch (errnum) {
+  case EAGAIN:
+    /* accept(2) returns EAGAIN when a socket timeout has been set, and that
+     * timeout elapses without an incoming connection.  This error code is also
+     * used in non-blocking I/O, but we don't support that. */
+  case ETIMEDOUT:
+    return "java/net/SocketTimeoutException";
+  case EHOSTDOWN:
+  case EHOSTUNREACH:
+  case ECONNREFUSED:
+    return "java/net/NoRouteToHostException";
+  case ENOTSUP:
+    return "java/lang/UnsupportedOperationException";
+  default:
+    return "java/net/SocketException";
+  }
+}
+
+static jthrowable newSocketException(JNIEnv *env, int errnum,
+                                     const char *fmt, ...)
+    __attribute__((format(printf, 3, 4)));
+
+static jthrowable newSocketException(JNIEnv *env, int errnum,
+                                     const char *fmt, ...)
+{
+  va_list ap;
+  jthrowable jthr;
+
+  va_start(ap, fmt);
+  jthr = newExceptionV(env, errnoToSocketExceptionName(errnum), fmt, ap);
+  va_end(ap);
+  return jthr;
+}
+
+static const char* terror(int errnum)
+{
+  if ((errnum < 0) || (errnum >= sys_nerr)) {
+    return "unknown error.";
+  }
+  return sys_errlist[errnum];
+}
+
+/**
+ * Flexible buffer that will try to fit data on the stack, and fall back
+ * to the heap if necessary.
+ */
+struct flexibleBuffer {
+  int8_t *curBuf;
+  int8_t *allocBuf;
+  int8_t stackBuf[8196];
+};
+
+static jthrowable flexBufInit(JNIEnv *env, struct flexibleBuffer *flexBuf, jint length)
+{
+  flexBuf->curBuf = flexBuf->allocBuf = NULL;
+  if (length < sizeof(flexBuf->stackBuf)) {
+    flexBuf->curBuf = flexBuf->stackBuf;
+    return NULL;
+  }
+  flexBuf->allocBuf = malloc(length);
+  if (!flexBuf->allocBuf) {
+    return newException(env, "java/lang/OutOfMemoryError",
+        "OOM allocating space for %d bytes of data.", length);
+  }
+  flexBuf->curBuf = flexBuf->allocBuf;
+  return NULL;
+}
+
+static void flexBufFree(struct flexibleBuffer *flexBuf)
+{
+  free(flexBuf->allocBuf);
+}
+
+static jthrowable setup(JNIEnv *env, int *ofd, jobject jpath, int doConnect)
+{
+  const char *cpath = NULL;
+  struct sockaddr_un addr;
+  jthrowable jthr = NULL;
+  int fd = -1, ret;
+
+  fd = socket(PF_UNIX, SOCK_STREAM, 0);
+  if (fd < 0) {
+    ret = errno;
+    jthr = newSocketException(env, ret,
+        "error creating UNIX domain socket with SOCK_STREAM: %s",
+        terror(ret));
+    goto done;
+  }
+  memset(&addr, 0, sizeof(&addr));
+  addr.sun_family = AF_UNIX;
+  cpath = (*env)->GetStringUTFChars(env, jpath, NULL);
+  if (!cpath) {
+    jthr = (*env)->ExceptionOccurred(env);
+    (*env)->ExceptionClear(env);
+    goto done;
+  }
+  ret = snprintf(addr.sun_path, sizeof(addr.sun_path),
+                 "%s", cpath);
+  if (ret < 0) {
+    ret = errno;
+    jthr = newSocketException(env, EIO,
+        "error computing UNIX domain socket path: error %d (%s)",
+        ret, terror(ret));
+    goto done;
+  }
+  if (ret >= sizeof(addr.sun_path)) {
+    jthr = newSocketException(env, ENAMETOOLONG,
+        "error computing UNIX domain socket path: path too long.  "
+        "The longest UNIX domain socket path possible on this host "
+        "is %zd bytes.", sizeof(addr.sun_path) - 1);
+    goto done;
+  }
+  if (doConnect) {
+    RETRY_ON_EINTR(ret, connect(fd, 
+        (struct sockaddr*)&addr, sizeof(addr)));
+    if (ret < 0) {
+      ret = errno;
+      jthr = newException(env, "java/net/ConnectException",
+              "connect(2) error: %s when trying to connect to '%s'",
+              terror(ret), addr.sun_path);
+      goto done;
+    }
+  } else {
+    RETRY_ON_EINTR(ret, unlink(addr.sun_path));
+    RETRY_ON_EINTR(ret, bind(fd, (struct sockaddr*)&addr, sizeof(addr)));
+    if (ret < 0) {
+      ret = errno;
+      jthr = newException(env, "java/net/BindException",
+              "bind(2) error: %s when trying to bind to '%s'",
+              terror(ret), addr.sun_path);
+      goto done;
+    }
+    if (listen(fd, LISTEN_BACKLOG) < 0) {
+      ret = errno;
+      jthr = newException(env, "java/net/BindException",
+              "listen(2) error: %s when trying to listen to '%s'",
+              terror(ret), addr.sun_path);
+      goto done;
+    }
+  }
+
+done:
+  if (cpath) {
+    (*env)->ReleaseStringUTFChars(env, jpath, cpath);
+  }
+  if (jthr) {
+    if (fd > 0) {
+      RETRY_ON_EINTR(ret, close(fd));
+      fd = -1;
+    }
+  } else {
+    *ofd = fd;
+  }
+  return jthr;
+}
+
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocket_anchorNative(
+JNIEnv *env, jclass clazz)
+{
+  fd_init(env); // for fd_get, fd_create, etc.
+}
+
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocket_validateSocketPathSecurity0(
+JNIEnv *env, jclass clazz, jobject jstr, jint skipComponents)
+{
+  jint utfLength;
+  char path[PATH_MAX], check[PATH_MAX] = { 0 }, *token, *rest;
+  struct stat st;
+  int ret, mode, strlenPath;
+  uid_t uid;
+  jthrowable jthr = NULL;
+
+  utfLength = (*env)->GetStringUTFLength(env, jstr);
+  if (utfLength > sizeof(path)) {
+    jthr = newIOException(env, "path is too long!  We expected a path "
+        "no longer than %zd UTF-8 bytes.", sizeof(path));
+    goto done;
+  }
+  (*env)->GetStringUTFRegion(env, jstr, 0, utfLength, path);
+  jthr = (*env)->ExceptionOccurred(env);
+  if (jthr) {
+    (*env)->ExceptionClear(env);
+    goto done;
+  }
+  uid = geteuid();
+  strlenPath = strlen(path);
+  if (strlenPath == 0) {
+    jthr = newIOException(env, "socket path is empty.");
+    goto done;
+  }
+  if (path[strlenPath - 1] == '/') {
+    /* It makes no sense to have a socket path that ends in a slash, since
+     * sockets are not directories. */
+    jthr = newIOException(env, "bad socket path '%s'.  The socket path "
+          "must not end in a slash.", path);
+    goto done;
+  }
+  rest = path;
+  while ((token = strtok_r(rest, "/", &rest))) {
+    strcat(check, "/");
+    strcat(check, token);
+    if (skipComponents > 0) {
+      skipComponents--;
+      continue;
+    }
+    if (!index(rest, '/')) {
+      /* Don't validate the last component, since it's not supposed to be a
+       * directory.  (If it is a directory, we will fail to create the socket
+       * later with EISDIR or similar.)
+       */
+      break;
+    }
+    if (stat(check, &st) < 0) {
+      ret = errno;
+      jthr = newIOException(env, "failed to stat a path component: '%s'.  "
+          "error code %d (%s)", check, ret, terror(ret));
+      goto done;
+    }
+    mode = st.st_mode & 0777;
+    if (mode & 0002) {
+      jthr = newIOException(env, "the path component: '%s' is "
+        "world-writable.  Its permissions are 0%03o.  Please fix "
+        "this or select a different socket path.", check, mode);
+      goto done;
+    }
+    if ((mode & 0020) && (st.st_gid != 0)) {
+      jthr = newIOException(env, "the path component: '%s' is "
+        "group-writable, and the group is not root.  Its permissions are "
+        "0%03o, and it is owned by gid %d.  Please fix this or "
+        "select a different socket path.", check, mode, st.st_gid);
+      goto done;
+    }
+    if ((mode & 0200) && (st.st_uid != 0) &&
+        (st.st_uid != uid)) {
+      jthr = newIOException(env, "the path component: '%s' is "
+        "owned by a user who is not root and not you.  Your effective user "
+        "id is %d; the path is owned by user id %d, and its permissions are "
+        "0%03o.  Please fix this or select a different socket path.",
+        check, uid, st.st_uid, mode);
+        goto done;
+      goto done;
+    }
+  }
+done:
+  if (jthr) {
+    (*env)->Throw(env, jthr);
+  }
+}
+
+JNIEXPORT jint JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocket_bind0(
+JNIEnv *env, jclass clazz, jstring path)
+{
+  int fd;
+  jthrowable jthr = NULL;
+
+  jthr = setup(env, &fd, path, 0);
+  if (jthr) {
+    (*env)->Throw(env, jthr);
+  }
+  return fd;
+}
+
+JNIEXPORT jint JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocket_accept0(
+JNIEnv *env, jclass clazz, jint fd)
+{
+  int ret, newFd = -1;
+  socklen_t slen;
+  struct sockaddr_un remote;
+  jthrowable jthr = NULL;
+
+  slen = sizeof(remote);
+  do {
+    newFd = accept(fd, (struct sockaddr*)&remote, &slen);
+  } while ((newFd < 0) && (errno == EINTR));
+  if (newFd < 0) {
+    ret = errno;
+    jthr = newSocketException(env, ret, "accept(2) error: %s", terror(ret));
+    goto done;
+  }
+
+done:
+  if (jthr) {
+    if (newFd > 0) {
+      RETRY_ON_EINTR(ret, close(newFd));
+      newFd = -1;
+    }
+    (*env)->Throw(env, jthr);
+  }
+  return newFd;
+}
+
+JNIEXPORT jint JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocket_connect0(
+JNIEnv *env, jclass clazz, jstring path)
+{
+  int ret, fd;
+  jthrowable jthr = NULL;
+
+  jthr = setup(env, &fd, path, 1);
+  if (jthr) {
+    (*env)->Throw(env, jthr);
+    return -1;
+  }
+  if (((jthr = setAttribute0(env, fd, SND_TIMEO, DEFAULT_SND_TIMEO))) || 
+      ((jthr = setAttribute0(env, fd, RCV_TIMEO, DEFAULT_RCV_TIMEO)))) {
+    RETRY_ON_EINTR(ret, close(fd));
+    (*env)->Throw(env, jthr);
+    return -1;
+  }
+  return fd;
+}
+
+static void javaMillisToTimeVal(int javaMillis, struct timeval *tv)
+{
+  tv->tv_sec = javaMillis / 1000;
+  tv->tv_usec = (javaMillis - (tv->tv_sec * 1000)) * 1000;
+}
+
+static jthrowable setAttribute0(JNIEnv *env, jint fd, jint type, jint val)
+{
+  struct timeval tv;
+  int ret, buf;
+
+  switch (type) {
+  case SND_BUF_SIZE:
+    buf = val;
+    if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buf, sizeof(buf))) {
+      ret = errno;
+      return newSocketException(env, ret,
+          "setsockopt(SO_SNDBUF) error: %s", terror(ret));
+    }
+    return NULL;
+  case RCV_BUF_SIZE:
+    buf = val;
+    if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &buf, sizeof(buf))) {
+      ret = errno;
+      return newSocketException(env, ret,
+          "setsockopt(SO_RCVBUF) error: %s", terror(ret));
+    }
+    return NULL;
+  case SND_TIMEO:
+    javaMillisToTimeVal(val, &tv);
+    if (setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, (struct timeval *)&tv,
+               sizeof(tv))) {
+      ret = errno;
+      return newSocketException(env, ret,
+          "setsockopt(SO_SNDTIMEO) error: %s", terror(ret));
+    }
+    return NULL;
+  case RCV_TIMEO:
+    javaMillisToTimeVal(val, &tv);
+    if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, (struct timeval *)&tv,
+               sizeof(tv))) {
+      ret = errno;
+      return newSocketException(env, ret,
+          "setsockopt(SO_RCVTIMEO) error: %s", terror(ret));
+    }
+    return NULL;
+  default:
+    break;
+  }
+  return newRuntimeException(env, "Invalid attribute type %d.", type);
+}
+
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocket_setAttribute0(
+JNIEnv *env, jclass clazz, jint fd, jint type, jint val)
+{
+  jthrowable jthr = setAttribute0(env, fd, type, val);
+  if (jthr) {
+    (*env)->Throw(env, jthr);
+  }
+}
+
+static jint getSockOptBufSizeToJavaBufSize(int size)
+{
+#ifdef __linux__
+  // Linux always doubles the value that you set with setsockopt.
+  // We cut it in half here so that programs can at least read back the same
+  // value they set.
+  size /= 2;
+#endif
+  return size;
+}
+
+static int timeValToJavaMillis(const struct timeval *tv)
+{
+  return (tv->tv_sec * 1000) + (tv->tv_usec / 1000);
+}
+
+JNIEXPORT jint JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocket_getAttribute0(
+JNIEnv *env, jclass clazz, jint fd, jint type)
+{
+  struct timeval tv;
+  socklen_t len;
+  int ret, rval = 0;
+
+  switch (type) {
+  case SND_BUF_SIZE:
+    len = sizeof(rval);
+    if (getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &rval, &len)) {
+      ret = errno;
+      (*env)->Throw(env, newSocketException(env, ret,
+          "getsockopt(SO_SNDBUF) error: %s", terror(ret)));
+      return -1;
+    }
+    return getSockOptBufSizeToJavaBufSize(rval);
+  case RCV_BUF_SIZE:
+    len = sizeof(rval);
+    if (getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rval, &len)) {
+      ret = errno;
+      (*env)->Throw(env, newSocketException(env, ret,
+          "getsockopt(SO_RCVBUF) error: %s", terror(ret)));
+      return -1;
+    }
+    return getSockOptBufSizeToJavaBufSize(rval);
+  case SND_TIMEO:
+    memset(&tv, 0, sizeof(tv));
+    len = sizeof(struct timeval);
+    if (getsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, &len)) {
+      ret = errno;
+      (*env)->Throw(env, newSocketException(env, ret,
+          "getsockopt(SO_SNDTIMEO) error: %s", terror(ret)));
+      return -1;
+    }
+    return timeValToJavaMillis(&tv);
+  case RCV_TIMEO:
+    memset(&tv, 0, sizeof(tv));
+    len = sizeof(struct timeval);
+    if (getsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, &len)) {
+      ret = errno;
+      (*env)->Throw(env, newSocketException(env, ret,
+          "getsockopt(SO_RCVTIMEO) error: %s", terror(ret)));
+      return -1;
+    }
+    return timeValToJavaMillis(&tv);
+  default:
+    (*env)->Throw(env, newRuntimeException(env,
+          "Invalid attribute type %d.", type));
+    return -1;
+  }
+}
+
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocket_close0(
+JNIEnv *env, jclass clazz, jint fd)
+{
+  int ret;
+
+  RETRY_ON_EINTR(ret, close(fd));
+  if (ret) {
+    ret = errno;
+    (*env)->Throw(env, newSocketException(env, ret,
+          "close(2) error: %s", terror(ret)));
+  }
+}
+
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocket_closeFileDescriptor0(
+JNIEnv *env, jclass clazz, jobject jfd)
+{
+  Java_org_apache_hadoop_net_unix_DomainSocket_close0(
+      env, clazz, fd_get(env, jfd));
+}
+
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocket_shutdown0(
+JNIEnv *env, jclass clazz, jint fd)
+{
+  int ret;
+
+  RETRY_ON_EINTR(ret, shutdown(fd, SHUT_RDWR));
+  if (ret) {
+    ret = errno;
+    (*env)->Throw(env, newSocketException(env, ret,
+          "shutdown(2) error: %s", terror(ret)));
+  }
+}
+
+/**
+ * Write an entire buffer to a file descriptor.
+ *
+ * @param env            The JNI environment.
+ * @param fd             The fd to write to.
+ * @param buf            The buffer to write
+ * @param amt            The length of the buffer to write.
+ * @return               NULL on success; or the unraised exception representing
+ *                       the problem.
+ */
+static jthrowable write_fully(JNIEnv *env, int fd, int8_t *buf, int amt)
+{
+  int err, res;
+
+  while (amt > 0) {
+    res = write(fd, buf, amt);
+    if (res < 0) {
+      err = errno;
+      if (err == EINTR) {
+        continue;
+      }
+      return newSocketException(env, err, "write(2) error: %s", terror(err));
+    }
+    amt -= res;
+    buf += res;
+  }
+  return NULL;
+}
+
+/**
+ * Our auxillary data setup.
+ *
+ * See man 3 cmsg for more information about auxillary socket data on UNIX.
+ *
+ * We use __attribute__((packed)) to ensure that the compiler doesn't insert any
+ * padding between 'hdr' and 'fds'.
+ * We use __attribute__((aligned(8)) to ensure that the compiler puts the start
+ * of the structure at an address which is a multiple of 8.  If we did not do
+ * this, the attribute((packed)) would cause the compiler to generate a lot of
+ * slow code for accessing unaligned memory.
+ */
+struct cmsghdr_with_fds {
+  struct cmsghdr hdr;
+  int fds[MAX_PASSED_FDS];
+}  __attribute__((packed,aligned(8)));
+
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocket_sendFileDescriptors0(
+JNIEnv *env, jclass clazz, jint fd, jobject jfds, jobject jbuf,
+jint offset, jint length)
+{
+  struct iovec vec[1];
+  struct flexibleBuffer flexBuf;
+  struct cmsghdr_with_fds aux;
+  jint jfdsLen;
+  int i, ret = -1, auxLen;
+  struct msghdr socketMsg;
+  jthrowable jthr = NULL;
+
+  jthr = flexBufInit(env, &flexBuf, length);
+  if (jthr) {
+    goto done;
+  }
+  if (length <= 0) {
+    jthr = newException(env, "java/lang/IllegalArgumentException",
+        "You must write at least one byte.");
+    goto done;
+  }
+  jfdsLen = (*env)->GetArrayLength(env, jfds);
+  if (jfdsLen <= 0) {
+    jthr = newException(env, "java/lang/IllegalArgumentException",
+        "Called sendFileDescriptors with no file descriptors.");
+    goto done;
+  } else if (jfdsLen > MAX_PASSED_FDS) {
+    jfdsLen = 0;
+    jthr = newException(env, "java/lang/IllegalArgumentException",
+          "Called sendFileDescriptors with an array of %d length.  "
+          "The maximum is %d.", jfdsLen, MAX_PASSED_FDS);
+    goto done;
+  }
+  (*env)->GetByteArrayRegion(env, jbuf, offset, length, flexBuf.curBuf); 
+  jthr = (*env)->ExceptionOccurred(env);
+  if (jthr) {
+    (*env)->ExceptionClear(env);
+    goto done;
+  }
+  memset(&vec, 0, sizeof(vec));
+  vec[0].iov_base = flexBuf.curBuf;
+  vec[0].iov_len = length;
+  auxLen = CMSG_LEN(jfdsLen * sizeof(int));
+  memset(&aux, 0, auxLen);
+  memset(&socketMsg, 0, sizeof(socketMsg));
+  socketMsg.msg_iov = vec;
+  socketMsg.msg_iovlen = 1;
+  socketMsg.msg_control = &aux;
+  socketMsg.msg_controllen = auxLen;
+  aux.hdr.cmsg_len = auxLen;
+  aux.hdr.cmsg_level = SOL_SOCKET;
+  aux.hdr.cmsg_type = SCM_RIGHTS;
+  for (i = 0; i < jfdsLen; i++) {
+    jobject jfd = (*env)->GetObjectArrayElement(env, jfds, i);
+    if (!jfd) {
+      jthr = (*env)->ExceptionOccurred(env);
+      if (jthr) {
+        (*env)->ExceptionClear(env);
+        goto done;
+      }
+      jthr = newException(env, "java/lang/NullPointerException",
+            "element %d of jfds was NULL.", i);
+      goto done;
+    }
+    aux.fds[i] = fd_get(env, jfd);
+    (*env)->DeleteLocalRef(env, jfd);
+    if (jthr) {
+      goto done;
+    }
+  }
+  RETRY_ON_EINTR(ret, sendmsg(fd, &socketMsg, 0));
+  if (ret < 0) {
+    ret = errno;
+    jthr = newSocketException(env, ret, "sendmsg(2) error: %s", terror(ret));
+    goto done;
+  }
+  length -= ret;
+  if (length > 0) {
+    // Write the rest of the bytes we were asked to send.
+    // This time, no fds will be attached.
+    jthr = write_fully(env, fd, flexBuf.curBuf + ret, length);
+    if (jthr) {
+      goto done;
+    }
+  }
+
+done:
+  flexBufFree(&flexBuf);
+  if (jthr) {
+    (*env)->Throw(env, jthr);
+  }
+}
+
+JNIEXPORT jint JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocket_receiveFileDescriptors0(
+JNIEnv *env, jclass clazz, jint fd, jarray jfds, jarray jbuf,
+jint offset, jint length)
+{
+  struct iovec vec[1];
+  struct flexibleBuffer flexBuf;
+  struct cmsghdr_with_fds aux;
+  int i, jRecvFdsLen = 0, auxLen;
+  jint jfdsLen = 0;
+  struct msghdr socketMsg;
+  ssize_t bytesRead = -1;
+  jobject fdObj;
+  jthrowable jthr = NULL;
+
+  jthr = flexBufInit(env, &flexBuf, length);
+  if (jthr) {
+    goto done;
+  }
+  if (length <= 0) {
+    jthr = newRuntimeException(env, "You must read at least one byte.");
+    goto done;
+  }
+  jfdsLen = (*env)->GetArrayLength(env, jfds);
+  if (jfdsLen <= 0) {
+    jthr = newException(env, "java/lang/IllegalArgumentException",
+        "Called receiveFileDescriptors with an array of %d length.  "
+        "You must pass at least one fd.", jfdsLen);
+    goto done;
+  } else if (jfdsLen > MAX_PASSED_FDS) {
+    jfdsLen = 0;
+    jthr = newException(env, "java/lang/IllegalArgumentException",
+          "Called receiveFileDescriptors with an array of %d length.  "
+          "The maximum is %d.", jfdsLen, MAX_PASSED_FDS);
+    goto done;
+  }
+  for (i = 0; i < jfdsLen; i++) {
+    (*env)->SetObjectArrayElement(env, jfds, i, NULL);
+  }
+  vec[0].iov_base = flexBuf.curBuf;
+  vec[0].iov_len = length;
+  auxLen = CMSG_LEN(jfdsLen * sizeof(int));
+  memset(&aux, 0, auxLen);
+  memset(&socketMsg, 0, auxLen);
+  socketMsg.msg_iov = vec;
+  socketMsg.msg_iovlen = 1;
+  socketMsg.msg_control = &aux;
+  socketMsg.msg_controllen = auxLen;
+  aux.hdr.cmsg_len = auxLen;
+  aux.hdr.cmsg_level = SOL_SOCKET;
+  aux.hdr.cmsg_type = SCM_RIGHTS;
+  RETRY_ON_EINTR(bytesRead, recvmsg(fd, &socketMsg, 0));
+  if (bytesRead < 0) {
+    int ret = errno;
+    if (ret == ECONNABORTED) {
+      // The remote peer disconnected on us.  Treat this as an EOF.
+      bytesRead = -1;
+      goto done;
+    }
+    jthr = newSocketException(env, ret, "recvmsg(2) failed: %s",
+                              terror(ret));
+    goto done;
+  } else if (bytesRead == 0) {
+    bytesRead = -1;
+    goto done;
+  }
+  jRecvFdsLen = (aux.hdr.cmsg_len - sizeof(struct cmsghdr)) / sizeof(int);
+  for (i = 0; i < jRecvFdsLen; i++) {
+    fdObj = fd_create(env, aux.fds[i]);
+    if (!fdObj) {
+      jthr = (*env)->ExceptionOccurred(env);
+      (*env)->ExceptionClear(env);
+      goto done;
+    }
+    // Make this -1 so we don't attempt to close it twice in an error path.
+    aux.fds[i] = -1;
+    (*env)->SetObjectArrayElement(env, jfds, i, fdObj);
+    // There is no point keeping around a local reference to the fdObj.
+    // The array continues to reference it.
+    (*env)->DeleteLocalRef(env, fdObj);
+  }
+  (*env)->SetByteArrayRegion(env, jbuf, offset, length, flexBuf.curBuf);
+  jthr = (*env)->ExceptionOccurred(env);
+  if (jthr) {
+    (*env)->ExceptionClear(env);
+    goto done;
+  }
+done:
+  flexBufFree(&flexBuf);
+  if (jthr) {
+    // Free any FileDescriptor references we may have created,
+    // or file descriptors we may have been passed.
+    for (i = 0; i < jRecvFdsLen; i++) {
+      if (aux.fds[i] >= 0) {
+        RETRY_ON_EINTR(i, close(aux.fds[i]));
+        aux.fds[i] = -1;
+      }
+      fdObj = (*env)->GetObjectArrayElement(env, jfds, i);
+      if (fdObj) {
+        int ret, afd = fd_get(env, fdObj);
+        if (afd >= 0) {
+          RETRY_ON_EINTR(ret, close(afd));
+        }
+        (*env)->SetObjectArrayElement(env, jfds, i, NULL);
+        (*env)->DeleteLocalRef(env, fdObj);
+      }
+    }
+    (*env)->Throw(env, jthr);
+  }
+  return bytesRead;
+}
+
+JNIEXPORT jint JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocket_readArray0(
+JNIEnv *env, jclass clazz, jint fd, jarray b, jint offset, jint length)
+{
+  int ret = -1;
+  struct flexibleBuffer flexBuf;
+  jthrowable jthr;
+
+  jthr = flexBufInit(env, &flexBuf, length);
+  if (jthr) {
+    goto done;
+  }
+  RETRY_ON_EINTR(ret, read(fd, flexBuf.curBuf, length));
+  if (ret < 0) {
+    ret = errno;
+    if (ret == ECONNABORTED) {
+      // The remote peer disconnected on us.  Treat this as an EOF.
+      ret = -1;
+      goto done;
+    }
+    jthr = newSocketException(env, ret, "read(2) error: %s", 
+                              terror(ret));
+    goto done;
+  }
+  if (ret == 0) {
+    goto done;
+  }
+  (*env)->SetByteArrayRegion(env, b, offset, ret, flexBuf.curBuf);
+  jthr = (*env)->ExceptionOccurred(env);
+  if (jthr) {
+    (*env)->ExceptionClear(env);
+    goto done;
+  }
+done:
+  flexBufFree(&flexBuf);
+  if (jthr) { 
+    (*env)->Throw(env, jthr);
+  }
+  return ret == 0 ? -1 : ret; // Java wants -1 on EOF
+}
+
+JNIEXPORT jint JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocket_available0(
+JNIEnv *env, jclass clazz, jint fd)
+{
+  int ret, avail = 0;
+  jthrowable jthr = NULL;
+
+  RETRY_ON_EINTR(ret, ioctl(fd, FIONREAD, &avail));
+  if (ret < 0) {
+    ret = errno;
+    jthr = newSocketException(env, ret,
+              "ioctl(%d, FIONREAD) error: %s", fd, terror(ret));
+    goto done;
+  }
+done:
+  if (jthr) {
+    (*env)->Throw(env, jthr);
+  }
+  return avail;
+}
+
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocket_writeArray0(
+JNIEnv *env, jclass clazz, jint fd, jarray b, jint offset, jint length)
+{
+  struct flexibleBuffer flexBuf;
+  jthrowable jthr;
+
+  jthr = flexBufInit(env, &flexBuf, length);
+  if (jthr) {
+    goto done;
+  }
+  (*env)->GetByteArrayRegion(env, b, offset, length, flexBuf.curBuf);
+  jthr = (*env)->ExceptionOccurred(env);
+  if (jthr) {
+    (*env)->ExceptionClear(env);
+    goto done;
+  }
+  jthr = write_fully(env, fd, flexBuf.curBuf, length);
+  if (jthr) {
+    goto done;
+  }
+
+done:
+  flexBufFree(&flexBuf);
+  if (jthr) { 
+    (*env)->Throw(env, jthr);
+  }
+}
+
+JNIEXPORT jint JNICALL
+Java_org_apache_hadoop_net_unix_DomainSocket_readByteBufferDirect0(
+JNIEnv *env, jclass clazz, jint fd, jobject dst, jint position, jint remaining)
+{
+  uint8_t *buf;
+  jthrowable jthr = NULL;
+  int res = -1;
+
+  buf = (*env)->GetDirectBufferAddress(env, dst);
+  if (!buf) {
+    jthr = newRuntimeException(env, "GetDirectBufferAddress failed.");
+    goto done;
+  }
+  RETRY_ON_EINTR(res, read(fd, buf + position, remaining));
+  if (res < 0) {
+    res = errno;
+    if (res != ECONNABORTED) {
+      jthr = newSocketException(env, res, "read(2) error: %s", 
+                                terror(res));
+      goto done;
+    } else {
+      // The remote peer disconnected on us.  Treat this as an EOF.
+      res = -1;
+    }
+  }
+done:
+  if (jthr) {
+    (*env)->Throw(env, jthr);
+  }
+  return res;
+}

+ 4 - 0
hadoop-common-project/hadoop-common/src/main/native/src/org_apache_hadoop.h

@@ -99,6 +99,10 @@ void *do_dlsym(JNIEnv *env, void *handle, const char *symbol) {
     THROW(env, "java/lang/InternalError", exception_msg); \
   }
 
+#define RETRY_ON_EINTR(ret, expr) do { \
+  ret = expr; \
+} while ((ret == -1) && (errno == EINTR));
+
 #endif
 
 //vim: sw=2: ts=2: et

+ 58 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TemporarySocketDirectory.java

@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.net.unix;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.io.FileUtils;
+
+/**
+ * Create a temporary directory in which sockets can be created.
+ * When creating a UNIX domain socket, the name
+ * must be fairly short (around 110 bytes on most platforms).
+ */
+public class TemporarySocketDirectory implements Closeable {
+  private File dir;
+
+  public TemporarySocketDirectory() {
+    String tmp = System.getProperty("java.io.tmpdir", "/tmp");
+    dir = new File(tmp, "socks." + (System.currentTimeMillis() +
+        "." + (new Random().nextInt())));
+    dir.mkdirs();
+    dir.setWritable(true, true);
+  }
+
+  public File getDir() {
+    return dir;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (dir != null) {
+      FileUtils.deleteDirectory(dir);
+      dir = null;
+    }
+  }
+
+  protected void finalize() throws IOException {
+    close();
+  }
+}

+ 576 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java

@@ -0,0 +1,576 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.net.unix;
+
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.unix.DomainSocket.DomainChannel;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.Shell.ExitCodeException;
+
+import com.google.common.io.Files;
+
+public class TestDomainSocket {
+  private static TemporarySocketDirectory sockDir;
+
+  @BeforeClass
+  public static void init() {
+    sockDir = new TemporarySocketDirectory();
+    DomainSocket.disableBindPathValidation();
+  }
+
+  @AfterClass
+  public static void shutdown() throws IOException {
+    sockDir.close();
+  }
+  
+  @Before
+  public void checkPrecondition() {
+    Assume.assumeTrue(DomainSocket.getLoadingFailureReason() == null);
+  }
+    
+  /**
+   * Test that we can create a socket and close it, even if it hasn't been
+   * opened.
+   *
+   * @throws IOException
+   */
+  @Test(timeout=180000)
+  public void testSocketCreateAndClose() throws IOException {
+    DomainSocket serv = DomainSocket.bindAndListen(
+      new File(sockDir.getDir(), "test_sock_create_and_close").
+        getAbsolutePath());
+    serv.close();
+  }
+
+  /**
+   * Test DomainSocket path setting and getting.
+   *
+   * @throws IOException
+   */
+  @Test(timeout=180000)
+  public void testSocketPathSetGet() throws IOException {
+    Assert.assertEquals("/var/run/hdfs/sock.100",
+        DomainSocket.getEffectivePath("/var/run/hdfs/sock.__PORT__", 100));
+  }
+
+  /**
+   * Test that if one thread is blocking in accept(), another thread
+   * can close the socket and stop the accept.
+   *
+   * @throws IOException
+   */
+  @Test(timeout=180000)
+  public void testSocketAcceptAndClose() throws Exception {
+    final String TEST_PATH =
+        new File(sockDir.getDir(), "test_sock_accept_and_close").getAbsolutePath();
+    final DomainSocket serv = DomainSocket.bindAndListen(TEST_PATH);
+    ExecutorService exeServ = Executors.newSingleThreadExecutor();
+    Callable<Void> callable = new Callable<Void>() {
+      public Void call(){
+        try {
+          serv.accept();
+          throw new RuntimeException("expected the accept() to be " +
+              "interrupted and fail");
+        } catch (IOException e) {
+          return null;
+        }
+      }
+    };
+    Future<Void> future = exeServ.submit(callable);
+    Thread.sleep(500);
+    serv.close();
+    future.get(2, TimeUnit.MINUTES);
+  }
+
+  /**
+   * Test that attempting to connect to an invalid path doesn't work.
+   *
+   * @throws IOException
+   */
+  @Test(timeout=180000)
+  public void testInvalidOperations() throws IOException {
+    try {
+      DomainSocket.connect(
+        new File(sockDir.getDir(), "test_sock_invalid_operation").
+          getAbsolutePath());
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains("connect(2) error: ", e);
+    }
+  }
+
+  /**
+   * Test setting some server options.
+   *
+   * @throws IOException
+   */
+  @Test(timeout=180000)
+  public void testServerOptions() throws Exception {
+    final String TEST_PATH = new File(sockDir.getDir(),
+        "test_sock_server_options").getAbsolutePath();
+    DomainSocket serv = DomainSocket.bindAndListen(TEST_PATH);
+    try {
+      // Let's set a new receive buffer size
+      int bufSize = serv.getAttribute(DomainSocket.RCV_BUF_SIZE);
+      int newBufSize = bufSize / 2;
+      serv.setAttribute(DomainSocket.RCV_BUF_SIZE, newBufSize);
+      int nextBufSize = serv.getAttribute(DomainSocket.RCV_BUF_SIZE);
+      Assert.assertEquals(newBufSize, nextBufSize);
+      // Let's set a server timeout
+      int newTimeout = 1000;
+      serv.setAttribute(DomainSocket.RCV_TIMEO, newTimeout);
+      int nextTimeout = serv.getAttribute(DomainSocket.RCV_TIMEO);
+      Assert.assertEquals(newTimeout, nextTimeout);
+      try {
+        serv.accept();
+        Assert.fail("expected the accept() to time out and fail");
+      } catch (SocketTimeoutException e) {
+        GenericTestUtils.assertExceptionContains("accept(2) error: ", e);
+      }
+    } finally {
+      serv.close();
+      Assert.assertFalse(serv.isOpen());
+    }
+  }
+  
+  /**
+   * A Throwable representing success.
+   *
+   * We can't use null to represent this, because you cannot insert null into
+   * ArrayBlockingQueue.
+   */
+  static class Success extends Throwable {
+    private static final long serialVersionUID = 1L;
+  }
+  
+  static interface WriteStrategy {
+    /**
+     * Initialize a WriteStrategy object from a Socket.
+     */
+    public void init(DomainSocket s) throws IOException;
+    
+    /**
+     * Write some bytes.
+     */
+    public void write(byte b[]) throws IOException;
+  }
+  
+  static class OutputStreamWriteStrategy implements WriteStrategy {
+    private OutputStream outs = null;
+    
+    public void init(DomainSocket s) throws IOException {
+      outs = s.getOutputStream();
+    }
+    
+    public void write(byte b[]) throws IOException {
+      outs.write(b);
+    }
+  }
+  
+  abstract static class ReadStrategy {
+    /**
+     * Initialize a ReadStrategy object from a DomainSocket.
+     */
+    public abstract void init(DomainSocket s) throws IOException;
+    
+    /**
+     * Read some bytes.
+     */
+    public abstract int read(byte b[], int off, int length) throws IOException;
+    
+    public void readFully(byte buf[], int off, int len) throws IOException {
+      int toRead = len;
+      while (toRead > 0) {
+        int ret = read(buf, off, toRead);
+        if (ret < 0) {
+          throw new IOException( "Premature EOF from inputStream");
+        }
+        toRead -= ret;
+        off += ret;
+      }
+    }
+  }
+  
+  static class InputStreamReadStrategy extends ReadStrategy {
+    private InputStream ins = null;
+    
+    @Override
+    public void init(DomainSocket s) throws IOException {
+      ins = s.getInputStream();
+    }
+
+    @Override
+    public int read(byte b[], int off, int length) throws IOException {
+      return ins.read(b, off, length);
+    }
+  }
+  
+  static class DirectByteBufferReadStrategy extends ReadStrategy {
+    private DomainChannel ch = null;
+
+    @Override
+    public void init(DomainSocket s) throws IOException {
+      ch = s.getChannel();
+    }
+    
+    @Override
+    public int read(byte b[], int off, int length) throws IOException {
+      ByteBuffer buf = ByteBuffer.allocateDirect(b.length);
+      int nread = ch.read(buf);
+      if (nread < 0) return nread;
+      buf.flip();
+      buf.get(b, off, nread);
+      return nread;
+    }
+  }
+
+  static class ArrayBackedByteBufferReadStrategy extends ReadStrategy {
+    private DomainChannel ch = null;
+    
+    @Override
+    public void init(DomainSocket s) throws IOException {
+      ch = s.getChannel();
+    }
+    
+    @Override
+    public int read(byte b[], int off, int length) throws IOException {
+      ByteBuffer buf = ByteBuffer.wrap(b);
+      int nread = ch.read(buf);
+      if (nread < 0) return nread;
+      buf.flip();
+      buf.get(b, off, nread);
+      return nread;
+    }
+  }
+  
+  /**
+   * Test a simple client/server interaction.
+   *
+   * @throws IOException
+   */
+  void testClientServer1(final Class<? extends WriteStrategy> writeStrategyClass,
+      final Class<? extends ReadStrategy> readStrategyClass) throws Exception {
+    final String TEST_PATH = new File(sockDir.getDir(),
+        "test_sock_client_server1").getAbsolutePath();
+    final byte clientMsg1[] = new byte[] { 0x1, 0x2, 0x3, 0x4, 0x5, 0x6 };
+    final byte serverMsg1[] = new byte[] { 0x9, 0x8, 0x7, 0x6, 0x5 };
+    final byte clientMsg2 = 0x45;
+    final ArrayBlockingQueue<Throwable> threadResults =
+        new ArrayBlockingQueue<Throwable>(2);
+    final DomainSocket serv = DomainSocket.bindAndListen(TEST_PATH);
+    Thread serverThread = new Thread() {
+      public void run(){
+        // Run server
+        DomainSocket conn = null;
+        try {
+          conn = serv.accept();
+          byte in1[] = new byte[clientMsg1.length];
+          ReadStrategy reader = readStrategyClass.newInstance();
+          reader.init(conn);
+          reader.readFully(in1, 0, in1.length);
+          Assert.assertTrue(Arrays.equals(clientMsg1, in1));
+          WriteStrategy writer = writeStrategyClass.newInstance();
+          writer.init(conn);
+          writer.write(serverMsg1);
+          InputStream connInputStream = conn.getInputStream();
+          int in2 = connInputStream.read();
+          Assert.assertEquals((int)clientMsg2, in2);
+          conn.close();
+        } catch (Throwable e) {
+          threadResults.add(e);
+          Assert.fail(e.getMessage());
+        }
+        threadResults.add(new Success());
+      }
+    };
+    serverThread.start();
+    
+    Thread clientThread = new Thread() {
+      public void run(){
+        try {
+          DomainSocket client = DomainSocket.connect(TEST_PATH);
+          WriteStrategy writer = writeStrategyClass.newInstance();
+          writer.init(client);
+          writer.write(clientMsg1);
+          ReadStrategy reader = readStrategyClass.newInstance();
+          reader.init(client);
+          byte in1[] = new byte[serverMsg1.length];
+          reader.readFully(in1, 0, in1.length);
+          Assert.assertTrue(Arrays.equals(serverMsg1, in1));
+          OutputStream clientOutputStream = client.getOutputStream();
+          clientOutputStream.write(clientMsg2);
+          client.close();
+        } catch (Throwable e) {
+          threadResults.add(e);
+        }
+        threadResults.add(new Success());
+      }
+    };
+    clientThread.start();
+    
+    for (int i = 0; i < 2; i++) {
+      Throwable t = threadResults.take();
+      if (!(t instanceof Success)) {
+        Assert.fail(t.getMessage() + ExceptionUtils.getStackTrace(t));
+      }
+    }
+    serverThread.join(120000);
+    clientThread.join(120000);
+    serv.close();
+  }
+
+  @Test(timeout=180000)
+  public void testClientServerOutStreamInStream() throws Exception {
+    testClientServer1(OutputStreamWriteStrategy.class,
+        InputStreamReadStrategy.class);
+  }
+
+  @Test(timeout=180000)
+  public void testClientServerOutStreamInDbb() throws Exception {
+    testClientServer1(OutputStreamWriteStrategy.class,
+        DirectByteBufferReadStrategy.class);
+  }
+
+  @Test(timeout=180000)
+  public void testClientServerOutStreamInAbb() throws Exception {
+    testClientServer1(OutputStreamWriteStrategy.class,
+        ArrayBackedByteBufferReadStrategy.class);
+  }
+
+  static private class PassedFile {
+    private final int idx;
+    private final byte[] contents;
+    private FileInputStream fis;
+    
+    public PassedFile(int idx) throws IOException {
+      this.idx = idx;
+      this.contents = new byte[] { (byte)(idx % 127) };
+      Files.write(contents, new File(getPath()));
+      this.fis = new FileInputStream(getPath());
+    }
+
+    public String getPath() {
+      return new File(sockDir.getDir(), "passed_file" + idx).getAbsolutePath();
+    }
+
+    public FileInputStream getInputStream() throws IOException {
+      return fis;
+    }
+    
+    public void cleanup() throws IOException {
+      new File(getPath()).delete();
+      fis.close();
+    }
+
+    public void checkInputStream(FileInputStream fis) throws IOException {
+      byte buf[] = new byte[contents.length];
+      IOUtils.readFully(fis, buf, 0, buf.length);
+      Arrays.equals(contents, buf);
+    }
+    
+    protected void finalize() {
+      try {
+        cleanup();
+      } catch(Throwable t) {
+        // ignore
+      }
+    }
+  }
+
+  /**
+   * Test file descriptor passing.
+   *
+   * @throws IOException
+   */
+  @Test(timeout=180000)
+  public void testFdPassing() throws Exception {
+    final String TEST_PATH =
+        new File(sockDir.getDir(), "test_sock").getAbsolutePath();
+    final byte clientMsg1[] = new byte[] { 0x11, 0x22, 0x33, 0x44, 0x55, 0x66 };
+    final byte serverMsg1[] = new byte[] { 0x31, 0x30, 0x32, 0x34, 0x31, 0x33,
+          0x44, 0x1, 0x1, 0x1, 0x1, 0x1 };
+    final ArrayBlockingQueue<Throwable> threadResults =
+        new ArrayBlockingQueue<Throwable>(2);
+    final DomainSocket serv = DomainSocket.bindAndListen(TEST_PATH);
+    final PassedFile passedFiles[] =
+        new PassedFile[] { new PassedFile(1), new PassedFile(2) };
+    final FileDescriptor passedFds[] = new FileDescriptor[passedFiles.length];
+    for (int i = 0; i < passedFiles.length; i++) {
+      passedFds[i] = passedFiles[i].getInputStream().getFD();
+    }
+    Thread serverThread = new Thread() {
+      public void run(){
+        // Run server
+        DomainSocket conn = null;
+        try {
+          conn = serv.accept();
+          byte in1[] = new byte[clientMsg1.length];
+          InputStream connInputStream = conn.getInputStream();
+          IOUtils.readFully(connInputStream, in1, 0, in1.length);
+          Assert.assertTrue(Arrays.equals(clientMsg1, in1));
+          DomainSocket domainConn = (DomainSocket)conn;
+          domainConn.sendFileDescriptors(passedFds, serverMsg1, 0,
+              serverMsg1.length);
+          conn.close();
+        } catch (Throwable e) {
+          threadResults.add(e);
+          Assert.fail(e.getMessage());
+        }
+        threadResults.add(new Success());
+      }
+    };
+    serverThread.start();
+
+    Thread clientThread = new Thread() {
+      public void run(){
+        try {
+          DomainSocket client = DomainSocket.connect(TEST_PATH);
+          OutputStream clientOutputStream = client.getOutputStream();
+          InputStream clientInputStream = client.getInputStream();
+          clientOutputStream.write(clientMsg1);
+          DomainSocket domainConn = (DomainSocket)client;
+          byte in1[] = new byte[serverMsg1.length];
+          FileInputStream recvFis[] = new FileInputStream[passedFds.length];
+          int r = domainConn.
+              recvFileInputStreams(recvFis, in1, 0, in1.length - 1);
+          Assert.assertTrue(r > 0);
+          IOUtils.readFully(clientInputStream, in1, r, in1.length - r);
+          Assert.assertTrue(Arrays.equals(serverMsg1, in1));
+          for (int i = 0; i < passedFds.length; i++) {
+            Assert.assertNotNull(recvFis[i]);
+            passedFiles[i].checkInputStream(recvFis[i]);
+          }
+          for (FileInputStream fis : recvFis) {
+            fis.close();
+          }
+          client.close();
+        } catch (Throwable e) {
+          threadResults.add(e);
+        }
+        threadResults.add(new Success());
+      }
+    };
+    clientThread.start();
+    
+    for (int i = 0; i < 2; i++) {
+      Throwable t = threadResults.take();
+      if (!(t instanceof Success)) {
+        Assert.fail(t.getMessage() + ExceptionUtils.getStackTrace(t));
+      }
+    }
+    serverThread.join(120000);
+    clientThread.join(120000);
+    serv.close();
+    for (PassedFile pf : passedFiles) {
+      pf.cleanup();
+    }
+  }
+  
+  /**
+   * Run validateSocketPathSecurity
+   *
+   * @param str            The path to validate
+   * @param prefix         A prefix to skip validation for
+   * @throws IOException
+   */
+  private static void testValidateSocketPath(String str, String prefix)
+      throws IOException {
+    int skipComponents = 0;
+    File prefixFile = new File(prefix);
+    while (true) {
+      prefixFile = prefixFile.getParentFile();
+      if (prefixFile == null) {
+        break;
+      }
+      skipComponents++;
+    }
+    DomainSocket.validateSocketPathSecurity0(str,
+        skipComponents);
+  }
+  
+  /**
+   * Test file descriptor path security.
+   *
+   * @throws IOException
+   */
+  @Test(timeout=180000)
+  public void testFdPassingPathSecurity() throws Exception {
+    TemporarySocketDirectory tmp = new TemporarySocketDirectory();
+    try {
+      String prefix = tmp.getDir().getAbsolutePath();
+      Shell.execCommand(new String [] {
+          "mkdir", "-p", prefix + "/foo/bar/baz" });
+      Shell.execCommand(new String [] {
+          "chmod", "0700", prefix + "/foo/bar/baz" });
+      Shell.execCommand(new String [] {
+          "chmod", "0700", prefix + "/foo/bar" });
+      Shell.execCommand(new String [] {
+          "chmod", "0707", prefix + "/foo" });
+      Shell.execCommand(new String [] {
+          "mkdir", "-p", prefix + "/q1/q2" });
+      Shell.execCommand(new String [] {
+          "chmod", "0700", prefix + "/q1" });
+      Shell.execCommand(new String [] {
+          "chmod", "0700", prefix + "/q1/q2" });
+      testValidateSocketPath(prefix + "/q1/q2", prefix);
+      try {
+        testValidateSocketPath(prefix + "/foo/bar/baz", prefix);
+      } catch (IOException e) {
+        GenericTestUtils.assertExceptionContains("/foo' is world-writable.  " +
+            "Its permissions are 0707.  Please fix this or select a " +
+            "different socket path.", e);
+      }
+      try {
+        testValidateSocketPath(prefix + "/nope", prefix);
+      } catch (IOException e) {
+        GenericTestUtils.assertExceptionContains("failed to stat a path " +
+            "component: ", e);
+      }
+      // Root should be secure
+      DomainSocket.validateSocketPathSecurity0("/foo", 0);
+    } finally {
+      tmp.close();
+    }
+  }
+}

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt

@@ -4,3 +4,6 @@ These will be integrated to trunk CHANGES.txt after merge
 
 HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes.
 (Colin Patrick McCabe via todd)
+
+HDFS-4354. Create DomainSocket and DomainPeer and associated unit tests.
+(Colin Patrick McCabe via todd)

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java

@@ -23,6 +23,8 @@ import java.io.OutputStream;
 import java.net.Socket;
 import java.nio.channels.ReadableByteChannel;
 
+import org.apache.hadoop.net.unix.DomainSocket;
+
 /**
  * Represents a peer that we communicate with by using a basic Socket
  * that has no associated Channel.
@@ -118,4 +120,9 @@ class BasicInetPeer implements Peer {
   public String toString() {
     return "BasicInetPeer(" + socket.toString() + ")";
   }
+
+  @Override
+  public DomainSocket getDomainSocket() {
+    return null;
+  }
 }

+ 117 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java

@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.net;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.ReadableByteChannel;
+
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Represents a peer that we communicate with by using blocking I/O 
+ * on a UNIX domain socket.
+ */
+@InterfaceAudience.Private
+public class DomainPeer implements Peer {
+  private final DomainSocket socket;
+  private final OutputStream out;
+  private final InputStream in;
+  private final ReadableByteChannel channel;
+
+  public DomainPeer(DomainSocket socket) {
+    this.socket = socket;
+    this.out = socket.getOutputStream();
+    this.in = socket.getInputStream();
+    this.channel = socket.getChannel();
+  }
+
+  @Override
+  public ReadableByteChannel getInputStreamChannel() {
+    return channel;
+  }
+
+  @Override
+  public void setReadTimeout(int timeoutMs) throws IOException {
+    socket.setAttribute(DomainSocket.RCV_TIMEO, timeoutMs);
+  }
+
+  @Override
+  public int getReceiveBufferSize() throws IOException {
+    return socket.getAttribute(DomainSocket.RCV_BUF_SIZE);
+  }
+
+  @Override
+  public boolean getTcpNoDelay() throws IOException {
+    /* No TCP, no TCP_NODELAY. */
+    return false;
+  }
+
+  @Override
+  public void setWriteTimeout(int timeoutMs) throws IOException {
+    socket.setAttribute(DomainSocket.SND_TIMEO, timeoutMs);
+  }
+
+  @Override
+  public boolean isClosed() {
+    return !socket.isOpen();
+  }
+
+  @Override
+  public void close() throws IOException {
+    socket.close();
+  }
+
+  @Override
+  public String getRemoteAddressString() {
+    return "unix:" + socket.getPath();
+  }
+
+  @Override
+  public String getLocalAddressString() {
+    return "<local>";
+  }
+
+  @Override
+  public InputStream getInputStream() throws IOException {
+    return in;
+  }
+
+  @Override
+  public OutputStream getOutputStream() throws IOException {
+    return out;
+  }
+
+  @Override
+  public boolean isLocal() {
+    /* UNIX domain sockets can only be used for local communication. */
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return "DomainPeer(" + getRemoteAddressString() + ")";
+  }
+
+  @Override
+  public DomainSocket getDomainSocket() {
+    return socket;
+  }
+}

+ 88 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java

@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.net;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.hdfs.net.PeerServer;
+import org.apache.hadoop.net.unix.DomainSocket;
+
+class DomainPeerServer implements PeerServer {
+  static Log LOG = LogFactory.getLog(DomainPeerServer.class);
+  private final DomainSocket sock;
+
+  DomainPeerServer(DomainSocket sock) {
+    this.sock = sock;
+  }
+
+  public DomainPeerServer(String path, int port) 
+      throws IOException {
+    this(DomainSocket.bindAndListen(DomainSocket.getEffectivePath(path, port)));
+  }
+  
+  public String getBindPath() {
+    return sock.getPath();
+  }
+
+  @Override
+  public void setReceiveBufferSize(int size) throws IOException {
+    sock.setAttribute(DomainSocket.RCV_BUF_SIZE, size);
+  }
+
+  @Override
+  public Peer accept() throws IOException, SocketTimeoutException {
+    DomainSocket connSock = sock.accept();
+    Peer peer = null;
+    boolean success = false;
+    try {
+      peer = new DomainPeer(connSock);
+      success = true;
+      return peer;
+    } finally {
+      if (!success) {
+        if (peer != null) peer.close();
+        connSock.close();
+      }
+    }
+  }
+
+  @Override
+  public String getListeningString() {
+    return "unix:" + sock.getPath();
+  }
+  
+  @Override
+  public void close() throws IOException {
+    try {
+      sock.close();
+    } catch (IOException e) {
+      LOG.error("error closing DomainPeerServer: ", e);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "DomainPeerServer(" + getListeningString() + ")";
+  }
+}

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java

@@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.net.unix.DomainSocket;
 
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -133,4 +134,9 @@ public class EncryptedPeer implements Peer {
   public String toString() {
     return "EncryptedPeer(" + enclosedPeer + ")";
   }
+
+  @Override
+  public DomainSocket getDomainSocket() {
+    return enclosedPeer.getDomainSocket();
+  }
 }

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java

@@ -25,6 +25,7 @@ import java.nio.channels.ReadableByteChannel;
 
 import org.apache.hadoop.net.SocketInputStream;
 import org.apache.hadoop.net.SocketOutputStream;
+import org.apache.hadoop.net.unix.DomainSocket;
 
 /**
  * Represents a peer that we communicate with by using non-blocking I/O 
@@ -122,4 +123,9 @@ class NioInetPeer implements Peer {
   public String toString() {
     return "NioInetPeer(" + socket.toString() + ")";
   }
+
+  @Override
+  public DomainSocket getDomainSocket() {
+    return null;
+  }
 }

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java

@@ -23,6 +23,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.channels.ReadableByteChannel;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.net.unix.DomainSocket;
 
 /**
  * Represents a connection to a peer.
@@ -105,4 +106,10 @@ public interface Peer extends Closeable {
    *                       computer as we.
    */
   public boolean isLocal();
+
+  /**
+   * @return               The DomainSocket associated with the current
+   *                       peer, or null if there is none.
+   */
+  public DomainSocket getDomainSocket();
 }

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java

@@ -31,6 +31,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.net.unix.DomainSocket;
 import org.junit.Test;
 
 public class TestPeerCache {
@@ -114,6 +115,11 @@ public class TestPeerCache {
     public String toString() {
       return "FakePeer(dnId=" + dnId + ")";
     }
+
+    @Override
+    public DomainSocket getDomainSocket() {
+      return null;
+    }
   }
 
   @Test