|
@@ -22,11 +22,8 @@ import java.io.DataInput;
|
|
|
import java.io.DataOutput;
|
|
|
import java.io.IOException;
|
|
|
import java.util.Iterator;
|
|
|
-import java.util.Map;
|
|
|
-
|
|
|
-import com.google.common.collect.ImmutableSet;
|
|
|
-import com.google.common.collect.Iterators;
|
|
|
-import com.google.common.collect.Maps;
|
|
|
+import java.util.concurrent.ConcurrentMap;
|
|
|
+import java.util.concurrent.ConcurrentSkipListMap;
|
|
|
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.io.Text;
|
|
@@ -34,6 +31,8 @@ import org.apache.hadoop.io.WritableUtils;
|
|
|
import org.apache.hadoop.mapreduce.Counter;
|
|
|
import org.apache.hadoop.mapreduce.util.ResourceBundles;
|
|
|
|
|
|
+import com.google.common.collect.Iterators;
|
|
|
+
|
|
|
/**
|
|
|
* An abstract class to provide common implementation of the
|
|
|
* generic counter group in both mapred and mapreduce package.
|
|
@@ -46,7 +45,8 @@ public abstract class AbstractCounterGroup<T extends Counter>
|
|
|
|
|
|
private final String name;
|
|
|
private String displayName;
|
|
|
- private final Map<String, T> counters = Maps.newTreeMap();
|
|
|
+ private final ConcurrentMap<String, T> counters =
|
|
|
+ new ConcurrentSkipListMap<String, T>();
|
|
|
private final Limits limits;
|
|
|
|
|
|
public AbstractCounterGroup(String name, String displayName,
|
|
@@ -80,7 +80,7 @@ public abstract class AbstractCounterGroup<T extends Counter>
|
|
|
@Override
|
|
|
public synchronized T addCounter(String counterName, String displayName,
|
|
|
long value) {
|
|
|
- String saveName = limits.filterCounterName(counterName);
|
|
|
+ String saveName = Limits.filterCounterName(counterName);
|
|
|
T counter = findCounterImpl(saveName, false);
|
|
|
if (counter == null) {
|
|
|
return addCounterImpl(saveName, displayName, value);
|
|
@@ -97,7 +97,9 @@ public abstract class AbstractCounterGroup<T extends Counter>
|
|
|
|
|
|
@Override
|
|
|
public synchronized T findCounter(String counterName, String displayName) {
|
|
|
- String saveName = limits.filterCounterName(counterName);
|
|
|
+ // Take lock to avoid two threads not finding a counter and trying to add
|
|
|
+ // the same counter.
|
|
|
+ String saveName = Limits.filterCounterName(counterName);
|
|
|
T counter = findCounterImpl(saveName, false);
|
|
|
if (counter == null) {
|
|
|
return addCounterImpl(saveName, displayName, 0);
|
|
@@ -106,10 +108,12 @@ public abstract class AbstractCounterGroup<T extends Counter>
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public synchronized T findCounter(String counterName, boolean create) {
|
|
|
- return findCounterImpl(limits.filterCounterName(counterName), create);
|
|
|
+ public T findCounter(String counterName, boolean create) {
|
|
|
+ return findCounterImpl(Limits.filterCounterName(counterName), create);
|
|
|
}
|
|
|
|
|
|
+ // Lock the object. Cannot simply use concurrent constructs on the counters
|
|
|
+ // data-structure (like putIfAbsent) because of localization, limits etc.
|
|
|
private synchronized T findCounterImpl(String counterName, boolean create) {
|
|
|
T counter = counters.get(counterName);
|
|
|
if (counter == null && create) {
|
|
@@ -142,8 +146,8 @@ public abstract class AbstractCounterGroup<T extends Counter>
|
|
|
protected abstract T newCounter();
|
|
|
|
|
|
@Override
|
|
|
- public synchronized Iterator<T> iterator() {
|
|
|
- return ImmutableSet.copyOf(counters.values()).iterator();
|
|
|
+ public Iterator<T> iterator() {
|
|
|
+ return counters.values().iterator();
|
|
|
}
|
|
|
|
|
|
/**
|