123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #ifndef LIB_COMMON_UTIL_H_
- #define LIB_COMMON_UTIL_H_
- #include "hdfspp/status.h"
- #include "common/logging.h"
- #include <mutex>
- #include <memory>
- #include <string>
- #include <boost/asio/ip/tcp.hpp>
- #include <boost/system/error_code.hpp>
- #include <openssl/rand.h>
- #include <google/protobuf/io/coded_stream.h>
- namespace google {
- namespace protobuf {
- class MessageLite;
- }
- }
- namespace hdfs {
// Shorthand for the scoped std::mutex lock that is repeated throughout the code.
using mutex_guard = std::lock_guard<std::mutex>;
- Status ToStatus(const boost::system::error_code &ec);
- // Determine size of buffer that needs to be allocated in order to serialize msg
- // in delimited format
- int DelimitedPBMessageSize(const ::google::protobuf::MessageLite *msg);
- // Construct msg from the input held in the CodedInputStream
- // return false on failure, otherwise return true
- bool ReadDelimitedPBMessage(::google::protobuf::io::CodedInputStream *in,
- ::google::protobuf::MessageLite *msg);
- // Serialize msg into a delimited form (java protobuf compatible)
- // err, if not null, will be set to false on failure
- std::string SerializeDelimitedProtobufMessage(const ::google::protobuf::MessageLite *msg,
- bool *err);
- std::string Base64Encode(const std::string &src);
- // Return a new high-entropy client name
- std::shared_ptr<std::string> GetRandomClientName();
// Reports whether `mutex` is currently locked by _anyone_.  A std::mutex does
// not record its owner, so a true result does not mean the calling thread is
// the one holding it.
//
// The check works by attempting a non-blocking acquire: if try_lock() succeeds
// nobody held the lock, so it is released again immediately and false is
// returned; if try_lock() fails the lock is reported as held.
//
// NOTE(review): for std::mutex the standard allows try_lock() to fail
// spuriously and leaves calling it from the owning thread undefined, so treat
// the result as a debugging hint rather than a guarantee.
template <class T>
bool lock_held(T &mutex) {
  if (mutex.try_lock()) {
    // We just took the lock ourselves, so it was free; give it back.
    mutex.unlock();
    return false;
  }
  return true;
}
- // Shutdown and close a socket safely; will check if the socket is open and
- // catch anything thrown by asio.
- // Returns a string containing error message on failure, otherwise an empty string.
- std::string SafeDisconnect(boost::asio::ip::tcp::socket *sock);
- // The following helper function is used for classes that look like the following:
- //
- // template <typename socket_like_object>
- // class ObjectThatHoldsSocket {
- // socket_like_object sock_;
- // void DoSomethingWithAsioTcpSocket();
- // }
- //
- // The trick here is that ObjectThatHoldsSocket may be templated on a mock socket
- // in mock tests. If you have a method that explicitly needs to call some asio
- // method unrelated to the mock test you need a way of making sure socket_like_object
- // is, in fact, an asio::ip::tcp::socket. Otherwise the mocks need to implement
- // lots of no-op boilerplate. This will return the value of the input param if
- // it's an asio socket, and nullptr if it's anything else.
- template <typename sock_t>
- inline boost::asio::ip::tcp::socket *get_asio_socket_ptr(sock_t *s) {
- (void)s;
- return nullptr;
- }
- template<>
- inline boost::asio::ip::tcp::socket *get_asio_socket_ptr<boost::asio::ip::tcp::socket>
- (boost::asio::ip::tcp::socket *s) {
- return s;
- }
- //Check if the high bit is set
- bool IsHighBitSet(uint64_t num);
- // Provide a way to do an atomic swap on a callback.
- // SetCallback, AtomicSwapCallback, and GetCallback can only be called once each.
- // AtomicSwapCallback and GetCallback must only be called after SetCallback.
- //
- // We can't throw on error, and since the callback is templated it's tricky to
- // generate generic dummy callbacks. Complain loudly in the log and get good
- // test coverage. It shouldn't be too hard to avoid invalid states.
- template <typename CallbackType>
- class SwappableCallbackHolder {
- private:
- std::mutex state_lock_;
- CallbackType callback_;
- bool callback_set_ = false;
- bool callback_swapped_ = false;
- bool callback_accessed_ = false;
- public:
- bool IsCallbackSet() {
- mutex_guard swap_lock(state_lock_);
- return callback_set_;
- }
- bool IsCallbackAccessed() {
- mutex_guard swap_lock(state_lock_);
- return callback_accessed_;
- }
- bool SetCallback(const CallbackType& callback) {
- mutex_guard swap_lock(state_lock_);
- if(callback_set_ || callback_swapped_ || callback_accessed_) {
- LOG_ERROR(kAsyncRuntime, << "SetCallback violates access invariants.")
- return false;
- }
- callback_ = callback;
- callback_set_ = true;
- return true;
- }
- CallbackType AtomicSwapCallback(const CallbackType& replacement, bool& swapped) {
- mutex_guard swap_lock(state_lock_);
- if(!callback_set_ || callback_swapped_) {
- LOG_ERROR(kAsyncRuntime, << "AtomicSwapCallback violates access invariants.")
- swapped = false;
- } else if (callback_accessed_) {
- // Common case where callback has been invoked but caller may not know
- LOG_DEBUG(kAsyncRuntime, << "AtomicSwapCallback called after callback has been accessed");
- return callback_;
- }
- CallbackType old = callback_;
- callback_ = replacement;
- callback_swapped_ = true;
- swapped = true;
- return old;
- }
- CallbackType GetCallback() {
- mutex_guard swap_lock(state_lock_);
- if(!callback_set_ || callback_accessed_) {
- LOG_ERROR(kAsyncRuntime, << "GetCallback violates access invariants.")
- }
- callback_accessed_ = true;
- return callback_;
- }
- };
- }
- #endif
|