1
0

rpc_engine_test.cc 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "mock_connection.h"
  19. #include "test.pb.h"
  20. #include "RpcHeader.pb.h"
  21. #include "rpc/rpc_connection.h"
  22. #include <google/protobuf/io/coded_stream.h>
  23. #include <gmock/gmock.h>
  24. using ::hadoop::common::RpcResponseHeaderProto;
  25. using ::hadoop::common::EmptyRequestProto;
  26. using ::hadoop::common::EmptyResponseProto;
  27. using ::hadoop::common::EchoRequestProto;
  28. using ::hadoop::common::EchoResponseProto;
  29. using ::asio::error_code;
  30. using ::testing::Return;
  31. using ::std::make_pair;
  32. using ::std::string;
  33. namespace pb = ::google::protobuf;
  34. namespace pbio = ::google::protobuf::io;
  35. namespace hdfs {
  36. std::vector<asio::ip::basic_endpoint<asio::ip::tcp>> make_endpoint() {
  37. std::vector<asio::ip::basic_endpoint<asio::ip::tcp>> result;
  38. result.push_back(asio::ip::basic_endpoint<asio::ip::tcp>());
  39. return result;
  40. }
  41. class MockRPCConnection : public MockConnectionBase {
  42. public:
  43. MockRPCConnection(::asio::io_service &io_service)
  44. : MockConnectionBase(&io_service) {}
  45. MOCK_METHOD0(Produce, ProducerResult());
  46. };
  47. class SharedMockRPCConnection : public SharedMockConnection {
  48. public:
  49. SharedMockRPCConnection(::asio::io_service &io_service)
  50. : SharedMockConnection(&io_service) {}
  51. };
  52. class SharedConnectionEngine : public RpcEngine {
  53. using RpcEngine::RpcEngine;
  54. protected:
  55. std::shared_ptr<RpcConnection> NewConnection() override {
  56. // Stuff in some dummy endpoints so we don't error out
  57. last_endpoints_ = make_endpoint();
  58. return std::make_shared<RpcConnectionImpl<SharedMockRPCConnection>>(this);
  59. }
  60. };
  61. }
  62. static inline std::pair<error_code, string> RpcResponse(
  63. const RpcResponseHeaderProto &h, const std::string &data,
  64. const ::asio::error_code &ec = error_code()) {
  65. uint32_t payload_length =
  66. pbio::CodedOutputStream::VarintSize32(h.ByteSize()) +
  67. pbio::CodedOutputStream::VarintSize32(data.size()) + h.ByteSize() +
  68. data.size();
  69. std::string res;
  70. res.resize(sizeof(uint32_t) + payload_length);
  71. uint8_t *buf = reinterpret_cast<uint8_t *>(const_cast<char *>(res.c_str()));
  72. buf = pbio::CodedOutputStream::WriteLittleEndian32ToArray(
  73. htonl(payload_length), buf);
  74. buf = pbio::CodedOutputStream::WriteVarint32ToArray(h.ByteSize(), buf);
  75. buf = h.SerializeWithCachedSizesToArray(buf);
  76. buf = pbio::CodedOutputStream::WriteVarint32ToArray(data.size(), buf);
  77. buf = pbio::CodedOutputStream::WriteStringToArray(data, buf);
  78. return std::make_pair(ec, std::move(res));
  79. }
  80. using namespace hdfs;
  81. TEST(RpcEngineTest, TestRoundTrip) {
  82. ::asio::io_service io_service;
  83. Options options;
  84. RpcEngine engine(&io_service, options, "foo", "", "protocol", 1);
  85. auto conn =
  86. std::make_shared<RpcConnectionImpl<MockRPCConnection> >(&engine);
  87. conn->TEST_set_connected(true);
  88. conn->StartReading();
  89. EchoResponseProto server_resp;
  90. server_resp.set_message("foo");
  91. RpcResponseHeaderProto h;
  92. h.set_callid(1);
  93. h.set_status(RpcResponseHeaderProto::SUCCESS);
  94. EXPECT_CALL(conn->next_layer(), Produce())
  95. .WillOnce(Return(RpcResponse(h, server_resp.SerializeAsString())));
  96. std::shared_ptr<RpcConnection> conn_ptr(conn);
  97. engine.TEST_SetRpcConnection(conn_ptr);
  98. bool complete = false;
  99. EchoRequestProto req;
  100. req.set_message("foo");
  101. std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
  102. engine.AsyncRpc("test", &req, resp, [resp, &complete,&io_service](const Status &stat) {
  103. ASSERT_TRUE(stat.ok());
  104. ASSERT_EQ("foo", resp->message());
  105. complete = true;
  106. io_service.stop();
  107. });
  108. io_service.run();
  109. ASSERT_TRUE(complete);
  110. }
  111. TEST(RpcEngineTest, TestConnectionResetAndFail) {
  112. ::asio::io_service io_service;
  113. Options options;
  114. RpcEngine engine(&io_service, options, "foo", "", "protocol", 1);
  115. auto conn =
  116. std::make_shared<RpcConnectionImpl<MockRPCConnection> >(&engine);
  117. conn->TEST_set_connected(true);
  118. conn->StartReading();
  119. bool complete = false;
  120. RpcResponseHeaderProto h;
  121. h.set_callid(1);
  122. h.set_status(RpcResponseHeaderProto::SUCCESS);
  123. EXPECT_CALL(conn->next_layer(), Produce())
  124. .WillOnce(Return(RpcResponse(
  125. h, "", make_error_code(::asio::error::connection_reset))));
  126. std::shared_ptr<RpcConnection> conn_ptr(conn);
  127. engine.TEST_SetRpcConnection(conn_ptr);
  128. EchoRequestProto req;
  129. req.set_message("foo");
  130. std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
  131. engine.AsyncRpc("test", &req, resp, [&complete, &io_service](const Status &stat) {
  132. complete = true;
  133. io_service.stop();
  134. ASSERT_FALSE(stat.ok());
  135. });
  136. io_service.run();
  137. ASSERT_TRUE(complete);
  138. }
  139. TEST(RpcEngineTest, TestConnectionResetAndRecover) {
  140. ::asio::io_service io_service;
  141. Options options;
  142. options.max_rpc_retries = 1;
  143. options.rpc_retry_delay_ms = 0;
  144. SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);
  145. EchoResponseProto server_resp;
  146. server_resp.set_message("foo");
  147. bool complete = false;
  148. auto producer = std::make_shared<SharedConnectionData>();
  149. RpcResponseHeaderProto h;
  150. h.set_callid(1);
  151. h.set_status(RpcResponseHeaderProto::SUCCESS);
  152. EXPECT_CALL(*producer, Produce())
  153. .WillOnce(Return(RpcResponse(
  154. h, "", make_error_code(::asio::error::connection_reset))))
  155. .WillOnce(Return(RpcResponse(h, server_resp.SerializeAsString())));
  156. SharedMockConnection::SetSharedConnectionData(producer);
  157. EchoRequestProto req;
  158. req.set_message("foo");
  159. std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
  160. engine.AsyncRpc("test", &req, resp, [&complete, &io_service](const Status &stat) {
  161. complete = true;
  162. io_service.stop();
  163. ASSERT_TRUE(stat.ok());
  164. });
  165. io_service.run();
  166. ASSERT_TRUE(complete);
  167. }
  168. TEST(RpcEngineTest, TestConnectionResetAndRecoverWithDelay) {
  169. ::asio::io_service io_service;
  170. Options options;
  171. options.max_rpc_retries = 1;
  172. options.rpc_retry_delay_ms = 1;
  173. SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);
  174. EchoResponseProto server_resp;
  175. server_resp.set_message("foo");
  176. bool complete = false;
  177. auto producer = std::make_shared<SharedConnectionData>();
  178. RpcResponseHeaderProto h;
  179. h.set_callid(1);
  180. h.set_status(RpcResponseHeaderProto::SUCCESS);
  181. EXPECT_CALL(*producer, Produce())
  182. .WillOnce(Return(RpcResponse(
  183. h, "", make_error_code(::asio::error::connection_reset))))
  184. .WillOnce(Return(RpcResponse(h, server_resp.SerializeAsString())));
  185. SharedMockConnection::SetSharedConnectionData(producer);
  186. EchoRequestProto req;
  187. req.set_message("foo");
  188. std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
  189. engine.AsyncRpc("test", &req, resp, [&complete, &io_service](const Status &stat) {
  190. complete = true;
  191. io_service.stop();
  192. ASSERT_TRUE(stat.ok());
  193. });
  194. ::asio::deadline_timer timer(io_service);
  195. timer.expires_from_now(std::chrono::hours(100));
  196. timer.async_wait([](const asio::error_code & err){(void)err; ASSERT_FALSE("Timed out"); });
  197. io_service.run();
  198. ASSERT_TRUE(complete);
  199. }
  200. TEST(RpcEngineTest, TestConnectionFailure)
  201. {
  202. auto producer = std::make_shared<SharedConnectionData>();
  203. producer->checkProducerForConnect = true;
  204. SharedMockConnection::SetSharedConnectionData(producer);
  205. // Error and no retry
  206. ::asio::io_service io_service;
  207. bool complete = false;
  208. Options options;
  209. options.max_rpc_retries = 0;
  210. options.rpc_retry_delay_ms = 0;
  211. SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);
  212. EXPECT_CALL(*producer, Produce())
  213. .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")));
  214. engine.Connect("", make_endpoint(), [&complete, &io_service](const Status &stat) {
  215. complete = true;
  216. io_service.stop();
  217. ASSERT_FALSE(stat.ok());
  218. });
  219. io_service.run();
  220. ASSERT_TRUE(complete);
  221. }
  222. TEST(RpcEngineTest, TestConnectionFailureRetryAndFailure)
  223. {
  224. auto producer = std::make_shared<SharedConnectionData>();
  225. producer->checkProducerForConnect = true;
  226. SharedMockConnection::SetSharedConnectionData(producer);
  227. ::asio::io_service io_service;
  228. bool complete = false;
  229. Options options;
  230. options.max_rpc_retries = 2;
  231. options.rpc_retry_delay_ms = 0;
  232. SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);
  233. EXPECT_CALL(*producer, Produce())
  234. .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")))
  235. .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")))
  236. .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")));
  237. engine.Connect("", make_endpoint(), [&complete, &io_service](const Status &stat) {
  238. complete = true;
  239. io_service.stop();
  240. ASSERT_FALSE(stat.ok());
  241. });
  242. io_service.run();
  243. ASSERT_TRUE(complete);
  244. }
  245. TEST(RpcEngineTest, TestConnectionFailureAndRecover)
  246. {
  247. auto producer = std::make_shared<SharedConnectionData>();
  248. producer->checkProducerForConnect = true;
  249. SharedMockConnection::SetSharedConnectionData(producer);
  250. ::asio::io_service io_service;
  251. bool complete = false;
  252. Options options;
  253. options.max_rpc_retries = 1;
  254. options.rpc_retry_delay_ms = 0;
  255. SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);
  256. EXPECT_CALL(*producer, Produce())
  257. .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")))
  258. .WillOnce(Return(std::make_pair(::asio::error_code(), "")))
  259. .WillOnce(Return(std::make_pair(::asio::error::would_block, "")));
  260. engine.Connect("", make_endpoint(), [&complete, &io_service](const Status &stat) {
  261. complete = true;
  262. io_service.stop();
  263. ASSERT_TRUE(stat.ok());
  264. });
  265. io_service.run();
  266. ASSERT_TRUE(complete);
  267. }
  268. TEST(RpcEngineTest, TestEventCallbacks)
  269. {
  270. ::asio::io_service io_service;
  271. Options options;
  272. options.max_rpc_retries = 99;
  273. options.rpc_retry_delay_ms = 0;
  274. SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);
  275. // Set up event callbacks
  276. int calls = 0;
  277. std::vector<std::string> callbacks;
  278. engine.SetFsEventCallback([&calls, &callbacks] (const char * event,
  279. const char * cluster,
  280. int64_t value) {
  281. (void)cluster; (void)value;
  282. callbacks.push_back(event);
  283. // Allow connect and fail first read
  284. calls++;
  285. if (calls == 1 || calls == 3) // First connect and first read
  286. return event_response::test_err(Status::Error("Test"));
  287. return event_response::ok();
  288. });
  289. EchoResponseProto server_resp;
  290. server_resp.set_message("foo");
  291. auto producer = std::make_shared<SharedConnectionData>();
  292. producer->checkProducerForConnect = true;
  293. RpcResponseHeaderProto h;
  294. h.set_callid(1);
  295. h.set_status(RpcResponseHeaderProto::SUCCESS);
  296. EXPECT_CALL(*producer, Produce())
  297. .WillOnce(Return(std::make_pair(::asio::error_code(), ""))) // subverted by callback
  298. .WillOnce(Return(std::make_pair(::asio::error_code(), "")))
  299. .WillOnce(Return(RpcResponse(h, "b"))) // subverted by callback
  300. .WillOnce(Return(RpcResponse(h, server_resp.SerializeAsString())));
  301. SharedMockConnection::SetSharedConnectionData(producer);
  302. EchoRequestProto req;
  303. req.set_message("foo");
  304. std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
  305. bool complete = false;
  306. engine.AsyncRpc("test", &req, resp, [&complete, &io_service](const Status &stat) {
  307. complete = true;
  308. io_service.stop();
  309. ASSERT_TRUE(stat.ok());
  310. });
  311. io_service.run();
  312. ASSERT_TRUE(complete);
  313. ASSERT_EQ(8, callbacks.size());
  314. ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[0]); // error
  315. ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[1]); // reconnect
  316. ASSERT_EQ(FS_NN_READ_EVENT, callbacks[2]); // makes an error
  317. ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[3]); // reconnect
  318. for (int i=4; i < 7; i++)
  319. ASSERT_EQ(FS_NN_READ_EVENT, callbacks[i]);
  320. }
  321. TEST(RpcEngineTest, TestConnectionFailureAndAsyncRecover)
  322. {
  323. // Error and async recover
  324. auto producer = std::make_shared<SharedConnectionData>();
  325. producer->checkProducerForConnect = true;
  326. SharedMockConnection::SetSharedConnectionData(producer);
  327. ::asio::io_service io_service;
  328. bool complete = false;
  329. Options options;
  330. options.max_rpc_retries = 1;
  331. options.rpc_retry_delay_ms = 1;
  332. SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);
  333. EXPECT_CALL(*producer, Produce())
  334. .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")))
  335. .WillOnce(Return(std::make_pair(::asio::error_code(), "")))
  336. .WillOnce(Return(std::make_pair(::asio::error::would_block, "")));
  337. engine.Connect("", make_endpoint(), [&complete, &io_service](const Status &stat) {
  338. complete = true;
  339. io_service.stop();
  340. ASSERT_TRUE(stat.ok());
  341. });
  342. ::asio::deadline_timer timer(io_service);
  343. timer.expires_from_now(std::chrono::hours(100));
  344. timer.async_wait([](const asio::error_code & err){(void)err; ASSERT_FALSE("Timed out"); });
  345. io_service.run();
  346. ASSERT_TRUE(complete);
  347. }
  348. TEST(RpcEngineTest, TestTimeout) {
  349. ::asio::io_service io_service;
  350. Options options;
  351. options.rpc_timeout = 1;
  352. RpcEngine engine(&io_service, options, "foo", "", "protocol", 1);
  353. auto conn =
  354. std::make_shared<RpcConnectionImpl<MockRPCConnection> >(&engine);
  355. conn->TEST_set_connected(true);
  356. conn->StartReading();
  357. EXPECT_CALL(conn->next_layer(), Produce())
  358. .WillOnce(Return(std::make_pair(::asio::error::would_block, "")));
  359. std::shared_ptr<RpcConnection> conn_ptr(conn);
  360. engine.TEST_SetRpcConnection(conn_ptr);
  361. bool complete = false;
  362. EchoRequestProto req;
  363. req.set_message("foo");
  364. std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
  365. engine.AsyncRpc("test", &req, resp, [resp, &complete,&io_service](const Status &stat) {
  366. complete = true;
  367. io_service.stop();
  368. ASSERT_FALSE(stat.ok());
  369. });
  370. ::asio::deadline_timer timer(io_service);
  371. timer.expires_from_now(std::chrono::hours(100));
  372. timer.async_wait([](const asio::error_code & err){(void)err; ASSERT_FALSE("Timed out"); });
  373. io_service.run();
  374. ASSERT_TRUE(complete);
  375. }
  376. int main(int argc, char *argv[]) {
  377. // The following line must be executed to initialize Google Mock
  378. // (and Google Test) before running the tests.
  379. ::testing::InitGoogleMock(&argc, argv);
  380. int exit_code = RUN_ALL_TESTS();
  381. // Clean up static data and prevent valgrind memory leaks
  382. google::protobuf::ShutdownProtobufLibrary();
  383. return exit_code;
  384. }