/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "hdfs.h"
#include "jni_helper.h"

#include <stdio.h>
#include <string.h>

/* Some frequently used Java paths */
#define HADOOP_CONF     "org/apache/hadoop/conf/Configuration"
#define HADOOP_PATH     "org/apache/hadoop/fs/Path"
#define HADOOP_LOCALFS  "org/apache/hadoop/fs/LocalFileSystem"
#define HADOOP_FS       "org/apache/hadoop/fs/FileSystem"
#define HADOOP_FSSTATUS "org/apache/hadoop/fs/FsStatus"
#define HADOOP_BLK_LOC  "org/apache/hadoop/fs/BlockLocation"
#define HADOOP_DFS      "org/apache/hadoop/hdfs/DistributedFileSystem"
#define HADOOP_ISTRM    "org/apache/hadoop/fs/FSDataInputStream"
#define HADOOP_OSTRM    "org/apache/hadoop/fs/FSDataOutputStream"
#define HADOOP_STAT     "org/apache/hadoop/fs/FileStatus"
#define HADOOP_FSPERM   "org/apache/hadoop/fs/permission/FsPermission"
#define JAVA_NET_ISA    "java/net/InetSocketAddress"
#define JAVA_NET_URI    "java/net/URI"
#define JAVA_STRING     "java/lang/String"
#define JAVA_VOID       "V"

/* Macros for constructing method signatures */
#define JPARAM(X)           "L" X ";"
#define JARRPARAM(X)        "[L" X ";"
#define JMETHOD1(X, R)      "(" X ")" R
#define JMETHOD2(X, Y, R)   "(" X Y ")" R
#define JMETHOD3(X, Y, Z, R)   "(" X Y Z")" R

#define KERBEROS_TICKET_CACHE_PATH "hadoop.security.kerberos.ticket.cache.path"

// Bit fields for hdfsFile_internal flags
#define HDFS_FILE_SUPPORTS_DIRECT_READ (1<<0)

tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length);

/**
 * The C equivalent of org.apache.hadoop.fs.FSData(Input|Output)Stream.
 */
enum hdfsStreamType
{
    UNINITIALIZED = 0,
    INPUT = 1,
    OUTPUT = 2,
};

/**
 * The 'file-handle' to a file in hdfs.
 */
struct hdfsFile_internal {
    void* file;
    enum hdfsStreamType type;
    int flags;
};

int hdfsFileIsOpenForRead(hdfsFile file)
{
    return (file->type == INPUT);
}

int hdfsFileIsOpenForWrite(hdfsFile file)
{
    return (file->type == OUTPUT);
}

int hdfsFileUsesDirectRead(hdfsFile file)
{
    return !!(file->flags & HDFS_FILE_SUPPORTS_DIRECT_READ);
}

void hdfsFileDisableDirectRead(hdfsFile file)
{
    file->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_READ;
}

/**
 * hdfsJniEnv: A wrapper struct to be used as 'value'
 * while saving thread -> JNIEnv* mappings
 */
typedef struct
{
    JNIEnv* env;
} hdfsJniEnv;

/**
 * Helper function to destroy a local reference of java.lang.Object
 * @param env: The JNIEnv pointer.
 * @param jObject: The local reference of the java.lang.Object object
 * @return None.
 */
static void destroyLocalReference(JNIEnv *env, jobject jObject)
{
    if (jObject)
        (*env)->DeleteLocalRef(env, jObject);
}
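/*
 * Usage sketch (illustrative only, not part of the library): how a caller of
 * the public API might use the hdfsFile accessors defined above after opening
 * a stream.  The "default" namenode, port 0, and the path below are
 * placeholder values, not requirements of the API.
 *
 *     hdfsFS fs = hdfsConnect("default", 0);
 *     hdfsFile f = hdfsOpenFile(fs, "/tmp/example.txt", O_RDONLY, 0, 0, 0);
 *     if (f != NULL) {
 *         if (hdfsFileIsOpenForRead(f) && hdfsFileUsesDirectRead(f)) {
 *             // hdfsRead() will take the readDirect() path, which avoids
 *             // an extra copy through a Java byte[] array
 *         }
 *         hdfsCloseFile(fs, f);
 *     }
 *     hdfsDisconnect(fs);
 */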
/**
 * Helper function to create a org.apache.hadoop.fs.Path object.
 * @param env: The JNIEnv pointer.
 * @param path: The file-path for which to construct org.apache.hadoop.fs.Path
 * object.
 * @return Returns a jobject on success and NULL on error.
 */
static jobject constructNewObjectOfPath(JNIEnv *env, const char *path)
{
    //Construct a java.lang.String object
    jstring jPathString = (*env)->NewStringUTF(env, path);

    //Construct the org.apache.hadoop.fs.Path object
    jobject jPath =
        constructNewObjectOfClass(env, NULL, "org/apache/hadoop/fs/Path",
                                  "(Ljava/lang/String;)V", jPathString);
    if (jPath == NULL) {
        fprintf(stderr, "Can't construct instance of class "
                "org.apache.hadoop.fs.Path for %s\n", path);
        errno = EINTERNAL;
        return NULL;
    }

    // Destroy the local reference to the java.lang.String object
    destroyLocalReference(env, jPathString);

    return jPath;
}

/**
 * Helper function to translate an exception into a meaningful errno value.
 * @param exc: The exception.
 * @param env: The JNIEnv Pointer.
 * @param method: The name of the method that threw the exception. This
 * may be a format string to be used in conjunction with additional arguments.
 * @return Returns a meaningful errno value if possible, or EINTERNAL if not.
 */
static int errnoFromException(jthrowable exc, JNIEnv *env,
                              const char *method, ...)
{
    va_list ap;
    int errnum = 0;
    char *excClass = NULL;

    if (exc == NULL)
        goto default_error;

    if ((excClass = classNameOfObject((jobject) exc, env)) == NULL) {
        errnum = EINTERNAL;
        goto done;
    }

    if (!strcmp(excClass, "java.lang.UnsupportedOperationException")) {
        errnum = ENOTSUP;
        goto done;
    }

    if (!strcmp(excClass, "org.apache.hadoop.security."
                "AccessControlException")) {
        errnum = EACCES;
        goto done;
    }

    if (!strcmp(excClass, "org.apache.hadoop.hdfs.protocol."
                "QuotaExceededException")) {
        errnum = EDQUOT;
        goto done;
    }

    if (!strcmp(excClass, "java.io.FileNotFoundException")) {
        errnum = ENOENT;
        goto done;
    }

    //TODO: interpret more exceptions; maybe examine exc.getMessage()

default_error:

    //Can't tell what went wrong, so just punt
    (*env)->ExceptionDescribe(env);
    fprintf(stderr, "Call to ");
    va_start(ap, method);
    vfprintf(stderr, method, ap);
    va_end(ap);
    fprintf(stderr, " failed!\n");
    errnum = EINTERNAL;

done:

    (*env)->ExceptionClear(env);

    if (excClass != NULL)
        free(excClass);

    return errnum;
}

/**
 * Set a configuration value.
 *
 * @param env               The JNI environment
 * @param jConfiguration    The configuration object to modify
 * @param key               The key to modify
 * @param value             The value to set the key to
 *
 * @return                  0 on success; error code otherwise
 */
static int hadoopConfSet(JNIEnv *env, jobject jConfiguration,
        const char *key, const char *value)
{
    int ret;
    jthrowable jExc = NULL;
    jstring jkey = NULL, jvalue = NULL;
    char buf[1024];

    jkey = (*env)->NewStringUTF(env, key);
    if (!jkey) {
        ret = ENOMEM;
        goto done;
    }
    jvalue = (*env)->NewStringUTF(env, value);
    if (!jvalue) {
        ret = ENOMEM;
        goto done;
    }
    ret = invokeMethod(env, NULL, &jExc, INSTANCE, jConfiguration,
            HADOOP_CONF, "set",
            JMETHOD2(JPARAM(JAVA_STRING), JPARAM(JAVA_STRING), JAVA_VOID),
            jkey, jvalue);
    if (ret) {
        snprintf(buf, sizeof(buf), "hadoopConfSet(%s, %s)", key, value);
        ret = errnoFromException(jExc, env, buf);
        goto done;
    }
done:
    if (jkey)
        destroyLocalReference(env, jkey);
    if (jvalue)
        destroyLocalReference(env, jvalue);
    if (ret)
        errno = ret;
    return ret;
}
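/*
 * For readability, here is how the signature macros used throughout this file
 * expand for the Configuration#set(String, String) call above (this is just
 * the preprocessor result written out, not new behaviour):
 *
 *     JMETHOD2(JPARAM(JAVA_STRING), JPARAM(JAVA_STRING), JAVA_VOID)
 *       -> "(" "Ljava/lang/String;" "Ljava/lang/String;" ")" "V"
 *       -> "(Ljava/lang/String;Ljava/lang/String;)V"
 *
 * i.e. the JNI signature of a method taking two java.lang.String arguments
 * and returning void.
 */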
/**
 * Convert a Java string into a C string.
 *
 * @param env               The JNI environment
 * @param jStr              The Java string to convert
 * @param cstr              (out param) the C string.
 *                          This will be set to a dynamically allocated
 *                          UTF-8 C string on success.
 *
 * @return                  0 on success; error code otherwise
 */
static int jStrToCstr(JNIEnv *env, jstring jstr, char **cstr)
{
    const char *tmp;

    tmp = (*env)->GetStringUTFChars(env, jstr, NULL);
    *cstr = strdup(tmp);
    (*env)->ReleaseStringUTFChars(env, jstr, tmp);
    return 0;
}

static int hadoopConfGet(JNIEnv *env, jobject jConfiguration,
        const char *key, char **val)
{
    int ret;
    jthrowable jExc = NULL;
    jvalue jVal;
    jstring jkey = NULL;
    char buf[1024];

    jkey = (*env)->NewStringUTF(env, key);
    if (!jkey) {
        ret = ENOMEM;
        goto done;
    }
    ret = invokeMethod(env, &jVal, &jExc, INSTANCE, jConfiguration,
            HADOOP_CONF, "get",
            JMETHOD1(JPARAM(JAVA_STRING), JPARAM(JAVA_STRING)), jkey);
    if (ret) {
        snprintf(buf, sizeof(buf), "hadoopConfGet(%s)", key);
        ret = errnoFromException(jExc, env, buf);
        goto done;
    }
    if (!jVal.l) {
        *val = NULL;
        goto done;
    }

    ret = jStrToCstr(env, jVal.l, val);
    if (ret)
        goto done;
done:
    if (jkey)
        destroyLocalReference(env, jkey);
    if (ret)
        errno = ret;
    return ret;
}

int hdfsConfGet(const char *key, char **val)
{
    JNIEnv *env;
    int ret;
    jobject jConfiguration = NULL;

    env = getJNIEnv();
    if (env == NULL) {
        ret = EINTERNAL;
        goto done;
    }
    jConfiguration = constructNewObjectOfClass(env, NULL, HADOOP_CONF, "()V");
    if (jConfiguration == NULL) {
        fprintf(stderr, "Can't construct instance of class "
                "org.apache.hadoop.conf.Configuration\n");
        ret = EINTERNAL;
        goto done;
    }
    ret = hadoopConfGet(env, jConfiguration, key, val);
    if (ret)
        goto done;
    ret = 0;
done:
    if (jConfiguration)
        destroyLocalReference(env, jConfiguration);
    if (ret)
        errno = ret;
    return ret;
}

void hdfsConfFree(char *val)
{
    free(val);
}

struct hdfsBuilder {
    int forceNewInstance;
    const char *nn;
    tPort port;
    const char *kerbTicketCachePath;
    const char *userName;
};

struct hdfsBuilder *hdfsNewBuilder(void)
{
    struct hdfsBuilder *bld = calloc(1, sizeof(struct hdfsBuilder));
    if (!bld) {
        errno = ENOMEM;
        return NULL;
    }
    return bld;
}

void hdfsFreeBuilder(struct hdfsBuilder *bld)
{
    free(bld);
}

void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld)
{
    bld->forceNewInstance = 1;
}

void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn)
{
    bld->nn = nn;
}

void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port)
{
    bld->port = port;
}

void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName)
{
    bld->userName = userName;
}

void hdfsBuilderSetKerbTicketCachePath(struct hdfsBuilder *bld,
                                       const char *kerbTicketCachePath)
{
    bld->kerbTicketCachePath = kerbTicketCachePath;
}

hdfsFS hdfsConnect(const char* host, tPort port)
{
    struct hdfsBuilder *bld = hdfsNewBuilder();
    if (!bld)
        return NULL;
    hdfsBuilderSetNameNode(bld, host);
    hdfsBuilderSetNameNodePort(bld, port);
    return hdfsBuilderConnect(bld);
}

/** Always return a new FileSystem handle */
hdfsFS hdfsConnectNewInstance(const char* host, tPort port)
{
    struct hdfsBuilder *bld = hdfsNewBuilder();
    if (!bld)
        return NULL;
    hdfsBuilderSetNameNode(bld, host);
    hdfsBuilderSetNameNodePort(bld, port);
    hdfsBuilderSetForceNewInstance(bld);
    return hdfsBuilderConnect(bld);
}

hdfsFS hdfsConnectAsUser(const char* host, tPort port, const char *user)
{
    struct hdfsBuilder *bld = hdfsNewBuilder();
    if (!bld)
        return NULL;
    hdfsBuilderSetNameNode(bld, host);
    hdfsBuilderSetNameNodePort(bld, port);
    hdfsBuilderSetUserName(bld, user);
    return hdfsBuilderConnect(bld);
}

/** Always return a new FileSystem handle */
hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port,
        const char *user)
{
    struct hdfsBuilder *bld = hdfsNewBuilder();
    if (!bld)
        return NULL;
    hdfsBuilderSetNameNode(bld, host);
hdfsBuilderSetNameNodePort(bld, port); hdfsBuilderSetForceNewInstance(bld); hdfsBuilderSetUserName(bld, user); return hdfsBuilderConnect(bld); } /** * Calculate the effective URI to use, given a builder configuration. * * If there is not already a URI scheme, we prepend 'hdfs://'. * * If there is not already a port specified, and a port was given to the * builder, we suffix that port. If there is a port specified but also one in * the URI, that is an error. * * @param bld The hdfs builder object * @param uri (out param) dynamically allocated string representing the * effective URI * * @return 0 on success; error code otherwise */ static int calcEffectiveURI(struct hdfsBuilder *bld, char ** uri) { const char *scheme; char suffix[64]; const char *lastColon; char *u; size_t uriLen; if (!bld->nn) return EINVAL; scheme = (strstr(bld->nn, "://")) ? "" : "hdfs://"; if (bld->port == 0) { suffix[0] = '\0'; } else { lastColon = rindex(bld->nn, ':'); if (lastColon && (strspn(lastColon + 1, "0123456789") == strlen(lastColon + 1))) { fprintf(stderr, "port %d was given, but URI '%s' already " "contains a port!\n", bld->port, bld->nn); return EINVAL; } snprintf(suffix, sizeof(suffix), ":%d", bld->port); } uriLen = strlen(scheme) + strlen(bld->nn) + strlen(suffix); u = malloc((uriLen + 1) * (sizeof(char))); if (!u) { fprintf(stderr, "calcEffectiveURI: out of memory"); return ENOMEM; } snprintf(u, uriLen + 1, "%s%s%s", scheme, bld->nn, suffix); *uri = u; return 0; } hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) { JNIEnv *env = 0; jobject gFsRef = NULL; jobject jConfiguration = NULL, jFS = NULL, jURI = NULL, jCachePath = NULL; jstring jURIString = NULL, jUserString = NULL; jvalue jVal; jthrowable jExc = NULL; char *cURI = 0; int ret = 0; //Get the JNIEnv* corresponding to current thread env = getJNIEnv(); if (env == NULL) { ret = EINTERNAL; goto done; } // jConfiguration = new Configuration(); jConfiguration = constructNewObjectOfClass(env, NULL, HADOOP_CONF, "()V"); if (jConfiguration == NULL) { fprintf(stderr, "Can't construct instance of class " "org.apache.hadoop.conf.Configuration\n"); errno = EINTERNAL; goto done; } //Check what type of FileSystem the caller wants... if (bld->nn == NULL) { // Get a local filesystem. if (bld->forceNewInstance) { // fs = FileSytem::newInstanceLocal(conf); if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS, "newInstanceLocal", JMETHOD1(JPARAM(HADOOP_CONF), JPARAM(HADOOP_LOCALFS)), jConfiguration)) { ret = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileSystem::newInstanceLocal"); goto done; } jFS = jVal.l; } else { // fs = FileSytem::getLocal(conf); if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS, "getLocal", JMETHOD1(JPARAM(HADOOP_CONF), JPARAM(HADOOP_LOCALFS)), jConfiguration) != 0) { ret = errnoFromException(jExc, env, "org.apache.hadoop.fs." 
"FileSystem::getLocal"); goto done; } jFS = jVal.l; } } else { if (!strcmp(bld->nn, "default")) { // jURI = FileSystem.getDefaultUri(conf) if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS, "getDefaultUri", "(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/URI;", jConfiguration) != 0) { ret = errnoFromException(jExc, env, "org.apache.hadoop.fs.", "FileSystem::getDefaultUri"); goto done; } jURI = jVal.l; } else { // fs = FileSystem::get(URI, conf, ugi); ret = calcEffectiveURI(bld, &cURI); if (ret) goto done; jURIString = (*env)->NewStringUTF(env, cURI); if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, JAVA_NET_URI, "create", "(Ljava/lang/String;)Ljava/net/URI;", jURIString) != 0) { ret = errnoFromException(jExc, env, "java.net.URI::create"); goto done; } jURI = jVal.l; } if (bld->kerbTicketCachePath) { ret = hadoopConfSet(env, jConfiguration, KERBEROS_TICKET_CACHE_PATH, bld->kerbTicketCachePath); if (ret) goto done; } if (bld->userName) { jUserString = (*env)->NewStringUTF(env, bld->userName); } if (bld->forceNewInstance) { if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS, "newInstance", JMETHOD3(JPARAM(JAVA_NET_URI), JPARAM(HADOOP_CONF), JPARAM(JAVA_STRING), JPARAM(HADOOP_FS)), jURI, jConfiguration, jUserString)) { ret = errnoFromException(jExc, env, "org.apache.hadoop.fs." "Filesystem::newInstance(URI, Configuration)"); goto done; } jFS = jVal.l; } else { if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS, "get", JMETHOD3(JPARAM(JAVA_NET_URI), JPARAM(HADOOP_CONF), JPARAM(JAVA_STRING), JPARAM(HADOOP_FS)), jURI, jConfiguration, jUserString)) { ret = errnoFromException(jExc, env, "org.apache.hadoop.fs." "Filesystem::get(URI, Configuration, String)"); goto done; } jFS = jVal.l; } } done: if (jFS) { /* Create a global reference for this fs */ gFsRef = (*env)->NewGlobalRef(env, jFS); } // Release unnecessary local references destroyLocalReference(env, jConfiguration); destroyLocalReference(env, jFS); destroyLocalReference(env, jURI); destroyLocalReference(env, jCachePath); destroyLocalReference(env, jURIString); destroyLocalReference(env, jUserString); free(cURI); free(bld); if (ret) errno = ret; return (hdfsFS)gFsRef; } int hdfsDisconnect(hdfsFS fs) { // JAVA EQUIVALENT: // fs.close() //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } //Parameters jobject jFS = (jobject)fs; //Caught exception jthrowable jExc = NULL; //Sanity check if (fs == NULL) { errno = EBADF; return -1; } if (invokeMethod(env, NULL, &jExc, INSTANCE, jFS, HADOOP_FS, "close", "()V") != 0) { errno = errnoFromException(jExc, env, "Filesystem::close"); return -1; } //Release unnecessary references (*env)->DeleteGlobalRef(env, jFS); return 0; } hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags, int bufferSize, short replication, tSize blockSize) { /* JAVA EQUIVALENT: File f = new File(path); FSData{Input|Output}Stream f{is|os} = fs.create(f); return f{is|os}; */ /* Get the JNIEnv* corresponding to current thread */ JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return NULL; } jobject jFS = (jobject)fs; if (flags & O_RDWR) { fprintf(stderr, "ERROR: cannot open an hdfs file in O_RDWR mode\n"); errno = ENOTSUP; return NULL; } if ((flags & O_CREAT) && (flags & O_EXCL)) { fprintf(stderr, "WARN: hdfs does not truly support O_CREATE && O_EXCL\n"); } /* The hadoop java api/signature */ const char* method = ((flags & O_WRONLY) == 0) ? "open" : (flags & O_APPEND) ? 
"append" : "create"; const char* signature = ((flags & O_WRONLY) == 0) ? JMETHOD2(JPARAM(HADOOP_PATH), "I", JPARAM(HADOOP_ISTRM)) : (flags & O_APPEND) ? JMETHOD1(JPARAM(HADOOP_PATH), JPARAM(HADOOP_OSTRM)) : JMETHOD2(JPARAM(HADOOP_PATH), "ZISJ", JPARAM(HADOOP_OSTRM)); /* Return value */ hdfsFile file = NULL; /* Create an object of org.apache.hadoop.fs.Path */ jobject jPath = constructNewObjectOfPath(env, path); if (jPath == NULL) { return NULL; } /* Get the Configuration object from the FileSystem object */ jvalue jVal; jobject jConfiguration = NULL; jthrowable jExc = NULL; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_FS, "getConf", JMETHOD1("", JPARAM(HADOOP_CONF))) != 0) { errno = errnoFromException(jExc, env, "get configuration object " "from filesystem"); destroyLocalReference(env, jPath); return NULL; } jConfiguration = jVal.l; jint jBufferSize = bufferSize; jshort jReplication = replication; jlong jBlockSize = blockSize; jstring jStrBufferSize = (*env)->NewStringUTF(env, "io.file.buffer.size"); jstring jStrReplication = (*env)->NewStringUTF(env, "dfs.replication"); jstring jStrBlockSize = (*env)->NewStringUTF(env, "dfs.block.size"); //bufferSize if (!bufferSize) { if (invokeMethod(env, &jVal, &jExc, INSTANCE, jConfiguration, HADOOP_CONF, "getInt", "(Ljava/lang/String;I)I", jStrBufferSize, 4096) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.conf." "Configuration::getInt"); goto done; } jBufferSize = jVal.i; } if ((flags & O_WRONLY) && (flags & O_APPEND) == 0) { //replication if (!replication) { if (invokeMethod(env, &jVal, &jExc, INSTANCE, jConfiguration, HADOOP_CONF, "getInt", "(Ljava/lang/String;I)I", jStrReplication, 1) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.conf." "Configuration::getInt"); goto done; } jReplication = jVal.i; } //blockSize if (!blockSize) { if (invokeMethod(env, &jVal, &jExc, INSTANCE, jConfiguration, HADOOP_CONF, "getLong", "(Ljava/lang/String;J)J", jStrBlockSize, (jlong)67108864)) { errno = errnoFromException(jExc, env, "org.apache.hadoop.conf." "FileSystem::%s(%s)", method, signature); goto done; } jBlockSize = jVal.j; } } /* Create and return either the FSDataInputStream or FSDataOutputStream references jobject jStream */ // READ? if ((flags & O_WRONLY) == 0) { if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_FS, method, signature, jPath, jBufferSize)) { errno = errnoFromException(jExc, env, "org.apache.hadoop.conf." "FileSystem::%s(%s)", method, signature); goto done; } } else if ((flags & O_WRONLY) && (flags & O_APPEND)) { // WRITE/APPEND? if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_FS, method, signature, jPath)) { errno = errnoFromException(jExc, env, "org.apache.hadoop.conf." "FileSystem::%s(%s)", method, signature); goto done; } } else { // WRITE/CREATE jboolean jOverWrite = 1; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_FS, method, signature, jPath, jOverWrite, jBufferSize, jReplication, jBlockSize)) { errno = errnoFromException(jExc, env, "org.apache.hadoop.conf." "FileSystem::%s(%s)", method, signature); goto done; } } file = malloc(sizeof(struct hdfsFile_internal)); if (!file) { errno = ENOMEM; } else { file->file = (*env)->NewGlobalRef(env, jVal.l); file->type = (((flags & O_WRONLY) == 0) ? 
INPUT : OUTPUT); file->flags = 0; destroyLocalReference(env, jVal.l); if ((flags & O_WRONLY) == 0) { // Try a test read to see if we can do direct reads errno = 0; char buf; if (readDirect(fs, file, &buf, 0) == 0) { // Success - 0-byte read should return 0 file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ; } else { if (errno != ENOTSUP) { // Unexpected error. Clear it, don't set the direct flag. fprintf(stderr, "WARN: Unexpected error %d when testing " "for direct read compatibility\n", errno); errno = 0; goto done; } } errno = 0; } } done: //Delete unnecessary local references destroyLocalReference(env, jStrBufferSize); destroyLocalReference(env, jStrReplication); destroyLocalReference(env, jStrBlockSize); destroyLocalReference(env, jConfiguration); destroyLocalReference(env, jPath); return file; } int hdfsCloseFile(hdfsFS fs, hdfsFile file) { // JAVA EQUIVALENT: // file.close //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } //Parameters jobject jStream = (jobject)(file ? file->file : NULL); //Caught exception jthrowable jExc = NULL; //Sanity check if (!file || file->type == UNINITIALIZED) { errno = EBADF; return -1; } //The interface whose 'close' method to be called const char* interface = (file->type == INPUT) ? HADOOP_ISTRM : HADOOP_OSTRM; if (invokeMethod(env, NULL, &jExc, INSTANCE, jStream, interface, "close", "()V") != 0) { errno = errnoFromException(jExc, env, "%s::close", interface); return -1; } //De-allocate memory free(file); (*env)->DeleteGlobalRef(env, jStream); return 0; } int hdfsExists(hdfsFS fs, const char *path) { JNIEnv *env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jPath = constructNewObjectOfPath(env, path); jvalue jVal; jthrowable jExc = NULL; jobject jFS = (jobject)fs; if (jPath == NULL) { return -1; } if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_FS, "exists", JMETHOD1(JPARAM(HADOOP_PATH), "Z"), jPath) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileSystem::exists"); destroyLocalReference(env, jPath); return -1; } destroyLocalReference(env, jPath); return jVal.z ? 0 : -1; } // Checks input file for readiness for reading. static int readPrepare(JNIEnv* env, hdfsFS fs, hdfsFile f, jobject* jInputStream) { *jInputStream = (jobject)(f ? f->file : NULL); //Sanity check if (!f || f->type == UNINITIALIZED) { errno = EBADF; return -1; } //Error checking... make sure that this file is 'readable' if (f->type != INPUT) { fprintf(stderr, "Cannot read from a non-InputStream object!\n"); errno = EINVAL; return -1; } return 0; } // Common error-handling code between read paths. static int handleReadResult(int success, jvalue jVal, jthrowable jExc, JNIEnv* env) { int noReadBytes; if (success != 0) { if ((*env)->ExceptionCheck(env)) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." 
"FSDataInputStream::read"); } noReadBytes = -1; } else { noReadBytes = jVal.i; if (noReadBytes == 0) { // 0 from Java means try again, which is EINTR here errno = EINTR; noReadBytes = -1; } else if (noReadBytes < 0) { // -1 from Java is EOF, which is 0 here errno = 0; noReadBytes = 0; } } return noReadBytes; } tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length) { if (f->flags & HDFS_FILE_SUPPORTS_DIRECT_READ) { return readDirect(fs, f, buffer, length); } // JAVA EQUIVALENT: // byte [] bR = new byte[length]; // fis.read(bR); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } //Parameters jobject jInputStream; if (readPrepare(env, fs, f, &jInputStream) == -1) { return -1; } jbyteArray jbRarray; jint noReadBytes = 0; jvalue jVal; jthrowable jExc = NULL; //Read the requisite bytes jbRarray = (*env)->NewByteArray(env, length); int success = invokeMethod(env, &jVal, &jExc, INSTANCE, jInputStream, HADOOP_ISTRM, "read", "([B)I", jbRarray); noReadBytes = handleReadResult(success, jVal, jExc, env);; if (noReadBytes > 0) { (*env)->GetByteArrayRegion(env, jbRarray, 0, noReadBytes, buffer); } destroyLocalReference(env, jbRarray); return noReadBytes; } // Reads using the read(ByteBuffer) API, which does fewer copies tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length) { // JAVA EQUIVALENT: // ByteBuffer bbuffer = ByteBuffer.allocateDirect(length) // wraps C buffer // fis.read(bbuffer); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jInputStream; if (readPrepare(env, fs, f, &jInputStream) == -1) { return -1; } jint noReadBytes = 0; jvalue jVal; jthrowable jExc = NULL; //Read the requisite bytes jobject bb = (*env)->NewDirectByteBuffer(env, buffer, length); if (bb == NULL) { fprintf(stderr, "Could not allocate ByteBuffer"); if ((*env)->ExceptionCheck(env)) { errno = errnoFromException(NULL, env, "JNIEnv::NewDirectByteBuffer"); } else { errno = ENOMEM; // Best guess if there's no exception waiting } return -1; } int success = invokeMethod(env, &jVal, &jExc, INSTANCE, jInputStream, HADOOP_ISTRM, "read", "(Ljava/nio/ByteBuffer;)I", bb); noReadBytes = handleReadResult(success, jVal, jExc, env); destroyLocalReference(env, bb); return noReadBytes; } tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position, void* buffer, tSize length) { // JAVA EQUIVALENT: // byte [] bR = new byte[length]; // fis.read(pos, bR, 0, length); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } //Parameters jobject jInputStream = (jobject)(f ? f->file : NULL); jbyteArray jbRarray; jint noReadBytes = 0; jvalue jVal; jthrowable jExc = NULL; //Sanity check if (!f || f->type == UNINITIALIZED) { errno = EBADF; return -1; } //Error checking... make sure that this file is 'readable' if (f->type != INPUT) { fprintf(stderr, "Cannot read from a non-InputStream object!\n"); errno = EINVAL; return -1; } //Read the requisite bytes jbRarray = (*env)->NewByteArray(env, length); if (invokeMethod(env, &jVal, &jExc, INSTANCE, jInputStream, HADOOP_ISTRM, "read", "(J[BII)I", position, jbRarray, 0, length) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." 
"FSDataInputStream::read"); noReadBytes = -1; } else { noReadBytes = jVal.i; if (noReadBytes > 0) { (*env)->GetByteArrayRegion(env, jbRarray, 0, noReadBytes, buffer); } else { //This is a valid case: there aren't any bytes left to read! if (noReadBytes == 0 || noReadBytes < -1) { fprintf(stderr, "WARN: FSDataInputStream.read returned invalid return code - libhdfs returning EOF, i.e., 0: %d\n", noReadBytes); } noReadBytes = 0; } errno = 0; } destroyLocalReference(env, jbRarray); return noReadBytes; } tSize hdfsWrite(hdfsFS fs, hdfsFile f, const void* buffer, tSize length) { // JAVA EQUIVALENT // byte b[] = str.getBytes(); // fso.write(b); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } //Parameters jobject jOutputStream = (jobject)(f ? f->file : 0); jbyteArray jbWarray; //Caught exception jthrowable jExc = NULL; //Sanity check if (!f || f->type == UNINITIALIZED) { errno = EBADF; return -1; } if (length < 0) { errno = EINVAL; return -1; } //Error checking... make sure that this file is 'writable' if (f->type != OUTPUT) { fprintf(stderr, "Cannot write into a non-OutputStream object!\n"); errno = EINVAL; return -1; } // 'length' equals 'zero' is a valid use-case according to Posix! if (length != 0) { //Write the requisite bytes into the file jbWarray = (*env)->NewByteArray(env, length); (*env)->SetByteArrayRegion(env, jbWarray, 0, length, buffer); if (invokeMethod(env, NULL, &jExc, INSTANCE, jOutputStream, HADOOP_OSTRM, "write", "([B)V", jbWarray) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FSDataOutputStream::write"); length = -1; } destroyLocalReference(env, jbWarray); } //Return no. of bytes succesfully written (libc way) //i.e. 'length' itself! ;-) return length; } int hdfsSeek(hdfsFS fs, hdfsFile f, tOffset desiredPos) { // JAVA EQUIVALENT // fis.seek(pos); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } //Parameters jobject jInputStream = (jobject)(f ? f->file : 0); //Caught exception jthrowable jExc = NULL; //Sanity check if (!f || f->type != INPUT) { errno = EBADF; return -1; } if (invokeMethod(env, NULL, &jExc, INSTANCE, jInputStream, HADOOP_ISTRM, "seek", "(J)V", desiredPos) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FSDataInputStream::seek"); return -1; } return 0; } tOffset hdfsTell(hdfsFS fs, hdfsFile f) { // JAVA EQUIVALENT // pos = f.getPos(); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } //Parameters jobject jStream = (jobject)(f ? f->file : 0); //Sanity check if (!f || f->type == UNINITIALIZED) { errno = EBADF; return -1; } const char* interface = (f->type == INPUT) ? HADOOP_ISTRM : HADOOP_OSTRM; jlong currentPos = -1; jvalue jVal; jthrowable jExc = NULL; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jStream, interface, "getPos", "()J") != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FSDataInputStream::getPos"); return -1; } currentPos = jVal.j; return (tOffset)currentPos; } int hdfsFlush(hdfsFS fs, hdfsFile f) { // JAVA EQUIVALENT // fos.flush(); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } //Parameters jobject jOutputStream = (jobject)(f ? 
f->file : 0); //Caught exception jthrowable jExc = NULL; //Sanity check if (!f || f->type != OUTPUT) { errno = EBADF; return -1; } if (invokeMethod(env, NULL, &jExc, INSTANCE, jOutputStream, HADOOP_OSTRM, "flush", "()V") != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FSDataInputStream::flush"); return -1; } return 0; } int hdfsHFlush(hdfsFS fs, hdfsFile f) { //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } //Parameters jobject jOutputStream = (jobject)(f ? f->file : 0); //Caught exception jthrowable jExc = NULL; //Sanity check if (!f || f->type != OUTPUT) { errno = EBADF; return -1; } if (invokeMethod(env, NULL, &jExc, INSTANCE, jOutputStream, HADOOP_OSTRM, "hflush", "()V") != 0) { errno = errnoFromException(jExc, env, HADOOP_OSTRM "::hflush"); return -1; } return 0; } int hdfsAvailable(hdfsFS fs, hdfsFile f) { // JAVA EQUIVALENT // fis.available(); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } //Parameters jobject jInputStream = (jobject)(f ? f->file : 0); //Caught exception jthrowable jExc = NULL; //Sanity check if (!f || f->type != INPUT) { errno = EBADF; return -1; } jint available = -1; jvalue jVal; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jInputStream, HADOOP_ISTRM, "available", "()I") != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FSDataInputStream::available"); return -1; } available = jVal.i; return available; } int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) { //JAVA EQUIVALENT // FileUtil::copy(srcFS, srcPath, dstFS, dstPath, // deleteSource = false, conf) //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } //Parameters jobject jSrcFS = (jobject)srcFS; jobject jDstFS = (jobject)dstFS; jobject jSrcPath = NULL; jobject jDstPath = NULL; jSrcPath = constructNewObjectOfPath(env, src); if (jSrcPath == NULL) { return -1; } jDstPath = constructNewObjectOfPath(env, dst); if (jDstPath == NULL) { destroyLocalReference(env, jSrcPath); return -1; } int retval = 0; //Create the org.apache.hadoop.conf.Configuration object jobject jConfiguration = constructNewObjectOfClass(env, NULL, HADOOP_CONF, "()V"); if (jConfiguration == NULL) { fprintf(stderr, "Can't construct instance of class " "org.apache.hadoop.conf.Configuration\n"); destroyLocalReference(env, jSrcPath); destroyLocalReference(env, jDstPath); errno = EINTERNAL; return -1; } //FileUtil::copy jboolean deleteSource = 0; //Only copy jvalue jVal; jthrowable jExc = NULL; if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, "org/apache/hadoop/fs/FileUtil", "copy", "(Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;ZLorg/apache/hadoop/conf/Configuration;)Z", jSrcFS, jSrcPath, jDstFS, jDstPath, deleteSource, jConfiguration) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." 
"FileUtil::copy"); retval = -1; goto done; } done: //Delete unnecessary local references destroyLocalReference(env, jConfiguration); destroyLocalReference(env, jSrcPath); destroyLocalReference(env, jDstPath); return retval; } int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) { //JAVA EQUIVALENT // FileUtil::copy(srcFS, srcPath, dstFS, dstPath, // deleteSource = true, conf) //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } //Parameters jobject jSrcFS = (jobject)srcFS; jobject jDstFS = (jobject)dstFS; jobject jSrcPath = NULL; jobject jDstPath = NULL; jSrcPath = constructNewObjectOfPath(env, src); if (jSrcPath == NULL) { return -1; } jDstPath = constructNewObjectOfPath(env, dst); if (jDstPath == NULL) { destroyLocalReference(env, jSrcPath); return -1; } int retval = 0; //Create the org.apache.hadoop.conf.Configuration object jobject jConfiguration = constructNewObjectOfClass(env, NULL, HADOOP_CONF, "()V"); if (jConfiguration == NULL) { fprintf(stderr, "Can't construct instance of class " "org.apache.hadoop.conf.Configuration\n"); destroyLocalReference(env, jSrcPath); destroyLocalReference(env, jDstPath); errno = EINTERNAL; return -1; } //FileUtil::copy jboolean deleteSource = 1; //Delete src after copy jvalue jVal; jthrowable jExc = NULL; if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, "org/apache/hadoop/fs/FileUtil", "copy", "(Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;ZLorg/apache/hadoop/conf/Configuration;)Z", jSrcFS, jSrcPath, jDstFS, jDstPath, deleteSource, jConfiguration) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileUtil::copy(move)"); retval = -1; goto done; } done: //Delete unnecessary local references destroyLocalReference(env, jConfiguration); destroyLocalReference(env, jSrcPath); destroyLocalReference(env, jDstPath); return retval; } int hdfsDelete(hdfsFS fs, const char* path, int recursive) { // JAVA EQUIVALENT: // File f = new File(path); // bool retval = fs.delete(f); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jFS = (jobject)fs; //Create an object of java.io.File jobject jPath = constructNewObjectOfPath(env, path); if (jPath == NULL) { return -1; } //Delete the file jvalue jVal; jthrowable jExc = NULL; jboolean jRecursive = recursive ? JNI_TRUE : JNI_FALSE; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_FS, "delete", "(Lorg/apache/hadoop/fs/Path;Z)Z", jPath, jRecursive) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileSystem::delete"); destroyLocalReference(env, jPath); return -1; } //Delete unnecessary local references destroyLocalReference(env, jPath); return (jVal.z) ? 
0 : -1; } int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath) { // JAVA EQUIVALENT: // Path old = new Path(oldPath); // Path new = new Path(newPath); // fs.rename(old, new); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jFS = (jobject)fs; //Create objects of org.apache.hadoop.fs.Path jobject jOldPath = NULL; jobject jNewPath = NULL; jOldPath = constructNewObjectOfPath(env, oldPath); if (jOldPath == NULL) { return -1; } jNewPath = constructNewObjectOfPath(env, newPath); if (jNewPath == NULL) { destroyLocalReference(env, jOldPath); return -1; } //Rename the file jvalue jVal; jthrowable jExc = NULL; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_FS, "rename", JMETHOD2(JPARAM(HADOOP_PATH), JPARAM(HADOOP_PATH), "Z"), jOldPath, jNewPath) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileSystem::rename"); destroyLocalReference(env, jOldPath); destroyLocalReference(env, jNewPath); return -1; } //Delete unnecessary local references destroyLocalReference(env, jOldPath); destroyLocalReference(env, jNewPath); return (jVal.z) ? 0 : -1; } char* hdfsGetWorkingDirectory(hdfsFS fs, char* buffer, size_t bufferSize) { // JAVA EQUIVALENT: // Path p = fs.getWorkingDirectory(); // return p.toString() //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return NULL; } jobject jFS = (jobject)fs; jobject jPath = NULL; jvalue jVal; jthrowable jExc = NULL; //FileSystem::getWorkingDirectory() if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_FS, "getWorkingDirectory", "()Lorg/apache/hadoop/fs/Path;") != 0 || jVal.l == NULL) { errno = errnoFromException(jExc, env, "FileSystem::" "getWorkingDirectory"); return NULL; } jPath = jVal.l; //Path::toString() jstring jPathString; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jPath, "org/apache/hadoop/fs/Path", "toString", "()Ljava/lang/String;") != 0) { errno = errnoFromException(jExc, env, "Path::toString"); destroyLocalReference(env, jPath); return NULL; } jPathString = jVal.l; const char *jPathChars = (const char*) ((*env)->GetStringUTFChars(env, jPathString, NULL)); //Copy to user-provided buffer strncpy(buffer, jPathChars, bufferSize); //Delete unnecessary local references (*env)->ReleaseStringUTFChars(env, jPathString, jPathChars); destroyLocalReference(env, jPathString); destroyLocalReference(env, jPath); return buffer; } int hdfsSetWorkingDirectory(hdfsFS fs, const char* path) { // JAVA EQUIVALENT: // fs.setWorkingDirectory(Path(path)); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jFS = (jobject)fs; int retval = 0; jthrowable jExc = NULL; //Create an object of org.apache.hadoop.fs.Path jobject jPath = constructNewObjectOfPath(env, path); if (jPath == NULL) { return -1; } //FileSystem::setWorkingDirectory() if (invokeMethod(env, NULL, &jExc, INSTANCE, jFS, HADOOP_FS, "setWorkingDirectory", "(Lorg/apache/hadoop/fs/Path;)V", jPath) != 0) { errno = errnoFromException(jExc, env, "FileSystem::" "setWorkingDirectory"); retval = -1; } //Delete unnecessary local references destroyLocalReference(env, jPath); return retval; } int hdfsCreateDirectory(hdfsFS fs, const char* path) { // JAVA EQUIVALENT: // fs.mkdirs(new Path(path)); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jFS = (jobject)fs; 
//Create an object of org.apache.hadoop.fs.Path jobject jPath = constructNewObjectOfPath(env, path); if (jPath == NULL) { return -1; } //Create the directory jvalue jVal; jVal.z = 0; jthrowable jExc = NULL; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_FS, "mkdirs", "(Lorg/apache/hadoop/fs/Path;)Z", jPath) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileSystem::mkdirs"); goto done; } done: //Delete unnecessary local references destroyLocalReference(env, jPath); return (jVal.z) ? 0 : -1; } int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication) { // JAVA EQUIVALENT: // fs.setReplication(new Path(path), replication); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jFS = (jobject)fs; //Create an object of org.apache.hadoop.fs.Path jobject jPath = constructNewObjectOfPath(env, path); if (jPath == NULL) { return -1; } //Create the directory jvalue jVal; jthrowable jExc = NULL; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_FS, "setReplication", "(Lorg/apache/hadoop/fs/Path;S)Z", jPath, replication) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileSystem::setReplication"); goto done; } done: //Delete unnecessary local references destroyLocalReference(env, jPath); return (jVal.z) ? 0 : -1; } int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group) { // JAVA EQUIVALENT: // fs.setOwner(path, owner, group) //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } if (owner == NULL && group == NULL) { fprintf(stderr, "Both owner and group cannot be null in chown"); errno = EINVAL; return -1; } jobject jFS = (jobject)fs; jobject jPath = constructNewObjectOfPath(env, path); if (jPath == NULL) { return -1; } jstring jOwnerString = (*env)->NewStringUTF(env, owner); jstring jGroupString = (*env)->NewStringUTF(env, group); //Create the directory int ret = 0; jthrowable jExc = NULL; if (invokeMethod(env, NULL, &jExc, INSTANCE, jFS, HADOOP_FS, "setOwner", JMETHOD3(JPARAM(HADOOP_PATH), JPARAM(JAVA_STRING), JPARAM(JAVA_STRING), JAVA_VOID), jPath, jOwnerString, jGroupString) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileSystem::setOwner"); ret = -1; goto done; } done: destroyLocalReference(env, jPath); destroyLocalReference(env, jOwnerString); destroyLocalReference(env, jGroupString); return ret; } int hdfsChmod(hdfsFS fs, const char* path, short mode) { int ret; // JAVA EQUIVALENT: // fs.setPermission(path, FsPermission) //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jFS = (jobject)fs; // construct jPerm = FsPermission.createImmutable(short mode); jshort jmode = mode; jobject jPermObj = constructNewObjectOfClass(env, NULL, HADOOP_FSPERM,"(S)V",jmode); if (jPermObj == NULL) { errno = EINTERNAL; return -1; } //Create an object of org.apache.hadoop.fs.Path jobject jPath = constructNewObjectOfPath(env, path); if (jPath == NULL) { destroyLocalReference(env, jPermObj); return -1; } //Create the directory jthrowable jExc = NULL; if (invokeMethod(env, NULL, &jExc, INSTANCE, jFS, HADOOP_FS, "setPermission", JMETHOD2(JPARAM(HADOOP_PATH), JPARAM(HADOOP_FSPERM), JAVA_VOID), jPath, jPermObj) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." 
"FileSystem::setPermission"); ret = -1; goto done; } ret = 0; done: destroyLocalReference(env, jPath); destroyLocalReference(env, jPermObj); return ret; } int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime) { // JAVA EQUIVALENT: // fs.setTimes(src, mtime, atime) //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jFS = (jobject)fs; //Create an object of org.apache.hadoop.fs.Path jobject jPath = constructNewObjectOfPath(env, path); if (jPath == NULL) { fprintf(stderr, "could not construct path object\n"); return -1; } const tTime NO_CHANGE = -1; jlong jmtime = (mtime == NO_CHANGE) ? -1 : (mtime * (jlong)1000); jlong jatime = (atime == NO_CHANGE) ? -1 : (atime * (jlong)1000); int ret = 0; jthrowable jExc = NULL; if (invokeMethod(env, NULL, &jExc, INSTANCE, jFS, HADOOP_FS, "setTimes", JMETHOD3(JPARAM(HADOOP_PATH), "J", "J", JAVA_VOID), jPath, jmtime, jatime) != 0) { fprintf(stderr, "call to setTime failed\n"); errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileSystem::setTimes"); ret = -1; goto done; } done: destroyLocalReference(env, jPath); return ret; } char*** hdfsGetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length) { // JAVA EQUIVALENT: // fs.getFileBlockLoctions(new Path(path), start, length); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return NULL; } jobject jFS = (jobject)fs; //Create an object of org.apache.hadoop.fs.Path jobject jPath = constructNewObjectOfPath(env, path); if (jPath == NULL) { return NULL; } jvalue jFSVal; jthrowable jFSExc = NULL; if (invokeMethod(env, &jFSVal, &jFSExc, INSTANCE, jFS, HADOOP_FS, "getFileStatus", "(Lorg/apache/hadoop/fs/Path;)" "Lorg/apache/hadoop/fs/FileStatus;", jPath) != 0) { errno = errnoFromException(jFSExc, env, "org.apache.hadoop.fs." "FileSystem::getFileStatus"); destroyLocalReference(env, jPath); return NULL; } jobject jFileStatus = jFSVal.l; //org.apache.hadoop.fs.FileSystem::getFileBlockLocations char*** blockHosts = NULL; jobjectArray jBlockLocations;; jvalue jVal; jthrowable jExc = NULL; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_FS, "getFileBlockLocations", "(Lorg/apache/hadoop/fs/FileStatus;JJ)" "[Lorg/apache/hadoop/fs/BlockLocation;", jFileStatus, start, length) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileSystem::getFileBlockLocations"); destroyLocalReference(env, jPath); destroyLocalReference(env, jFileStatus); return NULL; } jBlockLocations = jVal.l; //Figure out no of entries in jBlockLocations //Allocate memory and add NULL at the end jsize jNumFileBlocks = (*env)->GetArrayLength(env, jBlockLocations); blockHosts = malloc(sizeof(char**) * (jNumFileBlocks+1)); if (blockHosts == NULL) { errno = ENOMEM; goto done; } blockHosts[jNumFileBlocks] = NULL; if (jNumFileBlocks == 0) { errno = 0; goto done; } //Now parse each block to get hostnames int i = 0; for (i=0; i < jNumFileBlocks; ++i) { jobject jFileBlock = (*env)->GetObjectArrayElement(env, jBlockLocations, i); jvalue jVal; jobjectArray jFileBlockHosts; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFileBlock, HADOOP_BLK_LOC, "getHosts", "()[Ljava/lang/String;") || jVal.l == NULL) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." 
"BlockLocation::getHosts"); destroyLocalReference(env, jPath); destroyLocalReference(env, jFileStatus); destroyLocalReference(env, jBlockLocations); return NULL; } jFileBlockHosts = jVal.l; //Figure out no of hosts in jFileBlockHosts //Allocate memory and add NULL at the end jsize jNumBlockHosts = (*env)->GetArrayLength(env, jFileBlockHosts); blockHosts[i] = malloc(sizeof(char*) * (jNumBlockHosts+1)); if (blockHosts[i] == NULL) { int x = 0; for (x=0; x < i; ++x) { free(blockHosts[x]); } free(blockHosts); errno = ENOMEM; goto done; } blockHosts[i][jNumBlockHosts] = NULL; //Now parse each hostname int j = 0; const char *hostName; for (j=0; j < jNumBlockHosts; ++j) { jstring jHost = (*env)->GetObjectArrayElement(env, jFileBlockHosts, j); hostName = (const char*)((*env)->GetStringUTFChars(env, jHost, NULL)); blockHosts[i][j] = strdup(hostName); (*env)->ReleaseStringUTFChars(env, jHost, hostName); destroyLocalReference(env, jHost); } destroyLocalReference(env, jFileBlockHosts); } done: //Delete unnecessary local references destroyLocalReference(env, jPath); destroyLocalReference(env, jFileStatus); destroyLocalReference(env, jBlockLocations); return blockHosts; } void hdfsFreeHosts(char ***blockHosts) { int i, j; for (i=0; blockHosts[i]; i++) { for (j=0; blockHosts[i][j]; j++) { free(blockHosts[i][j]); } free(blockHosts[i]); } free(blockHosts); } tOffset hdfsGetDefaultBlockSize(hdfsFS fs) { // JAVA EQUIVALENT: // fs.getDefaultBlockSize(); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jFS = (jobject)fs; //FileSystem::getDefaultBlockSize() tOffset blockSize = -1; jvalue jVal; jthrowable jExc = NULL; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_FS, "getDefaultBlockSize", "()J") != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileSystem::getDefaultBlockSize"); return -1; } blockSize = jVal.j; return blockSize; } tOffset hdfsGetCapacity(hdfsFS fs) { // JAVA EQUIVALENT: // FsStatus fss = fs.getStatus(); // return Fss.getCapacity(); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jFS = (jobject)fs; //FileSystem::getStatus jvalue jVal; jthrowable jExc = NULL; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_FS, "getStatus", "()Lorg/apache/hadoop/fs/FsStatus;") != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileSystem::getStatus"); return -1; } jobject fss = (jobject)jVal.l; if (invokeMethod(env, &jVal, &jExc, INSTANCE, fss, HADOOP_FSSTATUS, "getCapacity", "()J") != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FsStatus::getCapacity"); destroyLocalReference(env, fss); return -1; } destroyLocalReference(env, fss); return jVal.j; } tOffset hdfsGetUsed(hdfsFS fs) { // JAVA EQUIVALENT: // FsStatus fss = fs.getStatus(); // return Fss.getUsed(); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jFS = (jobject)fs; //FileSystem::getStatus jvalue jVal; jthrowable jExc = NULL; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_FS, "getStatus", "()Lorg/apache/hadoop/fs/FsStatus;") != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." 
"FileSystem::getStatus"); return -1; } jobject fss = (jobject)jVal.l; if (invokeMethod(env, &jVal, &jExc, INSTANCE, fss, HADOOP_FSSTATUS, "getUsed", "()J") != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FsStatus::getUsed"); destroyLocalReference(env, fss); return -1; } destroyLocalReference(env, fss); return jVal.j; } static int getFileInfoFromStat(JNIEnv *env, jobject jStat, hdfsFileInfo *fileInfo) { jvalue jVal; jthrowable jExc = NULL; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jStat, HADOOP_STAT, "isDir", "()Z") != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileStatus::isDir"); return -1; } fileInfo->mKind = jVal.z ? kObjectKindDirectory : kObjectKindFile; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jStat, HADOOP_STAT, "getReplication", "()S") != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileStatus::getReplication"); return -1; } fileInfo->mReplication = jVal.s; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jStat, HADOOP_STAT, "getBlockSize", "()J") != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileStatus::getBlockSize"); return -1; } fileInfo->mBlockSize = jVal.j; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jStat, HADOOP_STAT, "getModificationTime", "()J") != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileStatus::getModificationTime"); return -1; } fileInfo->mLastMod = jVal.j / 1000; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jStat, HADOOP_STAT, "getAccessTime", "()J") != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileStatus::getAccessTime"); return -1; } fileInfo->mLastAccess = (tTime) (jVal.j / 1000); if (fileInfo->mKind == kObjectKindFile) { if (invokeMethod(env, &jVal, &jExc, INSTANCE, jStat, HADOOP_STAT, "getLen", "()J") != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileStatus::getLen"); return -1; } fileInfo->mSize = jVal.j; } jobject jPath; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jStat, HADOOP_STAT, "getPath", "()Lorg/apache/hadoop/fs/Path;") || jVal.l == NULL) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "Path::getPath"); return -1; } jPath = jVal.l; jstring jPathName; const char *cPathName; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jPath, HADOOP_PATH, "toString", "()Ljava/lang/String;")) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "Path::toString"); destroyLocalReference(env, jPath); return -1; } jPathName = jVal.l; cPathName = (const char*) ((*env)->GetStringUTFChars(env, jPathName, NULL)); fileInfo->mName = strdup(cPathName); (*env)->ReleaseStringUTFChars(env, jPathName, cPathName); destroyLocalReference(env, jPath); destroyLocalReference(env, jPathName); jstring jUserName; const char* cUserName; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jStat, HADOOP_STAT, "getOwner", "()Ljava/lang/String;")) { fprintf(stderr, "Call to org.apache.hadoop.fs." "FileStatus::getOwner failed!\n"); errno = EINTERNAL; return -1; } jUserName = jVal.l; cUserName = (const char*) ((*env)->GetStringUTFChars(env, jUserName, NULL)); fileInfo->mOwner = strdup(cUserName); (*env)->ReleaseStringUTFChars(env, jUserName, cUserName); destroyLocalReference(env, jUserName); jstring jGroupName; const char* cGroupName; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jStat, HADOOP_STAT, "getGroup", "()Ljava/lang/String;")) { fprintf(stderr, "Call to org.apache.hadoop.fs." 
"FileStatus::getGroup failed!\n"); errno = EINTERNAL; return -1; } jGroupName = jVal.l; cGroupName = (const char*) ((*env)->GetStringUTFChars(env, jGroupName, NULL)); fileInfo->mGroup = strdup(cGroupName); (*env)->ReleaseStringUTFChars(env, jGroupName, cGroupName); destroyLocalReference(env, jGroupName); jobject jPermission; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jStat, HADOOP_STAT, "getPermission", "()Lorg/apache/hadoop/fs/permission/FsPermission;") || jVal.l == NULL) { fprintf(stderr, "Call to org.apache.hadoop.fs." "FileStatus::getPermission failed!\n"); errno = EINTERNAL; return -1; } jPermission = jVal.l; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jPermission, HADOOP_FSPERM, "toShort", "()S") != 0) { fprintf(stderr, "Call to org.apache.hadoop.fs.permission." "FsPermission::toShort failed!\n"); errno = EINTERNAL; return -1; } fileInfo->mPermissions = jVal.s; destroyLocalReference(env, jPermission); return 0; } static int getFileInfo(JNIEnv *env, jobject jFS, jobject jPath, hdfsFileInfo *fileInfo) { // JAVA EQUIVALENT: // fs.isDirectory(f) // fs.getModificationTime() // fs.getAccessTime() // fs.getLength(f) // f.getPath() // f.getOwner() // f.getGroup() // f.getPermission().toShort() jobject jStat; jvalue jVal; jthrowable jExc = NULL; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_FS, "exists", JMETHOD1(JPARAM(HADOOP_PATH), "Z"), jPath) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileSystem::exists"); return -1; } if (jVal.z == 0) { errno = ENOENT; return -1; } if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_FS, "getFileStatus", JMETHOD1(JPARAM(HADOOP_PATH), JPARAM(HADOOP_STAT)), jPath) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileSystem::getFileStatus"); return -1; } jStat = jVal.l; int ret = getFileInfoFromStat(env, jStat, fileInfo); destroyLocalReference(env, jStat); return ret; } hdfsFileInfo* hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries) { // JAVA EQUIVALENT: // Path p(path); // Path []pathList = fs.listPaths(p) // foreach path in pathList // getFileInfo(path) //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return NULL; } jobject jFS = (jobject)fs; //Create an object of org.apache.hadoop.fs.Path jobject jPath = constructNewObjectOfPath(env, path); if (jPath == NULL) { return NULL; } hdfsFileInfo *pathList = 0; jobjectArray jPathList = NULL; jvalue jVal; jthrowable jExc = NULL; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_DFS, "listStatus", JMETHOD1(JPARAM(HADOOP_PATH), JARRPARAM(HADOOP_STAT)), jPath) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." 
"FileSystem::listStatus"); destroyLocalReference(env, jPath); return NULL; } jPathList = jVal.l; //Figure out no of entries in that directory jsize jPathListSize = (*env)->GetArrayLength(env, jPathList); *numEntries = jPathListSize; if (jPathListSize == 0) { errno = 0; goto done; } //Allocate memory pathList = calloc(jPathListSize, sizeof(hdfsFileInfo)); if (pathList == NULL) { errno = ENOMEM; goto done; } //Save path information in pathList jsize i; jobject tmpStat; for (i=0; i < jPathListSize; ++i) { tmpStat = (*env)->GetObjectArrayElement(env, jPathList, i); if (getFileInfoFromStat(env, tmpStat, &pathList[i])) { hdfsFreeFileInfo(pathList, jPathListSize); destroyLocalReference(env, tmpStat); pathList = NULL; goto done; } destroyLocalReference(env, tmpStat); } done: //Delete unnecessary local references destroyLocalReference(env, jPath); destroyLocalReference(env, jPathList); return pathList; } hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path) { // JAVA EQUIVALENT: // File f(path); // fs.isDirectory(f) // fs.lastModified() ?? // fs.getLength(f) // f.getPath() //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return NULL; } jobject jFS = (jobject)fs; //Create an object of org.apache.hadoop.fs.Path jobject jPath = constructNewObjectOfPath(env, path); if (jPath == NULL) { return NULL; } hdfsFileInfo *fileInfo = calloc(1, sizeof(hdfsFileInfo)); if (getFileInfo(env, jFS, jPath, fileInfo)) { hdfsFreeFileInfo(fileInfo, 1); fileInfo = NULL; goto done; } done: //Delete unnecessary local references destroyLocalReference(env, jPath); return fileInfo; } void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries) { //Free the mName, mOwner, and mGroup int i; for (i=0; i < numEntries; ++i) { if (hdfsFileInfo[i].mName) { free(hdfsFileInfo[i].mName); } if (hdfsFileInfo[i].mOwner) { free(hdfsFileInfo[i].mOwner); } if (hdfsFileInfo[i].mGroup) { free(hdfsFileInfo[i].mGroup); } } //Free entire block free(hdfsFileInfo); } /** * vim: ts=4: sw=4: et: */