|
@@ -40,6 +40,16 @@
|
|
|
|
|
|
#define CLIENT_NN_PROTOCOL "org.apache.hadoop.hdfs.protocol.ClientProtocol"
|
|
|
|
|
|
+#define FS_PERMISSIONS_UMASK_KEY "fs.permissions.umask-mode"
|
|
|
+
|
|
|
+#define FS_PERMISSIONS_UMASK_DEFAULT "022"
|
|
|
+
|
|
|
+#define DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL \
|
|
|
+ "dfs.client.write.exclude.nodes.cache.expiry.interval.millis"
|
|
|
+
|
|
|
+#define DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT \
|
|
|
+ (10LL * 60LL * 1000LL)
|
|
|
+
|
|
|
struct native_fs {
|
|
|
/** Fields common to all filesystems. */
|
|
|
struct hadoop_fs_base base;
|
|
@@ -68,6 +78,12 @@ struct native_fs {
|
|
|
|
|
|
/** Lock which protects the working_uri. */
|
|
|
uv_mutex_t working_uri_lock;
|
|
|
+
|
|
|
+ /** Umask to use when creating files */
|
|
|
+ uint32_t umask;
|
|
|
+
|
|
|
+ /** How long to wait, in nanoseconds, before re-trying a dead datanode. */
|
|
|
+ uint64_t dead_dn_timeout_ns;
|
|
|
};
|
|
|
|
|
|
/**
|
|
@@ -116,6 +132,9 @@ struct ndfs_server_defaults {
|
|
|
|
|
|
static hdfsFileInfo *ndfs_get_path_info(hdfsFS bfs, const char* uri);
|
|
|
|
|
|
+static struct hadoop_err *ndfs_connect_setup_conf(struct native_fs *fs,
|
|
|
+ struct hconf *hconf);
|
|
|
+
|
|
|
static void ndfs_nn_proxy_init(struct native_fs *fs, struct hrpc_proxy *proxy)
|
|
|
{
|
|
|
hrpc_proxy_init(proxy, fs->msgr, &fs->nn_addr, CLIENT_NN_PROTOCOL,
|
|
@@ -368,6 +387,9 @@ struct hadoop_err *ndfs_connect(struct hdfsBuilder *hdfs_bld,
|
|
|
goto done;
|
|
|
}
|
|
|
working_dir_lock_created = 1;
|
|
|
+ err = ndfs_connect_setup_conf(fs, hdfs_bld->hconf);
|
|
|
+ if (err)
|
|
|
+ goto done;
|
|
|
|
|
|
// Ask the NameNode about our server defaults. We'll use this information
|
|
|
// later in ndfs_get_default_block_size, and when writing new files. Just
|
|
@@ -400,6 +422,53 @@ done:
|
|
|
return NULL;
|
|
|
}
|
|
|
|
|
|
+static struct hadoop_err *parse_permission(const char *str, uint32_t *perm)
|
|
|
+{
|
|
|
+ if (strspn(str, " 01234567") != strlen(str)) {
|
|
|
+ // TODO: support permission strings as in PermissionParser.java
|
|
|
+ return hadoop_lerr_alloc(ENOTSUP, "parse_permission(%s): "
|
|
|
+ "can't parse non-octal permissions (yet)", str);
|
|
|
+ }
|
|
|
+ errno = 0;
|
|
|
+ *perm = strtol(str, NULL, 8);
|
|
|
+ if (errno) {
|
|
|
+ int ret = errno;
|
|
|
+ return hadoop_lerr_alloc(EINVAL, "parse_permission(%s): "
|
|
|
+ "failed to parse this octal string: %s",
|
|
|
+ str, terror(ret));
|
|
|
+ }
|
|
|
+ return NULL;
|
|
|
+}
|
|
|
+
|
|
|
+/**
|
|
|
+ * Configure the native file system using the Hadoop configuration.
|
|
|
+ *
|
|
|
+ * @param fs The filesystem to set configuration keys for.
|
|
|
+ * @param hconf The configuration object to read from.
|
|
|
+ */
|
|
|
+static struct hadoop_err *ndfs_connect_setup_conf(struct native_fs *fs,
|
|
|
+ struct hconf *hconf)
|
|
|
+{
|
|
|
+ struct hadoop_err *err = NULL;
|
|
|
+ const char *umask_str;
|
|
|
+ int64_t timeout_ms;
|
|
|
+
|
|
|
+ umask_str = hconf_get(hconf, FS_PERMISSIONS_UMASK_KEY);
|
|
|
+ if (!umask_str)
|
|
|
+ umask_str = FS_PERMISSIONS_UMASK_DEFAULT;
|
|
|
+ err = parse_permission(umask_str, &fs->umask);
|
|
|
+ if (err) {
|
|
|
+ return hadoop_err_prepend(err, 0, "ndfs_connect_setup_conf: "
|
|
|
+ "error handling %s", FS_PERMISSIONS_UMASK_DEFAULT);
|
|
|
+ }
|
|
|
+
|
|
|
+ timeout_ms = DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT;
|
|
|
+ hconf_get_int64(hconf,
|
|
|
+ DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL, &timeout_ms);
|
|
|
+ fs->dead_dn_timeout_ns = timeout_ms * 1000LL;
|
|
|
+ return NULL;
|
|
|
+}
|
|
|
+
|
|
|
static int ndfs_disconnect(hdfsFS bfs)
|
|
|
{
|
|
|
struct native_fs *fs = (struct native_fs*)bfs;
|
|
@@ -720,6 +789,7 @@ static int ndfs_mkdir(hdfsFS bfs, const char* uri)
|
|
|
struct hadoop_err *err = NULL;
|
|
|
MkdirsRequestProto req = MKDIRS_REQUEST_PROTO__INIT;
|
|
|
MkdirsResponseProto *resp = NULL;
|
|
|
+ FsPermissionProto perm = FS_PERMISSION_PROTO__INIT;
|
|
|
struct hrpc_proxy proxy;
|
|
|
char *path = NULL;
|
|
|
|
|
@@ -729,6 +799,10 @@ static int ndfs_mkdir(hdfsFS bfs, const char* uri)
|
|
|
goto done;
|
|
|
}
|
|
|
req.src = path;
|
|
|
+ // TODO: a better libhdfs API would allow us to specify what mode to
|
|
|
+ // create a particular directory with.
|
|
|
+ perm.perm = 0777 & (~fs->umask);
|
|
|
+ req.masked = &perm;
|
|
|
req.createparent = 1; // TODO: add libhdfs API for non-recursive mkdir
|
|
|
err = cnn_mkdirs(&proxy, &req, &resp);
|
|
|
if (err) {
|