|
@@ -32,8 +32,6 @@
|
|
|
#define HADOOP_OSTRM "org/apache/hadoop/fs/FSDataOutputStream"
|
|
|
#define HADOOP_STAT "org/apache/hadoop/fs/FileStatus"
|
|
|
#define HADOOP_FSPERM "org/apache/hadoop/fs/permission/FsPermission"
|
|
|
-#define HADOOP_UNIX_USER_GROUP_INFO "org/apache/hadoop/security/UnixUserGroupInformation"
|
|
|
-#define HADOOP_USER_GROUP_INFO "org/apache/hadoop/security/UserGroupInformation"
|
|
|
#define JAVA_NET_ISA "java/net/InetSocketAddress"
|
|
|
#define JAVA_NET_URI "java/net/URI"
|
|
|
#define JAVA_STRING "java/lang/String"
|
|
@@ -169,17 +167,17 @@ done:
|
|
|
|
|
|
|
|
|
hdfsFS hdfsConnect(const char* host, tPort port) {
|
|
|
- // connect with NULL as user name/groups
|
|
|
- return hdfsConnectAsUser(host, port, NULL, NULL, 0);
|
|
|
+ // connect with NULL as user name
|
|
|
+ return hdfsConnectAsUser(host, port, NULL);
|
|
|
}
|
|
|
|
|
|
/** Always return a new FileSystem handle */
|
|
|
hdfsFS hdfsConnectNewInstance(const char* host, tPort port) {
|
|
|
// connect with NULL as user name/groups
|
|
|
- return hdfsConnectAsUserNewInstance(host, port, NULL, NULL, 0);
|
|
|
+ return hdfsConnectAsUserNewInstance(host, port, NULL);
|
|
|
}
|
|
|
|
|
|
-hdfsFS hdfsConnectAsUser(const char* host, tPort port, const char *user , const char **groups, int groups_size )
|
|
|
+hdfsFS hdfsConnectAsUser(const char* host, tPort port, const char *user)
|
|
|
{
|
|
|
// JAVA EQUIVALENT:
|
|
|
// FileSystem fs = FileSystem.get(new Configuration());
|
|
@@ -194,6 +192,7 @@ hdfsFS hdfsConnectAsUser(const char* host, tPort port, const char *user , const
|
|
|
jthrowable jExc = NULL;
|
|
|
char *cURI = 0;
|
|
|
jobject gFsRef = NULL;
|
|
|
+ jstring jUserString = NULL;
|
|
|
|
|
|
|
|
|
//Get the JNIEnv* corresponding to current thread
|
|
@@ -215,86 +214,8 @@ hdfsFS hdfsConnectAsUser(const char* host, tPort port, const char *user , const
|
|
|
}
|
|
|
|
|
|
if (user != NULL) {
|
|
|
-
|
|
|
- if (groups == NULL || groups_size <= 0) {
|
|
|
- fprintf(stderr, "ERROR: groups must not be empty/null\n");
|
|
|
- errno = EINVAL;
|
|
|
- destroyLocalReference(env, jConfiguration);
|
|
|
- return NULL;
|
|
|
- }
|
|
|
-
|
|
|
- jstring jUserString = (*env)->NewStringUTF(env, user);
|
|
|
- jarray jGroups = constructNewArrayString(env, &jExc, groups, groups_size);
|
|
|
- if (jGroups == NULL) {
|
|
|
- errno = EINTERNAL;
|
|
|
- fprintf(stderr, "ERROR: could not construct groups array\n");
|
|
|
- destroyLocalReference(env, jConfiguration);
|
|
|
- destroyLocalReference(env, jUserString);
|
|
|
- return NULL;
|
|
|
- }
|
|
|
-
|
|
|
- jobject jUgi;
|
|
|
- if ((jUgi = constructNewObjectOfClass(env, &jExc, HADOOP_UNIX_USER_GROUP_INFO, JMETHOD2(JPARAM(JAVA_STRING), JARRPARAM(JAVA_STRING), JAVA_VOID), jUserString, jGroups)) == NULL) {
|
|
|
- fprintf(stderr,"failed to construct hadoop user unix group info object\n");
|
|
|
- errno = errnoFromException(jExc, env, HADOOP_UNIX_USER_GROUP_INFO,
|
|
|
- "init");
|
|
|
- destroyLocalReference(env, jConfiguration);
|
|
|
- destroyLocalReference(env, jUserString);
|
|
|
- if (jGroups != NULL) {
|
|
|
- destroyLocalReference(env, jGroups);
|
|
|
- }
|
|
|
- return NULL;
|
|
|
- }
|
|
|
-#define USE_UUGI
|
|
|
-#ifdef USE_UUGI
|
|
|
-
|
|
|
- // UnixUserGroupInformation.UGI_PROPERTY_NAME
|
|
|
- jstring jAttrString = (*env)->NewStringUTF(env,"hadoop.job.ugi");
|
|
|
-
|
|
|
- if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_UNIX_USER_GROUP_INFO, "saveToConf",
|
|
|
- JMETHOD3(JPARAM(HADOOP_CONF), JPARAM(JAVA_STRING), JPARAM(HADOOP_UNIX_USER_GROUP_INFO), JAVA_VOID),
|
|
|
- jConfiguration, jAttrString, jUgi) != 0) {
|
|
|
- errno = errnoFromException(jExc, env, HADOOP_FSPERM,
|
|
|
- "init");
|
|
|
- destroyLocalReference(env, jConfiguration);
|
|
|
- destroyLocalReference(env, jUserString);
|
|
|
- if (jGroups != NULL) {
|
|
|
- destroyLocalReference(env, jGroups);
|
|
|
- }
|
|
|
- destroyLocalReference(env, jUgi);
|
|
|
- destroyLocalReference(env, jAttrString);
|
|
|
- return NULL;
|
|
|
- }
|
|
|
-
|
|
|
- destroyLocalReference(env, jUserString);
|
|
|
- destroyLocalReference(env, jGroups);
|
|
|
- destroyLocalReference(env, jUgi);
|
|
|
- destroyLocalReference(env, jAttrString);
|
|
|
- }
|
|
|
-#else
|
|
|
-
|
|
|
- // what does "current" mean in the context of libhdfs ? does it mean for the last hdfs connection we used?
|
|
|
- // that's why this code cannot be activated. We know the above use of the conf object should work well with
|
|
|
- // multiple connections.
|
|
|
- if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_USER_GROUP_INFO, "setCurrentUGI",
|
|
|
- JMETHOD1(JPARAM(HADOOP_USER_GROUP_INFO), JAVA_VOID),
|
|
|
- jUgi) != 0) {
|
|
|
- errno = errnoFromException(jExc, env, HADOOP_USER_GROUP_INFO,
|
|
|
- "setCurrentUGI");
|
|
|
- destroyLocalReference(env, jConfiguration);
|
|
|
- destroyLocalReference(env, jUserString);
|
|
|
- if (jGroups != NULL) {
|
|
|
- destroyLocalReference(env, jGroups);
|
|
|
- }
|
|
|
- destroyLocalReference(env, jUgi);
|
|
|
- return NULL;
|
|
|
- }
|
|
|
-
|
|
|
- destroyLocalReference(env, jUserString);
|
|
|
- destroyLocalReference(env, jGroups);
|
|
|
- destroyLocalReference(env, jUgi);
|
|
|
+ jUserString = (*env)->NewStringUTF(env, user);
|
|
|
}
|
|
|
-#endif
|
|
|
//Check what type of FileSystem the caller wants...
|
|
|
if (host == NULL) {
|
|
|
// fs = FileSytem::getLocal(conf);
|
|
@@ -308,43 +229,61 @@ hdfsFS hdfsConnectAsUser(const char* host, tPort port, const char *user , const
|
|
|
}
|
|
|
jFS = jVal.l;
|
|
|
}
|
|
|
+ //FileSystem.get(conf) -> FileSystem.get(FileSystem.getDefaultUri(conf),
|
|
|
+ // conf, user)
|
|
|
else if (!strcmp(host, "default") && port == 0) {
|
|
|
- //fs = FileSystem::get(conf);
|
|
|
- if (invokeMethod(env, &jVal, &jExc, STATIC, NULL,
|
|
|
- HADOOP_FS, "get",
|
|
|
- JMETHOD1(JPARAM(HADOOP_CONF),
|
|
|
- JPARAM(HADOOP_FS)),
|
|
|
- jConfiguration) != 0) {
|
|
|
- errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
|
|
|
- "FileSystem::get");
|
|
|
- goto done;
|
|
|
- }
|
|
|
- jFS = jVal.l;
|
|
|
+ if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS,
|
|
|
+ "getDefaultUri",
|
|
|
+ "(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/URI;",
|
|
|
+ jConfiguration) != 0) {
|
|
|
+            errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
|
|
|
+ "FileSystem::getDefaultUri");
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ jURI = jVal.l;
|
|
|
+ if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS, "get",
|
|
|
+ JMETHOD3(JPARAM(JAVA_NET_URI),
|
|
|
+ JPARAM(HADOOP_CONF), JPARAM(JAVA_STRING),
|
|
|
+ JPARAM(HADOOP_FS)),
|
|
|
+ jURI, jConfiguration, jUserString) != 0) {
|
|
|
+ errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
|
|
|
+ "Filesystem::get(URI, Configuration)");
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+
|
|
|
+ jFS = jVal.l;
|
|
|
}
|
|
|
else {
|
|
|
- // fs = FileSystem::get(URI, conf);
|
|
|
- cURI = malloc(strlen(host)+16);
|
|
|
- sprintf(cURI, "hdfs://%s:%d", host, (int)(port));
|
|
|
+ // fs = FileSystem::get(URI, conf, ugi);
|
|
|
+        cURI = malloc(strlen(host)+16);
|
|
|
+        if (cURI == NULL) {
|
|
|
+            fprintf (stderr, "Couldn't allocate an object of size %d",
|
|
|
+                (int)(strlen(host) + 16));
|
|
|
+            errno = EINTERNAL;
|
|
|
+            goto done;
|
|
|
+        }
|
|
|
+        sprintf(cURI, "hdfs://%s:%d", host, (int)(port));
|
|
|
|
|
|
- jURIString = (*env)->NewStringUTF(env, cURI);
|
|
|
- if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, JAVA_NET_URI,
|
|
|
- "create", "(Ljava/lang/String;)Ljava/net/URI;",
|
|
|
- jURIString) != 0) {
|
|
|
- errno = errnoFromException(jExc, env, "java.net.URI::create");
|
|
|
- goto done;
|
|
|
- }
|
|
|
- jURI = jVal.l;
|
|
|
+ jURIString = (*env)->NewStringUTF(env, cURI);
|
|
|
+ if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, JAVA_NET_URI,
|
|
|
+ "create", "(Ljava/lang/String;)Ljava/net/URI;",
|
|
|
+ jURIString) != 0) {
|
|
|
+ errno = errnoFromException(jExc, env, "java.net.URI::create");
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ jURI = jVal.l;
|
|
|
|
|
|
- if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS, "get",
|
|
|
- JMETHOD2(JPARAM(JAVA_NET_URI),
|
|
|
- JPARAM(HADOOP_CONF), JPARAM(HADOOP_FS)),
|
|
|
- jURI, jConfiguration) != 0) {
|
|
|
- errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
|
|
|
- "Filesystem::get(URI, Configuration)");
|
|
|
- goto done;
|
|
|
- }
|
|
|
+ if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS, "get",
|
|
|
+ JMETHOD3(JPARAM(JAVA_NET_URI),
|
|
|
+ JPARAM(HADOOP_CONF), JPARAM(JAVA_STRING),
|
|
|
+ JPARAM(HADOOP_FS)),
|
|
|
+ jURI, jConfiguration, jUserString) != 0) {
|
|
|
+ errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
|
|
|
+ "Filesystem::get(URI, Configuration)");
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
|
|
|
- jFS = jVal.l;
|
|
|
+ jFS = jVal.l;
|
|
|
}
|
|
|
|
|
|
done:
|
|
@@ -353,6 +292,7 @@ hdfsFS hdfsConnectAsUser(const char* host, tPort port, const char *user , const
|
|
|
destroyLocalReference(env, jConfiguration);
|
|
|
destroyLocalReference(env, jURIString);
|
|
|
destroyLocalReference(env, jURI);
|
|
|
+ destroyLocalReference(env, jUserString);
|
|
|
|
|
|
if (cURI) free(cURI);
|
|
|
|
|
@@ -367,7 +307,7 @@ hdfsFS hdfsConnectAsUser(const char* host, tPort port, const char *user , const
|
|
|
|
|
|
|
|
|
/** Always return a new FileSystem handle */
|
|
|
-hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port, const char *user , const char **groups, int groups_size )
|
|
|
+hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port, const char *user)
|
|
|
{
|
|
|
// JAVA EQUIVALENT:
|
|
|
// FileSystem fs = FileSystem.get(new Configuration());
|
|
@@ -382,7 +322,7 @@ hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port, const char *us
|
|
|
jthrowable jExc = NULL;
|
|
|
char *cURI = 0;
|
|
|
jobject gFsRef = NULL;
|
|
|
-
|
|
|
+ jstring jUserString = NULL;
|
|
|
|
|
|
//Get the JNIEnv* corresponding to current thread
|
|
|
env = getJNIEnv();
|
|
@@ -403,86 +343,8 @@ hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port, const char *us
|
|
|
}
|
|
|
|
|
|
if (user != NULL) {
|
|
|
-
|
|
|
- if (groups == NULL || groups_size <= 0) {
|
|
|
- fprintf(stderr, "ERROR: groups must not be empty/null\n");
|
|
|
- errno = EINVAL;
|
|
|
- destroyLocalReference(env, jConfiguration);
|
|
|
- return NULL;
|
|
|
- }
|
|
|
-
|
|
|
- jstring jUserString = (*env)->NewStringUTF(env, user);
|
|
|
- jarray jGroups = constructNewArrayString(env, &jExc, groups, groups_size);
|
|
|
- if (jGroups == NULL) {
|
|
|
- errno = EINTERNAL;
|
|
|
- fprintf(stderr, "ERROR: could not construct groups array\n");
|
|
|
- destroyLocalReference(env, jConfiguration);
|
|
|
- destroyLocalReference(env, jUserString);
|
|
|
- return NULL;
|
|
|
- }
|
|
|
-
|
|
|
- jobject jUgi;
|
|
|
- if ((jUgi = constructNewObjectOfClass(env, &jExc, HADOOP_UNIX_USER_GROUP_INFO, JMETHOD2(JPARAM(JAVA_STRING), JARRPARAM(JAVA_STRING), JAVA_VOID), jUserString, jGroups)) == NULL) {
|
|
|
- fprintf(stderr,"failed to construct hadoop user unix group info object\n");
|
|
|
- errno = errnoFromException(jExc, env, HADOOP_UNIX_USER_GROUP_INFO,
|
|
|
- "init");
|
|
|
- destroyLocalReference(env, jConfiguration);
|
|
|
- destroyLocalReference(env, jUserString);
|
|
|
- if (jGroups != NULL) {
|
|
|
- destroyLocalReference(env, jGroups);
|
|
|
- }
|
|
|
- return NULL;
|
|
|
- }
|
|
|
-#define USE_UUGI
|
|
|
-#ifdef USE_UUGI
|
|
|
-
|
|
|
- // UnixUserGroupInformation.UGI_PROPERTY_NAME
|
|
|
- jstring jAttrString = (*env)->NewStringUTF(env,"hadoop.job.ugi");
|
|
|
-
|
|
|
- if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_UNIX_USER_GROUP_INFO, "saveToConf",
|
|
|
- JMETHOD3(JPARAM(HADOOP_CONF), JPARAM(JAVA_STRING), JPARAM(HADOOP_UNIX_USER_GROUP_INFO), JAVA_VOID),
|
|
|
- jConfiguration, jAttrString, jUgi) != 0) {
|
|
|
- errno = errnoFromException(jExc, env, HADOOP_FSPERM,
|
|
|
- "init");
|
|
|
- destroyLocalReference(env, jConfiguration);
|
|
|
- destroyLocalReference(env, jUserString);
|
|
|
- if (jGroups != NULL) {
|
|
|
- destroyLocalReference(env, jGroups);
|
|
|
- }
|
|
|
- destroyLocalReference(env, jUgi);
|
|
|
- destroyLocalReference(env, jAttrString);
|
|
|
- return NULL;
|
|
|
- }
|
|
|
-
|
|
|
- destroyLocalReference(env, jUserString);
|
|
|
- destroyLocalReference(env, jGroups);
|
|
|
- destroyLocalReference(env, jUgi);
|
|
|
- destroyLocalReference(env, jAttrString);
|
|
|
- }
|
|
|
-#else
|
|
|
-
|
|
|
- // what does "current" mean in the context of libhdfs ? does it mean for the last hdfs connection we used?
|
|
|
- // that's why this code cannot be activated. We know the above use of the conf object should work well with
|
|
|
- // multiple connections.
|
|
|
- if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_USER_GROUP_INFO, "setCurrentUGI",
|
|
|
- JMETHOD1(JPARAM(HADOOP_USER_GROUP_INFO), JAVA_VOID),
|
|
|
- jUgi) != 0) {
|
|
|
- errno = errnoFromException(jExc, env, HADOOP_USER_GROUP_INFO,
|
|
|
- "setCurrentUGI");
|
|
|
- destroyLocalReference(env, jConfiguration);
|
|
|
- destroyLocalReference(env, jUserString);
|
|
|
- if (jGroups != NULL) {
|
|
|
- destroyLocalReference(env, jGroups);
|
|
|
- }
|
|
|
- destroyLocalReference(env, jUgi);
|
|
|
- return NULL;
|
|
|
- }
|
|
|
-
|
|
|
- destroyLocalReference(env, jUserString);
|
|
|
- destroyLocalReference(env, jGroups);
|
|
|
- destroyLocalReference(env, jUgi);
|
|
|
- }
|
|
|
-#endif
|
|
|
+ jUserString = (*env)->NewStringUTF(env, user);
|
|
|
+ }
|
|
|
//Check what type of FileSystem the caller wants...
|
|
|
if (host == NULL) {
|
|
|
// fs = FileSytem::newInstanceLocal(conf);
|
|
@@ -497,17 +359,28 @@ hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port, const char *us
|
|
|
jFS = jVal.l;
|
|
|
}
|
|
|
else if (!strcmp(host, "default") && port == 0) {
|
|
|
- //fs = FileSystem::get(conf);
|
|
|
- if (invokeMethod(env, &jVal, &jExc, STATIC, NULL,
|
|
|
+        //fs = FileSystem::newInstance(FileSystem.getDefaultUri(conf), conf, user);
|
|
|
+ if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS,
|
|
|
+ "getDefaultUri",
|
|
|
+ "(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/URI;",
|
|
|
+ jConfiguration) != 0) {
|
|
|
+            errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
|
|
|
+ "FileSystem::getDefaultUri");
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ jURI = jVal.l;
|
|
|
+ if (invokeMethod(env, &jVal, &jExc, STATIC, NULL,
|
|
|
HADOOP_FS, "newInstance",
|
|
|
- JMETHOD1(JPARAM(HADOOP_CONF),
|
|
|
+ JMETHOD3(JPARAM(JAVA_NET_URI),
|
|
|
+ JPARAM(HADOOP_CONF),
|
|
|
+ JPARAM(JAVA_STRING),
|
|
|
JPARAM(HADOOP_FS)),
|
|
|
- jConfiguration) != 0) {
|
|
|
- errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
|
|
|
+ jURI, jConfiguration, jUserString) != 0) {
|
|
|
+ errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
|
|
|
"FileSystem::newInstance");
|
|
|
- goto done;
|
|
|
- }
|
|
|
- jFS = jVal.l;
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ jFS = jVal.l;
|
|
|
}
|
|
|
else {
|
|
|
// fs = FileSystem::newInstance(URI, conf);
|
|
@@ -524,9 +397,10 @@ hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port, const char *us
|
|
|
jURI = jVal.l;
|
|
|
|
|
|
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS, "newInstance",
|
|
|
- JMETHOD2(JPARAM(JAVA_NET_URI),
|
|
|
- JPARAM(HADOOP_CONF), JPARAM(HADOOP_FS)),
|
|
|
- jURI, jConfiguration) != 0) {
|
|
|
+ JMETHOD3(JPARAM(JAVA_NET_URI),
|
|
|
+ JPARAM(HADOOP_CONF), JPARAM(JAVA_STRING),
|
|
|
+ JPARAM(HADOOP_FS)),
|
|
|
+ jURI, jConfiguration, jUserString) != 0) {
|
|
|
errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
|
|
|
"Filesystem::newInstance(URI, Configuration)");
|
|
|
goto done;
|
|
@@ -541,6 +415,7 @@ hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port, const char *us
|
|
|
destroyLocalReference(env, jConfiguration);
|
|
|
destroyLocalReference(env, jURIString);
|
|
|
destroyLocalReference(env, jURI);
|
|
|
+ destroyLocalReference(env, jUserString);
|
|
|
|
|
|
if (cURI) free(cURI);
|
|
|
|