util.h 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #ifndef LIB_COMMON_UTIL_H_
  19. #define LIB_COMMON_UTIL_H_
  20. #include "hdfspp/status.h"
  21. #include "common/logging.h"
  22. #include <mutex>
  23. #include <string>
  24. #include <asio/error_code.hpp>
  25. #include <openssl/rand.h>
  26. #include <google/protobuf/io/coded_stream.h>
  27. namespace google {
  28. namespace protobuf {
  29. class MessageLite;
  30. }
  31. }
  32. namespace hdfs {
  33. // typedefs based on code that's repeated everywhere
  34. typedef std::lock_guard<std::mutex> mutex_guard;
  35. Status ToStatus(const ::asio::error_code &ec);
  36. // Determine size of buffer that needs to be allocated in order to serialize msg
  37. // in delimited format
  38. int DelimitedPBMessageSize(const ::google::protobuf::MessageLite *msg);
  39. // Construct msg from the input held in the CodedInputStream
  40. // return false on failure, otherwise return true
  41. bool ReadDelimitedPBMessage(::google::protobuf::io::CodedInputStream *in,
  42. ::google::protobuf::MessageLite *msg);
  43. // Serialize msg into a delimited form (java protobuf compatible)
  44. // err, if not null, will be set to false on failure
  45. std::string SerializeDelimitedProtobufMessage(const ::google::protobuf::MessageLite *msg,
  46. bool *err);
  47. std::string Base64Encode(const std::string &src);
  48. // Return a new high-entropy client name
  49. std::string GetRandomClientName();
  // Reports whether the given mutex is currently locked by anyone.
  //
  // The probe is a try_lock(): if it succeeds the mutex was free, so it is
  // released again immediately and false is returned; if it fails the mutex is
  // held and true is returned. The result says nothing about WHICH thread
  // holds the lock, and it may be stale by the time the caller looks at it.
  //
  // NOTE: for std::mutex, calling try_lock() from the thread that already owns
  // the mutex is undefined behavior, so only probe locks the calling thread
  // does not hold itself.
  template<class T>
  bool lock_held(T & mutex) {
    if (mutex.try_lock()) {
      // We just acquired it, so nobody else was holding it; give it back.
      mutex.unlock();
      return false;
    }
    return true;
  }
  59. // Shutdown and close a socket safely; will check if the socket is open and
  60. // catch anything thrown by asio.
  61. // Returns a string containing error message on failure, otherwise an empty string.
  62. std::string SafeDisconnect(asio::ip::tcp::socket *sock);
  63. // The following helper function is used for classes that look like the following:
  64. //
  65. // template <typename socket_like_object>
  66. // class ObjectThatHoldsSocket {
  67. // socket_like_object sock_;
  68. // void DoSomethingWithAsioTcpSocket();
  69. // }
  70. //
  71. // The trick here is that ObjectThatHoldsSocket may be templated on a mock socket
  72. // in mock tests. If you have a method that explicitly needs to call some asio
  73. // method unrelated to the mock test you need a way of making sure socket_like_object
  74. // is, in fact, an asio::ip::tcp::socket. Otherwise the mocks need to implement
  75. // lots of no-op boilerplate. This will return the value of the input param if
  76. // it's an asio socket, and nullptr if it's anything else.
  77. template <typename sock_t>
  78. inline asio::ip::tcp::socket *get_asio_socket_ptr(sock_t *s) {
  79. (void)s;
  80. return nullptr;
  81. }
  82. template<>
  83. inline asio::ip::tcp::socket *get_asio_socket_ptr<asio::ip::tcp::socket>
  84. (asio::ip::tcp::socket *s) {
  85. return s;
  86. }
  87. //Check if the high bit is set
  88. bool IsHighBitSet(uint64_t num);
  89. // Provide a way to do an atomic swap on a callback.
  90. // SetCallback, AtomicSwapCallback, and GetCallback can only be called once each.
  91. // AtomicSwapCallback and GetCallback must only be called after SetCallback.
  92. //
  93. // We can't throw on error, and since the callback is templated it's tricky to
  94. // generate generic dummy callbacks. Complain loudly in the log and get good
  95. // test coverage. It shouldn't be too hard to avoid invalid states.
  96. template <typename CallbackType>
  97. class SwappableCallbackHolder {
  98. private:
  99. std::mutex state_lock_;
  100. CallbackType callback_;
  101. bool callback_set_ = false;
  102. bool callback_swapped_ = false;
  103. bool callback_accessed_ = false;
  104. public:
  105. bool IsCallbackSet() {
  106. mutex_guard swap_lock(state_lock_);
  107. return callback_set_;
  108. }
  109. bool IsCallbackAccessed() {
  110. mutex_guard swap_lock(state_lock_);
  111. return callback_accessed_;
  112. }
  113. bool SetCallback(const CallbackType& callback) {
  114. mutex_guard swap_lock(state_lock_);
  115. if(callback_set_ || callback_swapped_ || callback_accessed_) {
  116. LOG_ERROR(kAsyncRuntime, << "SetCallback violates access invariants.")
  117. return false;
  118. }
  119. callback_ = callback;
  120. callback_set_ = true;
  121. return true;
  122. }
  123. CallbackType AtomicSwapCallback(const CallbackType& replacement, bool& swapped) {
  124. mutex_guard swap_lock(state_lock_);
  125. if(!callback_set_ || callback_swapped_) {
  126. LOG_ERROR(kAsyncRuntime, << "AtomicSwapCallback violates access invariants.")
  127. swapped = false;
  128. } else if (callback_accessed_) {
  129. // Common case where callback has been invoked but caller may not know
  130. LOG_DEBUG(kAsyncRuntime, << "AtomicSwapCallback called after callback has been accessed");
  131. return callback_;
  132. }
  133. CallbackType old = callback_;
  134. callback_ = replacement;
  135. callback_swapped_ = true;
  136. swapped = true;
  137. return old;
  138. }
  139. CallbackType GetCallback() {
  140. mutex_guard swap_lock(state_lock_);
  141. if(!callback_set_ || callback_accessed_) {
  142. LOG_ERROR(kAsyncRuntime, << "GetCallback violates access invariants.")
  143. }
  144. callback_accessed_ = true;
  145. return callback_;
  146. }
  147. };
  148. }
  149. #endif