1
0

block_reader.cc 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "reader/block_reader.h"
  19. #include "reader/datatransfer.h"
  20. #include "common/continuation/continuation.h"
  21. #include "common/continuation/asio.h"
  22. #include "common/logging.h"
  23. #include "common/util.h"
  24. #include <future>
  25. namespace hdfs {
  26. #define FMT_CONT_AND_PARENT_ADDR "this=" << (void*)this << ", parent=" << (void*)parent_
  27. #define FMT_CONT_AND_READER_ADDR "this=" << (void*)this << ", reader=" << (void*)reader_
  28. #define FMT_THIS_ADDR "this=" << (void*)this
  29. // Stuff an OpReadBlockProto message with required fields.
  30. hadoop::hdfs::OpReadBlockProto ReadBlockProto(const std::string &client_name,
  31. bool verify_checksum, const hadoop::common::TokenProto *token,
  32. const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, uint64_t offset)
  33. {
  34. using namespace hadoop::hdfs;
  35. using namespace hadoop::common;
  36. BaseHeaderProto *base_h = new BaseHeaderProto();
  37. base_h->set_allocated_block(new ExtendedBlockProto(*block));
  38. if (token) {
  39. base_h->set_allocated_token(new TokenProto(*token));
  40. }
  41. ClientOperationHeaderProto *h = new ClientOperationHeaderProto();
  42. h->set_clientname(client_name);
  43. h->set_allocated_baseheader(base_h);
  44. OpReadBlockProto p;
  45. p.set_allocated_header(h);
  46. p.set_offset(offset);
  47. p.set_len(length);
  48. p.set_sendchecksums(verify_checksum);
  49. // TODO: p.set_allocated_cachingstrategy();
  50. return p;
  51. }
  52. //
  53. // Notes about the BlockReader and associated object lifecycles (9/29/16)
  54. // -We have a several stages in the read pipeline. Each stage represents a logical
  55. // step in the HDFS block transfer logic. They are implemented as continuations
  56. // for now, and in some cases the stage may have a nested continuation as well.
  57. // It's important to make sure that continuations, nested or otherwise, cannot
  58. // outlive the objects they depend on.
  59. //
  60. // -The BlockReader holds a shared_ptr to the DataNodeConnection that's used in each
  61. // pipeline stage. The connection object must never be destroyed while operations are
  62. // pending on the ASIO side (see HDFS-10931). In order to prevent a state where the
  63. // BlockReader or one of the corresponding pipelines outlives the connection each
  64. // pipeline stage must explicitly hold a shared pointer copied from BlockReaderImpl::dn_.
  65. //
  66. static int8_t unsecured_request_block_header[3] = {0, kDataTransferVersion, Operation::kReadBlock};
  67. void BlockReaderImpl::AsyncRequestBlock(const std::string &client_name,
  68. const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
  69. uint64_t offset, const std::function<void(Status)> &handler)
  70. {
  71. LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncRequestBlock("
  72. << FMT_THIS_ADDR << ", ..., length="
  73. << length << ", offset=" << offset << ", ...) called");
  74. // The total number of bytes that we need to transfer from the DN is
  75. // the amount that the user wants (bytesToRead), plus the padding at
  76. // the beginning in order to chunk-align. Note that the DN may elect
  77. // to send more than this amount if the read starts/ends mid-chunk.
  78. bytes_to_read_ = length;
  79. struct State {
  80. std::string header;
  81. hadoop::hdfs::OpReadBlockProto request;
  82. hadoop::hdfs::BlockOpResponseProto response;
  83. };
  84. auto m = continuation::Pipeline<State>::Create(cancel_state_);
  85. State *s = &m->state();
  86. s->request = ReadBlockProto(client_name, options_.verify_checksum,
  87. dn_->token_.get(), block, length, offset);
  88. s->header = std::string((const char*)unsecured_request_block_header, 3);
  89. bool serialize_success = true;
  90. s->header += SerializeDelimitedProtobufMessage(&s->request, &serialize_success);
  91. if(!serialize_success) {
  92. handler(Status::Error("Unable to serialize protobuf message"));
  93. return;
  94. }
  95. auto read_pb_message =
  96. new continuation::ReadDelimitedPBMessageContinuation<AsyncStream, 16384>(dn_, &s->response);
  97. m->Push(asio_continuation::Write(dn_, asio::buffer(s->header))).Push(read_pb_message);
  98. m->Run([this, handler, offset](const Status &status, const State &s) { Status stat = status;
  99. if (stat.ok()) {
  100. const auto &resp = s.response;
  101. if(this->event_handlers_) {
  102. event_response event_resp = this->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
  103. #ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
  104. if (stat.ok() && event_resp.response_type() == event_response::kTest_Error) {
  105. stat = Status::Error("Test error");
  106. }
  107. #endif
  108. }
  109. if (stat.ok() && resp.status() == ::hadoop::hdfs::Status::SUCCESS) {
  110. if (resp.has_readopchecksuminfo()) {
  111. const auto &checksum_info = resp.readopchecksuminfo();
  112. chunk_padding_bytes_ = offset - checksum_info.chunkoffset();
  113. }
  114. state_ = kReadPacketHeader;
  115. } else {
  116. stat = Status::Error(s.response.message().c_str());
  117. }
  118. }
  119. handler(stat);
  120. });
  121. }
  122. Status BlockReaderImpl::RequestBlock(const std::string &client_name,
  123. const hadoop::hdfs::ExtendedBlockProto *block,
  124. uint64_t length, uint64_t offset)
  125. {
  126. LOG_TRACE(kBlockReader, << "BlockReaderImpl::RequestBlock("
  127. << FMT_THIS_ADDR <<"..., length="
  128. << length << ", offset=" << offset << ") called");
  129. auto stat = std::make_shared<std::promise<Status>>();
  130. std::future<Status> future(stat->get_future());
  131. AsyncRequestBlock(client_name, block, length, offset,
  132. [stat](const Status &status) { stat->set_value(status); });
  133. return future.get();
  134. }
  135. struct BlockReaderImpl::ReadPacketHeader : continuation::Continuation
  136. {
  137. ReadPacketHeader(BlockReaderImpl *parent) : parent_(parent), shared_conn_(parent->dn_) {}
  138. virtual void Run(const Next &next) override {
  139. LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPacketHeader::Run("
  140. << FMT_CONT_AND_PARENT_ADDR << ") called");
  141. parent_->packet_data_read_bytes_ = 0;
  142. parent_->packet_len_ = 0;
  143. auto handler = [next, this](const asio::error_code &ec, size_t) {
  144. Status status;
  145. if (ec) {
  146. status = Status(ec.value(), ec.message().c_str());
  147. } else {
  148. parent_->packet_len_ = packet_length();
  149. parent_->header_.Clear();
  150. bool v = parent_->header_.ParseFromArray(&buf_[kHeaderStart],
  151. header_length());
  152. assert(v && "Failed to parse the header");
  153. (void)v; //avoids unused variable warning
  154. parent_->state_ = kReadChecksum;
  155. }
  156. if(parent_->event_handlers_) {
  157. event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
  158. #ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
  159. if (status.ok() && event_resp.response_type() == event_response::kTest_Error) {
  160. status = Status::Error("Test error");
  161. }
  162. #endif
  163. }
  164. next(status);
  165. };
  166. asio::async_read(*parent_->dn_, asio::buffer(buf_),
  167. std::bind(&ReadPacketHeader::CompletionHandler, this,
  168. std::placeholders::_1, std::placeholders::_2), handler);
  169. }
  170. private:
  171. static const size_t kMaxHeaderSize = 512;
  172. static const size_t kPayloadLenOffset = 0;
  173. static const size_t kPayloadLenSize = sizeof(int32_t);
  174. static const size_t kHeaderLenOffset = 4;
  175. static const size_t kHeaderLenSize = sizeof(int16_t);
  176. static const size_t kHeaderStart = kPayloadLenSize + kHeaderLenSize;
  177. BlockReaderImpl *parent_;
  178. std::array<char, kMaxHeaderSize> buf_;
  179. size_t packet_length() const {
  180. return ntohl(*reinterpret_cast<const unsigned *>(&buf_[kPayloadLenOffset]));
  181. }
  182. size_t header_length() const {
  183. return ntohs(*reinterpret_cast<const short *>(&buf_[kHeaderLenOffset]));
  184. }
  185. size_t CompletionHandler(const asio::error_code &ec, size_t transferred) {
  186. if (ec) {
  187. return 0;
  188. } else if (transferred < kHeaderStart) {
  189. return kHeaderStart - transferred;
  190. } else {
  191. return kHeaderStart + header_length() - transferred;
  192. }
  193. }
  194. // Keep the DN connection alive
  195. std::shared_ptr<DataNodeConnection> shared_conn_;
  196. };
  197. struct BlockReaderImpl::ReadChecksum : continuation::Continuation
  198. {
  199. ReadChecksum(BlockReaderImpl *parent) : parent_(parent), shared_conn_(parent->dn_) {}
  200. virtual void Run(const Next &next) override {
  201. LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadChecksum::Run("
  202. << FMT_CONT_AND_PARENT_ADDR << ") called");
  203. auto parent = parent_;
  204. if (parent->state_ != kReadChecksum) {
  205. next(Status::OK());
  206. return;
  207. }
  208. std::shared_ptr<DataNodeConnection> keep_conn_alive_ = shared_conn_;
  209. auto handler = [parent, next, this, keep_conn_alive_](const asio::error_code &ec, size_t)
  210. {
  211. Status status;
  212. if (ec) {
  213. status = Status(ec.value(), ec.message().c_str());
  214. } else {
  215. parent->state_ = parent->chunk_padding_bytes_ ? kReadPadding : kReadData;
  216. }
  217. if(parent->event_handlers_) {
  218. event_response event_resp = parent->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
  219. #ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
  220. if (status.ok() && event_resp.response_type() == event_response::kTest_Error) {
  221. status = Status::Error("Test error");
  222. }
  223. #endif
  224. }
  225. next(status);
  226. };
  227. parent->checksum_.resize(parent->packet_len_ - sizeof(int) - parent->header_.datalen());
  228. asio::async_read(*parent->dn_, asio::buffer(parent->checksum_), handler);
  229. }
  230. private:
  231. BlockReaderImpl *parent_;
  232. // Keep the DataNodeConnection alive
  233. std::shared_ptr<DataNodeConnection> shared_conn_;
  234. };
  235. struct BlockReaderImpl::ReadData : continuation::Continuation
  236. {
  237. ReadData(BlockReaderImpl *parent, std::shared_ptr<size_t> bytes_transferred,
  238. const asio::mutable_buffers_1 &buf) : parent_(parent),
  239. bytes_transferred_(bytes_transferred), buf_(buf), shared_conn_(parent->dn_)
  240. {
  241. buf_.begin();
  242. }
  243. ~ReadData() {
  244. buf_.end();
  245. }
  246. virtual void Run(const Next &next) override {
  247. LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadData::Run("
  248. << FMT_CONT_AND_PARENT_ADDR << ") called");
  249. auto handler =
  250. [next, this](const asio::error_code &ec, size_t transferred) {
  251. Status status;
  252. if (ec) {
  253. status = Status(ec.value(), ec.message().c_str());
  254. }
  255. *bytes_transferred_ += transferred;
  256. parent_->bytes_to_read_ -= transferred;
  257. parent_->packet_data_read_bytes_ += transferred;
  258. if (parent_->packet_data_read_bytes_ >= parent_->header_.datalen()) {
  259. parent_->state_ = kReadPacketHeader;
  260. }
  261. if(parent_->event_handlers_) {
  262. event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
  263. #ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
  264. if (status.ok() && event_resp.response_type() == event_response::kTest_Error) {
  265. status = Status::Error("Test error");
  266. }
  267. #endif
  268. }
  269. next(status);
  270. };
  271. auto data_len = parent_->header_.datalen() - parent_->packet_data_read_bytes_;
  272. asio::async_read(*parent_->dn_, buf_, asio::transfer_exactly(data_len), handler);
  273. }
  274. private:
  275. BlockReaderImpl *parent_;
  276. std::shared_ptr<size_t> bytes_transferred_;
  277. const asio::mutable_buffers_1 buf_;
  278. // Keep DNConnection alive.
  279. std::shared_ptr<DataNodeConnection> shared_conn_;
  280. };
  281. struct BlockReaderImpl::ReadPadding : continuation::Continuation
  282. {
  283. ReadPadding(BlockReaderImpl *parent) : parent_(parent),
  284. padding_(parent->chunk_padding_bytes_),
  285. bytes_transferred_(std::make_shared<size_t>(0)),
  286. read_data_(new ReadData(parent, bytes_transferred_, asio::buffer(padding_))),
  287. shared_conn_(parent->dn_) {}
  288. virtual void Run(const Next &next) override {
  289. LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPadding::Run("
  290. << FMT_CONT_AND_PARENT_ADDR << ") called");
  291. if (parent_->state_ != kReadPadding || !parent_->chunk_padding_bytes_) {
  292. next(Status::OK());
  293. return;
  294. }
  295. std::shared_ptr<DataNodeConnection> keep_conn_alive_ = shared_conn_;
  296. auto h = [next, this, keep_conn_alive_](const Status &stat) {
  297. Status status = stat;
  298. if (status.ok()) {
  299. assert(reinterpret_cast<const int &>(*bytes_transferred_) == parent_->chunk_padding_bytes_);
  300. parent_->chunk_padding_bytes_ = 0;
  301. parent_->state_ = kReadData;
  302. }
  303. if(parent_->event_handlers_) {
  304. event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
  305. #ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
  306. if (status.ok() && event_resp.response_type() == event_response::kTest_Error) {
  307. status = Status::Error("Test error");
  308. }
  309. #endif
  310. }
  311. next(status);
  312. };
  313. read_data_->Run(h);
  314. }
  315. private:
  316. BlockReaderImpl *parent_;
  317. std::vector<char> padding_;
  318. std::shared_ptr<size_t> bytes_transferred_;
  319. std::shared_ptr<continuation::Continuation> read_data_;
  320. ReadPadding(const ReadPadding &) = delete;
  321. ReadPadding &operator=(const ReadPadding &) = delete;
  322. // Keep DNConnection alive.
  323. std::shared_ptr<DataNodeConnection> shared_conn_;
  324. };
  325. struct BlockReaderImpl::AckRead : continuation::Continuation
  326. {
  327. AckRead(BlockReaderImpl *parent) : parent_(parent), shared_conn_(parent->dn_) {}
  328. virtual void Run(const Next &next) override {
  329. LOG_TRACE(kBlockReader, << "BlockReaderImpl::AckRead::Run(" << FMT_CONT_AND_PARENT_ADDR << ") called");
  330. if (parent_->bytes_to_read_ > 0) {
  331. next(Status::OK());
  332. return;
  333. }
  334. auto m = continuation::Pipeline<hadoop::hdfs::ClientReadStatusProto>::Create(parent_->cancel_state_);
  335. m->state().set_status(parent_->options_.verify_checksum
  336. ? hadoop::hdfs::Status::CHECKSUM_OK
  337. : hadoop::hdfs::Status::SUCCESS);
  338. m->Push(continuation::WriteDelimitedPBMessage(parent_->dn_, &m->state()));
  339. std::shared_ptr<DataNodeConnection> keep_conn_alive_ = shared_conn_;
  340. m->Run([this, next, keep_conn_alive_](const Status &stat, const hadoop::hdfs::ClientReadStatusProto &)
  341. {
  342. Status status = stat;
  343. if (status.ok()) {
  344. parent_->state_ = BlockReaderImpl::kFinished;
  345. }
  346. if(parent_->event_handlers_) {
  347. event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
  348. #ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
  349. if (status.ok() && event_resp.response_type() == event_response::kTest_Error) {
  350. status = Status::Error("Test error");
  351. }
  352. #endif
  353. }
  354. next(status);
  355. });
  356. }
  357. private:
  358. BlockReaderImpl *parent_;
  359. // Keep DNConnection alive.
  360. std::shared_ptr<DataNodeConnection> shared_conn_;
  361. };
  362. void BlockReaderImpl::AsyncReadPacket(const MutableBuffers &buffers,
  363. const std::function<void(const Status &, size_t bytes_transferred)> &handler)
  364. {
  365. assert(state_ != kOpen && "Not connected");
  366. LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncReadPacket called");
  367. struct State {
  368. std::shared_ptr<size_t> bytes_transferred;
  369. };
  370. auto m = continuation::Pipeline<State>::Create(cancel_state_);
  371. m->state().bytes_transferred = std::make_shared<size_t>(0);
  372. // Note: some of these continuations have nested pipelines.
  373. m->Push(new ReadPacketHeader(this))
  374. .Push(new ReadChecksum(this))
  375. .Push(new ReadPadding(this))
  376. .Push(new ReadData(
  377. this, m->state().bytes_transferred, buffers))
  378. .Push(new AckRead(this));
  379. auto self = this->shared_from_this();
  380. m->Run([self, handler](const Status &status, const State &state) {
  381. handler(status, *state.bytes_transferred);
  382. });
  383. }
  384. size_t BlockReaderImpl::ReadPacket(const MutableBuffers &buffers, Status *status)
  385. {
  386. LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPacket called");
  387. size_t transferred = 0;
  388. auto done = std::make_shared<std::promise<void>>();
  389. auto future = done->get_future();
  390. AsyncReadPacket(buffers,
  391. [status, &transferred, done](const Status &stat, size_t t) {
  392. *status = stat;
  393. transferred = t;
  394. done->set_value();
  395. });
  396. future.wait();
  397. return transferred;
  398. }
  399. struct BlockReaderImpl::RequestBlockContinuation : continuation::Continuation
  400. {
  401. RequestBlockContinuation(BlockReader *reader, const std::string &client_name,
  402. const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, uint64_t offset)
  403. : reader_(reader), client_name_(client_name), length_(length), offset_(offset)
  404. {
  405. block_.CheckTypeAndMergeFrom(*block);
  406. }
  407. virtual void Run(const Next &next) override {
  408. LOG_TRACE(kBlockReader, << "BlockReaderImpl::RequestBlockContinuation::Run("
  409. << FMT_CONT_AND_READER_ADDR << ") called");
  410. reader_->AsyncRequestBlock(client_name_, &block_, length_, offset_, next);
  411. }
  412. private:
  413. BlockReader *reader_;
  414. const std::string client_name_;
  415. hadoop::hdfs::ExtendedBlockProto block_;
  416. uint64_t length_;
  417. uint64_t offset_;
  418. };
  419. struct BlockReaderImpl::ReadBlockContinuation : continuation::Continuation
  420. {
  421. ReadBlockContinuation(BlockReader *reader, MutableBuffers buffer, size_t *transferred)
  422. : reader_(reader), buffer_(buffer), buffer_size_(asio::buffer_size(buffer)), transferred_(transferred) {}
  423. virtual void Run(const Next &next) override {
  424. LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadBlockContinuation::Run("
  425. << FMT_CONT_AND_READER_ADDR << ") called");
  426. *transferred_ = 0;
  427. next_ = next;
  428. OnReadData(Status::OK(), 0);
  429. }
  430. private:
  431. BlockReader *reader_;
  432. const MutableBuffers buffer_;
  433. const size_t buffer_size_;
  434. size_t *transferred_;
  435. std::function<void(const Status &)> next_;
  436. void OnReadData(const Status &status, size_t transferred) {
  437. using std::placeholders::_1;
  438. using std::placeholders::_2;
  439. *transferred_ += transferred;
  440. if (!status.ok()) {
  441. next_(status);
  442. } else if (*transferred_ >= buffer_size_) {
  443. next_(status);
  444. } else {
  445. reader_->AsyncReadPacket(
  446. asio::buffer(buffer_ + *transferred_, buffer_size_ - *transferred_),
  447. std::bind(&ReadBlockContinuation::OnReadData, this, _1, _2));
  448. }
  449. }
  450. };
  451. void BlockReaderImpl::AsyncReadBlock(
  452. const std::string & client_name,
  453. const hadoop::hdfs::LocatedBlockProto &block,
  454. size_t offset,
  455. const MutableBuffers &buffers,
  456. const std::function<void(const Status &, size_t)> handler)
  457. {
  458. LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncReadBlock("
  459. << FMT_THIS_ADDR << ") called");
  460. auto m = continuation::Pipeline<size_t>::Create(cancel_state_);
  461. size_t * bytesTransferred = &m->state();
  462. size_t size = asio::buffer_size(buffers);
  463. m->Push(new RequestBlockContinuation(this, client_name, &block.b(), size, offset))
  464. .Push(new ReadBlockContinuation(this, buffers, bytesTransferred));
  465. m->Run([handler] (const Status &status, const size_t totalBytesTransferred) {
  466. handler(status, totalBytesTransferred);
  467. });
  468. }
  469. void BlockReaderImpl::CancelOperation() {
  470. LOG_TRACE(kBlockReader, << "BlockReaderImpl::CancelOperation("
  471. << FMT_THIS_ADDR << ") called");
  472. /* just forward cancel to DNConnection */
  473. dn_->Cancel();
  474. }
  475. }