1
0

fuse_connect.c 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "fuse_connect.h"
  19. #include "fuse_dfs.h"
  20. #include "fuse_users.h"
  21. #include "hdfs/hdfs.h"
  22. #include "util/tree.h"
  23. #include <inttypes.h>
  24. #include <limits.h>
  25. #include <poll.h>
  26. #include <search.h>
  27. #include <stdio.h>
  28. #include <stdlib.h>
  29. #include <string.h>
  30. #include <sys/time.h>
  31. #include <sys/types.h>
  32. #include <utime.h>
  33. #define FUSE_CONN_DEFAULT_TIMER_PERIOD 5
  34. #define FUSE_CONN_DEFAULT_EXPIRY_PERIOD (5 * 60)
  35. #define HADOOP_SECURITY_AUTHENTICATION "hadoop.security.authentication"
  36. #define HADOOP_FUSE_CONNECTION_TIMEOUT "hadoop.fuse.connection.timeout"
  37. #define HADOOP_FUSE_TIMER_PERIOD "hadoop.fuse.timer.period"
  38. /** Length of the buffer needed by asctime_r */
  39. #define TIME_STR_LEN 26
  40. struct hdfsConn;
  41. static int hdfsConnCompare(const struct hdfsConn *a, const struct hdfsConn *b);
  42. static void hdfsConnExpiry(void);
  43. static void* hdfsConnExpiryThread(void *v);
  44. RB_HEAD(hdfsConnTree, hdfsConn);
  45. enum authConf {
  46. AUTH_CONF_UNKNOWN,
  47. AUTH_CONF_KERBEROS,
  48. AUTH_CONF_OTHER,
  49. };
  50. struct hdfsConn {
  51. RB_ENTRY(hdfsConn) entry;
  52. /** How many threads are currently using this hdfsConnection object */
  53. int64_t refcnt;
  54. /** The username used to make this connection. Dynamically allocated. */
  55. char *usrname;
  56. /** Kerberos ticket cache path, or NULL if this is not a kerberized
  57. * connection. Dynamically allocated. */
  58. char *kpath;
  59. /** mtime of the kpath, if the kpath is non-NULL */
  60. time_t kPathMtime;
  61. /** nanosecond component of the mtime of the kpath, if the kpath is non-NULL */
  62. long kPathMtimeNs;
  63. /** The cached libhdfs fs instance */
  64. hdfsFS fs;
  65. /** Nonzero if this hdfs connection needs to be closed as soon as possible.
  66. * If this is true, the connection has been removed from the tree. */
  67. int condemned;
  68. /** Number of times we should run the expiration timer on this connection
  69. * before removing it. */
  70. int expirationCount;
  71. };
  72. RB_GENERATE(hdfsConnTree, hdfsConn, entry, hdfsConnCompare);
  73. /** Current cached libhdfs connections */
  74. static struct hdfsConnTree gConnTree;
  75. /** The URI used to make our connections. Dynamically allocated. */
  76. static char *gUri;
  77. /** The port used to make our connections, or 0. */
  78. static int gPort;
  79. /** Lock which protects gConnTree and gConnectTimer->active */
  80. static pthread_mutex_t gConnMutex;
  81. /** Type of authentication configured */
  82. static enum authConf gHdfsAuthConf;
  83. /** FUSE connection timer expiration period */
  84. static int32_t gTimerPeriod;
  85. /** FUSE connection expiry period */
  86. static int32_t gExpiryPeriod;
  87. /** FUSE timer expiration thread */
  88. static pthread_t gTimerThread;
  89. /**
  90. * Find out what type of authentication the system administrator
  91. * has configured.
  92. *
  93. * @return the type of authentication, or AUTH_CONF_UNKNOWN on error.
  94. */
  95. static enum authConf discoverAuthConf(void)
  96. {
  97. int ret;
  98. char *val = NULL;
  99. enum authConf authConf;
  100. ret = hdfsConfGetStr(HADOOP_SECURITY_AUTHENTICATION, &val);
  101. if (ret)
  102. authConf = AUTH_CONF_UNKNOWN;
  103. else if (!val)
  104. authConf = AUTH_CONF_OTHER;
  105. else if (!strcmp(val, "kerberos"))
  106. authConf = AUTH_CONF_KERBEROS;
  107. else
  108. authConf = AUTH_CONF_OTHER;
  109. free(val);
  110. return authConf;
  111. }
  112. int fuseConnectInit(const char *nnUri, int port)
  113. {
  114. int ret;
  115. gTimerPeriod = FUSE_CONN_DEFAULT_TIMER_PERIOD;
  116. ret = hdfsConfGetInt(HADOOP_FUSE_TIMER_PERIOD, &gTimerPeriod);
  117. if (ret) {
  118. fprintf(stderr, "Unable to determine the configured value for %s.",
  119. HADOOP_FUSE_TIMER_PERIOD);
  120. return -EINVAL;
  121. }
  122. if (gTimerPeriod < 1) {
  123. fprintf(stderr, "Invalid value %d given for %s.\n",
  124. gTimerPeriod, HADOOP_FUSE_TIMER_PERIOD);
  125. return -EINVAL;
  126. }
  127. gExpiryPeriod = FUSE_CONN_DEFAULT_EXPIRY_PERIOD;
  128. ret = hdfsConfGetInt(HADOOP_FUSE_CONNECTION_TIMEOUT, &gExpiryPeriod);
  129. if (ret) {
  130. fprintf(stderr, "Unable to determine the configured value for %s.",
  131. HADOOP_FUSE_CONNECTION_TIMEOUT);
  132. return -EINVAL;
  133. }
  134. if (gExpiryPeriod < 1) {
  135. fprintf(stderr, "Invalid value %d given for %s.\n",
  136. gExpiryPeriod, HADOOP_FUSE_CONNECTION_TIMEOUT);
  137. return -EINVAL;
  138. }
  139. gHdfsAuthConf = discoverAuthConf();
  140. if (gHdfsAuthConf == AUTH_CONF_UNKNOWN) {
  141. fprintf(stderr, "Unable to determine the configured value for %s.",
  142. HADOOP_SECURITY_AUTHENTICATION);
  143. return -EINVAL;
  144. }
  145. gPort = port;
  146. gUri = strdup(nnUri);
  147. if (!gUri) {
  148. fprintf(stderr, "fuseConnectInit: OOM allocting nnUri\n");
  149. return -ENOMEM;
  150. }
  151. ret = pthread_mutex_init(&gConnMutex, NULL);
  152. if (ret) {
  153. free(gUri);
  154. fprintf(stderr, "fuseConnectInit: pthread_mutex_init failed with error %d\n",
  155. ret);
  156. return -ret;
  157. }
  158. RB_INIT(&gConnTree);
  159. ret = pthread_create(&gTimerThread, NULL, hdfsConnExpiryThread, NULL);
  160. if (ret) {
  161. free(gUri);
  162. pthread_mutex_destroy(&gConnMutex);
  163. fprintf(stderr, "fuseConnectInit: pthread_create failed with error %d\n",
  164. ret);
  165. return -ret;
  166. }
  167. fprintf(stderr, "fuseConnectInit: initialized with timer period %d, "
  168. "expiry period %d\n", gTimerPeriod, gExpiryPeriod);
  169. return 0;
  170. }
  171. /**
  172. * Compare two libhdfs connections by username and Kerberos ticket cache path
  173. *
  174. * @param a The first libhdfs connection
  175. * @param b The second libhdfs connection
  176. *
  177. * @return -1, 0, or 1 depending on a < b, a ==b, a > b
  178. */
  179. static int hdfsConnCompare(const struct hdfsConn *a, const struct hdfsConn *b)
  180. {
  181. int rc = strcmp(a->usrname, b->usrname);
  182. if (rc) return rc;
  183. return gHdfsAuthConf == AUTH_CONF_KERBEROS && strcmp(a->kpath, b->kpath);
  184. }
  185. /**
  186. * Find a libhdfs connection by username
  187. *
  188. * @param usrname The username to look up
  189. * @param kpath The Kerberos ticket cache file path
  190. *
  191. * @return The connection, or NULL if none could be found
  192. */
  193. static struct hdfsConn* hdfsConnFind(const char *usrname, const char *kpath)
  194. {
  195. struct hdfsConn exemplar;
  196. memset(&exemplar, 0, sizeof(exemplar));
  197. exemplar.usrname = (char*)usrname;
  198. exemplar.kpath = (char*)kpath;
  199. return RB_FIND(hdfsConnTree, &gConnTree, &exemplar);
  200. }
  201. /**
  202. * Free the resource associated with a libhdfs connection.
  203. *
  204. * You must remove the connection from the tree before calling this function.
  205. *
  206. * @param conn The libhdfs connection
  207. */
  208. static void hdfsConnFree(struct hdfsConn *conn)
  209. {
  210. int ret;
  211. ret = hdfsDisconnect(conn->fs);
  212. if (ret) {
  213. fprintf(stderr, "hdfsConnFree(username=%s): "
  214. "hdfsDisconnect failed with error %d\n",
  215. (conn->usrname ? conn->usrname : "(null)"), ret);
  216. }
  217. free(conn->usrname);
  218. free(conn->kpath);
  219. free(conn);
  220. }
  221. /**
  222. * Convert a time_t to a string.
  223. *
  224. * @param sec time in seconds since the epoch
  225. * @param buf (out param) output buffer
  226. * @param bufLen length of output buffer
  227. *
  228. * @return 0 on success; ENAMETOOLONG if the provided buffer was
  229. * too short
  230. */
  231. static int timeToStr(time_t sec, char *buf, size_t bufLen)
  232. {
  233. struct tm tm, *out;
  234. size_t l;
  235. if (bufLen < TIME_STR_LEN) {
  236. return -ENAMETOOLONG;
  237. }
  238. out = localtime_r(&sec, &tm);
  239. asctime_r(out, buf);
  240. // strip trailing newline
  241. l = strlen(buf);
  242. if (l != 0)
  243. buf[l - 1] = '\0';
  244. return 0;
  245. }
  246. /**
  247. * Check an HDFS connection's Kerberos path.
  248. *
  249. * If the mtime of the Kerberos ticket cache file has changed since we first
  250. * opened the connection, mark the connection as condemned and remove it from
  251. * the hdfs connection tree.
  252. *
  253. * @param conn The HDFS connection
  254. */
  255. static int hdfsConnCheckKpath(const struct hdfsConn *conn)
  256. {
  257. int ret;
  258. struct stat st;
  259. char prevTimeBuf[TIME_STR_LEN], newTimeBuf[TIME_STR_LEN];
  260. if (stat(conn->kpath, &st) < 0) {
  261. ret = errno;
  262. if (ret == ENOENT) {
  263. fprintf(stderr, "hdfsConnCheckKpath(conn.usrname=%s): the kerberos "
  264. "ticket cache file '%s' has disappeared. Condemning the "
  265. "connection.\n", conn->usrname, conn->kpath);
  266. } else {
  267. fprintf(stderr, "hdfsConnCheckKpath(conn.usrname=%s): stat(%s) "
  268. "failed with error code %d. Pessimistically condemning the "
  269. "connection.\n", conn->usrname, conn->kpath, ret);
  270. }
  271. return -ret;
  272. }
  273. if ((st.st_mtim.tv_sec != conn->kPathMtime) ||
  274. (st.st_mtim.tv_nsec != conn->kPathMtimeNs)) {
  275. timeToStr(conn->kPathMtime, prevTimeBuf, sizeof(prevTimeBuf));
  276. timeToStr(st.st_mtim.tv_sec, newTimeBuf, sizeof(newTimeBuf));
  277. fprintf(stderr, "hdfsConnCheckKpath(conn.usrname=%s): mtime on '%s' "
  278. "has changed from '%s' to '%s'. Condemning the connection "
  279. "because our cached Kerberos credentials have probably "
  280. "changed.\n", conn->usrname, conn->kpath, prevTimeBuf, newTimeBuf);
  281. return -EINTERNAL;
  282. }
  283. return 0;
  284. }
  285. /**
  286. * Cache expiration logic.
  287. *
  288. * This function is called periodically by the cache expiration thread. For
  289. * each FUSE connection not currently in use (refcnt == 0) it will decrement the
  290. * expirationCount for that connection. Once the expirationCount reaches 0 for
  291. * a connection, it can be garbage collected.
  292. *
  293. * We also check to see if the Kerberos credentials have changed. If so, the
  294. * connecton is immediately condemned, even if it is currently in use.
  295. */
  296. static void hdfsConnExpiry(void)
  297. {
  298. struct hdfsConn *conn, *tmpConn;
  299. pthread_mutex_lock(&gConnMutex);
  300. RB_FOREACH_SAFE(conn, hdfsConnTree, &gConnTree, tmpConn) {
  301. if (conn->kpath) {
  302. if (hdfsConnCheckKpath(conn)) {
  303. conn->condemned = 1;
  304. RB_REMOVE(hdfsConnTree, &gConnTree, conn);
  305. if (conn->refcnt == 0) {
  306. /* If the connection is not in use by any threads, delete it
  307. * immediately. If it is still in use by some threads, the last
  308. * thread using it will clean it up later inside hdfsConnRelease. */
  309. hdfsConnFree(conn);
  310. continue;
  311. }
  312. }
  313. }
  314. if (conn->refcnt == 0) {
  315. /* If the connection is not currently in use by a thread, check to see if
  316. * it ought to be removed because it's too old. */
  317. conn->expirationCount--;
  318. if (conn->expirationCount <= 0) {
  319. if (conn->condemned) {
  320. fprintf(stderr, "hdfsConnExpiry: LOGIC ERROR: condemned connection "
  321. "as %s is still in the tree!\n", conn->usrname);
  322. }
  323. fprintf(stderr, "hdfsConnExpiry: freeing and removing connection as "
  324. "%s because it's now too old.\n", conn->usrname);
  325. RB_REMOVE(hdfsConnTree, &gConnTree, conn);
  326. hdfsConnFree(conn);
  327. }
  328. }
  329. }
  330. pthread_mutex_unlock(&gConnMutex);
  331. }
  332. // The Kerberos FILE: prefix. This indicates that the kerberos ticket cache
  333. // specifier is a file. (Note that we also assume that the specifier is a file
  334. // if no prefix is present.)
  335. #define KRB_FILE_PREFIX "FILE:"
  336. // Length of the Kerberos file prefix, which is equal to the string size in
  337. // bytes minus 1 (because we don't count the null terminator in the length.)
  338. #define KRB_FILE_PREFIX_LEN (sizeof(KRB_FILE_PREFIX) - 1)
  339. /**
  340. * Find the Kerberos ticket cache path.
  341. *
  342. * This function finds the Kerberos ticket cache path from the thread ID and
  343. * user ID of the process making the request.
  344. *
  345. * Normally, the ticket cache path is in a well-known location in /tmp.
  346. * However, it's possible that the calling process could set the KRB5CCNAME
  347. * environment variable, indicating that its Kerberos ticket cache is at a
  348. * non-default location. We try to handle this possibility by reading the
  349. * process' environment here. This will be allowed if we have root
  350. * capabilities, or if our UID is the same as the remote process' UID.
  351. *
  352. * Note that we don't check to see if the cache file actually exists or not.
  353. * We're just trying to find out where it would be if it did exist.
  354. *
  355. * @param path (out param) the path to the ticket cache file
  356. * @param pathLen length of the path buffer
  357. */
  358. static void findKerbTicketCachePath(struct fuse_context *ctx,
  359. char *path, size_t pathLen)
  360. {
  361. FILE *fp = NULL;
  362. static const char * const KRB5CCNAME = "\0KRB5CCNAME=";
  363. int c = '\0', pathIdx = 0, keyIdx = 0;
  364. size_t KRB5CCNAME_LEN = strlen(KRB5CCNAME + 1) + 1;
  365. // /proc/<tid>/environ contains the remote process' environment. It is
  366. // exposed to us as a series of KEY=VALUE pairs, separated by NULL bytes.
  367. snprintf(path, pathLen, "/proc/%d/environ", ctx->pid);
  368. fp = fopen(path, "r");
  369. if (!fp)
  370. goto done;
  371. while (1) {
  372. if (c == EOF)
  373. goto done;
  374. if (keyIdx == KRB5CCNAME_LEN) {
  375. if (pathIdx >= pathLen - 1)
  376. goto done;
  377. if (c == '\0')
  378. goto done;
  379. path[pathIdx++] = c;
  380. } else if (KRB5CCNAME[keyIdx++] != c) {
  381. keyIdx = 0;
  382. }
  383. c = fgetc(fp);
  384. }
  385. done:
  386. if (fp)
  387. fclose(fp);
  388. if (pathIdx == 0) {
  389. snprintf(path, pathLen, "/tmp/krb5cc_%d", ctx->uid);
  390. } else {
  391. path[pathIdx] = '\0';
  392. }
  393. if (strncmp(path, KRB_FILE_PREFIX, KRB_FILE_PREFIX_LEN) == 0) {
  394. fprintf(stderr, "stripping " KRB_FILE_PREFIX " from the front of "
  395. "KRB5CCNAME.\n");
  396. memmove(path, path + KRB_FILE_PREFIX_LEN,
  397. strlen(path + KRB_FILE_PREFIX_LEN) + 1);
  398. }
  399. }
  400. /**
  401. * Create a new libhdfs connection.
  402. *
  403. * @param usrname Username to use for the new connection
  404. * @param ctx FUSE context to use for the new connection
  405. * @param out (out param) the new libhdfs connection
  406. *
  407. * @return 0 on success; error code otherwise
  408. */
  409. static int fuseNewConnect(const char *usrname, struct fuse_context *ctx,
  410. struct hdfsConn **out)
  411. {
  412. struct hdfsBuilder *bld = NULL;
  413. char kpath[PATH_MAX] = { 0 };
  414. struct hdfsConn *conn = NULL;
  415. int ret;
  416. struct stat st;
  417. conn = calloc(1, sizeof(struct hdfsConn));
  418. if (!conn) {
  419. fprintf(stderr, "fuseNewConnect: OOM allocating struct hdfsConn\n");
  420. ret = -ENOMEM;
  421. goto error;
  422. }
  423. bld = hdfsNewBuilder();
  424. if (!bld) {
  425. fprintf(stderr, "Unable to create hdfs builder\n");
  426. ret = -ENOMEM;
  427. goto error;
  428. }
  429. /* We always want to get a new FileSystem instance here-- that's why we call
  430. * hdfsBuilderSetForceNewInstance. Otherwise the 'cache condemnation' logic
  431. * in hdfsConnExpiry will not work correctly, since FileSystem might re-use the
  432. * existing cached connection which we wanted to get rid of.
  433. */
  434. hdfsBuilderSetForceNewInstance(bld);
  435. hdfsBuilderSetNameNode(bld, gUri);
  436. if (gPort) {
  437. hdfsBuilderSetNameNodePort(bld, gPort);
  438. }
  439. if (gHdfsAuthConf == AUTH_CONF_KERBEROS) {
  440. findKerbTicketCachePath(ctx, kpath, sizeof(kpath));
  441. if (stat(kpath, &st) < 0) {
  442. fprintf(stderr, "fuseNewConnect: failed to find Kerberos ticket cache "
  443. "file '%s'. Did you remember to kinit for UID %d?\n",
  444. kpath, ctx->uid);
  445. ret = -EACCES;
  446. goto error;
  447. }
  448. conn->kPathMtime = st.st_mtim.tv_sec;
  449. conn->kPathMtimeNs = st.st_mtim.tv_nsec;
  450. hdfsBuilderSetKerbTicketCachePath(bld, kpath);
  451. conn->kpath = strdup(kpath);
  452. if (!conn->kpath) {
  453. fprintf(stderr, "fuseNewConnect: OOM allocating kpath\n");
  454. ret = -ENOMEM;
  455. goto error;
  456. }
  457. } else {
  458. // earlier the username was set to the builder always, but due to
  459. // HADOOP-9747 if we specify the username in case of kerberos authentication
  460. // the username will be used as the principal name, and that will conflict
  461. // with ticket cache based authentication as we have the OS user name here
  462. // not the real kerberos principal name. So with SIMPLE auth we pass on the
  463. // OS username still, and the UGI will use that as the username, but with
  464. // kerberos authentication we do not pass in the OS username and let the
  465. // authentication happen with the principal who's ticket is in the ticket
  466. // cache. (HDFS-15034 is still a possible improvement for SIMPLE AUTH.)
  467. hdfsBuilderSetUserName(bld, usrname);
  468. }
  469. conn->usrname = strdup(usrname);
  470. if (!conn->usrname) {
  471. fprintf(stderr, "fuseNewConnect: OOM allocating usrname\n");
  472. ret = -ENOMEM;
  473. goto error;
  474. }
  475. conn->fs = hdfsBuilderConnect(bld);
  476. bld = NULL;
  477. if (!conn->fs) {
  478. ret = errno;
  479. fprintf(stderr, "fuseNewConnect(usrname=%s): Unable to create fs: "
  480. "error code %d\n", usrname, ret);
  481. goto error;
  482. }
  483. RB_INSERT(hdfsConnTree, &gConnTree, conn);
  484. *out = conn;
  485. return 0;
  486. error:
  487. if (bld) {
  488. hdfsFreeBuilder(bld);
  489. }
  490. if (conn) {
  491. free(conn->kpath);
  492. free(conn->usrname);
  493. free(conn);
  494. }
  495. return ret;
  496. }
  497. /**
  498. * Get a libhdfs connection.
  499. *
  500. * If there is an existing connection, it will be reused. If not, a new one
  501. * will be created.
  502. *
  503. * You must call hdfsConnRelease on the connection you get back!
  504. *
  505. * @param usrname The username to use
  506. * @param ctx The FUSE context to use (contains UID, PID of requestor)
  507. * @param conn (out param) The HDFS connection
  508. *
  509. * @return 0 on success; error code otherwise
  510. */
  511. static int fuseConnect(const char *usrname, struct fuse_context *ctx,
  512. struct hdfsConn **out)
  513. {
  514. int ret;
  515. struct hdfsConn* conn;
  516. char kpath[PATH_MAX] = { 0 };
  517. if (gHdfsAuthConf == AUTH_CONF_KERBEROS) {
  518. findKerbTicketCachePath(ctx, kpath, sizeof(kpath));
  519. }
  520. pthread_mutex_lock(&gConnMutex);
  521. conn = hdfsConnFind(usrname, kpath);
  522. if (!conn) {
  523. ret = fuseNewConnect(usrname, ctx, &conn);
  524. if (ret) {
  525. pthread_mutex_unlock(&gConnMutex);
  526. fprintf(stderr, "fuseConnect(usrname=%s): fuseNewConnect failed with "
  527. "error code %d\n", usrname, ret);
  528. return ret;
  529. }
  530. }
  531. conn->refcnt++;
  532. conn->expirationCount = (gExpiryPeriod + gTimerPeriod - 1) / gTimerPeriod;
  533. if (conn->expirationCount < 2)
  534. conn->expirationCount = 2;
  535. pthread_mutex_unlock(&gConnMutex);
  536. *out = conn;
  537. return 0;
  538. }
  539. int fuseConnectAsThreadUid(struct hdfsConn **conn)
  540. {
  541. struct fuse_context *ctx;
  542. char *usrname;
  543. int ret;
  544. ctx = fuse_get_context();
  545. usrname = getUsername(ctx->uid);
  546. if (!usrname) {
  547. ERROR("fuseConnectAsThreadUid(): failed to get username for uid %"PRId64
  548. "\n", (uint64_t)ctx->uid);
  549. return EIO;
  550. }
  551. ret = fuseConnect(usrname, ctx, conn);
  552. free(usrname);
  553. return ret;
  554. }
  555. int fuseConnectTest(void)
  556. {
  557. int ret;
  558. struct hdfsConn *conn;
  559. if (gHdfsAuthConf == AUTH_CONF_KERBEROS) {
  560. // TODO: call some method which can tell us whether the FS exists. In order
  561. // to implement this, we have to add a method to FileSystem in order to do
  562. // this without valid Kerberos authentication. See HDFS-3674 for details.
  563. return 0;
  564. }
  565. ret = fuseNewConnect("root", NULL, &conn);
  566. if (ret) {
  567. fprintf(stderr, "fuseConnectTest failed with error code %d\n", ret);
  568. return ret;
  569. }
  570. hdfsConnRelease(conn);
  571. return 0;
  572. }
  573. struct hdfs_internal* hdfsConnGetFs(struct hdfsConn *conn)
  574. {
  575. return conn->fs;
  576. }
  577. void hdfsConnRelease(struct hdfsConn *conn)
  578. {
  579. pthread_mutex_lock(&gConnMutex);
  580. conn->refcnt--;
  581. if ((conn->refcnt == 0) && (conn->condemned)) {
  582. fprintf(stderr, "hdfsConnRelease(usrname=%s): freeing condemend FS!\n",
  583. conn->usrname);
  584. /* Notice that we're not removing the connection from gConnTree here.
  585. * If the connection is condemned, it must have already been removed from
  586. * the tree, so that no other threads start using it.
  587. */
  588. hdfsConnFree(conn);
  589. }
  590. pthread_mutex_unlock(&gConnMutex);
  591. }
  592. /**
  593. * Get the monotonic time.
  594. *
  595. * Unlike the wall-clock time, monotonic time only ever goes forward. If the
  596. * user adjusts the time, the monotonic time will not be affected.
  597. *
  598. * @return The monotonic time
  599. */
  600. static time_t getMonotonicTime(void)
  601. {
  602. int res;
  603. struct timespec ts;
  604. res = clock_gettime(CLOCK_MONOTONIC, &ts);
  605. if (res)
  606. abort();
  607. return ts.tv_sec;
  608. }
  609. /**
  610. * FUSE connection expiration thread
  611. *
  612. */
  613. static void* hdfsConnExpiryThread(void *v)
  614. {
  615. time_t nextTime, curTime;
  616. int waitTime;
  617. nextTime = getMonotonicTime() + gTimerPeriod;
  618. while (1) {
  619. curTime = getMonotonicTime();
  620. if (curTime >= nextTime) {
  621. hdfsConnExpiry();
  622. nextTime = curTime + gTimerPeriod;
  623. }
  624. waitTime = (nextTime - curTime) * 1000;
  625. poll(NULL, 0, waitTime);
  626. }
  627. return NULL;
  628. }