|
@@ -24,6 +24,7 @@ import java.util.UUID;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
|
+import org.apache.hadoop.ipc.metrics.RetryCacheMetrics;
|
|
import org.apache.hadoop.util.LightWeightCache;
|
|
import org.apache.hadoop.util.LightWeightCache;
|
|
import org.apache.hadoop.util.LightWeightGSet;
|
|
import org.apache.hadoop.util.LightWeightGSet;
|
|
import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
|
|
import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
|
|
@@ -43,6 +44,8 @@ import com.google.common.base.Preconditions;
|
|
@InterfaceAudience.Private
|
|
@InterfaceAudience.Private
|
|
public class RetryCache {
|
|
public class RetryCache {
|
|
public static final Log LOG = LogFactory.getLog(RetryCache.class);
|
|
public static final Log LOG = LogFactory.getLog(RetryCache.class);
|
|
|
|
+ private final RetryCacheMetrics retryCacheMetrics;
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* CacheEntry is tracked using unique client ID and callId of the RPC request
|
|
* CacheEntry is tracked using unique client ID and callId of the RPC request
|
|
*/
|
|
*/
|
|
@@ -178,6 +181,7 @@ public class RetryCache {
|
|
|
|
|
|
private final LightWeightGSet<CacheEntry, CacheEntry> set;
|
|
private final LightWeightGSet<CacheEntry, CacheEntry> set;
|
|
private final long expirationTime;
|
|
private final long expirationTime;
|
|
|
|
+ private String cacheName;
|
|
|
|
|
|
/**
|
|
/**
|
|
* Constructor
|
|
* Constructor
|
|
@@ -191,6 +195,8 @@ public class RetryCache {
|
|
this.set = new LightWeightCache<CacheEntry, CacheEntry>(capacity, capacity,
|
|
this.set = new LightWeightCache<CacheEntry, CacheEntry>(capacity, capacity,
|
|
expirationTime, 0);
|
|
expirationTime, 0);
|
|
this.expirationTime = expirationTime;
|
|
this.expirationTime = expirationTime;
|
|
|
|
+ this.cacheName = cacheName;
|
|
|
|
+ this.retryCacheMetrics = RetryCacheMetrics.create(this);
|
|
}
|
|
}
|
|
|
|
|
|
private static boolean skipRetryCache() {
|
|
private static boolean skipRetryCache() {
|
|
@@ -199,12 +205,29 @@ public class RetryCache {
|
|
return !Server.isRpcInvocation() || Server.getCallId() < 0
|
|
return !Server.isRpcInvocation() || Server.getCallId() < 0
|
|
|| Arrays.equals(Server.getClientId(), RpcConstants.DUMMY_CLIENT_ID);
|
|
|| Arrays.equals(Server.getClientId(), RpcConstants.DUMMY_CLIENT_ID);
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ private void incrCacheClearedCounter() {
|
|
|
|
+ retryCacheMetrics.incrCacheCleared();
|
|
|
|
+ }
|
|
|
|
+
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|
|
public LightWeightGSet<CacheEntry, CacheEntry> getCacheSet() {
|
|
public LightWeightGSet<CacheEntry, CacheEntry> getCacheSet() {
|
|
return set;
|
|
return set;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @VisibleForTesting
|
|
|
|
+ public RetryCacheMetrics getMetricsForTests() {
|
|
|
|
+ return retryCacheMetrics;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * This method returns cache name for metrics.
|
|
|
|
+ */
|
|
|
|
+ public String getCacheName() {
|
|
|
|
+ return cacheName;
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* This method handles the following conditions:
|
|
* This method handles the following conditions:
|
|
* <ul>
|
|
* <ul>
|
|
@@ -234,7 +257,10 @@ public class RetryCache {
|
|
+ newEntry.callId + " to retryCache");
|
|
+ newEntry.callId + " to retryCache");
|
|
}
|
|
}
|
|
set.put(newEntry);
|
|
set.put(newEntry);
|
|
|
|
+ retryCacheMetrics.incrCacheUpdated();
|
|
return newEntry;
|
|
return newEntry;
|
|
|
|
+ } else {
|
|
|
|
+ retryCacheMetrics.incrCacheHit();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
// Entry already exists in cache. Wait for completion and return its state
|
|
// Entry already exists in cache. Wait for completion and return its state
|
|
@@ -269,6 +295,7 @@ public class RetryCache {
|
|
synchronized(this) {
|
|
synchronized(this) {
|
|
set.put(newEntry);
|
|
set.put(newEntry);
|
|
}
|
|
}
|
|
|
|
+ retryCacheMetrics.incrCacheUpdated();
|
|
}
|
|
}
|
|
|
|
|
|
public void addCacheEntryWithPayload(byte[] clientId, int callId,
|
|
public void addCacheEntryWithPayload(byte[] clientId, int callId,
|
|
@@ -279,6 +306,7 @@ public class RetryCache {
|
|
synchronized(this) {
|
|
synchronized(this) {
|
|
set.put(newEntry);
|
|
set.put(newEntry);
|
|
}
|
|
}
|
|
|
|
+ retryCacheMetrics.incrCacheUpdated();
|
|
}
|
|
}
|
|
|
|
|
|
private static CacheEntry newEntry(long expirationTime) {
|
|
private static CacheEntry newEntry(long expirationTime) {
|
|
@@ -330,6 +358,7 @@ public class RetryCache {
|
|
public static void clear(RetryCache cache) {
|
|
public static void clear(RetryCache cache) {
|
|
if (cache != null) {
|
|
if (cache != null) {
|
|
cache.set.clear();
|
|
cache.set.clear();
|
|
|
|
+ cache.incrCacheClearedCounter();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|