util.h 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #ifndef LIB_COMMON_UTIL_H_
  19. #define LIB_COMMON_UTIL_H_
  20. #include "hdfspp/status.h"
  21. #include "common/logging.h"
  22. #include <mutex>
  23. #include <memory>
  24. #include <string>
  25. #include <boost/asio/ip/tcp.hpp>
  26. #include <boost/system/error_code.hpp>
  27. #include <openssl/rand.h>
  28. #include <google/protobuf/io/coded_stream.h>
  29. namespace google {
  30. namespace protobuf {
  31. class MessageLite;
  32. }
  33. }
  34. namespace hdfs {
  35. // typedefs based on code that's repeated everywhere
  36. typedef std::lock_guard<std::mutex> mutex_guard;
  37. Status ToStatus(const boost::system::error_code &ec);
  38. // Determine size of buffer that needs to be allocated in order to serialize msg
  39. // in delimited format
  40. int DelimitedPBMessageSize(const ::google::protobuf::MessageLite *msg);
  41. // Construct msg from the input held in the CodedInputStream
  42. // return false on failure, otherwise return true
  43. bool ReadDelimitedPBMessage(::google::protobuf::io::CodedInputStream *in,
  44. ::google::protobuf::MessageLite *msg);
  45. // Serialize msg into a delimited form (java protobuf compatible)
  46. // err, if not null, will be set to false on failure
  47. std::string SerializeDelimitedProtobufMessage(const ::google::protobuf::MessageLite *msg,
  48. bool *err);
  49. std::string Base64Encode(const std::string &src);
  50. // Return a new high-entropy client name
  51. std::shared_ptr<std::string> GetRandomClientName();
  52. // Returns true if _someone_ is holding the lock (not necessarily this thread,
  53. // but a std::mutex doesn't track which thread is holding the lock)
  54. template<class T>
  55. bool lock_held(T & mutex) {
  56. bool result = !mutex.try_lock();
  57. if (!result)
  58. mutex.unlock();
  59. return result;
  60. }
  61. // Shutdown and close a socket safely; will check if the socket is open and
  62. // catch anything thrown by asio.
  63. // Returns a string containing error message on failure, otherwise an empty string.
  64. std::string SafeDisconnect(boost::asio::ip::tcp::socket *sock);
  65. // The following helper function is used for classes that look like the following:
  66. //
  67. // template <typename socket_like_object>
  68. // class ObjectThatHoldsSocket {
  69. // socket_like_object sock_;
  70. // void DoSomethingWithAsioTcpSocket();
  71. // }
  72. //
  73. // The trick here is that ObjectThatHoldsSocket may be templated on a mock socket
  74. // in mock tests. If you have a method that explicitly needs to call some asio
  75. // method unrelated to the mock test you need a way of making sure socket_like_object
  76. // is, in fact, an asio::ip::tcp::socket. Otherwise the mocks need to implement
  77. // lots of no-op boilerplate. This will return the value of the input param if
  78. // it's a asio socket, and nullptr if it's anything else.
  79. template <typename sock_t>
  80. inline boost::asio::ip::tcp::socket *get_asio_socket_ptr(sock_t *s) {
  81. (void)s;
  82. return nullptr;
  83. }
  84. template<>
  85. inline boost::asio::ip::tcp::socket *get_asio_socket_ptr<boost::asio::ip::tcp::socket>
  86. (boost::asio::ip::tcp::socket *s) {
  87. return s;
  88. }
  89. //Check if the high bit is set
  90. bool IsHighBitSet(uint64_t num);
  91. // Provide a way to do an atomic swap on a callback.
  92. // SetCallback, AtomicSwapCallback, and GetCallback can only be called once each.
  93. // AtomicSwapCallback and GetCallback must only be called after SetCallback.
  94. //
  95. // We can't throw on error, and since the callback is templated it's tricky to
  96. // generate generic dummy callbacks. Complain loudly in the log and get good
  97. // test coverage. It shouldn't be too hard to avoid invalid states.
  98. template <typename CallbackType>
  99. class SwappableCallbackHolder {
  100. private:
  101. std::mutex state_lock_;
  102. CallbackType callback_;
  103. bool callback_set_ = false;
  104. bool callback_swapped_ = false;
  105. bool callback_accessed_ = false;
  106. public:
  107. bool IsCallbackSet() {
  108. mutex_guard swap_lock(state_lock_);
  109. return callback_set_;
  110. }
  111. bool IsCallbackAccessed() {
  112. mutex_guard swap_lock(state_lock_);
  113. return callback_accessed_;
  114. }
  115. bool SetCallback(const CallbackType& callback) {
  116. mutex_guard swap_lock(state_lock_);
  117. if(callback_set_ || callback_swapped_ || callback_accessed_) {
  118. LOG_ERROR(kAsyncRuntime, << "SetCallback violates access invariants.")
  119. return false;
  120. }
  121. callback_ = callback;
  122. callback_set_ = true;
  123. return true;
  124. }
  125. CallbackType AtomicSwapCallback(const CallbackType& replacement, bool& swapped) {
  126. mutex_guard swap_lock(state_lock_);
  127. if(!callback_set_ || callback_swapped_) {
  128. LOG_ERROR(kAsyncRuntime, << "AtomicSwapCallback violates access invariants.")
  129. swapped = false;
  130. } else if (callback_accessed_) {
  131. // Common case where callback has been invoked but caller may not know
  132. LOG_DEBUG(kAsyncRuntime, << "AtomicSwapCallback called after callback has been accessed");
  133. return callback_;
  134. }
  135. CallbackType old = callback_;
  136. callback_ = replacement;
  137. callback_swapped_ = true;
  138. swapped = true;
  139. return old;
  140. }
  141. CallbackType GetCallback() {
  142. mutex_guard swap_lock(state_lock_);
  143. if(!callback_set_ || callback_accessed_) {
  144. LOG_ERROR(kAsyncRuntime, << "GetCallback violates access invariants.")
  145. }
  146. callback_accessed_ = true;
  147. return callback_;
  148. }
  149. };
  150. }
  151. #endif