/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef LIB_RPC_RPC_ENGINE_H_
#define LIB_RPC_RPC_ENGINE_H_

#include "hdfspp/options.h"
#include "hdfspp/status.h"

#include "common/auth_info.h"
#include "common/retry_policy.h"
#include "common/libhdfs_events_impl.h"
#include "common/util.h"
#include "common/continuation/asio.h"
#include "common/logging.h"
#include "common/new_delete.h"
#include "common/namenode_info.h"

#include <google/protobuf/message_lite.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>

#include <asio/ip/tcp.hpp>
#include <asio/deadline_timer.hpp>

#include <atomic>
#include <deque>
#include <future>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <vector>

namespace hdfs {

/*
 *        NOTE ABOUT LOCKING MODELS
 *
 * To prevent deadlocks, anything that might acquire multiple locks must
 * acquire the lock on the RpcEngine first, then the RpcConnection.  Callbacks
 * will never be called while holding any locks, so the components are free
 * to take locks when servicing a callback.
 *
 * An RpcRequest or RpcConnection should never call any methods on the RpcEngine
 * except for those that are exposed through the LockFreeRpcEngine interface.
 */

typedef const std::function<void(const Status &)> RpcCallback;

class LockFreeRpcEngine;
class RpcConnection;
class SaslProtocol;

/*
 * Internal bookkeeping for an outstanding request from the consumer.
 *
 * Threading model: not thread-safe; should only be accessed from a single
 *   thread at a time
 */
class Request {
 public:
  MEMCHECKED_CLASS(Request)
  typedef std::function<void(::google::protobuf::io::CodedInputStream *is,
                             const Status &status)> Handler;

  Request(LockFreeRpcEngine *engine, const std::string &method_name, int call_id,
          const std::string &request, Handler &&callback);
  Request(LockFreeRpcEngine *engine, const std::string &method_name, int call_id,
          const ::google::protobuf::MessageLite *request, Handler &&callback);

  // Null request (with no actual message) used to track the state of an
  //   initial Connect call
  Request(LockFreeRpcEngine *engine, Handler &&handler);

  int call_id() const { return call_id_; }
  std::string method_name() const { return method_name_; }
  ::asio::deadline_timer &timer() { return timer_; }
  int IncrementRetryCount() { return retry_count_++; }
  int IncrementFailoverCount();
  void GetPacket(std::string *res) const;
  void OnResponseArrived(::google::protobuf::io::CodedInputStream *is,
                         const Status &status);

  int get_failover_count() { return failover_count_; }

  std::string GetDebugString() const;

 private:
  LockFreeRpcEngine *const engine_;
  const std::string method_name_;
  const int call_id_;

  ::asio::deadline_timer timer_;
  std::string payload_;
  const Handler handler_;

  int retry_count_;
  int failover_count_;
};
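/*
 * Illustrative sketch of the lock-ordering rule from the NOTE ABOUT LOCKING
 * MODELS above; not part of this header's API.  Any code path that needs both
 * locks must take the engine's lock before the connection's (the mutex names
 * below refer to the members declared later in this file):
 *
 *   std::lock_guard<std::mutex> engine_guard(engine_state_lock_);      // RpcEngine first
 *   std::lock_guard<std::mutex> conn_guard(connection_state_lock_);    // then RpcConnection
 *   // ... mutate shared state; user callbacks are dispatched only after
 *   // both guards have been released.
 */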
/*
 * Encapsulates a persistent connection to the NameNode, and the sending of
 * RPC requests and evaluating their responses.
 *
 * Can have multiple RPC requests in-flight simultaneously, but they are
 * evaluated in-order on the server side in a blocking manner.
 *
 * Threading model: public interface is thread-safe
 * All handlers passed in to method calls will be called from an asio thread,
 * and will not be holding any internal RpcConnection locks.
 */
class RpcConnection : public std::enable_shared_from_this<RpcConnection> {
 public:
  MEMCHECKED_CLASS(RpcConnection)
  RpcConnection(LockFreeRpcEngine *engine);
  virtual ~RpcConnection();

  // Note that a single server can have multiple endpoints - especially both
  //   an ipv4 and ipv6 endpoint
  virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
                       const AuthInfo &auth_info,
                       RpcCallback &handler) = 0;
  virtual void ConnectAndFlush(const std::vector<::asio::ip::tcp::endpoint> &server) = 0;
  virtual void Disconnect() = 0;

  void StartReading();
  void AsyncRpc(const std::string &method_name,
                const ::google::protobuf::MessageLite *req,
                std::shared_ptr<::google::protobuf::MessageLite> resp,
                const RpcCallback &handler);

  void AsyncRpc(const std::vector<std::shared_ptr<Request>> &requests);

  // Enqueue requests before the connection is connected.  Will be flushed
  //   on connect
  void PreEnqueueRequests(std::vector<std::shared_ptr<Request>> requests);

  // Put requests at the front of the current request queue
  void PrependRequests_locked(std::vector<std::shared_ptr<Request>> requests);

  void SetEventHandlers(std::shared_ptr<LibhdfsEvents> event_handlers);
  void SetClusterName(std::string cluster_name);

  LockFreeRpcEngine *engine() { return engine_; }
  ::asio::io_service &io_service();

 protected:
  struct Response {
    enum ResponseState {
      kReadLength,
      kReadContent,
      kParseResponse,
    } state_;
    unsigned length_;
    std::vector<char> data_;

    std::unique_ptr<::google::protobuf::io::ArrayInputStream> ar;
    std::unique_ptr<::google::protobuf::io::CodedInputStream> in;

    Response() : state_(kReadLength), length_(0) {}
  };

  // Initial handshaking protocol: connect->handshake-->(auth)?-->context->connected
  virtual void SendHandshake(RpcCallback &handler) = 0;
  void HandshakeComplete(const Status &s);
  void AuthComplete(const Status &s, const AuthInfo &new_auth_info);
  void AuthComplete_locked(const Status &s, const AuthInfo &new_auth_info);
  virtual void SendContext(RpcCallback &handler) = 0;
  void ContextComplete(const Status &s);

  virtual void OnSendCompleted(const ::asio::error_code &ec,
                               size_t transferred) = 0;
  virtual void OnRecvCompleted(const ::asio::error_code &ec,
                               size_t transferred) = 0;
  virtual void FlushPendingRequests() = 0;  // Synchronously write the next request

  void AsyncRpc_locked(const std::string &method_name,
                       const ::google::protobuf::MessageLite *req,
                       std::shared_ptr<::google::protobuf::MessageLite> resp,
                       const RpcCallback &handler);
  void SendRpcRequests(const std::vector<std::shared_ptr<Request>> &requests);
  void AsyncFlushPendingRequests();  // Queue requests to be flushed at a later time

  std::shared_ptr<std::string> PrepareHandshakePacket();
  std::shared_ptr<std::string> PrepareContextPacket();
  static std::string SerializeRpcRequest(
      const std::string &method_name,
      const ::google::protobuf::MessageLite *req);
  Status HandleRpcResponse(std::shared_ptr<Response> response);
  void HandleRpcTimeout(std::shared_ptr<Request> req,
                        const ::asio::error_code &ec);
  void CommsError(const Status &status);

  void ClearAndDisconnect(const ::asio::error_code &ec);
  std::shared_ptr<Request> RemoveFromRunningQueue(int call_id);

  LockFreeRpcEngine *const engine_;
  std::shared_ptr<Response> current_response_state_;
  AuthInfo auth_info_;

  // Connection can have deferred connection, especially when we're pausing
  //   during retry
  enum ConnectedState {
    kNotYetConnected,
    kConnecting,
    kHandshaking,
    kAuthenticating,
    kConnected,
    kDisconnected
  };
  static std::string ToString(ConnectedState connected);
  ConnectedState connected_;
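  /*
   * Illustrative transition diagram for connected_, derived from the
   * handshake protocol comment above (a sketch, not normative):
   *
   *   kNotYetConnected -> kConnecting      (Connect/ConnectAndFlush called)
   *   kConnecting      -> kHandshaking     (TCP endpoint reached, SendHandshake)
   *   kHandshaking     -> kAuthenticating  (only when a SASL exchange is required)
   *   kHandshaking     -> kConnected       (no auth needed, after SendContext)
   *   kAuthenticating  -> kConnected       (AuthComplete, after SendContext)
   *   any state        -> kDisconnected    (Disconnect() or a comms error)
   */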
  // State machine for performing a SASL handshake
  std::shared_ptr<SaslProtocol> sasl_protocol_;
  // The request being sent over the wire; will also be in requests_on_fly_
  std::shared_ptr<Request> request_over_the_wire_;
  // Requests to be sent over the wire
  std::deque<std::shared_ptr<Request>> pending_requests_;
  // Requests to be sent over the wire during authentication; not retried if
  //   there is a connection error
  std::deque<std::shared_ptr<Request>> auth_requests_;
  // Requests that are waiting for responses
  typedef std::unordered_map<int, std::shared_ptr<Request>> RequestOnFlyMap;
  RequestOnFlyMap requests_on_fly_;
  std::shared_ptr<LibhdfsEvents> event_handlers_;
  std::string cluster_name_;

  // Lock for mutable parts of this class that need to be thread safe
  std::mutex connection_state_lock_;

  friend class SaslProtocol;
};

/*
 * These methods of the RpcEngine will never acquire locks, and are safe for
 * RpcConnections to call while holding a ConnectionLock.
 */
class LockFreeRpcEngine {
 public:
  MEMCHECKED_CLASS(LockFreeRpcEngine)

  /* Enqueues a CommsError without acquiring a lock */
  virtual void AsyncRpcCommsError(const Status &status,
                                  std::shared_ptr<RpcConnection> failedConnection,
                                  std::vector<std::shared_ptr<Request>> pendingRequests) = 0;

  virtual const RetryPolicy *retry_policy() const = 0;
  virtual int NextCallId() = 0;

  virtual const std::string &client_name() const = 0;
  virtual const std::string &client_id() const = 0;
  virtual const std::string &user_name() const = 0;
  virtual const std::string &protocol_name() const = 0;
  virtual int protocol_version() const = 0;
  virtual ::asio::io_service &io_service() = 0;
  virtual const Options &options() const = 0;
};

/*
 * Tracker gives the RpcEngine a quick way to use an endpoint that just
 * failed in order to lookup a set of endpoints for a failover node.
 *
 * Note: For now this only deals with 2 NameNodes, but that's the default
 *   anyway.
 */
class HANamenodeTracker {
 public:
  HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers,
                    ::asio::io_service *ioservice,
                    std::shared_ptr<LibhdfsEvents> event_handlers_);

  virtual ~HANamenodeTracker();

  bool is_enabled() const { return enabled_; }
  bool is_resolved() const { return resolved_; }

  // Get node opposite of the current one if possible (swaps active/standby)
  // Note: This will always mutate internal state.  Use IsCurrentActive/Standby to
  //   get info without changing state
  ResolvedNamenodeInfo GetFailoverAndUpdate(::asio::ip::tcp::endpoint current_endpoint);

  bool IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const;
  bool IsCurrentStandby_locked(const ::asio::ip::tcp::endpoint &ep) const;

 private:
  // If HA should be enabled, according to our options and runtime info like # nodes provided
  bool enabled_;
  // If we were able to resolve at least 1 HA namenode
  bool resolved_;

  // Keep service in case a second round of DNS lookup is required
  ::asio::io_service *ioservice_;

  // Event handlers; for now this is the simplest place to catch all failover events
  // and push info out to the client application.  Possibly move into RpcEngine.
  std::shared_ptr<LibhdfsEvents> event_handlers_;

  // Only support 1 active and 1 standby for now.
  ResolvedNamenodeInfo active_info_;
  ResolvedNamenodeInfo standby_info_;

  // Acquire when switching between active and standby
  std::mutex swap_lock_;
};
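/*
 * Illustrative failover flow (a sketch under assumed control flow; the
 * variable names below are hypothetical): when an RPC against the current
 * NameNode fails, the engine can ask the tracker for the opposite node and
 * reconnect there before replaying pending requests:
 *
 *   if (tracker.is_enabled() && tracker.is_resolved()) {
 *     ResolvedNamenodeInfo next = tracker.GetFailoverAndUpdate(failed_endpoint);
 *     // reconnect to next's endpoints and re-send the pending requests
 *   }
 */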
/*
 * An engine for reliable communication with a NameNode.  Handles connection,
 * retry, and (someday) failover of the requested messages.
 *
 * Threading model: thread-safe.  All callbacks will be called back from
 *   an asio pool and will not hold any internal locks
 */
class RpcEngine : public LockFreeRpcEngine {
 public:
  MEMCHECKED_CLASS(RpcEngine)
  enum { kRpcVersion = 9 };
  enum {
    kCallIdAuthorizationFailed = -1,
    kCallIdInvalid = -2,
    kCallIdConnectionContext = -3,
    kCallIdPing = -4,
    kCallIdSasl = -33
  };

  RpcEngine(::asio::io_service *io_service, const Options &options,
            const std::string &client_name, const std::string &user_name,
            const char *protocol_name, int protocol_version);

  void Connect(const std::string &cluster_name,
               const std::vector<ResolvedNamenodeInfo> servers,
               RpcCallback &handler);

  void AsyncRpc(const std::string &method_name,
                const ::google::protobuf::MessageLite *req,
                const std::shared_ptr<::google::protobuf::MessageLite> &resp,
                const std::function<void(const Status &)> &handler);

  Status Rpc(const std::string &method_name,
             const ::google::protobuf::MessageLite *req,
             const std::shared_ptr<::google::protobuf::MessageLite> &resp);

  void Shutdown();

  /* Enqueues a CommsError without acquiring a lock */
  void AsyncRpcCommsError(const Status &status,
                          std::shared_ptr<RpcConnection> failedConnection,
                          std::vector<std::shared_ptr<Request>> pendingRequests) override;
  void RpcCommsError(const Status &status,
                     std::shared_ptr<RpcConnection> failedConnection,
                     std::vector<std::shared_ptr<Request>> pendingRequests);

  const RetryPolicy *retry_policy() const override { return retry_policy_.get(); }
  int NextCallId() override { return ++call_id_; }

  void TEST_SetRpcConnection(std::shared_ptr<RpcConnection> conn);
  void TEST_SetRetryPolicy(std::unique_ptr<const RetryPolicy> policy);
  std::unique_ptr<const RetryPolicy> TEST_GenerateRetryPolicyUsingOptions();

  const std::string &client_name() const override { return client_name_; }
  const std::string &client_id() const override { return client_id_; }
  const std::string &user_name() const override { return auth_info_.getUser(); }
  const std::string &protocol_name() const override { return protocol_name_; }
  int protocol_version() const override { return protocol_version_; }
  ::asio::io_service &io_service() override { return *io_service_; }
  const Options &options() const override { return options_; }
  static std::string GetRandomClientName();

  void SetFsEventCallback(fs_event_callback callback);

 protected:
  std::shared_ptr<RpcConnection> conn_;
  std::shared_ptr<RpcConnection> InitializeConnection();
  virtual std::shared_ptr<RpcConnection> NewConnection();
  virtual std::unique_ptr<const RetryPolicy> MakeRetryPolicy(const Options &options);

  static std::string getRandomClientId();

  // Remember all of the last endpoints in case we need to reconnect and retry
  std::vector<::asio::ip::tcp::endpoint> last_endpoints_;

 private:
  ::asio::io_service *const io_service_;
  const Options options_;
  const std::string client_name_;
  const std::string client_id_;
  const std::string protocol_name_;
  const int protocol_version_;
  std::unique_ptr<const RetryPolicy> retry_policy_;  // null --> no retry
  AuthInfo auth_info_;
  std::string cluster_name_;
  std::atomic_int call_id_;
  ::asio::deadline_timer retry_timer;

  std::shared_ptr<LibhdfsEvents> event_handlers_;

  std::mutex engine_state_lock_;

  // Keep endpoint info for all HA connections; a non-null ptr indicates
  // that HA info was found in the configuration.
  std::unique_ptr<HANamenodeTracker> ha_persisted_info_;
};

}  // namespace hdfs

#endif
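/*
 * Illustrative end-to-end usage (a sketch; the user name, protocol string,
 * protocol version, onConnect handler, and the GetFileInfoRequestProto /
 * GetFileInfoResponseProto message types are placeholders, not defined in
 * this header):
 *
 *   ::asio::io_service io_service;
 *   Options options;
 *   RpcEngine engine(&io_service, options, RpcEngine::GetRandomClientName(),
 *                    "hdfs_user",
 *                    "org.apache.hadoop.hdfs.protocol.ClientProtocol", 1);
 *
 *   // servers: a std::vector<ResolvedNamenodeInfo> of resolved NameNodes
 *   engine.Connect("mycluster", servers,
 *                  [](const Status &s) { onConnect(s); });
 *
 *   GetFileInfoRequestProto req;
 *   auto resp = std::make_shared<GetFileInfoResponseProto>();
 *   Status status = engine.Rpc("getFileInfo", &req, resp);  // blocking variant
 */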