|
@@ -50,16 +50,21 @@ abstract class BasicTypeSorterBase implements BufferSorter {
|
|
|
//4 for indices into startOffsets array in the
|
|
|
//pointers array (ignored the partpointers list itself)
|
|
|
static private final int BUFFERED_KEY_VAL_OVERHEAD = 16;
|
|
|
+ static private final int INITIAL_ARRAY_SIZE = 5;
|
|
|
+ //we maintain the max lengths of the key/val that we encounter. During
|
|
|
+ //iteration of the sorted results, we will create a DataOutputBuffer to
|
|
|
+ //return the keys. The max size of the DataOutputBuffer will be the max
|
|
|
+ //keylength that we encounter. Expose this value to model memory more
|
|
|
+ //accurately.
|
|
|
+ private int maxKeyLength = 0;
|
|
|
+ private int maxValLength = 0;
|
|
|
+
|
|
|
//Reference to the Progressable object for sending KeepAlive
|
|
|
private Progressable reporter;
|
|
|
|
|
|
//Implementation of methods of the SorterBase interface
|
|
|
//
|
|
|
public void configure(JobConf conf) {
|
|
|
- startOffsets = new int[1024];
|
|
|
- keyLengths = new int[1024];
|
|
|
- valueLengths = new int[1024];
|
|
|
- pointers = new int[1024];
|
|
|
comparator = conf.getOutputKeyComparator();
|
|
|
}
|
|
|
|
|
@@ -70,10 +75,16 @@ abstract class BasicTypeSorterBase implements BufferSorter {
|
|
|
public void addKeyValue(int recordOffset, int keyLength, int valLength) {
|
|
|
//Add the start offset of the key in the startOffsets array and the
|
|
|
//length in the keyLengths array.
|
|
|
- if (count == startOffsets.length)
|
|
|
+ if (startOffsets == null || count == startOffsets.length)
|
|
|
grow();
|
|
|
startOffsets[count] = recordOffset;
|
|
|
keyLengths[count] = keyLength;
|
|
|
+ if (keyLength > maxKeyLength) {
|
|
|
+ maxKeyLength = keyLength;
|
|
|
+ }
|
|
|
+ if (valLength > maxValLength) {
|
|
|
+ maxValLength = valLength;
|
|
|
+ }
|
|
|
valueLengths[count] = valLength;
|
|
|
pointers[count] = count;
|
|
|
count++;
|
|
@@ -85,14 +96,30 @@ abstract class BasicTypeSorterBase implements BufferSorter {
|
|
|
}
|
|
|
|
|
|
public long getMemoryUtilized() {
|
|
|
- return (startOffsets.length) * BUFFERED_KEY_VAL_OVERHEAD;
|
|
|
+ //the total length of the arrays + the max{Key,Val}Length (this will be the
|
|
|
+ //max size of the DataOutputBuffers during the iteration of the sorted
|
|
|
+ //keys).
|
|
|
+ if (startOffsets != null) {
|
|
|
+ return (startOffsets.length) * BUFFERED_KEY_VAL_OVERHEAD +
|
|
|
+ maxKeyLength + maxValLength;
|
|
|
+ }
|
|
|
+ else { //nothing from this yet
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public abstract RawKeyValueIterator sort();
|
|
|
|
|
|
public void close() {
|
|
|
- //just set count to 0; we reuse the arrays
|
|
|
+ //set count to 0; also, we don't reuse the arrays since we want to maintain
|
|
|
+ //consistency in the memory model
|
|
|
count = 0;
|
|
|
+ startOffsets = null;
|
|
|
+ keyLengths = null;
|
|
|
+ valueLengths = null;
|
|
|
+ pointers = null;
|
|
|
+ maxKeyLength = 0;
|
|
|
+ maxValLength = 0;
|
|
|
}
|
|
|
//A compare method that references the keyValBuffer through the indirect
|
|
|
//pointers
|
|
@@ -106,7 +133,11 @@ abstract class BasicTypeSorterBase implements BufferSorter {
|
|
|
}
|
|
|
|
|
|
private void grow() {
|
|
|
- int newLength = startOffsets.length * 3/2;
|
|
|
+ int currLength = 0;
|
|
|
+ if (startOffsets != null) {
|
|
|
+ currLength = startOffsets.length;
|
|
|
+ }
|
|
|
+ int newLength = (int)(currLength * 1.1) + 1;
|
|
|
startOffsets = grow(startOffsets, newLength);
|
|
|
keyLengths = grow(keyLengths, newLength);
|
|
|
valueLengths = grow(valueLengths, newLength);
|
|
@@ -115,7 +146,9 @@ abstract class BasicTypeSorterBase implements BufferSorter {
|
|
|
|
|
|
private int[] grow(int[] old, int newLength) {
|
|
|
int[] result = new int[newLength];
|
|
|
- System.arraycopy(old, 0, result, 0, old.length);
|
|
|
+ if(old != null) {
|
|
|
+ System.arraycopy(old, 0, result, 0, old.length);
|
|
|
+ }
|
|
|
return result;
|
|
|
}
|
|
|
} //BasicTypeSorterBase
|