|
@@ -200,7 +200,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
|
|
@Override
|
|
|
public void hsync() throws IOException {
|
|
|
if (supportFlush) {
|
|
|
- flushInternal();
|
|
|
+ flushInternal(false);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -211,7 +211,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
|
|
@Override
|
|
|
public void hflush() throws IOException {
|
|
|
if (supportFlush) {
|
|
|
- flushInternal();
|
|
|
+ flushInternal(false);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -230,7 +230,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
|
|
}
|
|
|
|
|
|
try {
|
|
|
- flushInternal();
|
|
|
+ flushInternal(true);
|
|
|
threadExecutor.shutdown();
|
|
|
} finally {
|
|
|
lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
|
|
@@ -244,10 +244,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private synchronized void flushInternal() throws IOException {
|
|
|
+ private synchronized void flushInternal(boolean isClose) throws IOException {
|
|
|
maybeThrowLastError();
|
|
|
writeCurrentBufferToService();
|
|
|
- flushWrittenBytesToService();
|
|
|
+ flushWrittenBytesToService(isClose);
|
|
|
}
|
|
|
|
|
|
private synchronized void flushInternalAsync() throws IOException {
|
|
@@ -288,7 +288,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
|
|
shrinkWriteOperationQueue();
|
|
|
}
|
|
|
|
|
|
- private synchronized void flushWrittenBytesToService() throws IOException {
|
|
|
+ private synchronized void flushWrittenBytesToService(boolean isClose) throws IOException {
|
|
|
for (WriteOperation writeOperation : writeOperations) {
|
|
|
try {
|
|
|
writeOperation.task.get();
|
|
@@ -306,21 +306,22 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
|
|
throw lastError;
|
|
|
}
|
|
|
}
|
|
|
- flushWrittenBytesToServiceInternal(position, false);
|
|
|
+ flushWrittenBytesToServiceInternal(position, false, isClose);
|
|
|
}
|
|
|
|
|
|
private synchronized void flushWrittenBytesToServiceAsync() throws IOException {
|
|
|
shrinkWriteOperationQueue();
|
|
|
|
|
|
if (this.lastTotalAppendOffset > this.lastFlushOffset) {
|
|
|
- this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset, true);
|
|
|
+ this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset, true,
|
|
|
+ false/*Async flush on close not permitted*/);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private synchronized void flushWrittenBytesToServiceInternal(final long offset,
|
|
|
- final boolean retainUncommitedData) throws IOException {
|
|
|
+ final boolean retainUncommitedData, final boolean isClose) throws IOException {
|
|
|
try {
|
|
|
- client.flush(path, offset, retainUncommitedData);
|
|
|
+ client.flush(path, offset, retainUncommitedData, isClose);
|
|
|
} catch (AzureBlobFileSystemException ex) {
|
|
|
if (ex instanceof AbfsRestOperationException) {
|
|
|
if (((AbfsRestOperationException) ex).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
|