filesystem.cc 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
#include "filesystem.h"
#include "common/continuation/asio.h"
#include "common/util.h"
#include "common/logging.h"

#include <asio/ip/tcp.hpp>

#include <cerrno>
#include <functional>
#include <future>
#include <iostream>
#include <limits>
#include <tuple>
#include <vector>

#include <pwd.h>
  29. #define FMT_THIS_ADDR "this=" << (void*)this
  30. namespace hdfs {
  31. static const char kNamenodeProtocol[] =
  32. "org.apache.hadoop.hdfs.protocol.ClientProtocol";
  33. static const int kNamenodeProtocolVersion = 1;
  34. using ::asio::ip::tcp;
  35. static constexpr uint16_t kDefaultPort = 8020;
  36. /*****************************************************************************
  37. * NAMENODE OPERATIONS
  38. ****************************************************************************/
  39. void NameNodeOperations::Connect(const std::string &cluster_name,
  40. const std::string &server,
  41. const std::string &service,
  42. std::function<void(const Status &)> &&handler) {
  43. using namespace asio_continuation;
  44. typedef std::vector<tcp::endpoint> State;
  45. auto m = Pipeline<State>::Create();
  46. m->Push(Resolve(io_service_, server, service,
  47. std::back_inserter(m->state())))
  48. .Push(Bind([this, m, cluster_name](const Continuation::Next &next) {
  49. engine_.Connect(cluster_name, m->state(), next);
  50. }));
  51. m->Run([this, handler](const Status &status, const State &) {
  52. handler(status);
  53. });
  54. }
  55. void NameNodeOperations::GetBlockLocations(const std::string & path,
  56. std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler)
  57. {
  58. using ::hadoop::hdfs::GetBlockLocationsRequestProto;
  59. using ::hadoop::hdfs::GetBlockLocationsResponseProto;
  60. LOG_TRACE(kFileSystem, << "NameNodeOperations::GetBlockLocations("
  61. << FMT_THIS_ADDR << ", path=" << path << ", ...) called");
  62. struct State {
  63. GetBlockLocationsRequestProto req;
  64. std::shared_ptr<GetBlockLocationsResponseProto> resp;
  65. };
  66. auto m = continuation::Pipeline<State>::Create();
  67. auto &req = m->state().req;
  68. req.set_src(path);
  69. req.set_offset(0);
  70. req.set_length(std::numeric_limits<long long>::max());
  71. m->state().resp.reset(new GetBlockLocationsResponseProto());
  72. State *s = &m->state();
  73. m->Push(continuation::Bind(
  74. [this, s](const continuation::Continuation::Next &next) {
  75. namenode_.GetBlockLocations(&s->req, s->resp, next);
  76. }));
  77. m->Run([this, handler](const Status &stat, const State &s) {
  78. if (stat.ok()) {
  79. auto file_info = std::make_shared<struct FileInfo>();
  80. auto locations = s.resp->locations();
  81. file_info->file_length_ = locations.filelength();
  82. for (const auto &block : locations.blocks()) {
  83. file_info->blocks_.push_back(block);
  84. }
  85. if (locations.has_lastblock() && locations.lastblock().b().numbytes()) {
  86. file_info->blocks_.push_back(locations.lastblock());
  87. file_info->file_length_ += locations.lastblock().b().numbytes();
  88. }
  89. handler(stat, file_info);
  90. } else {
  91. handler(stat, nullptr);
  92. }
  93. });
  94. }
  95. void NameNodeOperations::SetFsEventCallback(fs_event_callback callback) {
  96. engine_.SetFsEventCallback(callback);
  97. }
  98. /*****************************************************************************
  99. * FILESYSTEM BASE CLASS
  100. ****************************************************************************/
  101. FileSystem * FileSystem::New(
  102. IoService *&io_service, const std::string &user_name, const Options &options) {
  103. return new FileSystemImpl(io_service, user_name, options);
  104. }
  105. /*****************************************************************************
  106. * FILESYSTEM IMPLEMENTATION
  107. ****************************************************************************/
  108. const std::string get_effective_user_name(const std::string &user_name) {
  109. if (!user_name.empty())
  110. return user_name;
  111. // If no user name was provided, try the HADOOP_USER_NAME and USER environment
  112. // variables
  113. const char * env = getenv("HADOOP_USER_NAME");
  114. if (env) {
  115. return env;
  116. }
  117. env = getenv("USER");
  118. if (env) {
  119. return env;
  120. }
  121. // If running on POSIX, use the currently logged in user
  122. #if defined(_POSIX_VERSION)
  123. uid_t uid = geteuid();
  124. struct passwd *pw = getpwuid(uid);
  125. if (pw && pw->pw_name)
  126. {
  127. return pw->pw_name;
  128. }
  129. #endif
  130. return "unknown_user";
  131. }
  132. FileSystemImpl::FileSystemImpl(IoService *&io_service, const std::string &user_name,
  133. const Options &options)
  134. : options_(options),
  135. io_service_(static_cast<IoServiceImpl *>(io_service)),
  136. nn_(&io_service_->io_service(), options,
  137. GetRandomClientName(), get_effective_user_name(user_name), kNamenodeProtocol,
  138. kNamenodeProtocolVersion), client_name_(GetRandomClientName()),
  139. bad_node_tracker_(std::make_shared<BadDataNodeTracker>()),
  140. event_handlers_(std::make_shared<LibhdfsEvents>())
  141. {
  142. LOG_TRACE(kFileSystem, << "FileSystemImpl::FileSystemImpl("
  143. << FMT_THIS_ADDR << ") called");
  144. // Poor man's move
  145. io_service = nullptr;
  146. /* spawn background threads for asio delegation */
  147. unsigned int threads = 1 /* options.io_threads_, pending HDFS-9117 */;
  148. for (unsigned int i = 0; i < threads; i++) {
  149. AddWorkerThread();
  150. }
  151. }
  152. FileSystemImpl::~FileSystemImpl() {
  153. LOG_TRACE(kFileSystem, << "FileSystemImpl::~FileSystemImpl("
  154. << FMT_THIS_ADDR << ") called");
  155. /**
  156. * Note: IoService must be stopped before getting rid of worker threads.
  157. * Once worker threads are joined and deleted the service can be deleted.
  158. **/
  159. io_service_->Stop();
  160. worker_threads_.clear();
  161. }
  162. void FileSystemImpl::Connect(const std::string &server,
  163. const std::string &service,
  164. const std::function<void(const Status &, FileSystem * fs)> &handler) {
  165. LOG_INFO(kFileSystem, << "FileSystemImpl::Connect(" << FMT_THIS_ADDR
  166. << ", server=" << server << ", service="
  167. << service << ") called");
  168. /* IoService::New can return nullptr */
  169. if (!io_service_) {
  170. handler (Status::Error("Null IoService"), this);
  171. }
  172. cluster_name_ = server + ":" + service;
  173. nn_.Connect(cluster_name_, server, service, [this, handler](const Status & s) {
  174. handler(s, this);
  175. });
  176. }
  177. Status FileSystemImpl::Connect(const std::string &server, const std::string &service) {
  178. LOG_INFO(kFileSystem, << "FileSystemImpl::[sync]Connect(" << FMT_THIS_ADDR
  179. << ", server=" << server << ", service=" << service << ") called");
  180. /* synchronized */
  181. auto stat = std::make_shared<std::promise<Status>>();
  182. std::future<Status> future = stat->get_future();
  183. auto callback = [stat](const Status &s, FileSystem *fs) {
  184. (void)fs;
  185. stat->set_value(s);
  186. };
  187. Connect(server, service, callback);
  188. /* block until promise is set */
  189. auto s = future.get();
  190. return s;
  191. }
  192. void FileSystemImpl::ConnectToDefaultFs(const std::function<void(const Status &, FileSystem *)> &handler) {
  193. std::string scheme = options_.defaultFS.get_scheme();
  194. if (strcasecmp(scheme.c_str(), "hdfs") != 0) {
  195. std::string error_message;
  196. error_message += "defaultFS of [" + options_.defaultFS.str() + "] is not supported";
  197. handler(Status::InvalidArgument(error_message.c_str()), nullptr);
  198. return;
  199. }
  200. std::string host = options_.defaultFS.get_host();
  201. if (host.empty()) {
  202. handler(Status::InvalidArgument("defaultFS must specify a hostname"), nullptr);
  203. return;
  204. }
  205. optional<uint16_t> port = options_.defaultFS.get_port();
  206. if (!port) {
  207. port = kDefaultPort;
  208. }
  209. std::string port_as_string = std::to_string(*port);
  210. Connect(host, port_as_string, handler);
  211. }
  212. Status FileSystemImpl::ConnectToDefaultFs() {
  213. auto stat = std::make_shared<std::promise<Status>>();
  214. std::future<Status> future = stat->get_future();
  215. auto callback = [stat](const Status &s, FileSystem *fs) {
  216. (void)fs;
  217. stat->set_value(s);
  218. };
  219. ConnectToDefaultFs(callback);
  220. /* block until promise is set */
  221. auto s = future.get();
  222. return s;
  223. }
  224. int FileSystemImpl::AddWorkerThread() {
  225. LOG_DEBUG(kFileSystem, << "FileSystemImpl::AddWorkerThread("
  226. << FMT_THIS_ADDR << ") called."
  227. << " Existing thread count = " << worker_threads_.size());
  228. auto service_task = [](IoService *service) { service->Run(); };
  229. worker_threads_.push_back(
  230. WorkerPtr(new std::thread(service_task, io_service_.get())));
  231. return worker_threads_.size();
  232. }
  233. void FileSystemImpl::Open(
  234. const std::string &path,
  235. const std::function<void(const Status &, FileHandle *)> &handler) {
  236. LOG_INFO(kFileSystem, << "FileSystemImpl::Open("
  237. << FMT_THIS_ADDR << ", path="
  238. << path << ") called");
  239. nn_.GetBlockLocations(path, [this, path, handler](const Status &stat, std::shared_ptr<const struct FileInfo> file_info) {
  240. handler(stat, stat.ok() ? new FileHandleImpl(cluster_name_, path, &io_service_->io_service(), client_name_, file_info, bad_node_tracker_, event_handlers_)
  241. : nullptr);
  242. });
  243. }
  244. Status FileSystemImpl::Open(const std::string &path,
  245. FileHandle **handle) {
  246. LOG_INFO(kFileSystem, << "FileSystemImpl::[sync]Open("
  247. << FMT_THIS_ADDR << ", path="
  248. << path << ") called");
  249. auto callstate = std::make_shared<std::promise<std::tuple<Status, FileHandle*>>>();
  250. std::future<std::tuple<Status, FileHandle*>> future(callstate->get_future());
  251. /* wrap async FileSystem::Open with promise to make it a blocking call */
  252. auto h = [callstate](const Status &s, FileHandle *is) {
  253. callstate->set_value(std::make_tuple(s, is));
  254. };
  255. Open(path, h);
  256. /* block until promise is set */
  257. auto returnstate = future.get();
  258. Status stat = std::get<0>(returnstate);
  259. FileHandle *file_handle = std::get<1>(returnstate);
  260. if (!stat.ok()) {
  261. delete file_handle;
  262. return stat;
  263. }
  264. if (!file_handle) {
  265. return stat;
  266. }
  267. *handle = file_handle;
  268. return stat;
  269. }
  270. void FileSystemImpl::WorkerDeleter::operator()(std::thread *t) {
  271. // It is far too easy to destroy the filesystem (and thus the threadpool)
  272. // from within one of the worker threads, leading to a deadlock. Let's
  273. // provide some explicit protection.
  274. if(t->get_id() == std::this_thread::get_id()) {
  275. LOG_ERROR(kFileSystem, << "FileSystemImpl::WorkerDeleter::operator(treadptr="
  276. << t << ") : FATAL: Attempted to destroy a thread pool"
  277. "from within a callback of the thread pool!");
  278. }
  279. t->join();
  280. delete t;
  281. }
  282. void FileSystemImpl::SetFsEventCallback(fs_event_callback callback) {
  283. if (event_handlers_) {
  284. event_handlers_->set_fs_callback(callback);
  285. nn_.SetFsEventCallback(callback);
  286. }
  287. }
  288. std::shared_ptr<LibhdfsEvents> FileSystemImpl::get_event_handlers() {
  289. return event_handlers_;
  290. }
  291. }