hdfspp_mini_dfs.h 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "hdfs/hdfs.h"
  19. #include "hdfspp/hdfspp.h"
  20. #include <native_mini_dfs.h>
  21. #include "x-platform/syscall.h"
  22. #include <google/protobuf/io/coded_stream.h>
  23. #include <gmock/gmock.h>
  24. #include <string>
  25. #include <atomic>
  26. #define TO_STR_HELPER(X) #X
  27. #define TO_STR(X) TO_STR_HELPER(X)
  28. #define TEST_BLOCK_SIZE 134217728
  29. namespace hdfs {
  30. static std::atomic<int> dirnum;
  31. static std::atomic<int> filenum;
  32. class FSHandle {
  33. public:
  34. FSHandle() : fs(nullptr) {}
  35. FSHandle(FileSystem * fs_in) : fs(fs_in) {}
  36. FileSystem * handle() { return fs.get(); }
  37. operator FileSystem *() { return fs.get(); }
  38. protected:
  39. std::shared_ptr<FileSystem> fs;
  40. };
  41. /**
  42. * For tests going through the C API to libhdfs++
  43. */
  44. class HdfsHandle {
  45. public:
  46. HdfsHandle() : fs(nullptr) {
  47. }
  48. HdfsHandle(hdfsFS fs_in) : fs(fs_in) {
  49. }
  50. ~HdfsHandle () {
  51. if (fs) {
  52. EXPECT_EQ(0, hdfsDisconnect(fs));
  53. }
  54. }
  55. std::string newDir(const std::string & parent_dir = "/") {
  56. int newDirNum = dirnum++;
  57. std::string path = parent_dir;
  58. if (path.back() != '/')
  59. path += "/";
  60. path += "dir" + std::to_string(newDirNum) + "/";
  61. EXPECT_EQ(0, hdfsCreateDirectory(*this, path.c_str()));
  62. return path;
  63. }
  64. std::string newFile(const std::string & dir = "/", size_t size = 1024) {
  65. int newFileNum = filenum++;
  66. std::string path = dir;
  67. if (path.back() != '/')
  68. path += "/";
  69. path += "file" + std::to_string(newFileNum);
  70. hdfsFile file = hdfsOpenFile(*this, path.c_str(), O_WRONLY, 0, 0, 0);
  71. EXPECT_NE(nullptr, file);
  72. void * buf = malloc(size);
  73. XPlatform::Syscall::ClearBufferSafely(buf, size);
  74. EXPECT_EQ(1024, hdfsWrite(*this, file, buf, size));
  75. EXPECT_EQ(0, hdfsCloseFile(*this, file));
  76. free(buf);
  77. return path;
  78. }
  79. std::string newFile(size_t size) {
  80. return newFile("/", size);
  81. }
  82. hdfsFS handle() { return fs; }
  83. operator hdfsFS() { return fs; }
  84. private:
  85. hdfsFS fs;
  86. };
  87. class MiniCluster {
  88. public:
  89. MiniCluster() : io_service(IoService::MakeShared()) {
  90. struct NativeMiniDfsConf conf = {
  91. 1, /* doFormat */
  92. 0, /* webhdfs */
  93. -1, /* webhdfs port */
  94. 1 /* shortcircuit */
  95. };
  96. clusterInfo = nmdCreate(&conf);
  97. EXPECT_NE(nullptr, clusterInfo);
  98. EXPECT_EQ(0, nmdWaitClusterUp(clusterInfo));
  99. //TODO: Write some files for tests to read/check
  100. }
  101. virtual ~MiniCluster() {
  102. if (clusterInfo) {
  103. EXPECT_EQ(0, nmdShutdown(clusterInfo));
  104. }
  105. nmdFree(clusterInfo);
  106. }
  107. // Connect via the C++ API
  108. FSHandle connect(const std::string username) {
  109. Options options;
  110. unsigned int worker_count = io_service->InitDefaultWorkers();
  111. EXPECT_NE(0, worker_count);
  112. FileSystem * fs = FileSystem::New(io_service, username, options);
  113. EXPECT_NE(nullptr, fs);
  114. FSHandle result(fs);
  115. tPort port = (tPort)nmdGetNameNodePort(clusterInfo);
  116. EXPECT_NE(0, port);
  117. Status status = fs->Connect("localhost", std::to_string(port));
  118. EXPECT_EQ(true, status.ok());
  119. return result;
  120. }
  121. FSHandle connect() {
  122. return connect("");
  123. }
  124. // Connect via the C API
  125. HdfsHandle connect_c(const std::string & username) {
  126. tPort port;
  127. hdfsFS hdfs;
  128. struct hdfsBuilder *bld;
  129. port = (tPort)nmdGetNameNodePort(clusterInfo);
  130. bld = hdfsNewBuilder();
  131. EXPECT_NE(nullptr, bld);
  132. hdfsBuilderSetForceNewInstance(bld);
  133. hdfsBuilderSetNameNode(bld, "localhost");
  134. hdfsBuilderSetNameNodePort(bld, port);
  135. hdfsBuilderConfSetStr(bld, "dfs.block.size",
  136. TO_STR(TEST_BLOCK_SIZE));
  137. hdfsBuilderConfSetStr(bld, "dfs.blocksize",
  138. TO_STR(TEST_BLOCK_SIZE));
  139. if (!username.empty()) {
  140. hdfsBuilderSetUserName(bld, username.c_str());
  141. }
  142. hdfs = hdfsBuilderConnect(bld);
  143. EXPECT_NE(nullptr, hdfs);
  144. return HdfsHandle(hdfs);
  145. }
  146. // Connect via the C API
  147. HdfsHandle connect_c() {
  148. return connect_c("");
  149. }
  150. protected:
  151. struct NativeMiniDfsCluster* clusterInfo;
  152. std::shared_ptr<IoService> io_service;
  153. };
  154. } // namespace