vecsum.c 22 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  #include <errno.h>
  #include <fcntl.h>
  #include <limits.h>
  #include <malloc.h>
  #include <stdio.h>
  #include <stdlib.h>
  #include <string.h>
  #include <sys/mman.h>
  #include <sys/resource.h>
  #include <sys/stat.h>
  #include <sys/time.h>
  #include <sys/types.h>
  #include <time.h>
  #include <unistd.h>
  #include "config.h"
  #include "hdfs.h"
  /*
   * Size in bytes of one generated chunk of test data.  VECSUM_LENGTH must
   * be a multiple of this value.
   */
  #define VECSUM_CHUNK_SIZE (8 * 1024 * 1024)
  /* Number of bytes requested from each hadoopReadZero() call. */
  #define ZCR_READ_CHUNK_SIZE (8 * 1024 * 1024)
  /* Number of bytes requested from each hdfsReadFully() call. */
  #define NORMAL_READ_CHUNK_SIZE (8 * 1024 * 1024)
  /* Number of doubles consumed by one iteration of the SSE summation loop. */
  #define DOUBLES_PER_LOOP_ITER 16
  /**
   * Convert a timespec into a number of seconds expressed as a double.
   *
   * @param ts    The timespec to convert.
   *
   * @return      ts->tv_sec plus ts->tv_nsec scaled down by 10^9.
   */
  static double timespec_to_double(const struct timespec *ts)
  {
      double whole_seconds = ts->tv_sec;
      double nanoseconds = ts->tv_nsec;
      double fraction = nanoseconds / 1000000000L;

      return whole_seconds + fraction;
  }
  /**
   * Timing state for one benchmark run.
   */
  struct stopwatch {
      /* CLOCK_MONOTONIC readings taken when the watch is created / stopped. */
      struct timespec start, stop;
      /* Resource usage snapshot taken by getrusage() at creation time. */
      struct rusage rusage;
  };
  48. static struct stopwatch *stopwatch_create(void)
  49. {
  50. struct stopwatch *watch;
  51. watch = calloc(1, sizeof(struct stopwatch));
  52. if (!watch) {
  53. fprintf(stderr, "failed to allocate memory for stopwatch\n");
  54. goto error;
  55. }
  56. if (clock_gettime(CLOCK_MONOTONIC, &watch->start)) {
  57. int err = errno;
  58. fprintf(stderr, "clock_gettime(CLOCK_MONOTONIC) failed with "
  59. "error %d (%s)\n", err, strerror(err));
  60. goto error;
  61. }
  62. if (getrusage(RUSAGE_THREAD, &watch->rusage) < 0) {
  63. int err = errno;
  64. fprintf(stderr, "getrusage failed: error %d (%s)\n",
  65. err, strerror(err));
  66. goto error;
  67. }
  68. return watch;
  69. error:
  70. free(watch);
  71. return NULL;
  72. }
  73. static void stopwatch_stop(struct stopwatch *watch,
  74. long long bytes_read)
  75. {
  76. double elapsed, rate;
  77. if (clock_gettime(CLOCK_MONOTONIC, &watch->stop)) {
  78. int err = errno;
  79. fprintf(stderr, "clock_gettime(CLOCK_MONOTONIC) failed with "
  80. "error %d (%s)\n", err, strerror(err));
  81. goto done;
  82. }
  83. elapsed = timespec_to_double(&watch->stop) -
  84. timespec_to_double(&watch->start);
  85. rate = (bytes_read / elapsed) / (1024 * 1024 * 1024);
  86. printf("stopwatch: took %.5g seconds to read %lld bytes, "
  87. "for %.5g GB/s\n", elapsed, bytes_read, rate);
  88. printf("stopwatch: %.5g seconds\n", elapsed);
  89. done:
  90. free(watch);
  91. }
  /* The ways in which the benchmark can read its input. */
  enum vecsum_type {
      VECSUM_LOCAL = 0,   /* selected by "local" */
      VECSUM_LIBHDFS,     /* selected by "libhdfs" */
      VECSUM_ZCR,         /* selected by "zcr" */
  };
  /* Human-readable list of accepted type names, used in error messages. */
  #define VECSUM_TYPE_VALID_VALUES "libhdfs, zcr, or local"
  /**
   * Translate a type name into its vecsum_type value.
   *
   * The comparison ignores case.
   *
   * @param str   The name to look up.
   *
   * @return      The matching enum vecsum_type value, or -1 if the name is
   *              not recognized.
   */
  int parse_vecsum_type(const char *str)
  {
      static const struct {
          const char *name;
          int value;
      } known_types[] = {
          { "local", VECSUM_LOCAL },
          { "libhdfs", VECSUM_LIBHDFS },
          { "zcr", VECSUM_ZCR },
      };
      unsigned int idx;

      for (idx = 0; idx < sizeof(known_types) / sizeof(known_types[0]); idx++) {
          if (strcasecmp(str, known_types[idx].name) == 0) {
              return known_types[idx].value;
          }
      }
      return -1;
  }
  109. struct options {
  110. // The path to read.
  111. const char *path;
  112. // Length of the file.
  113. long long length;
  114. // The number of times to read the path.
  115. int passes;
  116. // Type of vecsum to do
  117. enum vecsum_type ty;
  118. // RPC address to use for HDFS
  119. const char *rpc_address;
  120. };
  121. static struct options *options_create(void)
  122. {
  123. struct options *opts = NULL;
  124. const char *pass_str;
  125. const char *ty_str;
  126. const char *length_str;
  127. int ty;
  128. opts = calloc(1, sizeof(struct options));
  129. if (!opts) {
  130. fprintf(stderr, "failed to calloc options\n");
  131. goto error;
  132. }
  133. opts->path = getenv("VECSUM_PATH");
  134. if (!opts->path) {
  135. fprintf(stderr, "You must set the VECSUM_PATH environment "
  136. "variable to the path of the file to read.\n");
  137. goto error;
  138. }
  139. length_str = getenv("VECSUM_LENGTH");
  140. if (!length_str) {
  141. length_str = "2147483648";
  142. }
  143. opts->length = atoll(length_str);
  144. if (!opts->length) {
  145. fprintf(stderr, "Can't parse VECSUM_LENGTH of '%s'.\n",
  146. length_str);
  147. goto error;
  148. }
  149. if (opts->length % VECSUM_CHUNK_SIZE) {
  150. fprintf(stderr, "VECSUM_LENGTH must be a multiple of '%lld'. The "
  151. "currently specified length of '%lld' is not.\n",
  152. (long long)VECSUM_CHUNK_SIZE, (long long)opts->length);
  153. goto error;
  154. }
  155. pass_str = getenv("VECSUM_PASSES");
  156. if (!pass_str) {
  157. fprintf(stderr, "You must set the VECSUM_PASSES environment "
  158. "variable to the number of passes to make.\n");
  159. goto error;
  160. }
  161. opts->passes = atoi(pass_str);
  162. if (opts->passes <= 0) {
  163. fprintf(stderr, "Invalid value for the VECSUM_PASSES "
  164. "environment variable. You must set this to a "
  165. "number greater than 0.\n");
  166. goto error;
  167. }
  168. ty_str = getenv("VECSUM_TYPE");
  169. if (!ty_str) {
  170. fprintf(stderr, "You must set the VECSUM_TYPE environment "
  171. "variable to " VECSUM_TYPE_VALID_VALUES "\n");
  172. goto error;
  173. }
  174. ty = parse_vecsum_type(ty_str);
  175. if (ty < 0) {
  176. fprintf(stderr, "Invalid VECSUM_TYPE environment variable. "
  177. "Valid values are " VECSUM_TYPE_VALID_VALUES "\n");
  178. goto error;
  179. }
  180. opts->ty = ty;
  181. opts->rpc_address = getenv("VECSUM_RPC_ADDRESS");
  182. if (!opts->rpc_address) {
  183. opts->rpc_address = "default";
  184. }
  185. return opts;
  186. error:
  187. free(opts);
  188. return NULL;
  189. }
  190. static int test_file_chunk_setup(double **chunk)
  191. {
  192. int i;
  193. double *c, val;
  194. c = malloc(VECSUM_CHUNK_SIZE);
  195. if (!c) {
  196. fprintf(stderr, "test_file_create: failed to malloc "
  197. "a buffer of size '%lld'\n",
  198. (long long) VECSUM_CHUNK_SIZE);
  199. return EIO;
  200. }
  201. val = 0.0;
  202. for (i = 0; i < VECSUM_CHUNK_SIZE / sizeof(double); i++) {
  203. c[i] = val;
  204. val += 0.5;
  205. }
  206. *chunk = c;
  207. return 0;
  208. }
  /**
   * Release options returned by options_create().
   *
   * Only the structure itself is freed; the strings it points to are not.
   *
   * @param opts      The options to free.
   */
  static void options_free(struct options *opts)
  {
      free(opts);
  }
  /**
   * State for the "local" benchmark, which reads the file through mmap().
   */
  struct local_data {
      /* Descriptor of the opened file, or -1. */
      int fd;
      /* Read-only mapping of the file, or MAP_FAILED. */
      double *mmap;
      /* Number of bytes mapped. */
      long long length;
  };
  218. static int local_data_create_file(struct local_data *cdata,
  219. const struct options *opts)
  220. {
  221. int ret = EIO;
  222. int dup_fd = -1;
  223. FILE *fp = NULL;
  224. double *chunk = NULL;
  225. long long offset = 0;
  226. dup_fd = dup(cdata->fd);
  227. if (dup_fd < 0) {
  228. ret = errno;
  229. fprintf(stderr, "local_data_create_file: dup failed: %s (%d)\n",
  230. strerror(ret), ret);
  231. goto done;
  232. }
  233. fp = fdopen(dup_fd, "w");
  234. if (!fp) {
  235. ret = errno;
  236. fprintf(stderr, "local_data_create_file: fdopen failed: %s (%d)\n",
  237. strerror(ret), ret);
  238. goto done;
  239. }
  240. ret = test_file_chunk_setup(&chunk);
  241. if (ret)
  242. goto done;
  243. while (offset < opts->length) {
  244. if (fwrite(chunk, VECSUM_CHUNK_SIZE, 1, fp) != 1) {
  245. fprintf(stderr, "local_data_create_file: failed to write to "
  246. "the local file '%s' at offset %lld\n",
  247. opts->path, offset);
  248. ret = EIO;
  249. goto done;
  250. }
  251. offset += VECSUM_CHUNK_SIZE;
  252. }
  253. fprintf(stderr, "local_data_create_file: successfully re-wrote %s as "
  254. "a file of length %lld\n", opts->path, opts->length);
  255. ret = 0;
  256. done:
  257. if (dup_fd >= 0) {
  258. close(dup_fd);
  259. }
  260. if (fp) {
  261. fclose(fp);
  262. }
  263. free(chunk);
  264. return ret;
  265. }
  266. static struct local_data *local_data_create(const struct options *opts)
  267. {
  268. struct local_data *cdata = NULL;
  269. struct stat st_buf;
  270. cdata = malloc(sizeof(*cdata));
  271. if (!cdata) {
  272. fprintf(stderr, "Failed to allocate local test data.\n");
  273. goto error;
  274. }
  275. cdata->fd = -1;
  276. cdata->mmap = MAP_FAILED;
  277. cdata->length = opts->length;
  278. cdata->fd = open(opts->path, O_RDWR | O_CREAT, 0777);
  279. if (cdata->fd < 0) {
  280. int err = errno;
  281. fprintf(stderr, "local_data_create: failed to open %s "
  282. "for read/write: error %d (%s)\n", opts->path, err, strerror(err));
  283. goto error;
  284. }
  285. if (fstat(cdata->fd, &st_buf)) {
  286. int err = errno;
  287. fprintf(stderr, "local_data_create: fstat(%s) failed: "
  288. "error %d (%s)\n", opts->path, err, strerror(err));
  289. goto error;
  290. }
  291. if (st_buf.st_size != opts->length) {
  292. int err;
  293. fprintf(stderr, "local_data_create: current size of %s is %lld, but "
  294. "we want %lld. Re-writing the file.\n",
  295. opts->path, (long long)st_buf.st_size,
  296. (long long)opts->length);
  297. err = local_data_create_file(cdata, opts);
  298. if (err)
  299. goto error;
  300. }
  301. cdata->mmap = mmap(NULL, cdata->length, PROT_READ,
  302. MAP_PRIVATE, cdata->fd, 0);
  303. if (cdata->mmap == MAP_FAILED) {
  304. int err = errno;
  305. fprintf(stderr, "local_data_create: mmap(%s) failed: "
  306. "error %d (%s)\n", opts->path, err, strerror(err));
  307. goto error;
  308. }
  309. return cdata;
  310. error:
  311. if (cdata) {
  312. if (cdata->fd >= 0) {
  313. close(cdata->fd);
  314. }
  315. free(cdata);
  316. }
  317. return NULL;
  318. }
  319. static void local_data_free(struct local_data *cdata)
  320. {
  321. close(cdata->fd);
  322. munmap(cdata->mmap, cdata->length);
  323. }
  324. struct libhdfs_data {
  325. hdfsFS fs;
  326. hdfsFile file;
  327. long long length;
  328. double *buf;
  329. };
  330. static void libhdfs_data_free(struct libhdfs_data *ldata)
  331. {
  332. if (ldata->fs) {
  333. free(ldata->buf);
  334. if (ldata->file) {
  335. hdfsCloseFile(ldata->fs, ldata->file);
  336. }
  337. hdfsDisconnect(ldata->fs);
  338. }
  339. free(ldata);
  340. }
  341. static int libhdfs_data_create_file(struct libhdfs_data *ldata,
  342. const struct options *opts)
  343. {
  344. int ret;
  345. double *chunk = NULL;
  346. long long offset = 0;
  347. ldata->file = hdfsOpenFile(ldata->fs, opts->path, O_WRONLY, 0, 1, 0);
  348. if (!ldata->file) {
  349. ret = errno;
  350. fprintf(stderr, "libhdfs_data_create_file: hdfsOpenFile(%s, "
  351. "O_WRONLY) failed: error %d (%s)\n", opts->path, ret,
  352. strerror(ret));
  353. goto done;
  354. }
  355. ret = test_file_chunk_setup(&chunk);
  356. if (ret)
  357. goto done;
  358. while (offset < opts->length) {
  359. ret = hdfsWrite(ldata->fs, ldata->file, chunk, VECSUM_CHUNK_SIZE);
  360. if (ret < 0) {
  361. ret = errno;
  362. fprintf(stderr, "libhdfs_data_create_file: got error %d (%s) at "
  363. "offset %lld of %s\n", ret, strerror(ret),
  364. offset, opts->path);
  365. goto done;
  366. } else if (ret < VECSUM_CHUNK_SIZE) {
  367. fprintf(stderr, "libhdfs_data_create_file: got short write "
  368. "of %d at offset %lld of %s\n", ret, offset, opts->path);
  369. goto done;
  370. }
  371. offset += VECSUM_CHUNK_SIZE;
  372. }
  373. ret = 0;
  374. done:
  375. free(chunk);
  376. if (ldata->file) {
  377. if (hdfsCloseFile(ldata->fs, ldata->file)) {
  378. fprintf(stderr, "libhdfs_data_create_file: hdfsCloseFile error.");
  379. ret = EIO;
  380. }
  381. ldata->file = NULL;
  382. }
  383. return ret;
  384. }
  385. static struct libhdfs_data *libhdfs_data_create(const struct options *opts)
  386. {
  387. struct libhdfs_data *ldata = NULL;
  388. struct hdfsBuilder *builder = NULL;
  389. hdfsFileInfo *pinfo = NULL;
  390. ldata = calloc(1, sizeof(struct libhdfs_data));
  391. if (!ldata) {
  392. fprintf(stderr, "Failed to allocate libhdfs test data.\n");
  393. goto error;
  394. }
  395. builder = hdfsNewBuilder();
  396. if (!builder) {
  397. fprintf(stderr, "Failed to create builder.\n");
  398. goto error;
  399. }
  400. hdfsBuilderSetNameNode(builder, opts->rpc_address);
  401. hdfsBuilderConfSetStr(builder,
  402. "dfs.client.read.shortcircuit.skip.checksum", "true");
  403. ldata->fs = hdfsBuilderConnect(builder);
  404. if (!ldata->fs) {
  405. fprintf(stderr, "Could not connect to default namenode!\n");
  406. goto error;
  407. }
  408. pinfo = hdfsGetPathInfo(ldata->fs, opts->path);
  409. if (!pinfo) {
  410. int err = errno;
  411. fprintf(stderr, "hdfsGetPathInfo(%s) failed: error %d (%s). "
  412. "Attempting to re-create file.\n",
  413. opts->path, err, strerror(err));
  414. if (libhdfs_data_create_file(ldata, opts))
  415. goto error;
  416. } else if (pinfo->mSize != opts->length) {
  417. fprintf(stderr, "hdfsGetPathInfo(%s) failed: length was %lld, "
  418. "but we want length %lld. Attempting to re-create file.\n",
  419. opts->path, (long long)pinfo->mSize, (long long)opts->length);
  420. if (libhdfs_data_create_file(ldata, opts))
  421. goto error;
  422. }
  423. ldata->file = hdfsOpenFile(ldata->fs, opts->path, O_RDONLY, 0, 0, 0);
  424. if (!ldata->file) {
  425. int err = errno;
  426. fprintf(stderr, "hdfsOpenFile(%s) failed: error %d (%s)\n",
  427. opts->path, err, strerror(err));
  428. goto error;
  429. }
  430. ldata->length = opts->length;
  431. return ldata;
  432. error:
  433. if (pinfo)
  434. hdfsFreeFileInfo(pinfo, 1);
  435. if (ldata)
  436. libhdfs_data_free(ldata);
  437. return NULL;
  438. }
  439. static int check_byte_size(int byte_size, const char *const str)
  440. {
  441. if (byte_size % sizeof(double)) {
  442. fprintf(stderr, "%s is not a multiple "
  443. "of sizeof(double)\n", str);
  444. return EINVAL;
  445. }
  446. if ((byte_size / sizeof(double)) % DOUBLES_PER_LOOP_ITER) {
  447. fprintf(stderr, "The number of doubles contained in "
  448. "%s is not a multiple of DOUBLES_PER_LOOP_ITER\n",
  449. str);
  450. return EINVAL;
  451. }
  452. return 0;
  453. }
  #ifdef HAVE_INTEL_SSE_INTRINSICS
  #include <emmintrin.h>
  /**
   * Sum an array of doubles using SSE2 intrinsics.
   *
   * Each loop iteration consumes DOUBLES_PER_LOOP_ITER doubles through eight
   * independent two-lane accumulators.  The loads are aligned loads, so buf
   * is expected to be 16-byte aligned.
   *
   * @param buf           The doubles to sum.
   * @param num_doubles   Number of doubles in buf.
   *
   * @return              The sum of the doubles.
   */
  static double vecsum(const double *buf, int num_doubles)
  {
      const double *p = buf;
      int left;
      double hi, lo;
      __m128d lower, upper, total;
      __m128d sum0 = _mm_set_pd(0.0, 0.0);
      __m128d sum1 = _mm_set_pd(0.0, 0.0);
      __m128d sum2 = _mm_set_pd(0.0, 0.0);
      __m128d sum3 = _mm_set_pd(0.0, 0.0);
      __m128d sum4 = _mm_set_pd(0.0, 0.0);
      __m128d sum5 = _mm_set_pd(0.0, 0.0);
      __m128d sum6 = _mm_set_pd(0.0, 0.0);
      __m128d sum7 = _mm_set_pd(0.0, 0.0);

      for (left = num_doubles; left > 0; left -= DOUBLES_PER_LOOP_ITER) {
          sum0 = _mm_add_pd(sum0, _mm_load_pd(p + 0));
          sum1 = _mm_add_pd(sum1, _mm_load_pd(p + 2));
          sum2 = _mm_add_pd(sum2, _mm_load_pd(p + 4));
          sum3 = _mm_add_pd(sum3, _mm_load_pd(p + 6));
          sum4 = _mm_add_pd(sum4, _mm_load_pd(p + 8));
          sum5 = _mm_add_pd(sum5, _mm_load_pd(p + 10));
          sum6 = _mm_add_pd(sum6, _mm_load_pd(p + 12));
          sum7 = _mm_add_pd(sum7, _mm_load_pd(p + 14));
          p += DOUBLES_PER_LOOP_ITER;
      }
      /* Pairwise reduction of the eight accumulators down to one. */
      lower = _mm_add_pd(_mm_add_pd(sum0, sum1), _mm_add_pd(sum2, sum3));
      upper = _mm_add_pd(_mm_add_pd(sum4, sum5), _mm_add_pd(sum6, sum7));
      total = _mm_add_pd(lower, upper);
      /* Finally add the two lanes of the remaining vector together. */
      _mm_storeh_pd(&hi, total);
      _mm_storel_pd(&lo, total);
      return hi + lo;
  }
  #else
  /**
   * Sum an array of doubles with a plain scalar loop.
   *
   * @param buf           The doubles to sum.
   * @param num_doubles   Number of doubles in buf.
   *
   * @return              The sum of the doubles, accumulated front to back.
   */
  static double vecsum(const double *buf, int num_doubles)
  {
      const double *cursor = buf;
      double total = 0.0;
      int remaining;

      for (remaining = num_doubles; remaining > 0; remaining--) {
          total += *cursor++;
      }
      return total;
  }
  #endif
  509. static int vecsum_zcr_loop(int pass, struct libhdfs_data *ldata,
  510. struct hadoopRzOptions *zopts,
  511. const struct options *opts)
  512. {
  513. int32_t len;
  514. double sum = 0.0;
  515. const double *buf;
  516. struct hadoopRzBuffer *rzbuf = NULL;
  517. int ret;
  518. while (1) {
  519. rzbuf = hadoopReadZero(ldata->file, zopts, ZCR_READ_CHUNK_SIZE);
  520. if (!rzbuf) {
  521. ret = errno;
  522. fprintf(stderr, "hadoopReadZero failed with error "
  523. "code %d (%s)\n", ret, strerror(ret));
  524. goto done;
  525. }
  526. buf = hadoopRzBufferGet(rzbuf);
  527. if (!buf) break;
  528. len = hadoopRzBufferLength(rzbuf);
  529. if (len < ZCR_READ_CHUNK_SIZE) {
  530. fprintf(stderr, "hadoopReadZero got a partial read "
  531. "of length %d\n", len);
  532. ret = EINVAL;
  533. goto done;
  534. }
  535. sum += vecsum(buf,
  536. ZCR_READ_CHUNK_SIZE / sizeof(double));
  537. hadoopRzBufferFree(ldata->file, rzbuf);
  538. }
  539. printf("finished zcr pass %d. sum = %g\n", pass, sum);
  540. ret = 0;
  541. done:
  542. if (rzbuf)
  543. hadoopRzBufferFree(ldata->file, rzbuf);
  544. return ret;
  545. }
  546. static int vecsum_zcr(struct libhdfs_data *ldata,
  547. const struct options *opts)
  548. {
  549. int ret, pass;
  550. struct hadoopRzOptions *zopts = NULL;
  551. zopts = hadoopRzOptionsAlloc();
  552. if (!zopts) {
  553. fprintf(stderr, "hadoopRzOptionsAlloc failed.\n");
  554. ret = ENOMEM;
  555. goto done;
  556. }
  557. if (hadoopRzOptionsSetSkipChecksum(zopts, 1)) {
  558. ret = errno;
  559. perror("hadoopRzOptionsSetSkipChecksum failed: ");
  560. goto done;
  561. }
  562. if (hadoopRzOptionsSetByteBufferPool(zopts, NULL)) {
  563. ret = errno;
  564. perror("hadoopRzOptionsSetByteBufferPool failed: ");
  565. goto done;
  566. }
  567. for (pass = 0; pass < opts->passes; ++pass) {
  568. ret = vecsum_zcr_loop(pass, ldata, zopts, opts);
  569. if (ret) {
  570. fprintf(stderr, "vecsum_zcr_loop pass %d failed "
  571. "with error %d\n", pass, ret);
  572. goto done;
  573. }
  574. hdfsSeek(ldata->fs, ldata->file, 0);
  575. }
  576. ret = 0;
  577. done:
  578. if (zopts)
  579. hadoopRzOptionsFree(zopts);
  580. return ret;
  581. }
  582. tSize hdfsReadFully(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
  583. {
  584. uint8_t *buf = buffer;
  585. tSize ret, nread = 0;
  586. while (length > 0) {
  587. ret = hdfsRead(fs, f, buf, length);
  588. if (ret < 0) {
  589. if (errno != EINTR) {
  590. return -1;
  591. }
  592. }
  593. if (ret == 0) {
  594. break;
  595. }
  596. nread += ret;
  597. length -= ret;
  598. buf += ret;
  599. }
  600. return nread;
  601. }
  602. static int vecsum_normal_loop(int pass, const struct libhdfs_data *ldata,
  603. const struct options *opts)
  604. {
  605. double sum = 0.0;
  606. while (1) {
  607. int res = hdfsReadFully(ldata->fs, ldata->file, ldata->buf,
  608. NORMAL_READ_CHUNK_SIZE);
  609. if (res == 0) // EOF
  610. break;
  611. if (res < 0) {
  612. int err = errno;
  613. fprintf(stderr, "hdfsRead failed with error %d (%s)\n",
  614. err, strerror(err));
  615. return err;
  616. }
  617. if (res < NORMAL_READ_CHUNK_SIZE) {
  618. fprintf(stderr, "hdfsRead got a partial read of "
  619. "length %d\n", res);
  620. return EINVAL;
  621. }
  622. sum += vecsum(ldata->buf,
  623. NORMAL_READ_CHUNK_SIZE / sizeof(double));
  624. }
  625. printf("finished normal pass %d. sum = %g\n", pass, sum);
  626. return 0;
  627. }
  628. static int vecsum_libhdfs(struct libhdfs_data *ldata,
  629. const struct options *opts)
  630. {
  631. int pass;
  632. ldata->buf = malloc(NORMAL_READ_CHUNK_SIZE);
  633. if (!ldata->buf) {
  634. fprintf(stderr, "failed to malloc buffer of size %d\n",
  635. NORMAL_READ_CHUNK_SIZE);
  636. return ENOMEM;
  637. }
  638. for (pass = 0; pass < opts->passes; ++pass) {
  639. int ret = vecsum_normal_loop(pass, ldata, opts);
  640. if (ret) {
  641. fprintf(stderr, "vecsum_normal_loop pass %d failed "
  642. "with error %d\n", pass, ret);
  643. return ret;
  644. }
  645. hdfsSeek(ldata->fs, ldata->file, 0);
  646. }
  647. return 0;
  648. }
  649. static void vecsum_local(struct local_data *cdata, const struct options *opts)
  650. {
  651. int pass;
  652. for (pass = 0; pass < opts->passes; pass++) {
  653. double sum = vecsum(cdata->mmap, cdata->length / sizeof(double));
  654. printf("finished vecsum_local pass %d. sum = %g\n", pass, sum);
  655. }
  656. }
  657. static long long vecsum_length(const struct options *opts,
  658. const struct libhdfs_data *ldata)
  659. {
  660. if (opts->ty == VECSUM_LOCAL) {
  661. struct stat st_buf = { 0 };
  662. if (stat(opts->path, &st_buf)) {
  663. int err = errno;
  664. fprintf(stderr, "vecsum_length: stat(%s) failed: "
  665. "error %d (%s)\n", opts->path, err, strerror(err));
  666. return -EIO;
  667. }
  668. return st_buf.st_size;
  669. } else {
  670. return ldata->length;
  671. }
  672. }
  673. /*
  674. * vecsum is a microbenchmark which measures the speed of various ways of
  675. * reading from HDFS. It creates a file containing floating-point 'doubles',
  676. * and computes the sum of all the doubles several times. For some CPUs,
  677. * assembly optimizations are used for the summation (SSE, etc).
  678. */
  679. int main(void)
  680. {
  681. int ret = 1;
  682. struct options *opts = NULL;
  683. struct local_data *cdata = NULL;
  684. struct libhdfs_data *ldata = NULL;
  685. struct stopwatch *watch = NULL;
  686. if (check_byte_size(VECSUM_CHUNK_SIZE, "VECSUM_CHUNK_SIZE") ||
  687. check_byte_size(ZCR_READ_CHUNK_SIZE,
  688. "ZCR_READ_CHUNK_SIZE") ||
  689. check_byte_size(NORMAL_READ_CHUNK_SIZE,
  690. "NORMAL_READ_CHUNK_SIZE")) {
  691. goto done;
  692. }
  693. opts = options_create();
  694. if (!opts)
  695. goto done;
  696. if (opts->ty == VECSUM_LOCAL) {
  697. cdata = local_data_create(opts);
  698. if (!cdata)
  699. goto done;
  700. } else {
  701. ldata = libhdfs_data_create(opts);
  702. if (!ldata)
  703. goto done;
  704. }
  705. watch = stopwatch_create();
  706. if (!watch)
  707. goto done;
  708. switch (opts->ty) {
  709. case VECSUM_LOCAL:
  710. vecsum_local(cdata, opts);
  711. ret = 0;
  712. break;
  713. case VECSUM_LIBHDFS:
  714. ret = vecsum_libhdfs(ldata, opts);
  715. break;
  716. case VECSUM_ZCR:
  717. ret = vecsum_zcr(ldata, opts);
  718. break;
  719. }
  720. if (ret) {
  721. fprintf(stderr, "vecsum failed with error %d\n", ret);
  722. goto done;
  723. }
  724. ret = 0;
  725. done:
  726. fprintf(stderr, "cleaning up...\n");
  727. if (watch && (ret == 0)) {
  728. long long length = vecsum_length(opts, ldata);
  729. if (length >= 0) {
  730. stopwatch_stop(watch, length * opts->passes);
  731. }
  732. }
  733. if (cdata)
  734. local_data_free(cdata);
  735. if (ldata)
  736. libhdfs_data_free(ldata);
  737. if (opts)
  738. options_free(opts);
  739. return ret;
  740. }
  741. // vim: ts=4:sw=4:tw=79:et