fuse_dfs.c 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
#define FUSE_USE_VERSION 26

#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

#ifdef linux
/* For pread()/pwrite() */
#define _XOPEN_SOURCE 500
#endif

#include <fuse.h>
#include <fuse/fuse_opt.h>
#include <stdio.h>
#include <stdlib.h> // for malloc, free, exit
#include <stdint.h> // for uint64_t, uintptr_t
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <dirent.h>
#include <errno.h>
#include <sys/time.h>
#ifdef HAVE_SETXATTR
#include <sys/xattr.h>
#endif
#include <math.h> // for ceil
#include <getopt.h>
#include <assert.h>
#include <syslog.h>
#include <strings.h>
#include <hdfs.h>
  44. // Constants
  45. //
  46. static const int default_id = 99; // nobody - not configurable since soon uids in dfs, yeah!
  47. static const size_t rd_buf_size = 128 * 1024;
  48. static const int blksize = 512;
  49. static const size_t rd_cache_buf_size = 10*1024*1024;//how much of reads to buffer here
  50. /** options for fuse_opt.h */
  51. struct options {
  52. char* server;
  53. int port;
  54. int debug;
  55. int nowrites;
  56. int no_trash;
  57. }options;
  58. static const char *const TrashPrefixDir = "/Trash";
  59. static const char *const TrashDir = "/Trash/Current";
  60. typedef struct dfs_fh_struct {
  61. hdfsFile hdfsFH;
  62. char *buf;
  63. tSize sizeBuffer; //what is the size of the buffer we have
  64. off_t startOffset; //where the buffer starts in the file
  65. } dfs_fh;
  66. #include <stddef.h>
  67. /** macro to define options */
  68. #define DFSFS_OPT_KEY(t, p, v) { t, offsetof(struct options, p), v }
  69. /** keys for FUSE_OPT_ options */
  70. static void print_usage(const char *pname)
  71. {
  72. fprintf(stdout,"USAGE: %s [--debug] [--help] [--version] [--nowrites] [--notrash] --server=<hadoop_servername> --port=<hadoop_port> <mntpoint> [fuse options]\n",pname);
  73. fprintf(stdout,"NOTE: a useful fuse option is -o allow_others and -o default_permissions\n");
  74. fprintf(stdout,"NOTE: optimizations include -o entry_timeout=500 -o attr_timeout=500\n");
  75. fprintf(stdout,"NOTE: debugging option for fuse is -debug\n");
  76. }
  77. static char **protectedpaths;
  78. #define OPTIMIZED_READS 1
  79. enum
  80. {
  81. KEY_VERSION,
  82. KEY_HELP,
  83. };
  84. static struct fuse_opt dfs_opts[] =
  85. {
  86. DFSFS_OPT_KEY("--server=%s", server, 0),
  87. DFSFS_OPT_KEY("--port=%d", port, 0),
  88. DFSFS_OPT_KEY("--debug", debug, 1),
  89. DFSFS_OPT_KEY("--nowrites", nowrites, 1),
  90. DFSFS_OPT_KEY("--notrash", no_trash, 1),
  91. FUSE_OPT_KEY("-v", KEY_VERSION),
  92. FUSE_OPT_KEY("--version", KEY_VERSION),
  93. FUSE_OPT_KEY("-h", KEY_HELP),
  94. FUSE_OPT_KEY("--help", KEY_HELP),
  95. FUSE_OPT_END
  96. };
  97. static const char *program;
  98. int dfs_options(void *data, const char *arg, int key, struct fuse_args *outargs)
  99. {
  100. if (key == KEY_VERSION) {
  101. fprintf(stdout,"%s %s\n",program,_FUSE_DFS_VERSION);
  102. exit(0);
  103. } else if (key == KEY_HELP) {
  104. print_usage(program);
  105. exit(0);
  106. } else {
  107. // try and see if the arg is a URI for DFS
  108. int tmp_port;
  109. char tmp_server[1024];
  110. if (!sscanf(arg,"dfs://%1024[a-zA-Z0-9_.-]:%d",tmp_server,&tmp_port)) {
  111. printf("didn't recognize %s\n",arg);
  112. fuse_opt_add_arg(outargs,arg);
  113. } else {
  114. options.port = tmp_port;
  115. options.server = strdup(tmp_server);
  116. }
  117. }
  118. return 0;
  119. }
  120. //
  121. // Structure to store fuse_dfs specific data
  122. // this will be created and passed to fuse at startup
  123. // and fuse will pass it back to us via the context function
  124. // on every operation.
  125. //
  126. typedef struct dfs_context_struct {
  127. int debug;
  128. char *nn_hostname;
  129. int nn_port;
  130. hdfsFS fs;
  131. int nowrites;
  132. int no_trash;
  133. // todo:
  134. // total hack city - use this to strip off the dfs url from the filenames
  135. // that the dfs API is now providing in 0.14.5
  136. // Will do a better job of fixing this once I am back from vacation
  137. //
  138. char dfs_uri[1024];
  139. int dfs_uri_len;
  140. } dfs_context;
  141. #define TRASH_RENAME_TRIES 100
  142. //
  143. // Some forward declarations
  144. //
  145. static int dfs_mkdir(const char *path, mode_t mode);
  146. static int dfs_rename(const char *from, const char *to);
  147. //
  148. // NOTE: this function is a c implementation of org.apache.hadoop.fs.Trash.moveToTrash(Path path).
  149. //
  150. int move_to_trash(const char *item) {
  151. // retrieve dfs specific data
  152. dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
  153. // check params and the context var
  154. assert(item);
  155. assert(dfs);
  156. assert('/' == *item);
  157. assert(rindex(item,'/') >= 0);
  158. // if not connected, try to connect and fail out if we can't.
  159. if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) {
  160. syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
  161. return -EIO;
  162. }
  163. char fname[4096]; // or last element of the directory path
  164. char parent_directory[4096]; // the directory the fname resides in
  165. if (strlen(item) > sizeof(fname) - strlen(TrashDir)) {
  166. syslog(LOG_ERR, "ERROR: internal buffer too small to accomodate path of length %d %s:%d\n", (int)strlen(item), __FILE__, __LINE__);
  167. return -EIO;
  168. }
  169. // separate the file name and the parent directory of the item to be deleted
  170. {
  171. int length_of_parent_dir = rindex(item, '/') - item ;
  172. int length_of_fname = strlen(item) - length_of_parent_dir - 1; // the '/'
  173. // note - the below strncpys should be safe from overflow because of the check on item's string length above.
  174. strncpy(parent_directory, item, length_of_parent_dir);
  175. parent_directory[length_of_parent_dir ] = 0;
  176. strncpy(fname, item + length_of_parent_dir + 1, strlen(item));
  177. fname[length_of_fname + 1] = 0;
  178. }
  179. // create the target trash directory
  180. char trash_dir[4096];
  181. if(snprintf(trash_dir, sizeof(trash_dir), "%s%s",TrashDir,parent_directory) >= sizeof trash_dir) {
  182. syslog(LOG_ERR, "move_to_trash error target is not big enough to hold new name for %s %s:%d\n",item, __FILE__, __LINE__);
  183. return -EIO;
  184. }
  185. // create the target trash directory in trash (if needed)
  186. if ( hdfsExists(dfs->fs, trash_dir)) {
  187. int status;
  188. // make the directory to put it in in the Trash - NOTE
  189. // dfs_mkdir also creates parents, so Current will be created if it does not exist.
  190. if ((status = dfs_mkdir(trash_dir,0)) != 0) {
  191. return status;
  192. }
  193. }
  194. //
  195. // if the target path in Trash already exists, then append with
  196. // a number. Start from 1.
  197. //
  198. char target[4096];
  199. int j ;
  200. if( snprintf(target, sizeof target,"%s/%s",trash_dir, fname) >= sizeof target) {
  201. syslog(LOG_ERR, "move_to_trash error target is not big enough to hold new name for %s %s:%d\n",item, __FILE__, __LINE__);
  202. return -EIO;
  203. }
  204. // NOTE: this loop differs from the java version by capping the #of tries
  205. for (j = 1; ! hdfsExists(dfs->fs, target) && j < TRASH_RENAME_TRIES ; j++) {
  206. if(snprintf(target, sizeof target,"%s/%s.%d",trash_dir, fname, j) >= sizeof target) {
  207. syslog(LOG_ERR, "move_to_trash error target is not big enough to hold new name for %s %s:%d\n",item, __FILE__, __LINE__);
  208. return -EIO;
  209. }
  210. }
  211. return dfs_rename(item,target);
  212. }
  213. //
  214. // Start of read-only functions
  215. //
  216. static int dfs_getattr(const char *path, struct stat *st)
  217. {
  218. // retrieve dfs specific data
  219. dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
  220. syslog(LOG_ERR, "starting dfs_getattr for %s\n",path);
  221. // check params and the context var
  222. assert(dfs);
  223. assert(path);
  224. assert(st);
  225. // if not connected, try to connect and fail out if we can't.
  226. if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) {
  227. syslog(LOG_ERR, "ERROR: could not connect to %s:%d %s:%d\n", dfs->nn_hostname, dfs->nn_port,__FILE__, __LINE__);
  228. return -EIO;
  229. }
  230. // call the dfs API to get the actual information
  231. hdfsFileInfo *info = hdfsGetPathInfo(dfs->fs,path);
  232. if (NULL == info) {
  233. return -ENOENT;
  234. }
  235. // initialize the stat structure
  236. memset(st, 0, sizeof(struct stat));
  237. // setup hard link info - for a file it is 1 else num entries in a dir + 2 (for . and ..)
  238. if (info[0].mKind == kObjectKindDirectory) {
  239. int numEntries = 0;
  240. hdfsFileInfo *info = hdfsListDirectory(dfs->fs,path,&numEntries);
  241. if (info) {
  242. hdfsFreeFileInfo(info,numEntries);
  243. }
  244. st->st_nlink = numEntries + 2;
  245. } else {
  246. // not a directory
  247. st->st_nlink = 1;
  248. }
  249. // set stat metadata
  250. st->st_size = (info[0].mKind == kObjectKindDirectory) ? 4096 : info[0].mSize;
  251. st->st_blksize = blksize;
  252. st->st_blocks = ceil(st->st_size/st->st_blksize);
  253. st->st_mode = (info[0].mKind == kObjectKindDirectory) ? (S_IFDIR | 0777) : (S_IFREG | 0666);
  254. st->st_uid = default_id;
  255. st->st_gid = default_id;
  256. st->st_atime = info[0].mLastMod;
  257. st->st_mtime = info[0].mLastMod;
  258. st->st_ctime = info[0].mLastMod;
  259. // free the info pointer
  260. hdfsFreeFileInfo(info,1);
  261. return 0;
  262. }
  263. static int dfs_readdir(const char *path, void *buf, fuse_fill_dir_t filler,
  264. off_t offset, struct fuse_file_info *fi)
  265. {
  266. (void) offset;
  267. (void) fi;
  268. syslog(LOG_ERR, "starting dfs_readdir for %s\n",path);
  269. // retrieve dfs specific data
  270. dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
  271. // check params and the context var
  272. assert(dfs);
  273. assert(path);
  274. assert(buf);
  275. // if not connected, try to connect and fail out if we can't.
  276. if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) {
  277. syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
  278. return -EIO;
  279. }
  280. int path_len = strlen(path);
  281. // call dfs to read the dir
  282. int numEntries = 0;
  283. hdfsFileInfo *info = hdfsListDirectory(dfs->fs,path,&numEntries);
  284. // NULL means either the directory doesn't exist or maybe IO error.
  285. if (NULL == info) {
  286. return -ENOENT;
  287. }
  288. int i ;
  289. for (i = 0; i < numEntries; i++) {
  290. // check the info[i] struct
  291. if (NULL == info[i].mName) {
  292. syslog(LOG_ERR,"ERROR: for <%s> info[%d].mName==NULL %s:%d", path, i, __FILE__,__LINE__);
  293. continue;
  294. }
  295. struct stat st;
  296. memset(&st, 0, sizeof(struct stat));
  297. // set to 0 to indicate not supported for directory because we cannot (efficiently) get this info for every subdirectory
  298. st.st_nlink = (info[i].mKind == kObjectKindDirectory) ? 0 : 1;
  299. // setup stat size and acl meta data
  300. st.st_size = info[i].mSize;
  301. st.st_blksize = 512;
  302. st.st_blocks = ceil(st.st_size/st.st_blksize);
  303. st.st_mode = (info[i].mKind == kObjectKindDirectory) ? (S_IFDIR | 0777) : (S_IFREG | 0666);
  304. st.st_uid = default_id;
  305. st.st_gid = default_id;
  306. st.st_atime = info[i].mLastMod;
  307. st.st_mtime = info[i].mLastMod;
  308. st.st_ctime = info[i].mLastMod;
  309. // hack city: todo fix the below to something nicer and more maintainable but
  310. // with good performance
  311. // strip off the path but be careful if the path is solely '/'
  312. // NOTE - this API started returning filenames as full dfs uris
  313. const char *const str = info[i].mName + dfs->dfs_uri_len + path_len + ((path_len == 1 && *path == '/') ? 0 : 1);
  314. // pack this entry into the fuse buffer
  315. int res = 0;
  316. if ((res = filler(buf,str,&st,0)) != 0) {
  317. syslog(LOG_ERR, "ERROR: readdir filling the buffer %d %s:%d\n",res, __FILE__, __LINE__);
  318. }
  319. }
  320. // insert '.' and '..'
  321. const char *const dots [] = { ".",".."};
  322. for (i = 0 ; i < 2 ; i++)
  323. {
  324. struct stat st;
  325. memset(&st, 0, sizeof(struct stat));
  326. // set to 0 to indicate not supported for directory because we cannot (efficiently) get this info for every subdirectory
  327. st.st_nlink = 0;
  328. // setup stat size and acl meta data
  329. st.st_size = 512;
  330. st.st_blksize = 512;
  331. st.st_blocks = 1;
  332. st.st_mode = (S_IFDIR | 0777);
  333. st.st_uid = default_id;
  334. st.st_gid = default_id;
  335. // todo fix below times
  336. st.st_atime = 0;
  337. st.st_mtime = 0;
  338. st.st_ctime = 0;
  339. const char *const str = dots[i];
  340. // flatten the info using fuse's function into a buffer
  341. int res = 0;
  342. if ((res = filler(buf,str,&st,0)) != 0) {
  343. syslog(LOG_ERR, "ERROR: readdir filling the buffer %d %s:%d", res, __FILE__, __LINE__);
  344. }
  345. }
  346. // free the info pointers
  347. hdfsFreeFileInfo(info,numEntries);
  348. syslog(LOG_ERR, "returning dfs_readdir for %s\n",path);
  349. return 0;
  350. }
  351. static int dfs_read(const char *path, char *buf, size_t size, off_t offset,
  352. struct fuse_file_info *fi)
  353. {
  354. // retrieve dfs specific data
  355. dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
  356. // check params and the context var
  357. assert(dfs);
  358. assert(path);
  359. assert(buf);
  360. // if not connected, try to connect and fail out if we can't.
  361. if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) {
  362. syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
  363. return -EIO;
  364. }
  365. #ifdef OPTIMIZED_READS
  366. dfs_fh *fh = (dfs_fh*)fi->fh;
  367. //fprintf(stderr, "Cache bounds for %s: %llu -> %llu (%d bytes). Check for offset %llu\n", path, fh->startOffset, fh->startOffset + fh->sizeBuffer, fh->sizeBuffer, offset);
  368. if (fh->sizeBuffer == 0 || offset < fh->startOffset || offset > (fh->startOffset + fh->sizeBuffer) )
  369. {
  370. // do the actual read
  371. //fprintf (stderr,"Reading %s from HDFS, offset %llu, amount %d\n", path, offset, rd_cache_buf_size);
  372. const tSize num_read = hdfsPread(dfs->fs, fh->hdfsFH, offset, fh->buf, rd_cache_buf_size);
  373. if (num_read < 0) {
  374. syslog(LOG_ERR, "Read error - pread failed for %s with return code %d %s:%d", path, num_read, __FILE__, __LINE__);
  375. hdfsDisconnect(dfs->fs);
  376. dfs->fs = NULL;
  377. return -EIO;
  378. }
  379. fh->sizeBuffer = num_read;
  380. fh->startOffset = offset;
  381. //fprintf (stderr,"Read %d bytes of %s from HDFS\n", num_read, path);
  382. }
  383. char* local_buf = fh->buf;
  384. const tSize cacheLookupOffset = offset - fh->startOffset;
  385. local_buf += cacheLookupOffset;
  386. //fprintf(stderr,"FUSE requested %d bytes of %s for offset %d in file\n", size, path, offset);
  387. const tSize amount = cacheLookupOffset + size > fh->sizeBuffer
  388. ? fh->sizeBuffer - cacheLookupOffset
  389. : size;
  390. //fprintf(stderr,"Reading %s from cache, %d bytes from position %d\n", path, amount, cacheLookupOffset);
  391. //fprintf(stderr,"Cache status for %s: %d bytes cached from offset %llu\n", path, fh->sizeBuffer, fh->startOffset);
  392. memcpy(buf, local_buf, amount);
  393. //fprintf(stderr,"Read %s from cache, %d bytes from position %d\n", path, amount, cacheLookupOffset);
  394. //fprintf(stderr,"Cache status for %s: %d bytes cached from offset %llu\n", path, fh->sizeBuffer, fh->startOffset);
  395. return amount;
  396. #else
  397. // NULL means either file doesn't exist or maybe IO error - i.e., the dfs_open must have failed
  398. if (NULL == (void*)fi->fh) {
  399. // should never happen
  400. return -EIO;
  401. }
  402. syslog(LOG_DEBUG,"buffer size=%d\n",(int)size);
  403. // do the actual read
  404. const tSize num_read = hdfsPread(dfs->fs, (hdfsFile)fi->fh, offset, buf, size);
  405. // handle errors
  406. if (num_read < 0) {
  407. syslog(LOG_ERR, "Read error - pread failed for %s with return code %d %s:%d", path, num_read, __FILE__, __LINE__);
  408. hdfsDisconnect(dfs->fs);
  409. dfs->fs = NULL;
  410. return -EIO;
  411. }
  412. return num_read;
  413. #endif
  414. }
  415. static int dfs_statfs(const char *path, struct statvfs *st)
  416. {
  417. // retrieve dfs specific data
  418. dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
  419. // check params and the context var
  420. assert(path);
  421. assert(st);
  422. assert(dfs);
  423. // init the stat structure
  424. memset(st,0,sizeof(struct statvfs));
  425. // if not connected, try to connect and fail out if we can't.
  426. if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) {
  427. syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
  428. return -EIO;
  429. }
  430. const long cap = hdfsGetCapacity(dfs->fs);
  431. const long used = hdfsGetUsed(dfs->fs);
  432. const long bsize = hdfsGetDefaultBlockSize(dfs->fs);
  433. // fill in the statvfs structure
  434. /* FOR REFERENCE:
  435. struct statvfs {
  436. unsigned long f_bsize; // file system block size
  437. unsigned long f_frsize; // fragment size
  438. fsblkcnt_t f_blocks; // size of fs in f_frsize units
  439. fsblkcnt_t f_bfree; // # free blocks
  440. fsblkcnt_t f_bavail; // # free blocks for non-root
  441. fsfilcnt_t f_files; // # inodes
  442. fsfilcnt_t f_ffree; // # free inodes
  443. fsfilcnt_t f_favail; // # free inodes for non-root
  444. unsigned long f_fsid; // file system id
  445. unsigned long f_flag; / mount flags
  446. unsigned long f_namemax; // maximum filename length
  447. };
  448. */
  449. st->f_bsize = bsize;
  450. st->f_frsize = st->f_bsize;
  451. st->f_blocks = cap/st->f_bsize;
  452. st->f_bfree = (cap-used)/st->f_bsize;
  453. st->f_bavail = st->f_bfree;
  454. st->f_files = 1000;
  455. st->f_ffree = 500;
  456. st->f_favail = 500;
  457. st->f_fsid = 1023;
  458. st->f_flag = ST_RDONLY | ST_NOSUID;
  459. st->f_namemax = 1023;
  460. return 0;
  461. }
  462. static int dfs_access(const char *path, int mask)
  463. {
  464. // no permissions on dfs, always a success
  465. return 0;
  466. }
