/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <errno.h>
#include <fcntl.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>

#ifdef __MACH__ // OS X does not have clock_gettime
#include <mach/clock.h>
#include <mach/mach.h>
#include <mach/mach_time.h>
#endif

#include "config.h"
#include "hdfs.h"

#define VECSUM_CHUNK_SIZE (8 * 1024 * 1024)
#define ZCR_READ_CHUNK_SIZE (1024 * 1024 * 8)
#define NORMAL_READ_CHUNK_SIZE (8 * 1024 * 1024)
#define DOUBLES_PER_LOOP_ITER 16

static double timespec_to_double(const struct timespec *ts)
{
    double sec = ts->tv_sec;
    double nsec = ts->tv_nsec;
    return sec + (nsec / 1000000000L);
}

struct stopwatch {
    struct timespec start;
    struct timespec stop;
};

#ifdef __MACH__
static int clock_gettime_mono(struct timespec * ts)
{
    static mach_timebase_info_data_t tb;
    static uint64_t timestart = 0;
    uint64_t t = 0;
    if (timestart == 0) {
        mach_timebase_info(&tb);
        timestart = mach_absolute_time();
    }
    t = mach_absolute_time() - timestart;
    t *= tb.numer;
    t /= tb.denom;
    ts->tv_sec = t / 1000000000ULL;
    ts->tv_nsec = t - (ts->tv_sec * 1000000000ULL);
    return 0;
}
#else
static int clock_gettime_mono(struct timespec * ts)
{
    return clock_gettime(CLOCK_MONOTONIC, ts);
}
#endif

static struct stopwatch *stopwatch_create(void)
{
    struct stopwatch *watch;

    watch = calloc(1, sizeof(struct stopwatch));
    if (!watch) {
        fprintf(stderr, "failed to allocate memory for stopwatch\n");
        goto error;
    }
    if (clock_gettime_mono(&watch->start)) {
        int err = errno;
        fprintf(stderr, "clock_gettime(CLOCK_MONOTONIC) failed with "
            "error %d (%s)\n", err, strerror(err));
        goto error;
    }
    return watch;

error:
    free(watch);
    return NULL;
}

static void stopwatch_stop(struct stopwatch *watch,
        long long bytes_read)
{
    double elapsed, rate;

    if (clock_gettime_mono(&watch->stop)) {
        int err = errno;
        fprintf(stderr, "clock_gettime(CLOCK_MONOTONIC) failed with "
            "error %d (%s)\n", err, strerror(err));
        goto done;
    }
    elapsed = timespec_to_double(&watch->stop) -
        timespec_to_double(&watch->start);
    rate = (bytes_read / elapsed) / (1024 * 1024 * 1024);
    printf("stopwatch: took %.5g seconds to read %lld bytes, "
        "for %.5g GB/s\n", elapsed, bytes_read, rate);
    printf("stopwatch: %.5g seconds\n", elapsed);

done:
    free(watch);
}

enum vecsum_type {
    VECSUM_LOCAL = 0,
    VECSUM_LIBHDFS,
    VECSUM_ZCR,
};

#define VECSUM_TYPE_VALID_VALUES "libhdfs, zcr, or local"

int parse_vecsum_type(const char *str)
{
    if (strcasecmp(str, "local") == 0)
        return VECSUM_LOCAL;
    else if (strcasecmp(str, "libhdfs") == 0)
        return VECSUM_LIBHDFS;
    else if (strcasecmp(str, "zcr") == 0)
        return VECSUM_ZCR;
    else
        return -1;
}

struct options {
    // The path to read.
    const char *path;

    // Length of the file.
    long long length;

    // The number of times to read the path.
    int passes;

    // Type of vecsum to do
    enum vecsum_type ty;

    // RPC address to use for HDFS
    const char *rpc_address;
};

static struct options *options_create(void)
{
    struct options *opts = NULL;
    const char *pass_str;
    const char *ty_str;
    const char *length_str;
    int ty;

