|
@@ -22,6 +22,7 @@ import java.util.Comparator;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.Map.Entry;
|
|
|
import java.util.NavigableMap;
|
|
|
+import java.util.Set;
|
|
|
import java.util.TreeMap;
|
|
|
|
|
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
|
@@ -44,7 +45,8 @@ import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
|
|
|
@InterfaceAudience.Private
|
|
|
public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
|
|
|
|
|
|
- private static final int DEFAULT_PARTITION_CAPACITY = 2027;
|
|
|
+ private static final int DEFAULT_PARTITION_CAPACITY = 65536; // 4096; // 5120; // 2048; // 1027;
|
|
|
+ private static final float DEFAULT_PARTITION_OVERFLOW = 1.8f;
|
|
|
|
|
|
/**
|
|
|
* An ordered map of contiguous segments of elements.
|
|
@@ -81,8 +83,11 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
|
|
|
final E rootKey) {
|
|
|
this.partitions = new TreeMap<K, PartitionEntry>(comparator);
|
|
|
this.latchLock = latchLock;
|
|
|
- addNewPartition(rootKey).put(rootKey);
|
|
|
- this.size = 1;
|
|
|
+ // addNewPartition(rootKey).put(rootKey);
|
|
|
+ // this.size = 1;
|
|
|
+ this.size = 0;
|
|
|
+ LOG.info("Partition capacity = {}", DEFAULT_PARTITION_CAPACITY);
|
|
|
+ LOG.info("Partition overflow factor = {}", DEFAULT_PARTITION_OVERFLOW);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -90,16 +95,19 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
|
|
|
* @param key
|
|
|
* @return
|
|
|
*/
|
|
|
- private PartitionEntry addNewPartition(final K key) {
|
|
|
+ public PartitionEntry addNewPartition(final K key) {
|
|
|
+ Entry<K, PartitionEntry> lastEntry = partitions.lastEntry();
|
|
|
PartitionEntry lastPart = null;
|
|
|
- if(size > 0)
|
|
|
- lastPart = partitions.lastEntry().getValue();
|
|
|
+ if(lastEntry != null)
|
|
|
+ lastPart = lastEntry.getValue();
|
|
|
|
|
|
PartitionEntry newPart =
|
|
|
new PartitionEntry(DEFAULT_PARTITION_CAPACITY);
|
|
|
// assert size == 0 || newPart.partLock.isWriteTopLocked() :
|
|
|
// "Must hold write Lock: key = " + key;
|
|
|
- partitions.put(key, newPart);
|
|
|
+ PartitionEntry oldPart = partitions.put(key, newPart);
|
|
|
+ assert oldPart == null :
|
|
|
+ "RangeMap already has a partition associated with " + key;
|
|
|
|
|
|
LOG.debug("Total GSet size = {}", size);
|
|
|
LOG.debug("Number of partitions = {}", partitions.size());
|
|
@@ -173,7 +181,7 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
|
|
|
|
|
|
private PartitionEntry addNewPartitionIfNeeded(
|
|
|
PartitionEntry curPart, K key) {
|
|
|
- if(curPart.size() < DEFAULT_PARTITION_CAPACITY * 1.1
|
|
|
+ if(curPart.size() < DEFAULT_PARTITION_CAPACITY * DEFAULT_PARTITION_OVERFLOW
|
|
|
|| curPart.contains(key)) {
|
|
|
return curPart;
|
|
|
}
|
|
@@ -197,12 +205,56 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
|
|
|
public void clear() {
|
|
|
LOG.error("Total GSet size = {}", size);
|
|
|
LOG.error("Number of partitions = {}", partitions.size());
|
|
|
+ printStats();
|
|
|
// assert latchLock.hasWriteTopLock() : "Must hold write topLock";
|
|
|
// SHV May need to clear all partitions?
|
|
|
partitions.clear();
|
|
|
size = 0;
|
|
|
}
|
|
|
|
|
|
+ private void printStats() {
|
|
|
+ int partSizeMin = Integer.MAX_VALUE, partSizeAvg = 0, partSizeMax = 0;
|
|
|
+ long totalSize = 0;
|
|
|
+ int numEmptyPartitions = 0, numFullPartitions = 0;
|
|
|
+ Collection<PartitionEntry> parts = partitions.values();
|
|
|
+ Set<Entry<K, PartitionEntry>> entries = partitions.entrySet();
|
|
|
+ int i = 0;
|
|
|
+ for(Entry<K, PartitionEntry> e : entries) {
|
|
|
+ PartitionEntry part = e.getValue();
|
|
|
+ int s = part.size;
|
|
|
+ if(s == 0) numEmptyPartitions++;
|
|
|
+ if(s > DEFAULT_PARTITION_CAPACITY) numFullPartitions++;
|
|
|
+ totalSize += s;
|
|
|
+ partSizeMin = (s < partSizeMin ? s : partSizeMin);
|
|
|
+ partSizeMax = (partSizeMax < s ? s : partSizeMax);
|
|
|
+ Class<?> inodeClass = e.getKey().getClass();
|
|
|
+ try {
|
|
|
+ long[] key = (long[]) inodeClass.
|
|
|
+ getMethod("getNamespaceKey", int.class).invoke(e.getKey(), 2);
|
|
|
+ long[] firstKey = new long[0];
|
|
|
+ if(part.iterator().hasNext()) {
|
|
|
+ Object first = part.iterator().next();
|
|
|
+ firstKey = (long[]) inodeClass.getMethod(
|
|
|
+ "getNamespaceKey", int.class).invoke(first, 2);
|
|
|
+ Object parent = inodeClass.
|
|
|
+ getMethod("getParent").invoke(first);
|
|
|
+ long parentId = (parent == null ? 0L :
|
|
|
+ (long) inodeClass.getMethod("getId").invoke(parent));
|
|
|
+ firstKey[0] = parentId;
|
|
|
+ }
|
|
|
+ LOG.error("Partition #{}\t key: {}\t size: {}\t first: {}",
|
|
|
+ i++, key, s, firstKey); // SHV should be info
|
|
|
+ } catch (Exception ex) {
|
|
|
+ LOG.error("Cannot find Method getNamespaceKey() in {}", inodeClass);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ partSizeAvg = (int) (totalSize / parts.size());
|
|
|
+ LOG.error("Partition sizes: min = {}, avg = {}, max = {}, sum = {}",
|
|
|
+ partSizeMin, partSizeAvg, partSizeMax, totalSize);
|
|
|
+ LOG.error("Number of partitions: empty = {}, full = {}",
|
|
|
+ numEmptyPartitions, numFullPartitions);
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public Collection<E> values() {
|
|
|
// TODO Auto-generated method stub
|
|
@@ -234,15 +286,19 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
|
|
|
|
|
|
@Override
|
|
|
public boolean hasNext() {
|
|
|
- if(partitionIterator.hasNext()) {
|
|
|
- return true;
|
|
|
+ while(!partitionIterator.hasNext()) {
|
|
|
+ if(!keyIterator.hasNext()) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ K curKey = keyIterator.next();
|
|
|
+ partitionIterator = getPartition(curKey).iterator();
|
|
|
}
|
|
|
- return keyIterator.hasNext();
|
|
|
+ return partitionIterator.hasNext();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public E next() {
|
|
|
- if(!partitionIterator.hasNext()) {
|
|
|
+ while(!partitionIterator.hasNext()) {
|
|
|
K curKey = keyIterator.next();
|
|
|
partitionIterator = getPartition(curKey).iterator();
|
|
|
}
|