|
@@ -310,6 +310,9 @@ class MapTask extends Task {
|
|
private static final int KEYSTART = 1; // key offset in acct
|
|
private static final int KEYSTART = 1; // key offset in acct
|
|
private static final int VALSTART = 2; // val offset in acct
|
|
private static final int VALSTART = 2; // val offset in acct
|
|
private static final int ACCTSIZE = 3; // total #fields in acct
|
|
private static final int ACCTSIZE = 3; // total #fields in acct
|
|
|
|
+ private static final int RECSIZE =
|
|
|
|
+ (ACCTSIZE + 1) * 4; // acct bytes per record
|
|
|
|
+
|
|
|
|
|
|
// spill accounting
|
|
// spill accounting
|
|
private volatile int numSpills = 0;
|
|
private volatile int numSpills = 0;
|
|
@@ -352,12 +355,12 @@ class MapTask extends Task {
|
|
// buffers and accounting
|
|
// buffers and accounting
|
|
int maxMemUsage = sortmb << 20;
|
|
int maxMemUsage = sortmb << 20;
|
|
int recordCapacity = (int)(maxMemUsage * recper);
|
|
int recordCapacity = (int)(maxMemUsage * recper);
|
|
- recordCapacity += (recordCapacity >>> 2) % 4;
|
|
|
|
|
|
+ recordCapacity -= recordCapacity % RECSIZE;
|
|
kvbuffer = new byte[maxMemUsage - recordCapacity];
|
|
kvbuffer = new byte[maxMemUsage - recordCapacity];
|
|
bufvoid = kvbuffer.length;
|
|
bufvoid = kvbuffer.length;
|
|
- int kvcapacity = recordCapacity >>> 2;
|
|
|
|
- kvoffsets = new int[kvcapacity];
|
|
|
|
- kvindices = new int[recordCapacity - kvcapacity];
|
|
|
|
|
|
+ recordCapacity /= RECSIZE;
|
|
|
|
+ kvoffsets = new int[recordCapacity];
|
|
|
|
+ kvindices = new int[recordCapacity * ACCTSIZE];
|
|
softBufferLimit = (int)(kvbuffer.length * spillper);
|
|
softBufferLimit = (int)(kvbuffer.length * spillper);
|
|
softRecordLimit = (int)(kvoffsets.length * spillper);
|
|
softRecordLimit = (int)(kvoffsets.length * spillper);
|
|
// k/v serialization
|
|
// k/v serialization
|