/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// NOTE(review): the targets of the angle-bracket includes were missing from
// the reviewed text. They are restored here from the symbols this file uses
// (nmd*, EXPECT_*, std::string, std::atomic) and from the expected project
// layout -- confirm against version control before merging.
#include "hdfs/hdfs.h"
#include "hdfspp/hdfspp.h"
#include <native_mini_dfs.h>

#include "x-platform/syscall.h"

#include <google/protobuf/io/coded_stream.h>
#include <gmock/gmock.h>

#include <string>
#include <atomic>
#include <memory>
#include <vector>

#define TO_STR_HELPER(X) #X
#define TO_STR(X) TO_STR_HELPER(X)

// Block size (128 MiB) passed to the C API builder in MiniCluster::connect_c.
#define TEST_BLOCK_SIZE 134217728

namespace hdfs {

// Counters used to generate unique "dirN" / "fileN" names.  They have static
// storage duration, so they start at zero.
static std::atomic<int> dirnum;
static std::atomic<int> filenum;

/**
 * Holder for a FileSystem obtained through the C++ API.  The pointer is
 * stored in a shared_ptr, so the FileSystem is released when the last copy
 * of the handle goes away.
 */
class FSHandle {
public:
  FSHandle() : fs(nullptr) {}
  FSHandle(FileSystem * fs_in) : fs(fs_in) {}

  /** Returns the wrapped FileSystem, or nullptr when the handle is empty. */
  FileSystem * handle() { return fs.get(); }
  operator FileSystem *() { return fs.get(); }

protected:
  std::shared_ptr<FileSystem> fs;
};


/**
 * For tests going through the C API to libhdfs++
 *
 * Wraps an hdfsFS and calls hdfsDisconnect on it in the destructor.
 *
 * NOTE(review): the implicitly generated copy operations copy the raw hdfsFS,
 * so copying a handle would disconnect the same connection twice.  Callers
 * should only ever hold the value returned by MiniCluster::connect_c();
 * consider making this type move-only once all callers are audited.
 */
class HdfsHandle {
public:
  HdfsHandle() : fs(nullptr) {}
  HdfsHandle(hdfsFS fs_in) : fs(fs_in) {}

  ~HdfsHandle() {
    if (fs) {
      EXPECT_EQ(0, hdfsDisconnect(fs));
    }
  }

  /**
   * Creates a uniquely named directory "dir<N>/" under parent_dir.
   *
   * @param parent_dir directory to create the new one in; a trailing '/' is
   *                   appended when missing (an empty string is treated as "/")
   * @return the path of the new directory, always ending in '/'
   */
  std::string newDir(const std::string & parent_dir = "/") {
    int newDirNum = dirnum++;

    std::string path = parent_dir;
    // Check empty() first: calling back() on an empty string is undefined.
    if (path.empty() || path.back() != '/')
      path += "/";
    path += "dir" + std::to_string(newDirNum) + "/";

    EXPECT_EQ(0, hdfsCreateDirectory(*this, path.c_str()));
    return path;
  }

  /**
   * Creates a uniquely named file "file<N>" in dir and fills it with `size`
   * zero bytes.
   *
   * @param dir  directory to create the file in; a trailing '/' is appended
   *             when missing (an empty string is treated as "/")
   * @param size number of zero bytes to write
   * @return the path of the new file (returned even if a step failed; the
   *         failure is reported through the gtest expectations)
   */
  std::string newFile(const std::string & dir = "/", size_t size = 1024) {
    int newFileNum = filenum++;

    std::string path = dir;
    // Check empty() first: calling back() on an empty string is undefined.
    if (path.empty() || path.back() != '/')
      path += "/";
    path += "file" + std::to_string(newFileNum);

    hdfsFile file = hdfsOpenFile(*this, path.c_str(), O_WRONLY, 0, 0, 0);
    EXPECT_NE(nullptr, file);
    if (file == nullptr) {
      // The failure is already recorded; do not write to or close a null file.
      return path;
    }

    // Value-initialised, i.e. all zero bytes; released automatically.
    std::vector<char> buf(size);
    // Expect the whole buffer to be written, not the default size of 1024.
    EXPECT_EQ(static_cast<int>(size), hdfsWrite(*this, file, buf.data(), size));
    EXPECT_EQ(0, hdfsCloseFile(*this, file));
    return path;
  }

