|
@@ -1,2007 +0,0 @@
|
|
|
-/**
|
|
|
- * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
- * or more contributor license agreements. See the NOTICE file
|
|
|
- * distributed with this work for additional information
|
|
|
- * regarding copyright ownership. The ASF licenses this file
|
|
|
- * to you under the Apache License, Version 2.0 (the
|
|
|
- * "License"); you may not use this file except in compliance
|
|
|
- * with the License. You may obtain a copy of the License at
|
|
|
- *
|
|
|
- * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
- *
|
|
|
- * Unless required by applicable law or agreed to in writing, software
|
|
|
- * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
- * See the License for the specific language governing permissions and
|
|
|
- * limitations under the License.
|
|
|
- */
|
|
|
-
|
|
|
-#include "hdfspp/hdfspp.h"
|
|
|
-
|
|
|
-#include "fs/filesystem.h"
|
|
|
-#include "common/hdfs_configuration.h"
|
|
|
-#include "common/configuration_loader.h"
|
|
|
-#include "common/logging.h"
|
|
|
-
|
|
|
-#include <hdfs/hdfs.h>
|
|
|
-#include <hdfspp/hdfs_ext.h>
|
|
|
-
|
|
|
-#include <libgen.h>
|
|
|
-#include "limits.h"
|
|
|
-
|
|
|
-#include <string>
|
|
|
-#include <cstring>
|
|
|
-#include <iostream>
|
|
|
-#include <algorithm>
|
|
|
-#include <functional>
|
|
|
-
|
|
|
-using namespace hdfs;
|
|
|
-using std::experimental::nullopt;
|
|
|
-using namespace std::placeholders;
|
|
|
-
|
|
|
-static constexpr tPort kDefaultPort = 8020;
|
|
|
-
|
|
|
-/** Annotate what parts of the code below are implementatons of API functions
|
|
|
- * and if they are normal vs. extended API.
|
|
|
- */
|
|
|
-#define LIBHDFS_C_API
|
|
|
-#define LIBHDFSPP_EXT_API
|
|
|
-
|
|
|
-/* Separate the handles used by the C api from the C++ API*/
|
|
|
-struct hdfs_internal {
|
|
|
- hdfs_internal(FileSystem *p) : filesystem_(p), working_directory_("/") {}
|
|
|
- hdfs_internal(std::unique_ptr<FileSystem> p)
|
|
|
- : filesystem_(std::move(p)), working_directory_("/") {}
|
|
|
- virtual ~hdfs_internal(){};
|
|
|
- FileSystem *get_impl() { return filesystem_.get(); }
|
|
|
- const FileSystem *get_impl() const { return filesystem_.get(); }
|
|
|
- std::string get_working_directory() {
|
|
|
- std::lock_guard<std::mutex> read_guard(wd_lock_);
|
|
|
- return working_directory_;
|
|
|
- }
|
|
|
- void set_working_directory(std::string new_directory) {
|
|
|
- std::lock_guard<std::mutex> write_guard(wd_lock_);
|
|
|
- working_directory_ = new_directory;
|
|
|
- }
|
|
|
-
|
|
|
- private:
|
|
|
- std::unique_ptr<FileSystem> filesystem_;
|
|
|
- std::string working_directory_; //has to always start and end with '/'
|
|
|
- std::mutex wd_lock_; //synchronize access to the working directory
|
|
|
-};
|
|
|
-
|
|
|
-struct hdfsFile_internal {
|
|
|
- hdfsFile_internal(FileHandle *p) : file_(p) {}
|
|
|
- hdfsFile_internal(std::unique_ptr<FileHandle> p) : file_(std::move(p)) {}
|
|
|
- virtual ~hdfsFile_internal(){};
|
|
|
- FileHandle *get_impl() { return file_.get(); }
|
|
|
- const FileHandle *get_impl() const { return file_.get(); }
|
|
|
-
|
|
|
- private:
|
|
|
- std::unique_ptr<FileHandle> file_;
|
|
|
-};
|
|
|
-
|
|
|
-/* Keep thread local copy of last error string */
|
|
|
-thread_local std::string errstr;
|
|
|
-
|
|
|
-/* Fetch last error that happened in this thread */
|
|
|
-LIBHDFSPP_EXT_API
|
|
|
-int hdfsGetLastError(char *buf, int len) {
|
|
|
- //No error message
|
|
|
- if(errstr.empty()){
|
|
|
- return -1;
|
|
|
- }
|
|
|
-
|
|
|
- //There is an error, but no room for the error message to be copied to
|
|
|
- if(nullptr == buf || len < 1) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
-
|
|
|
- /* leave space for a trailing null */
|
|
|
- size_t copylen = std::min((size_t)errstr.size(), (size_t)len);
|
|
|
- if(copylen == (size_t)len) {
|
|
|
- copylen--;
|
|
|
- }
|
|
|
-
|
|
|
- strncpy(buf, errstr.c_str(), copylen);
|
|
|
-
|
|
|
- /* stick in null */
|
|
|
- buf[copylen] = 0;
|
|
|
-
|
|
|
- return 0;
|
|
|
-}
|
|
|
-
|
|
|
-/* Event callbacks for next open calls */
|
|
|
-thread_local std::experimental::optional<fs_event_callback> fsEventCallback;
|
|
|
-thread_local std::experimental::optional<file_event_callback> fileEventCallback;
|
|
|
-
|
|
|
-struct hdfsBuilder {
|
|
|
- hdfsBuilder();
|
|
|
- hdfsBuilder(const char * directory);
|
|
|
- virtual ~hdfsBuilder() {}
|
|
|
- ConfigurationLoader loader;
|
|
|
- HdfsConfiguration config;
|
|
|
-
|
|
|
- optional<std::string> overrideHost;
|
|
|
- optional<tPort> overridePort;
|
|
|
- optional<std::string> user;
|
|
|
-
|
|
|
- static constexpr tPort kUseDefaultPort = 0;
|
|
|
-};
|
|
|
-
|
|
|
-/* Error handling with optional debug to stderr */
|
|
|
-static void ReportError(int errnum, const std::string & msg) {
|
|
|
- errno = errnum;
|
|
|
- errstr = msg;
|
|
|
-#ifdef LIBHDFSPP_C_API_ENABLE_DEBUG
|
|
|
- std::cerr << "Error: errno=" << strerror(errnum) << " message=\"" << msg
|
|
|
- << "\"" << std::endl;
|
|
|
-#else
|
|
|
- (void)msg;
|
|
|
-#endif
|
|
|
-}
|
|
|
-
|
|
|
-/* Convert Status wrapped error into appropriate errno and return code */
|
|
|
-static int Error(const Status &stat) {
|
|
|
- const char * default_message;
|
|
|
- int errnum;
|
|
|
-
|
|
|
- int code = stat.code();
|
|
|
- switch (code) {
|
|
|
- case Status::Code::kOk:
|
|
|
- return 0;
|
|
|
- case Status::Code::kInvalidArgument:
|
|
|
- errnum = EINVAL;
|
|
|
- default_message = "Invalid argument";
|
|
|
- break;
|
|
|
- case Status::Code::kResourceUnavailable:
|
|
|
- errnum = EAGAIN;
|
|
|
- default_message = "Resource temporarily unavailable";
|
|
|
- break;
|
|
|
- case Status::Code::kUnimplemented:
|
|
|
- errnum = ENOSYS;
|
|
|
- default_message = "Function not implemented";
|
|
|
- break;
|
|
|
- case Status::Code::kException:
|
|
|
- errnum = EINTR;
|
|
|
- default_message = "Exception raised";
|
|
|
- break;
|
|
|
- case Status::Code::kOperationCanceled:
|
|
|
- errnum = EINTR;
|
|
|
- default_message = "Operation canceled";
|
|
|
- break;
|
|
|
- case Status::Code::kPermissionDenied:
|
|
|
- errnum = EACCES;
|
|
|
- default_message = "Permission denied";
|
|
|
- break;
|
|
|
- case Status::Code::kPathNotFound:
|
|
|
- errnum = ENOENT;
|
|
|
- default_message = "No such file or directory";
|
|
|
- break;
|
|
|
- case Status::Code::kNotADirectory:
|
|
|
- errnum = ENOTDIR;
|
|
|
- default_message = "Not a directory";
|
|
|
- break;
|
|
|
- case Status::Code::kFileAlreadyExists:
|
|
|
- errnum = EEXIST;
|
|
|
- default_message = "File already exists";
|
|
|
- break;
|
|
|
- case Status::Code::kPathIsNotEmptyDirectory:
|
|
|
- errnum = ENOTEMPTY;
|
|
|
- default_message = "Directory is not empty";
|
|
|
- break;
|
|
|
- case Status::Code::kInvalidOffset:
|
|
|
- errnum = Status::Code::kInvalidOffset;
|
|
|
- default_message = "Trying to begin a read past the EOF";
|
|
|
- break;
|
|
|
- default:
|
|
|
- errnum = ENOSYS;
|
|
|
- default_message = "Error: unrecognised code";
|
|
|
- }
|
|
|
- if (stat.ToString().empty())
|
|
|
- ReportError(errnum, default_message);
|
|
|
- else
|
|
|
- ReportError(errnum, stat.ToString());
|
|
|
- return -1;
|
|
|
-}
|
|
|
-
|
|
|
-static int ReportException(const std::exception & e)
|
|
|
-{
|
|
|
- return Error(Status::Exception("Uncaught exception", e.what()));
|
|
|
-}
|
|
|
-
|
|
|
-static int ReportCaughtNonException()
|
|
|
-{
|
|
|
- return Error(Status::Exception("Uncaught value not derived from std::exception", ""));
|
|
|
-}
|
|
|
-
|
|
|
-/* return false on failure */
|
|
|
-bool CheckSystem(hdfsFS fs) {
|
|
|
- if (!fs) {
|
|
|
- ReportError(ENODEV, "Cannot perform FS operations with null FS handle.");
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- return true;
|
|
|
-}
|
|
|
-
|
|
|
-/* return false on failure */
|
|
|
-bool CheckHandle(hdfsFile file) {
|
|
|
- if (!file) {
|
|
|
- ReportError(EBADF, "Cannot perform FS operations with null File handle.");
|
|
|
- return false;
|
|
|
- }
|
|
|
- return true;
|
|
|
-}
|
|
|
-
|
|
|
-/* return false on failure */
|
|
|
-bool CheckSystemAndHandle(hdfsFS fs, hdfsFile file) {
|
|
|
- if (!CheckSystem(fs))
|
|
|
- return false;
|
|
|
-
|
|
|
- if (!CheckHandle(file))
|
|
|
- return false;
|
|
|
-
|
|
|
- return true;
|
|
|
-}
|
|
|
-
|
|
|
-optional<std::string> getAbsolutePath(hdfsFS fs, const char* path) {
|
|
|
- //Does not support . (dot) and .. (double dot) semantics
|
|
|
- if (!path || path[0] == '\0') {
|
|
|
- Error(Status::InvalidArgument("getAbsolutePath: argument 'path' cannot be NULL or empty"));
|
|
|
- return optional<std::string>();
|
|
|
- }
|
|
|
- if (path[0] != '/') {
|
|
|
- //we know that working directory always ends with '/'
|
|
|
- return fs->get_working_directory().append(path);
|
|
|
- }
|
|
|
- return optional<std::string>(path);
|
|
|
-}
|
|
|
-
|
|
|
-/**
|
|
|
- * C API implementations
|
|
|
- **/
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-int hdfsFileIsOpenForRead(hdfsFile file) {
|
|
|
- /* files can only be open for reads at the moment, do a quick check */
|
|
|
- if (!CheckHandle(file)){
|
|
|
- return 0;
|
|
|
- }
|
|
|
- return 1; // Update implementation when we get file writing
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-int hdfsFileIsOpenForWrite(hdfsFile file) {
|
|
|
- /* files can only be open for reads at the moment, so return false */
|
|
|
- CheckHandle(file);
|
|
|
- return -1; // Update implementation when we get file writing
|
|
|
-}
|
|
|
-
|
|
|
-int hdfsConfGetLong(const char *key, int64_t *val)
|
|
|
-{
|
|
|
- try
|
|
|
- {
|
|
|
- errno = 0;
|
|
|
- hdfsBuilder builder;
|
|
|
- return hdfsBuilderConfGetLong(&builder, key, val);
|
|
|
- } catch (const std::exception & e) {
|
|
|
- return ReportException(e);
|
|
|
- } catch (...) {
|
|
|
- return ReportCaughtNonException();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-hdfsFS doHdfsConnect(optional<std::string> nn, optional<tPort> port, optional<std::string> user, const Options & options) {
|
|
|
- try
|
|
|
- {
|
|
|
- errno = 0;
|
|
|
- IoService * io_service = IoService::New();
|
|
|
-
|
|
|
- FileSystem *fs = FileSystem::New(io_service, user.value_or(""), options);
|
|
|
- if (!fs) {
|
|
|
- ReportError(ENODEV, "Could not create FileSystem object");
|
|
|
- return nullptr;
|
|
|
- }
|
|
|
-
|
|
|
- if (fsEventCallback) {
|
|
|
- fs->SetFsEventCallback(fsEventCallback.value());
|
|
|
- }
|
|
|
-
|
|
|
- Status status;
|
|
|
- if (nn || port) {
|
|
|
- if (!port) {
|
|
|
- port = kDefaultPort;
|
|
|
- }
|
|
|
- std::string port_as_string = std::to_string(*port);
|
|
|
- status = fs->Connect(nn.value_or(""), port_as_string);
|
|
|
- } else {
|
|
|
- status = fs->ConnectToDefaultFs();
|
|
|
- }
|
|
|
-
|
|
|
- if (!status.ok()) {
|
|
|
- Error(status);
|
|
|
-
|
|
|
- // FileSystem's ctor might take ownership of the io_service; if it does,
|
|
|
- // it will null out the pointer
|
|
|
- if (io_service)
|
|
|
- delete io_service;
|
|
|
-
|
|
|
- delete fs;
|
|
|
-
|
|
|
- return nullptr;
|
|
|
- }
|
|
|
- return new hdfs_internal(fs);
|
|
|
- } catch (const std::exception & e) {
|
|
|
- ReportException(e);
|
|
|
- return nullptr;
|
|
|
- } catch (...) {
|
|
|
- ReportCaughtNonException();
|
|
|
- return nullptr;
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFSPP_EXT_API
|
|
|
-hdfsFS hdfsAllocateFileSystem(struct hdfsBuilder *bld) {
|
|
|
- // Same idea as the first half of doHdfsConnect, but return the wrapped FS before
|
|
|
- // connecting.
|
|
|
- try {
|
|
|
- errno = 0;
|
|
|
- std::shared_ptr<IoService> io_service = IoService::MakeShared();
|
|
|
-
|
|
|
- int io_thread_count = bld->config.GetOptions().io_threads_;
|
|
|
- if(io_thread_count < 1) {
|
|
|
- io_service->InitDefaultWorkers();
|
|
|
- } else {
|
|
|
- io_service->InitWorkers(io_thread_count);
|
|
|
- }
|
|
|
-
|
|
|
- FileSystem *fs = FileSystem::New(io_service, bld->user.value_or(""), bld->config.GetOptions());
|
|
|
- if (!fs) {
|
|
|
- ReportError(ENODEV, "Could not create FileSystem object");
|
|
|
- return nullptr;
|
|
|
- }
|
|
|
-
|
|
|
- if (fsEventCallback) {
|
|
|
- fs->SetFsEventCallback(fsEventCallback.value());
|
|
|
- }
|
|
|
-
|
|
|
- return new hdfs_internal(fs);
|
|
|
- } catch (const std::exception &e) {
|
|
|
- ReportException(e);
|
|
|
- return nullptr;
|
|
|
- } catch (...) {
|
|
|
- ReportCaughtNonException();
|
|
|
- return nullptr;
|
|
|
- }
|
|
|
- return nullptr;
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFSPP_EXT_API
|
|
|
-int hdfsConnectAllocated(hdfsFS fs, struct hdfsBuilder *bld) {
|
|
|
- if(!CheckSystem(fs)) {
|
|
|
- return ENODEV;
|
|
|
- }
|
|
|
-
|
|
|
- if(!bld) {
|
|
|
- ReportError(ENODEV, "No hdfsBuilder object supplied");
|
|
|
- return ENODEV;
|
|
|
- }
|
|
|
-
|
|
|
- // Get C++ FS to do connect
|
|
|
- FileSystem *fsImpl = fs->get_impl();
|
|
|
- if(!fsImpl) {
|
|
|
- ReportError(ENODEV, "Null FileSystem implementation");
|
|
|
- return ENODEV;
|
|
|
- }
|
|
|
-
|
|
|
- // Unpack the required bits of the hdfsBuilder
|
|
|
- optional<std::string> nn = bld->overrideHost;
|
|
|
- optional<tPort> port = bld->overridePort;
|
|
|
- optional<std::string> user = bld->user;
|
|
|
-
|
|
|
- // try-catch in case some of the third-party stuff throws
|
|
|
- try {
|
|
|
- Status status;
|
|
|
- if (nn || port) {
|
|
|
- if (!port) {
|
|
|
- port = kDefaultPort;
|
|
|
- }
|
|
|
- std::string port_as_string = std::to_string(*port);
|
|
|
- status = fsImpl->Connect(nn.value_or(""), port_as_string);
|
|
|
- } else {
|
|
|
- status = fsImpl->ConnectToDefaultFs();
|
|
|
- }
|
|
|
-
|
|
|
- if (!status.ok()) {
|
|
|
- Error(status);
|
|
|
- return ENODEV;
|
|
|
- }
|
|
|
-
|
|
|
- // 0 to indicate a good connection
|
|
|
- return 0;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- ReportException(e);
|
|
|
- return ENODEV;
|
|
|
- } catch (...) {
|
|
|
- ReportCaughtNonException();
|
|
|
- return ENODEV;
|
|
|
- }
|
|
|
-
|
|
|
- return 0;
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-hdfsFS hdfsConnect(const char *nn, tPort port) {
|
|
|
- return hdfsConnectAsUser(nn, port, "");
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user) {
|
|
|
- return doHdfsConnect(std::string(nn), port, std::string(user), Options());
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-hdfsFS hdfsConnectAsUserNewInstance(const char* nn, tPort port, const char *user ) {
|
|
|
- //libhdfspp always returns a new instance
|
|
|
- return doHdfsConnect(std::string(nn), port, std::string(user), Options());
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-hdfsFS hdfsConnectNewInstance(const char* nn, tPort port) {
|
|
|
- //libhdfspp always returns a new instance
|
|
|
- return hdfsConnectAsUser(nn, port, "");
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFSPP_EXT_API
|
|
|
-int hdfsCancelPendingConnection(hdfsFS fs) {
|
|
|
- // todo: stick an enum in hdfs_internal to check the connect state
|
|
|
- if(!CheckSystem(fs)) {
|
|
|
- return ENODEV;
|
|
|
- }
|
|
|
-
|
|
|
- FileSystem *fsImpl = fs->get_impl();
|
|
|
- if(!fsImpl) {
|
|
|
- ReportError(ENODEV, "Null FileSystem implementation");
|
|
|
- return ENODEV;
|
|
|
- }
|
|
|
-
|
|
|
- bool canceled = fsImpl->CancelPendingConnect();
|
|
|
- if(canceled) {
|
|
|
- return 0;
|
|
|
- } else {
|
|
|
- return EINTR;
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-int hdfsDisconnect(hdfsFS fs) {
|
|
|
- try
|
|
|
- {
|
|
|
- errno = 0;
|
|
|
- if (!fs) {
|
|
|
- ReportError(ENODEV, "Cannot disconnect null FS handle.");
|
|
|
- return -1;
|
|
|
- }
|
|
|
-
|
|
|
- delete fs;
|
|
|
- return 0;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- return ReportException(e);
|
|
|
- } catch (...) {
|
|
|
- return ReportCaughtNonException();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize,
|
|
|
- short replication, tSize blocksize) {
|
|
|
- try
|
|
|
- {
|
|
|
- errno = 0;
|
|
|
- (void)flags;
|
|
|
- (void)bufferSize;
|
|
|
- (void)replication;
|
|
|
- (void)blocksize;
|
|
|
- if (!fs) {
|
|
|
- ReportError(ENODEV, "Cannot perform FS operations with null FS handle.");
|
|
|
- return nullptr;
|
|
|
- }
|
|
|
- const optional<std::string> abs_path = getAbsolutePath(fs, path);
|
|
|
- if(!abs_path) {
|
|
|
- return nullptr;
|
|
|
- }
|
|
|
- FileHandle *f = nullptr;
|
|
|
- Status stat = fs->get_impl()->Open(*abs_path, &f);
|
|
|
- if (!stat.ok()) {
|
|
|
- Error(stat);
|
|
|
- return nullptr;
|
|
|
- }
|
|
|
- if (f && fileEventCallback) {
|
|
|
- f->SetFileEventCallback(fileEventCallback.value());
|
|
|
- }
|
|
|
- return new hdfsFile_internal(f);
|
|
|
- } catch (const std::exception & e) {
|
|
|
- ReportException(e);
|
|
|
- return nullptr;
|
|
|
- } catch (...) {
|
|
|
- ReportCaughtNonException();
|
|
|
- return nullptr;
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-int hdfsCloseFile(hdfsFS fs, hdfsFile file) {
|
|
|
- try
|
|
|
- {
|
|
|
- errno = 0;
|
|
|
- if (!CheckSystemAndHandle(fs, file)) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- delete file;
|
|
|
- return 0;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- return ReportException(e);
|
|
|
- } catch (...) {
|
|
|
- return ReportCaughtNonException();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize) {
|
|
|
- try
|
|
|
- {
|
|
|
- errno = 0;
|
|
|
- if (!CheckSystem(fs)) {
|
|
|
- return nullptr;
|
|
|
- }
|
|
|
- std::string wd = fs->get_working_directory();
|
|
|
- size_t size = wd.size();
|
|
|
- if (size + 1 > bufferSize) {
|
|
|
- std::stringstream ss;
|
|
|
- ss << "hdfsGetWorkingDirectory: bufferSize is " << bufferSize <<
|
|
|
- ", which is not enough to fit working directory of size " << (size + 1);
|
|
|
- Error(Status::InvalidArgument(ss.str().c_str()));
|
|
|
- return nullptr;
|
|
|
- }
|
|
|
- wd.copy(buffer, size);
|
|
|
- buffer[size] = '\0';
|
|
|
- return buffer;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- ReportException(e);
|
|
|
- return nullptr;
|
|
|
- } catch (...) {
|
|
|
- ReportCaughtNonException();
|
|
|
- return nullptr;
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-int hdfsSetWorkingDirectory(hdfsFS fs, const char* path) {
|
|
|
- try
|
|
|
- {
|
|
|
- errno = 0;
|
|
|
- if (!CheckSystem(fs)) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- optional<std::string> abs_path = getAbsolutePath(fs, path);
|
|
|
- if(!abs_path) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- //Enforce last character to be '/'
|
|
|
- std::string withSlash = *abs_path;
|
|
|
- char last = withSlash.back();
|
|
|
- if (last != '/'){
|
|
|
- withSlash += '/';
|
|
|
- }
|
|
|
- fs->set_working_directory(withSlash);
|
|
|
- return 0;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- return ReportException(e);
|
|
|
- } catch (...) {
|
|
|
- return ReportCaughtNonException();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-int hdfsAvailable(hdfsFS fs, hdfsFile file) {
|
|
|
- //Since we do not have read ahead implemented, return 0 if fs and file are good;
|
|
|
- errno = 0;
|
|
|
- if (!CheckSystemAndHandle(fs, file)) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- return 0;
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-tOffset hdfsGetDefaultBlockSize(hdfsFS fs) {
|
|
|
- try {
|
|
|
- errno = 0;
|
|
|
- return fs->get_impl()->get_options().block_size;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- ReportException(e);
|
|
|
- return -1;
|
|
|
- } catch (...) {
|
|
|
- ReportCaughtNonException();
|
|
|
- return -1;
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-tOffset hdfsGetDefaultBlockSizeAtPath(hdfsFS fs, const char *path) {
|
|
|
- try {
|
|
|
- errno = 0;
|
|
|
- if (!CheckSystem(fs)) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- const optional<std::string> abs_path = getAbsolutePath(fs, path);
|
|
|
- if(!abs_path) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- uint64_t block_size;
|
|
|
- Status stat = fs->get_impl()->GetPreferredBlockSize(*abs_path, block_size);
|
|
|
- if (!stat.ok()) {
|
|
|
- if (stat.pathNotFound()){
|
|
|
- return fs->get_impl()->get_options().block_size;
|
|
|
- } else {
|
|
|
- return Error(stat);
|
|
|
- }
|
|
|
- }
|
|
|
- return block_size;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- ReportException(e);
|
|
|
- return -1;
|
|
|
- } catch (...) {
|
|
|
- ReportCaughtNonException();
|
|
|
- return -1;
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication) {
|
|
|
- try {
|
|
|
- errno = 0;
|
|
|
- if (!CheckSystem(fs)) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- const optional<std::string> abs_path = getAbsolutePath(fs, path);
|
|
|
- if(!abs_path) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- if(replication < 1){
|
|
|
- return Error(Status::InvalidArgument("SetReplication: argument 'replication' cannot be less than 1"));
|
|
|
- }
|
|
|
- Status stat;
|
|
|
- stat = fs->get_impl()->SetReplication(*abs_path, replication);
|
|
|
- if (!stat.ok()) {
|
|
|
- return Error(stat);
|
|
|
- }
|
|
|
- return 0;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- return ReportException(e);
|
|
|
- } catch (...) {
|
|
|
- return ReportCaughtNonException();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime) {
|
|
|
- try {
|
|
|
- errno = 0;
|
|
|
- if (!CheckSystem(fs)) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- const optional<std::string> abs_path = getAbsolutePath(fs, path);
|
|
|
- if(!abs_path) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- Status stat;
|
|
|
- stat = fs->get_impl()->SetTimes(*abs_path, mtime, atime);
|
|
|
- if (!stat.ok()) {
|
|
|
- return Error(stat);
|
|
|
- }
|
|
|
- return 0;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- return ReportException(e);
|
|
|
- } catch (...) {
|
|
|
- return ReportCaughtNonException();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-tOffset hdfsGetCapacity(hdfsFS fs) {
|
|
|
- try {
|
|
|
- errno = 0;
|
|
|
- if (!CheckSystem(fs)) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
-
|
|
|
- hdfs::FsInfo fs_info;
|
|
|
- Status stat = fs->get_impl()->GetFsStats(fs_info);
|
|
|
- if (!stat.ok()) {
|
|
|
- Error(stat);
|
|
|
- return -1;
|
|
|
- }
|
|
|
- return fs_info.capacity;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- ReportException(e);
|
|
|
- return -1;
|
|
|
- } catch (...) {
|
|
|
- ReportCaughtNonException();
|
|
|
- return -1;
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-tOffset hdfsGetUsed(hdfsFS fs) {
|
|
|
- try {
|
|
|
- errno = 0;
|
|
|
- if (!CheckSystem(fs)) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
-
|
|
|
- hdfs::FsInfo fs_info;
|
|
|
- Status stat = fs->get_impl()->GetFsStats(fs_info);
|
|
|
- if (!stat.ok()) {
|
|
|
- Error(stat);
|
|
|
- return -1;
|
|
|
- }
|
|
|
- return fs_info.used;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- ReportException(e);
|
|
|
- return -1;
|
|
|
- } catch (...) {
|
|
|
- ReportCaughtNonException();
|
|
|
- return -1;
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-void StatInfoToHdfsFileInfo(hdfsFileInfo * file_info,
|
|
|
- const hdfs::StatInfo & stat_info) {
|
|
|
- /* file or directory */
|
|
|
- if (stat_info.file_type == StatInfo::IS_DIR) {
|
|
|
- file_info->mKind = kObjectKindDirectory;
|
|
|
- } else if (stat_info.file_type == StatInfo::IS_FILE) {
|
|
|
- file_info->mKind = kObjectKindFile;
|
|
|
- } else {
|
|
|
- file_info->mKind = kObjectKindFile;
|
|
|
- LOG_WARN(kFileSystem, << "Symlink is not supported! Reporting as a file: ");
|
|
|
- }
|
|
|
-
|
|
|
- /* the name of the file */
|
|
|
- char copyOfPath[PATH_MAX];
|
|
|
- strncpy(copyOfPath, stat_info.path.c_str(), PATH_MAX);
|
|
|
- copyOfPath[PATH_MAX - 1] = '\0'; // in case strncpy ran out of space
|
|
|
-
|
|
|
- char * mName = basename(copyOfPath);
|
|
|
- size_t mName_size = strlen(mName);
|
|
|
- file_info->mName = new char[mName_size+1];
|
|
|
- strncpy(file_info->mName, basename(copyOfPath), mName_size + 1);
|
|
|
-
|
|
|
- /* the last modification time for the file in seconds */
|
|
|
- file_info->mLastMod = (tTime) stat_info.modification_time;
|
|
|
-
|
|
|
- /* the size of the file in bytes */
|
|
|
- file_info->mSize = (tOffset) stat_info.length;
|
|
|
-
|
|
|
- /* the count of replicas */
|
|
|
- file_info->mReplication = (short) stat_info.block_replication;
|
|
|
-
|
|
|
- /* the block size for the file */
|
|
|
- file_info->mBlockSize = (tOffset) stat_info.blocksize;
|
|
|
-
|
|
|
- /* the owner of the file */
|
|
|
- file_info->mOwner = new char[stat_info.owner.size() + 1];
|
|
|
- strncpy(file_info->mOwner, stat_info.owner.c_str(), stat_info.owner.size() + 1);
|
|
|
-
|
|
|
- /* the group associated with the file */
|
|
|
- file_info->mGroup = new char[stat_info.group.size() + 1];
|
|
|
- strncpy(file_info->mGroup, stat_info.group.c_str(), stat_info.group.size() + 1);
|
|
|
-
|
|
|
- /* the permissions associated with the file encoded as an octal number (0777)*/
|
|
|
- file_info->mPermissions = (short) stat_info.permissions;
|
|
|
-
|
|
|
- /* the last access time for the file in seconds since the epoch*/
|
|
|
- file_info->mLastAccess = stat_info.access_time;
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-int hdfsExists(hdfsFS fs, const char *path) {
|
|
|
- try {
|
|
|
- errno = 0;
|
|
|
- if (!CheckSystem(fs)) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- const optional<std::string> abs_path = getAbsolutePath(fs, path);
|
|
|
- if(!abs_path) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- hdfs::StatInfo stat_info;
|
|
|
- Status stat = fs->get_impl()->GetFileInfo(*abs_path, stat_info);
|
|
|
- if (!stat.ok()) {
|
|
|
- return Error(stat);
|
|
|
- }
|
|
|
- return 0;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- return ReportException(e);
|
|
|
- } catch (...) {
|
|
|
- return ReportCaughtNonException();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path) {
|
|
|
- try {
|
|
|
- errno = 0;
|
|
|
- if (!CheckSystem(fs)) {
|
|
|
- return nullptr;
|
|
|
- }
|
|
|
- const optional<std::string> abs_path = getAbsolutePath(fs, path);
|
|
|
- if(!abs_path) {
|
|
|
- return nullptr;
|
|
|
- }
|
|
|
- hdfs::StatInfo stat_info;
|
|
|
- Status stat = fs->get_impl()->GetFileInfo(*abs_path, stat_info);
|
|
|
- if (!stat.ok()) {
|
|
|
- Error(stat);
|
|
|
- return nullptr;
|
|
|
- }
|
|
|
- hdfsFileInfo *file_info = new hdfsFileInfo[1];
|
|
|
- StatInfoToHdfsFileInfo(file_info, stat_info);
|
|
|
- return file_info;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- ReportException(e);
|
|
|
- return nullptr;
|
|
|
- } catch (...) {
|
|
|
- ReportCaughtNonException();
|
|
|
- return nullptr;
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries) {
|
|
|
- try {
|
|
|
- errno = 0;
|
|
|
- if (!CheckSystem(fs)) {
|
|
|
- *numEntries = 0;
|
|
|
- return nullptr;
|
|
|
- }
|
|
|
- const optional<std::string> abs_path = getAbsolutePath(fs, path);
|
|
|
- if(!abs_path) {
|
|
|
- return nullptr;
|
|
|
- }
|
|
|
- std::vector<StatInfo> stat_infos;
|
|
|
- Status stat = fs->get_impl()->GetListing(*abs_path, &stat_infos);
|
|
|
- if (!stat.ok()) {
|
|
|
- Error(stat);
|
|
|
- *numEntries = 0;
|
|
|
- return nullptr;
|
|
|
- }
|
|
|
- if(stat_infos.empty()){
|
|
|
- *numEntries = 0;
|
|
|
- return nullptr;
|
|
|
- }
|
|
|
- *numEntries = stat_infos.size();
|
|
|
- hdfsFileInfo *file_infos = new hdfsFileInfo[stat_infos.size()];
|
|
|
- for(std::vector<StatInfo>::size_type i = 0; i < stat_infos.size(); i++) {
|
|
|
- StatInfoToHdfsFileInfo(&file_infos[i], stat_infos.at(i));
|
|
|
- }
|
|
|
-
|
|
|
- return file_infos;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- ReportException(e);
|
|
|
- *numEntries = 0;
|
|
|
- return nullptr;
|
|
|
- } catch (...) {
|
|
|
- ReportCaughtNonException();
|
|
|
- *numEntries = 0;
|
|
|
- return nullptr;
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries)
|
|
|
-{
|
|
|
- errno = 0;
|
|
|
- int i;
|
|
|
- for (i = 0; i < numEntries; ++i) {
|
|
|
- delete[] hdfsFileInfo[i].mName;
|
|
|
- delete[] hdfsFileInfo[i].mOwner;
|
|
|
- delete[] hdfsFileInfo[i].mGroup;
|
|
|
- }
|
|
|
- delete[] hdfsFileInfo;
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-int hdfsCreateDirectory(hdfsFS fs, const char* path) {
|
|
|
- try {
|
|
|
- errno = 0;
|
|
|
- if (!CheckSystem(fs)) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- const optional<std::string> abs_path = getAbsolutePath(fs, path);
|
|
|
- if(!abs_path) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- Status stat;
|
|
|
- //Use default permissions and set true for creating all non-existant parent directories
|
|
|
- stat = fs->get_impl()->Mkdirs(*abs_path, FileSystem::GetDefaultPermissionMask(), true);
|
|
|
- if (!stat.ok()) {
|
|
|
- return Error(stat);
|
|
|
- }
|
|
|
- return 0;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- return ReportException(e);
|
|
|
- } catch (...) {
|
|
|
- return ReportCaughtNonException();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-int hdfsDelete(hdfsFS fs, const char* path, int recursive) {
|
|
|
- try {
|
|
|
- errno = 0;
|
|
|
- if (!CheckSystem(fs)) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- const optional<std::string> abs_path = getAbsolutePath(fs, path);
|
|
|
- if(!abs_path) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- Status stat;
|
|
|
- stat = fs->get_impl()->Delete(*abs_path, recursive);
|
|
|
- if (!stat.ok()) {
|
|
|
- return Error(stat);
|
|
|
- }
|
|
|
- return 0;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- return ReportException(e);
|
|
|
- } catch (...) {
|
|
|
- return ReportCaughtNonException();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath) {
|
|
|
- try {
|
|
|
- errno = 0;
|
|
|
- if (!CheckSystem(fs)) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- const optional<std::string> old_abs_path = getAbsolutePath(fs, oldPath);
|
|
|
- const optional<std::string> new_abs_path = getAbsolutePath(fs, newPath);
|
|
|
- if(!old_abs_path || !new_abs_path) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- Status stat;
|
|
|
- stat = fs->get_impl()->Rename(*old_abs_path, *new_abs_path);
|
|
|
- if (!stat.ok()) {
|
|
|
- return Error(stat);
|
|
|
- }
|
|
|
- return 0;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- return ReportException(e);
|
|
|
- } catch (...) {
|
|
|
- return ReportCaughtNonException();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-int hdfsChmod(hdfsFS fs, const char* path, short mode){
|
|
|
- try {
|
|
|
- errno = 0;
|
|
|
- if (!CheckSystem(fs)) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- const optional<std::string> abs_path = getAbsolutePath(fs, path);
|
|
|
- if(!abs_path) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- Status stat = FileSystem::CheckValidPermissionMask(mode);
|
|
|
- if (!stat.ok()) {
|
|
|
- return Error(stat);
|
|
|
- }
|
|
|
- stat = fs->get_impl()->SetPermission(*abs_path, mode);
|
|
|
- if (!stat.ok()) {
|
|
|
- return Error(stat);
|
|
|
- }
|
|
|
- return 0;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- return ReportException(e);
|
|
|
- } catch (...) {
|
|
|
- return ReportCaughtNonException();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group){
|
|
|
- try {
|
|
|
- errno = 0;
|
|
|
- if (!CheckSystem(fs)) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- const optional<std::string> abs_path = getAbsolutePath(fs, path);
|
|
|
- if(!abs_path) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- std::string own = (owner) ? owner : "";
|
|
|
- std::string grp = (group) ? group : "";
|
|
|
-
|
|
|
- Status stat;
|
|
|
- stat = fs->get_impl()->SetOwner(*abs_path, own, grp);
|
|
|
- if (!stat.ok()) {
|
|
|
- return Error(stat);
|
|
|
- }
|
|
|
- return 0;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- return ReportException(e);
|
|
|
- } catch (...) {
|
|
|
- return ReportCaughtNonException();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFSPP_EXT_API
|
|
|
-hdfsFileInfo * hdfsFind(hdfsFS fs, const char* path, const char* name, uint32_t * numEntries){
|
|
|
- try {
|
|
|
- errno = 0;
|
|
|
- if (!CheckSystem(fs)) {
|
|
|
- *numEntries = 0;
|
|
|
- return nullptr;
|
|
|
- }
|
|
|
-
|
|
|
- std::vector<StatInfo> stat_infos;
|
|
|
- Status stat = fs->get_impl()->Find(path, name, hdfs::FileSystem::GetDefaultFindMaxDepth(), &stat_infos);
|
|
|
- if (!stat.ok()) {
|
|
|
- Error(stat);
|
|
|
- *numEntries = 0;
|
|
|
- return nullptr;
|
|
|
- }
|
|
|
- //Existing API expects nullptr if size is 0
|
|
|
- if(stat_infos.empty()){
|
|
|
- *numEntries = 0;
|
|
|
- return nullptr;
|
|
|
- }
|
|
|
- *numEntries = stat_infos.size();
|
|
|
- hdfsFileInfo *file_infos = new hdfsFileInfo[stat_infos.size()];
|
|
|
- for(std::vector<StatInfo>::size_type i = 0; i < stat_infos.size(); i++) {
|
|
|
- StatInfoToHdfsFileInfo(&file_infos[i], stat_infos.at(i));
|
|
|
- }
|
|
|
-
|
|
|
- return file_infos;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- ReportException(e);
|
|
|
- *numEntries = 0;
|
|
|
- return nullptr;
|
|
|
- } catch (...) {
|
|
|
- ReportCaughtNonException();
|
|
|
- *numEntries = 0;
|
|
|
- return nullptr;
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFSPP_EXT_API
|
|
|
-int hdfsCreateSnapshot(hdfsFS fs, const char* path, const char* name) {
|
|
|
- try {
|
|
|
- errno = 0;
|
|
|
- if (!CheckSystem(fs)) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- const optional<std::string> abs_path = getAbsolutePath(fs, path);
|
|
|
- if(!abs_path) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- Status stat;
|
|
|
- if(!name){
|
|
|
- stat = fs->get_impl()->CreateSnapshot(*abs_path, "");
|
|
|
- } else {
|
|
|
- stat = fs->get_impl()->CreateSnapshot(*abs_path, name);
|
|
|
- }
|
|
|
- if (!stat.ok()) {
|
|
|
- return Error(stat);
|
|
|
- }
|
|
|
- return 0;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- return ReportException(e);
|
|
|
- } catch (...) {
|
|
|
- return ReportCaughtNonException();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFSPP_EXT_API
|
|
|
-int hdfsDeleteSnapshot(hdfsFS fs, const char* path, const char* name) {
|
|
|
- try {
|
|
|
- errno = 0;
|
|
|
- if (!CheckSystem(fs)) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- const optional<std::string> abs_path = getAbsolutePath(fs, path);
|
|
|
- if(!abs_path) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- if (!name) {
|
|
|
- return Error(Status::InvalidArgument("hdfsDeleteSnapshot: argument 'name' cannot be NULL"));
|
|
|
- }
|
|
|
- Status stat;
|
|
|
- stat = fs->get_impl()->DeleteSnapshot(*abs_path, name);
|
|
|
- if (!stat.ok()) {
|
|
|
- return Error(stat);
|
|
|
- }
|
|
|
- return 0;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- return ReportException(e);
|
|
|
- } catch (...) {
|
|
|
- return ReportCaughtNonException();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-
|
|
|
-int hdfsRenameSnapshot(hdfsFS fs, const char* path, const char* old_name, const char* new_name) {
|
|
|
- try {
|
|
|
- errno = 0;
|
|
|
- if (!CheckSystem(fs)) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- const optional<std::string> abs_path = getAbsolutePath(fs, path);
|
|
|
- if(!abs_path) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- if (!old_name) {
|
|
|
- return Error(Status::InvalidArgument("hdfsRenameSnapshot: argument 'old_name' cannot be NULL"));
|
|
|
- }
|
|
|
- if (!new_name) {
|
|
|
- return Error(Status::InvalidArgument("hdfsRenameSnapshot: argument 'new_name' cannot be NULL"));
|
|
|
- }
|
|
|
- Status stat;
|
|
|
- stat = fs->get_impl()->RenameSnapshot(*abs_path, old_name, new_name);
|
|
|
- if (!stat.ok()) {
|
|
|
- return Error(stat);
|
|
|
- }
|
|
|
- return 0;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- return ReportException(e);
|
|
|
- } catch (...) {
|
|
|
- return ReportCaughtNonException();
|
|
|
- }
|
|
|
-
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFSPP_EXT_API
|
|
|
-int hdfsAllowSnapshot(hdfsFS fs, const char* path) {
|
|
|
- try {
|
|
|
- errno = 0;
|
|
|
- if (!CheckSystem(fs)) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- const optional<std::string> abs_path = getAbsolutePath(fs, path);
|
|
|
- if(!abs_path) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- Status stat;
|
|
|
- stat = fs->get_impl()->AllowSnapshot(*abs_path);
|
|
|
- if (!stat.ok()) {
|
|
|
- return Error(stat);
|
|
|
- }
|
|
|
- return 0;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- return ReportException(e);
|
|
|
- } catch (...) {
|
|
|
- return ReportCaughtNonException();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFSPP_EXT_API
|
|
|
-int hdfsDisallowSnapshot(hdfsFS fs, const char* path) {
|
|
|
- try {
|
|
|
- errno = 0;
|
|
|
- if (!CheckSystem(fs)) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- const optional<std::string> abs_path = getAbsolutePath(fs, path);
|
|
|
- if(!abs_path) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- Status stat;
|
|
|
- stat = fs->get_impl()->DisallowSnapshot(*abs_path);
|
|
|
- if (!stat.ok()) {
|
|
|
- return Error(stat);
|
|
|
- }
|
|
|
- return 0;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- return ReportException(e);
|
|
|
- } catch (...) {
|
|
|
- return ReportCaughtNonException();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer,
|
|
|
- tSize length) {
|
|
|
- try
|
|
|
- {
|
|
|
- errno = 0;
|
|
|
- if (!CheckSystemAndHandle(fs, file)) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
-
|
|
|
- size_t len = 0;
|
|
|
- Status stat = file->get_impl()->PositionRead(buffer, length, position, &len);
|
|
|
- if(!stat.ok()) {
|
|
|
- return Error(stat);
|
|
|
- }
|
|
|
- return (tSize)len;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- return ReportException(e);
|
|
|
- } catch (...) {
|
|
|
- return ReportCaughtNonException();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-tSize hdfsRead(hdfsFS fs, hdfsFile file, void *buffer, tSize length) {
|
|
|
- try
|
|
|
- {
|
|
|
- errno = 0;
|
|
|
- if (!CheckSystemAndHandle(fs, file)) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
-
|
|
|
- size_t len = 0;
|
|
|
- Status stat = file->get_impl()->Read(buffer, length, &len);
|
|
|
- if (!stat.ok()) {
|
|
|
- return Error(stat);
|
|
|
- }
|
|
|
-
|
|
|
- return (tSize)len;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- return ReportException(e);
|
|
|
- } catch (...) {
|
|
|
- return ReportCaughtNonException();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-int hdfsUnbufferFile(hdfsFile file) {
|
|
|
- //Currently we are not doing any buffering
|
|
|
- CheckHandle(file);
|
|
|
- return -1;
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-int hdfsFileGetReadStatistics(hdfsFile file, struct hdfsReadStatistics **stats) {
|
|
|
- try
|
|
|
- {
|
|
|
- errno = 0;
|
|
|
- if (!CheckHandle(file)) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- *stats = new hdfsReadStatistics;
|
|
|
- memset(*stats, 0, sizeof(hdfsReadStatistics));
|
|
|
- (*stats)->totalBytesRead = file->get_impl()->get_bytes_read();
|
|
|
- return 0;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- return ReportException(e);
|
|
|
- } catch (...) {
|
|
|
- return ReportCaughtNonException();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-int hdfsFileClearReadStatistics(hdfsFile file) {
|
|
|
- try
|
|
|
- {
|
|
|
- errno = 0;
|
|
|
- if (!CheckHandle(file)) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- file->get_impl()->clear_bytes_read();
|
|
|
- return 0;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- return ReportException(e);
|
|
|
- } catch (...) {
|
|
|
- return ReportCaughtNonException();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-int64_t hdfsReadStatisticsGetRemoteBytesRead(const struct hdfsReadStatistics *stats) {
|
|
|
- return stats->totalBytesRead - stats->totalLocalBytesRead;
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats) {
|
|
|
- errno = 0;
|
|
|
- delete stats;
|
|
|
-}
|
|
|
-
|
|
|
-/* 0 on success, -1 on error*/
|
|
|
-LIBHDFS_C_API
|
|
|
-int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos) {
|
|
|
- try
|
|
|
- {
|
|
|
- errno = 0;
|
|
|
- if (!CheckSystemAndHandle(fs, file)) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
-
|
|
|
- off_t desired = desiredPos;
|
|
|
- Status stat = file->get_impl()->Seek(&desired, std::ios_base::beg);
|
|
|
- if (!stat.ok()) {
|
|
|
- return Error(stat);
|
|
|
- }
|
|
|
-
|
|
|
- return 0;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- return ReportException(e);
|
|
|
- } catch (...) {
|
|
|
- return ReportCaughtNonException();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-tOffset hdfsTell(hdfsFS fs, hdfsFile file) {
|
|
|
- try
|
|
|
- {
|
|
|
- errno = 0;
|
|
|
- if (!CheckSystemAndHandle(fs, file)) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
-
|
|
|
- off_t offset = 0;
|
|
|
- Status stat = file->get_impl()->Seek(&offset, std::ios_base::cur);
|
|
|
- if (!stat.ok()) {
|
|
|
- return Error(stat);
|
|
|
- }
|
|
|
-
|
|
|
- return (tOffset)offset;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- return ReportException(e);
|
|
|
- } catch (...) {
|
|
|
- return ReportCaughtNonException();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-/* extended API */
|
|
|
-int hdfsCancel(hdfsFS fs, hdfsFile file) {
|
|
|
- try
|
|
|
- {
|
|
|
- errno = 0;
|
|
|
- if (!CheckSystemAndHandle(fs, file)) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- static_cast<FileHandleImpl*>(file->get_impl())->CancelOperations();
|
|
|
- return 0;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- return ReportException(e);
|
|
|
- } catch (...) {
|
|
|
- return ReportCaughtNonException();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFSPP_EXT_API
|
|
|
-int hdfsGetBlockLocations(hdfsFS fs, const char *path, struct hdfsBlockLocations ** locations_out)
|
|
|
-{
|
|
|
- try
|
|
|
- {
|
|
|
- errno = 0;
|
|
|
- if (!CheckSystem(fs)) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- if (locations_out == nullptr) {
|
|
|
- ReportError(EINVAL, "Null pointer passed to hdfsGetBlockLocations");
|
|
|
- return -1;
|
|
|
- }
|
|
|
- const optional<std::string> abs_path = getAbsolutePath(fs, path);
|
|
|
- if(!abs_path) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- std::shared_ptr<FileBlockLocation> ppLocations;
|
|
|
- Status stat = fs->get_impl()->GetBlockLocations(*abs_path, 0, std::numeric_limits<int64_t>::max(), &ppLocations);
|
|
|
- if (!stat.ok()) {
|
|
|
- return Error(stat);
|
|
|
- }
|
|
|
-
|
|
|
- hdfsBlockLocations *locations = new struct hdfsBlockLocations();
|
|
|
- (*locations_out) = locations;
|
|
|
-
|
|
|
- bzero(locations, sizeof(*locations));
|
|
|
- locations->fileLength = ppLocations->getFileLength();
|
|
|
- locations->isLastBlockComplete = ppLocations->isLastBlockComplete();
|
|
|
- locations->isUnderConstruction = ppLocations->isUnderConstruction();
|
|
|
-
|
|
|
- const std::vector<BlockLocation> & ppBlockLocations = ppLocations->getBlockLocations();
|
|
|
- locations->num_blocks = ppBlockLocations.size();
|
|
|
- locations->blocks = new struct hdfsBlockInfo[locations->num_blocks];
|
|
|
- for (size_t i=0; i < ppBlockLocations.size(); i++) {
|
|
|
- auto ppBlockLocation = ppBlockLocations[i];
|
|
|
- auto block = &locations->blocks[i];
|
|
|
-
|
|
|
- block->num_bytes = ppBlockLocation.getLength();
|
|
|
- block->start_offset = ppBlockLocation.getOffset();
|
|
|
-
|
|
|
- const std::vector<DNInfo> & ppDNInfos = ppBlockLocation.getDataNodes();
|
|
|
- block->num_locations = ppDNInfos.size();
|
|
|
- block->locations = new hdfsDNInfo[block->num_locations];
|
|
|
- for (size_t j=0; j < block->num_locations; j++) {
|
|
|
- auto ppDNInfo = ppDNInfos[j];
|
|
|
- auto dn_info = &block->locations[j];
|
|
|
-
|
|
|
- dn_info->xfer_port = ppDNInfo.getXferPort();
|
|
|
- dn_info->info_port = ppDNInfo.getInfoPort();
|
|
|
- dn_info->IPC_port = ppDNInfo.getIPCPort();
|
|
|
- dn_info->info_secure_port = ppDNInfo.getInfoSecurePort();
|
|
|
-
|
|
|
- char * buf;
|
|
|
- buf = new char[ppDNInfo.getHostname().size() + 1];
|
|
|
- strncpy(buf, ppDNInfo.getHostname().c_str(), ppDNInfo.getHostname().size() + 1);
|
|
|
- dn_info->hostname = buf;
|
|
|
-
|
|
|
- buf = new char[ppDNInfo.getIPAddr().size() + 1];
|
|
|
- strncpy(buf, ppDNInfo.getIPAddr().c_str(), ppDNInfo.getIPAddr().size() + 1);
|
|
|
- dn_info->ip_address = buf;
|
|
|
-
|
|
|
- buf = new char[ppDNInfo.getNetworkLocation().size() + 1];
|
|
|
- strncpy(buf, ppDNInfo.getNetworkLocation().c_str(), ppDNInfo.getNetworkLocation().size() + 1);
|
|
|
- dn_info->network_location = buf;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- return 0;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- return ReportException(e);
|
|
|
- } catch (...) {
|
|
|
- return ReportCaughtNonException();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFSPP_EXT_API
|
|
|
-int hdfsFreeBlockLocations(struct hdfsBlockLocations * blockLocations) {
|
|
|
- errno = 0;
|
|
|
- if (blockLocations == nullptr)
|
|
|
- return 0;
|
|
|
-
|
|
|
- for (size_t i=0; i < blockLocations->num_blocks; i++) {
|
|
|
- auto block = &blockLocations->blocks[i];
|
|
|
- for (size_t j=0; j < block->num_locations; j++) {
|
|
|
- auto location = &block->locations[j];
|
|
|
- delete[] location->hostname;
|
|
|
- delete[] location->ip_address;
|
|
|
- delete[] location->network_location;
|
|
|
- }
|
|
|
- }
|
|
|
- delete[] blockLocations->blocks;
|
|
|
- delete blockLocations;
|
|
|
-
|
|
|
- return 0;
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-char*** hdfsGetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length) {
|
|
|
- try
|
|
|
- {
|
|
|
- errno = 0;
|
|
|
- if (!CheckSystem(fs)) {
|
|
|
- return nullptr;
|
|
|
- }
|
|
|
- const optional<std::string> abs_path = getAbsolutePath(fs, path);
|
|
|
- if(!abs_path) {
|
|
|
- return nullptr;
|
|
|
- }
|
|
|
- std::shared_ptr<FileBlockLocation> ppLocations;
|
|
|
- Status stat = fs->get_impl()->GetBlockLocations(*abs_path, start, length, &ppLocations);
|
|
|
- if (!stat.ok()) {
|
|
|
- Error(stat);
|
|
|
- return nullptr;
|
|
|
- }
|
|
|
- const std::vector<BlockLocation> & ppBlockLocations = ppLocations->getBlockLocations();
|
|
|
- char ***hosts = new char**[ppBlockLocations.size() + 1];
|
|
|
- for (size_t i=0; i < ppBlockLocations.size(); i++) {
|
|
|
- const std::vector<DNInfo> & ppDNInfos = ppBlockLocations[i].getDataNodes();
|
|
|
- hosts[i] = new char*[ppDNInfos.size() + 1];
|
|
|
- for (size_t j=0; j < ppDNInfos.size(); j++) {
|
|
|
- auto ppDNInfo = ppDNInfos[j];
|
|
|
- hosts[i][j] = new char[ppDNInfo.getHostname().size() + 1];
|
|
|
- strncpy(hosts[i][j], ppDNInfo.getHostname().c_str(), ppDNInfo.getHostname().size() + 1);
|
|
|
- }
|
|
|
- hosts[i][ppDNInfos.size()] = nullptr;
|
|
|
- }
|
|
|
- hosts[ppBlockLocations.size()] = nullptr;
|
|
|
- return hosts;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- ReportException(e);
|
|
|
- return nullptr;
|
|
|
- } catch (...) {
|
|
|
- ReportCaughtNonException();
|
|
|
- return nullptr;
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-void hdfsFreeHosts(char ***blockHosts) {
|
|
|
- errno = 0;
|
|
|
- if (blockHosts == nullptr)
|
|
|
- return;
|
|
|
-
|
|
|
- for (size_t i = 0; blockHosts[i]; i++) {
|
|
|
- for (size_t j = 0; blockHosts[i][j]; j++) {
|
|
|
- delete[] blockHosts[i][j];
|
|
|
- }
|
|
|
- delete[] blockHosts[i];
|
|
|
- }
|
|
|
- delete blockHosts;
|
|
|
-}
|
|
|
-
|
|
|
-/*******************************************************************
|
|
|
- * EVENT CALLBACKS
|
|
|
- *******************************************************************/
|
|
|
-
|
|
|
-const char * FS_NN_CONNECT_EVENT = hdfs::FS_NN_CONNECT_EVENT;
|
|
|
-const char * FS_NN_READ_EVENT = hdfs::FS_NN_READ_EVENT;
|
|
|
-const char * FS_NN_WRITE_EVENT = hdfs::FS_NN_WRITE_EVENT;
|
|
|
-
|
|
|
-const char * FILE_DN_CONNECT_EVENT = hdfs::FILE_DN_CONNECT_EVENT;
|
|
|
-const char * FILE_DN_READ_EVENT = hdfs::FILE_DN_READ_EVENT;
|
|
|
-const char * FILE_DN_WRITE_EVENT = hdfs::FILE_DN_WRITE_EVENT;
|
|
|
-
|
|
|
-
|
|
|
-event_response fs_callback_glue(libhdfspp_fs_event_callback handler,
|
|
|
- int64_t cookie,
|
|
|
- const char * event,
|
|
|
- const char * cluster,
|
|
|
- int64_t value) {
|
|
|
- int result = handler(event, cluster, value, cookie);
|
|
|
- if (result == LIBHDFSPP_EVENT_OK) {
|
|
|
- return event_response::make_ok();
|
|
|
- }
|
|
|
-#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
|
|
|
- if (result == DEBUG_SIMULATE_ERROR) {
|
|
|
- return event_response::test_err(Status::Error("Simulated error"));
|
|
|
- }
|
|
|
-#endif
|
|
|
-
|
|
|
- return event_response::make_ok();
|
|
|
-}
|
|
|
-
|
|
|
-event_response file_callback_glue(libhdfspp_file_event_callback handler,
|
|
|
- int64_t cookie,
|
|
|
- const char * event,
|
|
|
- const char * cluster,
|
|
|
- const char * file,
|
|
|
- int64_t value) {
|
|
|
- int result = handler(event, cluster, file, value, cookie);
|
|
|
- if (result == LIBHDFSPP_EVENT_OK) {
|
|
|
- return event_response::make_ok();
|
|
|
- }
|
|
|
-#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
|
|
|
- if (result == DEBUG_SIMULATE_ERROR) {
|
|
|
- return event_response::test_err(Status::Error("Simulated error"));
|
|
|
- }
|
|
|
-#endif
|
|
|
-
|
|
|
- return event_response::make_ok();
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFSPP_EXT_API
|
|
|
-int hdfsPreAttachFSMonitor(libhdfspp_fs_event_callback handler, int64_t cookie)
|
|
|
-{
|
|
|
- fs_event_callback callback = std::bind(fs_callback_glue, handler, cookie, _1, _2, _3);
|
|
|
- fsEventCallback = callback;
|
|
|
- return 0;
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFSPP_EXT_API
|
|
|
-int hdfsPreAttachFileMonitor(libhdfspp_file_event_callback handler, int64_t cookie)
|
|
|
-{
|
|
|
- file_event_callback callback = std::bind(file_callback_glue, handler, cookie, _1, _2, _3, _4);
|
|
|
- fileEventCallback = callback;
|
|
|
- return 0;
|
|
|
-}
|
|
|
-
|
|
|
-/*******************************************************************
|
|
|
- * BUILDER INTERFACE
|
|
|
- *******************************************************************/
|
|
|
-
|
|
|
-HdfsConfiguration LoadDefault(ConfigurationLoader & loader)
|
|
|
-{
|
|
|
- optional<HdfsConfiguration> result = loader.LoadDefaultResources<HdfsConfiguration>();
|
|
|
- if (result)
|
|
|
- {
|
|
|
- return result.value();
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- return loader.NewConfig<HdfsConfiguration>();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-hdfsBuilder::hdfsBuilder() : config(loader.NewConfig<HdfsConfiguration>())
|
|
|
-{
|
|
|
- errno = 0;
|
|
|
- config = LoadDefault(loader);
|
|
|
-}
|
|
|
-
|
|
|
-hdfsBuilder::hdfsBuilder(const char * directory) :
|
|
|
- config(loader.NewConfig<HdfsConfiguration>())
|
|
|
-{
|
|
|
- errno = 0;
|
|
|
- loader.SetSearchPath(directory);
|
|
|
- config = LoadDefault(loader);
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-struct hdfsBuilder *hdfsNewBuilder(void)
|
|
|
-{
|
|
|
- try
|
|
|
- {
|
|
|
- errno = 0;
|
|
|
- return new struct hdfsBuilder();
|
|
|
- } catch (const std::exception & e) {
|
|
|
- ReportException(e);
|
|
|
- return nullptr;
|
|
|
- } catch (...) {
|
|
|
- ReportCaughtNonException();
|
|
|
- return nullptr;
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn)
|
|
|
-{
|
|
|
- errno = 0;
|
|
|
- bld->overrideHost = std::string(nn);
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port)
|
|
|
-{
|
|
|
- errno = 0;
|
|
|
- bld->overridePort = port;
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName)
|
|
|
-{
|
|
|
- errno = 0;
|
|
|
- if (userName && *userName) {
|
|
|
- bld->user = std::string(userName);
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld) {
|
|
|
- //libhdfspp always returns a new instance, so nothing to do
|
|
|
- (void)bld;
|
|
|
- errno = 0;
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-void hdfsFreeBuilder(struct hdfsBuilder *bld)
|
|
|
-{
|
|
|
- try
|
|
|
- {
|
|
|
- errno = 0;
|
|
|
- delete bld;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- ReportException(e);
|
|
|
- } catch (...) {
|
|
|
- ReportCaughtNonException();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key,
|
|
|
- const char *val)
|
|
|
-{
|
|
|
- try
|
|
|
- {
|
|
|
- errno = 0;
|
|
|
- optional<HdfsConfiguration> newConfig = bld->loader.OverlayValue(bld->config, key, val);
|
|
|
- if (newConfig)
|
|
|
- {
|
|
|
- bld->config = newConfig.value();
|
|
|
- return 0;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- ReportError(EINVAL, "Could not change Builder value");
|
|
|
- return -1;
|
|
|
- }
|
|
|
- } catch (const std::exception & e) {
|
|
|
- return ReportException(e);
|
|
|
- } catch (...) {
|
|
|
- return ReportCaughtNonException();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-void hdfsConfStrFree(char *val)
|
|
|
-{
|
|
|
- errno = 0;
|
|
|
- free(val);
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) {
|
|
|
- hdfsFS fs = doHdfsConnect(bld->overrideHost, bld->overridePort, bld->user, bld->config.GetOptions());
|
|
|
- // Always free the builder
|
|
|
- hdfsFreeBuilder(bld);
|
|
|
- return fs;
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-int hdfsConfGetStr(const char *key, char **val)
|
|
|
-{
|
|
|
- try
|
|
|
- {
|
|
|
- errno = 0;
|
|
|
- hdfsBuilder builder;
|
|
|
- return hdfsBuilderConfGetStr(&builder, key, val);
|
|
|
- } catch (const std::exception & e) {
|
|
|
- return ReportException(e);
|
|
|
- } catch (...) {
|
|
|
- return ReportCaughtNonException();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFS_C_API
|
|
|
-int hdfsConfGetInt(const char *key, int32_t *val)
|
|
|
-{
|
|
|
- try
|
|
|
- {
|
|
|
- errno = 0;
|
|
|
- hdfsBuilder builder;
|
|
|
- return hdfsBuilderConfGetInt(&builder, key, val);
|
|
|
- } catch (const std::exception & e) {
|
|
|
- return ReportException(e);
|
|
|
- } catch (...) {
|
|
|
- return ReportCaughtNonException();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-//
|
|
|
-// Extended builder interface
|
|
|
-//
|
|
|
-struct hdfsBuilder *hdfsNewBuilderFromDirectory(const char * configDirectory)
|
|
|
-{
|
|
|
- try
|
|
|
- {
|
|
|
- errno = 0;
|
|
|
- return new struct hdfsBuilder(configDirectory);
|
|
|
- } catch (const std::exception & e) {
|
|
|
- ReportException(e);
|
|
|
- return nullptr;
|
|
|
- } catch (...) {
|
|
|
- ReportCaughtNonException();
|
|
|
- return nullptr;
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFSPP_EXT_API
|
|
|
-int hdfsBuilderConfGetStr(struct hdfsBuilder *bld, const char *key,
|
|
|
- char **val)
|
|
|
-{
|
|
|
- try
|
|
|
- {
|
|
|
- errno = 0;
|
|
|
- optional<std::string> value = bld->config.Get(key);
|
|
|
- if (value)
|
|
|
- {
|
|
|
- size_t len = value->length() + 1;
|
|
|
- *val = static_cast<char *>(malloc(len));
|
|
|
- strncpy(*val, value->c_str(), len);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- *val = nullptr;
|
|
|
- }
|
|
|
- return 0;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- return ReportException(e);
|
|
|
- } catch (...) {
|
|
|
- return ReportCaughtNonException();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-// If we're running on a 32-bit platform, we might get 64-bit values that
|
|
|
-// don't fit in an int, and int is specified by the java hdfs.h interface
|
|
|
-bool isValidInt(int64_t value)
|
|
|
-{
|
|
|
- return (value >= std::numeric_limits<int>::min() &&
|
|
|
- value <= std::numeric_limits<int>::max());
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFSPP_EXT_API
|
|
|
-int hdfsBuilderConfGetInt(struct hdfsBuilder *bld, const char *key, int32_t *val)
|
|
|
-{
|
|
|
- try
|
|
|
- {
|
|
|
- errno = 0;
|
|
|
- // Pull from default configuration
|
|
|
- optional<int64_t> value = bld->config.GetInt(key);
|
|
|
- if (value)
|
|
|
- {
|
|
|
- if (!isValidInt(*value)){
|
|
|
- ReportError(EINVAL, "Builder value is not valid");
|
|
|
- return -1;
|
|
|
- }
|
|
|
- *val = *value;
|
|
|
- return 0;
|
|
|
- }
|
|
|
- // If not found, don't change val
|
|
|
- ReportError(EINVAL, "Could not get Builder value");
|
|
|
- return 0;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- return ReportException(e);
|
|
|
- } catch (...) {
|
|
|
- return ReportCaughtNonException();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFSPP_EXT_API
|
|
|
-int hdfsBuilderConfGetLong(struct hdfsBuilder *bld, const char *key, int64_t *val)
|
|
|
-{
|
|
|
- try
|
|
|
- {
|
|
|
- errno = 0;
|
|
|
- // Pull from default configuration
|
|
|
- optional<int64_t> value = bld->config.GetInt(key);
|
|
|
- if (value)
|
|
|
- {
|
|
|
- *val = *value;
|
|
|
- return 0;
|
|
|
- }
|
|
|
- // If not found, don't change val
|
|
|
- ReportError(EINVAL, "Could not get Builder value");
|
|
|
- return 0;
|
|
|
- } catch (const std::exception & e) {
|
|
|
- return ReportException(e);
|
|
|
- } catch (...) {
|
|
|
- return ReportCaughtNonException();
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-/**
|
|
|
- * Logging functions
|
|
|
- **/
|
|
|
-class CForwardingLogger : public LoggerInterface {
|
|
|
- public:
|
|
|
- CForwardingLogger() : callback_(nullptr) {};
|
|
|
-
|
|
|
- // Converts LogMessage into LogData, a POD type,
|
|
|
- // and invokes callback_ if it's not null.
|
|
|
- void Write(const LogMessage& msg);
|
|
|
-
|
|
|
- // pass in NULL to clear the hook
|
|
|
- void SetCallback(void (*callback)(LogData*));
|
|
|
-
|
|
|
- //return a copy, or null on failure.
|
|
|
- static LogData *CopyLogData(const LogData*);
|
|
|
- //free LogData allocated with CopyLogData
|
|
|
- static void FreeLogData(LogData*);
|
|
|
- private:
|
|
|
- void (*callback_)(LogData*);
|
|
|
-};
|
|
|
-
|
|
|
-/**
|
|
|
- * Plugin to forward message to a C function pointer
|
|
|
- **/
|
|
|
-void CForwardingLogger::Write(const LogMessage& msg) {
|
|
|
- if(!callback_)
|
|
|
- return;
|
|
|
-
|
|
|
- const std::string text = msg.MsgString();
|
|
|
-
|
|
|
- LogData data;
|
|
|
- data.level = msg.level();
|
|
|
- data.component = msg.component();
|
|
|
- data.msg = text.c_str();
|
|
|
- data.file_name = msg.file_name();
|
|
|
- data.file_line = msg.file_line();
|
|
|
- callback_(&data);
|
|
|
-}
|
|
|
-
|
|
|
-void CForwardingLogger::SetCallback(void (*callback)(LogData*)) {
|
|
|
- callback_ = callback;
|
|
|
-}
|
|
|
-
|
|
|
-LogData *CForwardingLogger::CopyLogData(const LogData *orig) {
|
|
|
- if(!orig)
|
|
|
- return nullptr;
|
|
|
-
|
|
|
- LogData *copy = (LogData*)malloc(sizeof(LogData));
|
|
|
- if(!copy)
|
|
|
- return nullptr;
|
|
|
-
|
|
|
- copy->level = orig->level;
|
|
|
- copy->component = orig->component;
|
|
|
- if(orig->msg)
|
|
|
- copy->msg = strdup(orig->msg);
|
|
|
- copy->file_name = orig->file_name;
|
|
|
- copy->file_line = orig->file_line;
|
|
|
- return copy;
|
|
|
-}
|
|
|
-
|
|
|
-void CForwardingLogger::FreeLogData(LogData *data) {
|
|
|
- if(!data)
|
|
|
- return;
|
|
|
- if(data->msg)
|
|
|
- free((void*)data->msg);
|
|
|
-
|
|
|
- // Inexpensive way to help catch use-after-free
|
|
|
- memset(data, 0, sizeof(LogData));
|
|
|
- free(data);
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFSPP_EXT_API
|
|
|
-LogData *hdfsCopyLogData(LogData *data) {
|
|
|
- return CForwardingLogger::CopyLogData(data);
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFSPP_EXT_API
|
|
|
-void hdfsFreeLogData(LogData *data) {
|
|
|
- CForwardingLogger::FreeLogData(data);
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFSPP_EXT_API
|
|
|
-void hdfsSetLogFunction(void (*callback)(LogData*)) {
|
|
|
- CForwardingLogger *logger = new CForwardingLogger();
|
|
|
- logger->SetCallback(callback);
|
|
|
- LogManager::SetLoggerImplementation(std::unique_ptr<LoggerInterface>(logger));
|
|
|
-}
|
|
|
-
|
|
|
-static bool IsLevelValid(int component) {
|
|
|
- if(component < HDFSPP_LOG_LEVEL_TRACE || component > HDFSPP_LOG_LEVEL_ERROR)
|
|
|
- return false;
|
|
|
- return true;
|
|
|
-}
|
|
|
-
|
|
|
-
|
|
|
-// should use __builtin_popcnt as optimization on some platforms
|
|
|
-static int popcnt(int val) {
|
|
|
- int bits = sizeof(val) * 8;
|
|
|
- int count = 0;
|
|
|
- for(int i=0; i<bits; i++) {
|
|
|
- if((val >> i) & 0x1)
|
|
|
- count++;
|
|
|
- }
|
|
|
- return count;
|
|
|
-}
|
|
|
-
|
|
|
-static bool IsComponentValid(int component) {
|
|
|
- if(component < HDFSPP_LOG_COMPONENT_UNKNOWN || component > HDFSPP_LOG_COMPONENT_FILESYSTEM)
|
|
|
- return false;
|
|
|
- if(popcnt(component) != 1)
|
|
|
- return false;
|
|
|
- return true;
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFSPP_EXT_API
|
|
|
-int hdfsEnableLoggingForComponent(int component) {
|
|
|
- errno = 0;
|
|
|
- if(!IsComponentValid(component))
|
|
|
- return -1;
|
|
|
- LogManager::EnableLogForComponent(static_cast<LogSourceComponent>(component));
|
|
|
- return 0;
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFSPP_EXT_API
|
|
|
-int hdfsDisableLoggingForComponent(int component) {
|
|
|
- errno = 0;
|
|
|
- if(!IsComponentValid(component))
|
|
|
- return -1;
|
|
|
- LogManager::DisableLogForComponent(static_cast<LogSourceComponent>(component));
|
|
|
- return 0;
|
|
|
-}
|
|
|
-
|
|
|
-LIBHDFSPP_EXT_API
|
|
|
-int hdfsSetLoggingLevel(int level) {
|
|
|
- errno = 0;
|
|
|
- if(!IsLevelValid(level))
|
|
|
- return -1;
|
|
|
- LogManager::SetLogLevel(static_cast<LogLevel>(level));
|
|
|
- return 0;
|
|
|
-}
|
|
|
-
|
|
|
-#undef LIBHDFS_C_API
|
|
|
-#undef LIBHDFSPP_EXT_API
|
|
|
-
|
|
|
-
|