bad_datanode_test.cc 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
  18. #include "fs/filesystem.h"
  19. #include "fs/bad_datanode_tracker.h"
  20. #include "common/util.h"
  21. #include <gmock/gmock.h>
  22. using hadoop::common::TokenProto;
  23. using hadoop::hdfs::DatanodeInfoProto;
  24. using hadoop::hdfs::DatanodeIDProto;
  25. using hadoop::hdfs::ExtendedBlockProto;
  26. using hadoop::hdfs::LocatedBlockProto;
  27. using hadoop::hdfs::LocatedBlocksProto;
  28. using ::testing::_;
  29. using ::testing::InvokeArgument;
  30. using ::testing::Return;
  31. using namespace hdfs;
  32. class MockReader : public BlockReader {
  33. public:
  34. MOCK_METHOD2(
  35. AsyncReadPacket,
  36. void(const asio::mutable_buffers_1 &,
  37. const std::function<void(const Status &, size_t transferred)> &));
  38. MOCK_METHOD5(AsyncRequestBlock,
  39. void(const std::string &client_name,
  40. const hadoop::hdfs::ExtendedBlockProto *block,
  41. uint64_t length, uint64_t offset,
  42. const std::function<void(Status)> &handler));
  43. MOCK_METHOD5(AsyncReadBlock, void(
  44. const std::string & client_name,
  45. const hadoop::hdfs::LocatedBlockProto &block,
  46. size_t offset,
  47. const MutableBuffers &buffers,
  48. const std::function<void(const Status &, size_t)> handler));
  49. };
  50. class MockDNConnection : public DataNodeConnection, public std::enable_shared_from_this<MockDNConnection> {
  51. void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) override {
  52. handler(Status::OK(), shared_from_this());
  53. }
  54. void async_read_some(const MutableBuffers &buf,
  55. std::function<void (const asio::error_code & error,
  56. std::size_t bytes_transferred) > handler) override {
  57. (void)buf;
  58. handler(asio::error::fault, 0);
  59. }
  60. void async_write_some(const ConstBuffers &buf,
  61. std::function<void (const asio::error_code & error,
  62. std::size_t bytes_transferred) > handler) override {
  63. (void)buf;
  64. handler(asio::error::fault, 0);
  65. }
  66. };
  67. class PartialMockFileHandle : public FileHandleImpl {
  68. using FileHandleImpl::FileHandleImpl;
  69. public:
  70. std::shared_ptr<MockReader> mock_reader_ = std::make_shared<MockReader>();
  71. protected:
  72. std::shared_ptr<BlockReader> CreateBlockReader(const BlockReaderOptions &options,
  73. std::shared_ptr<DataNodeConnection> dn) override
  74. {
  75. (void) options; (void) dn;
  76. assert(mock_reader_);
  77. return mock_reader_;
  78. }
  79. std::shared_ptr<DataNodeConnection> CreateDataNodeConnection(
  80. ::asio::io_service *io_service,
  81. const ::hadoop::hdfs::DatanodeInfoProto & dn,
  82. const hadoop::common::TokenProto * token) override {
  83. (void) io_service; (void) dn; (void) token;
  84. return std::make_shared<MockDNConnection>();
  85. }
  86. };
  87. TEST(BadDataNodeTest, TestNoNodes) {
  88. auto file_info = std::make_shared<struct FileInfo>();
  89. file_info->blocks_.push_back(LocatedBlockProto());
  90. LocatedBlockProto & block = file_info->blocks_[0];
  91. ExtendedBlockProto *b = block.mutable_b();
  92. b->set_poolid("");
  93. b->set_blockid(1);
  94. b->set_generationstamp(1);
  95. b->set_numbytes(4096);
  96. // Set up the one block to have one datanode holding it
  97. DatanodeInfoProto *di = block.add_locs();
  98. DatanodeIDProto *dnid = di->mutable_id();
  99. dnid->set_datanodeuuid("foo");
  100. char buf[4096] = {
  101. 0,
  102. };
  103. IoServiceImpl io_service;
  104. auto bad_node_tracker = std::make_shared<BadDataNodeTracker>();
  105. bad_node_tracker->AddBadNode("foo");
  106. PartialMockFileHandle is(&io_service.io_service(), GetRandomClientName(), file_info, bad_node_tracker);
  107. Status stat;
  108. size_t read = 0;
  109. // Exclude the one datanode with the data
  110. is.AsyncPreadSome(0, asio::buffer(buf, sizeof(buf)), nullptr,
  111. [&stat, &read](const Status &status, const std::string &, size_t transferred) {
  112. stat = status;
  113. read = transferred;
  114. });
  115. // Should fail with no resource available
  116. ASSERT_EQ(static_cast<int>(std::errc::resource_unavailable_try_again), stat.code());
  117. ASSERT_EQ(0UL, read);
  118. }
  119. TEST(BadDataNodeTest, RecoverableError) {
  120. auto file_info = std::make_shared<struct FileInfo>();
  121. file_info->blocks_.push_back(LocatedBlockProto());
  122. LocatedBlockProto & block = file_info->blocks_[0];
  123. ExtendedBlockProto *b = block.mutable_b();
  124. b->set_poolid("");
  125. b->set_blockid(1);
  126. b->set_generationstamp(1);
  127. b->set_numbytes(4096);
  128. // Set up the one block to have one datanode holding it
  129. DatanodeInfoProto *di = block.add_locs();
  130. DatanodeIDProto *dnid = di->mutable_id();
  131. dnid->set_datanodeuuid("foo");
  132. char buf[4096] = {
  133. 0,
  134. };
  135. IoServiceImpl io_service;
  136. auto tracker = std::make_shared<BadDataNodeTracker>();
  137. PartialMockFileHandle is(&io_service.io_service(), GetRandomClientName(), file_info, tracker);
  138. Status stat;
  139. size_t read = 0;
  140. EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_))
  141. // resource unavailable error
  142. .WillOnce(InvokeArgument<4>(
  143. Status::ResourceUnavailable("Unable to get some resource, try again later"), 0));
  144. is.AsyncPreadSome(
  145. 0, asio::buffer(buf, sizeof(buf)), nullptr,
  146. [&stat, &read](const Status &status, const std::string &,
  147. size_t transferred) {
  148. stat = status;
  149. read = transferred;
  150. });
  151. ASSERT_FALSE(stat.ok());
  152. std::string failing_dn = "id_of_bad_datanode";
  153. if (!stat.ok()) {
  154. if (FileHandle::ShouldExclude(stat)) {
  155. tracker->AddBadNode(failing_dn);
  156. }
  157. }
  158. ASSERT_FALSE(tracker->IsBadNode(failing_dn));
  159. }
  160. TEST(BadDataNodeTest, InternalError) {
  161. auto file_info = std::make_shared<struct FileInfo>();
  162. file_info->blocks_.push_back(LocatedBlockProto());
  163. LocatedBlockProto & block = file_info->blocks_[0];
  164. ExtendedBlockProto *b = block.mutable_b();
  165. b->set_poolid("");
  166. b->set_blockid(1);
  167. b->set_generationstamp(1);
  168. b->set_numbytes(4096);
  169. // Set up the one block to have one datanode holding it
  170. DatanodeInfoProto *di = block.add_locs();
  171. DatanodeIDProto *dnid = di->mutable_id();
  172. dnid->set_datanodeuuid("foo");
  173. char buf[4096] = {
  174. 0,
  175. };
  176. IoServiceImpl io_service;
  177. auto tracker = std::make_shared<BadDataNodeTracker>();
  178. PartialMockFileHandle is(&io_service.io_service(), GetRandomClientName(), file_info, tracker);
  179. Status stat;
  180. size_t read = 0;
  181. EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_))
  182. // resource unavailable error
  183. .WillOnce(InvokeArgument<4>(
  184. Status::Exception("server_explosion_exception",
  185. "the server exploded"),
  186. sizeof(buf)));
  187. is.AsyncPreadSome(
  188. 0, asio::buffer(buf, sizeof(buf)), nullptr,
  189. [&stat, &read](const Status &status, const std::string &,
  190. size_t transferred) {
  191. stat = status;
  192. read = transferred;
  193. });
  194. ASSERT_FALSE(stat.ok());
  195. std::string failing_dn = "id_of_bad_datanode";
  196. if (!stat.ok()) {
  197. if (FileHandle::ShouldExclude(stat)) {
  198. tracker->AddBadNode(failing_dn);
  199. }
  200. }
  201. ASSERT_TRUE(tracker->IsBadNode(failing_dn));
  202. }
  203. int main(int argc, char *argv[]) {
  204. // The following line must be executed to initialize Google Mock
  205. // (and Google Test) before running the tests.
  206. ::testing::InitGoogleMock(&argc, argv);
  207. int exit_code = RUN_ALL_TESTS();
  208. // Clean up static data and prevent valgrind memory leaks
  209. google::protobuf::ShutdownProtobufLibrary();
  210. return exit_code;
  211. }