Browse Source

HDFS-8766. Implement a libhdfs(3) compatible API. Contributed by James Clampffer.

Haohui Mai 9 years ago
parent
commit
2c8a78d0ee

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/CMakeLists.txt

@@ -51,6 +51,7 @@ include_directories(
   third_party/asio-1.10.2/include
   third_party/gmock-1.7.0
   ${OPENSSL_INCLUDE_DIR}
+  ../libhdfs/include
 )
 
 set(PROTO_HDFS_DIR ${CMAKE_CURRENT_LIST_DIR}/../../../../../hadoop-hdfs-client/src/main/proto)

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/CMakeLists.txt

@@ -21,3 +21,4 @@ add_subdirectory(fs)
 add_subdirectory(reader)
 add_subdirectory(rpc)
 add_subdirectory(proto)
+# Language bindings built on top of the component libraries above.
+add_subdirectory(bindings)

+ 19 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/CMakeLists.txt

@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# The libhdfs-compatible C API lives in the c/ subdirectory.
+add_subdirectory(c)

+ 20 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/CMakeLists.txt

@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+# C API bindings; built only after the component libraries it depends on.
+add_library(bindings_c hdfs.cc hdfs_cpp.cc)
+add_dependencies(bindings_c fs rpc reader proto common)

+ 137 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc

