filehandle.h 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #ifndef LIBHDFSPP_LIB_FS_FILEHANDLE_H_
  19. #define LIBHDFSPP_LIB_FS_FILEHANDLE_H_
#include "hdfspp/ioservice.h"
#include "common/async_stream.h"
#include "common/cancel_tracker.h"
#include "common/libhdfs_events_impl.h"
#include "common/new_delete.h"
#include "reader/fileinfo.h"
#include "reader/readergroup.h"
#include "bad_datanode_tracker.h"
#include "ClientNamenodeProtocol.pb.h"

#include <atomic>
#include <cstdint>
#include <functional>
#include <ios>
#include <memory>
#include <mutex>
#include <string>
  30. namespace hdfs {
  31. class BlockReader;
  32. struct BlockReaderOptions;
  33. class DataNodeConnection;
  34. /*
  35. * FileHandle: coordinates operations on a particular file in HDFS
  36. *
  37. * Threading model: not thread-safe; consumers and io_service should not call
  38. * concurrently. PositionRead is the exceptions; they can be
  39. * called concurrently and repeatedly.
  40. * Lifetime: pointer returned to consumer by FileSystem::Open. Consumer is
  41. * resonsible for freeing the object.
  42. */
  43. class FileHandleImpl : public FileHandle {
  44. public:
  45. MEMCHECKED_CLASS(FileHandleImpl)
  46. FileHandleImpl(const std::string & cluster_name,
  47. const std::string & path,
  48. std::shared_ptr<IoService> io_service, const std::string &client_name,
  49. const std::shared_ptr<const struct FileInfo> file_info,
  50. std::shared_ptr<BadDataNodeTracker> bad_data_nodes,
  51. std::shared_ptr<LibhdfsEvents> event_handlers);
  52. /*
  53. * Reads the file at the specified offset into the buffer.
  54. * bytes_read returns the number of bytes successfully read on success
  55. * and on error. Status::InvalidOffset is returned when trying to begin
  56. * a read past the EOF.
  57. */
  58. void PositionRead(
  59. void *buf,
  60. size_t buf_size,
  61. uint64_t offset,
  62. const std::function<void(const Status &status, size_t bytes_read)> &handler
  63. ) override;
  64. /**
  65. * Reads the file at the specified offset into the buffer.
  66. * @param buf output buffer
  67. * @param buf_size size of the output buffer
  68. * @param offset offset at which to start reading
  69. * @param bytes_read number of bytes successfully read
  70. */
  71. Status PositionRead(void *buf, size_t buf_size, off_t offset, size_t *bytes_read) override;
  72. Status Read(void *buf, size_t buf_size, size_t *bytes_read) override;
  73. Status Seek(off_t *offset, std::ios_base::seekdir whence) override;
  74. /*
  75. * Reads some amount of data into the buffer. Will attempt to find the best
  76. * datanode and read data from it.
  77. *
  78. * If an error occurs during connection or transfer, the callback will be
  79. * called with bytes_read equal to the number of bytes successfully transferred.
  80. * If no data nodes can be found, status will be Status::ResourceUnavailable.
  81. * If trying to begin a read past the EOF, status will be Status::InvalidOffset.
  82. *
  83. */
  84. void AsyncPreadSome(size_t offset, const MutableBuffer &buffer,
  85. std::shared_ptr<NodeExclusionRule> excluded_nodes,
  86. const std::function<void(const Status &status,
  87. const std::string &dn_id, size_t bytes_read)> handler);
  88. /**
  89. * Cancels all operations instantiated from this FileHandle.
  90. * Will set a flag to abort continuation pipelines when they try to move to the next step.
  91. * Closes TCP connections to Datanode in order to abort pipelines waiting on slow IO.
  92. **/
  93. virtual void CancelOperations(void) override;
  94. virtual void SetFileEventCallback(file_event_callback callback) override;
  95. /**
  96. * Ephemeral objects created by the filehandle will need to get the event
  97. * handler registry owned by the FileSystem.
  98. **/
  99. std::shared_ptr<LibhdfsEvents> get_event_handlers();
  100. /* how many bytes have been successfully read */
  101. virtual uint64_t get_bytes_read() override;
  102. /* resets the number of bytes read to zero */
  103. virtual void clear_bytes_read() override;
  104. protected:
  105. virtual std::shared_ptr<BlockReader> CreateBlockReader(const BlockReaderOptions &options,
  106. std::shared_ptr<DataNodeConnection> dn,
  107. std::shared_ptr<hdfs::LibhdfsEvents> event_handlers);
  108. virtual std::shared_ptr<DataNodeConnection> CreateDataNodeConnection(
  109. std::shared_ptr<IoService> io_service,
  110. const ::hadoop::hdfs::DatanodeInfoProto & dn,
  111. const hadoop::common::TokenProto * token);
  112. private:
  113. const std::string cluster_name_;
  114. const std::string path_;
  115. std::shared_ptr<IoService> io_service_;
  116. const std::string client_name_;
  117. const std::shared_ptr<const struct FileInfo> file_info_;
  118. std::shared_ptr<BadDataNodeTracker> bad_node_tracker_;
  119. bool CheckSeekBounds(ssize_t desired_position);
  120. off_t offset_;
  121. CancelHandle cancel_state_;
  122. ReaderGroup readers_;
  123. std::shared_ptr<LibhdfsEvents> event_handlers_;
  124. std::atomic<uint64_t> bytes_read_;
  125. };
  126. }
  127. #endif