123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263 |
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #ifndef LIBHDFSPP_LIB_FS_FILESYSTEM_H_
- #define LIBHDFSPP_LIB_FS_FILESYSTEM_H_
#include "filehandle.h"
#include "hdfspp/hdfspp.h"
#include "fs/bad_datanode_tracker.h"
#include "reader/block_reader.h"
#include "reader/fileinfo.h"
#include "namenode_operations.h"

#include "asio.hpp"

#include <atomic>
#include <mutex>
#include <sstream>
#include <thread>
- namespace hdfs {
- /*
- * FileSystem: The consumer's main point of interaction with the cluster as
- * a whole.
- *
- * Initially constructed in a disconnected state; call Connect before operating
- * on the FileSystem.
- *
- * All open files must be closed before the FileSystem is destroyed.
- *
- * Threading model: thread-safe for all operations
- * Lifetime: pointer created for consumer who is responsible for deleting it
- */
- class FileSystemImpl : public FileSystem {
- public:
- MEMCHECKED_CLASS(FileSystemImpl)
- explicit FileSystemImpl(IoService *&io_service, const std::string& user_name, const Options &options);
- explicit FileSystemImpl(std::shared_ptr<IoService>, const std::string& user_name, const Options &options);
- ~FileSystemImpl() override;
- /* attempt to connect to namenode, return bad status on failure */
- void Connect(const std::string &server, const std::string &service,
- const std::function<void(const Status &, FileSystem *)> &handler) override;
- /* attempt to connect to namenode, return bad status on failure */
- Status Connect(const std::string &server, const std::string &service) override;
- /* Connect to the NN indicated in options.defaultFs */
- virtual void ConnectToDefaultFs(
- const std::function<void(const Status &, FileSystem *)> &handler) override;
- virtual Status ConnectToDefaultFs() override;
- virtual void Open(const std::string &path,
- const std::function<void(const Status &, FileHandle *)>
- &handler) override;
- Status Open(const std::string &path, FileHandle **handle) override;
- virtual void GetPreferredBlockSize(const std::string &path,
- const std::function<void(const Status &, const uint64_t &)> &handler) override;
- virtual Status GetPreferredBlockSize(const std::string &path, uint64_t & block_size) override;
- virtual void SetReplication(const std::string & path, int16_t replication, std::function<void(const Status &)> handler) override;
- virtual Status SetReplication(const std::string & path, int16_t replication) override;
- void SetTimes(const std::string & path, uint64_t mtime, uint64_t atime, std::function<void(const Status &)> handler) override;
- Status SetTimes(const std::string & path, uint64_t mtime, uint64_t atime) override;
- void GetFileInfo(
- const std::string &path,
- const std::function<void(const Status &, const StatInfo &)> &handler) override;
- Status GetFileInfo(const std::string &path, StatInfo & stat_info) override;
- /**
- * Retrieves the file system information such as the total raw size of all files in the filesystem
- * and the raw capacity of the filesystem
- *
- * @param FsInfo struct to be populated by GetFsStats
- **/
- void GetFsStats(
- const std::function<void(const Status &, const FsInfo &)> &handler) override;
- Status GetFsStats(FsInfo & fs_info) override;
- void GetListing(
- const std::string &path,
- const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) override;
- Status GetListing(const std::string &path, std::vector<StatInfo> * stat_infos) override;
- virtual void GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length,
- const std::function<void(const Status &, std::shared_ptr<FileBlockLocation> locations)> ) override;
- virtual Status GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length,
- std::shared_ptr<FileBlockLocation> * locations) override;
- virtual void Mkdirs(const std::string & path, uint16_t permissions, bool createparent,
- std::function<void(const Status &)> handler) override;
- virtual Status Mkdirs(const std::string & path, uint16_t permissions, bool createparent) override;
- virtual void Delete(const std::string &path, bool recursive,
- const std::function<void(const Status &)> &handler) override;
- virtual Status Delete(const std::string &path, bool recursive) override;
- virtual void Rename(const std::string &oldPath, const std::string &newPath,
- const std::function<void(const Status &)> &handler) override;
- virtual Status Rename(const std::string &oldPath, const std::string &newPath) override;
- virtual void SetPermission(const std::string & path, uint16_t permissions,
- const std::function<void(const Status &)> &handler) override;
- virtual Status SetPermission(const std::string & path, uint16_t permissions) override;
- virtual void SetOwner(const std::string & path, const std::string & username,
- const std::string & groupname, const std::function<void(const Status &)> &handler) override;
- virtual Status SetOwner(const std::string & path,
- const std::string & username, const std::string & groupname) override;
- void Find(
- const std::string &path, const std::string &name, const uint32_t maxdepth,
- const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) override;
- Status Find(const std::string &path, const std::string &name, const uint32_t maxdepth, std::vector<StatInfo> * stat_infos) override;
- /*****************************************************************************
- * FILE SYSTEM SNAPSHOT FUNCTIONS
- ****************************************************************************/
- /**
- * Creates a snapshot of a snapshottable directory specified by path
- *
- * @param path Path to the directory to be snapshotted (must be non-empty)
- * @param name Name to be given to the created snapshot (may be empty)
- **/
- void CreateSnapshot(const std::string &path, const std::string &name,
- const std::function<void(const Status &)> &handler) override;
- Status CreateSnapshot(const std::string &path, const std::string &name) override;
- /**
- * Deletes the directory snapshot specified by path and name
- *
- * @param path Path to the snapshotted directory (must be non-empty)
- * @param name Name of the snapshot to be deleted (must be non-empty)
- **/
- void DeleteSnapshot(const std::string &path, const std::string &name,
- const std::function<void(const Status &)> &handler) override;
- Status DeleteSnapshot(const std::string &path, const std::string &name) override;
- /**
- * Allows snapshots to be made on the specified directory
- *
- * @param path Path to the directory to be made snapshottable (must be non-empty)
- **/
- void AllowSnapshot(const std::string &path, const std::function<void(const Status &)> &handler) override;
- Status AllowSnapshot(const std::string &path) override;
- /**
- * Disallows snapshots to be made on the specified directory
- *
- * @param path Path to the directory to be made non-snapshottable (must be non-empty)
- **/
- void DisallowSnapshot(const std::string &path, const std::function<void(const Status &)> &handler) override;
- Status DisallowSnapshot(const std::string &path) override;
- void SetFsEventCallback(fs_event_callback callback) override;
- /* add a new thread to handle asio requests, return number of threads in pool
- */
- int AddWorkerThread();
- /* how many worker threads are servicing asio requests */
- int WorkerThreadCount();
- /* all monitored events will need to lookup handlers */
- std::shared_ptr<LibhdfsEvents> get_event_handlers();
- Options get_options();
- private:
- /**
- * The IoService must be the first member variable to ensure that it gets
- * destroyed last. This allows other members to dequeue things from the
- * service in their own destructors.
- **/
- std::shared_ptr<IoServiceImpl> io_service_;
- const Options options_;
- const std::string client_name_;
- std::string cluster_name_;
- NameNodeOperations nn_;
- std::shared_ptr<BadDataNodeTracker> bad_node_tracker_;
- /**
- * Runtime event monitoring handlers.
- * Note: This is really handy to have for advanced usage but
- * exposes implementation details that may change at any time.
- **/
- std::shared_ptr<LibhdfsEvents> event_handlers_;
- void GetListingShim(const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more,
- std::string path, const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler);
- struct FindSharedState {
- //Name pattern (can have wild-cards) to find
- const std::string name;
- //Maximum depth to recurse after the end of path is reached.
- //Can be set to 0 for pure path globbing and ignoring name pattern entirely.
- const uint32_t maxdepth;
- //Vector of all sub-directories from the path argument (each can have wild-cards)
- std::vector<std::string> dirs;
- //Callback from Find
- const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> handler;
- //outstanding_requests is incremented once for every GetListing call.
- std::atomic<uint64_t> outstanding_requests;
- //Boolean needed to abort all recursion on error or on user command
- std::atomic<bool> aborted;
- //Shared variables will need protection with a lock
- std::mutex lock;
- FindSharedState(const std::string path_, const std::string name_, const uint32_t maxdepth_,
- const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> handler_,
- uint64_t outstanding_recuests_, bool aborted_)
- : name(name_),
- maxdepth(maxdepth_),
- handler(handler_),
- outstanding_requests(outstanding_recuests_),
- aborted(aborted_),
- lock() {
- //Constructing the list of sub-directories
- std::stringstream ss(path_);
- if(path_.back() != '/'){
- ss << "/";
- }
- for (std::string token; std::getline(ss, token, '/'); ) {
- dirs.push_back(token);
- }
- }
- };
- struct FindOperationalState {
- const std::string path;
- const uint32_t depth;
- const bool search_path;
- FindOperationalState(const std::string path_, const uint32_t depth_, const bool search_path_)
- : path(path_),
- depth(depth_),
- search_path(search_path_) {
- }
- };
- void FindShim(const Status &stat, const std::vector<StatInfo> & stat_infos,
- bool directory_has_more, std::shared_ptr<FindOperationalState> current_state, std::shared_ptr<FindSharedState> shared_state);
- };
- }
- #endif
|