|
@@ -0,0 +1,635 @@
|
|
|
+/**
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+package org.apache.hadoop.hdfs.util;
|
|
|
+
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.Comparator;
|
|
|
+import java.util.LinkedList;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Random;
|
|
|
+import java.util.concurrent.Callable;
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
+import java.util.concurrent.Executors;
|
|
|
+import java.util.concurrent.Future;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
+
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
+import org.apache.commons.logging.impl.Log4JLogger;
|
|
|
+import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
+import org.apache.hadoop.hdfs.DFSUtil;
|
|
|
+import org.apache.hadoop.hdfs.util.ByteArrayManager.Counter;
|
|
|
+import org.apache.hadoop.hdfs.util.ByteArrayManager.CounterMap;
|
|
|
+import org.apache.hadoop.hdfs.util.ByteArrayManager.FixedLengthManager;
|
|
|
+import org.apache.hadoop.hdfs.util.ByteArrayManager.ManagerMap;
|
|
|
+import org.apache.hadoop.util.Time;
|
|
|
+import org.apache.log4j.Level;
|
|
|
+import org.junit.Assert;
|
|
|
+import org.junit.Test;
|
|
|
+
|
|
|
+/**
|
|
|
+ * Test {@link ByteArrayManager}.
|
|
|
+ */
|
|
|
+public class TestByteArrayManager {
|
|
|
+ static {
|
|
|
+ ((Log4JLogger)LogFactory.getLog(ByteArrayManager.class)
|
|
|
+ ).getLogger().setLevel(Level.ALL);
|
|
|
+ }
|
|
|
+
|
|
|
+ static final Log LOG = LogFactory.getLog(TestByteArrayManager.class);
|
|
|
+
|
|
|
+ private static final Comparator<Future<Integer>> CMP = new Comparator<Future<Integer>>() {
|
|
|
+ @Override
|
|
|
+ public int compare(Future<Integer> left, Future<Integer> right) {
|
|
|
+ try {
|
|
|
+ return left.get().intValue() - right.get().intValue();
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testCounter() throws Exception {
|
|
|
+ final long countResetTimePeriodMs = 200L;
|
|
|
+ final Counter c = new Counter(countResetTimePeriodMs);
|
|
|
+
|
|
|
+ final int n = DFSUtil.getRandom().nextInt(512) + 512;
|
|
|
+ final List<Future<Integer>> futures = new ArrayList<Future<Integer>>(n);
|
|
|
+
|
|
|
+ final ExecutorService pool = Executors.newFixedThreadPool(32);
|
|
|
+ try {
|
|
|
+ // increment
|
|
|
+ for(int i = 0; i < n; i++) {
|
|
|
+ futures.add(pool.submit(new Callable<Integer>() {
|
|
|
+ @Override
|
|
|
+ public Integer call() throws Exception {
|
|
|
+ return (int)c.increment();
|
|
|
+ }
|
|
|
+ }));
|
|
|
+ }
|
|
|
+
|
|
|
+ // sort and wait for the futures
|
|
|
+ Collections.sort(futures, CMP);
|
|
|
+ } finally {
|
|
|
+ pool.shutdown();
|
|
|
+ }
|
|
|
+
|
|
|
+ // check futures
|
|
|
+ Assert.assertEquals(n, futures.size());
|
|
|
+ for(int i = 0; i < n; i++) {
|
|
|
+ Assert.assertEquals(i + 1, futures.get(i).get().intValue());
|
|
|
+ }
|
|
|
+ Assert.assertEquals(n, c.getCount());
|
|
|
+
|
|
|
+ // test auto-reset
|
|
|
+ Thread.sleep(countResetTimePeriodMs + 100);
|
|
|
+ Assert.assertEquals(1, c.increment());
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testAllocateRecycle() throws Exception {
|
|
|
+ final int countThreshold = 4;
|
|
|
+ final int countLimit = 8;
|
|
|
+ final long countResetTimePeriodMs = 200L;
|
|
|
+ final ByteArrayManager.Impl bam = new ByteArrayManager.Impl(
|
|
|
+ new ByteArrayManager.Conf(
|
|
|
+ countThreshold, countLimit, countResetTimePeriodMs));
|
|
|
+
|
|
|
+ final CounterMap counters = bam.getCounters();
|
|
|
+ final ManagerMap managers = bam.getManagers();
|
|
|
+
|
|
|
+ final int[] uncommonArrays = {0, 1, 2, 4, 8, 16, 32, 64};
|
|
|
+ final int arrayLength = 1024;
|
|
|
+
|
|
|
+
|
|
|
+ final Allocator allocator = new Allocator(bam);
|
|
|
+ final Recycler recycler = new Recycler(bam);
|
|
|
+ try {
|
|
|
+ { // allocate within threshold
|
|
|
+ for(int i = 0; i < countThreshold; i++) {
|
|
|
+ allocator.submit(arrayLength);
|
|
|
+ }
|
|
|
+ waitForAll(allocator.futures);
|
|
|
+
|
|
|
+ Assert.assertEquals(countThreshold,
|
|
|
+ counters.get(arrayLength, false).getCount());
|
|
|
+ Assert.assertNull(managers.get(arrayLength, false));
|
|
|
+ for(int n : uncommonArrays) {
|
|
|
+ Assert.assertNull(counters.get(n, false));
|
|
|
+ Assert.assertNull(managers.get(n, false));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ { // recycle half of the arrays
|
|
|
+ for(int i = 0; i < countThreshold/2; i++) {
|
|
|
+ recycler.submit(removeLast(allocator.futures));
|
|
|
+ }
|
|
|
+
|
|
|
+ for(Future<Integer> f : recycler.furtures) {
|
|
|
+ Assert.assertEquals(-1, f.get().intValue());
|
|
|
+ }
|
|
|
+ recycler.furtures.clear();
|
|
|
+ }
|
|
|
+
|
|
|
+ { // allocate one more
|
|
|
+ allocator.submit(arrayLength).get();
|
|
|
+
|
|
|
+ Assert.assertEquals(countThreshold + 1, counters.get(arrayLength, false).getCount());
|
|
|
+ Assert.assertNotNull(managers.get(arrayLength, false));
|
|
|
+ }
|
|
|
+
|
|
|
+ { // recycle the remaining arrays
|
|
|
+ final int n = allocator.recycleAll(recycler);
|
|
|
+
|
|
|
+ recycler.verify(n);
|
|
|
+ }
|
|
|
+
|
|
|
+ {
|
|
|
+ // allocate until the maximum.
|
|
|
+ for(int i = 0; i < countLimit; i++) {
|
|
|
+ allocator.submit(arrayLength);
|
|
|
+ }
|
|
|
+ waitForAll(allocator.futures);
|
|
|
+
|
|
|
+ // allocate one more should be blocked
|
|
|
+ final AllocatorThread t = new AllocatorThread(arrayLength, bam);
|
|
|
+ t.start();
|
|
|
+
|
|
|
+ // check if the thread is waiting, timed wait or runnable.
|
|
|
+ for(int i = 0; i < 5; i++) {
|
|
|
+ Thread.sleep(100);
|
|
|
+ final Thread.State threadState = t.getState();
|
|
|
+ if (threadState != Thread.State.RUNNABLE
|
|
|
+ && threadState != Thread.State.WAITING
|
|
|
+ && threadState != Thread.State.TIMED_WAITING) {
|
|
|
+ Assert.fail("threadState = " + threadState);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // recycle an array
|
|
|
+ recycler.submit(removeLast(allocator.futures));
|
|
|
+ Assert.assertEquals(1, removeLast(recycler.furtures).intValue());
|
|
|
+
|
|
|
+ // check if the thread is unblocked
|
|
|
+ Thread.sleep(100);
|
|
|
+ Assert.assertEquals(Thread.State.TERMINATED, t.getState());
|
|
|
+
|
|
|
+ // recycle the remaining, the recycle should be full.
|
|
|
+ Assert.assertEquals(countLimit-1, allocator.recycleAll(recycler));
|
|
|
+ recycler.submit(t.array);
|
|
|
+ recycler.verify(countLimit);
|
|
|
+
|
|
|
+ // recycle one more; it should not increase the free queue size
|
|
|
+ Assert.assertEquals(countLimit, bam.release(new byte[arrayLength]));
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ allocator.pool.shutdown();
|
|
|
+ recycler.pool.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ static <T> T removeLast(List<Future<T>> furtures) throws Exception {
|
|
|
+ return remove(furtures, furtures.size() - 1);
|
|
|
+ }
|
|
|
+ static <T> T remove(List<Future<T>> furtures, int i) throws Exception {
|
|
|
+ return furtures.isEmpty()? null: furtures.remove(i).get();
|
|
|
+ }
|
|
|
+
|
|
|
+ static <T> void waitForAll(List<Future<T>> furtures) throws Exception {
|
|
|
+ for(Future<T> f : furtures) {
|
|
|
+ f.get();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ static class AllocatorThread extends Thread {
|
|
|
+ private final ByteArrayManager bam;
|
|
|
+ private final int arrayLength;
|
|
|
+ private byte[] array;
|
|
|
+
|
|
|
+ AllocatorThread(int arrayLength, ByteArrayManager bam) {
|
|
|
+ this.bam = bam;
|
|
|
+ this.arrayLength = arrayLength;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ array = bam.newByteArray(arrayLength);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ static class Allocator {
|
|
|
+ private final ByteArrayManager bam;
|
|
|
+ final ExecutorService pool = Executors.newFixedThreadPool(8);
|
|
|
+ final List<Future<byte[]>> futures = new LinkedList<Future<byte[]>>();
|
|
|
+
|
|
|
+ Allocator(ByteArrayManager bam) {
|
|
|
+ this.bam = bam;
|
|
|
+ }
|
|
|
+
|
|
|
+ Future<byte[]> submit(final int arrayLength) {
|
|
|
+ final Future<byte[]> f = pool.submit(new Callable<byte[]>() {
|
|
|
+ @Override
|
|
|
+ public byte[] call() throws Exception {
|
|
|
+ final byte[] array = bam.newByteArray(arrayLength);
|
|
|
+ Assert.assertEquals(arrayLength, array.length);
|
|
|
+ return array;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ futures.add(f);
|
|
|
+ return f;
|
|
|
+ }
|
|
|
+
|
|
|
+ int recycleAll(Recycler recycler) throws Exception {
|
|
|
+ final int n = futures.size();
|
|
|
+ for(Future<byte[]> f : futures) {
|
|
|
+ recycler.submit(f.get());
|
|
|
+ }
|
|
|
+ futures.clear();
|
|
|
+ return n;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ static class Recycler {
|
|
|
+ private final ByteArrayManager bam;
|
|
|
+ final ExecutorService pool = Executors.newFixedThreadPool(8);
|
|
|
+ final List<Future<Integer>> furtures = new LinkedList<Future<Integer>>();
|
|
|
+
|
|
|
+ Recycler(ByteArrayManager bam) {
|
|
|
+ this.bam = bam;
|
|
|
+ }
|
|
|
+
|
|
|
+ Future<Integer> submit(final byte[] array) {
|
|
|
+ final Future<Integer> f = pool.submit(new Callable<Integer>() {
|
|
|
+ @Override
|
|
|
+ public Integer call() throws Exception {
|
|
|
+ return bam.release(array);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ furtures.add(f);
|
|
|
+ return f;
|
|
|
+ }
|
|
|
+
|
|
|
+ void verify(final int expectedSize) throws Exception {
|
|
|
+ Assert.assertEquals(expectedSize, furtures.size());
|
|
|
+ Collections.sort(furtures, CMP);
|
|
|
+ for(int i = 0; i < furtures.size(); i++) {
|
|
|
+ Assert.assertEquals(i+1, furtures.get(i).get().intValue());
|
|
|
+ }
|
|
|
+ furtures.clear();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testByteArrayManager() throws Exception {
|
|
|
+ final int countThreshold = 32;
|
|
|
+ final int countLimit = 64;
|
|
|
+ final long countResetTimePeriodMs = 1000L;
|
|
|
+ final ByteArrayManager.Impl bam = new ByteArrayManager.Impl(
|
|
|
+ new ByteArrayManager.Conf(
|
|
|
+ countThreshold, countLimit, countResetTimePeriodMs));
|
|
|
+
|
|
|
+ final CounterMap counters = bam.getCounters();
|
|
|
+ final ManagerMap managers = bam.getManagers();
|
|
|
+
|
|
|
+ final ExecutorService pool = Executors.newFixedThreadPool(128);
|
|
|
+
|
|
|
+ final Runner[] runners = new Runner[Runner.NUM_RUNNERS];
|
|
|
+ final Thread[] threads = new Thread[runners.length];
|
|
|
+
|
|
|
+ final int num = 1 << 8;
|
|
|
+ for(int i = 0; i < runners.length; i++) {
|
|
|
+ runners[i] = new Runner(i, countThreshold, countLimit, pool, i, bam);
|
|
|
+ threads[i] = runners[i].start(num);
|
|
|
+ }
|
|
|
+
|
|
|
+ final Thread randomRecycler = new Thread() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ LOG.info("randomRecycler start");
|
|
|
+ for(int i = 0; shouldRun(); i++) {
|
|
|
+ final int j = DFSUtil.getRandom().nextInt(runners.length);
|
|
|
+ try {
|
|
|
+ runners[j].recycle();
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ Assert.fail(this + " has " + e);
|
|
|
+ }
|
|
|
+
|
|
|
+ if ((i & 0xFF) == 0) {
|
|
|
+ sleepMs(100);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ LOG.info("randomRecycler done");
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean shouldRun() {
|
|
|
+ for(int i = 0; i < runners.length; i++) {
|
|
|
+ if (threads[i].isAlive()) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ if (!runners[i].isEmpty()) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ };
|
|
|
+ randomRecycler.start();
|
|
|
+
|
|
|
+ randomRecycler.join();
|
|
|
+
|
|
|
+ Assert.assertNull(counters.get(0, false));
|
|
|
+ for(int i = 1; i < runners.length; i++) {
|
|
|
+ if (!runners[i].assertionErrors.isEmpty()) {
|
|
|
+ for(AssertionError e : runners[i].assertionErrors) {
|
|
|
+ LOG.error("AssertionError " + i, e);
|
|
|
+ }
|
|
|
+ Assert.fail(runners[i].assertionErrors.size() + " AssertionError(s)");
|
|
|
+ }
|
|
|
+
|
|
|
+ final int arrayLength = Runner.index2arrayLength(i);
|
|
|
+ final boolean exceedCountThreshold = counters.get(arrayLength, false).getCount() > countThreshold;
|
|
|
+ final FixedLengthManager m = managers.get(arrayLength, false);
|
|
|
+ if (exceedCountThreshold) {
|
|
|
+ Assert.assertNotNull(m);
|
|
|
+ } else {
|
|
|
+ Assert.assertNull(m);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ static void sleepMs(long ms) {
|
|
|
+ try {
|
|
|
+ Thread.sleep(ms);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ Assert.fail("Sleep is interrupted: " + e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ static class Runner implements Runnable {
|
|
|
+ static final int NUM_RUNNERS = 4;
|
|
|
+
|
|
|
+ static int index2arrayLength(int index) {
|
|
|
+ return ByteArrayManager.MIN_ARRAY_LENGTH << (index - 1);
|
|
|
+ }
|
|
|
+
|
|
|
+ private final ByteArrayManager bam;
|
|
|
+ final int maxArrayLength;
|
|
|
+ final int countThreshold;
|
|
|
+ final int maxArrays;
|
|
|
+ final ExecutorService pool;
|
|
|
+ final List<Future<byte[]>> arrays = new ArrayList<Future<byte[]>>();
|
|
|
+
|
|
|
+ final AtomicInteger count = new AtomicInteger();
|
|
|
+ final int p;
|
|
|
+ private int n;
|
|
|
+
|
|
|
+ final List<AssertionError> assertionErrors = new ArrayList<AssertionError>();
|
|
|
+
|
|
|
+ Runner(int index, int countThreshold, int maxArrays,
|
|
|
+ ExecutorService pool, int p, ByteArrayManager bam) {
|
|
|
+ this.maxArrayLength = index2arrayLength(index);
|
|
|
+ this.countThreshold = countThreshold;
|
|
|
+ this.maxArrays = maxArrays;
|
|
|
+ this.pool = pool;
|
|
|
+ this.p = p;
|
|
|
+ this.bam = bam;
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean isEmpty() {
|
|
|
+ synchronized (arrays) {
|
|
|
+ return arrays.isEmpty();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ Future<byte[]> submitAllocate() {
|
|
|
+ count.incrementAndGet();
|
|
|
+
|
|
|
+ final Future<byte[]> f = pool.submit(new Callable<byte[]>() {
|
|
|
+ @Override
|
|
|
+ public byte[] call() throws Exception {
|
|
|
+ final int lower = maxArrayLength == ByteArrayManager.MIN_ARRAY_LENGTH?
|
|
|
+ 0: maxArrayLength >> 1;
|
|
|
+ final int arrayLength = DFSUtil.getRandom().nextInt(
|
|
|
+ maxArrayLength - lower) + lower + 1;
|
|
|
+ final byte[] array = bam.newByteArray(arrayLength);
|
|
|
+ try {
|
|
|
+ Assert.assertEquals("arrayLength=" + arrayLength + ", lower=" + lower,
|
|
|
+ maxArrayLength, array.length);
|
|
|
+ } catch(AssertionError e) {
|
|
|
+ assertionErrors.add(e);
|
|
|
+ }
|
|
|
+ return array;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ synchronized (arrays) {
|
|
|
+ arrays.add(f);
|
|
|
+ }
|
|
|
+ return f;
|
|
|
+ }
|
|
|
+
|
|
|
+ byte[] removeFirst() throws Exception {
|
|
|
+ synchronized (arrays) {
|
|
|
+ return remove(arrays, 0);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ void recycle() throws Exception {
|
|
|
+ final byte[] a = removeFirst();
|
|
|
+ if (a != null) {
|
|
|
+ recycle(a);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ int recycle(final byte[] array) {
|
|
|
+ return bam.release(array);
|
|
|
+ }
|
|
|
+
|
|
|
+ Future<Integer> submitRecycle(final byte[] array) {
|
|
|
+ count.decrementAndGet();
|
|
|
+
|
|
|
+ final Future<Integer> f = pool.submit(new Callable<Integer>() {
|
|
|
+ @Override
|
|
|
+ public Integer call() throws Exception {
|
|
|
+ return recycle(array);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ return f;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ for(int i = 0; i < n; i++) {
|
|
|
+ final boolean isAllocate = DFSUtil.getRandom().nextInt(NUM_RUNNERS) < p;
|
|
|
+ if (isAllocate) {
|
|
|
+ submitAllocate();
|
|
|
+ } else {
|
|
|
+ try {
|
|
|
+ final byte[] a = removeFirst();
|
|
|
+ if (a != null) {
|
|
|
+ submitRecycle(a);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ Assert.fail(this + " has " + e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if ((i & 0xFF) == 0) {
|
|
|
+ sleepMs(100);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ Thread start(int n) {
|
|
|
+ this.n = n;
|
|
|
+ final Thread t = new Thread(this);
|
|
|
+ t.start();
|
|
|
+ return t;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String toString() {
|
|
|
+ return getClass().getSimpleName() + ": max=" + maxArrayLength
|
|
|
+ + ", count=" + count;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ static class NewByteArrayWithLimit extends ByteArrayManager {
|
|
|
+ private final int maxCount;
|
|
|
+ private int count = 0;
|
|
|
+
|
|
|
+ NewByteArrayWithLimit(int maxCount) {
|
|
|
+ this.maxCount = maxCount;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized byte[] newByteArray(int size) throws InterruptedException {
|
|
|
+ for(; count >= maxCount; ) {
|
|
|
+ wait();
|
|
|
+ }
|
|
|
+ count++;
|
|
|
+ return new byte[size];
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized int release(byte[] array) {
|
|
|
+ if (count == maxCount) {
|
|
|
+ notifyAll();
|
|
|
+ }
|
|
|
+ count--;
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public static void main(String[] args) throws Exception {
|
|
|
+ ((Log4JLogger)LogFactory.getLog(ByteArrayManager.class)
|
|
|
+ ).getLogger().setLevel(Level.OFF);
|
|
|
+
|
|
|
+ final int arrayLength = 64 * 1024; //64k
|
|
|
+ final int nThreads = 512;
|
|
|
+ final int nAllocations = 1 << 15;
|
|
|
+ final int maxArrays = 1 << 10;
|
|
|
+ final int nTrials = 5;
|
|
|
+
|
|
|
+ System.out.println("arrayLength=" + arrayLength
|
|
|
+ + ", nThreads=" + nThreads
|
|
|
+ + ", nAllocations=" + nAllocations
|
|
|
+ + ", maxArrays=" + maxArrays);
|
|
|
+
|
|
|
+ final Random ran = DFSUtil.getRandom();
|
|
|
+ final ByteArrayManager[] impls = {
|
|
|
+ new ByteArrayManager.NewByteArrayWithoutLimit(),
|
|
|
+ new NewByteArrayWithLimit(maxArrays),
|
|
|
+ new ByteArrayManager.Impl(new ByteArrayManager.Conf(
|
|
|
+ DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_DEFAULT,
|
|
|
+ maxArrays,
|
|
|
+ DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_DEFAULT))
|
|
|
+ };
|
|
|
+ final double[] avg = new double[impls.length];
|
|
|
+
|
|
|
+ for(int i = 0; i < impls.length; i++) {
|
|
|
+ double duration = 0;
|
|
|
+ printf("%26s:", impls[i].getClass().getSimpleName());
|
|
|
+ for(int j = 0; j < nTrials; j++) {
|
|
|
+ final int[] sleepTime = new int[nAllocations];
|
|
|
+ for(int k = 0; k < sleepTime.length; k++) {
|
|
|
+ sleepTime[k] = ran.nextInt(100);
|
|
|
+ }
|
|
|
+
|
|
|
+ final long elapsed = performanceTest(arrayLength, maxArrays, nThreads,
|
|
|
+ sleepTime, impls[i]);
|
|
|
+ duration += elapsed;
|
|
|
+ printf("%5d, ", elapsed);
|
|
|
+ }
|
|
|
+ avg[i] = duration/nTrials;
|
|
|
+ printf("avg=%6.3fs", avg[i]/1000);
|
|
|
+ for(int j = 0; j < i; j++) {
|
|
|
+ printf(" (%6.2f%%)", percentageDiff(avg[j], avg[i]));
|
|
|
+ }
|
|
|
+ printf("\n");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ static double percentageDiff(double original, double newValue) {
|
|
|
+ return (newValue - original)/original*100;
|
|
|
+ }
|
|
|
+
|
|
|
+ static void printf(String format, Object... args) {
|
|
|
+ System.out.printf(format, args);
|
|
|
+ System.out.flush();
|
|
|
+ }
|
|
|
+
|
|
|
+ static long performanceTest(final int arrayLength, final int maxArrays,
|
|
|
+ final int nThreads, final int[] sleepTimeMSs, final ByteArrayManager impl)
|
|
|
+ throws Exception {
|
|
|
+ final ExecutorService pool = Executors.newFixedThreadPool(nThreads);
|
|
|
+ final List<Future<Void>> futures = new ArrayList<Future<Void>>(sleepTimeMSs.length);
|
|
|
+ final long startTime = Time.monotonicNow();
|
|
|
+
|
|
|
+ for(int i = 0; i < sleepTimeMSs.length; i++) {
|
|
|
+ final long sleepTime = sleepTimeMSs[i];
|
|
|
+ futures.add(pool.submit(new Callable<Void>() {
|
|
|
+ @Override
|
|
|
+ public Void call() throws Exception {
|
|
|
+ byte[] array = impl.newByteArray(arrayLength);
|
|
|
+ sleepMs(sleepTime);
|
|
|
+ impl.release(array);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }));
|
|
|
+ }
|
|
|
+ for(Future<Void> f : futures) {
|
|
|
+ f.get();
|
|
|
+ }
|
|
|
+
|
|
|
+ final long endTime = Time.monotonicNow();
|
|
|
+ pool.shutdown();
|
|
|
+ return endTime - startTime;
|
|
|
+ }
|
|
|
+}
|