    opts = calloc(1, sizeof(struct options));
    if (!opts) {
        fprintf(stderr, "failed to calloc options\n");
        goto error;
    }
    opts->path = getenv("VECSUM_PATH");
    if (!opts->path) {
        fprintf(stderr, "You must set the VECSUM_PATH environment "
            "variable to the path of the file to read.\n");
        goto error;
    }
    length_str = getenv("VECSUM_LENGTH");
    if (!length_str) {
        length_str = "2147483648";
    }
    opts->length = atoll(length_str);
    if (!opts->length) {
        fprintf(stderr, "Can't parse VECSUM_LENGTH of '%s'.\n", length_str);
        goto error;
    }
    if (opts->length % VECSUM_CHUNK_SIZE) {
        fprintf(stderr, "VECSUM_LENGTH must be a multiple of '%lld'.  The "
            "currently specified length of '%lld' is not.\n",
            (long long)VECSUM_CHUNK_SIZE, (long long)opts->length);
        goto error;
    }
    pass_str = getenv("VECSUM_PASSES");
    if (!pass_str) {
        fprintf(stderr, "You must set the VECSUM_PASSES environment "
            "variable to the number of passes to make.\n");
        goto error;
    }
    opts->passes = atoi(pass_str);
    if (opts->passes <= 0) {
        fprintf(stderr, "Invalid value for the VECSUM_PASSES "
            "environment variable.  You must set this to a "
            "number greater than 0.\n");
        goto error;
    }
    ty_str = getenv("VECSUM_TYPE");
    if (!ty_str) {
        fprintf(stderr, "You must set the VECSUM_TYPE environment "
            "variable to " VECSUM_TYPE_VALID_VALUES "\n");
        goto error;
    }
    ty = parse_vecsum_type(ty_str);
    if (ty < 0) {
        fprintf(stderr, "Invalid VECSUM_TYPE environment variable.  "
            "Valid values are " VECSUM_TYPE_VALID_VALUES "\n");
        goto error;
    }
    opts->ty = ty;
    opts->rpc_address = getenv("VECSUM_RPC_ADDRESS");
    if (!opts->rpc_address) {
        opts->rpc_address = "default";
    }
    return opts;

error:
    free(opts);
    return NULL;
}

static int test_file_chunk_setup(double **chunk)
{
    int i;
    double *c, val;

    c = malloc(VECSUM_CHUNK_SIZE);
    if (!c) {
        fprintf(stderr, "test_file_create: failed to malloc "
            "a buffer of size '%lld'\n",
            (long long)VECSUM_CHUNK_SIZE);
        return EIO;
    }
    val = 0.0;
    for (i = 0; i < VECSUM_CHUNK_SIZE / sizeof(double); i++) {
        c[i] = val;
        val += 0.5;
    }
    *chunk = c;
    return 0;
}

static void options_free(struct options *opts)
{
    free(opts);
}

struct local_data {
    int fd;
    double *mmap;
    long long length;
};

static int local_data_create_file(struct local_data *cdata,
                                  const struct options *opts)
{
    int ret = EIO;
    int dup_fd = -1;
    FILE *fp = NULL;
    double *chunk = NULL;
    long long offset = 0;

    dup_fd = dup(cdata->fd);
    if (dup_fd < 0) {
        ret = errno;
        fprintf(stderr, "local_data_create_file: dup failed: %s (%d)\n",
                strerror(ret), ret);
        goto done;
    }
    fp = fdopen(dup_fd, "w");
    if (!fp) {
        ret = errno;
        fprintf(stderr, "local_data_create_file: fdopen failed: %s (%d)\n",
                strerror(ret), ret);
        goto done;
    }
    ret = test_file_chunk_setup(&chunk);
    if (ret)
        goto done;
    while (offset < opts->length) {
        if (fwrite(chunk, VECSUM_CHUNK_SIZE, 1, fp) != 1) {
            fprintf(stderr, "local_data_create_file: failed to write to "
                    "the local file '%s' at offset %lld\n",
                    opts->path, offset);
            ret = EIO;
            goto done;
        }
        offset += VECSUM_CHUNK_SIZE;
    }
    fprintf(stderr, "local_data_create_file: successfully re-wrote %s as "
            "a file of length %lld\n", opts->path, opts->length);
    ret = 0;

done:
    if (fp) {
        // fclose flushes and also closes dup_fd, since fdopen took ownership
        // of the descriptor.  Only close dup_fd directly if fdopen failed.
        fclose(fp);
    } else if (dup_fd >= 0) {
        close(dup_fd);
    }
    free(chunk);
    return ret;
}

