|
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
|
|
|
import org.apache.hadoop.hdds.client.BlockID;
|
|
|
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
|
|
|
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
|
|
|
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
|
|
|
import org.apache.hadoop.ozone.common.Checksum;
|
|
|
import org.apache.hadoop.ozone.om.helpers.*;
|
|
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
|
|
@@ -35,7 +36,6 @@ import org.apache.hadoop.hdds.scm.container.common.helpers
|
|
|
.StorageContainerException;
|
|
|
import org.apache.hadoop.hdds.scm.protocolPB
|
|
|
.StorageContainerLocationProtocolClientSideTranslatorPB;
|
|
|
-import org.apache.hadoop.hdds.scm.storage.ChunkOutputStream;
|
|
|
import org.apache.ratis.protocol.RaftRetryFailureException;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
@@ -58,13 +58,13 @@ import java.util.concurrent.TimeoutException;
|
|
|
*
|
|
|
* TODO : currently not support multi-thread access.
|
|
|
*/
|
|
|
-public class ChunkGroupOutputStream extends OutputStream {
|
|
|
+public class KeyOutputStream extends OutputStream {
|
|
|
|
|
|
public static final Logger LOG =
|
|
|
- LoggerFactory.getLogger(ChunkGroupOutputStream.class);
|
|
|
+ LoggerFactory.getLogger(KeyOutputStream.class);
|
|
|
|
|
|
// array list's get(index) is O(1)
|
|
|
- private final ArrayList<ChunkOutputStreamEntry> streamEntries;
|
|
|
+ private final ArrayList<BlockOutputStreamEntry> streamEntries;
|
|
|
private int currentStreamIndex;
|
|
|
private final OzoneManagerProtocolClientSideTranslatorPB omClient;
|
|
|
private final
|
|
@@ -86,7 +86,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
|
|
* A constructor for testing purpose only.
|
|
|
*/
|
|
|
@VisibleForTesting
|
|
|
- public ChunkGroupOutputStream() {
|
|
|
+ public KeyOutputStream() {
|
|
|
streamEntries = new ArrayList<>();
|
|
|
omClient = null;
|
|
|
scmClient = null;
|
|
@@ -116,11 +116,11 @@ public class ChunkGroupOutputStream extends OutputStream {
|
|
|
@VisibleForTesting
|
|
|
public void addStream(OutputStream outputStream, long length) {
|
|
|
streamEntries.add(
|
|
|
- new ChunkOutputStreamEntry(outputStream, length, checksum));
|
|
|
+ new BlockOutputStreamEntry(outputStream, length, checksum));
|
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|
|
|
- public List<ChunkOutputStreamEntry> getStreamEntries() {
|
|
|
+ public List<BlockOutputStreamEntry> getStreamEntries() {
|
|
|
return streamEntries;
|
|
|
}
|
|
|
@VisibleForTesting
|
|
@@ -130,7 +130,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
|
|
|
|
|
public List<OmKeyLocationInfo> getLocationInfoList() throws IOException {
|
|
|
List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
|
|
|
- for (ChunkOutputStreamEntry streamEntry : streamEntries) {
|
|
|
+ for (BlockOutputStreamEntry streamEntry : streamEntries) {
|
|
|
OmKeyLocationInfo info =
|
|
|
new OmKeyLocationInfo.Builder().setBlockID(streamEntry.blockID)
|
|
|
.setLength(streamEntry.currentPosition).setOffset(0)
|
|
@@ -143,7 +143,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
|
|
return locationInfoList;
|
|
|
}
|
|
|
|
|
|
- public ChunkGroupOutputStream(OpenKeySession handler,
|
|
|
+ public KeyOutputStream(OpenKeySession handler,
|
|
|
XceiverClientManager xceiverClientManager,
|
|
|
StorageContainerLocationProtocolClientSideTranslatorPB scmClient,
|
|
|
OzoneManagerProtocolClientSideTranslatorPB omClient, int chunkSize,
|
|
@@ -212,7 +212,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
|
|
.getContainerWithPipeline(subKeyInfo.getContainerID());
|
|
|
XceiverClientSpi xceiverClient =
|
|
|
xceiverClientManager.acquireClient(containerWithPipeline.getPipeline());
|
|
|
- streamEntries.add(new ChunkOutputStreamEntry(subKeyInfo.getBlockID(),
|
|
|
+ streamEntries.add(new BlockOutputStreamEntry(subKeyInfo.getBlockID(),
|
|
|
keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID,
|
|
|
chunkSize, subKeyInfo.getLength(), streamBufferFlushSize,
|
|
|
streamBufferMaxSize, watchTimeout, bufferList, checksum));
|
|
@@ -280,7 +280,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
|
|
// in theory, this condition should never violate due the check above
|
|
|
// still do a sanity check.
|
|
|
Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
|
|
|
- ChunkOutputStreamEntry current = streamEntries.get(currentStreamIndex);
|
|
|
+ BlockOutputStreamEntry current = streamEntries.get(currentStreamIndex);
|
|
|
|
|
|
// length(len) will be in int range if the call is happening through
|
|
|
// write API of chunkOutputStream. Length can be in long range if it comes
|
|
@@ -323,7 +323,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
|
|
// currentStreamIndex < streamEntries.size() signifies that, there are still
|
|
|
// pre allocated blocks available.
|
|
|
if (currentStreamIndex < streamEntries.size()) {
|
|
|
- ListIterator<ChunkOutputStreamEntry> streamEntryIterator =
|
|
|
+ ListIterator<BlockOutputStreamEntry> streamEntryIterator =
|
|
|
streamEntries.listIterator(currentStreamIndex);
|
|
|
while (streamEntryIterator.hasNext()) {
|
|
|
if (streamEntryIterator.next().blockID.getContainerID()
|
|
@@ -342,7 +342,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
|
|
*/
|
|
|
private void removeEmptyBlocks() {
|
|
|
if (currentStreamIndex < streamEntries.size()) {
|
|
|
- ListIterator<ChunkOutputStreamEntry> streamEntryIterator =
|
|
|
+ ListIterator<BlockOutputStreamEntry> streamEntryIterator =
|
|
|
streamEntries.listIterator(currentStreamIndex);
|
|
|
while (streamEntryIterator.hasNext()) {
|
|
|
if (streamEntryIterator.next().currentPosition == 0) {
|
|
@@ -361,7 +361,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
|
|
* @param streamIndex Index of the entry
|
|
|
* @throws IOException Throws IOException if Write fails
|
|
|
*/
|
|
|
- private void handleException(ChunkOutputStreamEntry streamEntry,
|
|
|
+ private void handleException(BlockOutputStreamEntry streamEntry,
|
|
|
int streamIndex) throws IOException {
|
|
|
long totalSuccessfulFlushedData =
|
|
|
streamEntry.getTotalSuccessfulFlushedData();
|
|
@@ -428,7 +428,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
|
|
* Contact OM to get a new block. Set the new block with the index (e.g.
|
|
|
* first block has index = 0, second has index = 1 etc.)
|
|
|
*
|
|
|
- * The returned block is made to new ChunkOutputStreamEntry to write.
|
|
|
+ * The returned block is made to new BlockOutputStreamEntry to write.
|
|
|
*
|
|
|
* @param index the index of the block.
|
|
|
* @throws IOException
|
|
@@ -457,7 +457,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
|
|
int size = streamEntries.size();
|
|
|
int streamIndex =
|
|
|
currentStreamIndex >= size ? size - 1 : currentStreamIndex;
|
|
|
- ChunkOutputStreamEntry entry = streamEntries.get(streamIndex);
|
|
|
+ BlockOutputStreamEntry entry = streamEntries.get(streamIndex);
|
|
|
if (entry != null) {
|
|
|
try {
|
|
|
if (close) {
|
|
@@ -507,7 +507,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
|
|
omClient.commitKey(keyArgs, openID);
|
|
|
}
|
|
|
} else {
|
|
|
- LOG.warn("Closing ChunkGroupOutputStream, but key args is null");
|
|
|
+ LOG.warn("Closing KeyOutputStream, but key args is null");
|
|
|
}
|
|
|
} catch (IOException ioe) {
|
|
|
throw ioe;
|
|
@@ -524,7 +524,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Builder class of ChunkGroupOutputStream.
|
|
|
+ * Builder class of KeyOutputStream.
|
|
|
*/
|
|
|
public static class Builder {
|
|
|
private OpenKeySession openHandler;
|
|
@@ -627,15 +627,15 @@ public class ChunkGroupOutputStream extends OutputStream {
|
|
|
return this;
|
|
|
}
|
|
|
|
|
|
- public ChunkGroupOutputStream build() throws IOException {
|
|
|
- return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient,
|
|
|
+ public KeyOutputStream build() throws IOException {
|
|
|
+ return new KeyOutputStream(openHandler, xceiverManager, scmClient,
|
|
|
omClient, chunkSize, requestID, factor, type, streamBufferFlushSize,
|
|
|
streamBufferMaxSize, blockSize, watchTimeout, checksum,
|
|
|
multipartUploadID, multipartNumber, isMultipartKey);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static class ChunkOutputStreamEntry extends OutputStream {
|
|
|
+ private static class BlockOutputStreamEntry extends OutputStream {
|
|
|
private OutputStream outputStream;
|
|
|
private BlockID blockID;
|
|
|
private final String key;
|
|
@@ -654,7 +654,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
|
|
private final long watchTimeout;
|
|
|
private List<ByteBuffer> bufferList;
|
|
|
|
|
|
- ChunkOutputStreamEntry(BlockID blockID, String key,
|
|
|
+ BlockOutputStreamEntry(BlockID blockID, String key,
|
|
|
XceiverClientManager xceiverClientManager,
|
|
|
XceiverClientSpi xceiverClient, String requestId, int chunkSize,
|
|
|
long length, long streamBufferFlushSize, long streamBufferMaxSize,
|
|
@@ -681,7 +681,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
|
|
* @param outputStream a existing writable output stream
|
|
|
* @param length the length of data to write to the stream
|
|
|
*/
|
|
|
- ChunkOutputStreamEntry(OutputStream outputStream, long length,
|
|
|
+ BlockOutputStreamEntry(OutputStream outputStream, long length,
|
|
|
Checksum checksum) {
|
|
|
this.outputStream = outputStream;
|
|
|
this.blockID = null;
|
|
@@ -711,7 +711,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
|
|
private void checkStream() {
|
|
|
if (this.outputStream == null) {
|
|
|
this.outputStream =
|
|
|
- new ChunkOutputStream(blockID, key, xceiverClientManager,
|
|
|
+ new BlockOutputStream(blockID, key, xceiverClientManager,
|
|
|
xceiverClient, requestId, chunkSize, streamBufferFlushSize,
|
|
|
streamBufferMaxSize, watchTimeout, bufferList, checksum);
|
|
|
}
|
|
@@ -744,15 +744,15 @@ public class ChunkGroupOutputStream extends OutputStream {
|
|
|
this.outputStream.close();
|
|
|
// after closing the chunkOutPutStream, blockId would have been
|
|
|
// reconstructed with updated bcsId
|
|
|
- if (this.outputStream instanceof ChunkOutputStream) {
|
|
|
- this.blockID = ((ChunkOutputStream) outputStream).getBlockID();
|
|
|
+ if (this.outputStream instanceof BlockOutputStream) {
|
|
|
+ this.blockID = ((BlockOutputStream) outputStream).getBlockID();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
long getTotalSuccessfulFlushedData() throws IOException {
|
|
|
- if (this.outputStream instanceof ChunkOutputStream) {
|
|
|
- ChunkOutputStream out = (ChunkOutputStream) this.outputStream;
|
|
|
+ if (this.outputStream instanceof BlockOutputStream) {
|
|
|
+ BlockOutputStream out = (BlockOutputStream) this.outputStream;
|
|
|
blockID = out.getBlockID();
|
|
|
return out.getTotalSuccessfulFlushedData();
|
|
|
} else if (outputStream == null) {
|
|
@@ -765,8 +765,8 @@ public class ChunkGroupOutputStream extends OutputStream {
|
|
|
}
|
|
|
|
|
|
long getWrittenDataLength() throws IOException {
|
|
|
- if (this.outputStream instanceof ChunkOutputStream) {
|
|
|
- ChunkOutputStream out = (ChunkOutputStream) this.outputStream;
|
|
|
+ if (this.outputStream instanceof BlockOutputStream) {
|
|
|
+ BlockOutputStream out = (BlockOutputStream) this.outputStream;
|
|
|
return out.getWrittenDataLength();
|
|
|
} else if (outputStream == null) {
|
|
|
// For a pre allocated block for which no write has been initiated,
|
|
@@ -779,16 +779,16 @@ public class ChunkGroupOutputStream extends OutputStream {
|
|
|
|
|
|
void cleanup() {
|
|
|
checkStream();
|
|
|
- if (this.outputStream instanceof ChunkOutputStream) {
|
|
|
- ChunkOutputStream out = (ChunkOutputStream) this.outputStream;
|
|
|
+ if (this.outputStream instanceof BlockOutputStream) {
|
|
|
+ BlockOutputStream out = (BlockOutputStream) this.outputStream;
|
|
|
out.cleanup();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
void writeOnRetry(long len) throws IOException {
|
|
|
checkStream();
|
|
|
- if (this.outputStream instanceof ChunkOutputStream) {
|
|
|
- ChunkOutputStream out = (ChunkOutputStream) this.outputStream;
|
|
|
+ if (this.outputStream instanceof BlockOutputStream) {
|
|
|
+ BlockOutputStream out = (BlockOutputStream) this.outputStream;
|
|
|
out.writeOnRetry(len);
|
|
|
this.currentPosition += len;
|
|
|
} else {
|