fuse_connect.c 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "fuse_connect.h"
  19. #include "fuse_dfs.h"
  20. #include "fuse_users.h"
  21. #include "hdfs/hdfs.h"
  22. #include "util/tree.h"
  23. #include <inttypes.h>
  24. #include <limits.h>
  25. #include <poll.h>
  26. #include <search.h>
  27. #include <stdio.h>
  28. #include <stdlib.h>
  29. #include <string.h>
  30. #include <sys/time.h>
  31. #include <sys/types.h>
  32. #include <utime.h>
  33. #define FUSE_CONN_DEFAULT_TIMER_PERIOD 5
  34. #define FUSE_CONN_DEFAULT_EXPIRY_PERIOD (5 * 60)
  35. #define HADOOP_SECURITY_AUTHENTICATION "hadoop.security.authentication"
  36. #define HADOOP_FUSE_CONNECTION_TIMEOUT "hadoop.fuse.connection.timeout"
  37. #define HADOOP_FUSE_TIMER_PERIOD "hadoop.fuse.timer.period"
  38. /** Length of the buffer needed by asctime_r */
  39. #define TIME_STR_LEN 26
  40. struct hdfsConn;
  41. static int hdfsConnCompare(const struct hdfsConn *a, const struct hdfsConn *b);
  42. static void hdfsConnExpiry(void);
  43. static void* hdfsConnExpiryThread(void *v);
  44. RB_HEAD(hdfsConnTree, hdfsConn);
  45. enum authConf {
  46. AUTH_CONF_UNKNOWN,
  47. AUTH_CONF_KERBEROS,
  48. AUTH_CONF_OTHER,
  49. };
  50. struct hdfsConn {
  51. RB_ENTRY(hdfsConn) entry;
  52. /** How many threads are currently using this hdfsConnection object */
  53. int64_t refcnt;
  54. /** The username used to make this connection. Dynamically allocated. */
  55. char *usrname;
  56. /** Kerberos ticket cache path, or NULL if this is not a kerberized
  57. * connection. Dynamically allocated. */
  58. char *kpath;
  59. /** mtime of the kpath, if the kpath is non-NULL */
  60. time_t kPathMtime;
  61. /** nanosecond component of the mtime of the kpath, if the kpath is non-NULL */
  62. long kPathMtimeNs;
  63. /** The cached libhdfs fs instance */
  64. hdfsFS fs;
  65. /** Nonzero if this hdfs connection needs to be closed as soon as possible.
  66. * If this is true, the connection has been removed from the tree. */
  67. int condemned;
  68. /** Number of times we should run the expiration timer on this connection
  69. * before removing it. */
  70. int expirationCount;
  71. };
  72. RB_GENERATE(hdfsConnTree, hdfsConn, entry, hdfsConnCompare);
  73. /** Current cached libhdfs connections */
  74. static struct hdfsConnTree gConnTree;
  75. /** The URI used to make our connections. Dynamically allocated. */
  76. static char *gUri;
  77. /** The port used to make our connections, or 0. */
  78. static int gPort;
  79. /** Lock which protects gConnTree and gConnectTimer->active */
  80. static pthread_mutex_t gConnMutex;
  81. /** Type of authentication configured */
  82. static enum authConf gHdfsAuthConf;
  83. /** FUSE connection timer expiration period */
  84. static int32_t gTimerPeriod;
  85. /** FUSE connection expiry period */
  86. static int32_t gExpiryPeriod;
  87. /** FUSE timer expiration thread */
  88. static pthread_t gTimerThread;
  89. /**
  90. * Find out what type of authentication the system administrator
  91. * has configured.
  92. *
  93. * @return the type of authentication, or AUTH_CONF_UNKNOWN on error.
  94. */
  95. static enum authConf discoverAuthConf(void)
  96. {
  97. int ret;
  98. char *val = NULL;
  99. enum authConf authConf;
  100. ret = hdfsConfGetStr(HADOOP_SECURITY_AUTHENTICATION, &val);
  101. if (ret)
  102. authConf = AUTH_CONF_UNKNOWN;
  103. else if (!val)
  104. authConf = AUTH_CONF_OTHER;
  105. else if (!strcmp(val, "kerberos"))
  106. authConf = AUTH_CONF_KERBEROS;
  107. else
  108. authConf = AUTH_CONF_OTHER;
  109. free(val);
  110. return authConf;
  111. }
  112. int fuseConnectInit(const char *nnUri, int port)
  113. {
  114. int ret;
  115. gTimerPeriod = FUSE_CONN_DEFAULT_TIMER_PERIOD;
  116. ret = hdfsConfGetInt(HADOOP_FUSE_TIMER_PERIOD, &gTimerPeriod);
  117. if (ret) {
  118. fprintf(stderr, "Unable to determine the configured value for %s.",
  119. HADOOP_FUSE_TIMER_PERIOD);
  120. return -EINVAL;
  121. }
  122. if (gTimerPeriod < 1) {
  123. fprintf(stderr, "Invalid value %d given for %s.\n",
  124. gTimerPeriod, HADOOP_FUSE_TIMER_PERIOD);
  125. return -EINVAL;
  126. }
  127. gExpiryPeriod = FUSE_CONN_DEFAULT_EXPIRY_PERIOD;
  128. ret = hdfsConfGetInt(HADOOP_FUSE_CONNECTION_TIMEOUT, &gExpiryPeriod);
  129. if (ret) {
  130. fprintf(stderr, "Unable to determine the configured value for %s.",
  131. HADOOP_FUSE_CONNECTION_TIMEOUT);
  132. return -EINVAL;
  133. }
  134. if (gExpiryPeriod < 1) {
  135. fprintf(stderr, "Invalid value %d given for %s.\n",
  136. gExpiryPeriod, HADOOP_FUSE_CONNECTION_TIMEOUT);
  137. return -EINVAL;
  138. }
  139. gHdfsAuthConf = discoverAuthConf();
  140. if (gHdfsAuthConf == AUTH_CONF_UNKNOWN) {
  141. fprintf(stderr, "Unable to determine the configured value for %s.",
  142. HADOOP_SECURITY_AUTHENTICATION);
  143. return -EINVAL;
  144. }
  145. gPort = port;
  146. gUri = strdup(nnUri);
  147. if (!gUri) {
  148. fprintf(stderr, "fuseConnectInit: OOM allocting nnUri\n");
  149. return -ENOMEM;
  150. }
  151. ret = pthread_mutex_init(&gConnMutex, NULL);
  152. if (ret) {
  153. free(gUri);
  154. fprintf(stderr, "fuseConnectInit: pthread_mutex_init failed with error %d\n",
  155. ret);
  156. return -ret;
  157. }
  158. RB_INIT(&gConnTree);
  159. ret = pthread_create(&gTimerThread, NULL, hdfsConnExpiryThread, NULL);
  160. if (ret) {
  161. free(gUri);
  162. pthread_mutex_destroy(&gConnMutex);
  163. fprintf(stderr, "fuseConnectInit: pthread_create failed with error %d\n",
  164. ret);
  165. return -ret;
  166. }
  167. fprintf(stderr, "fuseConnectInit: initialized with timer period %d, "
  168. "expiry period %d\n", gTimerPeriod, gExpiryPeriod);
  169. return 0;
  170. }
  171. /**
  172. * Compare two libhdfs connections by username
  173. *
  174. * @param a The first libhdfs connection
  175. * @param b The second libhdfs connection
  176. *
  177. * @return -1, 0, or 1 depending on a < b, a ==b, a > b
  178. */
  179. static int hdfsConnCompare(const struct hdfsConn *a, const struct hdfsConn *b)
  180. {
  181. return strcmp(a->usrname, b->usrname);
  182. }
  183. /**
  184. * Find a libhdfs connection by username
  185. *
  186. * @param usrname The username to look up
  187. *
  188. * @return The connection, or NULL if none could be found
  189. */
  190. static struct hdfsConn* hdfsConnFind(const char *usrname)
  191. {
  192. struct hdfsConn exemplar;
  193. memset(&exemplar, 0, sizeof(exemplar));
  194. exemplar.usrname = (char*)usrname;
  195. return RB_FIND(hdfsConnTree, &gConnTree, &exemplar);
  196. }
  197. /**
  198. * Free the resource associated with a libhdfs connection.
  199. *
  200. * You must remove the connection from the tree before calling this function.
  201. *
  202. * @param conn The libhdfs connection
  203. */
  204. static void hdfsConnFree(struct hdfsConn *conn)
  205. {
  206. int ret;
  207. ret = hdfsDisconnect(conn->fs);
  208. if (ret) {
  209. fprintf(stderr, "hdfsConnFree(username=%s): "
  210. "hdfsDisconnect failed with error %d\n",
  211. (conn->usrname ? conn->usrname : "(null)"), ret);
  212. }
  213. free(conn->usrname);
  214. free(conn->kpath);
  215. free(conn);
  216. }
  217. /**
  218. * Convert a time_t to a string.
  219. *
  220. * @param sec time in seconds since the epoch
  221. * @param buf (out param) output buffer
  222. * @param bufLen length of output buffer
  223. *
  224. * @return 0 on success; ENAMETOOLONG if the provided buffer was
  225. * too short
  226. */
  227. static int timeToStr(time_t sec, char *buf, size_t bufLen)
  228. {
  229. struct tm tm, *out;
  230. size_t l;
  231. if (bufLen < TIME_STR_LEN) {
  232. return -ENAMETOOLONG;
  233. }
  234. out = localtime_r(&sec, &tm);
  235. asctime_r(out, buf);
  236. // strip trailing newline
  237. l = strlen(buf);
  238. if (l != 0)
  239. buf[l - 1] = '\0';
  240. return 0;
  241. }
  242. /**
  243. * Check an HDFS connection's Kerberos path.
  244. *
  245. * If the mtime of the Kerberos ticket cache file has changed since we first
  246. * opened the connection, mark the connection as condemned and remove it from
  247. * the hdfs connection tree.
  248. *
  249. * @param conn The HDFS connection
  250. */
  251. static int hdfsConnCheckKpath(const struct hdfsConn *conn)
  252. {
  253. int ret;
  254. struct stat st;
  255. char prevTimeBuf[TIME_STR_LEN], newTimeBuf[TIME_STR_LEN];
  256. if (stat(conn->kpath, &st) < 0) {
  257. ret = errno;
  258. if (ret == ENOENT) {
  259. fprintf(stderr, "hdfsConnCheckKpath(conn.usrname=%s): the kerberos "
  260. "ticket cache file '%s' has disappeared. Condemning the "
  261. "connection.\n", conn->usrname, conn->kpath);
  262. } else {
  263. fprintf(stderr, "hdfsConnCheckKpath(conn.usrname=%s): stat(%s) "
  264. "failed with error code %d. Pessimistically condemning the "
  265. "connection.\n", conn->usrname, conn->kpath, ret);
  266. }
  267. return -ret;
  268. }
  269. if ((st.st_mtim.tv_sec != conn->kPathMtime) ||
  270. (st.st_mtim.tv_nsec != conn->kPathMtimeNs)) {
  271. timeToStr(conn->kPathMtime, prevTimeBuf, sizeof(prevTimeBuf));
  272. timeToStr(st.st_mtim.tv_sec, newTimeBuf, sizeof(newTimeBuf));
  273. fprintf(stderr, "hdfsConnCheckKpath(conn.usrname=%s): mtime on '%s' "
  274. "has changed from '%s' to '%s'. Condemning the connection "
  275. "because our cached Kerberos credentials have probably "
  276. "changed.\n", conn->usrname, conn->kpath, prevTimeBuf, newTimeBuf);
  277. return -EINTERNAL;
  278. }
  279. return 0;
  280. }
  281. /**
  282. * Cache expiration logic.
  283. *
  284. * This function is called periodically by the cache expiration thread. For
  285. * each FUSE connection not currently in use (refcnt == 0) it will decrement the
  286. * expirationCount for that connection. Once the expirationCount reaches 0 for
  287. * a connection, it can be garbage collected.
  288. *
  289. * We also check to see if the Kerberos credentials have changed. If so, the
  290. * connecton is immediately condemned, even if it is currently in use.
  291. */
  292. static void hdfsConnExpiry(void)
  293. {
  294. struct hdfsConn *conn, *tmpConn;
  295. pthread_mutex_lock(&gConnMutex);
  296. RB_FOREACH_SAFE(conn, hdfsConnTree, &gConnTree, tmpConn) {
  297. if (conn->kpath) {
  298. if (hdfsConnCheckKpath(conn)) {
  299. conn->condemned = 1;
  300. RB_REMOVE(hdfsConnTree, &gConnTree, conn);
  301. if (conn->refcnt == 0) {
  302. /* If the connection is not in use by any threads, delete it
  303. * immediately. If it is still in use by some threads, the last
  304. * thread using it will clean it up later inside hdfsConnRelease. */
  305. hdfsConnFree(conn);
  306. continue;
  307. }
  308. }
  309. }
  310. if (conn->refcnt == 0) {
  311. /* If the connection is not currently in use by a thread, check to see if
  312. * it ought to be removed because it's too old. */
  313. conn->expirationCount--;
  314. if (conn->expirationCount <= 0) {
  315. if (conn->condemned) {
  316. fprintf(stderr, "hdfsConnExpiry: LOGIC ERROR: condemned connection "
  317. "as %s is still in the tree!\n", conn->usrname);
  318. }
  319. fprintf(stderr, "hdfsConnExpiry: freeing and removing connection as "
  320. "%s because it's now too old.\n", conn->usrname);
  321. RB_REMOVE(hdfsConnTree, &gConnTree, conn);
  322. hdfsConnFree(conn);
  323. }
  324. }
  325. }
  326. pthread_mutex_unlock(&gConnMutex);
  327. }
  328. // The Kerberos FILE: prefix. This indicates that the kerberos ticket cache
  329. // specifier is a file. (Note that we also assume that the specifier is a file
  330. // if no prefix is present.)
  331. #define KRB_FILE_PREFIX "FILE:"
  332. // Length of the Kerberos file prefix, which is equal to the string size in
  333. // bytes minus 1 (because we don't count the null terminator in the length.)
  334. #define KRB_FILE_PREFIX_LEN (sizeof(KRB_FILE_PREFIX) - 1)
  335. /**
  336. * Find the Kerberos ticket cache path.
  337. *
  338. * This function finds the Kerberos ticket cache path from the thread ID and
  339. * user ID of the process making the request.
  340. *
  341. * Normally, the ticket cache path is in a well-known location in /tmp.
  342. * However, it's possible that the calling process could set the KRB5CCNAME
  343. * environment variable, indicating that its Kerberos ticket cache is at a
  344. * non-default location. We try to handle this possibility by reading the
  345. * process' environment here. This will be allowed if we have root
  346. * capabilities, or if our UID is the same as the remote process' UID.
  347. *
  348. * Note that we don't check to see if the cache file actually exists or not.
  349. * We're just trying to find out where it would be if it did exist.
  350. *
  351. * @param path (out param) the path to the ticket cache file
  352. * @param pathLen length of the path buffer
  353. */
  354. static void findKerbTicketCachePath(struct fuse_context *ctx,
  355. char *path, size_t pathLen)
  356. {
  357. FILE *fp = NULL;
  358. static const char * const KRB5CCNAME = "\0KRB5CCNAME=";
  359. int c = '\0', pathIdx = 0, keyIdx = 0;
  360. size_t KRB5CCNAME_LEN = strlen(KRB5CCNAME + 1) + 1;
  361. // /proc/<tid>/environ contains the remote process' environment. It is
  362. // exposed to us as a series of KEY=VALUE pairs, separated by NULL bytes.
  363. snprintf(path, pathLen, "/proc/%d/environ", ctx->pid);
  364. fp = fopen(path, "r");
  365. if (!fp)
  366. goto done;
  367. while (1) {
  368. if (c == EOF)
  369. goto done;
  370. if (keyIdx == KRB5CCNAME_LEN) {
  371. if (pathIdx >= pathLen - 1)
  372. goto done;
  373. if (c == '\0')
  374. goto done;
  375. path[pathIdx++] = c;
  376. } else if (KRB5CCNAME[keyIdx++] != c) {
  377. keyIdx = 0;
  378. }
  379. c = fgetc(fp);
  380. }
  381. done:
  382. if (fp)
  383. fclose(fp);
  384. if (pathIdx == 0) {
  385. snprintf(path, pathLen, "/tmp/krb5cc_%d", ctx->uid);
  386. } else {
  387. path[pathIdx] = '\0';
  388. }
  389. if (strncmp(path, KRB_FILE_PREFIX, KRB_FILE_PREFIX_LEN) == 0) {
  390. fprintf(stderr, "stripping " KRB_FILE_PREFIX " from the front of "
  391. "KRB5CCNAME.\n");
  392. memmove(path, path + KRB_FILE_PREFIX_LEN,
  393. strlen(path + KRB_FILE_PREFIX_LEN) + 1);
  394. }
  395. }
  396. /**
  397. * Create a new libhdfs connection.
  398. *
  399. * @param usrname Username to use for the new connection
  400. * @param ctx FUSE context to use for the new connection
  401. * @param out (out param) the new libhdfs connection
  402. *
  403. * @return 0 on success; error code otherwise
  404. */
  405. static int fuseNewConnect(const char *usrname, struct fuse_context *ctx,
  406. struct hdfsConn **out)
  407. {
  408. struct hdfsBuilder *bld = NULL;
  409. char kpath[PATH_MAX] = { 0 };
  410. struct hdfsConn *conn = NULL;
  411. int ret;
  412. struct stat st;
  413. conn = calloc(1, sizeof(struct hdfsConn));
  414. if (!conn) {
  415. fprintf(stderr, "fuseNewConnect: OOM allocating struct hdfsConn\n");
  416. ret = -ENOMEM;
  417. goto error;
  418. }
  419. bld = hdfsNewBuilder();
  420. if (!bld) {
  421. fprintf(stderr, "Unable to create hdfs builder\n");
  422. ret = -ENOMEM;
  423. goto error;
  424. }
  425. /* We always want to get a new FileSystem instance here-- that's why we call
  426. * hdfsBuilderSetForceNewInstance. Otherwise the 'cache condemnation' logic
  427. * in hdfsConnExpiry will not work correctly, since FileSystem might re-use the
  428. * existing cached connection which we wanted to get rid of.
  429. */
  430. hdfsBuilderSetForceNewInstance(bld);
  431. hdfsBuilderSetNameNode(bld, gUri);
  432. if (gPort) {
  433. hdfsBuilderSetNameNodePort(bld, gPort);
  434. }
  435. hdfsBuilderSetUserName(bld, usrname);
  436. if (gHdfsAuthConf == AUTH_CONF_KERBEROS) {
  437. findKerbTicketCachePath(ctx, kpath, sizeof(kpath));
  438. if (stat(kpath, &st) < 0) {
  439. fprintf(stderr, "fuseNewConnect: failed to find Kerberos ticket cache "
  440. "file '%s'. Did you remember to kinit for UID %d?\n",
  441. kpath, ctx->uid);
  442. ret = -EACCES;
  443. goto error;
  444. }
  445. conn->kPathMtime = st.st_mtim.tv_sec;
  446. conn->kPathMtimeNs = st.st_mtim.tv_nsec;
  447. hdfsBuilderSetKerbTicketCachePath(bld, kpath);
  448. conn->kpath = strdup(kpath);
  449. if (!conn->kpath) {
  450. fprintf(stderr, "fuseNewConnect: OOM allocating kpath\n");
  451. ret = -ENOMEM;
  452. goto error;
  453. }
  454. }
  455. conn->usrname = strdup(usrname);
  456. if (!conn->usrname) {
  457. fprintf(stderr, "fuseNewConnect: OOM allocating usrname\n");
  458. ret = -ENOMEM;
  459. goto error;
  460. }
  461. conn->fs = hdfsBuilderConnect(bld);
  462. bld = NULL;
  463. if (!conn->fs) {
  464. ret = errno;
  465. fprintf(stderr, "fuseNewConnect(usrname=%s): Unable to create fs: "
  466. "error code %d\n", usrname, ret);
  467. goto error;
  468. }
  469. RB_INSERT(hdfsConnTree, &gConnTree, conn);
  470. *out = conn;
  471. return 0;
  472. error:
  473. if (bld) {
  474. hdfsFreeBuilder(bld);
  475. }
  476. if (conn) {
  477. free(conn->kpath);
  478. free(conn->usrname);
  479. free(conn);
  480. }
  481. return ret;
  482. }
  483. int fuseConnect(const char *usrname, struct fuse_context *ctx,
  484. struct hdfsConn **out)
  485. {
  486. int ret;
  487. struct hdfsConn* conn;
  488. pthread_mutex_lock(&gConnMutex);
  489. conn = hdfsConnFind(usrname);
  490. if (!conn) {
  491. ret = fuseNewConnect(usrname, ctx, &conn);
  492. if (ret) {
  493. pthread_mutex_unlock(&gConnMutex);
  494. fprintf(stderr, "fuseConnect(usrname=%s): fuseNewConnect failed with "
  495. "error code %d\n", usrname, ret);
  496. return ret;
  497. }
  498. }
  499. conn->refcnt++;
  500. conn->expirationCount = (gExpiryPeriod + gTimerPeriod - 1) / gTimerPeriod;
  501. if (conn->expirationCount < 2)
  502. conn->expirationCount = 2;
  503. pthread_mutex_unlock(&gConnMutex);
  504. *out = conn;
  505. return 0;
  506. }
  507. int fuseConnectAsThreadUid(struct hdfsConn **conn)
  508. {
  509. struct fuse_context *ctx;
  510. char *usrname;
  511. int ret;
  512. ctx = fuse_get_context();
  513. usrname = getUsername(ctx->uid);
  514. ret = fuseConnect(usrname, ctx, conn);
  515. free(usrname);
  516. return ret;
  517. }
  518. int fuseConnectTest(void)
  519. {
  520. int ret;
  521. struct hdfsConn *conn;
  522. if (gHdfsAuthConf == AUTH_CONF_KERBEROS) {
  523. // TODO: call some method which can tell us whether the FS exists. In order
  524. // to implement this, we have to add a method to FileSystem in order to do
  525. // this without valid Kerberos authentication. See HDFS-3674 for details.
  526. return 0;
  527. }
  528. ret = fuseNewConnect("root", NULL, &conn);
  529. if (ret) {
  530. fprintf(stderr, "fuseConnectTest failed with error code %d\n", ret);
  531. return ret;
  532. }
  533. hdfsConnRelease(conn);
  534. return 0;
  535. }
  536. struct hdfs_internal* hdfsConnGetFs(struct hdfsConn *conn)
  537. {
  538. return conn->fs;
  539. }
  540. void hdfsConnRelease(struct hdfsConn *conn)
  541. {
  542. pthread_mutex_lock(&gConnMutex);
  543. conn->refcnt--;
  544. if ((conn->refcnt == 0) && (conn->condemned)) {
  545. fprintf(stderr, "hdfsConnRelease(usrname=%s): freeing condemend FS!\n",
  546. conn->usrname);
  547. /* Notice that we're not removing the connection from gConnTree here.
  548. * If the connection is condemned, it must have already been removed from
  549. * the tree, so that no other threads start using it.
  550. */
  551. hdfsConnFree(conn);
  552. }
  553. pthread_mutex_unlock(&gConnMutex);
  554. }
  555. /**
  556. * Get the monotonic time.
  557. *
  558. * Unlike the wall-clock time, monotonic time only ever goes forward. If the
  559. * user adjusts the time, the monotonic time will not be affected.
  560. *
  561. * @return The monotonic time
  562. */
  563. static time_t getMonotonicTime(void)
  564. {
  565. int res;
  566. struct timespec ts;
  567. res = clock_gettime(CLOCK_MONOTONIC, &ts);
  568. if (res)
  569. abort();
  570. return ts.tv_sec;
  571. }
  572. /**
  573. * FUSE connection expiration thread
  574. *
  575. */
  576. static void* hdfsConnExpiryThread(void *v)
  577. {
  578. time_t nextTime, curTime;
  579. int waitTime;
  580. nextTime = getMonotonicTime() + gTimerPeriod;
  581. while (1) {
  582. curTime = getMonotonicTime();
  583. if (curTime >= nextTime) {
  584. hdfsConnExpiry();
  585. nextTime = curTime + gTimerPeriod;
  586. }
  587. waitTime = (nextTime - curTime) * 1000;
  588. poll(NULL, 0, waitTime);
  589. }
  590. return NULL;
  591. }