namenode_operations.cc 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "filesystem.h"
  19. #include "common/continuation/asio.h"
  20. #include <asio/ip/tcp.hpp>
  21. #include <functional>
  22. #include <limits>
  23. #include <future>
  24. #include <tuple>
  25. #include <iostream>
  26. #include <pwd.h>
  27. #include <utility>
  28. #define FMT_THIS_ADDR "this=" << (void*)this
  29. using ::asio::ip::tcp;
  30. namespace hdfs {
  31. /*****************************************************************************
  32. * NAMENODE OPERATIONS
  33. ****************************************************************************/
  34. Status NameNodeOperations::CheckValidPermissionMask(short permissions) {
  35. if (permissions < 0 || permissions > 01777) {
  36. std::stringstream errormsg;
  37. errormsg << "IsValidPermissionMask: argument 'permissions' is " << std::oct
  38. << std::showbase << permissions << " (should be between 0 and 01777)";
  39. //Avoid copying by binding errormsg.str() to a const reference, which extends its lifetime
  40. const std::string& tmp = errormsg.str();
  41. return Status::InvalidArgument(tmp.c_str());
  42. }
  43. return Status::OK();
  44. }
  45. void NameNodeOperations::Connect(const std::string &cluster_name,
  46. const std::vector<ResolvedNamenodeInfo> &servers,
  47. std::function<void(const Status &)> &&handler) {
  48. engine_.Connect(cluster_name, servers, handler);
  49. }
  50. void NameNodeOperations::GetBlockLocations(const std::string & path,
  51. std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler)
  52. {
  53. using ::hadoop::hdfs::GetBlockLocationsRequestProto;
  54. using ::hadoop::hdfs::GetBlockLocationsResponseProto;
  55. LOG_TRACE(kFileSystem, << "NameNodeOperations::GetBlockLocations("
  56. << FMT_THIS_ADDR << ", path=" << path << ", ...) called");
  57. if (path.empty()) {
  58. handler(Status::InvalidArgument("GetBlockLocations: argument 'path' cannot be empty"), nullptr);
  59. return;
  60. }
  61. GetBlockLocationsRequestProto req;
  62. req.set_src(path);
  63. req.set_offset(0);
  64. req.set_length(std::numeric_limits<long long>::max());
  65. auto resp = std::make_shared<GetBlockLocationsResponseProto>();
  66. namenode_.GetBlockLocations(&req, resp, [resp, handler](const Status &stat) {
  67. if (stat.ok()) {
  68. auto file_info = std::make_shared<struct FileInfo>();
  69. auto locations = resp->locations();
  70. file_info->file_length_ = locations.filelength();
  71. file_info->last_block_complete_ = locations.islastblockcomplete();
  72. file_info->under_construction_ = locations.underconstruction();
  73. for (const auto &block : locations.blocks()) {
  74. file_info->blocks_.push_back(block);
  75. }
  76. if (!locations.islastblockcomplete() &&
  77. locations.has_lastblock() && locations.lastblock().b().numbytes()) {
  78. file_info->blocks_.push_back(locations.lastblock());
  79. file_info->file_length_ += locations.lastblock().b().numbytes();
  80. }
  81. handler(stat, file_info);
  82. } else {
  83. handler(stat, nullptr);
  84. }
  85. });
  86. }
  87. void NameNodeOperations::GetFileInfo(const std::string & path,
  88. std::function<void(const Status &, const StatInfo &)> handler)
  89. {
  90. using ::hadoop::hdfs::GetFileInfoRequestProto;
  91. using ::hadoop::hdfs::GetFileInfoResponseProto;
  92. LOG_TRACE(kFileSystem, << "NameNodeOperations::GetFileInfo("
  93. << FMT_THIS_ADDR << ", path=" << path << ") called");
  94. if (path.empty()) {
  95. handler(Status::InvalidArgument("GetFileInfo: argument 'path' cannot be empty"), StatInfo());
  96. return;
  97. }
  98. GetFileInfoRequestProto req;
  99. req.set_src(path);
  100. auto resp = std::make_shared<GetFileInfoResponseProto>();
  101. namenode_.GetFileInfo(&req, resp, [resp, handler, path](const Status &stat) {
  102. if (stat.ok()) {
  103. // For non-existant files, the server will respond with an OK message but
  104. // no fs in the protobuf.
  105. if(resp -> has_fs()){
  106. struct StatInfo stat_info;
  107. stat_info.path=path;
  108. HdfsFileStatusProtoToStatInfo(stat_info, resp->fs());
  109. handler(stat, stat_info);
  110. } else {
  111. std::string errormsg = "No such file or directory: " + path;
  112. Status statNew = Status::PathNotFound(errormsg.c_str());
  113. handler(statNew, StatInfo());
  114. }
  115. } else {
  116. handler(stat, StatInfo());
  117. }
  118. });
  119. }
  120. void NameNodeOperations::GetFsStats(
  121. std::function<void(const Status &, const FsInfo &)> handler) {
  122. using ::hadoop::hdfs::GetFsStatusRequestProto;
  123. using ::hadoop::hdfs::GetFsStatsResponseProto;
  124. LOG_TRACE(kFileSystem,
  125. << "NameNodeOperations::GetFsStats(" << FMT_THIS_ADDR << ") called");
  126. GetFsStatusRequestProto req;
  127. auto resp = std::make_shared<GetFsStatsResponseProto>();
  128. namenode_.GetFsStats(&req, resp, [resp, handler](const Status &stat) {
  129. if (stat.ok()) {
  130. struct FsInfo fs_info;
  131. GetFsStatsResponseProtoToFsInfo(fs_info, resp);
  132. handler(stat, fs_info);
  133. } else {
  134. handler(stat, FsInfo());
  135. }
  136. });
  137. }
  138. void NameNodeOperations::GetListing(
  139. const std::string & path,
  140. std::function<void(const Status &, std::shared_ptr<std::vector<StatInfo>> &, bool)> handler,
  141. const std::string & start_after) {
  142. using ::hadoop::hdfs::GetListingRequestProto;
  143. using ::hadoop::hdfs::GetListingResponseProto;
  144. LOG_TRACE(
  145. kFileSystem,
  146. << "NameNodeOperations::GetListing(" << FMT_THIS_ADDR << ", path=" << path << ") called");
  147. if (path.empty()) {
  148. std::shared_ptr<std::vector<StatInfo>> stat_infos;
  149. handler(Status::InvalidArgument("GetListing: argument 'path' cannot be empty"), stat_infos, false);
  150. return;
  151. }
  152. GetListingRequestProto req;
  153. req.set_src(path);
  154. req.set_startafter(start_after.c_str());
  155. req.set_needlocation(false);
  156. auto resp = std::make_shared<GetListingResponseProto>();
  157. namenode_.GetListing(
  158. &req,
  159. resp,
  160. [resp, handler, path](const Status &stat) {
  161. if (stat.ok()) {
  162. if(resp -> has_dirlist()){
  163. std::shared_ptr<std::vector<StatInfo>> stat_infos(new std::vector<StatInfo>);
  164. for (::hadoop::hdfs::HdfsFileStatusProto const& fs : resp->dirlist().partiallisting()) {
  165. StatInfo si;
  166. si.path=fs.path();
  167. HdfsFileStatusProtoToStatInfo(si, fs);
  168. stat_infos->push_back(si);
  169. }
  170. handler(stat, stat_infos, resp->dirlist().remainingentries() > 0);
  171. } else {
  172. std::string errormsg = "No such file or directory: " + path;
  173. Status statNew = Status::PathNotFound(errormsg.c_str());
  174. std::shared_ptr<std::vector<StatInfo>> stat_infos;
  175. handler(statNew, stat_infos, false);
  176. }
  177. } else {
  178. std::shared_ptr<std::vector<StatInfo>> stat_infos;
  179. handler(stat, stat_infos, false);
  180. }
  181. });
  182. }
  183. void NameNodeOperations::Mkdirs(const std::string & path, long permissions, bool createparent,
  184. std::function<void(const Status &)> handler)
  185. {
  186. using ::hadoop::hdfs::MkdirsRequestProto;
  187. using ::hadoop::hdfs::MkdirsResponseProto;
  188. LOG_TRACE(kFileSystem,
  189. << "NameNodeOperations::Mkdirs(" << FMT_THIS_ADDR << ", path=" << path <<
  190. ", permissions=" << permissions << ", createparent=" << createparent << ") called");
  191. if (path.empty()) {
  192. handler(Status::InvalidArgument("Mkdirs: argument 'path' cannot be empty"));
  193. return;
  194. }
  195. MkdirsRequestProto req;
  196. req.set_src(path);
  197. hadoop::hdfs::FsPermissionProto *perm = req.mutable_masked();
  198. if (permissions < 0) {
  199. perm->set_perm(0755);
  200. } else {
  201. perm->set_perm(permissions);
  202. }
  203. req.set_createparent(createparent);
  204. auto resp = std::make_shared<MkdirsResponseProto>();
  205. namenode_.Mkdirs(&req, resp, [resp, handler, path](const Status &stat) {
  206. if (stat.ok()) {
  207. // Checking resp
  208. if(resp -> has_result() && resp ->result() == 1) {
  209. handler(stat);
  210. } else {
  211. //NameNode does not specify why there is no result, in my testing it was happening when the path is not found
  212. std::string errormsg = "No such file or directory: " + path;
  213. Status statNew = Status::PathNotFound(errormsg.c_str());
  214. handler(statNew);
  215. }
  216. } else {
  217. handler(stat);
  218. }
  219. });
  220. }
  221. void NameNodeOperations::Delete(const std::string & path, bool recursive, std::function<void(const Status &)> handler) {
  222. using ::hadoop::hdfs::DeleteRequestProto;
  223. using ::hadoop::hdfs::DeleteResponseProto;
  224. LOG_TRACE(kFileSystem,
  225. << "NameNodeOperations::Delete(" << FMT_THIS_ADDR << ", path=" << path << ", recursive=" << recursive << ") called");
  226. if (path.empty()) {
  227. handler(Status::InvalidArgument("Delete: argument 'path' cannot be empty"));
  228. return;
  229. }
  230. DeleteRequestProto req;
  231. req.set_src(path);
  232. req.set_recursive(recursive);
  233. auto resp = std::make_shared<DeleteResponseProto>();
  234. namenode_.Delete(&req, resp, [resp, handler, path](const Status &stat) {
  235. if (stat.ok()) {
  236. // Checking resp
  237. if(resp -> has_result() && resp ->result() == 1) {
  238. handler(stat);
  239. } else {
  240. //NameNode does not specify why there is no result, in my testing it was happening when the path is not found
  241. std::string errormsg = "No such file or directory: " + path;
  242. Status statNew = Status::PathNotFound(errormsg.c_str());
  243. handler(statNew);
  244. }
  245. } else {
  246. handler(stat);
  247. }
  248. });
  249. }
  250. void NameNodeOperations::Rename(const std::string & oldPath, const std::string & newPath, std::function<void(const Status &)> handler) {
  251. using ::hadoop::hdfs::RenameRequestProto;
  252. using ::hadoop::hdfs::RenameResponseProto;
  253. LOG_TRACE(kFileSystem,
  254. << "NameNodeOperations::Rename(" << FMT_THIS_ADDR << ", oldPath=" << oldPath << ", newPath=" << newPath << ") called");
  255. if (oldPath.empty()) {
  256. handler(Status::InvalidArgument("Rename: argument 'oldPath' cannot be empty"));
  257. return;
  258. }
  259. if (newPath.empty()) {
  260. handler(Status::InvalidArgument("Rename: argument 'newPath' cannot be empty"));
  261. return;
  262. }
  263. RenameRequestProto req;
  264. req.set_src(oldPath);
  265. req.set_dst(newPath);
  266. auto resp = std::make_shared<RenameResponseProto>();
  267. namenode_.Rename(&req, resp, [resp, handler](const Status &stat) {
  268. if (stat.ok()) {
  269. // Checking resp
  270. if(resp -> has_result() && resp ->result() == 1) {
  271. handler(stat);
  272. } else {
  273. //Since NameNode does not specify why the result is not success, we set the general error
  274. std::string errormsg = "oldPath and parent directory of newPath must exist. newPath must not exist.";
  275. Status statNew = Status::InvalidArgument(errormsg.c_str());
  276. handler(statNew);
  277. }
  278. } else {
  279. handler(stat);
  280. }
  281. });
  282. }
  283. void NameNodeOperations::SetPermission(const std::string & path,
  284. short permissions, std::function<void(const Status &)> handler) {
  285. using ::hadoop::hdfs::SetPermissionRequestProto;
  286. using ::hadoop::hdfs::SetPermissionResponseProto;
  287. LOG_TRACE(kFileSystem,
  288. << "NameNodeOperations::SetPermission(" << FMT_THIS_ADDR << ", path=" << path << ", permissions=" << permissions << ") called");
  289. if (path.empty()) {
  290. handler(Status::InvalidArgument("SetPermission: argument 'path' cannot be empty"));
  291. return;
  292. }
  293. Status permStatus = CheckValidPermissionMask(permissions);
  294. if (!permStatus.ok()) {
  295. handler(permStatus);
  296. return;
  297. }
  298. SetPermissionRequestProto req;
  299. req.set_src(path);
  300. hadoop::hdfs::FsPermissionProto *perm = req.mutable_permission();
  301. perm->set_perm(permissions);
  302. auto resp = std::make_shared<SetPermissionResponseProto>();
  303. namenode_.SetPermission(&req, resp,
  304. [handler](const Status &stat) {
  305. handler(stat);
  306. });
  307. }
  308. void NameNodeOperations::SetOwner(const std::string & path,
  309. const std::string & username, const std::string & groupname, std::function<void(const Status &)> handler) {
  310. using ::hadoop::hdfs::SetOwnerRequestProto;
  311. using ::hadoop::hdfs::SetOwnerResponseProto;
  312. LOG_TRACE(kFileSystem,
  313. << "NameNodeOperations::SetOwner(" << FMT_THIS_ADDR << ", path=" << path << ", username=" << username << ", groupname=" << groupname << ") called");
  314. if (path.empty()) {
  315. handler(Status::InvalidArgument("SetOwner: argument 'path' cannot be empty"));
  316. return;
  317. }
  318. SetOwnerRequestProto req;
  319. req.set_src(path);
  320. if(!username.empty()) {
  321. req.set_username(username);
  322. }
  323. if(!groupname.empty()) {
  324. req.set_groupname(groupname);
  325. }
  326. auto resp = std::make_shared<SetOwnerResponseProto>();
  327. namenode_.SetOwner(&req, resp,
  328. [handler](const Status &stat) {
  329. handler(stat);
  330. });
  331. }
  332. void NameNodeOperations::CreateSnapshot(const std::string & path,
  333. const std::string & name, std::function<void(const Status &)> handler) {
  334. using ::hadoop::hdfs::CreateSnapshotRequestProto;
  335. using ::hadoop::hdfs::CreateSnapshotResponseProto;
  336. LOG_TRACE(kFileSystem,
  337. << "NameNodeOperations::CreateSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called");
  338. if (path.empty()) {
  339. handler(Status::InvalidArgument("CreateSnapshot: argument 'path' cannot be empty"));
  340. return;
  341. }
  342. CreateSnapshotRequestProto req;
  343. req.set_snapshotroot(path);
  344. if (!name.empty()) {
  345. req.set_snapshotname(name);
  346. }
  347. auto resp = std::make_shared<CreateSnapshotResponseProto>();
  348. namenode_.CreateSnapshot(&req, resp,
  349. [handler](const Status &stat) {
  350. handler(stat);
  351. });
  352. }
  353. void NameNodeOperations::DeleteSnapshot(const std::string & path,
  354. const std::string & name, std::function<void(const Status &)> handler) {
  355. using ::hadoop::hdfs::DeleteSnapshotRequestProto;
  356. using ::hadoop::hdfs::DeleteSnapshotResponseProto;
  357. LOG_TRACE(kFileSystem,
  358. << "NameNodeOperations::DeleteSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called");
  359. if (path.empty()) {
  360. handler(Status::InvalidArgument("DeleteSnapshot: argument 'path' cannot be empty"));
  361. return;
  362. }
  363. if (name.empty()) {
  364. handler(Status::InvalidArgument("DeleteSnapshot: argument 'name' cannot be empty"));
  365. return;
  366. }
  367. DeleteSnapshotRequestProto req;
  368. req.set_snapshotroot(path);
  369. req.set_snapshotname(name);
  370. auto resp = std::make_shared<DeleteSnapshotResponseProto>();
  371. namenode_.DeleteSnapshot(&req, resp,
  372. [handler](const Status &stat) {
  373. handler(stat);
  374. });
  375. }
  376. void NameNodeOperations::AllowSnapshot(const std::string & path, std::function<void(const Status &)> handler) {
  377. using ::hadoop::hdfs::AllowSnapshotRequestProto;
  378. using ::hadoop::hdfs::AllowSnapshotResponseProto;
  379. LOG_TRACE(kFileSystem,
  380. << "NameNodeOperations::AllowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called");
  381. if (path.empty()) {
  382. handler(Status::InvalidArgument("AllowSnapshot: argument 'path' cannot be empty"));
  383. return;
  384. }
  385. AllowSnapshotRequestProto req;
  386. req.set_snapshotroot(path);
  387. auto resp = std::make_shared<AllowSnapshotResponseProto>();
  388. namenode_.AllowSnapshot(&req, resp,
  389. [handler](const Status &stat) {
  390. handler(stat);
  391. });
  392. }
  393. void NameNodeOperations::DisallowSnapshot(const std::string & path, std::function<void(const Status &)> handler) {
  394. using ::hadoop::hdfs::DisallowSnapshotRequestProto;
  395. using ::hadoop::hdfs::DisallowSnapshotResponseProto;
  396. LOG_TRACE(kFileSystem,
  397. << "NameNodeOperations::DisallowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called");
  398. if (path.empty()) {
  399. handler(Status::InvalidArgument("DisallowSnapshot: argument 'path' cannot be empty"));
  400. return;
  401. }
  402. DisallowSnapshotRequestProto req;
  403. req.set_snapshotroot(path);
  404. auto resp = std::make_shared<DisallowSnapshotResponseProto>();
  405. namenode_.DisallowSnapshot(&req, resp,
  406. [handler](const Status &stat) {
  407. handler(stat);
  408. });
  409. }
  410. void NameNodeOperations::SetFsEventCallback(fs_event_callback callback) {
  411. engine_.SetFsEventCallback(callback);
  412. }
  413. void NameNodeOperations::HdfsFileStatusProtoToStatInfo(
  414. hdfs::StatInfo & stat_info,
  415. const ::hadoop::hdfs::HdfsFileStatusProto & fs) {
  416. stat_info.file_type = fs.filetype();
  417. stat_info.length = fs.length();
  418. stat_info.permissions = fs.permission().perm();
  419. stat_info.owner = fs.owner();
  420. stat_info.group = fs.group();
  421. stat_info.modification_time = fs.modification_time();
  422. stat_info.access_time = fs.access_time();
  423. stat_info.symlink = fs.symlink();
  424. stat_info.block_replication = fs.block_replication();
  425. stat_info.blocksize = fs.blocksize();
  426. stat_info.fileid = fs.fileid();
  427. stat_info.children_num = fs.childrennum();
  428. }
  429. void NameNodeOperations::GetFsStatsResponseProtoToFsInfo(
  430. hdfs::FsInfo & fs_info,
  431. const std::shared_ptr<::hadoop::hdfs::GetFsStatsResponseProto> & fs) {
  432. fs_info.capacity = fs->capacity();
  433. fs_info.used = fs->used();
  434. fs_info.remaining = fs->remaining();
  435. fs_info.under_replicated = fs->under_replicated();
  436. fs_info.corrupt_blocks = fs->corrupt_blocks();
  437. fs_info.missing_blocks = fs->missing_blocks();
  438. fs_info.missing_repl_one_blocks = fs->missing_repl_one_blocks();
  439. if(fs->has_blocks_in_future()){
  440. fs_info.blocks_in_future = fs->blocks_in_future();
  441. }
  442. }
  443. }