فهرست منبع

HDFS-8952. InputStream.PositionRead() should be aware of available DNs. Contributed by Haohui Mai.

Haohui Mai 9 سال پیش
والد
کامیت
acf57897ec

+ 16 - 4
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h

@@ -21,6 +21,7 @@
 #include "libhdfspp/status.h"
 #include "libhdfspp/status.h"
 
 
 #include <functional>
 #include <functional>
+#include <set>
 
 
 namespace hdfs {
 namespace hdfs {
 
 
@@ -57,12 +58,23 @@ public:
 class InputStream {
 class InputStream {
 public:
 public:
   /**
   /**
-   * Read data from a specific position. The handler returns the
-   * number of bytes has read.
+   * Read data from a specific position. The current implementation
+   * stops at the block boundary.
+   *
+   * @param buf the pointer to the buffer
+   * @param nbyte the size of the buffer
+   * @param offset the offset the file
+   * @param excluded_datanodes the UUID of the datanodes that should
+   * not be used in this read
+   *
+   * The handler returns the datanode that serves the block and the number of
+   * bytes has read.
    **/
    **/
   virtual void
   virtual void
-  PositionRead(void *buf, size_t nbyte, size_t offset,
-               const std::function<void(const Status &, size_t)> &handler) = 0;
+  PositionRead(void *buf, size_t nbyte, uint64_t offset,
+               const std::set<std::string> &excluded_datanodes,
+               const std::function<void(const Status &, const std::string &,
+                                        size_t)> &handler) = 0;
   virtual ~InputStream();
   virtual ~InputStream();
 };
 };
 
 

+ 0 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/status.h

@@ -66,7 +66,6 @@ class Status {
   //    state_[4]    == code
   //    state_[4]    == code
   //    state_[5..]  == message
   //    state_[5..]  == message
   const char* state_;
   const char* state_;
-  friend class StatusHelper;
 
 
   enum Code {
   enum Code {
     kOk = 0,
     kOk = 0,

+ 8 - 4
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h

@@ -46,16 +46,20 @@ class InputStreamImpl : public InputStream {
 public:
 public:
   InputStreamImpl(FileSystemImpl *fs,
   InputStreamImpl(FileSystemImpl *fs,
                   const ::hadoop::hdfs::LocatedBlocksProto *blocks);
                   const ::hadoop::hdfs::LocatedBlocksProto *blocks);
-  virtual void PositionRead(
-      void *buf, size_t nbyte, size_t offset,
-      const std::function<void(const Status &, size_t)> &handler) override;
+  virtual void
+  PositionRead(void *buf, size_t nbyte, uint64_t offset,
+               const std::set<std::string> &excluded_datanodes,
+               const std::function<void(const Status &, const std::string &,
+                                        size_t)> &handler) override;
   template <class MutableBufferSequence, class Handler>
   template <class MutableBufferSequence, class Handler>
   void AsyncPreadSome(size_t offset, const MutableBufferSequence &buffers,
   void AsyncPreadSome(size_t offset, const MutableBufferSequence &buffers,
+                      const std::set<std::string> &excluded_datanodes,
                       const Handler &handler);
                       const Handler &handler);
   template <class BlockReaderTrait, class MutableBufferSequence, class Handler>
   template <class BlockReaderTrait, class MutableBufferSequence, class Handler>
   void AsyncReadBlock(const std::string &client_name,
   void AsyncReadBlock(const std::string &client_name,
                       const hadoop::hdfs::LocatedBlockProto &block,
                       const hadoop::hdfs::LocatedBlockProto &block,
-                      size_t offset, const MutableBufferSequence &buffers,
+                      const hadoop::hdfs::DatanodeInfoProto &dn, size_t offset,
+                      const MutableBufferSequence &buffers,
                       const Handler &handler);
                       const Handler &handler);
 
 
 private:
 private:

+ 5 - 3
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc

@@ -37,8 +37,10 @@ InputStreamImpl::InputStreamImpl(FileSystemImpl *fs,
 }
 }
 
 
 void InputStreamImpl::PositionRead(
 void InputStreamImpl::PositionRead(
-    void *buf, size_t nbyte, size_t offset,
-    const std::function<void(const Status &, size_t)> &handler) {
-  AsyncPreadSome(offset, asio::buffer(buf, nbyte), handler);
+    void *buf, size_t nbyte, uint64_t offset,
+    const std::set<std::string> &excluded_datanodes,
+    const std::function<void(const Status &, const std::string &, size_t)>
+        &handler) {
+  AsyncPreadSome(offset, asio::buffer(buf, nbyte), excluded_datanodes, handler);
 }
 }
 }
 }

+ 30 - 20
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h

@@ -33,7 +33,7 @@ struct InputStreamImpl::RemoteBlockReaderTrait {
   struct State {
   struct State {
     std::unique_ptr<asio::ip::tcp::socket> conn_;
     std::unique_ptr<asio::ip::tcp::socket> conn_;
     std::unique_ptr<Reader> reader_;
     std::unique_ptr<Reader> reader_;
-    std::vector<asio::ip::tcp::endpoint> endpoints_;
+    std::array<asio::ip::tcp::endpoint, 1> endpoints_;
     size_t transferred_;
     size_t transferred_;
     Reader *reader() { return reader_.get(); }
     Reader *reader() { return reader_.get(); }
     size_t *transferred() { return &transferred_; }
     size_t *transferred() { return &transferred_; }
@@ -41,17 +41,15 @@ struct InputStreamImpl::RemoteBlockReaderTrait {
   };
   };
   static continuation::Pipeline<State> *
   static continuation::Pipeline<State> *
   CreatePipeline(::asio::io_service *io_service,
   CreatePipeline(::asio::io_service *io_service,
-                 const ::hadoop::hdfs::LocatedBlockProto &b) {
+                 const ::hadoop::hdfs::DatanodeInfoProto &dn) {
     using namespace ::asio::ip;
     using namespace ::asio::ip;
     auto m = continuation::Pipeline<State>::Create();
     auto m = continuation::Pipeline<State>::Create();
     auto &s = m->state();
     auto &s = m->state();
     s.conn_.reset(new tcp::socket(*io_service));
     s.conn_.reset(new tcp::socket(*io_service));
     s.reader_.reset(new Reader(BlockReaderOptions(), s.conn_.get()));
     s.reader_.reset(new Reader(BlockReaderOptions(), s.conn_.get()));
-    for (auto &loc : b.locs()) {
-      auto datanode = loc.id();
-      s.endpoints_.push_back(tcp::endpoint(
-          address::from_string(datanode.ipaddr()), datanode.xferport()));
-    }
+    auto datanode = dn.id();
+    s.endpoints_[0] = tcp::endpoint(address::from_string(datanode.ipaddr()),
+                                    datanode.xferport());
 
 
     m->Push(continuation::Connect(s.conn_.get(), s.endpoints_.begin(),
     m->Push(continuation::Connect(s.conn_.get(), s.endpoints_.begin(),
                                   s.endpoints_.end()));
                                   s.endpoints_.end()));
@@ -125,12 +123,11 @@ private:
 };
 };
 
 
 template <class MutableBufferSequence, class Handler>
 template <class MutableBufferSequence, class Handler>
-void InputStreamImpl::AsyncPreadSome(size_t offset,
-                                     const MutableBufferSequence &buffers,
-                                     const Handler &handler) {
+void InputStreamImpl::AsyncPreadSome(
+    size_t offset, const MutableBufferSequence &buffers,
+    const std::set<std::string> &excluded_datanodes, const Handler &handler) {
+  using ::hadoop::hdfs::DatanodeInfoProto;
   using ::hadoop::hdfs::LocatedBlockProto;
   using ::hadoop::hdfs::LocatedBlockProto;
-  namespace ip = ::asio::ip;
-  using ::asio::ip::tcp;
 
 
   auto it = std::find_if(
   auto it = std::find_if(
       blocks_.begin(), blocks_.end(), [offset](const LocatedBlockProto &p) {
       blocks_.begin(), blocks_.end(), [offset](const LocatedBlockProto &p) {
@@ -138,10 +135,21 @@ void InputStreamImpl::AsyncPreadSome(size_t offset,
       });
       });
 
 
   if (it == blocks_.end()) {
   if (it == blocks_.end()) {
-    handler(Status::InvalidArgument("Cannot find corresponding blocks"), 0);
+    handler(Status::InvalidArgument("Cannot find corresponding blocks"), "", 0);
     return;
     return;
-  } else if (!it->locs_size()) {
-    handler(Status::ResourceUnavailable("No datanodes available"), 0);
+  }
+
+  const DatanodeInfoProto *chosen_dn = nullptr;
+  for (int i = 0; i < it->locs_size(); ++i) {
+    const auto &di = it->locs(i);
+    if (!excluded_datanodes.count(di.id().datanodeuuid())) {
+      chosen_dn = &di;
+      break;
+    }
+  }
+
+  if (!chosen_dn) {
+    handler(Status::ResourceUnavailable("No datanodes available"), "", 0);
     return;
     return;
   }
   }
 
 
@@ -150,28 +158,30 @@ void InputStreamImpl::AsyncPreadSome(size_t offset,
       it->b().numbytes() - offset_within_block, asio::buffer_size(buffers));
       it->b().numbytes() - offset_within_block, asio::buffer_size(buffers));
 
 
   AsyncReadBlock<RemoteBlockReaderTrait>(
   AsyncReadBlock<RemoteBlockReaderTrait>(
-      fs_->rpc_engine().client_name(), *it, offset_within_block,
+      fs_->rpc_engine().client_name(), *it, *chosen_dn, offset_within_block,
       asio::buffer(buffers, size_within_block), handler);
       asio::buffer(buffers, size_within_block), handler);
 }
 }
 
 
 template <class BlockReaderTrait, class MutableBufferSequence, class Handler>
 template <class BlockReaderTrait, class MutableBufferSequence, class Handler>
 void InputStreamImpl::AsyncReadBlock(
 void InputStreamImpl::AsyncReadBlock(
     const std::string &client_name,
     const std::string &client_name,
-    const hadoop::hdfs::LocatedBlockProto &block, size_t offset,
+    const hadoop::hdfs::LocatedBlockProto &block,
+    const hadoop::hdfs::DatanodeInfoProto &dn, size_t offset,
     const MutableBufferSequence &buffers, const Handler &handler) {
     const MutableBufferSequence &buffers, const Handler &handler) {
 
 
   typedef typename BlockReaderTrait::Reader Reader;
   typedef typename BlockReaderTrait::Reader Reader;
   auto m =
   auto m =
-      BlockReaderTrait::CreatePipeline(&fs_->rpc_engine().io_service(), block);
+      BlockReaderTrait::CreatePipeline(&fs_->rpc_engine().io_service(), dn);
   auto &s = m->state();
   auto &s = m->state();
   size_t size = asio::buffer_size(buffers);
   size_t size = asio::buffer_size(buffers);
   m->Push(new HandshakeContinuation<Reader>(s.reader(), client_name, nullptr,
   m->Push(new HandshakeContinuation<Reader>(s.reader(), client_name, nullptr,
                                             &block.b(), size, offset))
                                             &block.b(), size, offset))
       .Push(new ReadBlockContinuation<Reader, decltype(buffers)>(
       .Push(new ReadBlockContinuation<Reader, decltype(buffers)>(
           s.reader(), buffers, s.transferred()));
           s.reader(), buffers, s.transferred()));
-  m->Run([handler](const Status &status,
+  const std::string &dnid = dn.id().datanodeuuid();
+  m->Run([handler, dnid](const Status &status,
                    const typename BlockReaderTrait::State &state) {
                    const typename BlockReaderTrait::State &state) {
-           handler(status, *state.transferred());
+           handler(status, dnid, *state.transferred());
   });
   });
 }
 }
 }
 }

+ 59 - 10
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/inputstream_test.cc

@@ -20,6 +20,8 @@
 #include <gmock/gmock.h>
 #include <gmock/gmock.h>
 
 
 using hadoop::common::TokenProto;
 using hadoop::common::TokenProto;
+using hadoop::hdfs::DatanodeInfoProto;
+using hadoop::hdfs::DatanodeIDProto;
 using hadoop::hdfs::ExtendedBlockProto;
 using hadoop::hdfs::ExtendedBlockProto;
 using hadoop::hdfs::LocatedBlockProto;
 using hadoop::hdfs::LocatedBlockProto;
 using hadoop::hdfs::LocatedBlocksProto;
 using hadoop::hdfs::LocatedBlocksProto;
@@ -57,7 +59,7 @@ template <class Trait> struct MockBlockReaderTrait {
   };
   };
 
 
   static continuation::Pipeline<State> *
   static continuation::Pipeline<State> *
-  CreatePipeline(::asio::io_service *, const LocatedBlockProto &) {
+  CreatePipeline(::asio::io_service *, const DatanodeInfoProto &) {
     auto m = continuation::Pipeline<State>::Create();
     auto m = continuation::Pipeline<State>::Create();
     *m->state().transferred() = 0;
     *m->state().transferred() = 0;
     Trait::InitializeMockReader(m->state().reader());
     Trait::InitializeMockReader(m->state().reader());
@@ -69,6 +71,7 @@ template <class Trait> struct MockBlockReaderTrait {
 TEST(InputStreamTest, TestReadSingleTrunk) {
 TEST(InputStreamTest, TestReadSingleTrunk) {
   LocatedBlocksProto blocks;
   LocatedBlocksProto blocks;
   LocatedBlockProto block;
   LocatedBlockProto block;
+  DatanodeInfoProto dn;
   char buf[4096] = {
   char buf[4096] = {
       0,
       0,
   };
   };
@@ -82,14 +85,14 @@ TEST(InputStreamTest, TestReadSingleTrunk) {
       EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
       EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
           .WillOnce(InvokeArgument<5>(Status::OK()));
           .WillOnce(InvokeArgument<5>(Status::OK()));
 
 
-      EXPECT_CALL(*reader, async_read_some(_,_))
+      EXPECT_CALL(*reader, async_read_some(_, _))
           .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf)));
           .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf)));
     }
     }
   };
   };
 
 
   is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
   is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
