filesystem.cc 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "filesystem.h"
  19. #include "filehandle.h"
  20. #include "common/namenode_info.h"
  21. #include <functional>
  22. #include <limits>
  23. #include <future>
  24. #include <tuple>
  25. #include <pwd.h>
  26. #include <boost/asio/ip/tcp.hpp>
  27. #include "x-platform/syscall.h"
  28. #define FMT_THIS_ADDR "this=" << (void*)this
  29. namespace hdfs {
  30. static const char kNamenodeProtocol[] = "org.apache.hadoop.hdfs.protocol.ClientProtocol";
  31. static const int kNamenodeProtocolVersion = 1;
  32. using boost::asio::ip::tcp;
  33. static constexpr uint16_t kDefaultPort = 8020;
  34. // forward declarations
  35. const std::string get_effective_user_name(const std::string &);
  36. uint32_t FileSystem::GetDefaultFindMaxDepth() {
  37. return std::numeric_limits<uint32_t>::max();
  38. }
  39. uint16_t FileSystem::GetDefaultPermissionMask() {
  40. return 0755;
  41. }
  42. Status FileSystem::CheckValidPermissionMask(uint16_t permissions) {
  43. if (permissions > 01777) {
  44. std::stringstream errormsg;
  45. errormsg << "CheckValidPermissionMask: argument 'permissions' is " << std::oct
  46. << std::showbase << permissions << " (should be between 0 and 01777)";
  47. return Status::InvalidArgument(errormsg.str().c_str());
  48. }
  49. return Status::OK();
  50. }
  51. Status FileSystem::CheckValidReplication(uint16_t replication) {
  52. if (replication < 1 || replication > 512) {
  53. std::stringstream errormsg;
  54. errormsg << "CheckValidReplication: argument 'replication' is "
  55. << replication << " (should be between 1 and 512)";
  56. return Status::InvalidArgument(errormsg.str().c_str());
  57. }
  58. return Status::OK();
  59. }
  60. FileSystem::~FileSystem() {}
  61. /*****************************************************************************
  62. * FILESYSTEM BASE CLASS
  63. ****************************************************************************/
  64. FileSystem *FileSystem::New(
  65. IoService *&io_service, const std::string &user_name, const Options &options) {
  66. return new FileSystemImpl(io_service, user_name, options);
  67. }
  68. FileSystem *FileSystem::New(
  69. std::shared_ptr<IoService> io_service, const std::string &user_name, const Options &options) {
  70. return new FileSystemImpl(io_service, user_name, options);
  71. }
  72. FileSystem *FileSystem::New() {
  73. // No, this pointer won't be leaked. The FileSystem takes ownership.
  74. std::shared_ptr<IoService> io_service = IoService::MakeShared();
  75. if(!io_service)
  76. return nullptr;
  77. int thread_count = io_service->InitDefaultWorkers();
  78. if(thread_count < 1)
  79. return nullptr;
  80. std::string user_name = get_effective_user_name("");
  81. Options options;
  82. return new FileSystemImpl(io_service, user_name, options);
  83. }
  84. /*****************************************************************************
  85. * FILESYSTEM IMPLEMENTATION
  86. ****************************************************************************/
  87. struct FileSystemImpl::FindSharedState {
  88. //Name pattern (can have wild-cards) to find
  89. const std::string name;
  90. //Maximum depth to recurse after the end of path is reached.
  91. //Can be set to 0 for pure path globbing and ignoring name pattern entirely.
  92. const uint32_t maxdepth;
  93. //Vector of all sub-directories from the path argument (each can have wild-cards)
  94. std::vector<std::string> dirs;
  95. //Callback from Find
  96. const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> handler;
  97. //outstanding_requests is incremented once for every GetListing call.
  98. std::atomic<uint64_t> outstanding_requests;
  99. //Boolean needed to abort all recursion on error or on user command
  100. std::atomic<bool> aborted;
  101. //Shared variables will need protection with a lock
  102. std::mutex lock;
  103. FindSharedState(const std::string path_, const std::string name_, const uint32_t maxdepth_,
  104. const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> handler_,
  105. uint64_t outstanding_recuests_, bool aborted_)
  106. : name(name_),
  107. maxdepth(maxdepth_),
  108. handler(handler_),
  109. outstanding_requests(outstanding_recuests_),
  110. aborted(aborted_),
  111. lock() {
  112. //Constructing the list of sub-directories
  113. std::stringstream ss(path_);
  114. if(path_.back() != '/'){
  115. ss << "/";
  116. }
  117. for (std::string token; std::getline(ss, token, '/'); ) {
  118. dirs.push_back(token);
  119. }
  120. }
  121. };
  122. struct FileSystemImpl::FindOperationalState {
  123. const std::string path;
  124. const uint32_t depth;
  125. const bool search_path;
  126. FindOperationalState(const std::string path_, const uint32_t depth_, const bool search_path_)
  127. : path(path_),
  128. depth(depth_),
  129. search_path(search_path_) {
  130. }
  131. };
  // Resolves the user name to present to the cluster.
  //
  // Precedence:
  //   1. user_name, if non-empty;
  //   2. the HADOOP_USER_NAME, then USER, environment variables;
  //   3. on POSIX systems, the password-database name of the effective uid;
  //   4. the literal "unknown_user".
  //
  // An environment variable that is set but empty is skipped, so it cannot
  // yield an empty user name.
  const std::string get_effective_user_name(const std::string &user_name) {
    if (!user_name.empty())
      return user_name;

    static const char *const kUserEnvVars[] = {"HADOOP_USER_NAME", "USER"};
    for (const char *var : kUserEnvVars) {
      const char *env = getenv(var);
      if (env && *env) {
        return env;
      }
    }

    // If running on POSIX, use the currently logged in user
#if defined(_POSIX_VERSION)
    uid_t uid = geteuid();
    struct passwd *pw = getpwuid(uid);
    if (pw && pw->pw_name)
    {
      return pw->pw_name;
    }
#endif
    return "unknown_user";
  }
  156. FileSystemImpl::FileSystemImpl(IoService *&io_service, const std::string &user_name, const Options &options) :
  157. io_service_(io_service), options_(options),
  158. client_name_(GetRandomClientName()),
  159. nn_(
  160. io_service_, options, client_name_,
  161. get_effective_user_name(user_name), kNamenodeProtocol,
  162. kNamenodeProtocolVersion
  163. ),
  164. bad_node_tracker_(std::make_shared<BadDataNodeTracker>()),
  165. event_handlers_(std::make_shared<LibhdfsEvents>())
  166. {
  167. LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl("
  168. << FMT_THIS_ADDR << ") called");
  169. // Poor man's move
  170. io_service = nullptr;
  171. unsigned int running_workers = 0;
  172. if(options.io_threads_ < 1) {
  173. LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl Initializing default number of worker threads");
  174. running_workers = io_service_->InitDefaultWorkers();
  175. } else {
  176. LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystenImpl Initializing " << options_.io_threads_ << " worker threads.");
  177. running_workers = io_service->InitWorkers(options_.io_threads_);
  178. }
  179. if(running_workers < 1) {
  180. LOG_WARN(kFileSystem, << "FileSystemImpl::FileSystemImpl was unable to start worker threads");
  181. }
  182. }
  183. FileSystemImpl::FileSystemImpl(std::shared_ptr<IoService> io_service, const std::string& user_name, const Options &options) :
  184. io_service_(io_service), options_(options),
  185. client_name_(GetRandomClientName()),
  186. nn_(
  187. io_service_, options, client_name_,
  188. get_effective_user_name(user_name), kNamenodeProtocol,
  189. kNamenodeProtocolVersion
  190. ),
  191. bad_node_tracker_(std::make_shared<BadDataNodeTracker>()),
  192. event_handlers_(std::make_shared<LibhdfsEvents>())
  193. {
  194. LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl("
  195. << FMT_THIS_ADDR << ", shared IoService@" << io_service_.get() << ") called");
  196. int worker_thread_count = io_service_->GetWorkerThreadCount();
  197. if(worker_thread_count < 1) {
  198. LOG_WARN(kFileSystem, << "FileSystemImpl::FileSystemImpl IoService provided doesn't have any worker threads. "
  199. << "It needs at least 1 worker to connect to an HDFS cluster.")
  200. } else {
  201. LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl using " << worker_thread_count << " worker threads.");
  202. }
  203. }
  204. FileSystemImpl::~FileSystemImpl() {
  205. LOG_TRACE(kFileSystem, << "FileSystemImpl::~FileSystemImpl("
  206. << FMT_THIS_ADDR << ") called");
  207. /**
  208. * Note: IoService must be stopped before getting rid of worker threads.
  209. * Once worker threads are joined and deleted the service can be deleted.
  210. **/
  211. io_service_->Stop();
  212. }
  213. void FileSystemImpl::Connect(const std::string &server,
  214. const std::string &service,
  215. const std::function<void(const Status &, FileSystem * fs)> &handler) {
  216. LOG_INFO(kFileSystem, << "FileSystemImpl::Connect(" << FMT_THIS_ADDR
  217. << ", server=" << server << ", service="
  218. << service << ") called");
  219. connect_callback_.SetCallback(handler);
  220. /* IoService::New can return nullptr */
  221. if (!io_service_) {
  222. handler (Status::Error("Null IoService"), this);
  223. }
  224. // DNS lookup here for namenode(s)
  225. std::vector<ResolvedNamenodeInfo> resolved_namenodes;
  226. auto name_service = options_.services.find(server);
  227. if(name_service != options_.services.end()) {
  228. cluster_name_ = name_service->first;
  229. resolved_namenodes = BulkResolve(io_service_, name_service->second);
  230. } else {
  231. cluster_name_ = server + ":" + service;
  232. // tmp namenode info just to get this in the right format for BulkResolve
  233. NamenodeInfo tmp_info;
  234. try {
  235. tmp_info.uri = URI::parse_from_string("hdfs://" + cluster_name_);
  236. } catch (const uri_parse_error& e) {
  237. LOG_ERROR(kFileSystem, << "Unable to use URI for cluster " << cluster_name_);
  238. handler(Status::Error(("Invalid namenode " + cluster_name_ + " in config").c_str()), this);
  239. }
  240. resolved_namenodes = BulkResolve(io_service_, {tmp_info});
  241. }
  242. for(unsigned int i=0;i<resolved_namenodes.size();i++) {
  243. LOG_DEBUG(kFileSystem, << "Resolved Namenode");
  244. LOG_DEBUG(kFileSystem, << resolved_namenodes[i].str());
  245. }
  246. nn_.Connect(cluster_name_, /*server, service*/ resolved_namenodes, [this](const Status & s) {
  247. connect_callback_.GetCallback()(s, this);
  248. });
  249. }
  250. void FileSystemImpl::ConnectToDefaultFs(const std::function<void(const Status &, FileSystem *)> &handler) {
  251. std::string scheme = options_.defaultFS.get_scheme();
  252. if (!XPlatform::Syscall::StringCompareIgnoreCase(scheme, "hdfs")) {
  253. std::string error_message;
  254. error_message += "defaultFS of [" + options_.defaultFS.str() + "] is not supported";
  255. handler(Status::InvalidArgument(error_message.c_str()), nullptr);
  256. return;
  257. }
  258. std::string host = options_.defaultFS.get_host();
  259. if (host.empty()) {
  260. handler(Status::InvalidArgument("defaultFS must specify a hostname"), nullptr);
  261. return;
  262. }
  263. int16_t port = options_.defaultFS.get_port_or_default(kDefaultPort);
  264. std::string port_as_string = std::to_string(port);
  265. Connect(host, port_as_string, handler);
  266. }
  267. int FileSystemImpl::AddWorkerThread() {
  268. LOG_DEBUG(kFileSystem, << "FileSystemImpl::AddWorkerThread("
  269. << FMT_THIS_ADDR << ") called."
  270. << " Existing thread count = " << WorkerThreadCount());
  271. if(!io_service_)
  272. return -1;
  273. io_service_->AddWorkerThread();
  274. return 1;
  275. }
  276. int FileSystemImpl::WorkerThreadCount() {
  277. if(!io_service_) {
  278. return -1;
  279. } else {
  280. return io_service_->GetWorkerThreadCount();
  281. }
  282. }
  283. bool FileSystemImpl::CancelPendingConnect() {
  284. if(connect_callback_.IsCallbackAccessed()) {
  285. // Temp fix for failover hangs, allow CancelPendingConnect to be called so it can push a flag through the RPC engine
  286. LOG_DEBUG(kFileSystem, << "FileSystemImpl@" << this << "::CancelPendingConnect called after Connect completed");
  287. return nn_.CancelPendingConnect();
  288. }
  289. if(!connect_callback_.IsCallbackSet()) {
  290. LOG_DEBUG(kFileSystem, << "FileSystemImpl@" << this << "::CancelPendingConnect called before Connect started");
  291. return false;
  292. }
  293. // First invoke callback, then do proper teardown in RpcEngine and RpcConnection
  294. ConnectCallback noop_callback = [](const Status &stat, FileSystem *fs) {
  295. LOG_DEBUG(kFileSystem, << "Dummy callback invoked for canceled FileSystem@" << fs << "::Connect with status: " << stat.ToString());
  296. };
  297. bool callback_swapped = false;
  298. ConnectCallback original_callback = connect_callback_.AtomicSwapCallback(noop_callback, callback_swapped);
  299. if(callback_swapped) {
  300. // Take original callback and invoke it as if it was canceled.
  301. LOG_DEBUG(kFileSystem, << "Swapped in dummy callback. Invoking connect callback with canceled status.");
  302. std::function<void(void)> wrapped_callback = [original_callback, this](){
  303. // handling code expected to check status before dereferenceing 'this'
  304. original_callback(Status::Canceled(), this);
  305. };
  306. io_service_->PostTask(wrapped_callback);
  307. } else {
  308. LOG_INFO(kFileSystem, << "Unable to cancel FileSystem::Connect. It hasn't been invoked yet or may have already completed.")
  309. return false;
  310. }
  311. // Now push cancel down to clean up where possible and make sure the RpcEngine
  312. // won't try to do retries in the background. The rest of the memory cleanup
  313. // happens when this FileSystem is deleted by the user.
  314. return nn_.CancelPendingConnect();
  315. }
  316. void FileSystemImpl::Open(
  317. const std::string &path,
  318. const std::function<void(const Status &, FileHandle *)> &handler) {
  319. LOG_DEBUG(kFileSystem, << "FileSystemImpl::Open("
  320. << FMT_THIS_ADDR << ", path="
  321. << path << ") called");
  322. nn_.GetBlockLocations(path, 0, std::numeric_limits<int64_t>::max(), [this, path, handler](const Status &stat, std::shared_ptr<const struct FileInfo> file_info) {
  323. if(!stat.ok()) {
  324. LOG_DEBUG(kFileSystem, << "FileSystemImpl::Open failed to get block locations. status=" << stat.ToString());
  325. if(stat.get_server_exception_type() == Status::kStandbyException) {
  326. LOG_DEBUG(kFileSystem, << "Operation not allowed on standby datanode");
  327. }
  328. }
  329. handler(stat, stat.ok() ? new FileHandleImpl(cluster_name_, path, io_service_, client_name_, file_info, bad_node_tracker_, event_handlers_)
  330. : nullptr);
  331. });
  332. }
  333. BlockLocation LocatedBlockToBlockLocation(const hadoop::hdfs::LocatedBlockProto & locatedBlock)
  334. {
  335. BlockLocation result;
  336. result.setCorrupt(locatedBlock.corrupt());
  337. result.setOffset(locatedBlock.offset());
  338. std::vector<DNInfo> dn_info;
  339. dn_info.reserve(locatedBlock.locs_size());
  340. for (const hadoop::hdfs::DatanodeInfoProto & datanode_info: locatedBlock.locs()) {
  341. const hadoop::hdfs::DatanodeIDProto &id = datanode_info.id();
  342. DNInfo newInfo;
  343. if (id.has_ipaddr())
  344. newInfo.setIPAddr(id.ipaddr());
  345. if (id.has_hostname())
  346. newInfo.setHostname(id.hostname());
  347. if (id.has_xferport())
  348. newInfo.setXferPort(id.xferport());
  349. if (id.has_infoport())
  350. newInfo.setInfoPort(id.infoport());
  351. if (id.has_ipcport())
  352. newInfo.setIPCPort(id.ipcport());
  353. if (id.has_infosecureport())
  354. newInfo.setInfoSecurePort(id.infosecureport());
  355. if (datanode_info.has_location())
  356. newInfo.setNetworkLocation(datanode_info.location());
  357. dn_info.push_back(newInfo);
  358. }
  359. result.setDataNodes(dn_info);
  360. if (locatedBlock.has_b()) {
  361. const hadoop::hdfs::ExtendedBlockProto & b=locatedBlock.b();
  362. result.setLength(b.numbytes());
  363. }
  364. return result;
  365. }
  366. void FileSystemImpl::GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length,
  367. const std::function<void(const Status &, std::shared_ptr<FileBlockLocation> locations)> handler)
  368. {
  369. LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetBlockLocations("
  370. << FMT_THIS_ADDR << ", path="
  371. << path << ") called");
  372. //Protobuf gives an error 'Negative value is not supported'
  373. //if the high bit is set in uint64 in GetBlockLocations
  374. if (IsHighBitSet(offset)) {
  375. handler(Status::InvalidArgument("GetBlockLocations: argument 'offset' cannot have high bit set"), nullptr);
  376. return;
  377. }
  378. if (IsHighBitSet(length)) {
  379. handler(Status::InvalidArgument("GetBlockLocations: argument 'length' cannot have high bit set"), nullptr);
  380. return;
  381. }
  382. auto conversion = [handler](const Status & status, std::shared_ptr<const struct FileInfo> fileInfo) {
  383. if (status.ok()) {
  384. auto result = std::make_shared<FileBlockLocation>();
  385. result->setFileLength(fileInfo->file_length_);
  386. result->setLastBlockComplete(fileInfo->last_block_complete_);
  387. result->setUnderConstruction(fileInfo->under_construction_);
  388. std::vector<BlockLocation> blocks;
  389. for (const hadoop::hdfs::LocatedBlockProto & locatedBlock: fileInfo->blocks_) {
  390. auto newLocation = LocatedBlockToBlockLocation(locatedBlock);
  391. blocks.push_back(newLocation);
  392. }
  393. result->setBlockLocations(blocks);
  394. handler(status, result);
  395. } else {
  396. handler(status, std::shared_ptr<FileBlockLocation>());
  397. }
  398. };
  399. nn_.GetBlockLocations(path, offset, length, conversion);
  400. }
  401. void FileSystemImpl::GetPreferredBlockSize(const std::string &path,
  402. const std::function<void(const Status &, const uint64_t &)> &handler) {
  403. LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetPreferredBlockSize("
  404. << FMT_THIS_ADDR << ", path="
  405. << path << ") called");
  406. nn_.GetPreferredBlockSize(path, handler);
  407. }
  408. void FileSystemImpl::SetReplication(const std::string & path, int16_t replication, std::function<void(const Status &)> handler) {
  409. LOG_DEBUG(kFileSystem,
  410. << "FileSystemImpl::SetReplication(" << FMT_THIS_ADDR << ", path=" << path <<
  411. ", replication=" << replication << ") called");
  412. if (path.empty()) {
  413. handler(Status::InvalidArgument("SetReplication: argument 'path' cannot be empty"));
  414. return;
  415. }
  416. Status replStatus = FileSystem::CheckValidReplication(replication);
  417. if (!replStatus.ok()) {
  418. handler(replStatus);
  419. return;
  420. }
  421. nn_.SetReplication(path, replication, handler);
  422. }
  423. void FileSystemImpl::SetTimes(const std::string & path, uint64_t mtime, uint64_t atime,
  424. std::function<void(const Status &)> handler) {
  425. LOG_DEBUG(kFileSystem,
  426. << "FileSystemImpl::SetTimes(" << FMT_THIS_ADDR << ", path=" << path <<
  427. ", mtime=" << mtime << ", atime=" << atime << ") called");
  428. if (path.empty()) {
  429. handler(Status::InvalidArgument("SetTimes: argument 'path' cannot be empty"));
  430. return;
  431. }
  432. nn_.SetTimes(path, mtime, atime, handler);
  433. }
  434. void FileSystemImpl::GetFileInfo(
  435. const std::string &path,
  436. const std::function<void(const Status &, const StatInfo &)> &handler) {
  437. LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetFileInfo("
  438. << FMT_THIS_ADDR << ", path="
  439. << path << ") called");
  440. nn_.GetFileInfo(path, handler);
  441. }
  442. void FileSystemImpl::GetContentSummary(
  443. const std::string &path,
  444. const std::function<void(const Status &, const ContentSummary &)> &handler) {
  445. LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetContentSummary("
  446. << FMT_THIS_ADDR << ", path="
  447. << path << ") called");
  448. nn_.GetContentSummary(path, handler);
  449. }
  450. void FileSystemImpl::GetFsStats(
  451. const std::function<void(const Status &, const FsInfo &)> &handler) {
  452. LOG_DEBUG(kFileSystem,
  453. << "FileSystemImpl::GetFsStats(" << FMT_THIS_ADDR << ") called");
  454. nn_.GetFsStats(handler);
  455. }
  456. /**
  457. * Helper function for recursive GetListing calls.
  458. *
  459. * Some compilers don't like recursive lambdas, so we make the lambda call a
  460. * method, which in turn creates a lambda calling itself.
  461. */
  462. void FileSystemImpl::GetListingShim(const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more,
  463. std::string path, const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) {
  464. bool has_next = !stat_infos.empty();
  465. bool get_more = handler(stat, stat_infos, has_more && has_next);
  466. if (get_more && has_more && has_next ) {
  467. auto callback = [this, path, handler](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) {
  468. GetListingShim(stat, stat_infos, has_more, path, handler);
  469. };
  470. std::string last = stat_infos.back().path;
  471. nn_.GetListing(path, callback, last);
  472. }
  473. }
  474. void FileSystemImpl::GetListing(
  475. const std::string &path,
  476. const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) {
  477. LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetListing("
  478. << FMT_THIS_ADDR << ", path="
  479. << path << ") called");
  480. std::string path_fixed = path;
  481. if(path.back() != '/'){
  482. path_fixed += "/";
  483. }
  484. // Caputure the state and push it into the shim
  485. auto callback = [this, path_fixed, handler](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) {
  486. GetListingShim(stat, stat_infos, has_more, path_fixed, handler);
  487. };
  488. nn_.GetListing(path_fixed, callback);
  489. }
  490. void FileSystemImpl::Mkdirs(const std::string & path, uint16_t permissions, bool createparent,
  491. std::function<void(const Status &)> handler) {
  492. LOG_DEBUG(kFileSystem,
  493. << "FileSystemImpl::Mkdirs(" << FMT_THIS_ADDR << ", path=" << path <<
  494. ", permissions=" << permissions << ", createparent=" << createparent << ") called");
  495. if (path.empty()) {
  496. handler(Status::InvalidArgument("Mkdirs: argument 'path' cannot be empty"));
  497. return;
  498. }
  499. Status permStatus = FileSystem::CheckValidPermissionMask(permissions);
  500. if (!permStatus.ok()) {
  501. handler(permStatus);
  502. return;
  503. }
  504. nn_.Mkdirs(path, permissions, createparent, handler);
  505. }
  506. void FileSystemImpl::Delete(const std::string &path, bool recursive,
  507. const std::function<void(const Status &)> &handler) {
  508. LOG_DEBUG(kFileSystem,
  509. << "FileSystemImpl::Delete(" << FMT_THIS_ADDR << ", path=" << path << ", recursive=" << recursive << ") called");
  510. if (path.empty()) {
  511. handler(Status::InvalidArgument("Delete: argument 'path' cannot be empty"));
  512. return;
  513. }
  514. nn_.Delete(path, recursive, handler);
  515. }
  516. void FileSystemImpl::Rename(const std::string &oldPath, const std::string &newPath,
  517. const std::function<void(const Status &)> &handler) {
  518. LOG_DEBUG(kFileSystem,
  519. << "FileSystemImpl::Rename(" << FMT_THIS_ADDR << ", oldPath=" << oldPath << ", newPath=" << newPath << ") called");
  520. if (oldPath.empty()) {
  521. handler(Status::InvalidArgument("Rename: argument 'oldPath' cannot be empty"));
  522. return;
  523. }
  524. if (newPath.empty()) {
  525. handler(Status::InvalidArgument("Rename: argument 'newPath' cannot be empty"));
  526. return;
  527. }
  528. nn_.Rename(oldPath, newPath, handler);
  529. }
  530. void FileSystemImpl::SetPermission(const std::string & path,
  531. uint16_t permissions, const std::function<void(const Status &)> &handler) {
  532. LOG_DEBUG(kFileSystem,
  533. << "FileSystemImpl::SetPermission(" << FMT_THIS_ADDR << ", path=" << path << ", permissions=" << permissions << ") called");
  534. if (path.empty()) {
  535. handler(Status::InvalidArgument("SetPermission: argument 'path' cannot be empty"));
  536. return;
  537. }
  538. Status permStatus = FileSystem::CheckValidPermissionMask(permissions);
  539. if (!permStatus.ok()) {
  540. handler(permStatus);
  541. return;
  542. }
  543. nn_.SetPermission(path, permissions, handler);
  544. }
  545. void FileSystemImpl::SetOwner(const std::string & path, const std::string & username,
  546. const std::string & groupname, const std::function<void(const Status &)> &handler) {
  547. LOG_DEBUG(kFileSystem,
  548. << "FileSystemImpl::SetOwner(" << FMT_THIS_ADDR << ", path=" << path << ", username=" << username << ", groupname=" << groupname << ") called");
  549. if (path.empty()) {
  550. handler(Status::InvalidArgument("SetOwner: argument 'path' cannot be empty"));
  551. return;
  552. }
  553. nn_.SetOwner(path, username, groupname, handler);
  554. }
  555. /**
  556. * Helper function for recursive Find calls.
  557. *
  558. * Some compilers don't like recursive lambdas, so we make the lambda call a
  559. * method, which in turn creates a lambda calling itself.
  560. *
  561. * ***High-level explanation***
  562. *
  563. * Since we are allowing to use wild cards in both path and name, we start by expanding the path first.
  564. * Boolean search_path is set to true when we search for the path and false when we search for the name.
  565. * When we search for the path we break the given path pattern into sub-directories. Starting from the
  566. * first sub-directory we list them one-by-one and recursively continue into directories that matched the
  567. * path pattern at the current depth. Directories that are large will be requested to continue sending
  568. * the results. We keep track of the current depth within the path pattern in the 'depth' variable.
  569. * This continues recursively until the depth reaches the end of the path. Next that we start matching
  570. * the name pattern. All directories that we find we recurse now, and all names that match the given name
  571. * pattern are being stored in outputs and later sent back to the user.
  572. */
  573. void FileSystemImpl::FindShim(const Status &stat, const std::vector<StatInfo> & stat_infos, bool directory_has_more,
  574. std::shared_ptr<FindOperationalState> operational_state, std::shared_ptr<FindSharedState> shared_state) {
  575. //We buffer the outputs then send them back at the end
  576. std::vector<StatInfo> outputs;
  577. //Return on error
  578. if(!stat.ok()){
  579. std::lock_guard<std::mutex> find_lock(shared_state->lock);
  580. //We send true becuase we do not want the user code to exit before all our requests finished
  581. shared_state->handler(stat, outputs, true);
  582. shared_state->aborted = true;
  583. }
  584. if(!shared_state->aborted){
  585. //User did not abort the operation
  586. if (directory_has_more) {
  587. //Directory is large and has more results
  588. //We launch another async call to get more results
  589. shared_state->outstanding_requests++;
  590. auto callback = [this, operational_state, shared_state](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) {
  591. FindShim(stat, stat_infos, has_more, operational_state, shared_state);
  592. };
  593. std::string last = stat_infos.back().path;
  594. nn_.GetListing(operational_state->path, callback, last);
  595. }
  596. if(operational_state->search_path && operational_state->depth < shared_state->dirs.size() - 1){
  597. //We are searching for the path and did not reach the end of the path yet
  598. for (StatInfo const& si : stat_infos) {
  599. //If we are at the last depth and it matches both path and name, we need to output it.
  600. if (operational_state->depth == shared_state->dirs.size() - 2
  601. && XPlatform::Syscall::FnMatch(shared_state->dirs[operational_state->depth + 1], si.path)
  602. && XPlatform::Syscall::FnMatch(shared_state->name, si.path)) {
  603. outputs.push_back(si);
  604. }
  605. //Skip if not directory
  606. if(si.file_type != StatInfo::IS_DIR) {
  607. continue;
  608. }
  609. //Checking for a match with the path at the current depth
  610. if(XPlatform::Syscall::FnMatch(shared_state->dirs[operational_state->depth + 1], si.path)) {
  611. //Launch a new requests for every matched directory
  612. shared_state->outstanding_requests++;
  613. auto callback = [this, si, operational_state, shared_state](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) {
  614. std::shared_ptr<FindOperationalState> new_current_state = std::make_shared<FindOperationalState>(si.full_path, operational_state->depth + 1, true); //true because searching for the path
  615. FindShim(stat, stat_infos, has_more, new_current_state, shared_state);
  616. };
  617. nn_.GetListing(si.full_path, callback);
  618. }
  619. }
  620. }
  621. else if(shared_state->maxdepth > operational_state->depth - shared_state->dirs.size() + 1){
  622. //We are searching for the name now and maxdepth has not been reached
  623. for (StatInfo const& si : stat_infos) {
  624. //Launch a new request for every directory
  625. if(si.file_type == StatInfo::IS_DIR) {
  626. shared_state->outstanding_requests++;
  627. auto callback = [this, si, operational_state, shared_state](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) {
  628. std::shared_ptr<FindOperationalState> new_current_state = std::make_shared<FindOperationalState>(si.full_path, operational_state->depth + 1, false); //false because searching for the name
  629. FindShim(stat, stat_infos, has_more, new_current_state, shared_state);
  630. };
  631. nn_.GetListing(si.full_path, callback);
  632. }
  633. //All names that match the specified name are saved to outputs
  634. if(XPlatform::Syscall::FnMatch(shared_state->name, si.path)) {
  635. outputs.push_back(si);
  636. }
  637. }
  638. }
  639. }
  640. //This section needs a lock to make sure we return the final chunk only once
  641. //and no results are sent after aborted is set
  642. std::lock_guard<std::mutex> find_lock(shared_state->lock);
  643. //Decrement the counter once since we are done with this chunk
  644. shared_state->outstanding_requests--;
  645. if(shared_state->outstanding_requests == 0){
  646. //Send the outputs back to the user and notify that this is the final chunk
  647. shared_state->handler(stat, outputs, false);
  648. } else {
  649. //There will be more results and we are not aborting
  650. if (outputs.size() > 0 && !shared_state->aborted){
  651. //Send the outputs back to the user and notify that there is more
  652. bool user_wants_more = shared_state->handler(stat, outputs, true);
  653. if(!user_wants_more) {
  654. //Abort if user doesn't want more
  655. shared_state->aborted = true;
  656. }
  657. }
  658. }
  659. }
  660. void FileSystemImpl::Find(
  661. const std::string &path, const std::string &name, const uint32_t maxdepth,
  662. const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) {
  663. LOG_DEBUG(kFileSystem, << "FileSystemImpl::Find("
  664. << FMT_THIS_ADDR << ", path="
  665. << path << ", name="
  666. << name << ") called");
  667. //Populating the operational state, which includes:
  668. //current search path, depth within the path, and the indication that we are currently searching for a path (not name yet).
  669. std::shared_ptr<FindOperationalState> operational_state = std::make_shared<FindOperationalState>(path, 0, true);
  670. //Populating the shared state, which includes:
  671. //vector of sub-directories constructed from path, name to search, handler to use for result returning, outstanding_requests counter, and aborted flag.
  672. std::shared_ptr<FindSharedState> shared_state = std::make_shared<FindSharedState>(path, name, maxdepth, handler, 1, false);
  673. auto callback = [this, operational_state, shared_state](const Status &stat, const std::vector<StatInfo> & stat_infos, bool directory_has_more) {
  674. FindShim(stat, stat_infos, directory_has_more, operational_state, shared_state);
  675. };
  676. nn_.GetListing("/", callback);
  677. }
  678. void FileSystemImpl::CreateSnapshot(const std::string &path,
  679. const std::string &name,
  680. const std::function<void(const Status &)> &handler) {
  681. LOG_DEBUG(kFileSystem,
  682. << "FileSystemImpl::CreateSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called");
  683. if (path.empty()) {
  684. handler(Status::InvalidArgument("CreateSnapshot: argument 'path' cannot be empty"));
  685. return;
  686. }
  687. nn_.CreateSnapshot(path, name, handler);
  688. }
  689. void FileSystemImpl::DeleteSnapshot(const std::string &path,
  690. const std::string &name,
  691. const std::function<void(const Status &)> &handler) {
  692. LOG_DEBUG(kFileSystem,
  693. << "FileSystemImpl::DeleteSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called");
  694. if (path.empty()) {
  695. handler(Status::InvalidArgument("DeleteSnapshot: argument 'path' cannot be empty"));
  696. return;
  697. }
  698. if (name.empty()) {
  699. handler(Status::InvalidArgument("DeleteSnapshot: argument 'name' cannot be empty"));
  700. return;
  701. }
  702. nn_.DeleteSnapshot(path, name, handler);
  703. }
  704. void FileSystemImpl::RenameSnapshot(const std::string &path,
  705. const std::string &old_name, const std::string &new_name,
  706. const std::function<void(const Status &)> &handler) {
  707. LOG_DEBUG(kFileSystem,
  708. << "FileSystemImpl::RenameSnapshot(" << FMT_THIS_ADDR << ", path=" << path <<
  709. ", old_name=" << old_name << ", new_name=" << new_name << ") called");
  710. if (path.empty()) {
  711. handler(Status::InvalidArgument("RenameSnapshot: argument 'path' cannot be empty"));
  712. return;
  713. }
  714. if (old_name.empty()) {
  715. handler(Status::InvalidArgument("RenameSnapshot: argument 'old_name' cannot be empty"));
  716. return;
  717. }
  718. if (new_name.empty()) {
  719. handler(Status::InvalidArgument("RenameSnapshot: argument 'new_name' cannot be empty"));
  720. return;
  721. }
  722. nn_.RenameSnapshot(path, old_name, new_name, handler);
  723. }
  724. void FileSystemImpl::AllowSnapshot(const std::string &path,
  725. const std::function<void(const Status &)> &handler) {
  726. LOG_DEBUG(kFileSystem,
  727. << "FileSystemImpl::AllowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called");
  728. if (path.empty()) {
  729. handler(Status::InvalidArgument("AllowSnapshot: argument 'path' cannot be empty"));
  730. return;
  731. }
  732. nn_.AllowSnapshot(path, handler);
  733. }
  734. void FileSystemImpl::DisallowSnapshot(const std::string &path,
  735. const std::function<void(const Status &)> &handler) {
  736. LOG_DEBUG(kFileSystem,
  737. << "FileSystemImpl::DisallowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called");
  738. if (path.empty()) {
  739. handler(Status::InvalidArgument("DisallowSnapshot: argument 'path' cannot be empty"));
  740. return;
  741. }
  742. nn_.DisallowSnapshot(path, handler);
  743. }
  744. void FileSystemImpl::SetFsEventCallback(fs_event_callback callback) {
  745. if (event_handlers_) {
  746. event_handlers_->set_fs_callback(callback);
  747. nn_.SetFsEventCallback(callback);
  748. }
  749. }
  750. std::shared_ptr<LibhdfsEvents> FileSystemImpl::get_event_handlers() {
  751. return event_handlers_;
  752. }
  753. Options FileSystemImpl::get_options() {
  754. return options_;
  755. }
  756. std::string FileSystemImpl::get_cluster_name() {
  757. return cluster_name_;
  758. }
  759. }