messenger.c 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "common/hadoop_err.h"
  19. #include "rpc/conn.h"
  20. #include "rpc/messenger.h"
  21. #include "rpc/reactor.h"
  22. #include <errno.h>
  23. #include <stdio.h>
  24. #include <stdlib.h>
  25. #include <uv.h>
  26. #define msgr_log_warn(msgr, fmt, ...) \
  27. fprintf(stderr, "WARN: msgr %p: " fmt, msgr, __VA_ARGS__)
  28. #define msgr_log_info(msgr, fmt, ...) \
  29. fprintf(stderr, "INFO: msgr %p: " fmt, msgr, __VA_ARGS__)
  30. #define msgr_log_debug(msgr, fmt, ...) \
  31. fprintf(stderr, "DBUG: msgr %p: " fmt, msgr, __VA_ARGS__)
  32. struct hrpc_messenger_builder {
  33. };
  34. /**
  35. * The Hadoop Messenger.
  36. *
  37. * The messenger owns all the reactor threads, and is the main entry point into
  38. * the RPC system.
  39. */
  40. struct hrpc_messenger {
  41. /**
  42. * The reactor thread which makes the actual network calls.
  43. *
  44. * TODO: support multiple reactor threads.
  45. */
  46. struct hrpc_reactor *reactor;
  47. };
  48. struct hrpc_messenger_builder *hrpc_messenger_builder_alloc(void)
  49. {
  50. struct hrpc_messenger_builder *bld;
  51. bld = calloc(1, sizeof(struct hrpc_messenger_builder));
  52. if (!bld)
  53. return NULL;
  54. return bld;
  55. }
  56. void hrpc_messenger_builder_free(struct hrpc_messenger_builder *bld)
  57. {
  58. if (!bld)
  59. return;
  60. free(bld);
  61. }
  62. struct hadoop_err *hrpc_messenger_create(
  63. struct hrpc_messenger_builder *bld, struct hrpc_messenger **out)
  64. {
  65. struct hrpc_messenger *msgr = NULL;
  66. struct hadoop_err *err = NULL;
  67. free(bld);
  68. msgr = calloc(1, sizeof(struct hrpc_messenger));
  69. if (!msgr) {
  70. err = hadoop_lerr_alloc(ENOMEM, "hrpc_messenger_create: OOM");
  71. goto error;
  72. }
  73. err = hrpc_reactor_create(&msgr->reactor);
  74. if (err) {
  75. goto error_free_msgr;
  76. }
  77. msgr_log_info(msgr, "created messenger %p\n", msgr);
  78. *out = msgr;
  79. return NULL;
  80. error_free_msgr:
  81. free(msgr);
  82. error:
  83. return err;
  84. }
  85. void hrpc_messenger_start_outbound(struct hrpc_messenger *msgr,
  86. struct hrpc_call *call)
  87. {
  88. hrpc_reactor_start_outbound(msgr->reactor, call);
  89. }
  90. void hrpc_messenger_shutdown(struct hrpc_messenger *msgr)
  91. {
  92. msgr_log_debug(msgr, "%s", "hrpc_messenger_shutdown\n");
  93. hrpc_reactor_shutdown(msgr->reactor);
  94. }
  95. void hrpc_messenger_free(struct hrpc_messenger *msgr)
  96. {
  97. msgr_log_debug(msgr, "%s", "hrpc_messenger_free\n");
  98. hrpc_reactor_free(msgr->reactor);
  99. free(msgr);
  100. }
  101. // vim: ts=4:sw=4:tw=79:et