rpc_engine_test.cc 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "mock_connection.h"
  19. #include "test.pb.h"
  20. #include "RpcHeader.pb.h"
  21. #include "rpc/rpc_connection.h"
  22. #include <google/protobuf/io/coded_stream.h>
  23. #include <gmock/gmock.h>
  24. using ::hadoop::common::RpcResponseHeaderProto;
  25. using ::hadoop::common::EmptyRequestProto;
  26. using ::hadoop::common::EmptyResponseProto;
  27. using ::hadoop::common::EchoRequestProto;
  28. using ::hadoop::common::EchoResponseProto;
  29. using ::asio::error_code;
  30. using ::testing::Return;
  31. using ::std::make_pair;
  32. using ::std::string;
  33. namespace pb = ::google::protobuf;
  34. namespace pbio = ::google::protobuf::io;
  35. namespace hdfs {
  36. class MockRPCConnection : public MockConnectionBase {
  37. public:
  38. MockRPCConnection(::asio::io_service &io_service)
  39. : MockConnectionBase(&io_service) {}
  40. MOCK_METHOD0(Produce, ProducerResult());
  41. };
  42. class SharedMockRPCConnection : public SharedMockConnection {
  43. public:
  44. SharedMockRPCConnection(::asio::io_service &io_service)
  45. : SharedMockConnection(&io_service) {}
  46. };
  47. class SharedConnectionEngine : public RpcEngine {
  48. using RpcEngine::RpcEngine;
  49. protected:
  50. std::shared_ptr<RpcConnection> NewConnection() override {
  51. return std::make_shared<RpcConnectionImpl<SharedMockRPCConnection>>(this);
  52. }
  53. };
  54. }
  55. static inline std::pair<error_code, string> RpcResponse(
  56. const RpcResponseHeaderProto &h, const std::string &data,
  57. const ::asio::error_code &ec = error_code()) {
  58. uint32_t payload_length =
  59. pbio::CodedOutputStream::VarintSize32(h.ByteSize()) +
  60. pbio::CodedOutputStream::VarintSize32(data.size()) + h.ByteSize() +
  61. data.size();
  62. std::string res;
  63. res.resize(sizeof(uint32_t) + payload_length);
  64. uint8_t *buf = reinterpret_cast<uint8_t *>(const_cast<char *>(res.c_str()));
  65. buf = pbio::CodedOutputStream::WriteLittleEndian32ToArray(
  66. htonl(payload_length), buf);
  67. buf = pbio::CodedOutputStream::WriteVarint32ToArray(h.ByteSize(), buf);
  68. buf = h.SerializeWithCachedSizesToArray(buf);
  69. buf = pbio::CodedOutputStream::WriteVarint32ToArray(data.size(), buf);
  70. buf = pbio::CodedOutputStream::WriteStringToArray(data, buf);
  71. return std::make_pair(ec, std::move(res));
  72. }
  73. using namespace hdfs;
  74. TEST(RpcEngineTest, TestRoundTrip) {
  75. ::asio::io_service io_service;
  76. Options options;
  77. RpcEngine engine(&io_service, options, "foo", "protocol", 1);
  78. RpcConnectionImpl<MockRPCConnection> *conn =
  79. new RpcConnectionImpl<MockRPCConnection>(&engine);
  80. conn->TEST_set_connected(true);
  81. conn->StartReading();
  82. EchoResponseProto server_resp;
  83. server_resp.set_message("foo");
  84. RpcResponseHeaderProto h;
  85. h.set_callid(1);
  86. h.set_status(RpcResponseHeaderProto::SUCCESS);
  87. EXPECT_CALL(conn->next_layer(), Produce())
  88. .WillOnce(Return(RpcResponse(h, server_resp.SerializeAsString())));
  89. std::shared_ptr<RpcConnection> conn_ptr(conn);
  90. engine.TEST_SetRpcConnection(conn_ptr);
  91. bool complete = false;
  92. EchoRequestProto req;
  93. req.set_message("foo");
  94. std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
  95. engine.AsyncRpc("test", &req, resp, [resp, &complete,&io_service](const Status &stat) {
  96. ASSERT_TRUE(stat.ok());
  97. ASSERT_EQ("foo", resp->message());
  98. complete = true;
  99. io_service.stop();
  100. });
  101. io_service.run();
  102. ASSERT_TRUE(complete);
  103. }
  104. TEST(RpcEngineTest, TestConnectionResetAndFail) {
  105. ::asio::io_service io_service;
  106. Options options;
  107. RpcEngine engine(&io_service, options, "foo", "protocol", 1);
  108. RpcConnectionImpl<MockRPCConnection> *conn =
  109. new RpcConnectionImpl<MockRPCConnection>(&engine);
  110. conn->TEST_set_connected(true);
  111. conn->StartReading();
  112. bool complete = false;
  113. RpcResponseHeaderProto h;
  114. h.set_callid(1);
  115. h.set_status(RpcResponseHeaderProto::SUCCESS);
  116. EXPECT_CALL(conn->next_layer(), Produce())
  117. .WillOnce(Return(RpcResponse(
  118. h, "", make_error_code(::asio::error::connection_reset))));
  119. std::shared_ptr<RpcConnection> conn_ptr(conn);
  120. engine.TEST_SetRpcConnection(conn_ptr);
  121. EchoRequestProto req;
  122. req.set_message("foo");
  123. std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
  124. engine.AsyncRpc("test", &req, resp, [&complete, &io_service](const Status &stat) {
  125. complete = true;
  126. io_service.stop();
  127. ASSERT_FALSE(stat.ok());
  128. });
  129. io_service.run();
  130. ASSERT_TRUE(complete);
  131. }
  132. TEST(RpcEngineTest, TestConnectionResetAndRecover) {
  133. ::asio::io_service io_service;
  134. Options options;
  135. options.max_rpc_retries = 1;
  136. options.rpc_retry_delay_ms = 0;
  137. SharedConnectionEngine engine(&io_service, options, "foo", "protocol", 1);
  138. EchoResponseProto server_resp;
  139. server_resp.set_message("foo");
  140. bool complete = false;
  141. auto producer = std::make_shared<SharedConnectionData>();
  142. RpcResponseHeaderProto h;
  143. h.set_callid(1);
  144. h.set_status(RpcResponseHeaderProto::SUCCESS);
  145. EXPECT_CALL(*producer, Produce())
  146. .WillOnce(Return(RpcResponse(
  147. h, "", make_error_code(::asio::error::connection_reset))))
  148. .WillOnce(Return(RpcResponse(h, server_resp.SerializeAsString())));
  149. SharedMockConnection::SetSharedConnectionData(producer);
  150. EchoRequestProto req;
  151. req.set_message("foo");
  152. std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
  153. engine.AsyncRpc("test", &req, resp, [&complete, &io_service](const Status &stat) {
  154. complete = true;
  155. io_service.stop();
  156. ASSERT_TRUE(stat.ok());
  157. });
  158. io_service.run();
  159. ASSERT_TRUE(complete);
  160. }
  161. TEST(RpcEngineTest, TestConnectionResetAndRecoverWithDelay) {
  162. ::asio::io_service io_service;
  163. Options options;
  164. options.max_rpc_retries = 1;
  165. options.rpc_retry_delay_ms = 1;
  166. SharedConnectionEngine engine(&io_service, options, "foo", "protocol", 1);
  167. EchoResponseProto server_resp;
  168. server_resp.set_message("foo");
  169. bool complete = false;
  170. auto producer = std::make_shared<SharedConnectionData>();
  171. RpcResponseHeaderProto h;
  172. h.set_callid(1);
  173. h.set_status(RpcResponseHeaderProto::SUCCESS);
  174. EXPECT_CALL(*producer, Produce())
  175. .WillOnce(Return(RpcResponse(
  176. h, "", make_error_code(::asio::error::connection_reset))))
  177. .WillOnce(Return(RpcResponse(h, server_resp.SerializeAsString())));
  178. SharedMockConnection::SetSharedConnectionData(producer);
  179. EchoRequestProto req;
  180. req.set_message("foo");
  181. std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
  182. engine.AsyncRpc("test", &req, resp, [&complete, &io_service](const Status &stat) {
  183. complete = true;
  184. io_service.stop();
  185. ASSERT_TRUE(stat.ok());
  186. });
  187. ::asio::deadline_timer timer(io_service);
  188. timer.expires_from_now(std::chrono::hours(100));
  189. timer.async_wait([](const asio::error_code & err){(void)err; ASSERT_FALSE("Timed out"); });
  190. io_service.run();
  191. ASSERT_TRUE(complete);
  192. }
  193. TEST(RpcEngineTest, TestConnectionFailure)
  194. {
  195. auto producer = std::make_shared<SharedConnectionData>();
  196. producer->checkProducerForConnect = true;
  197. SharedMockConnection::SetSharedConnectionData(producer);
  198. // Error and no retry
  199. ::asio::io_service io_service;
  200. bool complete = false;
  201. Options options;
  202. options.max_rpc_retries = 0;
  203. options.rpc_retry_delay_ms = 0;
  204. SharedConnectionEngine engine(&io_service, options, "foo", "protocol", 1);
  205. EXPECT_CALL(*producer, Produce())
  206. .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")));
  207. engine.Connect(asio::ip::basic_endpoint<asio::ip::tcp>(), [&complete, &io_service](const Status &stat) {
  208. complete = true;
  209. io_service.stop();
  210. ASSERT_FALSE(stat.ok());
  211. });
  212. io_service.run();
  213. ASSERT_TRUE(complete);
  214. }
  215. TEST(RpcEngineTest, TestConnectionFailureRetryAndFailure)
  216. {
  217. auto producer = std::make_shared<SharedConnectionData>();
  218. producer->checkProducerForConnect = true;
  219. SharedMockConnection::SetSharedConnectionData(producer);
  220. ::asio::io_service io_service;
  221. bool complete = false;
  222. Options options;
  223. options.max_rpc_retries = 2;
  224. options.rpc_retry_delay_ms = 0;
  225. SharedConnectionEngine engine(&io_service, options, "foo", "protocol", 1);
  226. EXPECT_CALL(*producer, Produce())
  227. .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")))
  228. .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")))
  229. .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")));
  230. engine.Connect(asio::ip::basic_endpoint<asio::ip::tcp>(), [&complete, &io_service](const Status &stat) {
  231. complete = true;
  232. io_service.stop();
  233. ASSERT_FALSE(stat.ok());
  234. });
  235. io_service.run();
  236. ASSERT_TRUE(complete);
  237. }
  238. TEST(RpcEngineTest, TestConnectionFailureAndRecover)
  239. {
  240. auto producer = std::make_shared<SharedConnectionData>();
  241. producer->checkProducerForConnect = true;
  242. SharedMockConnection::SetSharedConnectionData(producer);
  243. ::asio::io_service io_service;
  244. bool complete = false;
  245. Options options;
  246. options.max_rpc_retries = 1;
  247. options.rpc_retry_delay_ms = 0;
  248. SharedConnectionEngine engine(&io_service, options, "foo", "protocol", 1);
  249. EXPECT_CALL(*producer, Produce())
  250. .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")))
  251. .WillOnce(Return(std::make_pair(::asio::error_code(), "")))
  252. .WillOnce(Return(std::make_pair(::asio::error::would_block, "")));
  253. engine.Connect(asio::ip::basic_endpoint<asio::ip::tcp>(), [&complete, &io_service](const Status &stat) {
  254. complete = true;
  255. io_service.stop();
  256. ASSERT_TRUE(stat.ok());
  257. });
  258. io_service.run();
  259. ASSERT_TRUE(complete);
  260. }
  261. TEST(RpcEngineTest, TestConnectionFailureAndAsyncRecover)
  262. {
  263. // Error and async recover
  264. auto producer = std::make_shared<SharedConnectionData>();
  265. producer->checkProducerForConnect = true;
  266. SharedMockConnection::SetSharedConnectionData(producer);
  267. ::asio::io_service io_service;
  268. bool complete = false;
  269. Options options;
  270. options.max_rpc_retries = 1;
  271. options.rpc_retry_delay_ms = 1;
  272. SharedConnectionEngine engine(&io_service, options, "foo", "protocol", 1);
  273. EXPECT_CALL(*producer, Produce())
  274. .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")))
  275. .WillOnce(Return(std::make_pair(::asio::error_code(), "")))
  276. .WillOnce(Return(std::make_pair(::asio::error::would_block, "")));
  277. engine.Connect(asio::ip::basic_endpoint<asio::ip::tcp>(), [&complete, &io_service](const Status &stat) {
  278. complete = true;
  279. io_service.stop();
  280. ASSERT_TRUE(stat.ok());
  281. });
  282. ::asio::deadline_timer timer(io_service);
  283. timer.expires_from_now(std::chrono::hours(100));
  284. timer.async_wait([](const asio::error_code & err){(void)err; ASSERT_FALSE("Timed out"); });
  285. io_service.run();
  286. ASSERT_TRUE(complete);
  287. }
  288. TEST(RpcEngineTest, TestTimeout) {
  289. ::asio::io_service io_service;
  290. Options options;
  291. options.rpc_timeout = 1;
  292. RpcEngine engine(&io_service, options, "foo", "protocol", 1);
  293. RpcConnectionImpl<MockRPCConnection> *conn =
  294. new RpcConnectionImpl<MockRPCConnection>(&engine);
  295. conn->TEST_set_connected(true);
  296. conn->StartReading();
  297. EXPECT_CALL(conn->next_layer(), Produce())
  298. .WillOnce(Return(std::make_pair(::asio::error::would_block, "")));
  299. std::shared_ptr<RpcConnection> conn_ptr(conn);
  300. engine.TEST_SetRpcConnection(conn_ptr);
  301. bool complete = false;
  302. EchoRequestProto req;
  303. req.set_message("foo");
  304. std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
  305. engine.AsyncRpc("test", &req, resp, [resp, &complete,&io_service](const Status &stat) {
  306. complete = true;
  307. io_service.stop();
  308. ASSERT_FALSE(stat.ok());
  309. });
  310. ::asio::deadline_timer timer(io_service);
  311. timer.expires_from_now(std::chrono::hours(100));
  312. timer.async_wait([](const asio::error_code & err){(void)err; ASSERT_FALSE("Timed out"); });
  313. io_service.run();
  314. ASSERT_TRUE(complete);
  315. }
  316. int main(int argc, char *argv[]) {
  317. // The following line must be executed to initialize Google Mock
  318. // (and Google Test) before running the tests.
  319. ::testing::InitGoogleMock(&argc, argv);
  320. return RUN_ALL_TESTS();
  321. }