|
@@ -34,9 +34,11 @@
|
|
|
|
|
|
namespace hdfs {
|
|
|
|
|
|
-template <class NextLayer>
|
|
|
+template <class Socket>
|
|
|
class RpcConnectionImpl : public RpcConnection {
|
|
|
public:
|
|
|
+ MEMCHECKED_CLASS(RpcConnectionImpl);
|
|
|
+
|
|
|
RpcConnectionImpl(RpcEngine *engine);
|
|
|
virtual ~RpcConnectionImpl() override;
|
|
|
|
|
@@ -55,7 +57,7 @@ public:
|
|
|
virtual void FlushPendingRequests() override;
|
|
|
|
|
|
|
|
|
- NextLayer &next_layer() { return next_layer_; }
|
|
|
+ Socket &TEST_get_mutable_socket() { return socket_; }
|
|
|
|
|
|
void TEST_set_connected(bool connected) { connected_ = connected ? kConnected : kNotYetConnected; }
|
|
|
|
|
@@ -63,35 +65,34 @@ public:
|
|
|
const Options options_;
|
|
|
::asio::ip::tcp::endpoint current_endpoint_;
|
|
|
std::vector<::asio::ip::tcp::endpoint> additional_endpoints_;
|
|
|
- NextLayer next_layer_;
|
|
|
+ Socket socket_;
|
|
|
::asio::deadline_timer connect_timer_;
|
|
|
|
|
|
void ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint &remote);
|
|
|
};
|
|
|
|
|
|
-template <class NextLayer>
|
|
|
-RpcConnectionImpl<NextLayer>::RpcConnectionImpl(RpcEngine *engine)
|
|
|
+template <class Socket>
|
|
|
+RpcConnectionImpl<Socket>::RpcConnectionImpl(RpcEngine *engine)
|
|
|
: RpcConnection(engine),
|
|
|
options_(engine->options()),
|
|
|
- next_layer_(engine->io_service()),
|
|
|
+ socket_(engine->io_service()),
|
|
|
connect_timer_(engine->io_service())
|
|
|
{
|
|
|
LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << (void*)this);
|
|
|
}
|
|
|
|
|
|
-template <class NextLayer>
|
|
|
-RpcConnectionImpl<NextLayer>::~RpcConnectionImpl() {
|
|
|
+template <class Socket>
|
|
|
+RpcConnectionImpl<Socket>::~RpcConnectionImpl() {
|
|
|
LOG_DEBUG(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called &" << (void*)this);
|
|
|
|
|
|
- std::lock_guard<std::mutex> state_lock(connection_state_lock_);
|
|
|
if (pending_requests_.size() > 0)
|
|
|
LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the pending queue");
|
|
|
if (requests_on_fly_.size() > 0)
|
|
|
LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the requests_on_fly queue");
|
|
|
}
|
|
|
|
|
|
-template <class NextLayer>
|
|
|
-void RpcConnectionImpl<NextLayer>::Connect(
|
|
|
+template <class Socket>
|
|
|
+void RpcConnectionImpl<Socket>::Connect(
|
|
|
const std::vector<::asio::ip::tcp::endpoint> &server,
|
|
|
const AuthInfo & auth_info,
|
|
|
RpcCallback &handler) {
|
|
@@ -109,8 +110,8 @@ void RpcConnectionImpl<NextLayer>::Connect(
|
|
|
this->ConnectAndFlush(server); // need "this" so compiler can infer type of CAF
|
|
|
}
|
|
|
|
|
|
-template <class NextLayer>
|
|
|
-void RpcConnectionImpl<NextLayer>::ConnectAndFlush(
|
|
|
+template <class Socket>
|
|
|
+void RpcConnectionImpl<Socket>::ConnectAndFlush(
|
|
|
const std::vector<::asio::ip::tcp::endpoint> &server) {
|
|
|
|
|
|
LOG_INFO(kRPC, << "ConnectAndFlush called");
|
|
@@ -139,7 +140,7 @@ void RpcConnectionImpl<NextLayer>::ConnectAndFlush(
|
|
|
current_endpoint_ = first_endpoint;
|
|
|
|
|
|
auto shared_this = shared_from_this();
|
|
|
- next_layer_.async_connect(first_endpoint, [shared_this, this, first_endpoint](const ::asio::error_code &ec) {
|
|
|
+ socket_.async_connect(first_endpoint, [shared_this, this, first_endpoint](const ::asio::error_code &ec) {
|
|
|
ConnectComplete(ec, first_endpoint);
|
|
|
});
|
|
|
|
|
@@ -155,9 +156,9 @@ void RpcConnectionImpl<NextLayer>::ConnectAndFlush(
|
|
|
});
|
|
|
}
|
|
|
|
|
|
-template <class NextLayer>
|
|
|
-void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint & remote) {
|
|
|
- auto shared_this = RpcConnectionImpl<NextLayer>::shared_from_this();
|
|
|
+template <class Socket>
|
|
|
+void RpcConnectionImpl<Socket>::ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint & remote) {
|
|
|
+ auto shared_this = RpcConnectionImpl<Socket>::shared_from_this();
|
|
|
std::lock_guard<std::mutex> state_lock(connection_state_lock_);
|
|
|
connect_timer_.cancel();
|
|
|
|
|
@@ -190,7 +191,7 @@ void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec,
|
|
|
});
|
|
|
} else {
|
|
|
LOG_DEBUG(kRPC, << "Rpc connection failed; err=" << status.ToString());;
|
|
|
- std::string err = SafeDisconnect(get_asio_socket_ptr(&next_layer_));
|
|
|
+ std::string err = SafeDisconnect(get_asio_socket_ptr(&socket_));
|
|
|
if(!err.empty()) {
|
|
|
LOG_INFO(kRPC, << "Rpc connection failed to connect to endpoint, error closing connection: " << err);
|
|
|
}
|
|
@@ -202,7 +203,7 @@ void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec,
|
|
|
additional_endpoints_.erase(additional_endpoints_.begin());
|
|
|
current_endpoint_ = next_endpoint;
|
|
|
|
|
|
- next_layer_.async_connect(next_endpoint, [shared_this, this, next_endpoint](const ::asio::error_code &ec) {
|
|
|
+ socket_.async_connect(next_endpoint, [shared_this, this, next_endpoint](const ::asio::error_code &ec) {
|
|
|
ConnectComplete(ec, next_endpoint);
|
|
|
});
|
|
|
connect_timer_.expires_from_now(
|
|
@@ -219,8 +220,8 @@ void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-template <class NextLayer>
|
|
|
-void RpcConnectionImpl<NextLayer>::SendHandshake(RpcCallback &handler) {
|
|
|
+template <class Socket>
|
|
|
+void RpcConnectionImpl<Socket>::SendHandshake(RpcCallback &handler) {
|
|
|
assert(lock_held(connection_state_lock_)); // Must be holding lock before calling
|
|
|
|
|
|
LOG_TRACE(kRPC, << "RpcConnectionImpl::SendHandshake called");
|
|
@@ -228,7 +229,7 @@ void RpcConnectionImpl<NextLayer>::SendHandshake(RpcCallback &handler) {
|
|
|
|
|
|
auto shared_this = shared_from_this();
|
|
|
auto handshake_packet = PrepareHandshakePacket();
|
|
|
- ::asio::async_write(next_layer_, asio::buffer(*handshake_packet),
|
|
|
+ ::asio::async_write(socket_, asio::buffer(*handshake_packet),
|
|
|
[handshake_packet, handler, shared_this, this](
|
|
|
const ::asio::error_code &ec, size_t) {
|
|
|
Status status = ToStatus(ec);
|
|
@@ -236,15 +237,15 @@ void RpcConnectionImpl<NextLayer>::SendHandshake(RpcCallback &handler) {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
-template <class NextLayer>
|
|
|
-void RpcConnectionImpl<NextLayer>::SendContext(RpcCallback &handler) {
|
|
|
+template <class Socket>
|
|
|
+void RpcConnectionImpl<Socket>::SendContext(RpcCallback &handler) {
|
|
|
assert(lock_held(connection_state_lock_)); // Must be holding lock before calling
|
|
|
|
|
|
LOG_TRACE(kRPC, << "RpcConnectionImpl::SendContext called");
|
|
|
|
|
|
auto shared_this = shared_from_this();
|
|
|
auto context_packet = PrepareContextPacket();
|
|
|
- ::asio::async_write(next_layer_, asio::buffer(*context_packet),
|
|
|
+ ::asio::async_write(socket_, asio::buffer(*context_packet),
|
|
|
[context_packet, handler, shared_this, this](
|
|
|
const ::asio::error_code &ec, size_t) {
|
|
|
Status status = ToStatus(ec);
|
|
@@ -252,8 +253,8 @@ void RpcConnectionImpl<NextLayer>::SendContext(RpcCallback &handler) {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
-template <class NextLayer>
|
|
|
-void RpcConnectionImpl<NextLayer>::OnSendCompleted(const ::asio::error_code &ec,
|
|
|
+template <class Socket>
|
|
|
+void RpcConnectionImpl<Socket>::OnSendCompleted(const ::asio::error_code &ec,
|
|
|
size_t) {
|
|
|
using std::placeholders::_1;
|
|
|
using std::placeholders::_2;
|
|
@@ -271,8 +272,8 @@ void RpcConnectionImpl<NextLayer>::OnSendCompleted(const ::asio::error_code &ec,
|
|
|
FlushPendingRequests();
|
|
|
}
|
|
|
|
|
|
-template <class NextLayer>
|
|
|
-void RpcConnectionImpl<NextLayer>::FlushPendingRequests() {
|
|
|
+template <class Socket>
|
|
|
+void RpcConnectionImpl<Socket>::FlushPendingRequests() {
|
|
|
using namespace ::std::placeholders;
|
|
|
|
|
|
// Lock should be held
|
|
@@ -335,7 +336,7 @@ void RpcConnectionImpl<NextLayer>::FlushPendingRequests() {
|
|
|
this->HandleRpcTimeout(timeout_req, ec);
|
|
|
});
|
|
|
|
|
|
- asio::async_write(next_layer_, asio::buffer(*payload),
|
|
|
+ asio::async_write(socket_, asio::buffer(*payload),
|
|
|
[shared_this, this, payload](const ::asio::error_code &ec,
|
|
|
size_t size) {
|
|
|
OnSendCompleted(ec, size);
|
|
@@ -352,8 +353,8 @@ void RpcConnectionImpl<NextLayer>::FlushPendingRequests() {
|
|
|
}
|
|
|
|
|
|
|
|
|
-template <class NextLayer>
|
|
|
-void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &original_ec,
|
|
|
+template <class Socket>
|
|
|
+void RpcConnectionImpl<Socket>::OnRecvCompleted(const ::asio::error_code &original_ec,
|
|
|
size_t) {
|
|
|
using std::placeholders::_1;
|
|
|
using std::placeholders::_2;
|
|
@@ -396,7 +397,7 @@ void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &ori
|
|
|
auto buf = ::asio::buffer(reinterpret_cast<char *>(¤t_response_state_->length_),
|
|
|
sizeof(current_response_state_->length_));
|
|
|
asio::async_read(
|
|
|
- next_layer_, buf,
|
|
|
+ socket_, buf,
|
|
|
[shared_this, this](const ::asio::error_code &ec, size_t size) {
|
|
|
OnRecvCompleted(ec, size);
|
|
|
});
|
|
@@ -405,7 +406,7 @@ void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &ori
|
|
|
current_response_state_->length_ = ntohl(current_response_state_->length_);
|
|
|
current_response_state_->data_.resize(current_response_state_->length_);
|
|
|
asio::async_read(
|
|
|
- next_layer_, ::asio::buffer(current_response_state_->data_),
|
|
|
+ socket_, ::asio::buffer(current_response_state_->data_),
|
|
|
[shared_this, this](const ::asio::error_code &ec, size_t size) {
|
|
|
OnRecvCompleted(ec, size);
|
|
|
});
|
|
@@ -425,8 +426,8 @@ void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &ori
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-template <class NextLayer>
|
|
|
-void RpcConnectionImpl<NextLayer>::Disconnect() {
|
|
|
+template <class Socket>
|
|
|
+void RpcConnectionImpl<Socket>::Disconnect() {
|
|
|
assert(lock_held(connection_state_lock_)); // Must be holding lock before calling
|
|
|
|
|
|
LOG_INFO(kRPC, << "RpcConnectionImpl::Disconnect called");
|
|
@@ -434,7 +435,7 @@ void RpcConnectionImpl<NextLayer>::Disconnect() {
|
|
|
request_over_the_wire_.reset();
|
|
|
if (connected_ == kConnecting || connected_ == kHandshaking || connected_ == kAuthenticating || connected_ == kConnected) {
|
|
|
// Don't print out errors, we were expecting a disconnect here
|
|
|
- SafeDisconnect(get_asio_socket_ptr(&next_layer_));
|
|
|
+ SafeDisconnect(get_asio_socket_ptr(&socket_));
|
|
|
}
|
|
|
connected_ = kDisconnected;
|
|
|
}
|