|
@@ -18,7 +18,7 @@
|
|
|
|
|
|
#include "namenode_info.h"
|
|
#include "namenode_info.h"
|
|
|
|
|
|
-#include "common/continuation/asio.h"
|
|
|
|
|
|
+#include "common/util.h"
|
|
#include "common/logging.h"
|
|
#include "common/logging.h"
|
|
|
|
|
|
#include <sstream>
|
|
#include <sstream>
|
|
@@ -71,62 +71,107 @@ bool ResolveInPlace(::asio::io_service *ioservice, ResolvedNamenodeInfo &info) {
|
|
return true;
|
|
return true;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+typedef std::vector<asio::ip::tcp::endpoint> endpoint_vector;
|
|
|
|
+
|
|
|
|
+// RAII wrapper
|
|
|
|
+class ScopedResolver {
|
|
|
|
+ private:
|
|
|
|
+ ::asio::io_service *io_service_;
|
|
|
|
+ std::string host_;
|
|
|
|
+ std::string port_;
|
|
|
|
+ ::asio::ip::tcp::resolver::query query_;
|
|
|
|
+ ::asio::ip::tcp::resolver resolver_;
|
|
|
|
+ endpoint_vector endpoints_;
|
|
|
|
+
|
|
|
|
+ // Caller blocks on access if resolution isn't finished
|
|
|
|
+ std::shared_ptr<std::promise<Status>> result_status_;
|
|
|
|
+ public:
|
|
|
|
+ ScopedResolver(::asio::io_service *service, const std::string &host, const std::string &port) :
|
|
|
|
+ io_service_(service), host_(host), port_(port), query_(host, port), resolver_(*io_service_)
|
|
|
|
+ {
|
|
|
|
+ if(!io_service_)
|
|
|
|
+ LOG_ERROR(kAsyncRuntime, << "ScopedResolver@" << this << " passed nullptr to io_service");
|
|
|
|
+ }
|
|
|
|
|
|
-std::vector<ResolvedNamenodeInfo> BulkResolve(::asio::io_service *ioservice, const std::vector<NamenodeInfo> &nodes) {
|
|
|
|
- using namespace asio_continuation;
|
|
|
|
|
|
+ ~ScopedResolver() {
|
|
|
|
+ resolver_.cancel();
|
|
|
|
+ }
|
|
|
|
|
|
- typedef std::vector<asio::ip::tcp::endpoint> endpoint_vector;
|
|
|
|
- typedef Pipeline<endpoint_vector> resolve_pipeline_t;
|
|
|
|
|
|
+ bool BeginAsyncResolve() {
|
|
|
|
+ // result_status_ would only exist if this was previously called. Invalid state.
|
|
|
|
+ if(result_status_) {
|
|
|
|
+ LOG_ERROR(kAsyncRuntime, << "ScopedResolver@" << this << "::BeginAsyncResolve invalid call: may only be called once per instance");
|
|
|
|
+ return false;
|
|
|
|
+ } else if(!io_service_) {
|
|
|
|
+ LOG_ERROR(kAsyncRuntime, << "ScopedResolver@" << this << "::BeginAsyncResolve invalid call: null io_service");
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
|
|
|
|
+ // Now set up the promise, set it in async_resolve's callback
|
|
|
|
+ result_status_ = std::make_shared<std::promise<Status>>();
|
|
|
|
+
|
|
|
|
+ // Callback to pull a copy of endpoints out of resolver and set promise
|
|
|
|
+ auto callback = [this](const asio::error_code &ec, ::asio::ip::tcp::resolver::iterator out) {
|
|
|
|
+ if(!ec) {
|
|
|
|
+ std::copy(out, ::asio::ip::tcp::resolver::iterator(), std::back_inserter(endpoints_));
|
|
|
|
+ }
|
|
|
|
+ result_status_->set_value( ToStatus(ec) );
|
|
|
|
+ };
|
|
|
|
+ resolver_.async_resolve(query_, callback);
|
|
|
|
+ return true;
|
|
|
|
+ }
|
|
|
|
|
|
- std::vector<std::pair<resolve_pipeline_t*, std::shared_ptr<std::promise<Status>>>> pipelines;
|
|
|
|
- pipelines.reserve(nodes.size());
|
|
|
|
|
|
+ Status Join() {
|
|
|
|
+ if(!result_status_) {
|
|
|
|
+ std::ostringstream errmsg;
|
|
|
|
+ errmsg << "ScopedResolver@" << this << "Join invalid call: promise never set";
|
|
|
|
+ return Status::InvalidArgument(errmsg.str().c_str());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ std::future<Status> future_result = result_status_->get_future();
|
|
|
|
+ Status res = future_result.get();
|
|
|
|
+ return res;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ endpoint_vector GetEndpoints() {
|
|
|
|
+ // Explicitly return by value to decouple lifecycles.
|
|
|
|
+ return endpoints_;
|
|
|
|
+ }
|
|
|
|
+};
|
|
|
|
+
|
|
|
|
+std::vector<ResolvedNamenodeInfo> BulkResolve(::asio::io_service *ioservice, const std::vector<NamenodeInfo> &nodes) {
|
|
|
|
+ std::vector< std::unique_ptr<ScopedResolver> > resolvers;
|
|
|
|
+ resolvers.reserve(nodes.size());
|
|
|
|
|
|
std::vector<ResolvedNamenodeInfo> resolved_info;
|
|
std::vector<ResolvedNamenodeInfo> resolved_info;
|
|
- // This must never reallocate once async ops begin
|
|
|
|
resolved_info.reserve(nodes.size());
|
|
resolved_info.reserve(nodes.size());
|
|
|
|
|
|
for(unsigned int i=0; i<nodes.size(); i++) {
|
|
for(unsigned int i=0; i<nodes.size(); i++) {
|
|
std::string host = nodes[i].get_host();
|
|
std::string host = nodes[i].get_host();
|
|
std::string port = nodes[i].get_port();
|
|
std::string port = nodes[i].get_port();
|
|
|
|
|
|
- ResolvedNamenodeInfo resolved;
|
|
|
|
- resolved = nodes[i];
|
|
|
|
- resolved_info.push_back(resolved);
|
|
|
|
-
|
|
|
|
- // build the pipeline
|
|
|
|
- resolve_pipeline_t *pipeline = resolve_pipeline_t::Create();
|
|
|
|
- auto resolve_step = Resolve(ioservice, host, port, std::back_inserter(pipeline->state()));
|
|
|
|
- pipeline->Push(resolve_step);
|
|
|
|
-
|
|
|
|
- // make a status associated with current pipeline
|
|
|
|
- std::shared_ptr<std::promise<Status>> active_stat = std::make_shared<std::promise<Status>>();
|
|
|
|
- pipelines.push_back(std::make_pair(pipeline, active_stat));
|
|
|
|
-
|
|
|
|
- pipeline->Run([i,active_stat, &resolved_info](const Status &s, const endpoint_vector &ends){
|
|
|
|
- resolved_info[i].endpoints = ends;
|
|
|
|
- active_stat->set_value(s);
|
|
|
|
- });
|
|
|
|
-
|
|
|
|
|
|
+ resolvers.emplace_back(new ScopedResolver(ioservice, host, port));
|
|
|
|
+ resolvers[i]->BeginAsyncResolve();
|
|
}
|
|
}
|
|
|
|
|
|
// Join all async operations
|
|
// Join all async operations
|
|
- std::vector<ResolvedNamenodeInfo> return_set;
|
|
|
|
- for(unsigned int i=0; i<pipelines.size();i++) {
|
|
|
|
- std::shared_ptr<std::promise<Status>> promise = pipelines[i].second;
|
|
|
|
-
|
|
|
|
- std::future<Status> future = promise->get_future();
|
|
|
|
- Status stat = future.get();
|
|
|
|
-
|
|
|
|
- // Clear endpoints if we hit an error
|
|
|
|
- if(!stat.ok()) {
|
|
|
|
- LOG_WARN(kRPC, << "Unable to resolve endpoints for " << nodes[i].uri.str());
|
|
|
|
- resolved_info[i].endpoints.clear();
|
|
|
|
|
|
+ for(unsigned int i=0; i < resolvers.size(); i++) {
|
|
|
|
+ Status asyncReturnStatus = resolvers[i]->Join();
|
|
|
|
+
|
|
|
|
+ ResolvedNamenodeInfo info;
|
|
|
|
+ info = nodes[i];
|
|
|
|
+
|
|
|
|
+ if(asyncReturnStatus.ok()) {
|
|
|
|
+ // Copy out endpoints if things went well
|
|
|
|
+ info.endpoints = resolvers[i]->GetEndpoints();
|
|
|
|
+ } else {
|
|
|
|
+ LOG_ERROR(kAsyncRuntime, << "Unabled to resolve endpoints for host: " << nodes[i].get_host()
|
|
|
|
+ << " port: " << nodes[i].get_port());
|
|
}
|
|
}
|
|
- }
|
|
|
|
|
|
|
|
|
|
+ resolved_info.push_back(info);
|
|
|
|
+ }
|
|
return resolved_info;
|
|
return resolved_info;
|
|
}
|
|
}
|
|
|
|
|
|
-
|
|
|
|
}
|
|
}
|