Browse Source

HDFS-8774. Implement FileSystem and InputStream API for libhdfspp. Contributed by Haohui Mai.

Haohui Mai 10 năm trước cách đây
mục cha
commit
1efb677976
19 tập tin đã thay đổi với 814 bổ sung22 xóa
  1. 1 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt
  2. 95 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
  3. 1 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/CMakeLists.txt
  4. 1 1
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
  5. 32 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
  6. 14 1
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h
  7. 29 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc
  8. 42 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.h
  9. 2 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
  10. 105 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
  11. 74 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h
  12. 44 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc
  13. 179 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h
  14. 2 2
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/protoc_gen_hrpc.cc
  15. 2 1
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
  16. 9 14
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
  17. 4 3
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
  18. 4 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/CMakeLists.txt
  19. 174 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/inputstream_test.cc

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt

@@ -19,6 +19,7 @@
 project (libhdfspp)
 
 find_package(Doxygen)
+find_package(OpenSSL REQUIRED)
 find_package(Protobuf REQUIRED)
 find_package(Threads)
 

+ 95 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h

@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_HDFS_H_
+#define LIBHDFSPP_HDFS_H_
+
+#include "libhdfspp/status.h"
+
+#include <functional>
+
+namespace hdfs {
+
+/**
+ * An IoService manages a queue of asynchronous tasks. All libhdfs++
+ * operations are filed against a particular IoService.
+ *
+ * When an operation is queued into an IoService, the IoService will
+ * run the callback handler associated with the operation. Note that
+ * the IoService must be stopped before destructing the objects that
+ * file the operations.
+ *
+ * From an implementation point of view the IoService object wraps the
+ * ::asio::io_service objects. Please see the related documentation
+ * for more details.
+ **/
+class IoService {
+public:
+  static IoService *New();
+  /**
+   * Run the asynchronous tasks associated with this IoService.
+   **/
+  virtual void Run() = 0;
+  /**
+   * Stop running asynchronous tasks associated with this IoService.
+   **/
+  virtual void Stop() = 0;
+  virtual ~IoService();
+};
+
+/**
+ * Applications opens an InputStream to read files in HDFS.
+ **/
+class InputStream {
+public:
+  /**
+   * Read data from a specific position. The handler returns the
+   * number of bytes has read.
+   **/
+  virtual void
+  PositionRead(void *buf, size_t nbyte, size_t offset,
+               const std::function<void(const Status &, size_t)> &handler) = 0;
+  virtual ~InputStream();
+};
+
+/**
+ * FileSystem implements APIs to interact with HDFS.
+ **/
+class FileSystem {
+public:
+  /**
+   * Create a new instance of the FileSystem object. The call
+   * initializes the RPC connections to the NameNode and returns an
+   * FileSystem object.
+   **/
+  static void
+  New(IoService *io_service, const std::string &server,
+      const std::string &service,
+      const std::function<void(const Status &, FileSystem *)> &handler);
+  /**
+   * Open a file on HDFS. The call issues an RPC to the NameNode to
+   * gather the locations of all blocks in the file and to return a
+   * new instance of the @ref InputStream object.
+   **/
+  virtual void
+  Open(const std::string &path,
+       const std::function<void(const Status &, InputStream *)> &handler) = 0;
+  virtual ~FileSystem();
+};
+}
+
+#endif

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/CMakeLists.txt

@@ -17,6 +17,7 @@
 #
 
 add_subdirectory(common)
+add_subdirectory(fs)
 add_subdirectory(reader)
 add_subdirectory(rpc)
 add_subdirectory(proto)

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt

@@ -1 +1 @@
-add_library(common base64.cc status.cc sasl_digest_md5.cc)
+add_library(common base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc)

+ 32 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/asio.h

@@ -89,6 +89,31 @@ private:
   Iterator *connected_endpoint_;
 };
 
