/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "hdfspp/hdfspp.h"
#include "hdfspp/hdfs_ext.h"

#include "common/hdfs_configuration.h"
#include "common/configuration_loader.h"
#include "common/logging.h"
#include "fs/filesystem.h"
#include "fs/filehandle.h"
#include "x-platform/utils.h"

// NOTE(review): the system header names were lost from the original
// directives; the list below covers the standard facilities this file uses.
#include <algorithm>
#include <cerrno>
#include <cstring>
#include <functional>
#include <iostream>
#include <limits>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <vector>

using namespace hdfs;
using std::experimental::nullopt;
using namespace std::placeholders;

static constexpr tPort kDefaultPort = 8020;

/** Annotate what parts of the code below are implementations of API functions
 *  and if they are normal vs. extended API.
*/ #define LIBHDFS_C_API #define LIBHDFSPP_EXT_API /* Separate the handles used by the C api from the C++ API*/ struct hdfs_internal { hdfs_internal(FileSystem *p) : filesystem_(p), working_directory_("/") {} hdfs_internal(std::unique_ptr p) : filesystem_(std::move(p)), working_directory_("/") {} virtual ~hdfs_internal(){}; FileSystem *get_impl() { return filesystem_.get(); } const FileSystem *get_impl() const { return filesystem_.get(); } std::string get_working_directory() { std::lock_guard read_guard(wd_lock_); return working_directory_; } void set_working_directory(std::string new_directory) { std::lock_guard write_guard(wd_lock_); working_directory_ = new_directory; } private: std::unique_ptr filesystem_; std::string working_directory_; //has to always start and end with '/' std::mutex wd_lock_; //synchronize access to the working directory }; struct hdfsFile_internal { hdfsFile_internal(FileHandle *p) : file_(p) {} hdfsFile_internal(std::unique_ptr p) : file_(std::move(p)) {} virtual ~hdfsFile_internal(){}; FileHandle *get_impl() { return file_.get(); } const FileHandle *get_impl() const { return file_.get(); } private: std::unique_ptr file_; }; /* Keep thread local copy of last error string */ thread_local std::string errstr; /* Fetch last error that happened in this thread */ LIBHDFSPP_EXT_API int hdfsGetLastError(char *buf, int len) { //No error message if(errstr.empty()){ return -1; } //There is an error, but no room for the error message to be copied to if(nullptr == buf || len < 1) { return -1; } /* leave space for a trailing null */ size_t copylen = std::min((size_t)errstr.size(), (size_t)len); if(copylen == (size_t)len) { copylen--; } strncpy(buf, errstr.c_str(), copylen); /* stick in null */ buf[copylen] = 0; return 0; } /* Event callbacks for next open calls */ thread_local std::experimental::optional fsEventCallback; thread_local std::experimental::optional fileEventCallback; struct hdfsBuilder { hdfsBuilder(); hdfsBuilder(const char * directory); 
virtual ~hdfsBuilder() {} ConfigurationLoader loader; HdfsConfiguration config; optional overrideHost; optional overridePort; optional user; static constexpr tPort kUseDefaultPort = 0; }; /* Error handling with optional debug to stderr */ static void ReportError(int errnum, const std::string & msg) { errno = errnum; errstr = msg; #ifdef LIBHDFSPP_C_API_ENABLE_DEBUG std::cerr << "Error: errno=" << strerror(errnum) << " message=\"" << msg << "\"" << std::endl; #else (void)msg; #endif } /* Convert Status wrapped error into appropriate errno and return code */ static int Error(const Status &stat) { const char * default_message; int errnum; int code = stat.code(); switch (code) { case Status::Code::kOk: return 0; case Status::Code::kInvalidArgument: errnum = EINVAL; default_message = "Invalid argument"; break; case Status::Code::kResourceUnavailable: errnum = EAGAIN; default_message = "Resource temporarily unavailable"; break; case Status::Code::kUnimplemented: errnum = ENOSYS; default_message = "Function not implemented"; break; case Status::Code::kException: errnum = EINTR; default_message = "Exception raised"; break; case Status::Code::kOperationCanceled: errnum = EINTR; default_message = "Operation canceled"; break; case Status::Code::kPermissionDenied: errnum = EACCES; default_message = "Permission denied"; break; case Status::Code::kPathNotFound: errnum = ENOENT; default_message = "No such file or directory"; break; case Status::Code::kNotADirectory: errnum = ENOTDIR; default_message = "Not a directory"; break; case Status::Code::kFileAlreadyExists: errnum = EEXIST; default_message = "File already exists"; break; case Status::Code::kPathIsNotEmptyDirectory: errnum = ENOTEMPTY; default_message = "Directory is not empty"; break; case Status::Code::kInvalidOffset: errnum = Status::Code::kInvalidOffset; default_message = "Trying to begin a read past the EOF"; break; default: errnum = ENOSYS; default_message = "Error: unrecognised code"; } if (stat.ToString().empty()) 
ReportError(errnum, default_message); else ReportError(errnum, stat.ToString()); return -1; } static int ReportException(const std::exception & e) { return Error(Status::Exception("Uncaught exception", e.what())); } static int ReportCaughtNonException() { return Error(Status::Exception("Uncaught value not derived from std::exception", "")); } /* return false on failure */ bool CheckSystem(hdfsFS fs) { if (!fs) { ReportError(ENODEV, "Cannot perform FS operations with null FS handle."); return false; } return true; } /* return false on failure */ bool CheckHandle(hdfsFile file) { if (!file) { ReportError(EBADF, "Cannot perform FS operations with null File handle."); return false; } return true; } /* return false on failure */ bool CheckSystemAndHandle(hdfsFS fs, hdfsFile file) { if (!CheckSystem(fs)) return false; if (!CheckHandle(file)) return false; return true; } optional getAbsolutePath(hdfsFS fs, const char* path) { //Does not support . (dot) and .. (double dot) semantics if (!path || path[0] == '\0') { Error(Status::InvalidArgument("getAbsolutePath: argument 'path' cannot be NULL or empty")); return optional(); } if (path[0] != '/') { //we know that working directory always ends with '/' return fs->get_working_directory().append(path); } return optional(path); } /** * C API implementations **/ LIBHDFS_C_API int hdfsFileIsOpenForRead(hdfsFile file) { /* files can only be open for reads at the moment, do a quick check */ if (!CheckHandle(file)){ return 0; } return 1; // Update implementation when we get file writing } LIBHDFS_C_API int hdfsFileIsOpenForWrite(hdfsFile file) { /* files can only be open for reads at the moment, so return false */ CheckHandle(file); return -1; // Update implementation when we get file writing } int hdfsConfGetLong(const char *key, int64_t *val) { try { errno = 0; hdfsBuilder builder; return hdfsBuilderConfGetLong(&builder, key, val); } catch (const std::exception & e) { return ReportException(e); } catch (...) 
{ return ReportCaughtNonException(); } } hdfsFS doHdfsConnect(optional nn, optional port, optional user, const Options & options) { try { errno = 0; IoService * io_service = IoService::New(); FileSystem *fs = FileSystem::New(io_service, user.value_or(""), options); if (!fs) { ReportError(ENODEV, "Could not create FileSystem object"); return nullptr; } if (fsEventCallback) { fs->SetFsEventCallback(fsEventCallback.value()); } Status status; if (nn || port) { if (!port) { port = kDefaultPort; } std::string port_as_string = std::to_string(*port); status = fs->Connect(nn.value_or(""), port_as_string); } else { status = fs->ConnectToDefaultFs(); } if (!status.ok()) { Error(status); // FileSystem's ctor might take ownership of the io_service; if it does, // it will null out the pointer if (io_service) delete io_service; delete fs; return nullptr; } return new hdfs_internal(fs); } catch (const std::exception & e) { ReportException(e); return nullptr; } catch (...) { ReportCaughtNonException(); return nullptr; } } LIBHDFSPP_EXT_API hdfsFS hdfsAllocateFileSystem(struct hdfsBuilder *bld) { // Same idea as the first half of doHdfsConnect, but return the wrapped FS before // connecting. try { errno = 0; std::shared_ptr io_service = IoService::MakeShared(); int io_thread_count = bld->config.GetOptions().io_threads_; if(io_thread_count < 1) { io_service->InitDefaultWorkers(); } else { io_service->InitWorkers(io_thread_count); } FileSystem *fs = FileSystem::New(io_service, bld->user.value_or(""), bld->config.GetOptions()); if (!fs) { ReportError(ENODEV, "Could not create FileSystem object"); return nullptr; } if (fsEventCallback) { fs->SetFsEventCallback(fsEventCallback.value()); } return new hdfs_internal(fs); } catch (const std::exception &e) { ReportException(e); return nullptr; } catch (...) 
{
    ReportCaughtNonException();
    return nullptr;
  }
  return nullptr;
}

/*
 * Connect a FileSystem previously created by hdfsAllocateFileSystem, using
 * the host/port overrides stored in the builder (port defaults to
 * kDefaultPort) or the default FS when neither override is set.
 * Returns 0 on success and ENODEV on any failure.
 */
LIBHDFSPP_EXT_API
int hdfsConnectAllocated(hdfsFS fs, struct hdfsBuilder *bld) {
  if(!CheckSystem(fs)) {
    return ENODEV;
  }

  if(!bld) {
    ReportError(ENODEV, "No hdfsBuilder object supplied");
    return ENODEV;
  }

  // Get C++ FS to do connect
  FileSystem *fsImpl = fs->get_impl();
  if(!fsImpl) {
    ReportError(ENODEV, "Null FileSystem implementation");
    return ENODEV;
  }

  // Unpack the required bits of the hdfsBuilder (copies, so the default port
  // can be filled in without touching the builder)
  auto nn = bld->overrideHost;
  auto port = bld->overridePort;

  // try-catch in case some of the third-party stuff throws
  try {
    Status status;
    if (nn || port) {
      if (!port) {
        port = kDefaultPort;
      }
      std::string port_as_string = std::to_string(*port);
      status = fsImpl->Connect(nn.value_or(""), port_as_string);
    } else {
      status = fsImpl->ConnectToDefaultFs();
    }

    if (!status.ok()) {
      Error(status);
      return ENODEV;
    }

    // 0 to indicate a good connection
    return 0;
  } catch (const std::exception & e) {
    ReportException(e);
    return ENODEV;
  } catch (...)
{ ReportCaughtNonException(); return ENODEV; } return 0; } LIBHDFS_C_API hdfsFS hdfsConnect(const char *nn, tPort port) { return hdfsConnectAsUser(nn, port, ""); } LIBHDFS_C_API hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user) { return doHdfsConnect(std::string(nn), port, std::string(user), Options()); } LIBHDFS_C_API hdfsFS hdfsConnectAsUserNewInstance(const char* nn, tPort port, const char *user ) { //libhdfspp always returns a new instance return doHdfsConnect(std::string(nn), port, std::string(user), Options()); } LIBHDFS_C_API hdfsFS hdfsConnectNewInstance(const char* nn, tPort port) { //libhdfspp always returns a new instance return hdfsConnectAsUser(nn, port, ""); } LIBHDFSPP_EXT_API int hdfsCancelPendingConnection(hdfsFS fs) { // todo: stick an enum in hdfs_internal to check the connect state if(!CheckSystem(fs)) { return ENODEV; } FileSystem *fsImpl = fs->get_impl(); if(!fsImpl) { ReportError(ENODEV, "Null FileSystem implementation"); return ENODEV; } bool canceled = fsImpl->CancelPendingConnect(); if(canceled) { return 0; } else { return EINTR; } } LIBHDFS_C_API int hdfsDisconnect(hdfsFS fs) { try { errno = 0; if (!fs) { ReportError(ENODEV, "Cannot disconnect null FS handle."); return -1; } delete fs; return 0; } catch (const std::exception & e) { return ReportException(e); } catch (...) 
{
    return ReportCaughtNonException();
  }
}

/*
 * Open path for reading.  flags, bufferSize, replication and blocksize are
 * currently ignored.  Returns a new hdfsFile handle, or nullptr on failure.
 */
LIBHDFS_C_API
hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize,
                      short replication, tSize blocksize) {
  try
  {
    errno = 0;
    (void)flags;
    (void)bufferSize;
    (void)replication;
    (void)blocksize;
    if (!fs) {
      ReportError(ENODEV, "Cannot perform FS operations with null FS handle.");
      return nullptr;
    }
    const auto abs_path = getAbsolutePath(fs, path);
    if(!abs_path) {
      return nullptr;
    }
    FileHandle *f = nullptr;
    Status stat = fs->get_impl()->Open(*abs_path, &f);
    if (!stat.ok()) {
      Error(stat);
      return nullptr;
    }
    if (f && fileEventCallback) {
      f->SetFileEventCallback(fileEventCallback.value());
    }
    return new hdfsFile_internal(f);
  } catch (const std::exception & e) {
    ReportException(e);
    return nullptr;
  } catch (...) {
    ReportCaughtNonException();
    return nullptr;
  }
}

