|
@@ -29,7 +29,6 @@ import java.util.concurrent.ConcurrentLinkedDeque;
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
import java.util.concurrent.ExecutorCompletionService;
|
|
import java.util.concurrent.ExecutorCompletionService;
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
-import java.util.concurrent.Callable;
|
|
|
|
import java.util.concurrent.Future;
|
|
import java.util.concurrent.Future;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
@@ -43,6 +42,12 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationExcep
|
|
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
|
|
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
|
|
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
|
|
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
|
|
import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
|
|
import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
|
|
|
|
+import org.apache.hadoop.fs.statistics.DurationTracker;
|
|
|
|
+import org.apache.hadoop.fs.statistics.IOStatistics;
|
|
|
|
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
|
|
|
|
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
|
|
|
|
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
|
|
|
|
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
|
|
import org.apache.hadoop.io.ElasticByteBufferPool;
|
|
import org.apache.hadoop.io.ElasticByteBufferPool;
|
|
import org.apache.hadoop.fs.FileSystem.Statistics;
|
|
import org.apache.hadoop.fs.FileSystem.Statistics;
|
|
import org.apache.hadoop.fs.FSExceptionMessages;
|
|
import org.apache.hadoop.fs.FSExceptionMessages;
|
|
@@ -57,7 +62,8 @@ import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestPara
|
|
/**
|
|
/**
|
|
* The BlobFsOutputStream for Rest AbfsClient.
|
|
* The BlobFsOutputStream for Rest AbfsClient.
|
|
*/
|
|
*/
|
|
-public class AbfsOutputStream extends OutputStream implements Syncable, StreamCapabilities {
|
|
|
|
|
|
+public class AbfsOutputStream extends OutputStream implements Syncable,
|
|
|
|
+ StreamCapabilities, IOStatisticsSource {
|
|
|
|
|
|
private final AbfsClient client;
|
|
private final AbfsClient client;
|
|
private final String path;
|
|
private final String path;
|
|
@@ -97,6 +103,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
|
|
|
|
|
private final Statistics statistics;
|
|
private final Statistics statistics;
|
|
private final AbfsOutputStreamStatistics outputStreamStatistics;
|
|
private final AbfsOutputStreamStatistics outputStreamStatistics;
|
|
|
|
+ private IOStatistics ioStatistics;
|
|
|
|
|
|
private static final Logger LOG =
|
|
private static final Logger LOG =
|
|
LoggerFactory.getLogger(AbfsOutputStream.class);
|
|
LoggerFactory.getLogger(AbfsOutputStream.class);
|
|
@@ -144,6 +151,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
|
this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
|
|
this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
|
|
this.cachedSasToken = new CachedSASToken(
|
|
this.cachedSasToken = new CachedSASToken(
|
|
abfsOutputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
|
|
abfsOutputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
|
|
|
|
+ if (outputStreamStatistics != null) {
|
|
|
|
+ this.ioStatistics = outputStreamStatistics.getIOStatistics();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -354,11 +364,12 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
|
if (bufferIndex == 0) {
|
|
if (bufferIndex == 0) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
- outputStreamStatistics.writeCurrentBuffer();
|
|
|
|
-
|
|
|
|
final byte[] bytes = buffer;
|
|
final byte[] bytes = buffer;
|
|
final int bytesLength = bufferIndex;
|
|
final int bytesLength = bufferIndex;
|
|
- outputStreamStatistics.bytesToUpload(bytesLength);
|
|
|
|
|
|
+ if (outputStreamStatistics != null) {
|
|
|
|
+ outputStreamStatistics.writeCurrentBuffer();
|
|
|
|
+ outputStreamStatistics.bytesToUpload(bytesLength);
|
|
|
|
+ }
|
|
buffer = byteBufferPool.getBuffer(false, bufferSize).array();
|
|
buffer = byteBufferPool.getBuffer(false, bufferSize).array();
|
|
bufferIndex = 0;
|
|
bufferIndex = 0;
|
|
final long offset = position;
|
|
final long offset = position;
|
|
@@ -370,7 +381,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
|
bytesLength, APPEND_MODE, true);
|
|
bytesLength, APPEND_MODE, true);
|
|
AbfsRestOperation op = client.append(path, bytes, reqParams, cachedSasToken.get());
|
|
AbfsRestOperation op = client.append(path, bytes, reqParams, cachedSasToken.get());
|
|
cachedSasToken.update(op.getSasToken());
|
|
cachedSasToken.update(op.getSasToken());
|
|
- outputStreamStatistics.uploadSuccessful(bytesLength);
|
|
|
|
|
|
+ if (outputStreamStatistics != null) {
|
|
|
|
+ outputStreamStatistics.uploadSuccessful(bytesLength);
|
|
|
|
+ }
|
|
perfInfo.registerResult(op.getResult());
|
|
perfInfo.registerResult(op.getResult());
|
|
byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
|
|
byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
|
|
perfInfo.registerSuccess(true);
|
|
perfInfo.registerSuccess(true);
|
|
@@ -402,55 +415,63 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
|
if (bufferIndex == 0) {
|
|
if (bufferIndex == 0) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
- outputStreamStatistics.writeCurrentBuffer();
|
|
|
|
numOfAppendsToServerSinceLastFlush++;
|
|
numOfAppendsToServerSinceLastFlush++;
|
|
|
|
|
|
final byte[] bytes = buffer;
|
|
final byte[] bytes = buffer;
|
|
final int bytesLength = bufferIndex;
|
|
final int bytesLength = bufferIndex;
|
|
- outputStreamStatistics.bytesToUpload(bytesLength);
|
|
|
|
|
|
+ if (outputStreamStatistics != null) {
|
|
|
|
+ outputStreamStatistics.writeCurrentBuffer();
|
|
|
|
+ outputStreamStatistics.bytesToUpload(bytesLength);
|
|
|
|
+ }
|
|
buffer = byteBufferPool.getBuffer(false, bufferSize).array();
|
|
buffer = byteBufferPool.getBuffer(false, bufferSize).array();
|
|
bufferIndex = 0;
|
|
bufferIndex = 0;
|
|
final long offset = position;
|
|
final long offset = position;
|
|
position += bytesLength;
|
|
position += bytesLength;
|
|
|
|
|
|
if (threadExecutor.getQueue().size() >= maxRequestsThatCanBeQueued) {
|
|
if (threadExecutor.getQueue().size() >= maxRequestsThatCanBeQueued) {
|
|
- long start = System.currentTimeMillis();
|
|
|
|
- waitForTaskToComplete();
|
|
|
|
- outputStreamStatistics.timeSpentTaskWait(start, System.currentTimeMillis());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- final Future<Void> job = completionService.submit(new Callable<Void>() {
|
|
|
|
- @Override
|
|
|
|
- public Void call() throws Exception {
|
|
|
|
- AbfsPerfTracker tracker = client.getAbfsPerfTracker();
|
|
|
|
- try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
|
|
|
|
- "writeCurrentBufferToService", "append")) {
|
|
|
|
- AppendRequestParameters.Mode
|
|
|
|
- mode = APPEND_MODE;
|
|
|
|
- if (isFlush & isClose) {
|
|
|
|
- mode = FLUSH_CLOSE_MODE;
|
|
|
|
- } else if (isFlush) {
|
|
|
|
- mode = FLUSH_MODE;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- AppendRequestParameters reqParams = new AppendRequestParameters(
|
|
|
|
- offset, 0, bytesLength, mode, false);
|
|
|
|
- AbfsRestOperation op = client.append(path, bytes, reqParams,
|
|
|
|
- cachedSasToken.get());
|
|
|
|
-
|
|
|
|
- cachedSasToken.update(op.getSasToken());
|
|
|
|
- perfInfo.registerResult(op.getResult());
|
|
|
|
- byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
|
|
|
|
- perfInfo.registerSuccess(true);
|
|
|
|
- return null;
|
|
|
|
|
|
+ //Tracking time spent on waiting for task to complete.
|
|
|
|
+ if (outputStreamStatistics != null) {
|
|
|
|
+ try (DurationTracker ignored = outputStreamStatistics.timeSpentTaskWait()) {
|
|
|
|
+ waitForTaskToComplete();
|
|
}
|
|
}
|
|
|
|
+ } else {
|
|
|
|
+ waitForTaskToComplete();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ final Future<Void> job =
|
|
|
|
+ completionService.submit(IOStatisticsBinding
|
|
|
|
+ .trackDurationOfCallable((IOStatisticsStore) ioStatistics,
|
|
|
|
+ StreamStatisticNames.TIME_SPENT_ON_PUT_REQUEST,
|
|
|
|
+ () -> {
|
|
|
|
+ AbfsPerfTracker tracker = client.getAbfsPerfTracker();
|
|
|
|
+ try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
|
|
|
|
+ "writeCurrentBufferToService", "append")) {
|
|
|
|
+ AppendRequestParameters.Mode
|
|
|
|
+ mode = APPEND_MODE;
|
|
|
|
+ if (isFlush & isClose) {
|
|
|
|
+ mode = FLUSH_CLOSE_MODE;
|
|
|
|
+ } else if (isFlush) {
|
|
|
|
+ mode = FLUSH_MODE;
|
|
|
|
+ }
|
|
|
|
+ AppendRequestParameters reqParams = new AppendRequestParameters(
|
|
|
|
+ offset, 0, bytesLength, mode, false);
|
|
|
|
+ AbfsRestOperation op = client.append(path, bytes, reqParams,
|
|
|
|
+ cachedSasToken.get());
|
|
|
|
+ cachedSasToken.update(op.getSasToken());
|
|
|
|
+ perfInfo.registerResult(op.getResult());
|
|
|
|
+ byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
|
|
|
|
+ perfInfo.registerSuccess(true);
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+ })
|
|
|
|
+ );
|
|
|
|
+
|
|
|
|
+ if (outputStreamStatistics != null) {
|
|
|
|
+ if (job.isCancelled()) {
|
|
|
|
+ outputStreamStatistics.uploadFailed(bytesLength);
|
|
|
|
+ } else {
|
|
|
|
+ outputStreamStatistics.uploadSuccessful(bytesLength);
|
|
}
|
|
}
|
|
- });
|
|
|
|
-
|
|
|
|
- if (job.isCancelled()) {
|
|
|
|
- outputStreamStatistics.uploadFailed(bytesLength);
|
|
|
|
- } else {
|
|
|
|
- outputStreamStatistics.uploadSuccessful(bytesLength);
|
|
|
|
}
|
|
}
|
|
writeOperations.add(new WriteOperation(job, offset, bytesLength));
|
|
writeOperations.add(new WriteOperation(job, offset, bytesLength));
|
|
|
|
|
|
@@ -527,7 +548,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
|
lastTotalAppendOffset += writeOperations.peek().length;
|
|
lastTotalAppendOffset += writeOperations.peek().length;
|
|
writeOperations.remove();
|
|
writeOperations.remove();
|
|
// Incrementing statistics to indicate queue has been shrunk.
|
|
// Incrementing statistics to indicate queue has been shrunk.
|
|
- outputStreamStatistics.queueShrunk();
|
|
|
|
|
|
+ if (outputStreamStatistics != null) {
|
|
|
|
+ outputStreamStatistics.queueShrunk();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
if (e.getCause() instanceof AzureBlobFileSystemException) {
|
|
if (e.getCause() instanceof AzureBlobFileSystemException) {
|
|
@@ -615,6 +638,11 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
|
return isAppendBlob;
|
|
return isAppendBlob;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Override
|
|
|
|
+ public IOStatistics getIOStatistics() {
|
|
|
|
+ return ioStatistics;
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Appending AbfsOutputStream statistics to base toString().
|
|
* Appending AbfsOutputStream statistics to base toString().
|
|
*
|
|
*
|
|
@@ -623,9 +651,12 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
|
@Override
|
|
@Override
|
|
public String toString() {
|
|
public String toString() {
|
|
final StringBuilder sb = new StringBuilder(super.toString());
|
|
final StringBuilder sb = new StringBuilder(super.toString());
|
|
- sb.append("AbfsOuputStream@").append(this.hashCode()).append("){");
|
|
|
|
- sb.append(outputStreamStatistics.toString());
|
|
|
|
- sb.append("}");
|
|
|
|
|
|
+ if (outputStreamStatistics != null) {
|
|
|
|
+ sb.append("AbfsOutputStream@").append(this.hashCode());
|
|
|
|
+ sb.append("){");
|
|
|
|
+ sb.append(outputStreamStatistics.toString());
|
|
|
|
+ sb.append("}");
|
|
|
|
+ }
|
|
return sb.toString();
|
|
return sb.toString();
|
|
}
|
|
}
|
|
}
|
|
}
|