+template <class OutputIterator>
+class ResolveContinuation : public Continuation {
+public:
+  ResolveContinuation(::asio::io_service *io_service, const std::string &server,
+                      const std::string &service, OutputIterator result)
+      : resolver_(*io_service), query_(server, service), result_(result) {}
+
+  virtual void Run(const Next &next) override {
+    using resolver = ::asio::ip::tcp::resolver;
+    auto handler =
+        [this, next](const asio::error_code &ec, resolver::iterator it) {
+          if (!ec) {
+            std::copy(it, resolver::iterator(), result_);
+          }
+          next(ToStatus(ec));
+        };
+    resolver_.async_resolve(query_, handler);
+  }
+
+private:
+  ::asio::ip::tcp::resolver resolver_;
+  ::asio::ip::tcp::resolver::query query_;
+  OutputIterator result_;
+};
+
 template <class Stream, class ConstBufferSequence>
 static inline Continuation *Write(Stream *stream,
                                   const ConstBufferSequence &buffer) {
@@ -106,6 +131,13 @@ static inline Continuation *Connect(Socket *socket, Iterator begin,
                                     Iterator end) {
   return new ConnectContinuation<Socket, Iterator>(socket, begin, end, nullptr);
 }
+
+template <class OutputIterator>
+static inline Continuation *
+Resolve(::asio::io_service *io_service, const std::string &server,
+        const std::string &service, OutputIterator result) {
+  return new ResolveContinuation<OutputIterator>(io_service, server, service, result);
+}
 }
 }
 

+ 14 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h

