/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "common/hadoop_err.h"
#include "common/hconf.h"
#include "common/net.h"
#include "common/string.h"
#include "common/uri.h"
#include "fs/common.h"
#include "fs/fs.h"
#include "protobuf/ClientNamenodeProtocol.call.h"
#include "protobuf/hdfs.pb-c.h"

#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <uriparser/Uri.h>
#include <uv.h>

#define DEFAULT_NN_PORT 8020

#define CLIENT_NN_PROTOCOL "org.apache.hadoop.hdfs.protocol.ClientProtocol"

struct native_fs {
    /** Fields common to all filesystems. */
    struct hadoop_fs_base base;

    /**
     * Address of the namenode.
     * TODO: implement HA failover
     * TODO: implement IPv6
     */
    struct sockaddr_in nn_addr;

    /** The messenger used to perform RPCs. */
    struct hrpc_messenger *msgr;

    /** The default block size obtained from getServerDefaults. */
    int64_t default_block_size;

    /** User name to use for RPCs.  Immutable. */
    char *user_name;

    /**
     * A dynamically allocated working directory which will be prepended to
     * all relative paths.
     */
    UriUriA *working_uri;

    /** Lock which protects the working_uri. */
    uv_mutex_t working_uri_lock;
};

/**
 * Set if the file is read-only... otherwise, the file is assumed to be
 * write-only.
 */
#define NDFS_FILE_FLAG_RO 0x1

/** This flag is for compatibility with some old test harnesses. */
#define NDFS_FILE_FLAG_DISABLE_DIRECT_READ 0x2

/** Base class for both read-only and write-only files. */
struct native_file_base {
    /** Fields common to all filesystems. */
    struct hadoop_file_base base;

    /** NDFS file flags. */
    int flags;
};

/** A read-only file. */
struct native_ro_file {
    struct native_file_base base;
    uint64_t bytes_read;
};

/** A write-only file. */
struct native_wo_file {
    struct native_file_base base;
};

/** Whole-filesystem stats sent back from the NameNode. */
struct hadoop_vfs_stats {
    int64_t capacity;
    int64_t used;
    int64_t remaining;
    int64_t under_replicated;
    int64_t corrupt_blocks;
    int64_t missing_blocks;
};

/** Server defaults sent back from the NameNode. */
struct ndfs_server_defaults {
    uint64_t blocksize;
};

static hdfsFileInfo *ndfs_get_path_info(hdfsFS bfs, const char* uri);

static void ndfs_nn_proxy_init(struct native_fs *fs, struct hrpc_proxy *proxy)
{
    hrpc_proxy_init(proxy, fs->msgr, &fs->nn_addr, CLIENT_NN_PROTOCOL,
                    fs->user_name);
}

/**
 * Construct a canonical path from a URI.
 *
 * @param fs        The filesystem.
 * @param uri_str   The URI string.
 * @param out       (out param) the canonical path.
 *
 * @return          NULL on success; the error otherwise.
 */
static struct hadoop_err *build_path(struct native_fs *fs, const char *uri_str,
                                     char **out)
{
    char *path = NULL;
    struct hadoop_err *err = NULL;
    UriParserStateA uri_state;
    UriUriA uri;

    memset(&uri_state, 0, sizeof(uri_state));

    uv_mutex_lock(&fs->working_uri_lock);
    err = uri_parse(uri_str, &uri_state, &uri, fs->working_uri);
    if (err)
        goto done;
    // TODO: check URI scheme and user against saved values?
    err = uri_get_path(&uri, &path);
    if (err)
        goto done;
    // As a special case, when the URI given has an empty path, we assume that
    // we want the current working directory.  This is to allow things like
    // hdfs://mynamenode to map to the current working directory, as they do in
    // Hadoop.  Note that this is different than hdfs://mynamenode/ (note the
    // trailing slash) which maps to the root directory.
    if (!path[0]) {
        free(path);
        path = NULL;
        err = uri_get_path(fs->working_uri, &path);
        if (err) {
            goto done;
        }
    }
    err = NULL;

done:
    uv_mutex_unlock(&fs->working_uri_lock);
    if (uri_state.uri) {
        uriFreeUriMembersA(&uri);
    }
    if (err) {
        free(path);
        return err;
    }
    *out = path;
    return NULL;
}

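/*
 * Illustrative sketch of build_path behavior (assumed values, not output from
 * a running cluster).  Given a working directory of hdfs://mynamenode/user/alice/:
 *
 *   build_path(fs, "hdfs://mynamenode/tmp/f", &path)  => path = "/tmp/f"
 *   build_path(fs, "tmp/f", &path)                    => path = "/user/alice/tmp/f"
 *   build_path(fs, "hdfs://mynamenode", &path)        => path = "/user/alice/"
 *   build_path(fs, "hdfs://mynamenode/", &path)       => path = "/"
 */
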
static int ndfs_file_is_open_for_read(hdfsFile bfile)
{
    struct native_file_base *file = (struct native_file_base *)bfile;
    return !!(file->flags & NDFS_FILE_FLAG_RO);
}

static int ndfs_file_is_open_for_write(hdfsFile bfile)
{
    struct native_file_base *file = (struct native_file_base *)bfile;
    return !(file->flags & NDFS_FILE_FLAG_RO);
}

static int ndfs_file_get_read_statistics(hdfsFile bfile,
            struct hdfsReadStatistics **out)
{
    struct hdfsReadStatistics *stats;
    struct native_ro_file *file = (struct native_ro_file *)bfile;

    if (!(file->base.flags & NDFS_FILE_FLAG_RO)) {
        errno = EINVAL;
        return -1;
    }
    stats = calloc(1, sizeof(*stats));
    if (!stats) {
        errno = ENOMEM;
        return -1;
    }
    stats->totalBytesRead = file->bytes_read;
    *out = stats;
    return 0;
}

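/*
 * Usage sketch (hypothetical caller; libhdfs entry points dispatch here
 * through g_ndfs_ops.get_read_statistics):
 *
 *   struct hdfsReadStatistics *stats = NULL;
 *   if (ndfs_file_get_read_statistics(file, &stats) == 0) {
 *       printf("%" PRIu64 " bytes read\n", stats->totalBytesRead);
 *       free(stats);   // allocated with calloc() above, so free() releases it
 *   }
 *
 * Only files opened read-only support this; write-only files fail with EINVAL.
 */
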
static struct hadoop_err *ndfs_get_server_defaults(struct native_fs *fs,
            struct ndfs_server_defaults *defaults)
{
    struct hadoop_err *err = NULL;
    GetServerDefaultsRequestProto req =
        GET_SERVER_DEFAULTS_REQUEST_PROTO__INIT;
    GetServerDefaultsResponseProto *resp = NULL;
    struct hrpc_proxy proxy;

    ndfs_nn_proxy_init(fs, &proxy);
    err = cnn_get_server_defaults(&proxy, &req, &resp);
    if (err) {
        goto done;
    }
    defaults->blocksize = resp->serverdefaults->blocksize;

done:
    if (resp) {
        get_server_defaults_response_proto__free_unpacked(resp, NULL);
    }
    return err;
}

/**
 * Parse an address in the form <hostname> or <hostname>:<port>.
 *
 * @param input         The address string.
 * @param out           (out param) The sockaddr.
 * @param default_port  The default port to use, if one is not found in the
 *                          string.
 *
 * @return              NULL on success; the error otherwise.
 */
static struct hadoop_err *parse_rpc_addr(const char *input,
            struct sockaddr_in *out, int default_port)
{
    struct hadoop_err *err = NULL;
    char *host, *colon;
    uint32_t addr;
    int port;

    fprintf(stderr, "parse_rpc_addr(input=%s, default_port=%d)\n",
            input, default_port);

    // If the URI doesn't contain a port, we use a default.
    // This may come either from the hdfsBuilder, or from the
    // 'default default' for HDFS.
    // It's kind of silly that hdfsBuilder even includes this field, since this
    // information should just be included in the URI, but this is here for
    // compatibility.
    port = (default_port <= 0) ? DEFAULT_NN_PORT : default_port;
    host = strdup(input);
    if (!host) {
        err = hadoop_lerr_alloc(ENOMEM, "parse_rpc_addr: OOM");
        goto done;
    }
    colon = strchr(host, ':');
    if (colon) {
        // If the URI has a colon, we parse the next part as a port.
        char *port_str = colon + 1;
        *colon = '\0';
        port = atoi(port_str);
        if ((port <= 0) || (port >= 65536)) {
            err = hadoop_lerr_alloc(EINVAL, "parse_rpc_addr: invalid port "
                                    "string %s", port_str);
            goto done;
        }
    }
    err = get_first_ipv4_addr(host, &addr);
    if (err)
        goto done;
    out->sin_family = AF_INET;
    out->sin_port = htons(port);
    out->sin_addr.s_addr = htonl(addr);

done:
    free(host);
    return err;
}

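/*
 * Examples (illustrative only):
 *   parse_rpc_addr("mynamenode", &addr, 0)       => port 8020 (DEFAULT_NN_PORT)
 *   parse_rpc_addr("mynamenode:9000", &addr, 0)  => port 9000
 *   parse_rpc_addr("mynamenode", &addr, 9000)    => port 9000 (builder default)
 *   parse_rpc_addr("mynamenode:70000", &addr, 0) => EINVAL (port out of range)
 */
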
static struct hadoop_err *get_namenode_addr(const struct hdfsBuilder *hdfs_bld,
                                            struct sockaddr_in *nn_addr)
{
    const char *nameservice_id;
    const char *rpc_addr;

    nameservice_id = hconf_get(hdfs_bld->hconf, "dfs.nameservice.id");
    if (nameservice_id) {
        return hadoop_lerr_alloc(ENOTSUP, "get_namenode_addr: we "
                "don't yet support HA or federated configurations.");
    }
    rpc_addr = hconf_get(hdfs_bld->hconf, "dfs.namenode.rpc-address");
    if (rpc_addr) {
        return parse_rpc_addr(rpc_addr, nn_addr, hdfs_bld->port);
    }
    return parse_rpc_addr(hdfs_bld->uri_authority, nn_addr, hdfs_bld->port);
}

struct hadoop_err *ndfs_connect(struct hdfsBuilder *hdfs_bld,
                                struct hdfs_internal **out)
{
    struct hadoop_err *err = NULL;
    struct native_fs *fs = NULL;
    struct hrpc_messenger_builder *msgr_bld;
    struct ndfs_server_defaults defaults;
    int working_dir_lock_created = 0;
    char *working_dir = NULL;
    UriParserStateA uri_state;

    fs = calloc(1, sizeof(*fs));
    if (!fs) {
        err = hadoop_lerr_alloc(ENOMEM, "failed to allocate space "
                                "for a native_fs structure.");
        goto done;
    }
    fs->base.ty = HADOOP_FS_TY_NDFS;
    fs->user_name = strdup(hdfs_bld->uri_user_info);
    if (!fs->user_name) {
        err = hadoop_lerr_alloc(ENOMEM, "failed to allocate space "
                                "for the user name.");
        goto done;
    }
    msgr_bld = hrpc_messenger_builder_alloc();
    if (!msgr_bld) {
        err = hadoop_lerr_alloc(ENOMEM, "failed to allocate space "
                                "for a messenger builder.");
        goto done;
    }
    err = get_namenode_addr(hdfs_bld, &fs->nn_addr);
    if (err)
        goto done;
    err = hrpc_messenger_create(msgr_bld, &fs->msgr);
    if (err)
        goto done;
    // Get the default working directory
    if (asprintf(&working_dir, "%s:///user/%s/",
                 hdfs_bld->uri_scheme, hdfs_bld->uri_user_info) < 0) {
        working_dir = NULL;
        err = hadoop_lerr_alloc(ENOMEM, "ndfs_connect: OOM allocating "
                                "working_dir");
        goto done;
    }
    fs->working_uri = calloc(1, sizeof(*(fs->working_uri)));
    if (!fs->working_uri) {
        err = hadoop_lerr_alloc(ENOMEM, "ndfs_connect: OOM allocating "
                                "fs->working_uri");
        goto done;
    }
    err = uri_parse_abs(working_dir, &uri_state, fs->working_uri,
                        hdfs_bld->uri_scheme);
    if (err) {
        free(fs->working_uri);
        fs->working_uri = NULL;
        goto done;
    }
    if (uv_mutex_init(&fs->working_uri_lock) < 0) {
        err = hadoop_lerr_alloc(ENOMEM, "failed to create a mutex.");
        goto done;
    }
    working_dir_lock_created = 1;

    // Ask the NameNode about our server defaults.  We'll use this information
    // later in ndfs_get_default_block_size, and when writing new files.  Just
    // as important, this validates that we can talk to the NameNode with our
    // current configuration.
    memset(&defaults, 0, sizeof(defaults));
    err = ndfs_get_server_defaults(fs, &defaults);
    if (err)
        goto done;
    fs->default_block_size = defaults.blocksize;
    err = NULL;

done:
    free(working_dir);
    if (err) {
        if (fs) {
            free(fs->user_name);
            if (fs->working_uri) {
                uriFreeUriMembersA(fs->working_uri);
                free(fs->working_uri);
            }
            if (working_dir_lock_created) {
                uv_mutex_destroy(&fs->working_uri_lock);
            }
            free(fs);
        }
        return err;
    }
    *out = (struct hdfs_internal *)fs;
    return NULL;
}

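/*
 * A minimal sketch of how this connect path is normally reached from the
 * public libhdfs builder API (assuming the usual hdfs.h entry points; the
 * hostname is a placeholder):
 *
 *   struct hdfsBuilder *bld = hdfsNewBuilder();
 *   hdfsBuilderSetNameNode(bld, "hdfs://mynamenode:8020");
 *   hdfsFS fs = hdfsBuilderConnect(bld);   // ends up in ndfs_connect()
 *   ...
 *   hdfsDisconnect(fs);                    // ends up in ndfs_disconnect()
 */
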
static int ndfs_disconnect(hdfsFS bfs)
{
    struct native_fs *fs = (struct native_fs*)bfs;

    hrpc_messenger_shutdown(fs->msgr);
    hrpc_messenger_free(fs->msgr);
    free(fs->user_name);
    uriFreeUriMembersA(fs->working_uri);
    free(fs->working_uri);
    uv_mutex_destroy(&fs->working_uri_lock);
    free(fs);
    return 0;
}

static struct hadoop_err *ndfs_open_file_for_read(
        struct native_ro_file **out __attribute__((unused)),
        struct native_fs *fs __attribute__((unused)),
        const char *uri __attribute__((unused)))
{
    errno = ENOTSUP;
    return NULL;
}

static struct hadoop_err *ndfs_open_file_for_write(
        struct native_ro_file **out __attribute__((unused)),
        struct native_fs *fs __attribute__((unused)),
        const char *uri __attribute__((unused)),
        int buffer_size __attribute__((unused)),
        short replication __attribute__((unused)),
        tSize block_size __attribute__((unused)))
{
    errno = ENOTSUP;
    return NULL;
}

static hdfsFile ndfs_open_file(hdfsFS bfs, const char* uri, int flags,
            int buffer_size, short replication, tSize block_size)
{
    struct native_fs *fs = (struct native_fs *)bfs;
    struct native_ro_file *file = NULL;
    struct hadoop_err *err;
    int accmode;
    char *path = NULL;

    err = build_path(fs, uri, &path);
    if (err) {
        goto done;
    }
    accmode = flags & O_ACCMODE;
    if (accmode == O_RDONLY) {
        err = ndfs_open_file_for_read(&file, fs, path);
    } else if (accmode == O_WRONLY) {
        err = ndfs_open_file_for_write(&file, fs, path,
                        buffer_size, replication, block_size);
    } else {
        err = hadoop_lerr_alloc(EINVAL, "cannot open a hadoop file in "
                                "mode 0x%x\n", accmode);
    }

done:
    free(path);
    return hadoopfs_errno_and_retptr(err, file);
}

static int ndfs_close_file(hdfsFS fs __attribute__((unused)),
                           hdfsFile bfile __attribute__((unused)))
{
    errno = ENOTSUP;
    return -1;
}

static int ndfs_file_exists(hdfsFS bfs, const char *uri)
{
    hdfsFileInfo *info;

    info = ndfs_get_path_info(bfs, uri);
    if (!info) {
        // errno will be set
        return -1;
    }
    hdfsFreeFileInfo(info, 1);
    return 0;
}

static int ndfs_seek(hdfsFS bfs __attribute__((unused)),
                     hdfsFile bfile __attribute__((unused)),
                     tOffset desiredPos __attribute__((unused)))
{
    errno = ENOTSUP;
    return -1;
}

static tOffset ndfs_tell(hdfsFS bfs __attribute__((unused)),
                         hdfsFile bfile __attribute__((unused)))
{
    errno = ENOTSUP;
    return -1;
}

static tSize ndfs_read(hdfsFS bfs __attribute__((unused)),
                       hdfsFile bfile __attribute__((unused)),
                       void *buffer __attribute__((unused)),
                       tSize length __attribute__((unused)))
{
    errno = ENOTSUP;
    return -1;
}

static tSize ndfs_pread(hdfsFS bfs __attribute__((unused)),
                        hdfsFile bfile __attribute__((unused)),
                        tOffset position __attribute__((unused)),
                        void* buffer __attribute__((unused)),
                        tSize length __attribute__((unused)))
{
    errno = ENOTSUP;
    return -1;
}

static tSize ndfs_write(hdfsFS bfs __attribute__((unused)),
                        hdfsFile bfile __attribute__((unused)),
                        const void* buffer __attribute__((unused)),
                        tSize length __attribute__((unused)))
{
    errno = ENOTSUP;
    return -1;
}

static int ndfs_flush(hdfsFS bfs __attribute__((unused)),
                      hdfsFile bfile __attribute__((unused)))
{
    errno = ENOTSUP;
    return -1;
}

static int ndfs_hflush(hdfsFS bfs __attribute__((unused)),
                       hdfsFile bfile __attribute__((unused)))
{
    errno = ENOTSUP;
    return -1;
}

static int ndfs_hsync(hdfsFS bfs __attribute__((unused)),
                      hdfsFile bfile __attribute__((unused)))
{
    errno = ENOTSUP;
    return -1;
}

static int ndfs_available(hdfsFS bfs __attribute__((unused)),
                          hdfsFile bfile __attribute__((unused)))
{
    errno = ENOTSUP;
    return -1;
}

static int ndfs_copy(hdfsFS srcFS __attribute__((unused)),
                     const char* src __attribute__((unused)),
                     hdfsFS dstFS __attribute__((unused)),
                     const char* dst __attribute__((unused)))
{
    errno = ENOTSUP;
    return -1;
}

static int ndfs_move(hdfsFS srcFS __attribute__((unused)),
                     const char* src __attribute__((unused)),
                     hdfsFS dstFS __attribute__((unused)),
                     const char* dst __attribute__((unused)))
{
    errno = ENOTSUP;
    return -1;
}

static int ndfs_unlink(struct hdfs_internal *bfs,
                       const char *uri, int recursive)
{
    struct native_fs *fs = (struct native_fs*)bfs;
    struct hadoop_err *err = NULL;
    DeleteRequestProto req = DELETE_REQUEST_PROTO__INIT;
    struct hrpc_proxy proxy;
    DeleteResponseProto *resp = NULL;
    char *path = NULL;

    ndfs_nn_proxy_init(fs, &proxy);
    err = build_path(fs, uri, &path);
    if (err) {
        goto done;
    }
    req.src = path;
    req.recursive = !!recursive;
    err = cnn_delete(&proxy, &req, &resp);
    if (err) {
        goto done;
    }

done:
    free(path);
    if (resp) {
        delete_response_proto__free_unpacked(resp, NULL);
    }
    return hadoopfs_errno_and_retcode(err);
}

static int ndfs_rename(hdfsFS bfs, const char *src_uri, const char *dst_uri)
{
    struct native_fs *fs = (struct native_fs*)bfs;
    struct hadoop_err *err = NULL;
    Rename2RequestProto req = RENAME2_REQUEST_PROTO__INIT;
    Rename2ResponseProto *resp = NULL;
    struct hrpc_proxy proxy;
    char *src_path = NULL, *dst_path = NULL;

    ndfs_nn_proxy_init(fs, &proxy);
    err = build_path(fs, src_uri, &src_path);
    if (err) {
        goto done;
    }
    err = build_path(fs, dst_uri, &dst_path);
    if (err) {
        goto done;
    }
    req.src = src_path;
    req.dst = dst_path;
    req.overwritedest = 0; // TODO: support overwrite
    err = cnn_rename2(&proxy, &req, &resp);
    if (err) {
        goto done;
    }

done:
    free(src_path);
    free(dst_path);
    if (resp) {
        rename2_response_proto__free_unpacked(resp, NULL);
    }
    return hadoopfs_errno_and_retcode(err);
}

static char* ndfs_get_working_directory(hdfsFS bfs, char *buffer,
                                        size_t bufferSize)
{
    size_t len;
    struct native_fs *fs = (struct native_fs *)bfs;
    struct hadoop_err *err = NULL;
    char *working_path = NULL;

    uv_mutex_lock(&fs->working_uri_lock);
    err = uri_get_path(fs->working_uri, &working_path);
    if (err) {
        err = hadoop_err_prepend(err, 0, "ndfs_get_working_directory: failed "
                                 "to get the path of the working_uri.");
        goto done;
    }
    len = strlen(working_path);
    if (len + 1 > bufferSize) {
        err = hadoop_lerr_alloc(ENAMETOOLONG, "ndfs_get_working_directory: "
                "the buffer supplied was only %zu bytes, but we would need "
                "%zu bytes to hold the working directory.",
                bufferSize, len + 1);
        goto done;
    }
    strcpy(buffer, working_path);

done:
    uv_mutex_unlock(&fs->working_uri_lock);
    free(working_path);
    return hadoopfs_errno_and_retptr(err, buffer);
}

static int ndfs_set_working_directory(hdfsFS bfs, const char* uri_str)
{
    struct native_fs *fs = (struct native_fs *)bfs;
    char *path = NULL;
    char *scheme = NULL;
    struct hadoop_err *err = NULL;
    UriParserStateA uri_state;
    UriUriA *uri = NULL;

    // Resolve the new path before taking working_uri_lock: build_path
    // acquires that lock itself, and uv mutexes are not recursive.
    err = build_path(fs, uri_str, &path);
    if (err)
        goto done;

    uv_mutex_lock(&fs->working_uri_lock);
    uri = calloc(1, sizeof(*uri));
    if (!uri) {
        err = hadoop_lerr_alloc(ENOMEM, "ndfs_set_working_directory: OOM");
        goto done_unlock;
    }
    err = uri_get_scheme(fs->working_uri, &scheme);
    if (err) {
        err = hadoop_err_prepend(err, ENOMEM, "ndfs_set_working_directory: "
                "failed to get scheme of current working_uri");
        goto done_unlock;
    }
    err = uri_parse_abs(path, &uri_state, uri, scheme);
    if (err)
        goto done_unlock;
    uriFreeUriMembersA(fs->working_uri);
    free(fs->working_uri);
    fs->working_uri = uri;
    err = NULL;

done_unlock:
    if (err) {
        free(uri);
    }
    uv_mutex_unlock(&fs->working_uri_lock);
done:
    free(scheme);
    free(path);
    return hadoopfs_errno_and_retcode(err);
}

static int ndfs_mkdir(hdfsFS bfs, const char* uri)
{
    struct native_fs *fs = (struct native_fs *)bfs;
    struct hadoop_err *err = NULL;
    MkdirsRequestProto req = MKDIRS_REQUEST_PROTO__INIT;
    MkdirsResponseProto *resp = NULL;
    struct hrpc_proxy proxy;
    char *path = NULL;

    ndfs_nn_proxy_init(fs, &proxy);
    err = build_path(fs, uri, &path);
    if (err) {
        goto done;
    }
    req.src = path;
    req.createparent = 1; // TODO: add libhdfs API for non-recursive mkdir
    err = cnn_mkdirs(&proxy, &req, &resp);
    if (err) {
        goto done;
    }
    if (!resp->result) {
        err = hadoop_lerr_alloc(EEXIST, "ndfs_mkdir(%s): a path "
                "component already exists as a non-directory.", path);
        goto done;
    }
    err = NULL;

done:
    free(path);
    if (resp) {
        mkdirs_response_proto__free_unpacked(resp, NULL);
    }
    return hadoopfs_errno_and_retcode(err);
}

static int ndfs_set_replication(hdfsFS bfs, const char* uri,
                                int16_t replication)
{
    struct native_fs *fs = (struct native_fs *)bfs;
    struct hadoop_err *err = NULL;
    SetReplicationRequestProto req = SET_REPLICATION_REQUEST_PROTO__INIT;
    SetReplicationResponseProto *resp = NULL;
    struct hrpc_proxy proxy;
    char *path = NULL;

    ndfs_nn_proxy_init(fs, &proxy);
    err = build_path(fs, uri, &path);
    if (err) {
        goto done;
    }
    req.src = path;
    req.replication = replication;
    err = cnn_set_replication(&proxy, &req, &resp);
    if (err) {
        goto done;
    }
    if (!resp->result) {
        err = hadoop_lerr_alloc(EINVAL, "ndfs_set_replication(%s): path "
                "does not exist or is not a regular file.", path);
        goto done;
    }

done:
    free(path);
    if (resp) {
        set_replication_response_proto__free_unpacked(resp, NULL);
    }
    return hadoopfs_errno_and_retcode(err);
}

static hdfsFileInfo* ndfs_list_directory(hdfsFS bfs __attribute__((unused)),
                    const char* uri __attribute__((unused)),
                    int *numEntries __attribute__((unused)))
{
    errno = ENOTSUP;
    return NULL;
}

static hdfsFileInfo *ndfs_get_path_info(hdfsFS bfs __attribute__((unused)),
                    const char* uri __attribute__((unused)))
{
    errno = ENOTSUP;
    return NULL;
}

char***
ndfs_get_hosts(hdfsFS bfs __attribute__((unused)),
               const char* path __attribute__((unused)),
               tOffset start __attribute__((unused)),
               tOffset length __attribute__((unused)))
{
    errno = ENOTSUP;
    return NULL;
}

static tOffset ndfs_get_default_block_size(hdfsFS bfs)
{
    struct native_fs *fs = (struct native_fs *)bfs;
    return fs->default_block_size;
}

static tOffset ndfs_get_default_block_size_at_path(hdfsFS bfs,
                    const char *uri)
{
    struct native_fs *fs = (struct native_fs *)bfs;
    struct hadoop_err *err = NULL;
    GetPreferredBlockSizeRequestProto req =
        GET_PREFERRED_BLOCK_SIZE_REQUEST_PROTO__INIT;
    GetPreferredBlockSizeResponseProto *resp = NULL;
    struct hrpc_proxy proxy;
    tOffset ret = 0;
    char *path = NULL;

    ndfs_nn_proxy_init(fs, &proxy);
    err = build_path(fs, uri, &path);
    if (err) {
        goto done;
    }
    req.filename = path;
    err = cnn_get_preferred_block_size(&proxy, &req, &resp);
    if (err) {
        goto done;
    }
    ret = resp->bsize;
    err = NULL;

done:
    free(path);
    if (resp) {
        get_preferred_block_size_response_proto__free_unpacked(resp, NULL);
    }
    if (err)
        return hadoopfs_errno_and_retcode(err);
    return ret;
}

static struct hadoop_err *ndfs_statvfs(struct hadoop_fs_base *hfs,
                    struct hadoop_vfs_stats *stats)
{
    struct native_fs *fs = (struct native_fs*)hfs;
    GetFsStatusRequestProto req = GET_FS_STATUS_REQUEST_PROTO__INIT;
    GetFsStatsResponseProto *resp = NULL;
    struct hadoop_err *err = NULL;
    struct hrpc_proxy proxy;

    ndfs_nn_proxy_init(fs, &proxy);
    err = cnn_get_fs_stats(&proxy, &req, &resp);
    if (err) {
        goto done;
    }
    stats->capacity = resp->capacity;
    stats->used = resp->used;
    stats->remaining = resp->remaining;
    stats->under_replicated = resp->under_replicated;
    stats->corrupt_blocks = resp->corrupt_blocks;
    stats->missing_blocks = resp->missing_blocks;

done:
    if (resp) {
        get_fs_stats_response_proto__free_unpacked(resp, NULL);
    }
    return err;
}

static tOffset ndfs_get_capacity(hdfsFS bfs)
{
    struct hadoop_err *err;
    struct hadoop_vfs_stats stats;

    err = ndfs_statvfs((struct hadoop_fs_base *)bfs, &stats);
    if (err)
        return hadoopfs_errno_and_retcode(err);
    return stats.capacity;
}

static tOffset ndfs_get_used(hdfsFS bfs)
{
    struct hadoop_err *err;
    struct hadoop_vfs_stats stats;

    err = ndfs_statvfs((struct hadoop_fs_base *)bfs, &stats);
    if (err)
        return hadoopfs_errno_and_retcode(err);
    return stats.used;
}

static int ndfs_chown(hdfsFS bfs, const char* uri,
                      const char *user, const char *group)
{
    struct native_fs *fs = (struct native_fs *)bfs;
    struct hadoop_err *err = NULL;
    SetOwnerRequestProto req = SET_OWNER_REQUEST_PROTO__INIT;
    SetOwnerResponseProto *resp = NULL;
    struct hrpc_proxy proxy;
    char *path = NULL;

    ndfs_nn_proxy_init(fs, &proxy);
    err = build_path(fs, uri, &path);
    if (err) {
        goto done;
    }
    req.src = path;
    req.username = (char*)user;
    req.groupname = (char*)group;
    err = cnn_set_owner(&proxy, &req, &resp);
    if (err) {
        goto done;
    }

done:
    free(path);
    if (resp) {
        set_owner_response_proto__free_unpacked(resp, NULL);
    }
    return hadoopfs_errno_and_retcode(err);
}

static int ndfs_chmod(hdfsFS bfs, const char* uri, short mode)
{
    struct native_fs *fs = (struct native_fs *)bfs;
    FsPermissionProto perm = FS_PERMISSION_PROTO__INIT;
    SetPermissionRequestProto req = SET_PERMISSION_REQUEST_PROTO__INIT;
    SetPermissionResponseProto *resp = NULL;
    struct hadoop_err *err = NULL;
    struct hrpc_proxy proxy;
    char *path = NULL;

    ndfs_nn_proxy_init(fs, &proxy);
    err = build_path(fs, uri, &path);
    if (err) {
        goto done;
    }
    req.src = path;
    req.permission = &perm;
    perm.perm = mode;
    err = cnn_set_permission(&proxy, &req, &resp);
    if (err) {
        goto done;
    }

done:
    free(path);
    if (resp) {
        set_permission_response_proto__free_unpacked(resp, NULL);
    }
    return hadoopfs_errno_and_retcode(err);
}

static int ndfs_utime(hdfsFS bfs, const char* uri,
                      int64_t mtime, int64_t atime)
{
    struct native_fs *fs = (struct native_fs *)bfs;
    SetTimesRequestProto req = SET_TIMES_REQUEST_PROTO__INIT;
    SetTimesResponseProto *resp = NULL;
    struct hadoop_err *err = NULL;
    struct hrpc_proxy proxy;
    char *path = NULL;

    ndfs_nn_proxy_init(fs, &proxy);
    err = build_path(fs, uri, &path);
    if (err) {
        goto done;
    }
    req.src = path;
    // If mtime or atime are -1, that means "no change."
    // Otherwise, we need to multiply by 1000, to take into account the fact
    // that libhdfs times are in seconds, and HDFS times are in milliseconds.
    // It's unfortunate that libhdfs doesn't support the full millisecond
    // precision.  We need to redo the API at some point.
    if (mtime < 0) {
        req.mtime = -1;
    } else {
        req.mtime = mtime;
        req.mtime *= 1000;
    }
    if (atime < 0) {
        req.atime = -1;
    } else {
        req.atime = atime;
        req.atime *= 1000;
    }
    err = cnn_set_times(&proxy, &req, &resp);
    if (err) {
        goto done;
    }

done:
    free(path);
    if (resp) {
        set_times_response_proto__free_unpacked(resp, NULL);
    }
    return hadoopfs_errno_and_retcode(err);
}

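/*
 * Example of the time-unit conversion above (illustrative values):
 *   ndfs_utime(fs, "/tmp/f", 1400000000, -1)
 * sends mtime = 1400000000000 ms to the NameNode and leaves atime unchanged.
 */
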
static struct hadoopRzBuffer* ndfs_read_zero(
            hdfsFile bfile __attribute__((unused)),
            struct hadoopRzOptions *opts __attribute__((unused)),
            int32_t maxLength __attribute__((unused)))
{
    errno = ENOTSUP;
    return NULL;
}

static void ndfs_rz_buffer_free(hdfsFile bfile __attribute__((unused)),
                struct hadoopRzBuffer *buffer __attribute__((unused)))
{
}

int ndfs_file_uses_direct_read(hdfsFile bfile)
{
    // The 'disable direct reads' flag exists so that old test harnesses
    // designed to test jniFS will run against NDFS.  The flag doesn't change
    // any read behavior, since all reads are always direct in NDFS.
    struct native_file_base *file = (struct native_file_base *)bfile;
    return (!(file->flags & NDFS_FILE_FLAG_DISABLE_DIRECT_READ));
}

void ndfs_file_disable_direct_read(hdfsFile bfile)
{
    struct native_file_base *file = (struct native_file_base *)bfile;
    file->flags |= NDFS_FILE_FLAG_DISABLE_DIRECT_READ;
}

const struct hadoop_fs_ops g_ndfs_ops = {
    .name = "ndfs",
    .file_is_open_for_read = ndfs_file_is_open_for_read,
    .file_is_open_for_write = ndfs_file_is_open_for_write,
    .get_read_statistics = ndfs_file_get_read_statistics,
    .connect = ndfs_connect,
    .disconnect = ndfs_disconnect,
    .open = ndfs_open_file,
    .close = ndfs_close_file,
    .exists = ndfs_file_exists,
    .seek = ndfs_seek,
    .tell = ndfs_tell,
    .read = ndfs_read,
    .pread = ndfs_pread,
    .write = ndfs_write,
    .flush = ndfs_flush,
    .hflush = ndfs_hflush,
    .hsync = ndfs_hsync,
    .available = ndfs_available,
    .copy = ndfs_copy,
    .move = ndfs_move,
    .unlink = ndfs_unlink,
    .rename = ndfs_rename,
    .get_working_directory = ndfs_get_working_directory,
    .set_working_directory = ndfs_set_working_directory,
    .mkdir = ndfs_mkdir,
    .set_replication = ndfs_set_replication,
    .list_directory = ndfs_list_directory,
    .get_path_info = ndfs_get_path_info,
    .get_hosts = ndfs_get_hosts,
    .get_default_block_size = ndfs_get_default_block_size,
    .get_default_block_size_at_path = ndfs_get_default_block_size_at_path,
    .get_capacity = ndfs_get_capacity,
    .get_used = ndfs_get_used,
    .chown = ndfs_chown,
    .chmod = ndfs_chmod,
    .utime = ndfs_utime,
    .read_zero = ndfs_read_zero,
    .rz_buffer_free = ndfs_rz_buffer_free,
    // test
    .file_uses_direct_read = ndfs_file_uses_direct_read,
    .file_disable_direct_read = ndfs_file_disable_direct_read,
};

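/*
 * A sketch of how the generic fs layer is expected to dispatch through this
 * table (hypothetical caller; the actual dispatch presumably lives behind
 * fs/fs.h, which selects a hadoop_fs_ops table per filesystem scheme):
 *
 *   const struct hadoop_fs_ops *ops = &g_ndfs_ops;
 *   if (ops->mkdir(fs, "hdfs://mynamenode/tmp/newdir") < 0) {
 *       perror("mkdir");
 *   }
 */
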
// vim: ts=4:sw=4:tw=79:et