Procházet zdrojové kódy

HDFS-9524. libhdfs++ deadlocks in Filesystem::New if NN connection fails. Contributed by Bob Hansen.

James před 9 roky
rodič
revize
31d28e3105

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_threaded.c

@@ -285,6 +285,8 @@ static int testHdfsOperationsImpl(struct tlhThreadInfo *ti)
     fprintf(stderr, "testHdfsOperations(threadIdx=%d): starting\n",
         ti->threadIdx);
     EXPECT_ZERO(hdfsSingleNameNodeConnect(tlhCluster, &fs, NULL));
+    if (!fs)
+        return 1;
     EXPECT_ZERO(setupPaths(ti, &paths));
     // test some operations
     EXPECT_ZERO(doTestHdfsOperations(ti, fs, &paths));
@@ -295,6 +297,8 @@ static int testHdfsOperationsImpl(struct tlhThreadInfo *ti)
     EXPECT_ZERO(hdfsDisconnect(fs));
     // reconnect to do the final delete.
     EXPECT_ZERO(hdfsSingleNameNodeConnect(tlhCluster, &fs, NULL));
+    if (!fs)
+        return 1;
     EXPECT_ZERO(hdfsDelete(fs, paths.prefix, 1));
     EXPECT_ZERO(hdfsDisconnect(fs));
     return 0;

+ 12 - 7
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h

@@ -115,15 +115,16 @@ class FileSystem {
    * initializes the RPC connections to the NameNode and returns an
    * FileSystem object.
    **/
-  static void New(
-      IoService *io_service, const Options &options, const std::string &server,
+  static FileSystem * New(
+      IoService *&io_service, const Options &options);
+
+  virtual void Connect(const std::string &server,
       const std::string &service,
-      const std::function<void(const Status &, FileSystem *)> &handler);
+      const std::function<void(const Status &, FileSystem *)> &&handler) = 0;
 
-  /* Synchronous call of New*/
-  static FileSystem *
-  New(IoService *io_service, const Options &options, const std::string &server,
-      const std::string &service);
+  /* Synchronous call of Connect */
+  virtual Status Connect(const std::string &server,
+      const std::string &service) = 0;
 
   /**
    * Open a file on HDFS. The call issues an RPC to the NameNode to
@@ -135,6 +136,10 @@ class FileSystem {
        const std::function<void(const Status &, FileHandle *)> &handler) = 0;
   virtual Status Open(const std::string &path, FileHandle **handle) = 0;
 
+  /**
+   * Note that it is an error to destroy the filesystem from within a filesystem
+   * callback.  It will lead to a deadlock and the termination of the process.
+   */
   virtual ~FileSystem() {};
 
 };

+ 7 - 1
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc

