ソースを参照

HDFS-8774. Implement FileSystem and InputStream API for libhdfspp. Contributed by Haohui Mai.

Haohui Mai 9 年 前
コミット
38718b2a21
19 ファイル変更814 行追加22 行削除
  1. 1 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt
  2. 95 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
  3. 1 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/CMakeLists.txt
  4. 1 1
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
  5. 32 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
  6. 14 1
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h
  7. 29 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc
  8. 42 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.h
  9. 2 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
  10. 105 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
  11. 74 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h
  12. 44 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc
  13. 179 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h
  14. 2 2
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/protoc_gen_hrpc.cc
  15. 2 1
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
  16. 9 14
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
  17. 4 3
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
  18. 4 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/CMakeLists.txt
  19. 174 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/inputstream_test.cc

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt

@@ -19,6 +19,7 @@
 project (libhdfspp)
 
 find_package(Doxygen)
+find_package(OpenSSL REQUIRED)
 find_package(Protobuf REQUIRED)
 find_package(Threads)
 

+ 95 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h

@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_HDFS_H_
+#define LIBHDFSPP_HDFS_H_
+
+#include "libhdfspp/status.h"
+
+#include <functional>
+
+namespace hdfs {
+
+/**
+ * An IoService manages a queue of asynchronous tasks. All libhdfs++
+ * operations are filed against a particular IoService.
+ *
+ * When an operation is queued into an IoService, the IoService will
+ * run the callback handler associated with the operation. Note that
+ * the IoService must be stopped before destructing the objects that
+ * file the operations.
+ *
+ * From an implementation point of view the IoService object wraps the
+ * ::asio::io_service objects. Please see the related documentation
+ * for more details.
+ **/
+class IoService {
+public:
+  static IoService *New();
+  /**
+   * Run the asynchronous tasks associated with this IoService.
+   **/
+  virtual void Run() = 0;
+  /**
+   * Stop running asynchronous tasks associated with this IoService.
+   **/
+  virtual void Stop() = 0;
+  virtual ~IoService();
+};
+
+/**
+ * Applications opens an InputStream to read files in HDFS.
+ **/
+class InputStream {
+public:
+  /**
+   * Read data from a specific position. The handler returns the
+   * number of bytes has read.
+   **/
+  virtual void
+  PositionRead(void *buf, size_t nbyte, size_t offset,
+               const std::function<void(const Status &, size_t)> &handler) = 0;
+  virtual ~InputStream();
+};
+
+/**
+ * FileSystem implements APIs to interact with HDFS.
+ **/
+class FileSystem {
+public:
+  /**
+   * Create a new instance of the FileSystem object. The call
+   * initializes the RPC connections to the NameNode and returns an
+   * FileSystem object.
+   **/
+  static void
+  New(IoService *io_service, const std::string &server,
+      const std::string &service,
+      const std::function<void(const Status &, FileSystem *)> &handler);
+  /**
+   * Open a file on HDFS. The call issues an RPC to the NameNode to
+   * gather the locations of all blocks in the file and to return a
+   * new instance of the @ref InputStream object.
+   **/
+  virtual void
+  Open(const std::string &path,
+       const std::function<void(const Status &, InputStream *)> &handler) = 0;
+  virtual ~FileSystem();
+};
+}
+
+#endif

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/CMakeLists.txt

@@ -17,6 +17,7 @@
 #
 
 add_subdirectory(common)
+add_subdirectory(fs)
 add_subdirectory(reader)
 add_subdirectory(rpc)
 add_subdirectory(proto)

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt

@@ -1 +1 @@
-add_library(common base64.cc status.cc sasl_digest_md5.cc)
+add_library(common base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc)

+ 32 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/asio.h

@@ -89,6 +89,31 @@ private:
   Iterator *connected_endpoint_;
 };
 
