|
@@ -0,0 +1,944 @@
|
|
|
+/*
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+
|
|
|
+#define _GNU_SOURCE
|
|
|
+
|
|
|
+#include "exception.h"
|
|
|
+#include "org/apache/hadoop/io/nativeio/file_descriptor.h"
|
|
|
+#include "org_apache_hadoop.h"
|
|
|
+#include "org_apache_hadoop_net_unix_DomainSocket.h"
|
|
|
+
|
|
|
+#include <errno.h>
|
|
|
+#include <fcntl.h>
|
|
|
+#include <inttypes.h>
|
|
|
+#include <jni.h>
|
|
|
+#include <limits.h>
|
|
|
+#include <stdint.h>
|
|
|
+#include <stdio.h>
|
|
|
+#include <stdlib.h>
|
|
|
+#include <sys/ioctl.h> /* for FIONREAD */
|
|
|
+#include <sys/socket.h>
|
|
|
+#include <sys/stat.h>
|
|
|
+#include <sys/types.h>
|
|
|
+#include <sys/un.h>
|
|
|
+#include <unistd.h>
|
|
|
+
|
|
|
+#define SEND_BUFFER_SIZE org_apache_hadoop_net_unix_DomainSocket_SEND_BUFFER_SIZE
|
|
|
+#define RECEIVE_BUFFER_SIZE org_apache_hadoop_net_unix_DomainSocket_RECEIVE_BUFFER_SIZE
|
|
|
+#define SEND_TIMEOUT org_apache_hadoop_net_unix_DomainSocket_SEND_TIMEOUT
|
|
|
+#define RECEIVE_TIMEOUT org_apache_hadoop_net_unix_DomainSocket_RECEIVE_TIMEOUT
|
|
|
+
|
|
|
+#define DEFAULT_RECEIVE_TIMEOUT 120000
|
|
|
+#define DEFAULT_SEND_TIMEOUT 120000
|
|
|
+#define LISTEN_BACKLOG 128
|
|
|
+
|
|
|
+/**
|
|
|
+ * Can't pass more than this number of file descriptors in a single message.
|
|
|
+ */
|
|
|
+#define MAX_PASSED_FDS 16
|
|
|
+
|
|
|
+static jthrowable setAttribute0(JNIEnv *env, jint fd, jint type, jint val);
|
|
|
+
|
|
|
+/**
|
|
|
+ * Convert an errno to a socket exception name.
|
|
|
+ *
|
|
|
+ * Note: we assume that all of these exceptions have a one-argument constructor
|
|
|
+ * that takes a string.
|
|
|
+ *
|
|
|
+ * @return The exception class name
|
|
|
+ */
|
|
|
+static const char *errnoToSocketExceptionName(int errnum)
|
|
|
+{
|
|
|
+ switch (errnum) {
|
|
|
+ case EAGAIN:
|
|
|
+ /* accept(2) returns EAGAIN when a socket timeout has been set, and that
|
|
|
+ * timeout elapses without an incoming connection. This error code is also
|
|
|
+ * used in non-blocking I/O, but we don't support that. */
|
|
|
+ case ETIMEDOUT:
|
|
|
+ return "java/net/SocketTimeoutException";
|
|
|
+ case EHOSTDOWN:
|
|
|
+ case EHOSTUNREACH:
|
|
|
+ case ECONNREFUSED:
|
|
|
+ return "java/net/NoRouteToHostException";
|
|
|
+ case ENOTSUP:
|
|
|
+ return "java/lang/UnsupportedOperationException";
|
|
|
+ default:
|
|
|
+ return "java/net/SocketException";
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+static jthrowable newSocketException(JNIEnv *env, int errnum,
|
|
|
+ const char *fmt, ...)
|
|
|
+ __attribute__((format(printf, 3, 4)));
|
|
|
+
|
|
|
+static jthrowable newSocketException(JNIEnv *env, int errnum,
|
|
|
+ const char *fmt, ...)
|
|
|
+{
|
|
|
+ va_list ap;
|
|
|
+ jthrowable jthr;
|
|
|
+
|
|
|
+ va_start(ap, fmt);
|
|
|
+ jthr = newExceptionV(env, errnoToSocketExceptionName(errnum), fmt, ap);
|
|
|
+ va_end(ap);
|
|
|
+ return jthr;
|
|
|
+}
|
|
|
+
|
|
|
+static const char* terror(int errnum)
|
|
|
+{
|
|
|
+ if ((errnum < 0) || (errnum >= sys_nerr)) {
|
|
|
+ return "unknown error.";
|
|
|
+ }
|
|
|
+ return sys_errlist[errnum];
|
|
|
+}
|
|
|
+
|
|
|
+/**
|
|
|
+ * Flexible buffer that will try to fit data on the stack, and fall back
|
|
|
+ * to the heap if necessary.
|
|
|
+ */
|
|
|
+struct flexibleBuffer {
|
|
|
+ int8_t *curBuf;
|
|
|
+ int8_t *allocBuf;
|
|
|
+ int8_t stackBuf[8196];
|
|
|
+};
|
|
|
+
|
|
|
+static jthrowable flexBufInit(JNIEnv *env, struct flexibleBuffer *flexBuf, jint length)
|
|
|
+{
|
|
|
+ flexBuf->curBuf = flexBuf->allocBuf = NULL;
|
|
|
+ if (length < sizeof(flexBuf->stackBuf)) {
|
|
|
+ flexBuf->curBuf = flexBuf->stackBuf;
|
|
|
+ return NULL;
|
|
|
+ }
|
|
|
+ flexBuf->allocBuf = malloc(length);
|
|
|
+ if (!flexBuf->allocBuf) {
|
|
|
+ return newException(env, "java/lang/OutOfMemoryError",
|
|
|
+ "OOM allocating space for %d bytes of data.", length);
|
|
|
+ }
|
|
|
+ flexBuf->curBuf = flexBuf->allocBuf;
|
|
|
+ return NULL;
|
|
|
+}
|
|
|
+
|
|
|
+static void flexBufFree(struct flexibleBuffer *flexBuf)
|
|
|
+{
|
|
|
+ free(flexBuf->allocBuf);
|
|
|
+}
|
|
|
+
|
|
|
+static jthrowable setup(JNIEnv *env, int *ofd, jobject jpath, int doConnect)
|
|
|
+{
|
|
|
+ const char *cpath = NULL;
|
|
|
+ struct sockaddr_un addr;
|
|
|
+ jthrowable jthr = NULL;
|
|
|
+ int fd = -1, ret;
|
|
|
+
|
|
|
+ fd = socket(PF_UNIX, SOCK_STREAM, 0);
|
|
|
+ if (fd < 0) {
|
|
|
+ ret = errno;
|
|
|
+ jthr = newSocketException(env, ret,
|
|
|
+ "error creating UNIX domain socket with SOCK_STREAM: %s",
|
|
|
+ terror(ret));
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ memset(&addr, 0, sizeof(&addr));
|
|
|
+ addr.sun_family = AF_UNIX;
|
|
|
+ cpath = (*env)->GetStringUTFChars(env, jpath, NULL);
|
|
|
+ if (!cpath) {
|
|
|
+ jthr = (*env)->ExceptionOccurred(env);
|
|
|
+ (*env)->ExceptionClear(env);
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ ret = snprintf(addr.sun_path, sizeof(addr.sun_path),
|
|
|
+ "%s", cpath);
|
|
|
+ if (ret < 0) {
|
|
|
+ ret = errno;
|
|
|
+ jthr = newSocketException(env, EIO,
|
|
|
+ "error computing UNIX domain socket path: error %d (%s)",
|
|
|
+ ret, terror(ret));
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ if (ret >= sizeof(addr.sun_path)) {
|
|
|
+ jthr = newSocketException(env, ENAMETOOLONG,
|
|
|
+ "error computing UNIX domain socket path: path too long. "
|
|
|
+ "The longest UNIX domain socket path possible on this host "
|
|
|
+ "is %zd bytes.", sizeof(addr.sun_path) - 1);
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ if (doConnect) {
|
|
|
+ RETRY_ON_EINTR(ret, connect(fd,
|
|
|
+ (struct sockaddr*)&addr, sizeof(addr)));
|
|
|
+ if (ret < 0) {
|
|
|
+ ret = errno;
|
|
|
+ jthr = newException(env, "java/net/ConnectException",
|
|
|
+ "connect(2) error: %s when trying to connect to '%s'",
|
|
|
+ terror(ret), addr.sun_path);
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ RETRY_ON_EINTR(ret, unlink(addr.sun_path));
|
|
|
+ RETRY_ON_EINTR(ret, bind(fd, (struct sockaddr*)&addr, sizeof(addr)));
|
|
|
+ if (ret < 0) {
|
|
|
+ ret = errno;
|
|
|
+ jthr = newException(env, "java/net/BindException",
|
|
|
+ "bind(2) error: %s when trying to bind to '%s'",
|
|
|
+ terror(ret), addr.sun_path);
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ /* We need to make the socket readable and writable for all users in the
|
|
|
+ * system.
|
|
|
+ *
|
|
|
+ * If the system administrator doesn't want the socket to be accessible to
|
|
|
+ * all users, he can simply adjust the +x permissions on one of the socket's
|
|
|
+ * parent directories.
|
|
|
+ *
|
|
|
+ * See HDFS-4485 for more discussion.
|
|
|
+ */
|
|
|
+ if (chmod(addr.sun_path, 0666)) {
|
|
|
+ ret = errno;
|
|
|
+ jthr = newException(env, "java/net/BindException",
|
|
|
+ "chmod(%s, 0666) failed: %s", addr.sun_path, terror(ret));
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ if (listen(fd, LISTEN_BACKLOG) < 0) {
|
|
|
+ ret = errno;
|
|
|
+ jthr = newException(env, "java/net/BindException",
|
|
|
+ "listen(2) error: %s when trying to listen to '%s'",
|
|
|
+ terror(ret), addr.sun_path);
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+done:
|
|
|
+ if (cpath) {
|
|
|
+ (*env)->ReleaseStringUTFChars(env, jpath, cpath);
|
|
|
+ }
|
|
|
+ if (jthr) {
|
|
|
+ if (fd > 0) {
|
|
|
+ RETRY_ON_EINTR(ret, close(fd));
|
|
|
+ fd = -1;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ *ofd = fd;
|
|
|
+ }
|
|
|
+ return jthr;
|
|
|
+}
|
|
|
+
|
|
|
+JNIEXPORT void JNICALL
|
|
|
+Java_org_apache_hadoop_net_unix_DomainSocket_anchorNative(
|
|
|
+JNIEnv *env, jclass clazz)
|
|
|
+{
|
|
|
+ fd_init(env); // for fd_get, fd_create, etc.
|
|
|
+}
|
|
|
+
|
|
|
+JNIEXPORT void JNICALL
|
|
|
+Java_org_apache_hadoop_net_unix_DomainSocket_validateSocketPathSecurity0(
|
|
|
+JNIEnv *env, jclass clazz, jobject jstr, jint skipComponents)
|
|
|
+{
|
|
|
+ jint utfLength;
|
|
|
+ char path[PATH_MAX], check[PATH_MAX], *token, *rest;
|
|
|
+ struct stat st;
|
|
|
+ int ret, mode, strlenPath;
|
|
|
+ uid_t uid;
|
|
|
+ jthrowable jthr = NULL;
|
|
|
+
|
|
|
+ utfLength = (*env)->GetStringUTFLength(env, jstr);
|
|
|
+ if (utfLength > sizeof(path)) {
|
|
|
+ jthr = newIOException(env, "path is too long! We expected a path "
|
|
|
+ "no longer than %zd UTF-8 bytes.", sizeof(path));
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ (*env)->GetStringUTFRegion(env, jstr, 0, utfLength, path);
|
|
|
+ jthr = (*env)->ExceptionOccurred(env);
|
|
|
+ if (jthr) {
|
|
|
+ (*env)->ExceptionClear(env);
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ uid = geteuid();
|
|
|
+ strlenPath = strlen(path);
|
|
|
+ if (strlenPath == 0) {
|
|
|
+ jthr = newIOException(env, "socket path is empty.");
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ if (path[strlenPath - 1] == '/') {
|
|
|
+ /* It makes no sense to have a socket path that ends in a slash, since
|
|
|
+ * sockets are not directories. */
|
|
|
+ jthr = newIOException(env, "bad socket path '%s'. The socket path "
|
|
|
+ "must not end in a slash.", path);
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ // This loop iterates through all of the path components except for the very
|
|
|
+ // last one. We don't validate the last component, since it's not supposed to
|
|
|
+ // be a directory. (If it is a directory, we will fail to create the socket
|
|
|
+ // later with EISDIR or similar.)
|
|
|
+ for (check[0] = '/', check[1] = '\0', rest = path, token = "";
|
|
|
+ token && rest[0];
|
|
|
+ token = strtok_r(rest, "/", &rest)) {
|
|
|
+ if (strcmp(check, "/") != 0) {
|
|
|
+ // If the previous directory we checked was '/', we skip appending another
|
|
|
+ // slash to the end because it would be unncessary. Otherwise we do it.
|
|
|
+ strcat(check, "/");
|
|
|
+ }
|
|
|
+ // These strcats are safe because the length of 'check' is the same as the
|
|
|
+ // length of 'path' and we never add more slashes than were in the original
|
|
|
+ // path.
|
|
|
+ strcat(check, token);
|
|
|
+ if (skipComponents > 0) {
|
|
|
+ skipComponents--;
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ if (stat(check, &st) < 0) {
|
|
|
+ ret = errno;
|
|
|
+ jthr = newIOException(env, "failed to stat a path component: '%s'. "
|
|
|
+ "error code %d (%s)", check, ret, terror(ret));
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ mode = st.st_mode & 0777;
|
|
|
+ if (mode & 0002) {
|
|
|
+ jthr = newIOException(env, "the path component: '%s' is "
|
|
|
+ "world-writable. Its permissions are 0%03o. Please fix "
|
|
|
+ "this or select a different socket path.", check, mode);
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ if ((mode & 0020) && (st.st_gid != 0)) {
|
|
|
+ jthr = newIOException(env, "the path component: '%s' is "
|
|
|
+ "group-writable, and the group is not root. Its permissions are "
|
|
|
+ "0%03o, and it is owned by gid %d. Please fix this or "
|
|
|
+ "select a different socket path.", check, mode, st.st_gid);
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ if ((mode & 0200) && (st.st_uid != 0) &&
|
|
|
+ (st.st_uid != uid)) {
|
|
|
+ jthr = newIOException(env, "the path component: '%s' is "
|
|
|
+ "owned by a user who is not root and not you. Your effective user "
|
|
|
+ "id is %d; the path is owned by user id %d, and its permissions are "
|
|
|
+ "0%03o. Please fix this or select a different socket path.",
|
|
|
+ check, uid, st.st_uid, mode);
|
|
|
+ goto done;
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ }
|
|
|
+done:
|
|
|
+ if (jthr) {
|
|
|
+ (*env)->Throw(env, jthr);
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+JNIEXPORT jint JNICALL
|
|
|
+Java_org_apache_hadoop_net_unix_DomainSocket_bind0(
|
|
|
+JNIEnv *env, jclass clazz, jstring path)
|
|
|
+{
|
|
|
+ int fd;
|
|
|
+ jthrowable jthr = NULL;
|
|
|
+
|
|
|
+ jthr = setup(env, &fd, path, 0);
|
|
|
+ if (jthr) {
|
|
|
+ (*env)->Throw(env, jthr);
|
|
|
+ }
|
|
|
+ return fd;
|
|
|
+}
|
|
|
+
|
|
|
+JNIEXPORT jint JNICALL
|
|
|
+Java_org_apache_hadoop_net_unix_DomainSocket_accept0(
|
|
|
+JNIEnv *env, jclass clazz, jint fd)
|
|
|
+{
|
|
|
+ int ret, newFd = -1;
|
|
|
+ socklen_t slen;
|
|
|
+ struct sockaddr_un remote;
|
|
|
+ jthrowable jthr = NULL;
|
|
|
+
|
|
|
+ slen = sizeof(remote);
|
|
|
+ do {
|
|
|
+ newFd = accept(fd, (struct sockaddr*)&remote, &slen);
|
|
|
+ } while ((newFd < 0) && (errno == EINTR));
|
|
|
+ if (newFd < 0) {
|
|
|
+ ret = errno;
|
|
|
+ jthr = newSocketException(env, ret, "accept(2) error: %s", terror(ret));
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+
|
|
|
+done:
|
|
|
+ if (jthr) {
|
|
|
+ if (newFd > 0) {
|
|
|
+ RETRY_ON_EINTR(ret, close(newFd));
|
|
|
+ newFd = -1;
|
|
|
+ }
|
|
|
+ (*env)->Throw(env, jthr);
|
|
|
+ }
|
|
|
+ return newFd;
|
|
|
+}
|
|
|
+
|
|
|
+JNIEXPORT jint JNICALL
|
|
|
+Java_org_apache_hadoop_net_unix_DomainSocket_connect0(
|
|
|
+JNIEnv *env, jclass clazz, jstring path)
|
|
|
+{
|
|
|
+ int ret, fd;
|
|
|
+ jthrowable jthr = NULL;
|
|
|
+
|
|
|
+ jthr = setup(env, &fd, path, 1);
|
|
|
+ if (jthr) {
|
|
|
+ (*env)->Throw(env, jthr);
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ if (((jthr = setAttribute0(env, fd, SEND_TIMEOUT, DEFAULT_SEND_TIMEOUT))) ||
|
|
|
+ ((jthr = setAttribute0(env, fd, RECEIVE_TIMEOUT, DEFAULT_RECEIVE_TIMEOUT)))) {
|
|
|
+ RETRY_ON_EINTR(ret, close(fd));
|
|
|
+ (*env)->Throw(env, jthr);
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ return fd;
|
|
|
+}
|
|
|
+
|
|
|
+static void javaMillisToTimeVal(int javaMillis, struct timeval *tv)
|
|
|
+{
|
|
|
+ tv->tv_sec = javaMillis / 1000;
|
|
|
+ tv->tv_usec = (javaMillis - (tv->tv_sec * 1000)) * 1000;
|
|
|
+}
|
|
|
+
|
|
|
+static jthrowable setAttribute0(JNIEnv *env, jint fd, jint type, jint val)
|
|
|
+{
|
|
|
+ struct timeval tv;
|
|
|
+ int ret, buf;
|
|
|
+
|
|
|
+ switch (type) {
|
|
|
+ case SEND_BUFFER_SIZE:
|
|
|
+ buf = val;
|
|
|
+ if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buf, sizeof(buf))) {
|
|
|
+ ret = errno;
|
|
|
+ return newSocketException(env, ret,
|
|
|
+ "setsockopt(SO_SNDBUF) error: %s", terror(ret));
|
|
|
+ }
|
|
|
+ return NULL;
|
|
|
+ case RECEIVE_BUFFER_SIZE:
|
|
|
+ buf = val;
|
|
|
+ if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &buf, sizeof(buf))) {
|
|
|
+ ret = errno;
|
|
|
+ return newSocketException(env, ret,
|
|
|
+ "setsockopt(SO_RCVBUF) error: %s", terror(ret));
|
|
|
+ }
|
|
|
+ return NULL;
|
|
|
+ case SEND_TIMEOUT:
|
|
|
+ javaMillisToTimeVal(val, &tv);
|
|
|
+ if (setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, (struct timeval *)&tv,
|
|
|
+ sizeof(tv))) {
|
|
|
+ ret = errno;
|
|
|
+ return newSocketException(env, ret,
|
|
|
+ "setsockopt(SO_SNDTIMEO) error: %s", terror(ret));
|
|
|
+ }
|
|
|
+ return NULL;
|
|
|
+ case RECEIVE_TIMEOUT:
|
|
|
+ javaMillisToTimeVal(val, &tv);
|
|
|
+ if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, (struct timeval *)&tv,
|
|
|
+ sizeof(tv))) {
|
|
|
+ ret = errno;
|
|
|
+ return newSocketException(env, ret,
|
|
|
+ "setsockopt(SO_RCVTIMEO) error: %s", terror(ret));
|
|
|
+ }
|
|
|
+ return NULL;
|
|
|
+ default:
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ return newRuntimeException(env, "Invalid attribute type %d.", type);
|
|
|
+}
|
|
|
+
|
|
|
+JNIEXPORT void JNICALL
|
|
|
+Java_org_apache_hadoop_net_unix_DomainSocket_setAttribute0(
|
|
|
+JNIEnv *env, jclass clazz, jint fd, jint type, jint val)
|
|
|
+{
|
|
|
+ jthrowable jthr = setAttribute0(env, fd, type, val);
|
|
|
+ if (jthr) {
|
|
|
+ (*env)->Throw(env, jthr);
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+static jint getSockOptBufSizeToJavaBufSize(int size)
|
|
|
+{
|
|
|
+#ifdef __linux__
|
|
|
+ // Linux always doubles the value that you set with setsockopt.
|
|
|
+ // We cut it in half here so that programs can at least read back the same
|
|
|
+ // value they set.
|
|
|
+ size /= 2;
|
|
|
+#endif
|
|
|
+ return size;
|
|
|
+}
|
|
|
+
|
|
|
+static int timeValToJavaMillis(const struct timeval *tv)
|
|
|
+{
|
|
|
+ return (tv->tv_sec * 1000) + (tv->tv_usec / 1000);
|
|
|
+}
|
|
|
+
|
|
|
+JNIEXPORT jint JNICALL
|
|
|
+Java_org_apache_hadoop_net_unix_DomainSocket_getAttribute0(
|
|
|
+JNIEnv *env, jclass clazz, jint fd, jint type)
|
|
|
+{
|
|
|
+ struct timeval tv;
|
|
|
+ socklen_t len;
|
|
|
+ int ret, rval = 0;
|
|
|
+
|
|
|
+ switch (type) {
|
|
|
+ case SEND_BUFFER_SIZE:
|
|
|
+ len = sizeof(rval);
|
|
|
+ if (getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &rval, &len)) {
|
|
|
+ ret = errno;
|
|
|
+ (*env)->Throw(env, newSocketException(env, ret,
|
|
|
+ "getsockopt(SO_SNDBUF) error: %s", terror(ret)));
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ return getSockOptBufSizeToJavaBufSize(rval);
|
|
|
+ case RECEIVE_BUFFER_SIZE:
|
|
|
+ len = sizeof(rval);
|
|
|
+ if (getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rval, &len)) {
|
|
|
+ ret = errno;
|
|
|
+ (*env)->Throw(env, newSocketException(env, ret,
|
|
|
+ "getsockopt(SO_RCVBUF) error: %s", terror(ret)));
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ return getSockOptBufSizeToJavaBufSize(rval);
|
|
|
+ case SEND_TIMEOUT:
|
|
|
+ memset(&tv, 0, sizeof(tv));
|
|
|
+ len = sizeof(struct timeval);
|
|
|
+ if (getsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, &len)) {
|
|
|
+ ret = errno;
|
|
|
+ (*env)->Throw(env, newSocketException(env, ret,
|
|
|
+ "getsockopt(SO_SNDTIMEO) error: %s", terror(ret)));
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ return timeValToJavaMillis(&tv);
|
|
|
+ case RECEIVE_TIMEOUT:
|
|
|
+ memset(&tv, 0, sizeof(tv));
|
|
|
+ len = sizeof(struct timeval);
|
|
|
+ if (getsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, &len)) {
|
|
|
+ ret = errno;
|
|
|
+ (*env)->Throw(env, newSocketException(env, ret,
|
|
|
+ "getsockopt(SO_RCVTIMEO) error: %s", terror(ret)));
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ return timeValToJavaMillis(&tv);
|
|
|
+ default:
|
|
|
+ (*env)->Throw(env, newRuntimeException(env,
|
|
|
+ "Invalid attribute type %d.", type));
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+JNIEXPORT void JNICALL
|
|
|
+Java_org_apache_hadoop_net_unix_DomainSocket_close0(
|
|
|
+JNIEnv *env, jclass clazz, jint fd)
|
|
|
+{
|
|
|
+ int ret;
|
|
|
+
|
|
|
+ RETRY_ON_EINTR(ret, close(fd));
|
|
|
+ if (ret) {
|
|
|
+ ret = errno;
|
|
|
+ (*env)->Throw(env, newSocketException(env, ret,
|
|
|
+ "close(2) error: %s", terror(ret)));
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+JNIEXPORT void JNICALL
|
|
|
+Java_org_apache_hadoop_net_unix_DomainSocket_closeFileDescriptor0(
|
|
|
+JNIEnv *env, jclass clazz, jobject jfd)
|
|
|
+{
|
|
|
+ Java_org_apache_hadoop_net_unix_DomainSocket_close0(
|
|
|
+ env, clazz, fd_get(env, jfd));
|
|
|
+}
|
|
|
+
|
|
|
+JNIEXPORT void JNICALL
|
|
|
+Java_org_apache_hadoop_net_unix_DomainSocket_shutdown0(
|
|
|
+JNIEnv *env, jclass clazz, jint fd)
|
|
|
+{
|
|
|
+ int ret;
|
|
|
+
|
|
|
+ RETRY_ON_EINTR(ret, shutdown(fd, SHUT_RDWR));
|
|
|
+ if (ret) {
|
|
|
+ ret = errno;
|
|
|
+ (*env)->Throw(env, newSocketException(env, ret,
|
|
|
+ "shutdown(2) error: %s", terror(ret)));
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+/**
|
|
|
+ * Write an entire buffer to a file descriptor.
|
|
|
+ *
|
|
|
+ * @param env The JNI environment.
|
|
|
+ * @param fd The fd to write to.
|
|
|
+ * @param buf The buffer to write
|
|
|
+ * @param amt The length of the buffer to write.
|
|
|
+ * @return NULL on success; or the unraised exception representing
|
|
|
+ * the problem.
|
|
|
+ */
|
|
|
+static jthrowable write_fully(JNIEnv *env, int fd, int8_t *buf, int amt)
|
|
|
+{
|
|
|
+ int err, res;
|
|
|
+
|
|
|
+ while (amt > 0) {
|
|
|
+ res = write(fd, buf, amt);
|
|
|
+ if (res < 0) {
|
|
|
+ err = errno;
|
|
|
+ if (err == EINTR) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ return newSocketException(env, err, "write(2) error: %s", terror(err));
|
|
|
+ }
|
|
|
+ amt -= res;
|
|
|
+ buf += res;
|
|
|
+ }
|
|
|
+ return NULL;
|
|
|
+}
|
|
|
+
|
|
|
+/**
|
|
|
+ * Our auxillary data setup.
|
|
|
+ *
|
|
|
+ * See man 3 cmsg for more information about auxillary socket data on UNIX.
|
|
|
+ *
|
|
|
+ * We use __attribute__((packed)) to ensure that the compiler doesn't insert any
|
|
|
+ * padding between 'hdr' and 'fds'.
|
|
|
+ * We use __attribute__((aligned(8)) to ensure that the compiler puts the start
|
|
|
+ * of the structure at an address which is a multiple of 8. If we did not do
|
|
|
+ * this, the attribute((packed)) would cause the compiler to generate a lot of
|
|
|
+ * slow code for accessing unaligned memory.
|
|
|
+ */
|
|
|
+struct cmsghdr_with_fds {
|
|
|
+ struct cmsghdr hdr;
|
|
|
+ int fds[MAX_PASSED_FDS];
|
|
|
+} __attribute__((packed,aligned(8)));
|
|
|
+
|
|
|
+JNIEXPORT void JNICALL
|
|
|
+Java_org_apache_hadoop_net_unix_DomainSocket_sendFileDescriptors0(
|
|
|
+JNIEnv *env, jclass clazz, jint fd, jobject jfds, jobject jbuf,
|
|
|
+jint offset, jint length)
|
|
|
+{
|
|
|
+ struct iovec vec[1];
|
|
|
+ struct flexibleBuffer flexBuf;
|
|
|
+ struct cmsghdr_with_fds aux;
|
|
|
+ jint jfdsLen;
|
|
|
+ int i, ret = -1, auxLen;
|
|
|
+ struct msghdr socketMsg;
|
|
|
+ jthrowable jthr = NULL;
|
|
|
+
|
|
|
+ jthr = flexBufInit(env, &flexBuf, length);
|
|
|
+ if (jthr) {
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ if (length <= 0) {
|
|
|
+ jthr = newException(env, "java/lang/IllegalArgumentException",
|
|
|
+ "You must write at least one byte.");
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ jfdsLen = (*env)->GetArrayLength(env, jfds);
|
|
|
+ if (jfdsLen <= 0) {
|
|
|
+ jthr = newException(env, "java/lang/IllegalArgumentException",
|
|
|
+ "Called sendFileDescriptors with no file descriptors.");
|
|
|
+ goto done;
|
|
|
+ } else if (jfdsLen > MAX_PASSED_FDS) {
|
|
|
+ jfdsLen = 0;
|
|
|
+ jthr = newException(env, "java/lang/IllegalArgumentException",
|
|
|
+ "Called sendFileDescriptors with an array of %d length. "
|
|
|
+ "The maximum is %d.", jfdsLen, MAX_PASSED_FDS);
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ (*env)->GetByteArrayRegion(env, jbuf, offset, length, flexBuf.curBuf);
|
|
|
+ jthr = (*env)->ExceptionOccurred(env);
|
|
|
+ if (jthr) {
|
|
|
+ (*env)->ExceptionClear(env);
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ memset(&vec, 0, sizeof(vec));
|
|
|
+ vec[0].iov_base = flexBuf.curBuf;
|
|
|
+ vec[0].iov_len = length;
|
|
|
+ auxLen = CMSG_LEN(jfdsLen * sizeof(int));
|
|
|
+ memset(&aux, 0, auxLen);
|
|
|
+ memset(&socketMsg, 0, sizeof(socketMsg));
|
|
|
+ socketMsg.msg_iov = vec;
|
|
|
+ socketMsg.msg_iovlen = 1;
|
|
|
+ socketMsg.msg_control = &aux;
|
|
|
+ socketMsg.msg_controllen = auxLen;
|
|
|
+ aux.hdr.cmsg_len = auxLen;
|
|
|
+ aux.hdr.cmsg_level = SOL_SOCKET;
|
|
|
+ aux.hdr.cmsg_type = SCM_RIGHTS;
|
|
|
+ for (i = 0; i < jfdsLen; i++) {
|
|
|
+ jobject jfd = (*env)->GetObjectArrayElement(env, jfds, i);
|
|
|
+ if (!jfd) {
|
|
|
+ jthr = (*env)->ExceptionOccurred(env);
|
|
|
+ if (jthr) {
|
|
|
+ (*env)->ExceptionClear(env);
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ jthr = newException(env, "java/lang/NullPointerException",
|
|
|
+ "element %d of jfds was NULL.", i);
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ aux.fds[i] = fd_get(env, jfd);
|
|
|
+ (*env)->DeleteLocalRef(env, jfd);
|
|
|
+ if (jthr) {
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ RETRY_ON_EINTR(ret, sendmsg(fd, &socketMsg, 0));
|
|
|
+ if (ret < 0) {
|
|
|
+ ret = errno;
|
|
|
+ jthr = newSocketException(env, ret, "sendmsg(2) error: %s", terror(ret));
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ length -= ret;
|
|
|
+ if (length > 0) {
|
|
|
+ // Write the rest of the bytes we were asked to send.
|
|
|
+ // This time, no fds will be attached.
|
|
|
+ jthr = write_fully(env, fd, flexBuf.curBuf + ret, length);
|
|
|
+ if (jthr) {
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+done:
|
|
|
+ flexBufFree(&flexBuf);
|
|
|
+ if (jthr) {
|
|
|
+ (*env)->Throw(env, jthr);
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+JNIEXPORT jint JNICALL
|
|
|
+Java_org_apache_hadoop_net_unix_DomainSocket_receiveFileDescriptors0(
|
|
|
+JNIEnv *env, jclass clazz, jint fd, jarray jfds, jarray jbuf,
|
|
|
+jint offset, jint length)
|
|
|
+{
|
|
|
+ struct iovec vec[1];
|
|
|
+ struct flexibleBuffer flexBuf;
|
|
|
+ struct cmsghdr_with_fds aux;
|
|
|
+ int i, jRecvFdsLen = 0, auxLen;
|
|
|
+ jint jfdsLen = 0;
|
|
|
+ struct msghdr socketMsg;
|
|
|
+ ssize_t bytesRead = -1;
|
|
|
+ jobject fdObj;
|
|
|
+ jthrowable jthr = NULL;
|
|
|
+
|
|
|
+ jthr = flexBufInit(env, &flexBuf, length);
|
|
|
+ if (jthr) {
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ if (length <= 0) {
|
|
|
+ jthr = newRuntimeException(env, "You must read at least one byte.");
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ jfdsLen = (*env)->GetArrayLength(env, jfds);
|
|
|
+ if (jfdsLen <= 0) {
|
|
|
+ jthr = newException(env, "java/lang/IllegalArgumentException",
|
|
|
+ "Called receiveFileDescriptors with an array of %d length. "
|
|
|
+ "You must pass at least one fd.", jfdsLen);
|
|
|
+ goto done;
|
|
|
+ } else if (jfdsLen > MAX_PASSED_FDS) {
|
|
|
+ jfdsLen = 0;
|
|
|
+ jthr = newException(env, "java/lang/IllegalArgumentException",
|
|
|
+ "Called receiveFileDescriptors with an array of %d length. "
|
|
|
+ "The maximum is %d.", jfdsLen, MAX_PASSED_FDS);
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ for (i = 0; i < jfdsLen; i++) {
|
|
|
+ (*env)->SetObjectArrayElement(env, jfds, i, NULL);
|
|
|
+ }
|
|
|
+ vec[0].iov_base = flexBuf.curBuf;
|
|
|
+ vec[0].iov_len = length;
|
|
|
+ auxLen = CMSG_LEN(jfdsLen * sizeof(int));
|
|
|
+ memset(&aux, 0, auxLen);
|
|
|
+ memset(&socketMsg, 0, auxLen);
|
|
|
+ socketMsg.msg_iov = vec;
|
|
|
+ socketMsg.msg_iovlen = 1;
|
|
|
+ socketMsg.msg_control = &aux;
|
|
|
+ socketMsg.msg_controllen = auxLen;
|
|
|
+ aux.hdr.cmsg_len = auxLen;
|
|
|
+ aux.hdr.cmsg_level = SOL_SOCKET;
|
|
|
+ aux.hdr.cmsg_type = SCM_RIGHTS;
|
|
|
+ RETRY_ON_EINTR(bytesRead, recvmsg(fd, &socketMsg, 0));
|
|
|
+ if (bytesRead < 0) {
|
|
|
+ int ret = errno;
|
|
|
+ if (ret == ECONNABORTED) {
|
|
|
+ // The remote peer disconnected on us. Treat this as an EOF.
|
|
|
+ bytesRead = -1;
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ jthr = newSocketException(env, ret, "recvmsg(2) failed: %s",
|
|
|
+ terror(ret));
|
|
|
+ goto done;
|
|
|
+ } else if (bytesRead == 0) {
|
|
|
+ bytesRead = -1;
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ jRecvFdsLen = (aux.hdr.cmsg_len - sizeof(struct cmsghdr)) / sizeof(int);
|
|
|
+ for (i = 0; i < jRecvFdsLen; i++) {
|
|
|
+ fdObj = fd_create(env, aux.fds[i]);
|
|
|
+ if (!fdObj) {
|
|
|
+ jthr = (*env)->ExceptionOccurred(env);
|
|
|
+ (*env)->ExceptionClear(env);
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ // Make this -1 so we don't attempt to close it twice in an error path.
|
|
|
+ aux.fds[i] = -1;
|
|
|
+ (*env)->SetObjectArrayElement(env, jfds, i, fdObj);
|
|
|
+ // There is no point keeping around a local reference to the fdObj.
|
|
|
+ // The array continues to reference it.
|
|
|
+ (*env)->DeleteLocalRef(env, fdObj);
|
|
|
+ }
|
|
|
+ (*env)->SetByteArrayRegion(env, jbuf, offset, length, flexBuf.curBuf);
|
|
|
+ jthr = (*env)->ExceptionOccurred(env);
|
|
|
+ if (jthr) {
|
|
|
+ (*env)->ExceptionClear(env);
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+done:
|
|
|
+ flexBufFree(&flexBuf);
|
|
|
+ if (jthr) {
|
|
|
+ // Free any FileDescriptor references we may have created,
|
|
|
+ // or file descriptors we may have been passed.
|
|
|
+ for (i = 0; i < jRecvFdsLen; i++) {
|
|
|
+ if (aux.fds[i] >= 0) {
|
|
|
+ RETRY_ON_EINTR(i, close(aux.fds[i]));
|
|
|
+ aux.fds[i] = -1;
|
|
|
+ }
|
|
|
+ fdObj = (*env)->GetObjectArrayElement(env, jfds, i);
|
|
|
+ if (fdObj) {
|
|
|
+ int ret, afd = fd_get(env, fdObj);
|
|
|
+ if (afd >= 0) {
|
|
|
+ RETRY_ON_EINTR(ret, close(afd));
|
|
|
+ }
|
|
|
+ (*env)->SetObjectArrayElement(env, jfds, i, NULL);
|
|
|
+ (*env)->DeleteLocalRef(env, fdObj);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ (*env)->Throw(env, jthr);
|
|
|
+ }
|
|
|
+ return bytesRead;
|
|
|
+}
|
|
|
+
|
|
|
+JNIEXPORT jint JNICALL
|
|
|
+Java_org_apache_hadoop_net_unix_DomainSocket_readArray0(
|
|
|
+JNIEnv *env, jclass clazz, jint fd, jarray b, jint offset, jint length)
|
|
|
+{
|
|
|
+ int ret = -1;
|
|
|
+ struct flexibleBuffer flexBuf;
|
|
|
+ jthrowable jthr;
|
|
|
+
|
|
|
+ jthr = flexBufInit(env, &flexBuf, length);
|
|
|
+ if (jthr) {
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ RETRY_ON_EINTR(ret, read(fd, flexBuf.curBuf, length));
|
|
|
+ if (ret < 0) {
|
|
|
+ ret = errno;
|
|
|
+ if (ret == ECONNABORTED) {
|
|
|
+ // The remote peer disconnected on us. Treat this as an EOF.
|
|
|
+ ret = -1;
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ jthr = newSocketException(env, ret, "read(2) error: %s",
|
|
|
+ terror(ret));
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ if (ret == 0) {
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ (*env)->SetByteArrayRegion(env, b, offset, ret, flexBuf.curBuf);
|
|
|
+ jthr = (*env)->ExceptionOccurred(env);
|
|
|
+ if (jthr) {
|
|
|
+ (*env)->ExceptionClear(env);
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+done:
|
|
|
+ flexBufFree(&flexBuf);
|
|
|
+ if (jthr) {
|
|
|
+ (*env)->Throw(env, jthr);
|
|
|
+ }
|
|
|
+ return ret == 0 ? -1 : ret; // Java wants -1 on EOF
|
|
|
+}
|
|
|
+
|
|
|
+JNIEXPORT jint JNICALL
|
|
|
+Java_org_apache_hadoop_net_unix_DomainSocket_available0(
|
|
|
+JNIEnv *env, jclass clazz, jint fd)
|
|
|
+{
|
|
|
+ int ret, avail = 0;
|
|
|
+ jthrowable jthr = NULL;
|
|
|
+
|
|
|
+ RETRY_ON_EINTR(ret, ioctl(fd, FIONREAD, &avail));
|
|
|
+ if (ret < 0) {
|
|
|
+ ret = errno;
|
|
|
+ jthr = newSocketException(env, ret,
|
|
|
+ "ioctl(%d, FIONREAD) error: %s", fd, terror(ret));
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+done:
|
|
|
+ if (jthr) {
|
|
|
+ (*env)->Throw(env, jthr);
|
|
|
+ }
|
|
|
+ return avail;
|
|
|
+}
|
|
|
+
|
|
|
+JNIEXPORT void JNICALL
|
|
|
+Java_org_apache_hadoop_net_unix_DomainSocket_writeArray0(
|
|
|
+JNIEnv *env, jclass clazz, jint fd, jarray b, jint offset, jint length)
|
|
|
+{
|
|
|
+ struct flexibleBuffer flexBuf;
|
|
|
+ jthrowable jthr;
|
|
|
+
|
|
|
+ jthr = flexBufInit(env, &flexBuf, length);
|
|
|
+ if (jthr) {
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ (*env)->GetByteArrayRegion(env, b, offset, length, flexBuf.curBuf);
|
|
|
+ jthr = (*env)->ExceptionOccurred(env);
|
|
|
+ if (jthr) {
|
|
|
+ (*env)->ExceptionClear(env);
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ jthr = write_fully(env, fd, flexBuf.curBuf, length);
|
|
|
+ if (jthr) {
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+
|
|
|
+done:
|
|
|
+ flexBufFree(&flexBuf);
|
|
|
+ if (jthr) {
|
|
|
+ (*env)->Throw(env, jthr);
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+JNIEXPORT jint JNICALL
|
|
|
+Java_org_apache_hadoop_net_unix_DomainSocket_readByteBufferDirect0(
|
|
|
+JNIEnv *env, jclass clazz, jint fd, jobject dst, jint position, jint remaining)
|
|
|
+{
|
|
|
+ uint8_t *buf;
|
|
|
+ jthrowable jthr = NULL;
|
|
|
+ int res = -1;
|
|
|
+
|
|
|
+ buf = (*env)->GetDirectBufferAddress(env, dst);
|
|
|
+ if (!buf) {
|
|
|
+ jthr = newRuntimeException(env, "GetDirectBufferAddress failed.");
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ RETRY_ON_EINTR(res, read(fd, buf + position, remaining));
|
|
|
+ if (res < 0) {
|
|
|
+ res = errno;
|
|
|
+ if (res != ECONNABORTED) {
|
|
|
+ jthr = newSocketException(env, res, "read(2) error: %s",
|
|
|
+ terror(res));
|
|
|
+ goto done;
|
|
|
+ } else {
|
|
|
+ // The remote peer disconnected on us. Treat this as an EOF.
|
|
|
+ res = -1;
|
|
|
+ }
|
|
|
+ }
|
|
|
+done:
|
|
|
+ if (jthr) {
|
|
|
+ (*env)->Throw(env, jthr);
|
|
|
+ }
|
|
|
+ return res;
|
|
|
+}
|