@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "hdfs_cpp.h"
+
+#include <cerrno>
+#include <cstring>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <utility>
+
+using namespace hdfs;
+
+/* Separate the handles used by the C API from the C++ API. */
+/**
+ * Handle type behind the C API's hdfsFS: owns the HadoopFileSystem that
+ * services the calls made through the handle.
+ **/
+struct hdfs_internal {
+  /* takes ownership of p; it is destroyed together with the handle */
+  explicit hdfs_internal(HadoopFileSystem *p) : filesystem_(p) {}
+  explicit hdfs_internal(std::unique_ptr<HadoopFileSystem> p)
+      : filesystem_(std::move(p)) {}
+  virtual ~hdfs_internal() = default;
+  /* borrowed pointer to the wrapped file system; null if none was supplied */
+  HadoopFileSystem *get_impl() { return filesystem_.get(); }
+  const HadoopFileSystem *get_impl() const { return filesystem_.get(); }
+
+ private:
+  std::unique_ptr<HadoopFileSystem> filesystem_;
+};
+
+/**
+ * Handle type behind the C API's hdfsFile: owns the FileHandle that the
+ * read calls are forwarded to.
+ **/
+struct hdfsFile_internal {
+  /* takes ownership of p; it is destroyed together with the handle */
+  explicit hdfsFile_internal(FileHandle *p) : file_(p) {}
+  explicit hdfsFile_internal(std::unique_ptr<FileHandle> p)
+      : file_(std::move(p)) {}
+  virtual ~hdfsFile_internal() = default;
+  /* borrowed pointer to the wrapped file; null if none was supplied */
+  FileHandle *get_impl() { return file_.get(); }
+  const FileHandle *get_impl() const { return file_.get(); }
+
+ private:
+  std::unique_ptr<FileHandle> file_;
+};
+
+/**
+ * Error handling with optional debug to stderr.
+ *
+ * errnum: value stored in errno for the C caller to inspect.
+ * msg:    human readable description; only printed when
+ *         LIBHDFSPP_C_API_ENABLE_DEBUG is defined.
+ **/
+static void ReportError(int errnum, const std::string &msg) {
+#ifdef LIBHDFSPP_C_API_ENABLE_DEBUG
+  std::cerr << "Error: errno=" << strerror(errnum) << " message=\"" << msg
+            << "\"" << std::endl;
+#else
+  (void)msg;
+#endif
+  /* set errno last: the logging calls above are allowed to modify it */
+  errno = errnum;
+}
+
+/**
+ * C API implementations
+ **/
+
+/**
+ * Returns 1 when the handle wraps a file that reports itself open for
+ * reading, 0 otherwise (including a null or empty handle).
+ **/
+int hdfsFileIsOpenForRead(hdfsFile file) {
+  /* files can only be open for reads at the moment, do a quick check */
+  if (!file || !file->get_impl()) {
+    return 0;
+  }
+  return file->get_impl()->IsOpenForRead() ? 1 : 0;
+}
+
+/**
+ * Connect to the NameNode at nn:port.
+ *
+ * Returns a new file system handle, to be released with hdfsDisconnect, or
+ * nullptr with errno set when nn is null or the connection fails.
+ **/
+hdfsFS hdfsConnect(const char *nn, tPort port) {
+  /* reject a null host up front rather than handing it to the connect code */
+  if (!nn) {
+    ReportError(EINVAL, "Cannot connect to a null NameNode host.");
+    return nullptr;
+  }
+  /* owned by the unique_ptr until it is handed over to the handle */
+  std::unique_ptr<HadoopFileSystem> fs(new HadoopFileSystem());
+  Status stat = fs->Connect(nn, port);
+  if (!stat.ok()) {
+    ReportError(ENODEV, "Unable to connect to NameNode.");
+    return nullptr;
+  }
+  return new hdfs_internal(std::move(fs));
+}
+
+/**
+ * Release a file system handle.
+ * Returns 0 on success; -1 with errno set to ENODEV for a null handle.
+ **/
+int hdfsDisconnect(hdfsFS fs) {
+  if (fs) {
+    delete fs;
+    return 0;
+  }
+
+  ReportError(ENODEV, "Cannot disconnect null FS handle.");
+  return -1;
+}
+
+/**
+ * Open path for reading.
+ *
+ * flags, bufferSize, replication and blocksize are accepted for signature
+ * compatibility and are currently ignored.
+ *
+ * Returns a new file handle, to be released with hdfsCloseFile, or nullptr
+ * with errno set on failure.
+ **/
+hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize,
+                      short replication, tSize blocksize) {
+  (void)flags;
+  (void)bufferSize;
+  (void)replication;
+  (void)blocksize;
+  if (!fs) {
+    ReportError(ENODEV, "Cannot perform FS operations with null FS handle.");
+    return nullptr;
+  }
+  if (!path) {
+    ReportError(EINVAL, "Cannot open a null path.");
+    return nullptr;
+  }
+  FileHandle *f = nullptr;
+  Status stat = fs->get_impl()->OpenFileForRead(path, &f);
+  /* an OK status without a handle is still a failure for the caller */
+  if (!stat.ok() || !f) {
+    delete f;
+    ReportError(EIO, "Unable to open file.");
+    return nullptr;
+  }
+  return new hdfsFile_internal(f);
+}
+
+/**
+ * Release a file handle.
+ * Returns 0 on success; -1 with errno set when either handle is null
+ * (the FS handle is checked first).
+ **/
+int hdfsCloseFile(hdfsFS fs, hdfsFile file) {
+  int result = -1;
+  if (!fs) {
+    ReportError(ENODEV, "Cannot perform FS operations with null FS handle.");
+  } else if (!file) {
+    ReportError(EBADF, "Cannot perform FS operations with null File handle.");
+  } else {
+    delete file;
+    result = 0;
+  }
+  return result;
+}
+
+/**
+ * Read up to length bytes starting at position into buffer.
+ *
+ * Returns the number of bytes read, or -1 with errno set when a handle is
+ * null, an argument is invalid, or the read fails.
+ **/
+tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer,
+                tSize length) {
+  if (!fs) {
+    ReportError(ENODEV, "Cannot perform FS operations with null FS handle.");
+    return -1;
+  }
+  if (!file || !file->get_impl()) {
+    ReportError(EBADF, "Cannot perform FS operations with null File handle.");
+    return -1;
+  }
+  /* a negative length would turn into a huge size_t in Pread */
+  if (length < 0 || position < 0 || (!buffer && length > 0)) {
+    ReportError(EINVAL, "Invalid buffer, length or position.");
+    return -1;
+  }
+
+  ssize_t len = file->get_impl()->Pread(buffer, length, position);
+  if (len < 0) {
+    ReportError(EIO, "Unable to read from file.");
+    return -1;
+  }
+  return static_cast<tSize>(len);
+}

+ 157 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.cc