+template <class OutputIterator>
+class ResolveContinuation : public Continuation {
+public:
+  ResolveContinuation(::asio::io_service *io_service, const std::string &server,
+                      const std::string &service, OutputIterator result)
+      : resolver_(*io_service), query_(server, service), result_(result) {}
+
+  virtual void Run(const Next &next) override {
+    using resolver = ::asio::ip::tcp::resolver;
+    auto handler =
+        [this, next](const asio::error_code &ec, resolver::iterator it) {
+          if (!ec) {
+            std::copy(it, resolver::iterator(), result_);
+          }
+          next(ToStatus(ec));
+        };
+    resolver_.async_resolve(query_, handler);
+  }
+
+private:
+  ::asio::ip::tcp::resolver resolver_;
+  ::asio::ip::tcp::resolver::query query_;
+  OutputIterator result_;
+};
+
 template <class Stream, class ConstBufferSequence>
 static inline Continuation *Write(Stream *stream,
                                   const ConstBufferSequence &buffer) {
@@ -106,6 +131,13 @@ static inline Continuation *Connect(Socket *socket, Iterator begin,
                                     Iterator end) {
   return new ConnectContinuation<Socket, Iterator>(socket, begin, end, nullptr);
 }
+
+template <class OutputIterator>
+static inline Continuation *
+Resolve(::asio::io_service *io_service, const std::string &server,
+        const std::string &service, OutputIterator result) {
+  return new ResolveContinuation<OutputIterator>(io_service, server, service, result);
+}
 }
 }
 

+ 14 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h