static struct local_data *local_data_create(const struct options *opts)
{
    struct local_data *cdata = NULL;
    struct stat st_buf;
    cdata = malloc(sizeof(*cdata));
    if (!cdata) {
        fprintf(stderr, "Failed to allocate local test data.\n");
        goto error;
    }
    cdata->fd = -1;
    cdata->mmap = MAP_FAILED;
    cdata->length = opts->length;

    cdata->fd = open(opts->path, O_RDWR | O_CREAT, 0777);
    if (cdata->fd < 0) {
        int err = errno;
        fprintf(stderr, "local_data_create: failed to open %s "
            "for read/write: error %d (%s)\n",
            opts->path, err, strerror(err));
        goto error;
    }
    if (fstat(cdata->fd, &st_buf)) {
        int err = errno;
        fprintf(stderr, "local_data_create: fstat(%s) failed: "
            "error %d (%s)\n", opts->path, err, strerror(err));
        goto error;
    }
    if (st_buf.st_size != opts->length) {
        int err;
        fprintf(stderr, "local_data_create: current size of %s is %lld, but "
            "we want %lld.  Re-writing the file.\n",
            opts->path, (long long)st_buf.st_size, (long long)opts->length);
        err = local_data_create_file(cdata, opts);
        if (err)
            goto error;
    }
    cdata->mmap = mmap(NULL, cdata->length, PROT_READ, MAP_PRIVATE,
                       cdata->fd, 0);
    if (cdata->mmap == MAP_FAILED) {
        int err = errno;
        fprintf(stderr, "local_data_create: mmap(%s) failed: "
            "error %d (%s)\n", opts->path, err, strerror(err));
        goto error;
    }
    return cdata;

error:
    if (cdata) {
        if (cdata->fd >= 0) {
            close(cdata->fd);
        }
        free(cdata);
    }
    return NULL;
}

static void local_data_free(struct local_data *cdata)
{
    close(cdata->fd);
    munmap(cdata->mmap, cdata->length);
    free(cdata);
}

struct libhdfs_data {
    hdfsFS fs;
    hdfsFile file;
    long long length;
    double *buf;
};

static void libhdfs_data_free(struct libhdfs_data *ldata)
{
    if (ldata->fs) {
        free(ldata->buf);
        if (ldata->file) {
            hdfsCloseFile(ldata->fs, ldata->file);
        }
        hdfsDisconnect(ldata->fs);
    }
    free(ldata);
}

static int libhdfs_data_create_file(struct libhdfs_data *ldata,
                                    const struct options *opts)
{
    int ret;
    double *chunk = NULL;
    long long offset = 0;

    ldata->file = hdfsOpenFile(ldata->fs, opts->path, O_WRONLY, 0, 1, 0);
    if (!ldata->file) {
        ret = errno;
        fprintf(stderr, "libhdfs_data_create_file: hdfsOpenFile(%s, "
            "O_WRONLY) failed: error %d (%s)\n", opts->path, ret,
            strerror(ret));
        goto done;
    }
    ret = test_file_chunk_setup(&chunk);
    if (ret)
        goto done;
    while (offset < opts->length) {
        ret = hdfsWrite(ldata->fs, ldata->file, chunk, VECSUM_CHUNK_SIZE);
        if (ret < 0) {
            ret = errno;
            fprintf(stderr, "libhdfs_data_create_file: got error %d (%s) at "
                "offset %lld of %s\n", ret, strerror(ret),
                offset, opts->path);
            goto done;
        } else if (ret < VECSUM_CHUNK_SIZE) {
            fprintf(stderr, "libhdfs_data_create_file: got short write "
                "of %d at offset %lld of %s\n", ret, offset, opts->path);
            goto done;
        }
        offset += VECSUM_CHUNK_SIZE;
    }
    ret = 0;

done:
    free(chunk);
    if (ldata->file) {
        if (hdfsCloseFile(ldata->fs, ldata->file)) {
            fprintf(stderr, "libhdfs_data_create_file: hdfsCloseFile error.");
            ret = EIO;
        }
        ldata->file = NULL;
    }
    return ret;
}

