|
@@ -23,11 +23,10 @@
|
|
|
#include "ClientNamenodeProtocol.pb.h"
|
|
|
#include "common/libhdfs_events_impl.h"
|
|
|
#include "common/logging.h"
|
|
|
+#include "common/util.h"
|
|
|
|
|
|
#include "asio.hpp"
|
|
|
|
|
|
-#include <exception>
|
|
|
-
|
|
|
namespace hdfs {
|
|
|
|
|
|
class DataNodeConnection : public AsyncStream {
|
|
@@ -43,31 +42,19 @@ public:
|
|
|
|
|
|
struct SocketDeleter {
|
|
|
inline void operator()(asio::ip::tcp::socket *sock) {
|
|
|
- if(sock->is_open()) {
|
|
|
- /**
|
|
|
- * Even though we just checked that the socket is open it's possible
|
|
|
- * it isn't in a state where it can properly send or receive. If that's
|
|
|
- * the case asio will turn the underlying error codes from shutdown()
|
|
|
- * and close() into unhelpfully named std::exceptions. Due to the
|
|
|
- * relatively innocuous nature of most of these error codes it's better
|
|
|
- * to just catch, give a warning, and move on with life.
|
|
|
- **/
|
|
|
- try {
|
|
|
- sock->shutdown(asio::ip::tcp::socket::shutdown_both);
|
|
|
- } catch (const std::exception &e) {
|
|
|
- LOG_WARN(kBlockReader, << "Error calling socket->shutdown");
|
|
|
- }
|
|
|
- try {
|
|
|
- sock->close();
|
|
|
- } catch (const std::exception &e) {
|
|
|
- LOG_WARN(kBlockReader, << "Error calling socket->close");
|
|
|
- }
|
|
|
+ // Cancel may have already closed the socket.
|
|
|
+ std::string err = SafeDisconnect(sock);
|
|
|
+ if(!err.empty()) {
|
|
|
+ LOG_WARN(kBlockReader, << "Error disconnecting socket: " << err);
|
|
|
}
|
|
|
delete sock;
|
|
|
}
|
|
|
};
|
|
|
|
|
|
class DataNodeConnectionImpl : public DataNodeConnection, public std::enable_shared_from_this<DataNodeConnectionImpl>{
|
|
|
+private:
|
|
|
+ // held (briefly) while posting async ops to the asio task queue
|
|
|
+ std::mutex state_lock_;
|
|
|
public:
|
|
|
std::unique_ptr<asio::ip::tcp::socket, SocketDeleter> conn_;
|
|
|
std::array<asio::ip::tcp::endpoint, 1> endpoints_;
|
|
@@ -84,19 +71,21 @@ public:
|
|
|
void Cancel() override;
|
|
|
|
|
|
void async_read_some(const MutableBuffers &buf,
|
|
|
- std::function<void (const asio::error_code & error,
|
|
|
- std::size_t bytes_transferred) > handler) override {
|
|
|
+ std::function<void (const asio::error_code & error, std::size_t bytes_transferred) > handler)
|
|
|
+ override {
|
|
|
event_handlers_->call("DN_read_req", "", "", buf.end() - buf.begin());
|
|
|
|
|
|
+
|
|
|
+ mutex_guard state_lock(state_lock_);
|
|
|
conn_->async_read_some(buf, handler);
|
|
|
};
|
|
|
|
|
|
void async_write_some(const ConstBuffers &buf,
|
|
|
- std::function<void (const asio::error_code & error,
|
|
|
- std::size_t bytes_transferred) > handler) override {
|
|
|
-
|
|
|
+ std::function<void (const asio::error_code & error, std::size_t bytes_transferred) > handler)
|
|
|
+ override {
|
|
|
event_handlers_->call("DN_write_req", "", "", buf.end() - buf.begin());
|
|
|
|
|
|
+ mutex_guard state_lock(state_lock_);
|
|
|
conn_->async_write_some(buf, handler);
|
|
|
}
|
|
|
};
|