|
@@ -33,7 +33,9 @@ import java.io.FileOutputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.io.OutputStream;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
import java.util.concurrent.Callable;
|
|
|
import java.util.concurrent.ExecutionException;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
@@ -51,7 +53,7 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
|
|
|
private boolean closed;
|
|
|
private String key;
|
|
|
private File blockFile;
|
|
|
- private List<File> blockFiles = new ArrayList<>();
|
|
|
+ private Map<Integer, File> blockFiles = new HashMap<>();
|
|
|
private long blockSize;
|
|
|
private int blockId = 0;
|
|
|
private long blockWritten = 0L;
|
|
@@ -95,8 +97,9 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
|
|
|
|
|
|
blockStream.flush();
|
|
|
blockStream.close();
|
|
|
- if (!blockFiles.contains(blockFile)) {
|
|
|
- blockFiles.add(blockFile);
|
|
|
+ if (!blockFiles.values().contains(blockFile)) {
|
|
|
+ blockId++;
|
|
|
+ blockFiles.put(blockId, blockFile);
|
|
|
}
|
|
|
|
|
|
try {
|
|
@@ -110,7 +113,7 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
|
|
|
@Override
|
|
|
public PartETag call() throws Exception {
|
|
|
PartETag partETag = store.uploadPart(blockFile, key, uploadId,
|
|
|
- blockId + 1);
|
|
|
+ blockId);
|
|
|
return partETag;
|
|
|
}
|
|
|
});
|
|
@@ -124,11 +127,7 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
|
|
|
store.completeMultipartUpload(key, uploadId, partETags);
|
|
|
}
|
|
|
} finally {
|
|
|
- for (File tFile: blockFiles) {
|
|
|
- if (tFile.exists() && !tFile.delete()) {
|
|
|
- LOG.warn("Failed to delete temporary file {}", tFile);
|
|
|
- }
|
|
|
- }
|
|
|
+ removePartFiles();
|
|
|
closed = true;
|
|
|
}
|
|
|
}
|
|
@@ -145,41 +144,55 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
|
|
|
if (closed) {
|
|
|
throw new IOException("Stream closed.");
|
|
|
}
|
|
|
- try {
|
|
|
- blockStream.write(b, off, len);
|
|
|
- blockWritten += len;
|
|
|
- if (blockWritten >= blockSize) {
|
|
|
- uploadCurrentPart();
|
|
|
- blockWritten = 0L;
|
|
|
+ blockStream.write(b, off, len);
|
|
|
+ blockWritten += len;
|
|
|
+ if (blockWritten >= blockSize) {
|
|
|
+ uploadCurrentPart();
|
|
|
+ blockWritten = 0L;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void removePartFiles() throws IOException {
|
|
|
+ for (ListenableFuture<PartETag> partETagFuture : partETagsFutures) {
|
|
|
+ if (!partETagFuture.isDone()) {
|
|
|
+ continue;
|
|
|
}
|
|
|
- } finally {
|
|
|
- for (File tFile: blockFiles) {
|
|
|
- if (tFile.exists() && !tFile.delete()) {
|
|
|
- LOG.warn("Failed to delete temporary file {}", tFile);
|
|
|
+
|
|
|
+ try {
|
|
|
+ File blockFile = blockFiles.get(partETagFuture.get().getPartNumber());
|
|
|
+ if (blockFile != null && blockFile.exists() && !blockFile.delete()) {
|
|
|
+ LOG.warn("Failed to delete temporary file {}", blockFile);
|
|
|
}
|
|
|
+ } catch (InterruptedException | ExecutionException e) {
|
|
|
+ throw new IOException(e);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private void uploadCurrentPart() throws IOException {
|
|
|
- blockFiles.add(blockFile);
|
|
|
blockStream.flush();
|
|
|
blockStream.close();
|
|
|
if (blockId == 0) {
|
|
|
uploadId = store.getUploadId(key);
|
|
|
}
|
|
|
+
|
|
|
+ blockId++;
|
|
|
+ blockFiles.put(blockId, blockFile);
|
|
|
+
|
|
|
+ final File currentFile = blockFile;
|
|
|
+ final int currentBlockId = blockId;
|
|
|
ListenableFuture<PartETag> partETagFuture =
|
|
|
executorService.submit(new Callable<PartETag>() {
|
|
|
@Override
|
|
|
public PartETag call() throws Exception {
|
|
|
- PartETag partETag = store.uploadPart(blockFile, key, uploadId,
|
|
|
- blockId + 1);
|
|
|
+ PartETag partETag = store.uploadPart(currentFile, key, uploadId,
|
|
|
+ currentBlockId);
|
|
|
return partETag;
|
|
|
}
|
|
|
});
|
|
|
partETagsFutures.add(partETagFuture);
|
|
|
+ removePartFiles();
|
|
|
blockFile = newBlockFile();
|
|
|
- blockId++;
|
|
|
blockStream = new BufferedOutputStream(new FileOutputStream(blockFile));
|
|
|
}
|
|
|
|