TestIFile.cc 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include <algorithm>
  19. #include "commons.h"
  20. #include "BufferStream.h"
  21. #include "FileSystem.h"
  22. #include "IFile.h"
  23. #include "test_commons.h"
  24. SingleSpillInfo * writeIFile(int partition, vector<pair<string, string> > & kvs,
  25. const string & path, KeyValueType type, const string & codec) {
  26. FileOutputStream * fout = (FileOutputStream*)FileSystem::getLocal().create(path);
  27. IFileWriter * iw = new IFileWriter(fout, CHECKSUM_CRC32, type, type, codec, NULL);
  28. for (int i = 0; i < partition; i++) {
  29. iw->startPartition();
  30. for (size_t i = 0; i < kvs.size(); i++) {
  31. pair<string, string> & p = kvs[i];
  32. iw->write(p.first.c_str(), p.first.length(), p.second.c_str(), p.second.length());
  33. }
  34. iw->endPartition();
  35. }
  36. SingleSpillInfo * info = iw->getSpillInfo();
  37. delete iw;
  38. delete fout;
  39. return info;
  40. }
  41. void readIFile(vector<pair<string, string> > & kvs, const string & path, KeyValueType type,
  42. SingleSpillInfo * info, const string & codec) {
  43. FileInputStream * fin = (FileInputStream*)FileSystem::getLocal().open(path);
  44. IFileReader * ir = new IFileReader(fin, info);
  45. while (ir->nextPartition()) {
  46. const char * key, *value;
  47. uint32_t keyLen, valueLen;
  48. while (NULL != (key = ir->nextKey(keyLen))) {
  49. value = ir->value(valueLen);
  50. string keyS(key, keyLen);
  51. string valueS(value, valueLen);
  52. kvs.push_back(std::make_pair(keyS, valueS));
  53. }
  54. }
  55. delete ir;
  56. delete fin;
  57. }
  58. void TestIFileReadWrite(KeyValueType kvtype, int partition, int size,
  59. vector<pair<string, string> > & kvs, const string & codec = "") {
  60. string outputpath = "ifilewriter";
  61. SingleSpillInfo * info = writeIFile(partition, kvs, outputpath, kvtype, codec);
  62. LOG("write finished");
  63. vector<pair<string, string> > readkvs;
  64. readIFile(readkvs, outputpath, kvtype, info, codec);
  65. LOG("read finished");
  66. delete info;
  67. ASSERT_EQ(kvs.size() * partition, readkvs.size());
  68. for (int i = 0; i < partition; i++) {
  69. vector<pair<string, string> > cur_part(readkvs.begin() + i * kvs.size(),
  70. readkvs.begin() + (i + 1) * kvs.size());
  71. ASSERT_EQ(kvs.size(), cur_part.size());
  72. // for (size_t j=0;j<kvs.size();j++) {
  73. // SCOPED_TRACE(j);
  74. // ASSERT_EQ(kvs[j], cur_part[j]);
  75. // }
  76. ASSERT_EQ(kvs, cur_part);
  77. }
  78. FileSystem::getLocal().remove(outputpath);
  79. }
  80. TEST(IFile, WriteRead) {
  81. int partition = TestConfig.getInt("ifile.partition", 7);
  82. int size = TestConfig.getInt("partition.size", 20000);
  83. vector<pair<string, string> > kvs;
  84. Generate(kvs, size, "bytes");
  85. TestIFileReadWrite(TextType, partition, size, kvs);
  86. TestIFileReadWrite(BytesType, partition, size, kvs);
  87. TestIFileReadWrite(UnknownType, partition, size, kvs);
  88. TestIFileReadWrite(TextType, partition, size, kvs, "org.apache.hadoop.io.compress.SnappyCodec");
  89. }
  90. void TestIFileWriteRead2(vector<pair<string, string> > & kvs, char * buff, size_t buffsize,
  91. const string & codec, ChecksumType checksumType, KeyValueType type) {
  92. int partition = TestConfig.getInt("ifile.partition", 50);
  93. Timer timer;
  94. OutputBuffer outputBuffer = OutputBuffer(buff, buffsize);
  95. IFileWriter * iw = new IFileWriter(&outputBuffer, checksumType, type, type, codec, NULL);
  96. timer.reset();
  97. for (int i = 0; i < partition; i++) {
  98. iw->startPartition();
  99. for (size_t j = 0; j < kvs.size(); j++) {
  100. iw->write(kvs[j].first.c_str(), kvs[j].first.length(), kvs[j].second.c_str(),
  101. kvs[j].second.length());
  102. }
  103. iw->endPartition();
  104. }
  105. SingleSpillInfo * info = iw->getSpillInfo();
  106. LOG("%s",
  107. timer.getSpeedM2("Write data", info->getEndPosition(), info->getRealEndPosition()).c_str());
  108. delete iw;
  109. InputBuffer inputBuffer = InputBuffer(buff, outputBuffer.tell());
  110. IFileReader * ir = new IFileReader(&inputBuffer, info);
  111. timer.reset();
  112. while (ir->nextPartition()) {
  113. const char * key, *value;
  114. uint32_t keyLen, valueLen;
  115. while (NULL != (key = ir->nextKey(keyLen))) {
  116. value = ir->value(valueLen);
  117. }
  118. }
  119. LOG("%s",
  120. timer.getSpeedM2(" Read data", info->getEndPosition(), info->getRealEndPosition()).c_str());
  121. delete ir;
  122. delete info;
  123. }
  124. TEST(Perf, IFile) {
  125. int size = TestConfig.getInt("partition.size", 20000);
  126. string codec = TestConfig.get("ifile.codec", "");
  127. string type = TestConfig.get("ifile.type", "bytes");
  128. vector<pair<string, string> > kvs;
  129. Generate(kvs, size, type);
  130. std::sort(kvs.begin(), kvs.end());
  131. size_t buffsize = 200 * 1024 * 1024;
  132. char * buff = new char[buffsize];
  133. memset(buff, 0, buffsize);
  134. LOG("Test TextType CRC32");
  135. TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32, TextType);
  136. LOG("Test BytesType CRC32");
  137. TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32, BytesType);
  138. LOG("Test UnknownType CRC32");
  139. TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32, UnknownType);
  140. LOG("Test TextType CRC32C");
  141. TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32C, TextType);
  142. LOG("Test BytesType CRC32C");
  143. TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32C, BytesType);
  144. LOG("Test UnknownType CRC32C");
  145. TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32C, UnknownType);
  146. delete[] buff;
  147. }
