inputstream_test.cc 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "fs/filesystem.h"
  19. #include <gmock/gmock.h>
  20. using hadoop::common::TokenProto;
  21. using hadoop::hdfs::DatanodeInfoProto;
  22. using hadoop::hdfs::DatanodeIDProto;
  23. using hadoop::hdfs::ExtendedBlockProto;
  24. using hadoop::hdfs::LocatedBlockProto;
  25. using hadoop::hdfs::LocatedBlocksProto;
  26. using ::testing::_;
  27. using ::testing::InvokeArgument;
  28. using ::testing::Return;
  29. using namespace hdfs;
  30. namespace hdfs {
  31. class MockReader {
  32. public:
  33. virtual ~MockReader() {}
  34. MOCK_METHOD2(
  35. async_read_some,
  36. void(const asio::mutable_buffers_1 &,
  37. const std::function<void(const Status &, size_t transferred)> &));
  38. MOCK_METHOD6(async_connect,
  39. void(const std::string &, TokenProto *, ExtendedBlockProto *,
  40. uint64_t, uint64_t,
  41. const std::function<void(const Status &)> &));
  42. };
  43. template <class Trait> struct MockBlockReaderTrait {
  44. typedef MockReader Reader;
  45. struct State {
  46. MockReader reader_;
  47. size_t transferred_;
  48. Reader *reader() { return &reader_; }
  49. size_t *transferred() { return &transferred_; }
  50. const size_t *transferred() const { return &transferred_; }
  51. };
  52. static continuation::Pipeline<State> *
  53. CreatePipeline(::asio::io_service *, const DatanodeInfoProto &) {
  54. auto m = continuation::Pipeline<State>::Create();
  55. *m->state().transferred() = 0;
  56. Trait::InitializeMockReader(m->state().reader());
  57. return m;
  58. }
  59. };
  60. }
  61. TEST(InputStreamTest, TestReadSingleTrunk) {
  62. LocatedBlocksProto blocks;
  63. LocatedBlockProto block;
  64. DatanodeInfoProto dn;
  65. char buf[4096] = {
  66. 0,
  67. };
  68. IoServiceImpl io_service;
  69. FileSystemImpl fs(&io_service);
  70. InputStreamImpl is(&fs, &blocks);
  71. Status stat;
  72. size_t read = 0;
  73. struct Trait {
  74. static void InitializeMockReader(MockReader *reader) {
  75. EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
  76. .WillOnce(InvokeArgument<5>(Status::OK()));
  77. EXPECT_CALL(*reader, async_read_some(_, _))
  78. .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf)));
  79. }
  80. };
  81. is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
  82. "client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
  83. [&stat, &read](const Status &status, const std::string &, size_t transferred) {
  84. stat = status;
  85. read = transferred;
  86. });
  87. ASSERT_TRUE(stat.ok());
  88. ASSERT_EQ(sizeof(buf), read);
  89. read = 0;
  90. }
  91. TEST(InputStreamTest, TestReadMultipleTrunk) {
  92. LocatedBlocksProto blocks;
  93. LocatedBlockProto block;
  94. DatanodeInfoProto dn;
  95. char buf[4096] = {
  96. 0,
  97. };
  98. IoServiceImpl io_service;
  99. FileSystemImpl fs(&io_service);
  100. InputStreamImpl is(&fs, &blocks);
  101. Status stat;
  102. size_t read = 0;
  103. struct Trait {
  104. static void InitializeMockReader(MockReader *reader) {
  105. EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
  106. .WillOnce(InvokeArgument<5>(Status::OK()));
  107. EXPECT_CALL(*reader, async_read_some(_, _))
  108. .Times(4)
  109. .WillRepeatedly(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4));
  110. }
  111. };
  112. is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
  113. "client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
  114. [&stat, &read](const Status &status, const std::string &,
  115. size_t transferred) {
  116. stat = status;
  117. read = transferred;
  118. });
  119. ASSERT_TRUE(stat.ok());
  120. ASSERT_EQ(sizeof(buf), read);
  121. read = 0;
  122. }
  123. TEST(InputStreamTest, TestReadError) {
  124. LocatedBlocksProto blocks;
  125. LocatedBlockProto block;
  126. DatanodeInfoProto dn;
  127. char buf[4096] = {
  128. 0,
  129. };
  130. IoServiceImpl io_service;
  131. FileSystemImpl fs(&io_service);
  132. InputStreamImpl is(&fs, &blocks);
  133. Status stat;
  134. size_t read = 0;
  135. struct Trait {
  136. static void InitializeMockReader(MockReader *reader) {
  137. EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
  138. .WillOnce(InvokeArgument<5>(Status::OK()));
  139. EXPECT_CALL(*reader, async_read_some(_, _))
  140. .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
  141. .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
  142. .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
  143. .WillOnce(InvokeArgument<1>(Status::Error("error"), 0));
  144. }
  145. };
  146. is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
  147. "client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
  148. [&stat, &read](const Status &status, const std::string &,
  149. size_t transferred) {
  150. stat = status;
  151. read = transferred;
  152. });
  153. ASSERT_FALSE(stat.ok());
  154. ASSERT_EQ(sizeof(buf) / 4 * 3, read);
  155. read = 0;
  156. }
  157. TEST(InputStreamTest, TestExcludeDataNode) {
  158. LocatedBlocksProto blocks;
  159. LocatedBlockProto *block = blocks.add_blocks();
  160. ExtendedBlockProto *b = block->mutable_b();
  161. b->set_poolid("");
  162. b->set_blockid(1);
  163. b->set_generationstamp(1);
  164. b->set_numbytes(4096);
  165. DatanodeInfoProto *di = block->add_locs();
  166. DatanodeIDProto *dnid = di->mutable_id();
  167. dnid->set_datanodeuuid("foo");
  168. char buf[4096] = {
  169. 0,
  170. };
  171. IoServiceImpl io_service;
  172. FileSystemImpl fs(&io_service);
  173. InputStreamImpl is(&fs, &blocks);
  174. Status stat;
  175. size_t read = 0;
  176. struct Trait {
  177. static void InitializeMockReader(MockReader *reader) {
  178. EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
  179. .WillOnce(InvokeArgument<5>(Status::OK()));
  180. EXPECT_CALL(*reader, async_read_some(_, _))
  181. .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf)));
  182. }
  183. };
  184. std::set<std::string> excluded_dn({"foo"});
  185. is.AsyncPreadSome(0, asio::buffer(buf, sizeof(buf)), excluded_dn,
  186. [&stat, &read](const Status &status, const std::string &, size_t transferred) {
  187. stat = status;
  188. read = transferred;
  189. });
  190. ASSERT_EQ(static_cast<int>(std::errc::resource_unavailable_try_again), stat.code());
  191. ASSERT_EQ(0UL, read);
  192. }
  193. int main(int argc, char *argv[]) {
  194. // The following line must be executed to initialize Google Mock
  195. // (and Google Test) before running the tests.
  196. ::testing::InitGoogleMock(&argc, argv);
  197. return RUN_ALL_TESTS();
  198. }