/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include "lib/commons.h" #include "config.h" #include "lib/BufferStream.h" #include "lib/FileSystem.h" #include "lib/IFile.h" #include "test_commons.h" SingleSpillInfo * writeIFile(int partition, vector > & kvs, const string & path, KeyValueType type, const string & codec) { FileOutputStream * fout = (FileOutputStream*)FileSystem::getLocal().create(path); IFileWriter * iw = new IFileWriter(fout, CHECKSUM_CRC32, type, type, codec, NULL); for (int i = 0; i < partition; i++) { iw->startPartition(); for (size_t i = 0; i < kvs.size(); i++) { pair & p = kvs[i]; iw->write(p.first.c_str(), p.first.length(), p.second.c_str(), p.second.length()); } iw->endPartition(); } SingleSpillInfo * info = iw->getSpillInfo(); delete iw; delete fout; return info; } void readIFile(vector > & kvs, const string & path, KeyValueType type, SingleSpillInfo * info, const string & codec) { FileInputStream * fin = (FileInputStream*)FileSystem::getLocal().open(path); IFileReader * ir = new IFileReader(fin, info); while (ir->nextPartition()) { const char * key, *value; uint32_t keyLen, valueLen; while (NULL != (key = ir->nextKey(keyLen))) { value = ir->value(valueLen); string keyS(key, keyLen); string valueS(value, 
valueLen); kvs.push_back(std::make_pair(keyS, valueS)); } } delete ir; delete fin; } void TestIFileReadWrite(KeyValueType kvtype, int partition, int size, vector > & kvs, const string & codec = "") { string outputpath = "ifilewriter"; SingleSpillInfo * info = writeIFile(partition, kvs, outputpath, kvtype, codec); LOG("write finished"); vector > readkvs; readIFile(readkvs, outputpath, kvtype, info, codec); LOG("read finished"); delete info; ASSERT_EQ(kvs.size() * partition, readkvs.size()); for (int i = 0; i < partition; i++) { vector > cur_part(readkvs.begin() + i * kvs.size(), readkvs.begin() + (i + 1) * kvs.size()); ASSERT_EQ(kvs.size(), cur_part.size()); // for (size_t j=0;j > kvs; Generate(kvs, size, "bytes"); TestIFileReadWrite(TextType, partition, size, kvs); TestIFileReadWrite(BytesType, partition, size, kvs); TestIFileReadWrite(UnknownType, partition, size, kvs); #if defined HADOOP_SNAPPY_LIBRARY TestIFileReadWrite(TextType, partition, size, kvs, "org.apache.hadoop.io.compress.SnappyCodec"); #endif } void TestIFileWriteRead2(vector > & kvs, char * buff, size_t buffsize, const string & codec, ChecksumType checksumType, KeyValueType type) { int partition = TestConfig.getInt("ifile.partition", 50); Timer timer; OutputBuffer outputBuffer = OutputBuffer(buff, buffsize); IFileWriter * iw = new IFileWriter(&outputBuffer, checksumType, type, type, codec, NULL); timer.reset(); for (int i = 0; i < partition; i++) { iw->startPartition(); for (size_t j = 0; j < kvs.size(); j++) { iw->write(kvs[j].first.c_str(), kvs[j].first.length(), kvs[j].second.c_str(), kvs[j].second.length()); } iw->endPartition(); } SingleSpillInfo * info = iw->getSpillInfo(); LOG("%s", timer.getSpeedM2("Write data", info->getEndPosition(), info->getRealEndPosition()).c_str()); delete iw; InputBuffer inputBuffer = InputBuffer(buff, outputBuffer.tell()); IFileReader * ir = new IFileReader(&inputBuffer, info); timer.reset(); int sum = 0; while (ir->nextPartition()) { const char * key, *value; 
uint32_t keyLen, valueLen; while (NULL != (key = ir->nextKey(keyLen))) { value = ir->value(valueLen); sum += value[0]; } } // use the result so that value() calls don't get optimized out ASSERT_NE(0xdeadbeef, sum); LOG("%s", timer.getSpeedM2(" Read data", info->getEndPosition(), info->getRealEndPosition()).c_str()); delete ir; delete info; } TEST(Perf, IFile) { int size = TestConfig.getInt("partition.size", 20000); string codec = TestConfig.get("ifile.codec", ""); string type = TestConfig.get("ifile.type", "bytes"); vector > kvs; Generate(kvs, size, type); std::sort(kvs.begin(), kvs.end()); size_t buffsize = 200 * 1024 * 1024; char * buff = new char[buffsize]; memset(buff, 0, buffsize); LOG("Test TextType CRC32"); TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32, TextType); LOG("Test BytesType CRC32"); TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32, BytesType); LOG("Test UnknownType CRC32"); TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32, UnknownType); LOG("Test TextType CRC32C"); TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32C, TextType); LOG("Test BytesType CRC32C"); TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32C, BytesType); LOG("Test UnknownType CRC32C"); TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32C, UnknownType); delete[] buff; } // The Glibc has a bug in the file tell api, it will overwrite the file data // unexpected. // Please check https://rhn.redhat.com/errata/RHBA-2013-0279.html // This case is to check whether the bug exists. // If it exists, it means you need to upgrade the glibc. 
/**
 * Reads the pre-generated spill file ./testData/testGlibCBugSpill.out and
 * checks that the big-endian 32-bit keys it yields match the expected
 * sequence, in order.
 */
TEST(IFile, TestGlibCBug) {
  std::string path("./testData/testGlibCBugSpill.out");

  const int32_t expect[] = {-1538241715, -1288088794, -192294464, 563552421, 1661521654};
  // Derive the bound from the array so the check cannot drift from the data,
  // and keep it unsigned so it compares cleanly with `index`.
  const uint32_t expectCount = sizeof(expect) / sizeof(expect[0]);

  LOG("TestGlibCBug %s", path.c_str());
  // NOTE(review): `segments` is never freed in this test; presumably
  // SingleSpillInfo takes ownership of the array -- confirm.
  IFileSegment * segments = new IFileSegment[1];
  segments[0].realEndOffset = 10000000;
  SingleSpillInfo info(segments, 1, path, CHECKSUM_NONE, IntType, TextType, "");

  InputStream * fileOut = FileSystem::getLocal().open(path);
  // Stack-allocated so the reader is destroyed even when a fatal ASSERT_*
  // returns early from the loop below.
  // NOTE(review): the trailing `true` presumably makes the reader own and
  // delete `fileOut`, which is why it is not deleted here -- confirm.
  IFileReader reader(fileOut, &info, true);

  const char * key = NULL;
  uint32_t length = 0;
  reader.nextPartition();
  uint32_t index = 0;
  while (NULL != (key = reader.nextKey(length))) {
    // Bounds-check before indexing `expect`.
    ASSERT_LT(index, expectCount);
    // Copy the bytes instead of dereferencing a cast pointer: `key` has no
    // alignment guarantee for uint32_t.
    uint32_t rawKey = 0;
    memcpy(&rawKey, key, sizeof(rawKey));
    int32_t realKey = (int32_t)hadoop_be32toh(rawKey);
    ASSERT_EQ(expect[index], realKey);
    index++;
  }
  // NOTE(review): nothing asserts that all expected keys were actually read;
  // if the data file is known to hold exactly `expectCount` keys, consider
  // adding ASSERT_EQ(expectCount, index) here.
}