Browse Source

HDFS-9368. Implement reads with implicit offset state in libhdfs++. Contributed by James Clampffer.

James 9 years ago
parent
commit
9cdd5e385e

+ 86 - 11
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc

@@ -58,6 +58,43 @@ static void ReportError(int errnum, std::string msg) {
 #endif
 }
 
+/*
+ * Map a Status onto the C API error convention.  kOk yields 0 and nothing
+ * is reported; every other code is passed to ReportError together with an
+ * errno value and message, and -1 is returned.
+ */
+static int Error(const Status &stat) {
+  const int code = stat.code();
+  if (code == Status::Code::kOk) {
+    return 0;
+  }
+
+  /* Codes without a dedicated mapping fall back to ENOSYS. */
+  int errnum = ENOSYS;
+  const char *msg = "Error: unrecognised code";
+
+  if (code == Status::Code::kInvalidArgument) {
+    errnum = EINVAL;
+    msg = "Invalid argument";
+  } else if (code == Status::Code::kResourceUnavailable) {
+    errnum = EAGAIN;
+    msg = "Resource temporarily unavailable";
+  } else if (code == Status::Code::kUnimplemented) {
+    errnum = ENOSYS;
+    msg = "Function not implemented";
+  } else if (code == Status::Code::kException) {
+    errnum = EINTR;
+    msg = "Exception raised";
+  }
+
+  ReportError(errnum, msg);
+  return -1;
+}
+
+/*
+ * Validate the handles passed to a C API call.  Returns true when both are
+ * non-null.  Otherwise reports ENODEV for a null filesystem handle (checked
+ * first) or EBADF for a null file handle, and returns false.
+ */
+bool CheckSystemAndHandle(hdfsFS fs, hdfsFile file) {
+  if (fs && file) {
+    return true;
+  }
+
+  if (!fs) {
+    ReportError(ENODEV, "Cannot perform FS operations with null FS handle.");
+  } else {
+    ReportError(EBADF, "Cannot perform FS operations with null File handle.");
+  }
+  return false;
+}
+
 /**
  * C API implementations
  **/
