/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef FS_INPUTSTREAM_IMPL_H_
#define FS_INPUTSTREAM_IMPL_H_

#include "reader/block_reader.h"

#include "common/continuation/asio.h"
#include "common/continuation/protobuf.h"

#include <array>
#include <functional>
#include <type_traits>

namespace hdfs {

// Trait that describes how to build a block reader which talks to a remote
// datanode over a TCP connection.
struct InputStreamImpl::RemoteBlockReaderTrait {
  typedef RemoteBlockReader<asio::ip::tcp::socket> Reader;
  struct State {
    std::unique_ptr<asio::ip::tcp::socket> conn_;
    std::shared_ptr<Reader> reader_;
    std::array<asio::ip::tcp::endpoint, 1> endpoints_;
    size_t transferred_;
    Reader *reader() { return reader_.get(); }
    size_t *transferred() { return &transferred_; }
    const size_t *transferred() const { return &transferred_; }
  };
  static continuation::Pipeline<State> *
  CreatePipeline(::asio::io_service *io_service,
                 const ::hadoop::hdfs::DatanodeInfoProto &dn) {
    using namespace ::asio::ip;
    auto m = continuation::Pipeline<State>::Create();
    auto &s = m->state();
    s.conn_.reset(new tcp::socket(*io_service));
    s.reader_ = std::make_shared<Reader>(BlockReaderOptions(), s.conn_.get());
    auto datanode = dn.id();
    s.endpoints_[0] = tcp::endpoint(address::from_string(datanode.ipaddr()),
                                    datanode.xferport());

    m->Push(continuation::Connect(s.conn_.get(), s.endpoints_.begin(),
                                  s.endpoints_.end()));
    return m;
  }
};

// Continuation that performs the read handshake with the datanode: it sends
// the client name, an optional security token, and the requested block range.
template <class Reader>
struct InputStreamImpl::HandshakeContinuation : continuation::Continuation {
  HandshakeContinuation(Reader *reader, const std::string &client_name,
                        const hadoop::common::TokenProto *token,
                        const hadoop::hdfs::ExtendedBlockProto *block,
                        uint64_t length, uint64_t offset)
      : reader_(reader), client_name_(client_name), length_(length),
        offset_(offset) {
    if (token) {
      token_.reset(new hadoop::common::TokenProto());
      token_->CheckTypeAndMergeFrom(*token);
    }
    block_.CheckTypeAndMergeFrom(*block);
  }

  virtual void Run(const Next &next) override {
    reader_->async_connect(client_name_, token_.get(), &block_, length_,
                           offset_, next);
  }

private:
  Reader *reader_;
  const std::string client_name_;
  std::unique_ptr<hadoop::common::TokenProto> token_;
  hadoop::hdfs::ExtendedBlockProto block_;
  uint64_t length_;
  uint64_t offset_;
};

// Continuation that repeatedly issues async_read_some calls until the
// supplied buffer is full or an error is reported.
template <class Reader, class MutableBufferSequence>
struct InputStreamImpl::ReadBlockContinuation : continuation::Continuation {
  ReadBlockContinuation(Reader *reader, MutableBufferSequence buffer,
                        size_t *transferred)
      : reader_(reader), buffer_(buffer),
        buffer_size_(asio::buffer_size(buffer)), transferred_(transferred) {
    static_assert(!std::is_reference<MutableBufferSequence>::value,
                  "Buffer must not be a reference type");
  }

  virtual void Run(const Next &next) override {
    *transferred_ = 0;
    next_ = next;
    OnReadData(Status::OK(), 0);
  }

private:
  Reader *reader_;
  const MutableBufferSequence buffer_;
  const size_t buffer_size_;
  size_t *transferred_;
  std::function<void(const Status &)> next_;

  void OnReadData(const Status &status, size_t transferred) {
    using std::placeholders::_1;
    using std::placeholders::_2;
    *transferred_ += transferred;
    if (!status.ok()) {
      next_(status);
    } else if (*transferred_ >= buffer_size_) {
      next_(status);
    } else {
      reader_->async_read_some(
          asio::buffer(buffer_ + *transferred_, buffer_size_ - *transferred_),
          std::bind(&ReadBlockContinuation::OnReadData, this, _1, _2));
    }
  }
};

// Reads at most one block's worth of data starting at `offset`. The block
// containing `offset` is located in the cached block list, the first replica
// not listed in `excluded_datanodes` is chosen, and the read is delegated to
// AsyncReadBlock.
template <class MutableBufferSequence, class Handler>
void InputStreamImpl::AsyncPreadSome(
    size_t offset, const MutableBufferSequence &buffers,
    const std::set<std::string> &excluded_datanodes, const Handler &handler) {
  using ::hadoop::hdfs::DatanodeInfoProto;
  using ::hadoop::hdfs::LocatedBlockProto;

  auto it = std::find_if(
      blocks_.begin(), blocks_.end(), [offset](const LocatedBlockProto &p) {
        return p.offset() <= offset && offset < p.offset() + p.b().numbytes();
      });

  if (it == blocks_.end()) {
    handler(Status::InvalidArgument("Cannot find corresponding blocks"), "", 0);
    return;
  }

  const DatanodeInfoProto *chosen_dn = nullptr;
  for (int i = 0; i < it->locs_size(); ++i) {
    const auto &di = it->locs(i);
    if (!excluded_datanodes.count(di.id().datanodeuuid())) {
      chosen_dn = &di;
      break;
    }
  }

  if (!chosen_dn) {
    handler(Status::ResourceUnavailable("No datanodes available"), "", 0);
    return;
  }

  uint64_t offset_within_block = offset - it->offset();
  uint64_t size_within_block = std::min<uint64_t>(
      it->b().numbytes() - offset_within_block, asio::buffer_size(buffers));

  AsyncReadBlock<RemoteBlockReaderTrait>(
      fs_->rpc_engine().client_name(), *it, *chosen_dn, offset_within_block,
      asio::buffer(buffers, size_within_block), handler);
}

// Connects to the chosen datanode, performs the handshake, and reads the
// requested range of the block, reporting the datanode id and the number of
// bytes transferred to the handler.
template <class BlockReaderTrait, class MutableBufferSequence, class Handler>
void InputStreamImpl::AsyncReadBlock(
    const std::string &client_name,
    const hadoop::hdfs::LocatedBlockProto &block,
    const hadoop::hdfs::DatanodeInfoProto &dn, size_t offset,
    const MutableBufferSequence &buffers, const Handler &handler) {
  typedef typename BlockReaderTrait::Reader Reader;
  auto m =
      BlockReaderTrait::CreatePipeline(&fs_->rpc_engine().io_service(), dn);
  auto &s = m->state();
  size_t size = asio::buffer_size(buffers);

  m->Push(new HandshakeContinuation<Reader>(s.reader(), client_name, nullptr,
                                            &block.b(), size, offset))
      .Push(new ReadBlockContinuation<Reader, MutableBufferSequence>(
          s.reader(), buffers, s.transferred()));

  const std::string &dnid = dn.id().datanodeuuid();
  m->Run([handler, dnid](const Status &status,
                         const typename BlockReaderTrait::State &state) {
    handler(status, dnid, *state.transferred());
  });
}
}

#endif