-      "client", block, 0, asio::buffer(buf, sizeof(buf)),
-      [&stat, &read](const Status &status, size_t transferred) {
+      "client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
+      [&stat, &read](const Status &status, const std::string &, size_t transferred) {
         stat = status;
         stat = status;
         read = transferred;
         read = transferred;
       });
       });
@@ -101,6 +104,7 @@ TEST(InputStreamTest, TestReadSingleTrunk) {
 TEST(InputStreamTest, TestReadMultipleTrunk) {
 TEST(InputStreamTest, TestReadMultipleTrunk) {
   LocatedBlocksProto blocks;
   LocatedBlocksProto blocks;
   LocatedBlockProto block;
   LocatedBlockProto block;
+  DatanodeInfoProto dn;
   char buf[4096] = {
   char buf[4096] = {
       0,
       0,
   };
   };
@@ -114,15 +118,16 @@ TEST(InputStreamTest, TestReadMultipleTrunk) {
       EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
       EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
           .WillOnce(InvokeArgument<5>(Status::OK()));
           .WillOnce(InvokeArgument<5>(Status::OK()));
 
 
-      EXPECT_CALL(*reader, async_read_some(_,_))
+      EXPECT_CALL(*reader, async_read_some(_, _))
           .Times(4)
           .Times(4)
           .WillRepeatedly(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4));
           .WillRepeatedly(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4));
     }
     }
   };
   };
 
 
   is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
   is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
