inputstream_impl.h 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
  18. #ifndef FS_INPUTSTREAM_IMPL_H_
  19. #define FS_INPUTSTREAM_IMPL_H_
  20. #include "reader/block_reader.h"
  21. #include "common/continuation/asio.h"
  22. #include "common/continuation/protobuf.h"
  23. #include <functional>
  24. #include <future>
  25. #include <type_traits>
  26. namespace hdfs {
  27. struct InputStreamImpl::RemoteBlockReaderTrait {
  28. typedef RemoteBlockReader<asio::ip::tcp::socket> Reader;
  29. struct State {
  30. std::unique_ptr<asio::ip::tcp::socket> conn_;
  31. std::shared_ptr<Reader> reader_;
  32. std::array<asio::ip::tcp::endpoint, 1> endpoints_;
  33. size_t transferred_;
  34. Reader *reader() { return reader_.get(); }
  35. size_t *transferred() { return &transferred_; }
  36. const size_t *transferred() const { return &transferred_; }
  37. };
  38. static continuation::Pipeline<State> *
  39. CreatePipeline(::asio::io_service *io_service,
  40. const ::hadoop::hdfs::DatanodeInfoProto &dn) {
  41. using namespace ::asio::ip;
  42. auto m = continuation::Pipeline<State>::Create();
  43. auto &s = m->state();
  44. s.conn_.reset(new tcp::socket(*io_service));
  45. s.reader_ = std::make_shared<Reader>(BlockReaderOptions(), s.conn_.get());
  46. auto datanode = dn.id();
  47. s.endpoints_[0] = tcp::endpoint(address::from_string(datanode.ipaddr()),
  48. datanode.xferport());
  49. m->Push(continuation::Connect(s.conn_.get(), s.endpoints_.begin(),
  50. s.endpoints_.end()));
  51. return m;
  52. }
  53. };
  54. template <class Reader>
  55. struct InputStreamImpl::HandshakeContinuation : continuation::Continuation {
  56. HandshakeContinuation(Reader *reader, const std::string &client_name,
  57. const hadoop::common::TokenProto *token,
  58. const hadoop::hdfs::ExtendedBlockProto *block,
  59. uint64_t length, uint64_t offset)
  60. : reader_(reader), client_name_(client_name), length_(length),
  61. offset_(offset) {
  62. if (token) {
  63. token_.reset(new hadoop::common::TokenProto());
  64. token_->CheckTypeAndMergeFrom(*token);
  65. }
  66. block_.CheckTypeAndMergeFrom(*block);
  67. }
  68. virtual void Run(const Next &next) override {
  69. reader_->async_connect(client_name_, token_.get(), &block_, length_,
  70. offset_, next);
  71. }
  72. private:
  73. Reader *reader_;
  74. const std::string client_name_;
  75. std::unique_ptr<hadoop::common::TokenProto> token_;
  76. hadoop::hdfs::ExtendedBlockProto block_;
  77. uint64_t length_;
  78. uint64_t offset_;
  79. };
  80. template <class Reader, class MutableBufferSequence>
  81. struct InputStreamImpl::ReadBlockContinuation : continuation::Continuation {
  82. ReadBlockContinuation(Reader *reader, MutableBufferSequence buffer,
  83. size_t *transferred)
  84. : reader_(reader), buffer_(buffer),
  85. buffer_size_(asio::buffer_size(buffer)), transferred_(transferred) {
  86. static_assert(!std::is_reference<MutableBufferSequence>::value,
  87. "Buffer must not be a reference type");
  88. }
  89. virtual void Run(const Next &next) override {
  90. *transferred_ = 0;
  91. next_ = next;
  92. OnReadData(Status::OK(), 0);
  93. }
  94. private:
  95. Reader *reader_;
  96. const MutableBufferSequence buffer_;
  97. const size_t buffer_size_;
  98. size_t *transferred_;
  99. std::function<void(const Status &)> next_;
  100. void OnReadData(const Status &status, size_t transferred) {
  101. using std::placeholders::_1;
  102. using std::placeholders::_2;
  103. *transferred_ += transferred;
  104. if (!status.ok()) {
  105. next_(status);
  106. } else if (*transferred_ >= buffer_size_) {
  107. next_(status);
  108. } else {
  109. reader_->async_read_some(
  110. asio::buffer(buffer_ + *transferred_, buffer_size_ - *transferred_),
  111. std::bind(&ReadBlockContinuation::OnReadData, this, _1, _2));
  112. }
  113. }
  114. };
  115. template <class MutableBufferSequence, class Handler>
  116. void InputStreamImpl::AsyncPreadSome(
  117. size_t offset, const MutableBufferSequence &buffers,
  118. const std::set<std::string> &excluded_datanodes, const Handler &handler) {
  119. using ::hadoop::hdfs::DatanodeInfoProto;
  120. using ::hadoop::hdfs::LocatedBlockProto;
  121. auto it = std::find_if(
  122. blocks_.begin(), blocks_.end(), [offset](const LocatedBlockProto &p) {
  123. return p.offset() <= offset && offset < p.offset() + p.b().numbytes();
  124. });
  125. if (it == blocks_.end()) {
  126. handler(Status::InvalidArgument("Cannot find corresponding blocks"), "", 0);
  127. return;
  128. }
  129. const DatanodeInfoProto *chosen_dn = nullptr;
  130. for (int i = 0; i < it->locs_size(); ++i) {
  131. const auto &di = it->locs(i);
  132. if (!excluded_datanodes.count(di.id().datanodeuuid())) {
  133. chosen_dn = &di;
  134. break;
  135. }
  136. }
  137. if (!chosen_dn) {
  138. handler(Status::ResourceUnavailable("No datanodes available"), "", 0);
  139. return;
  140. }
  141. uint64_t offset_within_block = offset - it->offset();
  142. uint64_t size_within_block = std::min<uint64_t>(
  143. it->b().numbytes() - offset_within_block, asio::buffer_size(buffers));
  144. AsyncReadBlock<RemoteBlockReaderTrait>(
  145. fs_->rpc_engine().client_name(), *it, *chosen_dn, offset_within_block,
  146. asio::buffer(buffers, size_within_block), handler);
  147. }
  148. template <class BlockReaderTrait, class MutableBufferSequence, class Handler>
  149. void InputStreamImpl::AsyncReadBlock(
  150. const std::string &client_name,
  151. const hadoop::hdfs::LocatedBlockProto &block,
  152. const hadoop::hdfs::DatanodeInfoProto &dn, size_t offset,
  153. const MutableBufferSequence &buffers, const Handler &handler) {
  154. typedef typename BlockReaderTrait::Reader Reader;
  155. auto m =
  156. BlockReaderTrait::CreatePipeline(&fs_->rpc_engine().io_service(), dn);
  157. auto &s = m->state();
  158. size_t size = asio::buffer_size(buffers);
  159. m->Push(new HandshakeContinuation<Reader>(s.reader(), client_name, nullptr,
  160. &block.b(), size, offset))
  161. .Push(new ReadBlockContinuation<Reader, MutableBufferSequence>(
  162. s.reader(), buffers, s.transferred()));
  163. const std::string &dnid = dn.id().datanodeuuid();
  164. m->Run([handler, dnid](const Status &status,
  165. const typename BlockReaderTrait::State &state) {
  166. handler(status, dnid, *state.transferred());
  167. });
  168. }
  169. }
  170. #endif