filesystem.h 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #ifndef LIBHDFSPP_LIB_FS_FILESYSTEM_H_
  19. #define LIBHDFSPP_LIB_FS_FILESYSTEM_H_
  20. #include "filehandle.h"
  21. #include "hdfspp/hdfspp.h"
  22. #include "fs/bad_datanode_tracker.h"
  23. #include "reader/block_reader.h"
  24. #include "reader/fileinfo.h"
  25. #include "asio.hpp"
  26. #include <thread>
  27. #include "namenode_operations.h"
  28. namespace hdfs {
  29. /*
  30. * FileSystem: The consumer's main point of interaction with the cluster as
  31. * a whole.
  32. *
  33. * Initially constructed in a disconnected state; call Connect before operating
  34. * on the FileSystem.
  35. *
  36. * All open files must be closed before the FileSystem is destroyed.
  37. *
  38. * Threading model: thread-safe for all operations
  39. * Lifetime: pointer created for consumer who is responsible for deleting it
  40. */
  41. class FileSystemImpl : public FileSystem {
  42. public:
  43. MEMCHECKED_CLASS(FileSystemImpl)
  44. explicit FileSystemImpl(IoService *&io_service, const std::string& user_name, const Options &options);
  45. explicit FileSystemImpl(std::shared_ptr<IoService>, const std::string& user_name, const Options &options);
  46. ~FileSystemImpl() override;
  47. /* attempt to connect to namenode, return bad status on failure */
  48. void Connect(const std::string &server, const std::string &service,
  49. const std::function<void(const Status &, FileSystem *)> &handler) override;
  50. /* attempt to connect to namenode, return bad status on failure */
  51. Status Connect(const std::string &server, const std::string &service) override;
  52. /* Connect to the NN indicated in options.defaultFs */
  53. virtual void ConnectToDefaultFs(
  54. const std::function<void(const Status &, FileSystem *)> &handler) override;
  55. virtual Status ConnectToDefaultFs() override;
  56. virtual void Open(const std::string &path,
  57. const std::function<void(const Status &, FileHandle *)>
  58. &handler) override;
  59. Status Open(const std::string &path, FileHandle **handle) override;
  60. virtual void GetPreferredBlockSize(const std::string &path,
  61. const std::function<void(const Status &, const uint64_t &)> &handler) override;
  62. virtual Status GetPreferredBlockSize(const std::string &path, uint64_t & block_size) override;
  63. virtual void SetReplication(const std::string & path, int16_t replication, std::function<void(const Status &)> handler) override;
  64. virtual Status SetReplication(const std::string & path, int16_t replication) override;
  65. void SetTimes(const std::string & path, uint64_t mtime, uint64_t atime, std::function<void(const Status &)> handler) override;
  66. Status SetTimes(const std::string & path, uint64_t mtime, uint64_t atime) override;
  67. void GetFileInfo(
  68. const std::string &path,
  69. const std::function<void(const Status &, const StatInfo &)> &handler) override;
  70. Status GetFileInfo(const std::string &path, StatInfo & stat_info) override;
  71. /**
  72. * Retrieves the file system information such as the total raw size of all files in the filesystem
  73. * and the raw capacity of the filesystem
  74. *
  75. * @param FsInfo struct to be populated by GetFsStats
  76. **/
  77. void GetFsStats(
  78. const std::function<void(const Status &, const FsInfo &)> &handler) override;
  79. Status GetFsStats(FsInfo & fs_info) override;
  80. void GetListing(
  81. const std::string &path,
  82. const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) override;
  83. Status GetListing(const std::string &path, std::vector<StatInfo> * stat_infos) override;
  84. virtual void GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length,
  85. const std::function<void(const Status &, std::shared_ptr<FileBlockLocation> locations)> ) override;
  86. virtual Status GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length,
  87. std::shared_ptr<FileBlockLocation> * locations) override;
  88. virtual void Mkdirs(const std::string & path, uint16_t permissions, bool createparent,
  89. std::function<void(const Status &)> handler) override;
  90. virtual Status Mkdirs(const std::string & path, uint16_t permissions, bool createparent) override;
  91. virtual void Delete(const std::string &path, bool recursive,
  92. const std::function<void(const Status &)> &handler) override;
  93. virtual Status Delete(const std::string &path, bool recursive) override;
  94. virtual void Rename(const std::string &oldPath, const std::string &newPath,
  95. const std::function<void(const Status &)> &handler) override;
  96. virtual Status Rename(const std::string &oldPath, const std::string &newPath) override;
  97. virtual void SetPermission(const std::string & path, uint16_t permissions,
  98. const std::function<void(const Status &)> &handler) override;
  99. virtual Status SetPermission(const std::string & path, uint16_t permissions) override;
  100. virtual void SetOwner(const std::string & path, const std::string & username,
  101. const std::string & groupname, const std::function<void(const Status &)> &handler) override;
  102. virtual Status SetOwner(const std::string & path,
  103. const std::string & username, const std::string & groupname) override;
  104. void Find(
  105. const std::string &path, const std::string &name, const uint32_t maxdepth,
  106. const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) override;
  107. Status Find(const std::string &path, const std::string &name, const uint32_t maxdepth, std::vector<StatInfo> * stat_infos) override;
  108. /*****************************************************************************
  109. * FILE SYSTEM SNAPSHOT FUNCTIONS
  110. ****************************************************************************/
  111. /**
  112. * Creates a snapshot of a snapshottable directory specified by path
  113. *
  114. * @param path Path to the directory to be snapshotted (must be non-empty)
  115. * @param name Name to be given to the created snapshot (may be empty)
  116. **/
  117. void CreateSnapshot(const std::string &path, const std::string &name,
  118. const std::function<void(const Status &)> &handler) override;
  119. Status CreateSnapshot(const std::string &path, const std::string &name) override;
  120. /**
  121. * Deletes the directory snapshot specified by path and name
  122. *
  123. * @param path Path to the snapshotted directory (must be non-empty)
  124. * @param name Name of the snapshot to be deleted (must be non-empty)
  125. **/
  126. void DeleteSnapshot(const std::string &path, const std::string &name,
  127. const std::function<void(const Status &)> &handler) override;
  128. Status DeleteSnapshot(const std::string &path, const std::string &name) override;
  129. /**
  130. * Allows snapshots to be made on the specified directory
  131. *
  132. * @param path Path to the directory to be made snapshottable (must be non-empty)
  133. **/
  134. void AllowSnapshot(const std::string &path, const std::function<void(const Status &)> &handler) override;
  135. Status AllowSnapshot(const std::string &path) override;
  136. /**
  137. * Disallows snapshots to be made on the specified directory
  138. *
  139. * @param path Path to the directory to be made non-snapshottable (must be non-empty)
  140. **/
  141. void DisallowSnapshot(const std::string &path, const std::function<void(const Status &)> &handler) override;
  142. Status DisallowSnapshot(const std::string &path) override;
  143. void SetFsEventCallback(fs_event_callback callback) override;
  144. /* add a new thread to handle asio requests, return number of threads in pool
  145. */
  146. int AddWorkerThread();
  147. /* how many worker threads are servicing asio requests */
  148. int WorkerThreadCount();
  149. /* all monitored events will need to lookup handlers */
  150. std::shared_ptr<LibhdfsEvents> get_event_handlers();
  151. Options get_options();
  152. private:
  153. /**
  154. * The IoService must be the first member variable to ensure that it gets
  155. * destroyed last. This allows other members to dequeue things from the
  156. * service in their own destructors.
  157. **/
  158. std::shared_ptr<IoServiceImpl> io_service_;
  159. const Options options_;
  160. const std::string client_name_;
  161. std::string cluster_name_;
  162. NameNodeOperations nn_;
  163. std::shared_ptr<BadDataNodeTracker> bad_node_tracker_;
  164. /**
  165. * Runtime event monitoring handlers.
  166. * Note: This is really handy to have for advanced usage but
  167. * exposes implementation details that may change at any time.
  168. **/
  169. std::shared_ptr<LibhdfsEvents> event_handlers_;
  170. void GetListingShim(const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more,
  171. std::string path, const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler);
  172. struct FindSharedState {
  173. //Name pattern (can have wild-cards) to find
  174. const std::string name;
  175. //Maximum depth to recurse after the end of path is reached.
  176. //Can be set to 0 for pure path globbing and ignoring name pattern entirely.
  177. const uint32_t maxdepth;
  178. //Vector of all sub-directories from the path argument (each can have wild-cards)
  179. std::vector<std::string> dirs;
  180. //Callback from Find
  181. const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> handler;
  182. //outstanding_requests is incremented once for every GetListing call.
  183. std::atomic<uint64_t> outstanding_requests;
  184. //Boolean needed to abort all recursion on error or on user command
  185. std::atomic<bool> aborted;
  186. //Shared variables will need protection with a lock
  187. std::mutex lock;
  188. FindSharedState(const std::string path_, const std::string name_, const uint32_t maxdepth_,
  189. const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> handler_,
  190. uint64_t outstanding_recuests_, bool aborted_)
  191. : name(name_),
  192. maxdepth(maxdepth_),
  193. handler(handler_),
  194. outstanding_requests(outstanding_recuests_),
  195. aborted(aborted_),
  196. lock() {
  197. //Constructing the list of sub-directories
  198. std::stringstream ss(path_);
  199. if(path_.back() != '/'){
  200. ss << "/";
  201. }
  202. for (std::string token; std::getline(ss, token, '/'); ) {
  203. dirs.push_back(token);
  204. }
  205. }
  206. };
  207. struct FindOperationalState {
  208. const std::string path;
  209. const uint32_t depth;
  210. const bool search_path;
  211. FindOperationalState(const std::string path_, const uint32_t depth_, const bool search_path_)
  212. : path(path_),
  213. depth(depth_),
  214. search_path(search_path_) {
  215. }
  216. };
  217. void FindShim(const Status &stat, const std::vector<StatInfo> & stat_infos,
  218. bool directory_has_more, std::shared_ptr<FindOperationalState> current_state, std::shared_ptr<FindSharedState> shared_state);
  219. };
  220. }
  221. #endif