  /** Creates a file of the given size in the root directory. */
  std::string newFile(size_t size) {
    return newFile("/", size);
  }

  /** Returns the wrapped hdfsFS, or nullptr when the handle is empty. */
  hdfsFS handle() { return fs; }
  operator hdfsFS() { return fs; }

private:
  hdfsFS fs;
};


/**
 * Starts a native mini DFS cluster in the constructor and shuts it down in
 * the destructor.  Provides helpers that connect to its NameNode on
 * "localhost" through either the C++ API or the C API.
 *
 * NOTE(review): this class owns a raw cluster pointer but is still copyable;
 * instances should not be copied.
 */
class MiniCluster {
public:
  MiniCluster() : clusterInfo(nullptr), io_service(IoService::MakeShared()) {
    struct NativeMiniDfsConf conf = {
        1,  /* doFormat */
        0,  /* webhdfs */
        -1, /* webhdfs port */
        1   /* shortcircuit */
    };
    clusterInfo = nmdCreate(&conf);
    EXPECT_NE(nullptr, clusterInfo);
    if (clusterInfo != nullptr) {
      // Only wait on a cluster that was actually created.
      EXPECT_EQ(0, nmdWaitClusterUp(clusterInfo));
    }

    //TODO: Write some files for tests to read/check
  }

  virtual ~MiniCluster() {
    if (clusterInfo) {
      EXPECT_EQ(0, nmdShutdown(clusterInfo));
      // Free only a cluster that exists; nmdFree is not documented here as
      // accepting a null pointer.
      nmdFree(clusterInfo);
    }
  }

  /**
   * Connect via the C++ API.
   *
   * @param username user to connect as
   * @return a handle owning the new FileSystem; empty when the FileSystem
   *         could not be created (the failure is reported through gtest)
   */
  FSHandle connect(const std::string & username) {
    Options options;

    unsigned int worker_count = io_service->InitDefaultWorkers();
    EXPECT_NE(0u, worker_count);

    FileSystem * fs = FileSystem::New(io_service, username, options);
    EXPECT_NE(nullptr, fs);
    FSHandle result(fs);
    if (fs == nullptr) {
      // Nothing to connect; the failure is already recorded.
      return result;
    }

    tPort port = static_cast<tPort>(nmdGetNameNodePort(clusterInfo));
    EXPECT_NE(0, port);
    Status status = fs->Connect("localhost", std::to_string(port));
    EXPECT_EQ(true, status.ok());
    return result;
  }

  /** Connect via the C++ API with an empty user name. */
  FSHandle connect() {
    return connect("");
  }

  /**
   * Connect via the C API.
   *
   * @param username user to connect as; when empty, no user name is set on
   *                 the builder
   * @return a handle wrapping the new connection; empty when the builder
   *         could not be allocated (the failure is reported through gtest)
   */
  HdfsHandle connect_c(const std::string & username) {
    tPort port = static_cast<tPort>(nmdGetNameNodePort(clusterInfo));

    struct hdfsBuilder * bld = hdfsNewBuilder();
    EXPECT_NE(nullptr, bld);
    if (bld == nullptr) {
      // Without a builder there is nothing to configure or connect.
      return HdfsHandle();
    }

    hdfsBuilderSetForceNewInstance(bld);
    hdfsBuilderSetNameNode(bld, "localhost");
    hdfsBuilderSetNameNodePort(bld, port);
    // Both the older and the newer block-size keys are set.
    hdfsBuilderConfSetStr(bld, "dfs.block.size", TO_STR(TEST_BLOCK_SIZE));
    hdfsBuilderConfSetStr(bld, "dfs.blocksize", TO_STR(TEST_BLOCK_SIZE));
    if (!username.empty()) {
      hdfsBuilderSetUserName(bld, username.c_str());
    }

    hdfsFS hdfs = hdfsBuilderConnect(bld);
    EXPECT_NE(nullptr, hdfs);

    return HdfsHandle(hdfs);
  }

  /** Connect via the C API with an empty user name. */
  HdfsHandle connect_c() {
    return connect_c("");
  }

protected:
  struct NativeMiniDfsCluster * clusterInfo;
  std::shared_ptr<IoService> io_service;
};

} // namespace