conn.c 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "common/hadoop_err.h"
  19. #include "common/string.h"
  20. #include "protobuf/IpcConnectionContext.pb-c.h.s"
  21. #include "protobuf/ProtobufRpcEngine.pb-c.h.s"
  22. #include "protobuf/RpcHeader.pb-c.h.s"
  23. #include "rpc/call.h"
  24. #include "rpc/conn.h"
  25. #include "rpc/messenger.h"
  26. #include "rpc/reactor.h"
  27. #include "rpc/varint.h"
  28. #include <errno.h>
  29. #include <stdio.h>
  30. #include <stdlib.h>
  31. #include <string.h>
  32. #include <uv.h>
  33. #define conn_log_warn(conn, fmt, ...) \
  34. fprintf(stderr, "WARN: conn %p (reactor %p): " fmt, \
  35. conn, conn->reactor, __VA_ARGS__)
  36. #define conn_log_info(conn, fmt, ...) \
  37. fprintf(stderr, "INFO: conn %p (reactor %p): " fmt, \
  38. conn, conn->reactor, __VA_ARGS__)
  39. #define conn_log_debug(conn, fmt, ...) \
  40. fprintf(stderr, "DEBUG: conn %p (reactor %p): " fmt, \
  41. conn, conn->reactor, __VA_ARGS__)
  42. /**
  43. * The maximum length that we'll allocate to hold a response from the server.
  44. * This number includes the response header.
  45. */
  46. #define MAX_READER_BODY_LEN (64 * 1024 * 1024)
  47. static const uint8_t FRAME[] = {
  48. 0x68, 0x72, 0x70, 0x63, // "hrpc"
  49. 0x09, // version
  50. 0x00, // service class
  51. 0x00 // auth
  52. };
  53. #define FRAME_LEN sizeof(FRAME)
  54. static void conn_read_cb(uv_stream_t *stream, ssize_t nread,
  55. const uv_buf_t* buf);
  56. static void hrpc_conn_read_alloc(uv_handle_t *handle,
  57. size_t suggested_size, uv_buf_t *buf);
  58. static const char *conn_write_state_str(enum hrpc_conn_write_state state)
  59. {
  60. switch (state) {
  61. case HRPC_CONN_WRITE_CONNECTING:
  62. return "HRPC_CONN_WRITE_CONNECTING";
  63. case HRPC_CONN_WRITE_IPC_HEADER:
  64. return "HRPC_CONN_WRITE_IPC_HEADER";
  65. case HRPC_CONN_WRITE_PAYLOAD:
  66. return "HRPC_CONN_WRITE_PAYLOAD";
  67. case HRPC_CONN_WRITE_IDLE:
  68. return "HRPC_CONN_WRITE_IDLE";
  69. case HRPC_CONN_WRITE_CLOSED:
  70. return "HRPC_CONN_WRITE_CLOSED";
  71. default:
  72. return "(unknown)";
  73. }
  74. };
  75. static void free_write_bufs(struct hrpc_conn *conn)
  76. {
  77. int i;
  78. uv_buf_t *cur_writes = conn->writer.cur_writes;
  79. for (i = 0; i < MAX_CUR_WRITES; i++) {
  80. free(cur_writes[i].base);
  81. cur_writes[i].base = NULL;
  82. cur_writes[i].len = 0;
  83. }
  84. }
  85. static struct hadoop_err *conn_setup_ipc_header(struct hrpc_conn *conn)
  86. {
  87. struct hrpc_conn_writer *writer = &conn->writer;
  88. IpcConnectionContextProto ipc_ctx = IPC_CONNECTION_CONTEXT_PROTO__INIT;
  89. RpcRequestHeaderProto rpc_req_header = RPC_REQUEST_HEADER_PROTO__INIT;
  90. int32_t cset_len, buf_len;
  91. int32_t ipc_ctx_len, rpc_req_header_len, off = 0;
  92. uint8_t *buf;
  93. struct hadoop_err *err;
  94. rpc_req_header.has_rpckind = 1;
  95. rpc_req_header.rpckind = RPC_PROTOCOL_BUFFER;
  96. rpc_req_header.has_rpcop = 1;
  97. rpc_req_header.rpcop = RPC_FINAL_PACKET;
  98. rpc_req_header.callid = -3; // ???
  99. rpc_req_header.clientid.data = conn->client_id.buf;
  100. rpc_req_header.clientid.len = HRPC_CLIENT_ID_LEN;
  101. rpc_req_header.has_retrycount = 0;
  102. rpc_req_header_len =
  103. rpc_request_header_proto__get_packed_size(&rpc_req_header);
  104. ipc_ctx.userinfo = NULL;
  105. ipc_ctx.protocol = conn->protocol;
  106. ipc_ctx_len = ipc_connection_context_proto__get_packed_size(&ipc_ctx);
  107. cset_len = varint32_size(rpc_req_header_len) + rpc_req_header_len +
  108. varint32_size(ipc_ctx_len) + ipc_ctx_len;
  109. buf_len = FRAME_LEN + sizeof(uint32_t) + cset_len;
  110. buf = malloc(buf_len);
  111. if (!buf) {
  112. err = hadoop_lerr_alloc(ENOMEM, "conn_setup_ipc_header: "
  113. "failed to allocate %d bytes", buf_len);
  114. return err;
  115. }
  116. memcpy(buf, FRAME, FRAME_LEN);
  117. off += FRAME_LEN;
  118. be32_encode(cset_len, buf + off);
  119. off += sizeof(uint32_t);
  120. varint32_encode(rpc_req_header_len, buf, buf_len, &off);
  121. rpc_request_header_proto__pack(&rpc_req_header, buf + off);
  122. off += rpc_req_header_len;
  123. varint32_encode(ipc_ctx_len, buf, buf_len, &off);
  124. ipc_connection_context_proto__pack(&ipc_ctx, buf + off);
  125. free_write_bufs(conn);
  126. writer->cur_writes[0].base = (char*)buf;
  127. writer->cur_writes[0].len = buf_len;
  128. return NULL;
  129. }
  130. static struct hadoop_err *hrpc_conn_setup_payload(struct hrpc_conn *conn)
  131. {
  132. struct hrpc_conn_writer *writer = &conn->writer;
  133. struct hrpc_call *call = conn->call;
  134. RpcRequestHeaderProto rpc_req_header = RPC_REQUEST_HEADER_PROTO__INIT;
  135. int32_t rpc_req_header_len, header_buf_len, total_len, off = 0;
  136. int64_t total_len64;
  137. uint8_t *header_buf;
  138. struct hadoop_err *err;
  139. rpc_req_header.has_rpckind = 1;
  140. rpc_req_header.rpckind = RPC_PROTOCOL_BUFFER;
  141. rpc_req_header.has_rpcop = 1;
  142. rpc_req_header.rpcop = RPC_FINAL_PACKET;
  143. rpc_req_header.callid = 0; // ???
  144. rpc_req_header.clientid.data = conn->client_id.buf;
  145. rpc_req_header.clientid.len = HRPC_CLIENT_ID_LEN;
  146. rpc_req_header.has_retrycount = 0;
  147. rpc_req_header_len =
  148. rpc_request_header_proto__get_packed_size(&rpc_req_header);
  149. total_len64 = varint32_size(rpc_req_header_len);
  150. total_len64 += rpc_req_header_len;
  151. total_len64 += call->payload.len;
  152. if (total_len64 > MAX_READER_BODY_LEN) {
  153. err = hadoop_lerr_alloc(EINVAL, "hrpc_conn_setup_payload: "
  154. "can't send a payload of length %"PRId64". The maximum "
  155. "payload length is %d", total_len64, MAX_READER_BODY_LEN);
  156. return err;
  157. }
  158. total_len = (int32_t)total_len64;
  159. header_buf_len = total_len - call->payload.len + sizeof(uint32_t);
  160. header_buf = malloc(header_buf_len);
  161. if (!header_buf) {
  162. err = hadoop_lerr_alloc(ENOMEM, "hrpc_conn_setup_payload: "
  163. "failed to allocate %d bytes for header.", header_buf_len);
  164. return err;
  165. }
  166. be32_encode(total_len, header_buf + off);
  167. off += sizeof(uint32_t);
  168. varint32_encode(rpc_req_header_len, header_buf, header_buf_len, &off);
  169. rpc_request_header_proto__pack(&rpc_req_header, header_buf + off);
  170. writer->cur_writes[0].base = (char*)header_buf;
  171. writer->cur_writes[0].len = header_buf_len;
  172. writer->cur_writes[1].base = call->payload.base;
  173. writer->cur_writes[1].len = call->payload.len;
  174. call->payload.base = NULL;
  175. call->payload.len = 0;
  176. return NULL;
  177. }
  178. static void conn_write_cb(uv_write_t* req, int status)
  179. {
  180. struct hrpc_conn *conn = req->data;
  181. struct hrpc_conn_writer *writer = &conn->writer;
  182. struct hrpc_conn_reader *reader = &conn->reader;
  183. struct hadoop_err *err;
  184. int res;
  185. if (status) {
  186. err = hadoop_uverr_alloc(status,
  187. "conn_write_cb got error");
  188. hrpc_conn_destroy(conn, err);
  189. return;
  190. }
  191. switch (writer->state) {
  192. case HRPC_CONN_WRITE_IPC_HEADER:
  193. free_write_bufs(conn);
  194. writer->state = HRPC_CONN_WRITE_PAYLOAD;
  195. err = hrpc_conn_setup_payload(conn);
  196. if (err) {
  197. hrpc_conn_destroy(conn, err);
  198. return;
  199. }
  200. writer->write_req.data = conn;
  201. res = uv_write(&writer->write_req, (uv_stream_t*)&conn->stream,
  202. writer->cur_writes, 2, conn_write_cb);
  203. if (res) {
  204. err = hadoop_uverr_alloc(res,
  205. "failed to call uv_write on payload");
  206. hrpc_conn_destroy(conn, err);
  207. return;
  208. }
  209. break;
  210. case HRPC_CONN_WRITE_PAYLOAD:
  211. conn_log_debug(conn, "%s", "conn_write_cb: finished writing payload. "
  212. "Now waiting for response.\n");
  213. free_write_bufs(conn);
  214. writer->state = HRPC_CONN_WRITE_IDLE;
  215. reader->state = HRPC_CONN_READ_LEN;
  216. conn->stream.data = conn;
  217. res = uv_read_start((uv_stream_t*)&conn->stream, hrpc_conn_read_alloc,
  218. conn_read_cb);
  219. if (res) {
  220. err = hadoop_uverr_alloc(res, "uv_read_start failed");
  221. hrpc_conn_destroy(conn, err);
  222. return;
  223. }
  224. break;
  225. default:
  226. conn_log_warn(conn, "conn_write_cb: got an unexpected write "
  227. "event while in %s state.\n",
  228. conn_write_state_str(writer->state));
  229. return;
  230. }
  231. }
  232. static void conn_start_outbound(struct hrpc_conn *conn)
  233. {
  234. struct hadoop_err *err = NULL;
  235. struct hrpc_conn_writer *writer = &conn->writer;
  236. int res;
  237. writer->state = HRPC_CONN_WRITE_IPC_HEADER;
  238. err = conn_setup_ipc_header(conn);
  239. if (err) {
  240. hrpc_conn_destroy(conn, err);
  241. return;
  242. }
  243. writer->write_req.data = conn;
  244. res = uv_write(&writer->write_req, (uv_stream_t*)&conn->stream,
  245. writer->cur_writes, 1, conn_write_cb);
  246. if (res) {
  247. err = hadoop_uverr_alloc(res,
  248. "failed to call uv_write on ipc_header_buf");
  249. hrpc_conn_destroy(conn, err);
  250. return;
  251. }
  252. }
  253. void hrpc_conn_start_outbound(struct hrpc_conn *conn, struct hrpc_call *call)
  254. {
  255. conn->call = call;
  256. conn_start_outbound(conn);
  257. }
  258. static void conn_connect_cb(uv_connect_t *req, int status)
  259. {
  260. struct hrpc_conn *conn = req->data;
  261. struct hadoop_err *err = NULL;
  262. struct hrpc_conn_writer *writer = &conn->writer;
  263. if (status) {
  264. err = hadoop_uverr_alloc(status, "uv_tcp_connect failed");
  265. hrpc_conn_destroy(conn, err);
  266. return;
  267. }
  268. if (writer->state != HRPC_CONN_WRITE_CONNECTING) {
  269. err = hadoop_lerr_alloc(EINVAL,
  270. "got conn_connect_cb, but connection was not in "
  271. "state HRPC_CONN_WRITE_CONNECTING. state = %d",
  272. writer->state);
  273. hrpc_conn_destroy(conn, err);
  274. return;
  275. }
  276. conn_start_outbound(conn);
  277. }
  278. struct hadoop_err *hrpc_conn_create_outbound(struct hrpc_reactor *reactor,
  279. struct hrpc_call *call,
  280. struct hrpc_conn **out)
  281. {
  282. struct hadoop_err *err = NULL;
  283. struct hrpc_conn *conn = NULL;
  284. int res, tcp_init = 0;
  285. conn = calloc(1, sizeof(struct hrpc_conn));
  286. if (!conn) {
  287. err = hadoop_lerr_alloc(ENOMEM, "hrpc_conn_create_outbound: OOM");
  288. goto done;
  289. }
  290. conn->reactor = reactor;
  291. conn->call = call;
  292. conn->remote = call->remote;
  293. conn->protocol = strdup(call->protocol);
  294. if (!conn->protocol) {
  295. err = hadoop_lerr_alloc(ENOMEM, "hrpc_conn_create_outbound: OOM");
  296. goto done;
  297. }
  298. hrpc_client_id_generate_rand(&conn->client_id);
  299. res = uv_tcp_init(&reactor->loop, &conn->stream);
  300. if (res) {
  301. err = hadoop_uverr_alloc(res,
  302. "hrpc_conn_create_outbound: uv_tcp_init failed");
  303. goto done;
  304. }
  305. tcp_init = 1;
  306. conn->writer.state = HRPC_CONN_WRITE_CONNECTING;
  307. conn->reader.state = HRPC_CONN_UNREADABLE;
  308. conn->conn_req.data = conn;
  309. res = uv_tcp_connect(&conn->conn_req, &conn->stream,
  310. (struct sockaddr*)&conn->remote, conn_connect_cb);
  311. if (res) {
  312. err = hadoop_uverr_alloc(res,
  313. "hrpc_conn_create_outbound: uv_tcp_connect failed");
  314. goto done;
  315. }
  316. done:
  317. if (err) {
  318. if (conn) {
  319. free(conn->protocol);
  320. if (tcp_init) {
  321. uv_close((uv_handle_t*)&conn->stream, NULL);
  322. }
  323. free(conn);
  324. }
  325. return err;
  326. }
  327. *out = conn;
  328. return NULL;
  329. }
  330. int hrpc_conn_compare(const struct hrpc_conn *a, const struct hrpc_conn *b)
  331. {
  332. int proto_cmp, a_active, b_active;
  333. // Big-endian versus little-endian doesn't matter here.
  334. // We just want a consistent ordering on the same machine.
  335. if (a->remote.sin_addr.s_addr < b->remote.sin_addr.s_addr)
  336. return -1;
  337. else if (a->remote.sin_addr.s_addr > b->remote.sin_addr.s_addr)
  338. return 1;
  339. else if (a->remote.sin_port < b->remote.sin_port)
  340. return -1;
  341. else if (a->remote.sin_port > b->remote.sin_port)
  342. return 1;
  343. // Compare protocol name.
  344. proto_cmp = strcmp(a->protocol, b->protocol);
  345. if (proto_cmp != 0)
  346. return proto_cmp;
  347. // Make the inactive connections sort before the active ones.
  348. a_active = !!a->call;
  349. b_active = !!b->call;
  350. if (a_active < b_active)
  351. return -1;
  352. else if (a_active > b_active)
  353. return 1;
  354. // Compare pointer identity, so that no two distinct connections are
  355. // ever identical.
  356. else if (a < b)
  357. return -1;
  358. else if (a > b)
  359. return 1;
  360. return 0;
  361. }
  362. int hrpc_conn_usable(const struct hrpc_conn *conn,
  363. const struct sockaddr_in *addr, const char *protocol)
  364. {
  365. if (conn->remote.sin_addr.s_addr != addr->sin_addr.s_addr)
  366. return 0;
  367. else if (conn->remote.sin_port != addr->sin_port)
  368. return 0;
  369. else if (strcmp(conn->protocol, protocol))
  370. return 0;
  371. return 1;
  372. }
  373. static void free_read_bufs(struct hrpc_conn *conn)
  374. {
  375. free(conn->reader.body);
  376. conn->reader.body = NULL;
  377. conn->reader.body_len = 0;
  378. conn->reader.off = 0;
  379. }
  380. static struct hadoop_err *conn_deliver_resp(struct hrpc_conn *conn,
  381. int32_t off, int32_t payload_len)
  382. {
  383. struct hrpc_conn_reader *reader = &conn->reader;
  384. struct hrpc_call *call = conn->call;
  385. int64_t payload_end;
  386. struct hrpc_response resp;
  387. // Check if the server sent us a bogus payload_len value.
  388. if (payload_len < 0) {
  389. return hadoop_lerr_alloc(EIO, "conn_deliver_resp: "
  390. "server's payload_len was %"PRId32", but negative payload "
  391. "lengths are not valid.", payload_len);
  392. }
  393. payload_end = off;
  394. payload_end += payload_len;
  395. if (payload_end > reader->body_len) {
  396. return hadoop_lerr_alloc(EIO, "conn_deliver_resp: "
  397. "server's payload_len was %"PRId64", but there are only %d "
  398. "bytes left in the body buffer.", payload_end, reader->body_len);
  399. }
  400. // Reset the connection's read state. We'll hold on to the response buffer
  401. // while making the callback.
  402. resp.pb_base = (uint8_t*)(reader->body + off);
  403. resp.pb_len = payload_len;
  404. resp.base = reader->body;
  405. reader->body = NULL;
  406. free_read_bufs(conn);
  407. conn->call = NULL;
  408. conn->reader.state = HRPC_CONN_UNREADABLE;
  409. // TODO: cache connections
  410. hrpc_conn_destroy(conn, NULL);
  411. hrpc_call_deliver_resp(call, &resp);
  412. return NULL;
  413. }
  414. static struct hadoop_err *conn_process_response(struct hrpc_conn *conn)
  415. {
  416. struct hrpc_conn_reader *reader = &conn->reader;
  417. RpcResponseHeaderProto *resp_header = NULL;
  418. int32_t off = 0, resp_header_len, payload_len, rem;
  419. struct hadoop_err *err = NULL;
  420. if (varint32_decode(&resp_header_len,
  421. (uint8_t*)reader->body, reader->body_len, &off)) {
  422. err = hadoop_lerr_alloc(EIO, "conn_process_response: response was "
  423. "only %d bytes-- too short to read the rpc request header.",
  424. reader->body_len);
  425. goto done;
  426. }
  427. if (resp_header_len <= 0) {
  428. err = hadoop_lerr_alloc(EIO, "conn_process_response: server sent "
  429. "invalid resp_header_len of %"PRId32, resp_header_len);
  430. goto done;
  431. }
  432. rem = reader->body_len - off;
  433. if (resp_header_len > rem) {
  434. err = hadoop_lerr_alloc(EIO, "conn_process_response: server sent "
  435. "resp_header_len of %"PRId32", but there were only %"PRId32
  436. " bytes left in the response.", resp_header_len, rem);
  437. goto done;
  438. }
  439. resp_header = rpc_response_header_proto__unpack(NULL, resp_header_len,
  440. (const uint8_t*)(reader->body + off));
  441. if (!resp_header) {
  442. err = hadoop_lerr_alloc(EIO, "conn_process_response: failed to "
  443. "parse RpcRequestHeaderProto.");
  444. goto done;
  445. }
  446. off += resp_header_len;
  447. if (resp_header->status != RPC_STATUS_PROTO__SUCCESS) {
  448. // TODO: keep connection open if we got an ERROR rather than a FATAL.
  449. err = hadoop_lerr_alloc(EIO, "conn_process_response: error %s: %s",
  450. resp_header->exceptionclassname, resp_header->errormsg);
  451. goto done;
  452. }
  453. if (varint32_decode(&payload_len,
  454. (uint8_t*)reader->body, reader->body_len, &off)) {
  455. err = hadoop_lerr_alloc(EIO, "conn_process_response: header was %d "
  456. "bytes, and total length was %d-- too short to read the "
  457. "payload.", resp_header_len, reader->body_len);
  458. goto done;
  459. }
  460. err = conn_deliver_resp(conn, off, payload_len);
  461. done:
  462. if (resp_header) {
  463. rpc_response_header_proto__free_unpacked(resp_header, NULL);
  464. }
  465. return err;
  466. }
  467. static const char *conn_read_state_str(enum hrpc_conn_read_state state)
  468. {
  469. switch (state) {
  470. case HRPC_CONN_UNREADABLE:
  471. return "HRPC_CONN_UNREADABLE";
  472. case HRPC_CONN_READ_LEN:
  473. return "HRPC_CONN_READ_LEN";
  474. case HRPC_CONN_READ_BODY:
  475. return "HRPC_CONN_READ_BODY";
  476. case HRPC_CONN_READ_CLOSED:
  477. return "HRPC_CONN_READ_CLOSED";
  478. default:
  479. return "(unknown)";
  480. }
  481. };
  482. /**
  483. * Return a read buffer to libuv.
  484. *
  485. * We don't do the actual allocation here, for two reasons. The first is that
  486. * we'd like to know how big a buffer to allocate first (by reading the first
  487. * 4 bytes). The second is that libuv doesn't really take kindly to
  488. * failures here... returning a zero-length buffer triggers a crash.
  489. * So we simply return previously allocated buffers here.
  490. *
  491. * @param handle The TCP stream.
  492. * @param suggested_size The suggested size.
  493. *
  494. * @return The read buffer to use.
  495. */
  496. static void hrpc_conn_read_alloc(uv_handle_t *handle,
  497. size_t suggested_size __attribute__((unused)), uv_buf_t *buf)
  498. {
  499. int32_t rem;
  500. struct hrpc_conn *conn = handle->data;
  501. struct hrpc_conn_reader *reader = &conn->reader;
  502. switch (reader->state) {
  503. case HRPC_CONN_READ_LEN:
  504. buf->base = (char*)(reader->body_len_buf + reader->off);
  505. buf->len = READLEN_BUF_LEN - reader->off;
  506. return;
  507. case HRPC_CONN_READ_BODY:
  508. rem = reader->body_len - reader->off;
  509. if (rem <= 0) {
  510. conn_log_warn(conn, "hrpc_conn_read_alloc: we're in state "
  511. "HRPC_CONN_READ_BODY with reader->body_len = %"PRId32", but "
  512. "reader->off = %"PRId32"\n", reader->body_len, reader->off);
  513. buf->base = NULL;
  514. buf->len = 0;
  515. return;
  516. }
  517. buf->base = (char*)(reader->body + reader->off);
  518. buf->len = rem;
  519. return;
  520. default:
  521. conn_log_warn(conn, "hrpc_conn_read_alloc: we're in state "
  522. "%s, but we're being asked to allocate "
  523. "a read buffer.\n", conn_read_state_str(reader->state));
  524. buf->base = NULL;
  525. buf->len = 0;
  526. return;
  527. }
  528. }
  529. /**
  530. * The read callback for this connection.
  531. */
  532. static void conn_read_cb(uv_stream_t *stream, ssize_t nread,
  533. const uv_buf_t* buf __attribute__((unused)))
  534. {
  535. struct hrpc_conn *conn = stream->data;
  536. struct hrpc_conn_reader *reader = &conn->reader;
  537. struct hadoop_err *err;
  538. if (nread < 0) {
  539. hrpc_conn_destroy(conn,
  540. hadoop_uverr_alloc(-nread, "conn_read_cb error"));
  541. return;
  542. }
  543. if (nread == 0) {
  544. // Nothing to do.
  545. return;
  546. }
  547. switch (reader->state) {
  548. case HRPC_CONN_READ_LEN:
  549. reader->off += nread;
  550. if (reader->off < READLEN_BUF_LEN) {
  551. conn_log_debug(conn, "conn_read_cb: got partial read of "
  552. "body_len. reader->off = %"PRId32"\n",
  553. reader->off);
  554. return;
  555. }
  556. reader->body_len = be32_decode(reader->body_len_buf);
  557. if ((reader->body_len <= 0) ||
  558. (reader->body_len > MAX_READER_BODY_LEN)) {
  559. hrpc_conn_destroy(conn, hadoop_lerr_alloc(EIO,
  560. "conn_read_cb: got an invalid body length of %"PRId32"\n",
  561. reader->body_len));
  562. return;
  563. }
  564. conn_log_debug(conn, "conn_read_cb: got body length of "
  565. "%"PRId32". Transitioning to HRPC_CONN_READ_BODY.\n",
  566. reader->body_len);
  567. reader->off = 0;
  568. reader->state = HRPC_CONN_READ_BODY;
  569. reader->body = malloc(reader->body_len);
  570. if (!reader->body) {
  571. hrpc_conn_destroy(conn, hadoop_lerr_alloc(ENOMEM,
  572. "hrpc_conn_read_alloc: failed to allocate "
  573. "%"PRId32" bytes.\n", reader->body_len));
  574. }
  575. break;
  576. case HRPC_CONN_READ_BODY:
  577. reader->off += nread;
  578. if (reader->off < reader->body_len) {
  579. conn_log_debug(conn, "conn_read_cb: got partial read of "
  580. "body. reader->off = %"PRId32" out of %"PRId32"\n",
  581. reader->off, reader->body_len);
  582. return;
  583. }
  584. err = conn_process_response(conn);
  585. free_read_bufs(conn);
  586. if (err) {
  587. hrpc_conn_destroy(conn, err);
  588. return;
  589. }
  590. reader->state = HRPC_CONN_UNREADABLE;
  591. break;
  592. default:
  593. conn_log_warn(conn, "conn_read_cb: got an unexpected read "
  594. "event while in %s state.\n",
  595. conn_read_state_str(reader->state));
  596. return;
  597. }
  598. }
  599. static void conn_free(uv_handle_t* handle)
  600. {
  601. struct hrpc_conn *conn = handle->data;
  602. free(conn);
  603. }
  604. void hrpc_conn_destroy(struct hrpc_conn *conn, struct hadoop_err *err)
  605. {
  606. reactor_remove_conn(conn->reactor, conn);
  607. if (conn->call) {
  608. err = err ? err : hadoop_lerr_alloc(EFAULT, "hrpc_conn_destroy: "
  609. "internal error: shutting down connection while it "
  610. "still has a call in progress.");
  611. conn_log_warn(conn, "hrpc_conn_destroy: %s\n", hadoop_err_msg(err));
  612. hrpc_call_deliver_err(conn->call, err);
  613. conn->call = NULL;
  614. } else if (err) {
  615. conn_log_warn(conn, "hrpc_conn_destroy: %s\n", hadoop_err_msg(err));
  616. hadoop_err_free(err);
  617. }
  618. free_read_bufs(conn);
  619. conn->reader.state = HRPC_CONN_READ_CLOSED;
  620. free_write_bufs(conn);
  621. conn->writer.state = HRPC_CONN_WRITE_CLOSED;
  622. free(conn->protocol);
  623. uv_close((uv_handle_t*)&conn->stream, conn_free);
  624. }
  625. // vim: ts=4:sw=4:tw=79:et