1
0

filehandle.cc 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
#include "filehandle.h"
#include "common/continuation/continuation.h"
#include "connection/datanodeconnection.h"
#include "reader/block_reader.h"

#include <algorithm>
#include <future>
#include <tuple>
#include <utility>
  24. namespace hdfs {
  25. using ::hadoop::hdfs::LocatedBlocksProto;
  26. FileHandle::~FileHandle() {}
  27. FileHandleImpl::FileHandleImpl(::asio::io_service *io_service, const std::string &client_name,
  28. const std::shared_ptr<const struct FileInfo> file_info,
  29. std::shared_ptr<BadDataNodeTracker> bad_data_nodes)
  30. : io_service_(io_service), client_name_(client_name), file_info_(file_info),
  31. bad_node_tracker_(bad_data_nodes), offset_(0), cancel_state_(CancelTracker::New()) {
  32. }
  33. void FileHandleImpl::PositionRead(
  34. void *buf, size_t nbyte, uint64_t offset,
  35. const std::function<void(const Status &, size_t)> &handler) {
  36. /* prevent usage after cancelation */
  37. if(cancel_state_->is_canceled()) {
  38. handler(Status::Canceled(), 0);
  39. return;
  40. }
  41. auto callback = [this, handler](const Status &status,
  42. const std::string &contacted_datanode,
  43. size_t bytes_read) {
  44. /* determine if DN gets marked bad */
  45. if (ShouldExclude(status)) {
  46. bad_node_tracker_->AddBadNode(contacted_datanode);
  47. }
  48. handler(status, bytes_read);
  49. };
  50. AsyncPreadSome(offset, asio::buffer(buf, nbyte), bad_node_tracker_, callback);
  51. }
  52. Status FileHandleImpl::PositionRead(void *buf, size_t *nbyte, off_t offset) {
  53. auto callstate = std::make_shared<std::promise<std::tuple<Status, size_t>>>();
  54. std::future<std::tuple<Status, size_t>> future(callstate->get_future());
  55. /* wrap async call with promise/future to make it blocking */
  56. auto callback = [callstate](const Status &s, size_t bytes) {
  57. callstate->set_value(std::make_tuple(s,bytes));
  58. };
  59. PositionRead(buf, *nbyte, offset, callback);
  60. /* wait for async to finish */
  61. auto returnstate = future.get();
  62. auto stat = std::get<0>(returnstate);
  63. if (!stat.ok()) {
  64. return stat;
  65. }
  66. *nbyte = std::get<1>(returnstate);
  67. return stat;
  68. }
  69. Status FileHandleImpl::Read(void *buf, size_t *nbyte) {
  70. Status stat = PositionRead(buf, nbyte, offset_);
  71. if(!stat.ok()) {
  72. return stat;
  73. }
  74. offset_ += *nbyte;
  75. return Status::OK();
  76. }
  77. Status FileHandleImpl::Seek(off_t *offset, std::ios_base::seekdir whence) {
  78. if(cancel_state_->is_canceled()) {
  79. return Status::Canceled();
  80. }
  81. off_t new_offset = -1;
  82. switch (whence) {
  83. case std::ios_base::beg:
  84. new_offset = *offset;
  85. break;
  86. case std::ios_base::cur:
  87. new_offset = offset_ + *offset;
  88. break;
  89. case std::ios_base::end:
  90. new_offset = file_info_->file_length_ + *offset;
  91. break;
  92. default:
  93. /* unsupported */
  94. return Status::InvalidArgument("Invalid Seek whence argument");
  95. }
  96. if(!CheckSeekBounds(new_offset)) {
  97. return Status::InvalidArgument("Seek offset out of bounds");
  98. }
  99. offset_ = new_offset;
  100. *offset = offset_;
  101. return Status::OK();
  102. }
  103. /* return false if seek will be out of bounds */
  104. bool FileHandleImpl::CheckSeekBounds(ssize_t desired_position) {
  105. ssize_t file_length = file_info_->file_length_;
  106. if (desired_position < 0 || desired_position > file_length) {
  107. return false;
  108. }
  109. return true;
  110. }
  111. /*
  112. * Note that this method must be thread-safe w.r.t. the unsafe operations occurring
  113. * on the FileHandle
  114. */
  115. void FileHandleImpl::AsyncPreadSome(
  116. size_t offset, const MutableBuffers &buffers,
  117. std::shared_ptr<NodeExclusionRule> excluded_nodes,
  118. const std::function<void(const Status &, const std::string &, size_t)> handler) {
  119. using ::hadoop::hdfs::DatanodeInfoProto;
  120. using ::hadoop::hdfs::LocatedBlockProto;
  121. if(cancel_state_->is_canceled()) {
  122. handler(Status::Canceled(), "", 0);
  123. return;
  124. }
  125. /**
  126. * Note: block and chosen_dn will end up pointing to things inside
  127. * the blocks_ vector. They shouldn't be directly deleted.
  128. **/
  129. auto block = std::find_if(
  130. file_info_->blocks_.begin(), file_info_->blocks_.end(), [offset](const LocatedBlockProto &p) {
  131. return p.offset() <= offset && offset < p.offset() + p.b().numbytes();
  132. });
  133. if (block == file_info_->blocks_.end()) {
  134. handler(Status::InvalidArgument("Cannot find corresponding blocks"), "", 0);
  135. return;
  136. }
  137. /**
  138. * If user supplies a rule use it, otherwise use the tracker.
  139. * User is responsible for making sure one of them isn't null.
  140. **/
  141. std::shared_ptr<NodeExclusionRule> rule =
  142. excluded_nodes != nullptr ? excluded_nodes : bad_node_tracker_;
  143. auto datanodes = block->locs();
  144. auto it = std::find_if(datanodes.begin(), datanodes.end(),
  145. [rule](const DatanodeInfoProto &dn) {
  146. return !rule->IsBadNode(dn.id().datanodeuuid());
  147. });
  148. if (it == datanodes.end()) {
  149. handler(Status::ResourceUnavailable("No datanodes available"), "", 0);
  150. return;
  151. }
  152. DatanodeInfoProto &chosen_dn = *it;
  153. uint64_t offset_within_block = offset - block->offset();
  154. uint64_t size_within_block = std::min<uint64_t>(
  155. block->b().numbytes() - offset_within_block, asio::buffer_size(buffers));
  156. // This is where we will put the logic for re-using a DN connection; we can
  157. // steal the FileHandle's dn and put it back when we're done
  158. std::shared_ptr<DataNodeConnection> dn = CreateDataNodeConnection(io_service_, chosen_dn, nullptr /*token*/);
  159. std::string dn_id = dn->uuid_;
  160. std::string client_name = client_name_;
  161. // Wrap the DN in a block reader to handle the state and logic of the
  162. // block request protocol
  163. std::shared_ptr<BlockReader> reader;
  164. reader = CreateBlockReader(BlockReaderOptions(), dn);
  165. auto read_handler = [reader, dn_id, handler](const Status & status, size_t transferred) {
  166. handler(status, dn_id, transferred);
  167. };
  168. dn->Connect([handler,read_handler,block,offset_within_block,size_within_block, buffers, reader, dn_id, client_name]
  169. (Status status, std::shared_ptr<DataNodeConnection> dn) {
  170. (void)dn;
  171. if (status.ok()) {
  172. reader->AsyncReadBlock(
  173. client_name, *block, offset_within_block,
  174. asio::buffer(buffers, size_within_block), read_handler);
  175. } else {
  176. handler(status, dn_id, 0);
  177. }
  178. });
  179. return;
  180. }
  181. std::shared_ptr<BlockReader> FileHandleImpl::CreateBlockReader(const BlockReaderOptions &options,
  182. std::shared_ptr<DataNodeConnection> dn)
  183. {
  184. std::shared_ptr<BlockReader> reader = std::make_shared<BlockReaderImpl>(options, dn, cancel_state_);
  185. readers_.AddReader(reader);
  186. return reader;
  187. }
  188. std::shared_ptr<DataNodeConnection> FileHandleImpl::CreateDataNodeConnection(
  189. ::asio::io_service * io_service,
  190. const ::hadoop::hdfs::DatanodeInfoProto & dn,
  191. const hadoop::common::TokenProto * token) {
  192. return std::make_shared<DataNodeConnectionImpl>(io_service, dn, token);
  193. }
  194. void FileHandleImpl::CancelOperations() {
  195. cancel_state_->set_canceled();
  196. /* Push update to BlockReaders that may be hung in an asio call */
  197. std::vector<std::shared_ptr<BlockReader>> live_readers = readers_.GetLiveReaders();
  198. for(auto reader : live_readers) {
  199. reader->CancelOperation();
  200. }
  201. }
  202. bool FileHandle::ShouldExclude(const Status &s) {
  203. if (s.ok()) {
  204. return false;
  205. }
  206. switch (s.code()) {
  207. /* client side resource exhaustion */
  208. case Status::kResourceUnavailable:
  209. case Status::kOperationCanceled:
  210. return false;
  211. case Status::kInvalidArgument:
  212. case Status::kUnimplemented:
  213. case Status::kException:
  214. default:
  215. return true;
  216. }
  217. }
  218. }