소스 검색

HADOOP-10667. Implement TCP connection reuse for native client (cmccabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HADOOP-10388@1605865 13f79535-47bb-0310-9956-ffa450edef68
Colin McCabe 11 년 전
부모
커밋
062ff403a8

+ 5 - 1
hadoop-native-core/src/main/native/CMakeLists.txt

@@ -146,7 +146,6 @@ add_library(fstest STATIC
 target_link_libraries(fstest
     ${COMMON_DEPS}
     ${RPC_DEPS}
-    ${JNI_DEPS}
     ${FS_DEPS}
 )
 
@@ -167,6 +166,11 @@ add_executable(hconf-unit common/hconf-unit.c
 add_utest(hconf-unit)
 target_link_libraries(hconf-unit ${EXPAT_LIB} ${LIBUV_LIB})
 
+add_executable(net-unit common/hadoop_err.c common/net-unit.c
+    common/net.c common/test.c)
+add_utest(net-unit)
+target_link_libraries(net-unit uv)
+
 add_executable(hadoop_err-unit common/hadoop_err-unit.c
     common/hadoop_err.c test/test.c)
 add_utest(hadoop_err-unit)

+ 2 - 0
hadoop-native-core/src/main/native/common/hadoop_err.c

@@ -61,6 +61,8 @@ static const char *errno_to_class(int code)
         return "org.apache.hadoop.native.HadoopCore.InvalidRequestException";
     case ENOMEM:
         return "org.apache.hadoop.native.HadoopCore.OutOfMemoryException";
+    case ESHUTDOWN:
+        return "org.apache.hadoop.native.HadoopCore.ShutdownException";
     default:
         return "org.apache.hadoop.native.HadoopCore.IOException";
     }

+ 8 - 3
hadoop-native-core/src/main/native/hdfs/namenode-rpc-unit.c

@@ -98,8 +98,6 @@ void set_replication_cb(SetReplicationResponseProto *resp,
     }
 }
 
-
-
 int main(void)
 {
     struct hrpc_messenger_builder *msgr_bld;
@@ -120,7 +118,14 @@ int main(void)
     EXPECT_INT_ZERO(uv_sem_init(&sem, 0));
     {
         SetReplicationRequestProto req = SET_REPLICATION_REQUEST_PROTO__INIT;
-        req.src = "/foo2";
+        req.src = "/foo";
+        req.replication = 1;
+        cnn_async_set_replication(&proxy, &req, set_replication_cb, &sem);
+    }
+    uv_sem_wait(&sem);
+    {
+        SetReplicationRequestProto req = SET_REPLICATION_REQUEST_PROTO__INIT;
+        req.src = "/bar";
         req.replication = 2;
         cnn_async_set_replication(&proxy, &req, set_replication_cb, &sem);
     }

+ 1 - 1
hadoop-native-core/src/main/native/rpc/call-unit.c

@@ -109,7 +109,7 @@ int main(void)
     payload.base = strdup("testbuff");
     payload.len = strlen(payload.base);
 
-    call.remote.sin_addr.s_addr = inet_addr("127.0.0.1");
+    call.remote->sin_addr.s_addr = inet_addr("127.0.0.1");
     call.protocol = "org.apache.hadoop.hdfs.protocol.ClientProtocol";
     call.username = "root";
     call.payload = payload;

+ 1 - 1
hadoop-native-core/src/main/native/rpc/call.h

@@ -50,7 +50,7 @@ struct hrpc_call {
     /**
      * Remote address we're sending to.
      */
-    struct sockaddr_in remote;
+    struct sockaddr_in *remote;
 
     /**
      * The callback to make from the reactor thread when this call completes or

+ 117 - 79
hadoop-native-core/src/main/native/rpc/conn.c

@@ -37,14 +37,14 @@
 #include <uv.h>
 
 #define conn_log_warn(conn, fmt, ...) \
-    fprintf(stderr, "WARN: conn %p (reactor %p): " fmt, \
-            conn, conn->reactor, __VA_ARGS__)
+    fprintf(stderr, "WARN: conn %s: " fmt, \
+            conn->name, __VA_ARGS__)
 #define conn_log_info(conn, fmt, ...) \
-    fprintf(stderr, "INFO: conn %p (reactor %p): " fmt, \
-            conn, conn->reactor, __VA_ARGS__)
+    fprintf(stderr, "INFO: conn %s: " fmt, \
+            conn->name, __VA_ARGS__)
 #define conn_log_debug(conn, fmt, ...) \
-    fprintf(stderr, "DEBUG: conn %p (reactor %p): " fmt, \
-            conn, conn->reactor, __VA_ARGS__)
+    fprintf(stderr, "DEBUG: conn %s: " fmt, \
+            conn->name, __VA_ARGS__)
 
 /**
  * The maximum length that we'll allocate to hold a response from the server.
@@ -200,7 +200,7 @@ static struct hadoop_err *hrpc_conn_setup_payload(struct hrpc_conn *conn)
     return NULL;
 }
 
-static void conn_write_cb(uv_write_t* req, int status)
+static void conn_write_cb(uv_write_t *req, int status)
 {
     struct hrpc_conn *conn = req->data;
     struct hrpc_conn_writer *writer = &conn->writer;
@@ -210,11 +210,30 @@ static void conn_write_cb(uv_write_t* req, int status)
 
     if (status) {
         err = hadoop_uverr_alloc(status,
-                "conn_write_cb got error"); 
+                "conn_write_cb got error %d", status);
         hrpc_conn_destroy(conn, err);
         return;
     }
+
     switch (writer->state) {
+    case HRPC_CONN_WRITE_CONNECTING:
+        writer->state = HRPC_CONN_WRITE_IPC_HEADER;
+        err = conn_setup_ipc_header(conn);
+        if (err) {
+            hrpc_conn_destroy(conn, err);
+            return;
+        }
+        conn_log_debug(conn, "writing %zd-byte IPC header\n",
+                writer->cur_writes[0].len);
+        res = uv_write(&writer->write_req, (uv_stream_t*)&conn->stream,
+                     writer->cur_writes, 1, conn_write_cb);
+        if (res) {
+            err = hadoop_uverr_alloc(res, "conn_start_outbound: "
+                "got uv_write error %d when writing ipc header", res);
+            hrpc_conn_destroy(conn, err);
+            return;
+        }
+        break;
     case HRPC_CONN_WRITE_IPC_HEADER:
         free_write_bufs(conn);
         writer->state = HRPC_CONN_WRITE_PAYLOAD;
@@ -223,19 +242,19 @@ static void conn_write_cb(uv_write_t* req, int status)
             hrpc_conn_destroy(conn, err);
             return;
         }
-        writer->write_req.data = conn;
+        conn_log_debug(conn, "writing %zd-byte and %zd-byte payloads\n",
+                writer->cur_writes[0].len, writer->cur_writes[1].len);
         res = uv_write(&writer->write_req, (uv_stream_t*)&conn->stream, 
                      writer->cur_writes, 2, conn_write_cb);
         if (res) {
-            err = hadoop_uverr_alloc(res,
-                    "failed to call uv_write on payload");
+            err = hadoop_uverr_alloc(res, "conn_write_cb: uv_write "
+                "failed with error %d.  Falied to start writing the "
+                "payload.", res);
             hrpc_conn_destroy(conn, err);
             return;
         }
         break;
     case HRPC_CONN_WRITE_PAYLOAD:
-        conn_log_debug(conn, "%s", "conn_write_cb: finished writing payload.  "
-                      "Now waiting for response.\n");
         free_write_bufs(conn);
         writer->state = HRPC_CONN_WRITE_IDLE;
         reader->state = HRPC_CONN_READ_LEN;
@@ -243,10 +262,13 @@ static void conn_write_cb(uv_write_t* req, int status)
         res = uv_read_start((uv_stream_t*)&conn->stream, hrpc_conn_read_alloc,
                           conn_read_cb);
         if (res) {
-            err = hadoop_uverr_alloc(res, "uv_read_start failed");
+            err = hadoop_uverr_alloc(res, "conn_write_cb: uv_read_start "
+                                     "failed with error %d", res);
             hrpc_conn_destroy(conn, err);
             return;
         }
+        conn_log_debug(conn, "%s", "conn_write_cb: finished writing payload.  "
+                      "Now waiting for response.\n");
         break;
     default:
         conn_log_warn(conn, "conn_write_cb: got an unexpected write "
@@ -256,39 +278,18 @@ static void conn_write_cb(uv_write_t* req, int status)
     }
 }
 
-static void conn_start_outbound(struct hrpc_conn *conn)
+void hrpc_conn_restart_outbound(struct hrpc_conn *conn, struct hrpc_call *call)
 {
-    struct hadoop_err *err = NULL;
-    struct hrpc_conn_writer *writer = &conn->writer;
-    int res;
-
-    // Get next call ID to use.
+    conn->call = call;
     if (conn->call_id >= 0x7fffffff) {
         conn->call_id = 1;
     } else {
         conn->call_id++;
     }
-    writer->state = HRPC_CONN_WRITE_IPC_HEADER;
-    err = conn_setup_ipc_header(conn);
-    if (err) {
-        hrpc_conn_destroy(conn, err);
-        return;
-    }
-    writer->write_req.data = conn;
-    res = uv_write(&writer->write_req, (uv_stream_t*)&conn->stream,
-                 writer->cur_writes, 1, conn_write_cb);
-    if (res) {
-        err = hadoop_uverr_alloc(res,
-                "failed to call uv_write on ipc_header_buf");
-        hrpc_conn_destroy(conn, err);
-        return;
-    }
-}
-
-void hrpc_conn_start_outbound(struct hrpc_conn *conn, struct hrpc_call *call)
-{
-    conn->call = call;
-    conn_start_outbound(conn);
+    conn->writer.state = HRPC_CONN_WRITE_IPC_HEADER;
+    conn->reader.state = HRPC_CONN_UNREADABLE;
+    conn->writer.write_req.data = conn;
+    conn_write_cb(&conn->writer.write_req, 0);
 }
 
 static void conn_connect_cb(uv_connect_t *req, int status)
@@ -296,9 +297,11 @@ static void conn_connect_cb(uv_connect_t *req, int status)
     struct hrpc_conn *conn = req->data;
     struct hadoop_err *err = NULL;
     struct hrpc_conn_writer *writer = &conn->writer;
+    char buf[128] = { 0 };
 
     if (status) {
-        err = hadoop_uverr_alloc(status, "uv_tcp_connect failed");
+        err = hadoop_uverr_alloc(status, "uv_tcp_connect failed "
+                                 "with status %d", status);
         hrpc_conn_destroy(conn, err);
         return;
     }
@@ -310,25 +313,29 @@ static void conn_connect_cb(uv_connect_t *req, int status)
         hrpc_conn_destroy(conn, err);
         return;
     }
-    conn_start_outbound(conn);
+    conn_log_debug(conn, "conn_connect_cb: finished connecting to %s.\n",
+                    net_ipv4_name(&conn->remote, buf, sizeof(buf)));
+    conn->writer.write_req.data = conn;
+    conn_write_cb(&conn->writer.write_req, 0);
 }
 
 struct hadoop_err *hrpc_conn_create_outbound(struct hrpc_reactor *reactor,
-                                    struct hrpc_call *call,
+                                    struct hrpc_call *call, const char *name,
                                     struct hrpc_conn **out)
 {
     struct hadoop_err *err = NULL;
     struct hrpc_conn *conn = NULL;
     int res, tcp_init = 0;
 
-    conn = calloc(1, sizeof(struct hrpc_conn));
+    conn = calloc(1, sizeof(struct hrpc_conn) + strlen(name) + 1);
     if (!conn) {
         err = hadoop_lerr_alloc(ENOMEM, "hrpc_conn_create_outbound: OOM");
         goto done;
     }
+    strcpy(conn->name, name);
     conn->reactor = reactor;
     conn->call = call;
-    conn->remote = call->remote;
+    conn->remote = *call->remote;
     conn->protocol = strdup(call->protocol);
     conn->username = strdup(call->username);
     if ((!conn->protocol) || (!conn->username)) {
@@ -338,11 +345,12 @@ struct hadoop_err *hrpc_conn_create_outbound(struct hrpc_reactor *reactor,
     hrpc_client_id_generate_rand(&conn->client_id);
     res = uv_tcp_init(&reactor->loop, &conn->stream);
     if (res) {
-        err = hadoop_uverr_alloc(res,
-                "hrpc_conn_create_outbound: uv_tcp_init failed");
+        err = hadoop_uverr_alloc(res, "hrpc_conn_create_outbound: "
+                "uv_tcp_init failed with status %d", res);
         goto done;
     }
     tcp_init = 1;
+    conn->call_id = 1;
     conn->writer.state = HRPC_CONN_WRITE_CONNECTING;
     conn->reader.state = HRPC_CONN_UNREADABLE;
     conn->conn_req.data = conn;
@@ -350,12 +358,13 @@ struct hadoop_err *hrpc_conn_create_outbound(struct hrpc_reactor *reactor,
             (struct sockaddr*)&conn->remote, conn_connect_cb);
     if (res) {
         char remote_str[64] = { 0 };
-        err = hadoop_uverr_alloc(res,
-                "hrpc_conn_create_outbound: uv_tcp_connect(%s) failed",
+        err = hadoop_uverr_alloc(res, "hrpc_conn_create_outbound: "
+                "uv_tcp_connect(%s) failed with status %d",
                 net_ipv4_name_and_port(&conn->remote, remote_str,
-                                       sizeof(remote_str)));
+                                       sizeof(remote_str)), res);
         goto done;
     }
+    conn_log_debug(conn, "%s", "hrpc_conn_create: created new connection.\n");
 
 done:
     if (err) {
@@ -375,39 +384,50 @@ done:
 
 int hrpc_conn_compare(const struct hrpc_conn *a, const struct hrpc_conn *b)
 {
-    int proto_cmp, username_cmp, a_active, b_active;
+    int proto_cmp, username_cmp, client_id_cmp, a_active, b_active;
 
     // Big-endian versus little-endian doesn't matter here.
     // We just want a consistent ordering on the same machine.
-    if (a->remote.sin_addr.s_addr < b->remote.sin_addr.s_addr)
+    // We don't compare ports.
+    if (a->remote.sin_addr.s_addr < b->remote.sin_addr.s_addr) {
         return -1;
-    else if (a->remote.sin_addr.s_addr > b->remote.sin_addr.s_addr)
+    } else if (a->remote.sin_addr.s_addr > b->remote.sin_addr.s_addr) {
         return 1;
-    else if (a->remote.sin_port < b->remote.sin_port)
+    } else if (a->remote.sin_port < b->remote.sin_port) {
         return -1;
-    else if (a->remote.sin_port > b->remote.sin_port)
+    } else if (a->remote.sin_port > b->remote.sin_port) {
         return 1;
-    // Compare protocol name.
+    }
     proto_cmp = strcmp(a->protocol, b->protocol);
-    if (proto_cmp != 0)
+    if (proto_cmp != 0) {
         return proto_cmp;
-    // Compare username.
+    }
     username_cmp = strcmp(a->username, b->username);
-    if (username_cmp != 0)
+    if (username_cmp != 0) {
         return username_cmp;
+    }
     // Make the inactive connections sort before the active ones.
     a_active = !!a->call;
     b_active = !!b->call;
-    if (a_active < b_active) 
+    if (a_active < b_active) {
         return -1;
-    else if (a_active > b_active) 
+    }
+    else if (a_active > b_active) {
         return 1;
+    }
+    client_id_cmp = hrpc_client_id_compare(&a->client_id, &b->client_id);
+    if (client_id_cmp) {
+        return client_id_cmp;
+    }
+
     // Compare pointer identity, so that no two distinct connections are
     // ever identical.
-    else if (a < b)
+    else if (a < b) {
         return -1;
-    else if (a > b)
+    }
+    else if (a > b) {
         return 1;
+    }
     return 0;
 }
 
@@ -415,15 +435,23 @@ int hrpc_conn_usable(const struct hrpc_conn *conn,
             const struct sockaddr_in *addr,
             const char *protocol, const char *username)
 {
-    if (conn->remote.sin_addr.s_addr != addr->sin_addr.s_addr)
+    if (conn->call) {
+        // The current implementation can't use the same TCP connection
+        // simulatenously for multiple RPCs.
+        return 0;
+    } else if (conn->remote.sin_addr.s_addr != addr->sin_addr.s_addr) {
+        return 0;
+    } else if (conn->remote.sin_port != addr->sin_port) {
         return 0;
-    else if (conn->remote.sin_port != addr->sin_port)
+    } else if (conn->remote.sin_port != addr->sin_port) {
         return 0;
-    else if (strcmp(conn->protocol, protocol))
+    } else if (strcmp(conn->protocol, protocol)) {
         return 0;
-    else if (strcmp(conn->username, username))
+    } else if (strcmp(conn->username, username)) {
         return 0;
-    return 1;
+    } else {
+        return 1;
+    }
 }
 
 static void free_read_bufs(struct hrpc_conn *conn)
@@ -462,10 +490,16 @@ static struct hadoop_err *conn_deliver_resp(struct hrpc_conn *conn,
     resp.base = reader->body;
     reader->body = NULL;
     free_read_bufs(conn);
+    // Remove the connection from the map.  When we put it back in, it will be
+    // no longer active because we have zeroed conn->call, which will put it
+    // in a different position.
+    reactor_remove_conn(conn->reactor, conn);
     conn->call = NULL;
+    reactor_insert_conn(conn->reactor, conn);
+    // Shut down the read state for now.
     conn->reader.state = HRPC_CONN_UNREADABLE;
-    // TODO: cache connections 
-    hrpc_conn_destroy(conn, NULL);
+    uv_read_stop((uv_stream_t*)&conn->stream);
+    // Perform the callback.
     hrpc_call_deliver_resp(call, &resp);
     return NULL;
 }
@@ -550,16 +584,20 @@ static const char *conn_read_state_str(enum hrpc_conn_read_state state)
 };
 
 /**
- * Return a read buffer to libuv.
+ * Tell libuv where to put the bytes that arrive from the remote end of the
+ * connection.
+ *
+ * If we haven't read the length (we're in state HRPC_CONN_READ_LEN), we give
+ * libuv back bytes from the four-byte "message length" buffer.
+ *
+ * If we have read the message length (and allocated a buffer for it), we
+ * return back that.
  *
- * We don't do the actual allocation here, for two reasons.  The first is that
- * we'd like to know how big a buffer to allocate first (by reading the first
- * 4 bytes).  The second is that libuv doesn't really take kindly to
- * failures here... returning a zero-length buffer triggers a crash.
- * So we simply return previously allocated buffers here.
+ * If we return 0 here, libuv treats that as a connection error.
  *
  * @param handle            The TCP stream.
- * @param suggested_size    The suggested size.
+ * @param suggested_size    The suggested size.  We ignore this because we know
+ *                              exactly how many bytes we need.
  *
  * @return                  The read buffer to use.
  */
@@ -610,7 +648,7 @@ static void conn_read_cb(uv_stream_t *stream, ssize_t nread,
 
     if (nread < 0) {
         hrpc_conn_destroy(conn,
-            hadoop_uverr_alloc(-nread, "conn_read_cb error"));
+            hadoop_uverr_alloc(-nread, "conn_read_cb error %zd", -nread));
         return;
     }
     if (nread == 0) {

+ 10 - 3
hadoop-native-core/src/main/native/rpc/conn.h

@@ -155,6 +155,11 @@ struct hrpc_conn {
 
     struct hrpc_conn_writer writer;
     struct hrpc_conn_reader reader;
+
+    /**
+     * Connection name.
+     */
+    char name[0];
 };
 
 /**
@@ -165,21 +170,23 @@ struct hrpc_conn {
  * @param call          The call to make.  The connection will take ownership
  *                          of this call.  If the connection fails, the call
  *                          will be given a failure callback.
+ * @param name          The name to use for this connection.  Will be copied.
  * @param out           (out param) On success, the new connection.
  *
  * @return              NULL on success; the error otherwise.
  */
 struct hadoop_err *hrpc_conn_create_outbound(struct hrpc_reactor *reactor,
-                                    struct hrpc_call *call,
+                                    struct hrpc_call *call, const char *name,
                                     struct hrpc_conn **out);
 
 /**
- * Start an outbound call on this connection.
+ * Reuse a connection for a new call.
  *
  * @param conn          The connection.
  * @param call          The call.
  */
-void hrpc_conn_start_outbound(struct hrpc_conn *conn, struct hrpc_call *call);
+void hrpc_conn_restart_outbound(struct hrpc_conn *conn,
+                                struct hrpc_call *call);
 
 /**
  * Compare two hadoop connection objects.

+ 20 - 5
hadoop-native-core/src/main/native/rpc/messenger.c

@@ -22,16 +22,23 @@
 #include "rpc/reactor.h"
 
 #include <errno.h>
+#include <inttypes.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <uv.h>
 
 #define msgr_log_warn(msgr, fmt, ...) \
-    fprintf(stderr, "WARN: msgr %p: " fmt, msgr, __VA_ARGS__)
+    fprintf(stderr, "WARN: msgr %" PRId64 ": " fmt, msgr->id, __VA_ARGS__)
 #define msgr_log_info(msgr, fmt, ...) \
-    fprintf(stderr, "INFO: msgr %p: " fmt, msgr, __VA_ARGS__)
+    fprintf(stderr, "INFO: msgr %" PRId64 ": " fmt, msgr->id, __VA_ARGS__)
 #define msgr_log_debug(msgr, fmt, ...) \
-    fprintf(stderr, "DEBUG: msgr %p: " fmt, msgr, __VA_ARGS__)
+    fprintf(stderr, "DEBUG: msgr %" PRId64 ": " fmt, msgr->id, __VA_ARGS__)
+
+/**
+ * The highest messenger ID that has been assigned, or 0 if no messengers have
+ * been created.
+ */
+static uint64_t g_highest_messenger_id;
 
 struct hrpc_messenger_builder {
 };
@@ -43,6 +50,11 @@ struct hrpc_messenger_builder {
  * the RPC system.
  */
 struct hrpc_messenger {
+    /**
+     * Unique messenger ID.
+     */
+    uint64_t id;
+
     /**
      * The reactor thread which makes the actual network calls.
      *
@@ -73,6 +85,7 @@ struct hadoop_err *hrpc_messenger_create(
 {
     struct hrpc_messenger *msgr = NULL;
     struct hadoop_err *err = NULL;
+    char reactor_name[64];
 
     free(bld);
     msgr = calloc(1, sizeof(struct hrpc_messenger));
@@ -80,11 +93,13 @@ struct hadoop_err *hrpc_messenger_create(
         err = hadoop_lerr_alloc(ENOMEM, "hrpc_messenger_create: OOM");
         goto error;
     }
-    err = hrpc_reactor_create(&msgr->reactor);
+    msgr->id = __sync_add_and_fetch(&g_highest_messenger_id, 1);
+    snprintf(reactor_name, sizeof(reactor_name), "%" PRId64"-1", msgr->id);
+    err = hrpc_reactor_create(&msgr->reactor, reactor_name);
     if (err) {
         goto error_free_msgr;
     }
-    msgr_log_info(msgr, "created messenger %p\n", msgr);
+    msgr_log_info(msgr, "%s", "created messenger\n");
     *out = msgr;
     return NULL;
 

+ 7 - 2
hadoop-native-core/src/main/native/rpc/proxy.c

@@ -59,7 +59,7 @@ void hrpc_proxy_init(struct hrpc_proxy *proxy,
     proxy->msgr = msgr;
     proxy->protocol = protocol;
     proxy->username = username;
-    proxy->call.remote = *remote;
+    proxy->remote = *remote;
 }
 
 struct hadoop_err *hrpc_proxy_activate(struct hrpc_proxy *proxy)
@@ -113,7 +113,11 @@ void hrpc_proxy_sync_cb(struct hrpc_response *resp, struct hadoop_err *err,
                         void *cb_data)
 {
     struct hrpc_sync_ctx *ctx = cb_data;
-    ctx->resp = *resp;
+    if (resp) {
+        ctx->resp = *resp;
+    } else {
+        memset(&ctx->resp, 0, sizeof(ctx->resp));
+    }
     ctx->err = err;
     uv_sem_post(&ctx->sem);
 }
@@ -129,6 +133,7 @@ void hrpc_proxy_start(struct hrpc_proxy *proxy,
     uint8_t *buf;
     struct hrpc_call *call = &proxy->call;
 
+    call->remote = &proxy->remote;
     call->cb = cb;
     call->cb_data = cb_data;
     call->protocol = proxy->protocol;

+ 5 - 0
hadoop-native-core/src/main/native/rpc/proxy.h

@@ -59,6 +59,11 @@ struct hrpc_proxy {
      */
     const char *username;
 
+    /**
+     * Remote address we're sending to.
+     */
+    struct sockaddr_in remote;
+
     /**
      * The current call.
      */

+ 97 - 34
hadoop-native-core/src/main/native/rpc/reactor.c

@@ -25,31 +25,73 @@
 #include "rpc/reactor.h"
 
 #include <errno.h>
+#include <inttypes.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 #include <uv.h>
 
 #define reactor_log_warn(reactor, fmt, ...) \
-    fprintf(stderr, "WARN: reactor %p: " fmt, reactor, __VA_ARGS__)
+    fprintf(stderr, "WARN: reactor %s: " fmt, \
+            reactor->name, __VA_ARGS__)
 #define reactor_log_info(msgr, fmt, ...) \
-    fprintf(stderr, "INFO: reactor %p: " fmt, reactor, __VA_ARGS__)
+    fprintf(stderr, "INFO: reactor %s: " fmt, \
+            reactor->name, __VA_ARGS__)
 #define reactor_log_debug(msgr, fmt, ...) \
-    fprintf(stderr, "DEBUG: reactor %p: " fmt, reactor, __VA_ARGS__)
+    fprintf(stderr, "DEBUG: reactor %s: " fmt, \
+            reactor->name, __VA_ARGS__)
 
 RB_GENERATE(hrpc_conns, hrpc_conn, entry, hrpc_conn_compare);
 
+/**
+ * Historically, UNIX has delivered a SIGPIPE to threads that write to a socket
+ * which has been disconnected by the other end (the "broken pipe" error.)
+ * We don't want this signal.  We would rather handle this as part of the
+ * normal control flow.
+ *
+ * It would be easy to ignore SIGPIPE for the whole process by setting a
+ * different signal handler (such as SIG_IGN).  But this could have unforseen
+ * effects on the process running our library.
+ *
+ * Some operating systems provide variants of the sendto() and send() functions
+ * (or optional arguments to those functions) that suppress SIGPIPE only for
+ * that invocation of the function.  But not all OSes provide this, and not all
+ * versions of libuv use those arguments anyway.
+ *
+ * So what can we do?  Well, since each reactor runs in its own special thread
+ * which is not shared with the rest of the application, we simply set the
+ * "signal mask" so that SIGPIPE is ignored for the reactor thread.  Since
+ * SIGPIPE is delivered only to these specific threads, this fixes the issue.
+ */
+static void block_sigpipe(struct hrpc_reactor *reactor)
+{
+    sigset_t sig_ign;
+    int ret;
+
+    if (sigemptyset(&sig_ign) || sigaddset(&sig_ign, SIGPIPE)) {
+        reactor_log_warn(reactor, "%s",
+            "suppress_sigpipe: failed to set up signal set for SIGPIPE.\n");
+        return;
+    }
+    ret = pthread_sigmask(SIG_BLOCK, &sig_ign, NULL);
+    if (ret) {
+        reactor_log_warn(reactor, "suppress_sigpipe: pthread_sigmask "
+                         "failed with error %d\n", ret);
+        return;
+    }
+}
+
 static void reactor_thread_run(void *arg)
 {
     struct hrpc_reactor *reactor = arg;
-    struct hrpc_conn *conn, *conn_tmp;
 
     reactor_log_debug(reactor, "%s", "reactor thread starting.\n");
+    block_sigpipe(reactor);
     uv_run(&reactor->loop, UV_RUN_DEFAULT);
     reactor_log_debug(reactor, "%s", "reactor thread terminating.\n");
-    RB_FOREACH_SAFE(conn, hrpc_conns, &reactor->conns, conn_tmp) {
-        hrpc_conn_destroy(conn, hadoop_lerr_alloc(ESHUTDOWN, 
-            "hrpc_reactor_start_outbound: the reactor is being shut down."));
+    if (uv_loop_close(&reactor->loop)) {
+        reactor_log_warn(reactor, "%s",
+                "uv_loop_close: failed to close the loop.");
     }
 }
 
@@ -68,9 +110,11 @@ static struct hrpc_conn *reuse_idle_conn(struct hrpc_reactor *reactor,
     memset(&exemplar, 0, sizeof(exemplar));
     exemplar.remote = *remote;
     exemplar.protocol = (char*)call->protocol;
+    exemplar.username = (char*)call->username;
     conn = RB_NFIND(hrpc_conns, &reactor->conns, &exemplar);
-    if (!conn)
+    if (!conn) {
         return NULL;
+    }
     if (hrpc_conn_usable(conn, remote, call->protocol, call->username)) {
         if (conn->writer.state == HRPC_CONN_WRITE_IDLE) {
             RB_REMOVE(hrpc_conns, &reactor->conns, conn);
@@ -80,49 +124,61 @@ static struct hrpc_conn *reuse_idle_conn(struct hrpc_reactor *reactor,
     return NULL;
 }
 
+void reactor_finish_shutdown(uv_handle_t *handle)
+{
+    struct hrpc_reactor *reactor = handle->data;
+
+    // Note: other callbacks may still run after the libuv loop has been
+    // stopped.  But we won't block for I/O after this point.
+    uv_stop(&reactor->loop);
+}
+
 static void reactor_begin_shutdown(struct hrpc_reactor *reactor,
                              struct hrpc_calls *pending_calls)
 {
+    struct hrpc_conn *conn, *conn_tmp;
     struct hrpc_call *call;
 
-    reactor_log_debug(reactor, "%s", "reactor_begin_shutdown\n");
+    reactor_log_info(reactor, "%s", "reactor_begin_shutdown\n");
     STAILQ_FOREACH(call, pending_calls, entry) {
-        hrpc_call_deliver_err(call, hadoop_lerr_alloc(ESHUTDOWN, 
-            "hrpc_reactor_start_outbound: the reactor is being shut down."));
+        hrpc_call_deliver_err(call, hadoop_lerr_alloc(ESHUTDOWN,
+            "the reactor is being shut down."));
     }
-    // Note: other callbacks may still run after the libuv loop has been
-    // stopped.  But we won't block for I/O after this point.
-    uv_stop(&reactor->loop);
+    RB_FOREACH_SAFE(conn, hrpc_conns, &reactor->conns, conn_tmp) {
+        hrpc_conn_destroy(conn, hadoop_lerr_alloc(ESHUTDOWN,
+            "the reactor is being shut down."));
+    }
+    uv_close((uv_handle_t*)&reactor->inbox.notifier, NULL);
 }
 
 static void reactor_async_start_outbound(struct hrpc_reactor *reactor,
                                          struct hrpc_call *call)
 {
-    char remote_str[64] = { 0 };
     struct hrpc_conn *conn;
     struct hadoop_err *err;
 
-    conn = reuse_idle_conn(reactor, &call->remote, call);
+    conn = reuse_idle_conn(reactor, call->remote, call);
     if (conn) {
-        reactor_log_debug(reactor, "reactor_async_start_outbound(remote=%s) "
-                          "assigning to connection %p\n",
-                net_ipv4_name(&call->remote, remote_str, sizeof(remote_str)),
-                conn);
-        hrpc_conn_start_outbound(conn, call);
+        hrpc_conn_restart_outbound(conn, call);
+        reactor_log_debug(reactor, "start_outbound: reused "
+                       "connection %s\n", conn->name);
     } else {
-        err = hrpc_conn_create_outbound(reactor, call, &conn);
+        char remote_str[64], conn_name[128];
+
+        net_ipv4_name(call->remote, remote_str, sizeof(remote_str));
+        snprintf(conn_name, sizeof(conn_name), "%s-%s-%"PRId64,
+                 reactor->name, remote_str, reactor->conns_created++);
+        err = hrpc_conn_create_outbound(reactor, call, conn_name, &conn);
         if (err) {
             reactor_log_warn(reactor, "reactor_async_start_outbound("
                 "remote=%s) got error %s\n",
-                net_ipv4_name(&call->remote, remote_str, sizeof(remote_str)),
+                net_ipv4_name(call->remote, remote_str, sizeof(remote_str)),
                 hadoop_err_msg(err));
             hrpc_call_deliver_err(call, err);
             return;
         }
-        reactor_log_debug(reactor, "reactor_async_start_outbound("
-                "remote=%s) created new connection %p\n",
-                net_ipv4_name(&call->remote, remote_str, sizeof(remote_str)),
-                conn);
+        reactor_log_debug(reactor, "start_outbound: created new "
+                       "connection %s\n", conn->name);
     }
     // Add or re-add the connection to the reactor's tree.
     RB_INSERT(hrpc_conns, &reactor->conns, conn);
@@ -143,10 +199,10 @@ static void reactor_async_cb(uv_async_t *handle)
 
     if (shutdown) {
         reactor_begin_shutdown(reactor, &pending_calls);
-        return;
-    }
-    STAILQ_FOREACH(call, &pending_calls, entry) {
-        reactor_async_start_outbound(reactor, call);
+    } else {
+        STAILQ_FOREACH(call, &pending_calls, entry) {
+            reactor_async_start_outbound(reactor, call);
+        }
     }
 }
 
@@ -161,18 +217,25 @@ void reactor_remove_conn(struct hrpc_reactor *reactor, struct hrpc_conn *conn)
     }
 }
 
-struct hadoop_err *hrpc_reactor_create(struct hrpc_reactor **out)
+void reactor_insert_conn(struct hrpc_reactor *reactor, struct hrpc_conn *conn)
+{
+    RB_INSERT(hrpc_conns, &reactor->conns, conn);
+}
+
+struct hadoop_err *hrpc_reactor_create(struct hrpc_reactor **out,
+                                       const char *name)
 {
     struct hrpc_reactor *reactor = NULL;
     struct hadoop_err *err = NULL;
     int res;
 
-    reactor = calloc(1, sizeof(struct hrpc_reactor));
+    reactor = calloc(1, sizeof(struct hrpc_reactor) + strlen(name) + 1);
     if (!reactor) {
         err = hadoop_lerr_alloc(ENOMEM, "hrpc_reactor_create: OOM while allocating "
                                 "reactor structure.");
         goto error_free_reactor;
     }
+    strcpy(reactor->name, name);
     if (uv_mutex_init(&reactor->inbox.lock) < 0) {
         err = hadoop_lerr_alloc(ENOMEM, "hrpc_reactor_create: failed to init "
                                 "mutex.");
@@ -243,7 +306,7 @@ void hrpc_reactor_start_outbound(struct hrpc_reactor *reactor,
     }
     uv_mutex_unlock(&reactor->inbox.lock);
     if (shutdown) {
-        hrpc_call_deliver_err(call, hadoop_lerr_alloc(ESHUTDOWN, 
+        hrpc_call_deliver_err(call, hadoop_lerr_alloc(ESHUTDOWN,
             "hrpc_reactor_start_outbound: can't start call because the "
             "reactor has been shut down."));
     } else {

+ 24 - 2
hadoop-native-core/src/main/native/rpc/reactor.h

@@ -71,6 +71,11 @@ struct hrpc_reactor_inbox {
  * from another thread except the inbox.
  */
 struct hrpc_reactor {
+    /**
+     * Number of connections we've created.
+     */
+    uint64_t conns_created;
+
     /**
      * The inbox for incoming work for this reactor thread.
      */
@@ -99,6 +104,11 @@ struct hrpc_reactor {
      * The reactor thread.  All reactor callbacks are made from this context.
      */
     uv_thread_t thread;
+
+    /**
+     * Name of the reactor.
+     */
+    char name[0];
 };
 
 /**
@@ -109,10 +119,22 @@ struct hrpc_reactor {
  */
 void reactor_remove_conn(struct hrpc_reactor *reactor, struct hrpc_conn *conn);
 
+/**
+ * Insert a connection to the reactor.
+ *
+ * @param reactor       The reactor.
+ * @param conn          The connection.
+ */
+void reactor_insert_conn(struct hrpc_reactor *reactor, struct hrpc_conn *conn);
+
 /**
  * Create the reactor thread.
+ *
+ * @param out           (out param) on success, the new reactor thread.
+ * @param name          The reactor name to use.
  */
-struct hadoop_err *hrpc_reactor_create(struct hrpc_reactor **out);
+struct hadoop_err *hrpc_reactor_create(struct hrpc_reactor **out,
+                                       const char *reactor_name);
 
 /**
  * Shut down the reactor thread and wait for it to terminate.
@@ -130,7 +152,7 @@ void hrpc_reactor_free(struct hrpc_reactor *reactor);
  * Start an outbound transfer.
  *
  * @param reactor       The reactor.
- * @param conn          The connection.  This connection must be either new, or 
+ * @param conn          The connection.  This connection must be either new, or
  * All pending calls will get timeout errors.
  */
 void hrpc_reactor_start_outbound(struct hrpc_reactor *reactor,