@@ -16,7 +16,11 @@
  * limitations under the License.
  */
 
+#include "hdfspp/hdfspp.h"
+
 #include "fs/filesystem.h"
+#include "common/hdfs_configuration.h"
+#include "common/configuration_loader.h"
 
 #include <hdfs/hdfs.h>
 #include <hdfspp/hdfs_ext.h>
@@ -28,7 +32,7 @@
 
 using namespace hdfs;
 
-/* Seperate the handles used by the C api from the C++ API*/
+/* Separate the handles used by the C api from the C++ API*/
 struct hdfs_internal {
   hdfs_internal(FileSystem *p) : filesystem_(p) {}
   hdfs_internal(std::unique_ptr<FileSystem> p)
@@ -73,6 +77,20 @@ void hdfsGetLastError(char *buf, int len) {
   buf[copylen] = 0;
 }
 
+struct hdfsBuilder {
+  hdfsBuilder();
+  hdfsBuilder(const char * directory);
+  virtual ~hdfsBuilder() {}
+  ConfigurationLoader loader;
+  HdfsConfiguration config;
+
+  std::string overrideHost;
+  tPort overridePort; // 0 --> use default
+
+  static constexpr tPort kUseDefaultPort = 0;
+  static constexpr tPort kDefaultPort = 8020;
+};
+
 /* Error handling with optional debug to stderr */
 static void ReportError(int errnum, std::string msg) {
   errno = errnum;
@@ -248,3 +266,155 @@ tOffset hdfsTell(hdfsFS fs, hdfsFile file) {
 
   return offset;
 }
+
+
+/*******************************************************************
+ * BUILDER INTERFACE
+ *******************************************************************/
+
+HdfsConfiguration LoadDefault(ConfigurationLoader & loader)
+{
+  optional<HdfsConfiguration> result = loader.LoadDefaultResources<HdfsConfiguration>();
+  if (result)
+  {
+    return result.value();
+  }
+  else
+  {
+    return loader.New<HdfsConfiguration>();
+  }
+}
+
+hdfsBuilder::hdfsBuilder() : config(LoadDefault(loader)), overridePort(kUseDefaultPort)
+{
+}
+
+hdfsBuilder::hdfsBuilder(const char * directory) :
+    config(loader.New<HdfsConfiguration>()), overridePort(kUseDefaultPort)
+{
+  loader.SetSearchPath(directory);
+  config = LoadDefault(loader);
+}
+
+struct hdfsBuilder *hdfsNewBuilder(void)
+{
+  return new struct hdfsBuilder();
+}
+
+void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn)
+{
+  bld->overrideHost = nn;
+}
+
+void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port)
+{
+  bld->overridePort = port;
+}
+
+void hdfsFreeBuilder(struct hdfsBuilder *bld)
+{
+  delete bld;
+}
+
+int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key,
+                          const char *val)
+{
+  optional<HdfsConfiguration> newConfig = bld->loader.OverlayValue(bld->config, key, val);
+  if (newConfig)
+  {
+    bld->config = newConfig.value();
+    return 0;
+  }
+  else
+  {
+    return 1;
+  }
+}
+
+void hdfsConfStrFree(char *val)
+{
+  if (val)
+  {
+    free(val);
+  }
+}
+
+hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) {
+
+  if (!bld->overrideHost.empty())
+  {
+    // TODO: pass rest of config once we get that done (HDFS-9556)
+    tPort port = bld->overridePort;
+    if (port == hdfsBuilder::kUseDefaultPort)
+    {
+      port = hdfsBuilder::kDefaultPort;
+    }
+    return hdfsConnect(bld->overrideHost.c_str(), port);
+  }
+  else
+  {
+    //TODO: allow construction from default port once that is done (HDFS-9556)
+    ReportError(EINVAL, "No host provided to builder in hdfsBuilderConnect");
+    return nullptr;
+  }
+}
+
+int hdfsConfGetStr(const char *key, char **val)
+{
+  hdfsBuilder builder;
+  return hdfsBuilderConfGetStr(&builder, key, val);
+}
+
+int hdfsConfGetInt(const char *key, int32_t *val)
+{
+  hdfsBuilder builder;
+  return hdfsBuilderConfGetInt(&builder, key, val);
+}
+
+//
+// Extended builder interface
+//
+struct hdfsBuilder *hdfsNewBuilderFromDirectory(const char * configDirectory)
+{
+  return new struct hdfsBuilder(configDirectory);
+}
+
+int hdfsBuilderConfGetStr(struct hdfsBuilder *bld, const char *key,
+                          char **val)
+{
+  optional<std::string> value = bld->config.Get(key);
+  if (value)
+  {
+    size_t len = value->length() + 1;
+    *val = static_cast<char *>(malloc(len));
+    strncpy(*val, value->c_str(), len);
+  }
+  else
+  {
+    *val = nullptr;
+  }
+  return 0;
+}
+
+// If we're running on a 32-bit platform, we might get 64-bit values that
+// don't fit in an int, and int is specified by the java hdfs.h interface
+bool isValidInt(int64_t value)
+{
+  return (value >= std::numeric_limits<int>::min() &&
+          value <= std::numeric_limits<int>::max());
+}
+
+int hdfsBuilderConfGetInt(struct hdfsBuilder *bld, const char *key, int32_t *val)
+{
+  // Pull from default configuration
+  optional<int64_t> value = bld->config.GetInt(key);
+  if (value)
+  {
+    if (!isValidInt(*value))
+      return 1;
+
+    *val = *value;
+  }
+  // If not found, don't change val
+  return 0;
+}
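
For reference (not part of the patch), a minimal caller of the builder entry points added above could look like the sketch below. The host name, port, and "dfs.blocksize" key are placeholder values, and hdfsDisconnect is assumed to come from the pre-existing libhdfs-compatible shim; note that hdfsBuilderConnect as written here does not consume the builder, so the caller releases it with hdfsFreeBuilder.

#include <stdio.h>

#include <hdfs/hdfs.h>
#include <hdfspp/hdfs_ext.h>

int main(void) {
  // Start from the default configuration (whatever is on the search path),
  // then override the NameNode location explicitly.
  struct hdfsBuilder *bld = hdfsNewBuilder();
  hdfsBuilderSetNameNode(bld, "localhost");   // placeholder host
  hdfsBuilderSetNameNodePort(bld, 8020);      // 0 would fall back to kDefaultPort

  hdfsFS fs = hdfsBuilderConnect(bld);
  if (!fs) {
    char err[256];
    hdfsGetLastError(err, sizeof(err));
    fprintf(stderr, "connect failed: %s\n", err);
    hdfsFreeBuilder(bld);
    return 1;
  }

  // Query the effective configuration held by the builder.
  char *val = NULL;
  if (hdfsBuilderConfGetStr(bld, "dfs.blocksize", &val) == 0 && val) {
    printf("dfs.blocksize = %s\n", val);
    hdfsConfStrFree(val);
  }

  hdfsFreeBuilder(bld);   // hdfsBuilderConnect() does not free the builder here
  hdfsDisconnect(fs);     // assumed available from the existing C API
  return 0;
}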