123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286 |
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #include "lz4.h"
- #include "config.h"
- #include "lib/commons.h"
- #include "lib/Path.h"
- #include "lib/BufferStream.h"
- #include "lib/FileSystem.h"
- #include "lib/Compressions.h"
- #include "test_commons.h"
- #if defined HADOOP_SNAPPY_LIBRARY
- #include <snappy.h>
- #endif
- void TestCodec(const string & codec) {
- string data;
- size_t length = TestConfig.getInt("compression.input.length", 100 * 1024 * 1024);
- uint32_t buffhint = TestConfig.getInt("compression.buffer.hint", 128 * 1024);
- string type = TestConfig.get("compression.input.type", "bytes");
- Timer timer;
- GenerateKVTextLength(data, length, type);
- LOG("%s", timer.getInterval("Generate data").c_str());
- InputBuffer inputBuffer = InputBuffer(data);
- size_t buffLen = data.length() / 2 * 3;
- timer.reset();
- char * buff = new char[buffLen];
- char * buff2 = new char[buffLen];
- memset(buff, 0, buffLen);
- memset(buff2, 0, buffLen);
- LOG("%s", timer.getInterval("memset buffer to prevent missing page").c_str());
- OutputBuffer outputBuffer = OutputBuffer(buff, buffLen);
- CompressStream * compressor = Compressions::getCompressionStream(codec, &outputBuffer, buffhint);
- LOG("%s", codec.c_str());
- timer.reset();
- for (size_t i = 0; i < data.length(); i += 128 * 1024) {
- compressor->write(data.c_str() + i, std::min(data.length() - i, (size_t)(128 * 1024)));
- }
- compressor->flush();
- LOG("%s",
- timer.getSpeedM2("compress origin/compressed", data.length(), outputBuffer.tell()).c_str());
- InputBuffer decompInputBuffer = InputBuffer(buff, outputBuffer.tell());
- DecompressStream * decompressor = Compressions::getDecompressionStream(codec, &decompInputBuffer,
- buffhint);
- size_t total = 0;
- timer.reset();
- while (true) {
- int32_t rd = decompressor->read(buff2 + total, buffLen - total);
- if (rd <= 0) {
- break;
- }
- total += rd;
- }
- LOG("%s", timer.getSpeedM2("decompress origin/uncompressed", outputBuffer.tell(), total).c_str());
- LOG("ratio: %.3lf", outputBuffer.tell() / (double )total);
- ASSERT_EQ(data.length(), total);
- ASSERT_EQ(0, memcmp(data.c_str(), buff2, total));
- delete[] buff;
- delete[] buff2;
- delete compressor;
- delete decompressor;
- }
- TEST(Perf, CompressionUtil) {
- string inputfile = TestConfig.get("input", "");
- string outputfile = TestConfig.get("output", "");
- uint32_t buffhint = TestConfig.getInt("compression.buffer.hint", 128 * 1024);
- string inputcodec = Compressions::getCodecByFile(inputfile);
- string outputcodec = Compressions::getCodecByFile(outputfile);
- size_t bufferSize = buffhint;
- if (inputcodec.length() > 0 && outputcodec.length() == 0) {
- // decompression
- InputStream * fin = FileSystem::getLocal().open(inputfile);
- if (fin == NULL) {
- THROW_EXCEPTION(IOException, "input file not found");
- }
- DecompressStream * source = Compressions::getDecompressionStream(inputcodec, fin, bufferSize);
- OutputStream * fout = FileSystem::getLocal().create(outputfile, true);
- char * buffer = new char[bufferSize];
- while (true) {
- int rd = source->read(buffer, bufferSize);
- if (rd <= 0) {
- break;
- }
- fout->write(buffer, rd);
- }
- source->close();
- delete source;
- fin->close();
- delete fin;
- fout->flush();
- fout->close();
- delete fout;
- delete buffer;
- } else if (inputcodec.length() == 0 && outputcodec.length() > 0) {
- // compression
- InputStream * fin = FileSystem::getLocal().open(inputfile);
- if (fin == NULL) {
- THROW_EXCEPTION(IOException, "input file not found");
- }
- OutputStream * fout = FileSystem::getLocal().create(outputfile, true);
- CompressStream * dest = Compressions::getCompressionStream(outputcodec, fout, bufferSize);
- char * buffer = new char[bufferSize];
- while (true) {
- int rd = fin->read(buffer, bufferSize);
- if (rd <= 0) {
- break;
- }
- dest->write(buffer, rd);
- }
- dest->flush();
- dest->close();
- delete dest;
- fout->close();
- delete fout;
- fin->close();
- delete fin;
- delete buffer;
- } else {
- LOG("Not compression or decompression, do nothing");
- }
- }
- class CompressResult {
- public:
- uint64_t uncompressedSize;
- uint64_t compressedSize;
- uint64_t compressTime;
- uint64_t uncompressTime;
- CompressResult()
- : uncompressedSize(0), compressedSize(0), compressTime(0), uncompressTime(0) {
- }
- CompressResult & operator+=(const CompressResult & rhs) {
- uncompressedSize += rhs.uncompressedSize;
- compressedSize += rhs.compressedSize;
- compressTime += rhs.compressTime;
- uncompressTime += rhs.uncompressTime;
- return *this;
- }
- string toString() {
- return StringUtil::Format("Compress: %4.0fM/s Decompress: %5.0fM/s(%5.0fM/s) ratio: %.1f%%",
- (uncompressedSize / 1024.0 / 1024) / (compressTime / 1000000000.),
- (compressedSize / 1024.0 / 1024) / (uncompressTime / 1000000000.),
- (uncompressedSize / 1024.0 / 1024) / (uncompressTime / 1000000000.),
- compressedSize / (float)uncompressedSize * 100);
- }
- };
- TEST(Perf, GzipCodec) {
- TestCodec("org.apache.hadoop.io.compress.GzipCodec");
- }
- void MeasureSingleFileLz4(const string & path, CompressResult & total, size_t blockSize,
- int times) {
- string data;
- ReadFile(data, path);
- size_t maxlength = std::max((size_t)(blockSize * 1.005), blockSize + 8);
- char * outputBuffer = new char[maxlength];
- char * dest = new char[blockSize + 8];
- CompressResult result;
- Timer t;
- for (size_t start = 0; start < data.length(); start += blockSize) {
- size_t currentblocksize = std::min(data.length() - start, blockSize);
- uint64_t startTime = t.now();
- for (int i = 0; i < times; i++) {
- int osize = LZ4_compress((char*)data.data() + start, outputBuffer, currentblocksize);
- result.compressedSize += osize;
- result.uncompressedSize += currentblocksize;
- }
- uint64_t endTime = t.now();
- result.compressTime += endTime - startTime;
- startTime = t.now();
- for (int i = 0; i < times; i++) {
- int osize = LZ4_decompress_fast(outputBuffer, dest, currentblocksize);
- ASSERT_EQ(currentblocksize, osize);
- }
- endTime = t.now();
- result.uncompressTime += endTime - startTime;
- }
- printf("%s - %s\n", result.toString().c_str(), Path::GetName(path).c_str());
- delete[] outputBuffer;
- delete[] dest;
- total += result;
- }
- TEST(Perf, RawCompressionLz4) {
- string inputdir = TestConfig.get("compressions.input.path", "");
- int64_t times = TestConfig.getInt("compression.time", 400);
- int64_t blockSize = TestConfig.getInt("compression.block.size", 1024 * 64);
- vector<FileEntry> inputfiles;
- FileSystem::getLocal().list(inputdir, inputfiles);
- CompressResult total;
- printf("Block size: %lldK\n", (long long int)(blockSize / 1024));
- for (size_t i = 0; i < inputfiles.size(); i++) {
- if (!inputfiles[i].isDirectory) {
- MeasureSingleFileLz4((inputdir + "/" + inputfiles[i].name).c_str(), total, blockSize, times);
- }
- }
- printf("%s - Total\n", total.toString().c_str());
- }
- TEST(Perf, Lz4Codec) {
- TestCodec("org.apache.hadoop.io.compress.Lz4Codec");
- }
- #if defined HADOOP_SNAPPY_LIBRARY
- void MeasureSingleFileSnappy(const string & path, CompressResult & total, size_t blockSize,
- int times) {
- string data;
- ReadFile(data, path);
- size_t maxlength = snappy::MaxCompressedLength(blockSize);
- char * outputBuffer = new char[maxlength];
- char * dest = new char[blockSize];
- CompressResult result;
- Timer t;
- int compressedSize = -1;
- for (size_t start = 0; start < data.length(); start += blockSize) {
- size_t currentblocksize = std::min(data.length() - start, blockSize);
- uint64_t startTime = t.now();
- for (int i = 0; i < times; i++) {
- size_t osize = maxlength;
- snappy::RawCompress(data.data() + start, currentblocksize, outputBuffer, &osize);
- compressedSize = osize;
- result.compressedSize += osize;
- result.uncompressedSize += currentblocksize;
- }
- uint64_t endTime = t.now();
- result.compressTime += endTime - startTime;
- startTime = t.now();
- for (int i = 0; i < times; i++) {
- snappy::RawUncompress(outputBuffer, compressedSize, dest);
- }
- endTime = t.now();
- result.uncompressTime += endTime - startTime;
- }
- printf("%s - %s\n", result.toString().c_str(), Path::GetName(path).c_str());
- delete[] outputBuffer;
- delete[] dest;
- total += result;
- }
- TEST(Perf, RawCompressionSnappy) {
- string inputdir = TestConfig.get("compressions.input.path", "");
- int64_t times = TestConfig.getInt("compression.time", 400);
- int64_t blockSize = TestConfig.getInt("compression.block.size", 1024 * 64);
- vector<FileEntry> inputfiles;
- FileSystem::getLocal().list(inputdir, inputfiles);
- CompressResult total;
- printf("Block size: %"PRId64"K\n", blockSize / 1024);
- for (size_t i = 0; i < inputfiles.size(); i++) {
- if (!inputfiles[i].isDirectory) {
- MeasureSingleFileSnappy((inputdir + "/" + inputfiles[i].name).c_str(), total, blockSize,
- times);
- }
- }
- printf("%s - Total\n", total.toString().c_str());
- }
- TEST(Perf, SnappyCodec) {
- TestCodec("org.apache.hadoop.io.compress.SnappyCodec");
- }
- #endif // define HADOOP_SNAPPY_LIBRARY
|