reactor.c 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "common/hadoop_err.h"
  19. #include "common/net.h"
  20. #include "common/queue.h"
  21. #include "common/tree.h"
  22. #include "rpc/call.h"
  23. #include "rpc/messenger.h"
  24. #include "rpc/reactor.h"
  25. #include <errno.h>
  26. #include <stdio.h>
  27. #include <stdlib.h>
  28. #include <string.h>
  29. #include <uv.h>
  30. #define reactor_log_warn(reactor, fmt, ...) \
  31. fprintf(stderr, "WARN: reactor %p: " fmt, reactor, __VA_ARGS__)
  32. #define reactor_log_info(msgr, fmt, ...) \
  33. fprintf(stderr, "INFO: reactor %p: " fmt, reactor, __VA_ARGS__)
  34. #define reactor_log_debug(msgr, fmt, ...) \
  35. fprintf(stderr, "DBUG: reactor %p: " fmt, reactor, __VA_ARGS__)
  36. RB_GENERATE(hrpc_conns, hrpc_conn, entry, hrpc_conn_compare);
  37. static void reactor_thread_run(void *arg)
  38. {
  39. struct hrpc_reactor *reactor = arg;
  40. struct hrpc_conn *conn, *conn_tmp;
  41. reactor_log_debug(reactor, "%s", "reactor thread starting.\n");
  42. uv_run(&reactor->loop, UV_RUN_DEFAULT);
  43. reactor_log_debug(reactor, "%s", "reactor thread terminating.\n");
  44. RB_FOREACH_SAFE(conn, hrpc_conns, &reactor->conns, conn_tmp) {
  45. hrpc_conn_destroy(conn, hadoop_lerr_alloc(ESHUTDOWN,
  46. "hrpc_reactor_start_outbound: the reactor is being shut down."));
  47. }
  48. }
  49. /**
  50. * Find an idle connection with a given address in the idle connection map.
  51. *
  52. * @param reactor The reactor.
  53. * @param remote The remote address to find.
  54. */
  55. static struct hrpc_conn *reuse_idle_conn(struct hrpc_reactor *reactor,
  56. const struct sockaddr_in *remote, const struct hrpc_call *call)
  57. {
  58. struct hrpc_conn *conn;
  59. struct hrpc_conn exemplar;
  60. memset(&exemplar, 0, sizeof(exemplar));
  61. exemplar.remote = *remote;
  62. exemplar.protocol = call->protocol;
  63. conn = RB_NFIND(hrpc_conns, &reactor->conns, &exemplar);
  64. if (!conn)
  65. return NULL;
  66. if (hrpc_conn_usable(conn, remote, call->protocol)) {
  67. if (conn->writer.state == HRPC_CONN_WRITE_IDLE) {
  68. RB_REMOVE(hrpc_conns, &reactor->conns, conn);
  69. return conn;
  70. }
  71. }
  72. return NULL;
  73. }
  74. static void reactor_begin_shutdown(struct hrpc_reactor *reactor,
  75. struct hrpc_calls *pending_calls)
  76. {
  77. struct hrpc_call *call;
  78. reactor_log_debug(reactor, "%s", "reactor_begin_shutdown\n");
  79. STAILQ_FOREACH(call, pending_calls, entry) {
  80. hrpc_call_deliver_err(call, hadoop_lerr_alloc(ESHUTDOWN,
  81. "hrpc_reactor_start_outbound: the reactor is being shut down."));
  82. }
  83. // Note: other callbacks may still run after the libuv loop has been
  84. // stopped. But we won't block for I/O after this point.
  85. uv_stop(&reactor->loop);
  86. }
  87. static void reactor_async_start_outbound(struct hrpc_reactor *reactor,
  88. struct hrpc_call *call)
  89. {
  90. char remote_str[64] = { 0 };
  91. struct hrpc_conn *conn;
  92. struct hadoop_err *err;
  93. conn = reuse_idle_conn(reactor, &call->remote, call);
  94. if (conn) {
  95. reactor_log_debug(reactor, "start_outbound(remote=%s) assigning to "
  96. "connection %p\n",
  97. net_ipv4_name(&call->remote, remote_str, sizeof(remote_str)), conn);
  98. hrpc_conn_start_outbound(conn, call);
  99. } else {
  100. err = hrpc_conn_create_outbound(reactor, call, &conn);
  101. if (err) {
  102. reactor_log_warn(reactor, "reactor_async_start_outbound("
  103. "remote=%s) got error %s\n",
  104. net_ipv4_name(&call->remote, remote_str, sizeof(remote_str)),
  105. hadoop_err_msg(err));
  106. hrpc_call_deliver_err(call, err);
  107. return;
  108. }
  109. reactor_log_debug(reactor, "start_outbound(remote=%s) created new "
  110. "connection %p\n",
  111. net_ipv4_name(&call->remote, remote_str, sizeof(remote_str)), conn);
  112. }
  113. // Add or re-add the connection to the reactor's tree.
  114. RB_INSERT(hrpc_conns, &reactor->conns, conn);
  115. }
  116. static void reactor_async_cb(uv_async_t *handle)
  117. {
  118. struct hrpc_reactor *reactor = handle->data;
  119. int shutdown;
  120. struct hrpc_calls pending_calls = STAILQ_HEAD_INITIALIZER(pending_calls);
  121. struct hrpc_call *call;
  122. uv_mutex_lock(&reactor->inbox.lock);
  123. shutdown = reactor->inbox.shutdown;
  124. STAILQ_SWAP(&reactor->inbox.pending_calls, &pending_calls,
  125. hrpc_call);
  126. uv_mutex_unlock(&reactor->inbox.lock);
  127. if (shutdown) {
  128. reactor_begin_shutdown(reactor, &pending_calls);
  129. return;
  130. }
  131. STAILQ_FOREACH(call, &pending_calls, entry) {
  132. reactor_async_start_outbound(reactor, call);
  133. }
  134. }
  135. void reactor_remove_conn(struct hrpc_reactor *reactor, struct hrpc_conn *conn)
  136. {
  137. struct hrpc_conn *removed;
  138. removed = RB_REMOVE(hrpc_conns, &reactor->conns, conn);
  139. if (!removed) {
  140. reactor_log_warn(reactor, "reactor_remove_conn("
  141. "conn=%p): no such connection found.\n", conn);
  142. }
  143. }
  144. struct hadoop_err *hrpc_reactor_create(struct hrpc_reactor **out)
  145. {
  146. struct hrpc_reactor *reactor = NULL;
  147. struct hadoop_err *err = NULL;
  148. int res;
  149. reactor = calloc(1, sizeof(struct hrpc_reactor));
  150. if (!reactor) {
  151. err = hadoop_lerr_alloc(ENOMEM, "hrpc_reactor_create: OOM while allocating "
  152. "reactor structure.");
  153. goto error_free_reactor;
  154. }
  155. if (uv_mutex_init(&reactor->inbox.lock) < 0) {
  156. err = hadoop_lerr_alloc(ENOMEM, "hrpc_reactor_create: failed to init "
  157. "mutex.");
  158. goto error_free_reactor;
  159. }
  160. RB_INIT(&reactor->conns);
  161. STAILQ_INIT(&reactor->inbox.pending_calls);
  162. if (uv_loop_init(&reactor->loop)) {
  163. err = hadoop_lerr_alloc(ENOMEM, "hrpc_reactor_create: uv_loop_init "
  164. "failed.");
  165. goto error_free_mutex;
  166. }
  167. res = uv_async_init(&reactor->loop, &reactor->inbox.notifier,
  168. reactor_async_cb);
  169. if (res) {
  170. err = hadoop_uverr_alloc(res, "hrpc_reactor_create: "
  171. "uv_async_init failed.");
  172. goto error_close_loop;
  173. }
  174. reactor->inbox.notifier.data = reactor;
  175. res = uv_thread_create(&reactor->thread, reactor_thread_run, reactor);
  176. if (res) {
  177. err = hadoop_lerr_alloc(ENOMEM, "hrpc_reactor_create: "
  178. "uv_thread_create failed.");
  179. goto error_free_async;
  180. }
  181. *out = reactor;
  182. return NULL;
  183. error_free_async:
  184. uv_close((uv_handle_t*)&reactor->inbox.notifier, NULL);
  185. error_close_loop:
  186. uv_loop_close(&reactor->loop);
  187. error_free_mutex:
  188. uv_mutex_destroy(&reactor->inbox.lock);
  189. error_free_reactor:
  190. free(reactor);
  191. return err;
  192. }
  193. void hrpc_reactor_shutdown(struct hrpc_reactor *reactor)
  194. {
  195. reactor_log_debug(reactor, "%s", "hrpc_reactor_shutdown\n");
  196. uv_mutex_lock(&reactor->inbox.lock);
  197. reactor->inbox.shutdown = 1;
  198. uv_mutex_unlock(&reactor->inbox.lock);
  199. uv_async_send(&reactor->inbox.notifier);
  200. uv_thread_join(&reactor->thread);
  201. }
  202. void hrpc_reactor_free(struct hrpc_reactor *reactor)
  203. {
  204. reactor_log_debug(reactor, "%s", "hrpc_reactor_free\n");
  205. uv_loop_close(&reactor->loop);
  206. uv_mutex_destroy(&reactor->inbox.lock);
  207. free(reactor);
  208. }
  209. void hrpc_reactor_start_outbound(struct hrpc_reactor *reactor,
  210. struct hrpc_call *call)
  211. {
  212. int shutdown = 0;
  213. uv_mutex_lock(&reactor->inbox.lock);
  214. shutdown = reactor->inbox.shutdown;
  215. if (!shutdown) {
  216. STAILQ_INSERT_TAIL(&reactor->inbox.pending_calls, call, entry);
  217. }
  218. uv_mutex_unlock(&reactor->inbox.lock);
  219. if (shutdown) {
  220. hrpc_call_deliver_err(call, hadoop_lerr_alloc(ESHUTDOWN,
  221. "hrpc_reactor_start_outbound: can't start call because the "
  222. "reactor has been shut down."));
  223. } else {
  224. uv_async_send(&reactor->inbox.notifier);
  225. }
  226. }
  227. // vim: ts=4:sw=4:tw=79:et