datanodeconnection.cc 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "datanodeconnection.h"
  19. #include "common/util.h"
  20. namespace hdfs {
  21. DataNodeConnection::~DataNodeConnection(){}
  22. DataNodeConnectionImpl::~DataNodeConnectionImpl(){}
  23. DataNodeConnectionImpl::DataNodeConnectionImpl(asio::io_service * io_service,
  24. const ::hadoop::hdfs::DatanodeInfoProto &dn_proto,
  25. const hadoop::common::TokenProto *token,
  26. LibhdfsEvents *event_handlers) : event_handlers_(event_handlers)
  27. {
  28. using namespace ::asio::ip;
  29. conn_.reset(new tcp::socket(*io_service));
  30. auto datanode_addr = dn_proto.id();
  31. endpoints_[0] = tcp::endpoint(address::from_string(datanode_addr.ipaddr()),
  32. datanode_addr.xferport());
  33. uuid_ = dn_proto.id().datanodeuuid();
  34. if (token) {
  35. token_.reset(new hadoop::common::TokenProto());
  36. token_->CheckTypeAndMergeFrom(*token);
  37. }
  38. }
  39. void DataNodeConnectionImpl::Connect(
  40. std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) {
  41. // Keep the DN from being freed until we're done
  42. auto shared_this = shared_from_this();
  43. asio::async_connect(*conn_, endpoints_.begin(), endpoints_.end(),
  44. [shared_this, handler](const asio::error_code &ec, std::array<asio::ip::tcp::endpoint, 1>::iterator it) {
  45. (void)it;
  46. handler(ToStatus(ec), shared_this); });
  47. }
  48. void DataNodeConnectionImpl::Cancel() {
  49. // best to do a shutdown() first for portability
  50. conn_->shutdown(asio::ip::tcp::socket::shutdown_both);
  51. conn_->close();
  52. }
  53. }