protobuf.h 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #ifndef LIBHDFSPP_COMMON_CONTINUATION_PROTOBUF_H_
  19. #define LIBHDFSPP_COMMON_CONTINUATION_PROTOBUF_H_
  20. #include "common/util.h"
  21. #include <boost/asio/read.hpp>
  22. #include <boost/asio/write.hpp>
  23. #include <boost/asio/buffer.hpp>
  24. #include <boost/system/error_code.hpp>
  25. #include <google/protobuf/message_lite.h>
  26. #include <google/protobuf/io/coded_stream.h>
  27. #include <google/protobuf/io/zero_copy_stream_impl_lite.h>
  28. #include <cassert>
  29. namespace hdfs {
  30. namespace continuation {
  31. template <class Stream, size_t MaxMessageSize = 512>
  32. struct ReadDelimitedPBMessageContinuation : public Continuation {
  33. ReadDelimitedPBMessageContinuation(std::shared_ptr<Stream> stream,
  34. ::google::protobuf::MessageLite *msg)
  35. : stream_(stream), msg_(msg) {}
  36. virtual void Run(const Next &next) override {
  37. namespace pbio = google::protobuf::io;
  38. auto handler = [this, next](const boost::system::error_code &ec, size_t) {
  39. Status status;
  40. if (ec) {
  41. status = ToStatus(ec);
  42. } else {
  43. pbio::ArrayInputStream as(&buf_[0], buf_.size());
  44. pbio::CodedInputStream is(&as);
  45. uint32_t size = 0;
  46. bool v = is.ReadVarint32(&size);
  47. assert(v);
  48. (void)v; //avoids unused variable warning
  49. is.PushLimit(size);
  50. msg_->Clear();
  51. v = msg_->MergeFromCodedStream(&is);
  52. assert(v);
  53. }
  54. next(status);
  55. };
  56. boost::asio::async_read(*stream_,
  57. boost::asio::buffer(buf_),
  58. std::bind(&ReadDelimitedPBMessageContinuation::CompletionHandler, this,
  59. std::placeholders::_1, std::placeholders::_2),
  60. handler);
  61. }
  62. private:
  63. size_t CompletionHandler(const boost::system::error_code &ec, size_t transferred) {
  64. if (ec) {
  65. return 0;
  66. }
  67. size_t offset = 0, len = 0;
  68. for (size_t i = 0; i + 1 < transferred && i < sizeof(int32_t); ++i) {
  69. len = (len << 7) | (buf_[i] & 0x7f);
  70. if ((uint8_t)buf_.at(i) < 0x80) {
  71. offset = i + 1;
  72. break;
  73. }
  74. }
  75. assert(offset + len < buf_.size() && "Message is too big");
  76. return offset ? len + offset - transferred : 1;
  77. }
  78. std::shared_ptr<Stream> stream_;
  79. ::google::protobuf::MessageLite *msg_;
  80. std::array<char, MaxMessageSize> buf_;
  81. };
  82. template <class Stream>
  83. struct WriteDelimitedPBMessageContinuation : Continuation {
  84. WriteDelimitedPBMessageContinuation(std::shared_ptr<Stream> stream,
  85. const google::protobuf::MessageLite *msg)
  86. : stream_(stream), msg_(msg) {}
  87. virtual void Run(const Next &next) override {
  88. bool success = true;
  89. buf_ = SerializeDelimitedProtobufMessage(msg_, &success);
  90. if(!success) {
  91. next(Status::Error("Unable to serialize protobuf message."));
  92. return;
  93. }
  94. boost::asio::async_write(*stream_, boost::asio::buffer(buf_), [next](const boost::system::error_code &ec, size_t) { next(ToStatus(ec)); } );
  95. }
  96. private:
  97. std::shared_ptr<Stream> stream_;
  98. const google::protobuf::MessageLite *msg_;
  99. std::string buf_;
  100. };
  101. template <class Stream, size_t MaxMessageSize = 512>
  102. static inline Continuation *
  103. ReadDelimitedPBMessage(std::shared_ptr<Stream> stream, ::google::protobuf::MessageLite *msg) {
  104. return new ReadDelimitedPBMessageContinuation<Stream, MaxMessageSize>(stream,
  105. msg);
  106. }
  107. template <class Stream>
  108. static inline Continuation *
  109. WriteDelimitedPBMessage(std::shared_ptr<Stream> stream, ::google::protobuf::MessageLite *msg) {
  110. return new WriteDelimitedPBMessageContinuation<Stream>(stream, msg);
  111. }
  112. }
  113. }
  114. #endif