|
@@ -592,8 +592,8 @@ class MapTask extends Task {
|
|
|
|
|
|
}
|
|
|
|
|
|
- class MapOutputBuffer<K extends Object, V extends Object>
|
|
|
- implements MapOutputCollector<K, V>, IndexedSortable {
|
|
|
+ class MapOutputBuffer<K extends Object, V extends Object>
|
|
|
+ implements MapOutputCollector<K, V>, IndexedSortable {
|
|
|
private final int partitions;
|
|
|
private final Partitioner<K, V> partitioner;
|
|
|
private final JobConf job;
|
|
@@ -635,6 +635,8 @@ class MapTask extends Task {
|
|
|
private volatile Throwable sortSpillException = null;
|
|
|
private final int softRecordLimit;
|
|
|
private final int softBufferLimit;
|
|
|
+ private int recordRemaining;
|
|
|
+ private int bufferRemaining;
|
|
|
private final int minSpillsForCombine;
|
|
|
private final IndexedSorter sorter;
|
|
|
private final ReentrantLock spillLock = new ReentrantLock();
|
|
@@ -682,8 +684,8 @@ class MapTask extends Task {
|
|
|
if ((sortmb & 0x7FF) != sortmb) {
|
|
|
throw new IOException("Invalid \"io.sort.mb\": " + sortmb);
|
|
|
}
|
|
|
- sorter = ReflectionUtils.newInstance(
|
|
|
- job.getClass("map.sort.class", QuickSort.class, IndexedSorter.class), job);
|
|
|
+ sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
|
|
|
+ QuickSort.class, IndexedSorter.class), job);
|
|
|
LOG.info("io.sort.mb = " + sortmb);
|
|
|
// buffers and accounting
|
|
|
int maxMemUsage = sortmb << 20;
|
|
@@ -696,6 +698,8 @@ class MapTask extends Task {
|
|
|
kvindices = new int[recordCapacity * ACCTSIZE];
|
|
|
softBufferLimit = (int)(kvbuffer.length * spillper);
|
|
|
softRecordLimit = (int)(kvoffsets.length * spillper);
|
|
|
+ recordRemaining = softRecordLimit;
|
|
|
+ bufferRemaining = softBufferLimit;
|
|
|
LOG.info("data buffer = " + softBufferLimit + "/" + kvbuffer.length);
|
|
|
LOG.info("record buffer = " + softRecordLimit + "/" + kvoffsets.length);
|
|
|
// k/v serialization
|
|
@@ -763,38 +767,52 @@ class MapTask extends Task {
|
|
|
+ value.getClass().getName());
|
|
|
}
|
|
|
final int kvnext = (kvindex + 1) % kvoffsets.length;
|
|
|
- spillLock.lock();
|
|
|
- try {
|
|
|
- boolean kvfull;
|
|
|
- do {
|
|
|
- if (sortSpillException != null) {
|
|
|
- throw (IOException)new IOException("Spill failed"
|
|
|
- ).initCause(sortSpillException);
|
|
|
- }
|
|
|
- // sufficient acct space
|
|
|
- kvfull = kvnext == kvstart;
|
|
|
- final boolean kvsoftlimit = ((kvnext > kvend)
|
|
|
- ? kvnext - kvend > softRecordLimit
|
|
|
- : kvend - kvnext <= kvoffsets.length - softRecordLimit);
|
|
|
- if (kvstart == kvend && kvsoftlimit) {
|
|
|
- LOG.info("Spilling map output: record full = " + kvsoftlimit);
|
|
|
- startSpill();
|
|
|
- }
|
|
|
- if (kvfull) {
|
|
|
- try {
|
|
|
- while (kvstart != kvend) {
|
|
|
- reporter.progress();
|
|
|
- spillDone.await();
|
|
|
+ if (--recordRemaining <= 0) {
|
|
|
+ // Possible for check to remain < zero, if soft limit remains
|
|
|
+ // in force but unsatisfiable because spill is in progress
|
|
|
+ spillLock.lock();
|
|
|
+ try {
|
|
|
+ boolean kvfull;
|
|
|
+ do {
|
|
|
+ if (sortSpillException != null) {
|
|
|
+ throw (IOException)new IOException("Spill failed"
|
|
|
+ ).initCause(sortSpillException);
|
|
|
+ }
|
|
|
+ // sufficient acct space
|
|
|
+ kvfull = kvnext == kvstart;
|
|
|
+ final boolean kvsoftlimit = ((kvnext > kvend)
|
|
|
+ ? kvnext - kvend > softRecordLimit
|
|
|
+ : kvend - kvnext <= kvoffsets.length - softRecordLimit);
|
|
|
+ if (kvstart == kvend && kvsoftlimit) {
|
|
|
+ LOG.info("Spilling map output: record full = " + kvfull);
|
|
|
+ startSpill();
|
|
|
+ }
|
|
|
+ if (kvfull) {
|
|
|
+ try {
|
|
|
+ while (kvstart != kvend) {
|
|
|
+ reporter.progress();
|
|
|
+ spillDone.await();
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ throw (IOException)new IOException(
|
|
|
+ "Collector interrupted while waiting for the writer"
|
|
|
+ ).initCause(e);
|
|
|
}
|
|
|
- } catch (InterruptedException e) {
|
|
|
- throw (IOException)new IOException(
|
|
|
- "Collector interrupted while waiting for the writer"
|
|
|
- ).initCause(e);
|
|
|
}
|
|
|
- }
|
|
|
- } while (kvfull);
|
|
|
- } finally {
|
|
|
- spillLock.unlock();
|
|
|
+ } while (kvfull);
|
|
|
+ final int softOff = kvend + softRecordLimit;
|
|
|
+ recordRemaining = Math.min(
|
|
|
+ // out of acct space
|
|
|
+ (kvnext < kvstart
|
|
|
+ ? kvstart - kvnext
|
|
|
+ : kvoffsets.length - kvnext + kvstart),
|
|
|
+ // soft limit
|
|
|
+ (kvend < kvnext
|
|
|
+ ? softOff - kvnext
|
|
|
+ : kvnext + (softOff - kvoffsets.length)));
|
|
|
+ } finally {
|
|
|
+ spillLock.unlock();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
try {
|
|
@@ -905,7 +923,7 @@ class MapTask extends Task {
|
|
|
* likely result in data loss or corruption.
|
|
|
* @see #markRecord()
|
|
|
*/
|
|
|
- protected synchronized void reset() throws IOException {
|
|
|
+ protected void reset() throws IOException {
|
|
|
// spillLock unnecessary; If spill wraps, then
|
|
|
// bufindex < bufstart < bufend so contention is impossible
|
|
|
// a stale value for bufstart does not affect correctness, since
|
|
@@ -931,7 +949,7 @@ class MapTask extends Task {
|
|
|
private final byte[] scratch = new byte[1];
|
|
|
|
|
|
@Override
|
|
|
- public synchronized void write(int v)
|
|
|
+ public void write(int v)
|
|
|
throws IOException {
|
|
|
scratch[0] = (byte)v;
|
|
|
write(scratch, 0, 1);
|
|
@@ -945,69 +963,86 @@ class MapTask extends Task {
|
|
|
* deserialize into the collection buffer.
|
|
|
*/
|
|
|
@Override
|
|
|
- public synchronized void write(byte b[], int off, int len)
|
|
|
+ public void write(byte b[], int off, int len)
|
|
|
throws IOException {
|
|
|
boolean buffull = false;
|
|
|
boolean wrap = false;
|
|
|
- spillLock.lock();
|
|
|
- try {
|
|
|
- do {
|
|
|
- if (sortSpillException != null) {
|
|
|
- throw (IOException)new IOException("Spill failed"
|
|
|
- ).initCause(sortSpillException);
|
|
|
- }
|
|
|
+ bufferRemaining -= len;
|
|
|
+ if (bufferRemaining <= 0) {
|
|
|
+ // writing these bytes could exhaust available buffer space
|
|
|
+ // check if spill or blocking is necessary
|
|
|
+ spillLock.lock();
|
|
|
+ try {
|
|
|
+ do {
|
|
|
+ if (sortSpillException != null) {
|
|
|
+ throw (IOException)new IOException("Spill failed"
|
|
|
+ ).initCause(sortSpillException);
|
|
|
+ }
|
|
|
|
|
|
- // sufficient buffer space?
|
|
|
- if (bufstart <= bufend && bufend <= bufindex) {
|
|
|
- buffull = bufindex + len > bufvoid;
|
|
|
- wrap = (bufvoid - bufindex) + bufstart > len;
|
|
|
- } else {
|
|
|
- // bufindex <= bufstart <= bufend
|
|
|
- // bufend <= bufindex <= bufstart
|
|
|
- wrap = false;
|
|
|
- buffull = bufindex + len > bufstart;
|
|
|
- }
|
|
|
+ // sufficient buffer space?
|
|
|
+ if (bufstart <= bufend && bufend <= bufindex) {
|
|
|
+ buffull = bufindex + len > bufvoid;
|
|
|
+ wrap = (bufvoid - bufindex) + bufstart > len;
|
|
|
+ } else {
|
|
|
+ // bufindex <= bufstart <= bufend
|
|
|
+ // bufend <= bufindex <= bufstart
|
|
|
+ wrap = false;
|
|
|
+ buffull = bufindex + len > bufstart;
|
|
|
+ }
|
|
|
|
|
|
- if (kvstart == kvend) {
|
|
|
- // spill thread not running
|
|
|
- if (kvend != kvindex) {
|
|
|
- // we have records we can spill
|
|
|
- final boolean bufsoftlimit = (bufindex > bufend)
|
|
|
- ? bufindex - bufend > softBufferLimit
|
|
|
- : bufend - bufindex < bufvoid - softBufferLimit;
|
|
|
- if (bufsoftlimit || (buffull && !wrap)) {
|
|
|
- LOG.info("Spilling map output: buffer full= " + bufsoftlimit);
|
|
|
- startSpill();
|
|
|
+ if (kvstart == kvend) {
|
|
|
+ // spill thread not running
|
|
|
+ if (kvend != kvindex) {
|
|
|
+ // we have records we can spill
|
|
|
+ final boolean bufsoftlimit = (bufindex > bufend)
|
|
|
+ ? bufindex - bufend > softBufferLimit
|
|
|
+ : bufend - bufindex < bufvoid - softBufferLimit;
|
|
|
+ if (bufsoftlimit || (buffull && !wrap)) {
|
|
|
+ LOG.info("Spilling map output: buffer full= " + (buffull && !wrap));
|
|
|
+ startSpill();
|
|
|
+ }
|
|
|
+ } else if (buffull && !wrap) {
|
|
|
+ // We have no buffered records, and this record is too large
|
|
|
+ // to write into kvbuffer. We must spill it directly from
|
|
|
+ // collect
|
|
|
+ final int size = ((bufend <= bufindex)
|
|
|
+ ? bufindex - bufend
|
|
|
+ : (bufvoid - bufend) + bufindex) + len;
|
|
|
+ bufstart = bufend = bufindex = bufmark = 0;
|
|
|
+ kvstart = kvend = kvindex = 0;
|
|
|
+ bufvoid = kvbuffer.length;
|
|
|
+ throw new MapBufferTooSmallException(size + " bytes");
|
|
|
}
|
|
|
- } else if (buffull && !wrap) {
|
|
|
- // We have no buffered records, and this record is too large
|
|
|
- // to write into kvbuffer. We must spill it directly from
|
|
|
- // collect
|
|
|
- final int size = ((bufend <= bufindex)
|
|
|
- ? bufindex - bufend
|
|
|
- : (bufvoid - bufend) + bufindex) + len;
|
|
|
- bufstart = bufend = bufindex = bufmark = 0;
|
|
|
- kvstart = kvend = kvindex = 0;
|
|
|
- bufvoid = kvbuffer.length;
|
|
|
- throw new MapBufferTooSmallException(size + " bytes");
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- if (buffull && !wrap) {
|
|
|
- try {
|
|
|
- while (kvstart != kvend) {
|
|
|
- reporter.progress();
|
|
|
- spillDone.await();
|
|
|
+ if (buffull && !wrap) {
|
|
|
+ try {
|
|
|
+ while (kvstart != kvend) {
|
|
|
+ reporter.progress();
|
|
|
+ spillDone.await();
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ throw (IOException)new IOException(
|
|
|
+ "Buffer interrupted while waiting for the writer"
|
|
|
+ ).initCause(e);
|
|
|
}
|
|
|
- } catch (InterruptedException e) {
|
|
|
- throw (IOException)new IOException(
|
|
|
- "Buffer interrupted while waiting for the writer"
|
|
|
- ).initCause(e);
|
|
|
}
|
|
|
- }
|
|
|
- } while (buffull && !wrap);
|
|
|
- } finally {
|
|
|
- spillLock.unlock();
|
|
|
+ } while (buffull && !wrap);
|
|
|
+ final int softOff = bufend + softBufferLimit;
|
|
|
+ bufferRemaining = Math.min(
|
|
|
+ // out of buffer space
|
|
|
+ (bufindex < bufstart
|
|
|
+ ? bufstart - bufindex
|
|
|
+ : kvbuffer.length - bufindex + bufstart),
|
|
|
+ // soft limit
|
|
|
+ (bufend < bufindex
|
|
|
+ ? softOff - bufindex
|
|
|
+ : bufindex + (softOff - kvbuffer.length)));
|
|
|
+ } finally {
|
|
|
+ spillLock.unlock();
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ buffull = bufindex + len > bufvoid;
|
|
|
}
|
|
|
// here, we know that we have sufficient space to write
|
|
|
if (buffull) {
|
|
@@ -1019,11 +1054,12 @@ class MapTask extends Task {
|
|
|
}
|
|
|
System.arraycopy(b, off, kvbuffer, bufindex, len);
|
|
|
bufindex += len;
|
|
|
+ bufferRemaining -= len;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public synchronized void flush() throws IOException, ClassNotFoundException,
|
|
|
- InterruptedException {
|
|
|
+ public void flush() throws IOException, ClassNotFoundException,
|
|
|
+ InterruptedException {
|
|
|
LOG.info("Starting flush of map output");
|
|
|
spillLock.lock();
|
|
|
try {
|
|
@@ -1103,7 +1139,7 @@ class MapTask extends Task {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private synchronized void startSpill() {
|
|
|
+ private void startSpill() {
|
|
|
LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
|
|
|
"; bufvoid = " + bufvoid);
|
|
|
LOG.info("kvstart = " + kvstart + "; kvend = " + kvindex +
|