filehandle.cc 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "filehandle.h"
  19. #include "common/continuation/continuation.h"
  20. #include "common/logging.h"
  21. #include "connection/datanodeconnection.h"
  22. #include "reader/block_reader.h"
  23. #include "hdfspp/events.h"
  24. #include <future>
  25. #include <memory>
  26. #include <string>
  27. #include <tuple>
  28. #include <boost/asio/buffer.hpp>
  29. #define FMT_THIS_ADDR "this=" << (void*)this
  30. namespace hdfs {
  31. using ::hadoop::hdfs::LocatedBlocksProto;
  32. FileHandle::~FileHandle() {}
  33. FileHandleImpl::FileHandleImpl(const std::string & cluster_name,
  34. const std::string & path,
  35. std::shared_ptr<IoService> io_service, const std::shared_ptr<std::string> &client_name,
  36. const std::shared_ptr<const struct FileInfo> file_info,
  37. std::shared_ptr<BadDataNodeTracker> bad_data_nodes,
  38. std::shared_ptr<LibhdfsEvents> event_handlers)
  39. : cluster_name_(cluster_name), path_(path), io_service_(io_service), client_name_(client_name), file_info_(file_info),
  40. bad_node_tracker_(bad_data_nodes), offset_(0), cancel_state_(CancelTracker::New()), event_handlers_(event_handlers), bytes_read_(0) {
  41. LOG_TRACE(kFileHandle, << "FileHandleImpl::FileHandleImpl("
  42. << FMT_THIS_ADDR << ", ...) called");
  43. }
  44. void FileHandleImpl::PositionRead(
  45. void *buf, size_t buf_size, uint64_t offset,
  46. const std::function<void(const Status &, size_t)> &handler) {
  47. LOG_DEBUG(kFileHandle, << "FileHandleImpl::PositionRead("
  48. << FMT_THIS_ADDR << ", buf=" << buf
  49. << ", buf_size=" << std::to_string(buf_size) << ") called");
  50. /* prevent usage after cancelation */
  51. if(cancel_state_->is_canceled()) {
  52. handler(Status::Canceled(), 0);
  53. return;
  54. }
  55. auto callback = [this, handler](const Status &status,
  56. const std::string &contacted_datanode,
  57. size_t bytes_read) {
  58. /* determine if DN gets marked bad */
  59. if (ShouldExclude(status)) {
  60. bad_node_tracker_->AddBadNode(contacted_datanode);
  61. }
  62. bytes_read_ += bytes_read;
  63. handler(status, bytes_read);
  64. };
  65. AsyncPreadSome(offset, boost::asio::buffer(buf, buf_size), bad_node_tracker_, callback);
  66. }
  67. Status FileHandleImpl::PositionRead(void *buf, size_t buf_size, off_t offset, size_t *bytes_read) {
  68. LOG_DEBUG(kFileHandle, << "FileHandleImpl::[sync]PositionRead("
  69. << FMT_THIS_ADDR << ", buf=" << buf
  70. << ", buf_size=" << std::to_string(buf_size)
  71. << ", offset=" << offset << ") called");
  72. auto callstate = std::make_shared<std::promise<std::tuple<Status, size_t>>>();
  73. std::future<std::tuple<Status, size_t>> future(callstate->get_future());
  74. /* wrap async call with promise/future to make it blocking */
  75. auto callback = [callstate](const Status &s, size_t bytes) {
  76. callstate->set_value(std::make_tuple(s,bytes));
  77. };
  78. PositionRead(buf, buf_size, offset, callback);
  79. /* wait for async to finish */
  80. auto returnstate = future.get();
  81. auto stat = std::get<0>(returnstate);
  82. if (!stat.ok()) {
  83. return stat;
  84. }
  85. *bytes_read = std::get<1>(returnstate);
  86. return stat;
  87. }
  88. Status FileHandleImpl::Read(void *buf, size_t buf_size, size_t *bytes_read) {
  89. LOG_DEBUG(kFileHandle, << "FileHandleImpl::Read("
  90. << FMT_THIS_ADDR << ", buf=" << buf
  91. << ", buf_size=" << std::to_string(buf_size) << ") called");
  92. Status stat = PositionRead(buf, buf_size, offset_, bytes_read);
  93. if(!stat.ok()) {
  94. return stat;
  95. }
  96. offset_ += *bytes_read;
  97. return Status::OK();
  98. }
  99. Status FileHandleImpl::Seek(off_t *offset, std::ios_base::seekdir whence) {
  100. LOG_DEBUG(kFileHandle, << "FileHandleImpl::Seek("
  101. << ", offset=" << *offset << ", ...) called");
  102. if(cancel_state_->is_canceled()) {
  103. return Status::Canceled();
  104. }
  105. off_t new_offset = -1;
  106. switch (whence) {
  107. case std::ios_base::beg:
  108. new_offset = *offset;
  109. break;
  110. case std::ios_base::cur:
  111. new_offset = offset_ + *offset;
  112. break;
  113. case std::ios_base::end:
  114. new_offset = file_info_->file_length_ + *offset;
  115. break;
  116. default:
  117. /* unsupported */
  118. return Status::InvalidArgument("Invalid Seek whence argument");
  119. }
  120. if(!CheckSeekBounds(new_offset)) {
  121. return Status::InvalidArgument("Seek offset out of bounds");
  122. }
  123. offset_ = new_offset;
  124. *offset = offset_;
  125. return Status::OK();
  126. }
  127. /* return false if seek will be out of bounds */
  128. bool FileHandleImpl::CheckSeekBounds(ssize_t desired_position) {
  129. ssize_t file_length = file_info_->file_length_;
  130. if (desired_position < 0 || desired_position > file_length) {
  131. return false;
  132. }
  133. return true;
  134. }
  135. /*
  136. * Note that this method must be thread-safe w.r.t. the unsafe operations occurring
  137. * on the FileHandle
  138. */
  139. void FileHandleImpl::AsyncPreadSome(
  140. size_t offset, const MutableBuffer &buffer,
  141. std::shared_ptr<NodeExclusionRule> excluded_nodes,
  142. const std::function<void(const Status &, const std::string &, size_t)> handler) {
  143. using ::hadoop::hdfs::DatanodeInfoProto;
  144. using ::hadoop::hdfs::LocatedBlockProto;
  145. LOG_DEBUG(kFileHandle, << "FileHandleImpl::AsyncPreadSome("
  146. << FMT_THIS_ADDR << ", ...) called");
  147. if(cancel_state_->is_canceled()) {
  148. handler(Status::Canceled(), "", 0);
  149. return;
  150. }
  151. if(offset == file_info_->file_length_) {
  152. handler(Status::OK(), "", 0);
  153. return;
  154. } else if(offset > file_info_->file_length_){
  155. handler(Status::InvalidOffset("AsyncPreadSome: trying to begin a read past the EOF"), "", 0);
  156. return;
  157. }
  158. if (client_name_ == nullptr) {
  159. handler(Status::Error("AsyncPreadSome: Unable to generate random client name"), "", 0);
  160. return;
  161. }
  162. /**
  163. * Note: block and chosen_dn will end up pointing to things inside
  164. * the blocks_ vector. They shouldn't be directly deleted.
  165. **/
  166. auto block = std::find_if(
  167. file_info_->blocks_.begin(), file_info_->blocks_.end(), [offset](const LocatedBlockProto &p) {
  168. return p.offset() <= offset && offset < p.offset() + p.b().numbytes();
  169. });
  170. if (block == file_info_->blocks_.end()) {
  171. LOG_WARN(kFileHandle, << "FileHandleImpl::AsyncPreadSome(" << FMT_THIS_ADDR
  172. << ", ...) Cannot find corresponding blocks");
  173. handler(Status::InvalidArgument("Cannot find corresponding blocks"), "", 0);
  174. return;
  175. }
  176. /**
  177. * If user supplies a rule use it, otherwise use the tracker.
  178. * User is responsible for making sure one of them isn't null.
  179. **/
  180. std::shared_ptr<NodeExclusionRule> rule =
  181. excluded_nodes != nullptr ? excluded_nodes : bad_node_tracker_;
  182. auto datanodes = block->locs();
  183. auto it = std::find_if(datanodes.begin(), datanodes.end(),
  184. [rule](const DatanodeInfoProto &dn) {
  185. return !rule->IsBadNode(dn.id().datanodeuuid());
  186. });
  187. if (it == datanodes.end()) {
  188. LOG_WARN(kFileHandle, << "FileHandleImpl::AsyncPreadSome("
  189. << FMT_THIS_ADDR << ", ...) No datanodes available");
  190. handler(Status::ResourceUnavailable("No datanodes available"), "", 0);
  191. return;
  192. }
  193. DatanodeInfoProto &chosen_dn = *it;
  194. std::string dnIpAddr = chosen_dn.id().ipaddr();
  195. std::string dnHostName = chosen_dn.id().hostname();
  196. uint64_t offset_within_block = offset - block->offset();
  197. uint64_t size_within_block = std::min<uint64_t>(
  198. block->b().numbytes() - offset_within_block, boost::asio::buffer_size(buffer));
  199. LOG_DEBUG(kFileHandle, << "FileHandleImpl::AsyncPreadSome("
  200. << FMT_THIS_ADDR << "), ...) Datanode hostname=" << dnHostName << ", IP Address=" << dnIpAddr
  201. << ", file path=\"" << path_ << "\", offset=" << std::to_string(offset) << ", read size=" << size_within_block);
  202. // This is where we will put the logic for re-using a DN connection; we can
  203. // steal the FileHandle's dn and put it back when we're done
  204. std::shared_ptr<DataNodeConnection> dn = CreateDataNodeConnection(io_service_, chosen_dn, &block->blocktoken());
  205. std::string dn_id = dn->uuid_;
  206. std::string client_name = *client_name_;
  207. // Wrap the DN in a block reader to handle the state and logic of the
  208. // block request protocol
  209. std::shared_ptr<BlockReader> reader;
  210. reader = CreateBlockReader(BlockReaderOptions(), dn, event_handlers_);
  211. // Lambdas cannot capture copies of member variables so we'll make explicit
  212. // copies for it
  213. auto event_handlers = event_handlers_;
  214. auto path = path_;
  215. auto cluster_name = cluster_name_;
  216. auto read_handler = [reader, event_handlers, cluster_name, path, dn_id, handler](const Status & status, size_t transferred) {
  217. event_response event_resp = event_handlers->call(FILE_DN_READ_EVENT, cluster_name.c_str(), path.c_str(), transferred);
  218. #ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
  219. if (event_resp.response_type() == event_response::kTest_Error) {
  220. handler(event_resp.status(), dn_id, transferred);
  221. return;
  222. }
  223. #endif
  224. handler(status, dn_id, transferred);
  225. };
  226. auto connect_handler = [handler,event_handlers,cluster_name,path,read_handler,block,offset_within_block,size_within_block, buffer, reader, dn_id, client_name]
  227. (Status status, std::shared_ptr<DataNodeConnection> dn) {
  228. (void)dn;
  229. event_response event_resp = event_handlers->call(FILE_DN_CONNECT_EVENT, cluster_name.c_str(), path.c_str(), 0);
  230. #ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
  231. if (event_resp.response_type() == event_response::kTest_Error) {
  232. status = event_resp.status();
  233. }
  234. #endif
  235. if (status.ok()) {
  236. reader->AsyncReadBlock(
  237. client_name, *block, offset_within_block,
  238. boost::asio::buffer(buffer, size_within_block), read_handler);
  239. } else {
  240. handler(status, dn_id, 0);
  241. }
  242. };
  243. dn->Connect(connect_handler);
  244. return;
  245. }
  246. std::shared_ptr<BlockReader> FileHandleImpl::CreateBlockReader(const BlockReaderOptions &options,
  247. std::shared_ptr<DataNodeConnection> dn,
  248. std::shared_ptr<LibhdfsEvents> event_handlers)
  249. {
  250. std::shared_ptr<BlockReader> reader = std::make_shared<BlockReaderImpl>(options, dn, cancel_state_, event_handlers);
  251. LOG_TRACE(kFileHandle, << "FileHandleImpl::CreateBlockReader(" << FMT_THIS_ADDR
  252. << ", ..., dnconn=" << dn.get()
  253. << ") called. New BlockReader = " << reader.get());
  254. readers_.AddReader(reader);
  255. return reader;
  256. }
  257. std::shared_ptr<DataNodeConnection> FileHandleImpl::CreateDataNodeConnection(
  258. std::shared_ptr<IoService> io_service,
  259. const ::hadoop::hdfs::DatanodeInfoProto & dn,
  260. const hadoop::common::TokenProto * token) {
  261. LOG_TRACE(kFileHandle, << "FileHandleImpl::CreateDataNodeConnection("
  262. << FMT_THIS_ADDR << ", ...) called");
  263. return std::make_shared<DataNodeConnectionImpl>(io_service, dn, token, event_handlers_.get());
  264. }
  265. std::shared_ptr<LibhdfsEvents> FileHandleImpl::get_event_handlers() {
  266. return event_handlers_;
  267. }
  268. void FileHandleImpl::CancelOperations() {
  269. LOG_INFO(kFileHandle, << "FileHandleImpl::CancelOperations("
  270. << FMT_THIS_ADDR << ") called");
  271. cancel_state_->set_canceled();
  272. /* Push update to BlockReaders that may be hung in an asio call */
  273. std::vector<std::shared_ptr<BlockReader>> live_readers = readers_.GetLiveReaders();
  274. for(auto reader : live_readers) {
  275. reader->CancelOperation();
  276. }
  277. }
  278. void FileHandleImpl::SetFileEventCallback(file_event_callback callback) {
  279. std::shared_ptr<LibhdfsEvents> new_event_handlers;
  280. if (event_handlers_) {
  281. new_event_handlers = std::make_shared<LibhdfsEvents>(*event_handlers_);
  282. } else {
  283. new_event_handlers = std::make_shared<LibhdfsEvents>();
  284. }
  285. new_event_handlers->set_file_callback(callback);
  286. event_handlers_ = new_event_handlers;
  287. }
  288. bool FileHandle::ShouldExclude(const Status &s) {
  289. if (s.ok()) {
  290. return false;
  291. }
  292. switch (s.code()) {
  293. /* client side resource exhaustion */
  294. case Status::kResourceUnavailable:
  295. case Status::kOperationCanceled:
  296. return false;
  297. case Status::kInvalidArgument:
  298. case Status::kUnimplemented:
  299. case Status::kException:
  300. default:
  301. return true;
  302. }
  303. }
  304. uint64_t FileHandleImpl::get_bytes_read() { return bytes_read_.load(); }
  305. void FileHandleImpl::clear_bytes_read() { bytes_read_.store(0); }
  306. }