123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #include "hdfs/hdfs.h"
- #include "hdfspp/hdfspp.h"
- #include <native_mini_dfs.h>
- #include "x-platform/syscall.h"
- #include <google/protobuf/io/coded_stream.h>
- #include <gmock/gmock.h>
- #include <string>
- #include <atomic>
- #define TO_STR_HELPER(X) #X
- #define TO_STR(X) TO_STR_HELPER(X)
- #define TEST_BLOCK_SIZE 134217728
- namespace hdfs {
- static std::atomic<int> dirnum;
- static std::atomic<int> filenum;
- class FSHandle {
- public:
- FSHandle() : fs(nullptr) {}
- FSHandle(FileSystem * fs_in) : fs(fs_in) {}
- FileSystem * handle() { return fs.get(); }
- operator FileSystem *() { return fs.get(); }
- protected:
- std::shared_ptr<FileSystem> fs;
- };
- /**
- * For tests going through the C API to libhdfs++
- */
- class HdfsHandle {
- public:
- HdfsHandle() : fs(nullptr) {
- }
- HdfsHandle(hdfsFS fs_in) : fs(fs_in) {
- }
- ~HdfsHandle () {
- if (fs) {
- EXPECT_EQ(0, hdfsDisconnect(fs));
- }
- }
- std::string newDir(const std::string & parent_dir = "/") {
- int newDirNum = dirnum++;
- std::string path = parent_dir;
- if (path.back() != '/')
- path += "/";
- path += "dir" + std::to_string(newDirNum) + "/";
- EXPECT_EQ(0, hdfsCreateDirectory(*this, path.c_str()));
- return path;
- }
- std::string newFile(const std::string & dir = "/", size_t size = 1024) {
- int newFileNum = filenum++;
- std::string path = dir;
- if (path.back() != '/')
- path += "/";
- path += "file" + std::to_string(newFileNum);
- hdfsFile file = hdfsOpenFile(*this, path.c_str(), O_WRONLY, 0, 0, 0);
- EXPECT_NE(nullptr, file);
- void * buf = malloc(size);
- XPlatform::Syscall::ClearBufferSafely(buf, size);
- EXPECT_EQ(1024, hdfsWrite(*this, file, buf, size));
- EXPECT_EQ(0, hdfsCloseFile(*this, file));
- free(buf);
- return path;
- }
- std::string newFile(size_t size) {
- return newFile("/", size);
- }
- hdfsFS handle() { return fs; }
- operator hdfsFS() { return fs; }
- private:
- hdfsFS fs;
- };
- class MiniCluster {
- public:
- MiniCluster() : io_service(IoService::MakeShared()) {
- struct NativeMiniDfsConf conf = {
- 1, /* doFormat */
- 0, /* webhdfs */
- -1, /* webhdfs port */
- 1 /* shortcircuit */
- };
- clusterInfo = nmdCreate(&conf);
- EXPECT_NE(nullptr, clusterInfo);
- EXPECT_EQ(0, nmdWaitClusterUp(clusterInfo));
- //TODO: Write some files for tests to read/check
- }
- virtual ~MiniCluster() {
- if (clusterInfo) {
- EXPECT_EQ(0, nmdShutdown(clusterInfo));
- }
- nmdFree(clusterInfo);
- }
- // Connect via the C++ API
- FSHandle connect(const std::string username) {
- Options options;
- unsigned int worker_count = io_service->InitDefaultWorkers();
- EXPECT_NE(0, worker_count);
- FileSystem * fs = FileSystem::New(io_service, username, options);
- EXPECT_NE(nullptr, fs);
- FSHandle result(fs);
- tPort port = (tPort)nmdGetNameNodePort(clusterInfo);
- EXPECT_NE(0, port);
- Status status = fs->Connect("localhost", std::to_string(port));
- EXPECT_EQ(true, status.ok());
- return result;
- }
- FSHandle connect() {
- return connect("");
- }
- // Connect via the C API
- HdfsHandle connect_c(const std::string & username) {
- tPort port;
- hdfsFS hdfs;
- struct hdfsBuilder *bld;
- port = (tPort)nmdGetNameNodePort(clusterInfo);
- bld = hdfsNewBuilder();
- EXPECT_NE(nullptr, bld);
- hdfsBuilderSetForceNewInstance(bld);
- hdfsBuilderSetNameNode(bld, "localhost");
- hdfsBuilderSetNameNodePort(bld, port);
- hdfsBuilderConfSetStr(bld, "dfs.block.size",
- TO_STR(TEST_BLOCK_SIZE));
- hdfsBuilderConfSetStr(bld, "dfs.blocksize",
- TO_STR(TEST_BLOCK_SIZE));
- if (!username.empty()) {
- hdfsBuilderSetUserName(bld, username.c_str());
- }
- hdfs = hdfsBuilderConnect(bld);
- EXPECT_NE(nullptr, hdfs);
- return HdfsHandle(hdfs);
- }
- // Connect via the C API
- HdfsHandle connect_c() {
- return connect_c("");
- }
- protected:
- struct NativeMiniDfsCluster* clusterInfo;
- std::shared_ptr<IoService> io_service;
- };
- } // namespace
|