Browse Source

HDFS-9144. Refactoring libhdfs++ into stateful/ephemeral objects. Contributed by Bob Hansen.

James 10 năm trước cách đây
mục cha
commit
d7ecf396c9
38 tập tin đã thay đổi với 1562 bổ sung1284 xóa
  1. 26 12
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
  2. 11 0
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/options.h
  3. 1 0
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/CMakeLists.txt
  4. 1 1
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/CMakeLists.txt
  5. 22 15
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
  6. 0 216
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.cc
  7. 0 105
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.h
  8. 18 1
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
  9. 49 0
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/async_stream.h
  10. 3 1
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
  11. 9 12
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h
  12. 0 16
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc
  13. 2 1
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc
  14. 35 0
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
  15. 7 0
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
  16. 2 0
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/CMakeLists.txt
  17. 57 0
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
  18. 66 0
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
  19. 1 1
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
  20. 240 0
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
  21. 115 0
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
  22. 179 33
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
  23. 86 51
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
  24. 0 48
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream.cc
  25. 0 207
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h
  26. 1 1
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
  27. 193 102
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
  28. 59 24
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
  29. 19 9
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
  30. 5 18
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h
  31. 36 0
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h
  32. 0 46
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc
  33. 0 14
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
  34. 4 8
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
  35. 141 67
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
  36. 0 232
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/inputstream_test.cc
  37. 10 5
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h
  38. 164 38
      hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc

+ 26 - 12
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h

@@ -24,6 +24,7 @@
 #include <functional>
 #include <memory>
 #include <set>
+#include <iostream>
 
 namespace hdfs {
 
@@ -68,10 +69,10 @@ class NodeExclusionRule {
 };
 
 /**
- * Applications opens an InputStream to read files in HDFS.
+ * Applications opens a FileHandle to read files in HDFS.
  **/
-class InputStream {
- public:
+class FileHandle {
+public:
   /**
    * Read data from a specific position. The current implementation
    * stops at the block boundary.
@@ -83,10 +84,14 @@ class InputStream {
    * The handler returns the datanode that serves the block and the number of
    * bytes has read.
    **/
-  virtual void PositionRead(
-      void *buf, size_t nbyte, uint64_t offset,
-      const std::function<void(const Status &, const std::string &, size_t)> &
-          handler) = 0;
+  virtual void
+  PositionRead(void *buf, size_t nbyte, uint64_t offset,
+               const std::function<void(const Status &, size_t)> &handler) = 0;
+
+  virtual Status PositionRead(void *buf, size_t *nbyte, off_t offset) = 0;
+  virtual Status Read(void *buf, size_t *nbyte) = 0;
+  virtual Status Seek(off_t *offset, std::ios_base::seekdir whence) = 0;
+
   /**
    * Determine if a datanode should be excluded from future operations
    * based on the return Status.
@@ -97,7 +102,7 @@ class InputStream {
    **/
   static bool ShouldExclude(const Status &status);
 
-  virtual ~InputStream();
+  virtual ~FileHandle();
 };
 
 /**
@@ -114,15 +119,24 @@ class FileSystem {
       IoService *io_service, const Options &options, const std::string &server,
       const std::string &service,
       const std::function<void(const Status &, FileSystem *)> &handler);
+
+  /* Synchronous call of New*/
+  static FileSystem *
+  New(IoService *io_service, const Options &options, const std::string &server,
+      const std::string &service);
+
   /**
    * Open a file on HDFS. The call issues an RPC to the NameNode to
    * gather the locations of all blocks in the file and to return a
    * new instance of the @ref InputStream object.
    **/
-  virtual void Open(
-      const std::string &path,
-      const std::function<void(const Status &, InputStream *)> &handler) = 0;
-  virtual ~FileSystem();
+  virtual void
+  Open(const std::string &path,
+       const std::function<void(const Status &, FileHandle *)> &handler) = 0;
+  virtual Status Open(const std::string &path, FileHandle **handle) = 0;
+
+  virtual ~FileSystem() {};
+
 };
 }
 

+ 11 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/options.h