-      "client", block, 0, asio::buffer(buf, sizeof(buf)),
-      [&stat, &read](const Status &status, size_t transferred) {
+      "client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
+      [&stat, &read](const Status &status, const std::string &,
+                     size_t transferred) {
         stat = status;
         stat = status;
         read = transferred;
         read = transferred;
       });
       });
@@ -134,6 +139,7 @@ TEST(InputStreamTest, TestReadMultipleTrunk) {
 TEST(InputStreamTest, TestReadError) {
 TEST(InputStreamTest, TestReadError) {
   LocatedBlocksProto blocks;
   LocatedBlocksProto blocks;
   LocatedBlockProto block;
   LocatedBlockProto block;
+  DatanodeInfoProto dn;
   char buf[4096] = {
   char buf[4096] = {
       0,
       0,
   };
   };
@@ -147,7 +153,7 @@ TEST(InputStreamTest, TestReadError) {
       EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
       EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
           .WillOnce(InvokeArgument<5>(Status::OK()));
           .WillOnce(InvokeArgument<5>(Status::OK()));
 
 
-      EXPECT_CALL(*reader, async_read_some(_,_))
+      EXPECT_CALL(*reader, async_read_some(_, _))
           .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
           .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
           .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
           .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
           .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
           .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
@@ -156,8 +162,9 @@ TEST(InputStreamTest, TestReadError) {
   };
   };
 
 
   is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
   is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
-      "client", block, 0, asio::buffer(buf, sizeof(buf)),
-      [&stat, &read](const Status &status, size_t transferred) {
+      "client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
+      [&stat, &read](const Status &status, const std::string &,
+                     size_t transferred) {
         stat = status;
         stat = status;
         read = transferred;
         read = transferred;
       });
       });
@@ -166,6 +173,48 @@ TEST(InputStreamTest, TestReadError) {
   read = 0;
   read = 0;
 }
 }
 
 
