hdfs.cc 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  #include "hdfspp/hdfspp.h"
  #include "fs/filesystem.h"
  #include "common/hdfs_configuration.h"
  #include "common/configuration_loader.h"
  #include <hdfs/hdfs.h>
  #include <hdfspp/hdfs_ext.h>
  #include <algorithm>
  #include <cerrno>
  #include <cstdint>
  #include <cstdlib>
  #include <cstring>
  #include <iostream>
  #include <limits>
  #include <memory>
  #include <string>
  28. using namespace hdfs;
  29. /* Separate the handles used by the C api from the C++ API*/
  30. struct hdfs_internal {
  31. hdfs_internal(FileSystem *p) : filesystem_(p) {}
  32. hdfs_internal(std::unique_ptr<FileSystem> p)
  33. : filesystem_(std::move(p)) {}
  34. virtual ~hdfs_internal(){};
  35. FileSystem *get_impl() { return filesystem_.get(); }
  36. const FileSystem *get_impl() const { return filesystem_.get(); }
  37. private:
  38. std::unique_ptr<FileSystem> filesystem_;
  39. };
  40. struct hdfsFile_internal {
  41. hdfsFile_internal(FileHandle *p) : file_(p) {}
  42. hdfsFile_internal(std::unique_ptr<FileHandle> p) : file_(std::move(p)) {}
  43. virtual ~hdfsFile_internal(){};
  44. FileHandle *get_impl() { return file_.get(); }
  45. const FileHandle *get_impl() const { return file_.get(); }
  46. private:
  47. std::unique_ptr<FileHandle> file_;
  48. };
  /* Keep thread local copy of last error string */
  thread_local std::string errstr;

  /*
   * Copy the last error message recorded on this thread into `buf`.
   *
   * buf: destination buffer; ignored if null.
   * len: size of `buf` in bytes, including room for the terminating NUL;
   *      values < 1 are ignored.
   *
   * The message is truncated to len - 1 bytes if needed, and `buf` is always
   * NUL-terminated. When no error has been recorded, `buf` receives the empty
   * string instead of being left uninitialised.
   */
  void hdfsGetLastError(char *buf, int len) {
    if (buf == nullptr || len < 1) {
      return;
    }
    /* leave space for a trailing null */
    const size_t copylen =
        std::min(errstr.size(), static_cast<size_t>(len) - 1);
    std::memcpy(buf, errstr.data(), copylen);
    /* stick in null */
    buf[copylen] = '\0';
  }
  65. struct hdfsBuilder {
  66. hdfsBuilder();
  67. hdfsBuilder(const char * directory);
  68. virtual ~hdfsBuilder() {}
  69. ConfigurationLoader loader;
  70. HdfsConfiguration config;
  71. std::string overrideHost;
  72. tPort overridePort; // 0 --> use default
  73. static constexpr tPort kUseDefaultPort = 0;
  74. static constexpr tPort kDefaultPort = 8020;
  75. };
  76. /* Error handling with optional debug to stderr */
  77. static void ReportError(int errnum, std::string msg) {
  78. errno = errnum;
  79. errstr = msg;
  80. #ifdef LIBHDFSPP_C_API_ENABLE_DEBUG
  81. std::cerr << "Error: errno=" << strerror(errnum) << " message=\"" << msg
  82. << "\"" << std::endl;
  83. #else
  84. (void)msg;
  85. #endif
  86. }
  87. /* Convert Status wrapped error into appropriate errno and return code */
  88. static int Error(const Status &stat) {
  89. int code = stat.code();
  90. switch (code) {
  91. case Status::Code::kOk:
  92. return 0;
  93. case Status::Code::kInvalidArgument:
  94. ReportError(EINVAL, "Invalid argument");
  95. break;
  96. case Status::Code::kResourceUnavailable:
  97. ReportError(EAGAIN, "Resource temporarily unavailable");
  98. break;
  99. case Status::Code::kUnimplemented:
  100. ReportError(ENOSYS, "Function not implemented");
  101. break;
  102. case Status::Code::kException:
  103. ReportError(EINTR, "Exception raised");
  104. break;
  105. case Status::Code::kOperationCanceled:
  106. ReportError(EINTR, "Operation canceled");
  107. break;
  108. default:
  109. ReportError(ENOSYS, "Error: unrecognised code");
  110. }
  111. return -1;
  112. }
  113. /* return false on failure */
  114. bool CheckSystemAndHandle(hdfsFS fs, hdfsFile file) {
  115. if (!fs) {
  116. ReportError(ENODEV, "Cannot perform FS operations with null FS handle.");
  117. return false;
  118. }
  119. if (!file) {
  120. ReportError(EBADF, "Cannot perform FS operations with null File handle.");
  121. return false;
  122. }
  123. return true;
  124. }
  125. /**
  126. * C API implementations
  127. **/
  128. int hdfsFileIsOpenForRead(hdfsFile file) {
  129. /* files can only be open for reads at the moment, do a quick check */
  130. if (file) {
  131. return 1; // Update implementation when we get file writing
  132. }
  133. return 0;
  134. }
  135. hdfsFS hdfsConnect(const char *nn, tPort port) {
  136. std::string port_as_string = std::to_string(port);
  137. IoService * io_service = IoService::New();
  138. FileSystem *fs = FileSystem::New(io_service, Options());
  139. if (!fs) {
  140. return nullptr;
  141. }
  142. if (!fs->Connect(nn, port_as_string).ok()) {
  143. ReportError(ENODEV, "Unable to connect to NameNode.");
  144. // FileSystem's ctor might take ownership of the io_service; if it does,
  145. // it will null out the pointer
  146. if (io_service)
  147. delete io_service;
  148. delete fs;
  149. return nullptr;
  150. }
  151. return new hdfs_internal(fs);
  152. }
  153. int hdfsDisconnect(hdfsFS fs) {
  154. if (!fs) {
  155. ReportError(ENODEV, "Cannot disconnect null FS handle.");
  156. return -1;
  157. }
  158. delete fs;
  159. return 0;
  160. }
  161. hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize,
  162. short replication, tSize blocksize) {
  163. (void)flags;
  164. (void)bufferSize;
  165. (void)replication;
  166. (void)blocksize;
  167. if (!fs) {
  168. ReportError(ENODEV, "Cannot perform FS operations with null FS handle.");
  169. return nullptr;
  170. }
  171. FileHandle *f = nullptr;
  172. Status stat = fs->get_impl()->Open(path, &f);
  173. if (!stat.ok()) {
  174. return nullptr;
  175. }
  176. return new hdfsFile_internal(f);
  177. }
  178. int hdfsCloseFile(hdfsFS fs, hdfsFile file) {
  179. if (!CheckSystemAndHandle(fs, file)) {
  180. return -1;
  181. }
  182. delete file;
  183. return 0;
  184. }
  185. tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer,
  186. tSize length) {
  187. if (!CheckSystemAndHandle(fs, file)) {
  188. return -1;
  189. }
  190. size_t len = length;
  191. Status stat = file->get_impl()->PositionRead(buffer, &len, position);
  192. if(!stat.ok()) {
  193. return Error(stat);
  194. }
  195. return (tSize)len;
  196. }
  197. tSize hdfsRead(hdfsFS fs, hdfsFile file, void *buffer, tSize length) {
  198. if (!CheckSystemAndHandle(fs, file)) {
  199. return -1;
  200. }
  201. size_t len = length;
  202. Status stat = file->get_impl()->Read(buffer, &len);
  203. if (!stat.ok()) {
  204. return Error(stat);
  205. }
  206. return (tSize)len;
  207. }
  208. /* 0 on success, -1 on error*/
  209. int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos) {
  210. if (!CheckSystemAndHandle(fs, file)) {
  211. return -1;
  212. }
  213. off_t desired = desiredPos;
  214. Status stat = file->get_impl()->Seek(&desired, std::ios_base::beg);
  215. if (!stat.ok()) {
  216. return Error(stat);
  217. }
  218. return 0;
  219. }
  220. tOffset hdfsTell(hdfsFS fs, hdfsFile file) {
  221. if (!CheckSystemAndHandle(fs, file)) {
  222. return -1;
  223. }
  224. ssize_t offset = 0;
  225. Status stat = file->get_impl()->Seek(&offset, std::ios_base::cur);
  226. if (!stat.ok()) {
  227. return Error(stat);
  228. }
  229. return offset;
  230. }
  231. /* extended API */
  232. int hdfsCancel(hdfsFS fs, hdfsFile file) {
  233. if (!CheckSystemAndHandle(fs, file)) {
  234. return -1;
  235. }
  236. static_cast<FileHandleImpl*>(file->get_impl())->CancelOperations();
  237. return 0;
  238. }
  239. /*******************************************************************
  240. * BUILDER INTERFACE
  241. *******************************************************************/
  242. HdfsConfiguration LoadDefault(ConfigurationLoader & loader)
  243. {
  244. optional<HdfsConfiguration> result = loader.LoadDefaultResources<HdfsConfiguration>();
  245. if (result)
  246. {
  247. return result.value();
  248. }
  249. else
  250. {
  251. return loader.New<HdfsConfiguration>();
  252. }
  253. }
  254. hdfsBuilder::hdfsBuilder() : config(LoadDefault(loader)), overridePort(kUseDefaultPort)
  255. {
  256. }
  257. hdfsBuilder::hdfsBuilder(const char * directory) :
  258. config(loader.New<HdfsConfiguration>()), overridePort(kUseDefaultPort)
  259. {
  260. loader.SetSearchPath(directory);
  261. config = LoadDefault(loader);
  262. }
  263. struct hdfsBuilder *hdfsNewBuilder(void)
  264. {
  265. return new struct hdfsBuilder();
  266. }
  267. void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn)
  268. {
  269. bld->overrideHost = nn;
  270. }
  271. void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port)
  272. {
  273. bld->overridePort = port;
  274. }
  275. void hdfsFreeBuilder(struct hdfsBuilder *bld)
  276. {
  277. delete bld;
  278. }
  279. int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key,
  280. const char *val)
  281. {
  282. optional<HdfsConfiguration> newConfig = bld->loader.OverlayValue(bld->config, key, val);
  283. if (newConfig)
  284. {
  285. bld->config = newConfig.value();
  286. return 0;
  287. }
  288. else
  289. {
  290. return 1;
  291. }
  292. }
  /*
   * Release a string returned by hdfsConfGetStr / hdfsBuilderConfGetStr,
   * which allocate with malloc. A null pointer is a no-op.
   */
  void hdfsConfStrFree(char *val)
  {
    void *allocation = val;
    free(allocation);
  }
  297. hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) {
  298. if (!bld->overrideHost.empty())
  299. {
  300. // TODO: pass rest of config once we get that done (HDFS-9556)
  301. tPort port = bld->overridePort;
  302. if (port == hdfsBuilder::kUseDefaultPort)
  303. {
  304. port = hdfsBuilder::kDefaultPort;
  305. }
  306. return hdfsConnect(bld->overrideHost.c_str(), port);
  307. }
  308. else
  309. {
  310. //TODO: allow construction from default port once that is done (HDFS-9556)
  311. ReportError(EINVAL, "No host provided to builder in hdfsBuilderConnect");
  312. return nullptr;
  313. }
  314. }
  315. int hdfsConfGetStr(const char *key, char **val)
  316. {
  317. hdfsBuilder builder;
  318. return hdfsBuilderConfGetStr(&builder, key, val);
  319. }
  320. int hdfsConfGetInt(const char *key, int32_t *val)
  321. {
  322. hdfsBuilder builder;
  323. return hdfsBuilderConfGetInt(&builder, key, val);
  324. }
  325. //
  326. // Extended builder interface
  327. //
  328. struct hdfsBuilder *hdfsNewBuilderFromDirectory(const char * configDirectory)
  329. {
  330. return new struct hdfsBuilder(configDirectory);
  331. }
  332. int hdfsBuilderConfGetStr(struct hdfsBuilder *bld, const char *key,
  333. char **val)
  334. {
  335. optional<std::string> value = bld->config.Get(key);
  336. if (value)
  337. {
  338. size_t len = value->length() + 1;
  339. *val = static_cast<char *>(malloc(len));
  340. strncpy(*val, value->c_str(), len);
  341. }
  342. else
  343. {
  344. *val = nullptr;
  345. }
  346. return 0;
  347. }
  // The configuration stores integers as int64_t, but the hdfs.h interface
  // (shared with the Java-based libhdfs) hands them out as int. Report
  // whether `value` survives that narrowing unchanged.
  bool isValidInt(int64_t value)
  {
    const int64_t lowest = std::numeric_limits<int>::min();
    const int64_t highest = std::numeric_limits<int>::max();
    return lowest <= value && value <= highest;
  }
  355. int hdfsBuilderConfGetInt(struct hdfsBuilder *bld, const char *key, int32_t *val)
  356. {
  357. // Pull from default configuration
  358. optional<int64_t> value = bld->config.GetInt(key);
  359. if (value)
  360. {
  361. if (!isValidInt(*value))
  362. return 1;
  363. *val = *value;
  364. }
  365. // If not found, don't change val
  366. return 0;
  367. }