/* Destroy the file handle. 0 on success, -1 on a null handle or exception. */
LIBHDFS_C_API
int hdfsCloseFile(hdfsFS fs, hdfsFile file) {
  try
  {
    errno = 0;
    if (!CheckSystemAndHandle(fs, file)) {
      return -1;
    }
    delete file;
    return 0;
  } catch (const std::exception & e) {
    return ReportException(e);
  } catch (...) {
    return ReportCaughtNonException();
  }
}

/*
 * Copy the working directory of fs, null-terminated, into buffer.
 * Returns buffer on success; nullptr (EINVAL) when buffer is NULL or
 * bufferSize cannot hold the directory plus the terminator.
 */
LIBHDFS_C_API
char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize) {
  try
  {
    errno = 0;
    if (!CheckSystem(fs)) {
      return nullptr;
    }
    // Writing through a null destination would crash; report it instead.
    if (!buffer) {
      Error(Status::InvalidArgument("hdfsGetWorkingDirectory: argument 'buffer' cannot be NULL"));
      return nullptr;
    }
    std::string wd = fs->get_working_directory();
    size_t size = wd.size();
    if (size + 1 > bufferSize) {
      std::stringstream ss;
      ss << "hdfsGetWorkingDirectory: bufferSize is " << bufferSize <<
          ", which is not enough to fit working directory of size " << (size + 1);
      Error(Status::InvalidArgument(ss.str().c_str()));
      return nullptr;
    }
    wd.copy(buffer, size);
    buffer[size] = '\0';
    return buffer;
  } catch (const std::exception & e) {
    ReportException(e);
    return nullptr;
  } catch (...)
{
    ReportCaughtNonException();
    return nullptr;
  }
}

