@@ -33,7 +33,6 @@
 #include <stdlib.h>
 #include <string.h>
 #include <strings.h>
-#include <uriparser/Uri.h>
 #include <uv.h>

 #define DEFAULT_NN_PORT 8020
@@ -67,14 +66,17 @@ struct native_fs {
     /** The default block size obtained from getServerDefaults. */
     int64_t default_block_size;

-    /** User name to use for RPCs.  Immutable. */
-    char *user_name;
+    /** Prefix to use when building URLs. */
+    char *url_prefix;
+
+    /** URI that was used when connecting. */
+    struct hadoop_uri *conn_uri;

     /**
      * A dynamically allocated working directory which will be prepended to
      * all relative paths.
      */
-    UriUriA *working_uri;
+    struct hadoop_uri *working_uri;

     /** Lock which protects the working_uri. */
     uv_mutex_t working_uri_lock;
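
struct hadoop_uri is this patch's replacement for the raw uriparser types. Its definition is not part of the diff; judging purely from the fields the new code touches (scheme, user_info, auth, path), it looks roughly like the sketch below. The field list is an inference, not the real header, and hadoop_uri_free is assumed to release the struct and everything it owns.

    struct hadoop_uri {
        char *scheme;     /* e.g. "hdfs" */
        char *user_info;  /* user name taken from the authority */
        char *auth;       /* authority: host, or host:port */
        char *path;       /* path component; "" when the URI has none */
    };
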
@@ -135,10 +137,12 @@ static hdfsFileInfo *ndfs_get_path_info(hdfsFS bfs, const char* uri);
 static struct hadoop_err *ndfs_connect_setup_conf(struct native_fs *fs,
                                                   struct hconf *hconf);

+static void ndfs_free(struct native_fs *fs);
+
 static void ndfs_nn_proxy_init(struct native_fs *fs, struct hrpc_proxy *proxy)
 {
     hrpc_proxy_init(proxy, fs->msgr, &fs->nn_addr, CLIENT_NN_PROTOCOL,
-                    fs->user_name);
+                    fs->conn_uri->user_info);
 }

 /**
@@ -153,47 +157,34 @@ static void ndfs_nn_proxy_init(struct native_fs *fs, struct hrpc_proxy *proxy)
 static struct hadoop_err *build_path(struct native_fs *fs, const char *uri_str,
                                      char **out)
 {
-    char *path = NULL;
     struct hadoop_err *err = NULL;
-    UriParserStateA uri_state;
-    UriUriA uri;
-
-    memset(&uri_state, 0, sizeof(uri_state));
+    struct hadoop_uri *uri = NULL;

     uv_mutex_lock(&fs->working_uri_lock);
-    err = uri_parse(uri_str, &uri_state, &uri, fs->working_uri);
+    err = hadoop_uri_parse(uri_str, fs->working_uri, &uri, H_URI_PARSE_PATH);
     if (err)
         goto done;
     // TODO: check URI scheme and user against saved values?
-    err = uri_get_path(&uri, &path);
-    if (err)
+    if (uri->path[0]) {
+        *out = strdup(uri->path);
+    } else {
+        // As a special case, when the URI given has an empty path, we assume that
+        // we want the current working directory.  This is to allow things like
+        // hdfs://mynamenode to map to the current working directory, as they do
+        // in Hadoop.  Note that this is different from hdfs://mynamenode/ (note
+        // the trailing slash), which maps to the root directory.
+        *out = strdup(fs->working_uri->path);
+    }
+    if (!*out) {
+        err = hadoop_lerr_alloc(ENOMEM, "build_path: out of memory.");
         goto done;
-    // As a special case, when the URI given has an empty path, we assume that
-    // we want the current working directory.  This is to allow things like
-    // hdfs://mynamenode to map to the current working directory, as they do in
-    // Hadoop.  Note that this is different than hdfs://mynamenode/ (note the
-    // trailing slash) which maps to the root directory.
-    if (!path[0]) {
-        free(path);
-        path = NULL;
-        err = uri_get_path(fs->working_uri, &path);
-        if (err) {
-            goto done;
-        }
     }
     err = NULL;

 done:
     uv_mutex_unlock(&fs->working_uri_lock);
-    if (uri_state.uri) {
-        uriFreeUriMembersA(&uri);
-    }
-    if (err) {
-        free(path);
-        return err;
-    }
-    *out = path;
-    return NULL;
+    hadoop_uri_free(uri);
+    return err;
 }

 static int ndfs_file_is_open_for_read(hdfsFile bfile)
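
To make build_path's empty-path special case concrete: suppose the working URI is hdfs://mynamenode/user/alice/ (the host and user name here are hypothetical), and assume hadoop_uri_parse resolves its first argument relative to the base URI passed as its second argument. Then:

    char *out = NULL;
    build_path(fs, "hdfs://mynamenode", &out);   // out = "/user/alice/" (empty path -> cwd)
    build_path(fs, "hdfs://mynamenode/", &out);  // out = "/" (explicit root)
    build_path(fs, "foo/bar", &out);             // out = "/user/alice/foo/bar"

Each successful call hands back a freshly strdup'd string which the caller must free.
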
@@ -307,8 +298,8 @@ done:
     return err;
 }

-static struct hadoop_err *get_namenode_addr(const struct hdfsBuilder *hdfs_bld,
-                                            struct sockaddr_in *nn_addr)
+static struct hadoop_err *get_namenode_addr(const struct hadoop_uri *conn_uri,
+                const struct hdfsBuilder *hdfs_bld, struct sockaddr_in *nn_addr)
 {
     const char *nameservice_id;
     const char *rpc_addr;
@@ -322,7 +313,7 @@ static struct hadoop_err *get_namenode_addr(const struct hdfsBuilder *hdfs_bld,
     if (rpc_addr) {
         return parse_rpc_addr(rpc_addr, nn_addr, hdfs_bld->port);
     }
-    return parse_rpc_addr(hdfs_bld->uri_authority, nn_addr, hdfs_bld->port);
+    return parse_rpc_addr(conn_uri->auth, nn_addr, hdfs_bld->port);
 }

 struct hadoop_err *ndfs_connect(struct hdfsBuilder *hdfs_bld,
@@ -332,9 +323,8 @@ struct hadoop_err *ndfs_connect(struct hdfsBuilder *hdfs_bld,
     struct native_fs *fs = NULL;
     struct hrpc_messenger_builder *msgr_bld;
     struct ndfs_server_defaults defaults;
-    int working_dir_lock_created = 0;
+    int used_port;
     char *working_dir = NULL;
-    UriParserStateA uri_state;

     fs = calloc(1, sizeof(*fs));
     if (!fs) {
@@ -343,11 +333,7 @@ struct hadoop_err *ndfs_connect(struct hdfsBuilder *hdfs_bld,
         goto done;
     }
     fs->base.ty = HADOOP_FS_TY_NDFS;
-    fs->user_name = strdup(hdfs_bld->uri_user_info);
-    if (!fs->user_name) {
-        err = hadoop_lerr_alloc(ENOMEM, "failed to allocate space "
-                                "for the user name.");
-        goto done;
-    }
+    fs->conn_uri = hdfs_bld->uri;
+    hdfs_bld->uri = NULL;
     msgr_bld = hrpc_messenger_builder_alloc();
     if (!msgr_bld) {
@@ -355,38 +341,47 @@ struct hadoop_err *ndfs_connect(struct hdfsBuilder *hdfs_bld,
                                 "for a messenger builder.");
         goto done;
     }
-    err = get_namenode_addr(hdfs_bld, &fs->nn_addr);
+    err = get_namenode_addr(fs->conn_uri, hdfs_bld, &fs->nn_addr);
     if (err)
         goto done;
+    // Calculate our url_prefix.  We'll need this when spitting out URIs from
+    // listStatus and getFileInfo.  We don't include the port in this URL
+    // prefix unless it is non-standard.
+    used_port = ntohs(fs->nn_addr.sin_port);
+    if (used_port == DEFAULT_NN_PORT) {
+        err = dynprintf(&fs->url_prefix, "%s://%s",
+                        fs->conn_uri->scheme, fs->conn_uri->auth);
+        if (err)
+            goto done;
+    } else {
+        err = dynprintf(&fs->url_prefix, "%s://%s:%d",
+                        fs->conn_uri->scheme, fs->conn_uri->auth,
+                        used_port);
+        if (err)
+            goto done;
+    }
     err = hrpc_messenger_create(msgr_bld, &fs->msgr);
     if (err)
         goto done;
     // Get the default working directory
-    if (asprintf(&working_dir, "%s:///user/%s/",
-            hdfs_bld->uri_scheme, hdfs_bld->uri_user_info) < 0) {
-        working_dir = NULL;
-        err = hadoop_lerr_alloc(ENOMEM, "ndfs_connect: OOM allocating "
-                                "working_dir");
-        goto done;
-    }
-    fs->working_uri = calloc(1, sizeof(*(fs->working_uri)));
-    if (!fs->working_uri) {
-        err = hadoop_lerr_alloc(ENOMEM, "ndfs_connect: OOM allocating "
-                                "fs->working_uri");
+    if (uv_mutex_init(&fs->working_uri_lock) < 0) {
+        err = hadoop_lerr_alloc(ENOMEM, "failed to create a mutex.");
         goto done;
     }
-    err = uri_parse_abs(working_dir, &uri_state, fs->working_uri,
-                        hdfs_bld->uri_scheme);
+    err = dynprintf(&working_dir, "%s:///user/%s/",
+                    fs->conn_uri->scheme, fs->conn_uri->user_info);
     if (err) {
-        free(fs->working_uri);
-        fs->working_uri = NULL;
+        uv_mutex_destroy(&fs->working_uri_lock);
         goto done;
     }
-    if (uv_mutex_init(&fs->working_uri_lock) < 0) {
-        err = hadoop_lerr_alloc(ENOMEM, "failed to create a mutex.");
+    err = hadoop_uri_parse(working_dir, NULL, &fs->working_uri,
+                           H_URI_PARSE_ALL | H_URI_APPEND_SLASH);
+    if (err) {
+        uv_mutex_destroy(&fs->working_uri_lock);
+        err = hadoop_err_prepend(err, 0, "ndfs_connect: error parsing "
+                                 "working directory");
         goto done;
     }
-    working_dir_lock_created = 1;
     err = ndfs_connect_setup_conf(fs, hdfs_bld->hconf);
     if (err)
         goto done;
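
The url_prefix computed in the hunk above is what listStatus and getFileInfo prepend to paths when they synthesize full URIs. Concretely (host name hypothetical): connecting to hdfs://mynamenode:8020 yields a prefix without the port, since 8020 is DEFAULT_NN_PORT, while a non-standard port is preserved:

    hdfs://mynamenode:8020  =>  url_prefix = "hdfs://mynamenode"
    hdfs://mynamenode:9000  =>  url_prefix = "hdfs://mynamenode:9000"

Note that the prefix is derived from the resolved socket address (fs->nn_addr) rather than the raw URI text, which is why it is computed only after get_namenode_addr has filled in the address.
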
@@ -405,17 +400,7 @@ struct hadoop_err *ndfs_connect(struct hdfsBuilder *hdfs_bld,
 done:
     free(working_dir);
     if (err) {
-        if (fs) {
-            free(fs->user_name);
-            if (fs->working_uri) {
-                uriFreeUriMembersA(fs->working_uri);
-                free(fs->working_uri);
-            }
-            if (working_dir_lock_created) {
-                uv_mutex_destroy(&fs->working_uri_lock);
-            }
-            free(fs);
-        }
+        ndfs_free(fs);
         return err;
     }
     *out = (struct hdfs_internal *)fs;
@@ -469,17 +454,29 @@ static struct hadoop_err *ndfs_connect_setup_conf(struct native_fs *fs,
     return NULL;
 }

-static int ndfs_disconnect(hdfsFS bfs)
+static void ndfs_free(struct native_fs *fs)
 {
-    struct native_fs *fs = (struct native_fs*)bfs;
-
-    hrpc_messenger_shutdown(fs->msgr);
-    hrpc_messenger_free(fs->msgr);
-    free(fs->user_name);
-    uriFreeUriMembersA(fs->working_uri);
-    free(fs->working_uri);
-    uv_mutex_destroy(&fs->working_uri_lock);
+    // fs may be NULL or only partially constructed if ndfs_connect failed.
+    if (!fs)
+        return;
+    if (fs->msgr) {
+        hrpc_messenger_shutdown(fs->msgr);
+        hrpc_messenger_free(fs->msgr);
+    }
+    free(fs->url_prefix);
+    hadoop_uri_free(fs->conn_uri);
+    if (fs->working_uri) {
+        hadoop_uri_free(fs->working_uri);
+        uv_mutex_destroy(&fs->working_uri_lock);
+    }
     free(fs);
+}
+
+static int ndfs_disconnect(hdfsFS bfs)
+{
+    struct native_fs *fs = (struct native_fs*)bfs;
+
+    ndfs_free(fs);
     return 0;
 }

@@ -717,16 +714,9 @@ static char* ndfs_get_working_directory(hdfsFS bfs, char *buffer,
     size_t len;
     struct native_fs *fs = (struct native_fs *)bfs;
     struct hadoop_err *err = NULL;
-    char *working_path = NULL;

     uv_mutex_lock(&fs->working_uri_lock);
-    err = uri_get_path(fs->working_uri, &working_path);
-    if (err) {
-        err = hadoop_err_prepend(err, 0, "ndfs_get_working_directory: failed "
-                                 "to get the path of the working_uri.");
-        goto done;
-    }
-    len = strlen(working_path);
+    len = strlen(fs->working_uri->path);
     if (len + 1 > bufferSize) {
         err = hadoop_lerr_alloc(ENAMETOOLONG, "ndfs_get_working_directory: "
             "the buffer supplied was only %zd bytes, but we would need "
@@ -734,52 +724,31 @@ static char* ndfs_get_working_directory(hdfsFS bfs, char *buffer,
             bufferSize, len + 1);
         goto done;
     }
-    strcpy(buffer, working_path);
+    strcpy(buffer, fs->working_uri->path);
 done:
     uv_mutex_unlock(&fs->working_uri_lock);
-    free(working_path);
     return hadoopfs_errno_and_retptr(err, buffer);
 }

 static int ndfs_set_working_directory(hdfsFS bfs, const char* uri_str)
 {
     struct native_fs *fs = (struct native_fs *)bfs;
-    char *path = NULL;
-    char *scheme = NULL;
     struct hadoop_err *err = NULL;
-    UriParserStateA uri_state;
-    UriUriA *uri = NULL;
+    struct hadoop_uri *uri = NULL;

     uv_mutex_lock(&fs->working_uri_lock);
-    uri = calloc(1, sizeof(*uri));
-    if (!uri) {
-        err = hadoop_lerr_alloc(ENOMEM, "ndfs_set_working_directory: OOM");
-        goto done;
-    }
-    err = uri_get_scheme(fs->working_uri, &scheme);
+    err = hadoop_uri_parse(uri_str, fs->working_uri, &uri,
+                           H_URI_PARSE_ALL | H_URI_APPEND_SLASH);
     if (err) {
-        err = hadoop_err_prepend(err, ENOMEM, "ndfs_set_working_directory: "
-                                 "failed to get scheme of current working_uri");
+        err = hadoop_err_prepend(err, 0, "ndfs_set_working_directory: ");
         goto done;
     }
-    err = build_path(fs, uri_str, &path);
-    if (err)
-        goto done;
-    err = uri_parse_abs(path, &uri_state, uri, scheme);
-    if (err)
-        goto done;
-    uriFreeUriMembersA(fs->working_uri);
-    free(fs->working_uri);
+    hadoop_uri_free(fs->working_uri);
     fs->working_uri = uri;
     err = NULL;

 done:
-    if (err) {
-        free(uri);
-    }
     uv_mutex_unlock(&fs->working_uri_lock);
-    free(scheme);
-    free(path);
     return hadoopfs_errno_and_retcode(err);
 }
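
Through the public libhdfs API, the working-directory round trip over this code looks like the following minimal sketch. hdfsConnect, hdfsSetWorkingDirectory, hdfsGetWorkingDirectory, and hdfsDisconnect are the standard hdfs.h entry points; the trailing slash on the stored value is assumed from the H_URI_APPEND_SLASH flag used above, and the host and user are hypothetical.

    #include "hdfs.h"

    char buf[4096];
    hdfsFS fs = hdfsConnect("mynamenode", 8020);
    hdfsSetWorkingDirectory(fs, "/user/alice");    // stored internally as .../user/alice/
    hdfsGetWorkingDirectory(fs, buf, sizeof(buf)); // buf now holds "/user/alice/"
    hdfsDisconnect(fs);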