// Glibc has a bug in its file tell API that can unexpectedly overwrite
// file data.
// See https://rhn.redhat.com/errata/RHBA-2013-0279.html
// This case checks whether the bug is present.
// If it is, you need to upgrade glibc.
  153. TEST(IFile, TestGlibCBug) {
  154. std::string path("./testData/testGlibCBugSpill.out");
  155. int32_t expect[5] = {-1538241715, -1288088794, -192294464, 563552421, 1661521654};
  156. LOG("TestGlibCBug %s", path.c_str());
  157. IFileSegment * segments = new IFileSegment[1];
  158. segments[0].realEndOffset = 10000000;
  159. SingleSpillInfo info(segments, 1, path, CHECKSUM_NONE,
  160. IntType, TextType, "");
  161. InputStream * fileOut = FileSystem::getLocal().open(path);
  162. IFileReader * reader = new IFileReader(fileOut, &info, true);
  163. const char * key = NULL;
  164. uint32_t length = 0;
  165. reader->nextPartition();
  166. uint32_t index = 0;
  167. while(NULL != (key = reader->nextKey(length))) {
  168. int32_t realKey = (int32_t)bswap(*(uint32_t *)(key));
  169. ASSERT_LT(index, 5);
  170. ASSERT_EQ(expect[index], realKey);
  171. index++;
  172. }
  173. delete reader;
  174. }