rpc_engine.cc 3.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "rpc_engine.h"
  19. #include "rpc_connection.h"
  20. #include "common/util.h"
  21. #include <openssl/rand.h>
  22. #include <sstream>
  23. #include <future>
  24. namespace hdfs {
  25. RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options,
  26. const std::string &client_name, const char *protocol_name,
  27. int protocol_version)
  28. : io_service_(io_service), options_(options), client_name_(client_name),
  29. protocol_name_(protocol_name), protocol_version_(protocol_version),
  30. call_id_(0) {
  31. }
  32. void RpcEngine::Connect(const ::asio::ip::tcp::endpoint &server,
  33. const std::function<void(const Status &)> &handler) {
  34. conn_.reset(new RpcConnectionImpl<::asio::ip::tcp::socket>(this));
  35. conn_->Connect(server, [this, handler](const Status &stat) {
  36. if (!stat.ok()) {
  37. handler(stat);
  38. } else {
  39. conn_->Handshake([handler](const Status &s) { handler(s); });
  40. }
  41. });
  42. }
  43. void RpcEngine::Start() { conn_->Start(); }
  44. void RpcEngine::Shutdown() {
  45. io_service_->post([this]() { conn_->Shutdown(); });
  46. }
  47. void RpcEngine::TEST_SetRpcConnection(std::unique_ptr<RpcConnection> *conn) {
  48. conn_.reset(conn->release());
  49. }
  50. void RpcEngine::AsyncRpc(
  51. const std::string &method_name, const ::google::protobuf::MessageLite *req,
  52. const std::shared_ptr<::google::protobuf::MessageLite> &resp,
  53. const std::function<void(const Status &)> &handler) {
  54. conn_->AsyncRpc(method_name, req, resp, handler);
  55. }
  56. Status
  57. RpcEngine::Rpc(const std::string &method_name,
  58. const ::google::protobuf::MessageLite *req,
  59. const std::shared_ptr<::google::protobuf::MessageLite> &resp) {
  60. auto stat = std::make_shared<std::promise<Status>>();
  61. std::future<Status> future(stat->get_future());
  62. AsyncRpc(method_name, req, resp,
  63. [stat](const Status &status) { stat->set_value(status); });
  64. return future.get();
  65. }
  66. Status RpcEngine::RawRpc(const std::string &method_name, const std::string &req,
  67. std::shared_ptr<std::string> resp) {
  68. auto stat = std::make_shared<std::promise<Status>>();
  69. std::future<Status> future(stat->get_future());
  70. conn_->AsyncRawRpc(method_name, req, resp,
  71. [stat](const Status &status) { stat->set_value(status); });
  72. return future.get();
  73. }
  74. std::string RpcEngine::GetRandomClientName() {
  75. unsigned char buf[6] = {
  76. 0,
  77. };
  78. RAND_pseudo_bytes(buf, sizeof(buf));
  79. std::stringstream ss;
  80. ss << "libhdfs++_"
  81. << Base64Encode(std::string(reinterpret_cast<char *>(buf), sizeof(buf)));
  82. return ss.str();
  83. }
  84. }