
HADOOP-10725. Implement listStatus and getFileInfo in the native client (cmccabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HADOOP-10388@1617563 13f79535-47bb-0310-9956-ffa450edef68
Colin McCabe, 10 years ago
parent commit a78c8eefa4
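
The two entry points added here, hdfsListDirectory() and hdfsGetPathInfo(), are the native-client implementations of listStatus and getFileInfo behind the public libhdfs API in hdfs.h. A minimal caller-side sketch follows; the localhost NameNode address and the /abc path are illustrative placeholders, not part of this commit.

    #include <stdio.h>
    #include <stdlib.h>
    #include "hdfs.h"

    int main(void)
    {
        int i, num_entries = 0;
        hdfsFileInfo *entries, *info;
        struct hdfsBuilder *bld;
        hdfsFS fs;

        /* Placeholder NameNode address; substitute your own cluster.
         * hdfsBuilderConnect consumes (frees) the builder. */
        bld = hdfsNewBuilder();
        if (!bld)
            return EXIT_FAILURE;
        hdfsBuilderSetNameNode(bld, "hdfs://localhost:8020");
        fs = hdfsBuilderConnect(bld);
        if (!fs)
            return EXIT_FAILURE;

        /* listStatus: a malloc'd array of num_entries hdfsFileInfo structs. */
        entries = hdfsListDirectory(fs, "/abc", &num_entries);
        if (entries) {
            for (i = 0; i < num_entries; i++)
                printf("%s (%s)\n", entries[i].mName,
                       entries[i].mKind == kObjectKindDirectory ? "dir" : "file");
            hdfsFreeFileInfo(entries, num_entries);
        }

        /* getFileInfo: a single hdfsFileInfo, or NULL with errno set. */
        info = hdfsGetPathInfo(fs, "/abc");
        if (info) {
            printf("owner=%s group=%s mode=%04o\n",
                   info->mOwner, info->mGroup, info->mPermissions);
            hdfsFreeFileInfo(info, 1);
        }

        hdfsDisconnect(fs);
        return EXIT_SUCCESS;
    }

hdfsListDirectory() hides the NameNode paging entirely: it keeps issuing RPCs until the whole directory has been returned, as implemented in ndfs_list_directory in the diff below.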

+ 202 - 10
hadoop-native-core/src/main/native/ndfs/ndfs.c

@@ -139,6 +139,9 @@ static struct hadoop_err *ndfs_connect_setup_conf(struct native_fs *fs,
 
 static void ndfs_free(struct native_fs *fs);
 
+struct hadoop_err *populate_file_info(struct file_info *info,
+                    HdfsFileStatusProto *status, const char *prefix);
+
 static void ndfs_nn_proxy_init(struct native_fs *fs, struct hrpc_proxy *proxy)
 {
     hrpc_proxy_init(proxy, fs->msgr, &fs->nn_addr, CLIENT_NN_PROTOCOL,
@@ -456,8 +459,10 @@ static struct hadoop_err *ndfs_connect_setup_conf(struct native_fs *fs,
 
 static void ndfs_free(struct native_fs *fs)
 {
-    hrpc_messenger_shutdown(fs->msgr);
-    hrpc_messenger_free(fs->msgr);
+    if (fs->msgr) {
+        hrpc_messenger_shutdown(fs->msgr);
+        hrpc_messenger_free(fs->msgr);
+    }
     free(fs->url_prefix);
     hadoop_uri_free(fs->conn_uri);
     if (fs->working_uri) {
@@ -822,19 +827,206 @@ done:
     return hadoopfs_errno_and_retcode(err);
 }
 
-static hdfsFileInfo* ndfs_list_directory(hdfsFS bfs __attribute__((unused)),
-                                         const char* uri __attribute__((unused)),
-                                        int *numEntries __attribute__((unused)))
+struct hadoop_err *ndfs_list_partial(struct native_fs *fs,
+        const char *path, const char *prev, uint32_t *entries_len,
+        hdfsFileInfo **entries, uint32_t *remaining)
 {
-    errno = ENOTSUP;
-    return NULL;
+    struct hadoop_err *err = NULL;
+    GetListingRequestProto req = GET_LISTING_REQUEST_PROTO__INIT;
+    GetListingResponseProto *resp = NULL;
+    hdfsFileInfo *nentries;
+    struct hrpc_proxy proxy;
+    uint64_t nlen;
+    size_t i;
+    char *prefix = NULL;
+
+    err = dynprintf(&prefix, "%s%s/", fs->url_prefix, path);
+    if (err)
+        goto done;
+    ndfs_nn_proxy_init(fs, &proxy);
+    req.src = (char*)path;
+    req.startafter.data = (unsigned char*)prev;
+    req.startafter.len = strlen(prev);
+    req.needlocation = 0;
+    err = cnn_get_listing(&proxy, &req, &resp);
+    if (err)
+        goto done;
+    if (!resp->dirlist) {
+        err = hadoop_lerr_alloc(ENOENT, "ndfs_list_partial(path=%s, "
+                "prev=%s): No such directory.", path, prev);
+        goto done;
+    }
+    nlen = *entries_len;
+    nlen += resp->dirlist->n_partiallisting;
+    nentries = realloc(*entries, nlen * sizeof(hdfsFileInfo));
+    if (!nentries) {
+        err = hadoop_lerr_alloc(ENOMEM, "ndfs_list_partial(path=%s, "
+                "prev=%s): failed to allocate space for %zu new entries.",
+                path, prev, resp->dirlist->n_partiallisting);
+        goto done;
+    }
+    // Zero only the newly added slots; entries from earlier calls are kept.
+    memset(nentries + *entries_len, 0,
+           resp->dirlist->n_partiallisting * sizeof(hdfsFileInfo));
+    *entries = nentries;
+    *remaining = resp->dirlist->remainingentries;
+    // Fill in the new slots, which start after the previously fetched ones.
+    for (i = 0; i < resp->dirlist->n_partiallisting; i++) {
+        err = populate_file_info(&nentries[*entries_len + i],
+                    resp->dirlist->partiallisting[i], prefix);
+        if (err)
+            goto done;
+    }
+    *entries_len = nlen;
+    err = NULL;
+
+done:
+    free(prefix);
+    if (resp) {
+        get_listing_response_proto__free_unpacked(resp, NULL);
+    }
+    return err;
 }
 
-static hdfsFileInfo *ndfs_get_path_info(hdfsFS bfs __attribute__((unused)),
-                                        const char* uri __attribute__((unused)))
+struct hadoop_err *populate_file_info(struct file_info *info,
+                    HdfsFileStatusProto *status, const char *prefix)
 {
-    errno = ENOTSUP;
+    if (status->filetype == IS_DIR) {
+        info->mKind = kObjectKindDirectory;
+    } else {
+        // Note: symlinks are not supported here yet; treat them as files.
+        info->mKind = kObjectKindFile;
+    }
+    info->mName = malloc(strlen(prefix) + status->path.len + 1);
+    if (!info->mName)
+        goto oom;
+    strcpy(info->mName, prefix);
+    memcpy(info->mName + strlen(prefix), status->path.data, status->path.len);
+    info->mName[strlen(prefix) + status->path.len] = '\0';
+    info->mLastMod = status->modification_time / 1000LL;
+    info->mSize = status->length;
+    if (status->has_block_replication) {
+        info->mReplication = status->block_replication;
+    } else {
+        info->mReplication = 0;
+    }
+    if (status->has_blocksize) {
+        info->mBlockSize = status->blocksize;
+    } else {
+        info->mBlockSize = 0;
+    }
+    info->mOwner = strdup(status->owner);
+    if (!info->mOwner)
+        goto oom;
+    info->mGroup = strdup(status->group);
+    if (!info->mGroup)
+        goto oom;
+    info->mPermissions = status->permission->perm;
+    info->mLastAccess = status->access_time / 1000LL;
     return NULL;
+oom:
+    // Use 'prefix' in the message: info->mName may still be NULL here.
+    return hadoop_lerr_alloc(ENOMEM, "populate_file_info(%s): OOM",
+                            prefix);
+}
+
+static hdfsFileInfo* ndfs_list_directory(hdfsFS bfs,
+                    const char* uri, int *numEntries)
+{
+    struct native_fs *fs = (struct native_fs *)bfs;
+    struct hadoop_err *err = NULL;
+    hdfsFileInfo *entries = NULL;
+    uint32_t entries_len = 0, remaining = 0;
+    char *prev, *path = NULL;
+
+    err = build_path(fs, uri, &path);
+    if (err) {
+        goto done;
+    }
+    // We may need to make multiple RPCs to the Namenode to get all the
+    // entries in this directory.  We need to keep making RPCs as long as the
+    // 'remaining' value we get back is more than 0.  The actual value of
+    // 'remaining' isn't interesting, because it may have changed by the time
+    // we make the next RPC.
+    do {
+        if (entries_len > 0) {
+            prev = entries[entries_len - 1].mName;
+        } else {
+            prev = "";
+        }
+        err = ndfs_list_partial(fs, path, prev, &entries_len,
+                                &entries, &remaining);
+        if (err)
+            goto done;
+    } while (remaining != 0);
+    err = NULL;
+
+done:
+    free(path);
+    if (err) {
+        if (entries) {
+            hdfsFreeFileInfo(entries, entries_len);
+            entries = NULL;
+        }
+    } else {
+        *numEntries = entries_len;
+    }
+    return hadoopfs_errno_and_retptr(err, entries);
+}
+
+static hdfsFileInfo *ndfs_get_path_info(hdfsFS bfs, const char* uri)
+{
+    struct native_fs *fs = (struct native_fs*)bfs;
+    struct hadoop_err *err = NULL;
+    GetFileInfoRequestProto req = GET_FILE_INFO_REQUEST_PROTO__INIT;
+    GetFileInfoResponseProto *resp = NULL;
+    struct hrpc_proxy proxy;
+    char *prefix = NULL, *path = NULL;
+    hdfsFileInfo *info = NULL;
+
+    err = build_path(fs, uri, &path);
+    if (err) {
+        goto done;
+    }
+    // The GetFileInfo RPC returns a blank 'path' field.
+    // To maintain 100% compatibility with the JNI client, we need to fill it
+    // in with a URI containing the absolute path to the file.
+    err = dynprintf(&prefix, "%s%s", fs->url_prefix, path);
+    if (err)
+        goto done;
+    ndfs_nn_proxy_init(fs, &proxy);
+    req.src = path;
+    err = cnn_get_file_info(&proxy, &req, &resp);
+    if (err) {
+        goto done;
+    }
+    if (!resp->fs) {
+        err = hadoop_lerr_alloc(ENOENT, "ndfs_get_path_info(%s): no such "
+                                "file or directory.", path);
+        goto done;
+    }
+    info = calloc(1, sizeof(*info));
+    if (!info) {
+        err = hadoop_lerr_alloc(ENOMEM, "ndfs_get_path_info(%s): OOM", path);
+        goto done;
+    }
+    err = populate_file_info(info, resp->fs, prefix);
+    if (err) {
+        err = hadoop_err_prepend(err, 0, "ndfs_get_path_info(%s)", path);
+        goto done;
+    }
+    err = NULL;
+
+done:
+    free(prefix);
+    free(path);
+    if (resp) {
+        get_file_info_response_proto__free_unpacked(resp, NULL);
+    }
+    if (err) {
+        if (info) {
+            hdfsFreeFileInfo(info, 1);
+            info = NULL;
+        }
+    }
+    return hadoopfs_errno_and_retptr(err, info);
 }
 
 char***

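The comment block in ndfs_list_directory above explains why the client keeps re-issuing the GetListing RPC: each response carries only a partial listing plus a count of entries still remaining, and the next request resumes after the last name received. The standalone sketch below illustrates that append-as-you-page loop against a fake in-memory "server"; fetch_batch(), ALL[], and BATCH are hypothetical stand-ins for ndfs_list_partial and the NameNode, not part of this patch.

    /* Build and run standalone: cc -o pager pager.c && ./pager */
    #include <stdint.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    /* Fake sorted "directory" standing in for the NameNode's listing. */
    static const char *ALL[] = { "1", "2", "3", "alpha" };
    #define ALL_LEN 4
    #define BATCH   2   /* entries returned per "RPC" */

    /* Hypothetical stand-in for ndfs_list_partial: append up to BATCH names
     * that sort after 'prev' to *entries, growing it with realloc, and report
     * how many entries remain on the "server" side. Returns 0 on success. */
    static int fetch_batch(const char *prev, char ***entries,
                           uint32_t *len, uint32_t *remaining)
    {
        uint32_t start = 0, n, i;
        char **grown;

        while (prev[0] && start < ALL_LEN && strcmp(ALL[start], prev) <= 0)
            start++;
        n = (ALL_LEN - start < BATCH) ? (ALL_LEN - start) : BATCH;
        grown = realloc(*entries, (*len + n) * sizeof(char *));
        if (!grown)
            return -1;
        *entries = grown;
        for (i = 0; i < n; i++)
            grown[*len + i] = strdup(ALL[start + i]);  /* append after old ones */
        *len += n;
        *remaining = ALL_LEN - (start + n);
        return 0;
    }

    int main(void)
    {
        char **entries = NULL;
        uint32_t len = 0, remaining = 0, i;
        const char *prev;

        /* Same loop shape as ndfs_list_directory: keep paging while the
         * server says entries remain, resuming after the last name seen. */
        do {
            prev = (len > 0) ? entries[len - 1] : "";
            if (fetch_batch(prev, &entries, &len, &remaining))
                return EXIT_FAILURE;
        } while (remaining != 0);

        for (i = 0; i < len; i++) {
            printf("%s\n", entries[i]);
            free(entries[i]);
        }
        free(entries);
        return EXIT_SUCCESS;
    }

Because the listing resumes by name rather than by offset, a directory that changes between RPCs still yields each surviving entry at most once, which is why the loop only cares whether 'remaining' is nonzero, not what its value is.
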
+ 29 - 0
hadoop-native-core/src/main/native/test/fs/test_libhdfs_meta_ops.c

@@ -30,8 +30,16 @@
 
 /** Test performing metadata operations via libhdfs. */
 
+static void print_file_info(const hdfsFileInfo *info)
+{
+    printf("file_info(mName=%s): mOwner=%s, mGroup=%s, "
+           "mPermissions=%04o\n", info->mName, info->mOwner,
+           info->mGroup, info->mPermissions);
+}
+
 int main(void)
 {
+    hdfsFileInfo *infos, *info;
     struct hdfsBuilder *hdfs_bld = NULL;
     hdfsFS fs = NULL;
     struct NativeMiniDfsCluster* dfs_cluster = NULL;
@@ -39,6 +47,7 @@ int main(void)
         .doFormat = 1,
     };
     const char *nn_uri;
+    int i, num_entries;
 
     nn_uri = getenv("NAMENODE_URI");
     if (!nn_uri) {
@@ -65,6 +74,26 @@ int main(void)
     EXPECT_INT_ZERO(hdfsCreateDirectory(fs, "/abc/3"));
     EXPECT_INT_ZERO(hdfsCreateDirectory(fs, "/abc/alpha"));
     EXPECT_INT_ZERO(hdfsDelete(fs, "/abc", 1));
+    hdfsDelete(fs, "/abc", 1);
+    EXPECT_INT_ZERO(hdfsCreateDirectory(fs, "/abc"));
+    EXPECT_INT_ZERO(hdfsCreateDirectory(fs, "/abc/1"));
+    EXPECT_INT_ZERO(hdfsCreateDirectory(fs, "/abc/2"));
+    EXPECT_INT_ZERO(hdfsCreateDirectory(fs, "/abc/3"));
+    EXPECT_INT_ZERO(hdfsCreateDirectory(fs, "/abc/alpha"));
+    infos = hdfsListDirectory(fs, "/abc", &num_entries);
+    EXPECT_NONNULL(infos);
+    EXPECT_INT_EQ(4, num_entries);
+    for (i = 0; i < num_entries; i++) {
+        print_file_info(&infos[i]);
+    }
+    hdfsFreeFileInfo(infos, num_entries);
+    info = hdfsGetPathInfo(fs, "/abc");
+    EXPECT_NONNULL(info);
+    EXPECT_INT_ZERO(info->mReplication);
+    EXPECT_INT_ZERO(info->mBlockSize);
+    EXPECT_INT_EQ(kObjectKindDirectory, info->mKind);
+    print_file_info(info);
+    hdfsFreeFileInfo(info, 1);
     EXPECT_INT_ZERO(hdfsDisconnect(fs));
     if (dfs_cluster) {
         EXPECT_INT_ZERO(nmdShutdown(dfs_cluster));