- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #include "common/hadoop_err.h"
- #include "common/hconf.h"
- #include "common/net.h"
- #include "common/string.h"
- #include "common/uri.h"
- #include "fs/common.h"
- #include "fs/fs.h"
- #include "protobuf/ClientNamenodeProtocol.call.h"
- #include "protobuf/hdfs.pb-c.h.s"
- #include "rpc/messenger.h"
- #include "rpc/proxy.h"
- #include <errno.h>
- #include <fcntl.h>
- #include <netinet/in.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <strings.h>
- #include <uriparser/Uri.h>
- #include <uv.h>
- #define DEFAULT_NN_PORT 8020
- #define CLIENT_NN_PROTOCOL "org.apache.hadoop.hdfs.protocol.ClientProtocol"
- struct native_fs {
- /** Fields common to all filesystems. */
- struct hadoop_fs_base base;
- /**
- * Address of the namenode.
- * TODO: implement HA failover
- * TODO: implement IPv6
- */
- struct sockaddr_in nn_addr;
- /** The messenger used to perform RPCs. */
- struct hrpc_messenger *msgr;
- /** The default block size obtained from getServerDefaults. */
- int64_t default_block_size;
- /** User name to use for RPCs. Immutable. */
- char *user_name;
- /**
- * A dynamically allocated working directory which will be prepended to
- * all relative paths.
- */
- UriUriA *working_uri;
- /** Lock which protects the working_uri. */
- uv_mutex_t working_uri_lock;
- };
- /**
- * Set if the file is read-only... otherwise, the file is assumed to be
- * write-only.
- */
- #define NDFS_FILE_FLAG_RO 0x1
- /** This flag is for compatibility with some old test harnesses. */
- #define NDFS_FILE_FLAG_DISABLE_DIRECT_READ 0x2
- /** Base class for both read-only and write-only files. */
- struct native_file_base {
- /** Fields common to all filesystems. */
- struct hadoop_file_base base;
- /** NDFS file flags. */
- int flags;
- };
- /** A read-only file. */
- struct native_ro_file {
- struct native_file_base base;
- uint64_t bytes_read;
- };
- /** A write-only file. */
- struct native_wo_file {
- struct native_file_base base;
- };
- /** Whole-filesystem stats sent back from the NameNode. */
- struct hadoop_vfs_stats {
- int64_t capacity;
- int64_t used;
- int64_t remaining;
- int64_t under_replicated;
- int64_t corrupt_blocks;
- int64_t missing_blocks;
- };
- /** Server defaults sent back from the NameNode. */
- struct ndfs_server_defaults {
- uint64_t blocksize;
- };
- static hdfsFileInfo *ndfs_get_path_info(hdfsFS bfs, const char* uri);
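- /**
- * Initialize an RPC proxy bound to this filesystem's NameNode, speaking the
- * ClientProtocol as the filesystem's user.
- *
- * @param fs The filesystem.
- * @param proxy (out param) The proxy to initialize.
- */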
- static void ndfs_nn_proxy_init(struct native_fs *fs, struct hrpc_proxy *proxy)
- {
- hrpc_proxy_init(proxy, fs->msgr, &fs->nn_addr, CLIENT_NN_PROTOCOL,
- fs->user_name);
- }
- /**
- * Construct a canonical path from a URI.
- *
- * @param fs The filesystem.
- * @param uri_str The URI string.
- * @param out (out param) The canonical path.
- *
- * @return NULL on success; the error otherwise.
- */
- static struct hadoop_err *build_path(struct native_fs *fs, const char *uri_str,
- char **out)
- {
- char *path = NULL;
- struct hadoop_err *err = NULL;
- UriParserStateA uri_state;
- UriUriA uri;
- memset(&uri_state, 0, sizeof(uri_state));
- uv_mutex_lock(&fs->working_uri_lock);
- err = uri_parse(uri_str, &uri_state, &uri, fs->working_uri);
- if (err)
- goto done;
- // TODO: check URI scheme and user against saved values?
- err = uri_get_path(&uri, &path);
- if (err)
- goto done;
- // As a special case, when the URI given has an empty path, we assume that
- // we want the current working directory. This is to allow things like
- // hdfs://mynamenode to map to the current working directory, as they do in
- // Hadoop. Note that this is different than hdfs://mynamenode/ (note the
- // trailing slash) which maps to the root directory.
- if (!path[0]) {
- free(path);
- path = NULL;
- err = uri_get_path(fs->working_uri, &path);
- if (err) {
- goto done;
- }
- }
- err = NULL;
- done:
- uv_mutex_unlock(&fs->working_uri_lock);
- if (uri_state.uri) {
- uriFreeUriMembersA(&uri);
- }
- if (err) {
- free(path);
- return err;
- }
- *out = path;
- return NULL;
- }
- static int ndfs_file_is_open_for_read(hdfsFile bfile)
- {
- struct native_file_base *file = (struct native_file_base *)bfile;
- return !!(file->flags & NDFS_FILE_FLAG_RO);
- }
- static int ndfs_file_is_open_for_write(hdfsFile bfile)
- {
- struct native_file_base *file = (struct native_file_base *)bfile;
- return !(file->flags & NDFS_FILE_FLAG_RO);
- }
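- /**
- * Get read statistics for a file.  Fails with EINVAL if the file is not
- * open for reading, or ENOMEM if the statistics cannot be allocated.
- */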
- static int ndfs_file_get_read_statistics(hdfsFile bfile,
- struct hdfsReadStatistics **out)
- {
- struct hdfsReadStatistics *stats;
- struct native_ro_file *file = (struct native_ro_file *)bfile;
- if (!(file->base.flags & NDFS_FILE_FLAG_RO)) {
- errno = EINVAL;
- return -1;
- }
- stats = calloc(1, sizeof(*stats));
- if (!stats) {
- errno = ENOMEM;
- return -1;
- }
- stats->totalBytesRead = file->bytes_read;
- *out = stats;
- return 0;
- }
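- /**
- * Fetch the server defaults from the NameNode via a getServerDefaults RPC.
- *
- * @param fs The filesystem.
- * @param defaults (out param) The server defaults.
- *
- * @return NULL on success; the error otherwise.
- */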
- static struct hadoop_err *ndfs_get_server_defaults(struct native_fs *fs,
- struct ndfs_server_defaults *defaults)
- {
- struct hadoop_err *err = NULL;
- GetServerDefaultsRequestProto req =
- GET_SERVER_DEFAULTS_REQUEST_PROTO__INIT;
- GetServerDefaultsResponseProto *resp = NULL;
- struct hrpc_proxy proxy;
- ndfs_nn_proxy_init(fs, &proxy);
- err = cnn_get_server_defaults(&proxy, &req, &resp);
- if (err) {
- goto done;
- }
- defaults->blocksize = resp->serverdefaults->blocksize;
- done:
- if (resp) {
- get_server_defaults_response_proto__free_unpacked(resp, NULL);
- }
- return err;
- }
- /**
- * Parse an address in the form <hostname> or <hostname>:<port>.
- *
- * @param input The address string, in the form <hostname> or <hostname>:<port>.
- * @param out (out param) The sockaddr.
- * @param default_port The default port to use, if one is not found in the
- * string.
- *
- * @return NULL on success; the error otherwise.
- */
- static struct hadoop_err *parse_rpc_addr(const char *input,
- struct sockaddr_in *out, int default_port)
- {
- struct hadoop_err *err = NULL;
- char *host, *colon;
- uint32_t addr;
- int port;
- fprintf(stderr, "parse_rpc_addr(input=%s, default_port=%d)\n",
- input, default_port);
- // If the URI doesn't contain a port, we use a default.
- // This may come either from the hdfsBuilder, or from the
- // 'default default' for HDFS.
- // It's kind of silly that hdfsBuilder even includes this field, since this
- // information should just be included in the URI, but this is here for
- // compatibility.
- port = (default_port <= 0) ? DEFAULT_NN_PORT : default_port;
- host = strdup(input);
- if (!host) {
- err = hadoop_lerr_alloc(ENOMEM, "parse_rpc_addr: OOM");
- goto done;
- }
- colon = index(host, ':');
- if (colon) {
- // If the URI has a colon, we parse the next part as a port.
- char *port_str = colon + 1;
- *colon = '\0';
- port = atoi(port_str);
- if ((port <= 0) || (port >= 65536)) {
- err = hadoop_lerr_alloc(EINVAL, "parse_rpc_addr: invalid port "
- "string %s", port_str);
- goto done;
- }
- }
- err = get_first_ipv4_addr(host, &addr);
- if (err)
- goto done;
- out->sin_family = AF_INET;
- out->sin_port = htons(port);
- out->sin_addr.s_addr = htonl(addr);
- done:
- free(host);
- return err;
- }
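- /**
- * Determine the NameNode address from the builder configuration, preferring
- * dfs.namenode.rpc-address over the URI authority.  Nameservice (HA or
- * federated) configurations are not yet supported.
- *
- * @param hdfs_bld The HDFS builder.
- * @param nn_addr (out param) The NameNode address.
- *
- * @return NULL on success; the error otherwise.
- */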
- static struct hadoop_err *get_namenode_addr(const struct hdfsBuilder *hdfs_bld,
- struct sockaddr_in *nn_addr)
- {
- const char *nameservice_id;
- const char *rpc_addr;
- nameservice_id = hconf_get(hdfs_bld->hconf, "dfs.nameservice.id");
- if (nameservice_id) {
- return hadoop_lerr_alloc(ENOTSUP, "get_namenode_addr: we "
- "don't yet support HA or federated configurations.");
- }
- rpc_addr = hconf_get(hdfs_bld->hconf, "dfs.namenode.rpc-address");
- if (rpc_addr) {
- return parse_rpc_addr(rpc_addr, nn_addr, hdfs_bld->port);
- }
- return parse_rpc_addr(hdfs_bld->uri_authority, nn_addr, hdfs_bld->port);
- }
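- /**
- * Connect to HDFS: resolve the NameNode address, create the RPC messenger,
- * set the initial working directory to <scheme>:///user/<user>/, and fetch
- * the server defaults to validate that we can talk to the NameNode.
- *
- * @param hdfs_bld The HDFS builder.
- * @param out (out param) The new filesystem.
- *
- * @return NULL on success; the error otherwise.
- */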
- struct hadoop_err *ndfs_connect(struct hdfsBuilder *hdfs_bld,
- struct hdfs_internal **out)
- {
- struct hadoop_err *err = NULL;
- struct native_fs *fs = NULL;
- struct hrpc_messenger_builder *msgr_bld;
- struct ndfs_server_defaults defaults;
- int working_dir_lock_created = 0;
- char *working_dir = NULL;
- UriParserStateA uri_state;
- fs = calloc(1, sizeof(*fs));
- if (!fs) {
- err = hadoop_lerr_alloc(ENOMEM, "failed to allocate space "
- "for a native_fs structure.");
- goto done;
- }
- fs->base.ty = HADOOP_FS_TY_NDFS;
- fs->user_name = strdup(hdfs_bld->uri_user_info);
- if (!fs->user_name) {
- err = hadoop_lerr_alloc(ENOMEM, "failed to allocate space "
- "for the user name.");
- goto done;
- }
- msgr_bld = hrpc_messenger_builder_alloc();
- if (!msgr_bld) {
- err = hadoop_lerr_alloc(ENOMEM, "failed to allocate space "
- "for a messenger builder.");
- goto done;
- }
- err = get_namenode_addr(hdfs_bld, &fs->nn_addr);
- if (err)
- goto done;
- err = hrpc_messenger_create(msgr_bld, &fs->msgr);
- if (err)
- goto done;
- // Get the default working directory
- if (asprintf(&working_dir, "%s:///user/%s/",
- hdfs_bld->uri_scheme, hdfs_bld->uri_user_info) < 0) {
- working_dir = NULL;
- err = hadoop_lerr_alloc(ENOMEM, "ndfs_connect: OOM allocating "
- "working_dir");
- goto done;
- }
- fs->working_uri = calloc(1, sizeof(*(fs->working_uri)));
- if (!fs->working_uri) {
- err = hadoop_lerr_alloc(ENOMEM, "ndfs_connect: OOM allocating "
- "fs->working_uri");
- goto done;
- }
- err = uri_parse_abs(working_dir, &uri_state, fs->working_uri,
- hdfs_bld->uri_scheme);
- if (err) {
- free(fs->working_uri);
- fs->working_uri = NULL;
- goto done;
- }
- if (uv_mutex_init(&fs->working_uri_lock) < 0) {
- err = hadoop_lerr_alloc(ENOMEM, "failed to create a mutex.");
- goto done;
- }
- working_dir_lock_created = 1;
- // Ask the NameNode about our server defaults. We'll use this information
- // later in ndfs_get_default_block_size, and when writing new files. Just
- // as important, this validates that we can talk to the NameNode with our
- // current configuration.
- memset(&defaults, 0, sizeof(defaults));
- err = ndfs_get_server_defaults(fs, &defaults);
- if (err)
- goto done;
- fs->default_block_size = defaults.blocksize;
- err = NULL;
- done:
- free(working_dir);
- if (err) {
- if (fs) {
- free(fs->user_name);
- if (fs->working_uri) {
- uriFreeUriMembersA(fs->working_uri);
- free(fs->working_uri);
- }
- if (working_dir_lock_created) {
- uv_mutex_destroy(&fs->working_uri_lock);
- }
- free(fs);
- }
- return err;
- }
- *out = (struct hdfs_internal *)fs;
- return NULL;
- }
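- /** Shut down the RPC messenger and free all resources owned by the filesystem. */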
- static int ndfs_disconnect(hdfsFS bfs)
- {
- struct native_fs *fs = (struct native_fs*)bfs;
- hrpc_messenger_shutdown(fs->msgr);
- hrpc_messenger_free(fs->msgr);
- free(fs->user_name);
- uriFreeUriMembersA(fs->working_uri);
- free(fs->working_uri);
- uv_mutex_destroy(&fs->working_uri_lock);
- free(fs);
- return 0;
- }
- static struct hadoop_err *ndfs_open_file_for_read(
- struct native_ro_file **out __attribute__((unused)),
- struct native_fs *fs __attribute__((unused)),
- const char *uri __attribute__((unused)))
- {
- errno = ENOTSUP;
- return NULL;
- }
- static struct hadoop_err *ndfs_open_file_for_write(
- struct native_ro_file **out __attribute__((unused)),
- struct native_fs *fs __attribute__((unused)),
- const char *uri __attribute__((unused)),
- int buffer_size __attribute__((unused)),
- short replication __attribute__((unused)),
- tSize block_size __attribute__((unused)))
- {
- errno = ENOTSUP;
- return NULL;
- }
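- /**
- * Open a file for reading or writing.  Only the O_RDONLY and O_WRONLY access
- * modes are accepted; the underlying read and write open paths are still
- * unimplemented stubs.
- */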
- static hdfsFile ndfs_open_file(hdfsFS bfs, const char* uri, int flags,
- int buffer_size, short replication, tSize block_size)
- {
- struct native_fs *fs = (struct native_fs *)bfs;
- struct native_ro_file *file = NULL;
- struct hadoop_err *err;
- int accmode;
- char *path = NULL;
- err = build_path(fs, uri, &path);
- if (err) {
- goto done;
- }
- accmode = flags & O_ACCMODE;
- if (accmode == O_RDONLY) {
- err = ndfs_open_file_for_read(&file, fs, path);
- } else if (accmode == O_WRONLY) {
- err = ndfs_open_file_for_write(&file, fs, path,
- buffer_size, replication, block_size);
- } else {
- err = hadoop_lerr_alloc(EINVAL, "cannot open a hadoop file in "
- "mode 0x%x\n", accmode);
- }
- done:
- free(path);
- return hadoopfs_errno_and_retptr(err, file);
- }
- static int ndfs_close_file(hdfsFS fs __attribute__((unused)),
- hdfsFile bfile __attribute__((unused)))
- {
- errno = ENOTSUP;
- return -1;
- }
- static int ndfs_file_exists(hdfsFS bfs, const char *uri)
- {
- hdfsFileInfo *info;
- info = ndfs_get_path_info(bfs, uri);
- if (!info) {
- // errno will be set
- return -1;
- }
- hdfsFreeFileInfo(info, 1);
- return 0;
- }
- static int ndfs_seek(hdfsFS bfs __attribute__((unused)),
- hdfsFile bfile __attribute__((unused)),
- tOffset desiredPos __attribute__((unused)))
- {
- errno = ENOTSUP;
- return -1;
- }
- static tOffset ndfs_tell(hdfsFS bfs __attribute__((unused)),
- hdfsFile bfile __attribute__((unused)))
- {
- errno = ENOTSUP;
- return -1;
- }
- static tSize ndfs_read(hdfsFS bfs __attribute__((unused)),
- hdfsFile bfile __attribute__((unused)),
- void *buffer __attribute__((unused)),
- tSize length __attribute__((unused)))
- {
- errno = ENOTSUP;
- return -1;
- }
- static tSize ndfs_pread(hdfsFS bfs __attribute__((unused)),
- hdfsFile bfile __attribute__((unused)),
- tOffset position __attribute__((unused)),
- void* buffer __attribute__((unused)),
- tSize length __attribute__((unused)))
- {
- errno = ENOTSUP;
- return -1;
- }
- static tSize ndfs_write(hdfsFS bfs __attribute__((unused)),
- hdfsFile bfile __attribute__((unused)),
- const void* buffer __attribute__((unused)),
- tSize length __attribute__((unused)))
- {
- errno = ENOTSUP;
- return -1;
- }
- static int ndfs_flush(hdfsFS bfs __attribute__((unused)),
- hdfsFile bfile __attribute__((unused)))
- {
- errno = ENOTSUP;
- return -1;
- }
- static int ndfs_hflush(hdfsFS bfs __attribute__((unused)),
- hdfsFile bfile __attribute__((unused)))
- {
- errno = ENOTSUP;
- return -1;
- }
- static int ndfs_hsync(hdfsFS bfs __attribute__((unused)),
- hdfsFile bfile __attribute__((unused)))
- {
- errno = ENOTSUP;
- return -1;
- }
- static int ndfs_available(hdfsFS bfs __attribute__((unused)),
- hdfsFile bfile __attribute__((unused)))
- {
- errno = ENOTSUP;
- return -1;
- }
- static int ndfs_copy(hdfsFS srcFS __attribute__((unused)),
- const char* src __attribute__((unused)),
- hdfsFS dstFS __attribute__((unused)),
- const char* dst __attribute__((unused)))
- {
- errno = ENOTSUP;
- return -1;
- }
- static int ndfs_move(hdfsFS srcFS __attribute__((unused)),
- const char* src __attribute__((unused)),
- hdfsFS dstFS __attribute__((unused)),
- const char* dst __attribute__((unused)))
- {
- errno = ENOTSUP;
- return -1;
- }
- static int ndfs_unlink(struct hdfs_internal *bfs,
- const char *uri, int recursive)
- {
- struct native_fs *fs = (struct native_fs*)bfs;
- struct hadoop_err *err = NULL;
- DeleteRequestProto req = DELETE_REQUEST_PROTO__INIT;
- struct hrpc_proxy proxy;
- DeleteResponseProto *resp = NULL;
- char *path = NULL;
- ndfs_nn_proxy_init(fs, &proxy);
- err = build_path(fs, uri, &path);
- if (err) {
- goto done;
- }
- req.src = path;
- req.recursive = !!recursive;
- err = cnn_delete(&proxy, &req, &resp);
- if (err) {
- goto done;
- }
- done:
- free(path);
- if (resp) {
- delete_response_proto__free_unpacked(resp, NULL);
- }
- return hadoopfs_errno_and_retcode(err);
- }
- static int ndfs_rename(hdfsFS bfs, const char *src_uri, const char *dst_uri)
- {
- struct native_fs *fs = (struct native_fs*)bfs;
- struct hadoop_err *err = NULL;
- Rename2RequestProto req = RENAME2_REQUEST_PROTO__INIT;
- Rename2ResponseProto *resp = NULL;
- struct hrpc_proxy proxy;
- char *src_path = NULL, *dst_path = NULL;
- ndfs_nn_proxy_init(fs, &proxy);
- err = build_path(fs, src_uri, &src_path);
- if (err) {
- goto done;
- }
- err = build_path(fs, dst_uri, &dst_path);
- if (err) {
- goto done;
- }
- req.src = src_path;
- req.dst = dst_path;
- req.overwritedest = 0; // TODO: support overwrite
- err = cnn_rename2(&proxy, &req, &resp);
- if (err) {
- goto done;
- }
- done:
- free(src_path);
- free(dst_path);
- if (resp) {
- rename2_response_proto__free_unpacked(resp, NULL);
- }
- return hadoopfs_errno_and_retcode(err);
- }
- static char* ndfs_get_working_directory(hdfsFS bfs, char *buffer,
- size_t bufferSize)
- {
- size_t len;
- struct native_fs *fs = (struct native_fs *)bfs;
- struct hadoop_err *err = NULL;
- char *working_path = NULL;
- uv_mutex_lock(&fs->working_uri_lock);
- err = uri_get_path(fs->working_uri, &working_path);
- if (err) {
- err = hadoop_err_prepend(err, 0, "ndfs_get_working_directory: failed "
- "to get the path of the working_uri.");
- goto done;
- }
- len = strlen(working_path);
- if (len + 1 > bufferSize) {
- err = hadoop_lerr_alloc(ENAMETOOLONG, "ndfs_get_working_directory: "
- "the buffer supplied was only %zd bytes, but we would need "
- "%zd bytes to hold the working directory.",
- bufferSize, len + 1);
- goto done;
- }
- strcpy(buffer, working_path);
- done:
- uv_mutex_unlock(&fs->working_uri_lock);
- free(working_path);
- return hadoopfs_errno_and_retptr(err, buffer);
- }
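- /**
- * Set the working directory.  The supplied URI is canonicalized against the
- * current working directory, then re-parsed as an absolute URI using the
- * scheme of the existing working_uri.
- */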
- static int ndfs_set_working_directory(hdfsFS bfs, const char* uri_str)
- {
- struct native_fs *fs = (struct native_fs *)bfs;
- char *path = NULL;
- char *scheme = NULL;
- struct hadoop_err *err = NULL;
- UriParserStateA uri_state;
- UriUriA *uri = NULL;
- uv_mutex_lock(&fs->working_uri_lock);
- uri = calloc(1, sizeof(*uri));
- if (!uri) {
- err = hadoop_lerr_alloc(ENOMEM, "ndfs_set_working_directory: OOM");
- goto done;
- }
- err = uri_get_scheme(fs->working_uri, &scheme);
- if (err) {
- err = hadoop_err_prepend(err, ENOMEM, "ndfs_set_working_directory: "
- "failed to get scheme of current working_uri");
- goto done;
- }
- err = build_path(fs, uri_str, &path);
- if (err)
- goto done;
- err = uri_parse_abs(path, &uri_state, uri, scheme);
- if (err)
- goto done;
- uriFreeUriMembersA(fs->working_uri);
- free(fs->working_uri);
- fs->working_uri = uri;
- err = NULL;
- done:
- if (err) {
- free(uri);
- }
- uv_mutex_unlock(&fs->working_uri_lock);
- free(scheme);
- free(path);
- return hadoopfs_errno_and_retcode(err);
- }
- static int ndfs_mkdir(hdfsFS bfs, const char* uri)
- {
- struct native_fs *fs = (struct native_fs *)bfs;
- struct hadoop_err *err = NULL;
- MkdirsRequestProto req = MKDIRS_REQUEST_PROTO__INIT;
- MkdirsResponseProto *resp = NULL;
- struct hrpc_proxy proxy;
- char *path = NULL;
- ndfs_nn_proxy_init(fs, &proxy);
- err = build_path(fs, uri, &path);
- if (err) {
- goto done;
- }
- req.src = path;
- req.createparent = 1; // TODO: add libhdfs API for non-recursive mkdir
- err = cnn_mkdirs(&proxy, &req, &resp);
- if (err) {
- goto done;
- }
- if (!resp->result) {
- err = hadoop_lerr_alloc(EEXIST, "ndfs_mkdir(%s): a path "
- "component already exists as a non-directory.", path);
- goto done;
- }
- err = NULL;
- done:
- free(path);
- if (resp) {
- mkdirs_response_proto__free_unpacked(resp, NULL);
- }
- return hadoopfs_errno_and_retcode(err);
- }
- static int ndfs_set_replication(hdfsFS bfs, const char* uri,
- int16_t replication)
- {
- struct native_fs *fs = (struct native_fs *)bfs;
- struct hadoop_err *err = NULL;
- SetReplicationRequestProto req = SET_REPLICATION_REQUEST_PROTO__INIT;
- SetReplicationResponseProto *resp = NULL;
- struct hrpc_proxy proxy;
- char *path = NULL;
- ndfs_nn_proxy_init(fs, &proxy);
- err = build_path(fs, uri, &path);
- if (err) {
- goto done;
- }
- req.src = path;
- req.replication = replication;
- err = cnn_set_replication(&proxy, &req, &resp);
- if (err) {
- goto done;
- }
- if (!resp->result) {
- err = hadoop_lerr_alloc(EINVAL, "ndfs_set_replication(%s): path "
- "does not exist or is not a regular file.", path);
- goto done;
- }
- done:
- free(path);
- if (resp) {
- set_replication_response_proto__free_unpacked(resp, NULL);
- }
- return hadoopfs_errno_and_retcode(err);
- }
- static hdfsFileInfo* ndfs_list_directory(hdfsFS bfs __attribute__((unused)),
- const char* uri __attribute__((unused)),
- int *numEntries __attribute__((unused)))
- {
- errno = ENOTSUP;
- return NULL;
- }
- static hdfsFileInfo *ndfs_get_path_info(hdfsFS bfs __attribute__((unused)),
- const char* uri __attribute__((unused)))
- {
- errno = ENOTSUP;
- return NULL;
- }
- char***
- ndfs_get_hosts(hdfsFS bfs __attribute__((unused)),
- const char* path __attribute__((unused)),
- tOffset start __attribute__((unused)),
- tOffset length __attribute__((unused)))
- {
- errno = ENOTSUP;
- return NULL;
- }
- static tOffset ndfs_get_default_block_size(hdfsFS bfs)
- {
- struct native_fs *fs = (struct native_fs *)bfs;
- return fs->default_block_size;
- }
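- /**
- * Get the preferred block size for a path via a getPreferredBlockSize RPC.
- * Returns the block size on success, or a negative value with errno set on
- * error.
- */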
- static tOffset ndfs_get_default_block_size_at_path(hdfsFS bfs,
- const char *uri)
- {
- struct native_fs *fs = (struct native_fs *)bfs;
- struct hadoop_err *err = NULL;
- GetPreferredBlockSizeRequestProto req =
- GET_PREFERRED_BLOCK_SIZE_REQUEST_PROTO__INIT;
- GetPreferredBlockSizeResponseProto *resp = NULL;
- struct hrpc_proxy proxy;
- tOffset ret = 0;
- char *path = NULL;
- ndfs_nn_proxy_init(fs, &proxy);
- err = build_path(fs, uri, &path);
- if (err) {
- goto done;
- }
- req.filename = path;
- err = cnn_get_preferred_block_size(&proxy, &req, &resp);
- if (err) {
- goto done;
- }
- ret = resp->bsize;
- err = NULL;
- done:
- free(path);
- if (resp) {
- get_preferred_block_size_response_proto__free_unpacked(resp, NULL);
- }
- if (err)
- return hadoopfs_errno_and_retcode(err);
- return ret;
- }
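- /**
- * Fetch whole-filesystem statistics from the NameNode via a getFsStats RPC.
- *
- * @param hfs The filesystem.
- * @param stats (out param) The filesystem statistics.
- *
- * @return NULL on success; the error otherwise.
- */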
- static struct hadoop_err *ndfs_statvfs(struct hadoop_fs_base *hfs,
- struct hadoop_vfs_stats *stats)
- {
- struct native_fs *fs = (struct native_fs*)hfs;
- GetFsStatusRequestProto req = GET_FS_STATUS_REQUEST_PROTO__INIT;
- GetFsStatsResponseProto *resp = NULL;
- struct hadoop_err *err = NULL;
- struct hrpc_proxy proxy;
- ndfs_nn_proxy_init(fs, &proxy);
- err = cnn_get_fs_stats(&proxy, &req, &resp);
- if (err) {
- goto done;
- }
- stats->capacity = resp->capacity;
- stats->used = resp->used;
- stats->remaining = resp->remaining;
- stats->under_replicated = resp->under_replicated;
- stats->corrupt_blocks = resp->corrupt_blocks;
- stats->missing_blocks = resp->missing_blocks;
- done:
- if (resp) {
- get_fs_stats_response_proto__free_unpacked(resp, NULL);
- }
- return err;
- }
- static tOffset ndfs_get_capacity(hdfsFS bfs)
- {
- struct hadoop_err *err;
- struct hadoop_vfs_stats stats;
- err = ndfs_statvfs((struct hadoop_fs_base *)bfs, &stats);
- if (err)
- return hadoopfs_errno_and_retcode(err);
- return stats.capacity;
- }
- static tOffset ndfs_get_used(hdfsFS bfs)
- {
- struct hadoop_err *err;
- struct hadoop_vfs_stats stats;
- err = ndfs_statvfs((struct hadoop_fs_base *)bfs, &stats);
- if (err)
- return hadoopfs_errno_and_retcode(err);
- return stats.used;
- }
- static int ndfs_chown(hdfsFS bfs, const char* uri,
- const char *user, const char *group)
- {
- struct native_fs *fs = (struct native_fs *)bfs;
- struct hadoop_err *err = NULL;
- SetOwnerRequestProto req = SET_OWNER_REQUEST_PROTO__INIT;
- SetOwnerResponseProto *resp = NULL;
- struct hrpc_proxy proxy;
- char *path = NULL;
- ndfs_nn_proxy_init(fs, &proxy);
- err = build_path(fs, uri, &path);
- if (err) {
- goto done;
- }
- req.src = path;
- req.username = (char*)user;
- req.groupname = (char*)group;
- err = cnn_set_owner(&proxy, &req, &resp);
- if (err) {
- goto done;
- }
- done:
- free(path);
- if (resp) {
- set_owner_response_proto__free_unpacked(resp, NULL);
- }
- return hadoopfs_errno_and_retcode(err);
- }
- static int ndfs_chmod(hdfsFS bfs, const char* uri, short mode)
- {
- struct native_fs *fs = (struct native_fs *)bfs;
- FsPermissionProto perm = FS_PERMISSION_PROTO__INIT;
- SetPermissionRequestProto req = SET_PERMISSION_REQUEST_PROTO__INIT;
- SetPermissionResponseProto *resp = NULL;
- struct hadoop_err *err = NULL;
- struct hrpc_proxy proxy;
- char *path = NULL;
- ndfs_nn_proxy_init(fs, &proxy);
- err = build_path(fs, uri, &path);
- if (err) {
- goto done;
- }
- req.src = path;
- req.permission = &perm;
- perm.perm = mode;
- err = cnn_set_permission(&proxy, &req, &resp);
- if (err) {
- goto done;
- }
- done:
- free(path);
- if (resp) {
- set_permission_response_proto__free_unpacked(resp, NULL);
- }
- return hadoopfs_errno_and_retcode(err);
- }
- static int ndfs_utime(hdfsFS bfs, const char* uri,
- int64_t mtime, int64_t atime)
- {
- struct native_fs *fs = (struct native_fs *)bfs;
- SetTimesRequestProto req = SET_TIMES_REQUEST_PROTO__INIT ;
- SetTimesResponseProto *resp = NULL;
- struct hadoop_err *err = NULL;
- struct hrpc_proxy proxy;
- char *path = NULL;
- ndfs_nn_proxy_init(fs, &proxy);
- err = build_path(fs, uri, &path);
- if (err) {
- goto done;
- }
- req.src = path;
- // If mtime or atime are -1, that means "no change."
- // Otherwise, we need to multiply by 1000, to take into account the fact
- // that libhdfs times are in seconds, and HDFS times are in milliseconds.
- // It's unfortunate that libhdfs doesn't support the full millisecond
- // precision. We need to redo the API at some point.
- if (mtime < 0) {
- req.mtime = -1;
- } else {
- req.mtime = mtime;
- req.mtime *= 1000;
- }
- if (atime < 0) {
- req.atime = -1;
- } else {
- req.atime = atime;
- req.atime *= 1000;
- }
- err = cnn_set_times(&proxy, &req, &resp);
- if (err) {
- goto done;
- }
- done:
- free(path);
- if (resp) {
- set_times_response_proto__free_unpacked(resp, NULL);
- }
- return hadoopfs_errno_and_retcode(err);
- }
- static struct hadoopRzBuffer* ndfs_read_zero(
- hdfsFile bfile __attribute__((unused)),
- struct hadoopRzOptions *opts __attribute__((unused)),
- int32_t maxLength __attribute__((unused)))
- {
- errno = ENOTSUP;
- return NULL;
- }
- static void ndfs_rz_buffer_free(hdfsFile bfile __attribute__((unused)),
- struct hadoopRzBuffer *buffer __attribute__((unused)))
- {
- }
- int ndfs_file_uses_direct_read(hdfsFile bfile)
- {
- // Return nonzero unless the 'disable direct reads' flag has been set.  The
- // flag exists only for compatibility with old test harnesses designed to
- // test jniFS; it doesn't change behavior, since all reads are always
- // direct in NDFS.
- struct native_file_base *file = (struct native_file_base *)bfile;
- return (!(file->flags & NDFS_FILE_FLAG_DISABLE_DIRECT_READ));
- }
- void ndfs_file_disable_direct_read(hdfsFile bfile)
- {
- struct native_file_base *file = (struct native_file_base *)bfile;
- file->flags |= NDFS_FILE_FLAG_DISABLE_DIRECT_READ;
- }
- const struct hadoop_fs_ops g_ndfs_ops = {
- .name = "ndfs",
- .file_is_open_for_read = ndfs_file_is_open_for_read,
- .file_is_open_for_write = ndfs_file_is_open_for_write,
- .get_read_statistics = ndfs_file_get_read_statistics,
- .connect = ndfs_connect,
- .disconnect = ndfs_disconnect,
- .open = ndfs_open_file,
- .close = ndfs_close_file,
- .exists = ndfs_file_exists,
- .seek = ndfs_seek,
- .tell = ndfs_tell,
- .read = ndfs_read,
- .pread = ndfs_pread,
- .write = ndfs_write,
- .flush = ndfs_flush,
- .hflush = ndfs_hflush,
- .hsync = ndfs_hsync,
- .available = ndfs_available,
- .copy = ndfs_copy,
- .move = ndfs_move,
- .unlink = ndfs_unlink,
- .rename = ndfs_rename,
- .get_working_directory = ndfs_get_working_directory,
- .set_working_directory = ndfs_set_working_directory,
- .mkdir = ndfs_mkdir,
- .set_replication = ndfs_set_replication,
- .list_directory = ndfs_list_directory,
- .get_path_info = ndfs_get_path_info,
- .get_hosts = ndfs_get_hosts,
- .get_default_block_size = ndfs_get_default_block_size,
- .get_default_block_size_at_path = ndfs_get_default_block_size_at_path,
- .get_capacity = ndfs_get_capacity,
- .get_used = ndfs_get_used,
- .chown = ndfs_chown,
- .chmod = ndfs_chmod,
- .utime = ndfs_utime,
- .read_zero = ndfs_read_zero,
- .rz_buffer_free = ndfs_rz_buffer_free,
- // test
- .file_uses_direct_read = ndfs_file_uses_direct_read,
- .file_disable_direct_read = ndfs_file_disable_direct_read,
- };
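- /*
- * Usage sketch (not part of this file): g_ndfs_ops is normally selected by
- * the libhdfs dispatch layer rather than called directly.  Assuming the
- * standard libhdfs entry points dispatch to these ops for an NDFS URI, a
- * client would exercise them roughly like this:
- *
- *     struct hdfsBuilder *bld = hdfsNewBuilder();
- *     hdfsBuilderSetNameNode(bld, "mynamenode");
- *     hdfsBuilderSetNameNodePort(bld, 8020);
- *     hdfsFS fs = hdfsBuilderConnect(bld);        // -> ndfs_connect
- *     hdfsCreateDirectory(fs, "/user/alice/dir"); // -> ndfs_mkdir
- *     hdfsDelete(fs, "/user/alice/dir", 1);       // -> ndfs_unlink
- *     hdfsDisconnect(fs);                         // -> ndfs_disconnect
- */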
- // vim: ts=4:sw=4:tw=79:et