Browse Source

HDFS-14478. Add libhdfs APIs for openFile (#4166)

Contributed by Sahil Takiar
Steve Loughran 3 years ago
parent
commit
a7b4e8f03e

+ 62 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c

@@ -454,6 +454,68 @@ int main(int argc, char **argv) {
         hdfsCloseFile(lfs, localFile);
     }
 
+
+    {
+        // HDFS Open File Builder tests: open readPath twice through the
+        // builder/future API (blocking get, then timed get) and check that
+        // the bytes read back match fileContents.
+        hdfsOpenFileBuilder *builder;
+        hdfsOpenFileFuture *future;
+
+        exists = hdfsExists(fs, readPath);
+
+        if (exists) {
+            fprintf(stderr, "Failed to validate existence of %s\n", readPath);
+            shutdown_and_exit(cl, -1);
+        }
+
+        // Pass 1: blocking hdfsOpenFileFutureGet
+        builder = hdfsOpenFileBuilderAlloc(fs, readPath);
+        if (!builder) {
+            fprintf(stderr, "Failed to allocate open file builder for %s\n",
+                    readPath);
+            shutdown_and_exit(cl, -1);
+        }
+        if (!hdfsOpenFileBuilderOpt(builder, "hello", "world")) {
+            fprintf(stderr, "Failed to set option on open file builder "
+                            "for %s\n", readPath);
+            shutdown_and_exit(cl, -1);
+        }
+
+        // hdfsOpenFileBuilderBuild frees the builder
+        future = hdfsOpenFileBuilderBuild(builder);
+        if (!future) {
+            fprintf(stderr, "Failed to build open file future for %s\n",
+                    readPath);
+            shutdown_and_exit(cl, -1);
+        }
+
+        readFile = hdfsOpenFileFutureGet(future);
+        if (!readFile) {
+            fprintf(stderr, "Failed to open %s via hdfsOpenFileFutureGet\n",
+                    readPath);
+            shutdown_and_exit(cl, -1);
+        }
+        // hdfsOpenFileFutureCancel returns 0 only when the cancel succeeded,
+        // which must not happen for a future that has already completed
+        if (!hdfsOpenFileFutureCancel(future, 0)) {
+            fprintf(stderr,
+                    "Cancel on a completed Future should return false\n");
+            shutdown_and_exit(cl, -1);
+        }
+        hdfsOpenFileFutureFree(future);
+
+        memset(buffer, 0, sizeof(buffer));
+        num_read_bytes = hdfsRead(fs, readFile, (void *) buffer,
+                                  sizeof(buffer));
+        if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) {
+            fprintf(stderr,
+                    "Failed to read. Expected %s but got %s (%d bytes)\n",
+                    fileContents, buffer, num_read_bytes);
+            shutdown_and_exit(cl, -1);
+        }
+        hdfsCloseFile(fs, readFile);
+
+        // Pass 2: hdfsOpenFileFutureGetWithTimeout
+        builder = hdfsOpenFileBuilderAlloc(fs, readPath);
+        if (!builder) {
+            fprintf(stderr, "Failed to allocate open file builder for %s\n",
+                    readPath);
+            shutdown_and_exit(cl, -1);
+        }
+        if (!hdfsOpenFileBuilderOpt(builder, "hello", "world")) {
+            fprintf(stderr, "Failed to set option on open file builder "
+                            "for %s\n", readPath);
+            shutdown_and_exit(cl, -1);
+        }
+
+        future = hdfsOpenFileBuilderBuild(builder);
+        if (!future) {
+            fprintf(stderr, "Failed to build open file future for %s\n",
+                    readPath);
+            shutdown_and_exit(cl, -1);
+        }
+
+        readFile = hdfsOpenFileFutureGetWithTimeout(future, 1, jDays);
+        if (!readFile) {
+            fprintf(stderr, "Failed to open %s via "
+                            "hdfsOpenFileFutureGetWithTimeout\n", readPath);
+            shutdown_and_exit(cl, -1);
+        }
+        if (!hdfsOpenFileFutureCancel(future, 0)) {
+            fprintf(stderr, "Cancel on a completed Future should return "
+                            "false\n");
+            shutdown_and_exit(cl, -1);
+        }
+        hdfsOpenFileFutureFree(future);
+
+        memset(buffer, 0, sizeof(buffer));
+        num_read_bytes = hdfsRead(fs, readFile, (void*)buffer,
+                                  sizeof(buffer));
+        if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) {
+            fprintf(stderr, "Failed to read. Expected %s but got "
+                            "%s (%d bytes)\n", fileContents, buffer,
+                            num_read_bytes);
+            shutdown_and_exit(cl, -1);
+        }
+        // Clear the whole buffer; the previous length expression
+        // strlen(fileContents + 1) had its parenthesis in the wrong place
+        memset(buffer, 0, sizeof(buffer));
+        hdfsCloseFile(fs, readFile);
+    }
+
     totalResult = 0;
     result = 0;
     {

+ 489 - 11
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c

@@ -38,6 +38,10 @@
 
 #define KERBEROS_TICKET_CACHE_PATH "hadoop.security.kerberos.ticket.cache.path"
 
+// StreamCapability flags taken from o.a.h.fs.StreamCapabilities
+#define IS_READ_BYTE_BUFFER_CAPABILITY "in:readbytebuffer"
+#define IS_PREAD_BYTE_BUFFER_CAPABILITY "in:preadbytebuffer"
+
 // Bit fields for hdfsFile_internal flags
 #define HDFS_FILE_SUPPORTS_DIRECT_READ (1<<0)
 #define HDFS_FILE_SUPPORTS_DIRECT_PREAD (1<<1)
@@ -1075,6 +1079,27 @@ done:
     return 0;
 }
 
+/**
+ * Translates the StreamCapabilities reported by a Java stream into the
+ * matching direct-read bits of an hdfsFile.
+ *
+ * Bits are only ever OR-ed into file->flags; bits that are already set are
+ * left untouched.
+ *
+ * @param file  the hdfsFile whose flags field is updated
+ * @param jFile the Java stream that is queried for its capabilities
+ */
+static void setFileFlagCapabilities(hdfsFile file, jobject jFile) {
+    // ByteBuffer reads available -> direct reads can be used
+    file->flags |=
+        hdfsHasStreamCapability(jFile, IS_READ_BYTE_BUFFER_CAPABILITY)
+            ? HDFS_FILE_SUPPORTS_DIRECT_READ : 0;
+
+    // Positional ByteBuffer reads available -> direct preads can be used
+    file->flags |=
+        hdfsHasStreamCapability(jFile, IS_PREAD_BYTE_BUFFER_CAPABILITY)
+            ? HDFS_FILE_SUPPORTS_DIRECT_PREAD : 0;
+}
+
 static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags,
                   int32_t bufferSize, int16_t replication, int64_t blockSize)
 {
@@ -1245,17 +1270,7 @@ static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags,
     file->flags = 0;
 
     if ((flags & O_WRONLY) == 0) {
-        // Check the StreamCapabilities of jFile to see if we can do direct
-        // reads
-        if (hdfsHasStreamCapability(jFile, "in:readbytebuffer")) {
-            file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ;
-        }
-
-        // Check the StreamCapabilities of jFile to see if we can do direct
-        // preads
-        if (hdfsHasStreamCapability(jFile, "in:preadbytebuffer")) {
-            file->flags |= HDFS_FILE_SUPPORTS_DIRECT_PREAD;
-        }
+        setFileFlagCapabilities(file, jFile);
     }
     ret = 0;
 
@@ -1288,6 +1303,469 @@ hdfsFile hdfsStreamBuilderBuild(struct hdfsStreamBuilder *bld)
     return file;
 }
 
+/**
+ * A wrapper around o.a.h.fs.FutureDataInputStreamBuilder and the file name
+ * associated with the builder.
+ *
+ * jBuilder holds a JNI global reference; it is released by
+ * hdfsOpenFileBuilderFree. path is the pointer handed to
+ * hdfsOpenFileBuilderAlloc, stored as-is (the string is not copied), and is
+ * read when formatting error messages, so the caller's string has to stay
+ * valid while the builder is in use.
+ */
+struct hdfsOpenFileBuilder {
+    jobject jBuilder;
+    const char *path;
+};
+
+/**
+ * A wrapper around a java.util.concurrent.Future (created by calling
+ * FutureDataInputStreamBuilder#build) and the file name associated with the
+ * builder.
+ *
+ * jFuture holds a JNI global reference; it is released by
+ * hdfsOpenFileFutureFree. path is copied by pointer from the builder that
+ * produced the future (the string itself is not duplicated) and is read when
+ * formatting error messages.
+ */
+struct hdfsOpenFileFuture {
+    jobject jFuture;
+    const char *path;
+};
+
+/**
+ * Allocates an hdfsOpenFileBuilder that wraps the
+ * FutureDataInputStreamBuilder returned by FileSystem#openFile(Path).
+ *
+ * The path pointer is stored in the builder without copying the string.
+ *
+ * @param fs   The filesystem handle (used as the Java FileSystem object).
+ * @param path The path of the file to open.
+ * @return The new builder, or NULL with errno set on failure.
+ */
+hdfsOpenFileBuilder *hdfsOpenFileBuilderAlloc(hdfsFS fs,
+        const char *path) {
+    int ret = 0;
+    jthrowable jthr;
+    jvalue jVal;
+    jobject jFS = (jobject) fs;
+
+    jobject jPath = NULL;
+    jobject jBuilder = NULL;
+    hdfsOpenFileBuilder *builder = NULL;
+
+    JNIEnv *env = getJNIEnv();
+    if (!env) {
+        errno = EINTERNAL;
+        return NULL;
+    }
+
+    builder = calloc(1, sizeof(hdfsOpenFileBuilder));
+    if (!builder) {
+        fprintf(stderr, "hdfsOpenFileBuilderAlloc(%s): OOM when creating "
+                        "hdfsOpenFileBuilder\n", path);
+        ret = ENOMEM;
+        goto done;
+    }
+    builder->path = path;
+
+    jthr = constructNewObjectOfPath(env, path, &jPath);
+    if (jthr) {
+        // Record the failure in ret (not just errno) so that the cleanup
+        // below runs and NULL is returned instead of a half-built builder
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hdfsOpenFileBuilderAlloc(%s): constructNewObjectOfPath",
+                path);
+        goto done;
+    }
+
+    jthr = invokeMethod(env, &jVal, INSTANCE, jFS, JC_FILE_SYSTEM,
+            "openFile", JMETHOD1(JPARAM(HADOOP_PATH), JPARAM(HADOOP_FDISB)),
+            jPath);
+    if (jthr) {
+        // Argument order matches the format: path first, then class name
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hdfsOpenFileBuilderAlloc(%s): %s#openFile(Path) failed",
+                path, HADOOP_FS);
+        goto done;
+    }
+    jBuilder = jVal.l;
+
+    // Promote to a global reference so the builder outlives this JNI frame
+    builder->jBuilder = (*env)->NewGlobalRef(env, jBuilder);
+    if (!builder->jBuilder) {
+        printPendingExceptionAndFree(env, PRINT_EXC_ALL,
+                "hdfsOpenFileBuilderAlloc(%s): NewGlobalRef(%s) failed", path,
+                HADOOP_FDISB);
+        ret = EINVAL;
+        goto done;
+    }
+
+done:
+    destroyLocalReference(env, jPath);
+    destroyLocalReference(env, jBuilder);
+    if (ret) {
+        if (builder) {
+            if (builder->jBuilder) {
+                (*env)->DeleteGlobalRef(env, builder->jBuilder);
+            }
+            free(builder);
+        }
+        errno = ret;
+        return NULL;
+    }
+    return builder;
+}
+
+/**
+ * Used internally by hdfsOpenFileBuilderWithOption to switch between
+ * FSBuilder#must and #opt: 'must' selects the Java method named "must" and
+ * 'opt' the one named "opt".
+ */
+typedef enum { must, opt } openFileBuilderOptionType;
+
+/**
+ * Common backend for hdfsOpenFileBuilderMust and hdfsOpenFileBuilderOpt.
+ *
+ * Invokes the Java method "must" or "opt" (chosen by optionType) with the
+ * given key/value strings on the wrapped builder, then swaps the wrapped
+ * global reference for a global reference to the object that call returned.
+ *
+ * @return the same builder pointer on success; NULL with errno set on
+ *         failure (the builder itself is not freed here).
+ */
+static hdfsOpenFileBuilder *hdfsOpenFileBuilderWithOption(
+        hdfsOpenFileBuilder *builder, const char *key,
+        const char *value, openFileBuilderOptionType optionType) {
+    int err = 0;
+    jthrowable exc;
+    jvalue callResult;
+    jobject returnedBuilder = NULL;
+    jobject newGlobalBuilder;
+    jstring jKey = NULL;
+    jstring jValue = NULL;
+    const char *methodName;
+    JNIEnv *env;
+
+    // Only builders produced by hdfsOpenFileBuilderAlloc are usable
+    if (!builder || !builder->jBuilder) {
+        errno = EINVAL;
+        return NULL;
+    }
+
+    env = getJNIEnv();
+    if (!env) {
+        errno = EINTERNAL;
+        return NULL;
+    }
+
+    exc = newJavaStr(env, key, &jKey);
+    if (exc) {
+        err = printExceptionAndFree(env, exc, PRINT_EXC_ALL,
+                "hdfsOpenFileBuilderWithOption(%s): newJavaStr(%s)",
+                builder->path, key);
+        goto done;
+    }
+    exc = newJavaStr(env, value, &jValue);
+    if (exc) {
+        err = printExceptionAndFree(env, exc, PRINT_EXC_ALL,
+                "hdfsOpenFileBuilderWithOption(%s): newJavaStr(%s)",
+                builder->path, value);
+        goto done;
+    }
+
+    // Map the option type onto the name of the Java method to invoke
+    if (optionType == must) {
+        methodName = "must";
+    } else if (optionType == opt) {
+        methodName = "opt";
+    } else {
+        err = EINTERNAL;
+        goto done;
+    }
+
+    exc = invokeMethod(env, &callResult, INSTANCE, builder->jBuilder,
+            JC_FUTURE_DATA_IS_BUILDER, methodName,
+            JMETHOD2(JPARAM(JAVA_STRING), JPARAM(JAVA_STRING),
+                    JPARAM(HADOOP_FS_BLDR)), jKey,
+            jValue);
+    if (exc) {
+        err = printExceptionAndFree(env, exc, PRINT_EXC_ALL,
+                "hdfsOpenFileBuilderWithOption(%s): %s#%s(%s, %s) failed",
+                builder->path, HADOOP_FS_BLDR, methodName, key,
+                value);
+        goto done;
+    }
+
+    // Take a global reference to the returned object before dropping the
+    // one currently held, so the builder never holds a dead reference
+    returnedBuilder = callResult.l;
+    newGlobalBuilder = (*env)->NewGlobalRef(env, returnedBuilder);
+    if (!newGlobalBuilder) {
+        printPendingExceptionAndFree(env, PRINT_EXC_ALL,
+                "hdfsOpenFileBuilderWithOption(%s): NewGlobalRef(%s) failed",
+                builder->path, HADOOP_FDISB);
+        err = EINVAL;
+        goto done;
+    }
+    (*env)->DeleteGlobalRef(env, builder->jBuilder);
+    builder->jBuilder = newGlobalBuilder;
+
+done:
+    destroyLocalReference(env, jKey);
+    destroyLocalReference(env, jValue);
+    destroyLocalReference(env, returnedBuilder);
+    if (err) {
+        errno = err;
+        return NULL;
+    }
+    return builder;
+}
+
+hdfsOpenFileBuilder *hdfsOpenFileBuilderMust(hdfsOpenFileBuilder *builder,
+        const char *key, const char *value) {
+    // Mandatory option: handled by the shared implementation in 'must' mode
+    return hdfsOpenFileBuilderWithOption(builder, key, value, must);
+}
+
+hdfsOpenFileBuilder *hdfsOpenFileBuilderOpt(hdfsOpenFileBuilder *builder,
+        const char *key, const char *value) {
+    // Optional option: handled by the shared implementation in 'opt' mode
+    return hdfsOpenFileBuilderWithOption(builder, key, value, opt);
+}
+
+/**
+ * Calls build() on the wrapped FutureDataInputStreamBuilder and wraps the
+ * returned Java future in a newly allocated hdfsOpenFileFuture.
+ *
+ * Once the builder has been validated and a JNIEnv obtained, the builder is
+ * freed by this function on both the success and the failure path. When the
+ * builder is invalid or no JNIEnv is available, the builder is left untouched.
+ *
+ * @param builder a builder created by hdfsOpenFileBuilderAlloc
+ * @return the future, or NULL with errno set on failure
+ */
+hdfsOpenFileFuture *hdfsOpenFileBuilderBuild(hdfsOpenFileBuilder *builder) {
+    int ret = 0;
+    jthrowable jthr;
+    jvalue jVal;
+
+    jobject jFuture = NULL;
+    hdfsOpenFileFuture *future = NULL;
+
+    // If the builder was not previously created by a prior call to
+    // hdfsOpenFileBuilderAlloc then exit
+    if (builder == NULL || builder->jBuilder == NULL) {
+        errno = EINVAL;
+        return NULL;
+    }
+
+    JNIEnv *env = getJNIEnv();
+    if (!env) {
+        errno = EINTERNAL;
+        return NULL;
+    }
+
+    future = calloc(1, sizeof(hdfsOpenFileFuture));
+    if (!future) {
+        fprintf(stderr, "hdfsOpenFileBuilderBuild: OOM when creating "
+                        "hdfsOpenFileFuture\n");
+        // Carry the error in ret so errno is assigned after all cleanup
+        ret = ENOMEM;
+        goto done;
+    }
+    future->path = builder->path;
+
+    jthr = invokeMethod(env, &jVal, INSTANCE, builder->jBuilder,
+            JC_FUTURE_DATA_IS_BUILDER, "build",
+            JMETHOD1("", JPARAM(JAVA_CFUTURE)));
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hdfsOpenFileBuilderBuild(%s): %s#build() failed",
+                builder->path, HADOOP_FDISB);
+        goto done;
+    }
+    jFuture = jVal.l;
+
+    future->jFuture = (*env)->NewGlobalRef(env, jFuture);
+    if (!future->jFuture) {
+        printPendingExceptionAndFree(env, PRINT_EXC_ALL,
+                "hdfsOpenFileBuilderBuild(%s): NewGlobalRef(%s) failed",
+                builder->path, JAVA_CFUTURE);
+        ret = EINVAL;
+        goto done;
+    }
+
+done:
+    destroyLocalReference(env, jFuture);
+    // The builder is consumed here whether or not the build succeeded
+    hdfsOpenFileBuilderFree(builder);
+    if (ret) {
+        if (future) {
+            if (future->jFuture) {
+                (*env)->DeleteGlobalRef(env, future->jFuture);
+            }
+            free(future);
+        }
+        errno = ret;
+        return NULL;
+    }
+    return future;
+}
+
+/**
+ * Releases the builder's JNI global reference and frees the builder.
+ *
+ * A NULL builder is ignored. If no JNIEnv can be obtained the global
+ * reference cannot be deleted, but the struct itself is still freed so the
+ * native allocation does not leak.
+ *
+ * @param builder the builder to free (may be NULL)
+ */
+void hdfsOpenFileBuilderFree(hdfsOpenFileBuilder *builder) {
+    JNIEnv *env;
+
+    if (!builder) {
+        return;
+    }
+    env = getJNIEnv();
+    if (env && builder->jBuilder) {
+        (*env)->DeleteGlobalRef(env, builder->jBuilder);
+        builder->jBuilder = NULL;
+    }
+    free(builder);
+}
+
+/**
+ * Shared implementation of hdfsOpenFileFutureGet and
+ * hdfsOpenFileFutureGetWithTimeout. If jTimeUnit is NULL, calls Future#get()
+ * and ignores timeout; otherwise it calls Future#get(long, TimeUnit).
+ *
+ * The jTimeUnit local reference is released before returning whenever a
+ * JNIEnv is available.
+ *
+ * @param future    the future to wait on
+ * @param timeout   the timeout value passed to Future#get(long, TimeUnit)
+ * @param jTimeUnit the Java TimeUnit instance, or NULL for an untimed get
+ * @return the opened hdfsFile, or NULL with errno set on failure
+ */
+static hdfsFile fileFutureGetWithTimeout(hdfsOpenFileFuture *future,
+        int64_t timeout, jobject jTimeUnit) {
+    int ret = 0;
+    jthrowable jthr;
+    jvalue jVal;
+
+    hdfsFile file = NULL;
+    jobject jFile = NULL;
+
+    JNIEnv *env = getJNIEnv();
+    if (!env) {
+        errno = EINTERNAL;
+        return NULL;
+    }
+
+    // Reject a missing future before dereferencing it below
+    if (!future || !future->jFuture) {
+        ret = EINVAL;
+        goto done;
+    }
+
+    if (!jTimeUnit) {
+        jthr = invokeMethod(env, &jVal, INSTANCE, future->jFuture,
+                JC_CFUTURE, "get", JMETHOD1("", JPARAM(JAVA_OBJECT)));
+    } else {
+        jthr = invokeMethod(env, &jVal, INSTANCE, future->jFuture,
+                JC_CFUTURE, "get", JMETHOD2("J",
+                        JPARAM(JAVA_TIMEUNIT), JPARAM(JAVA_OBJECT)), timeout,
+                        jTimeUnit);
+    }
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hdfsOpenFileFutureGet(%s): %s#get failed", future->path,
+                JAVA_CFUTURE);
+        goto done;
+    }
+
+    file = calloc(1, sizeof(struct hdfsFile_internal));
+    if (!file) {
+        fprintf(stderr, "hdfsOpenFileFutureGet(%s): OOM when creating "
+                        "hdfsFile\n", future->path);
+        ret = ENOMEM;
+        goto done;
+    }
+    jFile = jVal.l;
+    // Hold the stream through a global reference for the hdfsFile's lifetime
+    file->file = (*env)->NewGlobalRef(env, jFile);
+    if (!file->file) {
+        ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
+                "hdfsOpenFileFutureGet(%s): NewGlobalRef(jFile) failed",
+                future->path);
+        goto done;
+    }
+
+    file->type = HDFS_STREAM_INPUT;
+    file->flags = 0;
+
+    setFileFlagCapabilities(file, jFile);
+
+done:
+    destroyLocalReference(env, jTimeUnit);
+    destroyLocalReference(env, jFile);
+    if (ret) {
+        if (file) {
+            if (file->file) {
+                (*env)->DeleteGlobalRef(env, file->file);
+            }
+            free(file);
+        }
+        errno = ret;
+        return NULL;
+    }
+    return file;
+}
+
+hdfsFile hdfsOpenFileFutureGet(hdfsOpenFileFuture *future) {
+    // A NULL TimeUnit selects the untimed Future#get(); the timeout value is
+    // not used on that path, so -1 is only a placeholder.
+    const int64_t unusedTimeout = -1;
+
+    return fileFutureGetWithTimeout(future, unusedTimeout, NULL);
+}
+
+/**
+ * Resolves timeUnit to the java.util.concurrent.TimeUnit constant of the
+ * matching name and performs a timed get on the future.
+ *
+ * @param future   the future to wait on
+ * @param timeout  the timeout value, expressed in timeUnit units
+ * @param timeUnit the unit of timeout
+ * @return the opened hdfsFile, or NULL with errno set on failure
+ */
+hdfsFile hdfsOpenFileFutureGetWithTimeout(hdfsOpenFileFuture *future,
+        int64_t timeout, javaConcurrentTimeUnit timeUnit) {
+    int ret;
+    jthrowable jthr;
+    jobject jTimeUnit = NULL;
+    const char *timeUnitEnumName;
+
+    // future->path is read for error reporting below, so reject NULL early
+    if (!future) {
+        errno = EINVAL;
+        return NULL;
+    }
+
+    JNIEnv *env = getJNIEnv();
+    if (!env) {
+        errno = EINTERNAL;
+        return NULL;
+    }
+
+    switch (timeUnit) {
+        case jNanoseconds:
+            timeUnitEnumName = "NANOSECONDS";
+            break;
+        case jMicroseconds:
+            timeUnitEnumName = "MICROSECONDS";
+            break;
+        case jMilliseconds:
+            timeUnitEnumName = "MILLISECONDS";
+            break;
+        case jSeconds:
+            timeUnitEnumName = "SECONDS";
+            break;
+        case jMinutes:
+            timeUnitEnumName = "MINUTES";
+            break;
+        case jHours:
+            timeUnitEnumName = "HOURS";
+            break;
+        case jDays:
+            timeUnitEnumName = "DAYS";
+            break;
+        default:
+            errno = EINTERNAL;
+            return NULL;
+    }
+
+    jthr = fetchEnumInstance(env, JAVA_TIMEUNIT, timeUnitEnumName, &jTimeUnit);
+    if (jthr) {
+        // Report the step that actually failed (the enum lookup, not get)
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hdfsOpenFileFutureGetWithTimeout(%s): "
+                "fetchEnumInstance(%s#%s) failed", future->path,
+                JAVA_TIMEUNIT, timeUnitEnumName);
+        if (ret) {
+            errno = ret;
+        }
+        return NULL;
+    }
+
+    // fileFutureGetWithTimeout releases the jTimeUnit local reference
+    return fileFutureGetWithTimeout(future, timeout, jTimeUnit);
+}
+
+/**
+ * Calls cancel(boolean) on the wrapped Java future.
+ *
+ * @param future                the future to cancel
+ * @param mayInterruptIfRunning non-zero is passed to Java as true, zero as
+ *                              false
+ * @return 0 if the Java call returned true; -1 if it returned false (errno
+ *         is not modified in that case) or if an error occurred (errno set)
+ */
+int hdfsOpenFileFutureCancel(hdfsOpenFileFuture *future,
+        int mayInterruptIfRunning) {
+    int ret;
+    jthrowable jthr;
+    jvalue jVal;
+    jboolean jMayInterruptIfRunning;
+
+    if (!future || !future->jFuture) {
+        errno = EINVAL;
+        return -1;
+    }
+
+    JNIEnv *env = getJNIEnv();
+    if (!env) {
+        errno = EINTERNAL;
+        return -1;
+    }
+
+    jMayInterruptIfRunning = mayInterruptIfRunning ? JNI_TRUE : JNI_FALSE;
+    jthr = invokeMethod(env, &jVal, INSTANCE, future->jFuture, JC_CFUTURE,
+            "cancel", JMETHOD1("Z", "Z"), jMayInterruptIfRunning);
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hdfsOpenFileFutureCancel(%s): %s#cancel failed", future->path,
+                JAVA_CFUTURE);
+        if (ret) {
+            errno = ret;
+        }
+        // jVal was not filled in, so never fall through to reading it
+        return -1;
+    }
+
+    return jVal.z ? 0 : -1;
+}
+
+/**
+ * Releases the future's JNI global reference and frees the future.
+ *
+ * A NULL future is ignored. If no JNIEnv can be obtained the global
+ * reference cannot be deleted, but the struct itself is still freed so the
+ * native allocation does not leak.
+ *
+ * @param future the future to free (may be NULL)
+ */
+void hdfsOpenFileFutureFree(hdfsOpenFileFuture *future) {
+    JNIEnv *env;
+
+    if (!future) {
+        return;
+    }
+    env = getJNIEnv();
+    if (env && future->jFuture) {
+        (*env)->DeleteGlobalRef(env, future->jFuture);
+        future->jFuture = NULL;
+    }
+    free(future);
+}
+
 int hdfsTruncateFile(hdfsFS fs, const char* path, tOffset newlength)
 {
     jobject jFS = (jobject)fs;

+ 135 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/include/hdfs/hdfs.h

@@ -82,6 +82,29 @@ extern  "C" {
     } tObjectKind;
     struct hdfsStreamBuilder;
 
+    /**
+     * The C reflection of the enum values from java.util.concurrent.TimeUnit .
+     */
+    typedef enum javaConcurrentTimeUnit {
+        jNanoseconds,
+        jMicroseconds,
+        jMilliseconds,
+        jSeconds,
+        jMinutes,
+        jHours,
+        jDays,
+    } javaConcurrentTimeUnit;
+
+    /**
+     * The C reflection of java.util.concurrent.Future specifically used for
+     * opening HDFS files asynchronously.
+     */
+    typedef struct hdfsOpenFileFuture hdfsOpenFileFuture;
+
+    /**
+     * The C reflection of o.a.h.fs.FutureDataInputStreamBuilder .
+     */
+    typedef struct hdfsOpenFileBuilder hdfsOpenFileBuilder;
 
     /**
      * The C reflection of org.apache.org.hadoop.FileSystem .
@@ -429,6 +452,118 @@ extern  "C" {
     hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
                           int bufferSize, short replication, tSize blocksize);
 
+    /**
+     * hdfsOpenFileBuilderAlloc - Allocate a HDFS open file builder.
+     *
+     * @param fs The configured filesystem handle.
+     * @param path The full path to the file.
+     * @return Returns the hdfsOpenFileBuilder, or NULL on error.
+     */
+    LIBHDFS_EXTERNAL
+    hdfsOpenFileBuilder *hdfsOpenFileBuilderAlloc(hdfsFS fs,
+            const char *path);
+
+    /**
+     * hdfsOpenFileBuilderMust - Specifies a mandatory parameter for the open
+     * file builder. While the underlying FsBuilder supports various types
+     * for the value (boolean, int, float, double), currently only strings
+     * are supported.
+     *
+     * @param builder The open file builder to set the config for.
+     * @param key The config key
+     * @param value The config value
+     * @return Returns the hdfsOpenFileBuilder, or NULL on error.
+     */
+    LIBHDFS_EXTERNAL
+    hdfsOpenFileBuilder *hdfsOpenFileBuilderMust(hdfsOpenFileBuilder *builder,
+            const char *key, const char *value);
+
+    /**
+     * hdfsOpenFileBuilderOpt - Specifies an optional parameter for the open
+     * file builder. While the underlying FsBuilder supports various types
+     * for the value (boolean, int, float, double), currently only strings
+     * are supported.
+     *
+     * @param builder The open file builder to set the config for.
+     * @param key The config key
+     * @param value The config value
+     * @return Returns the hdfsOpenFileBuilder, or NULL on error.
+     */
+    LIBHDFS_EXTERNAL
+    hdfsOpenFileBuilder *hdfsOpenFileBuilderOpt(hdfsOpenFileBuilder *builder,
+            const char *key, const char *value);
+
+    /**
+     * hdfsOpenFileBuilderBuild - Builds the open file builder and returns a
+     * hdfsOpenFileFuture which tracks the asynchronous call to open the
+     * specified file.
+     *
+     * @param builder The open file builder to build.
+     * @return Returns the hdfsOpenFileFuture, or NULL on error.
+     */
+    LIBHDFS_EXTERNAL
+    hdfsOpenFileFuture *hdfsOpenFileBuilderBuild(hdfsOpenFileBuilder *builder);
+
+    /**
+     * hdfsOpenFileBuilderFree - Free a HDFS open file builder.
+     *
+     * It is normally not necessary to call this function since
+     * hdfsOpenFileBuilderBuild frees the builder.
+     *
+     * @param builder The hdfsOpenFileBuilder to free.
+     */
+    LIBHDFS_EXTERNAL
+    void hdfsOpenFileBuilderFree(hdfsOpenFileBuilder *builder);
+
+    /**
+     * hdfsOpenFileFutureGet - Call Future#get() on the underlying Java Future
+     * object. A call to #get() will block until the asynchronous operation has
+     * completed (in this case, until the open file call has completed). This
+     * method blocks indefinitely until that call completes.
+     *
+     * @param future The hdfsOpenFileFuture to call #get on
+     * @return Returns the opened hdfsFile, or NULL on error.
+     */
+    LIBHDFS_EXTERNAL
+    hdfsFile hdfsOpenFileFutureGet(hdfsOpenFileFuture *future);
+
+    /**
+     * hdfsOpenFileFutureGetWithTimeout - Call Future#get(long, TimeUnit) on
+     * the underlying Java Future object. A call to #get(long, TimeUnit) will
+     * block until the asynchronous operation has completed (in this case,
+     * until the open file call has completed) or the specified timeout has
+     * been reached.
+     *
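+     * @param future The hdfsOpenFileFuture to call #get on
+     * @param timeout The maximum time to wait, expressed in timeUnit units
+     * @param timeUnit The unit of the timeout argument
+     * @return Returns the opened hdfsFile, or NULL on error or if the timeout
+     *         has been reached.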
+     */
+    LIBHDFS_EXTERNAL
+    hdfsFile hdfsOpenFileFutureGetWithTimeout(hdfsOpenFileFuture *future,
+            int64_t timeout, javaConcurrentTimeUnit timeUnit);
+
+    /**
+     * hdfsOpenFileFutureCancel - Call Future#cancel(boolean) on the
+     * underlying Java Future object. The value of mayInterruptIfRunning
+     * controls whether the Java thread running the Future should be
+     * interrupted or not.
+     *
+     * @param future The hdfsOpenFileFuture to call #cancel on
+     * @param mayInterruptIfRunning if true, interrupts the running thread
+     * @return Returns 0 if the future was successfully cancelled, else -1
+     */
+    LIBHDFS_EXTERNAL
+    int hdfsOpenFileFutureCancel(hdfsOpenFileFuture *future,
+            int mayInterruptIfRunning);
+
+    /**
+     * hdfsOpenFileFutureFree - Free a HDFS open file future.
+     *
+     * @param future The hdfsOpenFileFuture to free.
+     */
+    LIBHDFS_EXTERNAL
+    void hdfsOpenFileFutureFree(hdfsOpenFileFuture *future);
+
     /**
      * hdfsStreamBuilderAlloc - Allocate an HDFS stream builder.
      *

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/jclasses.c

@@ -98,6 +98,8 @@ jthrowable initCachedClasses(JNIEnv* env) {
                 "org/apache/hadoop/hdfs/ReadStatistics";
         cachedJavaClasses[JC_HDFS_DATA_INPUT_STREAM].className =
                 "org/apache/hadoop/hdfs/client/HdfsDataInputStream";
+        cachedJavaClasses[JC_FUTURE_DATA_IS_BUILDER].className =
+                "org/apache/hadoop/fs/FutureDataInputStreamBuilder";
         cachedJavaClasses[JC_DOMAIN_SOCKET].className =
                 "org/apache/hadoop/net/unix/DomainSocket";
         cachedJavaClasses[JC_URI].className =
@@ -108,6 +110,8 @@ jthrowable initCachedClasses(JNIEnv* env) {
                 "java/util/EnumSet";
         cachedJavaClasses[JC_EXCEPTION_UTILS].className =
                 "org/apache/commons/lang3/exception/ExceptionUtils";
+        cachedJavaClasses[JC_CFUTURE].className =
+                "java/util/concurrent/CompletableFuture";
 
         // Create and set the jclass objects based on the class names set above
         jthrowable jthr;

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/jclasses.h

@@ -54,11 +54,13 @@ typedef enum {
     JC_FS_PERMISSION,
     JC_READ_STATISTICS,
     JC_HDFS_DATA_INPUT_STREAM,
+    JC_FUTURE_DATA_IS_BUILDER,
     JC_DOMAIN_SOCKET,
     JC_URI,
     JC_BYTE_BUFFER,
     JC_ENUM_SET,
     JC_EXCEPTION_UTILS,
+    JC_CFUTURE,
     // A special marker enum that counts the number of cached jclasses
     NUM_CACHED_CLASSES
 } CachedJavaClass;
@@ -95,6 +97,8 @@ const char *getClassName(CachedJavaClass cachedJavaClass);
 #define HADOOP_FSPERM   "org/apache/hadoop/fs/permission/FsPermission"
 #define HADOOP_RSTAT    "org/apache/hadoop/hdfs/ReadStatistics"
 #define HADOOP_HDISTRM  "org/apache/hadoop/hdfs/client/HdfsDataInputStream"
+#define HADOOP_FDISB    "org/apache/hadoop/fs/FutureDataInputStreamBuilder"
+#define HADOOP_FS_BLDR  "org/apache/hadoop/fs/FSBuilder"
 #define HADOOP_RO       "org/apache/hadoop/fs/ReadOption"
 #define HADOOP_DS       "org/apache/hadoop/net/unix/DomainSocket"
 
@@ -104,6 +108,9 @@ const char *getClassName(CachedJavaClass cachedJavaClass);
 #define JAVA_BYTEBUFFER "java/nio/ByteBuffer"
 #define JAVA_STRING     "java/lang/String"
 #define JAVA_ENUMSET    "java/util/EnumSet"
+#define JAVA_CFUTURE    "java/util/concurrent/CompletableFuture"
+#define JAVA_TIMEUNIT   "java/util/concurrent/TimeUnit"
+#define JAVA_OBJECT     "java/lang/Object"
 
 /* Some frequently used third-party class names */
 

+ 59 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c

@@ -250,6 +250,65 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
   return ret;
 }
 
+hdfsOpenFileBuilder *hdfsOpenFileBuilderAlloc(hdfsFS fs,
+        const char *path) {
+  // Forward to libhdfs using the libhdfs handle held by the shim's hdfsFS
+  hdfsOpenFileBuilder *allocated =
+      libhdfs_hdfsOpenFileBuilderAlloc(fs->libhdfsRep, path);
+  return allocated;
+}
+
+hdfsOpenFileBuilder *hdfsOpenFileBuilderMust(
+        hdfsOpenFileBuilder *builder, const char *key,
+        const char *value) {
+  // Forwarded to libhdfs unchanged
+  hdfsOpenFileBuilder *updated =
+      libhdfs_hdfsOpenFileBuilderMust(builder, key, value);
+  return updated;
+}
+
+hdfsOpenFileBuilder *hdfsOpenFileBuilderOpt(
+        hdfsOpenFileBuilder *builder, const char *key,
+        const char *value) {
+  // Forwarded to libhdfs unchanged
+  hdfsOpenFileBuilder *updated =
+      libhdfs_hdfsOpenFileBuilderOpt(builder, key, value);
+  return updated;
+}
+
+hdfsOpenFileFuture *hdfsOpenFileBuilderBuild(
+        hdfsOpenFileBuilder *builder) {
+  // Forwarded to libhdfs unchanged
+  hdfsOpenFileFuture *built = libhdfs_hdfsOpenFileBuilderBuild(builder);
+  return built;
+}
+
+void hdfsOpenFileBuilderFree(hdfsOpenFileBuilder *builder) {
+  libhdfs_hdfsOpenFileBuilderFree(builder);  // forwarded to libhdfs as-is
+}
+
+/**
+ * Shim for hdfsOpenFileFutureGet: waits on the future through libhdfs and
+ * wraps the resulting libhdfs file in a shim hdfsFile (libhdfsppRep is left
+ * unset).
+ *
+ * @return the shim file, or NULL if the wrapper could not be allocated or
+ *         libhdfs returned no file
+ */
+hdfsFile hdfsOpenFileFutureGet(hdfsOpenFileFuture *future) {
+  hdfsFile ret = calloc(1, sizeof(struct hdfsFile_internal));
+  if (!ret) {
+    // Out of memory: bail out before touching the wrapper
+    return NULL;
+  }
+  ret->libhdfsppRep = 0;
+  ret->libhdfsRep = libhdfs_hdfsOpenFileFutureGet(future);
+  if (!ret->libhdfsRep) {
+    free(ret);
+    ret = NULL;
+  }
+  return ret;
+}
+
+/**
+ * Shim for hdfsOpenFileFutureGetWithTimeout: performs the timed wait through
+ * libhdfs and wraps the resulting libhdfs file in a shim hdfsFile
+ * (libhdfsppRep is left unset).
+ *
+ * @return the shim file, or NULL if the wrapper could not be allocated or
+ *         libhdfs returned no file
+ */
+hdfsFile hdfsOpenFileFutureGetWithTimeout(hdfsOpenFileFuture *future,
+        int64_t timeout, javaConcurrentTimeUnit timeUnit) {
+  hdfsFile ret = calloc(1, sizeof(struct hdfsFile_internal));
+  if (!ret) {
+    // Out of memory: bail out before touching the wrapper
+    return NULL;
+  }
+  ret->libhdfsppRep = 0;
+  ret->libhdfsRep = libhdfs_hdfsOpenFileFutureGetWithTimeout(future, timeout,
+                                                             timeUnit);
+  if (!ret->libhdfsRep) {
+    free(ret);
+    ret = NULL;
+  }
+  return ret;
+}
+
+int hdfsOpenFileFutureCancel(hdfsOpenFileFuture *future,
+        int mayInterruptIfRunning) {
+  // Forwarded to libhdfs unchanged
+  int status = libhdfs_hdfsOpenFileFutureCancel(future,
+                                                mayInterruptIfRunning);
+  return status;
+}
+
+void hdfsOpenFileFutureFree(hdfsOpenFileFuture *future) {
+  libhdfs_hdfsOpenFileFutureFree(future);  // forwarded to libhdfs as-is
+}
+
 int hdfsTruncateFile(hdfsFS fs, const char* path, tOffset newlength) {
   return libhdfs_hdfsTruncateFile(fs->libhdfsRep, path, newlength);
 }

+ 17 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfs_wrapper_defines.h

@@ -39,6 +39,23 @@
 #define hdfsConfStrFree libhdfs_hdfsConfStrFree
 #define hdfsDisconnect libhdfs_hdfsDisconnect
 #define hdfsOpenFile libhdfs_hdfsOpenFile
+#define hdfsOpenFileBuilderAlloc libhdfs_hdfsOpenFileBuilderAlloc
+#define hdfsOpenFileBuilderMust libhdfs_hdfsOpenFileBuilderMust
+#define hdfsOpenFileBuilderOpt libhdfs_hdfsOpenFileBuilderOpt
+#define hdfsOpenFileBuilderBuild libhdfs_hdfsOpenFileBuilderBuild
+#define hdfsOpenFileBuilderFree libhdfs_hdfsOpenFileBuilderFree
+#define hdfsOpenFileFutureGet libhdfs_hdfsOpenFileFutureGet
+#define javaConcurrentTimeUnit libhdfs_javaConcurrentTimeUnit
+#define jNanoseconds libhdfs_jNanoseconds
+#define jMicroseconds libhdfs_jMicroseconds
+#define jMilliseconds libhdfs_jMilliseconds
+#define jSeconds libhdfs_jSeconds
+#define jMinutes libhdfs_jMinutes
+#define jHours libhdfs_jHours
+#define jDays libhdfs_jDays
+#define hdfsOpenFileFutureGetWithTimeout libhdfs_hdfsOpenFileFutureGetWithTimeout
+#define hdfsOpenFileFutureCancel libhdfs_hdfsOpenFileFutureCancel
+#define hdfsOpenFileFutureFree libhdfs_hdfsOpenFileFutureFree
 #define hdfsTruncateFile libhdfs_hdfsTruncateFile
 #define hdfsUnbufferFile libhdfs_hdfsUnbufferFile
 #define hdfsCloseFile libhdfs_hdfsCloseFile

+ 17 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfs_wrapper_undefs.h

@@ -39,6 +39,23 @@
 #undef hdfsConfStrFree
 #undef hdfsDisconnect
 #undef hdfsOpenFile
+#undef hdfsOpenFileBuilderAlloc
+#undef hdfsOpenFileBuilderMust
+#undef hdfsOpenFileBuilderOpt
+#undef hdfsOpenFileBuilderBuild
+#undef hdfsOpenFileBuilderFree
+#undef hdfsOpenFileFutureGet
+#undef javaConcurrentTimeUnit
+#undef jNanoseconds
+#undef jMicroseconds
+#undef jMilliseconds
+#undef jSeconds
+#undef jMinutes
+#undef jHours
+#undef jDays
+#undef hdfsOpenFileFutureGetWithTimeout
+#undef hdfsOpenFileFutureCancel
+#undef hdfsOpenFileFutureFree
 #undef hdfsTruncateFile
 #undef hdfsUnbufferFile
 #undef hdfsCloseFile

+ 17 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper_defines.h

@@ -39,6 +39,23 @@
 #define hdfsConfStrFree libhdfspp_hdfsConfStrFree
 #define hdfsDisconnect libhdfspp_hdfsDisconnect
 #define hdfsOpenFile libhdfspp_hdfsOpenFile
+#define hdfsOpenFileBuilderAlloc libhdfspp_hdfsOpenFileBuilderAlloc
+#define hdfsOpenFileBuilderMust libhdfspp_hdfsOpenFileBuilderMust
+#define hdfsOpenFileBuilderOpt libhdfspp_hdfsOpenFileBuilderOpt
+#define hdfsOpenFileBuilderBuild libhdfspp_hdfsOpenFileBuilderBuild
+#define hdfsOpenFileBuilderFree libhdfspp_hdfsOpenFileBuilderFree
+#define hdfsOpenFileFutureGet libhdfspp_hdfsOpenFileFutureGet
+#define javaConcurrentTimeUnit libhdfspp_javaConcurrentTimeUnit
+#define jNanoseconds libhdfspp_jNanoseconds
+#define jMicroseconds libhdfspp_jMicroseconds
+#define jMilliseconds libhdfspp_jMilliseconds
+#define jSeconds libhdfspp_jSeconds
+#define jMinutes libhdfspp_jMinutes
+#define jHours libhdfspp_jHours
+#define jDays libhdfspp_jDays
+#define hdfsOpenFileFutureGetWithTimeout libhdfspp_hdfsOpenFileFutureGetWithTimeout
+#define hdfsOpenFileFutureCancel libhdfspp_hdfsOpenFileFutureCancel
+#define hdfsOpenFileFutureFree libhdfspp_hdfsOpenFileFutureFree
 #define hdfsTruncateFile libhdfspp_hdfsTruncateFile
 #define hdfsUnbufferFile libhdfspp_hdfsUnbufferFile
 #define hdfsCloseFile libhdfspp_hdfsCloseFile