/*
 * Set the working directory of fs to path (resolved against the current
 * working directory when relative).  A trailing '/' is appended if missing.
 * Returns 0 on success, -1 on error.
 */
LIBHDFS_C_API
int hdfsSetWorkingDirectory(hdfsFS fs, const char* path) {
  try
  {
    errno = 0;
    if (!CheckSystem(fs)) {
      return -1;
    }
    auto abs_path = getAbsolutePath(fs, path);
    if(!abs_path) {
      return -1;
    }
    //Enforce last character to be '/'
    std::string withSlash = *abs_path;
    char last = withSlash.back();
    if (last != '/'){
      withSlash += '/';
    }
    fs->set_working_directory(withSlash);
    return 0;
  } catch (const std::exception & e) {
    return ReportException(e);
  } catch (...) {
    return ReportCaughtNonException();
  }
}

LIBHDFS_C_API
int hdfsAvailable(hdfsFS fs, hdfsFile file) {
  //Since we do not have read ahead implemented, return 0 if fs and file are good;
  errno = 0;
  if (!CheckSystemAndHandle(fs, file)) {
    return -1;
  }
  return 0;
}

/*
 * Return the block size from the options of fs, or -1 on error.
 */
LIBHDFS_C_API
tOffset hdfsGetDefaultBlockSize(hdfsFS fs) {
  try {
    errno = 0;
    // fs is dereferenced below; validate it like every other entry point.
    if (!CheckSystem(fs)) {
      return -1;
    }
    return fs->get_impl()->get_options().block_size;
  } catch (const std::exception & e) {
    ReportException(e);
    return -1;
  } catch (...) {
    ReportCaughtNonException();
    return -1;
  }
}

/*
 * Return the preferred block size for path.  When the path does not exist
 * the block size from the options of fs is returned instead; other
 * failures return -1.
 */
LIBHDFS_C_API
tOffset hdfsGetDefaultBlockSizeAtPath(hdfsFS fs, const char *path) {
  try {
    errno = 0;
    if (!CheckSystem(fs)) {
      return -1;
    }
    const auto abs_path = getAbsolutePath(fs, path);
    if(!abs_path) {
      return -1;
    }
    uint64_t block_size;
    Status stat = fs->get_impl()->GetPreferredBlockSize(*abs_path, block_size);
    if (!stat.ok()) {
      if (stat.pathNotFound()){
        return fs->get_impl()->get_options().block_size;
      } else {
        return Error(stat);
      }
    }
    return block_size;
  } catch (const std::exception & e) {
    ReportException(e);
    return -1;
  } catch (...)
{
    ReportCaughtNonException();
    return -1;
  }
}

/*
 * Set the replication factor of path.  The path is validated before the
 * replication value; a value below 1 is rejected as an invalid argument.
 * Returns 0 on success, -1 on error.
 */
LIBHDFS_C_API
int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication) {
  try {
    errno = 0;
    if (!CheckSystem(fs)) {
      return -1;
    }

    const auto resolved = getAbsolutePath(fs, path);
    if (!resolved) {
      return -1;
    }

    if (replication < 1) {
      return Error(Status::InvalidArgument("SetReplication: argument 'replication' cannot be less than 1"));
    }

    const Status result = fs->get_impl()->SetReplication(*resolved, replication);
    return result.ok() ? 0 : Error(result);
  } catch (const std::exception & e) {
    return ReportException(e);
  } catch (...) {
    return ReportCaughtNonException();
  }
}

/*
 * Set the modification and access times of path.
 * Returns 0 on success, -1 on error.
 */
LIBHDFS_C_API
int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime) {
  try {
    errno = 0;
    if (!CheckSystem(fs)) {
      return -1;
    }

    const auto resolved = getAbsolutePath(fs, path);
    if (!resolved) {
      return -1;
    }

    const Status result = fs->get_impl()->SetTimes(*resolved, mtime, atime);
    return result.ok() ? 0 : Error(result);
  } catch (const std::exception & e) {
    return ReportException(e);
  } catch (...) {
    return ReportCaughtNonException();
  }
}

/* Return the capacity reported by GetFsStats, or -1 on error. */
LIBHDFS_C_API
tOffset hdfsGetCapacity(hdfsFS fs) {
  try {
    errno = 0;
    if (!CheckSystem(fs)) {
      return -1;
    }

    hdfs::FsInfo info;
    const Status result = fs->get_impl()->GetFsStats(info);
    if (result.ok()) {
      return info.capacity;
    }
    Error(result);
    return -1;
  } catch (const std::exception & e) {
    ReportException(e);
    return -1;
  } catch (...) {
    ReportCaughtNonException();
    return -1;
  }
}

/* Return the used space reported by GetFsStats, or -1 on error. */
LIBHDFS_C_API
tOffset hdfsGetUsed(hdfsFS fs) {
  try {
    errno = 0;
    if (!CheckSystem(fs)) {
      return -1;
    }

    hdfs::FsInfo info;
    const Status result = fs->get_impl()->GetFsStats(info);
    if (result.ok()) {
      return info.used;
    }
    Error(result);
    return -1;
  } catch (const std::exception & e) {
    ReportException(e);
    return -1;
  } catch (...)
{ ReportCaughtNonException(); return -1; } } void StatInfoToHdfsFileInfo(hdfsFileInfo * file_info, const hdfs::StatInfo & stat_info) { /* file or directory */ if (stat_info.file_type == StatInfo::IS_DIR) { file_info->mKind = kObjectKindDirectory; } else if (stat_info.file_type == StatInfo::IS_FILE) { file_info->mKind = kObjectKindFile; } else { file_info->mKind = kObjectKindFile; LOG_WARN(kFileSystem, << "Symlink is not supported! Reporting as a file: "); } const auto filename = XPlatform::Utils::Basename(stat_info.path); file_info->mName = new char[filename.size() + 1]; strncpy(file_info->mName, filename.c_str(), filename.size() + 1); /* the last modification time for the file in seconds */ file_info->mLastMod = (tTime) stat_info.modification_time; /* the size of the file in bytes */ file_info->mSize = (tOffset) stat_info.length; /* the count of replicas */ file_info->mReplication = (short) stat_info.block_replication; /* the block size for the file */ file_info->mBlockSize = (tOffset) stat_info.blocksize; /* the owner of the file */ file_info->mOwner = new char[stat_info.owner.size() + 1]; strncpy(file_info->mOwner, stat_info.owner.c_str(), stat_info.owner.size() + 1); /* the group associated with the file */ file_info->mGroup = new char[stat_info.group.size() + 1]; strncpy(file_info->mGroup, stat_info.group.c_str(), stat_info.group.size() + 1); /* the permissions associated with the file encoded as an octal number (0777)*/ file_info->mPermissions = (short) stat_info.permissions; /* the last access time for the file in seconds since the epoch*/ file_info->mLastAccess = stat_info.access_time; } LIBHDFS_C_API int hdfsExists(hdfsFS fs, const char *path) { try { errno = 0; if (!CheckSystem(fs)) { return -1; } const optional abs_path = getAbsolutePath(fs, path); if(!abs_path) { return -1; } hdfs::StatInfo stat_info; Status stat = fs->get_impl()->GetFileInfo(*abs_path, stat_info); if (!stat.ok()) { return Error(stat); } return 0; } catch (const std::exception & e) { 
return ReportException(e);
  } catch (...) {
    return ReportCaughtNonException();
  }
}

