|
@@ -71,30 +71,19 @@ void NameNodeOperations::GetBlockLocations(const std::string & path,
|
|
|
using ::hadoop::hdfs::GetBlockLocationsResponseProto;
|
|
|
|
|
|
LOG_TRACE(kFileSystem, << "NameNodeOperations::GetBlockLocations("
|
|
|
- << FMT_THIS_ADDR << ", path=" << path << ", ...) called");
|
|
|
+ << FMT_THIS_ADDR << ", path=" << path << ", ...) called");
|
|
|
|
|
|
- struct State {
|
|
|
- GetBlockLocationsRequestProto req;
|
|
|
- std::shared_ptr<GetBlockLocationsResponseProto> resp;
|
|
|
- };
|
|
|
-
|
|
|
- auto m = continuation::Pipeline<State>::Create();
|
|
|
- auto &req = m->state().req;
|
|
|
+ GetBlockLocationsRequestProto req;
|
|
|
req.set_src(path);
|
|
|
req.set_offset(0);
|
|
|
req.set_length(std::numeric_limits<long long>::max());
|
|
|
- m->state().resp.reset(new GetBlockLocationsResponseProto());
|
|
|
|
|
|
- State *s = &m->state();
|
|
|
- m->Push(continuation::Bind(
|
|
|
- [this, s](const continuation::Continuation::Next &next) {
|
|
|
- namenode_.GetBlockLocations(&s->req, s->resp, next);
|
|
|
- }));
|
|
|
+ auto resp = std::make_shared<GetBlockLocationsResponseProto>();
|
|
|
|
|
|
- m->Run([this, handler](const Status &stat, const State &s) {
|
|
|
+ namenode_.GetBlockLocations(&req, resp, [resp, handler](const Status &stat) {
|
|
|
if (stat.ok()) {
|
|
|
auto file_info = std::make_shared<struct FileInfo>();
|
|
|
- auto locations = s.resp->locations();
|
|
|
+ auto locations = resp->locations();
|
|
|
|
|
|
file_info->file_length_ = locations.filelength();
|
|
|
file_info->last_block_complete_ = locations.islastblockcomplete();
|
|
@@ -117,11 +106,107 @@ void NameNodeOperations::GetBlockLocations(const std::string & path,
|
|
|
});
|
|
|
}
|
|
|
|
|
|
+void NameNodeOperations::GetFileInfo(const std::string & path,
|
|
|
+ std::function<void(const Status &, const StatInfo &)> handler)
|
|
|
+{
|
|
|
+ using ::hadoop::hdfs::GetFileInfoRequestProto;
|
|
|
+ using ::hadoop::hdfs::GetFileInfoResponseProto;
|
|
|
+
|
|
|
+ LOG_TRACE(kFileSystem, << "NameNodeOperations::GetFileInfo("
|
|
|
+ << FMT_THIS_ADDR << ", path=" << path << ") called");
|
|
|
+
|
|
|
+ GetFileInfoRequestProto req;
|
|
|
+ req.set_src(path);
|
|
|
+
|
|
|
+ auto resp = std::make_shared<GetFileInfoResponseProto>();
|
|
|
+
|
|
|
+ namenode_.GetFileInfo(&req, resp, [resp, handler, path](const Status &stat) {
|
|
|
+ if (stat.ok()) {
|
|
|
+ // For non-existant files, the server will respond with an OK message but
|
|
|
+ // no fs in the protobuf.
|
|
|
+ if(resp -> has_fs()){
|
|
|
+ struct StatInfo stat_info;
|
|
|
+ stat_info.path=path;
|
|
|
+ HdfsFileStatusProtoToStatInfo(stat_info, resp->fs());
|
|
|
+ handler(stat, stat_info);
|
|
|
+ } else {
|
|
|
+ std::string errormsg = "No such file or directory: " + path;
|
|
|
+ Status statNew = Status::PathNotFound(errormsg.c_str());
|
|
|
+ handler(statNew, StatInfo());
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ handler(stat, StatInfo());
|
|
|
+ }
|
|
|
+ });
|
|
|
+}
|
|
|
+
|
|
|
+void NameNodeOperations::GetListing(
|
|
|
+ const std::string & path,
|
|
|
+ std::function<void(const Status &, std::shared_ptr<std::vector<StatInfo>> &, bool)> handler,
|
|
|
+ const std::string & start_after) {
|
|
|
+ using ::hadoop::hdfs::GetListingRequestProto;
|
|
|
+ using ::hadoop::hdfs::GetListingResponseProto;
|
|
|
+
|
|
|
+ LOG_TRACE(
|
|
|
+ kFileSystem,
|
|
|
+ << "NameNodeOperations::GetListing(" << FMT_THIS_ADDR << ", path=" << path << ") called");
|
|
|
+
|
|
|
+ GetListingRequestProto req;
|
|
|
+ req.set_src(path);
|
|
|
+ req.set_startafter(start_after.c_str());
|
|
|
+ req.set_needlocation(false);
|
|
|
+
|
|
|
+ auto resp = std::make_shared<GetListingResponseProto>();
|
|
|
+
|
|
|
+ namenode_.GetListing(
|
|
|
+ &req,
|
|
|
+ resp,
|
|
|
+ [resp, handler, path](const Status &stat) {
|
|
|
+ if (stat.ok()) {
|
|
|
+ if(resp -> has_dirlist()){
|
|
|
+ std::shared_ptr<std::vector<StatInfo>> stat_infos(new std::vector<StatInfo>);
|
|
|
+ for (::hadoop::hdfs::HdfsFileStatusProto const& fs : resp->dirlist().partiallisting()) {
|
|
|
+ StatInfo si;
|
|
|
+ si.path=fs.path();
|
|
|
+ HdfsFileStatusProtoToStatInfo(si, fs);
|
|
|
+ stat_infos->push_back(si);
|
|
|
+ }
|
|
|
+ handler(stat, stat_infos, resp->dirlist().remainingentries() > 0);
|
|
|
+ } else {
|
|
|
+ std::string errormsg = "No such file or directory: " + path;
|
|
|
+ Status statNew = Status::PathNotFound(errormsg.c_str());
|
|
|
+ std::shared_ptr<std::vector<StatInfo>> stat_infos;
|
|
|
+ handler(statNew, stat_infos, false);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ std::shared_ptr<std::vector<StatInfo>> stat_infos;
|
|
|
+ handler(stat, stat_infos, false);
|
|
|
+ }
|
|
|
+ });
|
|
|
+}
|
|
|
+
|
|
|
|
|
|
void NameNodeOperations::SetFsEventCallback(fs_event_callback callback) {
|
|
|
engine_.SetFsEventCallback(callback);
|
|
|
}
|
|
|
|
|
|
+void NameNodeOperations::HdfsFileStatusProtoToStatInfo(
|
|
|
+ hdfs::StatInfo & stat_info,
|
|
|
+ const ::hadoop::hdfs::HdfsFileStatusProto & fs) {
|
|
|
+ stat_info.file_type = fs.filetype();
|
|
|
+ stat_info.length = fs.length();
|
|
|
+ stat_info.permissions = fs.permission().perm();
|
|
|
+ stat_info.owner = fs.owner();
|
|
|
+ stat_info.group = fs.group();
|
|
|
+ stat_info.modification_time = fs.modification_time();
|
|
|
+ stat_info.access_time = fs.access_time();
|
|
|
+ stat_info.symlink = fs.symlink();
|
|
|
+ stat_info.block_replication = fs.block_replication();
|
|
|
+ stat_info.blocksize = fs.blocksize();
|
|
|
+ stat_info.fileid = fs.fileid();
|
|
|
+ stat_info.children_num = fs.childrennum();
|
|
|
+}
|
|
|
+
|
|
|
/*****************************************************************************
|
|
|
* FILESYSTEM BASE CLASS
|
|
|
****************************************************************************/
|
|
@@ -339,7 +424,6 @@ Status FileSystemImpl::Open(const std::string &path,
|
|
|
return stat;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
BlockLocation LocatedBlockToBlockLocation(const hadoop::hdfs::LocatedBlockProto & locatedBlock)
|
|
|
{
|
|
|
BlockLocation result;
|
|
@@ -380,6 +464,10 @@ BlockLocation LocatedBlockToBlockLocation(const hadoop::hdfs::LocatedBlockProto
|
|
|
void FileSystemImpl::GetBlockLocations(const std::string & path,
|
|
|
const std::function<void(const Status &, std::shared_ptr<FileBlockLocation> locations)> handler)
|
|
|
{
|
|
|
+ LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetBlockLocations("
|
|
|
+ << FMT_THIS_ADDR << ", path="
|
|
|
+ << path << ") called");
|
|
|
+
|
|
|
auto conversion = [handler](const Status & status, std::shared_ptr<const struct FileInfo> fileInfo) {
|
|
|
if (status.ok()) {
|
|
|
auto result = std::make_shared<FileBlockLocation>();
|
|
@@ -407,6 +495,10 @@ void FileSystemImpl::GetBlockLocations(const std::string & path,
|
|
|
Status FileSystemImpl::GetBlockLocations(const std::string & path,
|
|
|
std::shared_ptr<FileBlockLocation> * fileBlockLocations)
|
|
|
{
|
|
|
+ LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetBlockLocations("
|
|
|
+ << FMT_THIS_ADDR << ", path="
|
|
|
+ << path << ") called");
|
|
|
+
|
|
|
if (!fileBlockLocations)
|
|
|
return Status::InvalidArgument("Null pointer passed to GetBlockLocations");
|
|
|
|
|
@@ -433,6 +525,125 @@ Status FileSystemImpl::GetBlockLocations(const std::string & path,
|
|
|
return stat;
|
|
|
}
|
|
|
|
|
|
+void FileSystemImpl::GetFileInfo(
|
|
|
+ const std::string &path,
|
|
|
+ const std::function<void(const Status &, const StatInfo &)> &handler) {
|
|
|
+ LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetFileInfo("
|
|
|
+ << FMT_THIS_ADDR << ", path="
|
|
|
+ << path << ") called");
|
|
|
+
|
|
|
+ nn_.GetFileInfo(path, [handler](const Status &stat, const StatInfo &stat_info) {
|
|
|
+ handler(stat, stat_info);
|
|
|
+ });
|
|
|
+}
|
|
|
+
|
|
|
+Status FileSystemImpl::GetFileInfo(const std::string &path,
|
|
|
+ StatInfo & stat_info) {
|
|
|
+ LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetFileInfo("
|
|
|
+ << FMT_THIS_ADDR << ", path="
|
|
|
+ << path << ") called");
|
|
|
+
|
|
|
+ auto callstate = std::make_shared<std::promise<std::tuple<Status, StatInfo>>>();
|
|
|
+ std::future<std::tuple<Status, StatInfo>> future(callstate->get_future());
|
|
|
+
|
|
|
+ /* wrap async FileSystem::GetFileInfo with promise to make it a blocking call */
|
|
|
+ auto h = [callstate](const Status &s, const StatInfo &si) {
|
|
|
+ callstate->set_value(std::make_tuple(s, si));
|
|
|
+ };
|
|
|
+
|
|
|
+ GetFileInfo(path, h);
|
|
|
+
|
|
|
+ /* block until promise is set */
|
|
|
+ auto returnstate = future.get();
|
|
|
+ Status stat = std::get<0>(returnstate);
|
|
|
+ StatInfo info = std::get<1>(returnstate);
|
|
|
+
|
|
|
+ if (!stat.ok()) {
|
|
|
+ return stat;
|
|
|
+ }
|
|
|
+
|
|
|
+ stat_info = info;
|
|
|
+ return stat;
|
|
|
+}
|
|
|
+
|
|
|
+/**
|
|
|
+ * Helper function for recursive GetListing calls.
|
|
|
+ *
|
|
|
+ * Some compilers don't like recursive lambdas, so we make the lambda call a
|
|
|
+ * method, which in turn creates a lambda calling itself.
|
|
|
+ */
|
|
|
+void FileSystemImpl::GetListingShim(const Status &stat, std::shared_ptr<std::vector<StatInfo>> &stat_infos, bool has_more,
|
|
|
+ std::string path,
|
|
|
+ const std::function<bool(const Status &, std::shared_ptr<std::vector<StatInfo>>&, bool)> &handler) {
|
|
|
+ bool has_next = stat_infos && stat_infos->size() > 0;
|
|
|
+ bool get_more = handler(stat, stat_infos, has_more && has_next);
|
|
|
+ if (get_more && has_more && has_next ) {
|
|
|
+ auto callback = [this, path, handler](const Status &stat, std::shared_ptr<std::vector<StatInfo>> &stat_infos, bool has_more) {
|
|
|
+ GetListingShim(stat, stat_infos, has_more, path, handler);
|
|
|
+ };
|
|
|
+
|
|
|
+ std::string last = stat_infos->back().path;
|
|
|
+ nn_.GetListing(path, callback, last);
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+void FileSystemImpl::GetListing(
|
|
|
+ const std::string &path,
|
|
|
+ const std::function<bool(const Status &, std::shared_ptr<std::vector<StatInfo>>&, bool)> &handler) {
|
|
|
+ LOG_INFO(kFileSystem, << "FileSystemImpl::GetListing("
|
|
|
+ << FMT_THIS_ADDR << ", path="
|
|
|
+ << path << ") called");
|
|
|
+
|
|
|
+ // Caputure the state and push it into the shim
|
|
|
+ auto callback = [this, path, handler](const Status &stat, std::shared_ptr<std::vector<StatInfo>> &stat_infos, bool has_more) {
|
|
|
+ GetListingShim(stat, stat_infos, has_more, path, handler);
|
|
|
+ };
|
|
|
+
|
|
|
+ nn_.GetListing(path, callback);
|
|
|
+}
|
|
|
+
|
|
|
+Status FileSystemImpl::GetListing(const std::string &path, std::shared_ptr<std::vector<StatInfo>> &stat_infos) {
|
|
|
+ LOG_INFO(kFileSystem, << "FileSystemImpl::[sync]GetListing("
|
|
|
+ << FMT_THIS_ADDR << ", path="
|
|
|
+ << path << ") called");
|
|
|
+
|
|
|
+ // In this case, we're going to allocate the result on the heap and have the
|
|
|
+ // async code populate it.
|
|
|
+ auto results = std::make_shared<std::vector<StatInfo>>();
|
|
|
+
|
|
|
+ auto callstate = std::make_shared<std::promise<Status>>();
|
|
|
+ std::future<Status> future(callstate->get_future());
|
|
|
+
|
|
|
+ /* wrap async FileSystem::GetListing with promise to make it a blocking call.
|
|
|
+ *
|
|
|
+ Keep requesting more until we get the entire listing, and don't set the promise
|
|
|
+ * until we have the entire listing.
|
|
|
+ */
|
|
|
+ auto h = [callstate, results](const Status &s, std::shared_ptr<std::vector<StatInfo>> si, bool has_more) -> bool {
|
|
|
+ if (si) {
|
|
|
+ results->insert(results->end(), si->begin(), si->end());
|
|
|
+ }
|
|
|
+
|
|
|
+ bool done = !s.ok() || !has_more;
|
|
|
+ if (done) {
|
|
|
+ callstate->set_value(s);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ };
|
|
|
+
|
|
|
+ GetListing(path, h);
|
|
|
+
|
|
|
+ /* block until promise is set */
|
|
|
+ Status stat = future.get();
|
|
|
+
|
|
|
+ if (!stat.ok()) {
|
|
|
+ return stat;
|
|
|
+ }
|
|
|
+
|
|
|
+ stat_infos = results;
|
|
|
+ return stat;
|
|
|
+}
|
|
|
|
|
|
|
|
|
void FileSystemImpl::WorkerDeleter::operator()(std::thread *t) {
|