+TEST(InputStreamTest, TestExcludeDataNode) {
+  LocatedBlocksProto blocks;
+  LocatedBlockProto *block = blocks.add_blocks();
+  ExtendedBlockProto *b = block->mutable_b();
+  b->set_poolid("");
+  b->set_blockid(1);
+  b->set_generationstamp(1);
+  b->set_numbytes(4096);
+
+  DatanodeInfoProto *di = block->add_locs();
+  DatanodeIDProto *dnid = di->mutable_id();
+  dnid->set_datanodeuuid("foo");
+
+  char buf[4096] = {
+      0,
+  };
+  IoServiceImpl io_service;
+  FileSystemImpl fs(&io_service);
+  InputStreamImpl is(&fs, &blocks);
+  Status stat;
+  size_t read = 0;
+  struct Trait {
+    static void InitializeMockReader(MockReader *reader) {
+      EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
+          .WillOnce(InvokeArgument<5>(Status::OK()));
+
+      EXPECT_CALL(*reader, async_read_some(_, _))
+          .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf)));
+    }
+  };
+
+
+  std::set<std::string> excluded_dn({"foo"});
+  is.AsyncPreadSome(0, asio::buffer(buf, sizeof(buf)), excluded_dn,
+      [&stat, &read](const Status &status, const std::string &, size_t transferred) {
+        stat = status;
+        read = transferred;
+      });
+  ASSERT_EQ(static_cast<int>(std::errc::resource_unavailable_try_again), stat.code());
+  ASSERT_EQ(0UL, read);
+}
+
 int main(int argc, char *argv[]) {
 int main(int argc, char *argv[]) {
   // The following line must be executed to initialize Google Mock
   // The following line must be executed to initialize Google Mock
   // (and Google Test) before running the tests.
   // (and Google Test) before running the tests.