/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef LIB_RPC_RPC_CONNECTION_H #define LIB_RPC_RPC_CONNECTION_H /* * Encapsulates a persistent connection to the NameNode, and the sending of * RPC requests and evaluating their responses. * * Can have multiple RPC requests in-flight simultaneously, but they are * evaluated in-order on the server side in a blocking manner. * * Threading model: public interface is thread-safe * All handlers passed in to method calls will be called from an asio thread, * and will not be holding any internal RpcConnection locks. */ #include "request.h" #include "common/auth_info.h" #include "common/libhdfs_events_impl.h" #include "common/new_delete.h" #include "hdfspp/status.h" #include #include #include #include #include namespace hdfs { typedef const std::function RpcCallback; class LockFreeRpcEngine; class SaslProtocol; class RpcConnection : public std::enable_shared_from_this { public: MEMCHECKED_CLASS(RpcConnection) RpcConnection(LockFreeRpcEngine *engine); virtual ~RpcConnection(); // Note that a single server can have multiple endpoints - especially both // an ipv4 and ipv6 endpoint virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server, const AuthInfo & auth_info, RpcCallback &handler) = 0; virtual void ConnectAndFlush(const std::vector<::asio::ip::tcp::endpoint> &server) = 0; virtual void Disconnect() = 0; void StartReading(); void AsyncRpc(const std::string &method_name, const ::google::protobuf::MessageLite *req, std::shared_ptr<::google::protobuf::MessageLite> resp, const RpcCallback &handler); void AsyncRpc(const std::vector > & requests); // Enqueue requests before the connection is connected. Will be flushed // on connect void PreEnqueueRequests(std::vector> requests); // Put requests at the front of the current request queue void PrependRequests_locked(std::vector> requests); void SetEventHandlers(std::shared_ptr event_handlers); void SetClusterName(std::string cluster_name); void SetAuthInfo(const AuthInfo& auth_info); LockFreeRpcEngine *engine() { return engine_; } ::asio::io_service &io_service(); protected: struct Response { enum ResponseState { kReadLength, kReadContent, kParseResponse, } state_; unsigned length_; std::vector data_; std::unique_ptr<::google::protobuf::io::ArrayInputStream> ar; std::unique_ptr<::google::protobuf::io::CodedInputStream> in; Response() : state_(kReadLength), length_(0) {} }; // Initial handshaking protocol: connect->handshake-->(auth)?-->context->connected virtual void SendHandshake(RpcCallback &handler) = 0; void HandshakeComplete(const Status &s); void AuthComplete(const Status &s, const AuthInfo & new_auth_info); void AuthComplete_locked(const Status &s, const AuthInfo & new_auth_info); virtual void SendContext(RpcCallback &handler) = 0; void ContextComplete(const Status &s); virtual void OnSendCompleted(const ::asio::error_code &ec, size_t transferred) = 0; virtual void OnRecvCompleted(const ::asio::error_code &ec, size_t transferred) = 0; virtual void FlushPendingRequests()=0; // Synchronously write the next request void AsyncRpc_locked( const std::string &method_name, const ::google::protobuf::MessageLite *req, std::shared_ptr<::google::protobuf::MessageLite> resp, const RpcCallback &handler); void SendRpcRequests(const std::vector > & requests); void AsyncFlushPendingRequests(); // Queue requests to be flushed at a later time std::shared_ptr PrepareHandshakePacket(); std::shared_ptr PrepareContextPacket(); static std::string SerializeRpcRequest(const std::string &method_name, const ::google::protobuf::MessageLite *req); Status HandleRpcResponse(std::shared_ptr response); void HandleRpcTimeout(std::shared_ptr req, const ::asio::error_code &ec); void CommsError(const Status &status); void ClearAndDisconnect(const ::asio::error_code &ec); std::shared_ptr RemoveFromRunningQueue(int call_id); LockFreeRpcEngine *const engine_; std::shared_ptr current_response_state_; AuthInfo auth_info_; // Connection can have deferred connection, especially when we're pausing // during retry enum ConnectedState { kNotYetConnected, kConnecting, kHandshaking, kAuthenticating, kConnected, kDisconnected }; static std::string ToString(ConnectedState connected); ConnectedState connected_; // State machine for performing a SASL handshake std::shared_ptr sasl_protocol_; // The request being sent over the wire; will also be in requests_on_fly_ std::shared_ptr request_over_the_wire_; // Requests to be sent over the wire std::deque> pending_requests_; // Requests to be sent over the wire during authentication; not retried if // there is a connection error std::deque> auth_requests_; // Requests that are waiting for responses typedef std::unordered_map> RequestOnFlyMap; RequestOnFlyMap requests_on_fly_; std::shared_ptr event_handlers_; std::string cluster_name_; // Lock for mutable parts of this class that need to be thread safe std::mutex connection_state_lock_; friend class SaslProtocol; }; } // end namespace hdfs #endif // end include Guard