@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "hdfs_cpp.h"
+
+#include <cstdint>
+#include <cerrno>
+#include <string>
+#include <future>
+#include <memory>
+#include <thread>
+#include <vector>
+#include <set>
+
+#include <hdfs/hdfs.h>
+#include "libhdfspp/hdfs.h"
+#include "libhdfspp/status.h"
+#include "fs/filesystem.h"
+#include "common/hdfs_public_api.h"
+
+namespace hdfs {
+
+/**
+ * Blocking positional read: read up to nbyte bytes at offset into buf.
+ *
+ * Returns the byte count reported by the read callback, or -1 when there is
+ * no input stream or the read reports a non-OK status.
+ **/
+ssize_t FileHandle::Pread(void *buf, size_t nbyte, off_t offset) {
+  if (!input_stream_) {
+    return -1;
+  }
+
+  auto stat = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(stat->get_future());
+
+  /* wrap async call with promise/future to make it blocking */
+  size_t read_count = 0;
+  auto callback = [stat, &read_count](const Status &s, const std::string &dn,
+                                      size_t bytes) {
+    (void)dn;
+    /*
+     * Store the result before fulfilling the promise: once the promise is
+     * set the waiting thread may return and read_count goes out of scope.
+     */
+    read_count = bytes;
+    stat->set_value(s);
+  };
+
+  input_stream_->PositionRead(buf, nbyte, offset, std::set<std::string>(),
+                              callback);
+
+  /* wait for async to finish */
+  auto s = future.get();
+
+  if (!s.ok()) {
+    return -1;
+  }
+  return static_cast<ssize_t>(read_count);
+}
+
+/* A handle counts as open for read as long as it holds an InputStream. */
+bool FileHandle::IsOpenForRead() {
+  return input_stream_ != nullptr;
+}
+
+HadoopFileSystem::~HadoopFileSystem() {
+  /**
+   * Note: IoService must be stopped before getting rid of worker threads.
+   * Once worker threads are joined and deleted the service can be deleted.
+   **/
+
+  file_system_.reset(nullptr);
+  /* IoService::New can return nullptr, in which case there is nothing to stop */
+  if (service_) {
+    service_->Stop();
+  }
+  /* destroying the WorkerPtr entries joins the threads */
+  worker_threads_.clear();
+  service_.reset(nullptr);
+}
+
+/**
+ * Connect to a NameNode, blocking until the attempt has completed.
+ *
+ * nn:      NameNode host; must not be null.
+ * port:    NameNode port.
+ * threads: number of worker threads to add before connecting.
+ *
+ * Returns the Status reported by FileSystem::New, or an error Status when a
+ * precondition is not met or no FileSystem was produced.
+ **/
+Status HadoopFileSystem::Connect(const char *nn, tPort port,
+                                 unsigned int threads) {
+  /* IoService::New can return nullptr */
+  if (!service_) {
+    return Status::Error("Null IoService");
+  }
+  if (!nn) {
+    return Status::Error("Null NameNode host");
+  }
+  /* spawn background threads for asio delegation */
+  for (unsigned int i = 0; i < threads; i++) {
+    AddWorkerThread();
+  }
+  /*
+   * Without a worker thread nothing runs the IoService, so presumably the
+   * callback below would never fire and future.get() would block forever.
+   */
+  if (worker_threads_.empty()) {
+    return Status::Error("No worker threads to run the IoService");
+  }
+  /* synchronized */
+  FileSystem *fs = nullptr;
+  auto stat = std::make_shared<std::promise<Status>>();
+  std::future<Status> future = stat->get_future();
+
+  /* fs is written before the promise is fulfilled, so the waiter sees it */
+  auto callback = [stat, &fs](const Status &s, FileSystem *f) {
+    fs = f;
+    stat->set_value(s);
+  };
+
+  /* dummy options object until this is hooked up to HDFS-9117 */
+  Options options_object;
+  FileSystem::New(service_.get(), options_object, nn, std::to_string(port),
+                  callback);
+
+  /* block until promise is set */
+  auto s = future.get();
+
+  /* check and see if it worked */
+  if (!fs) {
+    service_->Stop();
+    worker_threads_.clear();
+    /* never report success without a usable FileSystem */
+    if (s.ok()) {
+      return Status::Error("Connect produced no FileSystem");
+    }
+    return s;
+  }
+
+  file_system_ = std::unique_ptr<FileSystem>(fs);
+  return s;
+}
+
+/**
+ * Start one more thread that calls Run() on the IoService.
+ * Returns the number of worker threads after the call; no thread is added
+ * when there is no IoService to run.
+ **/
+int HadoopFileSystem::AddWorkerThread() {
+  /* IoService::New can return nullptr; do not start a thread on it */
+  if (!service_) {
+    return static_cast<int>(worker_threads_.size());
+  }
+  auto service_task = [](IoService *service) { service->Run(); };
+  worker_threads_.push_back(
+      WorkerPtr(new std::thread(service_task, service_.get())));
+  return static_cast<int>(worker_threads_.size());
+}
+
+/**
+ * Open path for reading, blocking until the open has completed.
+ *
+ * On success *handle receives a newly allocated FileHandle owned by the
+ * caller. On failure *handle is left untouched and a non-OK Status is
+ * returned.
+ **/
+Status HadoopFileSystem::OpenFileForRead(const std::string &path,
+                                         FileHandle **handle) {
+  if (!handle) {
+    return Status::Error("Null FileHandle output pointer");
+  }
+  /* file_system_ is only set by a successful Connect */
+  if (!file_system_) {
+    return Status::Error("FileSystem is not connected");
+  }
+
+  auto stat = std::make_shared<std::promise<Status>>();
+  std::future<Status> future = stat->get_future();
+
+  /* wrap async FileSystem::Open with promise to make it a blocking call */
+  InputStream *input_stream = nullptr;
+  auto h = [stat, &input_stream](const Status &s, InputStream *is) {
+    /*
+     * Store the result before fulfilling the promise: once the promise is
+     * set the waiting thread may return and input_stream goes out of scope.
+     */
+    input_stream = is;
+    stat->set_value(s);
+  };
+
+  file_system_->Open(path, h);
+
+  /* block until promise is set */
+  auto s = future.get();
+
+  if (!s.ok()) {
+    delete input_stream;
+    return s;
+  }
+  /* never report success without a stream to wrap */
+  if (!input_stream) {
+    return Status::Error("Open produced no InputStream");
+  }
+
+  *handle = new FileHandle(input_stream);
+  return s;
+}
+}

