|
@@ -279,12 +279,19 @@ done:
|
|
|
return ret;
|
|
|
}
|
|
|
|
|
|
+struct hdfsBuilderConfOpt {
|
|
|
+ struct hdfsBuilderConfOpt *next;
|
|
|
+ const char *key;
|
|
|
+ const char *val;
|
|
|
+};
|
|
|
+
|
|
|
struct hdfsBuilder {
|
|
|
int forceNewInstance;
|
|
|
const char *nn;
|
|
|
tPort port;
|
|
|
const char *kerbTicketCachePath;
|
|
|
const char *userName;
|
|
|
+ struct hdfsBuilderConfOpt *opts;
|
|
|
};
|
|
|
|
|
|
struct hdfsBuilder *hdfsNewBuilder(void)
|
|
@@ -297,8 +304,32 @@ struct hdfsBuilder *hdfsNewBuilder(void)
|
|
|
return bld;
|
|
|
}
|
|
|
|
|
|
+int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key,
|
|
|
+ const char *val)
|
|
|
+{
|
|
|
+ struct hdfsBuilderConfOpt *opt, *next;
|
|
|
+
|
|
|
+ opt = calloc(1, sizeof(struct hdfsBuilderConfOpt));
|
|
|
+ if (!opt)
|
|
|
+ return -ENOMEM;
|
|
|
+ next = bld->opts;
|
|
|
+ bld->opts = opt;
|
|
|
+ opt->next = next;
|
|
|
+ opt->key = key;
|
|
|
+ opt->val = val;
|
|
|
+ return 0;
|
|
|
+}
|
|
|
+
|
|
|
void hdfsFreeBuilder(struct hdfsBuilder *bld)
|
|
|
{
|
|
|
+ struct hdfsBuilderConfOpt *cur, *next;
|
|
|
+
|
|
|
+    /* free every accumulated configuration option node */
|
|
|
+ for (cur = bld->opts; cur; ) {
|
|
|
+ next = cur->next;
|
|
|
+ free(cur);
|
|
|
+ cur = next;
|
|
|
+ }
|
|
|
free(bld);
|
|
|
}
|
|
|
|
|
@@ -451,6 +482,7 @@ hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld)
|
|
|
char *cURI = 0, buf[512];
|
|
|
int ret;
|
|
|
jobject jRet = NULL;
|
|
|
+ struct hdfsBuilderConfOpt *opt;
|
|
|
|
|
|
//Get the JNIEnv* corresponding to current thread
|
|
|
env = getJNIEnv();
|
|
@@ -466,6 +498,16 @@ hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld)
|
|
|
"hdfsBuilderConnect(%s)", hdfsBuilderToStr(bld, buf, sizeof(buf)));
|
|
|
goto done;
|
|
|
}
|
|
|
+ // set configuration values
|
|
|
+ for (opt = bld->opts; opt; opt = opt->next) {
|
|
|
+ jthr = hadoopConfSetStr(env, jConfiguration, opt->key, opt->val);
|
|
|
+ if (jthr) {
|
|
|
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
|
|
|
+ "hdfsBuilderConnect(%s): error setting conf '%s' to '%s'",
|
|
|
+ hdfsBuilderToStr(bld, buf, sizeof(buf)), opt->key, opt->val);
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
//Check what type of FileSystem the caller wants...
|
|
|
if (bld->nn == NULL) {
|
|
@@ -596,7 +638,7 @@ done:
|
|
|
destroyLocalReference(env, jURIString);
|
|
|
destroyLocalReference(env, jUserString);
|
|
|
free(cURI);
|
|
|
- free(bld);
|
|
|
+ hdfsFreeBuilder(bld);
|
|
|
|
|
|
if (ret) {
|
|
|
errno = ret;
|
|
@@ -644,7 +686,29 @@ int hdfsDisconnect(hdfsFS fs)
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
+/**
|
|
|
+ * Get the default block size of a FileSystem object.
|
|
|
+ *
|
|
|
+ * @param env The Java env
|
|
|
+ * @param jFS The FileSystem object
|
|
|
+ * @param jPath The path to find the default blocksize at
|
|
|
+ * @param out (out param) the default block size
|
|
|
+ *
|
|
|
+ * @return NULL on success; or the exception
|
|
|
+ */
|
|
|
+static jthrowable getDefaultBlockSize(JNIEnv *env, jobject jFS,
|
|
|
+ jobject jPath, jlong *out)
|
|
|
+{
|
|
|
+ jthrowable jthr;
|
|
|
+ jvalue jVal;
|
|
|
|
|
|
+ jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
|
|
|
+ "getDefaultBlockSize", JMETHOD1(JPARAM(HADOOP_PATH), "J"), jPath);
|
|
|
+ if (jthr)
|
|
|
+ return jthr;
|
|
|
+ *out = jVal.j;
|
|
|
+ return NULL;
|
|
|
+}
|
|
|
|
|
|
hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
|
|
|
int bufferSize, short replication, tSize blockSize)
|
|
@@ -665,7 +729,6 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
|
|
|
}
|
|
|
|
|
|
jstring jStrBufferSize = NULL, jStrReplication = NULL;
|
|
|
- jstring jStrBlockSize = NULL;
|
|
|
jobject jConfiguration = NULL, jPath = NULL, jFile = NULL;
|
|
|
jobject jFS = (jobject)fs;
|
|
|
jthrowable jthr;
|
|
@@ -724,7 +787,6 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
|
|
|
|
|
|
jint jBufferSize = bufferSize;
|
|
|
jshort jReplication = replication;
|
|
|
- jlong jBlockSize = blockSize;
|
|
|
jStrBufferSize = (*env)->NewStringUTF(env, "io.file.buffer.size");
|
|
|
if (!jStrBufferSize) {
|
|
|
ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, "OOM");
|
|
@@ -735,11 +797,6 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
|
|
|
ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, "OOM");
|
|
|
goto done;
|
|
|
}
|
|
|
- jStrBlockSize = (*env)->NewStringUTF(env, "dfs.block.size");
|
|
|
- if (!jStrBlockSize) {
|
|
|
- ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, "OOM");
|
|
|
- goto done;
|
|
|
- }
|
|
|
|
|
|
if (!bufferSize) {
|
|
|
jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration,
|
|
@@ -768,20 +825,6 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
|
|
|
}
|
|
|
jReplication = jVal.i;
|
|
|
}
|
|
|
-
|
|
|
- //blockSize
|
|
|
- if (!blockSize) {
|
|
|
- jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration,
|
|
|
- HADOOP_CONF, "getLong", "(Ljava/lang/String;J)J",
|
|
|
- jStrBlockSize, (jlong)67108864);
|
|
|
- if (jthr) {
|
|
|
- ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
|
|
|
- "hdfsOpenFile(%s): Configuration#getLong(dfs.block.size)",
|
|
|
- path);
|
|
|
- goto done;
|
|
|
- }
|
|
|
- jBlockSize = jVal.j;
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
/* Create and return either the FSDataInputStream or
|
|
@@ -798,6 +841,15 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
|
|
|
} else {
|
|
|
// WRITE/CREATE
|
|
|
jboolean jOverWrite = 1;
|
|
|
+ jlong jBlockSize = blockSize;
|
|
|
+
|
|
|
+ if (jBlockSize == 0) {
|
|
|
+ jthr = getDefaultBlockSize(env, jFS, jPath, &jBlockSize);
|
|
|
+ if (jthr) {
|
|
|
+                ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "hdfsOpenFile(%s): getDefaultBlockSize", path);
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+ }
|
|
|
jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
|
|
|
method, signature, jPath, jOverWrite,
|
|
|
jBufferSize, jReplication, jBlockSize);
|
|
@@ -842,7 +894,6 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
|
|
|
done:
|
|
|
destroyLocalReference(env, jStrBufferSize);
|
|
|
destroyLocalReference(env, jStrReplication);
|
|
|
- destroyLocalReference(env, jStrBlockSize);
|
|
|
destroyLocalReference(env, jConfiguration);
|
|
|
destroyLocalReference(env, jPath);
|
|
|
destroyLocalReference(env, jFile);
|
|
@@ -2142,6 +2193,39 @@ tOffset hdfsGetDefaultBlockSize(hdfsFS fs)
|
|
|
}
|
|
|
|
|
|
|
|
|
+tOffset hdfsGetDefaultBlockSizeAtPath(hdfsFS fs, const char *path)
|
|
|
+{
|
|
|
+ // JAVA EQUIVALENT:
|
|
|
+ // fs.getDefaultBlockSize(path);
|
|
|
+
|
|
|
+ jthrowable jthr;
|
|
|
+ jobject jFS = (jobject)fs;
|
|
|
+ jobject jPath;
|
|
|
+ tOffset blockSize;
|
|
|
+ JNIEnv* env = getJNIEnv();
|
|
|
+
|
|
|
+ if (env == NULL) {
|
|
|
+ errno = EINTERNAL;
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ jthr = constructNewObjectOfPath(env, path, &jPath);
|
|
|
+ if (jthr) {
|
|
|
+ errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
|
|
|
+            "hdfsGetDefaultBlockSizeAtPath(path=%s): constructNewObjectOfPath",
|
|
|
+ path);
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ jthr = getDefaultBlockSize(env, jFS, jPath, &blockSize);
|
|
|
+ (*env)->DeleteLocalRef(env, jPath);
|
|
|
+ if (jthr) {
|
|
|
+ errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
|
|
|
+            "hdfsGetDefaultBlockSizeAtPath(path=%s): "
|
|
|
+ "FileSystem#getDefaultBlockSize", path);
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ return blockSize;
|
|
|
+}
|
|
|
+
|
|
|
|
|
|
tOffset hdfsGetCapacity(hdfsFS fs)
|
|
|
{
|