123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239 |
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #include "expect.h"
- #include "hdfs.h"
- #include "native_mini_dfs.h"
- #include <errno.h>
- #include <inttypes.h>
- #include <semaphore.h>
- #include <pthread.h>
- #include <unistd.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <sys/types.h>
- #define TO_STR_HELPER(X) #X
- #define TO_STR(X) TO_STR_HELPER(X)
- #define TEST_FILE_NAME_LENGTH 128
- #define TEST_ZEROCOPY_FULL_BLOCK_SIZE 4096
- #define TEST_ZEROCOPY_LAST_BLOCK_SIZE 3215
- #define TEST_ZEROCOPY_NUM_BLOCKS 6
- #define SMALL_READ_LEN 16
- #define ZC_BUF_LEN 32768
- static uint8_t *getZeroCopyBlockData(int blockIdx)
- {
- uint8_t *buf = malloc(TEST_ZEROCOPY_FULL_BLOCK_SIZE);
- int i;
- if (!buf) {
- fprintf(stderr, "malloc(%d) failed\n", TEST_ZEROCOPY_FULL_BLOCK_SIZE);
- exit(1);
- }
- for (i = 0; i < TEST_ZEROCOPY_FULL_BLOCK_SIZE; i++) {
- buf[i] = blockIdx + (i % 17);
- }
- return buf;
- }
- static int getZeroCopyBlockLen(int blockIdx)
- {
- if (blockIdx >= TEST_ZEROCOPY_NUM_BLOCKS) {
- return 0;
- } else if (blockIdx == (TEST_ZEROCOPY_NUM_BLOCKS - 1)) {
- return TEST_ZEROCOPY_LAST_BLOCK_SIZE;
- } else {
- return TEST_ZEROCOPY_FULL_BLOCK_SIZE;
- }
- }
- static void printBuf(const uint8_t *buf, size_t len) __attribute__((unused));
- static void printBuf(const uint8_t *buf, size_t len)
- {
- size_t i;
- for (i = 0; i < len; i++) {
- fprintf(stderr, "%02x", buf[i]);
- }
- fprintf(stderr, "\n");
- }
- static int doTestZeroCopyReads(hdfsFS fs, const char *fileName)
- {
- hdfsFile file = NULL;
- struct hadoopRzOptions *opts = NULL;
- struct hadoopRzBuffer *buffer = NULL;
- uint8_t *block;
- file = hdfsOpenFile(fs, fileName, O_RDONLY, 0, 0, 0);
- EXPECT_NONNULL(file);
- opts = hadoopRzOptionsAlloc();
- EXPECT_NONNULL(opts);
- EXPECT_ZERO(hadoopRzOptionsSetSkipChecksum(opts, 1));
- /* haven't read anything yet */
- EXPECT_ZERO(expectFileStats(file, 0LL, 0LL, 0LL, 0LL));
- block = getZeroCopyBlockData(0);
- EXPECT_NONNULL(block);
- /* first read is half of a block. */
- buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2);
- EXPECT_NONNULL(buffer);
- EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2,
- hadoopRzBufferLength(buffer));
- EXPECT_ZERO(memcmp(hadoopRzBufferGet(buffer), block,
- TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2));
- hadoopRzBufferFree(file, buffer);
- /* read the next half of the block */
- buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2);
- EXPECT_NONNULL(buffer);
- EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2,
- hadoopRzBufferLength(buffer));
- EXPECT_ZERO(memcmp(hadoopRzBufferGet(buffer),
- block + (TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2),
- TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2));
- hadoopRzBufferFree(file, buffer);
- free(block);
- EXPECT_ZERO(expectFileStats(file, TEST_ZEROCOPY_FULL_BLOCK_SIZE,
- TEST_ZEROCOPY_FULL_BLOCK_SIZE,
- TEST_ZEROCOPY_FULL_BLOCK_SIZE,
- TEST_ZEROCOPY_FULL_BLOCK_SIZE));
- /* Now let's read just a few bytes. */
- buffer = hadoopReadZero(file, opts, SMALL_READ_LEN);
- EXPECT_NONNULL(buffer);
- EXPECT_INT_EQ(SMALL_READ_LEN, hadoopRzBufferLength(buffer));
- block = getZeroCopyBlockData(1);
- EXPECT_NONNULL(block);
- EXPECT_ZERO(memcmp(block, hadoopRzBufferGet(buffer), SMALL_READ_LEN));
- hadoopRzBufferFree(file, buffer);
- EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
- hdfsTell(fs, file));
- EXPECT_ZERO(expectFileStats(file,
- TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
- TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
- TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
- TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN));
- /* Clear 'skip checksums' and test that we can't do zero-copy reads any
- * more. Since there is no ByteBufferPool set, we should fail with
- * EPROTONOSUPPORT.
- */
- EXPECT_ZERO(hadoopRzOptionsSetSkipChecksum(opts, 0));
- EXPECT_NULL(hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE));
- EXPECT_INT_EQ(EPROTONOSUPPORT, errno);
- /* Verify that setting a NULL ByteBufferPool class works. */
- EXPECT_ZERO(hadoopRzOptionsSetByteBufferPool(opts, NULL));
- EXPECT_ZERO(hadoopRzOptionsSetSkipChecksum(opts, 0));
- EXPECT_NULL(hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE));
- EXPECT_INT_EQ(EPROTONOSUPPORT, errno);
- /* Now set a ByteBufferPool and try again. It should succeed this time. */
- EXPECT_ZERO(hadoopRzOptionsSetByteBufferPool(opts,
- ELASTIC_BYTE_BUFFER_POOL_CLASS));
- buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE);
- EXPECT_NONNULL(buffer);
- EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE, hadoopRzBufferLength(buffer));
- EXPECT_ZERO(expectFileStats(file,
- (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
- (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
- (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
- TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN));
- EXPECT_ZERO(memcmp(block + SMALL_READ_LEN, hadoopRzBufferGet(buffer),
- TEST_ZEROCOPY_FULL_BLOCK_SIZE - SMALL_READ_LEN));
- free(block);
- block = getZeroCopyBlockData(2);
- EXPECT_NONNULL(block);
- EXPECT_ZERO(memcmp(block, hadoopRzBufferGet(buffer) +
- (TEST_ZEROCOPY_FULL_BLOCK_SIZE - SMALL_READ_LEN), SMALL_READ_LEN));
- hadoopRzBufferFree(file, buffer);
- free(block);
- hadoopRzOptionsFree(opts);
- EXPECT_ZERO(hdfsCloseFile(fs, file));
- return 0;
- }
- static int createZeroCopyTestFile(hdfsFS fs, char *testFileName,
- size_t testFileNameLen)
- {
- int blockIdx, blockLen;
- hdfsFile file;
- uint8_t *data;
- snprintf(testFileName, testFileNameLen, "/zeroCopyTestFile.%d.%d",
- getpid(), rand());
- file = hdfsOpenFile(fs, testFileName, O_WRONLY, 0, 1,
- TEST_ZEROCOPY_FULL_BLOCK_SIZE);
- EXPECT_NONNULL(file);
- for (blockIdx = 0; blockIdx < TEST_ZEROCOPY_NUM_BLOCKS; blockIdx++) {
- blockLen = getZeroCopyBlockLen(blockIdx);
- data = getZeroCopyBlockData(blockIdx);
- EXPECT_NONNULL(data);
- EXPECT_INT_EQ(blockLen, hdfsWrite(fs, file, data, blockLen));
- }
- EXPECT_ZERO(hdfsCloseFile(fs, file));
- return 0;
- }
- /**
- * Test that we can write a file with libhdfs and then read it back
- */
- int main(void)
- {
- int port;
- struct NativeMiniDfsConf conf = {
- .doFormat = 1,
- .configureShortCircuit = 1,
- };
- char testFileName[TEST_FILE_NAME_LENGTH];
- hdfsFS fs;
- struct NativeMiniDfsCluster* cl;
- struct hdfsBuilder *bld;
- cl = nmdCreate(&conf);
- EXPECT_NONNULL(cl);
- EXPECT_ZERO(nmdWaitClusterUp(cl));
- port = nmdGetNameNodePort(cl);
- if (port < 0) {
- fprintf(stderr, "TEST_ERROR: test_zerocopy: "
- "nmdGetNameNodePort returned error %d\n", port);
- return EXIT_FAILURE;
- }
- bld = hdfsNewBuilder();
- EXPECT_NONNULL(bld);
- EXPECT_ZERO(nmdConfigureHdfsBuilder(cl, bld));
- hdfsBuilderSetForceNewInstance(bld);
- hdfsBuilderConfSetStr(bld, "dfs.block.size",
- TO_STR(TEST_ZEROCOPY_FULL_BLOCK_SIZE));
- /* ensure that we'll always get our mmaps */
- hdfsBuilderConfSetStr(bld, "dfs.client.read.shortcircuit.skip.checksum",
- "true");
- fs = hdfsBuilderConnect(bld);
- EXPECT_NONNULL(fs);
- EXPECT_ZERO(createZeroCopyTestFile(fs, testFileName,
- TEST_FILE_NAME_LENGTH));
- EXPECT_ZERO(doTestZeroCopyReads(fs, testFileName));
- EXPECT_ZERO(hdfsDisconnect(fs));
- EXPECT_ZERO(nmdShutdown(cl));
- nmdFree(cl);
- fprintf(stderr, "TEST_SUCCESS\n");
- return EXIT_SUCCESS;
- }
|