@@ -30,6 +30,17 @@ struct Options {
    **/
   int rpc_timeout;
 
+  /**
+   * Maximum number of retries for RPC operations
+   **/
+  const static int NO_RPC_RETRY = -1;
+  int max_rpc_retries;
+
+  /**
+   * Number of ms to wait between retry of RPC operations
+   **/
+  int rpc_retry_delay_ms;
+
   /**
    * Exclusion time for failed datanodes in milliseconds.
    * Default: 60000

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/CMakeLists.txt

@@ -21,4 +21,5 @@ add_subdirectory(fs)
 add_subdirectory(reader)
 add_subdirectory(rpc)
 add_subdirectory(proto)
+add_subdirectory(connection)
 add_subdirectory(bindings)

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/CMakeLists.txt

@@ -16,5 +16,5 @@
 # under the License.
 
 
-add_library(bindings_c hdfs.cc hdfs_cpp.cc)
+add_library(bindings_c hdfs.cc)
 add_dependencies(bindings_c fs rpc reader proto common fs rpc reader proto common)

+ 22 - 15
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc

@@ -16,8 +16,10 @@
  * limitations under the License.
  */
 
-#include "hdfs_cpp.h"
+#include "fs/filesystem.h"
 
+#include <hdfs/hdfs.h>
+#include <string>
 #include <cstring>
 #include <iostream>
 
@@ -25,15 +27,15 @@ using namespace hdfs;
 
 /* Seperate the handles used by the C api from the C++ API*/
 struct hdfs_internal {
-  hdfs_internal(HadoopFileSystem *p) : filesystem_(p) {}
-  hdfs_internal(std::unique_ptr<HadoopFileSystem> p)
+  hdfs_internal(FileSystem *p) : filesystem_(p) {}
+  hdfs_internal(std::unique_ptr<FileSystem> p)
       : filesystem_(std::move(p)) {}
   virtual ~hdfs_internal(){};
-  HadoopFileSystem *get_impl() { return filesystem_.get(); }
-  const HadoopFileSystem *get_impl() const { return filesystem_.get(); }
+  FileSystem *get_impl() { return filesystem_.get(); }
+  const FileSystem *get_impl() const { return filesystem_.get(); }
 
  private:
-  std::unique_ptr<HadoopFileSystem> filesystem_;
+  std::unique_ptr<FileSystem> filesystem_;
 };
 
 struct hdfsFile_internal {
@@ -102,17 +104,23 @@ bool CheckSystemAndHandle(hdfsFS fs, hdfsFile file) {
 int hdfsFileIsOpenForRead(hdfsFile file) {
   /* files can only be open for reads at the moment, do a quick check */
   if (file) {
-    return file->get_impl()->IsOpenForRead();
+    return true; // Update implementation when we get file writing
   }
   return false;
 }
 
 hdfsFS hdfsConnect(const char *nn, tPort port) {
-  HadoopFileSystem *fs = new HadoopFileSystem();
-  Status stat = fs->Connect(nn, port);
-  if (!stat.ok()) {
+  std::string port_as_string = std::to_string(port);
+  IoService * io_service = IoService::New();
+  FileSystem *fs = FileSystem::New(io_service, Options(), nn, port_as_string);
+  if (!fs) {
     ReportError(ENODEV, "Unable to connect to NameNode.");
-    delete fs;
+
+    // FileSystem's ctor might take ownership of the io_service; if it does,
+    //    it will null out the pointer
+    if (io_service)
+      delete io_service;
+
     return nullptr;
   }
   return new hdfs_internal(fs);
@@ -139,7 +147,7 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize,
     return nullptr;
   }
   FileHandle *f = nullptr;
-  Status stat = fs->get_impl()->OpenFileForRead(path, &f);
+  Status stat = fs->get_impl()->Open(path, &f);
   if (!stat.ok()) {
     return nullptr;
   }
@@ -150,7 +158,6 @@ int hdfsCloseFile(hdfsFS fs, hdfsFile file) {
   if (!CheckSystemAndHandle(fs, file)) {
     return -1;
   }
-
   delete file;
   return 0;
 }
@@ -162,8 +169,8 @@ tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer,
   }
 
   size_t len = length;
-  Status stat = file->get_impl()->Pread(buffer, &len, position);
-  if (!stat.ok()) {
+  Status stat = file->get_impl()->PositionRead(buffer, &len, position);
+  if(!stat.ok()) {
     return Error(stat);
   }
   return (tSize)len;

+ 0 - 216
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.cc

@@ -1,216 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "hdfs_cpp.h"
-
-#include <cstdint>
-#include <cerrno>
-#include <string>
-#include <future>
-#include <memory>
-#include <thread>
-#include <vector>
-#include <set>
-#include <tuple>
-
-#include <hdfs/hdfs.h>
-#include "libhdfspp/hdfs.h"
-#include "libhdfspp/status.h"
-#include "fs/filesystem.h"
-#include "common/hdfs_public_api.h"
-
-namespace hdfs {
-
-FileHandle::FileHandle(InputStream *is) : input_stream_(is), offset_(0){}
-
-Status FileHandle::Pread(void *buf, size_t *nbyte, off_t offset) {
-  auto callstate = std::make_shared<std::promise<std::tuple<Status, std::string, size_t>>>();
-  std::future<std::tuple<Status, std::string, size_t>> future(callstate->get_future());
-
-  /* wrap async call with promise/future to make it blocking */
-  auto callback = [callstate](
-      const Status &s, const std::string &dn, size_t bytes) {
-    callstate->set_value(std::make_tuple(s, dn, bytes));
-  };
-
-  input_stream_->PositionRead(buf, *nbyte, offset, callback);
-
-  /* wait for async to finish */
-  auto returnstate = future.get();
-  auto stat = std::get<0>(returnstate);
-
-  if (!stat.ok()) {
-    /* determine if DN gets marked bad */
-    if (InputStream::ShouldExclude(stat)) {
-      InputStreamImpl *impl =
-          static_cast<InputStreamImpl *>(input_stream_.get());
-      impl->bad_node_tracker_->AddBadNode(std::get<1>(returnstate));
-    }
-
-    return stat;
-  }
-  *nbyte = std::get<2>(returnstate);
-  return Status::OK();
-}
-
-Status FileHandle::Read(void *buf, size_t *nbyte) {
-  Status stat = Pread(buf, nbyte, offset_);
-  if (!stat.ok()) {
-    return stat;
-  }
-
-  offset_ += *nbyte;
-  return Status::OK();
-}
-
-Status FileHandle::Seek(off_t *offset, std::ios_base::seekdir whence) {
-  off_t new_offset = -1;
-
-  switch (whence) {
-    case std::ios_base::beg:
-      new_offset = *offset;
-      break;
-    case std::ios_base::cur:
-      new_offset = offset_ + *offset;
-      break;
-    case std::ios_base::end:
-      new_offset = static_cast<InputStreamImpl *>(input_stream_.get())
-                       ->get_file_length() +
-                   *offset;
-      break;
-    default:
-      /* unsupported */
-      return Status::InvalidArgument("Invalid Seek whence argument");
-  }
-
-  if (!CheckSeekBounds(new_offset)) {
-    return Status::InvalidArgument("Seek offset out of bounds");
-  }
-  offset_ = new_offset;
-
-  *offset = offset_;
-  return Status::OK();
-}
-
-/* return false if seek will be out of bounds */
-bool FileHandle::CheckSeekBounds(ssize_t desired_position) {
-  ssize_t file_length =
-      static_cast<InputStreamImpl *>(input_stream_.get())->get_file_length();
-
-  if (desired_position < 0 || desired_position >= file_length) {
-    return false;
-  }
-
-  return true;
-}
-
-bool FileHandle::IsOpenForRead() {
-  /* for now just check if InputStream exists */
-  if (!input_stream_) {
-    return false;
-  }
-  return true;
-}
-
-HadoopFileSystem::~HadoopFileSystem() {
-  /**
-   * Note: IoService must be stopped before getting rid of worker threads.
-   * Once worker threads are joined and deleted the service can be deleted.
-   **/
-
-  file_system_.reset(nullptr);
-  service_->Stop();
-  worker_threads_.clear();
-  service_.reset(nullptr);
-}
-
-Status HadoopFileSystem::Connect(const char *nn, tPort port,
-                                 unsigned int threads) {
-  /* IoService::New can return nullptr */
-  if (!service_) {
-    return Status::Error("Null IoService");
-  }
-  /* spawn background threads for asio delegation */
-  for (unsigned int i = 0; i < threads; i++) {
-    AddWorkerThread();
-  }
-  /* synchronized */
-  auto callstate = std::make_shared<std::promise<std::tuple<Status, FileSystem*>>>();
-  std::future<std::tuple<Status, FileSystem*>> future(callstate->get_future());
-
-  auto callback = [callstate](const Status &s, FileSystem *f) {
-    callstate->set_value(std::make_tuple(s,f));
-  };
-
-  /* dummy options object until this is hooked up to HDFS-9117 */
-  Options options_object;
-  FileSystem::New(service_.get(), options_object, nn, std::to_string(port),
-                  callback);
-
-  /* block until promise is set */
-  auto returnstate = future.get();
-  Status stat = std::get<0>(returnstate);
-  FileSystem *fs = std::get<1>(returnstate);
-
-  /* check and see if it worked */
-  if (!stat.ok() || !fs) {
-    service_->Stop();
-    worker_threads_.clear();
-    return stat;
-  }
-
-  file_system_ = std::unique_ptr<FileSystem>(fs);
-  return stat;
-}
-
-int HadoopFileSystem::AddWorkerThread() {
-  auto service_task = [](IoService *service) { service->Run(); };
-  worker_threads_.push_back(
-      WorkerPtr(new std::thread(service_task, service_.get())));
-  return worker_threads_.size();
-}
-
-Status HadoopFileSystem::OpenFileForRead(const std::string &path,
-                                         FileHandle **handle) {
-  auto callstate = std::make_shared<std::promise<std::tuple<Status, InputStream*>>>();
-  std::future<std::tuple<Status, InputStream*>> future(callstate->get_future());
-
-  /* wrap async FileSystem::Open with promise to make it a blocking call */
-  auto h = [callstate](const Status &s, InputStream *is) {
-    callstate->set_value(std::make_tuple(s, is));
-  };
-
-  file_system_->Open(path, h);
-
-  /* block until promise is set */
-  auto returnstate = future.get();
-  Status stat = std::get<0>(returnstate);
-  InputStream *input_stream = std::get<1>(returnstate);
-
-  if (!stat.ok()) {
-    delete input_stream;
-    return stat;
-  }
-  if (!input_stream) {
-    return stat;
-  }
-
-  *handle = new FileHandle(input_stream);
-  return stat;
-}
-}

+ 0 - 105
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.h

@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef LIBHDFSPP_BINDINGS_HDFSCPP_H
-#define LIBHDFSPP_BINDINGS_HDFSCPP_H
-
-#include <cstdint>
-#include <thread>
-#include <vector>
-#include <mutex>
-#include <chrono>
-#include <iostream>
-
-#include "libhdfspp/hdfs.h"
-#include "fs/bad_datanode_tracker.h"
-#include <hdfs/hdfs.h>
-
-namespace hdfs {
-
-/**
- * Implement a very simple 'it just works' interface in C++
- * that provides posix-like file operations + extra stuff for hadoop.
- * Then provide very thin C wrappers over each method.
- */
-
-class HadoopFileSystem;
-
-class FileHandle {
- public:
-  virtual ~FileHandle(){};
-  /**
-   * Note:  The nbyte argument for Read and Pread as well as the
-   * offset argument for Seek are in/out parameters.
-   *
-   * For Read and Pread the value referenced by nbyte should
-   * be set to the number of bytes to read. Before returning
-   * the value referenced will be set by the callee to the number
-   * of bytes that was successfully read.
-   *
-   * For Seek the value referenced by offset should be the number
-   * of bytes to shift from the specified whence position.  The
-   * referenced value will be set to the new offset before returning.
-   **/
-  Status Pread(void *buf, size_t *nbyte, off_t offset);
-  Status Read(void *buf, size_t *nbyte);
-  Status Seek(off_t *offset, std::ios_base::seekdir whence);
-  bool IsOpenForRead();
-
- private:
-  /* handle should only be created by fs */
-  friend class HadoopFileSystem;
-  FileHandle(InputStream *is);
-  bool CheckSeekBounds(ssize_t desired_position);
-  std::unique_ptr<InputStream> input_stream_;
-  off_t offset_;
-};
-
-class HadoopFileSystem {
- public:
-  HadoopFileSystem() : service_(IoService::New()) {}
-  virtual ~HadoopFileSystem();
-
-  /* attempt to connect to namenode, return false on failure */
-  Status Connect(const char *nn, tPort port, unsigned int threads = 1);
-
-  /* how many worker threads are servicing asio requests */
-  int WorkerThreadCount() { return worker_threads_.size(); }
-
-  /* add a new thread to handle asio requests, return number of threads in pool
-   */
-  int AddWorkerThread();
-
-  Status OpenFileForRead(const std::string &path, FileHandle **handle);
-
- private:
-  std::unique_ptr<IoService> service_;
-  /* std::thread needs to join before deletion */
-  struct WorkerDeleter {
-    void operator()(std::thread *t) {
-      t->join();
-      delete t;
-    }
-  };
-  typedef std::unique_ptr<std::thread, WorkerDeleter> WorkerPtr;
-  std::vector<WorkerPtr> worker_threads_;
-  std::unique_ptr<FileSystem> file_system_;
-};
-}
-
-#endif

+ 18 - 1
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt

@@ -1 +1,18 @@
-add_library(common base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc configuration.cc)
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+add_library(common base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc configuration.cc util.cc)

+ 49 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/async_stream.h

@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIB_COMMON_ASYNC_STREAM_H_
+#define LIB_COMMON_ASYNC_STREAM_H_
+
+#include <asio.hpp>
+
+namespace hdfs {
+
+typedef asio::mutable_buffers_1 MutableBuffers;
+typedef asio::const_buffers_1   ConstBuffers;
+
+/*
+ * asio-compatible stream implementation.
+ *
+ * Lifecycle: should be managed using std::shared_ptr so the object can be
+ *    handed from consumer to consumer
+ * Threading model: async_read_some and async_write_some are not thread-safe.
+ */
+class AsyncStream  {
+public:
+  virtual void async_read_some(const MutableBuffers &buf,
+          std::function<void (const asio::error_code & error,
+                                 std::size_t bytes_transferred) > handler) = 0;
+
+  virtual void async_write_some(const ConstBuffers &buf,
+            std::function<void (const asio::error_code & error,
+                                 std::size_t bytes_transferred) > handler) = 0;
+};
+
+}
+
+#endif

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h

@@ -29,7 +29,9 @@
 #include <asio/ip/tcp.hpp>
 
 namespace hdfs {
-namespace continuation {
+namespace asio_continuation {
+
+using namespace continuation;
 
 template <class Stream, class MutableBufferSequence>
 class ReadContinuation : public Continuation {

+ 9 - 12
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h

@@ -33,7 +33,7 @@ namespace continuation {
 
 template <class Stream, size_t MaxMessageSize = 512>
 struct ReadDelimitedPBMessageContinuation : public Continuation {
-  ReadDelimitedPBMessageContinuation(Stream *stream,
+  ReadDelimitedPBMessageContinuation(std::shared_ptr<Stream> stream,
                                      ::google::protobuf::MessageLite *msg)
       : stream_(stream), msg_(msg) {}
 
@@ -56,8 +56,8 @@ struct ReadDelimitedPBMessageContinuation : public Continuation {
       }
       next(status);
     };
-    asio::async_read(
-        *stream_, asio::buffer(buf_),
+    asio::async_read(*stream_,
+        asio::buffer(buf_),
         std::bind(&ReadDelimitedPBMessageContinuation::CompletionHandler, this,
                   std::placeholders::_1, std::placeholders::_2),
         handler);
@@ -82,14 +82,14 @@ private:
     return offset ? len + offset - transferred : 1;
   }
 
-  Stream *stream_;
+  std::shared_ptr<Stream> stream_;
   ::google::protobuf::MessageLite *msg_;
   std::array<char, MaxMessageSize> buf_;
 };
 
 template <class Stream>
 struct WriteDelimitedPBMessageContinuation : Continuation {
-  WriteDelimitedPBMessageContinuation(Stream *stream,
+  WriteDelimitedPBMessageContinuation(std::shared_ptr<Stream> stream,
                                       const google::protobuf::MessageLite *msg)
       : stream_(stream), msg_(msg) {}
 
@@ -101,28 +101,25 @@ struct WriteDelimitedPBMessageContinuation : Continuation {
     pbio::CodedOutputStream os(&ss);
     os.WriteVarint32(size);
     msg_->SerializeToCodedStream(&os);
-    write_coroutine_ =
-        std::shared_ptr<Continuation>(Write(stream_, asio::buffer(buf_)));
-    write_coroutine_->Run([next](const Status &stat) { next(stat); });
+    asio::async_write(*stream_, asio::buffer(buf_), [next](const asio::error_code &ec, size_t) { next(ToStatus(ec)); } );
   }
 
 private:
-  Stream *stream_;
+  std::shared_ptr<Stream> stream_;
   const google::protobuf::MessageLite *msg_;
   std::string buf_;
-  std::shared_ptr<Continuation> write_coroutine_;
 };
 
 template <class Stream, size_t MaxMessageSize = 512>
 static inline Continuation *
-ReadDelimitedPBMessage(Stream *stream, ::google::protobuf::MessageLite *msg) {
+ReadDelimitedPBMessage(std::shared_ptr<Stream> stream, ::google::protobuf::MessageLite *msg) {
   return new ReadDelimitedPBMessageContinuation<Stream, MaxMessageSize>(stream,
                                                                         msg);
 }
 
 template <class Stream>
 static inline Continuation *
-WriteDelimitedPBMessage(Stream *stream, ::google::protobuf::MessageLite *msg) {
+WriteDelimitedPBMessage(std::shared_ptr<Stream> stream, ::google::protobuf::MessageLite *msg) {
   return new WriteDelimitedPBMessageContinuation<Stream>(stream, msg);
 }
 }

+ 0 - 16
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc

@@ -24,20 +24,4 @@ IoService::~IoService() {}
 
 IoService *IoService::New() { return new IoServiceImpl(); }
 
-bool InputStream::ShouldExclude(const Status &s) {
-  if (s.ok()) {
-    return false;
-  }
-
-  switch (s.code()) {
-    /* client side resource exhaustion */
-    case Status::kResourceUnavailable:
-      return false;
-    case Status::kInvalidArgument:
-    case Status::kUnimplemented:
-    case Status::kException:
-    default:
-      return true;
-  }
-}
 }

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc

@@ -20,5 +20,6 @@
 
 namespace hdfs {
 
-Options::Options() : rpc_timeout(30000), host_exclusion_duration(600000) {}
+Options::Options() : rpc_timeout(30000), max_rpc_retries(0),
+                     rpc_retry_delay_ms(10000), host_exclusion_duration(600000) {}
 }

+ 35 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc

@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "common/util.h"
+
+namespace hdfs {
+
+std::string GetRandomClientName() {
+  unsigned char buf[6] = {
+      0,
+  };
+  RAND_pseudo_bytes(buf, sizeof(buf));
+
+  std::stringstream ss;
+  ss << "libhdfs++_"
+     << Base64Encode(std::string(reinterpret_cast<char *>(buf), sizeof(buf)));
+  return ss.str();
+}
+
+}

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h

@@ -20,7 +20,10 @@
 
 #include "libhdfspp/status.h"
 
+#include <sstream>
+
 #include <asio/error_code.hpp>
+#include <openssl/rand.h>
 
 #include <google/protobuf/message_lite.h>
 #include <google/protobuf/io/coded_stream.h>
@@ -53,6 +56,10 @@ static inline void ReadDelimitedPBMessage(
 
 std::string Base64Encode(const std::string &src);
 
+/*
+ * Returns a new high-entropy client name
+ */
+std::string GetRandomClientName();
 }
 
 #endif

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/CMakeLists.txt

@@ -0,0 +1,2 @@
+add_library(connection datanodeconnection.cc)
+add_dependencies(connection proto)

+ 57 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc

@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "datanodeconnection.h"
+#include "common/util.h"
+
+namespace hdfs {
+
+DataNodeConnection::~DataNodeConnection(){}
+DataNodeConnectionImpl::~DataNodeConnectionImpl(){}
+
+DataNodeConnectionImpl::DataNodeConnectionImpl(asio::io_service * io_service,
+                                                const ::hadoop::hdfs::DatanodeInfoProto &dn_proto,
+                                                const hadoop::common::TokenProto *token)
+{
+  using namespace ::asio::ip;
+
+  conn_.reset(new tcp::socket(*io_service));
+  auto datanode_addr = dn_proto.id();
+  endpoints_[0] = tcp::endpoint(address::from_string(datanode_addr.ipaddr()),
+                                  datanode_addr.xferport());
+  uuid_ = dn_proto.id().datanodeuuid();
+
+  if (token) {
+    token_.reset(new hadoop::common::TokenProto());
+    token_->CheckTypeAndMergeFrom(*token);
+  }
+}
+
+
+void DataNodeConnectionImpl::Connect(
+             std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) {
+  // Keep the DN from being freed until we're done
+  auto shared_this = shared_from_this();
+  asio::async_connect(*conn_, endpoints_.begin(), endpoints_.end(),
+          [shared_this, handler](const asio::error_code &ec, std::array<asio::ip::tcp::endpoint, 1>::iterator it) {
+            (void)it;
+            handler(ToStatus(ec), shared_this); });
+}
+
+
+}

+ 66 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h

@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_LIB_CONNECTION_DATANODECONNECTION_H_
+#define LIBHDFSPP_LIB_CONNECTION_DATANODECONNECTION_H_
+
+#include "common/hdfs_public_api.h"
+#include "common/async_stream.h"
+#include "ClientNamenodeProtocol.pb.h"
+
+#include "asio.hpp"
+
+namespace hdfs {
+
+class DataNodeConnection : public AsyncStream {
+public:
+    std::string uuid_;
+    std::unique_ptr<hadoop::common::TokenProto> token_;
+
+    virtual ~DataNodeConnection();
+    virtual void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) = 0;
+};
+
+
+class DataNodeConnectionImpl : public DataNodeConnection, public std::enable_shared_from_this<DataNodeConnectionImpl>{
+public:
+  std::unique_ptr<asio::ip::tcp::socket> conn_;
+  std::array<asio::ip::tcp::endpoint, 1> endpoints_;
+  std::string uuid_;
+
+  virtual ~DataNodeConnectionImpl();
+  DataNodeConnectionImpl(asio::io_service * io_service, const ::hadoop::hdfs::DatanodeInfoProto &dn_proto,
+                          const hadoop::common::TokenProto *token);
+
+  void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) override;
+
+  void async_read_some(const MutableBuffers &buf,
+        std::function<void (const asio::error_code & error,
+                               std::size_t bytes_transferred) > handler) override {
+    conn_->async_read_some(buf, handler);
+  };
+
+  void async_write_some(const ConstBuffers &buf,
+            std::function<void (const asio::error_code & error,
+                                 std::size_t bytes_transferred) > handler) override {
+    conn_->async_write_some(buf, handler);
+  }
+};
+
+}
+
+#endif

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt

@@ -1,2 +1,2 @@
-add_library(fs filesystem.cc inputstream.cc bad_datanode_tracker.cc)
+add_library(fs filesystem.cc filehandle.cc bad_datanode_tracker.cc)
 add_dependencies(fs proto)

+ 240 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc

@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "filehandle.h"
+#include "common/continuation/continuation.h"
+#include "connection/datanodeconnection.h"
+#include "reader/block_reader.h"
+
+#include <future>
+#include <tuple>
+
+namespace hdfs {
+
+using ::hadoop::hdfs::LocatedBlocksProto;
+
+FileHandle::~FileHandle() {}
+
+FileHandleImpl::FileHandleImpl(::asio::io_service *io_service, const std::string &client_name,
+                                 const std::shared_ptr<const struct FileInfo> file_info,
+                                 std::shared_ptr<BadDataNodeTracker> bad_data_nodes)
+    : io_service_(io_service), client_name_(client_name), file_info_(file_info),
+      bad_node_tracker_(bad_data_nodes), offset_(0) {
+}
+
+void FileHandleImpl::PositionRead(
+    void *buf, size_t nbyte, uint64_t offset,
+    const std::function<void(const Status &, size_t)>
+        &handler) {
+
+  auto callback = [this, handler](const Status &status,
+                                  const std::string &contacted_datanode,
+                                  size_t bytes_read) {
+    /* determine if DN gets marked bad */
+    if (ShouldExclude(status)) {
+      bad_node_tracker_->AddBadNode(contacted_datanode);
+    }
+
+    handler(status, bytes_read);
+  };
+
+  AsyncPreadSome(offset, asio::buffer(buf, nbyte), bad_node_tracker_, callback);
+}
+
+Status FileHandleImpl::PositionRead(void *buf, size_t *nbyte, off_t offset) {
+  auto callstate = std::make_shared<std::promise<std::tuple<Status, size_t>>>();
+  std::future<std::tuple<Status, size_t>> future(callstate->get_future());
+
+  /* wrap async call with promise/future to make it blocking */
+  auto callback = [callstate](const Status &s, size_t bytes) {
+    callstate->set_value(std::make_tuple(s,bytes));
+  };
+
+  PositionRead(buf, *nbyte, offset, callback);
+
+  /* wait for async to finish */
+  auto returnstate = future.get();
+  auto stat = std::get<0>(returnstate);
+
+  if (!stat.ok()) {
+    return stat;
+  }
+
+  *nbyte = std::get<1>(returnstate);
+  return stat;
+}
+
+Status FileHandleImpl::Read(void *buf, size_t *nbyte) {
+  Status stat = PositionRead(buf, nbyte, offset_);
+  if(!stat.ok()) {
+    return stat;
+  }
+
+  offset_ += *nbyte;
+  return Status::OK();
+}
+
+Status FileHandleImpl::Seek(off_t *offset, std::ios_base::seekdir whence) {
+  off_t new_offset = -1;
+
+  switch (whence) {
+    case std::ios_base::beg:
+      new_offset = *offset;
+      break;
+    case std::ios_base::cur:
+      new_offset = offset_ + *offset;
+      break;
+    case std::ios_base::end:
+      new_offset = file_info_->file_length_ + *offset;
+      break;
+    default:
+      /* unsupported */
+      return Status::InvalidArgument("Invalid Seek whence argument");
+  }
+
+  if(!CheckSeekBounds(new_offset)) {
+    return Status::InvalidArgument("Seek offset out of bounds");
+  }
+  offset_ = new_offset;
+
+  *offset = offset_;
+  return Status::OK();
+}
+
+/* return false if seek will be out of bounds */
+bool FileHandleImpl::CheckSeekBounds(ssize_t desired_position) {
+  ssize_t file_length = file_info_->file_length_;
+
+  if (desired_position < 0 || desired_position >= file_length) {
+    return false;
+  }
+
+  return true;
+}
+
+/*
+ * Note that this method must be thread-safe w.r.t. the unsafe operations occurring
+ * on the FileHandle
+ */
+void FileHandleImpl::AsyncPreadSome(
+    size_t offset, const MutableBuffers &buffers,
+    std::shared_ptr<NodeExclusionRule> excluded_nodes,
+    const std::function<void(const Status &, const std::string &, size_t)> handler) {
+  using ::hadoop::hdfs::DatanodeInfoProto;
+  using ::hadoop::hdfs::LocatedBlockProto;
+
+  /**
+   *  Note: block and chosen_dn will end up pointing to things inside
+   *  the blocks_ vector.  They shouldn't be directly deleted.
+   **/
+  auto block = std::find_if(
+      file_info_->blocks_.begin(), file_info_->blocks_.end(), [offset](const LocatedBlockProto &p) {
+        return p.offset() <= offset && offset < p.offset() + p.b().numbytes();
+      });
+
+  if (block == file_info_->blocks_.end()) {
+    handler(Status::InvalidArgument("Cannot find corresponding blocks"), "", 0);
+    return;
+  }
+
+  /**
+   * If user supplies a rule use it, otherwise use the tracker.
+   * User is responsible for making sure one of them isn't null.
+   **/
+  std::shared_ptr<NodeExclusionRule> rule =
+      excluded_nodes != nullptr ? excluded_nodes : bad_node_tracker_;
+
+  auto datanodes = block->locs();
+  auto it = std::find_if(datanodes.begin(), datanodes.end(),
+                         [rule](const DatanodeInfoProto &dn) {
+                           return !rule->IsBadNode(dn.id().datanodeuuid());
+                         });
+
+  if (it == datanodes.end()) {
+    handler(Status::ResourceUnavailable("No datanodes available"), "", 0);
+    return;
+  }
+
+  DatanodeInfoProto &chosen_dn = *it;
+
+  uint64_t offset_within_block = offset - block->offset();
+  uint64_t size_within_block = std::min<uint64_t>(
+      block->b().numbytes() - offset_within_block, asio::buffer_size(buffers));
+
+  // This is where we will put the logic for re-using a DN connection; we can
+  //    steal the FileHandle's dn and put it back when we're done
+  std::shared_ptr<DataNodeConnection> dn = CreateDataNodeConnection(io_service_, chosen_dn, nullptr /*token*/);
+  std::string dn_id = dn->uuid_;
+  std::string client_name = client_name_;
+
+  // Wrap the DN in a block reader to handle the state and logic of the
+  //    block request protocol
+  std::shared_ptr<BlockReader> reader;
+  reader = CreateBlockReader(BlockReaderOptions(), dn);
+
+
+  auto read_handler = [reader, dn_id, handler](const Status & status, size_t transferred) {
+    handler(status, dn_id, transferred);
+  };
+
+  dn->Connect([handler,read_handler,block,offset_within_block,size_within_block, buffers, reader, dn_id, client_name]
+          (Status status, std::shared_ptr<DataNodeConnection> dn) {
+    (void)dn;
+    if (status.ok()) {
+      reader->AsyncReadBlock(
+          client_name, *block, offset_within_block,
+          asio::buffer(buffers, size_within_block), read_handler);
+    } else {
+      handler(status, dn_id, 0);
+    }
+  });
+
+  return;
+}
+
+std::shared_ptr<BlockReader> FileHandleImpl::CreateBlockReader(const BlockReaderOptions &options,
+                                               std::shared_ptr<DataNodeConnection> dn)
+{
+  return std::make_shared<BlockReaderImpl>(options, dn);
+}
+
+std::shared_ptr<DataNodeConnection> FileHandleImpl::CreateDataNodeConnection(
+    ::asio::io_service * io_service,
+    const ::hadoop::hdfs::DatanodeInfoProto & dn,
+    const hadoop::common::TokenProto * token) {
+  return std::make_shared<DataNodeConnectionImpl>(io_service, dn, token);
+}
+
+bool FileHandle::ShouldExclude(const Status &s) {
+  if (s.ok()) {
+    return false;
+  }
+
+  switch (s.code()) {
+    /* client side resource exhaustion */
+    case Status::kResourceUnavailable:
+      return false;
+    case Status::kInvalidArgument:
+    case Status::kUnimplemented:
+    case Status::kException:
+    default:
+      return true;
+  }
+}
+
+}

+ 115 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h

@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_LIB_FS_FILEHANDLE_H_
+#define LIBHDFSPP_LIB_FS_FILEHANDLE_H_
+
+#include "common/hdfs_public_api.h"
+#include "common/async_stream.h"
+#include "reader/fileinfo.h"
+
+#include "asio.hpp"
+#include "bad_datanode_tracker.h"
+#include "ClientNamenodeProtocol.pb.h"
+
+#include <mutex>
+#include <iostream>
+
+namespace hdfs {
+
+class BlockReader;
+class BlockReaderOptions;
+class DataNodeConnection;
+
+/*
+ * FileHandle: coordinates operations on a particular file in HDFS
+ *
+ * Threading model: not thread-safe; consumers and io_service should not call
+ *    concurrently.  PositionRead is the exceptions; they can be
+ *    called concurrently and repeatedly.
+ * Lifetime: pointer returned to consumer by FileSystem::Open.  Consumer is
+ *    resonsible for freeing the object.
+ */
+class FileHandleImpl : public FileHandle {
+public:
+  FileHandleImpl(::asio::io_service *io_service, const std::string &client_name,
+                  const std::shared_ptr<const struct FileInfo> file_info,
+                  std::shared_ptr<BadDataNodeTracker> bad_data_nodes);
+
+  /*
+   * [Some day reliably] Reads a particular offset into the data file.
+   * On error, bytes_read returns the number of bytes successfully read; on
+   * success, bytes_read will equal nbyte
+   */
+  void PositionRead(
+		void *buf,
+		size_t nbyte,
+		uint64_t offset,
+    const std::function<void(const Status &status, size_t bytes_read)> &handler
+    ) override;
+
+  /**
+   * Note:  The nbyte argument for Read and Pread as well as the
+   * offset argument for Seek are in/out parameters.
+   *
+   * For Read and Pread the value referenced by nbyte should
+   * be set to the number of bytes to read. Before returning
+   * the value referenced will be set by the callee to the number
+   * of bytes that was successfully read.
+   *
+   * For Seek the value referenced by offset should be the number
+   * of bytes to shift from the specified whence position.  The
+   * referenced value will be set to the new offset before returning.
+   **/
+  Status PositionRead(void *buf, size_t *bytes_read, off_t offset) override;
+  Status Read(void *buf, size_t *nbyte) override;
+  Status Seek(off_t *offset, std::ios_base::seekdir whence) override;
+
+
+  /*
+   * Reads some amount of data into the buffer.  Will attempt to find the best
+   * datanode and read data from it.
+   *
+   * If an error occurs during connection or transfer, the callback will be
+   * called with bytes_read equal to the number of bytes successfully transferred.
+   * If no data nodes can be found, status will be Status::ResourceUnavailable.
+   *
+   */
+  void AsyncPreadSome(size_t offset, const MutableBuffers &buffers,
+                      std::shared_ptr<NodeExclusionRule> excluded_nodes,
+                      const std::function<void(const Status &status,
+                      const std::string &dn_id, size_t bytes_read)> handler);
+
+protected:
+  virtual std::shared_ptr<BlockReader> CreateBlockReader(const BlockReaderOptions &options,
+                                                 std::shared_ptr<DataNodeConnection> dn);
+  virtual std::shared_ptr<DataNodeConnection> CreateDataNodeConnection(
+      ::asio::io_service *io_service,
+      const ::hadoop::hdfs::DatanodeInfoProto & dn,
+      const hadoop::common::TokenProto * token);
+private:
+  ::asio::io_service * const io_service_;
+  const std::string client_name_;
+  const std::shared_ptr<const struct FileInfo> file_info_;
+  std::shared_ptr<BadDataNodeTracker> bad_node_tracker_;
+  bool CheckSeekBounds(ssize_t desired_position);
+  off_t offset_;
+};
+
+}
+
+#endif

+ 179 - 33
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc

@@ -22,7 +22,10 @@
 
 #include <asio/ip/tcp.hpp>
 
+#include <functional>
 #include <limits>
+#include <future>
+#include <tuple>
 
 namespace hdfs {
 
@@ -32,38 +35,17 @@ static const int kNamenodeProtocolVersion = 1;
 
 using ::asio::ip::tcp;
 
-FileSystem::~FileSystem() {}
+/*****************************************************************************
+ *                    NAMENODE OPERATIONS
+ ****************************************************************************/
 
-void FileSystem::New(
-    IoService *io_service, const Options &options, const std::string &server,
-    const std::string &service,
-    const std::function<void(const Status &, FileSystem *)> &handler) {
-  FileSystemImpl *impl = new FileSystemImpl(io_service, options);
-  impl->Connect(server, service, [impl, handler](const Status &stat) {
-    if (stat.ok()) {
-      handler(stat, impl);
-    } else {
-      delete impl;
-      handler(stat, nullptr);
-    }
-  });
-}
-
-FileSystemImpl::FileSystemImpl(IoService *io_service, const Options &options)
-    : io_service_(static_cast<IoServiceImpl *>(io_service)),
-      engine_(&io_service_->io_service(), options,
-              RpcEngine::GetRandomClientName(), kNamenodeProtocol,
-              kNamenodeProtocolVersion),
-      namenode_(&engine_),
-      bad_node_tracker_(std::make_shared<BadDataNodeTracker>()) {}
-
-void FileSystemImpl::Connect(const std::string &server,
+void NameNodeOperations::Connect(const std::string &server,
                              const std::string &service,
-                             std::function<void(const Status &)> &&handler) {
-  using namespace continuation;
+                             std::function<void(const Status &)> &handler) {
+  using namespace asio_continuation;
   typedef std::vector<tcp::endpoint> State;
   auto m = Pipeline<State>::Create();
-  m->Push(Resolve(&io_service_->io_service(), server, service,
+  m->Push(Resolve(io_service_, server, service,
                   std::back_inserter(m->state())))
       .Push(Bind([this, m](const Continuation::Next &next) {
         engine_.Connect(m->state().front(), next);
@@ -76,9 +58,9 @@ void FileSystemImpl::Connect(const std::string &server,
   });
 }
 
-void FileSystemImpl::Open(
-    const std::string &path,
-    const std::function<void(const Status &, InputStream *)> &handler) {
+void NameNodeOperations::GetBlockLocations(const std::string & path,
+  std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler)
+{
   using ::hadoop::hdfs::GetBlockLocationsRequestProto;
   using ::hadoop::hdfs::GetBlockLocationsResponseProto;
 
@@ -99,10 +81,174 @@ void FileSystemImpl::Open(
       [this, s](const continuation::Continuation::Next &next) {
         namenode_.GetBlockLocations(&s->req, s->resp, next);
       }));
+
   m->Run([this, handler](const Status &stat, const State &s) {
-    handler(stat, stat.ok() ? new InputStreamImpl(this, &s.resp->locations(),
-                                                  bad_node_tracker_)
+    if (stat.ok()) {
+      auto file_info = std::make_shared<struct FileInfo>();
+      auto locations = s.resp->locations();
+
+      file_info->file_length_ = locations.filelength();
+
+      for (const auto &block : locations.blocks()) {
+        file_info->blocks_.push_back(block);
+      }
+
+      if (locations.has_lastblock() && locations.lastblock().b().numbytes()) {
+        file_info->blocks_.push_back(locations.lastblock());
+      }
+
+      handler(stat, file_info);
+    } else {
+      handler(stat, nullptr);
+    }
+  });
+}
+
+
+/*****************************************************************************
+ *                    FILESYSTEM BASE CLASS
+ ****************************************************************************/
+
+void FileSystem::New(
+    IoService *io_service, const Options &options, const std::string &server,
+    const std::string &service,
+    const std::function<void(const Status &, FileSystem *)> &handler) {
+  FileSystemImpl *impl = new FileSystemImpl(io_service, options);
+  impl->Connect(server, service, [impl, handler](const Status &stat) {
+    if (stat.ok()) {
+      handler(stat, impl);
+    } else {
+      delete impl;
+      handler(stat, nullptr);
+    }
+  });
+}
+
+FileSystem * FileSystem::New(
+    IoService *io_service, const Options &options, const std::string &server,
+    const std::string &service) {
+  auto callstate = std::make_shared<std::promise<std::tuple<Status, FileSystem *>>>();
+  std::future<std::tuple<Status, FileSystem *>> future(callstate->get_future());
+
+  auto callback = [callstate](const Status &s, FileSystem * fs) {
+    callstate->set_value(std::make_tuple(s, fs));
+  };
+
+  New(io_service, options, server, service, callback);
+
+  /* block until promise is set */
+  auto returnstate = future.get();
+
+  if (std::get<0>(returnstate).ok()) {
+    return std::get<1>(returnstate);
+  } else {
+    return nullptr;
+  }
+}
+
+/*****************************************************************************
+ *                    FILESYSTEM IMPLEMENTATION
+ ****************************************************************************/
+
+FileSystemImpl::FileSystemImpl(IoService *&io_service, const Options &options)
+  :   io_service_(static_cast<IoServiceImpl *>(io_service)),
+      nn_(&io_service_->io_service(), options,
+          GetRandomClientName(), kNamenodeProtocol,
+          kNamenodeProtocolVersion),
+      client_name_(GetRandomClientName())
+{
+  // Poor man's move
+  io_service = nullptr;
+
+  /* spawn background threads for asio delegation */
+  unsigned int threads = 1 /* options.io_threads_, pending HDFS-9117 */;
+  for (unsigned int i = 0; i < threads; i++) {
+    AddWorkerThread();
+  }
+}
+
+FileSystemImpl::~FileSystemImpl() {
+  /**
+   * Note: IoService must be stopped before getting rid of worker threads.
+   * Once worker threads are joined and deleted the service can be deleted.
+   **/
+  io_service_->Stop();
+  worker_threads_.clear();
+  io_service_.reset(nullptr);
+}
+
+void FileSystemImpl::Connect(const std::string &server,
+                             const std::string &service,
+                             std::function<void(const Status &)> &&handler) {
+  /* IoService::New can return nullptr */
+  if (!io_service_) {
+    handler (Status::Error("Null IoService"));
+  }
+  nn_.Connect(server, service, handler);
+}
+
+Status FileSystemImpl::Connect(const std::string &server, const std::string &service) {
+  /* synchronized */
+  auto stat = std::make_shared<std::promise<Status>>();
+  std::future<Status> future = stat->get_future();
+
+  auto callback = [stat](const Status &s) {
+    stat->set_value(s);
+  };
+
+  Connect(server, service, callback);
+
+  /* block until promise is set */
+  auto s = future.get();
+
+  return s;
+}
+
+
+int FileSystemImpl::AddWorkerThread() {
+  auto service_task = [](IoService *service) { service->Run(); };
+  worker_threads_.push_back(
+      WorkerPtr(new std::thread(service_task, io_service_.get())));
+  return worker_threads_.size();
+}
+
+void FileSystemImpl::Open(
+    const std::string &path,
+    const std::function<void(const Status &, FileHandle *)> &handler) {
+
+  nn_.GetBlockLocations(path, [this, handler](const Status &stat, std::shared_ptr<const struct FileInfo> file_info) {
+    handler(stat, stat.ok() ? new FileHandleImpl(&io_service_->io_service(), client_name_, file_info, bad_node_tracker_)
                             : nullptr);
   });
 }
+
+Status FileSystemImpl::Open(const std::string &path,
+                                         FileHandle **handle) {
+  auto callstate = std::make_shared<std::promise<std::tuple<Status, FileHandle*>>>();
+  std::future<std::tuple<Status, FileHandle*>> future(callstate->get_future());
+
+  /* wrap async FileSystem::Open with promise to make it a blocking call */
+  auto h = [callstate](const Status &s, FileHandle *is) {
+    callstate->set_value(std::make_tuple(s, is));
+  };
+
+  Open(path, h);
+
+  /* block until promise is set */
+  auto returnstate = future.get();
+  Status stat = std::get<0>(returnstate);
+  FileHandle *file_handle = std::get<1>(returnstate);
+
+  if (!stat.ok()) {
+    delete file_handle;
+    return stat;
+  }
+  if (!file_handle) {
+    return stat;
+  }
+
+  *handle = file_handle;
+  return stat;
+}
+
 }

+ 86 - 51
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h

@@ -18,75 +18,110 @@
 #ifndef LIBHDFSPP_LIB_FS_FILESYSTEM_H_
 #define LIBHDFSPP_LIB_FS_FILESYSTEM_H_
 
+#include "filehandle.h"
 #include "common/hdfs_public_api.h"
+#include "common/async_stream.h"
 #include "libhdfspp/hdfs.h"
 #include "fs/bad_datanode_tracker.h"
 #include "rpc/rpc_engine.h"
+#include "reader/block_reader.h"
+#include "reader/fileinfo.h"
 #include "ClientNamenodeProtocol.pb.h"
 #include "ClientNamenodeProtocol.hrpc.inl"
 
+#include "asio.hpp"
+
+#include <thread>
+
 namespace hdfs {
 
-class FileHandle;
-class HadoopFileSystem;
+/**
+ * NameNodeConnection: abstracts the details of communicating with a NameNode
+ * and the implementation of the communications protocol.
+ *
+ * Will eventually handle retry and failover.
+ *
+ * Threading model: thread-safe; all operations can be called concurrently
+ * Lifetime: owned by a FileSystemImpl
+ */
+class NameNodeOperations {
+public:
+  NameNodeOperations(::asio::io_service *io_service, const Options &options,
+            const std::string &client_name, const char *protocol_name,
+            int protocol_version) :
+  io_service_(io_service),
+  engine_(io_service, options, client_name, protocol_name, protocol_version),
+  namenode_(& engine_) {}
+
+  void Connect(const std::string &server,
+               const std::string &service,
+               std::function<void(const Status &)> &handler);
+
+  void GetBlockLocations(const std::string & path,
+    std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler);
 
+private:
+  ::asio::io_service * io_service_;
+  RpcEngine engine_;
+  ClientNamenodeProtocol namenode_;
+};
+
+/*
+ * FileSystem: The consumer's main point of interaction with the cluster as
+ * a whole.
+ *
+ * Initially constructed in a disconnected state; call Connect before operating
+ * on the FileSystem.
+ *
+ * All open files must be closed before the FileSystem is destroyed.
+ *
+ * Threading model: thread-safe for all operations
+ * Lifetime: pointer created for consumer who is responsible for deleting it
+ */
 class FileSystemImpl : public FileSystem {
- public:
-  FileSystemImpl(IoService *io_service, const Options &options);
+public:
+  FileSystemImpl(IoService *&io_service, const Options &options);
+  ~FileSystemImpl() override;
+
+  /* attempt to connect to namenode, return bad status on failure */
   void Connect(const std::string &server, const std::string &service,
                std::function<void(const Status &)> &&handler);
+  /* attempt to connect to namenode, return bad status on failure */
+  Status Connect(const std::string &server, const std::string &service);
+
+
   virtual void Open(const std::string &path,
-                    const std::function<void(const Status &, InputStream *)> &
-                        handler) override;
-  RpcEngine &rpc_engine() { return engine_; }
+                    const std::function<void(const Status &, FileHandle *)>
+                        &handler) override;
+  Status Open(const std::string &path, FileHandle **handle) override;
 
- private:
-  IoServiceImpl *io_service_;
-  RpcEngine engine_;
-  ClientNamenodeProtocol namenode_;
-  std::shared_ptr<BadDataNodeTracker> bad_node_tracker_;
-};
 
-class InputStreamImpl : public InputStream {
- public:
-  InputStreamImpl(FileSystemImpl *fs,
-                  const ::hadoop::hdfs::LocatedBlocksProto *blocks,
-                  std::shared_ptr<BadDataNodeTracker> tracker);
-  virtual void PositionRead(
-      void *buf, size_t nbyte, uint64_t offset,
-      const std::function<void(const Status &, const std::string &, size_t)> &
-          handler) override;
-  /**
-   * If optional_rule_override is null then use the bad_datanode_tracker.  If
-   * non-null use the provided NodeExclusionRule to determine eligible
-   * datanodes.
-   **/
-  template <class MutableBufferSequence, class Handler>
-  void AsyncPreadSome(size_t offset, const MutableBufferSequence &buffers,
-                      std::shared_ptr<NodeExclusionRule> excluded_nodes,
-                      const Handler &handler);
-
-  template <class BlockReaderTrait, class MutableBufferSequence, class Handler>
-  void AsyncReadBlock(const std::string &client_name,
-                      const hadoop::hdfs::LocatedBlockProto &block,
-                      const hadoop::hdfs::DatanodeInfoProto &dn, size_t offset,
-                      const MutableBufferSequence &buffers,
-                      const Handler &handler);
-  uint64_t get_file_length() const;
- private:
-  FileSystemImpl *fs_;
-  unsigned long long file_length_;
-  std::vector<::hadoop::hdfs::LocatedBlockProto> blocks_;
-  template <class Reader>
-  struct HandshakeContinuation;
-  template <class Reader, class MutableBufferSequence>
-  struct ReadBlockContinuation;
-  struct RemoteBlockReaderTrait;
-  friend class FileHandle;
+  /* add a new thread to handle asio requests, return number of threads in pool
+   */
+  int AddWorkerThread();
+
+  /* how many worker threads are servicing asio requests */
+  int WorkerThreadCount() { return worker_threads_.size(); }
+
+
+private:
+  std::unique_ptr<IoServiceImpl> io_service_;
+  NameNodeOperations nn_;
+  const std::string client_name_;
   std::shared_ptr<BadDataNodeTracker> bad_node_tracker_;
+
+  struct WorkerDeleter {
+    void operator()(std::thread *t) {
+      t->join();
+      delete t;
+    }
+  };
+  typedef std::unique_ptr<std::thread, WorkerDeleter> WorkerPtr;
+  std::vector<WorkerPtr> worker_threads_;
+
 };
-}
 
-#include "inputstream_impl.h"
+
+}
 
 #endif

+ 0 - 48
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream.cc

@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "filesystem.h"
-
-namespace hdfs {
-
-using ::hadoop::hdfs::LocatedBlocksProto;
-
-InputStream::~InputStream() {}
-
-InputStreamImpl::InputStreamImpl(FileSystemImpl *fs,
-                                 const LocatedBlocksProto *blocks,
-                                 std::shared_ptr<BadDataNodeTracker> tracker)
-    : fs_(fs), file_length_(blocks->filelength()), bad_node_tracker_(tracker) {
-  for (const auto &block : blocks->blocks()) {
-    blocks_.push_back(block);
-  }
-
-  if (blocks->has_lastblock() && blocks->lastblock().b().numbytes()) {
-    blocks_.push_back(blocks->lastblock());
-  }
-}
-
-void InputStreamImpl::PositionRead(
-    void *buf, size_t nbyte, uint64_t offset,
-    const std::function<void(const Status &, const std::string &, size_t)> &
-        handler) {
-  AsyncPreadSome(offset, asio::buffer(buf, nbyte), bad_node_tracker_, handler);
-}
-
-uint64_t InputStreamImpl::get_file_length() const { return file_length_; }
-}

+ 0 - 207
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h

@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef FS_INPUTSTREAM_IMPL_H_
-#define FS_INPUTSTREAM_IMPL_H_
-
-#include "reader/block_reader.h"
-
-#include "common/continuation/asio.h"
-#include "common/continuation/protobuf.h"
-
-#include <functional>
-#include <future>
-#include <type_traits>
-#include <algorithm>
-
-namespace hdfs {
-
-struct InputStreamImpl::RemoteBlockReaderTrait {
-  typedef RemoteBlockReader<asio::ip::tcp::socket> Reader;
-  struct State {
-    std::unique_ptr<asio::ip::tcp::socket> conn_;
-    std::shared_ptr<Reader> reader_;
-    std::array<asio::ip::tcp::endpoint, 1> endpoints_;
-    size_t transferred_;
-    Reader *reader() { return reader_.get(); }
-    size_t *transferred() { return &transferred_; }
-    const size_t *transferred() const { return &transferred_; }
-  };
-  static continuation::Pipeline<State> *CreatePipeline(
-      ::asio::io_service *io_service,
-      const ::hadoop::hdfs::DatanodeInfoProto &dn) {
-    using namespace ::asio::ip;
-    auto m = continuation::Pipeline<State>::Create();
-    auto &s = m->state();
-    s.conn_.reset(new tcp::socket(*io_service));
-    s.reader_ = std::make_shared<Reader>(BlockReaderOptions(), s.conn_.get());
-    auto datanode = dn.id();
-    s.endpoints_[0] = tcp::endpoint(address::from_string(datanode.ipaddr()),
-                                    datanode.xferport());
-
-    m->Push(continuation::Connect(s.conn_.get(), s.endpoints_.begin(),
-                                  s.endpoints_.end()));
-    return m;
-  }
-};
-
-template <class Reader>
-struct InputStreamImpl::HandshakeContinuation : continuation::Continuation {
-  HandshakeContinuation(Reader *reader, const std::string &client_name,
-                        const hadoop::common::TokenProto *token,
-                        const hadoop::hdfs::ExtendedBlockProto *block,
-                        uint64_t length, uint64_t offset)
-      : reader_(reader),
-        client_name_(client_name),
-        length_(length),
-        offset_(offset) {
-    if (token) {
-      token_.reset(new hadoop::common::TokenProto());
-      token_->CheckTypeAndMergeFrom(*token);
-    }
-    block_.CheckTypeAndMergeFrom(*block);
-  }
-
-  virtual void Run(const Next &next) override {
-    reader_->async_connect(client_name_, token_.get(), &block_, length_,
-                           offset_, next);
-  }
-
- private:
-  Reader *reader_;
-  const std::string client_name_;
-  std::unique_ptr<hadoop::common::TokenProto> token_;
-  hadoop::hdfs::ExtendedBlockProto block_;
-  uint64_t length_;
-  uint64_t offset_;
-};
-
-template <class Reader, class MutableBufferSequence>
-struct InputStreamImpl::ReadBlockContinuation : continuation::Continuation {
-  ReadBlockContinuation(Reader *reader, MutableBufferSequence buffer,
-                        size_t *transferred)
-      : reader_(reader),
-        buffer_(buffer),
-        buffer_size_(asio::buffer_size(buffer)),
-        transferred_(transferred) {
-    static_assert(!std::is_reference<MutableBufferSequence>::value,
-                  "Buffer must not be a reference type");
-  }
-
-  virtual void Run(const Next &next) override {
-    *transferred_ = 0;
-    next_ = next;
-    OnReadData(Status::OK(), 0);
-  }
-
- private:
-  Reader *reader_;
-  const MutableBufferSequence buffer_;
-  const size_t buffer_size_;
-  size_t *transferred_;
-  std::function<void(const Status &)> next_;
-
-  void OnReadData(const Status &status, size_t transferred) {
-    using std::placeholders::_1;
-    using std::placeholders::_2;
-    *transferred_ += transferred;
-    if (!status.ok()) {
-      next_(status);
-    } else if (*transferred_ >= buffer_size_) {
-      next_(status);
-    } else {
-      reader_->async_read_some(
-          asio::buffer(buffer_ + *transferred_, buffer_size_ - *transferred_),
-          std::bind(&ReadBlockContinuation::OnReadData, this, _1, _2));
-    }
-  }
-};
-
-template <class MutableBufferSequence, class Handler>
-void InputStreamImpl::AsyncPreadSome(
-    size_t offset, const MutableBufferSequence &buffers,
-    std::shared_ptr<NodeExclusionRule> excluded_nodes, const Handler &handler) {
-  using ::hadoop::hdfs::DatanodeInfoProto;
-  using ::hadoop::hdfs::LocatedBlockProto;
-
-  /**
-   *  Note: block and chosen_dn will end up pointing to things inside
-   *  the blocks_ vector.  They shouldn't be directly deleted.
-   **/
-  auto block = std::find_if(
-      blocks_.begin(), blocks_.end(), [offset](const LocatedBlockProto &p) {
-        return p.offset() <= offset && offset < p.offset() + p.b().numbytes();
-      });
-
-  if (block == blocks_.end()) {
-    handler(Status::InvalidArgument("Cannot find corresponding blocks"), "", 0);
-    return;
-  }
-
-  /**
-   * If user supplies a rule use it, otherwise use the tracker.
-   * User is responsible for making sure one of them isn't null.
-   **/
-  std::shared_ptr<NodeExclusionRule> rule =
-      excluded_nodes != nullptr ? excluded_nodes : bad_node_tracker_;
-
-  auto datanodes = block->locs();
-  auto it = std::find_if(datanodes.begin(), datanodes.end(),
-                         [rule](const DatanodeInfoProto &dn) {
-                           return !rule->IsBadNode(dn.id().datanodeuuid());
-                         });
-
-  if (it == datanodes.end()) {
-    handler(Status::ResourceUnavailable("No datanodes available"), "", 0);
-    return;
-  }
-
-  DatanodeInfoProto *chosen_dn = &*it;
-
-  uint64_t offset_within_block = offset - block->offset();
-  uint64_t size_within_block = std::min<uint64_t>(
-      block->b().numbytes() - offset_within_block, asio::buffer_size(buffers));
-
-  AsyncReadBlock<RemoteBlockReaderTrait>(
-      fs_->rpc_engine().client_name(), *block, *chosen_dn, offset_within_block,
-      asio::buffer(buffers, size_within_block), handler);
-}
-
-template <class BlockReaderTrait, class MutableBufferSequence, class Handler>
-void InputStreamImpl::AsyncReadBlock(
-    const std::string &client_name,
-    const hadoop::hdfs::LocatedBlockProto &block,
-    const hadoop::hdfs::DatanodeInfoProto &dn, size_t offset,
-    const MutableBufferSequence &buffers, const Handler &handler) {
-  typedef typename BlockReaderTrait::Reader Reader;
-  auto m =
-      BlockReaderTrait::CreatePipeline(&fs_->rpc_engine().io_service(), dn);
-  auto &s = m->state();
-  size_t size = asio::buffer_size(buffers);
-  m->Push(new HandshakeContinuation<Reader>(s.reader(), client_name, nullptr,
-                                            &block.b(), size, offset))
-      .Push(new ReadBlockContinuation<Reader, MutableBufferSequence>(
-          s.reader(), buffers, s.transferred()));
-  const std::string &dnid = dn.id().datanodeuuid();
-  m->Run([handler, dnid](const Status &status,
-                         const typename BlockReaderTrait::State &state) {
-    handler(status, dnid, *state.transferred());
-  });
-}
-}
-
-#endif

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt

@@ -16,5 +16,5 @@
 # limitations under the License.
 #
 
-add_library(reader remote_block_reader.cc datatransfer.cc)
+add_library(reader block_reader.cc datatransfer.cc)
 add_dependencies(reader proto)

+ 193 - 102
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h → hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc

@@ -15,18 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBHDFSPP_READER_REMOTE_BLOCK_READER_IMPL_H_
-#define LIBHDFSPP_READER_REMOTE_BLOCK_READER_IMPL_H_
-
-#include "datatransfer.h"
+#include "reader/block_reader.h"
+#include "reader/datatransfer.h"
+#include "common/continuation/continuation.h"
 #include "common/continuation/asio.h"
-#include "common/continuation/protobuf.h"
-
-#include <asio/buffers_iterator.hpp>
-#include <asio/streambuf.hpp>
-#include <asio/write.hpp>
-
-#include <arpa/inet.h>
 
 #include <future>
 
@@ -36,14 +28,31 @@ hadoop::hdfs::OpReadBlockProto
 ReadBlockProto(const std::string &client_name, bool verify_checksum,
                const hadoop::common::TokenProto *token,
                const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
-               uint64_t offset);
+               uint64_t offset) {
+  using namespace hadoop::hdfs;
+  using namespace hadoop::common;
+  BaseHeaderProto *base_h = new BaseHeaderProto();
+  base_h->set_allocated_block(new ExtendedBlockProto(*block));
+  if (token) {
+    base_h->set_allocated_token(new TokenProto(*token));
+  }
+  ClientOperationHeaderProto *h = new ClientOperationHeaderProto();
+  h->set_clientname(client_name);
+  h->set_allocated_baseheader(base_h);
+
+  OpReadBlockProto p;
+  p.set_allocated_header(h);
+  p.set_offset(offset);
+  p.set_len(length);
+  p.set_sendchecksums(verify_checksum);
+  // TODO: p.set_allocated_cachingstrategy();
+  return p;
+}
 
-template <class Stream>
-template <class ConnectHandler>
-void RemoteBlockReader<Stream>::async_connect(
-    const std::string &client_name, const hadoop::common::TokenProto *token,
+void BlockReaderImpl::AsyncRequestBlock(
+    const std::string &client_name,
     const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
-    uint64_t offset, const ConnectHandler &handler) {
+    uint64_t offset, const std::function<void(Status)> &handler) {
   // The total number of bytes that we need to transfer from the DN is
   // the amount that the user wants (bytesToRead), plus the padding at
   // the beginning in order to chunk-align. Note that the DN may elect
@@ -62,18 +71,17 @@ void RemoteBlockReader<Stream>::async_connect(
   s->header.insert(s->header.begin(),
                    {0, kDataTransferVersion, Operation::kReadBlock});
   s->request = std::move(ReadBlockProto(client_name, options_.verify_checksum,
-                                        token, block, length, offset));
+                                        dn_->token_.get(), block, length, offset));
 
   auto read_pb_message =
-      new continuation::ReadDelimitedPBMessageContinuation<Stream, 16384>(
-          stream_, &s->response);
+      new continuation::ReadDelimitedPBMessageContinuation<AsyncStream, 16384>(
+          dn_, &s->response);
 
-  m->Push(continuation::Write(stream_, asio::buffer(s->header)))
-      .Push(continuation::WriteDelimitedPBMessage(stream_, &s->request))
+  m->Push(asio_continuation::Write(dn_.get(), asio::buffer(s->header)))
+      .Push(asio_continuation::WriteDelimitedPBMessage(dn_, &s->request))
       .Push(read_pb_message);
 
-  m->Run([this, handler, offset](const Status &status, const State &s) {
-    Status stat = status;
+  m->Run([this, handler, offset](const Status &status, const State &s) {    Status stat = status;
     if (stat.ok()) {
       const auto &resp = s.response;
       if (resp.status() == ::hadoop::hdfs::Status::SUCCESS) {
@@ -90,10 +98,26 @@ void RemoteBlockReader<Stream>::async_connect(
   });
 }
 
-template <class Stream>
-struct RemoteBlockReader<Stream>::ReadPacketHeader
+Status BlockReaderImpl::RequestBlock(
+    const std::string &client_name,
+    const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
+    uint64_t offset) {
+  auto stat = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(stat->get_future());
+  AsyncRequestBlock(client_name, block, length, offset,
+                [stat](const Status &status) { stat->set_value(status); });
+  return future.get();
+}
+
+hadoop::hdfs::OpReadBlockProto
+ReadBlockProto(const std::string &client_name, bool verify_checksum,
+               const hadoop::common::TokenProto *token,
+               const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
+               uint64_t offset);
+
+struct BlockReaderImpl::ReadPacketHeader
     : continuation::Continuation {
-  ReadPacketHeader(RemoteBlockReader<Stream> *parent) : parent_(parent) {}
+  ReadPacketHeader(BlockReaderImpl *parent) : parent_(parent) {}
 
   virtual void Run(const Next &next) override {
     parent_->packet_data_read_bytes_ = 0;
@@ -113,7 +137,7 @@ struct RemoteBlockReader<Stream>::ReadPacketHeader
       next(status);
     };
 
-    asio::async_read(*parent_->stream_, asio::buffer(buf_),
+    asio::async_read(*parent_->dn_, asio::buffer(buf_),
                      std::bind(&ReadPacketHeader::CompletionHandler, this,
                                std::placeholders::_1, std::placeholders::_2),
                      handler);
@@ -127,7 +151,7 @@ private:
   static const size_t kHeaderLenSize = sizeof(int16_t);
   static const size_t kHeaderStart = kPayloadLenSize + kHeaderLenSize;
 
-  RemoteBlockReader<Stream> *parent_;
+  BlockReaderImpl *parent_;
   std::array<char, kMaxHeaderSize> buf_;
 
   size_t packet_length() const {
@@ -149,9 +173,8 @@ private:
   }
 };
 
-template <class Stream>
-struct RemoteBlockReader<Stream>::ReadChecksum : continuation::Continuation {
-  ReadChecksum(RemoteBlockReader<Stream> *parent) : parent_(parent) {}
+struct BlockReaderImpl::ReadChecksum : continuation::Continuation {
+  ReadChecksum(BlockReaderImpl *parent) : parent_(parent) {}
 
   virtual void Run(const Next &next) override {
     auto parent = parent_;
@@ -172,20 +195,58 @@ struct RemoteBlockReader<Stream>::ReadChecksum : continuation::Continuation {
     };
     parent->checksum_.resize(parent->packet_len_ - sizeof(int) -
                              parent->header_.datalen());
-    asio::async_read(*parent->stream_, asio::buffer(parent->checksum_),
-                     handler);
+    asio::async_read(*parent->dn_, asio::buffer(parent->checksum_), handler);
+  }
+
+private:
+  BlockReaderImpl *parent_;
+};
+
+struct BlockReaderImpl::ReadData : continuation::Continuation {
+  ReadData(BlockReaderImpl *parent,
+           std::shared_ptr<size_t> bytes_transferred,
+           const asio::mutable_buffers_1 &buf)
+      : parent_(parent), bytes_transferred_(bytes_transferred), buf_(buf) {
+    buf_.begin();
+  }
+
+  ~ReadData() {
+    buf_.end();
+  }
+
+  virtual void Run(const Next &next) override {
+    auto handler =
+        [next, this](const asio::error_code &ec, size_t transferred) {
+          Status status;
+          if (ec) {
+            status = Status(ec.value(), ec.message().c_str());
+          }
+          *bytes_transferred_ += transferred;
+          parent_->bytes_to_read_ -= transferred;
+          parent_->packet_data_read_bytes_ += transferred;
+          if (parent_->packet_data_read_bytes_ >= parent_->header_.datalen()) {
+            parent_->state_ = kReadPacketHeader;
+          }
+          next(status);
+        };
+
+    auto data_len =
+        parent_->header_.datalen() - parent_->packet_data_read_bytes_;
+    asio::async_read(*parent_->dn_, buf_, asio::transfer_exactly(data_len),
+               handler);
   }
 
 private:
-  RemoteBlockReader<Stream> *parent_;
+  BlockReaderImpl *parent_;
+  std::shared_ptr<size_t> bytes_transferred_;
+  const asio::mutable_buffers_1 buf_;
 };
 
-template <class Stream>
-struct RemoteBlockReader<Stream>::ReadPadding : continuation::Continuation {
-  ReadPadding(RemoteBlockReader<Stream> *parent)
+struct BlockReaderImpl::ReadPadding : continuation::Continuation {
+  ReadPadding(BlockReaderImpl *parent)
       : parent_(parent), padding_(parent->chunk_padding_bytes_),
         bytes_transferred_(std::make_shared<size_t>(0)),
-        read_data_(new ReadData<asio::mutable_buffers_1>(
+        read_data_(new ReadData(
             parent, bytes_transferred_, asio::buffer(padding_))) {}
 
   virtual void Run(const Next &next) override {
@@ -207,7 +268,7 @@ struct RemoteBlockReader<Stream>::ReadPadding : continuation::Continuation {
   }
 
 private:
-  RemoteBlockReader<Stream> *parent_;
+  BlockReaderImpl *parent_;
   std::vector<char> padding_;
   std::shared_ptr<size_t> bytes_transferred_;
   std::shared_ptr<continuation::Continuation> read_data_;
@@ -215,45 +276,9 @@ private:
   ReadPadding &operator=(const ReadPadding &) = delete;
 };
 
-template <class Stream>
-template <class MutableBufferSequence>
-struct RemoteBlockReader<Stream>::ReadData : continuation::Continuation {
-  ReadData(RemoteBlockReader<Stream> *parent,
-           std::shared_ptr<size_t> bytes_transferred,
-           const MutableBufferSequence &buf)
-      : parent_(parent), bytes_transferred_(bytes_transferred), buf_(buf) {}
 
-  virtual void Run(const Next &next) override {
-    auto handler =
-        [next, this](const asio::error_code &ec, size_t transferred) {
-          Status status;
-          if (ec) {
-            status = Status(ec.value(), ec.message().c_str());
-          }
-          *bytes_transferred_ += transferred;
-          parent_->bytes_to_read_ -= transferred;
-          parent_->packet_data_read_bytes_ += transferred;
-          if (parent_->packet_data_read_bytes_ >= parent_->header_.datalen()) {
-            parent_->state_ = kReadPacketHeader;
-          }
-          next(status);
-        };
-
-    auto data_len =
-        parent_->header_.datalen() - parent_->packet_data_read_bytes_;
-    async_read(*parent_->stream_, buf_, asio::transfer_exactly(data_len),
-               handler);
-  }
-
-private:
-  RemoteBlockReader<Stream> *parent_;
-  std::shared_ptr<size_t> bytes_transferred_;
-  MutableBufferSequence buf_;
-};
-
-template <class Stream>
-struct RemoteBlockReader<Stream>::AckRead : continuation::Continuation {
-  AckRead(RemoteBlockReader<Stream> *parent) : parent_(parent) {}
+struct BlockReaderImpl::AckRead : continuation::Continuation {
+  AckRead(BlockReaderImpl *parent) : parent_(parent) {}
 
   virtual void Run(const Next &next) override {
     if (parent_->bytes_to_read_ > 0) {
@@ -268,25 +293,24 @@ struct RemoteBlockReader<Stream>::AckRead : continuation::Continuation {
                               : hadoop::hdfs::Status::SUCCESS);
 
     m->Push(
-        continuation::WriteDelimitedPBMessage(parent_->stream_, &m->state()));
+        continuation::WriteDelimitedPBMessage(parent_->dn_, &m->state()));
 
     m->Run([this, next](const Status &status,
                         const hadoop::hdfs::ClientReadStatusProto &) {
       if (status.ok()) {
-        parent_->state_ = RemoteBlockReader<Stream>::kFinished;
+        parent_->state_ = BlockReaderImpl::kFinished;
       }
       next(status);
     });
   }
 
 private:
-  RemoteBlockReader<Stream> *parent_;
+  BlockReaderImpl *parent_;
 };
 
-template <class Stream>
-template <class MutableBufferSequence, class ReadHandler>
-void RemoteBlockReader<Stream>::async_read_some(
-    const MutableBufferSequence &buffers, const ReadHandler &handler) {
+void BlockReaderImpl::AsyncReadPacket(
+    const MutableBuffers &buffers,
+    const std::function<void(const Status &, size_t bytes_transferred)> &handler) {
   assert(state_ != kOpen && "Not connected");
 
   struct State {
@@ -298,7 +322,7 @@ void RemoteBlockReader<Stream>::async_read_some(
   m->Push(new ReadPacketHeader(this))
       .Push(new ReadChecksum(this))
       .Push(new ReadPadding(this))
-      .Push(new ReadData<MutableBufferSequence>(
+      .Push(new ReadData(
           this, m->state().bytes_transferred, buffers))
       .Push(new AckRead(this));
 
@@ -308,15 +332,14 @@ void RemoteBlockReader<Stream>::async_read_some(
   });
 }
 
-template <class Stream>
-template <class MutableBufferSequence>
+
 size_t
-RemoteBlockReader<Stream>::read_some(const MutableBufferSequence &buffers,
+BlockReaderImpl::ReadPacket(const MutableBuffers &buffers,
                                      Status *status) {
   size_t transferred = 0;
   auto done = std::make_shared<std::promise<void>>();
   auto future = done->get_future();
-  async_read_some(buffers,
+  AsyncReadPacket(buffers,
                   [status, &transferred, done](const Status &stat, size_t t) {
                     *status = stat;
                     transferred = t;
@@ -326,17 +349,85 @@ RemoteBlockReader<Stream>::read_some(const MutableBufferSequence &buffers,
   return transferred;
 }
 
-template <class Stream>
-Status RemoteBlockReader<Stream>::connect(
-    const std::string &client_name, const hadoop::common::TokenProto *token,
-    const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
-    uint64_t offset) {
-  auto stat = std::make_shared<std::promise<Status>>();
-  std::future<Status> future(stat->get_future());
-  async_connect(client_name, token, block, length, offset,
-                [stat](const Status &status) { stat->set_value(status); });
-  return future.get();
-}
+
+struct BlockReaderImpl::RequestBlockContinuation : continuation::Continuation {
+  RequestBlockContinuation(BlockReader *reader, const std::string &client_name,
+                        const hadoop::hdfs::ExtendedBlockProto *block,
+                        uint64_t length, uint64_t offset)
+      : reader_(reader), client_name_(client_name), length_(length),
+        offset_(offset) {
+    block_.CheckTypeAndMergeFrom(*block);
+  }
+
+  virtual void Run(const Next &next) override {
+    reader_->AsyncRequestBlock(client_name_, &block_, length_,
+                           offset_, next);
+  }
+
+private:
+  BlockReader *reader_;
+  const std::string client_name_;
+  hadoop::hdfs::ExtendedBlockProto block_;
+  uint64_t length_;
+  uint64_t offset_;
+};
+
+struct BlockReaderImpl::ReadBlockContinuation : continuation::Continuation {
+  ReadBlockContinuation(BlockReader *reader, MutableBuffers buffer,
+                        size_t *transferred)
+      : reader_(reader), buffer_(buffer),
+        buffer_size_(asio::buffer_size(buffer)), transferred_(transferred) {
+  }
+
+  virtual void Run(const Next &next) override {
+    *transferred_ = 0;
+    next_ = next;
+    OnReadData(Status::OK(), 0);
+  }
+
+private:
+  BlockReader *reader_;
+  const MutableBuffers buffer_;
+  const size_t buffer_size_;
+  size_t *transferred_;
+  std::function<void(const Status &)> next_;
+
+  void OnReadData(const Status &status, size_t transferred) {
+    using std::placeholders::_1;
+    using std::placeholders::_2;
+    *transferred_ += transferred;
+    if (!status.ok()) {
+      next_(status);
+    } else if (*transferred_ >= buffer_size_) {
+      next_(status);
+    } else {
+      reader_->AsyncReadPacket(
+          asio::buffer(buffer_ + *transferred_, buffer_size_ - *transferred_),
+          std::bind(&ReadBlockContinuation::OnReadData, this, _1, _2));
+    }
+  }
+};
+
+void BlockReaderImpl::AsyncReadBlock(
+    const std::string & client_name,
+    const hadoop::hdfs::LocatedBlockProto &block,
+    size_t offset,
+    const MutableBuffers &buffers,
+    const std::function<void(const Status &, size_t)> handler) {
+
+  auto m = continuation::Pipeline<size_t>::Create();
+  size_t * bytesTransferred = &m->state();
+
+  size_t size = asio::buffer_size(buffers);
+
+  m->Push(new RequestBlockContinuation(this, client_name,
+                                            &block.b(), size, offset))
+    .Push(new ReadBlockContinuation(this, buffers, bytesTransferred));
+
+  m->Run([handler] (const Status &status,
+                         const size_t totalBytesTransferred) {
+    handler(status, totalBytesTransferred);
+  });
 }
 
-#endif
+}

+ 59 - 24
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h

@@ -19,7 +19,9 @@
 #define BLOCK_READER_H_
 
 #include "libhdfspp/status.h"
+#include "common/async_stream.h"
 #include "datatransfer.pb.h"
+#include "connection/datanodeconnection.h"
 
 #include <memory>
 
@@ -55,38 +57,73 @@ struct BlockReaderOptions {
       : verify_checksum(true), encryption_scheme(EncryptionScheme::kNone) {}
 };
 
-template <class Stream>
-class RemoteBlockReader
-    : public std::enable_shared_from_this<RemoteBlockReader<Stream>> {
+/**
+ * Handles the operational state of request and reading a block (or portion of
+ * a block) from a DataNode.
+ *
+ * Threading model: not thread-safe.
+ * Lifecycle: should be created, used for a single read, then freed.
+ */
+class BlockReader {
 public:
-  explicit RemoteBlockReader(const BlockReaderOptions &options, Stream *stream)
-      : stream_(stream), state_(kOpen), options_(options),
+  virtual void AsyncReadBlock(
+    const std::string & client_name,
+    const hadoop::hdfs::LocatedBlockProto &block, size_t offset,
+    const MutableBuffers &buffers,
+    const std::function<void(const Status &, size_t)> handler) = 0;
+
+  virtual void AsyncReadPacket(
+    const MutableBuffers &buffers,
+    const std::function<void(const Status &, size_t bytes_transferred)> &handler) = 0;
+
+  virtual void AsyncRequestBlock(
+    const std::string &client_name,
+    const hadoop::hdfs::ExtendedBlockProto *block,
+    uint64_t length,
+    uint64_t offset,
+    const std::function<void(Status)> &handler) = 0;
+};
+
+class BlockReaderImpl
+    : public BlockReader, public std::enable_shared_from_this<BlockReaderImpl> {
+public:
+  explicit BlockReaderImpl(const BlockReaderOptions &options, std::shared_ptr<DataNodeConnection> dn)
+      : dn_(dn), state_(kOpen), options_(options),
         chunk_padding_bytes_(0) {}
 
-  template <class MutableBufferSequence, class ReadHandler>
-  void async_read_some(const MutableBufferSequence &buffers,
-                       const ReadHandler &handler);
+  virtual void AsyncReadPacket(
+    const MutableBuffers &buffers,
+    const std::function<void(const Status &, size_t bytes_transferred)> &handler) override;
+
+  virtual void AsyncRequestBlock(
+    const std::string &client_name,
+    const hadoop::hdfs::ExtendedBlockProto *block,
+    uint64_t length,
+    uint64_t offset,
+    const std::function<void(Status)> &handler) override;
 
-  template <class MutableBufferSequence>
-  size_t read_some(const MutableBufferSequence &buffers, Status *status);
+  virtual void AsyncReadBlock(
+    const std::string & client_name,
+    const hadoop::hdfs::LocatedBlockProto &block, size_t offset,
+    const MutableBuffers &buffers,
+    const std::function<void(const Status &, size_t)> handler) override;
 
-  Status connect(const std::string &client_name,
-                 const hadoop::common::TokenProto *token,
-                 const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
-                 uint64_t offset);
+  size_t ReadPacket(const MutableBuffers &buffers, Status *status);
 
-  template <class ConnectHandler>
-  void async_connect(const std::string &client_name,
-                     const hadoop::common::TokenProto *token,
-                     const hadoop::hdfs::ExtendedBlockProto *block,
-                     uint64_t length, uint64_t offset,
-                     const ConnectHandler &handler);
+  Status RequestBlock(
+    const std::string &client_name,
+    const hadoop::hdfs::ExtendedBlockProto *block,
+    uint64_t length,
+    uint64_t offset);
 
 private:
+  struct RequestBlockContinuation;
+  struct ReadBlockContinuation;
+
   struct ReadPacketHeader;
   struct ReadChecksum;
   struct ReadPadding;
-  template <class MutableBufferSequence> struct ReadData;
+  struct ReadData;
   struct AckRead;
   enum State {
     kOpen,
@@ -97,7 +134,7 @@ private:
     kFinished,
   };
 
-  Stream *stream_;
+  std::shared_ptr<DataNodeConnection> dn_;
   hadoop::hdfs::PacketHeaderProto header_;
   State state_;
   BlockReaderOptions options_;
@@ -109,6 +146,4 @@ private:
 };
 }
 
-#include "remote_block_reader_impl.h"
-
 #endif

+ 19 - 9
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h

@@ -19,6 +19,10 @@
 #define LIB_READER_DATA_TRANSFER_H_
 
 #include "common/sasl_authenticator.h"
+#include "common/async_stream.h"
+#include "connection/datanodeconnection.h"
+#include <memory>
+
 
 namespace hdfs {
 
@@ -32,26 +36,32 @@ enum Operation {
   kReadBlock = 81,
 };
 
-template <class Stream> class DataTransferSaslStream {
+template <class Stream> class DataTransferSaslStream : public DataNodeConnection {
 public:
-  DataTransferSaslStream(Stream *stream, const std::string &username,
+  DataTransferSaslStream(std::shared_ptr<Stream> stream, const std::string &username,
                          const std::string &password)
       : stream_(stream), authenticator_(username, password) {}
 
   template <class Handler> void Handshake(const Handler &next);
 
-  template <class MutableBufferSequence, class ReadHandler>
-  void async_read_some(const MutableBufferSequence &buffers,
-                       ReadHandler &&handler);
+  void async_read_some(const MutableBuffers &buf,
+          std::function<void (const asio::error_code & error,
+                                 std::size_t bytes_transferred) > handler) override {
+    stream_->async_read_some(buf, handler);
+  }
 
-  template <class ConstBufferSequence, class WriteHandler>
-  void async_write_some(const ConstBufferSequence &buffers,
-                        WriteHandler &&handler);
+  void async_write_some(const ConstBuffers &buf,
+            std::function<void (const asio::error_code & error,
+                                 std::size_t bytes_transferred) > handler) override {
+    stream_->async_write_some(buf, handler);
+  }
 
+  void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) override
+  {(void)handler;  /*TODO: Handshaking goes here*/};
 private:
   DataTransferSaslStream(const DataTransferSaslStream &) = delete;
   DataTransferSaslStream &operator=(const DataTransferSaslStream &) = delete;
-  Stream *stream_;
+  std::shared_ptr<Stream> stream_;
   DigestMD5Authenticator authenticator_;
   struct ReadSaslMessage;
   struct Authenticator;

+ 5 - 18
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h

@@ -70,7 +70,7 @@ private:
 template <class Stream>
 struct DataTransferSaslStream<Stream>::ReadSaslMessage
     : continuation::Continuation {
-  ReadSaslMessage(Stream *stream, std::string *data)
+  ReadSaslMessage(std::shared_ptr<Stream> stream, std::string *data)
       : stream_(stream), data_(data), read_pb_(stream, &resp_) {}
 
   virtual void Run(const Next &next) override {
@@ -87,7 +87,7 @@ struct DataTransferSaslStream<Stream>::ReadSaslMessage
   }
 
 private:
-  Stream *stream_;
+  std::shared_ptr<Stream> stream_;
   std::string *data_;
   hadoop::hdfs::DataTransferEncryptorMessageProto resp_;
   continuation::ReadDelimitedPBMessageContinuation<Stream, 1024> read_pb_;
@@ -97,7 +97,7 @@ template <class Stream>
 template <class Handler>
 void DataTransferSaslStream<Stream>::Handshake(const Handler &next) {
   using ::hadoop::hdfs::DataTransferEncryptorMessageProto;
-  using ::hdfs::continuation::Write;
+  using ::hdfs::asio_continuation::Write;
   using ::hdfs::continuation::WriteDelimitedPBMessage;
 
   static const int kMagicNumber = htonl(kDataTransferSasl);
@@ -109,7 +109,7 @@ void DataTransferSaslStream<Stream>::Handshake(const Handler &next) {
     std::string resp0;
     DataTransferEncryptorMessageProto req1;
     std::string resp1;
-    Stream *stream;
+    std::shared_ptr<Stream> stream;
   };
   auto m = continuation::Pipeline<State>::Create();
   State *s = &m->state();
@@ -117,7 +117,7 @@ void DataTransferSaslStream<Stream>::Handshake(const Handler &next) {
 
   DataTransferSaslStreamUtil::PrepareInitialHandshake(&s->req0);
 
-  m->Push(Write(stream_, kMagicNumberBuffer))
+  m->Push(Write(stream_.get(), kMagicNumberBuffer))
       .Push(WriteDelimitedPBMessage(stream_, &s->req0))
       .Push(new ReadSaslMessage(stream_, &s->resp0))
       .Push(new Authenticator(&authenticator_, &s->resp0, &s->req1))
@@ -126,19 +126,6 @@ void DataTransferSaslStream<Stream>::Handshake(const Handler &next) {
   m->Run([next](const Status &status, const State &) { next(status); });
 }
 
-template <class Stream>
-template <class MutableBufferSequence, class ReadHandler>
-void DataTransferSaslStream<Stream>::async_read_some(
-    const MutableBufferSequence &buffers, ReadHandler &&handler) {
-  stream_->async_read_some(buffers, handler);
-}
-
-template <class Stream>
-template <typename ConstBufferSequence, typename WriteHandler>
-void DataTransferSaslStream<Stream>::async_write_some(
-    const ConstBufferSequence &buffers, WriteHandler &&handler) {
-  stream_->async_write_some(buffers, handler);
-}
 }
 
 #endif

+ 36 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h

@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_READER_FILEINFO_H_
+#define LIB_READER_FILEINFO_H_
+
+#include "ClientNamenodeProtocol.pb.h"
+
+namespace hdfs {
+
+/**
+ * Information that is assumed to be unchanging about a file for the duration of
+ * the operations.
+ */
+struct FileInfo {
+  unsigned long long file_length_;
+  std::vector<::hadoop::hdfs::LocatedBlockProto> blocks_;
+};
+
+}
+
+#endif

+ 0 - 46
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc

@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "block_reader.h"
-
-namespace hdfs {
-
-hadoop::hdfs::OpReadBlockProto
-ReadBlockProto(const std::string &client_name, bool verify_checksum,
-               const hadoop::common::TokenProto *token,
-               const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
-               uint64_t offset) {
-  using namespace hadoop::hdfs;
-  using namespace hadoop::common;
-  BaseHeaderProto *base_h = new BaseHeaderProto();
-  base_h->set_allocated_block(new ExtendedBlockProto(*block));
-  if (token) {
-    base_h->set_allocated_token(new TokenProto(*token));
-  }
-  ClientOperationHeaderProto *h = new ClientOperationHeaderProto();
-  h->set_clientname(client_name);
-  h->set_allocated_baseheader(base_h);
-
-  OpReadBlockProto p;
-  p.set_allocated_header(h);
-  p.set_offset(offset);
-  p.set_len(length);
-  p.set_sendchecksums(verify_checksum);
-  // TODO: p.set_allocated_cachingstrategy();
-  return p;
-}
-}

+ 0 - 14
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc

@@ -19,9 +19,6 @@
 #include "rpc_connection.h"
 #include "common/util.h"
 
-#include <openssl/rand.h>
-
-#include <sstream>
 #include <future>
 
 namespace hdfs {
@@ -83,15 +80,4 @@ Status RpcEngine::RawRpc(const std::string &method_name, const std::string &req,
   return future.get();
 }
 
-std::string RpcEngine::GetRandomClientName() {
-  unsigned char buf[6] = {
-      0,
-  };
-  RAND_pseudo_bytes(buf, sizeof(buf));
-
-  std::stringstream ss;
-  ss << "libhdfs++_"
-     << Base64Encode(std::string(reinterpret_cast<char *>(buf), sizeof(buf)));
-  return ss.str();
-}
 }

+ 4 - 8
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt

@@ -32,7 +32,7 @@ include_directories(
     ${LIBHDFS_SRC_DIR}
     ${OS_DIR}
 )
-add_library(hdfspp_test_shim_static STATIC hdfs_shim.c libhdfs_wrapper.c libhdfspp_wrapper.cc ${LIBHDFSPP_BINDING_C}/hdfs_cpp.cc)
+add_library(hdfspp_test_shim_static STATIC hdfs_shim.c libhdfs_wrapper.c libhdfspp_wrapper.cc ${LIBHDFSPP_BINDING_C}/hdfs.cc)
 
 add_library(test_common OBJECT mock_connection.cc)
 
@@ -44,24 +44,20 @@ protobuf_generate_cpp(PROTO_TEST_SRCS PROTO_TEST_HDRS
 )
 
 add_executable(remote_block_reader_test remote_block_reader_test.cc $<TARGET_OBJECTS:test_common>)
-target_link_libraries(remote_block_reader_test reader proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
+target_link_libraries(remote_block_reader_test reader proto common connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
 add_test(remote_block_reader remote_block_reader_test)
 
 add_executable(sasl_digest_md5_test sasl_digest_md5_test.cc)
 target_link_libraries(sasl_digest_md5_test common ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
 add_test(sasl_digest_md5 sasl_digest_md5_test)
 
-add_executable(inputstream_test inputstream_test.cc)
-target_link_libraries(inputstream_test fs rpc reader proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
-add_test(inputstream inputstream_test)
-
 include_directories(${CMAKE_CURRENT_BINARY_DIR})
 add_executable(rpc_engine_test rpc_engine_test.cc ${PROTO_TEST_SRCS} ${PROTO_TEST_HDRS} $<TARGET_OBJECTS:test_common>)
 target_link_libraries(rpc_engine_test rpc proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
 add_test(rpc_engine rpc_engine_test)
 
 add_executable(bad_datanode_test bad_datanode_test.cc)
-target_link_libraries(bad_datanode_test rpc reader proto fs bindings_c rpc proto common reader  ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
+target_link_libraries(bad_datanode_test rpc reader proto fs bindings_c rpc proto common reader connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
 add_test(bad_datanode bad_datanode_test)
 
 add_executable(node_exclusion_test node_exclusion_test.cc)
@@ -73,5 +69,5 @@ target_link_libraries(configuration_test common gmock_main ${CMAKE_THREAD_LIBS_I
 add_test(configuration configuration_test)
 
 build_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static expect.c test_libhdfs_threaded.c ${OS_DIR}/thread.c)
-link_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static fs reader rpc proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY})
+link_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static fs reader rpc proto common connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY})
 add_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static)

+ 141 - 67
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc

@@ -19,6 +19,8 @@
 #include "fs/filesystem.h"
 #include "fs/bad_datanode_tracker.h"
 
+#include "common/util.h"
+
 #include <gmock/gmock.h>
 
 using hadoop::common::TokenProto;
@@ -34,70 +36,140 @@ using ::testing::Return;
 
 using namespace hdfs;
 
-class MockReader {
- public:
-  virtual ~MockReader() {}
+class MockReader : public BlockReader {
+public:
   MOCK_METHOD2(
-      async_read_some,
+      AsyncReadPacket,
       void(const asio::mutable_buffers_1 &,
            const std::function<void(const Status &, size_t transferred)> &));
 
-  MOCK_METHOD6(async_connect,
-               void(const std::string &, TokenProto *, ExtendedBlockProto *,
-                    uint64_t, uint64_t,
-                    const std::function<void(const Status &)> &));
+  MOCK_METHOD5(AsyncRequestBlock,
+               void(const std::string &client_name,
+                     const hadoop::hdfs::ExtendedBlockProto *block,
+                     uint64_t length, uint64_t offset,
+                     const std::function<void(Status)> &handler));
+
+  MOCK_METHOD5(AsyncReadBlock, void(
+    const std::string & client_name,
+    const hadoop::hdfs::LocatedBlockProto &block,
+    size_t offset,
+    const MutableBuffers &buffers,
+    const std::function<void(const Status &, size_t)> handler));
 };
 
-template <class Trait>
-struct MockBlockReaderTrait {
-  typedef MockReader Reader;
-  struct State {
-    MockReader reader_;
-    size_t transferred_;
-    Reader *reader() { return &reader_; }
-    size_t *transferred() { return &transferred_; }
-    const size_t *transferred() const { return &transferred_; }
-  };
+class MockDNConnection : public DataNodeConnection, public std::enable_shared_from_this<MockDNConnection> {
+    void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) override {
+      handler(Status::OK(), shared_from_this());
+    }
+
+  void async_read_some(const MutableBuffers &buf,
+        std::function<void (const asio::error_code & error,
+                               std::size_t bytes_transferred) > handler) override {
+      (void)buf;
+      handler(asio::error::fault, 0);
+  }
 
-  static continuation::Pipeline<State> *CreatePipeline(
-      ::asio::io_service *, const DatanodeInfoProto &) {
-    auto m = continuation::Pipeline<State>::Create();
-    *m->state().transferred() = 0;
-    Trait::InitializeMockReader(m->state().reader());
-    return m;
+  void async_write_some(const ConstBuffers &buf,
+            std::function<void (const asio::error_code & error,
+                                 std::size_t bytes_transferred) > handler) override {
+      (void)buf;
+      handler(asio::error::fault, 0);
   }
 };
 
+
+class PartialMockFileHandle : public FileHandleImpl {
+  using FileHandleImpl::FileHandleImpl;
+public:
+  std::shared_ptr<MockReader> mock_reader_ = std::make_shared<MockReader>();
+protected:
+  std::shared_ptr<BlockReader> CreateBlockReader(const BlockReaderOptions &options,
+                                                 std::shared_ptr<DataNodeConnection> dn) override
+  {
+    (void) options; (void) dn;
+    assert(mock_reader_);
+    return mock_reader_;
+  }
+  std::shared_ptr<DataNodeConnection> CreateDataNodeConnection(
+      ::asio::io_service *io_service,
+      const ::hadoop::hdfs::DatanodeInfoProto & dn,
+      const hadoop::common::TokenProto * token) override {
+    (void) io_service; (void) dn; (void) token;
+    return std::make_shared<MockDNConnection>();
+  }
+
+
+};
+
+TEST(BadDataNodeTest, TestNoNodes) {
+  auto file_info = std::make_shared<struct FileInfo>();
+  file_info->blocks_.push_back(LocatedBlockProto());
+  LocatedBlockProto & block = file_info->blocks_[0];
+  ExtendedBlockProto *b = block.mutable_b();
+  b->set_poolid("");
+  b->set_blockid(1);
+  b->set_generationstamp(1);
+  b->set_numbytes(4096);
+
+  // Set up the one block to have one datanode holding it
+  DatanodeInfoProto *di = block.add_locs();
+  DatanodeIDProto *dnid = di->mutable_id();
+  dnid->set_datanodeuuid("foo");
+
+  char buf[4096] = {
+      0,
+  };
+  IoServiceImpl io_service;
+  auto bad_node_tracker = std::make_shared<BadDataNodeTracker>();
+  bad_node_tracker->AddBadNode("foo");
+
+  PartialMockFileHandle is(&io_service.io_service(), GetRandomClientName(), file_info, bad_node_tracker);
+  Status stat;
+  size_t read = 0;
+
+  // Exclude the one datanode with the data
+  is.AsyncPreadSome(0, asio::buffer(buf, sizeof(buf)), nullptr,
+      [&stat, &read](const Status &status, const std::string &, size_t transferred) {
+        stat = status;
+        read = transferred;
+      });
+
+  // Should fail with no resource available
+  ASSERT_EQ(static_cast<int>(std::errc::resource_unavailable_try_again), stat.code());
+  ASSERT_EQ(0UL, read);
+}
+
 TEST(BadDataNodeTest, RecoverableError) {
-  LocatedBlocksProto blocks;
-  LocatedBlockProto block;
-  DatanodeInfoProto dn;
+  auto file_info = std::make_shared<struct FileInfo>();
+  file_info->blocks_.push_back(LocatedBlockProto());
+  LocatedBlockProto & block = file_info->blocks_[0];
+  ExtendedBlockProto *b = block.mutable_b();
+  b->set_poolid("");
+  b->set_blockid(1);
+  b->set_generationstamp(1);
+  b->set_numbytes(4096);
+
+  // Set up the one block to have one datanode holding it
+  DatanodeInfoProto *di = block.add_locs();
+  DatanodeIDProto *dnid = di->mutable_id();
+  dnid->set_datanodeuuid("foo");
+
   char buf[4096] = {
       0,
   };
   IoServiceImpl io_service;
-  Options default_options;
-  FileSystemImpl fs(&io_service, default_options);
   auto tracker = std::make_shared<BadDataNodeTracker>();
-  InputStreamImpl is(&fs, &blocks, tracker);
+  PartialMockFileHandle is(&io_service.io_service(), GetRandomClientName(),  file_info, tracker);
   Status stat;
   size_t read = 0;
-  struct Trait {
-    static void InitializeMockReader(MockReader *reader) {
-      EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
-          .WillOnce(InvokeArgument<5>(Status::OK()));
-
-      EXPECT_CALL(*reader, async_read_some(_, _))
-          // resource unavailable error
-          .WillOnce(InvokeArgument<1>(
-              Status::ResourceUnavailable(
-                  "Unable to get some resource, try again later"),
-              sizeof(buf)));
-    }
-  };
+  EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_))
+      // resource unavailable error
+      .WillOnce(InvokeArgument<4>(
+          Status::ResourceUnavailable("Unable to get some resource, try again later"), 0));
 
-  is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
-      "client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
+
+  is.AsyncPreadSome(
+      0, asio::buffer(buf, sizeof(buf)), nullptr,
       [&stat, &read](const Status &status, const std::string &,
                      size_t transferred) {
         stat = status;
@@ -108,7 +180,7 @@ TEST(BadDataNodeTest, RecoverableError) {
 
   std::string failing_dn = "id_of_bad_datanode";
   if (!stat.ok()) {
-    if (InputStream::ShouldExclude(stat)) {
+    if (FileHandle::ShouldExclude(stat)) {
       tracker->AddBadNode(failing_dn);
     }
   }
@@ -117,35 +189,37 @@ TEST(BadDataNodeTest, RecoverableError) {
 }
 
 TEST(BadDataNodeTest, InternalError) {
-  LocatedBlocksProto blocks;
-  LocatedBlockProto block;
-  DatanodeInfoProto dn;
+  auto file_info = std::make_shared<struct FileInfo>();
+  file_info->blocks_.push_back(LocatedBlockProto());
+  LocatedBlockProto & block = file_info->blocks_[0];
+  ExtendedBlockProto *b = block.mutable_b();
+  b->set_poolid("");
+  b->set_blockid(1);
+  b->set_generationstamp(1);
+  b->set_numbytes(4096);
+
+  // Set up the one block to have one datanode holding it
+  DatanodeInfoProto *di = block.add_locs();
+  DatanodeIDProto *dnid = di->mutable_id();
+  dnid->set_datanodeuuid("foo");
+
   char buf[4096] = {
       0,
   };
   IoServiceImpl io_service;
-  Options default_options;
   auto tracker = std::make_shared<BadDataNodeTracker>();
-  FileSystemImpl fs(&io_service, default_options);
-  InputStreamImpl is(&fs, &blocks, tracker);
+  PartialMockFileHandle is(&io_service.io_service(), GetRandomClientName(),  file_info, tracker);
   Status stat;
   size_t read = 0;
-  struct Trait {
-    static void InitializeMockReader(MockReader *reader) {
-      EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
-          .WillOnce(InvokeArgument<5>(Status::OK()));
-
-      EXPECT_CALL(*reader, async_read_some(_, _))
-          // something bad happened on the DN, calling again isn't going to help
-          .WillOnce(
-              InvokeArgument<1>(Status::Exception("server_explosion_exception",
-                                                  "the server exploded"),
+  EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_))
+      // resource unavailable error
+      .WillOnce(InvokeArgument<4>(
+              Status::Exception("server_explosion_exception",
+                                "the server exploded"),
                                 sizeof(buf)));
-    }
-  };
 
-  is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
-      "client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
+  is.AsyncPreadSome(
+      0, asio::buffer(buf, sizeof(buf)), nullptr,
       [&stat, &read](const Status &status, const std::string &,
                      size_t transferred) {
         stat = status;
@@ -156,7 +230,7 @@ TEST(BadDataNodeTest, InternalError) {
 
   std::string failing_dn = "id_of_bad_datanode";
   if (!stat.ok()) {
-    if (InputStream::ShouldExclude(stat)) {
+    if (FileHandle::ShouldExclude(stat)) {
       tracker->AddBadNode(failing_dn);
     }
   }

+ 0 - 232
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/inputstream_test.cc

@@ -1,232 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "fs/filesystem.h"
-#include "fs/bad_datanode_tracker.h"
-#include <gmock/gmock.h>
-
-using hadoop::common::TokenProto;
-using hadoop::hdfs::DatanodeInfoProto;
-using hadoop::hdfs::DatanodeIDProto;
-using hadoop::hdfs::ExtendedBlockProto;
-using hadoop::hdfs::LocatedBlockProto;
-using hadoop::hdfs::LocatedBlocksProto;
-
-using ::testing::_;
-using ::testing::InvokeArgument;
-using ::testing::Return;
-
-using namespace hdfs;
-
-namespace hdfs {
-
-class MockReader {
- public:
-  virtual ~MockReader() {}
-  MOCK_METHOD2(
-      async_read_some,
-      void(const asio::mutable_buffers_1 &,
-           const std::function<void(const Status &, size_t transferred)> &));
-
-  MOCK_METHOD6(async_connect,
-               void(const std::string &, TokenProto *, ExtendedBlockProto *,
-                    uint64_t, uint64_t,
-                    const std::function<void(const Status &)> &));
-};
-
-template <class Trait>
-struct MockBlockReaderTrait {
-  typedef MockReader Reader;
-  struct State {
-    MockReader reader_;
-    size_t transferred_;
-    Reader *reader() { return &reader_; }
-    size_t *transferred() { return &transferred_; }
-    const size_t *transferred() const { return &transferred_; }
-  };
-
-  static continuation::Pipeline<State> *CreatePipeline(
-      ::asio::io_service *, const DatanodeInfoProto &) {
-    auto m = continuation::Pipeline<State>::Create();
-    *m->state().transferred() = 0;
-    Trait::InitializeMockReader(m->state().reader());
-    return m;
-  }
-};
-}
-
-TEST(InputStreamTest, TestReadSingleTrunk) {
-  LocatedBlocksProto blocks;
-  LocatedBlockProto block;
-  DatanodeInfoProto dn;
-  char buf[4096] = {
-      0,
-  };
-  IoServiceImpl io_service;
-  Options options;
-  FileSystemImpl fs(&io_service, options);
-  InputStreamImpl is(&fs, &blocks, std::make_shared<BadDataNodeTracker>());
-  Status stat;
-  size_t read = 0;
-  struct Trait {
-    static void InitializeMockReader(MockReader *reader) {
-      EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
-          .WillOnce(InvokeArgument<5>(Status::OK()));
-
-      EXPECT_CALL(*reader, async_read_some(_, _))
-          .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf)));
-    }
-  };
-
-  is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
-      "client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
-      [&stat, &read](const Status &status, const std::string &,
-                     size_t transferred) {
-        stat = status;
-        read = transferred;
-      });
-  ASSERT_TRUE(stat.ok());
-  ASSERT_EQ(sizeof(buf), read);
-  read = 0;
-}
-
-TEST(InputStreamTest, TestReadMultipleTrunk) {
-  LocatedBlocksProto blocks;
-  LocatedBlockProto block;
-  DatanodeInfoProto dn;
-  char buf[4096] = {
-      0,
-  };
-  IoServiceImpl io_service;
-  Options options;
-  FileSystemImpl fs(&io_service, options);
-  InputStreamImpl is(&fs, &blocks, std::make_shared<BadDataNodeTracker>());
-  Status stat;
-  size_t read = 0;
-  struct Trait {
-    static void InitializeMockReader(MockReader *reader) {
-      EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
-          .WillOnce(InvokeArgument<5>(Status::OK()));
-
-      EXPECT_CALL(*reader, async_read_some(_, _))
-          .Times(4)
-          .WillRepeatedly(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4));
-    }
-  };
-
-  is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
-      "client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
-      [&stat, &read](const Status &status, const std::string &,
-                     size_t transferred) {
-        stat = status;
-        read = transferred;
-      });
-  ASSERT_TRUE(stat.ok());
-  ASSERT_EQ(sizeof(buf), read);
-  read = 0;
-}
-
-TEST(InputStreamTest, TestReadError) {
-  LocatedBlocksProto blocks;
-  LocatedBlockProto block;
-  DatanodeInfoProto dn;
-  char buf[4096] = {
-      0,
-  };
-  IoServiceImpl io_service;
-  Options options;
-  FileSystemImpl fs(&io_service, options);
-  InputStreamImpl is(&fs, &blocks, std::make_shared<BadDataNodeTracker>());
-  Status stat;
-  size_t read = 0;
-  struct Trait {
-    static void InitializeMockReader(MockReader *reader) {
-      EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
-          .WillOnce(InvokeArgument<5>(Status::OK()));
-
-      EXPECT_CALL(*reader, async_read_some(_, _))
-          .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
-          .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
-          .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
-          .WillOnce(InvokeArgument<1>(Status::Error("error"), 0));
-    }
-  };
-
-  is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
-      "client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
-      [&stat, &read](const Status &status, const std::string &,
-                     size_t transferred) {
-        stat = status;
-        read = transferred;
-      });
-  ASSERT_FALSE(stat.ok());
-  ASSERT_EQ(sizeof(buf) / 4 * 3, read);
-  read = 0;
-}
-
-TEST(InputStreamTest, TestExcludeDataNode) {
-  LocatedBlocksProto blocks;
-  LocatedBlockProto *block = blocks.add_blocks();
-  ExtendedBlockProto *b = block->mutable_b();
-  b->set_poolid("");
-  b->set_blockid(1);
-  b->set_generationstamp(1);
-  b->set_numbytes(4096);
-
-  DatanodeInfoProto *di = block->add_locs();
-  DatanodeIDProto *dnid = di->mutable_id();
-  dnid->set_datanodeuuid("foo");
-
-  char buf[4096] = {
-      0,
-  };
-  IoServiceImpl io_service;
-  Options options;
-  FileSystemImpl fs(&io_service, options);
-  InputStreamImpl is(&fs, &blocks, std::make_shared<BadDataNodeTracker>());
-  Status stat;
-  size_t read = 0;
-  struct Trait {
-    static void InitializeMockReader(MockReader *reader) {
-      EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
-          .WillOnce(InvokeArgument<5>(Status::OK()));
-
-      EXPECT_CALL(*reader, async_read_some(_, _))
-          .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf)));
-    }
-  };
-
-  std::shared_ptr<NodeExclusionRule> exclude_set =
-      std::make_shared<ExclusionSet>(std::set<std::string>({"foo"}));
-  is.AsyncPreadSome(0, asio::buffer(buf, sizeof(buf)), exclude_set,
-                    [&stat, &read](const Status &status, const std::string &,
-                                   size_t transferred) {
-                      stat = status;
-                      read = transferred;
-                    });
-  ASSERT_EQ(static_cast<int>(std::errc::resource_unavailable_try_again),
-            stat.code());
-  ASSERT_EQ(0UL, read);
-}
-
-int main(int argc, char *argv[]) {
-  // The following line must be executed to initialize Google Mock
-  // (and Google Test) before running the tests.
-  ::testing::InitGoogleMock(&argc, argv);
-  return RUN_ALL_TESTS();
-}

+ 10 - 5
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h

@@ -18,6 +18,8 @@
 #ifndef LIBHDFSPP_TEST_MOCK_CONNECTION_H_
 #define LIBHDFSPP_TEST_MOCK_CONNECTION_H_
 
+#include "common/async_stream.h"
+
 #include <asio/error_code.hpp>
 #include <asio/buffer.hpp>
 #include <asio/streambuf.hpp>
@@ -27,13 +29,15 @@
 
 namespace hdfs {
 
-class MockConnectionBase {
+class MockConnectionBase : public AsyncStream{
 public:
   MockConnectionBase(::asio::io_service *io_service);
   virtual ~MockConnectionBase();
   typedef std::pair<asio::error_code, std::string> ProducerResult;
-  template <class MutableBufferSequence, class Handler>
-  void async_read_some(const MutableBufferSequence &buf, Handler &&handler) {
+
+  void async_read_some(const MutableBuffers &buf,
+          std::function<void (const asio::error_code & error,
+                                 std::size_t bytes_transferred) > handler) override {
     if (produced_.size() == 0) {
       ProducerResult r = Produce();
       if (r.first) {
@@ -51,8 +55,9 @@ public:
     io_service_->post(std::bind(handler, asio::error_code(), len));
   }
 
-  template <class ConstBufferSequence, class Handler>
-  void async_write_some(const ConstBufferSequence &buf, Handler &&handler) {
+  void async_write_some(const ConstBuffers &buf,
+            std::function<void (const asio::error_code & error,
+                                 std::size_t bytes_transferred) > handler) override {
     // CompletionResult res = OnWrite(buf);
     io_service_->post(std::bind(handler, asio::error_code(), asio::buffer_size(buf)));
   }

+ 164 - 38
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc

@@ -21,6 +21,8 @@
 #include "datatransfer.pb.h"
 #include "common/util.h"
 #include "reader/block_reader.h"
+#include "reader/datatransfer.h"
+#include "reader/fileinfo.h"
 
 #include <google/protobuf/io/coded_stream.h>
 #include <google/protobuf/io/zero_copy_stream_impl.h>
@@ -36,10 +38,14 @@ using ::hadoop::hdfs::DataTransferEncryptorMessageProto;
 using ::hadoop::hdfs::ExtendedBlockProto;
 using ::hadoop::hdfs::PacketHeaderProto;
 using ::hadoop::hdfs::ReadOpChecksumInfoProto;
+using ::hadoop::hdfs::LocatedBlockProto;
+using ::hadoop::hdfs::LocatedBlocksProto;
 
 using ::asio::buffer;
 using ::asio::error_code;
 using ::asio::mutable_buffers_1;
+using ::testing::_;
+using ::testing::InvokeArgument;
 using ::testing::Return;
 using std::make_pair;
 using std::string;
@@ -49,12 +55,47 @@ namespace pbio = pb::io;
 
 namespace hdfs {
 
-class MockDNConnection : public MockConnectionBase {
- public:
+class MockDNConnection : public MockConnectionBase, public DataNodeConnection{
+public:
   MockDNConnection(::asio::io_service &io_service)
       : MockConnectionBase(&io_service) {}
   MOCK_METHOD0(Produce, ProducerResult());
+
+  MOCK_METHOD1(Connect, void(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)>));
+
+  void async_read_some(const MutableBuffers &buf,
+        std::function<void (const asio::error_code & error,
+                               std::size_t bytes_transferred) > handler) override {
+      this->MockConnectionBase::async_read_some(buf, handler);
+  }
+
+  void async_write_some(const ConstBuffers &buf,
+            std::function<void (const asio::error_code & error,
+                                 std::size_t bytes_transferred) > handler) override {
+    this->MockConnectionBase::async_write_some(buf, handler);
+  }
+};
+
+// Mocks AsyncReadPacket and AsyncRequestBlock but not AsyncReadBlock, so we
+//     can test the logic of AsyncReadBlock
+class PartialMockReader : public BlockReaderImpl {
+public:
+  PartialMockReader() :
+    BlockReaderImpl(BlockReaderOptions(), std::shared_ptr<DataNodeConnection>()) {};
+
+  MOCK_METHOD2(
+      AsyncReadPacket,
+      void(const asio::mutable_buffers_1 &,
+           const std::function<void(const Status &, size_t transferred)> &));
+
+  MOCK_METHOD5(AsyncRequestBlock,
+               void(const std::string &client_name,
+                     const hadoop::hdfs::ExtendedBlockProto *block,
+                     uint64_t length, uint64_t offset,
+                     const std::function<void(Status)> &handler));
 };
+
+
 }
 
 static inline string ToDelimitedString(const pb::MessageLite *msg) {
@@ -94,20 +135,102 @@ static inline std::pair<error_code, string> ProducePacket(
   return std::make_pair(error_code(), std::move(payload));
 }
 
+TEST(RemoteBlockReaderTest, TestReadSingleTrunk) {
+  auto file_info = std::make_shared<struct FileInfo>();
+  LocatedBlocksProto blocks;
+  LocatedBlockProto block;
+  char buf[4096] = {
+      0,
+  };
+
+  Status stat;
+  size_t read = 0;
+  PartialMockReader reader;
+  EXPECT_CALL(reader, AsyncRequestBlock(_, _, _, _, _))
+      .WillOnce(InvokeArgument<4>(Status::OK()));
+  EXPECT_CALL(reader, AsyncReadPacket(_, _))
+      .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf)));
+
+  reader.AsyncReadBlock(
+       GetRandomClientName(), block, 0, asio::buffer(buf, sizeof(buf)),
+      [&stat, &read](const Status &status, size_t transferred) {
+        stat = status;
+        read = transferred;
+      });
+  ASSERT_TRUE(stat.ok());
+  ASSERT_EQ(sizeof(buf), read);
+  read = 0;
+}
+
+TEST(RemoteBlockReaderTest, TestReadMultipleTrunk) {
+  LocatedBlockProto block;
+  char buf[4096] = {
+      0,
+  };
+  Status stat;
+  size_t read = 0;
+
+  PartialMockReader reader;
+  EXPECT_CALL(reader, AsyncRequestBlock(_, _, _, _, _))
+      .WillOnce(InvokeArgument<4>(Status::OK()));
+
+  EXPECT_CALL(reader, AsyncReadPacket(_, _))
+      .Times(4)
+      .WillRepeatedly(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4));
+
+  reader.AsyncReadBlock(
+       GetRandomClientName(), block, 0, asio::buffer(buf, sizeof(buf)),
+      [&stat, &read](const Status &status, size_t transferred) {
+        stat = status;
+        read = transferred;
+      });
+  ASSERT_TRUE(stat.ok());
+  ASSERT_EQ(sizeof(buf), read);
+  read = 0;
+}
+
+TEST(RemoteBlockReaderTest, TestReadError) {
+  LocatedBlockProto block;
+  char buf[4096] = {
+      0,
+  };
+  Status stat;
+  size_t read = 0;
+  PartialMockReader reader;
+  EXPECT_CALL(reader, AsyncRequestBlock(_, _, _, _, _))
+      .WillOnce(InvokeArgument<4>(Status::OK()));
+
+  EXPECT_CALL(reader, AsyncReadPacket(_, _))
+      .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
+      .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
+      .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
+      .WillOnce(InvokeArgument<1>(Status::Error("error"), 0));
+
+  reader.AsyncReadBlock(
+       GetRandomClientName(), block, 0, asio::buffer(buf, sizeof(buf)),
+      [&stat, &read](const Status &status, size_t transferred) {
+        stat = status;
+        read = transferred;
+      });
+  ASSERT_FALSE(stat.ok());
+  ASSERT_EQ(sizeof(buf) / 4 * 3, read);
+  read = 0;
+}
+
 template <class Stream = MockDNConnection, class Handler>
-static std::shared_ptr<RemoteBlockReader<Stream>> ReadContent(
-    Stream *conn, TokenProto *token, const ExtendedBlockProto &block,
-    uint64_t length, uint64_t offset, const mutable_buffers_1 &buf,
-    const Handler &handler) {
+static std::shared_ptr<BlockReaderImpl>
+ReadContent(std::shared_ptr<Stream> conn, const ExtendedBlockProto &block,
+            uint64_t length, uint64_t offset, const mutable_buffers_1 &buf,
+            const Handler &handler) {
   BlockReaderOptions options;
-  auto reader = std::make_shared<RemoteBlockReader<Stream>>(options, conn);
+  auto reader = std::make_shared<BlockReaderImpl>(options, conn);
   Status result;
-  reader->async_connect("libhdfs++", token, &block, length, offset,
+  reader->AsyncRequestBlock("libhdfs++", &block, length, offset,
                         [buf, reader, handler](const Status &stat) {
                           if (!stat.ok()) {
                             handler(stat, 0);
                           } else {
-                            reader->async_read_some(buf, handler);
+                            reader->AsyncReadPacket(buf, handler);
                           }
                         });
   return reader;
@@ -117,11 +240,11 @@ TEST(RemoteBlockReaderTest, TestReadWholeBlock) {
   static const size_t kChunkSize = 512;
   static const string kChunkData(kChunkSize, 'a');
   ::asio::io_service io_service;
-  MockDNConnection conn(io_service);
+  auto conn = std::make_shared<MockDNConnection>(io_service);
   BlockOpResponseProto block_op_resp;
 
   block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
-  EXPECT_CALL(conn, Produce())
+  EXPECT_CALL(*conn, Produce())
       .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
       .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true)));
 
@@ -130,16 +253,19 @@ TEST(RemoteBlockReaderTest, TestReadWholeBlock) {
   block.set_blockid(0);
   block.set_generationstamp(0);
 
+  bool done = false;
   std::string data(kChunkSize, 0);
-  ReadContent(&conn, nullptr, block, kChunkSize, 0,
+  ReadContent(conn, block, kChunkSize, 0,
               buffer(const_cast<char *>(data.c_str()), data.size()),
-              [&data, &io_service](const Status &stat, size_t transferred) {
+              [&data, &io_service, &done](const Status &stat, size_t transferred) {
                 ASSERT_TRUE(stat.ok());
                 ASSERT_EQ(kChunkSize, transferred);
                 ASSERT_EQ(kChunkData, data);
+                done = true;
                 io_service.stop();
               });
   io_service.run();
+  ASSERT_TRUE(done);
 }
 
 TEST(RemoteBlockReaderTest, TestReadWithinChunk) {
@@ -149,7 +275,7 @@ TEST(RemoteBlockReaderTest, TestReadWithinChunk) {
   static const string kChunkData = string(kOffset, 'a') + string(kLength, 'b');
 
   ::asio::io_service io_service;
-  MockDNConnection conn(io_service);
+  auto conn = std::make_shared<MockDNConnection>(io_service);
   BlockOpResponseProto block_op_resp;
   ReadOpChecksumInfoProto *checksum_info =
       block_op_resp.mutable_readopchecksuminfo();
@@ -159,7 +285,7 @@ TEST(RemoteBlockReaderTest, TestReadWithinChunk) {
   checksum->set_bytesperchecksum(512);
   block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
 
-  EXPECT_CALL(conn, Produce())
+  EXPECT_CALL(*conn, Produce())
       .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
       .WillOnce(Return(ProducePacket(kChunkData, "", kOffset, 1, true)));
 
@@ -168,16 +294,20 @@ TEST(RemoteBlockReaderTest, TestReadWithinChunk) {
   block.set_blockid(0);
   block.set_generationstamp(0);
 
+  bool done = false;
+
   string data(kLength, 0);
-  ReadContent(&conn, nullptr, block, data.size(), kOffset,
+  ReadContent(conn, block, data.size(), kOffset,
               buffer(const_cast<char *>(data.c_str()), data.size()),
-              [&data, &io_service](const Status &stat, size_t transferred) {
+              [&data, &io_service,&done](const Status &stat, size_t transferred) {
                 ASSERT_TRUE(stat.ok());
                 ASSERT_EQ(kLength, transferred);
                 ASSERT_EQ(kChunkData.substr(kOffset, kLength), data);
+                done = true;
                 io_service.stop();
               });
   io_service.run();
+  ASSERT_TRUE(done);
 }
 
 TEST(RemoteBlockReaderTest, TestReadMultiplePacket) {
@@ -185,11 +315,11 @@ TEST(RemoteBlockReaderTest, TestReadMultiplePacket) {
   static const string kChunkData(kChunkSize, 'a');
 
   ::asio::io_service io_service;
-  MockDNConnection conn(io_service);
+  auto conn = std::make_shared<MockDNConnection>(io_service);
   BlockOpResponseProto block_op_resp;
   block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
 
-  EXPECT_CALL(conn, Produce())
+  EXPECT_CALL(*conn, Produce())
       .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
       .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, false)))
       .WillOnce(Return(ProducePacket(kChunkData, "", kChunkSize, 2, true)));
@@ -202,25 +332,22 @@ TEST(RemoteBlockReaderTest, TestReadMultiplePacket) {
   string data(kChunkSize, 0);
   mutable_buffers_1 buf = buffer(const_cast<char *>(data.c_str()), data.size());
   BlockReaderOptions options;
-  auto reader =
-      std::make_shared<RemoteBlockReader<MockDNConnection>>(options, &conn);
+  auto reader = std::make_shared<BlockReaderImpl>(options, conn);
   Status result;
-  reader->async_connect(
-      "libhdfs++", nullptr, &block, data.size(), 0,
+  reader->AsyncRequestBlock(
+      "libhdfs++", &block, data.size(), 0,
       [buf, reader, &data, &io_service](const Status &stat) {
         ASSERT_TRUE(stat.ok());
-        reader->async_read_some(
-            buf, [buf, reader, &data, &io_service](const Status &stat,
-                                                   size_t transferred) {
+        reader->AsyncReadPacket(
+            buf, [buf, reader, &data, &io_service](const Status &stat, size_t transferred) {
               ASSERT_TRUE(stat.ok());
               ASSERT_EQ(kChunkSize, transferred);
               ASSERT_EQ(kChunkData, data);
               data.clear();
               data.resize(kChunkSize);
               transferred = 0;
-              reader->async_read_some(
-                  buf,
-                  [&data, &io_service](const Status &stat, size_t transferred) {
+              reader->AsyncReadPacket(
+                  buf, [&data,&io_service](const Status &stat, size_t transferred) {
                     ASSERT_TRUE(stat.ok());
                     ASSERT_EQ(kChunkSize, transferred);
                     ASSERT_EQ(kChunkData, data);
@@ -234,12 +361,11 @@ TEST(RemoteBlockReaderTest, TestReadMultiplePacket) {
 TEST(RemoteBlockReaderTest, TestSaslConnection) {
   static const size_t kChunkSize = 512;
   static const string kChunkData(kChunkSize, 'a');
-  static const string kAuthPayload =
-      "realm=\"0\",nonce=\"+GAWc+O6yEAWpew/"
-      "qKah8qh4QZLoOLCDcTtEKhlS\",qop=\"auth\","
-      "charset=utf-8,algorithm=md5-sess";
+  static const string kAuthPayload = "realm=\"0\",nonce=\"+GAWc+O6yEAWpew/"
+                                     "qKah8qh4QZLoOLCDcTtEKhlS\",qop=\"auth\","
+                                     "charset=utf-8,algorithm=md5-sess";
   ::asio::io_service io_service;
-  MockDNConnection conn(io_service);
+  auto conn = std::make_shared<MockDNConnection>(io_service);
   BlockOpResponseProto block_op_resp;
   block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
 
@@ -252,23 +378,23 @@ TEST(RemoteBlockReaderTest, TestSaslConnection) {
       ::hadoop::hdfs::
           DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS);
 
-  EXPECT_CALL(conn, Produce())
+  EXPECT_CALL(*conn, Produce())
       .WillOnce(Return(Produce(ToDelimitedString(&sasl_resp0))))
       .WillOnce(Return(Produce(ToDelimitedString(&sasl_resp1))))
       .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
       .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true)));
 
-  DataTransferSaslStream<MockDNConnection> sasl_conn(&conn, "foo", "bar");
+  auto sasl_conn = std::make_shared<DataTransferSaslStream<MockDNConnection> >(conn, "foo", "bar");
   ExtendedBlockProto block;
   block.set_poolid("foo");
   block.set_blockid(0);
   block.set_generationstamp(0);
 
   std::string data(kChunkSize, 0);
-  sasl_conn.Handshake([&sasl_conn, &block, &data, &io_service](
+  sasl_conn->Handshake([sasl_conn, &block, &data, &io_service](
       const Status &s) {
     ASSERT_TRUE(s.ok());
-    ReadContent(&sasl_conn, nullptr, block, kChunkSize, 0,
+    ReadContent(sasl_conn, block, kChunkSize, 0,
                 buffer(const_cast<char *>(data.c_str()), data.size()),
                 [&data, &io_service](const Status &stat, size_t transferred) {
                   ASSERT_TRUE(stat.ok());