|
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.locks.ReadWriteLock;
|
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
|
|
|
|
+import org.apache.hadoop.classification.VisibleForTesting;
|
|
|
import org.apache.hadoop.util.Preconditions;
|
|
|
import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
|
|
|
import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader;
|
|
@@ -317,8 +318,9 @@ public class ValueQueue <E> {
|
|
|
/**
|
|
|
* Get size of the Queue for keyName. This is only used in unit tests.
|
|
|
* @param keyName the key name
|
|
|
- * @return int queue size
|
|
|
+ * @return int queue size. Zero means the queue is empty or the key does not exist.
|
|
|
*/
|
|
|
+ @VisibleForTesting
|
|
|
public int getSize(String keyName) {
|
|
|
readLock(keyName);
|
|
|
try {
|
|
@@ -326,10 +328,12 @@ public class ValueQueue <E> {
|
|
|
// since that will have the side effect of populating the cache.
|
|
|
Map<String, LinkedBlockingQueue<E>> map =
|
|
|
keyQueues.getAllPresent(Arrays.asList(keyName));
|
|
|
- if (map.get(keyName) == null) {
|
|
|
+ final LinkedBlockingQueue<E> linkedQueue = map.get(keyName);
|
|
|
+ if (linkedQueue == null) {
|
|
|
return 0;
|
|
|
+ } else {
|
|
|
+ return linkedQueue.size();
|
|
|
}
|
|
|
- return map.get(keyName).size();
|
|
|
} finally {
|
|
|
readUnlock(keyName);
|
|
|
}
|