- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #include "filesystem.h"
- #include "common/continuation/asio.h"
- #include <asio/ip/tcp.hpp>
- #include <functional>
- #include <limits>
- #include <future>
- #include <tuple>
- #include <iostream>
- #include <pwd.h>
- #include <utility>
- #define FMT_THIS_ADDR "this=" << (void*)this
- using ::asio::ip::tcp;
- namespace hdfs {
- /*****************************************************************************
- * NAMENODE OPERATIONS
- ****************************************************************************/
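- // Asynchronously connects the underlying RPC engine to the NameNode(s) of the given cluster.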
- void NameNodeOperations::Connect(const std::string &cluster_name,
- const std::vector<ResolvedNamenodeInfo> &servers,
- std::function<void(const Status &)> &&handler) {
- engine_.Connect(cluster_name, servers, handler);
- }
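- // Asynchronously fetches the block locations for a region of a file and assembles them into a FileInfo for the handler.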
- void NameNodeOperations::GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length,
- std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler)
- {
- using ::hadoop::hdfs::GetBlockLocationsRequestProto;
- using ::hadoop::hdfs::GetBlockLocationsResponseProto;
- LOG_TRACE(kFileSystem, << "NameNodeOperations::GetBlockLocations("
- << FMT_THIS_ADDR << ", path=" << path << ", ...) called");
- if (path.empty()) {
- handler(Status::InvalidArgument("GetBlockLocations: argument 'path' cannot be empty"), nullptr);
- return;
- }
- // Protobuf rejects a uint64 with the high bit set ('Negative value is not supported'),
- // so validate offset and length before building the GetBlockLocations request.
- if (IsHighBitSet(offset)) {
- handler(Status::InvalidArgument("GetBlockLocations: argument 'offset' cannot have high bit set"), nullptr);
- return;
- }
- if (IsHighBitSet(length)) {
- handler(Status::InvalidArgument("GetBlockLocations: argument 'length' cannot have high bit set"), nullptr);
- return;
- }
- GetBlockLocationsRequestProto req;
- req.set_src(path);
- req.set_offset(offset);
- req.set_length(length);
- auto resp = std::make_shared<GetBlockLocationsResponseProto>();
- namenode_.GetBlockLocations(&req, resp, [resp, handler](const Status &stat) {
- if (stat.ok()) {
- auto file_info = std::make_shared<struct FileInfo>();
- auto locations = resp->locations();
- file_info->file_length_ = locations.filelength();
- file_info->last_block_complete_ = locations.islastblockcomplete();
- file_info->under_construction_ = locations.underconstruction();
- for (const auto &block : locations.blocks()) {
- file_info->blocks_.push_back(block);
- }
- if (!locations.islastblockcomplete() &&
- locations.has_lastblock() && locations.lastblock().b().numbytes()) {
- file_info->blocks_.push_back(locations.lastblock());
- file_info->file_length_ += locations.lastblock().b().numbytes();
- }
- handler(stat, file_info);
- } else {
- handler(stat, nullptr);
- }
- });
- }
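- // Asynchronously queries the NameNode for the preferred block size of a file.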
- void NameNodeOperations::GetPreferredBlockSize(const std::string & path,
- std::function<void(const Status &, const uint64_t)> handler)
- {
- using ::hadoop::hdfs::GetPreferredBlockSizeRequestProto;
- using ::hadoop::hdfs::GetPreferredBlockSizeResponseProto;
- LOG_TRACE(kFileSystem, << "NameNodeOperations::GetPreferredBlockSize("
- << FMT_THIS_ADDR << ", path=" << path << ") called");
- if (path.empty()) {
- handler(Status::InvalidArgument("GetPreferredBlockSize: argument 'path' cannot be empty"), -1);
- return;
- }
- GetPreferredBlockSizeRequestProto req;
- req.set_filename(path);
- auto resp = std::make_shared<GetPreferredBlockSizeResponseProto>();
- namenode_.GetPreferredBlockSize(&req, resp, [resp, handler, path](const Status &stat) {
- if (stat.ok() && resp->has_bsize()) {
- uint64_t block_size = resp->bsize();
- handler(stat, block_size);
- } else {
- handler(stat, -1);
- }
- });
- }
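- // Asynchronously sets the replication factor of a file after validating the requested value.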
- void NameNodeOperations::SetReplication(const std::string & path, int16_t replication,
- std::function<void(const Status &)> handler)
- {
- using ::hadoop::hdfs::SetReplicationRequestProto;
- using ::hadoop::hdfs::SetReplicationResponseProto;
- LOG_TRACE(kFileSystem,
- << "NameNodeOperations::SetReplication(" << FMT_THIS_ADDR << ", path=" << path <<
- ", replication=" << replication << ") called");
- if (path.empty()) {
- handler(Status::InvalidArgument("SetReplication: argument 'path' cannot be empty"));
- return;
- }
- Status replStatus = FileSystemImpl::CheckValidReplication(replication);
- if (!replStatus.ok()) {
- handler(replStatus);
- return;
- }
- SetReplicationRequestProto req;
- req.set_src(path);
- req.set_replication(replication);
- auto resp = std::make_shared<SetReplicationResponseProto>();
- namenode_.SetReplication(&req, resp, [resp, handler, path](const Status &stat) {
- if (stat.ok()) {
- // The RPC succeeded; check the result flag the NameNode returned.
- if (resp->has_result() && resp->result() == 1) {
- handler(stat);
- } else {
- // The NameNode does not report why the operation failed; in practice a missing or false result usually means the path was not found.
- std::string errormsg = "No such file or directory: " + path;
- Status statNew = Status::PathNotFound(errormsg.c_str());
- handler(statNew);
- }
- } else {
- handler(stat);
- }
- });
- }
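- // Asynchronously sets the modification and access times of a file or directory.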
- void NameNodeOperations::SetTimes(const std::string & path, uint64_t mtime, uint64_t atime,
- std::function<void(const Status &)> handler)
- {
- using ::hadoop::hdfs::SetTimesRequestProto;
- using ::hadoop::hdfs::SetTimesResponseProto;
- LOG_TRACE(kFileSystem,
- << "NameNodeOperations::SetTimes(" << FMT_THIS_ADDR << ", path=" << path <<
- ", mtime=" << mtime << ", atime=" << atime << ") called");
- if (path.empty()) {
- handler(Status::InvalidArgument("SetTimes: argument 'path' cannot be empty"));
- return;
- }
- SetTimesRequestProto req;
- req.set_src(path);
- req.set_mtime(mtime);
- req.set_atime(atime);
- auto resp = std::make_shared<SetTimesResponseProto>();
- namenode_.SetTimes(&req, resp, [resp, handler, path](const Status &stat) {
- handler(stat);
- });
- }
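- // Asynchronously fetches the file status for a path and converts it into a StatInfo.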
- void NameNodeOperations::GetFileInfo(const std::string & path,
- std::function<void(const Status &, const StatInfo &)> handler)
- {
- using ::hadoop::hdfs::GetFileInfoRequestProto;
- using ::hadoop::hdfs::GetFileInfoResponseProto;
- LOG_TRACE(kFileSystem, << "NameNodeOperations::GetFileInfo("
- << FMT_THIS_ADDR << ", path=" << path << ") called");
- if (path.empty()) {
- handler(Status::InvalidArgument("GetFileInfo: argument 'path' cannot be empty"), StatInfo());
- return;
- }
- GetFileInfoRequestProto req;
- req.set_src(path);
- auto resp = std::make_shared<GetFileInfoResponseProto>();
- namenode_.GetFileInfo(&req, resp, [resp, handler, path](const Status &stat) {
- if (stat.ok()) {
- // For non-existent files the server responds with an OK status but
- // no 'fs' field in the protobuf.
- if (resp->has_fs()) {
- struct StatInfo stat_info;
- stat_info.path = path;
- stat_info.full_path = path;
- HdfsFileStatusProtoToStatInfo(stat_info, resp->fs());
- handler(stat, stat_info);
- } else {
- std::string errormsg = "No such file or directory: " + path;
- Status statNew = Status::PathNotFound(errormsg.c_str());
- handler(statNew, StatInfo());
- }
- } else {
- handler(stat, StatInfo());
- }
- });
- }
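- // Asynchronously fetches aggregate filesystem statistics (capacity, usage, block counts).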
- void NameNodeOperations::GetFsStats(
- std::function<void(const Status &, const FsInfo &)> handler) {
- using ::hadoop::hdfs::GetFsStatusRequestProto;
- using ::hadoop::hdfs::GetFsStatsResponseProto;
- LOG_TRACE(kFileSystem,
- << "NameNodeOperations::GetFsStats(" << FMT_THIS_ADDR << ") called");
- GetFsStatusRequestProto req;
- auto resp = std::make_shared<GetFsStatsResponseProto>();
- namenode_.GetFsStats(&req, resp, [resp, handler](const Status &stat) {
- if (stat.ok()) {
- struct FsInfo fs_info;
- GetFsStatsResponseProtoToFsInfo(fs_info, resp);
- handler(stat, fs_info);
- } else {
- handler(stat, FsInfo());
- }
- });
- }
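- // Asynchronously fetches one page of a directory listing, starting after 'start_after'; the handler's bool reports whether more entries remain.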
- void NameNodeOperations::GetListing(
- const std::string & path,
- std::function<void(const Status &, const std::vector<StatInfo> &, bool)> handler,
- const std::string & start_after) {
- using ::hadoop::hdfs::GetListingRequestProto;
- using ::hadoop::hdfs::GetListingResponseProto;
- LOG_TRACE(
- kFileSystem,
- << "NameNodeOperations::GetListing(" << FMT_THIS_ADDR << ", path=" << path << ") called");
- if (path.empty()) {
- std::vector<StatInfo> empty;
- handler(Status::InvalidArgument("GetListing: argument 'path' cannot be empty"), empty, false);
- return;
- }
- GetListingRequestProto req;
- req.set_src(path);
- req.set_startafter(start_after);
- req.set_needlocation(false);
- auto resp = std::make_shared<GetListingResponseProto>();
- namenode_.GetListing(&req, resp, [resp, handler, path](const Status &stat) {
- std::vector<StatInfo> stat_infos;
- if (stat.ok()) {
- if (resp->has_dirlist()) {
- for (::hadoop::hdfs::HdfsFileStatusProto const& fs : resp->dirlist().partiallisting()) {
- StatInfo si;
- si.path = fs.path();
- si.full_path = path + fs.path() + "/";
- HdfsFileStatusProtoToStatInfo(si, fs);
- stat_infos.push_back(si);
- }
- handler(stat, stat_infos, resp->dirlist().remainingentries() > 0);
- } else {
- std::string errormsg = "No such file or directory: " + path;
- handler(Status::PathNotFound(errormsg.c_str()), stat_infos, false);
- }
- } else {
- handler(stat, stat_infos, false);
- }
- });
- }
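- // Asynchronously creates a directory (optionally with its parents) with the given permission mask.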
- void NameNodeOperations::Mkdirs(const std::string & path, uint16_t permissions, bool createparent,
- std::function<void(const Status &)> handler)
- {
- using ::hadoop::hdfs::MkdirsRequestProto;
- using ::hadoop::hdfs::MkdirsResponseProto;
- LOG_TRACE(kFileSystem,
- << "NameNodeOperations::Mkdirs(" << FMT_THIS_ADDR << ", path=" << path <<
- ", permissions=" << permissions << ", createparent=" << createparent << ") called");
- if (path.empty()) {
- handler(Status::InvalidArgument("Mkdirs: argument 'path' cannot be empty"));
- return;
- }
- MkdirsRequestProto req;
- Status permStatus = FileSystemImpl::CheckValidPermissionMask(permissions);
- if (!permStatus.ok()) {
- handler(permStatus);
- return;
- }
- req.set_src(path);
- hadoop::hdfs::FsPermissionProto *perm = req.mutable_masked();
- perm->set_perm(permissions);
- req.set_createparent(createparent);
- auto resp = std::make_shared<MkdirsResponseProto>();
- namenode_.Mkdirs(&req, resp, [resp, handler, path](const Status &stat) {
- if (stat.ok()) {
- // The RPC succeeded; check the result flag the NameNode returned.
- if (resp->has_result() && resp->result() == 1) {
- handler(stat);
- } else {
- // The NameNode does not report why the operation failed; in practice a missing or false result usually means the path was not found.
- std::string errormsg = "No such file or directory: " + path;
- Status statNew = Status::PathNotFound(errormsg.c_str());
- handler(statNew);
- }
- } else {
- handler(stat);
- }
- });
- }
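- // Asynchronously deletes a file or directory, optionally recursing into subdirectories.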
- void NameNodeOperations::Delete(const std::string & path, bool recursive, std::function<void(const Status &)> handler) {
- using ::hadoop::hdfs::DeleteRequestProto;
- using ::hadoop::hdfs::DeleteResponseProto;
- LOG_TRACE(kFileSystem,
- << "NameNodeOperations::Delete(" << FMT_THIS_ADDR << ", path=" << path << ", recursive=" << recursive << ") called");
- if (path.empty()) {
- handler(Status::InvalidArgument("Delete: argument 'path' cannot be empty"));
- return;
- }
- DeleteRequestProto req;
- req.set_src(path);
- req.set_recursive(recursive);
- auto resp = std::make_shared<DeleteResponseProto>();
- namenode_.Delete(&req, resp, [resp, handler, path](const Status &stat) {
- if (stat.ok()) {
- // The RPC succeeded; check the result flag the NameNode returned.
- if (resp->has_result() && resp->result() == 1) {
- handler(stat);
- } else {
- // The NameNode does not report why the operation failed; in practice a missing or false result usually means the path was not found.
- std::string errormsg = "No such file or directory: " + path;
- Status statNew = Status::PathNotFound(errormsg.c_str());
- handler(statNew);
- }
- } else {
- handler(stat);
- }
- });
- }
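- // Asynchronously renames oldPath to newPath.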
- void NameNodeOperations::Rename(const std::string & oldPath, const std::string & newPath, std::function<void(const Status &)> handler) {
- using ::hadoop::hdfs::RenameRequestProto;
- using ::hadoop::hdfs::RenameResponseProto;
- LOG_TRACE(kFileSystem,
- << "NameNodeOperations::Rename(" << FMT_THIS_ADDR << ", oldPath=" << oldPath << ", newPath=" << newPath << ") called");
- if (oldPath.empty()) {
- handler(Status::InvalidArgument("Rename: argument 'oldPath' cannot be empty"));
- return;
- }
- if (newPath.empty()) {
- handler(Status::InvalidArgument("Rename: argument 'newPath' cannot be empty"));
- return;
- }
- RenameRequestProto req;
- req.set_src(oldPath);
- req.set_dst(newPath);
- auto resp = std::make_shared<RenameResponseProto>();
- namenode_.Rename(&req, resp, [resp, handler](const Status &stat) {
- if (stat.ok()) {
- // The RPC succeeded; check the result flag the NameNode returned.
- if (resp->has_result() && resp->result() == 1) {
- handler(stat);
- } else {
- // The NameNode does not report why the rename failed, so return a general error describing the preconditions.
- std::string errormsg = "oldPath and parent directory of newPath must exist. newPath must not exist.";
- Status statNew = Status::InvalidArgument(errormsg.c_str());
- handler(statNew);
- }
- } else {
- handler(stat);
- }
- });
- }
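- // Asynchronously sets the permission bits of a file or directory after validating the mask.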
- void NameNodeOperations::SetPermission(const std::string & path,
- uint16_t permissions, std::function<void(const Status &)> handler) {
- using ::hadoop::hdfs::SetPermissionRequestProto;
- using ::hadoop::hdfs::SetPermissionResponseProto;
- LOG_TRACE(kFileSystem,
- << "NameNodeOperations::SetPermission(" << FMT_THIS_ADDR << ", path=" << path << ", permissions=" << permissions << ") called");
- if (path.empty()) {
- handler(Status::InvalidArgument("SetPermission: argument 'path' cannot be empty"));
- return;
- }
- Status permStatus = FileSystemImpl::CheckValidPermissionMask(permissions);
- if (!permStatus.ok()) {
- handler(permStatus);
- return;
- }
- SetPermissionRequestProto req;
- req.set_src(path);
- hadoop::hdfs::FsPermissionProto *perm = req.mutable_permission();
- perm->set_perm(permissions);
- auto resp = std::make_shared<SetPermissionResponseProto>();
- namenode_.SetPermission(&req, resp,
- [handler](const Status &stat) {
- handler(stat);
- });
- }
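- // Asynchronously sets the owner and/or group of a file or directory; empty strings leave the corresponding field unchanged.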
- void NameNodeOperations::SetOwner(const std::string & path,
- const std::string & username, const std::string & groupname, std::function<void(const Status &)> handler) {
- using ::hadoop::hdfs::SetOwnerRequestProto;
- using ::hadoop::hdfs::SetOwnerResponseProto;
- LOG_TRACE(kFileSystem,
- << "NameNodeOperations::SetOwner(" << FMT_THIS_ADDR << ", path=" << path << ", username=" << username << ", groupname=" << groupname << ") called");
- if (path.empty()) {
- handler(Status::InvalidArgument("SetOwner: argument 'path' cannot be empty"));
- return;
- }
- SetOwnerRequestProto req;
- req.set_src(path);
- if(!username.empty()) {
- req.set_username(username);
- }
- if(!groupname.empty()) {
- req.set_groupname(groupname);
- }
- auto resp = std::make_shared<SetOwnerResponseProto>();
- namenode_.SetOwner(&req, resp,
- [handler](const Status &stat) {
- handler(stat);
- });
- }
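- // Asynchronously creates a snapshot of the given directory; if 'name' is empty it is omitted from the request.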
- void NameNodeOperations::CreateSnapshot(const std::string & path,
- const std::string & name, std::function<void(const Status &)> handler) {
- using ::hadoop::hdfs::CreateSnapshotRequestProto;
- using ::hadoop::hdfs::CreateSnapshotResponseProto;
- LOG_TRACE(kFileSystem,
- << "NameNodeOperations::CreateSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called");
- if (path.empty()) {
- handler(Status::InvalidArgument("CreateSnapshot: argument 'path' cannot be empty"));
- return;
- }
- CreateSnapshotRequestProto req;
- req.set_snapshotroot(path);
- if (!name.empty()) {
- req.set_snapshotname(name);
- }
- auto resp = std::make_shared<CreateSnapshotResponseProto>();
- namenode_.CreateSnapshot(&req, resp,
- [handler](const Status &stat) {
- handler(stat);
- });
- }
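- // Asynchronously deletes the named snapshot under the given snapshottable directory.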
- void NameNodeOperations::DeleteSnapshot(const std::string & path,
- const std::string & name, std::function<void(const Status &)> handler) {
- using ::hadoop::hdfs::DeleteSnapshotRequestProto;
- using ::hadoop::hdfs::DeleteSnapshotResponseProto;
- LOG_TRACE(kFileSystem,
- << "NameNodeOperations::DeleteSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called");
- if (path.empty()) {
- handler(Status::InvalidArgument("DeleteSnapshot: argument 'path' cannot be empty"));
- return;
- }
- if (name.empty()) {
- handler(Status::InvalidArgument("DeleteSnapshot: argument 'name' cannot be empty"));
- return;
- }
- DeleteSnapshotRequestProto req;
- req.set_snapshotroot(path);
- req.set_snapshotname(name);
- auto resp = std::make_shared<DeleteSnapshotResponseProto>();
- namenode_.DeleteSnapshot(&req, resp,
- [handler](const Status &stat) {
- handler(stat);
- });
- }
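- // Asynchronously marks a directory as snapshottable.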
- void NameNodeOperations::AllowSnapshot(const std::string & path, std::function<void(const Status &)> handler) {
- using ::hadoop::hdfs::AllowSnapshotRequestProto;
- using ::hadoop::hdfs::AllowSnapshotResponseProto;
- LOG_TRACE(kFileSystem,
- << "NameNodeOperations::AllowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called");
- if (path.empty()) {
- handler(Status::InvalidArgument("AllowSnapshot: argument 'path' cannot be empty"));
- return;
- }
- AllowSnapshotRequestProto req;
- req.set_snapshotroot(path);
- auto resp = std::make_shared<AllowSnapshotResponseProto>();
- namenode_.AllowSnapshot(&req, resp,
- [handler](const Status &stat) {
- handler(stat);
- });
- }
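- // Asynchronously disallows further snapshots on a directory.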
- void NameNodeOperations::DisallowSnapshot(const std::string & path, std::function<void(const Status &)> handler) {
- using ::hadoop::hdfs::DisallowSnapshotRequestProto;
- using ::hadoop::hdfs::DisallowSnapshotResponseProto;
- LOG_TRACE(kFileSystem,
- << "NameNodeOperations::DisallowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called");
- if (path.empty()) {
- handler(Status::InvalidArgument("DisallowSnapshot: argument 'path' cannot be empty"));
- return;
- }
- DisallowSnapshotRequestProto req;
- req.set_snapshotroot(path);
- auto resp = std::make_shared<DisallowSnapshotResponseProto>();
- namenode_.DisallowSnapshot(&req, resp,
- [handler](const Status &stat) {
- handler(stat);
- });
- }
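- // Forwards filesystem event callbacks to the underlying RPC engine.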
- void NameNodeOperations::SetFsEventCallback(fs_event_callback callback) {
- engine_.SetFsEventCallback(callback);
- }
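- // Copies the fields of an HdfsFileStatusProto into a StatInfo.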
- void NameNodeOperations::HdfsFileStatusProtoToStatInfo(
- hdfs::StatInfo & stat_info,
- const ::hadoop::hdfs::HdfsFileStatusProto & fs) {
- stat_info.file_type = fs.filetype();
- stat_info.length = fs.length();
- stat_info.permissions = fs.permission().perm();
- stat_info.owner = fs.owner();
- stat_info.group = fs.group();
- stat_info.modification_time = fs.modification_time();
- stat_info.access_time = fs.access_time();
- stat_info.symlink = fs.symlink();
- stat_info.block_replication = fs.block_replication();
- stat_info.blocksize = fs.blocksize();
- stat_info.fileid = fs.fileid();
- stat_info.children_num = fs.childrennum();
- }
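- // Copies the fields of a GetFsStatsResponseProto into an FsInfo.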
- void NameNodeOperations::GetFsStatsResponseProtoToFsInfo(
- hdfs::FsInfo & fs_info,
- const std::shared_ptr<::hadoop::hdfs::GetFsStatsResponseProto> & fs) {
- fs_info.capacity = fs->capacity();
- fs_info.used = fs->used();
- fs_info.remaining = fs->remaining();
- fs_info.under_replicated = fs->under_replicated();
- fs_info.corrupt_blocks = fs->corrupt_blocks();
- fs_info.missing_blocks = fs->missing_blocks();
- fs_info.missing_repl_one_blocks = fs->missing_repl_one_blocks();
- if(fs->has_blocks_in_future()){
- fs_info.blocks_in_future = fs->blocks_in_future();
- }
- }
- }