hdfs.cc 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  #include "hdfspp/hdfspp.h"
  #include "fs/filesystem.h"
  #include "common/hdfs_configuration.h"
  #include "common/configuration_loader.h"
  #include <hdfs/hdfs.h>
  #include <hdfspp/hdfs_ext.h>
  #include <algorithm>
  #include <cerrno>
  #include <cstdint>
  #include <cstdlib>
  #include <cstring>
  #include <iostream>
  #include <limits>
  #include <memory>
  #include <string>
  28. using namespace hdfs;
  29. /* Separate the handles used by the C api from the C++ API*/
  30. struct hdfs_internal {
  31. hdfs_internal(FileSystem *p) : filesystem_(p) {}
  32. hdfs_internal(std::unique_ptr<FileSystem> p)
  33. : filesystem_(std::move(p)) {}
  34. virtual ~hdfs_internal(){};
  35. FileSystem *get_impl() { return filesystem_.get(); }
  36. const FileSystem *get_impl() const { return filesystem_.get(); }
  37. private:
  38. std::unique_ptr<FileSystem> filesystem_;
  39. };
  40. struct hdfsFile_internal {
  41. hdfsFile_internal(FileHandle *p) : file_(p) {}
  42. hdfsFile_internal(std::unique_ptr<FileHandle> p) : file_(std::move(p)) {}
  43. virtual ~hdfsFile_internal(){};
  44. FileHandle *get_impl() { return file_.get(); }
  45. const FileHandle *get_impl() const { return file_.get(); }
  46. private:
  47. std::unique_ptr<FileHandle> file_;
  48. };
  /* Keep thread local copy of last error string */
  thread_local std::string errstr;

  /*
   * Fetch last error that happened in this thread.
   *
   * Copies the calling thread's last error message into buf, truncating it if
   * necessary so that the result (including the terminating null) fits in len
   * bytes. Does nothing when buf is null or len is smaller than 1.
   */
  void hdfsGetLastError(char *buf, int len) {
    if (buf == nullptr || len < 1) {
      return;
    }

    /* one byte of the buffer is always reserved for the trailing null */
    const size_t room = static_cast<size_t>(len) - 1;
    const size_t count = std::min(static_cast<size_t>(errstr.size()), room);

    strncpy(buf, errstr.c_str(), count);
    buf[count] = 0;
  }
  65. struct hdfsBuilder {
  66. hdfsBuilder();
  67. hdfsBuilder(const char * directory);
  68. virtual ~hdfsBuilder() {}
  69. ConfigurationLoader loader;
  70. HdfsConfiguration config;
  71. std::string overrideHost;
  72. tPort overridePort; // 0 --> use default
  73. std::string user;
  74. static constexpr tPort kUseDefaultPort = 0;
  75. static constexpr tPort kDefaultPort = 8020;
  76. };
  77. /* Error handling with optional debug to stderr */
  78. static void ReportError(int errnum, const std::string & msg) {
  79. errno = errnum;
  80. errstr = msg;
  81. #ifdef LIBHDFSPP_C_API_ENABLE_DEBUG
  82. std::cerr << "Error: errno=" << strerror(errnum) << " message=\"" << msg
  83. << "\"" << std::endl;
  84. #else
  85. (void)msg;
  86. #endif
  87. }
  88. /* Convert Status wrapped error into appropriate errno and return code */
  89. static int Error(const Status &stat) {
  90. const char * default_message;
  91. int errnum;
  92. int code = stat.code();
  93. switch (code) {
  94. case Status::Code::kOk:
  95. return 0;
  96. case Status::Code::kInvalidArgument:
  97. errnum = EINVAL;
  98. default_message = "Invalid argument";
  99. break;
  100. case Status::Code::kResourceUnavailable:
  101. errnum = EAGAIN;
  102. default_message = "Resource temporarily unavailable";
  103. break;
  104. case Status::Code::kUnimplemented:
  105. errnum = ENOSYS;
  106. default_message = "Function not implemented";
  107. break;
  108. case Status::Code::kException:
  109. errnum = EINTR;
  110. default_message = "Exception raised";
  111. break;
  112. case Status::Code::kOperationCanceled:
  113. errnum = EINTR;
  114. default_message = "Operation canceled";
  115. break;
  116. case Status::Code::kPermissionDenied:
  117. errnum = EACCES;
  118. default_message = "Permission denied";
  119. break;
  120. default:
  121. errnum = ENOSYS;
  122. default_message = "Error: unrecognised code";
  123. }
  124. if (stat.ToString().empty())
  125. ReportError(errnum, default_message);
  126. else
  127. ReportError(errnum, stat.ToString());
  128. return -1;
  129. }
  130. static int ReportException(const std::exception & e)
  131. {
  132. return Error(Status::Exception("Uncaught exception", e.what()));
  133. }
  134. static int ReportCaughtNonException()
  135. {
  136. return Error(Status::Exception("Uncaught value not derived from std::exception", ""));
  137. }
  138. /* return false on failure */
  139. bool CheckSystemAndHandle(hdfsFS fs, hdfsFile file) {
  140. if (!fs) {
  141. ReportError(ENODEV, "Cannot perform FS operations with null FS handle.");
  142. return false;
  143. }
  144. if (!file) {
  145. ReportError(EBADF, "Cannot perform FS operations with null File handle.");
  146. return false;
  147. }
  148. return true;
  149. }
  150. /**
  151. * C API implementations
  152. **/
  153. int hdfsFileIsOpenForRead(hdfsFile file) {
  154. /* files can only be open for reads at the moment, do a quick check */
  155. if (file) {
  156. return 1; // Update implementation when we get file writing
  157. }
  158. return 0;
  159. }
  160. hdfsFS hdfsConnect(const char *nn, tPort port) {
  161. return hdfsConnectAsUser(nn, port, "");
  162. }
  163. hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user) {
  164. try
  165. {
  166. std::string port_as_string = std::to_string(port);
  167. IoService * io_service = IoService::New();
  168. std::string user_name;
  169. if (user) {
  170. user_name = user;
  171. }
  172. FileSystem *fs = FileSystem::New(io_service, user_name, Options());
  173. if (!fs) {
  174. ReportError(ENODEV, "Could not create FileSystem object");
  175. return nullptr;
  176. }
  177. if (!fs->Connect(nn, port_as_string).ok()) {
  178. ReportError(ENODEV, "Unable to connect to NameNode.");
  179. // FileSystem's ctor might take ownership of the io_service; if it does,
  180. // it will null out the pointer
  181. if (io_service)
  182. delete io_service;
  183. delete fs;
  184. return nullptr;
  185. }
  186. return new hdfs_internal(fs);
  187. } catch (const std::exception & e) {
  188. ReportException(e);
  189. return nullptr;
  190. } catch (...) {
  191. ReportCaughtNonException();
  192. return nullptr;
  193. }
  194. }
  195. int hdfsDisconnect(hdfsFS fs) {
  196. try
  197. {
  198. if (!fs) {
  199. ReportError(ENODEV, "Cannot disconnect null FS handle.");
  200. return -1;
  201. }
  202. delete fs;
  203. return 0;
  204. } catch (const std::exception & e) {
  205. return ReportException(e);
  206. } catch (...) {
  207. return ReportCaughtNonException();
  208. }
  209. }
  210. hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize,
  211. short replication, tSize blocksize) {
  212. try
  213. {
  214. (void)flags;
  215. (void)bufferSize;
  216. (void)replication;
  217. (void)blocksize;
  218. if (!fs) {
  219. ReportError(ENODEV, "Cannot perform FS operations with null FS handle.");
  220. return nullptr;
  221. }
  222. FileHandle *f = nullptr;
  223. Status stat = fs->get_impl()->Open(path, &f);
  224. if (!stat.ok()) {
  225. Error(stat);
  226. return nullptr;
  227. }
  228. return new hdfsFile_internal(f);
  229. } catch (const std::exception & e) {
  230. ReportException(e);
  231. return nullptr;
  232. } catch (...) {
  233. ReportCaughtNonException();
  234. return nullptr;
  235. }
  236. }
  237. int hdfsCloseFile(hdfsFS fs, hdfsFile file) {
  238. try
  239. {
  240. if (!CheckSystemAndHandle(fs, file)) {
  241. return -1;
  242. }
  243. delete file;
  244. return 0;
  245. } catch (const std::exception & e) {
  246. return ReportException(e);
  247. } catch (...) {
  248. return ReportCaughtNonException();
  249. }
  250. }
  251. tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer,
  252. tSize length) {
  253. try
  254. {
  255. if (!CheckSystemAndHandle(fs, file)) {
  256. return -1;
  257. }
  258. size_t len = length;
  259. Status stat = file->get_impl()->PositionRead(buffer, &len, position);
  260. if(!stat.ok()) {
  261. return Error(stat);
  262. }
  263. return (tSize)len;
  264. } catch (const std::exception & e) {
  265. return ReportException(e);
  266. } catch (...) {
  267. return ReportCaughtNonException();
  268. }
  269. }
  270. tSize hdfsRead(hdfsFS fs, hdfsFile file, void *buffer, tSize length) {
  271. try
  272. {
  273. if (!CheckSystemAndHandle(fs, file)) {
  274. return -1;
  275. }
  276. size_t len = length;
  277. Status stat = file->get_impl()->Read(buffer, &len);
  278. if (!stat.ok()) {
  279. return Error(stat);
  280. }
  281. return (tSize)len;
  282. } catch (const std::exception & e) {
  283. return ReportException(e);
  284. } catch (...) {
  285. return ReportCaughtNonException();
  286. }
  287. }
  288. /* 0 on success, -1 on error*/
  289. int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos) {
  290. try
  291. {
  292. if (!CheckSystemAndHandle(fs, file)) {
  293. return -1;
  294. }
  295. off_t desired = desiredPos;
  296. Status stat = file->get_impl()->Seek(&desired, std::ios_base::beg);
  297. if (!stat.ok()) {
  298. return Error(stat);
  299. }
  300. return 0;
  301. } catch (const std::exception & e) {
  302. return ReportException(e);
  303. } catch (...) {
  304. return ReportCaughtNonException();
  305. }
  306. }
  307. tOffset hdfsTell(hdfsFS fs, hdfsFile file) {
  308. try
  309. {
  310. if (!CheckSystemAndHandle(fs, file)) {
  311. return -1;
  312. }
  313. ssize_t offset = 0;
  314. Status stat = file->get_impl()->Seek(&offset, std::ios_base::cur);
  315. if (!stat.ok()) {
  316. return Error(stat);
  317. }
  318. return offset;
  319. } catch (const std::exception & e) {
  320. return ReportException(e);
  321. } catch (...) {
  322. return ReportCaughtNonException();
  323. }
  324. }
  325. /* extended API */
  326. int hdfsCancel(hdfsFS fs, hdfsFile file) {
  327. try
  328. {
  329. if (!CheckSystemAndHandle(fs, file)) {
  330. return -1;
  331. }
  332. static_cast<FileHandleImpl*>(file->get_impl())->CancelOperations();
  333. return 0;
  334. } catch (const std::exception & e) {
  335. return ReportException(e);
  336. } catch (...) {
  337. return ReportCaughtNonException();
  338. }
  339. }
  340. /*******************************************************************
  341. * BUILDER INTERFACE
  342. *******************************************************************/
  343. HdfsConfiguration LoadDefault(ConfigurationLoader & loader)
  344. {
  345. optional<HdfsConfiguration> result = loader.LoadDefaultResources<HdfsConfiguration>();
  346. if (result)
  347. {
  348. return result.value();
  349. }
  350. else
  351. {
  352. return loader.New<HdfsConfiguration>();
  353. }
  354. }
  355. hdfsBuilder::hdfsBuilder() : config(LoadDefault(loader)), overridePort(kUseDefaultPort)
  356. {
  357. }
  358. hdfsBuilder::hdfsBuilder(const char * directory) :
  359. config(loader.New<HdfsConfiguration>()), overridePort(kUseDefaultPort)
  360. {
  361. loader.SetSearchPath(directory);
  362. config = LoadDefault(loader);
  363. }
  364. struct hdfsBuilder *hdfsNewBuilder(void)
  365. {
  366. try
  367. {
  368. return new struct hdfsBuilder();
  369. } catch (const std::exception & e) {
  370. ReportException(e);
  371. return nullptr;
  372. } catch (...) {
  373. ReportCaughtNonException();
  374. return nullptr;
  375. }
  376. }
  377. void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn)
  378. {
  379. bld->overrideHost = nn;
  380. }
  381. void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port)
  382. {
  383. bld->overridePort = port;
  384. }
  385. void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName)
  386. {
  387. if (userName) {
  388. bld->user = userName;
  389. } else {
  390. bld->user = "";
  391. }
  392. }
  393. void hdfsFreeBuilder(struct hdfsBuilder *bld)
  394. {
  395. try
  396. {
  397. delete bld;
  398. } catch (const std::exception & e) {
  399. ReportException(e);
  400. } catch (...) {
  401. ReportCaughtNonException();
  402. }
  403. }
  404. int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key,
  405. const char *val)
  406. {
  407. try
  408. {
  409. optional<HdfsConfiguration> newConfig = bld->loader.OverlayValue(bld->config, key, val);
  410. if (newConfig)
  411. {
  412. bld->config = newConfig.value();
  413. return 0;
  414. }
  415. else
  416. {
  417. ReportError(EINVAL, "Could not change Builder value");
  418. return 1;
  419. }
  420. } catch (const std::exception & e) {
  421. return ReportException(e);
  422. } catch (...) {
  423. return ReportCaughtNonException();
  424. }
  425. }
  /*
   * Release a string with free(). The configuration getters in this file
   * allocate their result strings with malloc().
   */
  void hdfsConfStrFree(char *val)
  {
    free(val);
  }
  430. hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) {
  431. try
  432. {
  433. if (!bld->overrideHost.empty())
  434. {
  435. // TODO: pass rest of config once we get that done (HDFS-9556)
  436. tPort port = bld->overridePort;
  437. if (port == hdfsBuilder::kUseDefaultPort)
  438. {
  439. port = hdfsBuilder::kDefaultPort;
  440. }
  441. if (bld->user.empty())
  442. return hdfsConnect(bld->overrideHost.c_str(), port);
  443. else
  444. return hdfsConnectAsUser(bld->overrideHost.c_str(), port, bld->user.c_str());
  445. }
  446. else
  447. {
  448. //TODO: allow construction from default port once that is done (HDFS-9556)
  449. ReportError(EINVAL, "No host provided to builder in hdfsBuilderConnect");
  450. return nullptr;
  451. }
  452. } catch (const std::exception & e) {
  453. ReportException(e);
  454. return nullptr;
  455. } catch (...) {
  456. ReportCaughtNonException();
  457. return nullptr;
  458. }
  459. }
  460. int hdfsConfGetStr(const char *key, char **val)
  461. {
  462. try
  463. {
  464. hdfsBuilder builder;
  465. return hdfsBuilderConfGetStr(&builder, key, val);
  466. } catch (const std::exception & e) {
  467. return ReportException(e);
  468. } catch (...) {
  469. return ReportCaughtNonException();
  470. }
  471. }
  472. int hdfsConfGetInt(const char *key, int32_t *val)
  473. {
  474. try
  475. {
  476. hdfsBuilder builder;
  477. return hdfsBuilderConfGetInt(&builder, key, val);
  478. } catch (const std::exception & e) {
  479. return ReportException(e);
  480. } catch (...) {
  481. return ReportCaughtNonException();
  482. }
  483. }
  484. //
  485. // Extended builder interface
  486. //
  487. struct hdfsBuilder *hdfsNewBuilderFromDirectory(const char * configDirectory)
  488. {
  489. try
  490. {
  491. return new struct hdfsBuilder(configDirectory);
  492. } catch (const std::exception & e) {
  493. ReportException(e);
  494. return nullptr;
  495. } catch (...) {
  496. ReportCaughtNonException();
  497. return nullptr;
  498. }
  499. }
  500. int hdfsBuilderConfGetStr(struct hdfsBuilder *bld, const char *key,
  501. char **val)
  502. {
  503. try
  504. {
  505. optional<std::string> value = bld->config.Get(key);
  506. if (value)
  507. {
  508. size_t len = value->length() + 1;
  509. *val = static_cast<char *>(malloc(len));
  510. strncpy(*val, value->c_str(), len);
  511. }
  512. else
  513. {
  514. *val = nullptr;
  515. }
  516. return 0;
  517. } catch (const std::exception & e) {
  518. return ReportException(e);
  519. } catch (...) {
  520. return ReportCaughtNonException();
  521. }
  522. }
  // Configuration integers are read as 64-bit values, but the java hdfs.h
  // interface specifies int. Returns true only when value lies within
  // [INT_MIN, INT_MAX] and can therefore be handed back as an int.
  bool isValidInt(int64_t value)
  {
    if (value < std::numeric_limits<int>::min())
    {
      return false;
    }
    if (value > std::numeric_limits<int>::max())
    {
      return false;
    }
    return true;
  }
  530. int hdfsBuilderConfGetInt(struct hdfsBuilder *bld, const char *key, int32_t *val)
  531. {
  532. try
  533. {
  534. // Pull from default configuration
  535. optional<int64_t> value = bld->config.GetInt(key);
  536. if (value)
  537. {
  538. if (!isValidInt(*value))
  539. return 1;
  540. *val = *value;
  541. }
  542. // If not found, don't change val
  543. ReportError(EINVAL, "Could not get Builder value");
  544. return 0;
  545. } catch (const std::exception & e) {
  546. return ReportException(e);
  547. } catch (...) {
  548. return ReportCaughtNonException();
  549. }
  550. }