@@ -104,7 +104,7 @@ inline Pipeline<State> &Pipeline<State>::Push(Continuation *stage) {
 
 template <class State>
 inline void Pipeline<State>::Schedule(const Status &status) {
-  if (stage_ >= routines_.size()) {
+  if (!status.ok() || stage_ >= routines_.size()) {
     handler_(status, state_);
     routines_.clear();
     delete this;
@@ -119,6 +119,19 @@ template <class State> inline void Pipeline<State>::Run(UserHandler &&handler) {
   handler_ = std::move(handler);
   Schedule(Status::OK());
 }
+
+template <class Handler> class BindContinuation : public Continuation {
+public:
+  BindContinuation(const Handler &handler) : handler_(handler) {}
+  virtual void Run(const Next &next) override { handler_(next); }
+
+private:
+  Handler handler_;
+};
+
+template <class Handler> static inline Continuation *Bind(const Handler &handler) {
+  return new BindContinuation<Handler>(handler);
+}
 }
 }
 

+ 29 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc

@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "hdfs_public_api.h"
+
+namespace hdfs {
+
+IoService::~IoService() {}
+
+IoService *IoService::New() {
+  return new IoServiceImpl();
+}
+
+}

+ 42 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.h

@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef COMMON_HDFS_PUBLIC_API_H_
+#define COMMON_HDFS_PUBLIC_API_H_
+
+#include "libhdfspp/hdfs.h"
+
+#include <asio/io_service.hpp>
+
+namespace hdfs {
+
+class IoServiceImpl : public IoService {
+ public:
+  virtual void Run() override {
+    asio::io_service::work work(io_service_);
+    io_service_.run();
+  }
+  virtual void Stop() override { io_service_.stop(); }
+  ::asio::io_service &io_service() { return io_service_; }
+ private:
+  ::asio::io_service io_service_;
+};
+
+}
+
+#endif

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt

@@ -0,0 +1,2 @@
+add_library(fs filesystem.cc inputstream.cc)
+add_dependencies(fs proto)

+ 105 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc

@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "filesystem.h"
+#include "common/continuation/asio.h"
+#include "common/util.h"
+
+#include <asio/ip/tcp.hpp>
+
+#include <limits>
+
+namespace hdfs {
+
+static const char kNamenodeProtocol[] =
+    "org.apache.hadoop.hdfs.protocol.ClientProtocol";
+static const int kNamenodeProtocolVersion = 1;
+
+using ::asio::ip::tcp;
+
+FileSystem::~FileSystem() {}
+
+void FileSystem::New(
+    IoService *io_service, const std::string &server,
+    const std::string &service,
+    const std::function<void(const Status &, FileSystem *)> &handler) {
+  FileSystemImpl *impl = new FileSystemImpl(io_service);
+  impl->Connect(server, service, [impl, handler](const Status &stat) {
+    if (stat.ok()) {
+      handler(stat, impl);
+    } else {
+      delete impl;
+      handler(stat, nullptr);
+    }
+  });
+}
+
+FileSystemImpl::FileSystemImpl(IoService *io_service)
+    : io_service_(static_cast<IoServiceImpl *>(io_service)),
+      engine_(&io_service_->io_service(), RpcEngine::GetRandomClientName(),
+              kNamenodeProtocol, kNamenodeProtocolVersion),
+      namenode_(&engine_) {}
+
+void FileSystemImpl::Connect(const std::string &server,
+                             const std::string &service,
+                             std::function<void(const Status &)> &&handler) {
+  using namespace continuation;
+  typedef std::vector<tcp::endpoint> State;
+  auto m = Pipeline<State>::Create();
+  m->Push(Resolve(&io_service_->io_service(), server, service,
+                  std::back_inserter(m->state())))
+      .Push(Bind([this, m](const Continuation::Next &next) {
+        engine_.Connect(m->state(), next);
+      }));
+  m->Run([this, handler](const Status &status, const State &) {
+    if (status.ok()) {
+      engine_.Start();
+    }
+    handler(status);
+  });
+}
+
+void FileSystemImpl::Open(
+    const std::string &path,
+    const std::function<void(const Status &, InputStream *)> &handler) {
+  using ::hadoop::hdfs::GetBlockLocationsRequestProto;
+  using ::hadoop::hdfs::GetBlockLocationsResponseProto;
+
+  struct State {
+    GetBlockLocationsRequestProto req;
+    std::shared_ptr<GetBlockLocationsResponseProto> resp;
+  };
+
+  auto m = continuation::Pipeline<State>::Create();
+  auto &req = m->state().req;
+  req.set_src(path);
+  req.set_offset(0);
+  req.set_length(std::numeric_limits<long long>::max());
+  m->state().resp.reset(new GetBlockLocationsResponseProto());
+
+  State *s = &m->state();
+  m->Push(continuation::Bind(
+      [this, s](const continuation::Continuation::Next &next) {
+        namenode_.GetBlockLocations(&s->req, s->resp, next);
+      }));
+  m->Run([this, handler](const Status &stat, const State &s) {
+    handler(stat, stat.ok() ? new InputStreamImpl(this, &s.resp->locations())
+                            : nullptr);
+  });
+}
+}

+ 74 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h

@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_LIB_FS_FILESYSTEM_H_
+#define LIBHDFSPP_LIB_FS_FILESYSTEM_H_
+
+#include "common/hdfs_public_api.h"
+#include "libhdfspp/hdfs.h"
+#include "rpc/rpc_engine.h"
+#include "ClientNamenodeProtocol.pb.h"
+#include "ClientNamenodeProtocol.hrpc.inl"
+
+namespace hdfs {
+
+class FileSystemImpl : public FileSystem {
+public:
+  FileSystemImpl(IoService *io_service);
+  void Connect(const std::string &server, const std::string &service,
+               std::function<void(const Status &)> &&handler);
+  virtual void Open(const std::string &path,
+                    const std::function<void(const Status &, InputStream *)>
+                        &handler) override;
+  RpcEngine &rpc_engine() { return engine_; }
+
+private:
+  IoServiceImpl *io_service_;
+  RpcEngine engine_;
+  ClientNamenodeProtocol namenode_;
+};
+
+class InputStreamImpl : public InputStream {
+public:
+  InputStreamImpl(FileSystemImpl *fs,
+                  const ::hadoop::hdfs::LocatedBlocksProto *blocks);
+  virtual void PositionRead(
+      void *buf, size_t nbyte, size_t offset,
+      const std::function<void(const Status &, size_t)> &handler) override;
+  template <class MutableBufferSequence, class Handler>
+  void AsyncPreadSome(size_t offset, const MutableBufferSequence &buffers,
+                      const Handler &handler);
+  template <class BlockReaderTrait, class MutableBufferSequence, class Handler>
+  void AsyncReadBlock(const std::string &client_name,
+                      const hadoop::hdfs::LocatedBlockProto &block,
+                      size_t offset, const MutableBufferSequence &buffers,
+                      const Handler &handler);
+
+private:
+  FileSystemImpl *fs_;
+  unsigned long long file_length_;
+  std::vector<::hadoop::hdfs::LocatedBlockProto> blocks_;
+  template <class Reader> struct HandshakeContinuation;
+  template <class Reader, class MutableBufferSequence>
+  struct ReadBlockContinuation;
+  struct RemoteBlockReaderTrait;
+};
+}
+
+#include "inputstream_impl.h"
+
+#endif

+ 44 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc

@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "filesystem.h"
+
+namespace hdfs {
+
+using ::hadoop::hdfs::LocatedBlocksProto;
+
+InputStream::~InputStream() {}
+
+InputStreamImpl::InputStreamImpl(FileSystemImpl *fs,
+                                 const LocatedBlocksProto *blocks)
+    : fs_(fs), file_length_(blocks->filelength()) {
+  for (const auto &block : blocks->blocks()) {
+    blocks_.push_back(block);
+  }
+
+  if (blocks->has_lastblock() && blocks->lastblock().b().numbytes()) {
+    blocks_.push_back(blocks->lastblock());
+  }
+}
+
+void InputStreamImpl::PositionRead(
+    void *buf, size_t nbyte, size_t offset,
+    const std::function<void(const Status &, size_t)> &handler) {
+  AsyncPreadSome(offset, asio::buffer(buf, nbyte), handler);
+}
+}

+ 179 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h

@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef FS_INPUTSTREAM_IMPL_H_
+#define FS_INPUTSTREAM_IMPL_H_
+
+#include "reader/block_reader.h"
+
+#include "common/continuation/asio.h"
+#include "common/continuation/protobuf.h"
+
+#include <functional>
+#include <future>
+
+namespace hdfs {
+
+struct InputStreamImpl::RemoteBlockReaderTrait {
+  typedef RemoteBlockReader<asio::ip::tcp::socket> Reader;
+  struct State {
+    std::unique_ptr<asio::ip::tcp::socket> conn_;
+    std::unique_ptr<Reader> reader_;
+    std::vector<asio::ip::tcp::endpoint> endpoints_;
+    size_t transferred_;
+    Reader *reader() { return reader_.get(); }
+    size_t *transferred() { return &transferred_; }
+    const size_t *transferred() const { return &transferred_; }
+  };
+  static continuation::Pipeline<State> *
+  CreatePipeline(::asio::io_service *io_service,
+                 const ::hadoop::hdfs::LocatedBlockProto &b) {
+    using namespace ::asio::ip;
+    auto m = continuation::Pipeline<State>::Create();
+    auto &s = m->state();
+    s.conn_.reset(new tcp::socket(*io_service));
+    s.reader_.reset(new Reader(BlockReaderOptions(), s.conn_.get()));
+    for (auto &loc : b.locs()) {
+      auto datanode = loc.id();
+      s.endpoints_.push_back(tcp::endpoint(
+          address::from_string(datanode.ipaddr()), datanode.xferport()));
+    }
+
+    m->Push(continuation::Connect(s.conn_.get(), s.endpoints_.begin(),
+                                  s.endpoints_.end()));
+    return m;
+  }
+};
+
+template <class Reader>
+struct InputStreamImpl::HandshakeContinuation : continuation::Continuation {
+  HandshakeContinuation(Reader *reader, const std::string &client_name,
+                        const hadoop::common::TokenProto *token,
+                        const hadoop::hdfs::ExtendedBlockProto *block,
+                        uint64_t length, uint64_t offset)
+      : reader_(reader), client_name_(client_name), length_(length),
+        offset_(offset) {
+    if (token) {
+      token_.reset(new hadoop::common::TokenProto());
+      token_->CheckTypeAndMergeFrom(*token);
+    }
+    block_.CheckTypeAndMergeFrom(*block);
+  }
+
+  virtual void Run(const Next &next) override {
+    reader_->async_connect(client_name_, token_.get(), &block_, length_,
+                           offset_, next);
+  }
+
+private:
+  Reader *reader_;
+  const std::string client_name_;
+  std::unique_ptr<hadoop::common::TokenProto> token_;
+  hadoop::hdfs::ExtendedBlockProto block_;
+  uint64_t length_;
+  uint64_t offset_;
+};
+
+template <class Reader, class MutableBufferSequence>
+struct InputStreamImpl::ReadBlockContinuation : continuation::Continuation {
+  ReadBlockContinuation(Reader *reader, MutableBufferSequence buffer,
+                        size_t *transferred)
+      : reader_(reader), buffer_(buffer),
+        buffer_size_(asio::buffer_size(buffer)), transferred_(transferred) {}
+
+  virtual void Run(const Next &next) override {
+    *transferred_ = 0;
+    next_ = next;
+    OnReadData(Status::OK(), 0);
+  }
+
+private:
+  Reader *reader_;
+  MutableBufferSequence buffer_;
+  const size_t buffer_size_;
+  size_t *transferred_;
+  std::function<void(const Status &)> next_;
+
+  void OnReadData(const Status &status, size_t transferred) {
+    using std::placeholders::_1;
+    using std::placeholders::_2;
+    *transferred_ += transferred;
+    if (!status.ok()) {
+      next_(status);
+    } else if (*transferred_ >= buffer_size_) {
+      next_(status);
+    } else {
+      reader_->async_read_some(
+          asio::buffer(buffer_ + *transferred_, buffer_size_ - *transferred_),
+          std::bind(&ReadBlockContinuation::OnReadData, this, _1, _2));
+    }
+  }
+};
+
+template <class MutableBufferSequence, class Handler>
+void InputStreamImpl::AsyncPreadSome(size_t offset,
+                                     const MutableBufferSequence &buffers,
+                                     const Handler &handler) {
+  using ::hadoop::hdfs::LocatedBlockProto;
+  namespace ip = ::asio::ip;
+  using ::asio::ip::tcp;
+
+  auto it = std::find_if(
+      blocks_.begin(), blocks_.end(), [offset](const LocatedBlockProto &p) {
+        return p.offset() <= offset && offset < p.offset() + p.b().numbytes();
+      });
+
+  if (it == blocks_.end()) {
+    handler(Status::InvalidArgument("Cannot find corresponding blocks"), 0);
+    return;
+  } else if (!it->locs_size()) {
+    handler(Status::ResourceUnavailable("No datanodes available"), 0);
+    return;
+  }
+
+  uint64_t offset_within_block = offset - it->offset();
+  uint64_t size_within_block = std::min<uint64_t>(
+      it->b().numbytes() - offset_within_block, asio::buffer_size(buffers));
+
+  AsyncReadBlock<RemoteBlockReaderTrait>(
+      fs_->rpc_engine().client_name(), *it, offset_within_block,
+      asio::buffer(buffers, size_within_block), handler);
+}
+
+template <class BlockReaderTrait, class MutableBufferSequence, class Handler>
+void InputStreamImpl::AsyncReadBlock(
+    const std::string &client_name,
+    const hadoop::hdfs::LocatedBlockProto &block, size_t offset,
+    const MutableBufferSequence &buffers, const Handler &handler) {
+
+  typedef typename BlockReaderTrait::Reader Reader;
+  auto m =
+      BlockReaderTrait::CreatePipeline(&fs_->rpc_engine().io_service(), block);
+  auto &s = m->state();
+  size_t size = asio::buffer_size(buffers);
+  m->Push(new HandshakeContinuation<Reader>(s.reader(), client_name, nullptr,
+                                            &block.b(), size, offset))
+      .Push(new ReadBlockContinuation<Reader, decltype(buffers)>(
+          s.reader(), buffers, s.transferred()));
+  m->Run([handler](const Status &status,
+                   const typename BlockReaderTrait::State &state) {
+           handler(status, *state.transferred());
+  });
+}
+}
+
+#endif

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/protoc_gen_hrpc.cc

@@ -81,8 +81,8 @@ void StubGenerator::EmitMethod(const MethodDescriptor *method,
   out->Print(
       "\n  inline void $camel_method$(const Message *req, "
       "const std::shared_ptr<Message> &resp, "
-      "Callback &&handler) {\n"
-      "    engine_->AsyncRpc(\"$method$\", req, resp, std::move(handler));\n"
+      "const Callback &handler) {\n"
+      "    engine_->AsyncRpc(\"$method$\", req, resp, handler);\n"
       "  }\n",
       "camel_method", ToCamelCase(method->name()), "method", method->name());
 }

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc

@@ -182,7 +182,8 @@ std::shared_ptr<std::string> RpcConnection::PrepareHandshakePacket() {
 
 void RpcConnection::AsyncRpc(
     const std::string &method_name, const ::google::protobuf::MessageLite *req,
-    std::shared_ptr<::google::protobuf::MessageLite> resp, Callback &&handler) {
+    std::shared_ptr<::google::protobuf::MessageLite> resp,
+    const Callback &handler) {
   std::lock_guard<std::mutex> state_lock(engine_state_lock_);
 
   auto wrapped_handler =

+ 9 - 14
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc

@@ -35,20 +35,15 @@ RpcEngine::RpcEngine(::asio::io_service *io_service,
     , conn_(new RpcConnectionImpl<::asio::ip::tcp::socket>(this))
 {}
 
-Status
-RpcEngine::Connect(const std::vector<::asio::ip::tcp::endpoint> &servers) {
-  using ::asio::ip::tcp;
-  auto stat = std::make_shared<std::promise<Status>>();
-  std::future<Status> future(stat->get_future());
-  conn_->Connect(servers, [this, stat](const Status &status) {
-    if (!status.ok()) {
-      stat->set_value(status);
-      return;
+void RpcEngine::Connect(const std::vector<::asio::ip::tcp::endpoint> &servers,
+                        const std::function<void(const Status &)> &handler) {
+  conn_->Connect(servers, [this, handler](const Status &stat) {
+    if (!stat.ok()) {
+      handler(stat);
+    } else {
+      conn_->Handshake([handler](const Status &s) { handler(s); });
     }
-    conn_->Handshake(
-        [this, stat](const Status &status) { stat->set_value(status); });
   });
-  return future.get();
 }
 
 void RpcEngine::Start() { conn_->Start(); }
@@ -60,8 +55,8 @@ void RpcEngine::Shutdown() {
 void RpcEngine::AsyncRpc(
     const std::string &method_name, const ::google::protobuf::MessageLite *req,
     const std::shared_ptr<::google::protobuf::MessageLite> &resp,
-    std::function<void(const Status &)> &&handler) {
-  conn_->AsyncRpc(method_name, req, resp, std::move(handler));
+    const std::function<void(const Status &)> &handler) {
+  conn_->AsyncRpc(method_name, req, resp, handler);
 }
 
 Status

+ 4 - 3
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h

@@ -48,7 +48,7 @@ public:
   void AsyncRpc(const std::string &method_name,
                 const ::google::protobuf::MessageLite *req,
                 std::shared_ptr<::google::protobuf::MessageLite> resp,
-                Callback &&handler);
+                const Callback &handler);
 
   void AsyncRawRpc(const std::string &method_name, const std::string &request,
                    std::shared_ptr<std::string> resp, Callback &&handler);
@@ -123,7 +123,7 @@ public:
   void AsyncRpc(const std::string &method_name,
                 const ::google::protobuf::MessageLite *req,
                 const std::shared_ptr<::google::protobuf::MessageLite> &resp,
-                std::function<void(const Status &)> &&handler);
+                const std::function<void(const Status &)> &handler);
 
   Status Rpc(const std::string &method_name,
              const ::google::protobuf::MessageLite *req,
@@ -134,7 +134,8 @@ public:
    **/
   Status RawRpc(const std::string &method_name, const std::string &req,
                 std::shared_ptr<std::string> resp);
-  Status Connect(const std::vector<::asio::ip::tcp::endpoint> &server);
+  void Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
+               const std::function<void(const Status &)> &handler);
   void Start();
   void Shutdown();
 

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/CMakeLists.txt

@@ -25,3 +25,7 @@ add_test(remote_block_reader remote_block_reader_test)
 add_executable(sasl_digest_md5_test sasl_digest_md5_test.cc)
 target_link_libraries(sasl_digest_md5_test common ${OPENSSL_LIBRARIES} gmock_main)
 add_test(sasl_digest_md5 sasl_digest_md5_test)
+
+add_executable(inputstream_test inputstream_test.cc)
+target_link_libraries(inputstream_test fs rpc reader proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main)
+add_test(inputstream inputstream_test)

+ 174 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/inputstream_test.cc

@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "fs/filesystem.h"
+#include <gmock/gmock.h>
+
+using hadoop::common::TokenProto;
+using hadoop::hdfs::ExtendedBlockProto;
+using hadoop::hdfs::LocatedBlockProto;
+using hadoop::hdfs::LocatedBlocksProto;
+
+using ::testing::_;
+using ::testing::InvokeArgument;
+using ::testing::Return;
+
+using namespace hdfs;
+
+namespace hdfs {
+
+class MockReader {
+public:
+  virtual ~MockReader() {}
+  MOCK_METHOD2(
+      async_read_some,
+      void(const asio::mutable_buffers_1 &,
+           const std::function<void(const Status &, size_t transferred)> &));
+
+  MOCK_METHOD6(async_connect,
+               void(const std::string &, TokenProto *, ExtendedBlockProto *,
+                    uint64_t, uint64_t,
+                    const std::function<void(const Status &)> &));
+};
+
+template <class Trait> struct MockBlockReaderTrait {
+  typedef MockReader Reader;
+  struct State {
+    MockReader reader_;
+    size_t transferred_;
+    Reader *reader() { return &reader_; }
+    size_t *transferred() { return &transferred_; }
+    const size_t *transferred() const { return &transferred_; }
+  };
+
+  static continuation::Pipeline<State> *
+  CreatePipeline(::asio::io_service *, const LocatedBlockProto &) {
+    auto m = continuation::Pipeline<State>::Create();
+    *m->state().transferred() = 0;
+    Trait::InitializeMockReader(m->state().reader());
+    return m;
+  }
+};
+}
+
+TEST(InputStreamTest, TestReadSingleTrunk) {
+  LocatedBlocksProto blocks;
+  LocatedBlockProto block;
+  char buf[4096] = {
+      0,
+  };
+  IoServiceImpl io_service;
+  FileSystemImpl fs(&io_service);
+  InputStreamImpl is(&fs, &blocks);
+  Status stat;
+  size_t read = 0;
+  struct Trait {
+    static void InitializeMockReader(MockReader *reader) {
+      EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
+          .WillOnce(InvokeArgument<5>(Status::OK()));
+
+      EXPECT_CALL(*reader, async_read_some(_,_))
+          .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf)));
+    }
+  };
+
+  is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
+      "client", block, 0, asio::buffer(buf, sizeof(buf)),
+      [&stat, &read](const Status &status, size_t transferred) {
+        stat = status;
+        read = transferred;
+      });
+  ASSERT_TRUE(stat.ok());
+  ASSERT_EQ(sizeof(buf), read);
+  read = 0;
+}
+
+TEST(InputStreamTest, TestReadMultipleTrunk) {
+  LocatedBlocksProto blocks;
+  LocatedBlockProto block;
+  char buf[4096] = {
+      0,
+  };
+  IoServiceImpl io_service;
+  FileSystemImpl fs(&io_service);
+  InputStreamImpl is(&fs, &blocks);
+  Status stat;
+  size_t read = 0;
+  struct Trait {
+    static void InitializeMockReader(MockReader *reader) {
+      EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
+          .WillOnce(InvokeArgument<5>(Status::OK()));
+
+      EXPECT_CALL(*reader, async_read_some(_,_))
+          .Times(4)
+          .WillRepeatedly(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4));
+    }
+  };
+
+  is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
+      "client", block, 0, asio::buffer(buf, sizeof(buf)),
+      [&stat, &read](const Status &status, size_t transferred) {
+        stat = status;
+        read = transferred;
+      });
+  ASSERT_TRUE(stat.ok());
+  ASSERT_EQ(sizeof(buf), read);
+  read = 0;
+}
+
+TEST(InputStreamTest, TestReadError) {
+  LocatedBlocksProto blocks;
+  LocatedBlockProto block;
+  char buf[4096] = {
+      0,
+  };
+  IoServiceImpl io_service;
+  FileSystemImpl fs(&io_service);
+  InputStreamImpl is(&fs, &blocks);
+  Status stat;
+  size_t read = 0;
+  struct Trait {
+    static void InitializeMockReader(MockReader *reader) {
+      EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
+          .WillOnce(InvokeArgument<5>(Status::OK()));
+
+      EXPECT_CALL(*reader, async_read_some(_,_))
+          .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
+          .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
+          .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
+          .WillOnce(InvokeArgument<1>(Status::Error("error"), 0));
+    }
+  };
+
+  is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
+      "client", block, 0, asio::buffer(buf, sizeof(buf)),
+      [&stat, &read](const Status &status, size_t transferred) {
+        stat = status;
+        read = transferred;
+      });
+  ASSERT_FALSE(stat.ok());
+  ASSERT_EQ(sizeof(buf) / 4 * 3, read);
+  read = 0;
+}
+
+int main(int argc, char *argv[]) {
+  // The following line must be executed to initialize Google Mock
+  // (and Google Test) before running the tests.
+  ::testing::InitGoogleMock(&argc, argv);
+  return RUN_ALL_TESTS();
+}