|
@@ -18,8 +18,9 @@
|
|
|
package org.apache.hadoop.crypto.key.kms;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
-import java.util.HashSet;
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
@@ -28,6 +29,9 @@ import java.util.concurrent.ExecutionException;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
+import java.util.concurrent.locks.ReadWriteLock;
|
|
|
+import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
|
|
|
|
import com.google.common.base.Preconditions;
|
|
|
import com.google.common.cache.CacheBuilder;
|
|
@@ -67,8 +71,17 @@ public class ValueQueue <E> {
|
|
|
|
|
|
private static final String REFILL_THREAD =
|
|
|
ValueQueue.class.getName() + "_thread";
|
|
|
+ private static final int LOCK_ARRAY_SIZE = 16;
|
|
|
+ // Using a mask assuming array size is the power of 2, of MAX_VALUE.
|
|
|
+ private static final int MASK = LOCK_ARRAY_SIZE == Integer.MAX_VALUE ?
|
|
|
+ LOCK_ARRAY_SIZE :
|
|
|
+ LOCK_ARRAY_SIZE - 1;
|
|
|
|
|
|
private final LoadingCache<String, LinkedBlockingQueue<E>> keyQueues;
|
|
|
+ // Stripped rwlocks based on key name to synchronize the queue from
|
|
|
+ // the sync'ed rw-thread and the background async refill thread.
|
|
|
+ private final List<ReadWriteLock> lockArray =
|
|
|
+ new ArrayList<>(LOCK_ARRAY_SIZE);
|
|
|
private final ThreadPoolExecutor executor;
|
|
|
private final UniqueKeyBlockingQueue queue = new UniqueKeyBlockingQueue();
|
|
|
private final QueueRefiller<E> refiller;
|
|
@@ -84,9 +97,47 @@ public class ValueQueue <E> {
|
|
|
*/
|
|
|
private abstract static class NamedRunnable implements Runnable {
|
|
|
final String name;
|
|
|
+ private AtomicBoolean canceled = new AtomicBoolean(false);
|
|
|
private NamedRunnable(String keyName) {
|
|
|
this.name = keyName;
|
|
|
}
|
|
|
+
|
|
|
+ public void cancel() {
|
|
|
+ canceled.set(true);
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean isCanceled() {
|
|
|
+ return canceled.get();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void readLock(String keyName) {
|
|
|
+ getLock(keyName).readLock().lock();
|
|
|
+ }
|
|
|
+
|
|
|
+ private void readUnlock(String keyName) {
|
|
|
+ getLock(keyName).readLock().unlock();
|
|
|
+ }
|
|
|
+
|
|
|
+ private void writeUnlock(String keyName) {
|
|
|
+ getLock(keyName).writeLock().unlock();
|
|
|
+ }
|
|
|
+
|
|
|
+ private void writeLock(String keyName) {
|
|
|
+ getLock(keyName).writeLock().lock();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get the stripped lock given a key name.
|
|
|
+ *
|
|
|
+ * @param keyName The key name.
|
|
|
+ */
|
|
|
+ private ReadWriteLock getLock(String keyName) {
|
|
|
+ return lockArray.get(indexFor(keyName));
|
|
|
+ }
|
|
|
+
|
|
|
+ private static int indexFor(String keyName) {
|
|
|
+ return keyName.hashCode() & MASK;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -103,11 +154,12 @@ public class ValueQueue <E> {
|
|
|
LinkedBlockingQueue<Runnable> {
|
|
|
|
|
|
private static final long serialVersionUID = -2152747693695890371L;
|
|
|
- private HashSet<String> keysInProgress = new HashSet<String>();
|
|
|
+ private HashMap<String, Runnable> keysInProgress = new HashMap<>();
|
|
|
|
|
|
@Override
|
|
|
public synchronized void put(Runnable e) throws InterruptedException {
|
|
|
- if (keysInProgress.add(((NamedRunnable)e).name)) {
|
|
|
+ if (!keysInProgress.containsKey(((NamedRunnable)e).name)) {
|
|
|
+ keysInProgress.put(((NamedRunnable)e).name, e);
|
|
|
super.put(e);
|
|
|
}
|
|
|
}
|
|
@@ -131,6 +183,14 @@ public class ValueQueue <E> {
|
|
|
return k;
|
|
|
}
|
|
|
|
|
|
+ public Runnable deleteByName(String name) {
|
|
|
+ NamedRunnable e = (NamedRunnable) keysInProgress.remove(name);
|
|
|
+ if (e != null) {
|
|
|
+ e.cancel();
|
|
|
+ super.remove(e);
|
|
|
+ }
|
|
|
+ return e;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -172,6 +232,9 @@ public class ValueQueue <E> {
|
|
|
this.policy = policy;
|
|
|
this.numValues = numValues;
|
|
|
this.lowWatermark = lowWatermark;
|
|
|
+ for (int i = 0; i < LOCK_ARRAY_SIZE; ++i) {
|
|
|
+ lockArray.add(i, new ReentrantReadWriteLock());
|
|
|
+ }
|
|
|
keyQueues = CacheBuilder.newBuilder()
|
|
|
.expireAfterAccess(expiry, TimeUnit.MILLISECONDS)
|
|
|
.build(new CacheLoader<String, LinkedBlockingQueue<E>>() {
|
|
@@ -233,9 +296,18 @@ public class ValueQueue <E> {
|
|
|
*
|
|
|
* @param keyName the key to drain the Queue for
|
|
|
*/
|
|
|
- public void drain(String keyName ) {
|
|
|
+ public void drain(String keyName) {
|
|
|
try {
|
|
|
- keyQueues.get(keyName).clear();
|
|
|
+ Runnable e;
|
|
|
+ while ((e = queue.deleteByName(keyName)) != null) {
|
|
|
+ executor.remove(e);
|
|
|
+ }
|
|
|
+ writeLock(keyName);
|
|
|
+ try {
|
|
|
+ keyQueues.get(keyName).clear();
|
|
|
+ } finally {
|
|
|
+ writeUnlock(keyName);
|
|
|
+ }
|
|
|
} catch (ExecutionException ex) {
|
|
|
//NOP
|
|
|
}
|
|
@@ -247,14 +319,19 @@ public class ValueQueue <E> {
|
|
|
* @return int queue size
|
|
|
*/
|
|
|
public int getSize(String keyName) {
|
|
|
- // We can't do keyQueues.get(keyName).size() here,
|
|
|
- // since that will have the side effect of populating the cache.
|
|
|
- Map<String, LinkedBlockingQueue<E>> map =
|
|
|
- keyQueues.getAllPresent(Arrays.asList(keyName));
|
|
|
- if (map.get(keyName) == null) {
|
|
|
- return 0;
|
|
|
+ readLock(keyName);
|
|
|
+ try {
|
|
|
+ // We can't do keyQueues.get(keyName).size() here,
|
|
|
+ // since that will have the side effect of populating the cache.
|
|
|
+ Map<String, LinkedBlockingQueue<E>> map =
|
|
|
+ keyQueues.getAllPresent(Arrays.asList(keyName));
|
|
|
+ if (map.get(keyName) == null) {
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+ return map.get(keyName).size();
|
|
|
+ } finally {
|
|
|
+ readUnlock(keyName);
|
|
|
}
|
|
|
- return map.get(keyName).size();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -276,7 +353,9 @@ public class ValueQueue <E> {
|
|
|
LinkedList<E> ekvs = new LinkedList<E>();
|
|
|
try {
|
|
|
for (int i = 0; i < num; i++) {
|
|
|
+ readLock(keyName);
|
|
|
E val = keyQueue.poll();
|
|
|
+ readUnlock(keyName);
|
|
|
// If queue is empty now, Based on the provided SyncGenerationPolicy,
|
|
|
// figure out how many new values need to be generated synchronously
|
|
|
if (val == null) {
|
|
@@ -336,9 +415,17 @@ public class ValueQueue <E> {
|
|
|
int threshold = (int) (lowWatermark * (float) cacheSize);
|
|
|
// Need to ensure that only one refill task per key is executed
|
|
|
try {
|
|
|
- if (keyQueue.size() < threshold) {
|
|
|
- refiller.fillQueueForKey(name, keyQueue,
|
|
|
- cacheSize - keyQueue.size());
|
|
|
+ writeLock(keyName);
|
|
|
+ try {
|
|
|
+ if (keyQueue.size() < threshold && !isCanceled()) {
|
|
|
+ refiller.fillQueueForKey(name, keyQueue,
|
|
|
+ cacheSize - keyQueue.size());
|
|
|
+ }
|
|
|
+ if (isCanceled()) {
|
|
|
+ keyQueue.clear();
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ writeUnlock(keyName);
|
|
|
}
|
|
|
} catch (final Exception e) {
|
|
|
throw new RuntimeException(e);
|