|
@@ -2174,13 +2174,15 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
* Aborts this output stream and releases any system
|
|
|
* resources associated with this stream.
|
|
|
*/
|
|
|
- synchronized void abort() throws IOException {
|
|
|
- if (isClosed()) {
|
|
|
- return;
|
|
|
+ void abort() throws IOException {
|
|
|
+ synchronized (this) {
|
|
|
+ if (isClosed()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ streamer.setLastException(new IOException("Lease timeout of "
|
|
|
+ + (dfsClient.getHdfsTimeout() / 1000) + " seconds expired."));
|
|
|
+ closeThreads(true);
|
|
|
}
|
|
|
- streamer.setLastException(new IOException("Lease timeout of "
|
|
|
- + (dfsClient.getHdfsTimeout()/1000) + " seconds expired."));
|
|
|
- closeThreads(true);
|
|
|
dfsClient.endFileLease(fileId);
|
|
|
}
|
|
|
|
|
@@ -2226,14 +2228,17 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
* resources associated with this stream.
|
|
|
*/
|
|
|
@Override
|
|
|
- public synchronized void close() throws IOException {
|
|
|
- TraceScope scope =
|
|
|
- dfsClient.getPathTraceScope("DFSOutputStream#close", src);
|
|
|
- try {
|
|
|
- closeImpl();
|
|
|
- } finally {
|
|
|
- scope.close();
|
|
|
+ public void close() throws IOException {
|
|
|
+ synchronized (this) {
|
|
|
+ TraceScope scope = dfsClient.getPathTraceScope("DFSOutputStream#close",
|
|
|
+ src);
|
|
|
+ try {
|
|
|
+ closeImpl();
|
|
|
+ } finally {
|
|
|
+ scope.close();
|
|
|
+ }
|
|
|
}
|
|
|
+ dfsClient.endFileLease(fileId);
|
|
|
}
|
|
|
|
|
|
private synchronized void closeImpl() throws IOException {
|
|
@@ -2268,7 +2273,6 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
} finally {
|
|
|
scope.close();
|
|
|
}
|
|
|
- dfsClient.endFileLease(fileId);
|
|
|
} catch (ClosedChannelException e) {
|
|
|
} finally {
|
|
|
setClosed();
|