/*
 * Return a single-element hdfsFileInfo array describing path, or nullptr on
 * error.  The array was allocated with new[] (hdfsFreeFileInfo releases it).
 */
LIBHDFS_C_API
hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path) {
  try {
    errno = 0;
    if (!CheckSystem(fs)) {
       return nullptr;
    }
    const auto abs_path = getAbsolutePath(fs, path);
    if(!abs_path) {
      return nullptr;
    }
    hdfs::StatInfo stat_info;
    Status stat = fs->get_impl()->GetFileInfo(*abs_path, stat_info);
    if (!stat.ok()) {
      Error(stat);
      return nullptr;
    }
    hdfsFileInfo *file_info = new hdfsFileInfo[1];
    StatInfoToHdfsFileInfo(file_info, stat_info);
    return file_info;
  } catch (const std::exception & e) {
    ReportException(e);
    return nullptr;
  } catch (...) {
    ReportCaughtNonException();
    return nullptr;
  }
}

/*
 * List the entries of the directory at path.
 *
 * On success returns a new[]-allocated array and stores its length in
 * *numEntries.  Returns nullptr with *numEntries == 0 both on error and for
 * an empty listing.  numEntries itself must not be NULL (EINVAL).
 */
LIBHDFS_C_API
hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries) {
  try {
      errno = 0;
      // Every path below writes through numEntries, so reject NULL first.
      if (!numEntries) {
        ReportError(EINVAL, "hdfsListDirectory: argument 'numEntries' cannot be NULL");
        return nullptr;
      }
      if (!CheckSystem(fs)) {
        *numEntries = 0;
        return nullptr;
      }
      const auto abs_path = getAbsolutePath(fs, path);
      if(!abs_path) {
        // Keep the out-parameter defined on this error path as on the others.
        *numEntries = 0;
        return nullptr;
      }
      std::vector<StatInfo> stat_infos;
      Status stat = fs->get_impl()->GetListing(*abs_path, &stat_infos);
      if (!stat.ok()) {
        Error(stat);
        *numEntries = 0;
        return nullptr;
      }
      if(stat_infos.empty()){
        *numEntries = 0;
        return nullptr;
      }
      *numEntries = stat_infos.size();
      hdfsFileInfo *file_infos = new hdfsFileInfo[stat_infos.size()];
      for(std::vector<StatInfo>::size_type i = 0; i < stat_infos.size(); i++) {
        StatInfoToHdfsFileInfo(&file_infos[i], stat_infos.at(i));
      }

      return file_infos;
    } catch (const std::exception & e) {
      ReportException(e);
      *numEntries = 0;
      return nullptr;
    } catch (...)
{
      ReportCaughtNonException();
      *numEntries = 0;
      return nullptr;
    }
}

/*
 * Release an array of numEntries hdfsFileInfo records together with the
 * mName / mOwner / mGroup strings of each record.  A NULL array is ignored.
 */
LIBHDFS_C_API
void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries)
{
    errno = 0;
    // Nothing to release; also avoids indexing a null array when
    // numEntries is non-zero.
    if (!hdfsFileInfo) {
        return;
    }
    int i;
    for (i = 0; i < numEntries; ++i) {
        delete[] hdfsFileInfo[i].mName;
        delete[] hdfsFileInfo[i].mOwner;
        delete[] hdfsFileInfo[i].mGroup;
    }
    delete[] hdfsFileInfo;
}

/*
 * Create the directory at path, including missing parents, with the default
 * permission mask.  Returns 0 on success, -1 on error.
 */
LIBHDFS_C_API
int hdfsCreateDirectory(hdfsFS fs, const char* path) {
  try {
    errno = 0;
    if (!CheckSystem(fs)) {
      return -1;
    }
    const auto abs_path = getAbsolutePath(fs, path);
    if(!abs_path) {
      return -1;
    }
    Status stat;
    //Use default permissions and set true for creating all non-existant parent directories
    stat = fs->get_impl()->Mkdirs(*abs_path, FileSystem::GetDefaultPermissionMask(), true);
    if (!stat.ok()) {
      return Error(stat);
    }
    return 0;
  } catch (const std::exception & e) {
    return ReportException(e);
  } catch (...) {
    return ReportCaughtNonException();
  }
}

/*
 * Delete path; recursive is forwarded to FileSystem::Delete.
 * Returns 0 on success, -1 on error.
 */
LIBHDFS_C_API
int hdfsDelete(hdfsFS fs, const char* path, int recursive) {
  try {
      errno = 0;
      if (!CheckSystem(fs)) {
        return -1;
      }
      const auto abs_path = getAbsolutePath(fs, path);
      if(!abs_path) {
        return -1;
      }
      Status stat;
      stat = fs->get_impl()->Delete(*abs_path, recursive);
      if (!stat.ok()) {
        return Error(stat);
      }
      return 0;
    } catch (const std::exception & e) {
      return ReportException(e);
    } catch (...) {
      return ReportCaughtNonException();
    }
}

/*
 * Rename oldPath to newPath (both resolved against the working directory).
 * Returns 0 on success, -1 on error.
 */
