- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #include "common/hadoop_err.h"
- #include "common/string.h"
- #include "protobuf/IpcConnectionContext.pb-c.h.s"
- #include "protobuf/ProtobufRpcEngine.pb-c.h.s"
- #include "protobuf/RpcHeader.pb-c.h.s"
- #include "rpc/call.h"
- #include "rpc/conn.h"
- #include "rpc/messenger.h"
- #include "rpc/reactor.h"
- #include "rpc/varint.h"
- #include <errno.h>
- #include <inttypes.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <uv.h>
- #define conn_log_warn(conn, fmt, ...) \
- fprintf(stderr, "WARN: conn %p (reactor %p): " fmt, \
- conn, conn->reactor, __VA_ARGS__)
- #define conn_log_info(conn, fmt, ...) \
- fprintf(stderr, "INFO: conn %p (reactor %p): " fmt, \
- conn, conn->reactor, __VA_ARGS__)
- #define conn_log_debug(conn, fmt, ...) \
- fprintf(stderr, "DEBUG: conn %p (reactor %p): " fmt, \
- conn, conn->reactor, __VA_ARGS__)
- /**
- * The maximum length that we'll allocate to hold a response from the server.
- * This number includes the response header.
- */
- #define MAX_READER_BODY_LEN (64 * 1024 * 1024)
- static const uint8_t FRAME[] = {
- 0x68, 0x72, 0x70, 0x63, // "hrpc"
- 0x09, // version
- 0x00, // service class
- 0x00 // auth
- };
- #define FRAME_LEN sizeof(FRAME)
- static void conn_read_cb(uv_stream_t *stream, ssize_t nread,
- const uv_buf_t* buf);
- static void hrpc_conn_read_alloc(uv_handle_t *handle,
- size_t suggested_size, uv_buf_t *buf);
- // Forward declaration for the error path in hrpc_conn_create_outbound.
- static void conn_free(uv_handle_t* handle);
- static const char *conn_write_state_str(enum hrpc_conn_write_state state)
- {
- switch (state) {
- case HRPC_CONN_WRITE_CONNECTING:
- return "HRPC_CONN_WRITE_CONNECTING";
- case HRPC_CONN_WRITE_IPC_HEADER:
- return "HRPC_CONN_WRITE_IPC_HEADER";
- case HRPC_CONN_WRITE_PAYLOAD:
- return "HRPC_CONN_WRITE_PAYLOAD";
- case HRPC_CONN_WRITE_IDLE:
- return "HRPC_CONN_WRITE_IDLE";
- case HRPC_CONN_WRITE_CLOSED:
- return "HRPC_CONN_WRITE_CLOSED";
- default:
- return "(unknown)";
- }
- }
- static void free_write_bufs(struct hrpc_conn *conn)
- {
- int i;
- uv_buf_t *cur_writes = conn->writer.cur_writes;
- for (i = 0; i < MAX_CUR_WRITES; i++) {
- free(cur_writes[i].base);
- cur_writes[i].base = NULL;
- cur_writes[i].len = 0;
- }
- }
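- /**
- * Set up the connection's first write: the Hadoop RPC preamble plus the
- * connection context. The single buffer laid out here is:
- *
- * FRAME ("hrpc", version, service class, auth)
- * 4-byte big-endian length of everything that follows
- * varint length + RpcRequestHeaderProto (call id -3)
- * varint length + IpcConnectionContextProto (protocol name)
- *
- * The buffer goes into writer->cur_writes[0]; the writer owns it and
- * releases it via free_write_bufs.
- */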
- static struct hadoop_err *conn_setup_ipc_header(struct hrpc_conn *conn)
- {
- struct hrpc_conn_writer *writer = &conn->writer;
- IpcConnectionContextProto ipc_ctx = IPC_CONNECTION_CONTEXT_PROTO__INIT;
- RpcRequestHeaderProto rpc_req_header = RPC_REQUEST_HEADER_PROTO__INIT;
- int32_t cset_len, buf_len;
- int32_t ipc_ctx_len, rpc_req_header_len, off = 0;
- uint8_t *buf;
- struct hadoop_err *err;
- rpc_req_header.has_rpckind = 1;
- rpc_req_header.rpckind = RPC_PROTOCOL_BUFFER;
- rpc_req_header.has_rpcop = 1;
- rpc_req_header.rpcop = RPC_FINAL_PACKET;
- rpc_req_header.callid = -3; // -3 is the call id reserved for the connection context (CONNECTION_CONTEXT_CALL_ID).
- rpc_req_header.clientid.data = conn->client_id.buf;
- rpc_req_header.clientid.len = HRPC_CLIENT_ID_LEN;
- rpc_req_header.has_retrycount = 0;
- rpc_req_header_len =
- rpc_request_header_proto__get_packed_size(&rpc_req_header);
- ipc_ctx.userinfo = NULL;
- ipc_ctx.protocol = conn->protocol;
- ipc_ctx_len = ipc_connection_context_proto__get_packed_size(&ipc_ctx);
- cset_len = varint32_size(rpc_req_header_len) + rpc_req_header_len +
- varint32_size(ipc_ctx_len) + ipc_ctx_len;
- buf_len = FRAME_LEN + sizeof(uint32_t) + cset_len;
- buf = malloc(buf_len);
- if (!buf) {
- err = hadoop_lerr_alloc(ENOMEM, "conn_setup_ipc_header: "
- "failed to allocate %d bytes", buf_len);
- return err;
- }
- memcpy(buf, FRAME, FRAME_LEN);
- off += FRAME_LEN;
- be32_encode(cset_len, buf + off);
- off += sizeof(uint32_t);
- varint32_encode(rpc_req_header_len, buf, buf_len, &off);
- rpc_request_header_proto__pack(&rpc_req_header, buf + off);
- off += rpc_req_header_len;
- varint32_encode(ipc_ctx_len, buf, buf_len, &off);
- ipc_connection_context_proto__pack(&ipc_ctx, buf + off);
- free_write_bufs(conn);
- writer->cur_writes[0].base = (char*)buf;
- writer->cur_writes[0].len = buf_len;
- return NULL;
- }
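- /**
- * Set up the write buffers for the RPC call itself:
- *
- * cur_writes[0]: 4-byte big-endian total length, then
- * varint length + RpcRequestHeaderProto (call id 0)
- * cur_writes[1]: the packed request payload
- *
- * Ownership of call->payload.base transfers to the writer here, which is
- * why the call's payload is zeroed out afterwards.
- */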
- static struct hadoop_err *hrpc_conn_setup_payload(struct hrpc_conn *conn)
- {
- struct hrpc_conn_writer *writer = &conn->writer;
- struct hrpc_call *call = conn->call;
- RpcRequestHeaderProto rpc_req_header = RPC_REQUEST_HEADER_PROTO__INIT;
- int32_t rpc_req_header_len, header_buf_len, total_len, off = 0;
- int64_t total_len64;
- uint8_t *header_buf;
- struct hadoop_err *err;
- rpc_req_header.has_rpckind = 1;
- rpc_req_header.rpckind = RPC_PROTOCOL_BUFFER;
- rpc_req_header.has_rpcop = 1;
- rpc_req_header.rpcop = RPC_FINAL_PACKET;
- rpc_req_header.callid = 0; // Call ids count up from 0; this client sends one call per connection for now.
- rpc_req_header.clientid.data = conn->client_id.buf;
- rpc_req_header.clientid.len = HRPC_CLIENT_ID_LEN;
- rpc_req_header.has_retrycount = 0;
- rpc_req_header_len =
- rpc_request_header_proto__get_packed_size(&rpc_req_header);
- total_len64 = varint32_size(rpc_req_header_len);
- total_len64 += rpc_req_header_len;
- total_len64 += call->payload.len;
- if (total_len64 > MAX_READER_BODY_LEN) {
- err = hadoop_lerr_alloc(EINVAL, "hrpc_conn_setup_payload: "
- "can't send a payload of length %"PRId64". The maximum "
- "payload length is %d", total_len64, MAX_READER_BODY_LEN);
- return err;
- }
- total_len = (int32_t)total_len64;
- header_buf_len = total_len - call->payload.len + sizeof(uint32_t);
- header_buf = malloc(header_buf_len);
- if (!header_buf) {
- err = hadoop_lerr_alloc(ENOMEM, "hrpc_conn_setup_payload: "
- "failed to allocate %d bytes for header.", header_buf_len);
- return err;
- }
- be32_encode(total_len, header_buf + off);
- off += sizeof(uint32_t);
- varint32_encode(rpc_req_header_len, header_buf, header_buf_len, &off);
- rpc_request_header_proto__pack(&rpc_req_header, header_buf + off);
- writer->cur_writes[0].base = (char*)header_buf;
- writer->cur_writes[0].len = header_buf_len;
- writer->cur_writes[1].base = call->payload.base;
- writer->cur_writes[1].len = call->payload.len;
- call->payload.base = NULL;
- call->payload.len = 0;
- return NULL;
- }
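- /**
- * The write callback for this connection.
- *
- * Advances the writer's state machine: once the IPC header has been
- * written, queue the payload; once the payload has been written, go idle
- * and start reading the response.
- */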
- static void conn_write_cb(uv_write_t* req, int status)
- {
- struct hrpc_conn *conn = req->data;
- struct hrpc_conn_writer *writer = &conn->writer;
- struct hrpc_conn_reader *reader = &conn->reader;
- struct hadoop_err *err;
- int res;
- if (status) {
- err = hadoop_uverr_alloc(status,
- "conn_write_cb got error");
- hrpc_conn_destroy(conn, err);
- return;
- }
- switch (writer->state) {
- case HRPC_CONN_WRITE_IPC_HEADER:
- free_write_bufs(conn);
- writer->state = HRPC_CONN_WRITE_PAYLOAD;
- err = hrpc_conn_setup_payload(conn);
- if (err) {
- hrpc_conn_destroy(conn, err);
- return;
- }
- writer->write_req.data = conn;
- res = uv_write(&writer->write_req, (uv_stream_t*)&conn->stream,
- writer->cur_writes, 2, conn_write_cb);
- if (res) {
- err = hadoop_uverr_alloc(res,
- "failed to call uv_write on payload");
- hrpc_conn_destroy(conn, err);
- return;
- }
- break;
- case HRPC_CONN_WRITE_PAYLOAD:
- conn_log_debug(conn, "%s", "conn_write_cb: finished writing payload. "
- "Now waiting for response.\n");
- free_write_bufs(conn);
- writer->state = HRPC_CONN_WRITE_IDLE;
- reader->state = HRPC_CONN_READ_LEN;
- conn->stream.data = conn;
- res = uv_read_start((uv_stream_t*)&conn->stream, hrpc_conn_read_alloc,
- conn_read_cb);
- if (res) {
- err = hadoop_uverr_alloc(res, "uv_read_start failed");
- hrpc_conn_destroy(conn, err);
- return;
- }
- break;
- default:
- conn_log_warn(conn, "conn_write_cb: got an unexpected write "
- "event while in %s state.\n",
- conn_write_state_str(writer->state));
- return;
- }
- }
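- /**
- * Start writing the connection's current call, beginning with the IPC
- * header. Invoked from conn_connect_cb for new connections, and from
- * hrpc_conn_start_outbound when a call is assigned to a connection.
- */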
- static void conn_start_outbound(struct hrpc_conn *conn)
- {
- struct hadoop_err *err = NULL;
- struct hrpc_conn_writer *writer = &conn->writer;
- int res;
- writer->state = HRPC_CONN_WRITE_IPC_HEADER;
- err = conn_setup_ipc_header(conn);
- if (err) {
- hrpc_conn_destroy(conn, err);
- return;
- }
- writer->write_req.data = conn;
- res = uv_write(&writer->write_req, (uv_stream_t*)&conn->stream,
- writer->cur_writes, 1, conn_write_cb);
- if (res) {
- err = hadoop_uverr_alloc(res,
- "failed to call uv_write on ipc_header_buf");
- hrpc_conn_destroy(conn, err);
- return;
- }
- }
- void hrpc_conn_start_outbound(struct hrpc_conn *conn, struct hrpc_call *call)
- {
- conn->call = call;
- conn_start_outbound(conn);
- }
- static void conn_connect_cb(uv_connect_t *req, int status)
- {
- struct hrpc_conn *conn = req->data;
- struct hadoop_err *err = NULL;
- struct hrpc_conn_writer *writer = &conn->writer;
- if (status) {
- err = hadoop_uverr_alloc(status, "uv_tcp_connect failed");
- hrpc_conn_destroy(conn, err);
- return;
- }
- if (writer->state != HRPC_CONN_WRITE_CONNECTING) {
- err = hadoop_lerr_alloc(EINVAL,
- "got conn_connect_cb, but connection was not in "
- "state HRPC_CONN_WRITE_CONNECTING. state = %d",
- writer->state);
- hrpc_conn_destroy(conn, err);
- return;
- }
- conn_start_outbound(conn);
- }
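- /**
- * Create a new outbound connection for the given call.
- *
- * A sketch of the typical flow (the call object is prepared elsewhere;
- * see rpc/call.h):
- *
- * struct hrpc_conn *conn;
- * struct hadoop_err *err =
- * hrpc_conn_create_outbound(reactor, call, &conn);
- * if (err) { ... } // report the error; the call was not started
- * // conn_connect_cb later fires on the reactor loop and starts the
- * // outbound writes.
- */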
- struct hadoop_err *hrpc_conn_create_outbound(struct hrpc_reactor *reactor,
- struct hrpc_call *call,
- struct hrpc_conn **out)
- {
- struct hadoop_err *err = NULL;
- struct hrpc_conn *conn = NULL;
- int res, tcp_init = 0;
- conn = calloc(1, sizeof(struct hrpc_conn));
- if (!conn) {
- err = hadoop_lerr_alloc(ENOMEM, "hrpc_conn_create_outbound: OOM");
- goto done;
- }
- conn->reactor = reactor;
- conn->call = call;
- conn->remote = call->remote;
- conn->protocol = strdup(call->protocol);
- if (!conn->protocol) {
- err = hadoop_lerr_alloc(ENOMEM, "hrpc_conn_create_outbound: OOM");
- goto done;
- }
- hrpc_client_id_generate_rand(&conn->client_id);
- res = uv_tcp_init(&reactor->loop, &conn->stream);
- if (res) {
- err = hadoop_uverr_alloc(res,
- "hrpc_conn_create_outbound: uv_tcp_init failed");
- goto done;
- }
- tcp_init = 1;
- conn->writer.state = HRPC_CONN_WRITE_CONNECTING;
- conn->reader.state = HRPC_CONN_UNREADABLE;
- conn->conn_req.data = conn;
- res = uv_tcp_connect(&conn->conn_req, &conn->stream,
- (struct sockaddr*)&conn->remote, conn_connect_cb);
- if (res) {
- err = hadoop_uverr_alloc(res,
- "hrpc_conn_create_outbound: uv_tcp_connect failed");
- goto done;
- }
- done:
- if (err) {
- if (conn) {
- free(conn->protocol);
- if (tcp_init) {
- // uv_close is asynchronous; conn must stay alive until the
- // close callback runs, so let conn_free release it there.
- conn->stream.data = conn;
- uv_close((uv_handle_t*)&conn->stream, conn_free);
- } else {
- free(conn);
- }
- }
- return err;
- }
- *out = conn;
- return NULL;
- }
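- /**
- * Impose a total order on connections: by remote address, then port, then
- * protocol, then activity (inactive connections sort first, so a search
- * can find a reusable idle connection), and finally by pointer identity
- * as a tie-breaker.
- */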
- int hrpc_conn_compare(const struct hrpc_conn *a, const struct hrpc_conn *b)
- {
- int proto_cmp, a_active, b_active;
- // Big-endian versus little-endian doesn't matter here.
- // We just want a consistent ordering on the same machine.
- if (a->remote.sin_addr.s_addr < b->remote.sin_addr.s_addr)
- return -1;
- else if (a->remote.sin_addr.s_addr > b->remote.sin_addr.s_addr)
- return 1;
- else if (a->remote.sin_port < b->remote.sin_port)
- return -1;
- else if (a->remote.sin_port > b->remote.sin_port)
- return 1;
- // Compare protocol name.
- proto_cmp = strcmp(a->protocol, b->protocol);
- if (proto_cmp != 0)
- return proto_cmp;
- // Make the inactive connections sort before the active ones.
- a_active = !!a->call;
- b_active = !!b->call;
- if (a_active < b_active)
- return -1;
- else if (a_active > b_active)
- return 1;
- // Compare pointer identity, so that no two distinct connections are
- // ever identical.
- else if (a < b)
- return -1;
- else if (a > b)
- return 1;
- return 0;
- }
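- /**
- * Determine whether an existing connection can carry a call to the given
- * remote address and protocol.
- */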
- int hrpc_conn_usable(const struct hrpc_conn *conn,
- const struct sockaddr_in *addr, const char *protocol)
- {
- if (conn->remote.sin_addr.s_addr != addr->sin_addr.s_addr)
- return 0;
- else if (conn->remote.sin_port != addr->sin_port)
- return 0;
- else if (strcmp(conn->protocol, protocol))
- return 0;
- return 1;
- }
- static void free_read_bufs(struct hrpc_conn *conn)
- {
- free(conn->reader.body);
- conn->reader.body = NULL;
- conn->reader.body_len = 0;
- conn->reader.off = 0;
- }
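- /**
- * Hand a fully-read response off to the call's callback. The body buffer
- * is detached from the reader first, so that it can outlive the
- * connection.
- */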
- static struct hadoop_err *conn_deliver_resp(struct hrpc_conn *conn,
- int32_t off, int32_t payload_len)
- {
- struct hrpc_conn_reader *reader = &conn->reader;
- struct hrpc_call *call = conn->call;
- int64_t payload_end;
- struct hrpc_response resp;
- // Check if the server sent us a bogus payload_len value.
- if (payload_len < 0) {
- return hadoop_lerr_alloc(EIO, "conn_deliver_resp: "
- "server's payload_len was %"PRId32", but negative payload "
- "lengths are not valid.", payload_len);
- }
- payload_end = off;
- payload_end += payload_len;
- if (payload_end > reader->body_len) {
- return hadoop_lerr_alloc(EIO, "conn_deliver_resp: "
- "server's payload_len was %"PRId64", but there are only %d "
- "bytes left in the body buffer.", payload_end, reader->body_len);
- }
- // Reset the connection's read state. We'll hold on to the response buffer
- // while making the callback.
- resp.pb_base = (uint8_t*)(reader->body + off);
- resp.pb_len = payload_len;
- resp.base = reader->body;
- reader->body = NULL;
- free_read_bufs(conn);
- conn->call = NULL;
- conn->reader.state = HRPC_CONN_UNREADABLE;
- // TODO: cache connections
- hrpc_conn_destroy(conn, NULL);
- hrpc_call_deliver_resp(call, &resp);
- return NULL;
- }
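- /**
- * Parse a response body that has been read completely into reader->body.
- * The expected layout is:
- *
- * varint length + RpcResponseHeaderProto
- * varint length + response payload
- *
- * On success, the payload is delivered through conn_deliver_resp.
- */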
- static struct hadoop_err *conn_process_response(struct hrpc_conn *conn)
- {
- struct hrpc_conn_reader *reader = &conn->reader;
- RpcResponseHeaderProto *resp_header = NULL;
- int32_t off = 0, resp_header_len, payload_len, rem;
- struct hadoop_err *err = NULL;
- if (varint32_decode(&resp_header_len,
- (uint8_t*)reader->body, reader->body_len, &off)) {
- err = hadoop_lerr_alloc(EIO, "conn_process_response: response was "
- "only %d bytes-- too short to read the rpc request header.",
- reader->body_len);
- goto done;
- }
- if (resp_header_len <= 0) {
- err = hadoop_lerr_alloc(EIO, "conn_process_response: server sent "
- "invalid resp_header_len of %"PRId32, resp_header_len);
- goto done;
- }
- rem = reader->body_len - off;
- if (resp_header_len > rem) {
- err = hadoop_lerr_alloc(EIO, "conn_process_response: server sent "
- "resp_header_len of %"PRId32", but there were only %"PRId32
- " bytes left in the response.", resp_header_len, rem);
- goto done;
- }
- resp_header = rpc_response_header_proto__unpack(NULL, resp_header_len,
- (const uint8_t*)(reader->body + off));
- if (!resp_header) {
- err = hadoop_lerr_alloc(EIO, "conn_process_response: failed to "
- "parse RpcRequestHeaderProto.");
- goto done;
- }
- off += resp_header_len;
- if (resp_header->status != RPC_STATUS_PROTO__SUCCESS) {
- // TODO: keep connection open if we got an ERROR rather than a FATAL.
- err = hadoop_lerr_alloc(EIO, "conn_process_response: error %s: %s",
- resp_header->exceptionclassname, resp_header->errormsg);
- goto done;
- }
- if (varint32_decode(&payload_len,
- (uint8_t*)reader->body, reader->body_len, &off)) {
- err = hadoop_lerr_alloc(EIO, "conn_process_response: header was %d "
- "bytes, and total length was %d-- too short to read the "
- "payload.", resp_header_len, reader->body_len);
- goto done;
- }
- err = conn_deliver_resp(conn, off, payload_len);
- done:
- if (resp_header) {
- rpc_response_header_proto__free_unpacked(resp_header, NULL);
- }
- return err;
- }
- static const char *conn_read_state_str(enum hrpc_conn_read_state state)
- {
- switch (state) {
- case HRPC_CONN_UNREADABLE:
- return "HRPC_CONN_UNREADABLE";
- case HRPC_CONN_READ_LEN:
- return "HRPC_CONN_READ_LEN";
- case HRPC_CONN_READ_BODY:
- return "HRPC_CONN_READ_BODY";
- case HRPC_CONN_READ_CLOSED:
- return "HRPC_CONN_READ_CLOSED";
- default:
- return "(unknown)";
- }
- }
- /**
- * Return a read buffer to libuv.
- *
- * We don't do the actual allocation here, for two reasons. The first is that
- * we'd like to know how big a buffer to allocate first (by reading the first
- * 4 bytes). The second is that libuv doesn't really take kindly to
- * failures here... returning a zero-length buffer triggers a crash.
- * So we simply return previously allocated buffers here.
- *
- * @param handle The TCP stream.
- * @param suggested_size The suggested size (unused).
- * @param buf (out param) The read buffer to use.
- */
- static void hrpc_conn_read_alloc(uv_handle_t *handle,
- size_t suggested_size __attribute__((unused)), uv_buf_t *buf)
- {
- int32_t rem;
- struct hrpc_conn *conn = handle->data;
- struct hrpc_conn_reader *reader = &conn->reader;
- switch (reader->state) {
- case HRPC_CONN_READ_LEN:
- buf->base = (char*)(reader->body_len_buf + reader->off);
- buf->len = READLEN_BUF_LEN - reader->off;
- return;
- case HRPC_CONN_READ_BODY:
- rem = reader->body_len - reader->off;
- if (rem <= 0) {
- conn_log_warn(conn, "hrpc_conn_read_alloc: we're in state "
- "HRPC_CONN_READ_BODY with reader->body_len = %"PRId32", but "
- "reader->off = %"PRId32"\n", reader->body_len, reader->off);
- buf->base = NULL;
- buf->len = 0;
- return;
- }
- buf->base = (char*)(reader->body + reader->off);
- buf->len = rem;
- return;
- default:
- conn_log_warn(conn, "hrpc_conn_read_alloc: we're in state "
- "%s, but we're being asked to allocate "
- "a read buffer.\n", conn_read_state_str(reader->state));
- buf->base = NULL;
- buf->len = 0;
- return;
- }
- }
- /**
- * The read callback for this connection.
- */
- static void conn_read_cb(uv_stream_t *stream, ssize_t nread,
- const uv_buf_t* buf __attribute__((unused)))
- {
- struct hrpc_conn *conn = stream->data;
- struct hrpc_conn_reader *reader = &conn->reader;
- struct hadoop_err *err;
- if (nread < 0) {
- hrpc_conn_destroy(conn,
- hadoop_uverr_alloc(-nread, "conn_read_cb error"));
- return;
- }
- if (nread == 0) {
- // Nothing to do.
- return;
- }
- switch (reader->state) {
- case HRPC_CONN_READ_LEN:
- reader->off += nread;
- if (reader->off < READLEN_BUF_LEN) {
- conn_log_debug(conn, "conn_read_cb: got partial read of "
- "body_len. reader->off = %"PRId32"\n",
- reader->off);
- return;
- }
- reader->body_len = be32_decode(reader->body_len_buf);
- if ((reader->body_len <= 0) ||
- (reader->body_len > MAX_READER_BODY_LEN)) {
- hrpc_conn_destroy(conn, hadoop_lerr_alloc(EIO,
- "conn_read_cb: got an invalid body length of %"PRId32"\n",
- reader->body_len));
- return;
- }
- conn_log_debug(conn, "conn_read_cb: got body length of "
- "%"PRId32". Transitioning to HRPC_CONN_READ_BODY.\n",
- reader->body_len);
- reader->off = 0;
- reader->state = HRPC_CONN_READ_BODY;
- reader->body = malloc(reader->body_len);
- if (!reader->body) {
- hrpc_conn_destroy(conn, hadoop_lerr_alloc(ENOMEM,
- "conn_read_cb: failed to allocate "
- "%"PRId32" bytes.\n", reader->body_len));
- return;
- }
- break;
- case HRPC_CONN_READ_BODY:
- reader->off += nread;
- if (reader->off < reader->body_len) {
- conn_log_debug(conn, "conn_read_cb: got partial read of "
- "body. reader->off = %"PRId32" out of %"PRId32"\n",
- reader->off, reader->body_len);
- return;
- }
- err = conn_process_response(conn);
- free_read_bufs(conn);
- if (err) {
- hrpc_conn_destroy(conn, err);
- return;
- }
- reader->state = HRPC_CONN_UNREADABLE;
- break;
- default:
- conn_log_warn(conn, "conn_read_cb: got an unexpected read "
- "event while in %s state.\n",
- conn_read_state_str(reader->state));
- return;
- }
- }
- static void conn_free(uv_handle_t* handle)
- {
- struct hrpc_conn *conn = handle->data;
- free(conn);
- }
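- /**
- * Tear down the connection. If a call is still in progress, its callback
- * receives an error. The conn structure itself is freed only after libuv
- * finishes closing the TCP handle (see conn_free).
- */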
- void hrpc_conn_destroy(struct hrpc_conn *conn, struct hadoop_err *err)
- {
- reactor_remove_conn(conn->reactor, conn);
- if (conn->call) {
- err = err ? err : hadoop_lerr_alloc(EFAULT, "hrpc_conn_destroy: "
- "internal error: shutting down connection while it "
- "still has a call in progress.");
- conn_log_warn(conn, "hrpc_conn_destroy: %s\n", hadoop_err_msg(err));
- hrpc_call_deliver_err(conn->call, err);
- conn->call = NULL;
- } else if (err) {
- conn_log_warn(conn, "hrpc_conn_destroy: %s\n", hadoop_err_msg(err));
- hadoop_err_free(err);
- }
- free_read_bufs(conn);
- conn->reader.state = HRPC_CONN_READ_CLOSED;
- free_write_bufs(conn);
- conn->writer.state = HRPC_CONN_WRITE_CLOSED;
- free(conn->protocol);
- uv_close((uv_handle_t*)&conn->stream, conn_free);
- }
- // vim: ts=4:sw=4:tw=79:et