|
@@ -24,10 +24,12 @@
|
|
|
#include "rpc/call.h"
|
|
|
#include "rpc/conn.h"
|
|
|
#include "rpc/messenger.h"
|
|
|
+#include "rpc/proxy.h"
|
|
|
#include "rpc/reactor.h"
|
|
|
#include "rpc/varint.h"
|
|
|
|
|
|
#include <errno.h>
|
|
|
+#include <inttypes.h>
|
|
|
#include <stdio.h>
|
|
|
#include <stdlib.h>
|
|
|
#include <string.h>
|
|
@@ -58,6 +60,8 @@ static const uint8_t FRAME[] = {
|
|
|
|
|
|
#define FRAME_LEN sizeof(FRAME)
|
|
|
|
|
|
+#define CONNECTION_CONTEXT_CALL_ID (-3)
|
|
|
+
|
|
|
static void conn_read_cb(uv_stream_t *stream, ssize_t nread,
|
|
|
const uv_buf_t* buf);
|
|
|
static void hrpc_conn_read_alloc(uv_handle_t *handle,
|
|
@@ -98,6 +102,7 @@ static struct hadoop_err *conn_setup_ipc_header(struct hrpc_conn *conn)
|
|
|
struct hrpc_conn_writer *writer = &conn->writer;
|
|
|
IpcConnectionContextProto ipc_ctx = IPC_CONNECTION_CONTEXT_PROTO__INIT;
|
|
|
RpcRequestHeaderProto rpc_req_header = RPC_REQUEST_HEADER_PROTO__INIT;
|
|
|
+ UserInformationProto userinfo = USER_INFORMATION_PROTO__INIT;
|
|
|
int32_t cset_len, buf_len;
|
|
|
int32_t ipc_ctx_len, rpc_req_header_len, off = 0;
|
|
|
uint8_t *buf;
|
|
@@ -107,14 +112,15 @@ static struct hadoop_err *conn_setup_ipc_header(struct hrpc_conn *conn)
|
|
|
rpc_req_header.rpckind = RPC_PROTOCOL_BUFFER;
|
|
|
rpc_req_header.has_rpcop = 1;
|
|
|
rpc_req_header.rpcop = RPC_FINAL_PACKET;
|
|
|
- rpc_req_header.callid = -3; // ???
|
|
|
+ rpc_req_header.callid = CONNECTION_CONTEXT_CALL_ID;
|
|
|
rpc_req_header.clientid.data = conn->client_id.buf;
|
|
|
rpc_req_header.clientid.len = HRPC_CLIENT_ID_LEN;
|
|
|
rpc_req_header.has_retrycount = 0;
|
|
|
rpc_req_header_len =
|
|
|
rpc_request_header_proto__get_packed_size(&rpc_req_header);
|
|
|
-
|
|
|
- ipc_ctx.userinfo = NULL;
|
|
|
+ userinfo.effectiveuser = conn->username;
|
|
|
+ userinfo.realuser = NULL;
|
|
|
+ ipc_ctx.userinfo = &userinfo;
|
|
|
ipc_ctx.protocol = conn->protocol;
|
|
|
ipc_ctx_len = ipc_connection_context_proto__get_packed_size(&ipc_ctx);
|
|
|
|
|
@@ -156,7 +162,7 @@ static struct hadoop_err *hrpc_conn_setup_payload(struct hrpc_conn *conn)
|
|
|
rpc_req_header.rpckind = RPC_PROTOCOL_BUFFER;
|
|
|
rpc_req_header.has_rpcop = 1;
|
|
|
rpc_req_header.rpcop = RPC_FINAL_PACKET;
|
|
|
- rpc_req_header.callid = 0; // ???
|
|
|
+ rpc_req_header.callid = conn->call_id;
|
|
|
rpc_req_header.clientid.data = conn->client_id.buf;
|
|
|
rpc_req_header.clientid.len = HRPC_CLIENT_ID_LEN;
|
|
|
rpc_req_header.has_retrycount = 0;
|
|
@@ -168,7 +174,7 @@ static struct hadoop_err *hrpc_conn_setup_payload(struct hrpc_conn *conn)
|
|
|
total_len64 += call->payload.len;
|
|
|
if (total_len64 > MAX_READER_BODY_LEN) {
|
|
|
err = hadoop_lerr_alloc(EINVAL, "hrpc_conn_setup_payload: "
|
|
|
- "can't send a payload of length %"PRId64". The maximum "
|
|
|
+ "can't send a payload of length %" PRId64 ". The maximum "
|
|
|
"payload length is %d", total_len64, MAX_READER_BODY_LEN);
|
|
|
return err;
|
|
|
}
|
|
@@ -255,6 +261,12 @@ static void conn_start_outbound(struct hrpc_conn *conn)
|
|
|
struct hrpc_conn_writer *writer = &conn->writer;
|
|
|
int res;
|
|
|
|
|
|
+ // Get next call ID to use.
|
|
|
+ if (conn->call_id >= 0x7fffffff) {
|
|
|
+ conn->call_id = 1;
|
|
|
+ } else {
|
|
|
+ conn->call_id++;
|
|
|
+ }
|
|
|
writer->state = HRPC_CONN_WRITE_IPC_HEADER;
|
|
|
err = conn_setup_ipc_header(conn);
|
|
|
if (err) {
|
|
@@ -317,7 +329,8 @@ struct hadoop_err *hrpc_conn_create_outbound(struct hrpc_reactor *reactor,
|
|
|
conn->call = call;
|
|
|
conn->remote = call->remote;
|
|
|
conn->protocol = strdup(call->protocol);
|
|
|
- if (!conn->protocol) {
|
|
|
+ conn->username = strdup(call->username);
|
|
|
+ if ((!conn->protocol) || (!conn->username)) {
|
|
|
err = hadoop_lerr_alloc(ENOMEM, "hrpc_conn_create_outbound: OOM");
|
|
|
goto done;
|
|
|
}
|
|
@@ -344,6 +357,7 @@ done:
|
|
|
if (err) {
|
|
|
if (conn) {
|
|
|
free(conn->protocol);
|
|
|
+ free(conn->username);
|
|
|
if (tcp_init) {
|
|
|
uv_close((uv_handle_t*)&conn->stream, NULL);
|
|
|
}
|
|
@@ -357,7 +371,7 @@ done:
|
|
|
|
|
|
int hrpc_conn_compare(const struct hrpc_conn *a, const struct hrpc_conn *b)
|
|
|
{
|
|
|
- int proto_cmp, a_active, b_active;
|
|
|
+ int proto_cmp, username_cmp, a_active, b_active;
|
|
|
|
|
|
// Big-endian versus little-endian doesn't matter here.
|
|
|
// We just want a consistent ordering on the same machine.
|
|
@@ -373,6 +387,10 @@ int hrpc_conn_compare(const struct hrpc_conn *a, const struct hrpc_conn *b)
|
|
|
proto_cmp = strcmp(a->protocol, b->protocol);
|
|
|
if (proto_cmp != 0)
|
|
|
return proto_cmp;
|
|
|
+ // Compare username.
|
|
|
+ username_cmp = strcmp(a->username, b->username);
|
|
|
+ if (username_cmp != 0)
|
|
|
+ return username_cmp;
|
|
|
// Make the inactive connections sort before the active ones.
|
|
|
a_active = !!a->call;
|
|
|
b_active = !!b->call;
|
|
@@ -390,7 +408,8 @@ int hrpc_conn_compare(const struct hrpc_conn *a, const struct hrpc_conn *b)
|
|
|
}
|
|
|
|
|
|
int hrpc_conn_usable(const struct hrpc_conn *conn,
|
|
|
- const struct sockaddr_in *addr, const char *protocol)
|
|
|
+ const struct sockaddr_in *addr,
|
|
|
+ const char *protocol, const char *username)
|
|
|
{
|
|
|
if (conn->remote.sin_addr.s_addr != addr->sin_addr.s_addr)
|
|
|
return 0;
|
|
@@ -398,6 +417,8 @@ int hrpc_conn_usable(const struct hrpc_conn *conn,
|
|
|
return 0;
|
|
|
else if (strcmp(conn->protocol, protocol))
|
|
|
return 0;
|
|
|
+ else if (strcmp(conn->username, username))
|
|
|
+ return 0;
|
|
|
return 1;
|
|
|
}
|
|
|
|
|
@@ -420,14 +441,14 @@ static struct hadoop_err *conn_deliver_resp(struct hrpc_conn *conn,
|
|
|
// Check if the server sent us a bogus payload_len value.
|
|
|
if (payload_len < 0) {
|
|
|
return hadoop_lerr_alloc(EIO, "conn_deliver_resp: "
|
|
|
- "server's payload_len was %"PRId32", but negative payload "
|
|
|
+ "server's payload_len was %" PRId32 ", but negative payload "
|
|
|
"lengths are not valid.", payload_len);
|
|
|
}
|
|
|
payload_end = off;
|
|
|
payload_end += payload_len;
|
|
|
if (payload_end > reader->body_len) {
|
|
|
return hadoop_lerr_alloc(EIO, "conn_deliver_resp: "
|
|
|
- "server's payload_len was %"PRId64", but there are only %d "
|
|
|
+ "server's payload_len was %" PRId64 ", but there are only %d "
|
|
|
"bytes left in the body buffer.", payload_end, reader->body_len);
|
|
|
}
|
|
|
// Reset the connection's read state. We'll hold on to the response buffer
|
|
@@ -461,13 +482,13 @@ static struct hadoop_err *conn_process_response(struct hrpc_conn *conn)
|
|
|
}
|
|
|
if (resp_header_len <= 0) {
|
|
|
err = hadoop_lerr_alloc(EIO, "conn_process_response: server sent "
|
|
|
- "invalid resp_header_len of %"PRId32, resp_header_len);
|
|
|
+ "invalid resp_header_len of %" PRId32, resp_header_len);
|
|
|
goto done;
|
|
|
}
|
|
|
rem = reader->body_len - off;
|
|
|
if (resp_header_len > rem) {
|
|
|
err = hadoop_lerr_alloc(EIO, "conn_process_response: server sent "
|
|
|
- "resp_header_len of %"PRId32", but there were only %"PRId32
|
|
|
+ "resp_header_len of %" PRId32 ", but there were only %" PRId32
|
|
|
" bytes left in the response.", resp_header_len, rem);
|
|
|
goto done;
|
|
|
}
|
|
@@ -479,6 +500,14 @@ static struct hadoop_err *conn_process_response(struct hrpc_conn *conn)
|
|
|
goto done;
|
|
|
}
|
|
|
off += resp_header_len;
|
|
|
+ if (resp_header->callid != conn->call_id) {
|
|
|
+ // We currently only send one request at a time. So when we get a
|
|
|
+ // response, we expect it to be for the request we just sent.
|
|
|
+ err = hadoop_lerr_alloc(EIO, "conn_process_response: incorrect call "
|
|
|
+ "id in response. Expected %" PRId32 ", got %" PRId32 ".",
|
|
|
+ conn->call_id, resp_header->callid);
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
if (resp_header->status != RPC_STATUS_PROTO__SUCCESS) {
|
|
|
// TODO: keep connection open if we got an ERROR rather than a FATAL.
|
|
|
err = hadoop_lerr_alloc(EIO, "conn_process_response: error %s: %s",
|
|
@@ -546,8 +575,8 @@ static void hrpc_conn_read_alloc(uv_handle_t *handle,
|
|
|
rem = reader->body_len - reader->off;
|
|
|
if (rem <= 0) {
|
|
|
conn_log_warn(conn, "hrpc_conn_read_alloc: we're in state "
|
|
|
- "HRPC_CONN_READ_BODY with reader->body_len = %"PRId32", but "
|
|
|
- "reader->off = %"PRId32"\n", reader->body_len, reader->off);
|
|
|
+ "HRPC_CONN_READ_BODY with reader->body_len = %" PRId32 ", but "
|
|
|
+ "reader->off = %" PRId32 "\n", reader->body_len, reader->off);
|
|
|
buf->base = NULL;
|
|
|
buf->len = 0;
|
|
|
return;
|
|
@@ -589,7 +618,7 @@ static void conn_read_cb(uv_stream_t *stream, ssize_t nread,
|
|
|
reader->off += nread;
|
|
|
if (reader->off < READLEN_BUF_LEN) {
|
|
|
conn_log_debug(conn, "conn_read_cb: got partial read of "
|
|
|
- "body_len. reader->off = %"PRId32"\n",
|
|
|
+ "body_len. reader->off = %" PRId32 "\n",
|
|
|
reader->off);
|
|
|
return;
|
|
|
}
|
|
@@ -597,12 +626,12 @@ static void conn_read_cb(uv_stream_t *stream, ssize_t nread,
|
|
|
if ((reader->body_len <= 0) ||
|
|
|
(reader->body_len > MAX_READER_BODY_LEN)) {
|
|
|
hrpc_conn_destroy(conn, hadoop_lerr_alloc(EIO,
|
|
|
- "conn_read_cb: got an invalid body length of %"PRId32"\n",
|
|
|
+ "conn_read_cb: got an invalid body length of %" PRId32 "\n",
|
|
|
reader->body_len));
|
|
|
return;
|
|
|
}
|
|
|
conn_log_debug(conn, "conn_read_cb: got body length of "
|
|
|
- "%"PRId32". Transitioning to HRPC_CONN_READ_BODY.\n",
|
|
|
+ "%" PRId32 ". Transitioning to HRPC_CONN_READ_BODY.\n",
|
|
|
reader->body_len);
|
|
|
reader->off = 0;
|
|
|
reader->state = HRPC_CONN_READ_BODY;
|
|
@@ -610,15 +639,15 @@ static void conn_read_cb(uv_stream_t *stream, ssize_t nread,
|
|
|
if (!reader->body) {
|
|
|
hrpc_conn_destroy(conn, hadoop_lerr_alloc(ENOMEM,
|
|
|
"hrpc_conn_read_alloc: failed to allocate "
|
|
|
- "%"PRId32" bytes.\n", reader->body_len));
|
|
|
+ "%" PRId32 " bytes.\n", reader->body_len));
|
|
|
}
|
|
|
break;
|
|
|
case HRPC_CONN_READ_BODY:
|
|
|
reader->off += nread;
|
|
|
if (reader->off < reader->body_len) {
|
|
|
conn_log_debug(conn, "conn_read_cb: got partial read of "
|
|
|
- "body. reader->off = %"PRId32" out of %"PRId32"\n",
|
|
|
- reader->off, reader->body_len);
|
|
|
+ "body. reader->off = %" PRId32 " out of %"
|
|
|
+ PRId32 "\n", reader->off, reader->body_len);
|
|
|
return;
|
|
|
}
|
|
|
err = conn_process_response(conn);
|
|
@@ -663,6 +692,7 @@ void hrpc_conn_destroy(struct hrpc_conn *conn, struct hadoop_err *err)
|
|
|
free_write_bufs(conn);
|
|
|
conn->writer.state = HRPC_CONN_WRITE_CLOSED;
|
|
|
free(conn->protocol);
|
|
|
+ free(conn->username);
|
|
|
uv_close((uv_handle_t*)&conn->stream, conn_free);
|
|
|
}
|
|
|
|