rpc_engine_test.cc 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "mock_connection.h"
  19. #include "test.pb.h"
  20. #include "RpcHeader.pb.h"
  21. #include "rpc/rpc_connection.h"
  22. #include <google/protobuf/io/coded_stream.h>
  23. #include <gmock/gmock.h>
  24. using ::hadoop::common::RpcResponseHeaderProto;
  25. using ::hadoop::common::EmptyRequestProto;
  26. using ::hadoop::common::EmptyResponseProto;
  27. using ::hadoop::common::EchoRequestProto;
  28. using ::hadoop::common::EchoResponseProto;
  29. using ::asio::error_code;
  30. using ::testing::Return;
  31. using ::std::make_pair;
  32. using ::std::string;
  33. namespace pb = ::google::protobuf;
  34. namespace pbio = ::google::protobuf::io;
  35. namespace hdfs {
  36. std::vector<asio::ip::basic_endpoint<asio::ip::tcp>> make_endpoint() {
  37. std::vector<asio::ip::basic_endpoint<asio::ip::tcp>> result;
  38. result.push_back(asio::ip::basic_endpoint<asio::ip::tcp>());
  39. return result;
  40. }
  41. class MockRPCConnection : public MockConnectionBase {
  42. public:
  43. MockRPCConnection(::asio::io_service &io_service)
  44. : MockConnectionBase(&io_service) {}
  45. MOCK_METHOD0(Produce, ProducerResult());
  46. };
  47. class SharedMockRPCConnection : public SharedMockConnection {
  48. public:
  49. SharedMockRPCConnection(::asio::io_service &io_service)
  50. : SharedMockConnection(&io_service) {}
  51. };
  52. class SharedConnectionEngine : public RpcEngine {
  53. using RpcEngine::RpcEngine;
  54. protected:
  55. std::shared_ptr<RpcConnection> NewConnection() override {
  56. // Stuff in some dummy endpoints so we don't error out
  57. last_endpoints_ = make_endpoint();
  58. return std::make_shared<RpcConnectionImpl<SharedMockRPCConnection>>(this);
  59. }
  60. };
  61. }
  62. static inline std::pair<error_code, string> RpcResponse(
  63. const RpcResponseHeaderProto &h, const std::string &data,
  64. const ::asio::error_code &ec = error_code()) {
  65. uint32_t payload_length =
  66. pbio::CodedOutputStream::VarintSize32(h.ByteSize()) +
  67. pbio::CodedOutputStream::VarintSize32(data.size()) + h.ByteSize() +
  68. data.size();
  69. std::string res;
  70. res.resize(sizeof(uint32_t) + payload_length);
  71. uint8_t *buf = reinterpret_cast<uint8_t *>(const_cast<char *>(res.c_str()));
  72. buf = pbio::CodedOutputStream::WriteLittleEndian32ToArray(
  73. htonl(payload_length), buf);
  74. buf = pbio::CodedOutputStream::WriteVarint32ToArray(h.ByteSize(), buf);
  75. buf = h.SerializeWithCachedSizesToArray(buf);
  76. buf = pbio::CodedOutputStream::WriteVarint32ToArray(data.size(), buf);
  77. buf = pbio::CodedOutputStream::WriteStringToArray(data, buf);
  78. return std::make_pair(ec, std::move(res));
  79. }
  80. using namespace hdfs;
  81. TEST(RpcEngineTest, TestRoundTrip) {
  82. ::asio::io_service io_service;
  83. Options options;
  84. RpcEngine engine(&io_service, options, "foo", "", "protocol", 1);
  85. RpcConnectionImpl<MockRPCConnection> *conn =
  86. new RpcConnectionImpl<MockRPCConnection>(&engine);
  87. conn->TEST_set_connected(true);
  88. conn->StartReading();
  89. EchoResponseProto server_resp;
  90. server_resp.set_message("foo");
  91. RpcResponseHeaderProto h;
  92. h.set_callid(1);
  93. h.set_status(RpcResponseHeaderProto::SUCCESS);
  94. EXPECT_CALL(conn->next_layer(), Produce())
  95. .WillOnce(Return(RpcResponse(h, server_resp.SerializeAsString())));
  96. std::shared_ptr<RpcConnection> conn_ptr(conn);
  97. engine.TEST_SetRpcConnection(conn_ptr);
  98. bool complete = false;
  99. EchoRequestProto req;
  100. req.set_message("foo");
  101. std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
  102. engine.AsyncRpc("test", &req, resp, [resp, &complete,&io_service](const Status &stat) {
  103. ASSERT_TRUE(stat.ok());
  104. ASSERT_EQ("foo", resp->message());
  105. complete = true;
  106. io_service.stop();
  107. });
  108. io_service.run();
  109. ASSERT_TRUE(complete);
  110. }
  111. TEST(RpcEngineTest, TestConnectionResetAndFail) {
  112. ::asio::io_service io_service;
  113. Options options;
  114. RpcEngine engine(&io_service, options, "foo", "", "protocol", 1);
  115. RpcConnectionImpl<MockRPCConnection> *conn =
  116. new RpcConnectionImpl<MockRPCConnection>(&engine);
  117. conn->TEST_set_connected(true);
  118. conn->StartReading();
  119. bool complete = false;
  120. RpcResponseHeaderProto h;
  121. h.set_callid(1);
  122. h.set_status(RpcResponseHeaderProto::SUCCESS);
  123. EXPECT_CALL(conn->next_layer(), Produce())
  124. .WillOnce(Return(RpcResponse(
  125. h, "", make_error_code(::asio::error::connection_reset))));
  126. std::shared_ptr<RpcConnection> conn_ptr(conn);
  127. engine.TEST_SetRpcConnection(conn_ptr);
  128. EchoRequestProto req;
  129. req.set_message("foo");
  130. std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
  131. engine.AsyncRpc("test", &req, resp, [&complete, &io_service](const Status &stat) {
  132. complete = true;
  133. io_service.stop();
  134. ASSERT_FALSE(stat.ok());
  135. });
  136. io_service.run();
  137. ASSERT_TRUE(complete);
  138. }
  139. TEST(RpcEngineTest, TestConnectionResetAndRecover) {
  140. ::asio::io_service io_service;
  141. Options options;
  142. options.max_rpc_retries = 1;
  143. options.rpc_retry_delay_ms = 0;
  144. SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);
  145. EchoResponseProto server_resp;
  146. server_resp.set_message("foo");
  147. bool complete = false;
  148. auto producer = std::make_shared<SharedConnectionData>();
  149. RpcResponseHeaderProto h;
  150. h.set_callid(1);
  151. h.set_status(RpcResponseHeaderProto::SUCCESS);
  152. EXPECT_CALL(*producer, Produce())
  153. .WillOnce(Return(RpcResponse(
  154. h, "", make_error_code(::asio::error::connection_reset))))
  155. .WillOnce(Return(RpcResponse(h, server_resp.SerializeAsString())));
  156. SharedMockConnection::SetSharedConnectionData(producer);
  157. EchoRequestProto req;
  158. req.set_message("foo");
  159. std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
  160. engine.AsyncRpc("test", &req, resp, [&complete, &io_service](const Status &stat) {
  161. complete = true;
  162. io_service.stop();
  163. ASSERT_TRUE(stat.ok());
  164. });
  165. io_service.run();
  166. ASSERT_TRUE(complete);
  167. }
  168. TEST(RpcEngineTest, TestConnectionResetAndRecoverWithDelay) {
  169. ::asio::io_service io_service;
  170. Options options;
  171. options.max_rpc_retries = 1;
  172. options.rpc_retry_delay_ms = 1;
  173. SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);
  174. EchoResponseProto server_resp;
  175. server_resp.set_message("foo");
  176. bool complete = false;
  177. auto producer = std::make_shared<SharedConnectionData>();
  178. RpcResponseHeaderProto h;
  179. h.set_callid(1);
  180. h.set_status(RpcResponseHeaderProto::SUCCESS);
  181. EXPECT_CALL(*producer, Produce())
  182. .WillOnce(Return(RpcResponse(
  183. h, "", make_error_code(::asio::error::connection_reset))))
  184. .WillOnce(Return(RpcResponse(h, server_resp.SerializeAsString())));
  185. SharedMockConnection::SetSharedConnectionData(producer);
  186. EchoRequestProto req;
  187. req.set_message("foo");
  188. std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
  189. engine.AsyncRpc("test", &req, resp, [&complete, &io_service](const Status &stat) {
  190. complete = true;
  191. io_service.stop();
  192. ASSERT_TRUE(stat.ok());
  193. });
  194. ::asio::deadline_timer timer(io_service);
  195. timer.expires_from_now(std::chrono::hours(100));
  196. timer.async_wait([](const asio::error_code & err){(void)err; ASSERT_FALSE("Timed out"); });
  197. io_service.run();
  198. ASSERT_TRUE(complete);
  199. }
  200. TEST(RpcEngineTest, TestConnectionFailure)
  201. {
  202. auto producer = std::make_shared<SharedConnectionData>();
  203. producer->checkProducerForConnect = true;
  204. SharedMockConnection::SetSharedConnectionData(producer);
  205. // Error and no retry
  206. ::asio::io_service io_service;
  207. bool complete = false;
  208. Options options;
  209. options.max_rpc_retries = 0;
  210. options.rpc_retry_delay_ms = 0;
  211. SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);
  212. EXPECT_CALL(*producer, Produce())
  213. .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")));
  214. engine.Connect(make_endpoint(), [&complete, &io_service](const Status &stat) {
  215. complete = true;
  216. io_service.stop();
  217. ASSERT_FALSE(stat.ok());
  218. });
  219. io_service.run();
  220. ASSERT_TRUE(complete);
  221. }
  222. TEST(RpcEngineTest, TestConnectionFailureRetryAndFailure)
  223. {
  224. auto producer = std::make_shared<SharedConnectionData>();
  225. producer->checkProducerForConnect = true;
  226. SharedMockConnection::SetSharedConnectionData(producer);
  227. ::asio::io_service io_service;
  228. bool complete = false;
  229. Options options;
  230. options.max_rpc_retries = 2;
  231. options.rpc_retry_delay_ms = 0;
  232. SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);
  233. EXPECT_CALL(*producer, Produce())
  234. .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")))
  235. .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")))
  236. .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")));
  237. engine.Connect(make_endpoint(), [&complete, &io_service](const Status &stat) {
  238. complete = true;
  239. io_service.stop();
  240. ASSERT_FALSE(stat.ok());
  241. });
  242. io_service.run();
  243. ASSERT_TRUE(complete);
  244. }
  245. TEST(RpcEngineTest, TestConnectionFailureAndRecover)
  246. {
  247. auto producer = std::make_shared<SharedConnectionData>();
  248. producer->checkProducerForConnect = true;
  249. SharedMockConnection::SetSharedConnectionData(producer);
  250. ::asio::io_service io_service;
  251. bool complete = false;
  252. Options options;
  253. options.max_rpc_retries = 1;
  254. options.rpc_retry_delay_ms = 0;
  255. SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);
  256. EXPECT_CALL(*producer, Produce())
  257. .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")))
  258. .WillOnce(Return(std::make_pair(::asio::error_code(), "")))
  259. .WillOnce(Return(std::make_pair(::asio::error::would_block, "")));
  260. engine.Connect(make_endpoint(), [&complete, &io_service](const Status &stat) {
  261. complete = true;
  262. io_service.stop();
  263. ASSERT_TRUE(stat.ok());
  264. });
  265. io_service.run();
  266. ASSERT_TRUE(complete);
  267. }
  268. TEST(RpcEngineTest, TestConnectionFailureAndAsyncRecover)
  269. {
  270. // Error and async recover
  271. auto producer = std::make_shared<SharedConnectionData>();
  272. producer->checkProducerForConnect = true;
  273. SharedMockConnection::SetSharedConnectionData(producer);
  274. ::asio::io_service io_service;
  275. bool complete = false;
  276. Options options;
  277. options.max_rpc_retries = 1;
  278. options.rpc_retry_delay_ms = 1;
  279. SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);
  280. EXPECT_CALL(*producer, Produce())
  281. .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")))
  282. .WillOnce(Return(std::make_pair(::asio::error_code(), "")))
  283. .WillOnce(Return(std::make_pair(::asio::error::would_block, "")));
  284. engine.Connect(make_endpoint(), [&complete, &io_service](const Status &stat) {
  285. complete = true;
  286. io_service.stop();
  287. ASSERT_TRUE(stat.ok());
  288. });
  289. ::asio::deadline_timer timer(io_service);
  290. timer.expires_from_now(std::chrono::hours(100));
  291. timer.async_wait([](const asio::error_code & err){(void)err; ASSERT_FALSE("Timed out"); });
  292. io_service.run();
  293. ASSERT_TRUE(complete);
  294. }
  295. TEST(RpcEngineTest, TestTimeout) {
  296. ::asio::io_service io_service;
  297. Options options;
  298. options.rpc_timeout = 1;
  299. RpcEngine engine(&io_service, options, "foo", "", "protocol", 1);
  300. RpcConnectionImpl<MockRPCConnection> *conn =
  301. new RpcConnectionImpl<MockRPCConnection>(&engine);
  302. conn->TEST_set_connected(true);
  303. conn->StartReading();
  304. EXPECT_CALL(conn->next_layer(), Produce())
  305. .WillOnce(Return(std::make_pair(::asio::error::would_block, "")));
  306. std::shared_ptr<RpcConnection> conn_ptr(conn);
  307. engine.TEST_SetRpcConnection(conn_ptr);
  308. bool complete = false;
  309. EchoRequestProto req;
  310. req.set_message("foo");
  311. std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
  312. engine.AsyncRpc("test", &req, resp, [resp, &complete,&io_service](const Status &stat) {
  313. complete = true;
  314. io_service.stop();
  315. ASSERT_FALSE(stat.ok());
  316. });
  317. ::asio::deadline_timer timer(io_service);
  318. timer.expires_from_now(std::chrono::hours(100));
  319. timer.async_wait([](const asio::error_code & err){(void)err; ASSERT_FALSE("Timed out"); });
  320. io_service.run();
  321. ASSERT_TRUE(complete);
  322. }
  323. int main(int argc, char *argv[]) {
  324. // The following line must be executed to initialize Google Mock
  325. // (and Google Test) before running the tests.
  326. ::testing::InitGoogleMock(&argc, argv);
  327. int exit_code = RUN_ALL_TESTS();
  328. // Clean up static data and prevent valgrind memory leaks
  329. google::protobuf::ShutdownProtobufLibrary();
  330. return exit_code;
  331. }