@@ -110,28 +147,66 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize,
 }
 
 int hdfsCloseFile(hdfsFS fs, hdfsFile file) {
-  if (!fs) {
-    ReportError(ENODEV, "Cannot perform FS operations with null FS handle.");
-    return -1;
-  }
-  if (!file) {
-    ReportError(EBADF, "Cannot perform FS operations with null File handle.");
+  if (!CheckSystemAndHandle(fs, file)) {
     return -1;
   }
+  /* Both handles were checked for null above; destroy the file handle
+   * object.  Once validation has passed this always returns 0. */
   delete file;
   return 0;
 }
 
 tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer,
                 tSize length) {
-  if (!fs) {
-    ReportError(ENODEV, "Cannot perform FS operations with null FS handle.");
+  if (!CheckSystemAndHandle(fs, file)) {
     return -1;
   }
-  if (!file) {
-    ReportError(EBADF, "Cannot perform FS operations with null File handle.");
+
+  size_t len = length;
+  Status stat = file->get_impl()->Pread(buffer, &len, position);
+  if (!stat.ok()) {
+    return Error(stat);
+  }
+  return (tSize)len;
+}
+
+/*
+ * Sequential read: read up to `length` bytes into `buffer` through the file
+ * handle's Read, which uses the handle's own offset.  Returns the number of
+ * bytes read, or -1 after reporting an error (null handles, negative
+ * length, or a failed Read).
+ */
+tSize hdfsRead(hdfsFS fs, hdfsFile file, void *buffer, tSize length) {
+  if (!CheckSystemAndHandle(fs, file)) {
+    return -1;
+  }
+
+  /* tSize is a signed type in the libhdfs API; a negative length would
+   * otherwise be converted to an enormous size_t request below. */
+  if (length < 0) {
+    ReportError(EINVAL, "Cannot read a negative number of bytes.");
+    return -1;
+  }
+
+  /* len is in/out: requested byte count in, bytes actually read out. */
+  size_t len = static_cast<size_t>(length);
+  Status stat = file->get_impl()->Read(buffer, &len);
+  if (!stat.ok()) {
+    return Error(stat);
+  }
+
+  return static_cast<tSize>(len);
+}
+
+/*
+ * Move the file handle's read position to the absolute offset `desiredPos`.
+ * Returns 0 on success, or -1 after reporting an error (null handles or a
+ * rejected seek).
+ */
+int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos) {
+  if (!CheckSystemAndHandle(fs, file)) {
     return -1;
   }
 
-  return file->get_impl()->Pread(buffer, length, position);
+  off_t desired = desiredPos;
+  Status stat = file->get_impl()->Seek(&desired, std::ios_base::beg);
+  if (!stat.ok()) {
+    return Error(stat);
+  }
+
+  /* The C API contract is 0 on success.  Returning the resulting offset
+   * (truncated to int) made every seek to a non-zero position look like a
+   * failure to callers that test the result against 0. */
+  return 0;
+}
+
+/*
+ * Report the file handle's current read position.  Returns the offset, or
+ * -1 after reporting an error (null handles or a failed Seek).
+ */
+tOffset hdfsTell(hdfsFS fs, hdfsFile file) {
+  if (!CheckSystemAndHandle(fs, file)) {
+    return -1;
+  }
+
+  /* A zero-byte seek relative to the current position writes the current
+   * offset back through the pointer.  Seek takes an off_t *, so the local
+   * must be off_t; ssize_t is a different type on some platforms. */
+  off_t offset = 0;
+  Status stat = file->get_impl()->Seek(&offset, std::ios_base::cur);
+  if (!stat.ok()) {
+    return Error(stat);
+  }
+
+  return offset;
 }

+ 58 - 4
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.cc

@@ -35,7 +35,9 @@
 
 namespace hdfs {
 
-ssize_t FileHandle::Pread(void *buf, size_t nbyte, off_t offset) {
+/* Store the stream in the handle's unique_ptr and start the read offset at 0. */
+FileHandle::FileHandle(InputStream *is)
+    : input_stream_(is), offset_(0) {}
+
+Status FileHandle::Pread(void *buf, size_t *nbyte, off_t offset) {
   auto stat = std::make_shared<std::promise<Status>>();
   std::future<Status> future(stat->get_future());
 
@@ -49,7 +51,7 @@ ssize_t FileHandle::Pread(void *buf, size_t nbyte, off_t offset) {
     contacted_datanode = dn;
   };
 
-  input_stream_->PositionRead(buf, nbyte, offset, callback);
+  input_stream_->PositionRead(buf, *nbyte, offset, callback);
 
   /* wait for async to finish */
   auto s = future.get();
@@ -62,9 +64,61 @@ ssize_t FileHandle::Pread(void *buf, size_t nbyte, off_t offset) {
       impl->bad_node_tracker_->AddBadNode(contacted_datanode);
     }
 
-    return -1;
+    return s;
+  }
+  *nbyte = (size_t)read_count;
+  return Status::OK();
+}
+
+Status FileHandle::Read(void *buf, size_t *nbyte) {
+  Status stat = Pread(buf, nbyte, offset_);
+  if (!stat.ok()) {
+    return stat;
   }
-  return (ssize_t)read_count;
+  /* The positional read at offset_ succeeded and *nbyte now holds the
+   * number of bytes read: advance the stored offset by that amount so the
+   * next Read continues from there.  A failed Pread returns above without
+   * moving offset_. */
+  offset_ += *nbyte;
+  return Status::OK();
+}
+
+/*
+ * Reposition the handle's read offset.  *offset is the displacement from
+ * the position selected by whence (beginning, current offset, or file
+ * length); on success it is overwritten with the resulting absolute offset.
+ * Returns InvalidArgument for an unsupported whence or when CheckSeekBounds
+ * rejects the target, in which case offset_ and *offset are left unchanged.
+ */
+Status FileHandle::Seek(off_t *offset, std::ios_base::seekdir whence) {
+  off_t target = -1;
+
+  if (whence == std::ios_base::beg) {
+    target = *offset;
+  } else if (whence == std::ios_base::cur) {
+    target = offset_ + *offset;
+  } else if (whence == std::ios_base::end) {
+    auto *impl = static_cast<InputStreamImpl *>(input_stream_.get());
+    target = impl->get_file_length() + *offset;
+  } else {
+    /* unsupported */
+    return Status::InvalidArgument("Invalid Seek whence argument");
+  }
+
+  if (!CheckSeekBounds(target)) {
+    return Status::InvalidArgument("Seek offset out of bounds");
+  }
+
+  offset_ = target;
+  *offset = target;
+  return Status::OK();
+}
+
+/*
+ * Return true when desired_position is a valid read offset for this file,
+ * i.e. it lies in [0, file_length]; return false if the seek would be out
+ * of bounds.
+ */
+bool FileHandle::CheckSeekBounds(ssize_t desired_position) {
+  ssize_t file_length =
+      static_cast<InputStreamImpl *>(input_stream_.get())->get_file_length();
+
+  /* The end-of-file position itself is in bounds.  Read leaves offset_
+   * equal to file_length after consuming the last byte, and a zero-byte
+   * Seek relative to the current position must still succeed there (and on
+   * an empty file, where 0 == file_length). */
+  if (desired_position < 0 || desired_position > file_length) {
+    return false;
+  }
+
+  return true;
 }
 
 bool FileHandle::IsOpenForRead() {

+ 20 - 2
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.h

@@ -24,6 +24,7 @@
 #include <vector>
 #include <mutex>
 #include <chrono>
+#include <iostream>
 
 #include "libhdfspp/hdfs.h"
 #include "fs/bad_datanode_tracker.h"
@@ -42,14 +43,31 @@ class HadoopFileSystem;
 class FileHandle {
  public:
   virtual ~FileHandle(){};
-  ssize_t Pread(void *buf, size_t nbyte, off_t offset);
+  /**
+   * Note:  The nbyte argument for Read and Pread as well as the
+   * offset argument for Seek are in/out parameters.
+   *
+   * For Read and Pread the value referenced by nbyte should
+   * be set to the number of bytes to read.  Before returning
+   * successfully, the callee sets the referenced value to the number
+   * of bytes that were actually read.
+   *
+   * Pread reads at the explicit offset argument.  Read reads at the
+   * offset stored in the handle and, on success, advances that stored
+   * offset by the number of bytes read.
+   *
+   * For Seek the value referenced by offset should be the number
+   * of bytes to shift from the specified whence position.  On success
+   * the referenced value is set to the new absolute offset.  Seek
+   * returns an InvalidArgument status for an unsupported whence or
+   * when the resulting position is rejected by CheckSeekBounds.
+   **/
+  Status Pread(void *buf, size_t *nbyte, off_t offset);
+  Status Read(void *buf, size_t *nbyte);
+  Status Seek(off_t *offset, std::ios_base::seekdir whence);
   bool IsOpenForRead();
 
  private:
   /* Handles should only be created by the filesystem: the constructor is
    * private and HadoopFileSystem is a friend. */
   friend class HadoopFileSystem;
-  FileHandle(InputStream *is) : input_stream_(is){};
+  FileHandle(InputStream *is);
+  bool CheckSeekBounds(ssize_t desired_position);
   std::unique_ptr<InputStream> input_stream_;
+  off_t offset_; /* read position used by Read and updated by Seek */
 };
 
 class HadoopFileSystem {

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h

@@ -72,7 +72,7 @@ class InputStreamImpl : public InputStream {
                       const hadoop::hdfs::DatanodeInfoProto &dn, size_t offset,
                       const MutableBufferSequence &buffers,
                       const Handler &handler);
-
+  uint64_t get_file_length() const;
  private:
   FileSystemImpl *fs_;
   unsigned long long file_length_;

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream.cc

@@ -43,4 +43,6 @@ void InputStreamImpl::PositionRead(
         handler) {
   AsyncPreadSome(offset, asio::buffer(buf, nbyte), bad_node_tracker_, handler);
 }
+
+/* Accessor for the file length stored in this stream (file_length_). */
+uint64_t InputStreamImpl::get_file_length() const {
+  return file_length_;
+}
 }