+ 82 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.h

@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBHDFSPP_BINDINGS_HDFSCPP_H
+#define LIBHDFSPP_BINDINGS_HDFSCPP_H
+
+#include <cstdint>
+#include <thread>
+#include <vector>
+
+#include "libhdfspp/hdfs.h"
+#include <hdfs/hdfs.h>
+
+namespace hdfs {
+
+/**
+ * Implement a very simple 'it just works' interface in C++
+ * that provides posix-like file operations + extra stuff for hadoop.
+ * Then provide very thin C wrappers over each method.
+ */
+
+/**
+ * Blocking, read-only handle to an open file; wraps the InputStream it is
+ * constructed with and releases it on destruction.
+ */
+class FileHandle {
+ public:
+  virtual ~FileHandle() = default;
+  /* read up to nbyte bytes at offset into buf; bytes read or -1 on failure */
+  ssize_t Pread(void *buf, size_t nbyte, off_t offset);
+  /* true while the handle holds an input stream */
+  bool IsOpenForRead();
+
+ private:
+  /* handle should only be created by fs */
+  friend class HadoopFileSystem;
+  /* takes ownership of is */
+  explicit FileHandle(InputStream *is) : input_stream_(is) {}
+  std::unique_ptr<InputStream> input_stream_;
+};
+
+/**
+ * Blocking file system front end: owns an IoService, the worker threads
+ * that run it, and the FileSystem obtained by Connect.
+ */
+class HadoopFileSystem {
+ public:
+  HadoopFileSystem() : service_(IoService::New()) {}
+  virtual ~HadoopFileSystem();
+
+  /* attempt to connect to namenode, returns a non-OK Status on failure */
+  Status Connect(const char *nn, tPort port, unsigned int threads = 1);
+
+  /* how many worker threads are servicing asio requests */
+  int WorkerThreadCount() { return static_cast<int>(worker_threads_.size()); }
+
+  /* add a new thread to handle asio requests, return number of threads in pool
+   */
+  int AddWorkerThread();
+
+  /* open path for reading; on success *handle is owned by the caller */
+  Status OpenFileForRead(const std::string &path, FileHandle **handle);
+
+ private:
+  std::unique_ptr<IoService> service_;
+  /* std::thread needs to join before deletion */
+  struct WorkerDeleter {
+    void operator()(std::thread *t) {
+      /* join() on a non-joinable thread throws, which a deleter must not */
+      if (t->joinable()) {
+        t->join();
+      }
+      delete t;
+    }
+  };
+  using WorkerPtr = std::unique_ptr<std::thread, WorkerDeleter>;
+  std::vector<WorkerPtr> worker_threads_;
+  std::unique_ptr<FileSystem> file_system_;
+};
+}
+
+#endif