|
@@ -2433,17 +2433,15 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- if (streamerClosed || hasError || !clientRunning) {
|
|
|
- continue;
|
|
|
- }
|
|
|
-
|
|
|
- // done receiving all acks
|
|
|
- if (response != null) {
|
|
|
- response.close(); // notify responder to close
|
|
|
+
|
|
|
+ if (ackQueue.isEmpty()) { // done receiving all acks
|
|
|
+ if (response != null) {
|
|
|
+ response.close(); // notify responder to close
|
|
|
+ }
|
|
|
+ // indicate end-of-block
|
|
|
+ blockStream.writeInt(0);
|
|
|
+ blockStream.flush();
|
|
|
}
|
|
|
- // indicate end-of-block
|
|
|
- blockStream.writeInt(0);
|
|
|
- blockStream.flush();
|
|
|
}
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("DataStreamer block " + block +
|
|
@@ -2500,10 +2498,16 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
* close both streamer and DFSOutputStream, should be called only
|
|
|
* by an external thread and only after all data to be sent has
|
|
|
* been flushed to datanode.
|
|
|
+ *
|
|
|
+ * Interrupt this data streamer if force is true
|
|
|
+ *
|
|
|
+ * @param force if this data stream is forced to be closed
|
|
|
*/
|
|
|
- void close() {
|
|
|
+ void close(boolean force) {
|
|
|
streamerClosed = true;
|
|
|
- this.interrupt();
|
|
|
+ if (force) {
|
|
|
+ this.interrupt();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private void closeResponder() {
|
|
@@ -3267,7 +3271,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
}
|
|
|
} catch (IOException e) {
|
|
|
lastException = new IOException("IOException flush:" + e);
|
|
|
- closeThreads();
|
|
|
+ closeThreads(true);
|
|
|
throw e;
|
|
|
}
|
|
|
}
|
|
@@ -3308,13 +3312,14 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
}
|
|
|
streamer.setLastException(new IOException("Lease timeout of " +
|
|
|
(hdfsTimeout/1000) + " seconds expired."));
|
|
|
- closeThreads();
|
|
|
+ closeThreads(true);
|
|
|
}
|
|
|
|
|
|
// shutdown datastreamer and responseprocessor threads.
|
|
|
- private void closeThreads() throws IOException {
|
|
|
+ // interrupt datastreamer if force is true
|
|
|
+ private void closeThreads(boolean force) throws IOException {
|
|
|
try {
|
|
|
- streamer.close();
|
|
|
+ streamer.close(force);
|
|
|
streamer.join();
|
|
|
if (s != null) {
|
|
|
s.close();
|
|
@@ -3358,7 +3363,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
}
|
|
|
|
|
|
flushInternal(); // flush all data to Datanodes
|
|
|
- closeThreads();
|
|
|
+ closeThreads(false);
|
|
|
completeFile();
|
|
|
leasechecker.remove(src);
|
|
|
} finally {
|