/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef HADOOP_CORE_RPC_CONNECTION_H
#define HADOOP_CORE_RPC_CONNECTION_H

#include "common/tree.h"
#include "rpc/client_id.h"

#include <stdint.h>
#include <uv.h>

struct hrpc_call;

#define MAX_CUR_WRITES 2
#define READLEN_BUF_LEN 4

enum hrpc_conn_write_state {
    /**
     * The state when we're still calling connect(2).
     */
    HRPC_CONN_WRITE_CONNECTING,

    /**
     * The write state where we're sending the frame, IPC header, etc.
     * TODO: implement SASL and its associated states.
     */
    HRPC_CONN_WRITE_IPC_HEADER,

    /**
     * The write state where we're writing the RpcRequestHeaderProto and
     * request payload.
     */
    HRPC_CONN_WRITE_PAYLOAD,

    /**
     * The write state where we have nothing to write.
     */
    HRPC_CONN_WRITE_IDLE,

    /**
     * Closed state.
     */
    HRPC_CONN_WRITE_CLOSED,
};
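
/*
 * Illustrative note (not taken from the original header): the write states
 * above suggest a progression of roughly
 *
 *     CONNECTING -> IPC_HEADER -> PAYLOAD -> IDLE -> (PAYLOAD again per call)
 *
 * with CLOSED reachable on error or teardown. The actual transitions are
 * defined by the implementation, not by this header.
 */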

enum hrpc_conn_read_state {
    /**
     * The read state in which we don't expect read callbacks.
     * Generally, we're only in this state when the connection itself has not
     * been established.
     */
    HRPC_CONN_UNREADABLE = 0,

    /**
     * The read state in which we're reading the 4-byte length prefix.
     */
    HRPC_CONN_READ_LEN,

    /**
     * The read state in which we're reading the message body.
     */
    HRPC_CONN_READ_BODY,

    /**
     * Closed state.
     */
    HRPC_CONN_READ_CLOSED,
};
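
/*
 * Illustrative note (not taken from the original header): once the connection
 * is established, reads presumably alternate between READ_LEN and READ_BODY,
 * i.e.
 *
 *     UNREADABLE -> READ_LEN -> READ_BODY -> READ_LEN -> ...
 *
 * with READ_CLOSED entered on error or teardown.
 */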

struct hrpc_conn_reader {
    enum hrpc_conn_read_state state;        //!< Current read state.
    uint8_t body_len_buf[READLEN_BUF_LEN];  //!< The buffer for body length.
    int32_t body_len;                       //!< Body length to read.
    int32_t off;                            //!< Current offset.
    char *body;                             //!< malloc'ed message body we're reading.
};
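
/*
 * Illustrative sketch (not part of this API): once READLEN_BUF_LEN bytes have
 * been accumulated in body_len_buf, the body length can be decoded. This
 * assumes the Hadoop RPC length prefix is a 32-bit integer in network byte
 * order (big-endian); decode_body_len is a hypothetical helper:
 *
 *     static int32_t decode_body_len(const struct hrpc_conn_reader *reader)
 *     {
 *         return ((int32_t)reader->body_len_buf[0] << 24) |
 *                ((int32_t)reader->body_len_buf[1] << 16) |
 *                ((int32_t)reader->body_len_buf[2] << 8) |
 *                ((int32_t)reader->body_len_buf[3]);
 *     }
 */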

struct hrpc_conn_writer {
    enum hrpc_conn_write_state state;     //!< Current write state.
    uv_write_t write_req;                 //!< The libuv write request.
    uv_buf_t cur_writes[MAX_CUR_WRITES];  //!< Buffers for the current write.
};

/**
 * A Hadoop connection.
 *
 * This object manages the TCP connection with the remote.
 * Note that we can read from and write to the remote simultaneously;
 * that's why write_state and read_state are separate.
 */
struct hrpc_conn {
    RB_ENTRY(hrpc_conn) entry;

    /**
     * The reactor that owns this connection.
     */
    struct hrpc_reactor *reactor;

    /**
     * The call we're handling.
     */
    struct hrpc_call *call;

    /**
     * The remote address we're connected to.
     */
    struct sockaddr_in remote;

    /**
     * This connection's TCP stream.
     */
    uv_tcp_t stream;

    /**
     * The Hadoop protocol we're talking to, such as
     * org.apache.hadoop.hdfs.protocol.ClientProtocol. Malloc'ed.
     */
    char *protocol;

    /**
     * The client ID we used when establishing the connection.
     */
    struct hrpc_client_id client_id;

    /**
     * The pending connection request, if one is pending.
     */
    uv_connect_t conn_req;

    struct hrpc_conn_writer writer;
    struct hrpc_conn_reader reader;
};
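
/*
 * Note (an inference, not stated in the original header): the RB_ENTRY field
 * above suggests that the owning reactor keeps its connections in a red-black
 * tree (common/tree.h), ordered by hrpc_conn_compare() below, so that an idle
 * connection to a given remote can be found quickly.
 */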

/**
 * Create a new hrpc_conn to the given remote, using bind, connect, etc.
 *
 * @param reactor       The reactor that the connection will be associated
 *                          with.
 * @param call          The call to make. The connection will take ownership
 *                          of this call. If the connection fails, the call
 *                          will be given a failure callback.
 * @param out           (out param) On success, the new connection.
 *
 * @return              NULL on success; the error otherwise.
 */
struct hadoop_err *hrpc_conn_create_outbound(struct hrpc_reactor *reactor,
                                             struct hrpc_call *call,
                                             struct hrpc_conn **out);
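
/*
 * Example usage (an illustrative sketch only; how 'reactor' and 'call' are
 * obtained is outside the scope of this header):
 *
 *     struct hrpc_conn *conn = NULL;
 *     struct hadoop_err *err =
 *         hrpc_conn_create_outbound(reactor, call, &conn);
 *     if (err) {
 *         // Creation failed; 'conn' was not set up. Handle or free the
 *         // error according to the hadoop_err conventions.
 *     }
 */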

/**
 * Start an outbound call on this connection.
 *
 * @param conn          The connection.
 * @param call          The call.
 */
void hrpc_conn_start_outbound(struct hrpc_conn *conn, struct hrpc_call *call);

/**
 * Compare two hadoop connection objects.
 *
 * Comparison is done lexicographically by:
 *  - IP address.
 *  - Port.
 *  - Whether the connection is in use.
 *  - Memory address.
 *
 * By doing the comparison this way, we make it easy to find the first idle
 * connection to a given remote, or quickly determine that there is not one.
 * We also allow multiple connections to the same address.
 *
 * @param a             The first connection object.
 * @param b             The second connection object.
 *
 * @return              negative if a < b; positive if a > b; 0 otherwise.
 */
int hrpc_conn_compare(const struct hrpc_conn *a,
                      const struct hrpc_conn *b);
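
/*
 * Illustrative sketch of the ordering described above (the real
 * implementation is not in this header; treating a non-NULL 'call' as
 * "in use" is an assumption):
 *
 *     int a_busy = (a->call != NULL), b_busy = (b->call != NULL);
 *     if (a->remote.sin_addr.s_addr != b->remote.sin_addr.s_addr)
 *         return (a->remote.sin_addr.s_addr < b->remote.sin_addr.s_addr) ? -1 : 1;
 *     if (a->remote.sin_port != b->remote.sin_port)
 *         return (a->remote.sin_port < b->remote.sin_port) ? -1 : 1;
 *     if (a_busy != b_busy)
 *         return a_busy ? 1 : -1;   // idle connections sort first
 *     if (a != b)
 *         return (a < b) ? -1 : 1;  // fall back to memory address
 *     return 0;
 */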

/**
 * Determine if a connection is usable for a given address and protocol.
 *
 * @param conn          The connection.
 * @param addr          The address.
 * @param protocol      The protocol.
 *
 * @return              1 if the connection is usable; 0 if not.
 */
int hrpc_conn_usable(const struct hrpc_conn *conn,
                     const struct sockaddr_in *addr, const char *protocol);
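
/*
 * Illustrative sketch (an assumption about what "usable" means, based on the
 * fields of struct hrpc_conn): a connection is usable when it points at the
 * same remote address and port and speaks the same protocol:
 *
 *     if (conn->remote.sin_addr.s_addr != addr->sin_addr.s_addr)
 *         return 0;
 *     if (conn->remote.sin_port != addr->sin_port)
 *         return 0;
 *     if (strcmp(conn->protocol, protocol))
 *         return 0;
 *     return 1;
 */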

/**
 * Destroy a connection.
 *
 * @param conn          The connection to destroy.
 * @param err           The error. This will be delivered to any callback
 *                          that the connection owns.
 */
void hrpc_conn_destroy(struct hrpc_conn *conn, struct hadoop_err *err);

#endif

// vim: ts=4:sw=4:tw=79:et