|
@@ -0,0 +1,232 @@
|
|
|
+/**
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+
|
|
|
+package org.apache.hadoop.io;
|
|
|
+
|
|
|
+import java.nio.ByteBuffer;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Random;
|
|
|
+
|
|
|
+import org.assertj.core.api.Assertions;
|
|
|
+import org.junit.Test;
|
|
|
+import org.junit.runner.RunWith;
|
|
|
+import org.junit.runners.Parameterized;
|
|
|
+
|
|
|
+import org.apache.hadoop.test.HadoopTestBase;
|
|
|
+
|
|
|
+/**
|
|
|
+ * Unit tests for {@code WeakReferencedElasticByteBufferPool}.
|
|
|
+ */
|
|
|
+@RunWith(Parameterized.class)
|
|
|
+public class TestWeakReferencedElasticByteBufferPool
|
|
|
+ extends HadoopTestBase {
|
|
|
+
|
|
|
+ private final boolean isDirect;
|
|
|
+
|
|
|
+ private final String type;
|
|
|
+
|
|
|
+ @Parameterized.Parameters(name = "Buffer type : {0}")
|
|
|
+ public static List<String> params() {
|
|
|
+ return Arrays.asList("direct", "array");
|
|
|
+ }
|
|
|
+
|
|
|
+ public TestWeakReferencedElasticByteBufferPool(String type) {
|
|
|
+ this.type = type;
|
|
|
+ this.isDirect = !"array".equals(type);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testGetAndPutBasic() {
|
|
|
+ WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
|
|
|
+ int bufferSize = 5;
|
|
|
+ ByteBuffer buffer = pool.getBuffer(isDirect, bufferSize);
|
|
|
+ Assertions.assertThat(buffer.isDirect())
|
|
|
+ .describedAs("Buffered returned should be of correct type {}", type)
|
|
|
+ .isEqualTo(isDirect);
|
|
|
+ Assertions.assertThat(buffer.capacity())
|
|
|
+ .describedAs("Initial capacity of returned buffer from pool")
|
|
|
+ .isEqualTo(bufferSize);
|
|
|
+ Assertions.assertThat(buffer.position())
|
|
|
+ .describedAs("Initial position of returned buffer from pool")
|
|
|
+ .isEqualTo(0);
|
|
|
+
|
|
|
+ byte[] arr = createByteArray(bufferSize);
|
|
|
+ buffer.put(arr, 0, arr.length);
|
|
|
+ buffer.flip();
|
|
|
+ validateBufferContent(buffer, arr);
|
|
|
+ Assertions.assertThat(buffer.position())
|
|
|
+ .describedAs("Buffer's position after filling bytes in it")
|
|
|
+ .isEqualTo(bufferSize);
|
|
|
+ // releasing buffer to the pool.
|
|
|
+ pool.putBuffer(buffer);
|
|
|
+ Assertions.assertThat(buffer.position())
|
|
|
+ .describedAs("Position should be reset to 0 after returning buffer to the pool")
|
|
|
+ .isEqualTo(0);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testPoolingWithDifferentSizes() {
|
|
|
+ WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
|
|
|
+ ByteBuffer buffer = pool.getBuffer(isDirect, 5);
|
|
|
+ ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
|
|
|
+ ByteBuffer buffer2 = pool.getBuffer(isDirect, 15);
|
|
|
+
|
|
|
+ Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
|
|
|
+ .describedAs("Number of buffers in the pool")
|
|
|
+ .isEqualTo(0);
|
|
|
+
|
|
|
+ pool.putBuffer(buffer1);
|
|
|
+ pool.putBuffer(buffer2);
|
|
|
+ Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
|
|
|
+ .describedAs("Number of buffers in the pool")
|
|
|
+ .isEqualTo(2);
|
|
|
+ ByteBuffer buffer3 = pool.getBuffer(isDirect, 12);
|
|
|
+ Assertions.assertThat(buffer3.capacity())
|
|
|
+ .describedAs("Pooled buffer should have older capacity")
|
|
|
+ .isEqualTo(15);
|
|
|
+ Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
|
|
|
+ .describedAs("Number of buffers in the pool")
|
|
|
+ .isEqualTo(1);
|
|
|
+ pool.putBuffer(buffer);
|
|
|
+ ByteBuffer buffer4 = pool.getBuffer(isDirect, 6);
|
|
|
+ Assertions.assertThat(buffer4.capacity())
|
|
|
+ .describedAs("Pooled buffer should have older capacity")
|
|
|
+ .isEqualTo(10);
|
|
|
+ Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
|
|
|
+ .describedAs("Number of buffers in the pool")
|
|
|
+ .isEqualTo(1);
|
|
|
+
|
|
|
+ pool.release();
|
|
|
+ Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
|
|
|
+ .describedAs("Number of buffers in the pool post release")
|
|
|
+ .isEqualTo(0);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testPoolingWithDifferentInsertionTime() {
|
|
|
+ WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
|
|
|
+ ByteBuffer buffer = pool.getBuffer(isDirect, 10);
|
|
|
+ ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
|
|
|
+ ByteBuffer buffer2 = pool.getBuffer(isDirect, 10);
|
|
|
+
|
|
|
+ Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
|
|
|
+ .describedAs("Number of buffers in the pool")
|
|
|
+ .isEqualTo(0);
|
|
|
+
|
|
|
+ pool.putBuffer(buffer1);
|
|
|
+ pool.putBuffer(buffer2);
|
|
|
+ Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
|
|
|
+ .describedAs("Number of buffers in the pool")
|
|
|
+ .isEqualTo(2);
|
|
|
+ ByteBuffer buffer3 = pool.getBuffer(isDirect, 10);
|
|
|
+ // As buffer1 is returned to the pool before buffer2, it should
|
|
|
+ // be returned when buffer of same size is asked again from
|
|
|
+ // the pool. Memory references must match not just content
|
|
|
+ // that is why {@code Assertions.isSameAs} is used here rather
|
|
|
+ // than usual {@code Assertions.isEqualTo}.
|
|
|
+ Assertions.assertThat(buffer3)
|
|
|
+ .describedAs("Buffers should be returned in order of their " +
|
|
|
+ "insertion time")
|
|
|
+ .isSameAs(buffer1);
|
|
|
+ pool.putBuffer(buffer);
|
|
|
+ ByteBuffer buffer4 = pool.getBuffer(isDirect, 10);
|
|
|
+ Assertions.assertThat(buffer4)
|
|
|
+ .describedAs("Buffers should be returned in order of their " +
|
|
|
+ "insertion time")
|
|
|
+ .isSameAs(buffer2);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testGarbageCollection() {
|
|
|
+ WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
|
|
|
+ ByteBuffer buffer = pool.getBuffer(isDirect, 5);
|
|
|
+ ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
|
|
|
+ ByteBuffer buffer2 = pool.getBuffer(isDirect, 15);
|
|
|
+ Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
|
|
|
+ .describedAs("Number of buffers in the pool")
|
|
|
+ .isEqualTo(0);
|
|
|
+ pool.putBuffer(buffer1);
|
|
|
+ pool.putBuffer(buffer2);
|
|
|
+ Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
|
|
|
+ .describedAs("Number of buffers in the pool")
|
|
|
+ .isEqualTo(2);
|
|
|
+ // Before GC.
|
|
|
+ ByteBuffer buffer4 = pool.getBuffer(isDirect, 12);
|
|
|
+ Assertions.assertThat(buffer4.capacity())
|
|
|
+ .describedAs("Pooled buffer should have older capacity")
|
|
|
+ .isEqualTo(15);
|
|
|
+ pool.putBuffer(buffer4);
|
|
|
+ // Removing the references
|
|
|
+ buffer1 = null;
|
|
|
+ buffer2 = null;
|
|
|
+ buffer4 = null;
|
|
|
+ System.gc();
|
|
|
+ ByteBuffer buffer3 = pool.getBuffer(isDirect, 12);
|
|
|
+ Assertions.assertThat(buffer3.capacity())
|
|
|
+ .describedAs("After garbage collection new buffer should be " +
|
|
|
+ "returned with fixed capacity")
|
|
|
+ .isEqualTo(12);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testWeakReferencesPruning() {
|
|
|
+ WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
|
|
|
+ ByteBuffer buffer1 = pool.getBuffer(isDirect, 5);
|
|
|
+ ByteBuffer buffer2 = pool.getBuffer(isDirect, 10);
|
|
|
+ ByteBuffer buffer3 = pool.getBuffer(isDirect, 15);
|
|
|
+
|
|
|
+ pool.putBuffer(buffer2);
|
|
|
+ pool.putBuffer(buffer3);
|
|
|
+ Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
|
|
|
+ .describedAs("Number of buffers in the pool")
|
|
|
+ .isEqualTo(2);
|
|
|
+
|
|
|
+ // marking only buffer2 to be garbage collected.
|
|
|
+ buffer2 = null;
|
|
|
+ System.gc();
|
|
|
+ ByteBuffer buffer4 = pool.getBuffer(isDirect, 10);
|
|
|
+ // Number of buffers in the pool is 0 as one got garbage
|
|
|
+ // collected and other got returned in above call.
|
|
|
+ Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
|
|
|
+ .describedAs("Number of buffers in the pool")
|
|
|
+ .isEqualTo(0);
|
|
|
+ Assertions.assertThat(buffer4.capacity())
|
|
|
+ .describedAs("After gc, pool should return next greater than " +
|
|
|
+ "available buffer")
|
|
|
+ .isEqualTo(15);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ private void validateBufferContent(ByteBuffer buffer, byte[] arr) {
|
|
|
+ for (int i=0; i<arr.length; i++) {
|
|
|
+ Assertions.assertThat(buffer.get())
|
|
|
+ .describedAs("Content of buffer at index {} should match " +
|
|
|
+ "with content of byte array", i)
|
|
|
+ .isEqualTo(arr[i]);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private byte[] createByteArray(int length) {
|
|
|
+ byte[] arr = new byte[length];
|
|
|
+ Random r = new Random();
|
|
|
+ r.nextBytes(arr);
|
|
|
+ return arr;
|
|
|
+ }
|
|
|
+}
|