|
@@ -20,24 +20,20 @@ package org.apache.hadoop.fs.azurebfs.services;
|
|
|
|
|
|
import java.io.FileNotFoundException;
|
|
import java.io.FileNotFoundException;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
-import java.io.InterruptedIOException;
|
|
|
|
import java.io.OutputStream;
|
|
import java.io.OutputStream;
|
|
import java.net.HttpURLConnection;
|
|
import java.net.HttpURLConnection;
|
|
-import java.nio.ByteBuffer;
|
|
|
|
import java.util.concurrent.ConcurrentLinkedDeque;
|
|
import java.util.concurrent.ConcurrentLinkedDeque;
|
|
-import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
-import java.util.concurrent.ExecutorCompletionService;
|
|
|
|
-import java.util.concurrent.ThreadPoolExecutor;
|
|
|
|
import java.util.concurrent.Future;
|
|
import java.util.concurrent.Future;
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
|
import java.util.UUID;
|
|
import java.util.UUID;
|
|
|
|
|
|
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
|
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
|
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
|
|
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
|
|
-
|
|
|
|
|
|
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
|
|
|
|
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
|
|
+import org.apache.commons.io.IOUtils;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.hadoop.fs.PathIOException;
|
|
import org.apache.hadoop.fs.PathIOException;
|
|
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
|
|
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
|
|
@@ -47,10 +43,9 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
|
|
import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
|
|
import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
|
|
import org.apache.hadoop.fs.azurebfs.utils.Listener;
|
|
import org.apache.hadoop.fs.azurebfs.utils.Listener;
|
|
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
|
|
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
|
|
-import org.apache.hadoop.fs.statistics.DurationTracker;
|
|
|
|
import org.apache.hadoop.fs.statistics.IOStatistics;
|
|
import org.apache.hadoop.fs.statistics.IOStatistics;
|
|
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
|
|
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
|
|
-import org.apache.hadoop.io.ElasticByteBufferPool;
|
|
|
|
|
|
+import org.apache.hadoop.fs.store.DataBlocks;
|
|
import org.apache.hadoop.fs.FileSystem.Statistics;
|
|
import org.apache.hadoop.fs.FileSystem.Statistics;
|
|
import org.apache.hadoop.fs.FSExceptionMessages;
|
|
import org.apache.hadoop.fs.FSExceptionMessages;
|
|
import org.apache.hadoop.fs.StreamCapabilities;
|
|
import org.apache.hadoop.fs.StreamCapabilities;
|
|
@@ -63,6 +58,7 @@ import static org.apache.hadoop.io.IOUtils.wrapException;
|
|
import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE;
|
|
import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE;
|
|
import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_CLOSE_MODE;
|
|
import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_CLOSE_MODE;
|
|
import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_MODE;
|
|
import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_MODE;
|
|
|
|
+import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkState;
|
|
|
|
|
|
/**
|
|
/**
|
|
* The BlobFsOutputStream for Rest AbfsClient.
|
|
* The BlobFsOutputStream for Rest AbfsClient.
|
|
@@ -72,6 +68,12 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
|
|
|
|
|
|
private final AbfsClient client;
|
|
private final AbfsClient client;
|
|
private final String path;
|
|
private final String path;
|
|
|
|
+ /** The position in the file being uploaded, where the next block would be
|
|
|
|
+ * uploaded.
|
|
|
|
+ * This is used in constructing the AbfsClient requests to ensure that,
|
|
|
|
+ * even if blocks are uploaded out of order, they are reassembled in
|
|
|
|
+ * correct order.
|
|
|
|
+ * */
|
|
private long position;
|
|
private long position;
|
|
private boolean closed;
|
|
private boolean closed;
|
|
private boolean supportFlush;
|
|
private boolean supportFlush;
|
|
@@ -91,8 +93,6 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
|
|
private final int maxRequestsThatCanBeQueued;
|
|
private final int maxRequestsThatCanBeQueued;
|
|
|
|
|
|
private ConcurrentLinkedDeque<WriteOperation> writeOperations;
|
|
private ConcurrentLinkedDeque<WriteOperation> writeOperations;
|
|
- private final ThreadPoolExecutor threadExecutor;
|
|
|
|
- private final ExecutorCompletionService<Void> completionService;
|
|
|
|
|
|
|
|
// SAS tokens can be re-used until they expire
|
|
// SAS tokens can be re-used until they expire
|
|
private CachedSASToken cachedSasToken;
|
|
private CachedSASToken cachedSasToken;
|
|
@@ -103,15 +103,6 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
|
|
private AbfsLease lease;
|
|
private AbfsLease lease;
|
|
private String leaseId;
|
|
private String leaseId;
|
|
|
|
|
|
- /**
|
|
|
|
- * Queue storing buffers with the size of the Azure block ready for
|
|
|
|
- * reuse. The pool allows reusing the blocks instead of allocating new
|
|
|
|
- * blocks. After the data is sent to the service, the buffer is returned
|
|
|
|
- * back to the queue
|
|
|
|
- */
|
|
|
|
- private ElasticByteBufferPool byteBufferPool
|
|
|
|
- = new ElasticByteBufferPool();
|
|
|
|
-
|
|
|
|
private final Statistics statistics;
|
|
private final Statistics statistics;
|
|
private final AbfsOutputStreamStatistics outputStreamStatistics;
|
|
private final AbfsOutputStreamStatistics outputStreamStatistics;
|
|
private IOStatistics ioStatistics;
|
|
private IOStatistics ioStatistics;
|
|
@@ -119,17 +110,27 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
|
|
private static final Logger LOG =
|
|
private static final Logger LOG =
|
|
LoggerFactory.getLogger(AbfsOutputStream.class);
|
|
LoggerFactory.getLogger(AbfsOutputStream.class);
|
|
|
|
|
|
- public AbfsOutputStream(
|
|
|
|
- final AbfsClient client,
|
|
|
|
- final Statistics statistics,
|
|
|
|
- final String path,
|
|
|
|
- final long position,
|
|
|
|
- AbfsOutputStreamContext abfsOutputStreamContext,
|
|
|
|
- TracingContext tracingContext) {
|
|
|
|
- this.client = client;
|
|
|
|
- this.statistics = statistics;
|
|
|
|
- this.path = path;
|
|
|
|
- this.position = position;
|
|
|
|
|
|
+ /** Factory for blocks. */
|
|
|
|
+ private final DataBlocks.BlockFactory blockFactory;
|
|
|
|
+
|
|
|
|
+ /** Current data block. Null means none currently active. */
|
|
|
|
+ private DataBlocks.DataBlock activeBlock;
|
|
|
|
+
|
|
|
|
+ /** Count of blocks uploaded. */
|
|
|
|
+ private long blockCount = 0;
|
|
|
|
+
|
|
|
|
+ /** The size of a single block. */
|
|
|
|
+ private final int blockSize;
|
|
|
|
+
|
|
|
|
+ /** Executor service to carry out the parallel upload requests. */
|
|
|
|
+ private final ListeningExecutorService executorService;
|
|
|
|
+
|
|
|
|
+ public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext)
|
|
|
|
+ throws IOException {
|
|
|
|
+ this.client = abfsOutputStreamContext.getClient();
|
|
|
|
+ this.statistics = abfsOutputStreamContext.getStatistics();
|
|
|
|
+ this.path = abfsOutputStreamContext.getPath();
|
|
|
|
+ this.position = abfsOutputStreamContext.getPosition();
|
|
this.closed = false;
|
|
this.closed = false;
|
|
this.supportFlush = abfsOutputStreamContext.isEnableFlush();
|
|
this.supportFlush = abfsOutputStreamContext.isEnableFlush();
|
|
this.disableOutputStreamFlush = abfsOutputStreamContext
|
|
this.disableOutputStreamFlush = abfsOutputStreamContext
|
|
@@ -140,7 +141,6 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
|
|
this.lastError = null;
|
|
this.lastError = null;
|
|
this.lastFlushOffset = 0;
|
|
this.lastFlushOffset = 0;
|
|
this.bufferSize = abfsOutputStreamContext.getWriteBufferSize();
|
|
this.bufferSize = abfsOutputStreamContext.getWriteBufferSize();
|
|
- this.buffer = byteBufferPool.getBuffer(false, bufferSize).array();
|
|
|
|
this.bufferIndex = 0;
|
|
this.bufferIndex = 0;
|
|
this.numOfAppendsToServerSinceLastFlush = 0;
|
|
this.numOfAppendsToServerSinceLastFlush = 0;
|
|
this.writeOperations = new ConcurrentLinkedDeque<>();
|
|
this.writeOperations = new ConcurrentLinkedDeque<>();
|
|
@@ -157,23 +157,20 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
|
|
|
|
|
|
this.lease = abfsOutputStreamContext.getLease();
|
|
this.lease = abfsOutputStreamContext.getLease();
|
|
this.leaseId = abfsOutputStreamContext.getLeaseId();
|
|
this.leaseId = abfsOutputStreamContext.getLeaseId();
|
|
-
|
|
|
|
- this.threadExecutor
|
|
|
|
- = new ThreadPoolExecutor(maxConcurrentRequestCount,
|
|
|
|
- maxConcurrentRequestCount,
|
|
|
|
- 10L,
|
|
|
|
- TimeUnit.SECONDS,
|
|
|
|
- new LinkedBlockingQueue<>());
|
|
|
|
- this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
|
|
|
|
|
|
+ this.executorService =
|
|
|
|
+ MoreExecutors.listeningDecorator(abfsOutputStreamContext.getExecutorService());
|
|
this.cachedSasToken = new CachedSASToken(
|
|
this.cachedSasToken = new CachedSASToken(
|
|
abfsOutputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
|
|
abfsOutputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
|
|
- if (outputStreamStatistics != null) {
|
|
|
|
- this.ioStatistics = outputStreamStatistics.getIOStatistics();
|
|
|
|
- }
|
|
|
|
this.outputStreamId = createOutputStreamId();
|
|
this.outputStreamId = createOutputStreamId();
|
|
- this.tracingContext = new TracingContext(tracingContext);
|
|
|
|
|
|
+ this.tracingContext = new TracingContext(abfsOutputStreamContext.getTracingContext());
|
|
this.tracingContext.setStreamID(outputStreamId);
|
|
this.tracingContext.setStreamID(outputStreamId);
|
|
this.tracingContext.setOperation(FSOperationType.WRITE);
|
|
this.tracingContext.setOperation(FSOperationType.WRITE);
|
|
|
|
+ this.ioStatistics = outputStreamStatistics.getIOStatistics();
|
|
|
|
+ this.blockFactory = abfsOutputStreamContext.getBlockFactory();
|
|
|
|
+ this.blockSize = bufferSize;
|
|
|
|
+ // create that first block. This guarantees that an open + close sequence
|
|
|
|
+ // writes a 0-byte entry.
|
|
|
|
+ createBlockIfNeeded();
|
|
}
|
|
}
|
|
|
|
|
|
private String createOutputStreamId() {
|
|
private String createOutputStreamId() {
|
|
@@ -219,10 +216,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
|
|
@Override
|
|
@Override
|
|
public synchronized void write(final byte[] data, final int off, final int length)
|
|
public synchronized void write(final byte[] data, final int off, final int length)
|
|
throws IOException {
|
|
throws IOException {
|
|
|
|
+ // validate if data is not null and index out of bounds.
|
|
|
|
+ DataBlocks.validateWriteArgs(data, off, length);
|
|
maybeThrowLastError();
|
|
maybeThrowLastError();
|
|
|
|
|
|
- Preconditions.checkArgument(data != null, "null data");
|
|
|
|
-
|
|
|
|
if (off < 0 || length < 0 || length > data.length - off) {
|
|
if (off < 0 || length < 0 || length > data.length - off) {
|
|
throw new IndexOutOfBoundsException();
|
|
throw new IndexOutOfBoundsException();
|
|
}
|
|
}
|
|
@@ -230,27 +227,182 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
|
|
if (hasLease() && isLeaseFreed()) {
|
|
if (hasLease() && isLeaseFreed()) {
|
|
throw new PathIOException(path, ERR_WRITE_WITHOUT_LEASE);
|
|
throw new PathIOException(path, ERR_WRITE_WITHOUT_LEASE);
|
|
}
|
|
}
|
|
|
|
+ DataBlocks.DataBlock block = createBlockIfNeeded();
|
|
|
|
+ int written = block.write(data, off, length);
|
|
|
|
+ int remainingCapacity = block.remainingCapacity();
|
|
|
|
+
|
|
|
|
+ if (written < length) {
|
|
|
|
+ // Number of bytes to write is more than the data block capacity,
|
|
|
|
+ // trigger an upload and then write on the next block.
|
|
|
|
+ LOG.debug("writing more data than block capacity -triggering upload");
|
|
|
|
+ uploadCurrentBlock();
|
|
|
|
+ // tail recursion is mildly expensive, but given buffer sizes must be MB.
|
|
|
|
+ // it's unlikely to recurse very deeply.
|
|
|
|
+ this.write(data, off + written, length - written);
|
|
|
|
+ } else {
|
|
|
|
+ if (remainingCapacity == 0) {
|
|
|
|
+ // the whole buffer is done, trigger an upload
|
|
|
|
+ uploadCurrentBlock();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ incrementWriteOps();
|
|
|
|
+ }
|
|
|
|
|
|
- int currentOffset = off;
|
|
|
|
- int writableBytes = bufferSize - bufferIndex;
|
|
|
|
- int numberOfBytesToWrite = length;
|
|
|
|
-
|
|
|
|
- while (numberOfBytesToWrite > 0) {
|
|
|
|
- if (writableBytes <= numberOfBytesToWrite) {
|
|
|
|
- System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes);
|
|
|
|
- bufferIndex += writableBytes;
|
|
|
|
- writeCurrentBufferToService();
|
|
|
|
- currentOffset += writableBytes;
|
|
|
|
- numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
|
|
|
|
- } else {
|
|
|
|
- System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite);
|
|
|
|
- bufferIndex += numberOfBytesToWrite;
|
|
|
|
- numberOfBytesToWrite = 0;
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Demand create a destination block.
|
|
|
|
+ *
|
|
|
|
+ * @return the active block; null if there isn't one.
|
|
|
|
+ * @throws IOException on any failure to create
|
|
|
|
+ */
|
|
|
|
+ private synchronized DataBlocks.DataBlock createBlockIfNeeded()
|
|
|
|
+ throws IOException {
|
|
|
|
+ if (activeBlock == null) {
|
|
|
|
+ blockCount++;
|
|
|
|
+ activeBlock = blockFactory
|
|
|
|
+ .create(blockCount, this.blockSize, outputStreamStatistics);
|
|
|
|
+ }
|
|
|
|
+ return activeBlock;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Start an asynchronous upload of the current block.
|
|
|
|
+ *
|
|
|
|
+ * @throws IOException Problems opening the destination for upload,
|
|
|
|
+ * initializing the upload, or if a previous operation has failed.
|
|
|
|
+ */
|
|
|
|
+ private synchronized void uploadCurrentBlock() throws IOException {
|
|
|
|
+ checkState(hasActiveBlock(), "No active block");
|
|
|
|
+ LOG.debug("Writing block # {}", blockCount);
|
|
|
|
+ try {
|
|
|
|
+ uploadBlockAsync(getActiveBlock(), false, false);
|
|
|
|
+ } finally {
|
|
|
|
+ // set the block to null, so the next write will create a new block.
|
|
|
|
+ clearActiveBlock();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Upload a block of data.
|
|
|
|
+ * This will take the block.
|
|
|
|
+ *
|
|
|
|
+ * @param blockToUpload block to upload.
|
|
|
|
+ * @throws IOException upload failure
|
|
|
|
+ */
|
|
|
|
+ private void uploadBlockAsync(DataBlocks.DataBlock blockToUpload,
|
|
|
|
+ boolean isFlush, boolean isClose)
|
|
|
|
+ throws IOException {
|
|
|
|
+ if (this.isAppendBlob) {
|
|
|
|
+ writeAppendBlobCurrentBufferToService();
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ if (!blockToUpload.hasData()) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ numOfAppendsToServerSinceLastFlush++;
|
|
|
|
+
|
|
|
|
+ final int bytesLength = blockToUpload.dataSize();
|
|
|
|
+ final long offset = position;
|
|
|
|
+ position += bytesLength;
|
|
|
|
+ outputStreamStatistics.bytesToUpload(bytesLength);
|
|
|
|
+ outputStreamStatistics.writeCurrentBuffer();
|
|
|
|
+ DataBlocks.BlockUploadData blockUploadData = blockToUpload.startUpload();
|
|
|
|
+ final Future<Void> job =
|
|
|
|
+ executorService.submit(() -> {
|
|
|
|
+ AbfsPerfTracker tracker =
|
|
|
|
+ client.getAbfsPerfTracker();
|
|
|
|
+ try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
|
|
|
|
+ "writeCurrentBufferToService", "append")) {
|
|
|
|
+ AppendRequestParameters.Mode
|
|
|
|
+ mode = APPEND_MODE;
|
|
|
|
+ if (isFlush & isClose) {
|
|
|
|
+ mode = FLUSH_CLOSE_MODE;
|
|
|
|
+ } else if (isFlush) {
|
|
|
|
+ mode = FLUSH_MODE;
|
|
|
|
+ }
|
|
|
|
+ /*
|
|
|
|
+ * Parameters Required for an APPEND call.
|
|
|
|
+ * offset(here) - refers to the position in the file.
|
|
|
|
+ * bytesLength - Data to be uploaded from the block.
|
|
|
|
+ * mode - If it's append, flush or flush_close.
|
|
|
|
+ * leaseId - The AbfsLeaseId for this request.
|
|
|
|
+ */
|
|
|
|
+ AppendRequestParameters reqParams = new AppendRequestParameters(
|
|
|
|
+ offset, 0, bytesLength, mode, false, leaseId);
|
|
|
|
+ AbfsRestOperation op =
|
|
|
|
+ client.append(path, blockUploadData.toByteArray(), reqParams,
|
|
|
|
+ cachedSasToken.get(), new TracingContext(tracingContext));
|
|
|
|
+ cachedSasToken.update(op.getSasToken());
|
|
|
|
+ perfInfo.registerResult(op.getResult());
|
|
|
|
+ perfInfo.registerSuccess(true);
|
|
|
|
+ outputStreamStatistics.uploadSuccessful(bytesLength);
|
|
|
|
+ return null;
|
|
|
|
+ } finally {
|
|
|
|
+ IOUtils.close(blockUploadData);
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ writeOperations.add(new WriteOperation(job, offset, bytesLength));
|
|
|
|
+
|
|
|
|
+ // Try to shrink the queue
|
|
|
|
+ shrinkWriteOperationQueue();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * A method to set the lastError if an exception is caught.
|
|
|
|
+ * @param ex Exception caught.
|
|
|
|
+ * @throws IOException Throws the lastError.
|
|
|
|
+ */
|
|
|
|
+ private void failureWhileSubmit(Exception ex) throws IOException {
|
|
|
|
+ if (ex instanceof AbfsRestOperationException) {
|
|
|
|
+ if (((AbfsRestOperationException) ex).getStatusCode()
|
|
|
|
+ == HttpURLConnection.HTTP_NOT_FOUND) {
|
|
|
|
+ throw new FileNotFoundException(ex.getMessage());
|
|
}
|
|
}
|
|
|
|
+ }
|
|
|
|
+ if (ex instanceof IOException) {
|
|
|
|
+ lastError = (IOException) ex;
|
|
|
|
+ } else {
|
|
|
|
+ lastError = new IOException(ex);
|
|
|
|
+ }
|
|
|
|
+ throw lastError;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Synchronized accessor to the active block.
|
|
|
|
+ *
|
|
|
|
+ * @return the active block; null if there isn't one.
|
|
|
|
+ */
|
|
|
|
+ private synchronized DataBlocks.DataBlock getActiveBlock() {
|
|
|
|
+ return activeBlock;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Predicate to query whether or not there is an active block.
|
|
|
|
+ *
|
|
|
|
+ * @return true if there is an active block.
|
|
|
|
+ */
|
|
|
|
+ private synchronized boolean hasActiveBlock() {
|
|
|
|
+ return activeBlock != null;
|
|
|
|
+ }
|
|
|
|
|
|
- writableBytes = bufferSize - bufferIndex;
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Is there an active block and is there any data in it to upload?
|
|
|
|
+ *
|
|
|
|
+ * @return true if there is some data to upload in an active block else false.
|
|
|
|
+ */
|
|
|
|
+ private boolean hasActiveBlockDataToUpload() {
|
|
|
|
+ return hasActiveBlock() && getActiveBlock().hasData();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Clear the active block.
|
|
|
|
+ */
|
|
|
|
+ private void clearActiveBlock() {
|
|
|
|
+ if (activeBlock != null) {
|
|
|
|
+ LOG.debug("Clearing active block");
|
|
|
|
+ }
|
|
|
|
+ synchronized (this) {
|
|
|
|
+ activeBlock = null;
|
|
}
|
|
}
|
|
- incrementWriteOps();
|
|
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -335,7 +487,6 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
|
|
|
|
|
|
try {
|
|
try {
|
|
flushInternal(true);
|
|
flushInternal(true);
|
|
- threadExecutor.shutdown();
|
|
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
// Problems surface in try-with-resources clauses if
|
|
// Problems surface in try-with-resources clauses if
|
|
// the exception thrown in a close == the one already thrown
|
|
// the exception thrown in a close == the one already thrown
|
|
@@ -352,9 +503,8 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
|
|
bufferIndex = 0;
|
|
bufferIndex = 0;
|
|
closed = true;
|
|
closed = true;
|
|
writeOperations.clear();
|
|
writeOperations.clear();
|
|
- byteBufferPool = null;
|
|
|
|
- if (!threadExecutor.isShutdown()) {
|
|
|
|
- threadExecutor.shutdownNow();
|
|
|
|
|
|
+ if (hasActiveBlock()) {
|
|
|
|
+ clearActiveBlock();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
LOG.debug("Closing AbfsOutputStream : {}", this);
|
|
LOG.debug("Closing AbfsOutputStream : {}", this);
|
|
@@ -368,19 +518,22 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
|
|
&& enableSmallWriteOptimization
|
|
&& enableSmallWriteOptimization
|
|
&& (numOfAppendsToServerSinceLastFlush == 0) // there are no ongoing store writes
|
|
&& (numOfAppendsToServerSinceLastFlush == 0) // there are no ongoing store writes
|
|
&& (writeOperations.size() == 0) // double checking no appends in progress
|
|
&& (writeOperations.size() == 0) // double checking no appends in progress
|
|
- && (bufferIndex > 0)) { // there is some data that is pending to be written
|
|
|
|
|
|
+ && hasActiveBlockDataToUpload()) { // there is
|
|
|
|
+ // some data that is pending to be written
|
|
smallWriteOptimizedflushInternal(isClose);
|
|
smallWriteOptimizedflushInternal(isClose);
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
- writeCurrentBufferToService();
|
|
|
|
|
|
+ if (hasActiveBlockDataToUpload()) {
|
|
|
|
+ uploadCurrentBlock();
|
|
|
|
+ }
|
|
flushWrittenBytesToService(isClose);
|
|
flushWrittenBytesToService(isClose);
|
|
numOfAppendsToServerSinceLastFlush = 0;
|
|
numOfAppendsToServerSinceLastFlush = 0;
|
|
}
|
|
}
|
|
|
|
|
|
private synchronized void smallWriteOptimizedflushInternal(boolean isClose) throws IOException {
|
|
private synchronized void smallWriteOptimizedflushInternal(boolean isClose) throws IOException {
|
|
// writeCurrentBufferToService will increment numOfAppendsToServerSinceLastFlush
|
|
// writeCurrentBufferToService will increment numOfAppendsToServerSinceLastFlush
|
|
- writeCurrentBufferToService(true, isClose);
|
|
|
|
|
|
+ uploadBlockAsync(getActiveBlock(), true, isClose);
|
|
waitForAppendsToComplete();
|
|
waitForAppendsToComplete();
|
|
shrinkWriteOperationQueue();
|
|
shrinkWriteOperationQueue();
|
|
maybeThrowLastError();
|
|
maybeThrowLastError();
|
|
@@ -389,124 +542,52 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
|
|
|
|
|
|
private synchronized void flushInternalAsync() throws IOException {
|
|
private synchronized void flushInternalAsync() throws IOException {
|
|
maybeThrowLastError();
|
|
maybeThrowLastError();
|
|
- writeCurrentBufferToService();
|
|
|
|
|
|
+ if (hasActiveBlockDataToUpload()) {
|
|
|
|
+ uploadCurrentBlock();
|
|
|
|
+ }
|
|
|
|
+ waitForAppendsToComplete();
|
|
flushWrittenBytesToServiceAsync();
|
|
flushWrittenBytesToServiceAsync();
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Appending the current active data block to service. Clearing the active
|
|
|
|
+ * data block and releasing all buffered data.
|
|
|
|
+ * @throws IOException if there is any failure while starting an upload for
|
|
|
|
+ * the dataBlock or while closing the BlockUploadData.
|
|
|
|
+ */
|
|
private void writeAppendBlobCurrentBufferToService() throws IOException {
|
|
private void writeAppendBlobCurrentBufferToService() throws IOException {
|
|
- if (bufferIndex == 0) {
|
|
|
|
|
|
+ DataBlocks.DataBlock activeBlock = getActiveBlock();
|
|
|
|
+ // No data, return.
|
|
|
|
+ if (!hasActiveBlockDataToUpload()) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
- final byte[] bytes = buffer;
|
|
|
|
- final int bytesLength = bufferIndex;
|
|
|
|
- if (outputStreamStatistics != null) {
|
|
|
|
- outputStreamStatistics.writeCurrentBuffer();
|
|
|
|
- outputStreamStatistics.bytesToUpload(bytesLength);
|
|
|
|
- }
|
|
|
|
- buffer = byteBufferPool.getBuffer(false, bufferSize).array();
|
|
|
|
- bufferIndex = 0;
|
|
|
|
|
|
+
|
|
|
|
+ final int bytesLength = activeBlock.dataSize();
|
|
|
|
+ DataBlocks.BlockUploadData uploadData = activeBlock.startUpload();
|
|
|
|
+ clearActiveBlock();
|
|
|
|
+ outputStreamStatistics.writeCurrentBuffer();
|
|
|
|
+ outputStreamStatistics.bytesToUpload(bytesLength);
|
|
final long offset = position;
|
|
final long offset = position;
|
|
position += bytesLength;
|
|
position += bytesLength;
|
|
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
|
|
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
|
|
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
|
|
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
|
|
- "writeCurrentBufferToService", "append")) {
|
|
|
|
|
|
+ "writeCurrentBufferToService", "append")) {
|
|
AppendRequestParameters reqParams = new AppendRequestParameters(offset, 0,
|
|
AppendRequestParameters reqParams = new AppendRequestParameters(offset, 0,
|
|
bytesLength, APPEND_MODE, true, leaseId);
|
|
bytesLength, APPEND_MODE, true, leaseId);
|
|
- AbfsRestOperation op = client
|
|
|
|
- .append(path, bytes, reqParams, cachedSasToken.get(),
|
|
|
|
- new TracingContext(tracingContext));
|
|
|
|
|
|
+ AbfsRestOperation op = client.append(path, uploadData.toByteArray(), reqParams,
|
|
|
|
+ cachedSasToken.get(), new TracingContext(tracingContext));
|
|
cachedSasToken.update(op.getSasToken());
|
|
cachedSasToken.update(op.getSasToken());
|
|
- if (outputStreamStatistics != null) {
|
|
|
|
- outputStreamStatistics.uploadSuccessful(bytesLength);
|
|
|
|
- }
|
|
|
|
|
|
+ outputStreamStatistics.uploadSuccessful(bytesLength);
|
|
|
|
+
|
|
perfInfo.registerResult(op.getResult());
|
|
perfInfo.registerResult(op.getResult());
|
|
- byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
|
|
|
|
perfInfo.registerSuccess(true);
|
|
perfInfo.registerSuccess(true);
|
|
return;
|
|
return;
|
|
} catch (Exception ex) {
|
|
} catch (Exception ex) {
|
|
- if (ex instanceof AbfsRestOperationException) {
|
|
|
|
- if (((AbfsRestOperationException) ex).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
|
|
|
|
- throw new FileNotFoundException(ex.getMessage());
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- if (ex instanceof AzureBlobFileSystemException) {
|
|
|
|
- ex = (AzureBlobFileSystemException) ex;
|
|
|
|
- }
|
|
|
|
- lastError = new IOException(ex);
|
|
|
|
- throw lastError;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private synchronized void writeCurrentBufferToService() throws IOException {
|
|
|
|
- writeCurrentBufferToService(false, false);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private synchronized void writeCurrentBufferToService(boolean isFlush, boolean isClose) throws IOException {
|
|
|
|
- if (this.isAppendBlob) {
|
|
|
|
- writeAppendBlobCurrentBufferToService();
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if (bufferIndex == 0) {
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- numOfAppendsToServerSinceLastFlush++;
|
|
|
|
-
|
|
|
|
- final byte[] bytes = buffer;
|
|
|
|
- final int bytesLength = bufferIndex;
|
|
|
|
- if (outputStreamStatistics != null) {
|
|
|
|
- outputStreamStatistics.writeCurrentBuffer();
|
|
|
|
- outputStreamStatistics.bytesToUpload(bytesLength);
|
|
|
|
- }
|
|
|
|
- buffer = byteBufferPool.getBuffer(false, bufferSize).array();
|
|
|
|
- bufferIndex = 0;
|
|
|
|
- final long offset = position;
|
|
|
|
- position += bytesLength;
|
|
|
|
-
|
|
|
|
- if (threadExecutor.getQueue().size() >= maxRequestsThatCanBeQueued) {
|
|
|
|
- //Tracking time spent on waiting for task to complete.
|
|
|
|
- if (outputStreamStatistics != null) {
|
|
|
|
- try (DurationTracker ignored = outputStreamStatistics.timeSpentTaskWait()) {
|
|
|
|
- waitForTaskToComplete();
|
|
|
|
- }
|
|
|
|
- } else {
|
|
|
|
- waitForTaskToComplete();
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- final Future<Void> job = completionService.submit(() -> {
|
|
|
|
- AbfsPerfTracker tracker = client.getAbfsPerfTracker();
|
|
|
|
- try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
|
|
|
|
- "writeCurrentBufferToService", "append")) {
|
|
|
|
- AppendRequestParameters.Mode
|
|
|
|
- mode = APPEND_MODE;
|
|
|
|
- if (isFlush & isClose) {
|
|
|
|
- mode = FLUSH_CLOSE_MODE;
|
|
|
|
- } else if (isFlush) {
|
|
|
|
- mode = FLUSH_MODE;
|
|
|
|
- }
|
|
|
|
- AppendRequestParameters reqParams = new AppendRequestParameters(
|
|
|
|
- offset, 0, bytesLength, mode, false, leaseId);
|
|
|
|
- AbfsRestOperation op = client.append(path, bytes, reqParams,
|
|
|
|
- cachedSasToken.get(), new TracingContext(tracingContext));
|
|
|
|
- cachedSasToken.update(op.getSasToken());
|
|
|
|
- perfInfo.registerResult(op.getResult());
|
|
|
|
- byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
|
|
|
|
- perfInfo.registerSuccess(true);
|
|
|
|
- return null;
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
-
|
|
|
|
- if (outputStreamStatistics != null) {
|
|
|
|
- if (job.isCancelled()) {
|
|
|
|
- outputStreamStatistics.uploadFailed(bytesLength);
|
|
|
|
- } else {
|
|
|
|
- outputStreamStatistics.uploadSuccessful(bytesLength);
|
|
|
|
- }
|
|
|
|
|
|
+ outputStreamStatistics.uploadFailed(bytesLength);
|
|
|
|
+ failureWhileSubmit(ex);
|
|
|
|
+ } finally {
|
|
|
|
+ IOUtils.close(uploadData);
|
|
}
|
|
}
|
|
- writeOperations.add(new WriteOperation(job, offset, bytesLength));
|
|
|
|
-
|
|
|
|
- // Try to shrink the queue
|
|
|
|
- shrinkWriteOperationQueue();
|
|
|
|
}
|
|
}
|
|
|
|
|
|
private synchronized void waitForAppendsToComplete() throws IOException {
|
|
private synchronized void waitForAppendsToComplete() throws IOException {
|
|
@@ -514,6 +595,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
|
|
try {
|
|
try {
|
|
writeOperation.task.get();
|
|
writeOperation.task.get();
|
|
} catch (Exception ex) {
|
|
} catch (Exception ex) {
|
|
|
|
+ outputStreamStatistics.uploadFailed(writeOperation.length);
|
|
if (ex.getCause() instanceof AbfsRestOperationException) {
|
|
if (ex.getCause() instanceof AbfsRestOperationException) {
|
|
if (((AbfsRestOperationException) ex.getCause()).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
|
|
if (((AbfsRestOperationException) ex.getCause()).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
|
|
throw new FileNotFoundException(ex.getMessage());
|
|
throw new FileNotFoundException(ex.getMessage());
|
|
@@ -563,7 +645,8 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
|
|
throw new FileNotFoundException(ex.getMessage());
|
|
throw new FileNotFoundException(ex.getMessage());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- throw new IOException(ex);
|
|
|
|
|
|
+ lastError = new IOException(ex);
|
|
|
|
+ throw lastError;
|
|
}
|
|
}
|
|
this.lastFlushOffset = offset;
|
|
this.lastFlushOffset = offset;
|
|
}
|
|
}
|
|
@@ -574,14 +657,14 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
|
|
*/
|
|
*/
|
|
private synchronized void shrinkWriteOperationQueue() throws IOException {
|
|
private synchronized void shrinkWriteOperationQueue() throws IOException {
|
|
try {
|
|
try {
|
|
- while (writeOperations.peek() != null && writeOperations.peek().task.isDone()) {
|
|
|
|
- writeOperations.peek().task.get();
|
|
|
|
- lastTotalAppendOffset += writeOperations.peek().length;
|
|
|
|
|
|
+ WriteOperation peek = writeOperations.peek();
|
|
|
|
+ while (peek != null && peek.task.isDone()) {
|
|
|
|
+ peek.task.get();
|
|
|
|
+ lastTotalAppendOffset += peek.length;
|
|
writeOperations.remove();
|
|
writeOperations.remove();
|
|
|
|
+ peek = writeOperations.peek();
|
|
// Incrementing statistics to indicate queue has been shrunk.
|
|
// Incrementing statistics to indicate queue has been shrunk.
|
|
- if (outputStreamStatistics != null) {
|
|
|
|
- outputStreamStatistics.queueShrunk();
|
|
|
|
- }
|
|
|
|
|
|
+ outputStreamStatistics.queueShrunk();
|
|
}
|
|
}
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
if (e.getCause() instanceof AzureBlobFileSystemException) {
|
|
if (e.getCause() instanceof AzureBlobFileSystemException) {
|
|
@@ -593,26 +676,6 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private void waitForTaskToComplete() throws IOException {
|
|
|
|
- boolean completed;
|
|
|
|
- for (completed = false; completionService.poll() != null; completed = true) {
|
|
|
|
- // keep polling until there is no data
|
|
|
|
- }
|
|
|
|
- // for AppendBLob, jobs are not submitted to completion service
|
|
|
|
- if (isAppendBlob) {
|
|
|
|
- completed = true;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if (!completed) {
|
|
|
|
- try {
|
|
|
|
- completionService.take();
|
|
|
|
- } catch (InterruptedException e) {
|
|
|
|
- lastError = (IOException) new InterruptedIOException(e.toString()).initCause(e);
|
|
|
|
- throw lastError;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
private static class WriteOperation {
|
|
private static class WriteOperation {
|
|
private final Future<Void> task;
|
|
private final Future<Void> task;
|
|
private final long startOffset;
|
|
private final long startOffset;
|
|
@@ -631,7 +694,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
|
|
|
|
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|
|
public synchronized void waitForPendingUploads() throws IOException {
|
|
public synchronized void waitForPendingUploads() throws IOException {
|
|
- waitForTaskToComplete();
|
|
|
|
|
|
+ waitForAppendsToComplete();
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -695,12 +758,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
|
|
@Override
|
|
@Override
|
|
public String toString() {
|
|
public String toString() {
|
|
final StringBuilder sb = new StringBuilder(super.toString());
|
|
final StringBuilder sb = new StringBuilder(super.toString());
|
|
- if (outputStreamStatistics != null) {
|
|
|
|
- sb.append("AbfsOutputStream@").append(this.hashCode());
|
|
|
|
- sb.append("){");
|
|
|
|
- sb.append(outputStreamStatistics.toString());
|
|
|
|
- sb.append("}");
|
|
|
|
- }
|
|
|
|
|
|
+ sb.append("AbfsOutputStream@").append(this.hashCode());
|
|
|
|
+ sb.append("){");
|
|
|
|
+ sb.append(outputStreamStatistics.toString());
|
|
|
|
+ sb.append("}");
|
|
return sb.toString();
|
|
return sb.toString();
|
|
}
|
|
}
|
|
}
|
|
}
|