conn.c 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "common/hadoop_err.h"
  19. #include "common/string.h"
  20. #include "protobuf/IpcConnectionContext.pb-c.h.s"
  21. #include "protobuf/ProtobufRpcEngine.pb-c.h.s"
  22. #include "protobuf/RpcHeader.pb-c.h.s"
  23. #include "rpc/call.h"
  24. #include "rpc/conn.h"
  25. #include "rpc/messenger.h"
  26. #include "rpc/proxy.h"
  27. #include "rpc/reactor.h"
  28. #include "rpc/varint.h"
  29. #include <errno.h>
  30. #include <inttypes.h>
  31. #include <stdio.h>
  32. #include <stdlib.h>
  33. #include <string.h>
  34. #include <uv.h>
  35. #define conn_log_warn(conn, fmt, ...) \
  36. fprintf(stderr, "WARN: conn %p (reactor %p): " fmt, \
  37. conn, conn->reactor, __VA_ARGS__)
  38. #define conn_log_info(conn, fmt, ...) \
  39. fprintf(stderr, "INFO: conn %p (reactor %p): " fmt, \
  40. conn, conn->reactor, __VA_ARGS__)
  41. #define conn_log_debug(conn, fmt, ...) \
  42. fprintf(stderr, "DEBUG: conn %p (reactor %p): " fmt, \
  43. conn, conn->reactor, __VA_ARGS__)
  44. /**
  45. * The maximum length that we'll allocate to hold a response from the server.
  46. * This number includes the response header.
  47. */
  48. #define MAX_READER_BODY_LEN (64 * 1024 * 1024)
  49. static const uint8_t FRAME[] = {
  50. 0x68, 0x72, 0x70, 0x63, // "hrpc"
  51. 0x09, // version
  52. 0x00, // service class
  53. 0x00 // auth
  54. };
  55. #define FRAME_LEN sizeof(FRAME)
  56. #define CONNECTION_CONTEXT_CALL_ID (-3)
  57. static void conn_read_cb(uv_stream_t *stream, ssize_t nread,
  58. const uv_buf_t* buf);
  59. static void hrpc_conn_read_alloc(uv_handle_t *handle,
  60. size_t suggested_size, uv_buf_t *buf);
  61. static const char *conn_write_state_str(enum hrpc_conn_write_state state)
  62. {
  63. switch (state) {
  64. case HRPC_CONN_WRITE_CONNECTING:
  65. return "HRPC_CONN_WRITE_CONNECTING";
  66. case HRPC_CONN_WRITE_IPC_HEADER:
  67. return "HRPC_CONN_WRITE_IPC_HEADER";
  68. case HRPC_CONN_WRITE_PAYLOAD:
  69. return "HRPC_CONN_WRITE_PAYLOAD";
  70. case HRPC_CONN_WRITE_IDLE:
  71. return "HRPC_CONN_WRITE_IDLE";
  72. case HRPC_CONN_WRITE_CLOSED:
  73. return "HRPC_CONN_WRITE_CLOSED";
  74. default:
  75. return "(unknown)";
  76. }
  77. };
  78. static void free_write_bufs(struct hrpc_conn *conn)
  79. {
  80. int i;
  81. uv_buf_t *cur_writes = conn->writer.cur_writes;
  82. for (i = 0; i < MAX_CUR_WRITES; i++) {
  83. free(cur_writes[i].base);
  84. cur_writes[i].base = NULL;
  85. cur_writes[i].len = 0;
  86. }
  87. }
  88. static struct hadoop_err *conn_setup_ipc_header(struct hrpc_conn *conn)
  89. {
  90. struct hrpc_conn_writer *writer = &conn->writer;
  91. IpcConnectionContextProto ipc_ctx = IPC_CONNECTION_CONTEXT_PROTO__INIT;
  92. RpcRequestHeaderProto rpc_req_header = RPC_REQUEST_HEADER_PROTO__INIT;
  93. UserInformationProto userinfo = USER_INFORMATION_PROTO__INIT;
  94. int32_t cset_len, buf_len;
  95. int32_t ipc_ctx_len, rpc_req_header_len, off = 0;
  96. uint8_t *buf;
  97. struct hadoop_err *err;
  98. rpc_req_header.has_rpckind = 1;
  99. rpc_req_header.rpckind = RPC_PROTOCOL_BUFFER;
  100. rpc_req_header.has_rpcop = 1;
  101. rpc_req_header.rpcop = RPC_FINAL_PACKET;
  102. rpc_req_header.callid = CONNECTION_CONTEXT_CALL_ID;
  103. rpc_req_header.clientid.data = conn->client_id.buf;
  104. rpc_req_header.clientid.len = HRPC_CLIENT_ID_LEN;
  105. rpc_req_header.has_retrycount = 0;
  106. rpc_req_header_len =
  107. rpc_request_header_proto__get_packed_size(&rpc_req_header);
  108. userinfo.effectiveuser = conn->username;
  109. userinfo.realuser = NULL;
  110. ipc_ctx.userinfo = &userinfo;
  111. ipc_ctx.protocol = conn->protocol;
  112. ipc_ctx_len = ipc_connection_context_proto__get_packed_size(&ipc_ctx);
  113. cset_len = varint32_size(rpc_req_header_len) + rpc_req_header_len +
  114. varint32_size(ipc_ctx_len) + ipc_ctx_len;
  115. buf_len = FRAME_LEN + sizeof(uint32_t) + cset_len;
  116. buf = malloc(buf_len);
  117. if (!buf) {
  118. err = hadoop_lerr_alloc(ENOMEM, "conn_setup_ipc_header: "
  119. "failed to allocate %d bytes", buf_len);
  120. return err;
  121. }
  122. memcpy(buf, FRAME, FRAME_LEN);
  123. off += FRAME_LEN;
  124. be32_encode(cset_len, buf + off);
  125. off += sizeof(uint32_t);
  126. varint32_encode(rpc_req_header_len, buf, buf_len, &off);
  127. rpc_request_header_proto__pack(&rpc_req_header, buf + off);
  128. off += rpc_req_header_len;
  129. varint32_encode(ipc_ctx_len, buf, buf_len, &off);
  130. ipc_connection_context_proto__pack(&ipc_ctx, buf + off);
  131. free_write_bufs(conn);
  132. writer->cur_writes[0].base = (char*)buf;
  133. writer->cur_writes[0].len = buf_len;
  134. return NULL;
  135. }
  136. static struct hadoop_err *hrpc_conn_setup_payload(struct hrpc_conn *conn)
  137. {
  138. struct hrpc_conn_writer *writer = &conn->writer;
  139. struct hrpc_call *call = conn->call;
  140. RpcRequestHeaderProto rpc_req_header = RPC_REQUEST_HEADER_PROTO__INIT;
  141. int32_t rpc_req_header_len, header_buf_len, total_len, off = 0;
  142. int64_t total_len64;
  143. uint8_t *header_buf;
  144. struct hadoop_err *err;
  145. rpc_req_header.has_rpckind = 1;
  146. rpc_req_header.rpckind = RPC_PROTOCOL_BUFFER;
  147. rpc_req_header.has_rpcop = 1;
  148. rpc_req_header.rpcop = RPC_FINAL_PACKET;
  149. rpc_req_header.callid = conn->call_id;
  150. rpc_req_header.clientid.data = conn->client_id.buf;
  151. rpc_req_header.clientid.len = HRPC_CLIENT_ID_LEN;
  152. rpc_req_header.has_retrycount = 0;
  153. rpc_req_header_len =
  154. rpc_request_header_proto__get_packed_size(&rpc_req_header);
  155. total_len64 = varint32_size(rpc_req_header_len);
  156. total_len64 += rpc_req_header_len;
  157. total_len64 += call->payload.len;
  158. if (total_len64 > MAX_READER_BODY_LEN) {
  159. err = hadoop_lerr_alloc(EINVAL, "hrpc_conn_setup_payload: "
  160. "can't send a payload of length %" PRId64 ". The maximum "
  161. "payload length is %d", total_len64, MAX_READER_BODY_LEN);
  162. return err;
  163. }
  164. total_len = (int32_t)total_len64;
  165. header_buf_len = total_len - call->payload.len + sizeof(uint32_t);
  166. header_buf = malloc(header_buf_len);
  167. if (!header_buf) {
  168. err = hadoop_lerr_alloc(ENOMEM, "hrpc_conn_setup_payload: "
  169. "failed to allocate %d bytes for header.", header_buf_len);
  170. return err;
  171. }
  172. be32_encode(total_len, header_buf + off);
  173. off += sizeof(uint32_t);
  174. varint32_encode(rpc_req_header_len, header_buf, header_buf_len, &off);
  175. rpc_request_header_proto__pack(&rpc_req_header, header_buf + off);
  176. writer->cur_writes[0].base = (char*)header_buf;
  177. writer->cur_writes[0].len = header_buf_len;
  178. writer->cur_writes[1].base = call->payload.base;
  179. writer->cur_writes[1].len = call->payload.len;
  180. call->payload.base = NULL;
  181. call->payload.len = 0;
  182. return NULL;
  183. }
  184. static void conn_write_cb(uv_write_t* req, int status)
  185. {
  186. struct hrpc_conn *conn = req->data;
  187. struct hrpc_conn_writer *writer = &conn->writer;
  188. struct hrpc_conn_reader *reader = &conn->reader;
  189. struct hadoop_err *err;
  190. int res;
  191. if (status) {
  192. err = hadoop_uverr_alloc(status,
  193. "conn_write_cb got error");
  194. hrpc_conn_destroy(conn, err);
  195. return;
  196. }
  197. switch (writer->state) {
  198. case HRPC_CONN_WRITE_IPC_HEADER:
  199. free_write_bufs(conn);
  200. writer->state = HRPC_CONN_WRITE_PAYLOAD;
  201. err = hrpc_conn_setup_payload(conn);
  202. if (err) {
  203. hrpc_conn_destroy(conn, err);
  204. return;
  205. }
  206. writer->write_req.data = conn;
  207. res = uv_write(&writer->write_req, (uv_stream_t*)&conn->stream,
  208. writer->cur_writes, 2, conn_write_cb);
  209. if (res) {
  210. err = hadoop_uverr_alloc(res,
  211. "failed to call uv_write on payload");
  212. hrpc_conn_destroy(conn, err);
  213. return;
  214. }
  215. break;
  216. case HRPC_CONN_WRITE_PAYLOAD:
  217. conn_log_debug(conn, "%s", "conn_write_cb: finished writing payload. "
  218. "Now waiting for response.\n");
  219. free_write_bufs(conn);
  220. writer->state = HRPC_CONN_WRITE_IDLE;
  221. reader->state = HRPC_CONN_READ_LEN;
  222. conn->stream.data = conn;
  223. res = uv_read_start((uv_stream_t*)&conn->stream, hrpc_conn_read_alloc,
  224. conn_read_cb);
  225. if (res) {
  226. err = hadoop_uverr_alloc(res, "uv_read_start failed");
  227. hrpc_conn_destroy(conn, err);
  228. return;
  229. }
  230. break;
  231. default:
  232. conn_log_warn(conn, "conn_write_cb: got an unexpected write "
  233. "event while in %s state.\n",
  234. conn_write_state_str(writer->state));
  235. return;
  236. }
  237. }
  238. static void conn_start_outbound(struct hrpc_conn *conn)
  239. {
  240. struct hadoop_err *err = NULL;
  241. struct hrpc_conn_writer *writer = &conn->writer;
  242. int res;
  243. // Get next call ID to use.
  244. if (conn->call_id >= 0x7fffffff) {
  245. conn->call_id = 1;
  246. } else {
  247. conn->call_id++;
  248. }
  249. writer->state = HRPC_CONN_WRITE_IPC_HEADER;
  250. err = conn_setup_ipc_header(conn);
  251. if (err) {
  252. hrpc_conn_destroy(conn, err);
  253. return;
  254. }
  255. writer->write_req.data = conn;
  256. res = uv_write(&writer->write_req, (uv_stream_t*)&conn->stream,
  257. writer->cur_writes, 1, conn_write_cb);
  258. if (res) {
  259. err = hadoop_uverr_alloc(res,
  260. "failed to call uv_write on ipc_header_buf");
  261. hrpc_conn_destroy(conn, err);
  262. return;
  263. }
  264. }
  265. void hrpc_conn_start_outbound(struct hrpc_conn *conn, struct hrpc_call *call)
  266. {
  267. conn->call = call;
  268. conn_start_outbound(conn);
  269. }
  270. static void conn_connect_cb(uv_connect_t *req, int status)
  271. {
  272. struct hrpc_conn *conn = req->data;
  273. struct hadoop_err *err = NULL;
  274. struct hrpc_conn_writer *writer = &conn->writer;
  275. if (status) {
  276. err = hadoop_uverr_alloc(status, "uv_tcp_connect failed");
  277. hrpc_conn_destroy(conn, err);
  278. return;
  279. }
  280. if (writer->state != HRPC_CONN_WRITE_CONNECTING) {
  281. err = hadoop_lerr_alloc(EINVAL,
  282. "got conn_connect_cb, but connection was not in "
  283. "state HRPC_CONN_WRITE_CONNECTING. state = %d",
  284. writer->state);
  285. hrpc_conn_destroy(conn, err);
  286. return;
  287. }
  288. conn_start_outbound(conn);
  289. }
  290. struct hadoop_err *hrpc_conn_create_outbound(struct hrpc_reactor *reactor,
  291. struct hrpc_call *call,
  292. struct hrpc_conn **out)
  293. {
  294. struct hadoop_err *err = NULL;
  295. struct hrpc_conn *conn = NULL;
  296. int res, tcp_init = 0;
  297. conn = calloc(1, sizeof(struct hrpc_conn));
  298. if (!conn) {
  299. err = hadoop_lerr_alloc(ENOMEM, "hrpc_conn_create_outbound: OOM");
  300. goto done;
  301. }
  302. conn->reactor = reactor;
  303. conn->call = call;
  304. conn->remote = call->remote;
  305. conn->protocol = strdup(call->protocol);
  306. conn->username = strdup(call->username);
  307. if ((!conn->protocol) || (!conn->username)) {
  308. err = hadoop_lerr_alloc(ENOMEM, "hrpc_conn_create_outbound: OOM");
  309. goto done;
  310. }
  311. hrpc_client_id_generate_rand(&conn->client_id);
  312. res = uv_tcp_init(&reactor->loop, &conn->stream);
  313. if (res) {
  314. err = hadoop_uverr_alloc(res,
  315. "hrpc_conn_create_outbound: uv_tcp_init failed");
  316. goto done;
  317. }
  318. tcp_init = 1;
  319. conn->writer.state = HRPC_CONN_WRITE_CONNECTING;
  320. conn->reader.state = HRPC_CONN_UNREADABLE;
  321. conn->conn_req.data = conn;
  322. res = uv_tcp_connect(&conn->conn_req, &conn->stream,
  323. (struct sockaddr*)&conn->remote, conn_connect_cb);
  324. if (res) {
  325. err = hadoop_uverr_alloc(res,
  326. "hrpc_conn_create_outbound: uv_tcp_connect failed");
  327. goto done;
  328. }
  329. done:
  330. if (err) {
  331. if (conn) {
  332. free(conn->protocol);
  333. free(conn->username);
  334. if (tcp_init) {
  335. uv_close((uv_handle_t*)&conn->stream, NULL);
  336. }
  337. free(conn);
  338. }
  339. return err;
  340. }
  341. *out = conn;
  342. return NULL;
  343. }
  344. int hrpc_conn_compare(const struct hrpc_conn *a, const struct hrpc_conn *b)
  345. {
  346. int proto_cmp, username_cmp, a_active, b_active;
  347. // Big-endian versus little-endian doesn't matter here.
  348. // We just want a consistent ordering on the same machine.
  349. if (a->remote.sin_addr.s_addr < b->remote.sin_addr.s_addr)
  350. return -1;
  351. else if (a->remote.sin_addr.s_addr > b->remote.sin_addr.s_addr)
  352. return 1;
  353. else if (a->remote.sin_port < b->remote.sin_port)
  354. return -1;
  355. else if (a->remote.sin_port > b->remote.sin_port)
  356. return 1;
  357. // Compare protocol name.
  358. proto_cmp = strcmp(a->protocol, b->protocol);
  359. if (proto_cmp != 0)
  360. return proto_cmp;
  361. // Compare username.
  362. username_cmp = strcmp(a->username, b->username);
  363. if (username_cmp != 0)
  364. return username_cmp;
  365. // Make the inactive connections sort before the active ones.
  366. a_active = !!a->call;
  367. b_active = !!b->call;
  368. if (a_active < b_active)
  369. return -1;
  370. else if (a_active > b_active)
  371. return 1;
  372. // Compare pointer identity, so that no two distinct connections are
  373. // ever identical.
  374. else if (a < b)
  375. return -1;
  376. else if (a > b)
  377. return 1;
  378. return 0;
  379. }
  380. int hrpc_conn_usable(const struct hrpc_conn *conn,
  381. const struct sockaddr_in *addr,
  382. const char *protocol, const char *username)
  383. {
  384. if (conn->remote.sin_addr.s_addr != addr->sin_addr.s_addr)
  385. return 0;
  386. else if (conn->remote.sin_port != addr->sin_port)
  387. return 0;
  388. else if (strcmp(conn->protocol, protocol))
  389. return 0;
  390. else if (strcmp(conn->username, username))
  391. return 0;
  392. return 1;
  393. }
  394. static void free_read_bufs(struct hrpc_conn *conn)
  395. {
  396. free(conn->reader.body);
  397. conn->reader.body = NULL;
  398. conn->reader.body_len = 0;
  399. conn->reader.off = 0;
  400. }
  401. static struct hadoop_err *conn_deliver_resp(struct hrpc_conn *conn,
  402. int32_t off, int32_t payload_len)
  403. {
  404. struct hrpc_conn_reader *reader = &conn->reader;
  405. struct hrpc_call *call = conn->call;
  406. int64_t payload_end;
  407. struct hrpc_response resp;
  408. // Check if the server sent us a bogus payload_len value.
  409. if (payload_len < 0) {
  410. return hadoop_lerr_alloc(EIO, "conn_deliver_resp: "
  411. "server's payload_len was %" PRId32 ", but negative payload "
  412. "lengths are not valid.", payload_len);
  413. }
  414. payload_end = off;
  415. payload_end += payload_len;
  416. if (payload_end > reader->body_len) {
  417. return hadoop_lerr_alloc(EIO, "conn_deliver_resp: "
  418. "server's payload_len was %" PRId64 ", but there are only %d "
  419. "bytes left in the body buffer.", payload_end, reader->body_len);
  420. }
  421. // Reset the connection's read state. We'll hold on to the response buffer
  422. // while making the callback.
  423. resp.pb_base = (uint8_t*)(reader->body + off);
  424. resp.pb_len = payload_len;
  425. resp.base = reader->body;
  426. reader->body = NULL;
  427. free_read_bufs(conn);
  428. conn->call = NULL;
  429. conn->reader.state = HRPC_CONN_UNREADABLE;
  430. // TODO: cache connections
  431. hrpc_conn_destroy(conn, NULL);
  432. hrpc_call_deliver_resp(call, &resp);
  433. return NULL;
  434. }
  435. static struct hadoop_err *conn_process_response(struct hrpc_conn *conn)
  436. {
  437. struct hrpc_conn_reader *reader = &conn->reader;
  438. RpcResponseHeaderProto *resp_header = NULL;
  439. int32_t off = 0, resp_header_len, payload_len, rem;
  440. struct hadoop_err *err = NULL;
  441. if (varint32_decode(&resp_header_len,
  442. (uint8_t*)reader->body, reader->body_len, &off)) {
  443. err = hadoop_lerr_alloc(EIO, "conn_process_response: response was "
  444. "only %d bytes-- too short to read the rpc request header.",
  445. reader->body_len);
  446. goto done;
  447. }
  448. if (resp_header_len <= 0) {
  449. err = hadoop_lerr_alloc(EIO, "conn_process_response: server sent "
  450. "invalid resp_header_len of %" PRId32, resp_header_len);
  451. goto done;
  452. }
  453. rem = reader->body_len - off;
  454. if (resp_header_len > rem) {
  455. err = hadoop_lerr_alloc(EIO, "conn_process_response: server sent "
  456. "resp_header_len of %" PRId32 ", but there were only %" PRId32
  457. " bytes left in the response.", resp_header_len, rem);
  458. goto done;
  459. }
  460. resp_header = rpc_response_header_proto__unpack(NULL, resp_header_len,
  461. (const uint8_t*)(reader->body + off));
  462. if (!resp_header) {
  463. err = hadoop_lerr_alloc(EIO, "conn_process_response: failed to "
  464. "parse RpcRequestHeaderProto.");
  465. goto done;
  466. }
  467. off += resp_header_len;
  468. if (resp_header->callid != conn->call_id) {
  469. // We currently only send one request at a time. So when we get a
  470. // response, we expect it to be for the request we just sent.
  471. err = hadoop_lerr_alloc(EIO, "conn_process_response: incorrect call "
  472. "id in response. Expected %" PRId32 ", got %" PRId32 ".",
  473. conn->call_id, resp_header->callid);
  474. goto done;
  475. }
  476. if (resp_header->status != RPC_STATUS_PROTO__SUCCESS) {
  477. // TODO: keep connection open if we got an ERROR rather than a FATAL.
  478. err = hadoop_lerr_alloc(EIO, "conn_process_response: error %s: %s",
  479. resp_header->exceptionclassname, resp_header->errormsg);
  480. goto done;
  481. }
  482. if (varint32_decode(&payload_len,
  483. (uint8_t*)reader->body, reader->body_len, &off)) {
  484. err = hadoop_lerr_alloc(EIO, "conn_process_response: header was %d "
  485. "bytes, and total length was %d-- too short to read the "
  486. "payload.", resp_header_len, reader->body_len);
  487. goto done;
  488. }
  489. err = conn_deliver_resp(conn, off, payload_len);
  490. done:
  491. if (resp_header) {
  492. rpc_response_header_proto__free_unpacked(resp_header, NULL);
  493. }
  494. return err;
  495. }
  496. static const char *conn_read_state_str(enum hrpc_conn_read_state state)
  497. {
  498. switch (state) {
  499. case HRPC_CONN_UNREADABLE:
  500. return "HRPC_CONN_UNREADABLE";
  501. case HRPC_CONN_READ_LEN:
  502. return "HRPC_CONN_READ_LEN";
  503. case HRPC_CONN_READ_BODY:
  504. return "HRPC_CONN_READ_BODY";
  505. case HRPC_CONN_READ_CLOSED:
  506. return "HRPC_CONN_READ_CLOSED";
  507. default:
  508. return "(unknown)";
  509. }
  510. };
  511. /**
  512. * Return a read buffer to libuv.
  513. *
  514. * We don't do the actual allocation here, for two reasons. The first is that
  515. * we'd like to know how big a buffer to allocate first (by reading the first
  516. * 4 bytes). The second is that libuv doesn't really take kindly to
  517. * failures here... returning a zero-length buffer triggers a crash.
  518. * So we simply return previously allocated buffers here.
  519. *
  520. * @param handle The TCP stream.
  521. * @param suggested_size The suggested size.
  522. *
  523. * @return The read buffer to use.
  524. */
  525. static void hrpc_conn_read_alloc(uv_handle_t *handle,
  526. size_t suggested_size __attribute__((unused)), uv_buf_t *buf)
  527. {
  528. int32_t rem;
  529. struct hrpc_conn *conn = handle->data;
  530. struct hrpc_conn_reader *reader = &conn->reader;
  531. switch (reader->state) {
  532. case HRPC_CONN_READ_LEN:
  533. buf->base = (char*)(reader->body_len_buf + reader->off);
  534. buf->len = READLEN_BUF_LEN - reader->off;
  535. return;
  536. case HRPC_CONN_READ_BODY:
  537. rem = reader->body_len - reader->off;
  538. if (rem <= 0) {
  539. conn_log_warn(conn, "hrpc_conn_read_alloc: we're in state "
  540. "HRPC_CONN_READ_BODY with reader->body_len = %" PRId32 ", but "
  541. "reader->off = %" PRId32 "\n", reader->body_len, reader->off);
  542. buf->base = NULL;
  543. buf->len = 0;
  544. return;
  545. }
  546. buf->base = (char*)(reader->body + reader->off);
  547. buf->len = rem;
  548. return;
  549. default:
  550. conn_log_warn(conn, "hrpc_conn_read_alloc: we're in state "
  551. "%s, but we're being asked to allocate "
  552. "a read buffer.\n", conn_read_state_str(reader->state));
  553. buf->base = NULL;
  554. buf->len = 0;
  555. return;
  556. }
  557. }
  558. /**
  559. * The read callback for this connection.
  560. */
  561. static void conn_read_cb(uv_stream_t *stream, ssize_t nread,
  562. const uv_buf_t* buf __attribute__((unused)))
  563. {
  564. struct hrpc_conn *conn = stream->data;
  565. struct hrpc_conn_reader *reader = &conn->reader;
  566. struct hadoop_err *err;
  567. if (nread < 0) {
  568. hrpc_conn_destroy(conn,
  569. hadoop_uverr_alloc(-nread, "conn_read_cb error"));
  570. return;
  571. }
  572. if (nread == 0) {
  573. // Nothing to do.
  574. return;
  575. }
  576. switch (reader->state) {
  577. case HRPC_CONN_READ_LEN:
  578. reader->off += nread;
  579. if (reader->off < READLEN_BUF_LEN) {
  580. conn_log_debug(conn, "conn_read_cb: got partial read of "
  581. "body_len. reader->off = %" PRId32 "\n",
  582. reader->off);
  583. return;
  584. }
  585. reader->body_len = be32_decode(reader->body_len_buf);
  586. if ((reader->body_len <= 0) ||
  587. (reader->body_len > MAX_READER_BODY_LEN)) {
  588. hrpc_conn_destroy(conn, hadoop_lerr_alloc(EIO,
  589. "conn_read_cb: got an invalid body length of %" PRId32 "\n",
  590. reader->body_len));
  591. return;
  592. }
  593. conn_log_debug(conn, "conn_read_cb: got body length of "
  594. "%" PRId32 ". Transitioning to HRPC_CONN_READ_BODY.\n",
  595. reader->body_len);
  596. reader->off = 0;
  597. reader->state = HRPC_CONN_READ_BODY;
  598. reader->body = malloc(reader->body_len);
  599. if (!reader->body) {
  600. hrpc_conn_destroy(conn, hadoop_lerr_alloc(ENOMEM,
  601. "hrpc_conn_read_alloc: failed to allocate "
  602. "%" PRId32 " bytes.\n", reader->body_len));
  603. }
  604. break;
  605. case HRPC_CONN_READ_BODY:
  606. reader->off += nread;
  607. if (reader->off < reader->body_len) {
  608. conn_log_debug(conn, "conn_read_cb: got partial read of "
  609. "body. reader->off = %" PRId32 " out of %"
  610. PRId32 "\n", reader->off, reader->body_len);
  611. return;
  612. }
  613. err = conn_process_response(conn);
  614. free_read_bufs(conn);
  615. if (err) {
  616. hrpc_conn_destroy(conn, err);
  617. return;
  618. }
  619. reader->state = HRPC_CONN_UNREADABLE;
  620. break;
  621. default:
  622. conn_log_warn(conn, "conn_read_cb: got an unexpected read "
  623. "event while in %s state.\n",
  624. conn_read_state_str(reader->state));
  625. return;
  626. }
  627. }
  628. static void conn_free(uv_handle_t* handle)
  629. {
  630. struct hrpc_conn *conn = handle->data;
  631. free(conn);
  632. }
  633. void hrpc_conn_destroy(struct hrpc_conn *conn, struct hadoop_err *err)
  634. {
  635. reactor_remove_conn(conn->reactor, conn);
  636. if (conn->call) {
  637. err = err ? err : hadoop_lerr_alloc(EFAULT, "hrpc_conn_destroy: "
  638. "internal error: shutting down connection while it "
  639. "still has a call in progress.");
  640. conn_log_warn(conn, "hrpc_conn_destroy: %s\n", hadoop_err_msg(err));
  641. hrpc_call_deliver_err(conn->call, err);
  642. conn->call = NULL;
  643. } else if (err) {
  644. conn_log_warn(conn, "hrpc_conn_destroy: %s\n", hadoop_err_msg(err));
  645. hadoop_err_free(err);
  646. }
  647. free_read_bufs(conn);
  648. conn->reader.state = HRPC_CONN_READ_CLOSED;
  649. free_write_bufs(conn);
  650. conn->writer.state = HRPC_CONN_WRITE_CLOSED;
  651. free(conn->protocol);
  652. free(conn->username);
  653. uv_close((uv_handle_t*)&conn->stream, conn_free);
  654. }
  655. // vim: ts=4:sw=4:tw=79:et