|
@@ -29,6 +29,8 @@
|
|
|
#define HADOOP_ISTRM "org/apache/hadoop/fs/FSDataInputStream"
|
|
|
#define HADOOP_OSTRM "org/apache/hadoop/fs/FSDataOutputStream"
|
|
|
#define JAVA_NET_ISA "java/net/InetSocketAddress"
|
|
|
+#define JAVA_NET_URI "java/net/URI"
|
|
|
+
|
|
|
|
|
|
|
|
|
/* Macros for constructing method signatures */
|
|
@@ -57,7 +59,8 @@ typedef struct
|
|
|
*/
|
|
|
static void destroyLocalReference(JNIEnv *env, jobject jObject)
|
|
|
{
|
|
|
- (*env)->DeleteLocalRef(env, jObject);
|
|
|
+ if (jObject)
|
|
|
+ (*env)->DeleteLocalRef(env, jObject);
|
|
|
}
|
|
|
|
|
|
|
|
@@ -102,9 +105,14 @@ hdfsFS hdfsConnect(const char* host, tPort port)
|
|
|
// return fs;
|
|
|
|
|
|
JNIEnv *env = 0;
|
|
|
- jobject jConfiguration;
|
|
|
+ jobject jConfiguration = NULL;
|
|
|
jobject jFS = NULL;
|
|
|
+ jobject jURI = NULL;
|
|
|
+ jstring jURIString = NULL;
|
|
|
jvalue jVal;
|
|
|
+ char *cURI = 0;
|
|
|
+ jobject gFsRef = NULL;
|
|
|
+
|
|
|
|
|
|
//Get the JNIEnv* corresponding to current thread
|
|
|
env = getJNIEnv();
|
|
@@ -122,14 +130,17 @@ hdfsFS hdfsConnect(const char* host, tPort port)
|
|
|
|
|
|
//Check what type of FileSystem the caller wants...
|
|
|
if (host == NULL) {
|
|
|
- //fs = new LocalFileSystem(conf);
|
|
|
- jFS = constructNewObjectOfClass(env, HADOOP_LOCALFS,
|
|
|
- JMETHOD1(JPARAM(HADOOP_CONF), "V"),
|
|
|
- jConfiguration);
|
|
|
- if (jFS == NULL) {
|
|
|
- errno = EINTERNAL;
|
|
|
- goto done;
|
|
|
+ // fs = FileSytem::getLocal(conf);
|
|
|
+ if (invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS, "getLocal",
|
|
|
+ JMETHOD1(JPARAM(HADOOP_CONF),
|
|
|
+ JPARAM(HADOOP_LOCALFS)),
|
|
|
+ jConfiguration) != 0) {
|
|
|
+ fprintf(stderr, "Call to org.apache.hadoop.fs."
|
|
|
+ "FileSystem::getLocal failed!\n");
|
|
|
+ errno = EINTERNAL;
|
|
|
+ goto done;
|
|
|
}
|
|
|
+ jFS = jVal.l;
|
|
|
}
|
|
|
else if (!strcmp(host, "default") && port == 0) {
|
|
|
//fs = FileSystem::get(conf);
|
|
@@ -146,39 +157,43 @@ hdfsFS hdfsConnect(const char* host, tPort port)
|
|
|
jFS = jVal.l;
|
|
|
}
|
|
|
else {
|
|
|
- //fs = new DistributedFileSystem(new InetSocketAddress(host, port), conf)
|
|
|
- jstring jHostName = (*env)->NewStringUTF(env, host);
|
|
|
- jobject jNameNodeAddr =
|
|
|
- constructNewObjectOfClass(env, JAVA_NET_ISA,
|
|
|
- "(Ljava/lang/String;I)V",
|
|
|
- jHostName, port);
|
|
|
-
|
|
|
- destroyLocalReference(env, jHostName);
|
|
|
- if (jNameNodeAddr == NULL) {
|
|
|
- errno = EINTERNAL;
|
|
|
- goto done;
|
|
|
+ // fs = FileSystem::get(URI, conf);
|
|
|
+ cURI = malloc(strlen(host)+16);
|
|
|
+ sprintf(cURI, "hdfs://%s:%d", host, (int)(port));
|
|
|
+
|
|
|
+ jURIString = (*env)->NewStringUTF(env, cURI);
|
|
|
+ if (invokeMethod(env, &jVal, STATIC, NULL, JAVA_NET_URI,
|
|
|
+ "create", "(Ljava/lang/String;)Ljava/net/URI;",
|
|
|
+ jURIString) != 0) {
|
|
|
+ fprintf(stderr, "Call to java.net.URI::create failed!\n");
|
|
|
+ errno = EINTERNAL;
|
|
|
+ goto done;
|
|
|
}
|
|
|
-
|
|
|
- jFS = constructNewObjectOfClass(env, HADOOP_DFS,
|
|
|
- JMETHOD2(JPARAM(JAVA_NET_ISA),
|
|
|
- JPARAM(HADOOP_CONF), "V"),
|
|
|
- jNameNodeAddr, jConfiguration);
|
|
|
-
|
|
|
- destroyLocalReference(env, jNameNodeAddr);
|
|
|
- if (jFS == NULL) {
|
|
|
- errno = EINTERNAL;
|
|
|
- goto done;
|
|
|
+ jURI = jVal.l;
|
|
|
+
|
|
|
+ if (invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS, "get",
|
|
|
+ JMETHOD2(JPARAM(JAVA_NET_URI),
|
|
|
+ JPARAM(HADOOP_CONF), JPARAM(HADOOP_FS)),
|
|
|
+ jURI, jConfiguration) != 0) {
|
|
|
+ fprintf(stderr, "Call to org.apache.hadoop.fs."
|
|
|
+ "FileSystem::get(URI, Configuration) failed!\n");
|
|
|
+ errno = EINTERNAL;
|
|
|
+ goto done;
|
|
|
}
|
|
|
+
|
|
|
+ jFS = jVal.l;
|
|
|
}
|
|
|
|
|
|
done:
|
|
|
|
|
|
- //Release unnecessary local references
|
|
|
+ // Release unnecessary local references
|
|
|
destroyLocalReference(env, jConfiguration);
|
|
|
+ destroyLocalReference(env, jURIString);
|
|
|
+ destroyLocalReference(env, jURI);
|
|
|
|
|
|
- /* Create a global reference for this fs */
|
|
|
- jobject gFsRef = NULL;
|
|
|
+ if (cURI) free(cURI);
|
|
|
|
|
|
+ /* Create a global reference for this fs */
|
|
|
if (jFS) {
|
|
|
gFsRef = (*env)->NewGlobalRef(env, jFS);
|
|
|
destroyLocalReference(env, jFS);
|