HDFS-9643. libhdfs++: Support async cancellation of read operations. Contributed by James Clampffer.

James committed 10 years ago
parent commit 166b3d49df
21 files changed with 433 additions and 20 deletions
  1. +9 -1    hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h
  2. +6 -0    hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
  3. +3 -0    hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h
  4. +7 -0    hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
  5. +1 -1    hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
  6. +37 -0   hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/cancel_tracker.cc
  7. +40 -0   hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/cancel_tracker.h
  8. +13 -2   hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h
  9. +6 -0    hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
  10. +3 -0   hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
  11. +31 -4  hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
  12. +12 -0  hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
  13. +1 -1   hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
  14. +10 -4  hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
  15. +9 -2   hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
  16. +2 -0   hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
  17. +5 -0   hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h
  18. +55 -0  hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.cc
  19. +52 -0  hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.h
  20. +8 -0   hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
  21. +123 -5 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc

+ 9 - 1
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h

@@ -63,6 +63,15 @@ LIBHDFS_EXTERNAL
 void hdfsGetLastError(char *buf, int len);
 
 
+/**
+ *  Cancels operations being made by the FileHandle.
+ *  Note: Cancel cannot be reversed.  This is intended
+ *  to be used before hdfsClose to avoid waiting for
+ *  operations to complete.
+ **/
+LIBHDFS_EXTERNAL
+int hdfsCancel(hdfsFS fs, hdfsFile file);
+
 /**
  * Create an HDFS builder, using the configuration XML files from the indicated
  * directory.  If the directory does not exist, or contains no configuration
@@ -99,6 +108,5 @@ int hdfsBuilderConfGetStr(struct hdfsBuilder *bld, const char *key,
      */
 int hdfsBuilderConfGetInt(struct hdfsBuilder *bld, const char *key, int32_t *val);
 
-
 } /* end extern "C" */
 #endif
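
A minimal usage sketch for the new call, assuming the long-standing libhdfs entry points (hdfsConnect, hdfsOpenFile, hdfsRead, hdfsCloseFile, hdfsDisconnect) that this extension header builds on; only hdfsCancel is introduced by this patch:

    #include <fcntl.h>    // O_RDONLY
    #include <atomic>
    #include <chrono>
    #include <thread>
    #include <vector>
    #include "hdfspp/hdfs_ext.h"  // hdfsCancel; pulls in the base libhdfs API

    // Reader thread: may block for a long time on a slow or dead datanode.
    static void reader(hdfsFS fs, hdfsFile file, std::atomic<bool> *done) {
      std::vector<char> buf(1 << 20);
      while (hdfsRead(fs, file, buf.data(), buf.size()) > 0) { /* consume */ }
      done->store(true);
    }

    int main() {
      hdfsFS fs = hdfsConnect("default", 0);
      hdfsFile file = hdfsOpenFile(fs, "/big/file", O_RDONLY, 0, 0, 0);

      std::atomic<bool> done(false);
      std::thread t(reader, fs, file, &done);

      std::this_thread::sleep_for(std::chrono::seconds(5));
      if (!done.load()) {
        // In-flight reads now fail fast with a canceled status instead of
        // running to completion; the handle is unusable afterwards.
        hdfsCancel(fs, file);
      }
      t.join();
      hdfsCloseFile(fs, file);  // returns promptly, nothing left to wait on
      hdfsDisconnect(fs);
      return 0;
    }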

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h

@@ -92,6 +92,12 @@ public:
   virtual Status Read(void *buf, size_t *nbyte) = 0;
   virtual Status Seek(off_t *offset, std::ios_base::seekdir whence) = 0;
 
+  /**
+   * Cancel outstanding file operations.  This is not reversible; once
+   * called, the handle should be disposed of.
+   **/
+  virtual void CancelOperations(void) = 0;
+
   /**
    * Determine if a datanode should be excluded from future operations
    * based on the return Status.
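
The same pattern through the C++ interface, sketched against the methods above; the watchdog-thread scaffolding is an assumption, while Read, CancelOperations, and the kOperationCanceled status code come from this patch:

    #include <chrono>
    #include <thread>
    #include "hdfspp/hdfspp.h"

    // Watchdog thread: cancellation is irreversible, so the handle should
    // be disposed of after this fires.
    void cancel_after(hdfs::FileHandle *file, std::chrono::seconds timeout) {
      std::this_thread::sleep_for(timeout);
      file->CancelOperations();
    }

    // Reading thread: once the flag is set, Read stops blocking and
    // returns a status whose code is Status::kOperationCanceled.
    hdfs::Status drain(hdfs::FileHandle *file) {
      char buf[65536];
      for (;;) {
        size_t nbyte = sizeof(buf);
        hdfs::Status s = file->Read(buf, &nbyte);
        if (!s.ok())
          return s;   // Status::kOperationCanceled after a cancel
        if (nbyte == 0)
          return s;   // normal EOF
      }
    }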

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h

@@ -47,6 +47,8 @@ class Status {
   { return Status(kException, expception_class_name, error_message); }
   static Status Error(const char *error_message)
   { return Exception("Exception", error_message); }
+  static Status Canceled()
+  { return Status(kOperationCanceled, ""); }
 
   // Returns true iff the status indicates success.
   bool ok() const { return (state_ == NULL); }
@@ -64,6 +66,7 @@ class Status {
     kInvalidArgument = static_cast<unsigned>(std::errc::invalid_argument),
     kResourceUnavailable = static_cast<unsigned>(std::errc::resource_unavailable_try_again),
     kUnimplemented = static_cast<unsigned>(std::errc::function_not_supported),
+    kOperationCanceled = static_cast<unsigned>(std::errc::operation_canceled),
     kException = 255,
   };
 

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc

@@ -267,6 +267,13 @@ tOffset hdfsTell(hdfsFS fs, hdfsFile file) {
   return offset;
 }
 
+int hdfsCancel(hdfsFS fs, hdfsFile file) {
+  if (!CheckSystemAndHandle(fs, file)) {
+    return -1;
+  }
+  static_cast<FileHandleImpl*>(file->get_impl())->CancelOperations();
+  return 0;
+}
 
 /*******************************************************************
  *                BUILDER INTERFACE

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt

@@ -19,6 +19,6 @@ if(NEED_LINK_DL)
    set(LIB_DL dl)
 endif()
 
-add_library(common_obj OBJECT base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc util.cc retry_policy.cc)
+add_library(common_obj OBJECT base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc util.cc retry_policy.cc cancel_tracker.cc)
 add_library(common $<TARGET_OBJECTS:common_obj>)
 target_link_libraries(common ${LIB_DL})

+ 37 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/cancel_tracker.cc

@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "cancel_tracker.h"
+
+namespace hdfs {
+
+CancelTracker::CancelTracker() : canceled_(false) {}
+
+std::shared_ptr<CancelTracker> CancelTracker::New() {
+  return std::make_shared<CancelTracker>();
+}
+
+bool CancelTracker::is_canceled() {
+  return canceled_;
+}
+
+void CancelTracker::set_canceled() {
+  canceled_ = true;
+}
+
+}

+ 40 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/cancel_tracker.h

@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef COMMON_CANCELTRACKER_H
+#define COMMON_CANCELTRACKER_H
+
+#include <memory>
+#include <atomic>
+
+namespace hdfs {
+
+class CancelTracker : public std::enable_shared_from_this<CancelTracker> {
+ public:
+  CancelTracker();
+  static std::shared_ptr<CancelTracker> New();
+  void set_canceled();
+  bool is_canceled();
+ private:
+  std::atomic_bool canceled_;
+};
+
+typedef std::shared_ptr<CancelTracker> CancelHandle;
+
+}
+#endif
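
The tracker is nothing more than a shared atomic flag: every object working on behalf of one FileHandle holds the same CancelHandle and polls it between units of work. A minimal sketch of that fan-out (do_one_step is a hypothetical work item):

    #include "common/cancel_tracker.h"
    #include "hdfspp/status.h"

    void do_one_step(int step);  // hypothetical unit of work

    hdfs::Status run_steps(hdfs::CancelHandle cancel) {
      for (int step = 0; step < 100; ++step) {
        if (cancel->is_canceled())
          return hdfs::Status::Canceled();  // bail at a step boundary
        do_one_step(step);
      }
      return hdfs::Status::OK();
    }

    // Any holder of the same handle can stop all workers from any thread:
    //   hdfs::CancelHandle h = hdfs::CancelTracker::New();
    //   ... hand h to every reader/pipeline spawned by the file handle ...
    //   h->set_canceled();  // a single atomic store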

+ 13 - 2
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h

@@ -19,6 +19,7 @@
 #define LIB_COMMON_CONTINUATION_CONTINUATION_H_
 
 #include "hdfspp/status.h"
+#include "common/cancel_tracker.h"
 
 #include <functional>
 #include <memory>
@@ -81,6 +82,9 @@ template <class State> class Pipeline {
 public:
   typedef std::function<void(const Status &, const State &)> UserHandler;
   static Pipeline *Create() { return new Pipeline(); }
+  static Pipeline *Create(CancelHandle cancel_handle) {
+    return new Pipeline(cancel_handle);
+  }
   Pipeline &Push(Continuation *stage);
   void Run(UserHandler &&handler);
   State &state() { return state_; }
@@ -91,9 +95,11 @@ private:
   size_t stage_;
   std::function<void(const Status &, const State &)> handler_;
 
-  Pipeline() : stage_(0) {}
+  Pipeline() : stage_(0), cancel_handle_(CancelTracker::New()) {}
+  Pipeline(CancelHandle cancel_handle) : stage_(0), cancel_handle_(cancel_handle) {}
   ~Pipeline() = default;
   void Schedule(const Status &status);
+  CancelHandle cancel_handle_;
 };
 
 template <class State>
@@ -104,7 +110,12 @@ inline Pipeline<State> &Pipeline<State>::Push(Continuation *stage) {
 
 template <class State>
 inline void Pipeline<State>::Schedule(const Status &status) {
-  if (!status.ok() || stage_ >= routines_.size()) {
+  // catch cancellation signaled from outside the pipeline
+  if(cancel_handle_->is_canceled()) {
+    handler_(Status::Canceled(), state_);
+    routines_.clear();
+    delete this;
+  } else if (!status.ok() || stage_ >= routines_.size()) {
     handler_(status, state_);
     routines_.clear();
     delete this;
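
With this change, cancellation lands at the next stage boundary: every continuation hand-off re-enters Schedule, which now routes to the user handler with Status::Canceled() and skips the remaining stages. A hedged sketch of the wiring (SomeState and the Stage types are hypothetical):

    auto cancel = hdfs::CancelTracker::New();
    auto pipeline = hdfs::continuation::Pipeline<SomeState>::Create(cancel);

    pipeline->Push(new StageOne())   // hypothetical Continuation subclasses
            .Push(new StageTwo());

    pipeline->Run([](const hdfs::Status &s, const SomeState &state) {
      // If cancel->set_canceled() runs between StageOne and StageTwo,
      // this handler fires with s.code() == hdfs::Status::kOperationCanceled,
      // StageTwo is never scheduled, and the pipeline frees itself.
    });

    // From any other thread:
    //   cancel->set_canceled();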

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc

@@ -53,5 +53,11 @@ void DataNodeConnectionImpl::Connect(
             handler(ToStatus(ec), shared_this); });
 }
 
+void DataNodeConnectionImpl::Cancel() {
+  // best to do a shutdown() first for portability
+  conn_->shutdown(asio::ip::tcp::socket::shutdown_both);
+  conn_->close();
+}
+
 
 }
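
The shutdown-before-close ordering is the portable way to wake a thread blocked in a read on the same socket: close() alone has platform-dependent behavior here and races with descriptor reuse, while shutdown() deterministically fails pending and future I/O first. The same pattern in plain POSIX terms:

    #include <sys/socket.h>
    #include <unistd.h>

    // Called from the cancelling thread while another thread is blocked
    // in recv()/async_read on sock_fd.
    void cancel_socket_io(int sock_fd) {
      ::shutdown(sock_fd, SHUT_RDWR);  // pending reads fail immediately
      ::close(sock_fd);                // then release the descriptor
    }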

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h

@@ -33,6 +33,7 @@ public:
 
     virtual ~DataNodeConnection();
     virtual void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) = 0;
+    virtual void Cancel() = 0;
 };
 
 
@@ -48,6 +49,8 @@ public:
 
   void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) override;
 
+  void Cancel() override;
+
   void async_read_some(const MutableBuffers &buf,
         std::function<void (const asio::error_code & error,
                                std::size_t bytes_transferred) > handler) override {

+ 31 - 4
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc

@@ -34,13 +34,17 @@ FileHandleImpl::FileHandleImpl(::asio::io_service *io_service, const std::string
                                  const std::shared_ptr<const struct FileInfo> file_info,
                                  std::shared_ptr<BadDataNodeTracker> bad_data_nodes)
     : io_service_(io_service), client_name_(client_name), file_info_(file_info),
-      bad_node_tracker_(bad_data_nodes), offset_(0) {
+      bad_node_tracker_(bad_data_nodes), offset_(0), cancel_state_(CancelTracker::New()) {
 }
 
 void FileHandleImpl::PositionRead(
     void *buf, size_t nbyte, uint64_t offset,
-    const std::function<void(const Status &, size_t)>
-        &handler) {
+    const std::function<void(const Status &, size_t)> &handler) {
+  /* prevent usage after cancellation */
+  if(cancel_state_->is_canceled()) {
+    handler(Status::Canceled(), 0);
+    return;
+  }
 
   auto callback = [this, handler](const Status &status,
                                   const std::string &contacted_datanode,
@@ -90,6 +94,10 @@ Status FileHandleImpl::Read(void *buf, size_t *nbyte) {
 }
 
 Status FileHandleImpl::Seek(off_t *offset, std::ios_base::seekdir whence) {
+  if(cancel_state_->is_canceled()) {
+    return Status::Canceled();
+  }
+
   off_t new_offset = -1;
 
   switch (whence) {
@@ -138,6 +146,11 @@ void FileHandleImpl::AsyncPreadSome(
   using ::hadoop::hdfs::DatanodeInfoProto;
   using ::hadoop::hdfs::LocatedBlockProto;
 
+  if(cancel_state_->is_canceled()) {
+    handler(Status::Canceled(), "", 0);
+    return;
+  }
+
   /**
    *  Note: block and chosen_dn will end up pointing to things inside
    *  the blocks_ vector.  They shouldn't be directly deleted.
@@ -210,7 +223,9 @@ void FileHandleImpl::AsyncPreadSome(
 std::shared_ptr<BlockReader> FileHandleImpl::CreateBlockReader(const BlockReaderOptions &options,
                                                std::shared_ptr<DataNodeConnection> dn)
 {
-  return std::make_shared<BlockReaderImpl>(options, dn);
+  std::shared_ptr<BlockReader> reader = std::make_shared<BlockReaderImpl>(options, dn, cancel_state_);
+  readers_.AddReader(reader);
+  return reader;
 }
 
 std::shared_ptr<DataNodeConnection> FileHandleImpl::CreateDataNodeConnection(
@@ -220,6 +235,17 @@ std::shared_ptr<DataNodeConnection> FileHandleImpl::CreateDataNodeConnection(
   return std::make_shared<DataNodeConnectionImpl>(io_service, dn, token);
 }
 
+void FileHandleImpl::CancelOperations() {
+  cancel_state_->set_canceled();
+
+  /* Push update to BlockReaders that may be hung in an asio call */
+  std::vector<std::shared_ptr<BlockReader>> live_readers = readers_.GetLiveReaders();
+  for(auto reader : live_readers) {
+    reader->CancelOperation();
+  }
+}
+
+
 bool FileHandle::ShouldExclude(const Status &s) {
   if (s.ok()) {
     return false;
@@ -228,6 +254,7 @@ bool FileHandle::ShouldExclude(const Status &s) {
   switch (s.code()) {
     /* client side resource exhaustion */
     case Status::kResourceUnavailable:
+    case Status::kOperationCanceled:
       return false;
     case Status::kInvalidArgument:
     case Status::kUnimplemented:

+ 12 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h

@@ -20,7 +20,9 @@
 
 #include "common/hdfs_public_api.h"
 #include "common/async_stream.h"
+#include "common/cancel_tracker.h"
 #include "reader/fileinfo.h"
+#include "reader/readergroup.h"
 
 #include "asio.hpp"
 #include "bad_datanode_tracker.h"
@@ -94,6 +96,14 @@ public:
                       const std::function<void(const Status &status,
                       const std::string &dn_id, size_t bytes_read)> handler);
 
+
+  /**
+   *  Cancels all operations instantiated from this FileHandle.
+   *  Sets a flag that aborts continuation pipelines when they try to move to their next step,
+   *  and closes TCP connections to the DataNode to abort pipelines blocked on slow IO.
+   **/
+  virtual void CancelOperations(void) override;
+
 protected:
   virtual std::shared_ptr<BlockReader> CreateBlockReader(const BlockReaderOptions &options,
                                                  std::shared_ptr<DataNodeConnection> dn);
@@ -108,6 +118,8 @@ private:
   std::shared_ptr<BadDataNodeTracker> bad_node_tracker_;
   bool CheckSeekBounds(ssize_t desired_position);
   off_t offset_;
+  CancelHandle cancel_state_;
+  ReaderGroup readers_;
 };
 
 }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt

@@ -16,6 +16,6 @@
 # limitations under the License.
 #
 
-add_library(reader_obj OBJECT block_reader.cc datatransfer.cc)
+add_library(reader_obj OBJECT block_reader.cc datatransfer.cc readergroup.cc)
 add_dependencies(reader_obj proto)
 add_library(reader $<TARGET_OBJECTS:reader_obj>)

+ 10 - 4
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc

@@ -22,6 +22,7 @@
 
 #include <future>
 
+
 namespace hdfs {
 
 hadoop::hdfs::OpReadBlockProto
@@ -65,7 +66,7 @@ void BlockReaderImpl::AsyncRequestBlock(
     hadoop::hdfs::BlockOpResponseProto response;
   };
 
-  auto m = continuation::Pipeline<State>::Create();
+  auto m = continuation::Pipeline<State>::Create(cancel_state_);
   State *s = &m->state();
 
   s->header.insert(s->header.begin(),
@@ -287,7 +288,7 @@ struct BlockReaderImpl::AckRead : continuation::Continuation {
     }
 
     auto m =
-        continuation::Pipeline<hadoop::hdfs::ClientReadStatusProto>::Create();
+        continuation::Pipeline<hadoop::hdfs::ClientReadStatusProto>::Create(parent_->cancel_state_);
     m->state().set_status(parent_->options_.verify_checksum
                               ? hadoop::hdfs::Status::CHECKSUM_OK
                               : hadoop::hdfs::Status::SUCCESS);
@@ -316,7 +317,7 @@ void BlockReaderImpl::AsyncReadPacket(
   struct State {
     std::shared_ptr<size_t> bytes_transferred;
   };
-  auto m = continuation::Pipeline<State>::Create();
+  auto m = continuation::Pipeline<State>::Create(cancel_state_);
   m->state().bytes_transferred = std::make_shared<size_t>(0);
 
   m->Push(new ReadPacketHeader(this))
@@ -415,7 +416,7 @@ void BlockReaderImpl::AsyncReadBlock(
     const MutableBuffers &buffers,
     const std::function<void(const Status &, size_t)> handler) {
 
-  auto m = continuation::Pipeline<size_t>::Create();
+  auto m = continuation::Pipeline<size_t>::Create(cancel_state_);
   size_t * bytesTransferred = &m->state();
 
   size_t size = asio::buffer_size(buffers);
@@ -430,4 +431,9 @@ void BlockReaderImpl::AsyncReadBlock(
   });
 }
 
+void BlockReaderImpl::CancelOperation() {
+  /* just forward cancel to DNConnection */
+  dn_->Cancel();
+}
+
 }

+ 9 - 2
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h

@@ -20,6 +20,7 @@
 
 #include "hdfspp/status.h"
 #include "common/async_stream.h"
+#include "common/cancel_tracker.h"
 #include "datatransfer.pb.h"
 #include "connection/datanodeconnection.h"
 
@@ -82,14 +83,17 @@ public:
     uint64_t length,
     uint64_t offset,
     const std::function<void(Status)> &handler) = 0;
+
+  virtual void CancelOperation() = 0;
 };
 
 class BlockReaderImpl
     : public BlockReader, public std::enable_shared_from_this<BlockReaderImpl> {
 public:
-  explicit BlockReaderImpl(const BlockReaderOptions &options, std::shared_ptr<DataNodeConnection> dn)
+  explicit BlockReaderImpl(const BlockReaderOptions &options, std::shared_ptr<DataNodeConnection> dn,
+                           CancelHandle cancel_state)
       : dn_(dn), state_(kOpen), options_(options),
-        chunk_padding_bytes_(0) {}
+        chunk_padding_bytes_(0), cancel_state_(cancel_state) {}
 
   virtual void AsyncReadPacket(
     const MutableBuffers &buffers,
@@ -108,6 +112,8 @@ public:
     const MutableBuffers &buffers,
     const std::function<void(const Status &, size_t)> handler) override;
 
+  virtual void CancelOperation() override;
+
   size_t ReadPacket(const MutableBuffers &buffers, Status *status);
 
   Status RequestBlock(
@@ -143,6 +149,7 @@ private:
   int chunk_padding_bytes_;
   long long bytes_to_read_;
   std::vector<char> checksum_;
+  CancelHandle cancel_state_;
 };
 }
 

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h

@@ -58,6 +58,8 @@ public:
 
   void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) override
   {(void)handler;  /*TODO: Handshaking goes here*/};
+
+  void Cancel();
 private:
   DataTransferSaslStream(const DataTransferSaslStream &) = delete;
   DataTransferSaslStream &operator=(const DataTransferSaslStream &) = delete;

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h

@@ -126,6 +126,11 @@ void DataTransferSaslStream<Stream>::Handshake(const Handler &next) {
   m->Run([next](const Status &status, const State &) { next(status); });
 }
 
+template <class Stream>
+void DataTransferSaslStream<Stream>::Cancel() {
+  /* implement with secured reads */
+}
+
 }
 
 #endif

+ 55 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.cc

@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "readergroup.h"
+
+#include <algorithm>
+
+namespace hdfs {
+
+void ReaderGroup::AddReader(std::shared_ptr<BlockReader> reader) {
+  std::lock_guard<std::recursive_mutex> state_lock(state_lock_);
+  ClearDeadReaders();
+  std::weak_ptr<BlockReader> weak_ref = reader;
+  readers_.push_back(weak_ref);
+}
+
+std::vector<std::shared_ptr<BlockReader>> ReaderGroup::GetLiveReaders() {
+  std::lock_guard<std::recursive_mutex> state_lock(state_lock_);
+
+  std::vector<std::shared_ptr<BlockReader>> live_readers;
+  for(auto it=readers_.begin(); it != readers_.end(); it++) {
+    std::shared_ptr<BlockReader> live_reader = it->lock();
+    if(live_reader) {
+      live_readers.push_back(live_reader);
+    }
+  }
+  return live_readers;
+}
+
+void ReaderGroup::ClearDeadReaders() {
+  std::lock_guard<std::recursive_mutex> state_lock(state_lock_);
+
+  auto reader_is_dead = [](const std::weak_ptr<BlockReader> &ptr) {
+    return ptr.expired();
+  };
+
+  auto it = std::remove_if(readers_.begin(), readers_.end(), reader_is_dead);
+  readers_.erase(it, readers_.end());
+}
+
+} // end namespace hdfs

+ 52 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.h

@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef READER_READER_GROUP_H_
+#define READER_READER_GROUP_H_
+
+#include "block_reader.h"
+
+#include <memory>
+#include <vector>
+#include <mutex>
+
+namespace hdfs {
+
+/**
+ * Provide a way of logically grouping ephemeral block readers
+ * so that their status can be monitored or changed.
+ *
+ * Note: This does not attempt to extend the reader life
+ * cycle.  Readers are assumed to be owned by something else
+ * using a shared_ptr.
+ **/
+
+class ReaderGroup {
+ public:
+  ReaderGroup() {};
+  void AddReader(std::shared_ptr<BlockReader> reader);
+  /* find live readers, promote to shared_ptr */
+  std::vector<std::shared_ptr<BlockReader>> GetLiveReaders();
+ private:
+  /* remove weak_ptrs that don't point to live object */
+  void ClearDeadReaders();
+  std::recursive_mutex state_lock_;
+  std::vector<std::weak_ptr<BlockReader>> readers_;
+};
+
+} // end namespace hdfs
+#endif
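
A self-contained sketch of the weak_ptr registry pattern ReaderGroup implements: the group can reach every reader that is still alive without extending any reader's lifetime (Reader stands in for BlockReader):

    #include <iostream>
    #include <memory>
    #include <vector>

    struct Reader { int id; };  // stand-in for BlockReader

    int main() {
      std::vector<std::weak_ptr<Reader>> group;

      auto r1 = std::make_shared<Reader>(Reader{1});
      auto r2 = std::make_shared<Reader>(Reader{2});
      group.push_back(r1);  // the registry holds weak_ptrs only
      group.push_back(r2);

      r2.reset();           // reader finished; its entry expires

      for (auto &w : group) {
        if (auto live = w.lock())  // promote live entries, skip dead ones
          std::cout << "cancel reader " << live->id << "\n";  // prints 1 only
      }
      return 0;
    }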

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc

@@ -55,6 +55,10 @@ public:
     size_t offset,
     const MutableBuffers &buffers,
     const std::function<void(const Status &, size_t)> handler));
+
+  virtual void CancelOperation() override {
+    /* no-op, declared pure virtual */
+  }
 };
 
 class MockDNConnection : public DataNodeConnection, public std::enable_shared_from_this<MockDNConnection> {
@@ -75,6 +79,10 @@ class MockDNConnection : public DataNodeConnection, public std::enable_shared_fr
       (void)buf;
       handler(asio::error::fault, 0);
   }
+
+  virtual void Cancel() override {
+    /* no-op, declared pure virtual */
+  }
 };
 
 

+ 123 - 5
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc

@@ -20,6 +20,7 @@
 
 #include "datatransfer.pb.h"
 #include "common/util.h"
+#include "common/cancel_tracker.h"
 #include "reader/block_reader.h"
 #include "reader/datatransfer.h"
 #include "reader/fileinfo.h"
@@ -29,6 +30,8 @@
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
+#include <iostream>
+
 using namespace hdfs;
 
 using ::hadoop::common::TokenProto;
@@ -58,14 +61,18 @@ namespace hdfs {
 class MockDNConnection : public MockConnectionBase, public DataNodeConnection{
 public:
   MockDNConnection(::asio::io_service &io_service)
-      : MockConnectionBase(&io_service) {}
+      : MockConnectionBase(&io_service), OnRead([](){}) {}
   MOCK_METHOD0(Produce, ProducerResult());
 
   MOCK_METHOD1(Connect, void(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)>));
 
+  /* event handler to trigger side effects */
+  std::function<void(void)> OnRead;
+
   void async_read_some(const MutableBuffers &buf,
         std::function<void (const asio::error_code & error,
                                std::size_t bytes_transferred) > handler) override {
+      this->OnRead();
       this->MockConnectionBase::async_read_some(buf, handler);
   }
 
@@ -74,6 +81,10 @@ public:
                                  std::size_t bytes_transferred) > handler) override {
     this->MockConnectionBase::async_write_some(buf, handler);
   }
+
+  void Cancel() {
+    /* no-op, declared pure virtual */
+  }
 };
 
 // Mocks AsyncReadPacket and AsyncRequestBlock but not AsyncReadBlock, so we
@@ -81,7 +92,7 @@ public:
 class PartialMockReader : public BlockReaderImpl {
 public:
   PartialMockReader() :
-    BlockReaderImpl(BlockReaderOptions(), std::shared_ptr<DataNodeConnection>()) {};
+    BlockReaderImpl(BlockReaderOptions(), std::shared_ptr<DataNodeConnection>(), CancelTracker::New()) {};
 
   MOCK_METHOD2(
       AsyncReadPacket,
@@ -221,9 +232,9 @@ template <class Stream = MockDNConnection, class Handler>
 static std::shared_ptr<BlockReaderImpl>
 ReadContent(std::shared_ptr<Stream> conn, const ExtendedBlockProto &block,
             uint64_t length, uint64_t offset, const mutable_buffers_1 &buf,
-            const Handler &handler) {
+            const Handler &handler, CancelHandle cancel_handle = CancelTracker::New()) {
   BlockReaderOptions options;
-  auto reader = std::make_shared<BlockReaderImpl>(options, conn);
+  auto reader = std::make_shared<BlockReaderImpl>(options, conn, cancel_handle);
   Status result;
   reader->AsyncRequestBlock("libhdfs++", &block, length, offset,
                         [buf, reader, handler](const Status &stat) {
@@ -268,6 +279,59 @@ TEST(RemoteBlockReaderTest, TestReadWholeBlock) {
   ASSERT_TRUE(done);
 }
 
+/* used for cancellation tests; global to avoid cluttering capture lists */
+CancelHandle packet_canceller;
+
+TEST(RemoteBlockReaderTest, TestCancelWhileReceiving) {
+  packet_canceller = CancelTracker::New();
+
+  static const size_t kChunkSize = 512;
+  static const string kChunkData(kChunkSize, 'a');
+  ::asio::io_service io_service;
+  auto conn = std::make_shared<MockDNConnection>(io_service);
+  BlockOpResponseProto block_op_resp;
+
+  /**
+   * async_read would normally get called 5 times here; once for each
+   * continuation in the pipeline.  Cancel will be triggered on the
+   * fourth call to catch the pipeline mid-execution.
+   **/
+  int call_count = 0;
+  int trigger_at_count = 4;
+  auto cancel_trigger = [&call_count, &trigger_at_count]() {
+    call_count += 1;
+    std::cout << "read called " << call_count << " times" << std::endl;
+    if(call_count == trigger_at_count)
+       packet_canceller->set_canceled();
+  };
+
+  conn->OnRead = cancel_trigger;
+
+  block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
+  EXPECT_CALL(*conn, Produce())
+      .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
+      .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true)));
+
+  ExtendedBlockProto block;
+  block.set_poolid("foo");
+  block.set_blockid(0);
+  block.set_generationstamp(0);
+
+  bool done = false;
+  std::string data(kChunkSize, 0);
+  ReadContent(conn, block, kChunkSize, 0,
+              buffer(const_cast<char *>(data.c_str()), data.size()),
+              [&data, &io_service, &done](const Status &stat, size_t transferred) {
+                ASSERT_EQ(stat.code(), Status::kOperationCanceled);
+                ASSERT_EQ(0, transferred);
+                done = true;
+                io_service.stop();
+              }, packet_canceller);
+
+  io_service.run();
+  ASSERT_TRUE(done);
+}
+
 TEST(RemoteBlockReaderTest, TestReadWithinChunk) {
   static const size_t kChunkSize = 1024;
   static const size_t kLength = kChunkSize / 4 * 3;
@@ -332,7 +396,7 @@ TEST(RemoteBlockReaderTest, TestReadMultiplePacket) {
   string data(kChunkSize, 0);
   mutable_buffers_1 buf = buffer(const_cast<char *>(data.c_str()), data.size());
   BlockReaderOptions options;
-  auto reader = std::make_shared<BlockReaderImpl>(options, conn);
+  auto reader = std::make_shared<BlockReaderImpl>(options, conn, CancelTracker::New());
   Status result;
   reader->AsyncRequestBlock(
       "libhdfs++", &block, data.size(), 0,
@@ -358,6 +422,60 @@ TEST(RemoteBlockReaderTest, TestReadMultiplePacket) {
   io_service.run();
 }
 
+TEST(RemoteBlockReaderTest, TestReadCancelBetweenPackets) {
+  packet_canceller = CancelTracker::New();
+
+  static const size_t kChunkSize = 1024;
+  static const string kChunkData(kChunkSize, 'a');
+
+  ::asio::io_service io_service;
+  auto conn = std::make_shared<MockDNConnection>(io_service);
+  BlockOpResponseProto block_op_resp;
+  block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
+
+  EXPECT_CALL(*conn, Produce())
+      .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
+      .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, false)));
+      /* the second AsyncReadPacket should never attempt to read */
+
+  ExtendedBlockProto block;
+  block.set_poolid("foo");
+  block.set_blockid(0);
+  block.set_generationstamp(0);
+
+  string data(kChunkSize, 0);
+  mutable_buffers_1 buf = buffer(const_cast<char *>(data.c_str()), data.size());
+  BlockReaderOptions options;
+  auto reader = std::make_shared<BlockReaderImpl>(options, conn, packet_canceller);
+  Status result;
+  reader->AsyncRequestBlock(
+      "libhdfs++", &block, data.size(), 0,
+      [buf, reader, &data, &io_service](const Status &stat) {
+        ASSERT_TRUE(stat.ok());
+        reader->AsyncReadPacket(
+            buf, [buf, reader, &data, &io_service](const Status &stat, size_t transferred) {
+              ASSERT_TRUE(stat.ok());
+              ASSERT_EQ(kChunkSize, transferred);
+              ASSERT_EQ(kChunkData, data);
+              data.clear();
+              data.resize(kChunkSize);
+              transferred = 0;
+
+              /* Cancel the operation.*/
+              packet_canceller->set_canceled();
+
+              reader->AsyncReadPacket(
+                  buf, [&data,&io_service](const Status &stat, size_t transferred) {
+                    ASSERT_EQ(stat.code(), Status::kOperationCanceled);
+                    ASSERT_EQ(0, transferred);
+                    io_service.stop();
+                  });
+            });
+      });
+  io_service.run();
+}
+
+
 TEST(RemoteBlockReaderTest, TestSaslConnection) {
   static const size_t kChunkSize = 512;
   static const string kChunkData(kChunkSize, 'a');