|
@@ -15,17 +15,19 @@
|
|
|
* See the License for the specific language governing permissions and
|
|
|
* limitations under the License.
|
|
|
*/
|
|
|
-#include <string.h>
|
|
|
+
|
|
|
+#include "config.h"
|
|
|
#include "hdfsJniHelper.h"
|
|
|
|
|
|
+#include <stdio.h>
|
|
|
+#include <string.h>
|
|
|
+
|
|
|
static pthread_mutex_t hdfsHashMutex = PTHREAD_MUTEX_INITIALIZER;
|
|
|
static pthread_mutex_t jvmMutex = PTHREAD_MUTEX_INITIALIZER;
|
|
|
static volatile int hashTableInited = 0;
|
|
|
|
|
|
#define LOCK_HASH_TABLE() pthread_mutex_lock(&hdfsHashMutex)
|
|
|
#define UNLOCK_HASH_TABLE() pthread_mutex_unlock(&hdfsHashMutex)
|
|
|
-#define LOCK_JVM_MUTEX() pthread_mutex_lock(&jvmMutex)
|
|
|
-#define UNLOCK_JVM_MUTEX() pthread_mutex_unlock(&jvmMutex)
|
|
|
|
|
|
|
|
|
/** The Native return types that methods could return */
|
|
@@ -48,6 +50,43 @@ static volatile int hashTableInited = 0;
|
|
|
*/
|
|
|
#define MAX_HASH_TABLE_ELEM 4096
|
|
|
|
|
|
+/** Key that allows us to retrieve thread-local storage */
|
|
|
+static pthread_key_t gTlsKey;
|
|
|
+
|
|
|
+/** nonzero if we succeeded in initializing gTlsKey. Protected by the jvmMutex */
|
|
|
+static int gTlsKeyInitialized = 0;
|
|
|
+
|
|
|
+/** Pthreads thread-local storage for each library thread. */
|
|
|
+struct hdfsTls {
|
|
|
+ JNIEnv *env;
|
|
|
+};
|
|
|
+
|
|
|
+/**
|
|
|
+ * The function that is called whenever a thread with libhdfs thread local data
|
|
|
+ * is destroyed.
|
|
|
+ *
|
|
|
+ * @param v The thread-local data
|
|
|
+ */
|
|
|
+static void hdfsThreadDestructor(void *v)
|
|
|
+{
|
|
|
+ struct hdfsTls *tls = v;
|
|
|
+ JavaVM *vm;
|
|
|
+ JNIEnv *env = tls->env;
|
|
|
+ jint ret;
|
|
|
+
|
|
|
+ if (!tls)
|
|
|
+ return;
|
|
|
+ ret = (*env)->GetJavaVM(env, &vm);
|
|
|
+ if (ret) {
|
|
|
+ fprintf(stderr, "hdfsThreadDestructor: GetJavaVM failed with "
|
|
|
+ "error %d\n", ret);
|
|
|
+ (*env)->ExceptionDescribe(env);
|
|
|
+ } else {
|
|
|
+ (*vm)->DetachCurrentThread(vm);
|
|
|
+ }
|
|
|
+ free(tls);
|
|
|
+}
|
|
|
+
|
|
|
|
|
|
static int validateMethodType(MethType methType)
|
|
|
{
|
|
@@ -375,34 +414,26 @@ char *classNameOfObject(jobject jobj, JNIEnv *env) {
|
|
|
return newstr;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
/**
|
|
|
- * getJNIEnv: A helper function to get the JNIEnv* for the given thread.
|
|
|
- * If no JVM exists, then one will be created. JVM command line arguments
|
|
|
- * are obtained from the LIBHDFS_OPTS environment variable.
|
|
|
+ * Get the global JNI environemnt.
|
|
|
*
|
|
|
- * @param: None.
|
|
|
- * @return The JNIEnv* corresponding to the thread.
|
|
|
+ * We only have to create the JVM once. After that, we can use it in
|
|
|
+ * every thread. You must be holding the jvmMutex when you call this
|
|
|
+ * function.
|
|
|
+ *
|
|
|
+ * @return The JNIEnv on success; error code otherwise
|
|
|
*/
|
|
|
-JNIEnv* getJNIEnv(void)
|
|
|
+static JNIEnv* getGlobalJNIEnv(void)
|
|
|
{
|
|
|
-
|
|
|
const jsize vmBufLength = 1;
|
|
|
JavaVM* vmBuf[vmBufLength];
|
|
|
JNIEnv *env;
|
|
|
jint rv = 0;
|
|
|
jint noVMs = 0;
|
|
|
|
|
|
- // Only the first thread should create the JVM. The other trheads should
|
|
|
- // just use the JVM created by the first thread.
|
|
|
- LOCK_JVM_MUTEX();
|
|
|
-
|
|
|
rv = JNI_GetCreatedJavaVMs(&(vmBuf[0]), vmBufLength, &noVMs);
|
|
|
if (rv != 0) {
|
|
|
fprintf(stderr, "JNI_GetCreatedJavaVMs failed with error: %d\n", rv);
|
|
|
- UNLOCK_JVM_MUTEX();
|
|
|
return NULL;
|
|
|
}
|
|
|
|
|
@@ -411,7 +442,6 @@ JNIEnv* getJNIEnv(void)
|
|
|
char *hadoopClassPath = getenv("CLASSPATH");
|
|
|
if (hadoopClassPath == NULL) {
|
|
|
fprintf(stderr, "Environment variable CLASSPATH not set!\n");
|
|
|
- UNLOCK_JVM_MUTEX();
|
|
|
return NULL;
|
|
|
}
|
|
|
char *hadoopClassPathVMArg = "-Djava.class.path=";
|
|
@@ -470,7 +500,6 @@ JNIEnv* getJNIEnv(void)
|
|
|
if (rv != 0) {
|
|
|
fprintf(stderr, "Call to JNI_CreateJavaVM failed "
|
|
|
"with error: %d\n", rv);
|
|
|
- UNLOCK_JVM_MUTEX();
|
|
|
return NULL;
|
|
|
}
|
|
|
}
|
|
@@ -481,11 +510,82 @@ JNIEnv* getJNIEnv(void)
|
|
|
if (rv != 0) {
|
|
|
fprintf(stderr, "Call to AttachCurrentThread "
|
|
|
"failed with error: %d\n", rv);
|
|
|
- UNLOCK_JVM_MUTEX();
|
|
|
return NULL;
|
|
|
}
|
|
|
}
|
|
|
- UNLOCK_JVM_MUTEX();
|
|
|
|
|
|
return env;
|
|
|
}
|
|
|
+
|
|
|
+/**
|
|
|
+ * getJNIEnv: A helper function to get the JNIEnv* for the given thread.
|
|
|
+ * If no JVM exists, then one will be created. JVM command line arguments
|
|
|
+ * are obtained from the LIBHDFS_OPTS environment variable.
|
|
|
+ *
|
|
|
+ * Implementation note: we rely on POSIX thread-local storage (tls).
|
|
|
+ * This allows us to associate a destructor function with each thread, that
|
|
|
+ * will detach the thread from the Java VM when the thread terminates. If we
|
|
|
+ * failt to do this, it will cause a memory leak.
|
|
|
+ *
|
|
|
+ * However, POSIX TLS is not the most efficient way to do things. It requires a
|
|
|
+ * key to be initialized before it can be used. Since we don't know if this key
|
|
|
+ * is initialized at the start of this function, we have to lock a mutex first
|
|
|
+ * and check. Luckily, most operating systems support the more efficient
|
|
|
+ * __thread construct, which is initialized by the linker.
|
|
|
+ *
|
|
|
+ * @param: None.
|
|
|
+ * @return The JNIEnv* corresponding to the thread.
|
|
|
+ */
|
|
|
+JNIEnv* getJNIEnv(void)
|
|
|
+{
|
|
|
+ JNIEnv *env;
|
|
|
+ struct hdfsTls *tls;
|
|
|
+ int ret;
|
|
|
+
|
|
|
+#ifdef HAVE_BETTER_TLS
|
|
|
+ static __thread struct hdfsTls *quickTls = NULL;
|
|
|
+ if (quickTls)
|
|
|
+ return quickTls->env;
|
|
|
+#endif
|
|
|
+ pthread_mutex_lock(&jvmMutex);
|
|
|
+ if (!gTlsKeyInitialized) {
|
|
|
+ ret = pthread_key_create(&gTlsKey, hdfsThreadDestructor);
|
|
|
+ if (ret) {
|
|
|
+ pthread_mutex_unlock(&jvmMutex);
|
|
|
+ fprintf("pthread_key_create failed with error %d\n", ret);
|
|
|
+ return NULL;
|
|
|
+ }
|
|
|
+ gTlsKeyInitialized = 1;
|
|
|
+ }
|
|
|
+ tls = pthread_getspecific(gTlsKey);
|
|
|
+ if (tls) {
|
|
|
+ pthread_mutex_unlock(&jvmMutex);
|
|
|
+ return tls->env;
|
|
|
+ }
|
|
|
+
|
|
|
+ env = getGlobalJNIEnv();
|
|
|
+ pthread_mutex_unlock(&jvmMutex);
|
|
|
+ if (!env) {
|
|
|
+ fprintf(stderr, "getJNIEnv: getGlobalJNIEnv failed\n");
|
|
|
+ return NULL;
|
|
|
+ }
|
|
|
+ tls = calloc(1, sizeof(struct hdfsTls));
|
|
|
+ if (!tls) {
|
|
|
+ fprintf(stderr, "getJNIEnv: OOM allocating %d bytes\n",
|
|
|
+ sizeof(struct hdfsTls));
|
|
|
+ return NULL;
|
|
|
+ }
|
|
|
+ tls->env = env;
|
|
|
+ ret = pthread_setspecific(gTlsKey, tls);
|
|
|
+ if (ret) {
|
|
|
+ fprintf(stderr, "getJNIEnv: pthread_setspecific failed with "
|
|
|
+ "error code %d\n", ret);
|
|
|
+ hdfsThreadDestructor(tls);
|
|
|
+ return NULL;
|
|
|
+ }
|
|
|
+#ifdef HAVE_BETTER_TLS
|
|
|
+ quickTls = tls;
|
|
|
+#endif
|
|
|
+ return env;
|
|
|
+}
|
|
|
+
|