LIBHDFS_C_API
int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath) {
  try {
    errno = 0;
    if (!CheckSystem(fs)) {
      return -1;
    }
    const auto old_abs_path = getAbsolutePath(fs, oldPath);
    const auto new_abs_path = getAbsolutePath(fs, newPath);
    if(!old_abs_path || !new_abs_path) {
      return -1;
    }
    Status stat;
    stat = fs->get_impl()->Rename(*old_abs_path, *new_abs_path);
    if (!stat.ok()) {
      return Error(stat);
    }
    return 0;
  } catch (const std::exception & e) {
    return ReportException(e);
  } catch (...)
{ return ReportCaughtNonException(); } } LIBHDFS_C_API int hdfsChmod(hdfsFS fs, const char* path, short mode){ try { errno = 0; if (!CheckSystem(fs)) { return -1; } const optional abs_path = getAbsolutePath(fs, path); if(!abs_path) { return -1; } Status stat = FileSystem::CheckValidPermissionMask(mode); if (!stat.ok()) { return Error(stat); } stat = fs->get_impl()->SetPermission(*abs_path, mode); if (!stat.ok()) { return Error(stat); } return 0; } catch (const std::exception & e) { return ReportException(e); } catch (...) { return ReportCaughtNonException(); } } LIBHDFS_C_API int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group){ try { errno = 0; if (!CheckSystem(fs)) { return -1; } const optional abs_path = getAbsolutePath(fs, path); if(!abs_path) { return -1; } std::string own = (owner) ? owner : ""; std::string grp = (group) ? group : ""; Status stat; stat = fs->get_impl()->SetOwner(*abs_path, own, grp); if (!stat.ok()) { return Error(stat); } return 0; } catch (const std::exception & e) { return ReportException(e); } catch (...) { return ReportCaughtNonException(); } } LIBHDFSPP_EXT_API hdfsFileInfo * hdfsFind(hdfsFS fs, const char* path, const char* name, uint32_t * numEntries){ try { errno = 0; if (!CheckSystem(fs)) { *numEntries = 0; return nullptr; } std::vector stat_infos; Status stat = fs->get_impl()->Find(path, name, hdfs::FileSystem::GetDefaultFindMaxDepth(), &stat_infos); if (!stat.ok()) { Error(stat); *numEntries = 0; return nullptr; } //Existing API expects nullptr if size is 0 if(stat_infos.empty()){ *numEntries = 0; return nullptr; } *numEntries = stat_infos.size(); hdfsFileInfo *file_infos = new hdfsFileInfo[stat_infos.size()]; for(std::vector::size_type i = 0; i < stat_infos.size(); i++) { StatInfoToHdfsFileInfo(&file_infos[i], stat_infos.at(i)); } return file_infos; } catch (const std::exception & e) { ReportException(e); *numEntries = 0; return nullptr; } catch (...) 
{
      ReportCaughtNonException();
      *numEntries = 0;
      return nullptr;
    }
}

/*
 * Create a snapshot of the directory at path.  A NULL name is passed on as
 * an empty string.  Returns 0 on success, -1 on error.
 */
LIBHDFSPP_EXT_API
int hdfsCreateSnapshot(hdfsFS fs, const char* path, const char* name) {
  try {
    errno = 0;
    if (!CheckSystem(fs)) {
      return -1;
    }
    const auto abs_path = getAbsolutePath(fs, path);
    if(!abs_path) {
      return -1;
    }
    const Status stat = fs->get_impl()->CreateSnapshot(*abs_path, name ? name : "");
    if (!stat.ok()) {
      return Error(stat);
    }
    return 0;
  } catch (const std::exception & e) {
    return ReportException(e);
  } catch (...) {
    return ReportCaughtNonException();
  }
}

/*
 * Delete the snapshot called name from the directory at path.
 * name must not be NULL.  Returns 0 on success, -1 on error.
 */
LIBHDFSPP_EXT_API
int hdfsDeleteSnapshot(hdfsFS fs, const char* path, const char* name) {
  try {
    errno = 0;
    if (!CheckSystem(fs)) {
      return -1;
    }
    const auto abs_path = getAbsolutePath(fs, path);
    if(!abs_path) {
      return -1;
    }
    if (!name) {
      return Error(Status::InvalidArgument("hdfsDeleteSnapshot: argument 'name' cannot be NULL"));
    }
    Status stat;
    stat = fs->get_impl()->DeleteSnapshot(*abs_path, name);
    if (!stat.ok()) {
      return Error(stat);
    }
    return 0;
  } catch (const std::exception & e) {
    return ReportException(e);
  } catch (...) {
    return ReportCaughtNonException();
  }
}

/*
 * Rename snapshot old_name of the directory at path to new_name.
 * Neither name may be NULL.  Returns 0 on success, -1 on error.
 */
LIBHDFSPP_EXT_API
int hdfsRenameSnapshot(hdfsFS fs, const char* path, const char* old_name, const char* new_name) {
  try {
    errno = 0;
    if (!CheckSystem(fs)) {
      return -1;
    }
    const auto abs_path = getAbsolutePath(fs, path);
    if(!abs_path) {
      return -1;
    }
    if (!old_name) {
      return Error(Status::InvalidArgument("hdfsRenameSnapshot: argument 'old_name' cannot be NULL"));
    }
    if (!new_name) {
      return Error(Status::InvalidArgument("hdfsRenameSnapshot: argument 'new_name' cannot be NULL"));
    }
    Status stat;
    stat = fs->get_impl()->RenameSnapshot(*abs_path, old_name, new_name);
    if (!stat.ok()) {
      return Error(stat);
    }
    return 0;
  } catch (const std::exception & e) {
    return ReportException(e);
  } catch (...)
{
    return ReportCaughtNonException();
  }
}

/* Allow snapshots on the directory at path. 0 on success, -1 on error. */
LIBHDFSPP_EXT_API
int hdfsAllowSnapshot(hdfsFS fs, const char* path) {
  try {
    errno = 0;
    if (!CheckSystem(fs)) {
      return -1;
    }
    const auto abs_path = getAbsolutePath(fs, path);
    if(!abs_path) {
      return -1;
    }
    Status stat;
    stat = fs->get_impl()->AllowSnapshot(*abs_path);
    if (!stat.ok()) {
      return Error(stat);
    }
    return 0;
  } catch (const std::exception & e) {
    return ReportException(e);
  } catch (...) {
    return ReportCaughtNonException();
  }
}

/* Disallow snapshots on the directory at path. 0 on success, -1 on error. */
LIBHDFSPP_EXT_API
int hdfsDisallowSnapshot(hdfsFS fs, const char* path) {
  try {
    errno = 0;
    if (!CheckSystem(fs)) {
      return -1;
    }
    const auto abs_path = getAbsolutePath(fs, path);
    if(!abs_path) {
      return -1;
    }
    Status stat;
    stat = fs->get_impl()->DisallowSnapshot(*abs_path);
    if (!stat.ok()) {
      return Error(stat);
    }
    return 0;
  } catch (const std::exception & e) {
    return ReportException(e);
  } catch (...) {
    return ReportCaughtNonException();
  }
}

/*
 * Read up to length bytes at the given position into buffer.
 * Returns the number of bytes read, or -1 on error.  A negative length, or a
 * NULL buffer with a positive length, is rejected with EINVAL.
 */
LIBHDFS_C_API
tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer,
                tSize length) {
  try
  {
    errno = 0;
    if (!CheckSystemAndHandle(fs, file)) {
      return -1;
    }
    // length is passed on as an unsigned size; a negative value would become
    // a huge buffer size.
    if (length < 0 || (!buffer && length > 0)) {
      return Error(Status::InvalidArgument("hdfsPread: invalid 'buffer' or 'length' argument"));
    }

    size_t len = 0;
    Status stat = file->get_impl()->PositionRead(buffer, length, position, &len);
    if(!stat.ok()) {
      return Error(stat);
    }
    return (tSize)len;
  } catch (const std::exception & e) {
    return ReportException(e);
  } catch (...) {
    return ReportCaughtNonException();
  }
}

