|
@@ -158,15 +158,17 @@ void Request::OnResponseArrived(pbio::CodedInputStream *is,
|
|
|
|
|
|
RpcConnection::RpcConnection(LockFreeRpcEngine *engine)
|
|
|
: engine_(engine),
|
|
|
- connected_(false) {}
|
|
|
+ connected_(kNotYetConnected) {}
|
|
|
|
|
|
::asio::io_service &RpcConnection::io_service() {
|
|
|
return engine_->io_service();
|
|
|
}
|
|
|
|
|
|
void RpcConnection::StartReading() {
|
|
|
- io_service().post(std::bind(&RpcConnection::OnRecvCompleted, this,
|
|
|
- ::asio::error_code(), 0));
|
|
|
+ auto shared_this = shared_from_this();
|
|
|
+ io_service().post([shared_this, this] () {
|
|
|
+ OnRecvCompleted(::asio::error_code(), 0);
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
void RpcConnection::AsyncFlushPendingRequests() {
|
|
@@ -174,6 +176,8 @@ void RpcConnection::AsyncFlushPendingRequests() {
|
|
|
io_service().post([shared_this, this]() {
|
|
|
std::lock_guard<std::mutex> state_lock(connection_state_lock_);
|
|
|
|
|
|
+ LOG_TRACE(kRPC, << "RpcConnection::AsyncRpc called (connected=" << ToString(connected_) << ")");
|
|
|
+
|
|
|
if (!request_over_the_wire_) {
|
|
|
FlushPendingRequests();
|
|
|
}
|
|
@@ -281,40 +285,53 @@ void RpcConnection::AsyncRpc(
|
|
|
|
|
|
auto r = std::make_shared<Request>(engine_, method_name, req,
|
|
|
std::move(wrapped_handler));
|
|
|
- pending_requests_.push_back(r);
|
|
|
- FlushPendingRequests();
|
|
|
-}
|
|
|
|
|
|
-void RpcConnection::AsyncRawRpc(const std::string &method_name,
|
|
|
- const std::string &req,
|
|
|
- std::shared_ptr<std::string> resp,
|
|
|
- RpcCallback &&handler) {
|
|
|
- std::lock_guard<std::mutex> state_lock(connection_state_lock_);
|
|
|
+ if (connected_ == kDisconnected) {
|
|
|
+ // Oops. The connection failed _just_ before the engine got a chance
|
|
|
+ // to send it. Register it as a failure
|
|
|
+ Status status = Status::ResourceUnavailable("RpcConnection closed before send.");
|
|
|
+ auto r_vector = std::vector<std::shared_ptr<Request> > (1, r);
|
|
|
+ assert(r_vector[0].get() != nullptr);
|
|
|
|
|
|
- std::shared_ptr<RpcConnection> shared_this = shared_from_this();
|
|
|
- auto wrapped_handler = [shared_this, this, resp, handler](
|
|
|
- pbio::CodedInputStream *is, const Status &status) {
|
|
|
- if (status.ok()) {
|
|
|
- uint32_t size = 0;
|
|
|
- is->ReadVarint32(&size);
|
|
|
- auto limit = is->PushLimit(size);
|
|
|
- is->ReadString(resp.get(), limit);
|
|
|
- is->PopLimit(limit);
|
|
|
+ engine_->AsyncRpcCommsError(status, shared_from_this(), r_vector);
|
|
|
+ } else {
|
|
|
+ pending_requests_.push_back(r);
|
|
|
+
|
|
|
+ if (connected_ == kConnected) { // Dont flush if we're waiting or handshaking
|
|
|
+ FlushPendingRequests();
|
|
|
}
|
|
|
- handler(status);
|
|
|
- };
|
|
|
+ }
|
|
|
+}
|
|
|
|
|
|
- auto r = std::make_shared<Request>(engine_, method_name, req,
|
|
|
- std::move(wrapped_handler));
|
|
|
- pending_requests_.push_back(r);
|
|
|
- FlushPendingRequests();
|
|
|
+void RpcConnection::AsyncRpc(const std::vector<std::shared_ptr<Request> > & requests) {
|
|
|
+ std::lock_guard<std::mutex> state_lock(connection_state_lock_);
|
|
|
+ LOG_TRACE(kRPC, << "RpcConnection::AsyncRpc[] called; connected=" << ToString(connected_));
|
|
|
+
|
|
|
+ if (connected_ == kDisconnected) {
|
|
|
+ // Oops. The connection failed _just_ before the engine got a chance
|
|
|
+ // to send it. Register it as a failure
|
|
|
+ Status status = Status::ResourceUnavailable("RpcConnection closed before send.");
|
|
|
+ engine_->AsyncRpcCommsError(status, shared_from_this(), requests);
|
|
|
+ } else {
|
|
|
+ pending_requests_.reserve(pending_requests_.size() + requests.size());
|
|
|
+ for (auto r: requests) {
|
|
|
+ pending_requests_.push_back(r);
|
|
|
+ }
|
|
|
+ if (connected_ == kConnected) { // Dont flush if we're waiting or handshaking
|
|
|
+ FlushPendingRequests();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
+
|
|
|
void RpcConnection::PreEnqueueRequests(
|
|
|
std::vector<std::shared_ptr<Request>> requests) {
|
|
|
// Public method - acquire lock
|
|
|
std::lock_guard<std::mutex> state_lock(connection_state_lock_);
|
|
|
- assert(!connected_);
|
|
|
+
|
|
|
+ LOG_DEBUG(kRPC, << "RpcConnection::PreEnqueueRequests called");
|
|
|
+
|
|
|
+ assert(connected_ == kNotYetConnected);
|
|
|
|
|
|
pending_requests_.insert(pending_requests_.end(), requests.begin(),
|
|
|
requests.end());
|
|
@@ -349,7 +366,7 @@ void RpcConnection::CommsError(const Status &status) {
|
|
|
std::make_move_iterator(pending_requests_.end()));
|
|
|
pending_requests_.clear();
|
|
|
|
|
|
- engine_->AsyncRpcCommsError(status, requestsToReturn);
|
|
|
+ engine_->AsyncRpcCommsError(status, shared_from_this(), requestsToReturn);
|
|
|
}
|
|
|
|
|
|
void RpcConnection::ClearAndDisconnect(const ::asio::error_code &ec) {
|
|
@@ -379,4 +396,15 @@ std::shared_ptr<Request> RpcConnection::RemoveFromRunningQueue(int call_id) {
|
|
|
requests_on_fly_.erase(it);
|
|
|
return req;
|
|
|
}
|
|
|
+
|
|
|
+std::string RpcConnection::ToString(ConnectedState connected) {
|
|
|
+ switch(connected) {
|
|
|
+ case kNotYetConnected: return "NotYetConnected";
|
|
|
+ case kConnecting: return "Connecting";
|
|
|
+ case kConnected: return "Connected";
|
|
|
+ case kDisconnected: return "Disconnected";
|
|
|
+ default: return "Invalid ConnectedState";
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
}
|