1
0

TestIFile.cc 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include <algorithm>
  19. #include "lib/commons.h"
  20. #include "config.h"
  21. #include "lib/BufferStream.h"
  22. #include "lib/FileSystem.h"
  23. #include "lib/IFile.h"
  24. #include "test_commons.h"
  25. SingleSpillInfo * writeIFile(int partition, vector<pair<string, string> > & kvs,
  26. const string & path, KeyValueType type, const string & codec) {
  27. FileOutputStream * fout = (FileOutputStream*)FileSystem::getLocal().create(path);
  28. IFileWriter * iw = new IFileWriter(fout, CHECKSUM_CRC32, type, type, codec, NULL);
  29. for (int i = 0; i < partition; i++) {
  30. iw->startPartition();
  31. for (size_t i = 0; i < kvs.size(); i++) {
  32. pair<string, string> & p = kvs[i];
  33. iw->write(p.first.c_str(), p.first.length(), p.second.c_str(), p.second.length());
  34. }
  35. iw->endPartition();
  36. }
  37. SingleSpillInfo * info = iw->getSpillInfo();
  38. delete iw;
  39. delete fout;
  40. return info;
  41. }
  42. void readIFile(vector<pair<string, string> > & kvs, const string & path, KeyValueType type,
  43. SingleSpillInfo * info, const string & codec) {
  44. FileInputStream * fin = (FileInputStream*)FileSystem::getLocal().open(path);
  45. IFileReader * ir = new IFileReader(fin, info);
  46. while (ir->nextPartition()) {
  47. const char * key, *value;
  48. uint32_t keyLen, valueLen;
  49. while (NULL != (key = ir->nextKey(keyLen))) {
  50. value = ir->value(valueLen);
  51. string keyS(key, keyLen);
  52. string valueS(value, valueLen);
  53. kvs.push_back(std::make_pair(keyS, valueS));
  54. }
  55. }
  56. delete ir;
  57. delete fin;
  58. }
  59. void TestIFileReadWrite(KeyValueType kvtype, int partition, int size,
  60. vector<pair<string, string> > & kvs, const string & codec = "") {
  61. string outputpath = "ifilewriter";
  62. SingleSpillInfo * info = writeIFile(partition, kvs, outputpath, kvtype, codec);
  63. LOG("write finished");
  64. vector<pair<string, string> > readkvs;
  65. readIFile(readkvs, outputpath, kvtype, info, codec);
  66. LOG("read finished");
  67. delete info;
  68. ASSERT_EQ(kvs.size() * partition, readkvs.size());
  69. for (int i = 0; i < partition; i++) {
  70. vector<pair<string, string> > cur_part(readkvs.begin() + i * kvs.size(),
  71. readkvs.begin() + (i + 1) * kvs.size());
  72. ASSERT_EQ(kvs.size(), cur_part.size());
  73. // for (size_t j=0;j<kvs.size();j++) {
  74. // SCOPED_TRACE(j);
  75. // ASSERT_EQ(kvs[j], cur_part[j]);
  76. // }
  77. ASSERT_EQ(kvs, cur_part);
  78. }
  79. FileSystem::getLocal().remove(outputpath);
  80. }
  81. TEST(IFile, WriteRead) {
  82. int partition = TestConfig.getInt("ifile.partition", 7);
  83. int size = TestConfig.getInt("partition.size", 20000);
  84. vector<pair<string, string> > kvs;
  85. Generate(kvs, size, "bytes");
  86. TestIFileReadWrite(TextType, partition, size, kvs);
  87. TestIFileReadWrite(BytesType, partition, size, kvs);
  88. TestIFileReadWrite(UnknownType, partition, size, kvs);
  89. #if defined HADOOP_SNAPPY_LIBRARY
  90. TestIFileReadWrite(TextType, partition, size, kvs, "org.apache.hadoop.io.compress.SnappyCodec");
  91. #endif
  92. }
  93. void TestIFileWriteRead2(vector<pair<string, string> > & kvs, char * buff, size_t buffsize,
  94. const string & codec, ChecksumType checksumType, KeyValueType type) {
  95. int partition = TestConfig.getInt("ifile.partition", 50);
  96. Timer timer;
  97. OutputBuffer outputBuffer = OutputBuffer(buff, buffsize);
  98. IFileWriter * iw = new IFileWriter(&outputBuffer, checksumType, type, type, codec, NULL);
  99. timer.reset();
  100. for (int i = 0; i < partition; i++) {
  101. iw->startPartition();
  102. for (size_t j = 0; j < kvs.size(); j++) {
  103. iw->write(kvs[j].first.c_str(), kvs[j].first.length(), kvs[j].second.c_str(),
  104. kvs[j].second.length());
  105. }
  106. iw->endPartition();
  107. }
  108. SingleSpillInfo * info = iw->getSpillInfo();
  109. LOG("%s",
  110. timer.getSpeedM2("Write data", info->getEndPosition(), info->getRealEndPosition()).c_str());
  111. delete iw;
  112. InputBuffer inputBuffer = InputBuffer(buff, outputBuffer.tell());
  113. IFileReader * ir = new IFileReader(&inputBuffer, info);
  114. timer.reset();
  115. int sum = 0;
  116. while (ir->nextPartition()) {
  117. const char * key, *value;
  118. uint32_t keyLen, valueLen;
  119. while (NULL != (key = ir->nextKey(keyLen))) {
  120. value = ir->value(valueLen);
  121. sum += value[0];
  122. }
  123. }
  124. // use the result so that value() calls don't get optimized out
  125. ASSERT_NE(0xdeadbeef, sum);
  126. LOG("%s",
  127. timer.getSpeedM2(" Read data", info->getEndPosition(), info->getRealEndPosition()).c_str());
  128. delete ir;
  129. delete info;
  130. }
  131. TEST(Perf, IFile) {
  132. int size = TestConfig.getInt("partition.size", 20000);
  133. string codec = TestConfig.get("ifile.codec", "");
  134. string type = TestConfig.get("ifile.type", "bytes");
  135. vector<pair<string, string> > kvs;
  136. Generate(kvs, size, type);
  137. std::sort(kvs.begin(), kvs.end());
  138. size_t buffsize = 200 * 1024 * 1024;
  139. char * buff = new char[buffsize];
  140. memset(buff, 0, buffsize);
  141. LOG("Test TextType CRC32");
  142. TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32, TextType);
  143. LOG("Test BytesType CRC32");
  144. TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32, BytesType);
  145. LOG("Test UnknownType CRC32");
  146. TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32, UnknownType);
  147. LOG("Test TextType CRC32C");
  148. TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32C, TextType);
  149. LOG("Test BytesType CRC32C");
  150. TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32C, BytesType);
  151. LOG("Test UnknownType CRC32C");
  152. TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32C, UnknownType);
  153. delete[] buff;
  154. }
  // Some glibc versions have a bug in the file "tell" API that can
  // unexpectedly overwrite file data.
  // See https://rhn.redhat.com/errata/RHBA-2013-0279.html
  // This test case checks whether that bug is present.
  // If it is, you need to upgrade glibc.
  160. TEST(IFile, TestGlibCBug) {
  161. std::string path("./testData/testGlibCBugSpill.out");
  162. int32_t expect[5] = {-1538241715, -1288088794, -192294464, 563552421, 1661521654};
  163. LOG("TestGlibCBug %s", path.c_str());
  164. IFileSegment * segments = new IFileSegment[1];
  165. segments[0].realEndOffset = 10000000;
  166. SingleSpillInfo info(segments, 1, path, CHECKSUM_NONE,
  167. IntType, TextType, "");
  168. InputStream * fileOut = FileSystem::getLocal().open(path);
  169. IFileReader * reader = new IFileReader(fileOut, &info, true);
  170. const char * key = NULL;
  171. uint32_t length = 0;
  172. reader->nextPartition();
  173. uint32_t index = 0;
  174. while (NULL != (key = reader->nextKey(length))) {
  175. int32_t realKey = (int32_t)hadoop_be32toh(*(uint32_t *)(key));
  176. ASSERT_LT(index, 5);
  177. ASSERT_EQ(expect[index], realKey);
  178. index++;
  179. }
  180. delete reader;
  181. }