util.cc 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "common/util.h"
  19. #include "common/util_c.h"
  20. #include <google/protobuf/io/zero_copy_stream_impl_lite.h>
  21. #include <exception>
  22. #include <sstream>
  23. #include <iostream>
  24. #include <iomanip>
  25. #include <thread>
  26. namespace hdfs {
  27. bool ReadDelimitedPBMessage(::google::protobuf::io::CodedInputStream *in,
  28. ::google::protobuf::MessageLite *msg) {
  29. uint32_t size = 0;
  30. in->ReadVarint32(&size);
  31. auto limit = in->PushLimit(size);
  32. bool res = msg->ParseFromCodedStream(in);
  33. in->PopLimit(limit);
  34. return res;
  35. }
  36. std::string SerializeDelimitedProtobufMessage(const ::google::protobuf::MessageLite *msg,
  37. bool *err) {
  38. namespace pbio = ::google::protobuf::io;
  39. std::string buf;
  40. int size = msg->ByteSize();
  41. buf.reserve(pbio::CodedOutputStream::VarintSize32(size) + size);
  42. pbio::StringOutputStream ss(&buf);
  43. pbio::CodedOutputStream os(&ss);
  44. os.WriteVarint32(size);
  45. if(err)
  46. *err = msg->SerializeToCodedStream(&os);
  47. return buf;
  48. }
  49. std::string GetRandomClientName() {
  50. std::vector<unsigned char>buf(8);
  51. RAND_pseudo_bytes(&buf[0], 8);
  52. std::ostringstream oss;
  53. oss << "DFSClient_" << getpid() << "_" <<
  54. std::this_thread::get_id() << "_" <<
  55. std::setw(2) << std::hex << std::uppercase << std::setfill('0');
  56. for (unsigned char b: buf)
  57. oss << static_cast<unsigned>(b);
  58. return oss.str();
  59. }
  60. std::string SafeDisconnect(asio::ip::tcp::socket *sock) {
  61. std::string err;
  62. if(sock && sock->is_open()) {
  63. /**
  64. * Even though we just checked that the socket is open it's possible
  65. * it isn't in a state where it can properly send or receive. If that's
  66. * the case asio will turn the underlying error codes from shutdown()
  67. * and close() into unhelpfully named std::exceptions. Due to the
  68. * relatively innocuous nature of most of these error codes it's better
  69. * to just catch and return a flag so the caller can log failure.
  70. **/
  71. try {
  72. sock->shutdown(asio::ip::tcp::socket::shutdown_both);
  73. } catch (const std::exception &e) {
  74. err = std::string("shutdown() threw") + e.what();
  75. }
  76. try {
  77. sock->close();
  78. } catch (const std::exception &e) {
  79. // don't append if shutdown() already failed, first failure is the useful one
  80. if(err.empty())
  81. err = std::string("close() threw") + e.what();
  82. }
  83. }
  84. return err;
  85. }
  /**
   * Reports whether the most significant bit (bit 63) of a 64-bit value is set.
   *
   * @param num  value to inspect
   * @return true if bit 63 is 1, false otherwise
   */
  bool IsHighBitSet(uint64_t num) {
    const uint64_t topBitMask = static_cast<uint64_t>(1) << 63;
    return (num & topBitMask) != 0;
  }
  94. }
  95. void ShutdownProtobufLibrary_C() {
  96. google::protobuf::ShutdownProtobufLibrary();
  97. }