block_reader.h 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #ifndef BLOCK_READER_H_
  19. #define BLOCK_READER_H_
  20. #include "hdfspp/status.h"
  21. #include "common/async_stream.h"
  22. #include "common/cancel_tracker.h"
  23. #include "common/new_delete.h"
  24. #include "datatransfer.pb.h"
  25. #include "connection/datanodeconnection.h"
  26. #include <memory>
  27. namespace hdfs {
  /**
   * Cache-related settings carried inside BlockReaderOptions.
   *
   * Holds two (flag, value) pairs. Presumably each value is only meaningful
   * when its matching *_specified flag is true -- confirm against the code
   * that consumes this struct. A default-constructed strategy has every flag
   * cleared and read_ahead set to zero.
   */
  struct CacheStrategy {
    bool drop_behind_specified;
    bool drop_behind;
    bool read_ahead_specified;
    unsigned long long read_ahead;

    CacheStrategy()
        : drop_behind_specified(false),
          drop_behind(false),
          read_ahead_specified(false),
          // read_ahead is an integer quantity: initialize it with an integer
          // literal instead of relying on a bool-to-integer conversion.
          read_ahead(0) {}
  };
  /**
   * Three-way drop-behind selector: unspecified, enabled, or disabled.
   * The numeric value of every enumerator is pinned explicitly.
   */
  enum DropBehindStrategy {
    kUnspecified       = 0,
    kEnableDropBehind  = 1,
    kDisableDropBehind = 2
  };
  /**
   * Encryption scheme selector stored in BlockReaderOptions.
   * kNone is the value a default-constructed BlockReaderOptions uses.
   */
  enum EncryptionScheme {
    kNone            = 0,
    kAESCTRNoPadding = 1
  };
  46. struct BlockReaderOptions {
  47. bool verify_checksum;
  48. CacheStrategy cache_strategy;
  49. EncryptionScheme encryption_scheme;
  50. BlockReaderOptions()
  51. : verify_checksum(true), encryption_scheme(EncryptionScheme::kNone) {}
  52. };
  53. /**
  54. * Handles the operational state of request and reading a block (or portion of
  55. * a block) from a DataNode.
  56. *
  57. * Threading model: not thread-safe.
  58. * Lifecycle: should be created, used for a single read, then freed.
  59. */
  60. class BlockReader {
  61. public:
  62. MEMCHECKED_CLASS(BlockReader)
  63. virtual void AsyncReadBlock(
  64. const std::string & client_name,
  65. const hadoop::hdfs::LocatedBlockProto &block, size_t offset,
  66. const MutableBuffers &buffers,
  67. const std::function<void(const Status &, size_t)> handler) = 0;
  68. virtual void AsyncReadPacket(
  69. const MutableBuffers &buffers,
  70. const std::function<void(const Status &, size_t bytes_transferred)> &handler) = 0;
  71. virtual void AsyncRequestBlock(
  72. const std::string &client_name,
  73. const hadoop::hdfs::ExtendedBlockProto *block,
  74. uint64_t length,
  75. uint64_t offset,
  76. const std::function<void(Status)> &handler) = 0;
  77. virtual void CancelOperation() = 0;
  78. };
  79. class BlockReaderImpl
  80. : public BlockReader, public std::enable_shared_from_this<BlockReaderImpl> {
  81. public:
  82. explicit BlockReaderImpl(const BlockReaderOptions &options, std::shared_ptr<DataNodeConnection> dn,
  83. CancelHandle cancel_state)
  84. : dn_(dn), state_(kOpen), options_(options),
  85. chunk_padding_bytes_(0), cancel_state_(cancel_state) {}
  86. virtual void AsyncReadPacket(
  87. const MutableBuffers &buffers,
  88. const std::function<void(const Status &, size_t bytes_transferred)> &handler) override;
  89. virtual void AsyncRequestBlock(
  90. const std::string &client_name,
  91. const hadoop::hdfs::ExtendedBlockProto *block,
  92. uint64_t length,
  93. uint64_t offset,
  94. const std::function<void(Status)> &handler) override;
  95. virtual void AsyncReadBlock(
  96. const std::string & client_name,
  97. const hadoop::hdfs::LocatedBlockProto &block, size_t offset,
  98. const MutableBuffers &buffers,
  99. const std::function<void(const Status &, size_t)> handler) override;
  100. virtual void CancelOperation() override;
  101. size_t ReadPacket(const MutableBuffers &buffers, Status *status);
  102. Status RequestBlock(
  103. const std::string &client_name,
  104. const hadoop::hdfs::ExtendedBlockProto *block,
  105. uint64_t length,
  106. uint64_t offset);
  107. private:
  108. struct RequestBlockContinuation;
  109. struct ReadBlockContinuation;
  110. struct ReadPacketHeader;
  111. struct ReadChecksum;
  112. struct ReadPadding;
  113. struct ReadData;
  114. struct AckRead;
  115. enum State {
  116. kOpen,
  117. kReadPacketHeader,
  118. kReadChecksum,
  119. kReadPadding,
  120. kReadData,
  121. kFinished,
  122. };
  123. std::shared_ptr<DataNodeConnection> dn_;
  124. hadoop::hdfs::PacketHeaderProto header_;
  125. State state_;
  126. BlockReaderOptions options_;
  127. size_t packet_len_;
  128. int packet_data_read_bytes_;
  129. int chunk_padding_bytes_;
  130. long long bytes_to_read_;
  131. std::vector<char> checksum_;
  132. CancelHandle cancel_state_;
  133. };
  134. }
  135. #endif