native_mini_dfs.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "exception.h"
  19. #include "jni_helper.h"
  20. #include "native_mini_dfs.h"
  21. #include "platform.h"
  22. #include <errno.h>
  23. #include <jni.h>
  24. #include <limits.h>
  25. #include <stdio.h>
  26. #include <stdlib.h>
  27. #include <string.h>
  28. #include <sys/types.h>
  29. #include <unistd.h>
  /*
   * errno value this file uses for internal (non-OS) failures. Defined here
   * only when the headers included above have not already provided it.
   */
  #if !defined(EINTERNAL)
  #define EINTERNAL 255
  #endif

  /* Slash-separated names of the Java classes this file reaches through JNI. */
  #define MINIDFS_CLUSTER_BUILDER "org/apache/hadoop/hdfs/MiniDFSCluster$Builder"
  #define MINIDFS_CLUSTER "org/apache/hadoop/hdfs/MiniDFSCluster"
  #define HADOOP_CONF "org/apache/hadoop/conf/Configuration"
  #define HADOOP_NAMENODE "org/apache/hadoop/hdfs/server/namenode/NameNode"
  #define JAVA_INETSOCKETADDRESS "java/net/InetSocketAddress"
  38. struct NativeMiniDfsCluster {
  39. /**
  40. * The NativeMiniDfsCluster object
  41. */
  42. jobject obj;
  43. /**
  44. * Path to the domain socket, or the empty string if there is none.
  45. */
  46. char domainSocketPath[PATH_MAX];
  47. };
  48. static int hdfsDisableDomainSocketSecurity(void)
  49. {
  50. jthrowable jthr;
  51. JNIEnv* env = getJNIEnv();
  52. if (env == NULL) {
  53. errno = EINTERNAL;
  54. return -1;
  55. }
  56. jthr = invokeMethod(env, NULL, STATIC, NULL,
  57. "org/apache/hadoop/net/unix/DomainSocket",
  58. "disableBindPathValidation", "()V");
  59. if (jthr) {
  60. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  61. "DomainSocket#disableBindPathValidation");
  62. return -1;
  63. }
  64. return 0;
  65. }
  66. static jthrowable nmdConfigureShortCircuit(JNIEnv *env,
  67. struct NativeMiniDfsCluster *cl, jobject cobj)
  68. {
  69. jthrowable jthr;
  70. char *tmpDir;
  71. int ret = hdfsDisableDomainSocketSecurity();
  72. if (ret) {
  73. return newRuntimeError(env, "failed to disable hdfs domain "
  74. "socket security: error %d", ret);
  75. }
  76. jthr = hadoopConfSetStr(env, cobj, "dfs.client.read.shortcircuit", "true");
  77. if (jthr) {
  78. return jthr;
  79. }
  80. tmpDir = getenv("TMPDIR");
  81. if (!tmpDir) {
  82. tmpDir = "/tmp";
  83. }
  84. snprintf(cl->domainSocketPath, PATH_MAX, "%s/native_mini_dfs.sock.%d.%d",
  85. tmpDir, getpid(), rand());
  86. snprintf(cl->domainSocketPath, PATH_MAX, "%s/native_mini_dfs.sock.%d.%d",
  87. tmpDir, getpid(), rand());
  88. jthr = hadoopConfSetStr(env, cobj, "dfs.domain.socket.path",
  89. cl->domainSocketPath);
  90. if (jthr) {
  91. return jthr;
  92. }
  93. return NULL;
  94. }
  95. struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
  96. {
  97. struct NativeMiniDfsCluster* cl = NULL;
  98. jobject bld = NULL, cobj = NULL, cluster = NULL;
  99. jvalue val;
  100. JNIEnv *env = getJNIEnv();
  101. jthrowable jthr;
  102. jstring jconfStr = NULL;
  103. if (!env) {
  104. fprintf(stderr, "nmdCreate: unable to construct JNIEnv.\n");
  105. return NULL;
  106. }
  107. cl = calloc(1, sizeof(struct NativeMiniDfsCluster));
  108. if (!cl) {
  109. fprintf(stderr, "nmdCreate: OOM");
  110. goto error;
  111. }
  112. jthr = constructNewObjectOfClass(env, &cobj, HADOOP_CONF, "()V");
  113. if (jthr) {
  114. printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  115. "nmdCreate: new Configuration");
  116. goto error;
  117. }
  118. if (jthr) {
  119. printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  120. "nmdCreate: Configuration::setBoolean");
  121. goto error;
  122. }
  123. // Disable 'minimum block size' -- it's annoying in tests.
  124. (*env)->DeleteLocalRef(env, jconfStr);
  125. jconfStr = NULL;
  126. jthr = newJavaStr(env, "dfs.namenode.fs-limits.min-block-size", &jconfStr);
  127. if (jthr) {
  128. printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  129. "nmdCreate: new String");
  130. goto error;
  131. }
  132. jthr = invokeMethod(env, NULL, INSTANCE, cobj, HADOOP_CONF,
  133. "setLong", "(Ljava/lang/String;J)V", jconfStr, 0LL);
  134. if (jthr) {
  135. printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  136. "nmdCreate: Configuration::setLong");
  137. goto error;
  138. }
  139. // Creae MiniDFSCluster object
  140. jthr = constructNewObjectOfClass(env, &bld, MINIDFS_CLUSTER_BUILDER,
  141. "(L"HADOOP_CONF";)V", cobj);
  142. if (jthr) {
  143. printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  144. "nmdCreate: NativeMiniDfsCluster#Builder#Builder");
  145. goto error;
  146. }
  147. if (conf->configureShortCircuit) {
  148. jthr = nmdConfigureShortCircuit(env, cl, cobj);
  149. if (jthr) {
  150. printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  151. "nmdCreate: nmdConfigureShortCircuit error");
  152. goto error;
  153. }
  154. }
  155. jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER,
  156. "format", "(Z)L" MINIDFS_CLUSTER_BUILDER ";", conf->doFormat);
  157. if (jthr) {
  158. printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "nmdCreate: "
  159. "Builder::format");
  160. goto error;
  161. }
  162. (*env)->DeleteLocalRef(env, val.l);
  163. if (conf->webhdfsEnabled) {
  164. jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER,
  165. "nameNodeHttpPort", "(I)L" MINIDFS_CLUSTER_BUILDER ";",
  166. conf->namenodeHttpPort);
  167. if (jthr) {
  168. printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "nmdCreate: "
  169. "Builder::nameNodeHttpPort");
  170. goto error;
  171. }
  172. (*env)->DeleteLocalRef(env, val.l);
  173. }
  174. jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER,
  175. "build", "()L" MINIDFS_CLUSTER ";");
  176. if (jthr) {
  177. printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  178. "nmdCreate: Builder#build");
  179. goto error;
  180. }
  181. cluster = val.l;
  182. cl->obj = (*env)->NewGlobalRef(env, val.l);
  183. if (!cl->obj) {
  184. printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  185. "nmdCreate: NewGlobalRef");
  186. goto error;
  187. }
  188. (*env)->DeleteLocalRef(env, cluster);
  189. (*env)->DeleteLocalRef(env, bld);
  190. (*env)->DeleteLocalRef(env, cobj);
  191. (*env)->DeleteLocalRef(env, jconfStr);
  192. return cl;
  193. error:
  194. (*env)->DeleteLocalRef(env, cluster);
  195. (*env)->DeleteLocalRef(env, bld);
  196. (*env)->DeleteLocalRef(env, cobj);
  197. (*env)->DeleteLocalRef(env, jconfStr);
  198. free(cl);
  199. return NULL;
  200. }
  201. void nmdFree(struct NativeMiniDfsCluster* cl)
  202. {
  203. JNIEnv *env = getJNIEnv();
  204. if (!env) {
  205. fprintf(stderr, "nmdFree: getJNIEnv failed\n");
  206. free(cl);
  207. return;
  208. }
  209. (*env)->DeleteGlobalRef(env, cl->obj);
  210. free(cl);
  211. }
  212. int nmdShutdown(struct NativeMiniDfsCluster* cl)
  213. {
  214. JNIEnv *env = getJNIEnv();
  215. jthrowable jthr;
  216. if (!env) {
  217. fprintf(stderr, "nmdShutdown: getJNIEnv failed\n");
  218. return -EIO;
  219. }
  220. jthr = invokeMethod(env, NULL, INSTANCE, cl->obj,
  221. MINIDFS_CLUSTER, "shutdown", "()V");
  222. if (jthr) {
  223. printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  224. "nmdShutdown: MiniDFSCluster#shutdown");
  225. return -EIO;
  226. }
  227. return 0;
  228. }
  229. int nmdWaitClusterUp(struct NativeMiniDfsCluster *cl)
  230. {
  231. jthrowable jthr;
  232. JNIEnv *env = getJNIEnv();
  233. if (!env) {
  234. fprintf(stderr, "nmdWaitClusterUp: getJNIEnv failed\n");
  235. return -EIO;
  236. }
  237. jthr = invokeMethod(env, NULL, INSTANCE, cl->obj,
  238. MINIDFS_CLUSTER, "waitClusterUp", "()V");
  239. if (jthr) {
  240. printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  241. "nmdWaitClusterUp: MiniDFSCluster#waitClusterUp ");
  242. return -EIO;
  243. }
  244. return 0;
  245. }
  246. int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl)
  247. {
  248. JNIEnv *env = getJNIEnv();
  249. jvalue jVal;
  250. jthrowable jthr;
  251. if (!env) {
  252. fprintf(stderr, "nmdHdfsConnect: getJNIEnv failed\n");
  253. return -EIO;
  254. }
  255. // Note: this will have to be updated when HA nativeMiniDfs clusters are
  256. // supported
  257. jthr = invokeMethod(env, &jVal, INSTANCE, cl->obj,
  258. MINIDFS_CLUSTER, "getNameNodePort", "()I");
  259. if (jthr) {
  260. printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  261. "nmdHdfsConnect: MiniDFSCluster#getNameNodePort");
  262. return -EIO;
  263. }
  264. return jVal.i;
  265. }
  266. int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl,
  267. int *port, const char **hostName)
  268. {
  269. JNIEnv *env = getJNIEnv();
  270. jvalue jVal;
  271. jobject jNameNode, jAddress;
  272. jthrowable jthr;
  273. int ret = 0;
  274. const char *host;
  275. if (!env) {
  276. fprintf(stderr, "nmdHdfsConnect: getJNIEnv failed\n");
  277. return -EIO;
  278. }
  279. // First get the (first) NameNode of the cluster
  280. jthr = invokeMethod(env, &jVal, INSTANCE, cl->obj, MINIDFS_CLUSTER,
  281. "getNameNode", "()L" HADOOP_NAMENODE ";");
  282. if (jthr) {
  283. printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  284. "nmdGetNameNodeHttpAddress: "
  285. "MiniDFSCluster#getNameNode");
  286. return -EIO;
  287. }
  288. jNameNode = jVal.l;
  289. // Then get the http address (InetSocketAddress) of the NameNode
  290. jthr = invokeMethod(env, &jVal, INSTANCE, jNameNode, HADOOP_NAMENODE,
  291. "getHttpAddress", "()L" JAVA_INETSOCKETADDRESS ";");
  292. if (jthr) {
  293. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  294. "nmdGetNameNodeHttpAddress: "
  295. "NameNode#getHttpAddress");
  296. goto error_dlr_nn;
  297. }
  298. jAddress = jVal.l;
  299. jthr = invokeMethod(env, &jVal, INSTANCE, jAddress,
  300. JAVA_INETSOCKETADDRESS, "getPort", "()I");
  301. if (jthr) {
  302. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  303. "nmdGetNameNodeHttpAddress: "
  304. "InetSocketAddress#getPort");
  305. goto error_dlr_addr;
  306. }
  307. *port = jVal.i;
  308. jthr = invokeMethod(env, &jVal, INSTANCE, jAddress, JAVA_INETSOCKETADDRESS,
  309. "getHostName", "()Ljava/lang/String;");
  310. if (jthr) {
  311. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  312. "nmdGetNameNodeHttpAddress: "
  313. "InetSocketAddress#getHostName");
  314. goto error_dlr_addr;
  315. }
  316. host = (*env)->GetStringUTFChars(env, jVal.l, NULL);
  317. *hostName = strdup(host);
  318. (*env)->ReleaseStringUTFChars(env, jVal.l, host);
  319. error_dlr_addr:
  320. (*env)->DeleteLocalRef(env, jAddress);
  321. error_dlr_nn:
  322. (*env)->DeleteLocalRef(env, jNameNode);
  323. return ret;
  324. }
  325. const char *hdfsGetDomainSocketPath(const struct NativeMiniDfsCluster *cl) {
  326. if (cl->domainSocketPath[0]) {
  327. return cl->domainSocketPath;
  328. }
  329. return NULL;
  330. }