123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644 |
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #include "fuse_connect.h"
- #include "fuse_dfs.h"
- #include "fuse_users.h"
- #include "hdfs/hdfs.h"
- #include "util/tree.h"
- #include <inttypes.h>
- #include <limits.h>
- #include <poll.h>
- #include <search.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <sys/time.h>
- #include <sys/types.h>
- #include <utime.h>
- #define FUSE_CONN_DEFAULT_TIMER_PERIOD 5
- #define FUSE_CONN_DEFAULT_EXPIRY_PERIOD (5 * 60)
- #define HADOOP_SECURITY_AUTHENTICATION "hadoop.security.authentication"
- #define HADOOP_FUSE_CONNECTION_TIMEOUT "hadoop.fuse.connection.timeout"
- #define HADOOP_FUSE_TIMER_PERIOD "hadoop.fuse.timer.period"
- /** Length of the buffer needed by asctime_r */
- #define TIME_STR_LEN 26
- struct hdfsConn;
- static int hdfsConnCompare(const struct hdfsConn *a, const struct hdfsConn *b);
- static void hdfsConnExpiry(void);
- static void* hdfsConnExpiryThread(void *v);
- RB_HEAD(hdfsConnTree, hdfsConn);
- enum authConf {
- AUTH_CONF_UNKNOWN,
- AUTH_CONF_KERBEROS,
- AUTH_CONF_OTHER,
- };
- struct hdfsConn {
- RB_ENTRY(hdfsConn) entry;
- /** How many threads are currently using this hdfsConnection object */
- int64_t refcnt;
- /** The username used to make this connection. Dynamically allocated. */
- char *usrname;
- /** Kerberos ticket cache path, or NULL if this is not a kerberized
- * connection. Dynamically allocated. */
- char *kpath;
- /** mtime of the kpath, if the kpath is non-NULL */
- time_t kPathMtime;
- /** nanosecond component of the mtime of the kpath, if the kpath is non-NULL */
- long kPathMtimeNs;
- /** The cached libhdfs fs instance */
- hdfsFS fs;
- /** Nonzero if this hdfs connection needs to be closed as soon as possible.
- * If this is true, the connection has been removed from the tree. */
- int condemned;
- /** Number of times we should run the expiration timer on this connection
- * before removing it. */
- int expirationCount;
- };
- RB_GENERATE(hdfsConnTree, hdfsConn, entry, hdfsConnCompare);
- /** Current cached libhdfs connections */
- static struct hdfsConnTree gConnTree;
- /** The URI used to make our connections. Dynamically allocated. */
- static char *gUri;
- /** The port used to make our connections, or 0. */
- static int gPort;
- /** Lock which protects gConnTree and gConnectTimer->active */
- static pthread_mutex_t gConnMutex;
- /** Type of authentication configured */
- static enum authConf gHdfsAuthConf;
- /** FUSE connection timer expiration period */
- static int32_t gTimerPeriod;
- /** FUSE connection expiry period */
- static int32_t gExpiryPeriod;
- /** FUSE timer expiration thread */
- static pthread_t gTimerThread;
- /**
- * Find out what type of authentication the system administrator
- * has configured.
- *
- * @return the type of authentication, or AUTH_CONF_UNKNOWN on error.
- */
- static enum authConf discoverAuthConf(void)
- {
- int ret;
- char *val = NULL;
- enum authConf authConf;
- ret = hdfsConfGetStr(HADOOP_SECURITY_AUTHENTICATION, &val);
- if (ret)
- authConf = AUTH_CONF_UNKNOWN;
- else if (!val)
- authConf = AUTH_CONF_OTHER;
- else if (!strcmp(val, "kerberos"))
- authConf = AUTH_CONF_KERBEROS;
- else
- authConf = AUTH_CONF_OTHER;
- free(val);
- return authConf;
- }
- int fuseConnectInit(const char *nnUri, int port)
- {
- int ret;
- gTimerPeriod = FUSE_CONN_DEFAULT_TIMER_PERIOD;
- ret = hdfsConfGetInt(HADOOP_FUSE_TIMER_PERIOD, &gTimerPeriod);
- if (ret) {
- fprintf(stderr, "Unable to determine the configured value for %s.",
- HADOOP_FUSE_TIMER_PERIOD);
- return -EINVAL;
- }
- if (gTimerPeriod < 1) {
- fprintf(stderr, "Invalid value %d given for %s.\n",
- gTimerPeriod, HADOOP_FUSE_TIMER_PERIOD);
- return -EINVAL;
- }
- gExpiryPeriod = FUSE_CONN_DEFAULT_EXPIRY_PERIOD;
- ret = hdfsConfGetInt(HADOOP_FUSE_CONNECTION_TIMEOUT, &gExpiryPeriod);
- if (ret) {
- fprintf(stderr, "Unable to determine the configured value for %s.",
- HADOOP_FUSE_CONNECTION_TIMEOUT);
- return -EINVAL;
- }
- if (gExpiryPeriod < 1) {
- fprintf(stderr, "Invalid value %d given for %s.\n",
- gExpiryPeriod, HADOOP_FUSE_CONNECTION_TIMEOUT);
- return -EINVAL;
- }
- gHdfsAuthConf = discoverAuthConf();
- if (gHdfsAuthConf == AUTH_CONF_UNKNOWN) {
- fprintf(stderr, "Unable to determine the configured value for %s.",
- HADOOP_SECURITY_AUTHENTICATION);
- return -EINVAL;
- }
- gPort = port;
- gUri = strdup(nnUri);
- if (!gUri) {
- fprintf(stderr, "fuseConnectInit: OOM allocting nnUri\n");
- return -ENOMEM;
- }
- ret = pthread_mutex_init(&gConnMutex, NULL);
- if (ret) {
- free(gUri);
- fprintf(stderr, "fuseConnectInit: pthread_mutex_init failed with error %d\n",
- ret);
- return -ret;
- }
- RB_INIT(&gConnTree);
- ret = pthread_create(&gTimerThread, NULL, hdfsConnExpiryThread, NULL);
- if (ret) {
- free(gUri);
- pthread_mutex_destroy(&gConnMutex);
- fprintf(stderr, "fuseConnectInit: pthread_create failed with error %d\n",
- ret);
- return -ret;
- }
- fprintf(stderr, "fuseConnectInit: initialized with timer period %d, "
- "expiry period %d\n", gTimerPeriod, gExpiryPeriod);
- return 0;
- }
- /**
- * Compare two libhdfs connections by username
- *
- * @param a The first libhdfs connection
- * @param b The second libhdfs connection
- *
- * @return -1, 0, or 1 depending on a < b, a ==b, a > b
- */
- static int hdfsConnCompare(const struct hdfsConn *a, const struct hdfsConn *b)
- {
- return strcmp(a->usrname, b->usrname);
- }
- /**
- * Find a libhdfs connection by username
- *
- * @param usrname The username to look up
- *
- * @return The connection, or NULL if none could be found
- */
- static struct hdfsConn* hdfsConnFind(const char *usrname)
- {
- struct hdfsConn exemplar;
- memset(&exemplar, 0, sizeof(exemplar));
- exemplar.usrname = (char*)usrname;
- return RB_FIND(hdfsConnTree, &gConnTree, &exemplar);
- }
- /**
- * Free the resource associated with a libhdfs connection.
- *
- * You must remove the connection from the tree before calling this function.
- *
- * @param conn The libhdfs connection
- */
- static void hdfsConnFree(struct hdfsConn *conn)
- {
- int ret;
- ret = hdfsDisconnect(conn->fs);
- if (ret) {
- fprintf(stderr, "hdfsConnFree(username=%s): "
- "hdfsDisconnect failed with error %d\n",
- (conn->usrname ? conn->usrname : "(null)"), ret);
- }
- free(conn->usrname);
- free(conn->kpath);
- free(conn);
- }
- /**
- * Convert a time_t to a string.
- *
- * @param sec time in seconds since the epoch
- * @param buf (out param) output buffer
- * @param bufLen length of output buffer
- *
- * @return 0 on success; ENAMETOOLONG if the provided buffer was
- * too short
- */
- static int timeToStr(time_t sec, char *buf, size_t bufLen)
- {
- struct tm tm, *out;
- size_t l;
- if (bufLen < TIME_STR_LEN) {
- return -ENAMETOOLONG;
- }
- out = localtime_r(&sec, &tm);
- asctime_r(out, buf);
- // strip trailing newline
- l = strlen(buf);
- if (l != 0)
- buf[l - 1] = '\0';
- return 0;
- }
- /**
- * Check an HDFS connection's Kerberos path.
- *
- * If the mtime of the Kerberos ticket cache file has changed since we first
- * opened the connection, mark the connection as condemned and remove it from
- * the hdfs connection tree.
- *
- * @param conn The HDFS connection
- */
- static int hdfsConnCheckKpath(const struct hdfsConn *conn)
- {
- int ret;
- struct stat st;
- char prevTimeBuf[TIME_STR_LEN], newTimeBuf[TIME_STR_LEN];
- if (stat(conn->kpath, &st) < 0) {
- ret = errno;
- if (ret == ENOENT) {
- fprintf(stderr, "hdfsConnCheckKpath(conn.usrname=%s): the kerberos "
- "ticket cache file '%s' has disappeared. Condemning the "
- "connection.\n", conn->usrname, conn->kpath);
- } else {
- fprintf(stderr, "hdfsConnCheckKpath(conn.usrname=%s): stat(%s) "
- "failed with error code %d. Pessimistically condemning the "
- "connection.\n", conn->usrname, conn->kpath, ret);
- }
- return -ret;
- }
- if ((st.st_mtim.tv_sec != conn->kPathMtime) ||
- (st.st_mtim.tv_nsec != conn->kPathMtimeNs)) {
- timeToStr(conn->kPathMtime, prevTimeBuf, sizeof(prevTimeBuf));
- timeToStr(st.st_mtim.tv_sec, newTimeBuf, sizeof(newTimeBuf));
- fprintf(stderr, "hdfsConnCheckKpath(conn.usrname=%s): mtime on '%s' "
- "has changed from '%s' to '%s'. Condemning the connection "
- "because our cached Kerberos credentials have probably "
- "changed.\n", conn->usrname, conn->kpath, prevTimeBuf, newTimeBuf);
- return -EINTERNAL;
- }
- return 0;
- }
- /**
- * Cache expiration logic.
- *
- * This function is called periodically by the cache expiration thread. For
- * each FUSE connection not currently in use (refcnt == 0) it will decrement the
- * expirationCount for that connection. Once the expirationCount reaches 0 for
- * a connection, it can be garbage collected.
- *
- * We also check to see if the Kerberos credentials have changed. If so, the
- * connecton is immediately condemned, even if it is currently in use.
- */
- static void hdfsConnExpiry(void)
- {
- struct hdfsConn *conn, *tmpConn;
- pthread_mutex_lock(&gConnMutex);
- RB_FOREACH_SAFE(conn, hdfsConnTree, &gConnTree, tmpConn) {
- if (conn->kpath) {
- if (hdfsConnCheckKpath(conn)) {
- conn->condemned = 1;
- RB_REMOVE(hdfsConnTree, &gConnTree, conn);
- if (conn->refcnt == 0) {
- /* If the connection is not in use by any threads, delete it
- * immediately. If it is still in use by some threads, the last
- * thread using it will clean it up later inside hdfsConnRelease. */
- hdfsConnFree(conn);
- continue;
- }
- }
- }
- if (conn->refcnt == 0) {
- /* If the connection is not currently in use by a thread, check to see if
- * it ought to be removed because it's too old. */
- conn->expirationCount--;
- if (conn->expirationCount <= 0) {
- if (conn->condemned) {
- fprintf(stderr, "hdfsConnExpiry: LOGIC ERROR: condemned connection "
- "as %s is still in the tree!\n", conn->usrname);
- }
- fprintf(stderr, "hdfsConnExpiry: freeing and removing connection as "
- "%s because it's now too old.\n", conn->usrname);
- RB_REMOVE(hdfsConnTree, &gConnTree, conn);
- hdfsConnFree(conn);
- }
- }
- }
- pthread_mutex_unlock(&gConnMutex);
- }
- // The Kerberos FILE: prefix. This indicates that the kerberos ticket cache
- // specifier is a file. (Note that we also assume that the specifier is a file
- // if no prefix is present.)
- #define KRB_FILE_PREFIX "FILE:"
- // Length of the Kerberos file prefix, which is equal to the string size in
- // bytes minus 1 (because we don't count the null terminator in the length.)
- #define KRB_FILE_PREFIX_LEN (sizeof(KRB_FILE_PREFIX) - 1)
- /**
- * Find the Kerberos ticket cache path.
- *
- * This function finds the Kerberos ticket cache path from the thread ID and
- * user ID of the process making the request.
- *
- * Normally, the ticket cache path is in a well-known location in /tmp.
- * However, it's possible that the calling process could set the KRB5CCNAME
- * environment variable, indicating that its Kerberos ticket cache is at a
- * non-default location. We try to handle this possibility by reading the
- * process' environment here. This will be allowed if we have root
- * capabilities, or if our UID is the same as the remote process' UID.
- *
- * Note that we don't check to see if the cache file actually exists or not.
- * We're just trying to find out where it would be if it did exist.
- *
- * @param path (out param) the path to the ticket cache file
- * @param pathLen length of the path buffer
- */
- static void findKerbTicketCachePath(struct fuse_context *ctx,
- char *path, size_t pathLen)
- {
- FILE *fp = NULL;
- static const char * const KRB5CCNAME = "\0KRB5CCNAME=";
- int c = '\0', pathIdx = 0, keyIdx = 0;
- size_t KRB5CCNAME_LEN = strlen(KRB5CCNAME + 1) + 1;
- // /proc/<tid>/environ contains the remote process' environment. It is
- // exposed to us as a series of KEY=VALUE pairs, separated by NULL bytes.
- snprintf(path, pathLen, "/proc/%d/environ", ctx->pid);
- fp = fopen(path, "r");
- if (!fp)
- goto done;
- while (1) {
- if (c == EOF)
- goto done;
- if (keyIdx == KRB5CCNAME_LEN) {
- if (pathIdx >= pathLen - 1)
- goto done;
- if (c == '\0')
- goto done;
- path[pathIdx++] = c;
- } else if (KRB5CCNAME[keyIdx++] != c) {
- keyIdx = 0;
- }
- c = fgetc(fp);
- }
- done:
- if (fp)
- fclose(fp);
- if (pathIdx == 0) {
- snprintf(path, pathLen, "/tmp/krb5cc_%d", ctx->uid);
- } else {
- path[pathIdx] = '\0';
- }
- if (strncmp(path, KRB_FILE_PREFIX, KRB_FILE_PREFIX_LEN) == 0) {
- fprintf(stderr, "stripping " KRB_FILE_PREFIX " from the front of "
- "KRB5CCNAME.\n");
- memmove(path, path + KRB_FILE_PREFIX_LEN,
- strlen(path + KRB_FILE_PREFIX_LEN) + 1);
- }
- }
- /**
- * Create a new libhdfs connection.
- *
- * @param usrname Username to use for the new connection
- * @param ctx FUSE context to use for the new connection
- * @param out (out param) the new libhdfs connection
- *
- * @return 0 on success; error code otherwise
- */
- static int fuseNewConnect(const char *usrname, struct fuse_context *ctx,
- struct hdfsConn **out)
- {
- struct hdfsBuilder *bld = NULL;
- char kpath[PATH_MAX] = { 0 };
- struct hdfsConn *conn = NULL;
- int ret;
- struct stat st;
- conn = calloc(1, sizeof(struct hdfsConn));
- if (!conn) {
- fprintf(stderr, "fuseNewConnect: OOM allocating struct hdfsConn\n");
- ret = -ENOMEM;
- goto error;
- }
- bld = hdfsNewBuilder();
- if (!bld) {
- fprintf(stderr, "Unable to create hdfs builder\n");
- ret = -ENOMEM;
- goto error;
- }
- /* We always want to get a new FileSystem instance here-- that's why we call
- * hdfsBuilderSetForceNewInstance. Otherwise the 'cache condemnation' logic
- * in hdfsConnExpiry will not work correctly, since FileSystem might re-use the
- * existing cached connection which we wanted to get rid of.
- */
- hdfsBuilderSetForceNewInstance(bld);
- hdfsBuilderSetNameNode(bld, gUri);
- if (gPort) {
- hdfsBuilderSetNameNodePort(bld, gPort);
- }
- hdfsBuilderSetUserName(bld, usrname);
- if (gHdfsAuthConf == AUTH_CONF_KERBEROS) {
- findKerbTicketCachePath(ctx, kpath, sizeof(kpath));
- if (stat(kpath, &st) < 0) {
- fprintf(stderr, "fuseNewConnect: failed to find Kerberos ticket cache "
- "file '%s'. Did you remember to kinit for UID %d?\n",
- kpath, ctx->uid);
- ret = -EACCES;
- goto error;
- }
- conn->kPathMtime = st.st_mtim.tv_sec;
- conn->kPathMtimeNs = st.st_mtim.tv_nsec;
- hdfsBuilderSetKerbTicketCachePath(bld, kpath);
- conn->kpath = strdup(kpath);
- if (!conn->kpath) {
- fprintf(stderr, "fuseNewConnect: OOM allocating kpath\n");
- ret = -ENOMEM;
- goto error;
- }
- }
- conn->usrname = strdup(usrname);
- if (!conn->usrname) {
- fprintf(stderr, "fuseNewConnect: OOM allocating usrname\n");
- ret = -ENOMEM;
- goto error;
- }
- conn->fs = hdfsBuilderConnect(bld);
- bld = NULL;
- if (!conn->fs) {
- ret = errno;
- fprintf(stderr, "fuseNewConnect(usrname=%s): Unable to create fs: "
- "error code %d\n", usrname, ret);
- goto error;
- }
- RB_INSERT(hdfsConnTree, &gConnTree, conn);
- *out = conn;
- return 0;
- error:
- if (bld) {
- hdfsFreeBuilder(bld);
- }
- if (conn) {
- free(conn->kpath);
- free(conn->usrname);
- free(conn);
- }
- return ret;
- }
- int fuseConnect(const char *usrname, struct fuse_context *ctx,
- struct hdfsConn **out)
- {
- int ret;
- struct hdfsConn* conn;
- pthread_mutex_lock(&gConnMutex);
- conn = hdfsConnFind(usrname);
- if (!conn) {
- ret = fuseNewConnect(usrname, ctx, &conn);
- if (ret) {
- pthread_mutex_unlock(&gConnMutex);
- fprintf(stderr, "fuseConnect(usrname=%s): fuseNewConnect failed with "
- "error code %d\n", usrname, ret);
- return ret;
- }
- }
- conn->refcnt++;
- conn->expirationCount = (gExpiryPeriod + gTimerPeriod - 1) / gTimerPeriod;
- if (conn->expirationCount < 2)
- conn->expirationCount = 2;
- pthread_mutex_unlock(&gConnMutex);
- *out = conn;
- return 0;
- }
- int fuseConnectAsThreadUid(struct hdfsConn **conn)
- {
- struct fuse_context *ctx;
- char *usrname;
- int ret;
-
- ctx = fuse_get_context();
- usrname = getUsername(ctx->uid);
- ret = fuseConnect(usrname, ctx, conn);
- free(usrname);
- return ret;
- }
- int fuseConnectTest(void)
- {
- int ret;
- struct hdfsConn *conn;
- if (gHdfsAuthConf == AUTH_CONF_KERBEROS) {
- // TODO: call some method which can tell us whether the FS exists. In order
- // to implement this, we have to add a method to FileSystem in order to do
- // this without valid Kerberos authentication. See HDFS-3674 for details.
- return 0;
- }
- ret = fuseNewConnect("root", NULL, &conn);
- if (ret) {
- fprintf(stderr, "fuseConnectTest failed with error code %d\n", ret);
- return ret;
- }
- hdfsConnRelease(conn);
- return 0;
- }
- struct hdfs_internal* hdfsConnGetFs(struct hdfsConn *conn)
- {
- return conn->fs;
- }
- void hdfsConnRelease(struct hdfsConn *conn)
- {
- pthread_mutex_lock(&gConnMutex);
- conn->refcnt--;
- if ((conn->refcnt == 0) && (conn->condemned)) {
- fprintf(stderr, "hdfsConnRelease(usrname=%s): freeing condemend FS!\n",
- conn->usrname);
- /* Notice that we're not removing the connection from gConnTree here.
- * If the connection is condemned, it must have already been removed from
- * the tree, so that no other threads start using it.
- */
- hdfsConnFree(conn);
- }
- pthread_mutex_unlock(&gConnMutex);
- }
- /**
- * Get the monotonic time.
- *
- * Unlike the wall-clock time, monotonic time only ever goes forward. If the
- * user adjusts the time, the monotonic time will not be affected.
- *
- * @return The monotonic time
- */
- static time_t getMonotonicTime(void)
- {
- int res;
- struct timespec ts;
-
- res = clock_gettime(CLOCK_MONOTONIC, &ts);
- if (res)
- abort();
- return ts.tv_sec;
- }
- /**
- * FUSE connection expiration thread
- *
- */
- static void* hdfsConnExpiryThread(void *v)
- {
- time_t nextTime, curTime;
- int waitTime;
- nextTime = getMonotonicTime() + gTimerPeriod;
- while (1) {
- curTime = getMonotonicTime();
- if (curTime >= nextTime) {
- hdfsConnExpiry();
- nextTime = curTime + gTimerPeriod;
- }
- waitTime = (nextTime - curTime) * 1000;
- poll(NULL, 0, waitTime);
- }
- return NULL;
- }
|