Browse Source

HDFS-9103. Retry reads on DN failure. Contributed by James Clampffer.

Haohui Mai 9 years ago
parent
commit
6f44d92071
19 changed files with 596 additions and 115 deletions
  1. 36 15
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
  2. 7 0
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/options.h
  3. 8 8
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/status.h
  4. 12 5
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.cc
  5. 5 0
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.h
  6. 17 3
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc
  7. 1 4
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc
  8. 1 1
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
  9. 69 0
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/bad_datanode_tracker.cc
  10. 75 0
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/bad_datanode_tracker.h
  11. 4 2
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
  12. 28 14
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
  13. 6 6
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream.cc
  14. 38 24
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h
  15. 8 0
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
  16. 172 0
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
  17. 22 17
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/inputstream_test.cc
  18. 66 0
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/node_exclusion_test.cc
  19. 21 16
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc

+ 36 - 15
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h

@@ -22,6 +22,7 @@
 #include "libhdfspp/status.h"
 
 #include <functional>
+#include <memory>
 #include <set>
 
 namespace hdfs {
@@ -40,7 +41,7 @@ namespace hdfs {
  * for more details.
  **/
 class IoService {
-public:
+ public:
   static IoService *New();
   /**
    * Run the asynchronous tasks associated with this IoService.
@@ -53,11 +54,24 @@ public:
   virtual ~IoService();
 };
 
+/**
+ * A node exclusion rule provides a simple way of testing if the
+ * client should attempt to connect to a node based on the node's
+ * UUID.  The FileSystem and FileHandle use the BadDataNodeTracker
+ * by default.  AsyncPreadSome takes an optional NodeExclusionRule
+ * that will override the BadDataNodeTracker.
+ **/
+class NodeExclusionRule {
+ public:
+  virtual ~NodeExclusionRule(){};
+  virtual bool IsBadNode(const std::string &node_uuid) = 0;
+};
+
 /**
  * Applications opens an InputStream to read files in HDFS.
  **/
 class InputStream {
-public:
+ public:
   /**
    * Read data from a specific position. The current implementation
    * stops at the block boundary.
@@ -65,17 +79,24 @@ public:
    * @param buf the pointer to the buffer
    * @param nbyte the size of the buffer
    * @param offset the offset the file
-   * @param excluded_datanodes the UUID of the datanodes that should
-   * not be used in this read
    *
    * The handler returns the datanode that serves the block and the number of
    * bytes has read.
    **/
-  virtual void
-  PositionRead(void *buf, size_t nbyte, uint64_t offset,
-               const std::set<std::string> &excluded_datanodes,
-               const std::function<void(const Status &, const std::string &,
-                                        size_t)> &handler) = 0;
+  virtual void PositionRead(
+      void *buf, size_t nbyte, uint64_t offset,
+      const std::function<void(const Status &, const std::string &, size_t)> &
+          handler) = 0;
+  /**
+   * Determine if a datanode should be excluded from future operations
+   * based on the return Status.
+   *
+   * @param status the Status object returned by InputStream::PositionRead
+   * @return true if the status indicates a failure that is not recoverable
+   * by the client and false otherwise.
+   **/
+  static bool ShouldExclude(const Status &status);
+
   virtual ~InputStream();
 };
 
@@ -83,14 +104,14 @@ public:
  * FileSystem implements APIs to interact with HDFS.
  **/
 class FileSystem {
-public:
+ public:
   /**
    * Create a new instance of the FileSystem object. The call
    * initializes the RPC connections to the NameNode and returns an
    * FileSystem object.
    **/
-  static void
-  New(IoService *io_service, const Options &options, const std::string &server,
+  static void New(
+      IoService *io_service, const Options &options, const std::string &server,
       const std::string &service,
       const std::function<void(const Status &, FileSystem *)> &handler);
   /**
@@ -98,9 +119,9 @@ public:
    * gather the locations of all blocks in the file and to return a
    * new instance of the @ref InputStream object.
    **/
-  virtual void
-  Open(const std::string &path,
-       const std::function<void(const Status &, InputStream *)> &handler) = 0;
+  virtual void Open(
+      const std::string &path,
+      const std::function<void(const Status &, InputStream *)> &handler) = 0;
   virtual ~FileSystem();
 };
 }

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/options.h

@@ -29,6 +29,13 @@ struct Options {
    * Default: 30000
    **/
   int rpc_timeout;
+
+  /**
+   * Exclusion time for failed datanodes in milliseconds.
+   * Default: 60000
+   **/
+  unsigned int host_exclusion_duration;
+
   Options();
 };
 }

+ 8 - 8
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/status.h

@@ -59,14 +59,6 @@ class Status {
     return (state_ == NULL) ? kOk : static_cast<int>(state_[4]);
   }
 
- private:
-  // OK status has a NULL state_.  Otherwise, state_ is a new[] array
-  // of the following form:
-  //    state_[0..3] == length of message
-  //    state_[4]    == code
-  //    state_[5..]  == message
-  const char* state_;
-
   enum Code {
     kOk = 0,
     kInvalidArgument = static_cast<unsigned>(std::errc::invalid_argument),
@@ -75,6 +67,14 @@ class Status {
     kException = 255,
   };
 
+ private:
+  // OK status has a NULL state_.  Otherwise, state_ is a new[] array
+  // of the following form:
+  //    state_[0..3] == length of message
+  //    state_[4]    == code
+  //    state_[5..]  == message
+  const char* state_;
+
   explicit Status(int code, const char *msg1, const char *msg2);
   static const char *CopyState(const char* s);
   static const char *ConstructState(int code, const char *msg1, const char *msg2);

+ 12 - 5
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.cc

@@ -41,20 +41,27 @@ ssize_t FileHandle::Pread(void *buf, size_t nbyte, off_t offset) {
 
   /* wrap async call with promise/future to make it blocking */
   size_t read_count = 0;
-  auto callback = [stat, &read_count](const Status &s, const std::string &dn,
-                                      size_t bytes) {
-    (void)dn;
+  std::string contacted_datanode;
+  auto callback = [stat, &read_count, &contacted_datanode](
+      const Status &s, const std::string &dn, size_t bytes) {
     stat->set_value(s);
     read_count = bytes;
+    contacted_datanode = dn;
   };
 
-  input_stream_->PositionRead(buf, nbyte, offset, std::set<std::string>(),
-                              callback);
+  input_stream_->PositionRead(buf, nbyte, offset, callback);
 
   /* wait for async to finish */
   auto s = future.get();
 
   if (!s.ok()) {
+    /* determine if DN gets marked bad */
+    if (InputStream::ShouldExclude(s)) {
+      InputStreamImpl *impl =
+          static_cast<InputStreamImpl *>(input_stream_.get());
+      impl->bad_node_tracker_->AddBadNode(contacted_datanode);
+    }
+
     return -1;
   }
   return (ssize_t)read_count;

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.h

@@ -22,8 +22,11 @@
 #include <cstdint>
 #include <thread>
 #include <vector>
+#include <mutex>
+#include <chrono>
 
 #include "libhdfspp/hdfs.h"
+#include "fs/bad_datanode_tracker.h"
 #include <hdfs/hdfs.h>
 
 namespace hdfs {
@@ -34,6 +37,8 @@ namespace hdfs {
  * Then provide very thin C wrappers over each method.
  */
 
+class HadoopFileSystem;
+
 class FileHandle {
  public:
   virtual ~FileHandle(){};

+ 17 - 3
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc

@@ -22,8 +22,22 @@ namespace hdfs {
 
 IoService::~IoService() {}
 
-IoService *IoService::New() {
-  return new IoServiceImpl();
-}
+IoService *IoService::New() { return new IoServiceImpl(); }
+
+bool InputStream::ShouldExclude(const Status &s) {
+  if (s.ok()) {
+    return false;
+  }
 
+  switch (s.code()) {
+    /* client side resource exhaustion */
+    case Status::kResourceUnavailable:
+      return false;
+    case Status::kInvalidArgument:
+    case Status::kUnimplemented:
+    case Status::kException:
+    default:
+      return true;
+  }
+}
 }

+ 1 - 4
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc

@@ -20,8 +20,5 @@
 
 namespace hdfs {
 
-Options::Options()
-    : rpc_timeout(30000)
-{}
-
+Options::Options() : rpc_timeout(30000), host_exclusion_duration(600000) {}
 }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt

@@ -1,2 +1,2 @@
-add_library(fs filesystem.cc inputstream.cc)
+add_library(fs filesystem.cc inputstream.cc bad_datanode_tracker.cc)
 add_dependencies(fs proto)

+ 69 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/bad_datanode_tracker.cc

@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "bad_datanode_tracker.h"
+
+namespace hdfs {
+
+BadDataNodeTracker::BadDataNodeTracker(const Options& options)
+    : timeout_duration_(options.host_exclusion_duration),
+      test_clock_shift_(0) {}
+
+BadDataNodeTracker::~BadDataNodeTracker() {}
+
+void BadDataNodeTracker::AddBadNode(const std::string& dn) {
+  std::lock_guard<std::mutex> update_lock(datanodes_update_lock_);
+  datanodes_[dn] = Clock::now();
+}
+
+bool BadDataNodeTracker::IsBadNode(const std::string& dn) {
+  std::lock_guard<std::mutex> update_lock(datanodes_update_lock_);
+
+  if (datanodes_.count(dn) == 1) {
+    const TimePoint& entered_time = datanodes_[dn];
+    if (TimeoutExpired(entered_time)) {
+      datanodes_.erase(dn);
+      return false;
+    }
+    /* node in set and still marked bad */
+    return true;
+  }
+  return false;
+}
+
+void BadDataNodeTracker::TEST_set_clock_shift(int t) { test_clock_shift_ = t; }
+
+bool BadDataNodeTracker::TimeoutExpired(const TimePoint& t) {
+  TimePoint threshold = Clock::now() -
+                        std::chrono::milliseconds(timeout_duration_) +
+                        std::chrono::milliseconds(test_clock_shift_);
+  if (t < threshold) {
+    return true;
+  }
+  return false;
+}
+
+ExclusionSet::ExclusionSet(const std::set<std::string>& excluded)
+    : excluded_(excluded) {}
+
+ExclusionSet::~ExclusionSet() {}
+
+bool ExclusionSet::IsBadNode(const std::string& node_uuid) {
+  return excluded_.count(node_uuid) == 1;
+}
+}

+ 75 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/bad_datanode_tracker.h

@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef LIBHDFSPP_BADDATANODETRACKER_H
+#define LIBHDFSPP_BADDATANODETRACKER_H
+
+#include <mutex>
+#include <chrono>
+#include <map>
+#include <string>
+#include <set>
+
+#include "libhdfspp/options.h"
+#include "libhdfspp/hdfs.h"
+
+namespace hdfs {
+
+/**
+ * ExclusionSet is a simple override that can be filled with known
+ * bad node UUIDs and passed to AsyncPreadSome.
+ **/
+class ExclusionSet : public NodeExclusionRule {
+ public:
+  ExclusionSet(const std::set<std::string>& excluded);
+  virtual ~ExclusionSet();
+  virtual bool IsBadNode(const std::string& node_uuid);
+
+ private:
+  std::set<std::string> excluded_;
+};
+
+/**
+ * BadDataNodeTracker keeps a timestamped list of datanodes that have
+ * failed during past operations.  Entries present in this list will
+ * not be used for new requests.  Entries will be evicted from the list
+ * after a period of time has elapsed; the default is 10 minutes.
+ */
+class BadDataNodeTracker : public NodeExclusionRule {
+ public:
+  BadDataNodeTracker(const Options& options = Options());
+  virtual ~BadDataNodeTracker();
+  /* add a bad DN to the list */
+  void AddBadNode(const std::string& dn);
+  /* check if a node should be excluded */
+  virtual bool IsBadNode(const std::string& dn);
+  /* only for tests, shift clock by t milliseconds*/
+  void TEST_set_clock_shift(int t);
+
+ private:
+  typedef std::chrono::steady_clock Clock;
+  typedef std::chrono::time_point<Clock> TimePoint;
+  bool TimeoutExpired(const TimePoint& t);
+  /* after timeout_duration_ elapses remove DN */
+  const unsigned int timeout_duration_; /* milliseconds */
+  std::map<std::string, TimePoint> datanodes_;
+  std::mutex datanodes_update_lock_;
+  int test_clock_shift_;
+};
+}
+#endif

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc

@@ -54,7 +54,8 @@ FileSystemImpl::FileSystemImpl(IoService *io_service, const Options &options)
       engine_(&io_service_->io_service(), options,
               RpcEngine::GetRandomClientName(), kNamenodeProtocol,
               kNamenodeProtocolVersion),
-      namenode_(&engine_) {}
+      namenode_(&engine_),
+      bad_node_tracker_(std::make_shared<BadDataNodeTracker>()) {}
 
 void FileSystemImpl::Connect(const std::string &server,
                              const std::string &service,
@@ -99,7 +100,8 @@ void FileSystemImpl::Open(
         namenode_.GetBlockLocations(&s->req, s->resp, next);
       }));
   m->Run([this, handler](const Status &stat, const State &s) {
-    handler(stat, stat.ok() ? new InputStreamImpl(this, &s.resp->locations())
+    handler(stat, stat.ok() ? new InputStreamImpl(this, &s.resp->locations(),
+                                                  bad_node_tracker_)
                             : nullptr);
   });
 }

+ 28 - 14
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h

@@ -20,41 +20,52 @@
 
 #include "common/hdfs_public_api.h"
 #include "libhdfspp/hdfs.h"
+#include "fs/bad_datanode_tracker.h"
 #include "rpc/rpc_engine.h"
 #include "ClientNamenodeProtocol.pb.h"
 #include "ClientNamenodeProtocol.hrpc.inl"
 
 namespace hdfs {
 
+class FileHandle;
+class HadoopFileSystem;
+
 class FileSystemImpl : public FileSystem {
-public:
+ public:
   FileSystemImpl(IoService *io_service, const Options &options);
   void Connect(const std::string &server, const std::string &service,
                std::function<void(const Status &)> &&handler);
   virtual void Open(const std::string &path,
-                    const std::function<void(const Status &, InputStream *)>
-                        &handler) override;
+                    const std::function<void(const Status &, InputStream *)> &
+                        handler) override;
   RpcEngine &rpc_engine() { return engine_; }
 
-private:
+ private:
   IoServiceImpl *io_service_;
   RpcEngine engine_;
   ClientNamenodeProtocol namenode_;
+  std::shared_ptr<BadDataNodeTracker> bad_node_tracker_;
 };
 
 class InputStreamImpl : public InputStream {
-public:
+ public:
   InputStreamImpl(FileSystemImpl *fs,
-                  const ::hadoop::hdfs::LocatedBlocksProto *blocks);
-  virtual void
-  PositionRead(void *buf, size_t nbyte, uint64_t offset,
-               const std::set<std::string> &excluded_datanodes,
-               const std::function<void(const Status &, const std::string &,
-                                        size_t)> &handler) override;
+                  const ::hadoop::hdfs::LocatedBlocksProto *blocks,
+                  std::shared_ptr<BadDataNodeTracker> tracker);
+  virtual void PositionRead(
+      void *buf, size_t nbyte, uint64_t offset,
+      const std::function<void(const Status &, const std::string &, size_t)> &
+          handler) override;
+  /**
+   * If optional_rule_override is null then use the bad_datanode_tracker.  If
+   * non-null use the provided NodeExclusionRule to determine eligible
+   * datanodes.
+   **/
   template <class MutableBufferSequence, class Handler>
   void AsyncPreadSome(size_t offset, const MutableBufferSequence &buffers,
-                      const std::set<std::string> &excluded_datanodes,
+                      std::shared_ptr<NodeExclusionRule> excluded_nodes,
                       const Handler &handler);
+
   template <class BlockReaderTrait, class MutableBufferSequence, class Handler>
   void AsyncReadBlock(const std::string &client_name,
                       const hadoop::hdfs::LocatedBlockProto &block,
@@ -62,14 +73,17 @@ public:
                       const MutableBufferSequence &buffers,
                       const Handler &handler);
 
-private:
+ private:
   FileSystemImpl *fs_;
   unsigned long long file_length_;
   std::vector<::hadoop::hdfs::LocatedBlockProto> blocks_;
-  template <class Reader> struct HandshakeContinuation;
+  template <class Reader>
+  struct HandshakeContinuation;
   template <class Reader, class MutableBufferSequence>
   struct ReadBlockContinuation;
   struct RemoteBlockReaderTrait;
+  friend class FileHandle;
+  std::shared_ptr<BadDataNodeTracker> bad_node_tracker_;
 };
 }
 

+ 6 - 6
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream.cc

@@ -25,8 +25,9 @@ using ::hadoop::hdfs::LocatedBlocksProto;
 InputStream::~InputStream() {}
 
 InputStreamImpl::InputStreamImpl(FileSystemImpl *fs,
-                                 const LocatedBlocksProto *blocks)
-    : fs_(fs), file_length_(blocks->filelength()) {
+                                 const LocatedBlocksProto *blocks,
+                                 std::shared_ptr<BadDataNodeTracker> tracker)
+    : fs_(fs), file_length_(blocks->filelength()), bad_node_tracker_(tracker) {
   for (const auto &block : blocks->blocks()) {
     blocks_.push_back(block);
   }
@@ -38,9 +39,8 @@ InputStreamImpl::InputStreamImpl(FileSystemImpl *fs,
 
 void InputStreamImpl::PositionRead(
     void *buf, size_t nbyte, uint64_t offset,
-    const std::set<std::string> &excluded_datanodes,
-    const std::function<void(const Status &, const std::string &, size_t)>
-        &handler) {
-  AsyncPreadSome(offset, asio::buffer(buf, nbyte), excluded_datanodes, handler);
+    const std::function<void(const Status &, const std::string &, size_t)> &
+        handler) {
+  AsyncPreadSome(offset, asio::buffer(buf, nbyte), bad_node_tracker_, handler);
 }
 }

+ 38 - 24
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h

@@ -26,6 +26,7 @@
 #include <functional>
 #include <future>
 #include <type_traits>
+#include <algorithm>
 
 namespace hdfs {
 
@@ -40,9 +41,9 @@ struct InputStreamImpl::RemoteBlockReaderTrait {
     size_t *transferred() { return &transferred_; }
     const size_t *transferred() const { return &transferred_; }
   };
-  static continuation::Pipeline<State> *
-  CreatePipeline(::asio::io_service *io_service,
-                 const ::hadoop::hdfs::DatanodeInfoProto &dn) {
+  static continuation::Pipeline<State> *CreatePipeline(
+      ::asio::io_service *io_service,
+      const ::hadoop::hdfs::DatanodeInfoProto &dn) {
     using namespace ::asio::ip;
     auto m = continuation::Pipeline<State>::Create();
     auto &s = m->state();
@@ -64,7 +65,9 @@ struct InputStreamImpl::HandshakeContinuation : continuation::Continuation {
                         const hadoop::common::TokenProto *token,
                         const hadoop::hdfs::ExtendedBlockProto *block,
                         uint64_t length, uint64_t offset)
-      : reader_(reader), client_name_(client_name), length_(length),
+      : reader_(reader),
+        client_name_(client_name),
+        length_(length),
         offset_(offset) {
     if (token) {
       token_.reset(new hadoop::common::TokenProto());
@@ -78,7 +81,7 @@ struct InputStreamImpl::HandshakeContinuation : continuation::Continuation {
                            offset_, next);
   }
 
-private:
+ private:
   Reader *reader_;
   const std::string client_name_;
   std::unique_ptr<hadoop::common::TokenProto> token_;
@@ -91,8 +94,10 @@ template <class Reader, class MutableBufferSequence>
 struct InputStreamImpl::ReadBlockContinuation : continuation::Continuation {
   ReadBlockContinuation(Reader *reader, MutableBufferSequence buffer,
                         size_t *transferred)
-      : reader_(reader), buffer_(buffer),
-        buffer_size_(asio::buffer_size(buffer)), transferred_(transferred) {
+      : reader_(reader),
+        buffer_(buffer),
+        buffer_size_(asio::buffer_size(buffer)),
+        transferred_(transferred) {
     static_assert(!std::is_reference<MutableBufferSequence>::value,
                   "Buffer must not be a reference type");
   }
@@ -103,7 +108,7 @@ struct InputStreamImpl::ReadBlockContinuation : continuation::Continuation {
     OnReadData(Status::OK(), 0);
   }
 
-private:
+ private:
   Reader *reader_;
   const MutableBufferSequence buffer_;
   const size_t buffer_size_;
@@ -129,40 +134,50 @@ private:
 template <class MutableBufferSequence, class Handler>
 void InputStreamImpl::AsyncPreadSome(
     size_t offset, const MutableBufferSequence &buffers,
-    const std::set<std::string> &excluded_datanodes, const Handler &handler) {
+    std::shared_ptr<NodeExclusionRule> excluded_nodes, const Handler &handler) {
   using ::hadoop::hdfs::DatanodeInfoProto;
   using ::hadoop::hdfs::LocatedBlockProto;
 
-  auto it = std::find_if(
+  /**
+   *  Note: block and chosen_dn will end up pointing to things inside
+   *  the blocks_ vector.  They shouldn't be directly deleted.
+   **/
+  auto block = std::find_if(
       blocks_.begin(), blocks_.end(), [offset](const LocatedBlockProto &p) {
         return p.offset() <= offset && offset < p.offset() + p.b().numbytes();
       });
 
-  if (it == blocks_.end()) {
+  if (block == blocks_.end()) {
     handler(Status::InvalidArgument("Cannot find corresponding blocks"), "", 0);
     return;
   }
 
-  const DatanodeInfoProto *chosen_dn = nullptr;
-  for (int i = 0; i < it->locs_size(); ++i) {
-    const auto &di = it->locs(i);
-    if (!excluded_datanodes.count(di.id().datanodeuuid())) {
-      chosen_dn = &di;
-      break;
-    }
-  }
+  /**
+   * If user supplies a rule use it, otherwise use the tracker.
+   * User is responsible for making sure one of them isn't null.
+   **/
+  std::shared_ptr<NodeExclusionRule> rule =
+      excluded_nodes != nullptr ? excluded_nodes : bad_node_tracker_;
+
+  auto datanodes = block->locs();
+  auto it = std::find_if(datanodes.begin(), datanodes.end(),
+                         [rule](const DatanodeInfoProto &dn) {
+                           return !rule->IsBadNode(dn.id().datanodeuuid());
+                         });
 
-  if (!chosen_dn) {
+  if (it == datanodes.end()) {
     handler(Status::ResourceUnavailable("No datanodes available"), "", 0);
     return;
   }
 
-  uint64_t offset_within_block = offset - it->offset();
+  DatanodeInfoProto *chosen_dn = &*it;
+
+  uint64_t offset_within_block = offset - block->offset();
   uint64_t size_within_block = std::min<uint64_t>(
-      it->b().numbytes() - offset_within_block, asio::buffer_size(buffers));
+      block->b().numbytes() - offset_within_block, asio::buffer_size(buffers));
 
   AsyncReadBlock<RemoteBlockReaderTrait>(
-      fs_->rpc_engine().client_name(), *it, *chosen_dn, offset_within_block,
+      fs_->rpc_engine().client_name(), *block, *chosen_dn, offset_within_block,
       asio::buffer(buffers, size_within_block), handler);
 }
 
@@ -172,7 +187,6 @@ void InputStreamImpl::AsyncReadBlock(
     const hadoop::hdfs::LocatedBlockProto &block,
     const hadoop::hdfs::DatanodeInfoProto &dn, size_t offset,
     const MutableBufferSequence &buffers, const Handler &handler) {
-
   typedef typename BlockReaderTrait::Reader Reader;
   auto m =
       BlockReaderTrait::CreatePipeline(&fs_->rpc_engine().io_service(), dn);

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt

@@ -41,3 +41,11 @@ include_directories(${CMAKE_CURRENT_BINARY_DIR})
 add_executable(rpc_engine_test rpc_engine_test.cc ${PROTO_TEST_SRCS} ${PROTO_TEST_HDRS} $<TARGET_OBJECTS:test_common>)
 target_link_libraries(rpc_engine_test rpc proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
 add_test(rpc_engine rpc_engine_test)
+
+add_executable(bad_datanode_test bad_datanode_test.cc)
+target_link_libraries(bad_datanode_test rpc reader proto fs bindings_c rpc proto common reader  ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
+add_test(bad_datanode bad_datanode_test)
+
+add_executable(node_exclusion_test node_exclusion_test.cc)
+target_link_libraries(node_exclusion_test fs gmock_main common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
+add_test(node_exclusion node_exclusion_test)

+ 172 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc

@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "fs/filesystem.h"
+#include "fs/bad_datanode_tracker.h"
+
+#include <gmock/gmock.h>
+
+using hadoop::common::TokenProto;
+using hadoop::hdfs::DatanodeInfoProto;
+using hadoop::hdfs::DatanodeIDProto;
+using hadoop::hdfs::ExtendedBlockProto;
+using hadoop::hdfs::LocatedBlockProto;
+using hadoop::hdfs::LocatedBlocksProto;
+
+using ::testing::_;
+using ::testing::InvokeArgument;
+using ::testing::Return;
+
+using namespace hdfs;
+
+class MockReader {
+ public:
+  virtual ~MockReader() {}
+  MOCK_METHOD2(
+      async_read_some,
+      void(const asio::mutable_buffers_1 &,
+           const std::function<void(const Status &, size_t transferred)> &));
+
+  MOCK_METHOD6(async_connect,
+               void(const std::string &, TokenProto *, ExtendedBlockProto *,
+                    uint64_t, uint64_t,
+                    const std::function<void(const Status &)> &));
+};
+
+template <class Trait>
+struct MockBlockReaderTrait {
+  typedef MockReader Reader;
+  struct State {
+    MockReader reader_;
+    size_t transferred_;
+    Reader *reader() { return &reader_; }
+    size_t *transferred() { return &transferred_; }
+    const size_t *transferred() const { return &transferred_; }
+  };
+
+  static continuation::Pipeline<State> *CreatePipeline(
+      ::asio::io_service *, const DatanodeInfoProto &) {
+    auto m = continuation::Pipeline<State>::Create();
+    *m->state().transferred() = 0;
+    Trait::InitializeMockReader(m->state().reader());
+    return m;
+  }
+};
+
+TEST(BadDataNodeTest, RecoverableError) {
+  LocatedBlocksProto blocks;
+  LocatedBlockProto block;
+  DatanodeInfoProto dn;
+  char buf[4096] = {
+      0,
+  };
+  IoServiceImpl io_service;
+  Options default_options;
+  FileSystemImpl fs(&io_service, default_options);
+  auto tracker = std::make_shared<BadDataNodeTracker>();
+  InputStreamImpl is(&fs, &blocks, tracker);
+  Status stat;
+  size_t read = 0;
+  struct Trait {
+    static void InitializeMockReader(MockReader *reader) {
+      EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
+          .WillOnce(InvokeArgument<5>(Status::OK()));
+
+      EXPECT_CALL(*reader, async_read_some(_, _))
+          // resource unavailable error
+          .WillOnce(InvokeArgument<1>(
+              Status::ResourceUnavailable(
+                  "Unable to get some resource, try again later"),
+              sizeof(buf)));
+    }
+  };
+
+  is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
+      "client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
+      [&stat, &read](const Status &status, const std::string &,
+                     size_t transferred) {
+        stat = status;
+        read = transferred;
+      });
+
+  ASSERT_FALSE(stat.ok());
+
+  std::string failing_dn = "id_of_bad_datanode";
+  if (!stat.ok()) {
+    if (InputStream::ShouldExclude(stat)) {
+      tracker->AddBadNode(failing_dn);
+    }
+  }
+
+  ASSERT_FALSE(tracker->IsBadNode(failing_dn));
+}
+
+TEST(BadDataNodeTest, InternalError) {
+  LocatedBlocksProto blocks;
+  LocatedBlockProto block;
+  DatanodeInfoProto dn;
+  char buf[4096] = {
+      0,
+  };
+  IoServiceImpl io_service;
+  Options default_options;
+  auto tracker = std::make_shared<BadDataNodeTracker>();
+  FileSystemImpl fs(&io_service, default_options);
+  InputStreamImpl is(&fs, &blocks, tracker);
+  Status stat;
+  size_t read = 0;
+  struct Trait {
+    static void InitializeMockReader(MockReader *reader) {
+      EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
+          .WillOnce(InvokeArgument<5>(Status::OK()));
+
+      EXPECT_CALL(*reader, async_read_some(_, _))
+          // something bad happened on the DN, calling again isn't going to help
+          .WillOnce(
+              InvokeArgument<1>(Status::Exception("server_explosion_exception",
+                                                  "the server exploded"),
+                                sizeof(buf)));
+    }
+  };
+
+  is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
+      "client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
+      [&stat, &read](const Status &status, const std::string &,
+                     size_t transferred) {
+        stat = status;
+        read = transferred;
+      });
+
+  ASSERT_FALSE(stat.ok());
+
+  std::string failing_dn = "id_of_bad_datanode";
+  if (!stat.ok()) {
+    if (InputStream::ShouldExclude(stat)) {
+      tracker->AddBadNode(failing_dn);
+    }
+  }
+
+  ASSERT_TRUE(tracker->IsBadNode(failing_dn));
+}
+
+int main(int argc, char *argv[]) {
+  // The following line must be executed to initialize Google Mock
+  // (and Google Test) before running the tests.
+  ::testing::InitGoogleMock(&argc, argv);
+  return RUN_ALL_TESTS();
+}

+ 22 - 17
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/inputstream_test.cc

@@ -17,6 +17,7 @@
  */
 
 #include "fs/filesystem.h"
+#include "fs/bad_datanode_tracker.h"
 #include <gmock/gmock.h>
 
 using hadoop::common::TokenProto;
@@ -35,7 +36,7 @@ using namespace hdfs;
 namespace hdfs {
 
 class MockReader {
-public:
+ public:
   virtual ~MockReader() {}
   MOCK_METHOD2(
       async_read_some,
@@ -48,7 +49,8 @@ public:
                     const std::function<void(const Status &)> &));
 };
 
-template <class Trait> struct MockBlockReaderTrait {
+template <class Trait>
+struct MockBlockReaderTrait {
   typedef MockReader Reader;
   struct State {
     MockReader reader_;
@@ -58,8 +60,8 @@ template <class Trait> struct MockBlockReaderTrait {
     const size_t *transferred() const { return &transferred_; }
   };
 
-  static continuation::Pipeline<State> *
-  CreatePipeline(::asio::io_service *, const DatanodeInfoProto &) {
+  static continuation::Pipeline<State> *CreatePipeline(
+      ::asio::io_service *, const DatanodeInfoProto &) {
     auto m = continuation::Pipeline<State>::Create();
     *m->state().transferred() = 0;
     Trait::InitializeMockReader(m->state().reader());
@@ -78,7 +80,7 @@ TEST(InputStreamTest, TestReadSingleTrunk) {
   IoServiceImpl io_service;
   Options options;
   FileSystemImpl fs(&io_service, options);
-  InputStreamImpl is(&fs, &blocks);
+  InputStreamImpl is(&fs, &blocks, std::make_shared<BadDataNodeTracker>());
   Status stat;
   size_t read = 0;
   struct Trait {
@@ -93,7 +95,8 @@ TEST(InputStreamTest, TestReadSingleTrunk) {
 
   is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
       "client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
-      [&stat, &read](const Status &status, const std::string &, size_t transferred) {
+      [&stat, &read](const Status &status, const std::string &,
+                     size_t transferred) {
         stat = status;
         read = transferred;
       });
@@ -112,7 +115,7 @@ TEST(InputStreamTest, TestReadMultipleTrunk) {
   IoServiceImpl io_service;
   Options options;
   FileSystemImpl fs(&io_service, options);
-  InputStreamImpl is(&fs, &blocks);
+  InputStreamImpl is(&fs, &blocks, std::make_shared<BadDataNodeTracker>());
   Status stat;
   size_t read = 0;
   struct Trait {
@@ -148,7 +151,7 @@ TEST(InputStreamTest, TestReadError) {
   IoServiceImpl io_service;
   Options options;
   FileSystemImpl fs(&io_service, options);
-  InputStreamImpl is(&fs, &blocks);
+  InputStreamImpl is(&fs, &blocks, std::make_shared<BadDataNodeTracker>());
   Status stat;
   size_t read = 0;
   struct Trait {
@@ -195,7 +198,7 @@ TEST(InputStreamTest, TestExcludeDataNode) {
   IoServiceImpl io_service;
   Options options;
   FileSystemImpl fs(&io_service, options);
-  InputStreamImpl is(&fs, &blocks);
+  InputStreamImpl is(&fs, &blocks, std::make_shared<BadDataNodeTracker>());
   Status stat;
   size_t read = 0;
   struct Trait {
@@ -208,14 +211,16 @@ TEST(InputStreamTest, TestExcludeDataNode) {
     }
   };
 
-
-  std::set<std::string> excluded_dn({"foo"});
-  is.AsyncPreadSome(0, asio::buffer(buf, sizeof(buf)), excluded_dn,
-      [&stat, &read](const Status &status, const std::string &, size_t transferred) {
-        stat = status;
-        read = transferred;
-      });
-  ASSERT_EQ(static_cast<int>(std::errc::resource_unavailable_try_again), stat.code());
+  std::shared_ptr<NodeExclusionRule> exclude_set =
+      std::make_shared<ExclusionSet>(std::set<std::string>({"foo"}));
+  is.AsyncPreadSome(0, asio::buffer(buf, sizeof(buf)), exclude_set,
+                    [&stat, &read](const Status &status, const std::string &,
+                                   size_t transferred) {
+                      stat = status;
+                      read = transferred;
+                    });
+  ASSERT_EQ(static_cast<int>(std::errc::resource_unavailable_try_again),
+            stat.code());
   ASSERT_EQ(0UL, read);
 }
 

+ 66 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/node_exclusion_test.cc

@@ -0,0 +1,66 @@
+#include "fs/filesystem.h"
+#include "fs/bad_datanode_tracker.h"
+
+#include <gmock/gmock.h>
+
+using ::testing::_;
+using ::testing::InvokeArgument;
+using ::testing::Return;
+
+using namespace hdfs;
+
+/**
+ * Unit test for the tracker
+ **/
+
+/* make sure nodes can be added */
+TEST(NodeExclusionTest, AddBadNode) {
+  auto tracker = std::make_shared<BadDataNodeTracker>();
+
+  ASSERT_FALSE(tracker->IsBadNode("dn1"));
+  tracker->AddBadNode("dn1");
+  ASSERT_TRUE(tracker->IsBadNode("dn1"));
+  ASSERT_FALSE(tracker->IsBadNode("dn2"));
+  tracker->AddBadNode("dn2");
+  ASSERT_TRUE(tracker->IsBadNode("dn2"));
+}
+
+/* Make sure nodes get removed when time elapses */
+TEST(NodeExclusionTest, RemoveOnTimeout) {
+  auto tracker = std::make_shared<BadDataNodeTracker>();
+
+  /* add node and make sure only that node is marked bad */
+  std::string bad_dn("this_dn_died");
+  tracker->AddBadNode(bad_dn);
+  ASSERT_TRUE(tracker->IsBadNode(bad_dn));
+  ASSERT_FALSE(tracker->IsBadNode("good_dn"));
+
+  tracker->TEST_set_clock_shift(1000000);
+
+  /* node should be removed on lookup after time shift */
+  ASSERT_FALSE(tracker->IsBadNode(bad_dn));
+}
+
+/**
+ * Unit tests for ExcludeSet
+ **/
+
+TEST(NodeExclusionTest, ExcludeSet) {
+  /* empty case */
+  auto exclude_set = std::make_shared<ExclusionSet>(std::set<std::string>());
+  ASSERT_FALSE(exclude_set->IsBadNode("any_node"));
+
+  /* common case */
+  exclude_set =
+      std::make_shared<ExclusionSet>(std::set<std::string>({"dn_1", "dn_3"}));
+  ASSERT_TRUE(exclude_set->IsBadNode("dn_1"));
+  ASSERT_FALSE(exclude_set->IsBadNode("dn_2"));
+  ASSERT_TRUE(exclude_set->IsBadNode("dn_3"));
+}
+
+int main(int argc, char *argv[]) {
+  // The following line must be executed to initialize Google Mock
+  // (and Google Test) before running the tests.
+  ::testing::InitGoogleMock(&argc, argv);
+  return RUN_ALL_TESTS();
+}

+ 21 - 16
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc

@@ -50,7 +50,7 @@ namespace pbio = pb::io;
 namespace hdfs {
 
 class MockDNConnection : public MockConnectionBase {
-public:
+ public:
   MockDNConnection(::asio::io_service &io_service)
       : MockConnectionBase(&io_service) {}
   MOCK_METHOD0(Produce, ProducerResult());
@@ -71,9 +71,9 @@ static inline std::pair<error_code, string> Produce(const std::string &s) {
   return make_pair(error_code(), s);
 }
 
-static inline std::pair<error_code, string>
-ProducePacket(const std::string &data, const std::string &checksum,
-              int offset_in_block, int seqno, bool last_packet) {
+static inline std::pair<error_code, string> ProducePacket(
+    const std::string &data, const std::string &checksum, int offset_in_block,
+    int seqno, bool last_packet) {
   PacketHeaderProto proto;
   proto.set_datalen(data.size());
   proto.set_offsetinblock(offset_in_block);
@@ -83,7 +83,8 @@ ProducePacket(const std::string &data, const std::string &checksum,
   char prefix[6];
   *reinterpret_cast<unsigned *>(prefix) =
       htonl(data.size() + checksum.size() + sizeof(int32_t));
-  *reinterpret_cast<short *>(prefix + sizeof(int32_t)) = htons(proto.ByteSize());
+  *reinterpret_cast<short *>(prefix + sizeof(int32_t)) =
+      htons(proto.ByteSize());
   std::string payload(prefix, sizeof(prefix));
   payload.reserve(payload.size() + proto.ByteSize() + checksum.size() +
                   data.size());
@@ -94,10 +95,10 @@ ProducePacket(const std::string &data, const std::string &checksum,
 }
 
 template <class Stream = MockDNConnection, class Handler>
-static std::shared_ptr<RemoteBlockReader<Stream>>
-ReadContent(Stream *conn, TokenProto *token, const ExtendedBlockProto &block,
-            uint64_t length, uint64_t offset, const mutable_buffers_1 &buf,
-            const Handler &handler) {
+static std::shared_ptr<RemoteBlockReader<Stream>> ReadContent(
+    Stream *conn, TokenProto *token, const ExtendedBlockProto &block,
+    uint64_t length, uint64_t offset, const mutable_buffers_1 &buf,
+    const Handler &handler) {
   BlockReaderOptions options;
   auto reader = std::make_shared<RemoteBlockReader<Stream>>(options, conn);
   Status result;
@@ -128,7 +129,7 @@ TEST(RemoteBlockReaderTest, TestReadWholeBlock) {
   block.set_poolid("foo");
   block.set_blockid(0);
   block.set_generationstamp(0);
-  
+
   std::string data(kChunkSize, 0);
   ReadContent(&conn, nullptr, block, kChunkSize, 0,
               buffer(const_cast<char *>(data.c_str()), data.size()),
@@ -201,14 +202,16 @@ TEST(RemoteBlockReaderTest, TestReadMultiplePacket) {
   string data(kChunkSize, 0);
   mutable_buffers_1 buf = buffer(const_cast<char *>(data.c_str()), data.size());
   BlockReaderOptions options;
-  auto reader = std::make_shared<RemoteBlockReader<MockDNConnection> >(options, &conn);
+  auto reader =
+      std::make_shared<RemoteBlockReader<MockDNConnection>>(options, &conn);
   Status result;
   reader->async_connect(
       "libhdfs++", nullptr, &block, data.size(), 0,
       [buf, reader, &data, &io_service](const Status &stat) {
         ASSERT_TRUE(stat.ok());
         reader->async_read_some(
-            buf, [buf, reader, &data, &io_service](const Status &stat, size_t transferred) {
+            buf, [buf, reader, &data, &io_service](const Status &stat,
+                                                   size_t transferred) {
               ASSERT_TRUE(stat.ok());
               ASSERT_EQ(kChunkSize, transferred);
               ASSERT_EQ(kChunkData, data);
@@ -216,7 +219,8 @@ TEST(RemoteBlockReaderTest, TestReadMultiplePacket) {
               data.resize(kChunkSize);
               transferred = 0;
               reader->async_read_some(
-                  buf, [&data,&io_service](const Status &stat, size_t transferred) {
+                  buf,
+                  [&data, &io_service](const Status &stat, size_t transferred) {
                     ASSERT_TRUE(stat.ok());
                     ASSERT_EQ(kChunkSize, transferred);
                     ASSERT_EQ(kChunkData, data);
@@ -230,9 +234,10 @@ TEST(RemoteBlockReaderTest, TestReadMultiplePacket) {
 TEST(RemoteBlockReaderTest, TestSaslConnection) {
   static const size_t kChunkSize = 512;
   static const string kChunkData(kChunkSize, 'a');
-  static const string kAuthPayload = "realm=\"0\",nonce=\"+GAWc+O6yEAWpew/"
-                                     "qKah8qh4QZLoOLCDcTtEKhlS\",qop=\"auth\","
-                                     "charset=utf-8,algorithm=md5-sess";
+  static const string kAuthPayload =
+      "realm=\"0\",nonce=\"+GAWc+O6yEAWpew/"
+      "qKah8qh4QZLoOLCDcTtEKhlS\",qop=\"auth\","
+      "charset=utf-8,algorithm=md5-sess";
   ::asio::io_service io_service;
   MockDNConnection conn(io_service);
   BlockOpResponseProto block_op_resp;