/*
 * Read up to length bytes from the current position into buffer.
 * Returns the number of bytes read, or -1 on error.  A negative length, or a
 * NULL buffer with a positive length, is rejected with EINVAL.
 */
LIBHDFS_C_API
tSize hdfsRead(hdfsFS fs, hdfsFile file, void *buffer, tSize length) {
  try
  {
    errno = 0;
    if (!CheckSystemAndHandle(fs, file)) {
      return -1;
    }
    // Same guard as hdfsPread: never hand a wrapped-around size to Read().
    if (length < 0 || (!buffer && length > 0)) {
      return Error(Status::InvalidArgument("hdfsRead: invalid 'buffer' or 'length' argument"));
    }

    size_t len = 0;
    Status stat = file->get_impl()->Read(buffer, length, &len);
    if (!stat.ok()) {
      return Error(stat);
    }

    return (tSize)len;
  } catch (const std::exception & e) {
    return ReportException(e);
  } catch (...)
{
    return ReportCaughtNonException();
  }
}

LIBHDFS_C_API
int hdfsUnbufferFile(hdfsFile file) {
  //Currently we are not doing any buffering
  CheckHandle(file);
  return -1;
}

/*
 * Allocate a zeroed hdfsReadStatistics, fill in totalBytesRead from the file
 * handle and store it in *stats (release with hdfsFileFreeReadStatistics).
 * Returns 0 on success, -1 on error; stats must not be NULL (EINVAL).
 */
LIBHDFS_C_API
int hdfsFileGetReadStatistics(hdfsFile file, struct hdfsReadStatistics **stats) {
  try
    {
      errno = 0;
      if (!CheckHandle(file)) {
        return -1;
      }
      // The result is written through stats, so it has to be a valid pointer.
      if (!stats) {
        ReportError(EINVAL, "hdfsFileGetReadStatistics: argument 'stats' cannot be NULL");
        return -1;
      }
      *stats = new hdfsReadStatistics;
      memset(*stats, 0, sizeof(hdfsReadStatistics));
      (*stats)->totalBytesRead = file->get_impl()->get_bytes_read();
      return 0;
    } catch (const std::exception & e) {
      return ReportException(e);
    } catch (...) {
      return ReportCaughtNonException();
    }
}

/* Reset the bytes-read counter of the file handle. 0 on success, -1 on error. */
LIBHDFS_C_API
int hdfsFileClearReadStatistics(hdfsFile file) {
  try
    {
      errno = 0;
      if (!CheckHandle(file)) {
        return -1;
      }
      file->get_impl()->clear_bytes_read();
      return 0;
    } catch (const std::exception & e) {
      return ReportException(e);
    } catch (...) {
      return ReportCaughtNonException();
    }
}

/* Remote bytes = total bytes read minus local bytes read. stats must be valid. */
LIBHDFS_C_API
int64_t hdfsReadStatisticsGetRemoteBytesRead(const struct hdfsReadStatistics *stats) {
    return stats->totalBytesRead - stats->totalLocalBytesRead;
}

/* Release statistics returned by hdfsFileGetReadStatistics. */
LIBHDFS_C_API
void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats) {
    errno = 0;
    delete stats;
}

/* 0 on success, -1 on error*/
LIBHDFS_C_API
int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos) {
  try
  {
    errno = 0;
    if (!CheckSystemAndHandle(fs, file)) {
      return -1;
    }

    off_t desired = desiredPos;
    Status stat = file->get_impl()->Seek(&desired, std::ios_base::beg);
    if (!stat.ok()) {
      return Error(stat);
    }

    return 0;
  } catch (const std::exception & e) {
    return ReportException(e);
  } catch (...) {
    return ReportCaughtNonException();
  }
}

/*
 * Return the current offset of the file (a seek of 0 relative to the current
 * position), or -1 on error.
 */
