test_libhdfs_zerocopy.c 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "expect.h"
  19. #include "hdfs.h"
  20. #include "native_mini_dfs.h"
  21. #include <errno.h>
  22. #include <inttypes.h>
  23. #include <semaphore.h>
  24. #include <pthread.h>
  25. #include <unistd.h>
  26. #include <stdio.h>
  27. #include <stdlib.h>
  28. #include <string.h>
  29. #include <sys/types.h>
  30. #define TO_STR_HELPER(X) #X
  31. #define TO_STR(X) TO_STR_HELPER(X)
  32. #define TEST_FILE_NAME_LENGTH 128
  33. #define TEST_ZEROCOPY_FULL_BLOCK_SIZE 4096
  34. #define TEST_ZEROCOPY_LAST_BLOCK_SIZE 3215
  35. #define TEST_ZEROCOPY_NUM_BLOCKS 6
  36. #define SMALL_READ_LEN 16
  37. #define ZC_BUF_LEN 32768
  38. static uint8_t *getZeroCopyBlockData(int blockIdx)
  39. {
  40. uint8_t *buf = malloc(TEST_ZEROCOPY_FULL_BLOCK_SIZE);
  41. int i;
  42. if (!buf) {
  43. fprintf(stderr, "malloc(%d) failed\n", TEST_ZEROCOPY_FULL_BLOCK_SIZE);
  44. exit(1);
  45. }
  46. for (i = 0; i < TEST_ZEROCOPY_FULL_BLOCK_SIZE; i++) {
  47. buf[i] = blockIdx + (i % 17);
  48. }
  49. return buf;
  50. }
  51. static int getZeroCopyBlockLen(int blockIdx)
  52. {
  53. if (blockIdx >= TEST_ZEROCOPY_NUM_BLOCKS) {
  54. return 0;
  55. } else if (blockIdx == (TEST_ZEROCOPY_NUM_BLOCKS - 1)) {
  56. return TEST_ZEROCOPY_LAST_BLOCK_SIZE;
  57. } else {
  58. return TEST_ZEROCOPY_FULL_BLOCK_SIZE;
  59. }
  60. }
  61. static void printBuf(const uint8_t *buf, size_t len) __attribute__((unused));
  62. static void printBuf(const uint8_t *buf, size_t len)
  63. {
  64. size_t i;
  65. for (i = 0; i < len; i++) {
  66. fprintf(stderr, "%02x", buf[i]);
  67. }
  68. fprintf(stderr, "\n");
  69. }
  70. static int doTestZeroCopyReads(hdfsFS fs, const char *fileName)
  71. {
  72. hdfsFile file = NULL;
  73. struct hadoopRzOptions *opts = NULL;
  74. struct hadoopRzBuffer *buffer = NULL;
  75. uint8_t *block;
  76. file = hdfsOpenFile(fs, fileName, O_RDONLY, 0, 0, 0);
  77. EXPECT_NONNULL(file);
  78. opts = hadoopRzOptionsAlloc();
  79. EXPECT_NONNULL(opts);
  80. EXPECT_ZERO(hadoopRzOptionsSetSkipChecksum(opts, 1));
  81. /* haven't read anything yet */
  82. EXPECT_ZERO(expectFileStats(file, 0LL, 0LL, 0LL, 0LL));
  83. block = getZeroCopyBlockData(0);
  84. EXPECT_NONNULL(block);
  85. /* first read is half of a block. */
  86. buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2);
  87. EXPECT_NONNULL(buffer);
  88. EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2,
  89. hadoopRzBufferLength(buffer));
  90. EXPECT_ZERO(memcmp(hadoopRzBufferGet(buffer), block,
  91. TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2));
  92. hadoopRzBufferFree(file, buffer);
  93. /* read the next half of the block */
  94. buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2);
  95. EXPECT_NONNULL(buffer);
  96. EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2,
  97. hadoopRzBufferLength(buffer));
  98. EXPECT_ZERO(memcmp(hadoopRzBufferGet(buffer),
  99. block + (TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2),
  100. TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2));
  101. hadoopRzBufferFree(file, buffer);
  102. free(block);
  103. EXPECT_ZERO(expectFileStats(file, TEST_ZEROCOPY_FULL_BLOCK_SIZE,
  104. TEST_ZEROCOPY_FULL_BLOCK_SIZE,
  105. TEST_ZEROCOPY_FULL_BLOCK_SIZE,
  106. TEST_ZEROCOPY_FULL_BLOCK_SIZE));
  107. /* Now let's read just a few bytes. */
  108. buffer = hadoopReadZero(file, opts, SMALL_READ_LEN);
  109. EXPECT_NONNULL(buffer);
  110. EXPECT_INT_EQ(SMALL_READ_LEN, hadoopRzBufferLength(buffer));
  111. block = getZeroCopyBlockData(1);
  112. EXPECT_NONNULL(block);
  113. EXPECT_ZERO(memcmp(block, hadoopRzBufferGet(buffer), SMALL_READ_LEN));
  114. hadoopRzBufferFree(file, buffer);
  115. EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
  116. hdfsTell(fs, file));
  117. EXPECT_ZERO(expectFileStats(file,
  118. TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
  119. TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
  120. TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
  121. TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN));
  122. /* Clear 'skip checksums' and test that we can't do zero-copy reads any
  123. * more. Since there is no ByteBufferPool set, we should fail with
  124. * EPROTONOSUPPORT.
  125. */
  126. EXPECT_ZERO(hadoopRzOptionsSetSkipChecksum(opts, 0));
  127. EXPECT_NULL(hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE));
  128. EXPECT_INT_EQ(EPROTONOSUPPORT, errno);
  129. /* Verify that setting a NULL ByteBufferPool class works. */
  130. EXPECT_ZERO(hadoopRzOptionsSetByteBufferPool(opts, NULL));
  131. EXPECT_ZERO(hadoopRzOptionsSetSkipChecksum(opts, 0));
  132. EXPECT_NULL(hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE));
  133. EXPECT_INT_EQ(EPROTONOSUPPORT, errno);
  134. /* Now set a ByteBufferPool and try again. It should succeed this time. */
  135. EXPECT_ZERO(hadoopRzOptionsSetByteBufferPool(opts,
  136. ELASTIC_BYTE_BUFFER_POOL_CLASS));
  137. buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE);
  138. EXPECT_NONNULL(buffer);
  139. EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE, hadoopRzBufferLength(buffer));
  140. EXPECT_ZERO(expectFileStats(file,
  141. (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
  142. (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
  143. (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
  144. TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN));
  145. EXPECT_ZERO(memcmp(block + SMALL_READ_LEN, hadoopRzBufferGet(buffer),
  146. TEST_ZEROCOPY_FULL_BLOCK_SIZE - SMALL_READ_LEN));
  147. free(block);
  148. block = getZeroCopyBlockData(2);
  149. EXPECT_NONNULL(block);
  150. EXPECT_ZERO(memcmp(block, hadoopRzBufferGet(buffer) +
  151. (TEST_ZEROCOPY_FULL_BLOCK_SIZE - SMALL_READ_LEN), SMALL_READ_LEN));
  152. hadoopRzBufferFree(file, buffer);
  153. free(block);
  154. hadoopRzOptionsFree(opts);
  155. EXPECT_ZERO(hdfsCloseFile(fs, file));
  156. return 0;
  157. }
  158. static int createZeroCopyTestFile(hdfsFS fs, char *testFileName,
  159. size_t testFileNameLen)
  160. {
  161. int blockIdx, blockLen;
  162. hdfsFile file;
  163. uint8_t *data;
  164. snprintf(testFileName, testFileNameLen, "/zeroCopyTestFile.%d.%d",
  165. getpid(), rand());
  166. file = hdfsOpenFile(fs, testFileName, O_WRONLY, 0, 1,
  167. TEST_ZEROCOPY_FULL_BLOCK_SIZE);
  168. EXPECT_NONNULL(file);
  169. for (blockIdx = 0; blockIdx < TEST_ZEROCOPY_NUM_BLOCKS; blockIdx++) {
  170. blockLen = getZeroCopyBlockLen(blockIdx);
  171. data = getZeroCopyBlockData(blockIdx);
  172. EXPECT_NONNULL(data);
  173. EXPECT_INT_EQ(blockLen, hdfsWrite(fs, file, data, blockLen));
  174. }
  175. EXPECT_ZERO(hdfsCloseFile(fs, file));
  176. return 0;
  177. }
  178. /**
  179. * Test that we can write a file with libhdfs and then read it back
  180. */
  181. int main(void)
  182. {
  183. int port;
  184. struct NativeMiniDfsConf conf = {
  185. .doFormat = 1,
  186. .configureShortCircuit = 1,
  187. };
  188. char testFileName[TEST_FILE_NAME_LENGTH];
  189. hdfsFS fs;
  190. struct NativeMiniDfsCluster* cl;
  191. struct hdfsBuilder *bld;
  192. cl = nmdCreate(&conf);
  193. EXPECT_NONNULL(cl);
  194. EXPECT_ZERO(nmdWaitClusterUp(cl));
  195. port = nmdGetNameNodePort(cl);
  196. if (port < 0) {
  197. fprintf(stderr, "TEST_ERROR: test_zerocopy: "
  198. "nmdGetNameNodePort returned error %d\n", port);
  199. return EXIT_FAILURE;
  200. }
  201. bld = hdfsNewBuilder();
  202. EXPECT_NONNULL(bld);
  203. EXPECT_ZERO(nmdConfigureHdfsBuilder(cl, bld));
  204. hdfsBuilderSetForceNewInstance(bld);
  205. hdfsBuilderConfSetStr(bld, "dfs.block.size",
  206. TO_STR(TEST_ZEROCOPY_FULL_BLOCK_SIZE));
  207. /* ensure that we'll always get our mmaps */
  208. hdfsBuilderConfSetStr(bld, "dfs.client.read.shortcircuit.skip.checksum",
  209. "true");
  210. fs = hdfsBuilderConnect(bld);
  211. EXPECT_NONNULL(fs);
  212. EXPECT_ZERO(createZeroCopyTestFile(fs, testFileName,
  213. TEST_FILE_NAME_LENGTH));
  214. EXPECT_ZERO(doTestZeroCopyReads(fs, testFileName));
  215. EXPECT_ZERO(hdfsDisconnect(fs));
  216. EXPECT_ZERO(nmdShutdown(cl));
  217. nmdFree(cl);
  218. fprintf(stderr, "TEST_SUCCESS\n");
  219. return EXIT_SUCCESS;
  220. }