native_mini_dfs.c 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "exception.h"
  19. #include "jni_helper.h"
  20. #include "native_mini_dfs.h"
  21. #include <errno.h>
  22. #include <jni.h>
  23. #include <stdio.h>
  24. #include <stdlib.h>
  25. #include <string.h>
  /*
   * Fully-qualified Java class names in slash-separated form. They are passed
   * to the JNI helper calls in this file and are also spliced into method
   * signature strings through string-literal concatenation.
   */
  26. #define MINIDFS_CLUSTER_BUILDER "org/apache/hadoop/hdfs/MiniDFSCluster$Builder"
  27. #define MINIDFS_CLUSTER "org/apache/hadoop/hdfs/MiniDFSCluster"
  28. #define HADOOP_CONF "org/apache/hadoop/conf/Configuration"
  29. #define HADOOP_NAMENODE "org/apache/hadoop/hdfs/server/namenode/NameNode"
  30. #define JAVA_INETSOCKETADDRESS "java/net/InetSocketAddress"
  /*
   * Configuration key that nmdCreate sets with Configuration#setBoolean when
   * conf->webhdfsEnabled is nonzero.
   */
  31. #define DFS_WEBHDFS_ENABLED_KEY "dfs.webhdfs.enabled"
  32. struct NativeMiniDfsCluster {
  33. /**
  34. * The NativeMiniDfsCluster object
  35. */
  36. jobject obj;
  37. };
  38. struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
  39. {
  40. struct NativeMiniDfsCluster* cl = NULL;
  41. jobject bld = NULL, cobj = NULL, cluster = NULL;
  42. jvalue val;
  43. JNIEnv *env = getJNIEnv();
  44. jthrowable jthr;
  45. jstring jconfStr = NULL;
  46. if (!env) {
  47. fprintf(stderr, "nmdCreate: unable to construct JNIEnv.\n");
  48. return NULL;
  49. }
  50. cl = calloc(1, sizeof(struct NativeMiniDfsCluster));
  51. if (!cl) {
  52. fprintf(stderr, "nmdCreate: OOM");
  53. goto error;
  54. }
  55. jthr = constructNewObjectOfClass(env, &cobj, HADOOP_CONF, "()V");
  56. if (jthr) {
  57. printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  58. "nmdCreate: new Configuration");
  59. goto error;
  60. }
  61. if (conf->webhdfsEnabled) {
  62. jthr = newJavaStr(env, DFS_WEBHDFS_ENABLED_KEY, &jconfStr);
  63. if (jthr) {
  64. printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  65. "nmdCreate: new String");
  66. goto error;
  67. }
  68. jthr = invokeMethod(env, NULL, INSTANCE, cobj, HADOOP_CONF,
  69. "setBoolean", "(Ljava/lang/String;Z)V",
  70. jconfStr, conf->webhdfsEnabled);
  71. if (jthr) {
  72. printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  73. "nmdCreate: Configuration::setBoolean");
  74. goto error;
  75. }
  76. }
  77. jthr = constructNewObjectOfClass(env, &bld, MINIDFS_CLUSTER_BUILDER,
  78. "(L"HADOOP_CONF";)V", cobj);
  79. if (jthr) {
  80. printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  81. "nmdCreate: NativeMiniDfsCluster#Builder#Builder");
  82. goto error;
  83. }
  84. jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER,
  85. "format", "(Z)L" MINIDFS_CLUSTER_BUILDER ";", conf->doFormat);
  86. if (jthr) {
  87. printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "nmdCreate: "
  88. "Builder::format");
  89. goto error;
  90. }
  91. (*env)->DeleteLocalRef(env, val.l);
  92. if (conf->webhdfsEnabled) {
  93. jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER,
  94. "nameNodeHttpPort", "(I)L" MINIDFS_CLUSTER_BUILDER ";",
  95. conf->namenodeHttpPort);
  96. if (jthr) {
  97. printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "nmdCreate: "
  98. "Builder::nameNodeHttpPort");
  99. goto error;
  100. }
  101. (*env)->DeleteLocalRef(env, val.l);
  102. }
  103. jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER,
  104. "build", "()L" MINIDFS_CLUSTER ";");
  105. if (jthr) {
  106. printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  107. "nmdCreate: Builder#build");
  108. goto error;
  109. }
  110. cluster = val.l;
  111. cl->obj = (*env)->NewGlobalRef(env, val.l);
  112. if (!cl->obj) {
  113. printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  114. "nmdCreate: NewGlobalRef");
  115. goto error;
  116. }
  117. (*env)->DeleteLocalRef(env, cluster);
  118. (*env)->DeleteLocalRef(env, bld);
  119. (*env)->DeleteLocalRef(env, cobj);
  120. (*env)->DeleteLocalRef(env, jconfStr);
  121. return cl;
  122. error:
  123. (*env)->DeleteLocalRef(env, cluster);
  124. (*env)->DeleteLocalRef(env, bld);
  125. (*env)->DeleteLocalRef(env, cobj);
  126. (*env)->DeleteLocalRef(env, jconfStr);
  127. free(cl);
  128. return NULL;
  129. }
  130. void nmdFree(struct NativeMiniDfsCluster* cl)
  131. {
  132. JNIEnv *env = getJNIEnv();
  133. if (!env) {
  134. fprintf(stderr, "nmdFree: getJNIEnv failed\n");
  135. free(cl);
  136. return;
  137. }
  138. (*env)->DeleteGlobalRef(env, cl->obj);
  139. free(cl);
  140. }
  141. int nmdShutdown(struct NativeMiniDfsCluster* cl)
  142. {
  143. JNIEnv *env = getJNIEnv();
  144. jthrowable jthr;
  145. if (!env) {
  146. fprintf(stderr, "nmdShutdown: getJNIEnv failed\n");
  147. return -EIO;
  148. }
  149. jthr = invokeMethod(env, NULL, INSTANCE, cl->obj,
  150. MINIDFS_CLUSTER, "shutdown", "()V");
  151. if (jthr) {
  152. printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  153. "nmdShutdown: MiniDFSCluster#shutdown");
  154. return -EIO;
  155. }
  156. return 0;
  157. }
  158. int nmdWaitClusterUp(struct NativeMiniDfsCluster *cl)
  159. {
  160. jthrowable jthr;
  161. JNIEnv *env = getJNIEnv();
  162. if (!env) {
  163. fprintf(stderr, "nmdWaitClusterUp: getJNIEnv failed\n");
  164. return -EIO;
  165. }
  166. jthr = invokeMethod(env, NULL, INSTANCE, cl->obj,
  167. MINIDFS_CLUSTER, "waitClusterUp", "()V");
  168. if (jthr) {
  169. printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  170. "nmdWaitClusterUp: MiniDFSCluster#waitClusterUp ");
  171. return -EIO;
  172. }
  173. return 0;
  174. }
  175. int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl)
  176. {
  177. JNIEnv *env = getJNIEnv();
  178. jvalue jVal;
  179. jthrowable jthr;
  180. if (!env) {
  181. fprintf(stderr, "nmdHdfsConnect: getJNIEnv failed\n");
  182. return -EIO;
  183. }
  184. // Note: this will have to be updated when HA nativeMiniDfs clusters are
  185. // supported
  186. jthr = invokeMethod(env, &jVal, INSTANCE, cl->obj,
  187. MINIDFS_CLUSTER, "getNameNodePort", "()I");
  188. if (jthr) {
  189. printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  190. "nmdHdfsConnect: MiniDFSCluster#getNameNodePort");
  191. return -EIO;
  192. }
  193. return jVal.i;
  194. }
  195. int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl,
  196. int *port, const char **hostName)
  197. {
  198. JNIEnv *env = getJNIEnv();
  199. jvalue jVal;
  200. jobject jNameNode, jAddress;
  201. jthrowable jthr;
  202. int ret = 0;
  203. const char *host;
  204. if (!env) {
  205. fprintf(stderr, "nmdHdfsConnect: getJNIEnv failed\n");
  206. return -EIO;
  207. }
  208. // First get the (first) NameNode of the cluster
  209. jthr = invokeMethod(env, &jVal, INSTANCE, cl->obj, MINIDFS_CLUSTER,
  210. "getNameNode", "()L" HADOOP_NAMENODE ";");
  211. if (jthr) {
  212. printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  213. "nmdGetNameNodeHttpAddress: "
  214. "MiniDFSCluster#getNameNode");
  215. return -EIO;
  216. }
  217. jNameNode = jVal.l;
  218. // Then get the http address (InetSocketAddress) of the NameNode
  219. jthr = invokeMethod(env, &jVal, INSTANCE, jNameNode, HADOOP_NAMENODE,
  220. "getHttpAddress", "()L" JAVA_INETSOCKETADDRESS ";");
  221. if (jthr) {
  222. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  223. "nmdGetNameNodeHttpAddress: "
  224. "NameNode#getHttpAddress");
  225. goto error_dlr_nn;
  226. }
  227. jAddress = jVal.l;
  228. jthr = invokeMethod(env, &jVal, INSTANCE, jAddress,
  229. JAVA_INETSOCKETADDRESS, "getPort", "()I");
  230. if (jthr) {
  231. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  232. "nmdGetNameNodeHttpAddress: "
  233. "InetSocketAddress#getPort");
  234. goto error_dlr_addr;
  235. }
  236. *port = jVal.i;
  237. jthr = invokeMethod(env, &jVal, INSTANCE, jAddress, JAVA_INETSOCKETADDRESS,
  238. "getHostName", "()Ljava/lang/String;");
  239. if (jthr) {
  240. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  241. "nmdGetNameNodeHttpAddress: "
  242. "InetSocketAddress#getHostName");
  243. goto error_dlr_addr;
  244. }
  245. host = (*env)->GetStringUTFChars(env, jVal.l, NULL);
  246. *hostName = strdup(host);
  247. (*env)->ReleaseStringUTFChars(env, jVal.l, host);
  248. error_dlr_addr:
  249. (*env)->DeleteLocalRef(env, jAddress);
  250. error_dlr_nn:
  251. (*env)->DeleteLocalRef(env, jNameNode);
  252. return ret;
  253. }