LIBHDFS_C_API
tOffset hdfsTell(hdfsFS fs, hdfsFile file) {
  try
  {
    errno = 0;
    if (!CheckSystemAndHandle(fs, file)) {
      return -1;
    }

    off_t offset = 0;
    Status stat = file->get_impl()->Seek(&offset, std::ios_base::cur);
    if (!stat.ok()) {
      return Error(stat);
    }

    return (tOffset)offset;
  } catch (const std::exception & e) {
    return ReportException(e);
  } catch (...)
{ return ReportCaughtNonException(); } } /* extended API */ int hdfsCancel(hdfsFS fs, hdfsFile file) { try { errno = 0; if (!CheckSystemAndHandle(fs, file)) { return -1; } static_cast(file->get_impl())->CancelOperations(); return 0; } catch (const std::exception & e) { return ReportException(e); } catch (...) { return ReportCaughtNonException(); } } LIBHDFSPP_EXT_API int hdfsGetBlockLocations(hdfsFS fs, const char *path, struct hdfsBlockLocations ** locations_out) { try { errno = 0; if (!CheckSystem(fs)) { return -1; } if (locations_out == nullptr) { ReportError(EINVAL, "Null pointer passed to hdfsGetBlockLocations"); return -1; } const optional abs_path = getAbsolutePath(fs, path); if(!abs_path) { return -1; } std::shared_ptr ppLocations; Status stat = fs->get_impl()->GetBlockLocations(*abs_path, 0, std::numeric_limits::max(), &ppLocations); if (!stat.ok()) { return Error(stat); } hdfsBlockLocations *locations = new struct hdfsBlockLocations(); (*locations_out) = locations; explicit_bzero(locations, sizeof(*locations)); locations->fileLength = ppLocations->getFileLength(); locations->isLastBlockComplete = ppLocations->isLastBlockComplete(); locations->isUnderConstruction = ppLocations->isUnderConstruction(); const std::vector & ppBlockLocations = ppLocations->getBlockLocations(); locations->num_blocks = ppBlockLocations.size(); locations->blocks = new struct hdfsBlockInfo[locations->num_blocks]; for (size_t i=0; i < ppBlockLocations.size(); i++) { auto ppBlockLocation = ppBlockLocations[i]; auto block = &locations->blocks[i]; block->num_bytes = ppBlockLocation.getLength(); block->start_offset = ppBlockLocation.getOffset(); const std::vector & ppDNInfos = ppBlockLocation.getDataNodes(); block->num_locations = ppDNInfos.size(); block->locations = new hdfsDNInfo[block->num_locations]; for (size_t j=0; j < block->num_locations; j++) { auto ppDNInfo = ppDNInfos[j]; auto dn_info = &block->locations[j]; dn_info->xfer_port = ppDNInfo.getXferPort(); dn_info->info_port = 
ppDNInfo.getInfoPort(); dn_info->IPC_port = ppDNInfo.getIPCPort(); dn_info->info_secure_port = ppDNInfo.getInfoSecurePort(); char * buf; buf = new char[ppDNInfo.getHostname().size() + 1]; strncpy(buf, ppDNInfo.getHostname().c_str(), ppDNInfo.getHostname().size() + 1); dn_info->hostname = buf; buf = new char[ppDNInfo.getIPAddr().size() + 1]; strncpy(buf, ppDNInfo.getIPAddr().c_str(), ppDNInfo.getIPAddr().size() + 1); dn_info->ip_address = buf; buf = new char[ppDNInfo.getNetworkLocation().size() + 1]; strncpy(buf, ppDNInfo.getNetworkLocation().c_str(), ppDNInfo.getNetworkLocation().size() + 1); dn_info->network_location = buf; } } return 0; } catch (const std::exception & e) { return ReportException(e); } catch (...) { return ReportCaughtNonException(); } } LIBHDFSPP_EXT_API int hdfsFreeBlockLocations(struct hdfsBlockLocations * blockLocations) { errno = 0; if (blockLocations == nullptr) return 0; for (size_t i=0; i < blockLocations->num_blocks; i++) { auto block = &blockLocations->blocks[i]; for (size_t j=0; j < block->num_locations; j++) { auto location = &block->locations[j]; delete[] location->hostname; delete[] location->ip_address; delete[] location->network_location; } } delete[] blockLocations->blocks; delete blockLocations; return 0; } LIBHDFS_C_API char*** hdfsGetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length) { try { errno = 0; if (!CheckSystem(fs)) { return nullptr; } const optional abs_path = getAbsolutePath(fs, path); if(!abs_path) { return nullptr; } std::shared_ptr ppLocations; Status stat = fs->get_impl()->GetBlockLocations(*abs_path, start, length, &ppLocations); if (!stat.ok()) { Error(stat); return nullptr; } const std::vector & ppBlockLocations = ppLocations->getBlockLocations(); char ***hosts = new char**[ppBlockLocations.size() + 1]; for (size_t i=0; i < ppBlockLocations.size(); i++) { const std::vector & ppDNInfos = ppBlockLocations[i].getDataNodes(); hosts[i] = new char*[ppDNInfos.size() + 1]; for (size_t j=0; j < 
ppDNInfos.size(); j++) { auto ppDNInfo = ppDNInfos[j]; hosts[i][j] = new char[ppDNInfo.getHostname().size() + 1]; strncpy(hosts[i][j], ppDNInfo.getHostname().c_str(), ppDNInfo.getHostname().size() + 1); } hosts[i][ppDNInfos.size()] = nullptr; } hosts[ppBlockLocations.size()] = nullptr; return hosts; } catch (const std::exception & e) { ReportException(e); return nullptr; } catch (...) { ReportCaughtNonException(); return nullptr; } } LIBHDFS_C_API void hdfsFreeHosts(char ***blockHosts) { errno = 0; if (blockHosts == nullptr) return; for (size_t i = 0; blockHosts[i]; i++) { for (size_t j = 0; blockHosts[i][j]; j++) { delete[] blockHosts[i][j]; } delete[] blockHosts[i]; } delete blockHosts; } /******************************************************************* * EVENT CALLBACKS *******************************************************************/ const char * FS_NN_CONNECT_EVENT = hdfs::FS_NN_CONNECT_EVENT; const char * FS_NN_READ_EVENT = hdfs::FS_NN_READ_EVENT; const char * FS_NN_WRITE_EVENT = hdfs::FS_NN_WRITE_EVENT; const char * FILE_DN_CONNECT_EVENT = hdfs::FILE_DN_CONNECT_EVENT; const char * FILE_DN_READ_EVENT = hdfs::FILE_DN_READ_EVENT; const char * FILE_DN_WRITE_EVENT = hdfs::FILE_DN_WRITE_EVENT; event_response fs_callback_glue(libhdfspp_fs_event_callback handler, int64_t cookie, const char * event, const char * cluster, int64_t value) { int result = handler(event, cluster, value, cookie); if (result == LIBHDFSPP_EVENT_OK) { return event_response::make_ok(); } #ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED if (result == DEBUG_SIMULATE_ERROR) { return event_response::test_err(Status::Error("Simulated error")); } #endif return event_response::make_ok(); } event_response file_callback_glue(libhdfspp_file_event_callback handler, int64_t cookie, const char * event, const char * cluster, const char * file, int64_t value) { int result = handler(event, cluster, file, value, cookie); if (result == LIBHDFSPP_EVENT_OK) { return event_response::make_ok(); } #ifndef 
LIBHDFSPP_SIMULATE_ERROR_DISABLED if (result == DEBUG_SIMULATE_ERROR) { return event_response::test_err(Status::Error("Simulated error")); } #endif return event_response::make_ok(); } LIBHDFSPP_EXT_API int hdfsPreAttachFSMonitor(libhdfspp_fs_event_callback handler, int64_t cookie) { fs_event_callback callback = std::bind(fs_callback_glue, handler, cookie, _1, _2, _3); fsEventCallback = callback; return 0; } LIBHDFSPP_EXT_API int hdfsPreAttachFileMonitor(libhdfspp_file_event_callback handler, int64_t cookie) { file_event_callback callback = std::bind(file_callback_glue, handler, cookie, _1, _2, _3, _4); fileEventCallback = callback; return 0; } /******************************************************************* * BUILDER INTERFACE *******************************************************************/ HdfsConfiguration LoadDefault(ConfigurationLoader & loader) { optional result = loader.LoadDefaultResources(); if (result) { return result.value(); } else { return loader.NewConfig(); } } hdfsBuilder::hdfsBuilder() : config(loader.NewConfig()) { errno = 0; config = LoadDefault(loader); } hdfsBuilder::hdfsBuilder(const char * directory) : config(loader.NewConfig()) { errno = 0; loader.SetSearchPath(directory); config = LoadDefault(loader); } LIBHDFS_C_API struct hdfsBuilder *hdfsNewBuilder(void) { try { errno = 0; return new struct hdfsBuilder(); } catch (const std::exception & e) { ReportException(e); return nullptr; } catch (...) 
{ ReportCaughtNonException(); return nullptr; } } LIBHDFS_C_API void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn) { errno = 0; bld->overrideHost = std::string(nn); } LIBHDFS_C_API void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port) { errno = 0; bld->overridePort = port; } LIBHDFS_C_API void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName) { errno = 0; if (userName && *userName) { bld->user = std::string(userName); } } LIBHDFS_C_API void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld) { //libhdfspp always returns a new instance, so nothing to do (void)bld; errno = 0; } LIBHDFS_C_API void hdfsFreeBuilder(struct hdfsBuilder *bld) { try { errno = 0; delete bld; } catch (const std::exception & e) { ReportException(e); } catch (...) { ReportCaughtNonException(); } } LIBHDFS_C_API int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key, const char *val) { try { errno = 0; optional newConfig = bld->loader.OverlayValue(bld->config, key, val); if (newConfig) { bld->config = newConfig.value(); return 0; } else { ReportError(EINVAL, "Could not change Builder value"); return -1; } } catch (const std::exception & e) { return ReportException(e); } catch (...) { return ReportCaughtNonException(); } } LIBHDFS_C_API void hdfsConfStrFree(char *val) { errno = 0; free(val); } LIBHDFS_C_API hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) { hdfsFS fs = doHdfsConnect(bld->overrideHost, bld->overridePort, bld->user, bld->config.GetOptions()); // Always free the builder hdfsFreeBuilder(bld); return fs; } LIBHDFS_C_API int hdfsConfGetStr(const char *key, char **val) { try { errno = 0; hdfsBuilder builder; return hdfsBuilderConfGetStr(&builder, key, val); } catch (const std::exception & e) { return ReportException(e); } catch (...) 
{ return ReportCaughtNonException(); } } LIBHDFS_C_API int hdfsConfGetInt(const char *key, int32_t *val) { try { errno = 0; hdfsBuilder builder; return hdfsBuilderConfGetInt(&builder, key, val); } catch (const std::exception & e) { return ReportException(e); } catch (...) { return ReportCaughtNonException(); } } // // Extended builder interface // struct hdfsBuilder *hdfsNewBuilderFromDirectory(const char * configDirectory) { try { errno = 0; return new struct hdfsBuilder(configDirectory); } catch (const std::exception & e) { ReportException(e); return nullptr; } catch (...) { ReportCaughtNonException(); return nullptr; } } LIBHDFSPP_EXT_API int hdfsBuilderConfGetStr(struct hdfsBuilder *bld, const char *key, char **val) { try { errno = 0; optional value = bld->config.Get(key); if (value) { size_t len = value->length() + 1; *val = static_cast(malloc(len)); strncpy(*val, value->c_str(), len); } else { *val = nullptr; } return 0; } catch (const std::exception & e) { return ReportException(e); } catch (...) { return ReportCaughtNonException(); } } // If we're running on a 32-bit platform, we might get 64-bit values that // don't fit in an int, and int is specified by the java hdfs.h interface bool isValidInt(int64_t value) { return (value >= std::numeric_limits::min() && value <= std::numeric_limits::max()); } LIBHDFSPP_EXT_API int hdfsBuilderConfGetInt(struct hdfsBuilder *bld, const char *key, int32_t *val) { try { errno = 0; // Pull from default configuration optional value = bld->config.GetInt(key); if (value) { if (!isValidInt(*value)){ ReportError(EINVAL, "Builder value is not valid"); return -1; } *val = *value; return 0; } // If not found, don't change val ReportError(EINVAL, "Could not get Builder value"); return 0; } catch (const std::exception & e) { return ReportException(e); } catch (...) 
{ return ReportCaughtNonException(); } } LIBHDFSPP_EXT_API int hdfsBuilderConfGetLong(struct hdfsBuilder *bld, const char *key, int64_t *val) { try { errno = 0; // Pull from default configuration optional value = bld->config.GetInt(key); if (value) { *val = *value; return 0; } // If not found, don't change val ReportError(EINVAL, "Could not get Builder value"); return 0; } catch (const std::exception & e) { return ReportException(e); } catch (...) { return ReportCaughtNonException(); } } /** * Logging functions **/ class CForwardingLogger : public LoggerInterface { public: CForwardingLogger() : callback_(nullptr) {}; // Converts LogMessage into LogData, a POD type, // and invokes callback_ if it's not null. void Write(const LogMessage& msg); // pass in NULL to clear the hook void SetCallback(void (*callback)(LogData*)); //return a copy, or null on failure. static LogData *CopyLogData(const LogData*); //free LogData allocated with CopyLogData static void FreeLogData(LogData*); private: void (*callback_)(LogData*); }; /** * Plugin to forward message to a C function pointer **/ void CForwardingLogger::Write(const LogMessage& msg) { if(!callback_) return; const std::string text = msg.MsgString(); LogData data; data.level = msg.level(); data.component = msg.component(); data.msg = text.c_str(); data.file_name = msg.file_name(); data.file_line = msg.file_line(); callback_(&data); } void CForwardingLogger::SetCallback(void (*callback)(LogData*)) { callback_ = callback; } LogData *CForwardingLogger::CopyLogData(const LogData *orig) { if(!orig) return nullptr; LogData *copy = (LogData*)malloc(sizeof(LogData)); if(!copy) return nullptr; copy->level = orig->level; copy->component = orig->component; if(orig->msg) copy->msg = strdup(orig->msg); copy->file_name = orig->file_name; copy->file_line = orig->file_line; return copy; } void CForwardingLogger::FreeLogData(LogData *data) { if(!data) return; if(data->msg) free((void*)data->msg); // Inexpensive way to help catch 
use-after-free memset(data, 0, sizeof(LogData)); free(data); } LIBHDFSPP_EXT_API LogData *hdfsCopyLogData(LogData *data) { return CForwardingLogger::CopyLogData(data); } LIBHDFSPP_EXT_API void hdfsFreeLogData(LogData *data) { CForwardingLogger::FreeLogData(data); } LIBHDFSPP_EXT_API void hdfsSetLogFunction(void (*callback)(LogData*)) { CForwardingLogger *logger = new CForwardingLogger(); logger->SetCallback(callback); LogManager::SetLoggerImplementation(std::unique_ptr(logger)); } static bool IsLevelValid(int component) { if(component < HDFSPP_LOG_LEVEL_TRACE || component > HDFSPP_LOG_LEVEL_ERROR) return false; return true; } // should use __builtin_popcnt as optimization on some platforms static int popcnt(int val) { int bits = sizeof(val) * 8; int count = 0; for(int i=0; i> i) & 0x1) count++; } return count; } static bool IsComponentValid(int component) { if(component < HDFSPP_LOG_COMPONENT_UNKNOWN || component > HDFSPP_LOG_COMPONENT_FILESYSTEM) return false; if(popcnt(component) != 1) return false; return true; } LIBHDFSPP_EXT_API int hdfsEnableLoggingForComponent(int component) { errno = 0; if(!IsComponentValid(component)) return -1; LogManager::EnableLogForComponent(static_cast(component)); return 0; } LIBHDFSPP_EXT_API int hdfsDisableLoggingForComponent(int component) { errno = 0; if(!IsComponentValid(component)) return -1; LogManager::DisableLogForComponent(static_cast(component)); return 0; } LIBHDFSPP_EXT_API int hdfsSetLoggingLevel(int level) { errno = 0; if(!IsLevelValid(level)) return -1; LogManager::SetLogLevel(static_cast(level)); return 0; } #undef LIBHDFS_C_API #undef LIBHDFSPP_EXT_API