/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
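
// Tests for the native-task IFile reader/writer: on-disk and in-memory
// round-trip correctness, write/read throughput, and a regression check
// for a known glibc bug.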
#include <algorithm>

#include "lib/commons.h"
#include "config.h"
#include "lib/BufferStream.h"
#include "lib/FileSystem.h"
#include "lib/IFile.h"
#include "test_commons.h"
SingleSpillInfo * writeIFile(int partition, vector<pair<string, string> > & kvs,
    const string & path, KeyValueType type, const string & codec) {
  FileOutputStream * fout = (FileOutputStream*)FileSystem::getLocal().create(path);
  IFileWriter * iw = new IFileWriter(fout, CHECKSUM_CRC32, type, type, codec, NULL);
  for (int i = 0; i < partition; i++) {
    iw->startPartition();
    for (size_t j = 0; j < kvs.size(); j++) {
      pair<string, string> & p = kvs[j];
      iw->write(p.first.c_str(), p.first.length(), p.second.c_str(), p.second.length());
    }
    iw->endPartition();
  }
  SingleSpillInfo * info = iw->getSpillInfo();
  delete iw;
  delete fout;
  return info;
}
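
// Reads every partition of the IFile described by `info` back from `path`,
// appending each key/value pair to `kvs`.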
void readIFile(vector<pair<string, string> > & kvs, const string & path, KeyValueType type,
    SingleSpillInfo * info, const string & codec) {
  FileInputStream * fin = (FileInputStream*)FileSystem::getLocal().open(path);
  IFileReader * ir = new IFileReader(fin, info);
  while (ir->nextPartition()) {
    const char * key, *value;
    uint32_t keyLen, valueLen;
    while (NULL != (key = ir->nextKey(keyLen))) {
      value = ir->value(valueLen);
      string keyS(key, keyLen);
      string valueS(value, valueLen);
      kvs.push_back(std::make_pair(keyS, valueS));
    }
  }
  delete ir;
  delete fin;
}
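
// Round-trips `kvs` through an on-disk IFile and asserts that every
// partition reads back exactly what was written.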
void TestIFileReadWrite(KeyValueType kvtype, int partition, int size,
    vector<pair<string, string> > & kvs, const string & codec = "") {
  string outputpath = "ifilewriter";
  SingleSpillInfo * info = writeIFile(partition, kvs, outputpath, kvtype, codec);
  LOG("write finished");
  vector<pair<string, string> > readkvs;
  readIFile(readkvs, outputpath, kvtype, info, codec);
  LOG("read finished");
  delete info;
  ASSERT_EQ(kvs.size() * partition, readkvs.size());
  for (int i = 0; i < partition; i++) {
    vector<pair<string, string> > cur_part(readkvs.begin() + i * kvs.size(),
        readkvs.begin() + (i + 1) * kvs.size());
    ASSERT_EQ(kvs.size(), cur_part.size());
    // Per-element comparison with SCOPED_TRACE is useful when debugging failures:
    // for (size_t j = 0; j < kvs.size(); j++) {
    //   SCOPED_TRACE(j);
    //   ASSERT_EQ(kvs[j], cur_part[j]);
    // }
    ASSERT_EQ(kvs, cur_part);
  }
  FileSystem::getLocal().remove(outputpath);
}
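
// Correctness: exercise each key/value type, plus the Snappy codec when the
// library is available.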
TEST(IFile, WriteRead) {
  int partition = TestConfig.getInt("ifile.partition", 7);
  int size = TestConfig.getInt("partition.size", 20000);
  vector<pair<string, string> > kvs;
  Generate(kvs, size, "bytes");
  TestIFileReadWrite(TextType, partition, size, kvs);
  TestIFileReadWrite(BytesType, partition, size, kvs);
  TestIFileReadWrite(UnknownType, partition, size, kvs);
#if defined HADOOP_SNAPPY_LIBRARY
  TestIFileReadWrite(TextType, partition, size, kvs, "org.apache.hadoop.io.compress.SnappyCodec");
#endif
}
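
// Performance round trip: writes `kvs` into `ifile.partition` partitions of
// an in-memory buffer, then reads everything back, logging write and read
// throughput.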
void TestIFileWriteRead2(vector<pair<string, string> > & kvs, char * buff, size_t buffsize,
    const string & codec, ChecksumType checksumType, KeyValueType type) {
  int partition = TestConfig.getInt("ifile.partition", 50);
  Timer timer;
  OutputBuffer outputBuffer(buff, buffsize);
  IFileWriter * iw = new IFileWriter(&outputBuffer, checksumType, type, type, codec, NULL);
  timer.reset();
  for (int i = 0; i < partition; i++) {
    iw->startPartition();
    for (size_t j = 0; j < kvs.size(); j++) {
      iw->write(kvs[j].first.c_str(), kvs[j].first.length(), kvs[j].second.c_str(),
          kvs[j].second.length());
    }
    iw->endPartition();
  }
  SingleSpillInfo * info = iw->getSpillInfo();
  LOG("%s",
      timer.getSpeedM2("Write data", info->getEndPosition(), info->getRealEndPosition()).c_str());
  delete iw;
  InputBuffer inputBuffer(buff, outputBuffer.tell());
  IFileReader * ir = new IFileReader(&inputBuffer, info);
  timer.reset();
  int sum = 0;
  while (ir->nextPartition()) {
    const char * key, *value;
    uint32_t keyLen, valueLen;
    while (NULL != (key = ir->nextKey(keyLen))) {
      value = ir->value(valueLen);
      sum += value[0];
    }
  }
  // Use the accumulated result so the value() calls don't get optimized out.
  ASSERT_NE(0xdeadbeef, sum);
  LOG("%s",
      timer.getSpeedM2(" Read data", info->getEndPosition(), info->getRealEndPosition()).c_str());
  delete ir;
  delete info;
}
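
// Throughput: measure every key/value type against both checksum
// implementations, using a 200 MB in-memory buffer to keep disk I/O out of
// the measurement.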
TEST(Perf, IFile) {
  int size = TestConfig.getInt("partition.size", 20000);
  string codec = TestConfig.get("ifile.codec", "");
  string type = TestConfig.get("ifile.type", "bytes");
  vector<pair<string, string> > kvs;
  Generate(kvs, size, type);
  std::sort(kvs.begin(), kvs.end());
  size_t buffsize = 200 * 1024 * 1024;
  char * buff = new char[buffsize];
  memset(buff, 0, buffsize);
  LOG("Test TextType CRC32");
  TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32, TextType);
  LOG("Test BytesType CRC32");
  TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32, BytesType);
  LOG("Test UnknownType CRC32");
  TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32, UnknownType);
  LOG("Test TextType CRC32C");
  TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32C, TextType);
  LOG("Test BytesType CRC32C");
  TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32C, BytesType);
  LOG("Test UnknownType CRC32C");
  TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32C, UnknownType);
  delete[] buff;
}

// Glibc has a bug in its file tell API that can unexpectedly overwrite
// file data; see https://rhn.redhat.com/errata/RHBA-2013-0279.html.
// This case checks whether the bug is present. If it is, the glibc on
// this machine needs to be upgraded.
TEST(IFile, TestGlibCBug) {
  std::string path("./testData/testGlibCBugSpill.out");
  int32_t expect[5] = {-1538241715, -1288088794, -192294464, 563552421, 1661521654};
  LOG("TestGlibCBug %s", path.c_str());
  IFileSegment * segments = new IFileSegment[1];
  segments[0].realEndOffset = 10000000;
  SingleSpillInfo info(segments, 1, path, CHECKSUM_NONE,
      IntType, TextType, "");
  InputStream * fileIn = FileSystem::getLocal().open(path);
  // The trailing `true` asks the reader to take ownership of the input
  // stream and delete it along with the reader, so no explicit delete of
  // fileIn is needed here.
  IFileReader * reader = new IFileReader(fileIn, &info, true);
  const char * key = NULL;
  uint32_t length = 0;
  reader->nextPartition();
  uint32_t index = 0;
  while (NULL != (key = reader->nextKey(length))) {
    int32_t realKey = (int32_t)hadoop_be32toh(*(uint32_t *)(key));
    ASSERT_LT(index, 5);
    ASSERT_EQ(expect[index], realKey);
    index++;
  }
  delete reader;
}