1
0

datanodeconnection.h 3.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #ifndef LIBHDFSPP_LIB_CONNECTION_DATANODECONNECTION_H_
  19. #define LIBHDFSPP_LIB_CONNECTION_DATANODECONNECTION_H_
  20. #include "common/hdfs_public_api.h"
  21. #include "common/async_stream.h"
  22. #include "ClientNamenodeProtocol.pb.h"
  23. #include "common/libhdfs_events_impl.h"
  24. #include "common/logging.h"
  25. #include "common/util.h"
  26. #include "common/new_delete.h"
  27. #include "asio.hpp"
  28. namespace hdfs {
  29. class DataNodeConnection : public AsyncStream {
  30. public:
  31. MEMCHECKED_CLASS(DataNodeConnection)
  32. std::string uuid_;
  33. std::unique_ptr<hadoop::common::TokenProto> token_;
  34. virtual ~DataNodeConnection();
  35. virtual void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) = 0;
  36. virtual void Cancel() = 0;
  37. };
  38. struct SocketDeleter {
  39. inline void operator()(asio::ip::tcp::socket *sock) {
  40. // Cancel may have already closed the socket.
  41. std::string err = SafeDisconnect(sock);
  42. if(!err.empty()) {
  43. LOG_WARN(kBlockReader, << "Error disconnecting socket: " << err);
  44. }
  45. delete sock;
  46. }
  47. };
  48. class DataNodeConnectionImpl : public DataNodeConnection, public std::enable_shared_from_this<DataNodeConnectionImpl>{
  49. private:
  50. // held (briefly) while posting async ops to the asio task queue
  51. std::mutex state_lock_;
  52. public:
  53. std::unique_ptr<asio::ip::tcp::socket, SocketDeleter> conn_;
  54. std::array<asio::ip::tcp::endpoint, 1> endpoints_;
  55. std::string uuid_;
  56. LibhdfsEvents *event_handlers_;
  57. virtual ~DataNodeConnectionImpl();
  58. DataNodeConnectionImpl(asio::io_service * io_service, const ::hadoop::hdfs::DatanodeInfoProto &dn_proto,
  59. const hadoop::common::TokenProto *token,
  60. LibhdfsEvents *event_handlers);
  61. void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) override;
  62. void Cancel() override;
  63. void async_read_some(const MutableBuffers &buf,
  64. std::function<void (const asio::error_code & error, std::size_t bytes_transferred) > handler)
  65. override {
  66. event_handlers_->call("DN_read_req", "", "", buf.end() - buf.begin());
  67. mutex_guard state_lock(state_lock_);
  68. conn_->async_read_some(buf, handler);
  69. };
  70. void async_write_some(const ConstBuffers &buf,
  71. std::function<void (const asio::error_code & error, std::size_t bytes_transferred) > handler)
  72. override {
  73. event_handlers_->call("DN_write_req", "", "", buf.end() - buf.begin());
  74. mutex_guard state_lock(state_lock_);
  75. conn_->async_write_some(buf, handler);
  76. }
  77. };
  78. }
  79. #endif