@@ -112,8 +112,12 @@ int hdfsFileIsOpenForRead(hdfsFile file) {
 hdfsFS hdfsConnect(const char *nn, tPort port) {
   std::string port_as_string = std::to_string(port);
   IoService * io_service = IoService::New();
-  FileSystem *fs = FileSystem::New(io_service, Options(), nn, port_as_string);
+  FileSystem *fs = FileSystem::New(io_service, Options());
   if (!fs) {
+    return nullptr;
+  }
+
+  if (!fs->Connect(nn, port_as_string).ok()) {
     ReportError(ENODEV, "Unable to connect to NameNode.");
 
     // FileSystem's ctor might take ownership of the io_service; if it does,
@@ -121,6 +125,8 @@ hdfsFS hdfsConnect(const char *nn, tPort port) {
     if (io_service)
       delete io_service;
 
+    delete fs;
+
     return nullptr;
   }
   return new hdfs_internal(fs);

+ 25 - 39
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc

@@ -26,6 +26,7 @@
 #include <limits>
 #include <future>
 #include <tuple>
+#include <iostream>
 
 namespace hdfs {
 
@@ -41,7 +42,7 @@ using ::asio::ip::tcp;
 
 void NameNodeOperations::Connect(const std::string &server,
                              const std::string &service,
-                             std::function<void(const Status &)> &handler) {
+                             std::function<void(const Status &)> &&handler) {
   using namespace asio_continuation;
   typedef std::vector<tcp::endpoint> State;
   auto m = Pipeline<State>::Create();
@@ -106,41 +107,9 @@ void NameNodeOperations::GetBlockLocations(const std::string & path,
  *                    FILESYSTEM BASE CLASS
  ****************************************************************************/
 
-void FileSystem::New(
-    IoService *io_service, const Options &options, const std::string &server,
-    const std::string &service,
-    const std::function<void(const Status &, FileSystem *)> &handler) {
-  FileSystemImpl *impl = new FileSystemImpl(io_service, options);
-  impl->Connect(server, service, [impl, handler](const Status &stat) {
-    if (stat.ok()) {
-      handler(stat, impl);
-    } else {
-      delete impl;
-      handler(stat, nullptr);
-    }
-  });
-}
-
 FileSystem * FileSystem::New(
-    IoService *io_service, const Options &options, const std::string &server,
-    const std::string &service) {
-  auto callstate = std::make_shared<std::promise<std::tuple<Status, FileSystem *>>>();
-  std::future<std::tuple<Status, FileSystem *>> future(callstate->get_future());
-
-  auto callback = [callstate](const Status &s, FileSystem * fs) {
-    callstate->set_value(std::make_tuple(s, fs));
-  };
-
-  New(io_service, options, server, service, callback);
-
-  /* block until promise is set */
-  auto returnstate = future.get();
-
-  if (std::get<0>(returnstate).ok()) {
-    return std::get<1>(returnstate);
-  } else {
-    return nullptr;
-  }
+    IoService *&io_service, const Options &options) {
+  return new FileSystemImpl(io_service, options);
 }
 
 /*****************************************************************************
@@ -175,12 +144,15 @@ FileSystemImpl::~FileSystemImpl() {
 
 void FileSystemImpl::Connect(const std::string &server,
                              const std::string &service,
-                             std::function<void(const Status &)> &&handler) {
+                             const std::function<void(const Status &, FileSystem * fs)> &&handler) {
   /* IoService::New can return nullptr */
   if (!io_service_) {
-    handler (Status::Error("Null IoService"));
+    handler (Status::Error("Null IoService"), this);
   }
-  nn_.Connect(server, service, handler);
+
+  nn_.Connect(server, service, [this, handler](const Status & s) {
+    handler(s, this);
+  });
 }
 
 Status FileSystemImpl::Connect(const std::string &server, const std::string &service) {
@@ -188,7 +160,8 @@ Status FileSystemImpl::Connect(const std::string &server, const std::string &ser
   auto stat = std::make_shared<std::promise<Status>>();
   std::future<Status> future = stat->get_future();
 
-  auto callback = [stat](const Status &s) {
+  auto callback = [stat](const Status &s, FileSystem *fs) {
+    (void)fs;
     stat->set_value(s);
   };
 
@@ -247,4 +220,17 @@ Status FileSystemImpl::Open(const std::string &path,
   return stat;
 }
 
+void FileSystemImpl::WorkerDeleter::operator()(std::thread *t) {
+  // It is far too easy to destroy the filesystem (and thus the threadpool)
+  //     from within one of the worker threads, leading to a deadlock.  Let's
+  //     provide some explicit protection.
+  if(t->get_id() == std::this_thread::get_id()) {
+    //TODO: When we get good logging support, add it in here
+    std::cerr << "FATAL: Attempted to destroy a thread pool from within a "
+                 "callback of the thread pool.\n";
+  }
+  t->join();
+  delete t;
+}
+
 }

+ 4 - 7
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h

@@ -55,7 +55,7 @@ public:
 
   void Connect(const std::string &server,
                const std::string &service,
-               std::function<void(const Status &)> &handler);
+               std::function<void(const Status &)> &&handler);
 
   void GetBlockLocations(const std::string & path,
     std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler);
@@ -85,9 +85,9 @@ public:
 
   /* attempt to connect to namenode, return bad status on failure */
   void Connect(const std::string &server, const std::string &service,
-               std::function<void(const Status &)> &&handler);
+               const std::function<void(const Status &, FileSystem *)> &&handler) override;
   /* attempt to connect to namenode, return bad status on failure */
-  Status Connect(const std::string &server, const std::string &service);
+  Status Connect(const std::string &server, const std::string &service) override;
 
 
   virtual void Open(const std::string &path,
@@ -116,10 +116,7 @@ private:
   std::shared_ptr<BadDataNodeTracker> bad_node_tracker_;
 
   struct WorkerDeleter {
-    void operator()(std::thread *t) {
-      t->join();
-      delete t;
-    }
+    void operator()(std::thread *t);
   };
   typedef std::unique_ptr<std::thread, WorkerDeleter> WorkerPtr;
   std::vector<WorkerPtr> worker_threads_;