|
@@ -277,7 +277,7 @@ class MapTask extends Task {
|
|
private final Object pendingKeyvalBufferLock = new Object();
|
|
private final Object pendingKeyvalBufferLock = new Object();
|
|
// since sort-spill and collect are done concurrently, exceptions are
|
|
// since sort-spill and collect are done concurrently, exceptions are
|
|
// passed through shared variable
|
|
// passed through shared variable
|
|
- private volatile IOException sortSpillException;
|
|
|
|
|
|
+ private volatile Throwable sortSpillException;
|
|
private int maxBufferSize; //the max amount of in-memory space after which
|
|
private int maxBufferSize; //the max amount of in-memory space after which
|
|
//we will spill the keyValBuffer to disk
|
|
//we will spill the keyValBuffer to disk
|
|
private int numSpills; //maintains the no. of spills to disk done so far
|
|
private int numSpills; //maintains the no. of spills to disk done so far
|
|
@@ -373,7 +373,7 @@ class MapTask extends Task {
|
|
|
|
|
|
// check if the earlier sort-spill generated an exception
|
|
// check if the earlier sort-spill generated an exception
|
|
if (sortSpillException != null) {
|
|
if (sortSpillException != null) {
|
|
- throw sortSpillException;
|
|
|
|
|
|
+ throw (IOException) new IOException().initCause(sortSpillException);
|
|
}
|
|
}
|
|
|
|
|
|
if (keyValBuffer == null) {
|
|
if (keyValBuffer == null) {
|
|
@@ -426,7 +426,7 @@ class MapTask extends Task {
|
|
|
|
|
|
// check if the earlier sort-spill thread generated an exception
|
|
// check if the earlier sort-spill thread generated an exception
|
|
if (sortSpillException != null) {
|
|
if (sortSpillException != null) {
|
|
- throw sortSpillException;
|
|
|
|
|
|
+ throw (IOException) new IOException().initCause(sortSpillException);
|
|
}
|
|
}
|
|
|
|
|
|
// Start the sort-spill thread. While the sort and spill takes place
|
|
// Start the sort-spill thread. While the sort and spill takes place
|
|
@@ -501,8 +501,8 @@ class MapTask extends Task {
|
|
numSpills++;
|
|
numSpills++;
|
|
out.close();
|
|
out.close();
|
|
indexOut.close();
|
|
indexOut.close();
|
|
- } catch (IOException ioe) {
|
|
|
|
- sortSpillException = ioe;
|
|
|
|
|
|
+ } catch (Throwable t) {
|
|
|
|
+ sortSpillException = t;
|
|
} finally { // make sure that the collector never waits indefinitely
|
|
} finally { // make sure that the collector never waits indefinitely
|
|
pendingKeyvalBuffer = null;
|
|
pendingKeyvalBuffer = null;
|
|
for (int i = 0; i < partitions; i++) {
|
|
for (int i = 0; i < partitions; i++) {
|
|
@@ -700,7 +700,7 @@ class MapTask extends Task {
|
|
|
|
|
|
// check if the earlier sort-spill thread generated an exception
|
|
// check if the earlier sort-spill thread generated an exception
|
|
if (sortSpillException != null) {
|
|
if (sortSpillException != null) {
|
|
- throw sortSpillException;
|
|
|
|
|
|
+ throw (IOException) new IOException().initCause(sortSpillException);
|
|
}
|
|
}
|
|
|
|
|
|
if (keyValBuffer != null && keyValBuffer.getLength() > 0) {
|
|
if (keyValBuffer != null && keyValBuffer.getLength() > 0) {
|
|
@@ -716,7 +716,7 @@ class MapTask extends Task {
|
|
|
|
|
|
// check if the last sort-spill thread generated an exception
|
|
// check if the last sort-spill thread generated an exception
|
|
if (sortSpillException != null) {
|
|
if (sortSpillException != null) {
|
|
- throw sortSpillException;
|
|
|
|
|
|
+ throw (IOException) new IOException().initCause(sortSpillException);
|
|
}
|
|
}
|
|
mergeParts();
|
|
mergeParts();
|
|
}
|
|
}
|