Browse Source

HDFS-10222. libhdfs++: Shutdown sockets to avoid 'Connection reset by peer'. Contributed by James Clampffer

James 9 years ago
parent
commit
b447c16a3f

+ 1 - 3
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc

@@ -55,9 +55,7 @@ void DataNodeConnectionImpl::Connect(
 }
 }
 
 
 void DataNodeConnectionImpl::Cancel() {
 void DataNodeConnectionImpl::Cancel() {
-  // best to do a shutdown() first for portability
-  conn_->shutdown(asio::ip::tcp::socket::shutdown_both);
-  conn_->close();
+  conn_.reset();
 }
 }
 
 
 
 

+ 30 - 1
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h

@@ -22,9 +22,12 @@
 #include "common/async_stream.h"
 #include "common/async_stream.h"
 #include "ClientNamenodeProtocol.pb.h"
 #include "ClientNamenodeProtocol.pb.h"
 #include "common/libhdfs_events_impl.h"
 #include "common/libhdfs_events_impl.h"
+#include "common/logging.h"
 
 
 #include "asio.hpp"
 #include "asio.hpp"
 
 
+#include <exception>
+
 namespace hdfs {
 namespace hdfs {
 
 
 class DataNodeConnection : public AsyncStream {
 class DataNodeConnection : public AsyncStream {
@@ -38,9 +41,35 @@ public:
 };
 };
 
 
 
 
+struct SocketDeleter {
+  inline void operator()(asio::ip::tcp::socket *sock) {
+    if(sock->is_open()) {
+      /**
+       *  Even though we just checked that the socket is open it's possible
+       *  it isn't in a state where it can properly send or receive.  If that's
+       *  the case asio will turn the underlying error codes from shutdown()
+       *  and close() into unhelpfully named std::exceptions.  Due to the
+       *  relatively innocuous nature of most of these error codes it's better
+       *  to just catch, give a warning, and move on with life.
+       **/
+      try {
+        sock->shutdown(asio::ip::tcp::socket::shutdown_both);
+      } catch (const std::exception &e) {
+        LOG_WARN(kBlockReader, << "Error calling socket->shutdown");
+      }
+      try {
+        sock->close();
+      } catch (const std::exception &e) {
+        LOG_WARN(kBlockReader, << "Error calling socket->close");
+      }
+    }
+    delete sock;
+  }
+};
+
 class DataNodeConnectionImpl : public DataNodeConnection, public std::enable_shared_from_this<DataNodeConnectionImpl>{
 class DataNodeConnectionImpl : public DataNodeConnection, public std::enable_shared_from_this<DataNodeConnectionImpl>{
 public:
 public:
-  std::unique_ptr<asio::ip::tcp::socket> conn_;
+  std::unique_ptr<asio::ip::tcp::socket, SocketDeleter> conn_;
   std::array<asio::ip::tcp::endpoint, 1> endpoints_;
   std::array<asio::ip::tcp::endpoint, 1> endpoints_;
   std::string uuid_;
   std::string uuid_;
   LibhdfsEvents *event_handlers_;
   LibhdfsEvents *event_handlers_;