hdfs.cc 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908
  /**
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  #include "hdfspp/hdfspp.h"

  #include "fs/filesystem.h"
  #include "common/hdfs_configuration.h"
  #include "common/configuration_loader.h"
  #include "common/logging.h"

  #include <hdfs/hdfs.h>
  #include <hdfspp/hdfs_ext.h>

  #include <algorithm>
  #include <cerrno>
  #include <cstdlib>
  #include <cstring>
  #include <functional>
  #include <iostream>
  #include <limits>
  #include <memory>
  #include <string>
  30. using namespace hdfs;
  31. using std::experimental::nullopt;
  32. using namespace std::placeholders;
  33. static constexpr tPort kDefaultPort = 8020;
  34. /* Separate the handles used by the C api from the C++ API*/
  35. struct hdfs_internal {
  36. hdfs_internal(FileSystem *p) : filesystem_(p) {}
  37. hdfs_internal(std::unique_ptr<FileSystem> p)
  38. : filesystem_(std::move(p)) {}
  39. virtual ~hdfs_internal(){};
  40. FileSystem *get_impl() { return filesystem_.get(); }
  41. const FileSystem *get_impl() const { return filesystem_.get(); }
  42. private:
  43. std::unique_ptr<FileSystem> filesystem_;
  44. };
  45. struct hdfsFile_internal {
  46. hdfsFile_internal(FileHandle *p) : file_(p) {}
  47. hdfsFile_internal(std::unique_ptr<FileHandle> p) : file_(std::move(p)) {}
  48. virtual ~hdfsFile_internal(){};
  49. FileHandle *get_impl() { return file_.get(); }
  50. const FileHandle *get_impl() const { return file_.get(); }
  51. private:
  52. std::unique_ptr<FileHandle> file_;
  53. };
  /* Each thread keeps its own copy of the most recent error text */
  thread_local std::string errstr;

  /* Copy the calling thread's last error text into buf.
   *
   * The result is always NUL-terminated and holds at most len - 1 characters
   * of the message. Nothing is written when buf is null or len is below 1. */
  void hdfsGetLastError(char *buf, int len) {
    if (buf == nullptr || len < 1) {
      return;
    }

    const size_t capacity = static_cast<size_t>(len);
    /* Keep the final byte of the buffer free for the terminator */
    const size_t ncopy =
        (errstr.size() >= capacity) ? capacity - 1 : errstr.size();

    strncpy(buf, errstr.c_str(), ncopy);
    buf[ncopy] = 0;
  }
  70. /* Event callbacks for next open calls */
  71. thread_local std::experimental::optional<fs_event_callback> fsEventCallback;
  72. thread_local std::experimental::optional<file_event_callback> fileEventCallback;
  73. struct hdfsBuilder {
  74. hdfsBuilder();
  75. hdfsBuilder(const char * directory);
  76. virtual ~hdfsBuilder() {}
  77. ConfigurationLoader loader;
  78. HdfsConfiguration config;
  79. optional<std::string> overrideHost;
  80. optional<tPort> overridePort;
  81. optional<std::string> user;
  82. static constexpr tPort kUseDefaultPort = 0;
  83. };
  84. /* Error handling with optional debug to stderr */
  85. static void ReportError(int errnum, const std::string & msg) {
  86. errno = errnum;
  87. errstr = msg;
  88. #ifdef LIBHDFSPP_C_API_ENABLE_DEBUG
  89. std::cerr << "Error: errno=" << strerror(errnum) << " message=\"" << msg
  90. << "\"" << std::endl;
  91. #else
  92. (void)msg;
  93. #endif
  94. }
  95. /* Convert Status wrapped error into appropriate errno and return code */
  96. static int Error(const Status &stat) {
  97. const char * default_message;
  98. int errnum;
  99. int code = stat.code();
  100. switch (code) {
  101. case Status::Code::kOk:
  102. return 0;
  103. case Status::Code::kInvalidArgument:
  104. errnum = EINVAL;
  105. default_message = "Invalid argument";
  106. break;
  107. case Status::Code::kResourceUnavailable:
  108. errnum = EAGAIN;
  109. default_message = "Resource temporarily unavailable";
  110. break;
  111. case Status::Code::kUnimplemented:
  112. errnum = ENOSYS;
  113. default_message = "Function not implemented";
  114. break;
  115. case Status::Code::kException:
  116. errnum = EINTR;
  117. default_message = "Exception raised";
  118. break;
  119. case Status::Code::kOperationCanceled:
  120. errnum = EINTR;
  121. default_message = "Operation canceled";
  122. break;
  123. case Status::Code::kPermissionDenied:
  124. errnum = EACCES;
  125. default_message = "Permission denied";
  126. break;
  127. default:
  128. errnum = ENOSYS;
  129. default_message = "Error: unrecognised code";
  130. }
  131. if (stat.ToString().empty())
  132. ReportError(errnum, default_message);
  133. else
  134. ReportError(errnum, stat.ToString());
  135. return -1;
  136. }
  137. static int ReportException(const std::exception & e)
  138. {
  139. return Error(Status::Exception("Uncaught exception", e.what()));
  140. }
  141. static int ReportCaughtNonException()
  142. {
  143. return Error(Status::Exception("Uncaught value not derived from std::exception", ""));
  144. }
  145. bool CheckSystem(hdfsFS fs) {
  146. if (!fs) {
  147. ReportError(ENODEV, "Cannot perform FS operations with null FS handle.");
  148. return false;
  149. }
  150. return true;
  151. }
  152. /* return false on failure */
  153. bool CheckSystemAndHandle(hdfsFS fs, hdfsFile file) {
  154. if (!CheckSystem(fs))
  155. return false;
  156. if (!file) {
  157. ReportError(EBADF, "Cannot perform FS operations with null File handle.");
  158. return false;
  159. }
  160. return true;
  161. }
  162. /**
  163. * C API implementations
  164. **/
  165. int hdfsFileIsOpenForRead(hdfsFile file) {
  166. /* files can only be open for reads at the moment, do a quick check */
  167. if (file) {
  168. return 1; // Update implementation when we get file writing
  169. }
  170. return 0;
  171. }
  172. hdfsFS doHdfsConnect(optional<std::string> nn, optional<tPort> port, optional<std::string> user, const Options & options) {
  173. try
  174. {
  175. IoService * io_service = IoService::New();
  176. FileSystem *fs = FileSystem::New(io_service, user.value_or(""), options);
  177. if (!fs) {
  178. ReportError(ENODEV, "Could not create FileSystem object");
  179. return nullptr;
  180. }
  181. if (fsEventCallback) {
  182. fs->SetFsEventCallback(fsEventCallback.value());
  183. }
  184. Status status;
  185. if (nn || port) {
  186. if (!port) {
  187. port = kDefaultPort;
  188. }
  189. std::string port_as_string = std::to_string(*port);
  190. status = fs->Connect(nn.value_or(""), port_as_string);
  191. } else {
  192. status = fs->ConnectToDefaultFs();
  193. }
  194. if (!status.ok()) {
  195. Error(status);
  196. // FileSystem's ctor might take ownership of the io_service; if it does,
  197. // it will null out the pointer
  198. if (io_service)
  199. delete io_service;
  200. delete fs;
  201. return nullptr;
  202. }
  203. return new hdfs_internal(fs);
  204. } catch (const std::exception & e) {
  205. ReportException(e);
  206. return nullptr;
  207. } catch (...) {
  208. ReportCaughtNonException();
  209. return nullptr;
  210. }
  211. }
  212. hdfsFS hdfsConnect(const char *nn, tPort port) {
  213. return hdfsConnectAsUser(nn, port, "");
  214. }
  215. hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user) {
  216. return doHdfsConnect(std::string(nn), port, std::string(user), Options());
  217. }
  218. int hdfsDisconnect(hdfsFS fs) {
  219. try
  220. {
  221. if (!fs) {
  222. ReportError(ENODEV, "Cannot disconnect null FS handle.");
  223. return -1;
  224. }
  225. delete fs;
  226. return 0;
  227. } catch (const std::exception & e) {
  228. return ReportException(e);
  229. } catch (...) {
  230. return ReportCaughtNonException();
  231. }
  232. }
  233. hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize,
  234. short replication, tSize blocksize) {
  235. try
  236. {
  237. (void)flags;
  238. (void)bufferSize;
  239. (void)replication;
  240. (void)blocksize;
  241. if (!fs) {
  242. ReportError(ENODEV, "Cannot perform FS operations with null FS handle.");
  243. return nullptr;
  244. }
  245. FileHandle *f = nullptr;
  246. Status stat = fs->get_impl()->Open(path, &f);
  247. if (!stat.ok()) {
  248. Error(stat);
  249. return nullptr;
  250. }
  251. return new hdfsFile_internal(f);
  252. } catch (const std::exception & e) {
  253. ReportException(e);
  254. return nullptr;
  255. } catch (...) {
  256. ReportCaughtNonException();
  257. return nullptr;
  258. }
  259. }
  260. int hdfsCloseFile(hdfsFS fs, hdfsFile file) {
  261. try
  262. {
  263. if (!CheckSystemAndHandle(fs, file)) {
  264. return -1;
  265. }
  266. delete file;
  267. return 0;
  268. } catch (const std::exception & e) {
  269. return ReportException(e);
  270. } catch (...) {
  271. return ReportCaughtNonException();
  272. }
  273. }
  274. tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer,
  275. tSize length) {
  276. try
  277. {
  278. if (!CheckSystemAndHandle(fs, file)) {
  279. return -1;
  280. }
  281. size_t len = length;
  282. Status stat = file->get_impl()->PositionRead(buffer, &len, position);
  283. if(!stat.ok()) {
  284. return Error(stat);
  285. }
  286. return (tSize)len;
  287. } catch (const std::exception & e) {
  288. return ReportException(e);
  289. } catch (...) {
  290. return ReportCaughtNonException();
  291. }
  292. }
  293. tSize hdfsRead(hdfsFS fs, hdfsFile file, void *buffer, tSize length) {
  294. try
  295. {
  296. if (!CheckSystemAndHandle(fs, file)) {
  297. return -1;
  298. }
  299. size_t len = length;
  300. Status stat = file->get_impl()->Read(buffer, &len);
  301. if (!stat.ok()) {
  302. return Error(stat);
  303. }
  304. return (tSize)len;
  305. } catch (const std::exception & e) {
  306. return ReportException(e);
  307. } catch (...) {
  308. return ReportCaughtNonException();
  309. }
  310. }
  311. /* 0 on success, -1 on error*/
  312. int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos) {
  313. try
  314. {
  315. if (!CheckSystemAndHandle(fs, file)) {
  316. return -1;
  317. }
  318. off_t desired = desiredPos;
  319. Status stat = file->get_impl()->Seek(&desired, std::ios_base::beg);
  320. if (!stat.ok()) {
  321. return Error(stat);
  322. }
  323. return 0;
  324. } catch (const std::exception & e) {
  325. return ReportException(e);
  326. } catch (...) {
  327. return ReportCaughtNonException();
  328. }
  329. }
  330. tOffset hdfsTell(hdfsFS fs, hdfsFile file) {
  331. try
  332. {
  333. if (!CheckSystemAndHandle(fs, file)) {
  334. return -1;
  335. }
  336. ssize_t offset = 0;
  337. Status stat = file->get_impl()->Seek(&offset, std::ios_base::cur);
  338. if (!stat.ok()) {
  339. return Error(stat);
  340. }
  341. return offset;
  342. } catch (const std::exception & e) {
  343. return ReportException(e);
  344. } catch (...) {
  345. return ReportCaughtNonException();
  346. }
  347. }
  348. /* extended API */
  349. int hdfsCancel(hdfsFS fs, hdfsFile file) {
  350. try
  351. {
  352. if (!CheckSystemAndHandle(fs, file)) {
  353. return -1;
  354. }
  355. static_cast<FileHandleImpl*>(file->get_impl())->CancelOperations();
  356. return 0;
  357. } catch (const std::exception & e) {
  358. return ReportException(e);
  359. } catch (...) {
  360. return ReportCaughtNonException();
  361. }
  362. }
  363. int hdfsGetBlockLocations(hdfsFS fs, const char *path, struct hdfsBlockLocations ** locations_out)
  364. {
  365. try
  366. {
  367. if (!CheckSystem(fs)) {
  368. return -1;
  369. }
  370. if (locations_out == nullptr) {
  371. ReportError(EINVAL, "Null pointer passed to hdfsGetBlockLocations");
  372. return -2;
  373. }
  374. std::shared_ptr<FileBlockLocation> ppLocations;
  375. Status stat = fs->get_impl()->GetBlockLocations(path, &ppLocations);
  376. if (!stat.ok()) {
  377. return Error(stat);
  378. }
  379. hdfsBlockLocations *locations = new struct hdfsBlockLocations();
  380. (*locations_out) = locations;
  381. bzero(locations, sizeof(*locations));
  382. locations->fileLength = ppLocations->getFileLength();
  383. locations->isLastBlockComplete = ppLocations->isLastBlockComplete();
  384. locations->isUnderConstruction = ppLocations->isUnderConstruction();
  385. const std::vector<BlockLocation> & ppBlockLocations = ppLocations->getBlockLocations();
  386. locations->num_blocks = ppBlockLocations.size();
  387. locations->blocks = new struct hdfsBlockInfo[locations->num_blocks];
  388. for (size_t i=0; i < ppBlockLocations.size(); i++) {
  389. auto ppBlockLocation = ppBlockLocations[i];
  390. auto block = &locations->blocks[i];
  391. block->num_bytes = ppBlockLocation.getLength();
  392. block->start_offset = ppBlockLocation.getOffset();
  393. const std::vector<DNInfo> & ppDNInfos = ppBlockLocation.getDataNodes();
  394. block->num_locations = ppDNInfos.size();
  395. block->locations = new hdfsDNInfo[block->num_locations];
  396. for (size_t j=0; j < block->num_locations; j++) {
  397. auto ppDNInfo = ppDNInfos[j];
  398. auto dn_info = &block->locations[j];
  399. dn_info->xfer_port = ppDNInfo.getXferPort();
  400. dn_info->info_port = ppDNInfo.getInfoPort();
  401. dn_info->IPC_port = ppDNInfo.getIPCPort();
  402. dn_info->info_secure_port = ppDNInfo.getInfoSecurePort();
  403. char * buf;
  404. buf = new char[ppDNInfo.getHostname().size() + 1];
  405. strncpy(buf, ppDNInfo.getHostname().c_str(), ppDNInfo.getHostname().size());
  406. dn_info->hostname = buf;
  407. buf = new char[ppDNInfo.getIPAddr().size() + 1];
  408. strncpy(buf, ppDNInfo.getIPAddr().c_str(), ppDNInfo.getIPAddr().size());
  409. dn_info->ip_address = buf;
  410. }
  411. }
  412. return 0;
  413. } catch (const std::exception & e) {
  414. return ReportException(e);
  415. } catch (...) {
  416. return ReportCaughtNonException();
  417. }
  418. }
  419. int hdfsFreeBlockLocations(struct hdfsBlockLocations * blockLocations) {
  420. if (blockLocations == nullptr)
  421. return 0;
  422. for (size_t i=0; i < blockLocations->num_blocks; i++) {
  423. auto block = &blockLocations->blocks[i];
  424. for (size_t j=0; j < block->num_locations; j++) {
  425. auto location = &block->locations[j];
  426. delete[] location->hostname;
  427. delete[] location->ip_address;
  428. }
  429. }
  430. delete[] blockLocations->blocks;
  431. delete blockLocations;
  432. return 0;
  433. }
  434. /*******************************************************************
  435. * EVENT CALLBACKS
  436. *******************************************************************/
  437. const char * FS_NN_CONNECT_EVENT = hdfs::FS_NN_CONNECT_EVENT;
  438. const char * FS_NN_READ_EVENT = hdfs::FS_NN_READ_EVENT;
  439. const char * FS_NN_WRITE_EVENT = hdfs::FS_NN_WRITE_EVENT;
  440. const char * FILE_DN_CONNECT_EVENT = hdfs::FILE_DN_CONNECT_EVENT;
  441. const char * FILE_DN_READ_EVENT = hdfs::FILE_DN_READ_EVENT;
  442. const char * FILE_DN_WRITE_EVENT = hdfs::FILE_DN_WRITE_EVENT;
  443. event_response fs_callback_glue(libhdfspp_fs_event_callback handler,
  444. int64_t cookie,
  445. const char * event,
  446. const char * cluster,
  447. int64_t value) {
  448. int result = handler(event, cluster, value, cookie);
  449. if (result == LIBHDFSPP_EVENT_OK) {
  450. return event_response::ok();
  451. }
  452. #ifndef NDEBUG
  453. if (result == DEBUG_SIMULATE_ERROR) {
  454. return event_response::test_err(Status::Error("Simulated error"));
  455. }
  456. #endif
  457. return event_response::ok();
  458. }
  459. event_response file_callback_glue(libhdfspp_file_event_callback handler,
  460. int64_t cookie,
  461. const char * event,
  462. const char * cluster,
  463. const char * file,
  464. int64_t value) {
  465. int result = handler(event, cluster, file, value, cookie);
  466. if (result == LIBHDFSPP_EVENT_OK) {
  467. return event_response::ok();
  468. }
  469. #ifndef NDEBUG
  470. if (result == DEBUG_SIMULATE_ERROR) {
  471. return event_response::test_err(Status::Error("Simulated error"));
  472. }
  473. #endif
  474. return event_response::ok();
  475. }
  476. int hdfsPreAttachFSMonitor(libhdfspp_fs_event_callback handler, int64_t cookie)
  477. {
  478. fs_event_callback callback = std::bind(fs_callback_glue, handler, cookie, _1, _2, _3);
  479. fsEventCallback = callback;
  480. return 0;
  481. }
  482. int hdfsPreAttachFileMonitor(libhdfspp_file_event_callback handler, int64_t cookie)
  483. {
  484. file_event_callback callback = std::bind(file_callback_glue, handler, cookie, _1, _2, _3, _4);
  485. fileEventCallback = callback;
  486. return 0;
  487. }
  488. /*******************************************************************
  489. * BUILDER INTERFACE
  490. *******************************************************************/
  491. HdfsConfiguration LoadDefault(ConfigurationLoader & loader)
  492. {
  493. optional<HdfsConfiguration> result = loader.LoadDefaultResources<HdfsConfiguration>();
  494. if (result)
  495. {
  496. return result.value();
  497. }
  498. else
  499. {
  500. return loader.New<HdfsConfiguration>();
  501. }
  502. }
  503. hdfsBuilder::hdfsBuilder() : config(loader.New<HdfsConfiguration>())
  504. {
  505. loader.SetDefaultSearchPath();
  506. config = LoadDefault(loader);
  507. }
  508. hdfsBuilder::hdfsBuilder(const char * directory) :
  509. config(loader.New<HdfsConfiguration>())
  510. {
  511. loader.SetSearchPath(directory);
  512. config = LoadDefault(loader);
  513. }
  514. struct hdfsBuilder *hdfsNewBuilder(void)
  515. {
  516. try
  517. {
  518. return new struct hdfsBuilder();
  519. } catch (const std::exception & e) {
  520. ReportException(e);
  521. return nullptr;
  522. } catch (...) {
  523. ReportCaughtNonException();
  524. return nullptr;
  525. }
  526. }
  527. void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn)
  528. {
  529. bld->overrideHost = std::string(nn);
  530. }
  531. void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port)
  532. {
  533. bld->overridePort = port;
  534. }
  535. void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName)
  536. {
  537. if (userName && *userName) {
  538. bld->user = std::string(userName);
  539. }
  540. }
  541. void hdfsFreeBuilder(struct hdfsBuilder *bld)
  542. {
  543. try
  544. {
  545. delete bld;
  546. } catch (const std::exception & e) {
  547. ReportException(e);
  548. } catch (...) {
  549. ReportCaughtNonException();
  550. }
  551. }
  552. int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key,
  553. const char *val)
  554. {
  555. try
  556. {
  557. optional<HdfsConfiguration> newConfig = bld->loader.OverlayValue(bld->config, key, val);
  558. if (newConfig)
  559. {
  560. bld->config = newConfig.value();
  561. return 0;
  562. }
  563. else
  564. {
  565. ReportError(EINVAL, "Could not change Builder value");
  566. return 1;
  567. }
  568. } catch (const std::exception & e) {
  569. return ReportException(e);
  570. } catch (...) {
  571. return ReportCaughtNonException();
  572. }
  573. }
  /* Release a string with free(); hdfsBuilderConfGetStr allocates its result
   * with malloc. */
  void hdfsConfStrFree(char *val)
  {
    void *raw = val;
    free(raw);
  }
  578. hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) {
  579. return doHdfsConnect(bld->overrideHost, bld->overridePort, bld->user, bld->config.GetOptions());
  580. }
  581. int hdfsConfGetStr(const char *key, char **val)
  582. {
  583. try
  584. {
  585. hdfsBuilder builder;
  586. return hdfsBuilderConfGetStr(&builder, key, val);
  587. } catch (const std::exception & e) {
  588. return ReportException(e);
  589. } catch (...) {
  590. return ReportCaughtNonException();
  591. }
  592. }
  593. int hdfsConfGetInt(const char *key, int32_t *val)
  594. {
  595. try
  596. {
  597. hdfsBuilder builder;
  598. return hdfsBuilderConfGetInt(&builder, key, val);
  599. } catch (const std::exception & e) {
  600. return ReportException(e);
  601. } catch (...) {
  602. return ReportCaughtNonException();
  603. }
  604. }
  605. //
  606. // Extended builder interface
  607. //
  608. struct hdfsBuilder *hdfsNewBuilderFromDirectory(const char * configDirectory)
  609. {
  610. try
  611. {
  612. return new struct hdfsBuilder(configDirectory);
  613. } catch (const std::exception & e) {
  614. ReportException(e);
  615. return nullptr;
  616. } catch (...) {
  617. ReportCaughtNonException();
  618. return nullptr;
  619. }
  620. }
  621. int hdfsBuilderConfGetStr(struct hdfsBuilder *bld, const char *key,
  622. char **val)
  623. {
  624. try
  625. {
  626. optional<std::string> value = bld->config.Get(key);
  627. if (value)
  628. {
  629. size_t len = value->length() + 1;
  630. *val = static_cast<char *>(malloc(len));
  631. strncpy(*val, value->c_str(), len);
  632. }
  633. else
  634. {
  635. *val = nullptr;
  636. }
  637. return 0;
  638. } catch (const std::exception & e) {
  639. return ReportException(e);
  640. } catch (...) {
  641. return ReportCaughtNonException();
  642. }
  643. }
  // Configuration values arrive as 64-bit integers, but the java hdfs.h
  // interface specifies int, so a value may not fit (for instance on a 32-bit
  // platform). Returns true when value is representable as an int.
  bool isValidInt(int64_t value)
  {
    const int64_t lowest = std::numeric_limits<int>::min();
    const int64_t highest = std::numeric_limits<int>::max();
    return !(value < lowest || value > highest);
  }
  651. int hdfsBuilderConfGetInt(struct hdfsBuilder *bld, const char *key, int32_t *val)
  652. {
  653. try
  654. {
  655. // Pull from default configuration
  656. optional<int64_t> value = bld->config.GetInt(key);
  657. if (value)
  658. {
  659. if (!isValidInt(*value))
  660. return 1;
  661. *val = *value;
  662. }
  663. // If not found, don't change val
  664. ReportError(EINVAL, "Could not get Builder value");
  665. return 0;
  666. } catch (const std::exception & e) {
  667. return ReportException(e);
  668. } catch (...) {
  669. return ReportCaughtNonException();
  670. }
  671. }
  672. /**
  673. * Logging functions
  674. **/
  675. class CForwardingLogger : public LoggerInterface {
  676. public:
  677. CForwardingLogger() : callback_(nullptr) {};
  678. // Converts LogMessage into LogData, a POD type,
  679. // and invokes callback_ if it's not null.
  680. void Write(const LogMessage& msg);
  681. // pass in NULL to clear the hook
  682. void SetCallback(void (*callback)(LogData*));
  683. //return a copy, or null on failure.
  684. static LogData *CopyLogData(const LogData*);
  685. //free LogData allocated with CopyLogData
  686. static void FreeLogData(LogData*);
  687. private:
  688. void (*callback_)(LogData*);
  689. };
  690. /**
  691. * Plugin to forward message to a C function pointer
  692. **/
  693. void CForwardingLogger::Write(const LogMessage& msg) {
  694. if(!callback_)
  695. return;
  696. const std::string text = msg.MsgString();
  697. LogData data;
  698. data.level = msg.level();
  699. data.component = msg.component();
  700. data.msg = text.c_str();
  701. data.file_name = msg.file_name();
  702. data.file_line = msg.file_line();
  703. callback_(&data);
  704. }
  705. void CForwardingLogger::SetCallback(void (*callback)(LogData*)) {
  706. callback_ = callback;
  707. }
  708. LogData *CForwardingLogger::CopyLogData(const LogData *orig) {
  709. if(!orig)
  710. return nullptr;
  711. LogData *copy = (LogData*)malloc(sizeof(LogData));
  712. if(!copy)
  713. return nullptr;
  714. copy->level = orig->level;
  715. copy->component = orig->component;
  716. if(orig->msg)
  717. copy->msg = strdup(orig->msg);
  718. copy->file_name = orig->file_name;
  719. copy->file_line = orig->file_line;
  720. return copy;
  721. }
  722. void CForwardingLogger::FreeLogData(LogData *data) {
  723. if(!data)
  724. return;
  725. if(data->msg)
  726. free((void*)data->msg);
  727. // Inexpensive way to help catch use-after-free
  728. memset(data, 0, sizeof(LogData));
  729. free(data);
  730. }
  731. LogData *hdfsCopyLogData(LogData *data) {
  732. return CForwardingLogger::CopyLogData(data);
  733. }
  734. void hdfsFreeLogData(LogData *data) {
  735. CForwardingLogger::FreeLogData(data);
  736. }
  737. void hdfsSetLogFunction(void (*callback)(LogData*)) {
  738. CForwardingLogger *logger = new CForwardingLogger();
  739. logger->SetCallback(callback);
  740. LogManager::SetLoggerImplementation(std::unique_ptr<LoggerInterface>(logger));
  741. }
  742. static bool IsLevelValid(int component) {
  743. if(component < HDFSPP_LOG_LEVEL_TRACE || component > HDFSPP_LOG_LEVEL_ERROR)
  744. return false;
  745. return true;
  746. }
  // Count the bits set in val (a negative value counts its sign bit too).
  // A compiler popcount builtin could replace this on platforms that have one.
  static int popcnt(int val) {
    unsigned int remaining = static_cast<unsigned int>(val);
    int count = 0;
    while (remaining != 0) {
      remaining &= remaining - 1;  // clears the lowest set bit
      ++count;
    }
    return count;
  }
  757. static bool IsComponentValid(int component) {
  758. if(component < HDFSPP_LOG_COMPONENT_UNKNOWN || component > HDFSPP_LOG_COMPONENT_FILESYSTEM)
  759. return false;
  760. if(popcnt(component) != 1)
  761. return false;
  762. return true;
  763. }
  764. int hdfsEnableLoggingForComponent(int component) {
  765. if(!IsComponentValid(component))
  766. return 1;
  767. LogManager::EnableLogForComponent(static_cast<LogSourceComponent>(component));
  768. return 0;
  769. }
  770. int hdfsDisableLoggingForComponent(int component) {
  771. if(!IsComponentValid(component))
  772. return 1;
  773. LogManager::DisableLogForComponent(static_cast<LogSourceComponent>(component));
  774. return 0;
  775. }
  776. int hdfsSetLoggingLevel(int level) {
  777. if(!IsLevelValid(level))
  778. return 1;
  779. LogManager::SetLogLevel(static_cast<LogLevel>(level));
  780. return 0;
  781. }