|
@@ -20,6 +20,7 @@ package org.apache.hadoop.ipc;
|
|
|
|
|
|
import java.util.Arrays;
|
|
|
import java.util.UUID;
|
|
|
+import java.util.concurrent.locks.ReentrantLock;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -183,6 +184,8 @@ public class RetryCache {
|
|
|
private final long expirationTime;
|
|
|
private String cacheName;
|
|
|
|
|
|
+ private final ReentrantLock lock = new ReentrantLock();
|
|
|
+
|
|
|
/**
|
|
|
* Constructor
|
|
|
* @param cacheName name to identify the cache by
|
|
@@ -206,6 +209,13 @@ public class RetryCache {
|
|
|
|| Arrays.equals(Server.getClientId(), RpcConstants.DUMMY_CLIENT_ID);
|
|
|
}
|
|
|
|
|
|
+ public void lock() {
|
|
|
+ this.lock.lock();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void unlock() {
|
|
|
+ this.lock.unlock();
|
|
|
+ }
|
|
|
|
|
|
private void incrCacheClearedCounter() {
|
|
|
retryCacheMetrics.incrCacheCleared();
|
|
@@ -247,7 +257,8 @@ public class RetryCache {
|
|
|
*/
|
|
|
private CacheEntry waitForCompletion(CacheEntry newEntry) {
|
|
|
CacheEntry mapEntry = null;
|
|
|
- synchronized (this) {
|
|
|
+ lock.lock();
|
|
|
+ try {
|
|
|
mapEntry = set.get(newEntry);
|
|
|
// If an entry in the cache does not exist, add a new one
|
|
|
if (mapEntry == null) {
|
|
@@ -262,6 +273,8 @@ public class RetryCache {
|
|
|
} else {
|
|
|
retryCacheMetrics.incrCacheHit();
|
|
|
}
|
|
|
+ } finally {
|
|
|
+ lock.unlock();
|
|
|
}
|
|
|
// Entry already exists in cache. Wait for completion and return its state
|
|
|
Preconditions.checkNotNull(mapEntry,
|
|
@@ -292,8 +305,11 @@ public class RetryCache {
|
|
|
public void addCacheEntry(byte[] clientId, int callId) {
|
|
|
CacheEntry newEntry = new CacheEntry(clientId, callId, System.nanoTime()
|
|
|
+ expirationTime, true);
|
|
|
- synchronized(this) {
|
|
|
+ lock.lock();
|
|
|
+ try {
|
|
|
set.put(newEntry);
|
|
|
+ } finally {
|
|
|
+ lock.unlock();
|
|
|
}
|
|
|
retryCacheMetrics.incrCacheUpdated();
|
|
|
}
|
|
@@ -303,8 +319,11 @@ public class RetryCache {
|
|
|
// since the entry is loaded from editlog, we can assume it succeeded.
|
|
|
CacheEntry newEntry = new CacheEntryWithPayload(clientId, callId, payload,
|
|
|
System.nanoTime() + expirationTime, true);
|
|
|
- synchronized(this) {
|
|
|
+ lock.lock();
|
|
|
+ try {
|
|
|
set.put(newEntry);
|
|
|
+ } finally {
|
|
|
+ lock.unlock();
|
|
|
}
|
|
|
retryCacheMetrics.incrCacheUpdated();
|
|
|
}
|