//
// The remainder is the write functionality: mkdir, rename, rmdir, unlink,
// open and write are implemented; chmod and chown return ENOTSUP.
//
  470. static int dfs_mkdir(const char *path, mode_t mode)
  471. {
  472. // retrieve dfs specific data
  473. dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
  474. // check params and the context var
  475. assert(path);
  476. assert(dfs);
  477. // if not connected, try to connect and fail out if we can't.
  478. if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) {
  479. syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
  480. return -EIO;
  481. }
  482. assert('/' == *path);
  483. int i ;
  484. for (i = 0; protectedpaths[i]; i++) {
  485. if (strcmp(path, protectedpaths[i]) == 0) {
  486. syslog(LOG_ERR,"ERROR: hdfs trying to create the directory: %s", path);
  487. return -EACCES;
  488. }
  489. }
  490. if (dfs->nowrites) {
  491. syslog(LOG_ERR,"ERROR: hdfs is configured as read-only, cannot create the directory %s\n",path);
  492. return -EACCES;
  493. }
  494. if (hdfsCreateDirectory(dfs->fs, path)) {
  495. syslog(LOG_ERR,"ERROR: hdfs trying to create directory %s",path);
  496. return -EIO;
  497. }
  498. return 0;
  499. }
  500. static int dfs_rename(const char *from, const char *to)
  501. {
  502. // retrieve dfs specific data
  503. dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
  504. // check params and the context var
  505. assert(from);
  506. assert(to);
  507. assert(dfs);
  508. // if not connected, try to connect and fail out if we can't.
  509. if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) {
  510. syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
  511. return -EIO;
  512. }
  513. assert('/' == *from);
  514. assert('/' == *to);
  515. int i ;
  516. for (i = 0; protectedpaths[i] != NULL; i++) {
  517. if (strcmp(from, protectedpaths[i]) == 0) {
  518. syslog(LOG_ERR,"ERROR: hdfs trying to rename directories %s to %s",from,to);
  519. return -EACCES;
  520. }
  521. if (strcmp(to, protectedpaths[i]) == 0) {
  522. syslog(LOG_ERR,"ERROR: hdfs trying to rename directories %s to %s",from,to);
  523. return -EACCES;
  524. }
  525. }
  526. if (dfs->nowrites) {
  527. syslog(LOG_ERR,"ERROR: hdfs is configured as read-only, cannot rename the directory %s\n",from);
  528. return -EACCES;
  529. }
  530. if (hdfsRename(dfs->fs, from, to)) {
  531. syslog(LOG_ERR,"ERROR: hdfs trying to rename %s to %s",from, to);
  532. return -EIO;
  533. }
  534. return 0;
  535. }
  536. static int is_protected(const char *path) {
  537. int i ;
  538. for (i = 0; protectedpaths[i]; i++) {
  539. if (strcmp(path, protectedpaths[i]) == 0) {
  540. return 1;
  541. }
  542. }
  543. return 0;
  544. }
  545. static int dfs_rmdir(const char *path)
  546. {
  547. // retrieve dfs specific data
  548. dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
  549. // check params and the context var
  550. assert(path);
  551. assert(dfs);
  552. // if not connected, try to connect and fail out if we can't.
  553. if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) {
  554. syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
  555. return -EIO;
  556. }
  557. assert('/' == *path);
  558. if(is_protected(path)) {
  559. syslog(LOG_ERR,"ERROR: hdfs trying to delete a protected directory: %s ",path);
  560. return -EACCES;
  561. }
  562. int numEntries = 0;
  563. hdfsFileInfo *info = hdfsListDirectory(dfs->fs,path,&numEntries);
  564. // free the info pointers
  565. hdfsFreeFileInfo(info,numEntries);
  566. if (numEntries) {
  567. return -ENOTEMPTY;
  568. }
  569. if (!dfs->no_trash && strncmp(path, TrashPrefixDir, strlen(TrashPrefixDir)) != 0) {
  570. return move_to_trash(path);
  571. }
  572. if (dfs->nowrites) {
  573. syslog(LOG_ERR,"ERROR: hdfs is configured as read-only, cannot delete the directory %s\n",path);
  574. return -EACCES;
  575. }
  576. if(hdfsDelete(dfs->fs, path)) {
  577. syslog(LOG_ERR,"ERROR: hdfs error trying to delete the directory %s\n",path);
  578. return -EIO;
  579. }
  580. return 0;
  581. }
  582. static int dfs_unlink(const char *path)
  583. {
  584. // retrieve dfs specific data
  585. dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
  586. // check params and the context var
  587. assert(path);
  588. assert(dfs);
  589. // if not connected, try to connect and fail out if we can't.
  590. if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) {
  591. syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
  592. return -EIO;
  593. }
  594. assert('/' == *path);
  595. if(is_protected(path)) {
  596. syslog(LOG_ERR,"ERROR: hdfs trying to delete a protected directory: %s ",path);
  597. return -EACCES;
  598. }
  599. // move the file to the trash if this is enabled and its not actually in the trash.
  600. if (!dfs->no_trash && strncmp(path, TrashPrefixDir, strlen(TrashPrefixDir)) != 0) {
  601. return move_to_trash(path);
  602. }
  603. if (dfs->nowrites) {
  604. syslog(LOG_ERR,"ERROR: hdfs is configured as read-only, cannot create the directory %s\n",path);
  605. return -EACCES;
  606. }
  607. if (hdfsDelete(dfs->fs, path)) {
  608. syslog(LOG_ERR,"ERROR: hdfs trying to delete the file %s",path);
  609. return -EIO;
  610. }
  611. return 0;
  612. }
  613. static int dfs_chmod(const char *path, mode_t mode)
  614. {
  615. (void)path;
  616. (void)mode;
  617. return -ENOTSUP;
  618. }
  619. static int dfs_chown(const char *path, uid_t uid, gid_t gid)
  620. {
  621. (void)path;
  622. (void)uid;
  623. (void)gid;
  624. return -ENOTSUP;
  625. }
  626. //static int dfs_truncate(const char *path, off_t size)
  627. //{
  628. // (void)path;
  629. // (void)size;
  630. // return -ENOTSUP;
  631. //}
  632. long tempfh = 0;
  633. static int dfs_open(const char *path, struct fuse_file_info *fi)
  634. {
  635. dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
  636. // check params and the context var
  637. assert(path);
  638. assert('/' == *path);
  639. assert(dfs);
  640. int ret = 0;
  641. // if not connected, try to connect and fail out if we can't.
  642. if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) {
  643. syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
  644. return -EIO;
  645. }
  646. // 0x8000 is always passed in and hadoop doesn't like it, so killing it here
  647. // bugbug figure out what this flag is and report problem to Hadoop JIRA
  648. int flags = (fi->flags & 0x7FFF);
  649. #ifdef OPTIMIZED_READS
  650. // retrieve dfs specific data
  651. dfs_fh *fh = (dfs_fh*)malloc(sizeof (dfs_fh));
  652. fi->fh = (uint64_t)fh;
  653. fh->hdfsFH = (hdfsFile)hdfsOpenFile(dfs->fs, path, flags, 0, 3, 0);
  654. fh->buf = (char*)malloc(rd_cache_buf_size*sizeof (char));
  655. fh->startOffset = 0;
  656. fh->sizeBuffer = 0;
  657. if (0 == fh->hdfsFH) {
  658. syslog(LOG_ERR, "ERROR: could not open file %s dfs %s:%d\n", path,__FILE__, __LINE__);
  659. ret = -EIO;
  660. }
  661. #else
  662. // fprintf(stderr,"hdfsOpenFile being called %s,%o\n",path,flags);
  663. // bugbug should stop O_RDWR flag here.
  664. // bugbug when fix https://issues.apache.org/jira/browse/HADOOP-3723 can remove the below code
  665. if (flags & O_WRONLY) {
  666. flags = O_WRONLY;
  667. }
  668. if (flags & O_RDWR) {
  669. // NOTE - should not normally be checking policy in the middleman, but the handling of Unix flags in DFS is not
  670. // consistent right now. 2008-07-16
  671. syslog(LOG_ERR, "ERROR: trying to open a file with O_RDWR and DFS does not support that %s dfs %s:%d\n", path,__FILE__, __LINE__);
  672. return -EIO;
  673. }
  674. // fprintf(stderr,"hdfsOpenFile being called %s,%o\n",path,flags);
  675. // retrieve dfs specific data
  676. fi->fh = (uint64_t)hdfsOpenFile(dfs->fs, path, flags, 0, 3, 0);
  677. if (0 == fi->fh) {
  678. syslog(LOG_ERR, "ERROR: could not open file %s dfs %s:%d\n", path,__FILE__, __LINE__);
  679. ret = -EIO;
  680. }
  681. #endif
  682. return ret;
  683. }
  684. static int dfs_write(const char *path, const char *buf, size_t size,
  685. off_t offset, struct fuse_file_info *fi)
  686. {
  687. // retrieve dfs specific data
  688. dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
  689. // check params and the context var
  690. assert(path);
  691. assert(dfs);
  692. assert('/' == *path);
  693. // if not connected, try to connect and fail out if we can't.
  694. if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) {
  695. syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
  696. return -EIO;
  697. }
  698. #ifdef OPTIMIZED_READS
  699. dfs_fh *fh = (dfs_fh*)fi->fh;
  700. hdfsFile file_handle = (hdfsFile)fh->hdfsFH;
  701. #else
  702. hdfsFile file_handle = (hdfsFile)fi->fh;
  703. if (NULL == file_handle) {
  704. syslog(LOG_ERR, "ERROR: fuse problem - no file_handle for %s %s:%d\n",path, __FILE__, __LINE__);
  705. return -EIO;
  706. }
  707. #endif
  708. syslog(LOG_DEBUG,"hdfsTell(dfs,%ld)\n",(long)file_handle);
  709. tOffset cur_offset = hdfsTell(dfs->fs, file_handle);
  710. if (cur_offset != offset) {
  711. syslog(LOG_ERR, "ERROR: user trying to random access write to a file %d!=%d for %s %s:%d\n",(int)cur_offset, (int)offset,path, __FILE__, __LINE__);
  712. return -EIO;
  713. }
  714. tSize length = hdfsWrite(dfs->fs, file_handle, buf, size);
  715. if(length <= 0) {
  716. syslog(LOG_ERR, "ERROR: fuse problem - could not write all the bytes for %s %d!=%d%s:%d\n",path,length,(int)size, __FILE__, __LINE__);
  717. return -EIO;
  718. }
  719. if (length != size) {
  720. syslog(LOG_ERR, "WARN: fuse problem - could not write all the bytes for %s %d!=%d%s:%d\n",path,length,(int)size, __FILE__, __LINE__);
  721. }
  722. return length;
  723. }
  724. int dfs_release (const char *path, struct fuse_file_info *fi) {
  725. // retrieve dfs specific data
  726. dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
  727. // check params and the context var
  728. assert(path);
  729. assert(dfs);
  730. assert('/' == *path);
  731. // if not connected, try to connect and fail out if we can't.
  732. if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) {
  733. syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
  734. return -EIO;
  735. }
  736. if (NULL == (void*)fi->fh) {
  737. return 0;
  738. }
  739. #ifdef OPTIMIZED_READS
  740. dfs_fh *fh = (dfs_fh*)fi->fh;
  741. hdfsFile file_handle = (hdfsFile)fh->hdfsFH;
  742. free(fh->buf);
  743. free(fh);
  744. #else
  745. hdfsFile file_handle = (hdfsFile)fi->fh;
  746. #endif
  747. if (NULL == file_handle) {
  748. return 0;
  749. }
  750. if (hdfsCloseFile(dfs->fs, file_handle) != 0) {
  751. syslog(LOG_ERR, "ERROR: dfs problem - could not close file_handle(%ld) for %s %s:%d\n",(long)file_handle,path, __FILE__, __LINE__);
  752. // fprintf(stderr, "ERROR: dfs problem - could not close file_handle(%ld) for %s %s:%d\n",(long)file_handle,path, __FILE__, __LINE__);
  753. return -EIO;
  754. }
  755. fi->fh = (uint64_t)0;
  756. return 0;
  757. }
  //
  // FUSE mknod handler. Nothing is created here: the call is only traced
  // at debug level and reported as successful.
  //
  //   path, mode, rdev - ignored
  //
  // Always returns 0.
  //
  static int dfs_mknod(const char *path, mode_t mode, dev_t rdev)
  {
    (void)path;
    (void)mode;
    (void)rdev;

    syslog(LOG_DEBUG,"in dfs_mknod");

    return 0;
  }
  //
  // FUSE create handler: creating a file is delegated to dfs_open() using
  // the open flags FUSE already placed in fi->flags.
  //
  //   path - absolute path of the file to create
  //   mode - requested permission bits (not used, see below)
  //   fi   - FUSE file info, passed through to dfs_open()
  //
  // Returns whatever dfs_open() returns.
  //
  static int dfs_create(const char *path, mode_t mode, struct fuse_file_info *fi)
  {
    // Permission bits and open(2) flags are unrelated bit spaces, so `mode`
    // must not be OR-ed into fi->flags: e.g. the "other write" bit (0002)
    // has the same value as O_RDWR, which dfs_open() refuses with -EIO, and
    // the "other execute" bit (0001) has the same value as O_WRONLY.
    //
    // NOTE(review): the requested permissions are therefore not applied to
    // the new file here - confirm whether a follow-up chmod is wanted.
    (void)mode;

    return dfs_open(path, fi);
  }
  767. int dfs_flush(const char *path, struct fuse_file_info *fi) {
  768. // retrieve dfs specific data
  769. dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
  770. // check params and the context var
  771. assert(path);
  772. assert(dfs);
  773. assert('/' == *path);
  774. // if not connected, try to connect and fail out if we can't.
  775. if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) {
  776. syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
  777. return -EIO;
  778. }
  779. if (NULL == (void*)fi->fh) {
  780. return 0;
  781. }
  782. // note that fuse calls flush on RO files too and hdfs does not like that and will return an error
  783. if(fi->flags & O_WRONLY) {
  784. #ifdef OPTIMIZED_READS
  785. dfs_fh *fh = (dfs_fh*)fi->fh;
  786. hdfsFile file_handle = (hdfsFile)fh->hdfsFH;
  787. #else
  788. hdfsFile file_handle = (hdfsFile)fi->fh;
  789. #endif
  790. if (hdfsFlush(dfs->fs, file_handle) != 0) {
  791. syslog(LOG_ERR, "ERROR: dfs problem - could not flush file_handle(%x) for %s %s:%d\n",(long)file_handle,path, __FILE__, __LINE__);
  792. return -EIO;
  793. }
  794. }
  795. return 0;
  796. }
  //
  // Attribute-setting stub: accepts the request and does nothing.
  //
  //   attr, to_set, fi - ignored
  //
  void dfs_setattr(struct stat *attr, int to_set, struct fuse_file_info *fi)
  {
    (void)attr;
    (void)to_set;
    (void)fi;
  }
  800. void dfs_destroy (void *ptr)
  801. {
  802. dfs_context *dfs = (dfs_context*)ptr;
  803. hdfsDisconnect(dfs->fs);
  804. dfs->fs = NULL;
  805. }
  806. // Hacked up function to basically do:
  807. // protectedpaths = split(PROTECTED_PATHS,',');
  808. static void init_protectedpaths() {
  809. // PROTECTED_PATHS should be a #defined value from autoconf
  810. // set it with configure --with-protectedpaths=/,/user,/user/foo
  811. // note , seped with no other spaces and no quotes around it
  812. char *tmp = PROTECTED_PATHS;
  813. assert(tmp);
  814. // handle degenerate case up front.
  815. if (0 == *tmp) {
  816. protectedpaths = (char**)malloc(sizeof(char*));
  817. protectedpaths[0] = NULL;
  818. return;
  819. }
  820. int i = 0;
  821. while (tmp && (NULL != (tmp = index(tmp,',')))) {
  822. tmp++; // pass the ,
  823. i++;
  824. }
  825. i++; // for the last entry
  826. i++; // for the final NULL
  827. protectedpaths = (char**)malloc(sizeof(char*)*i);
  828. printf("i=%d\n",i);
  829. tmp = PROTECTED_PATHS;
  830. int j = 0;
  831. while (NULL != tmp && j < i) {
  832. int length;
  833. char *eos = index(tmp,',');
  834. if (NULL != eos) {
  835. length = eos - tmp; // length of this value
  836. } else {
  837. length = strlen(tmp);
  838. }
  839. protectedpaths[j] = (char*)malloc(sizeof(char)*length+1);
  840. strncpy(protectedpaths[j], tmp, length);
  841. protectedpaths[j][length] = '\0';
  842. if (eos) {
  843. tmp = eos + 1;
  844. } else {
  845. tmp = NULL;
  846. }
  847. j++;
  848. }
  849. protectedpaths[j] = NULL;
  850. /*
  851. j = 0;
  852. while (protectedpaths[j]) {
  853. printf("protectedpaths[%d]=%s\n",j,protectedpaths[j]);
  854. fflush(stdout);
  855. j++;
  856. }
  857. exit(1);
  858. */
  859. }
  860. void *dfs_init()
  861. {
  862. //
  863. // Create a private struct of data we will pass to fuse here and which
  864. // will then be accessible on every call.
  865. //
  866. dfs_context *dfs = (dfs_context*)malloc(sizeof (dfs_context));
  867. if (NULL == dfs) {
  868. syslog(LOG_ERR, "FATAL: could not malloc fuse dfs context struct - out of memory %s:%d", __FILE__, __LINE__);
  869. exit(1);
  870. }
  871. // initialize the context
  872. dfs->debug = options.debug;
  873. dfs->nn_hostname = options.server;
  874. dfs->nn_port = options.port;
  875. dfs->fs = NULL;
  876. dfs->nowrites = options.nowrites;
  877. dfs->no_trash = options.no_trash;
  878. bzero(dfs->dfs_uri,0);
  879. sprintf(dfs->dfs_uri,"dfs://%s:%d/",dfs->nn_hostname,dfs->nn_port);
  880. dfs->dfs_uri_len = strlen(dfs->dfs_uri);
  881. // use ERR level to ensure it makes it into the log.
  882. syslog(LOG_ERR, "mounting %s", dfs->dfs_uri);
  883. init_protectedpaths();
  884. return (void*)dfs;
  885. }
  886. static struct fuse_operations dfs_oper = {
  887. .getattr = dfs_getattr,
  888. .access = dfs_access,
  889. .readdir = dfs_readdir,
  890. .destroy = dfs_destroy,
  891. .init = dfs_init,
  892. .open = dfs_open,
  893. .read = dfs_read,
  894. .statfs = dfs_statfs,
  895. .mkdir = dfs_mkdir,
  896. .rmdir = dfs_rmdir,
  897. .rename = dfs_rename,
  898. .unlink = dfs_unlink,
  899. .release = dfs_release,
  900. .create = dfs_create,
  901. .write = dfs_write,
  902. .flush = dfs_flush,
  903. //.xsetattr = dfs_setattr,
  904. .mknod = dfs_mknod,
  905. .chmod = dfs_chmod,
  906. .chown = dfs_chown,
  907. // .truncate = dfs_truncate,
  908. };
  909. int main(int argc, char *argv[])
  910. {
  911. umask(0);
  912. program = argv[0];
  913. struct fuse_args args = FUSE_ARGS_INIT(argc, argv);
  914. /* clear structure that holds our options */
  915. memset(&options, 0, sizeof(struct options));
  916. if (fuse_opt_parse(&args, &options, dfs_opts, dfs_options) == -1)
  917. /** error parsing options */
  918. return -1;
  919. if (options.server == NULL || options.port == 0) {
  920. print_usage(argv[0]);
  921. exit(0);
  922. }
  923. int ret = fuse_main(args.argc, args.argv, &dfs_oper, NULL);
  924. if (ret) printf("\n");
  925. /** free arguments */
  926. fuse_opt_free_args(&args);
  927. return ret;
  928. }