static struct libhdfs_data *libhdfs_data_create(const struct options *opts)
{
    struct libhdfs_data *ldata = NULL;
    struct hdfsBuilder *builder = NULL;
    hdfsFileInfo *pinfo = NULL;

    ldata = calloc(1, sizeof(struct libhdfs_data));
    if (!ldata) {
        fprintf(stderr, "Failed to allocate libhdfs test data.\n");
        goto error;
    }
    builder = hdfsNewBuilder();
    if (!builder) {
        fprintf(stderr, "Failed to create builder.\n");
        goto error;
    }
    hdfsBuilderSetNameNode(builder, opts->rpc_address);
    hdfsBuilderConfSetStr(builder,
        "dfs.client.read.shortcircuit.skip.checksum", "true");
    ldata->fs = hdfsBuilderConnect(builder);
    if (!ldata->fs) {
        fprintf(stderr, "Could not connect to default namenode!\n");
        goto error;
    }
    pinfo = hdfsGetPathInfo(ldata->fs, opts->path);
    if (!pinfo) {
        int err = errno;
        fprintf(stderr,
"hdfsGetPathInfo(%s) failed: error %d (%s). " "Attempting to re-create file.\n", opts->path, err, strerror(err)); if (libhdfs_data_create_file(ldata, opts)) goto error; } else if (pinfo->mSize != opts->length) { fprintf(stderr, "hdfsGetPathInfo(%s) failed: length was %lld, " "but we want length %lld. Attempting to re-create file.\n", opts->path, (long long)pinfo->mSize, (long long)opts->length); if (libhdfs_data_create_file(ldata, opts)) goto error; } ldata->file = hdfsOpenFile(ldata->fs, opts->path, O_RDONLY, 0, 0, 0); if (!ldata->file) { int err = errno; fprintf(stderr, "hdfsOpenFile(%s) failed: error %d (%s)\n", opts->path, err, strerror(err)); goto error; } ldata->length = opts->length; return ldata; error: if (pinfo) hdfsFreeFileInfo(pinfo, 1); if (ldata) libhdfs_data_free(ldata); return NULL; } static int check_byte_size(int byte_size, const char *const str) { if (byte_size % sizeof(double)) { fprintf(stderr, "%s is not a multiple " "of sizeof(double)\n", str); return EINVAL; } if ((byte_size / sizeof(double)) % DOUBLES_PER_LOOP_ITER) { fprintf(stderr, "The number of doubles contained in " "%s is not a multiple of DOUBLES_PER_LOOP_ITER\n", str); return EINVAL; } return 0; } #ifdef HAVE_INTEL_SSE_INTRINSICS #include static double vecsum(const double *buf, int num_doubles) { int i; double hi, lo; __m128d x0, x1, x2, x3, x4, x5, x6, x7; __m128d sum0 = _mm_set_pd(0.0,0.0); __m128d sum1 = _mm_set_pd(0.0,0.0); __m128d sum2 = _mm_set_pd(0.0,0.0); __m128d sum3 = _mm_set_pd(0.0,0.0); __m128d sum4 = _mm_set_pd(0.0,0.0); __m128d sum5 = _mm_set_pd(0.0,0.0); __m128d sum6 = _mm_set_pd(0.0,0.0); __m128d sum7 = _mm_set_pd(0.0,0.0); for (i = 0; i < num_doubles; i+=DOUBLES_PER_LOOP_ITER) { x0 = _mm_load_pd(buf + i + 0); x1 = _mm_load_pd(buf + i + 2); x2 = _mm_load_pd(buf + i + 4); x3 = _mm_load_pd(buf + i + 6); x4 = _mm_load_pd(buf + i + 8); x5 = _mm_load_pd(buf + i + 10); x6 = _mm_load_pd(buf + i + 12); x7 = _mm_load_pd(buf + i + 14); sum0 = _mm_add_pd(sum0, x0); sum1 = _mm_add_pd(sum1, x1); sum2 = _mm_add_pd(sum2, x2); sum3 = _mm_add_pd(sum3, x3); sum4 = _mm_add_pd(sum4, x4); sum5 = _mm_add_pd(sum5, x5); sum6 = _mm_add_pd(sum6, x6); sum7 = _mm_add_pd(sum7, x7); } x0 = _mm_add_pd(sum0, sum1); x1 = _mm_add_pd(sum2, sum3); x2 = _mm_add_pd(sum4, sum5); x3 = _mm_add_pd(sum6, sum7); x4 = _mm_add_pd(x0, x1); x5 = _mm_add_pd(x2, x3); x6 = _mm_add_pd(x4, x5); _mm_storeh_pd(&hi, x6); _mm_storel_pd(&lo, x6); return hi + lo; } #else static double vecsum(const double *buf, int num_doubles) { int i; double sum = 0.0; for (i = 0; i < num_doubles; i++) { sum += buf[i]; } return sum; } #endif static int vecsum_zcr_loop(int pass, struct libhdfs_data *ldata, struct hadoopRzOptions *zopts, const struct options *opts) { int32_t len; double sum = 0.0; const double *buf; struct hadoopRzBuffer *rzbuf = NULL; int ret; while (1) { rzbuf = hadoopReadZero(ldata->file, zopts, ZCR_READ_CHUNK_SIZE); if (!rzbuf) { ret = errno; fprintf(stderr, "hadoopReadZero failed with error " "code %d (%s)\n", ret, strerror(ret)); goto done; } buf = hadoopRzBufferGet(rzbuf); if (!buf) break; len = hadoopRzBufferLength(rzbuf); if (len < ZCR_READ_CHUNK_SIZE) { fprintf(stderr, "hadoopReadZero got a partial read " "of length %d\n", len); ret = EINVAL; goto done; } sum += vecsum(buf, ZCR_READ_CHUNK_SIZE / sizeof(double)); hadoopRzBufferFree(ldata->file, rzbuf); } printf("finished zcr pass %d. 
sum = %g\n", pass, sum); ret = 0; done: if (rzbuf) hadoopRzBufferFree(ldata->file, rzbuf); return ret; } static int vecsum_zcr(struct libhdfs_data *ldata, const struct options *opts) { int ret, pass; struct hadoopRzOptions *zopts = NULL; zopts = hadoopRzOptionsAlloc(); if (!zopts) { fprintf(stderr, "hadoopRzOptionsAlloc failed.\n"); ret = ENOMEM; goto done; } if (hadoopRzOptionsSetSkipChecksum(zopts, 1)) { ret = errno; perror("hadoopRzOptionsSetSkipChecksum failed: "); goto done; } if (hadoopRzOptionsSetByteBufferPool(zopts, NULL)) { ret = errno; perror("hadoopRzOptionsSetByteBufferPool failed: "); goto done; } for (pass = 0; pass < opts->passes; ++pass) { ret = vecsum_zcr_loop(pass, ldata, zopts, opts); if (ret) { fprintf(stderr, "vecsum_zcr_loop pass %d failed " "with error %d\n", pass, ret); goto done; } hdfsSeek(ldata->fs, ldata->file, 0); } ret = 0; done: if (zopts) hadoopRzOptionsFree(zopts); return ret; } tSize hdfsReadFully(hdfsFS fs, hdfsFile f, void* buffer, tSize length) { uint8_t *buf = buffer; tSize ret, nread = 0; while (length > 0) { ret = hdfsRead(fs, f, buf, length); if (ret < 0) { if (errno != EINTR) { return -1; } } if (ret == 0) { break; } nread += ret; length -= ret; buf += ret; } return nread; } static int vecsum_normal_loop(int pass, const struct libhdfs_data *ldata, const struct options *opts) { double sum = 0.0; while (1) { int res = hdfsReadFully(ldata->fs, ldata->file, ldata->buf, NORMAL_READ_CHUNK_SIZE); if (res == 0) // EOF break; if (res < 0) { int err = errno; fprintf(stderr, "hdfsRead failed with error %d (%s)\n", err, strerror(err)); return err; } if (res < NORMAL_READ_CHUNK_SIZE) { fprintf(stderr, "hdfsRead got a partial read of " "length %d\n", res); return EINVAL; } sum += vecsum(ldata->buf, NORMAL_READ_CHUNK_SIZE / sizeof(double)); } printf("finished normal pass %d. sum = %g\n", pass, sum); return 0; } static int vecsum_libhdfs(struct libhdfs_data *ldata, const struct options *opts) { int pass; ldata->buf = malloc(NORMAL_READ_CHUNK_SIZE); if (!ldata->buf) { fprintf(stderr, "failed to malloc buffer of size %d\n", NORMAL_READ_CHUNK_SIZE); return ENOMEM; } for (pass = 0; pass < opts->passes; ++pass) { int ret = vecsum_normal_loop(pass, ldata, opts); if (ret) { fprintf(stderr, "vecsum_normal_loop pass %d failed " "with error %d\n", pass, ret); return ret; } hdfsSeek(ldata->fs, ldata->file, 0); } return 0; } static void vecsum_local(struct local_data *cdata, const struct options *opts) { int pass; for (pass = 0; pass < opts->passes; pass++) { double sum = vecsum(cdata->mmap, cdata->length / sizeof(double)); printf("finished vecsum_local pass %d. sum = %g\n", pass, sum); } } static long long vecsum_length(const struct options *opts, const struct libhdfs_data *ldata) { if (opts->ty == VECSUM_LOCAL) { struct stat st_buf = { 0 }; if (stat(opts->path, &st_buf)) { int err = errno; fprintf(stderr, "vecsum_length: stat(%s) failed: " "error %d (%s)\n", opts->path, err, strerror(err)); return -EIO; } return st_buf.st_size; } else { return ldata->length; } } /* * vecsum is a microbenchmark which measures the speed of various ways of * reading from HDFS. It creates a file containing floating-point 'doubles', * and computes the sum of all the doubles several times. For some CPUs, * assembly optimizations are used for the summation (SSE, etc). 
 */
