/* vecsum.c */
  /**
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements. See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership. The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License. You may obtain a copy of the License at
   *
   * http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  #include <errno.h>
  #include <fcntl.h>
  #include <limits.h>
  #include <stdint.h>
  #include <stdio.h>
  #include <stdlib.h>
  #include <string.h>
  #include <strings.h>
  #include <sys/mman.h>
  #include <sys/resource.h>
  #include <sys/stat.h>
  #include <sys/time.h>
  #include <sys/types.h>
  #include <time.h>
  #include <unistd.h>

  #ifdef __MACH__ // OS X does not have clock_gettime
  #include <mach/clock.h>
  #include <mach/mach.h>
  #include <mach/mach_time.h>
  #endif

  #include "config.h"
  #include "hdfs.h"
  /* Size in bytes of one chunk of generated test data.  VECSUM_LENGTH must
   * be a multiple of this value. */
  #define VECSUM_CHUNK_SIZE (8 * 1024 * 1024)

  /* Number of bytes requested from each hadoopReadZero() call. */
  #define ZCR_READ_CHUNK_SIZE (1024 * 1024 * 8)

  /* Number of bytes requested from each hdfsReadFully() call. */
  #define NORMAL_READ_CHUNK_SIZE (8 * 1024 * 1024)

  /* Number of doubles consumed by one iteration of the SSE summation loop. */
  #define DOUBLES_PER_LOOP_ITER 16
  /**
   * Convert a timespec into a number of seconds expressed as a double.
   *
   * @param ts    The timespec to convert; must not be NULL.
   *
   * @return      tv_sec plus tv_nsec scaled down to fractional seconds.
   */
  static double timespec_to_double(const struct timespec *ts)
  {
      const double seconds = (double)ts->tv_sec;
      const double nanos = (double)ts->tv_nsec;

      return seconds + (nanos / 1000000000.0);
  }
  /* A pair of monotonic timestamps bracketing the timed section. */
  struct stopwatch {
      struct timespec start;  /* filled in by stopwatch_create() */
      struct timespec stop;   /* filled in by stopwatch_stop() */
  };
  51. #ifdef __MACH__
  52. static int clock_gettime_mono(struct timespec * ts) {
  53. static mach_timebase_info_data_t tb;
  54. static uint64_t timestart = 0;
  55. uint64_t t = 0;
  56. if (timestart == 0) {
  57. mach_timebase_info(&tb);
  58. timestart = mach_absolute_time();
  59. }
  60. t = mach_absolute_time() - timestart;
  61. t *= tb.numer;
  62. t /= tb.denom;
  63. ts->tv_sec = t / 1000000000ULL;
  64. ts->tv_nsec = t - (ts->tv_sec * 1000000000ULL);
  65. return 0;
  66. }
  67. #else
  68. static int clock_gettime_mono(struct timespec * ts) {
  69. return clock_gettime(CLOCK_MONOTONIC, ts);
  70. }
  71. #endif
  72. static struct stopwatch *stopwatch_create(void)
  73. {
  74. struct stopwatch *watch;
  75. watch = calloc(1, sizeof(struct stopwatch));
  76. if (!watch) {
  77. fprintf(stderr, "failed to allocate memory for stopwatch\n");
  78. goto error;
  79. }
  80. if (clock_gettime_mono(&watch->start)) {
  81. int err = errno;
  82. fprintf(stderr, "clock_gettime(CLOCK_MONOTONIC) failed with "
  83. "error %d (%s)\n", err, strerror(err));
  84. goto error;
  85. }
  86. return watch;
  87. error:
  88. free(watch);
  89. return NULL;
  90. }
  91. static void stopwatch_stop(struct stopwatch *watch,
  92. long long bytes_read)
  93. {
  94. double elapsed, rate;
  95. if (clock_gettime_mono(&watch->stop)) {
  96. int err = errno;
  97. fprintf(stderr, "clock_gettime(CLOCK_MONOTONIC) failed with "
  98. "error %d (%s)\n", err, strerror(err));
  99. goto done;
  100. }
  101. elapsed = timespec_to_double(&watch->stop) -
  102. timespec_to_double(&watch->start);
  103. rate = (bytes_read / elapsed) / (1024 * 1024 * 1024);
  104. printf("stopwatch: took %.5g seconds to read %lld bytes, "
  105. "for %.5g GB/s\n", elapsed, bytes_read, rate);
  106. printf("stopwatch: %.5g seconds\n", elapsed);
  107. done:
  108. free(watch);
  109. }
  /* The read strategies this benchmark can exercise. */
  enum vecsum_type {
      VECSUM_LOCAL = 0,   /* mmap a local file */
      VECSUM_LIBHDFS,     /* ordinary libhdfs reads */
      VECSUM_ZCR,         /* libhdfs zero-copy reads */
  };

  #define VECSUM_TYPE_VALID_VALUES "libhdfs, zcr, or local"

  /**
   * Translate a case-insensitive type name into an enum vecsum_type value.
   *
   * @param str   The name to parse ("local", "libhdfs" or "zcr").  A NULL
   *              pointer is treated as an unknown name.
   *
   * @return      The matching enum vecsum_type value, or -1 if the name is
   *              not recognized.
   */
  int parse_vecsum_type(const char *str)
  {
      if (!str) {
          return -1;
      }
      if (strcasecmp(str, "local") == 0) {
          return VECSUM_LOCAL;
      }
      if (strcasecmp(str, "libhdfs") == 0) {
          return VECSUM_LIBHDFS;
      }
      if (strcasecmp(str, "zcr") == 0) {
          return VECSUM_ZCR;
      }
      return -1;
  }
  127. struct options {
  128. // The path to read.
  129. const char *path;
  130. // Length of the file.
  131. long long length;
  132. // The number of times to read the path.
  133. int passes;
  134. // Type of vecsum to do
  135. enum vecsum_type ty;
  136. // RPC address to use for HDFS
  137. const char *rpc_address;
  138. };
  139. static struct options *options_create(void)
  140. {
  141. struct options *opts = NULL;
  142. const char *pass_str;
  143. const char *ty_str;
  144. const char *length_str;
  145. int ty;
  146. opts = calloc(1, sizeof(struct options));
  147. if (!opts) {
  148. fprintf(stderr, "failed to calloc options\n");
  149. goto error;
  150. }
  151. opts->path = getenv("VECSUM_PATH");
  152. if (!opts->path) {
  153. fprintf(stderr, "You must set the VECSUM_PATH environment "
  154. "variable to the path of the file to read.\n");
  155. goto error;
  156. }
  157. length_str = getenv("VECSUM_LENGTH");
  158. if (!length_str) {
  159. length_str = "2147483648";
  160. }
  161. opts->length = atoll(length_str);
  162. if (!opts->length) {
  163. fprintf(stderr, "Can't parse VECSUM_LENGTH of '%s'.\n",
  164. length_str);
  165. goto error;
  166. }
  167. if (opts->length % VECSUM_CHUNK_SIZE) {
  168. fprintf(stderr, "VECSUM_LENGTH must be a multiple of '%lld'. The "
  169. "currently specified length of '%lld' is not.\n",
  170. (long long)VECSUM_CHUNK_SIZE, (long long)opts->length);
  171. goto error;
  172. }
  173. pass_str = getenv("VECSUM_PASSES");
  174. if (!pass_str) {
  175. fprintf(stderr, "You must set the VECSUM_PASSES environment "
  176. "variable to the number of passes to make.\n");
  177. goto error;
  178. }
  179. opts->passes = atoi(pass_str);
  180. if (opts->passes <= 0) {
  181. fprintf(stderr, "Invalid value for the VECSUM_PASSES "
  182. "environment variable. You must set this to a "
  183. "number greater than 0.\n");
  184. goto error;
  185. }
  186. ty_str = getenv("VECSUM_TYPE");
  187. if (!ty_str) {
  188. fprintf(stderr, "You must set the VECSUM_TYPE environment "
  189. "variable to " VECSUM_TYPE_VALID_VALUES "\n");
  190. goto error;
  191. }
  192. ty = parse_vecsum_type(ty_str);
  193. if (ty < 0) {
  194. fprintf(stderr, "Invalid VECSUM_TYPE environment variable. "
  195. "Valid values are " VECSUM_TYPE_VALID_VALUES "\n");
  196. goto error;
  197. }
  198. opts->ty = ty;
  199. opts->rpc_address = getenv("VECSUM_RPC_ADDRESS");
  200. if (!opts->rpc_address) {
  201. opts->rpc_address = "default";
  202. }
  203. return opts;
  204. error:
  205. free(opts);
  206. return NULL;
  207. }
  208. static int test_file_chunk_setup(double **chunk)
  209. {
  210. int i;
  211. double *c, val;
  212. c = malloc(VECSUM_CHUNK_SIZE);
  213. if (!c) {
  214. fprintf(stderr, "test_file_create: failed to malloc "
  215. "a buffer of size '%lld'\n",
  216. (long long) VECSUM_CHUNK_SIZE);
  217. return EIO;
  218. }
  219. val = 0.0;
  220. for (i = 0; i < VECSUM_CHUNK_SIZE / sizeof(double); i++) {
  221. c[i] = val;
  222. val += 0.5;
  223. }
  224. *chunk = c;
  225. return 0;
  226. }
  /**
   * Release options returned by options_create().
   *
   * Only the structure itself is freed; its string members are not freed
   * here.
   *
   * @param opts  The options to free; may be NULL.
   */
  static void options_free(struct options *opts)
  {
      free(opts);
  }
  /* State for the VECSUM_LOCAL mode: an open file and its read-only mapping. */
  struct local_data {
      int fd;             /* descriptor of the local file, or -1 */
      double *mmap;       /* mapping of the file, or MAP_FAILED */
      long long length;   /* number of bytes mapped */
  };
  236. static int local_data_create_file(struct local_data *cdata,
  237. const struct options *opts)
  238. {
  239. int ret = EIO;
  240. int dup_fd = -1;
  241. FILE *fp = NULL;
  242. double *chunk = NULL;
  243. long long offset = 0;
  244. dup_fd = dup(cdata->fd);
  245. if (dup_fd < 0) {
  246. ret = errno;
  247. fprintf(stderr, "local_data_create_file: dup failed: %s (%d)\n",
  248. strerror(ret), ret);
  249. goto done;
  250. }
  251. fp = fdopen(dup_fd, "w");
  252. if (!fp) {
  253. ret = errno;
  254. fprintf(stderr, "local_data_create_file: fdopen failed: %s (%d)\n",
  255. strerror(ret), ret);
  256. goto done;
  257. }
  258. ret = test_file_chunk_setup(&chunk);
  259. if (ret)
  260. goto done;
  261. while (offset < opts->length) {
  262. if (fwrite(chunk, VECSUM_CHUNK_SIZE, 1, fp) != 1) {
  263. fprintf(stderr, "local_data_create_file: failed to write to "
  264. "the local file '%s' at offset %lld\n",
  265. opts->path, offset);
  266. ret = EIO;
  267. goto done;
  268. }
  269. offset += VECSUM_CHUNK_SIZE;
  270. }
  271. fprintf(stderr, "local_data_create_file: successfully re-wrote %s as "
  272. "a file of length %lld\n", opts->path, opts->length);
  273. ret = 0;
  274. done:
  275. if (dup_fd >= 0) {
  276. close(dup_fd);
  277. }
  278. if (fp) {
  279. fclose(fp);
  280. }
  281. free(chunk);
  282. return ret;
  283. }
  284. static struct local_data *local_data_create(const struct options *opts)
  285. {
  286. struct local_data *cdata = NULL;
  287. struct stat st_buf;
  288. cdata = malloc(sizeof(*cdata));
  289. if (!cdata) {
  290. fprintf(stderr, "Failed to allocate local test data.\n");
  291. goto error;
  292. }
  293. cdata->fd = -1;
  294. cdata->mmap = MAP_FAILED;
  295. cdata->length = opts->length;
  296. cdata->fd = open(opts->path, O_RDWR | O_CREAT, 0777);
  297. if (cdata->fd < 0) {
  298. int err = errno;
  299. fprintf(stderr, "local_data_create: failed to open %s "
  300. "for read/write: error %d (%s)\n", opts->path, err, strerror(err));
  301. goto error;
  302. }
  303. if (fstat(cdata->fd, &st_buf)) {
  304. int err = errno;
  305. fprintf(stderr, "local_data_create: fstat(%s) failed: "
  306. "error %d (%s)\n", opts->path, err, strerror(err));
  307. goto error;
  308. }
  309. if (st_buf.st_size != opts->length) {
  310. int err;
  311. fprintf(stderr, "local_data_create: current size of %s is %lld, but "
  312. "we want %lld. Re-writing the file.\n",
  313. opts->path, (long long)st_buf.st_size,
  314. (long long)opts->length);
  315. err = local_data_create_file(cdata, opts);
  316. if (err)
  317. goto error;
  318. }
  319. cdata->mmap = mmap(NULL, cdata->length, PROT_READ,
  320. MAP_PRIVATE, cdata->fd, 0);
  321. if (cdata->mmap == MAP_FAILED) {
  322. int err = errno;
  323. fprintf(stderr, "local_data_create: mmap(%s) failed: "
  324. "error %d (%s)\n", opts->path, err, strerror(err));
  325. goto error;
  326. }
  327. return cdata;
  328. error:
  329. if (cdata) {
  330. if (cdata->fd >= 0) {
  331. close(cdata->fd);
  332. }
  333. free(cdata);
  334. }
  335. return NULL;
  336. }
  337. static void local_data_free(struct local_data *cdata)
  338. {
  339. close(cdata->fd);
  340. munmap(cdata->mmap, cdata->length);
  341. }
  342. struct libhdfs_data {
  343. hdfsFS fs;
  344. hdfsFile file;
  345. long long length;
  346. double *buf;
  347. };
  348. static void libhdfs_data_free(struct libhdfs_data *ldata)
  349. {
  350. if (ldata->fs) {
  351. free(ldata->buf);
  352. if (ldata->file) {
  353. hdfsCloseFile(ldata->fs, ldata->file);
  354. }
  355. hdfsDisconnect(ldata->fs);
  356. }
  357. free(ldata);
  358. }
  359. static int libhdfs_data_create_file(struct libhdfs_data *ldata,
  360. const struct options *opts)
  361. {
  362. int ret;
  363. double *chunk = NULL;
  364. long long offset = 0;
  365. ldata->file = hdfsOpenFile(ldata->fs, opts->path, O_WRONLY, 0, 1, 0);
  366. if (!ldata->file) {
  367. ret = errno;
  368. fprintf(stderr, "libhdfs_data_create_file: hdfsOpenFile(%s, "
  369. "O_WRONLY) failed: error %d (%s)\n", opts->path, ret,
  370. strerror(ret));
  371. goto done;
  372. }
  373. ret = test_file_chunk_setup(&chunk);
  374. if (ret)
  375. goto done;
  376. while (offset < opts->length) {
  377. ret = hdfsWrite(ldata->fs, ldata->file, chunk, VECSUM_CHUNK_SIZE);
  378. if (ret < 0) {
  379. ret = errno;
  380. fprintf(stderr, "libhdfs_data_create_file: got error %d (%s) at "
  381. "offset %lld of %s\n", ret, strerror(ret),
  382. offset, opts->path);
  383. goto done;
  384. } else if (ret < VECSUM_CHUNK_SIZE) {
  385. fprintf(stderr, "libhdfs_data_create_file: got short write "
  386. "of %d at offset %lld of %s\n", ret, offset, opts->path);
  387. goto done;
  388. }
  389. offset += VECSUM_CHUNK_SIZE;
  390. }
  391. ret = 0;
  392. done:
  393. free(chunk);
  394. if (ldata->file) {
  395. if (hdfsCloseFile(ldata->fs, ldata->file)) {
  396. fprintf(stderr, "libhdfs_data_create_file: hdfsCloseFile error.");
  397. ret = EIO;
  398. }
  399. ldata->file = NULL;
  400. }
  401. return ret;
  402. }
  403. static struct libhdfs_data *libhdfs_data_create(const struct options *opts)
  404. {
  405. struct libhdfs_data *ldata = NULL;
  406. struct hdfsBuilder *builder = NULL;
  407. hdfsFileInfo *pinfo = NULL;
  408. ldata = calloc(1, sizeof(struct libhdfs_data));
  409. if (!ldata) {
  410. fprintf(stderr, "Failed to allocate libhdfs test data.\n");
  411. goto error;
  412. }
  413. builder = hdfsNewBuilder();
  414. if (!builder) {
  415. fprintf(stderr, "Failed to create builder.\n");
  416. goto error;
  417. }
  418. hdfsBuilderSetNameNode(builder, opts->rpc_address);
  419. hdfsBuilderConfSetStr(builder,
  420. "dfs.client.read.shortcircuit.skip.checksum", "true");
  421. ldata->fs = hdfsBuilderConnect(builder);
  422. if (!ldata->fs) {
  423. fprintf(stderr, "Could not connect to default namenode!\n");
  424. goto error;
  425. }
  426. pinfo = hdfsGetPathInfo(ldata->fs, opts->path);
  427. if (!pinfo) {
  428. int err = errno;
  429. fprintf(stderr, "hdfsGetPathInfo(%s) failed: error %d (%s). "
  430. "Attempting to re-create file.\n",
  431. opts->path, err, strerror(err));
  432. if (libhdfs_data_create_file(ldata, opts))
  433. goto error;
  434. } else if (pinfo->mSize != opts->length) {
  435. fprintf(stderr, "hdfsGetPathInfo(%s) failed: length was %lld, "
  436. "but we want length %lld. Attempting to re-create file.\n",
  437. opts->path, (long long)pinfo->mSize, (long long)opts->length);
  438. if (libhdfs_data_create_file(ldata, opts))
  439. goto error;
  440. }
  441. ldata->file = hdfsOpenFile(ldata->fs, opts->path, O_RDONLY, 0, 0, 0);
  442. if (!ldata->file) {
  443. int err = errno;
  444. fprintf(stderr, "hdfsOpenFile(%s) failed: error %d (%s)\n",
  445. opts->path, err, strerror(err));
  446. goto error;
  447. }
  448. ldata->length = opts->length;
  449. return ldata;
  450. error:
  451. if (pinfo)
  452. hdfsFreeFileInfo(pinfo, 1);
  453. if (ldata)
  454. libhdfs_data_free(ldata);
  455. return NULL;
  456. }
  457. static int check_byte_size(int byte_size, const char *const str)
  458. {
  459. if (byte_size % sizeof(double)) {
  460. fprintf(stderr, "%s is not a multiple "
  461. "of sizeof(double)\n", str);
  462. return EINVAL;
  463. }
  464. if ((byte_size / sizeof(double)) % DOUBLES_PER_LOOP_ITER) {
  465. fprintf(stderr, "The number of doubles contained in "
  466. "%s is not a multiple of DOUBLES_PER_LOOP_ITER\n",
  467. str);
  468. return EINVAL;
  469. }
  470. return 0;
  471. }
  /**
   * Sum an array of doubles.
   *
   * When HAVE_INTEL_SSE_INTRINSICS is defined, the bulk of the array is
   * summed DOUBLES_PER_LOOP_ITER elements at a time with SSE2, and any
   * leftover elements are added with a scalar loop.  That variant uses
   * aligned loads, so buf must be 16-byte aligned.  Otherwise a plain scalar
   * loop is used.
   *
   * @param buf           The doubles to sum.
   * @param num_doubles   Number of elements in buf; values <= 0 yield 0.0.
   *
   * @return              The sum of the elements.
   */
  #ifdef HAVE_INTEL_SSE_INTRINSICS

  #include <emmintrin.h>

  static double vecsum(const double *buf, int num_doubles)
  {
      int i;
      double hi, lo;
      double tail = 0.0;
      /* Largest multiple of DOUBLES_PER_LOOP_ITER not exceeding num_doubles;
       * the vector loop must not read past this point. */
      const int vec_end = (num_doubles > 0) ?
          num_doubles - (num_doubles % DOUBLES_PER_LOOP_ITER) : 0;
      __m128d x0, x1, x2, x3, x4, x5, x6, x7;
      __m128d sum0 = _mm_set_pd(0.0,0.0);
      __m128d sum1 = _mm_set_pd(0.0,0.0);
      __m128d sum2 = _mm_set_pd(0.0,0.0);
      __m128d sum3 = _mm_set_pd(0.0,0.0);
      __m128d sum4 = _mm_set_pd(0.0,0.0);
      __m128d sum5 = _mm_set_pd(0.0,0.0);
      __m128d sum6 = _mm_set_pd(0.0,0.0);
      __m128d sum7 = _mm_set_pd(0.0,0.0);

      /* Eight independent accumulators, two doubles each. */
      for (i = 0; i < vec_end; i += DOUBLES_PER_LOOP_ITER) {
          x0 = _mm_load_pd(buf + i + 0);
          x1 = _mm_load_pd(buf + i + 2);
          x2 = _mm_load_pd(buf + i + 4);
          x3 = _mm_load_pd(buf + i + 6);
          x4 = _mm_load_pd(buf + i + 8);
          x5 = _mm_load_pd(buf + i + 10);
          x6 = _mm_load_pd(buf + i + 12);
          x7 = _mm_load_pd(buf + i + 14);
          sum0 = _mm_add_pd(sum0, x0);
          sum1 = _mm_add_pd(sum1, x1);
          sum2 = _mm_add_pd(sum2, x2);
          sum3 = _mm_add_pd(sum3, x3);
          sum4 = _mm_add_pd(sum4, x4);
          sum5 = _mm_add_pd(sum5, x5);
          sum6 = _mm_add_pd(sum6, x6);
          sum7 = _mm_add_pd(sum7, x7);
      }

      /* Pairwise reduction of the accumulators down to one vector. */
      x0 = _mm_add_pd(sum0, sum1);
      x1 = _mm_add_pd(sum2, sum3);
      x2 = _mm_add_pd(sum4, sum5);
      x3 = _mm_add_pd(sum6, sum7);
      x4 = _mm_add_pd(x0, x1);
      x5 = _mm_add_pd(x2, x3);
      x6 = _mm_add_pd(x4, x5);
      _mm_storeh_pd(&hi, x6);
      _mm_storel_pd(&lo, x6);

      /* Elements that did not fill a whole vector iteration. */
      for (i = vec_end; i < num_doubles; i++) {
          tail += buf[i];
      }
      return (hi + lo) + tail;
  }

  #else

  static double vecsum(const double *buf, int num_doubles)
  {
      int i;
      double sum = 0.0;

      for (i = 0; i < num_doubles; i++) {
          sum += buf[i];
      }
      return sum;
  }

  #endif
  527. static int vecsum_zcr_loop(int pass, struct libhdfs_data *ldata,
  528. struct hadoopRzOptions *zopts,
  529. const struct options *opts)
  530. {
  531. int32_t len;
  532. double sum = 0.0;
  533. const double *buf;
  534. struct hadoopRzBuffer *rzbuf = NULL;
  535. int ret;
  536. while (1) {
  537. rzbuf = hadoopReadZero(ldata->file, zopts, ZCR_READ_CHUNK_SIZE);
  538. if (!rzbuf) {
  539. ret = errno;
  540. fprintf(stderr, "hadoopReadZero failed with error "
  541. "code %d (%s)\n", ret, strerror(ret));
  542. goto done;
  543. }
  544. buf = hadoopRzBufferGet(rzbuf);
  545. if (!buf) break;
  546. len = hadoopRzBufferLength(rzbuf);
  547. if (len < ZCR_READ_CHUNK_SIZE) {
  548. fprintf(stderr, "hadoopReadZero got a partial read "
  549. "of length %d\n", len);
  550. ret = EINVAL;
  551. goto done;
  552. }
  553. sum += vecsum(buf,
  554. ZCR_READ_CHUNK_SIZE / sizeof(double));
  555. hadoopRzBufferFree(ldata->file, rzbuf);
  556. }
  557. printf("finished zcr pass %d. sum = %g\n", pass, sum);
  558. ret = 0;
  559. done:
  560. if (rzbuf)
  561. hadoopRzBufferFree(ldata->file, rzbuf);
  562. return ret;
  563. }
  564. static int vecsum_zcr(struct libhdfs_data *ldata,
  565. const struct options *opts)
  566. {
  567. int ret, pass;
  568. struct hadoopRzOptions *zopts = NULL;
  569. zopts = hadoopRzOptionsAlloc();
  570. if (!zopts) {
  571. fprintf(stderr, "hadoopRzOptionsAlloc failed.\n");
  572. ret = ENOMEM;
  573. goto done;
  574. }
  575. if (hadoopRzOptionsSetSkipChecksum(zopts, 1)) {
  576. ret = errno;
  577. perror("hadoopRzOptionsSetSkipChecksum failed: ");
  578. goto done;
  579. }
  580. if (hadoopRzOptionsSetByteBufferPool(zopts, NULL)) {
  581. ret = errno;
  582. perror("hadoopRzOptionsSetByteBufferPool failed: ");
  583. goto done;
  584. }
  585. for (pass = 0; pass < opts->passes; ++pass) {
  586. ret = vecsum_zcr_loop(pass, ldata, zopts, opts);
  587. if (ret) {
  588. fprintf(stderr, "vecsum_zcr_loop pass %d failed "
  589. "with error %d\n", pass, ret);
  590. goto done;
  591. }
  592. hdfsSeek(ldata->fs, ldata->file, 0);
  593. }
  594. ret = 0;
  595. done:
  596. if (zopts)
  597. hadoopRzOptionsFree(zopts);
  598. return ret;
  599. }
  600. tSize hdfsReadFully(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
  601. {
  602. uint8_t *buf = buffer;
  603. tSize ret, nread = 0;
  604. while (length > 0) {
  605. ret = hdfsRead(fs, f, buf, length);
  606. if (ret < 0) {
  607. if (errno != EINTR) {
  608. return -1;
  609. }
  610. }
  611. if (ret == 0) {
  612. break;
  613. }
  614. nread += ret;
  615. length -= ret;
  616. buf += ret;
  617. }
  618. return nread;
  619. }
  620. static int vecsum_normal_loop(int pass, const struct libhdfs_data *ldata,
  621. const struct options *opts)
  622. {
  623. double sum = 0.0;
  624. while (1) {
  625. int res = hdfsReadFully(ldata->fs, ldata->file, ldata->buf,
  626. NORMAL_READ_CHUNK_SIZE);
  627. if (res == 0) // EOF
  628. break;
  629. if (res < 0) {
  630. int err = errno;
  631. fprintf(stderr, "hdfsRead failed with error %d (%s)\n",
  632. err, strerror(err));
  633. return err;
  634. }
  635. if (res < NORMAL_READ_CHUNK_SIZE) {
  636. fprintf(stderr, "hdfsRead got a partial read of "
  637. "length %d\n", res);
  638. return EINVAL;
  639. }
  640. sum += vecsum(ldata->buf,
  641. NORMAL_READ_CHUNK_SIZE / sizeof(double));
  642. }
  643. printf("finished normal pass %d. sum = %g\n", pass, sum);
  644. return 0;
  645. }
  646. static int vecsum_libhdfs(struct libhdfs_data *ldata,
  647. const struct options *opts)
  648. {
  649. int pass;
  650. ldata->buf = malloc(NORMAL_READ_CHUNK_SIZE);
  651. if (!ldata->buf) {
  652. fprintf(stderr, "failed to malloc buffer of size %d\n",
  653. NORMAL_READ_CHUNK_SIZE);
  654. return ENOMEM;
  655. }
  656. for (pass = 0; pass < opts->passes; ++pass) {
  657. int ret = vecsum_normal_loop(pass, ldata, opts);
  658. if (ret) {
  659. fprintf(stderr, "vecsum_normal_loop pass %d failed "
  660. "with error %d\n", pass, ret);
  661. return ret;
  662. }
  663. hdfsSeek(ldata->fs, ldata->file, 0);
  664. }
  665. return 0;
  666. }
  667. static void vecsum_local(struct local_data *cdata, const struct options *opts)
  668. {
  669. int pass;
  670. for (pass = 0; pass < opts->passes; pass++) {
  671. double sum = vecsum(cdata->mmap, cdata->length / sizeof(double));
  672. printf("finished vecsum_local pass %d. sum = %g\n", pass, sum);
  673. }
  674. }
  675. static long long vecsum_length(const struct options *opts,
  676. const struct libhdfs_data *ldata)
  677. {
  678. if (opts->ty == VECSUM_LOCAL) {
  679. struct stat st_buf = { 0 };
  680. if (stat(opts->path, &st_buf)) {
  681. int err = errno;
  682. fprintf(stderr, "vecsum_length: stat(%s) failed: "
  683. "error %d (%s)\n", opts->path, err, strerror(err));
  684. return -EIO;
  685. }
  686. return st_buf.st_size;
  687. } else {
  688. return ldata->length;
  689. }
  690. }
  691. /*
  692. * vecsum is a microbenchmark which measures the speed of various ways of
  693. * reading from HDFS. It creates a file containing floating-point 'doubles',
  694. * and computes the sum of all the doubles several times. For some CPUs,
  695. * assembly optimizations are used for the summation (SSE, etc).
  696. */
  697. int main(void)
  698. {
  699. int ret = 1;
  700. struct options *opts = NULL;
  701. struct local_data *cdata = NULL;
  702. struct libhdfs_data *ldata = NULL;
  703. struct stopwatch *watch = NULL;
  704. if (check_byte_size(VECSUM_CHUNK_SIZE, "VECSUM_CHUNK_SIZE") ||
  705. check_byte_size(ZCR_READ_CHUNK_SIZE,
  706. "ZCR_READ_CHUNK_SIZE") ||
  707. check_byte_size(NORMAL_READ_CHUNK_SIZE,
  708. "NORMAL_READ_CHUNK_SIZE")) {
  709. goto done;
  710. }
  711. opts = options_create();
  712. if (!opts)
  713. goto done;
  714. if (opts->ty == VECSUM_LOCAL) {
  715. cdata = local_data_create(opts);
  716. if (!cdata)
  717. goto done;
  718. } else {
  719. ldata = libhdfs_data_create(opts);
  720. if (!ldata)
  721. goto done;
  722. }
  723. watch = stopwatch_create();
  724. if (!watch)
  725. goto done;
  726. switch (opts->ty) {
  727. case VECSUM_LOCAL:
  728. vecsum_local(cdata, opts);
  729. ret = 0;
  730. break;
  731. case VECSUM_LIBHDFS:
  732. ret = vecsum_libhdfs(ldata, opts);
  733. break;
  734. case VECSUM_ZCR:
  735. ret = vecsum_zcr(ldata, opts);
  736. break;
  737. }
  738. if (ret) {
  739. fprintf(stderr, "vecsum failed with error %d\n", ret);
  740. goto done;
  741. }
  742. ret = 0;
  743. done:
  744. fprintf(stderr, "cleaning up...\n");
  745. if (watch && (ret == 0)) {
  746. long long length = vecsum_length(opts, ldata);
  747. if (length >= 0) {
  748. stopwatch_stop(watch, length * opts->passes);
  749. }
  750. }
  751. if (cdata)
  752. local_data_free(cdata);
  753. if (ldata)
  754. libhdfs_data_free(ldata);
  755. if (opts)
  756. options_free(opts);
  757. return ret;
  758. }
  // vim: ts=4:sw=4:tw=79:et