|
@@ -312,7 +312,7 @@ class MapTask extends Task {
|
|
|
|
|
|
// spill accounting
|
|
|
private volatile int numSpills = 0;
|
|
|
- private volatile IOException sortSpillException = null;
|
|
|
+ private volatile Throwable sortSpillException = null;
|
|
|
private final int softRecordLimit;
|
|
|
private final int softBufferLimit;
|
|
|
private final Object spillLock = new Object();
|
|
@@ -424,7 +424,8 @@ class MapTask extends Task {
|
|
|
+ value.getClass().getName());
|
|
|
}
|
|
|
if (sortSpillException != null) {
|
|
|
- throw sortSpillException;
|
|
|
+ throw (IOException)new IOException("Spill failed"
|
|
|
+ ).initCause(sortSpillException);
|
|
|
}
|
|
|
try {
|
|
|
int keystart = bufindex;
|
|
@@ -588,8 +589,8 @@ class MapTask extends Task {
|
|
|
synchronized(spillLock) {
|
|
|
do {
|
|
|
if (sortSpillException != null) {
|
|
|
- throw (IOException)new IOException().initCause(
|
|
|
- sortSpillException);
|
|
|
+ throw (IOException)new IOException("Spill failed"
|
|
|
+ ).initCause(sortSpillException);
|
|
|
}
|
|
|
|
|
|
// sufficient accounting space?
|
|
@@ -679,7 +680,8 @@ class MapTask extends Task {
|
|
|
}
|
|
|
}
|
|
|
if (sortSpillException != null) {
|
|
|
- throw (IOException)new IOException().initCause(sortSpillException);
|
|
|
+ throw (IOException)new IOException("Spill failed"
|
|
|
+ ).initCause(sortSpillException);
|
|
|
}
|
|
|
if (kvend != kvindex) {
|
|
|
kvend = kvindex;
|
|
@@ -698,7 +700,7 @@ class MapTask extends Task {
|
|
|
public void run() {
|
|
|
try {
|
|
|
sortAndSpill();
|
|
|
- } catch (IOException e) {
|
|
|
+ } catch (Throwable e) {
|
|
|
sortSpillException = e;
|
|
|
} finally {
|
|
|
synchronized(spillLock) {
|
|
@@ -850,7 +852,7 @@ class MapTask extends Task {
|
|
|
private void getVBytesForOffset(int kvoff, InMemValBytes vbytes) {
|
|
|
final int nextindex = kvoff / ACCTSIZE == kvend - 1
|
|
|
? bufend
|
|
|
- : kvindices[kvoff + ACCTSIZE + KEYSTART];
|
|
|
+ : kvindices[(kvoff + ACCTSIZE + KEYSTART) % kvindices.length];
|
|
|
int vallen = (nextindex > kvindices[kvoff + VALSTART])
|
|
|
? nextindex - kvindices[kvoff + VALSTART]
|
|
|
: (bufvoid - kvindices[kvoff + VALSTART]) + nextindex;
|