TestCompressions.cc

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "lz4.h"
#include "config.h"
#include "lib/commons.h"
#include "lib/Path.h"
#include "lib/BufferStream.h"
#include "lib/FileSystem.h"
#include "lib/Compressions.h"
#include "test_commons.h"

#if defined HADOOP_SNAPPY_LIBRARY
#include <snappy.h>
#endif
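
// Round-trip benchmark for a Hadoop compression codec: generate key/value test
// data, push it through the codec's CompressStream, read it back through the
// matching DecompressStream, log throughput and compression ratio, and verify
// the decompressed bytes match the original input.
// Tunable via compression.input.length, compression.buffer.hint and
// compression.input.type.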
void TestCodec(const string & codec) {
  string data;
  size_t length = TestConfig.getInt("compression.input.length", 100 * 1024 * 1024);
  uint32_t buffhint = TestConfig.getInt("compression.buffer.hint", 128 * 1024);
  string type = TestConfig.get("compression.input.type", "bytes");
  Timer timer;
  GenerateKVTextLength(data, length, type);
  LOG("%s", timer.getInterval("Generate data").c_str());

  InputBuffer inputBuffer = InputBuffer(data);
  // 1.5x the input size, so even incompressible output still fits
  size_t buffLen = data.length() / 2 * 3;
  timer.reset();
  char * buff = new char[buffLen];
  char * buff2 = new char[buffLen];
  memset(buff, 0, buffLen);
  memset(buff2, 0, buffLen);
  LOG("%s", timer.getInterval("memset buffers to pre-fault the pages").c_str());

  OutputBuffer outputBuffer = OutputBuffer(buff, buffLen);
  CompressStream * compressor = Compressions::getCompressionStream(codec, &outputBuffer, buffhint);

  LOG("%s", codec.c_str());
  timer.reset();
  // compress the generated data in 128KB chunks
  for (size_t i = 0; i < data.length(); i += 128 * 1024) {
    compressor->write(data.c_str() + i, std::min(data.length() - i, (size_t)(128 * 1024)));
  }
  compressor->flush();
  LOG("%s",
      timer.getSpeedM2("compress origin/compressed", data.length(), outputBuffer.tell()).c_str());

  InputBuffer decompInputBuffer = InputBuffer(buff, outputBuffer.tell());
  DecompressStream * decompressor = Compressions::getDecompressionStream(codec, &decompInputBuffer,
                                                                         buffhint);
  size_t total = 0;
  timer.reset();
  // decompress everything back into buff2, counting the recovered bytes
  while (true) {
    int32_t rd = decompressor->read(buff2 + total, buffLen - total);
    if (rd <= 0) {
      break;
    }
    total += rd;
  }
  LOG("%s", timer.getSpeedM2("decompress origin/uncompressed", outputBuffer.tell(), total).c_str());
  LOG("ratio: %.3lf", outputBuffer.tell() / (double)total);

  ASSERT_EQ(data.length(), total);
  ASSERT_EQ(0, memcmp(data.c_str(), buff2, total));

  delete[] buff;
  delete[] buff2;
  delete compressor;
  delete decompressor;
}
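
// File-to-file (de)compression utility driven by the "input" and "output" config
// keys. Direction is inferred from the file extensions via Compressions::getCodecByFile:
// a recognized codec suffix on the input means decompress, a recognized suffix on
// the output means compress.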
TEST(Perf, CompressionUtil) {
  string inputfile = TestConfig.get("input", "");
  string outputfile = TestConfig.get("output", "");
  uint32_t buffhint = TestConfig.getInt("compression.buffer.hint", 128 * 1024);
  string inputcodec = Compressions::getCodecByFile(inputfile);
  string outputcodec = Compressions::getCodecByFile(outputfile);
  size_t bufferSize = buffhint;
  if (inputcodec.length() > 0 && outputcodec.length() == 0) {
    // decompression
    InputStream * fin = FileSystem::getLocal().open(inputfile);
    if (fin == NULL) {
      THROW_EXCEPTION(IOException, "input file not found");
    }
    DecompressStream * source = Compressions::getDecompressionStream(inputcodec, fin, bufferSize);
    OutputStream * fout = FileSystem::getLocal().create(outputfile, true);
    char * buffer = new char[bufferSize];
    while (true) {
      int rd = source->read(buffer, bufferSize);
      if (rd <= 0) {
        break;
      }
      fout->write(buffer, rd);
    }
    source->close();
    delete source;
    fin->close();
    delete fin;
    fout->flush();
    fout->close();
    delete fout;
    delete[] buffer;
  } else if (inputcodec.length() == 0 && outputcodec.length() > 0) {
    // compression
    InputStream * fin = FileSystem::getLocal().open(inputfile);
    if (fin == NULL) {
      THROW_EXCEPTION(IOException, "input file not found");
    }
    OutputStream * fout = FileSystem::getLocal().create(outputfile, true);
    CompressStream * dest = Compressions::getCompressionStream(outputcodec, fout, bufferSize);
    char * buffer = new char[bufferSize];
    while (true) {
      int rd = fin->read(buffer, bufferSize);
      if (rd <= 0) {
        break;
      }
      dest->write(buffer, rd);
    }
    dest->flush();
    dest->close();
    delete dest;
    fout->close();
    delete fout;
    fin->close();
    delete fin;
    delete[] buffer;
  } else {
    LOG("Neither compression nor decompression requested, doing nothing");
  }
}
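
// Accumulates uncompressed/compressed byte counts and timings across blocks and
// files; toString() reports compression and decompression throughput (MB/s) and
// the resulting compression ratio.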
class CompressResult {
public:
  uint64_t uncompressedSize;
  uint64_t compressedSize;
  uint64_t compressTime;
  uint64_t uncompressTime;
  CompressResult()
      : uncompressedSize(0), compressedSize(0), compressTime(0), uncompressTime(0) {
  }
  CompressResult & operator+=(const CompressResult & rhs) {
    uncompressedSize += rhs.uncompressedSize;
    compressedSize += rhs.compressedSize;
    compressTime += rhs.compressTime;
    uncompressTime += rhs.uncompressTime;
    return *this;
  }
  string toString() {
    return StringUtil::Format("Compress: %4.0fM/s Decompress: %5.0fM/s(%5.0fM/s) ratio: %.1f%%",
                              (uncompressedSize / 1024.0 / 1024) / (compressTime / 1000000000.),
                              (compressedSize / 1024.0 / 1024) / (uncompressTime / 1000000000.),
                              (uncompressedSize / 1024.0 / 1024) / (uncompressTime / 1000000000.),
                              compressedSize / (float)uncompressedSize * 100);
  }
};

TEST(Perf, GzipCodec) {
  TestCodec("org.apache.hadoop.io.compress.GzipCodec");
}
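
// Raw LZ4 block benchmark on one file: compress and decompress the file block by
// block with the raw LZ4 block API (no codec/stream framing), repeating each block
// `times` times, and fold the measurements into `total`.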
void MeasureSingleFileLz4(const string & path, CompressResult & total, size_t blockSize,
                          int times) {
  string data;
  ReadFile(data, path);
  // output buffer slightly larger than a block, so incompressible blocks still fit
  size_t maxlength = std::max((size_t)(blockSize * 1.005), blockSize + 8);
  char * outputBuffer = new char[maxlength];
  char * dest = new char[blockSize + 8];
  CompressResult result;
  Timer t;
  int compressedSize = -1;
  for (size_t start = 0; start < data.length(); start += blockSize) {
    size_t currentblocksize = std::min(data.length() - start, blockSize);
    uint64_t startTime = t.now();
    for (int i = 0; i < times; i++) {
      int osize = LZ4_compress((char*)data.data() + start, outputBuffer, currentblocksize);
      compressedSize = osize;
      result.compressedSize += osize;
      result.uncompressedSize += currentblocksize;
    }
    uint64_t endTime = t.now();
    result.compressTime += endTime - startTime;
    startTime = t.now();
    for (int i = 0; i < times; i++) {
      // LZ4_decompress_fast returns the number of compressed bytes it consumed,
      // so a successful round trip reads back exactly the compressed block
      int osize = LZ4_decompress_fast(outputBuffer, dest, currentblocksize);
      ASSERT_EQ(compressedSize, osize);
    }
    endTime = t.now();
    result.uncompressTime += endTime - startTime;
  }
  printf("%s - %s\n", result.toString().c_str(), Path::GetName(path).c_str());
  delete[] outputBuffer;
  delete[] dest;
  total += result;
}
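
// Driver for the raw LZ4 benchmark: runs MeasureSingleFileLz4 over every regular
// file under compressions.input.path and prints per-file and total results.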
TEST(Perf, RawCompressionLz4) {
  string inputdir = TestConfig.get("compressions.input.path", "");
  int64_t times = TestConfig.getInt("compression.time", 400);
  int64_t blockSize = TestConfig.getInt("compression.block.size", 1024 * 64);
  vector<FileEntry> inputfiles;
  FileSystem::getLocal().list(inputdir, inputfiles);
  CompressResult total;
  printf("Block size: %lldK\n", (long long int)(blockSize / 1024));
  for (size_t i = 0; i < inputfiles.size(); i++) {
    if (!inputfiles[i].isDirectory) {
      MeasureSingleFileLz4(inputdir + "/" + inputfiles[i].name, total, blockSize, times);
    }
  }
  printf("%s - Total\n", total.toString().c_str());
}

TEST(Perf, Lz4Codec) {
  TestCodec("org.apache.hadoop.io.compress.Lz4Codec");
}

#if defined HADOOP_SNAPPY_LIBRARY
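
// Raw Snappy block benchmark on one file: same structure as MeasureSingleFileLz4
// above, using snappy::RawCompress / snappy::RawUncompress.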
void MeasureSingleFileSnappy(const string & path, CompressResult & total, size_t blockSize,
                             int times) {
  string data;
  ReadFile(data, path);
  size_t maxlength = snappy::MaxCompressedLength(blockSize);
  char * outputBuffer = new char[maxlength];
  char * dest = new char[blockSize];
  CompressResult result;
  Timer t;
  int compressedSize = -1;
  for (size_t start = 0; start < data.length(); start += blockSize) {
    size_t currentblocksize = std::min(data.length() - start, blockSize);
    uint64_t startTime = t.now();
    for (int i = 0; i < times; i++) {
      size_t osize = maxlength;
      snappy::RawCompress(data.data() + start, currentblocksize, outputBuffer, &osize);
      compressedSize = osize;
      result.compressedSize += osize;
      result.uncompressedSize += currentblocksize;
    }
    uint64_t endTime = t.now();
    result.compressTime += endTime - startTime;
    startTime = t.now();
    for (int i = 0; i < times; i++) {
      snappy::RawUncompress(outputBuffer, compressedSize, dest);
    }
    endTime = t.now();
    result.uncompressTime += endTime - startTime;
  }
  printf("%s - %s\n", result.toString().c_str(), Path::GetName(path).c_str());
  delete[] outputBuffer;
  delete[] dest;
  total += result;
}
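
// Driver for the raw Snappy benchmark; mirrors RawCompressionLz4 above.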
TEST(Perf, RawCompressionSnappy) {
  string inputdir = TestConfig.get("compressions.input.path", "");
  int64_t times = TestConfig.getInt("compression.time", 400);
  int64_t blockSize = TestConfig.getInt("compression.block.size", 1024 * 64);
  vector<FileEntry> inputfiles;
  FileSystem::getLocal().list(inputdir, inputfiles);
  CompressResult total;
  printf("Block size: %lldK\n", (long long int)(blockSize / 1024));
  for (size_t i = 0; i < inputfiles.size(); i++) {
    if (!inputfiles[i].isDirectory) {
      MeasureSingleFileSnappy(inputdir + "/" + inputfiles[i].name, total, blockSize, times);
    }
  }
  printf("%s - Total\n", total.toString().c_str());
}

TEST(Perf, SnappyCodec) {
  TestCodec("org.apache.hadoop.io.compress.SnappyCodec");
}

#endif // HADOOP_SNAPPY_LIBRARY