@@ -104,7 +104,7 @@ inline Pipeline<State> &Pipeline<State>::Push(Continuation *stage) {
 
 template <class State>
 inline void Pipeline<State>::Schedule(const Status &status) {
-  if (stage_ >= routines_.size()) {
+  if (!status.ok() || stage_ >= routines_.size()) {
     handler_(status, state_);
     routines_.clear();
     delete this;
@@ -119,6 +119,19 @@ template <class State> inline void Pipeline<State>::Run(UserHandler &&handler) {
   handler_ = std::move(handler);
   Schedule(Status::OK());
 }
+
+template <class Handler> class BindContinuation : public Continuation {
+public:
+  BindContinuation(const Handler &handler) : handler_(handler) {}
+  virtual void Run(const Next &next) override { handler_(next); }
+
+private:
+  Handler handler_;
+};
+
+template <class Handler> static inline Continuation *Bind(const Handler &handler) {
+  return new BindContinuation<Handler>(handler);
+}
 }
 }
 

+ 29 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc

@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "hdfs_public_api.h"
+
+namespace hdfs {
+
+IoService::~IoService() {}
+
+IoService *IoService::New() {
+  return new IoServiceImpl();
+}
+
+}

+ 42 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.h

@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef COMMON_HDFS_PUBLIC_API_H_
+#define COMMON_HDFS_PUBLIC_API_H_
+
+#include "libhdfspp/hdfs.h"
+
+#include <asio/io_service.hpp>
+
+namespace hdfs {
+
+class IoServiceImpl : public IoService {
+ public:
+  virtual void Run() override {
+    asio::io_service::work work(io_service_);
+    io_service_.run();
+  }
+  virtual void Stop() override { io_service_.stop(); }
+  ::asio::io_service &io_service() { return io_service_; }
+ private:
+  ::asio::io_service io_service_;
+};
+
+}
+
+#endif

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt

@@ -0,0 +1,2 @@
+add_library(fs filesystem.cc inputstream.cc)
+add_dependencies(fs proto)

+ 105 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc

@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "filesystem.h"
+#include "common/continuation/asio.h"
+#include "common/util.h"
+
+#include <asio/ip/tcp.hpp>
+
+#include <limits>
+
+namespace hdfs {
+
+static const char kNamenodeProtocol[] =
+    "org.apache.hadoop.hdfs.protocol.ClientProtocol";
+static const int kNamenodeProtocolVersion = 1;
+
+using ::asio::ip::tcp;
+
+FileSystem::~FileSystem() {}
+
+void FileSystem::New(
+    IoService *io_service, const std::string &server,
+    const std::string &service,
+    const std::function<void(const Status &, FileSystem *)> &handler) {
+  FileSystemImpl *impl = new FileSystemImpl(io_service);
+  impl->Connect(server, service, [impl, handler](const Status &stat) {
+    if (stat.ok()) {
+      handler(stat, impl);
+    } else {
+      delete impl;
+      handler(stat, nullptr);
+    }
+  });
+}
+
+FileSystemImpl::FileSystemImpl(IoService *io_service)
+    : io_service_(static_cast<IoServiceImpl *>(io_service)),
+      engine_(&io_service_->io_service(), RpcEngine::GetRandomClientName(),
+              kNamenodeProtocol, kNamenodeProtocolVersion),
+      namenode_(&engine_) {}
+
+void FileSystemImpl::Connect(const std::string &server,
+                             const std::string &service,
+                             std::function<void(const Status &)> &&handler) {
+  using namespace continuation;
+  typedef std::vector<tcp::endpoint> State;
+  auto m = Pipeline<State>::Create();
+  m->Push(Resolve(&io_service_->io_service(), server, service,
+                  std::back_inserter(m->state())))
+      .Push(Bind([this, m](const Continuation::Next &next) {
+        engine_.Connect(m->state(), next);
+      }));
+  m->Run([this, handler](const Status &status, const State &) {
+    if (status.ok()) {
+      engine_.Start();
+    }
+    handler(status);
+  });
+}
+
+void FileSystemImpl::Open(
+    const std::string &path,
+    const std::function<void(const Status &, InputStream *)> &handler) {
+  using ::hadoop::hdfs::GetBlockLocationsRequestProto;
+  using ::hadoop::hdfs::GetBlockLocationsResponseProto;
+
+  struct State {
+    GetBlockLocationsRequestProto req;
+    std::shared_ptr<GetBlockLocationsResponseProto> resp;
+  };
+
+  auto m = continuation::Pipeline<State>::Create();
+  auto &req = m->state().req;
+  req.set_src(path);
+  req.set_offset(0);
+  req.set_length(std::numeric_limits<long long>::max());
+  m->state().resp.reset(new GetBlockLocationsResponseProto());
+
+  State *s = &m->state();
+  m->Push(continuation::Bind(
+      [this, s](const continuation::Continuation::Next &next) {
+        namenode_.GetBlockLocations(&s->req, s->resp, next);
+      }));
+  m->Run([this, handler](const Status &stat, const State &s) {
+    handler(stat, stat.ok() ? new InputStreamImpl(this, &s.resp->locations())
+                            : nullptr);
+  });
+}
+}

+ 74 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h

@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_LIB_FS_FILESYSTEM_H_
+#define LIBHDFSPP_LIB_FS_FILESYSTEM_H_
+
+#include "common/hdfs_public_api.h"
+#include "libhdfspp/hdfs.h"
+#include "rpc/rpc_engine.h"
+#include "ClientNamenodeProtocol.pb.h"
+#include "ClientNamenodeProtocol.hrpc.inl"
+
+namespace hdfs {
+
+class FileSystemImpl : public FileSystem {
+public:
+  FileSystemImpl(IoService *io_service);
+  void Connect(const std::string &server, const std::string &service,
+               std::function<void(const Status &)> &&handler);
+  virtual void Open(const std::string &path,
+                    const std::function<void(const Status &, InputStream *)>
+                        &handler) override;
+  RpcEngine &rpc_engine() { return engine_; }
+
+private:
+  IoServiceImpl *io_service_;
+  RpcEngine engine_;
+  ClientNamenodeProtocol namenode_;
+};
+
+class InputStreamImpl : public InputStream {
+public:
+  InputStreamImpl(FileSystemImpl *fs,
+                  const ::hadoop::hdfs::LocatedBlocksProto *blocks);
+  virtual void PositionRead(
+      void *buf, size_t nbyte, size_t offset,
+      const std::function<void(const Status &, size_t)> &handler) override;
+  template <class MutableBufferSequence, class Handler>
+  void AsyncPreadSome(size_t offset, const MutableBufferSequence &buffers,
+                      const Handler &handler);
+  template <class BlockReaderTrait, class MutableBufferSequence, class Handler>
+  void AsyncReadBlock(const std::string &client_name,
+                      const hadoop::hdfs::LocatedBlockProto &block,
+                      size_t offset, const MutableBufferSequence &buffers,
+                      const Handler &handler);
+
+private:
+  FileSystemImpl *fs_;
+  unsigned long long file_length_;
+  std::vector<::hadoop::hdfs::LocatedBlockProto> blocks_;
+  template <class Reader> struct HandshakeContinuation;
+  template <class Reader, class MutableBufferSequence>
+  struct ReadBlockContinuation;
+  struct RemoteBlockReaderTrait;
+};
+}
+
+#include "inputstream_impl.h"
+
+#endif

+ 44 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc

@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "filesystem.h"
+
+namespace hdfs {
+
+using ::hadoop::hdfs::LocatedBlocksProto;
+
+InputStream::~InputStream() {}
+
+InputStreamImpl::InputStreamImpl(FileSystemImpl *fs,
+                                 const LocatedBlocksProto *blocks)
+    : fs_(fs), file_length_(blocks->filelength()) {
+  for (const auto &block : blocks->blocks()) {
+    blocks_.push_back(block);
+  }
+
+  if (blocks->has_lastblock() && blocks->lastblock().b().numbytes()) {
+    blocks_.push_back(blocks->lastblock());
+  }
+}
+
+void InputStreamImpl::PositionRead(
+    void *buf, size_t nbyte, size_t offset,
+    const std::function<void(const Status &, size_t)> &handler) {
+  AsyncPreadSome(offset, asio::buffer(buf, nbyte), handler);
+}
+}

+ 179 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h

@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef FS_INPUTSTREAM_IMPL_H_
+#define FS_INPUTSTREAM_IMPL_H_
+
+#include "reader/block_reader.h"
+
+#include "common/continuation/asio.h"
+#include "common/continuation/protobuf.h"
+
+#include <functional>
+#include <future>
+
+namespace hdfs {
+
+struct InputStreamImpl::RemoteBlockReaderTrait {
+  typedef RemoteBlockReader<asio::ip::tcp::socket> Reader;
+  struct State {
+    std::unique_ptr<asio::ip::tcp::socket> conn_;
+    std::unique_ptr<Reader> reader_;
+    std::vector<asio::ip::tcp::endpoint> endpoints_;
+    size_t transferred_;
+    Reader *reader() { return reader_.get(); }
+    size_t *transferred() { return &transferred_; }
+    const size_t *transferred() const { return &transferred_; }
+  };
+  static continuation::Pipeline<State> *
+  CreatePipeline(::asio::io_service *io_service,
+                 const ::hadoop::hdfs::LocatedBlockProto &b) {
+    using namespace ::asio::ip;
+    auto m = continuation::Pipeline<State>::Create();
+    auto &s = m->state();
+    s.conn_.reset(new tcp::socket(*io_service));
+    s.reader_.reset(new Reader(BlockReaderOptions(), s.conn_.get()));
+    for (auto &loc : b.locs()) {
+      auto datanode = loc.id();
+      s.endpoints_.push_back(tcp::endpoint(
+          address::from_string(datanode.ipaddr()), datanode.xferport()));
+    }
+
+    m->Push(continuation::Connect(s.conn_.get(), s.endpoints_.begin(),
+                                  s.endpoints_.end()));
+    return m;
+  }
+};
+
+template <class Reader>
+struct InputStreamImpl::HandshakeContinuation : continuation::Continuation {
+  HandshakeContinuation(Reader *reader, const std::string &client_name,
+                        const hadoop::common::TokenProto *token,
+                        const hadoop::hdfs::ExtendedBlockProto *block,
+                        uint64_t length, uint64_t offset)
+      : reader_(reader), client_name_(client_name), length_(length),
+        offset_(offset) {
+    if (token) {
+      token_.reset(new hadoop::common::TokenProto());
+      token_->CheckTypeAndMergeFrom(*token);
+    }
+    block_.CheckTypeAndMergeFrom(*block);
+  }
+
+  virtual void Run(const Next &next) override {
+    reader_->async_connect(client_name_, token_.get(), &block_, length_,
+                           offset_, next);
+  }
+
+private:
+  Reader *reader_;
+  const std::string client_name_;
+  std::unique_ptr<hadoop::common::TokenProto> token_;
+  hadoop::hdfs::ExtendedBlockProto block_;
+  uint64_t length_;
+  uint64_t offset_;
+};
+
+template <class Reader, class MutableBufferSequence>
+struct InputStreamImpl::ReadBlockContinuation : continuation::Continuation {
+  ReadBlockContinuation(Reader *reader, MutableBufferSequence buffer,
+                        size_t *transferred)
+      : reader_(reader), buffer_(buffer),
+        buffer_size_(asio::buffer_size(buffer)), transferred_(transferred) {}
+
+  virtual void Run(const Next &next) override {
+    *transferred_ = 0;
+    next_ = next;
+    OnReadData(Status::OK(), 0);
+  }
+
+private:
+  Reader *reader_;
+  MutableBufferSequence buffer_;
+  const size_t buffer_size_;
+  size_t *transferred_;
+  std::function<void(const Status &)> next_;
+
+  void OnReadData(const Status &status, size_t transferred) {
+    using std::placeholders::_1;
+    using std::placeholders::_2;
+    *transferred_ += transferred;
+    if (!status.ok()) {
+      next_(status);
+    } else if (*transferred_ >= buffer_size_) {
+      next_(status);
+    } else {
+      reader_->async_read_some(
+          asio::buffer(buffer_ + *transferred_, buffer_size_ - *transferred_),
+          std::bind(&ReadBlockContinuation::OnReadData, this, _1, _2));
+    }
+  }
+};
+
+template <class MutableBufferSequence, class Handler>
+void InputStreamImpl::AsyncPreadSome(size_t offset,
+                                     const MutableBufferSequence &buffers,
+                                     const Handler &handler) {
+  using ::hadoop::hdfs::LocatedBlockProto;
+  namespace ip = ::asio::ip;
+  using ::asio::ip::tcp;
+
+  auto it = std::find_if(
+      blocks_.begin(), blocks_.end(), [offset](const LocatedBlockProto &p) {
+        return p.offset() <= offset && offset < p.offset() + p.b().numbytes();
+      });
+
+  if (it == blocks_.end()) {
+    handler(Status::InvalidArgument("Cannot find corresponding blocks"), 0);
+    return;
+  } else if (!it->locs_size()) {
+    handler(Status::ResourceUnavailable("No datanodes available"), 0);
+    return;
+  }
+
+  uint64_t offset_within_block = offset - it->offset();
+  uint64_t size_within_block = std::min<uint64_t>(
+      it->b().numbytes() - offset_within_block, asio::buffer_size(buffers));
+
+  AsyncReadBlock<RemoteBlockReaderTrait>(
+      fs_->rpc_engine().client_name(), *it, offset_within_block,
+      asio::buffer(buffers, size_within_block), handler);
+}
+
+template <class BlockReaderTrait, class MutableBufferSequence, class Handler>
+void InputStreamImpl::AsyncReadBlock(
+    const std::string &client_name,
+    const hadoop::hdfs::LocatedBlockProto &block, size_t offset,
+    const MutableBufferSequence &buffers, const Handler &handler) {
+
+  typedef typename BlockReaderTrait::Reader Reader;
+  auto m =
+      BlockReaderTrait::CreatePipeline(&fs_->rpc_engine().io_service(), block);
+  auto &s = m->state();
+  size_t size = asio::buffer_size(buffers);
+  m->Push(new HandshakeContinuation<Reader>(s.reader(), client_name, nullptr,
+                                            &block.b(), size, offset))
+      .Push(new ReadBlockContinuation<Reader, decltype(buffers)>(
+          s.reader(), buffers, s.transferred()));
+  m->Run([handler](const Status &status,
+                   const typename BlockReaderTrait::State &state) {
+           handler(status, *state.transferred());
+  });
+}
+}
+
+#endif

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/protoc_gen_hrpc.cc

@@ -81,8 +81,8 @@ void StubGenerator::EmitMethod(const MethodDescriptor *method,
   out->Print(
       "\n  inline void $camel_method$(const Message *req, "
       "const std::shared_ptr<Message> &resp, "
-      "Callback &&handler) {\n"
-      "    engine_->AsyncRpc(\"$method$\", req, resp, std::move(handler));\n"
+      "const Callback &handler) {\n"
+      "    engine_->AsyncRpc(\"$method$\", req, resp, handler);\n"
       "  }\n",
       "camel_method", ToCamelCase(method->name()), "method", method->name());
 }

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc

@@ -182,7 +182,8 @@ std::shared_ptr<std::string> RpcConnection::PrepareHandshakePacket() {
 
 void RpcConnection::AsyncRpc(
     const std::string &method_name, const ::google::protobuf::MessageLite *req,
-    std::shared_ptr<::google::protobuf::MessageLite> resp, Callback &&handler) {
+    std::shared_ptr<::google::protobuf::MessageLite> resp,
+    const Callback &handler) {
   std::lock_guard<std::mutex> state_lock(engine_state_lock_);
 
   auto wrapped_handler =

+ 9 - 14
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc

@@ -35,20 +35,15 @@ RpcEngine::RpcEngine(::asio::io_service *io_service,
     , conn_(new RpcConnectionImpl<::asio::ip::tcp::socket>(this))
 {}
 
-Status
-RpcEngine::Connect(const std::vector<::asio::ip::tcp::endpoint> &servers) {
-  using ::asio::ip::tcp;
-  auto stat = std::make_shared<std::promise<Status>>();
-  std::future<Status> future(stat->get_future());
-  conn_->Connect(servers, [this, stat](const Status &status) {
-    if (!status.ok()) {
-      stat->set_value(status);
-      return;
+void RpcEngine::Connect(const std::vector<::asio::ip::tcp::endpoint> &servers,
+                        const std::function<void(const Status &)> &handler) {
+  conn_->Connect(servers, [this, handler](const Status &stat) {
+    if (!stat.ok()) {
+      handler(stat);
+    } else {
+      conn_->Handshake([handler](const Status &s) { handler(s); });
     }
-    conn_->Handshake(
-        [this, stat](const Status &status) { stat->set_value(status); });
   });
-  return future.get();
 }
 
 void RpcEngine::Start() { conn_->Start(); }
@@ -60,8 +55,8 @@ void RpcEngine::Shutdown() {
 void RpcEngine::AsyncRpc(
     const std::string &method_name, const ::google::protobuf::MessageLite *req,
     const std::shared_ptr<::google::protobuf::MessageLite> &resp,
-    std::function<void(const Status &)> &&handler) {
-  conn_->AsyncRpc(method_name, req, resp, std::move(handler));
+    const std::function<void(const Status &)> &handler) {
+  conn_->AsyncRpc(method_name, req, resp, handler);
 }
 
 Status

+ 4 - 3
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h

@@ -48,7 +48,7 @@ public:
   void AsyncRpc(const std::string &method_name,
                 const ::google::protobuf::MessageLite *req,
                 std::shared_ptr<::google::protobuf::MessageLite> resp,
-                Callback &&handler);
+                const Callback &handler);
 
   void AsyncRawRpc(const std::string &method_name, const std::string &request,
                    std::shared_ptr<std::string> resp, Callback &&handler);
@@ -123,7 +123,7 @@ public:
   void AsyncRpc(const std::string &method_name,
                 const ::google::protobuf::MessageLite *req,
                 const std::shared_ptr<::google::protobuf::MessageLite> &resp,
-                std::function<void(const Status &)> &&handler);
+                const std::function<void(const Status &)> &handler);
 
   Status Rpc(const std::string &method_name,
              const ::google::protobuf::MessageLite *req,
@@ -134,7 +134,8 @@ public:
    **/
   Status RawRpc(const std::string &method_name, const std::string &req,
                 std::shared_ptr<std::string> resp);
-  Status Connect(const std::vector<::asio::ip::tcp::endpoint> &server);
+  void Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
+               const std::function<void(const Status &)> &handler);
   void Start();
   void Shutdown();
 

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/CMakeLists.txt

@@ -25,3 +25,7 @@ add_test(remote_block_reader remote_block_reader_test)
 add_executable(sasl_digest_md5_test sasl_digest_md5_test.cc)
 target_link_libraries(sasl_digest_md5_test common ${OPENSSL_LIBRARIES} gmock_main)
 add_test(sasl_digest_md5 sasl_digest_md5_test)
+
+add_executable(inputstream_test inputstream_test.cc)
+target_link_libraries(inputstream_test fs rpc reader proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main)
+add_test(inputstream inputstream_test)

+ 174 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/inputstream_test.cc

@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "fs/filesystem.h"
+#include <gmock/gmock.h>
+
+using hadoop::common::TokenProto;
+using hadoop::hdfs::ExtendedBlockProto;
+using hadoop::hdfs::LocatedBlockProto;
+using hadoop::hdfs::LocatedBlocksProto;
+
+using ::testing::_;
+using ::testing::InvokeArgument;
+using ::testing::Return;
+
+using namespace hdfs;
+
+namespace hdfs {
+
+class MockReader {
+public:
+  virtual ~MockReader() {}
+  MOCK_METHOD2(
+      async_read_some,
+      void(const asio::mutable_buffers_1 &,
+           const std::function<void(const Status &, size_t transferred)> &));
+
+  MOCK_METHOD6(async_connect,
+               void(const std::string &, TokenProto *, ExtendedBlockProto *,
+                    uint64_t, uint64_t,
+                    const std::function<void(const Status &)> &));
+};
+
+template <class Trait> struct MockBlockReaderTrait {
+  typedef MockReader Reader;
+  struct State {
+    MockReader reader_;
+    size_t transferred_;
+    Reader *reader() { return &reader_; }
+    size_t *transferred() { return &transferred_; }
+    const size_t *transferred() const { return &transferred_; }
+  };
+
+  static continuation::Pipeline<State> *
+  CreatePipeline(::asio::io_service *, const LocatedBlockProto &) {
+    auto m = continuation::Pipeline<State>::Create();
+    *m->state().transferred() = 0;
+    Trait::InitializeMockReader(m->state().reader());
+    return m;
+  }
+};
+}
+
+TEST(InputStreamTest, TestReadSingleTrunk) {
+  LocatedBlocksProto blocks;
+  LocatedBlockProto block;
+  char buf[4096] = {
+      0,
+  };
+  IoServiceImpl io_service;
+  FileSystemImpl fs(&io_service);
+  InputStreamImpl is(&fs, &blocks);
+  Status stat;
+  size_t read = 0;
+  struct Trait {
+    static void InitializeMockReader(MockReader *reader) {
+      EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
+          .WillOnce(InvokeArgument<5>(Status::OK()));
+
+      EXPECT_CALL(*reader, async_read_some(_,_))
+          .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf)));
+    }
+  };
+
+  is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
+      "client", block, 0, asio::buffer(buf, sizeof(buf)),
+      [&stat, &read](const Status &status, size_t transferred) {
+        stat = status;
+        read = transferred;
+      });
+  ASSERT_TRUE(stat.ok());
+  ASSERT_EQ(sizeof(buf), read);
+  read = 0;
+}
+
+TEST(InputStreamTest, TestReadMultipleTrunk) {
+  LocatedBlocksProto blocks;
+  LocatedBlockProto block;
+  char buf[4096] = {
+      0,
+  };
+  IoServiceImpl io_service;
+  FileSystemImpl fs(&io_service);
+  InputStreamImpl is(&fs, &blocks);
+  Status stat;
+  size_t read = 0;
+  struct Trait {
+    static void InitializeMockReader(MockReader *reader) {
+      EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
+          .WillOnce(InvokeArgument<5>(Status::OK()));
+
+      EXPECT_CALL(*reader, async_read_some(_,_))
+          .Times(4)
+          .WillRepeatedly(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4));
+    }
+  };
+
+  is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
+      "client", block, 0, asio::buffer(buf, sizeof(buf)),
+      [&stat, &read](const Status &status, size_t transferred) {
+        stat = status;
+        read = transferred;
+      });
+  ASSERT_TRUE(stat.ok());
+  ASSERT_EQ(sizeof(buf), read);
+  read = 0;
+}
+
+TEST(InputStreamTest, TestReadError) {
+  LocatedBlocksProto blocks;
+  LocatedBlockProto block;
+  char buf[4096] = {
+      0,
+  };
+  IoServiceImpl io_service;
+  FileSystemImpl fs(&io_service);
+  InputStreamImpl is(&fs, &blocks);
+  Status stat;
+  size_t read = 0;
+  struct Trait {
+    static void InitializeMockReader(MockReader *reader) {
+      EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
+          .WillOnce(InvokeArgument<5>(Status::OK()));
+
+      EXPECT_CALL(*reader, async_read_some(_,_))
+          .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
+          .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
+          .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
+          .WillOnce(InvokeArgument<1>(Status::Error("error"), 0));
+    }
+  };
+
+  is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
+      "client", block, 0, asio::buffer(buf, sizeof(buf)),
+      [&stat, &read](const Status &status, size_t transferred) {
+        stat = status;
+        read = transferred;
+      });
+  ASSERT_FALSE(stat.ok());
+  ASSERT_EQ(sizeof(buf) / 4 * 3, read);
+  read = 0;
+}
+
+int main(int argc, char *argv[]) {
+  // The following line must be executed to initialize Google Mock
+  // (and Google Test) before running the tests.
+  ::testing::InitGoogleMock(&argc, argv);
+  return RUN_ALL_TESTS();
+}