TestPartitionBucket.cc 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "lib/commons.h"
  19. #include "test_commons.h"
  20. #include "lib/PartitionBucket.h"
  21. #include "lib/PartitionBucketIterator.h"
  22. #include "lib/MemoryBlock.h"
  23. #include "lib/IFile.h"
  24. namespace NativeTask {
  25. class MockIFileWriter : public IFileWriter {
  26. private:
  27. char * _buff;
  28. uint32_t _position;
  29. uint32_t _capacity;
  30. public:
  31. MockIFileWriter(char * buff, uint32_t capacity)
  32. : IFileWriter(NULL, CHECKSUM_NONE, TextType, TextType, "", NULL), _buff(buff), _position(0),
  33. _capacity(capacity) {
  34. }
  35. virtual void write(const char * key, uint32_t keyLen, const char * value, uint32_t valueLen) {
  36. KVBuffer * kv = (KVBuffer *)(_buff + _position);
  37. kv->keyLength = keyLen;
  38. kv->valueLength = valueLen;
  39. *((uint32_t *)kv->getKey()) = *((uint32_t *)key);
  40. *((uint32_t *)kv->getValue()) = *((uint32_t *)value);
  41. _position += kv->length();
  42. }
  43. char * buff() {
  44. return _buff;
  45. }
  46. };
  47. TEST(PartitionBucket, general) {
  48. MemoryPool * pool = new MemoryPool();
  49. const uint32_t POOL_SIZE = 1024 * 1024; // 1MB
  50. const uint32_t BLOCK_SIZE = 1024; // 1KB
  51. const uint32_t PARTITION_ID = 3;
  52. pool->init(POOL_SIZE);
  53. ComparatorPtr comparator = NativeTask::get_comparator(BytesType, NULL);
  54. PartitionBucket * bucket = new PartitionBucket(pool, PARTITION_ID, comparator, NULL, BLOCK_SIZE);
  55. ASSERT_EQ(0, bucket->getKVCount());
  56. KVIterator * NULLPOINTER = 0;
  57. ASSERT_EQ(NULLPOINTER, bucket->getIterator());
  58. ASSERT_EQ(PARTITION_ID, bucket->getPartitionId());
  59. bucket->sort(DUALPIVOTSORT);
  60. bucket->spill(NULL);
  61. delete bucket;
  62. delete pool;
  63. }
  64. TEST(PartitionBucket, multipleMemoryBlock) {
  65. MemoryPool * pool = new MemoryPool();
  66. const uint32_t POOL_SIZE = 1024 * 1024; // 1MB
  67. const uint32_t BLOCK_SIZE = 1024; // 1KB
  68. const uint32_t PARTITION_ID = 3;
  69. pool->init(POOL_SIZE);
  70. ComparatorPtr comparator = NativeTask::get_comparator(BytesType, NULL);
  71. PartitionBucket * bucket = new PartitionBucket(pool, PARTITION_ID, comparator, NULL, BLOCK_SIZE);
  72. const uint32_t KV_SIZE = 700;
  73. const uint32_t SMALL_KV_SIZE = 100;
  74. // To suppress valgrind error
  75. // the allocated buffer needs to be initialized before
  76. // create iterator on the PartitionBucker, because
  77. // those memory will be compared when create minheap
  78. KVBuffer * kv1 = bucket->allocateKVBuffer(KV_SIZE);
  79. memset(kv1, 0, KV_SIZE);
  80. KVBuffer * kv2 = bucket->allocateKVBuffer(SMALL_KV_SIZE);
  81. memset(kv2, 0, SMALL_KV_SIZE);
  82. KVBuffer * kv3 = bucket->allocateKVBuffer(KV_SIZE);
  83. memset(kv3, 0, KV_SIZE);
  84. ASSERT_EQ(3, bucket->getKVCount());
  85. KVIterator * NULLPOINTER = 0;
  86. KVIterator * iter = bucket->getIterator();
  87. ASSERT_NE(NULLPOINTER, iter);
  88. delete iter;
  89. ASSERT_EQ(2, bucket->getMemoryBlockCount());
  90. bucket->reset();
  91. iter = bucket->getIterator();
  92. ASSERT_EQ(NULLPOINTER, iter);
  93. delete iter;
  94. ASSERT_EQ(0, bucket->getMemoryBlockCount());
  95. delete bucket;
  96. delete pool;
  97. }
  98. TEST(PartitionBucket, sort) {
  99. MemoryPool * pool = new MemoryPool();
  100. const uint32_t POOL_SIZE = 1024 * 1024; // 1MB
  101. const uint32_t BLOCK_SIZE = 1024; // 1KB
  102. const uint32_t PARTITION_ID = 3;
  103. pool->init(POOL_SIZE);
  104. ComparatorPtr comparator = NativeTask::get_comparator(BytesType, NULL);
  105. PartitionBucket * bucket = new PartitionBucket(pool, PARTITION_ID, comparator, NULL, BLOCK_SIZE);
  106. const uint32_t KV_SIZE = 700;
  107. const uint32_t SMALL_KV_SIZE = 100;
  108. KVBuffer * kv1 = bucket->allocateKVBuffer(KV_SIZE);
  109. KVBuffer * kv2 = bucket->allocateKVBuffer(SMALL_KV_SIZE);
  110. KVBuffer * kv3 = bucket->allocateKVBuffer(KV_SIZE);
  111. const uint32_t SMALL = 10;
  112. const uint32_t MEDIUM = 100;
  113. const uint32_t BIG = 1000;
  114. kv1->keyLength = 4;
  115. *((uint32_t *)kv1->getKey()) = bswap(BIG);
  116. kv1->valueLength = KV_SIZE - kv1->headerLength() - kv1->keyLength;
  117. kv2->keyLength = 4;
  118. *((uint32_t *)kv2->getKey()) = bswap(SMALL);
  119. kv2->valueLength = KV_SIZE - kv2->headerLength() - kv2->keyLength;
  120. kv3->keyLength = 4;
  121. *((uint32_t *)kv3->getKey()) = bswap(MEDIUM);
  122. kv3->valueLength = KV_SIZE - kv3->headerLength() - kv3->keyLength;
  123. bucket->sort(DUALPIVOTSORT);
  124. KVIterator * iter = bucket->getIterator();
  125. Buffer key;
  126. Buffer value;
  127. iter->next(key, value);
  128. ASSERT_EQ(SMALL, bswap(*(uint32_t * )key.data()));
  129. iter->next(key, value);
  130. ASSERT_EQ(MEDIUM, bswap(*(uint32_t * )key.data()));
  131. iter->next(key, value);
  132. ASSERT_EQ(BIG, bswap(*(uint32_t * )key.data()));
  133. delete iter;
  134. delete bucket;
  135. delete pool;
  136. }
  137. TEST(PartitionBucket, spill) {
  138. MemoryPool * pool = new MemoryPool();
  139. const uint32_t POOL_SIZE = 1024 * 1024; // 1MB
  140. const uint32_t BLOCK_SIZE = 1024; // 1KB
  141. const uint32_t PARTITION_ID = 3;
  142. pool->init(POOL_SIZE);
  143. ComparatorPtr comparator = NativeTask::get_comparator(BytesType, NULL);
  144. PartitionBucket * bucket = new PartitionBucket(pool, PARTITION_ID, comparator, NULL, BLOCK_SIZE);
  145. const uint32_t KV_SIZE = 700;
  146. const uint32_t SMALL_KV_SIZE = 100;
  147. KVBuffer * kv1 = bucket->allocateKVBuffer(KV_SIZE);
  148. KVBuffer * kv2 = bucket->allocateKVBuffer(SMALL_KV_SIZE);
  149. KVBuffer * kv3 = bucket->allocateKVBuffer(KV_SIZE);
  150. const uint32_t SMALL = 10;
  151. const uint32_t MEDIUM = 100;
  152. const uint32_t BIG = 1000;
  153. kv1->keyLength = 4;
  154. *((uint32_t *)kv1->getKey()) = bswap(BIG);
  155. kv1->valueLength = KV_SIZE - KVBuffer::headerLength() - kv1->keyLength;
  156. kv2->keyLength = 4;
  157. *((uint32_t *)kv2->getKey()) = bswap(SMALL);
  158. kv2->valueLength = KV_SIZE - KVBuffer::headerLength() - kv2->keyLength;
  159. kv3->keyLength = 4;
  160. *((uint32_t *)kv3->getKey()) = bswap(MEDIUM);
  161. kv3->valueLength = KV_SIZE - KVBuffer::headerLength() - kv3->keyLength;
  162. bucket->sort(DUALPIVOTSORT);
  163. uint32_t BUFF_SIZE = 1024 * 1024;
  164. char * buff = new char[BUFF_SIZE];
  165. MockIFileWriter writer(buff, BUFF_SIZE);
  166. bucket->spill(&writer);
  167. // check the result
  168. KVBuffer * first = (KVBuffer *)writer.buff();
  169. ASSERT_EQ(4, first->keyLength);
  170. ASSERT_EQ(KV_SIZE - KVBuffer::headerLength() - 4, first->valueLength);
  171. ASSERT_EQ(bswap(SMALL), (*(uint32_t * )(first->getKey())));
  172. KVBuffer * second = first->next();
  173. ASSERT_EQ(4, second->keyLength);
  174. ASSERT_EQ(KV_SIZE - KVBuffer::headerLength() - 4, second->valueLength);
  175. ASSERT_EQ(bswap(MEDIUM), (*(uint32_t * )(second->getKey())));
  176. KVBuffer * third = second->next();
  177. ASSERT_EQ(4, third->keyLength);
  178. ASSERT_EQ(KV_SIZE - KVBuffer::headerLength() - 4, third->valueLength);
  179. ASSERT_EQ(bswap(BIG), (*(uint32_t * )(third->getKey())));
  180. delete [] buff;
  181. delete bucket;
  182. delete pool;
  183. }
  184. } // namespace NativeTask