|
@@ -2171,13 +2171,15 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
* Aborts this output stream and releases any system
|
|
|
* resources associated with this stream.
|
|
|
*/
|
|
|
- synchronized void abort() throws IOException {
|
|
|
- if (closed) {
|
|
|
- return;
|
|
|
+ void abort() throws IOException {
|
|
|
+ synchronized (this) {
|
|
|
+ if (closed) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ streamer.setLastException(new IOException("Lease timeout of "
|
|
|
+ + (dfsClient.getHdfsTimeout() / 1000) + " seconds expired."));
|
|
|
+ closeThreads(true);
|
|
|
}
|
|
|
- streamer.setLastException(new IOException("Lease timeout of "
|
|
|
- + (dfsClient.getHdfsTimeout()/1000) + " seconds expired."));
|
|
|
- closeThreads(true);
|
|
|
dfsClient.endFileLease(fileId);
|
|
|
}
|
|
|
|
|
@@ -2204,39 +2206,42 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
* resources associated with this stream.
|
|
|
*/
|
|
|
@Override
|
|
|
- public synchronized void close() throws IOException {
|
|
|
- if (closed) {
|
|
|
- IOException e = lastException.getAndSet(null);
|
|
|
- if (e == null)
|
|
|
- return;
|
|
|
- else
|
|
|
- throw e;
|
|
|
- }
|
|
|
+ public void close() throws IOException {
|
|
|
+ synchronized (this) {
|
|
|
+ if (closed) {
|
|
|
+ IOException e = lastException.getAndSet(null);
|
|
|
+ if (e == null)
|
|
|
+ return;
|
|
|
+ else
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
|
|
|
- try {
|
|
|
- flushBuffer(); // flush from all upper layers
|
|
|
+ try {
|
|
|
+ flushBuffer(); // flush from all upper layers
|
|
|
|
|
|
- if (currentPacket != null) {
|
|
|
- waitAndQueueCurrentPacket();
|
|
|
- }
|
|
|
+ if (currentPacket != null) {
|
|
|
+ waitAndQueueCurrentPacket();
|
|
|
+ }
|
|
|
|
|
|
- if (bytesCurBlock != 0) {
|
|
|
- // send an empty packet to mark the end of the block
|
|
|
- currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);
|
|
|
- currentPacket.lastPacketInBlock = true;
|
|
|
- currentPacket.syncBlock = shouldSyncBlock;
|
|
|
- }
|
|
|
+ if (bytesCurBlock != 0) {
|
|
|
+ // send an empty packet to mark the end of the block
|
|
|
+ currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);
|
|
|
+ currentPacket.lastPacketInBlock = true;
|
|
|
+ currentPacket.syncBlock = shouldSyncBlock;
|
|
|
+ }
|
|
|
|
|
|
- flushInternal(); // flush all data to Datanodes
|
|
|
- // get last block before destroying the streamer
|
|
|
- ExtendedBlock lastBlock = streamer.getBlock();
|
|
|
- closeThreads(false);
|
|
|
- completeFile(lastBlock);
|
|
|
- dfsClient.endFileLease(fileId);
|
|
|
- } catch (ClosedChannelException e) {
|
|
|
- } finally {
|
|
|
- closed = true;
|
|
|
+ flushInternal(); // flush all data to Datanodes
|
|
|
+ // get last block before destroying the streamer
|
|
|
+ ExtendedBlock lastBlock = streamer.getBlock();
|
|
|
+ closeThreads(false);
|
|
|
+ completeFile(lastBlock);
|
|
|
+
|
|
|
+ } catch (ClosedChannelException e) {
|
|
|
+ } finally {
|
|
|
+ closed = true;
|
|
|
+ }
|
|
|
}
|
|
|
+ dfsClient.endFileLease(fileId);
|
|
|
}
|
|
|
|
|
|
// should be called holding (this) lock since setTestFilename() may
|