int main(void)
{
    int ret = 1;
    struct options *opts = NULL;
    struct local_data *cdata = NULL;
    struct libhdfs_data *ldata = NULL;
    struct stopwatch *watch = NULL;

    if (check_byte_size(VECSUM_CHUNK_SIZE, "VECSUM_CHUNK_SIZE") ||
        check_byte_size(ZCR_READ_CHUNK_SIZE, "ZCR_READ_CHUNK_SIZE") ||
        check_byte_size(NORMAL_READ_CHUNK_SIZE, "NORMAL_READ_CHUNK_SIZE")) {
        goto done;
    }
    opts = options_create();
    if (!opts)
        goto done;
    if (opts->ty == VECSUM_LOCAL) {
        cdata = local_data_create(opts);
        if (!cdata)
            goto done;
    } else {
        ldata = libhdfs_data_create(opts);
        if (!ldata)
            goto done;
    }
    watch = stopwatch_create();
    if (!watch)
        goto done;
    switch (opts->ty) {
    case VECSUM_LOCAL:
        vecsum_local(cdata, opts);
        ret = 0;
        break;
    case VECSUM_LIBHDFS:
        ret = vecsum_libhdfs(ldata, opts);
        break;
    case VECSUM_ZCR:
        ret = vecsum_zcr(ldata, opts);
        break;
    }
    if (ret) {
        fprintf(stderr, "vecsum failed with error %d\n", ret);
        goto done;
    }
    ret = 0;

done:
    fprintf(stderr, "cleaning up...\n");
    if (watch && (ret == 0)) {
        long long length = vecsum_length(opts, ldata);
        if (length >= 0) {
            stopwatch_stop(watch, length * opts->passes);
        }
    }
    if (cdata)
        local_data_free(cdata);
    if (ldata)
        libhdfs_data_free(ldata);
    if (opts)
        options_free(opts);
    return ret;
}

// vim: ts=4:sw=4:tw=79:et