|
@@ -502,6 +502,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
String mirrAddr, BlockTransferThrottler throttlerArg,
|
|
String mirrAddr, BlockTransferThrottler throttlerArg,
|
|
int numTargets) throws IOException {
|
|
int numTargets) throws IOException {
|
|
|
|
|
|
|
|
+ boolean responderClosed = false;
|
|
mirrorOut = mirrOut;
|
|
mirrorOut = mirrOut;
|
|
mirrorAddr = mirrAddr;
|
|
mirrorAddr = mirrAddr;
|
|
throttler = throttlerArg;
|
|
throttler = throttlerArg;
|
|
@@ -535,8 +536,10 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
|
|
|
// wait for all outstanding packet responses. And then
|
|
// wait for all outstanding packet responses. And then
|
|
// indicate responder to gracefully shutdown.
|
|
// indicate responder to gracefully shutdown.
|
|
|
|
+ // Mark that responder has been closed for future processing
|
|
if (responder != null) {
|
|
if (responder != null) {
|
|
((PacketResponder)responder.getRunnable()).close();
|
|
((PacketResponder)responder.getRunnable()).close();
|
|
|
|
+ responderClosed = true;
|
|
}
|
|
}
|
|
|
|
|
|
// if this write is for a replication request (and not
|
|
// if this write is for a replication request (and not
|
|
@@ -555,13 +558,15 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
} catch (IOException ioe) {
|
|
} catch (IOException ioe) {
|
|
LOG.info("Exception in receiveBlock for block " + block +
|
|
LOG.info("Exception in receiveBlock for block " + block +
|
|
" " + ioe);
|
|
" " + ioe);
|
|
- IOUtils.closeStream(this);
|
|
|
|
- if (responder != null) {
|
|
|
|
- responder.interrupt();
|
|
|
|
- }
|
|
|
|
- cleanupBlock();
|
|
|
|
throw ioe;
|
|
throw ioe;
|
|
} finally {
|
|
} finally {
|
|
|
|
+ if (!responderClosed) { // Abnormal termination of the flow above
|
|
|
|
+ IOUtils.closeStream(this);
|
|
|
|
+ if (responder != null) {
|
|
|
|
+ responder.interrupt();
|
|
|
|
+ }
|
|
|
|
+ cleanupBlock();
|
|
|
|
+ }
|
|
if (responder != null) {
|
|
if (responder != null) {
|
|
try {
|
|
try {
|
|
responder.join();
|
|
responder.join();
|