|
@@ -708,13 +708,17 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
* Aborts this output stream and releases any system
|
|
|
* resources associated with this stream.
|
|
|
*/
|
|
|
- synchronized void abort() throws IOException {
|
|
|
- if (isClosed()) {
|
|
|
- return;
|
|
|
+ void abort() throws IOException {
|
|
|
+ synchronized (this) {
|
|
|
+ if (isClosed()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ getStreamer().getLastException().set(
|
|
|
+ new IOException("Lease timeout of "
|
|
|
+ + (dfsClient.getConf().getHdfsTimeout() / 1000)
|
|
|
+ + " seconds expired."));
|
|
|
+ closeThreads(true);
|
|
|
}
|
|
|
- getStreamer().getLastException().set(new IOException("Lease timeout of "
|
|
|
- + (dfsClient.getConf().getHdfsTimeout() / 1000) + " seconds expired."));
|
|
|
- closeThreads(true);
|
|
|
dfsClient.endFileLease(fileId);
|
|
|
}
|
|
|
|
|
@@ -747,11 +751,14 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
* resources associated with this stream.
|
|
|
*/
|
|
|
@Override
|
|
|
- public synchronized void close() throws IOException {
|
|
|
- try (TraceScope ignored =
|
|
|
- dfsClient.newPathTraceScope("DFSOutputStream#close", src)) {
|
|
|
- closeImpl();
|
|
|
+ public void close() throws IOException {
|
|
|
+ synchronized (this) {
|
|
|
+ try (TraceScope ignored = dfsClient.newPathTraceScope(
|
|
|
+ "DFSOutputStream#close", src)) {
|
|
|
+ closeImpl();
|
|
|
+ }
|
|
|
}
|
|
|
+ dfsClient.endFileLease(fileId);
|
|
|
}
|
|
|
|
|
|
protected synchronized void closeImpl() throws IOException {
|
|
@@ -779,7 +786,6 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
dfsClient.getTracer().newScope("completeFile")) {
|
|
|
completeFile(lastBlock);
|
|
|
}
|
|
|
- dfsClient.endFileLease(fileId);
|
|
|
} catch (ClosedChannelException ignored) {
|
|
|
} finally {
|
|
|
setClosed();
|