filesystem.cc 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "filesystem.h"
  19. #include "common/continuation/asio.h"
  20. #include "common/util.h"
  21. #include <asio/ip/tcp.hpp>
  22. #include <functional>
  23. #include <limits>
  24. #include <future>
  25. #include <tuple>
  26. namespace hdfs {
  /* Protocol name handed to the NameNode connection built in the
   * FileSystemImpl constructor. */
  static constexpr char kNamenodeProtocol[] =
      "org.apache.hadoop.hdfs.protocol.ClientProtocol";
  /* Protocol version handed to the same NameNode connection. */
  static constexpr int kNamenodeProtocolVersion = 1;
  30. using ::asio::ip::tcp;
  31. /*****************************************************************************
  32. * NAMENODE OPERATIONS
  33. ****************************************************************************/
  34. void NameNodeOperations::Connect(const std::string &server,
  35. const std::string &service,
  36. std::function<void(const Status &)> &handler) {
  37. using namespace asio_continuation;
  38. typedef std::vector<tcp::endpoint> State;
  39. auto m = Pipeline<State>::Create();
  40. m->Push(Resolve(io_service_, server, service,
  41. std::back_inserter(m->state())))
  42. .Push(Bind([this, m](const Continuation::Next &next) {
  43. engine_.Connect(m->state().front(), next);
  44. }));
  45. m->Run([this, handler](const Status &status, const State &) {
  46. if (status.ok()) {
  47. engine_.Start();
  48. }
  49. handler(status);
  50. });
  51. }
  52. void NameNodeOperations::GetBlockLocations(const std::string & path,
  53. std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler)
  54. {
  55. using ::hadoop::hdfs::GetBlockLocationsRequestProto;
  56. using ::hadoop::hdfs::GetBlockLocationsResponseProto;
  57. struct State {
  58. GetBlockLocationsRequestProto req;
  59. std::shared_ptr<GetBlockLocationsResponseProto> resp;
  60. };
  61. auto m = continuation::Pipeline<State>::Create();
  62. auto &req = m->state().req;
  63. req.set_src(path);
  64. req.set_offset(0);
  65. req.set_length(std::numeric_limits<long long>::max());
  66. m->state().resp.reset(new GetBlockLocationsResponseProto());
  67. State *s = &m->state();
  68. m->Push(continuation::Bind(
  69. [this, s](const continuation::Continuation::Next &next) {
  70. namenode_.GetBlockLocations(&s->req, s->resp, next);
  71. }));
  72. m->Run([this, handler](const Status &stat, const State &s) {
  73. if (stat.ok()) {
  74. auto file_info = std::make_shared<struct FileInfo>();
  75. auto locations = s.resp->locations();
  76. file_info->file_length_ = locations.filelength();
  77. for (const auto &block : locations.blocks()) {
  78. file_info->blocks_.push_back(block);
  79. }
  80. if (locations.has_lastblock() && locations.lastblock().b().numbytes()) {
  81. file_info->blocks_.push_back(locations.lastblock());
  82. }
  83. handler(stat, file_info);
  84. } else {
  85. handler(stat, nullptr);
  86. }
  87. });
  88. }
  89. /*****************************************************************************
  90. * FILESYSTEM BASE CLASS
  91. ****************************************************************************/
  92. void FileSystem::New(
  93. IoService *io_service, const Options &options, const std::string &server,
  94. const std::string &service,
  95. const std::function<void(const Status &, FileSystem *)> &handler) {
  96. FileSystemImpl *impl = new FileSystemImpl(io_service, options);
  97. impl->Connect(server, service, [impl, handler](const Status &stat) {
  98. if (stat.ok()) {
  99. handler(stat, impl);
  100. } else {
  101. delete impl;
  102. handler(stat, nullptr);
  103. }
  104. });
  105. }
  106. FileSystem * FileSystem::New(
  107. IoService *io_service, const Options &options, const std::string &server,
  108. const std::string &service) {
  109. auto callstate = std::make_shared<std::promise<std::tuple<Status, FileSystem *>>>();
  110. std::future<std::tuple<Status, FileSystem *>> future(callstate->get_future());
  111. auto callback = [callstate](const Status &s, FileSystem * fs) {
  112. callstate->set_value(std::make_tuple(s, fs));
  113. };
  114. New(io_service, options, server, service, callback);
  115. /* block until promise is set */
  116. auto returnstate = future.get();
  117. if (std::get<0>(returnstate).ok()) {
  118. return std::get<1>(returnstate);
  119. } else {
  120. return nullptr;
  121. }
  122. }
  123. /*****************************************************************************
  124. * FILESYSTEM IMPLEMENTATION
  125. ****************************************************************************/
  126. FileSystemImpl::FileSystemImpl(IoService *&io_service, const Options &options)
  127. : io_service_(static_cast<IoServiceImpl *>(io_service)),
  128. nn_(&io_service_->io_service(), options,
  129. GetRandomClientName(), kNamenodeProtocol,
  130. kNamenodeProtocolVersion),
  131. client_name_(GetRandomClientName())
  132. {
  133. // Poor man's move
  134. io_service = nullptr;
  135. /* spawn background threads for asio delegation */
  136. unsigned int threads = 1 /* options.io_threads_, pending HDFS-9117 */;
  137. for (unsigned int i = 0; i < threads; i++) {
  138. AddWorkerThread();
  139. }
  140. }
  141. FileSystemImpl::~FileSystemImpl() {
  142. /**
  143. * Note: IoService must be stopped before getting rid of worker threads.
  144. * Once worker threads are joined and deleted the service can be deleted.
  145. **/
  146. io_service_->Stop();
  147. worker_threads_.clear();
  148. io_service_.reset(nullptr);
  149. }
  150. void FileSystemImpl::Connect(const std::string &server,
  151. const std::string &service,
  152. std::function<void(const Status &)> &&handler) {
  153. /* IoService::New can return nullptr */
  154. if (!io_service_) {
  155. handler (Status::Error("Null IoService"));
  156. }
  157. nn_.Connect(server, service, handler);
  158. }
  159. Status FileSystemImpl::Connect(const std::string &server, const std::string &service) {
  160. /* synchronized */
  161. auto stat = std::make_shared<std::promise<Status>>();
  162. std::future<Status> future = stat->get_future();
  163. auto callback = [stat](const Status &s) {
  164. stat->set_value(s);
  165. };
  166. Connect(server, service, callback);
  167. /* block until promise is set */
  168. auto s = future.get();
  169. return s;
  170. }
  171. int FileSystemImpl::AddWorkerThread() {
  172. auto service_task = [](IoService *service) { service->Run(); };
  173. worker_threads_.push_back(
  174. WorkerPtr(new std::thread(service_task, io_service_.get())));
  175. return worker_threads_.size();
  176. }
  177. void FileSystemImpl::Open(
  178. const std::string &path,
  179. const std::function<void(const Status &, FileHandle *)> &handler) {
  180. nn_.GetBlockLocations(path, [this, handler](const Status &stat, std::shared_ptr<const struct FileInfo> file_info) {
  181. handler(stat, stat.ok() ? new FileHandleImpl(&io_service_->io_service(), client_name_, file_info, bad_node_tracker_)
  182. : nullptr);
  183. });
  184. }
  185. Status FileSystemImpl::Open(const std::string &path,
  186. FileHandle **handle) {
  187. auto callstate = std::make_shared<std::promise<std::tuple<Status, FileHandle*>>>();
  188. std::future<std::tuple<Status, FileHandle*>> future(callstate->get_future());
  189. /* wrap async FileSystem::Open with promise to make it a blocking call */
  190. auto h = [callstate](const Status &s, FileHandle *is) {
  191. callstate->set_value(std::make_tuple(s, is));
  192. };
  193. Open(path, h);
  194. /* block until promise is set */
  195. auto returnstate = future.get();
  196. Status stat = std::get<0>(returnstate);
  197. FileHandle *file_handle = std::get<1>(returnstate);
  198. if (!stat.ok()) {
  199. delete file_handle;
  200. return stat;
  201. }
  202. if (!file_handle) {
  203. return stat;
  204. }
  205. *handle = file_handle;
  206. return stat;
  207. }
  208. }