|
@@ -89,6 +89,7 @@ import java.util.Map.Entry;
|
|
|
import java.util.Set;
|
|
|
import java.util.UUID;
|
|
|
import java.util.concurrent.Callable;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ExecutionException;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Executors;
|
|
@@ -304,6 +305,8 @@ public class DataNode extends ReconfigurableBase
|
|
|
private static final String DATANODE_HTRACE_PREFIX = "datanode.htrace.";
|
|
|
private final FileIoProvider fileIoProvider;
|
|
|
|
|
|
+ private static final String NETWORK_ERRORS = "networkErrors";
|
|
|
+
|
|
|
/**
|
|
|
* Use {@link NetUtils#createSocketAddr(String)} instead.
|
|
|
*/
|
|
@@ -340,8 +343,7 @@ public class DataNode extends ReconfigurableBase
|
|
|
private DataNodePeerMetrics peerMetrics;
|
|
|
private DataNodeDiskMetrics diskMetrics;
|
|
|
private InetSocketAddress streamingAddr;
|
|
|
-
|
|
|
- // See the note below in incrDatanodeNetworkErrors re: concurrency.
|
|
|
+
|
|
|
private LoadingCache<String, Map<String, Long>> datanodeNetworkCounts;
|
|
|
|
|
|
private String hostName;
|
|
@@ -507,9 +509,9 @@ public class DataNode extends ReconfigurableBase
|
|
|
.maximumSize(dncCacheMaxSize)
|
|
|
.build(new CacheLoader<String, Map<String, Long>>() {
|
|
|
@Override
|
|
|
- public Map<String, Long> load(String key) throws Exception {
|
|
|
- final Map<String, Long> ret = new HashMap<String, Long>();
|
|
|
- ret.put("networkErrors", 0L);
|
|
|
+ public Map<String, Long> load(String key) {
|
|
|
+ final Map<String, Long> ret = new ConcurrentHashMap<>();
|
|
|
+ ret.put(NETWORK_ERRORS, 0L);
|
|
|
return ret;
|
|
|
}
|
|
|
});
|
|
@@ -2272,19 +2274,11 @@ public class DataNode extends ReconfigurableBase
|
|
|
void incrDatanodeNetworkErrors(String host) {
|
|
|
metrics.incrDatanodeNetworkErrors();
|
|
|
|
|
|
- /*
|
|
|
- * Synchronizing on the whole cache is a big hammer, but since it's only
|
|
|
- * accumulating errors, it should be ok. If this is ever expanded to include
|
|
|
- * non-error stats, then finer-grained concurrency should be applied.
|
|
|
- */
|
|
|
- synchronized (datanodeNetworkCounts) {
|
|
|
- try {
|
|
|
- final Map<String, Long> curCount = datanodeNetworkCounts.get(host);
|
|
|
- curCount.put("networkErrors", curCount.get("networkErrors") + 1L);
|
|
|
- datanodeNetworkCounts.put(host, curCount);
|
|
|
- } catch (ExecutionException e) {
|
|
|
- LOG.warn("failed to increment network error counts for host: {}", host);
|
|
|
- }
|
|
|
+ try {
|
|
|
+ datanodeNetworkCounts.get(host).compute(NETWORK_ERRORS,
|
|
|
+ (key, errors) -> errors == null ? 1L : errors + 1L);
|
|
|
+ } catch (ExecutionException e) {
|
|
|
+ LOG.warn("Failed to increment network error counts for host: {}", host);
|
|
|
}
|
|
|
}
|
|
|
|