rpc_engine_test.cc 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "mock_connection.h"
  19. #include "test.pb.h"
  20. #include "RpcHeader.pb.h"
  21. #include "rpc/rpc_connection.h"
  22. #include "common/namenode_info.h"
  23. #include <google/protobuf/io/coded_stream.h>
  24. #include <gmock/gmock.h>
  25. using ::hadoop::common::RpcResponseHeaderProto;
  26. using ::hadoop::common::EmptyRequestProto;
  27. using ::hadoop::common::EmptyResponseProto;
  28. using ::hadoop::common::EchoRequestProto;
  29. using ::hadoop::common::EchoResponseProto;
  30. using ::asio::error_code;
  31. using ::testing::Return;
  32. using ::std::make_pair;
  33. using ::std::string;
  34. namespace pb = ::google::protobuf;
  35. namespace pbio = ::google::protobuf::io;
  36. namespace hdfs {
  37. std::vector<ResolvedNamenodeInfo> make_endpoint() {
  38. ResolvedNamenodeInfo result;
  39. result.endpoints.push_back(asio::ip::basic_endpoint<asio::ip::tcp>());
  40. return std::vector<ResolvedNamenodeInfo>({result});
  41. }
  42. class MockRPCConnection : public MockConnectionBase {
  43. public:
  44. MockRPCConnection(::asio::io_service &io_service)
  45. : MockConnectionBase(&io_service) {}
  46. MOCK_METHOD0(Produce, ProducerResult());
  47. };
  48. class SharedMockRPCConnection : public SharedMockConnection {
  49. public:
  50. SharedMockRPCConnection(::asio::io_service &io_service)
  51. : SharedMockConnection(&io_service) {}
  52. };
  53. class SharedConnectionEngine : public RpcEngine {
  54. using RpcEngine::RpcEngine;
  55. protected:
  56. std::shared_ptr<RpcConnection> NewConnection() override {
  57. // Stuff in some dummy endpoints so we don't error out
  58. last_endpoints_ = make_endpoint()[0].endpoints;
  59. return std::make_shared<RpcConnectionImpl<SharedMockRPCConnection>>(this);
  60. }
  61. };
  62. }
  63. static inline std::pair<error_code, string> RpcResponse(
  64. const RpcResponseHeaderProto &h, const std::string &data,
  65. const ::asio::error_code &ec = error_code()) {
  66. uint32_t payload_length =
  67. pbio::CodedOutputStream::VarintSize32(h.ByteSize()) +
  68. pbio::CodedOutputStream::VarintSize32(data.size()) + h.ByteSize() +
  69. data.size();
  70. std::string res;
  71. res.resize(sizeof(uint32_t) + payload_length);
  72. uint8_t *buf = reinterpret_cast<uint8_t *>(const_cast<char *>(res.c_str()));
  73. buf = pbio::CodedOutputStream::WriteLittleEndian32ToArray(
  74. htonl(payload_length), buf);
  75. buf = pbio::CodedOutputStream::WriteVarint32ToArray(h.ByteSize(), buf);
  76. buf = h.SerializeWithCachedSizesToArray(buf);
  77. buf = pbio::CodedOutputStream::WriteVarint32ToArray(data.size(), buf);
  78. buf = pbio::CodedOutputStream::WriteStringToArray(data, buf);
  79. return std::make_pair(ec, std::move(res));
  80. }
  81. using namespace hdfs;
  82. TEST(RpcEngineTest, TestRoundTrip) {
  83. ::asio::io_service io_service;
  84. Options options;
  85. RpcEngine engine(&io_service, options, "foo", "", "protocol", 1);
  86. auto conn =
  87. std::make_shared<RpcConnectionImpl<MockRPCConnection> >(&engine);
  88. conn->TEST_set_connected(true);
  89. conn->StartReading();
  90. EchoResponseProto server_resp;
  91. server_resp.set_message("foo");
  92. RpcResponseHeaderProto h;
  93. h.set_callid(1);
  94. h.set_status(RpcResponseHeaderProto::SUCCESS);
  95. EXPECT_CALL(conn->next_layer(), Produce())
  96. .WillOnce(Return(RpcResponse(h, server_resp.SerializeAsString())));
  97. std::shared_ptr<RpcConnection> conn_ptr(conn);
  98. engine.TEST_SetRpcConnection(conn_ptr);
  99. bool complete = false;
  100. EchoRequestProto req;
  101. req.set_message("foo");
  102. std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
  103. engine.AsyncRpc("test", &req, resp, [resp, &complete,&io_service](const Status &stat) {
  104. ASSERT_TRUE(stat.ok());
  105. ASSERT_EQ("foo", resp->message());
  106. complete = true;
  107. io_service.stop();
  108. });
  109. io_service.run();
  110. ASSERT_TRUE(complete);
  111. }
  112. TEST(RpcEngineTest, TestConnectionResetAndFail) {
  113. ::asio::io_service io_service;
  114. Options options;
  115. RpcEngine engine(&io_service, options, "foo", "", "protocol", 1);
  116. auto conn =
  117. std::make_shared<RpcConnectionImpl<MockRPCConnection> >(&engine);
  118. conn->TEST_set_connected(true);
  119. conn->StartReading();
  120. bool complete = false;
  121. RpcResponseHeaderProto h;
  122. h.set_callid(1);
  123. h.set_status(RpcResponseHeaderProto::SUCCESS);
  124. EXPECT_CALL(conn->next_layer(), Produce())
  125. .WillOnce(Return(RpcResponse(
  126. h, "", make_error_code(::asio::error::connection_reset))));
  127. std::shared_ptr<RpcConnection> conn_ptr(conn);
  128. engine.TEST_SetRpcConnection(conn_ptr);
  129. EchoRequestProto req;
  130. req.set_message("foo");
  131. std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
  132. engine.AsyncRpc("test", &req, resp, [&complete, &io_service](const Status &stat) {
  133. complete = true;
  134. io_service.stop();
  135. ASSERT_FALSE(stat.ok());
  136. });
  137. io_service.run();
  138. ASSERT_TRUE(complete);
  139. }
  140. TEST(RpcEngineTest, TestConnectionResetAndRecover) {
  141. ::asio::io_service io_service;
  142. Options options;
  143. options.max_rpc_retries = 1;
  144. options.rpc_retry_delay_ms = 0;
  145. SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);
  146. // Normally determined during RpcEngine::Connect, but in this case options
  147. // provides enough info to determine policy here.
  148. engine.TEST_SetRetryPolicy(engine.TEST_GenerateRetryPolicyUsingOptions());
  149. EchoResponseProto server_resp;
  150. server_resp.set_message("foo");
  151. bool complete = false;
  152. auto producer = std::make_shared<SharedConnectionData>();
  153. RpcResponseHeaderProto h;
  154. h.set_callid(1);
  155. h.set_status(RpcResponseHeaderProto::SUCCESS);
  156. EXPECT_CALL(*producer, Produce())
  157. .WillOnce(Return(RpcResponse(
  158. h, "", make_error_code(::asio::error::connection_reset))))
  159. .WillOnce(Return(RpcResponse(h, server_resp.SerializeAsString())));
  160. SharedMockConnection::SetSharedConnectionData(producer);
  161. EchoRequestProto req;
  162. req.set_message("foo");
  163. std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
  164. engine.AsyncRpc("test", &req, resp, [&complete, &io_service](const Status &stat) {
  165. complete = true;
  166. io_service.stop();
  167. ASSERT_TRUE(stat.ok());
  168. });
  169. io_service.run();
  170. ASSERT_TRUE(complete);
  171. }
  172. TEST(RpcEngineTest, TestConnectionResetAndRecoverWithDelay) {
  173. ::asio::io_service io_service;
  174. Options options;
  175. options.max_rpc_retries = 1;
  176. options.rpc_retry_delay_ms = 1;
  177. SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);
  178. // Normally determined during RpcEngine::Connect, but in this case options
  179. // provides enough info to determine policy here.
  180. engine.TEST_SetRetryPolicy(engine.TEST_GenerateRetryPolicyUsingOptions());
  181. EchoResponseProto server_resp;
  182. server_resp.set_message("foo");
  183. bool complete = false;
  184. auto producer = std::make_shared<SharedConnectionData>();
  185. RpcResponseHeaderProto h;
  186. h.set_callid(1);
  187. h.set_status(RpcResponseHeaderProto::SUCCESS);
  188. EXPECT_CALL(*producer, Produce())
  189. .WillOnce(Return(RpcResponse(
  190. h, "", make_error_code(::asio::error::connection_reset))))
  191. .WillOnce(Return(RpcResponse(h, server_resp.SerializeAsString())));
  192. SharedMockConnection::SetSharedConnectionData(producer);
  193. EchoRequestProto req;
  194. req.set_message("foo");
  195. std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
  196. engine.AsyncRpc("test", &req, resp, [&complete, &io_service](const Status &stat) {
  197. complete = true;
  198. io_service.stop();
  199. ASSERT_TRUE(stat.ok());
  200. });
  201. ::asio::deadline_timer timer(io_service);
  202. timer.expires_from_now(std::chrono::hours(100));
  203. timer.async_wait([](const asio::error_code & err){(void)err; ASSERT_FALSE("Timed out"); });
  204. io_service.run();
  205. ASSERT_TRUE(complete);
  206. }
  207. TEST(RpcEngineTest, TestConnectionFailure)
  208. {
  209. auto producer = std::make_shared<SharedConnectionData>();
  210. producer->checkProducerForConnect = true;
  211. SharedMockConnection::SetSharedConnectionData(producer);
  212. // Error and no retry
  213. ::asio::io_service io_service;
  214. bool complete = false;
  215. Options options;
  216. options.max_rpc_retries = 0;
  217. options.rpc_retry_delay_ms = 0;
  218. SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);
  219. EXPECT_CALL(*producer, Produce())
  220. .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")));
  221. engine.Connect("", make_endpoint(), [&complete, &io_service](const Status &stat) {
  222. complete = true;
  223. io_service.stop();
  224. ASSERT_FALSE(stat.ok());
  225. });
  226. io_service.run();
  227. ASSERT_TRUE(complete);
  228. }
  229. TEST(RpcEngineTest, TestConnectionFailureRetryAndFailure)
  230. {
  231. auto producer = std::make_shared<SharedConnectionData>();
  232. producer->checkProducerForConnect = true;
  233. SharedMockConnection::SetSharedConnectionData(producer);
  234. ::asio::io_service io_service;
  235. bool complete = false;
  236. Options options;
  237. options.max_rpc_retries = 2;
  238. options.rpc_retry_delay_ms = 0;
  239. SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);
  240. EXPECT_CALL(*producer, Produce())
  241. .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")))
  242. .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")))
  243. .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")));
  244. engine.Connect("", make_endpoint(), [&complete, &io_service](const Status &stat) {
  245. complete = true;
  246. io_service.stop();
  247. ASSERT_FALSE(stat.ok());
  248. });
  249. io_service.run();
  250. ASSERT_TRUE(complete);
  251. }
  252. TEST(RpcEngineTest, TestConnectionFailureAndRecover)
  253. {
  254. auto producer = std::make_shared<SharedConnectionData>();
  255. producer->checkProducerForConnect = true;
  256. SharedMockConnection::SetSharedConnectionData(producer);
  257. ::asio::io_service io_service;
  258. bool complete = false;
  259. Options options;
  260. options.max_rpc_retries = 1;
  261. options.rpc_retry_delay_ms = 0;
  262. SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);
  263. EXPECT_CALL(*producer, Produce())
  264. .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")))
  265. .WillOnce(Return(std::make_pair(::asio::error_code(), "")))
  266. .WillOnce(Return(std::make_pair(::asio::error::would_block, "")));
  267. engine.Connect("", make_endpoint(), [&complete, &io_service](const Status &stat) {
  268. complete = true;
  269. io_service.stop();
  270. ASSERT_TRUE(stat.ok());
  271. });
  272. io_service.run();
  273. ASSERT_TRUE(complete);
  274. }
  275. TEST(RpcEngineTest, TestEventCallbacks)
  276. {
  277. ::asio::io_service io_service;
  278. Options options;
  279. options.max_rpc_retries = 99;
  280. options.rpc_retry_delay_ms = 0;
  281. SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);
  282. // Normally determined during RpcEngine::Connect, but in this case options
  283. // provides enough info to determine policy here.
  284. engine.TEST_SetRetryPolicy(engine.TEST_GenerateRetryPolicyUsingOptions());
  285. // Set up event callbacks
  286. int calls = 0;
  287. std::vector<std::string> callbacks;
  288. engine.SetFsEventCallback([&calls, &callbacks] (const char * event,
  289. const char * cluster,
  290. int64_t value) {
  291. (void)cluster; (void)value;
  292. callbacks.push_back(event);
  293. // Allow connect and fail first read
  294. calls++;
  295. if (calls == 1 || calls == 3) // First connect and first read
  296. return event_response::test_err(Status::Error("Test"));
  297. return event_response::ok();
  298. });
  299. EchoResponseProto server_resp;
  300. server_resp.set_message("foo");
  301. auto producer = std::make_shared<SharedConnectionData>();
  302. producer->checkProducerForConnect = true;
  303. RpcResponseHeaderProto h;
  304. h.set_callid(1);
  305. h.set_status(RpcResponseHeaderProto::SUCCESS);
  306. EXPECT_CALL(*producer, Produce())
  307. .WillOnce(Return(std::make_pair(::asio::error_code(), ""))) // subverted by callback
  308. .WillOnce(Return(std::make_pair(::asio::error_code(), "")))
  309. .WillOnce(Return(RpcResponse(h, "b"))) // subverted by callback
  310. .WillOnce(Return(RpcResponse(h, server_resp.SerializeAsString())));
  311. SharedMockConnection::SetSharedConnectionData(producer);
  312. EchoRequestProto req;
  313. req.set_message("foo");
  314. std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
  315. bool complete = false;
  316. engine.AsyncRpc("test", &req, resp, [&complete, &io_service](const Status &stat) {
  317. complete = true;
  318. io_service.stop();
  319. ASSERT_TRUE(stat.ok());
  320. });
  321. io_service.run();
  322. ASSERT_TRUE(complete);
  323. ASSERT_EQ(8, callbacks.size());
  324. ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[0]); // error
  325. ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[1]); // reconnect
  326. ASSERT_EQ(FS_NN_READ_EVENT, callbacks[2]); // makes an error
  327. ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[3]); // reconnect
  328. for (int i=4; i < 7; i++)
  329. ASSERT_EQ(FS_NN_READ_EVENT, callbacks[i]);
  330. }
  331. TEST(RpcEngineTest, TestConnectionFailureAndAsyncRecover)
  332. {
  333. // Error and async recover
  334. auto producer = std::make_shared<SharedConnectionData>();
  335. producer->checkProducerForConnect = true;
  336. SharedMockConnection::SetSharedConnectionData(producer);
  337. ::asio::io_service io_service;
  338. bool complete = false;
  339. Options options;
  340. options.max_rpc_retries = 1;
  341. options.rpc_retry_delay_ms = 1;
  342. SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);
  343. EXPECT_CALL(*producer, Produce())
  344. .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")))
  345. .WillOnce(Return(std::make_pair(::asio::error_code(), "")))
  346. .WillOnce(Return(std::make_pair(::asio::error::would_block, "")));
  347. engine.Connect("", make_endpoint(), [&complete, &io_service](const Status &stat) {
  348. complete = true;
  349. io_service.stop();
  350. ASSERT_TRUE(stat.ok());
  351. });
  352. ::asio::deadline_timer timer(io_service);
  353. timer.expires_from_now(std::chrono::hours(100));
  354. timer.async_wait([](const asio::error_code & err){(void)err; ASSERT_FALSE("Timed out"); });
  355. io_service.run();
  356. ASSERT_TRUE(complete);
  357. }
  358. TEST(RpcEngineTest, TestTimeout) {
  359. ::asio::io_service io_service;
  360. Options options;
  361. options.rpc_timeout = 1;
  362. RpcEngine engine(&io_service, options, "foo", "", "protocol", 1);
  363. auto conn =
  364. std::make_shared<RpcConnectionImpl<MockRPCConnection> >(&engine);
  365. conn->TEST_set_connected(true);
  366. conn->StartReading();
  367. EXPECT_CALL(conn->next_layer(), Produce())
  368. .WillOnce(Return(std::make_pair(::asio::error::would_block, "")));
  369. std::shared_ptr<RpcConnection> conn_ptr(conn);
  370. engine.TEST_SetRpcConnection(conn_ptr);
  371. bool complete = false;
  372. EchoRequestProto req;
  373. req.set_message("foo");
  374. std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
  375. engine.AsyncRpc("test", &req, resp, [resp, &complete,&io_service](const Status &stat) {
  376. complete = true;
  377. io_service.stop();
  378. ASSERT_FALSE(stat.ok());
  379. });
  380. ::asio::deadline_timer timer(io_service);
  381. timer.expires_from_now(std::chrono::hours(100));
  382. timer.async_wait([](const asio::error_code & err){(void)err; ASSERT_FALSE("Timed out"); });
  383. io_service.run();
  384. ASSERT_TRUE(complete);
  385. }
  386. int main(int argc, char *argv[]) {
  387. // The following line must be executed to initialize Google Mock
  388. // (and Google Test) before running the tests.
  389. ::testing::InitGoogleMock(&argc, argv);
  390. int exit_code = RUN_ALL_TESTS();
  391. // Clean up static data and prevent valgrind memory leaks
  392. google::protobuf::ShutdownProtobufLibrary();
  393. return exit_code;
  394. }