@@ -46,6 +46,7 @@ import java.util.UUID;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.concurrent.*;
+import java.util.stream.Collectors;

 import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
     .putBlockAsync;
@@ -108,7 +109,10 @@ public class BlockOutputStream extends OutputStream {
       CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
       futureMap;
   // Map containing the mapping from putBlock logIndex to flushedDataLength.
-  private ConcurrentHashMap<Long, Long> commitIndex2flushedDataMap;
+
+  // The map should keep its keys (logIndexes) sorted so that entries are
+  // removed in insertion order and the flushed data length only increases.
+  private ConcurrentSkipListMap<Long, Long> commitIndex2flushedDataMap;

   private List<DatanodeDetails> failedServers;

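Note on the data-structure swap above: ConcurrentHashMap gives no guarantee about the order in which its key set is traversed, while ConcurrentSkipListMap always iterates its keys in ascending order, which is exactly the property the removal logic further down relies on. A minimal standalone sketch of that difference (the class name and sample values are illustrative only, not part of the patch):

    import java.util.concurrent.ConcurrentSkipListMap;

    public class SkipListOrderSketch {
      public static void main(String[] args) {
        // Keys model putBlock log indexes, values model flushed data lengths.
        ConcurrentSkipListMap<Long, Long> map = new ConcurrentSkipListMap<>();
        map.put(30L, 3000L);
        map.put(10L, 1000L);
        map.put(20L, 2000L);

        // Iteration is in ascending key order regardless of insertion order,
        // so the lengths are visited as 1000, 2000, 3000.
        map.forEach((index, length) ->
            System.out.println("index=" + index + " length=" + length));
      }
    }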
@@ -157,7 +161,7 @@ public class BlockOutputStream extends OutputStream {

     // A single thread executor handles the responses of async requests
     responseExecutor = Executors.newSingleThreadExecutor();
-    commitIndex2flushedDataMap = new ConcurrentHashMap<>();
+    commitIndex2flushedDataMap = new ConcurrentSkipListMap<>();
     totalAckDataLength = 0;
     futureMap = new ConcurrentHashMap<>();
     totalDataFlushedLength = 0;
@@ -206,7 +210,7 @@ public class BlockOutputStream extends OutputStream {
     int writeLen;

     // Allocate a buffer if needed. The buffer will be allocated only
-    // once as needed and will be reused again for mutiple blockOutputStream
+    // once as needed and will be reused again for multiple blockOutputStream
     // entries.
     ByteBuffer currentBuffer = bufferPool.allocateBufferIfNeeded();
     int pos = currentBuffer.position();
@@ -281,10 +285,18 @@ public class BlockOutputStream extends OutputStream {
    * just update the totalAckDataLength. In case of failure,
    * we will read the data starting from totalAckDataLength.
    */
-  private void updateFlushIndex(long index) {
-    if (!commitIndex2flushedDataMap.isEmpty()) {
+  private void updateFlushIndex(List<Long> indexes) {
+    Preconditions.checkArgument(!commitIndex2flushedDataMap.isEmpty());
+    for (long index : indexes) {
       Preconditions.checkState(commitIndex2flushedDataMap.containsKey(index));
-      totalAckDataLength = commitIndex2flushedDataMap.remove(index);
+      long length = commitIndex2flushedDataMap.remove(index);
+
+      // totalAckDataLength should always be strictly less than the length
+      // just removed from commitIndex2flushedDataMap. The precondition below
+      // ensures that entries are removed in the same order in which they
+      // were inserted into the map.
+      Preconditions.checkArgument(totalAckDataLength < length);
+      totalAckDataLength = length;
       LOG.debug("Total data successfully replicated: " + totalAckDataLength);
       futureMap.remove(totalAckDataLength);
       // Flush has been committed to required servers successfully.
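The reworked updateFlushIndex above leans on the invariant that flushed data lengths grow with the log index: as long as the indexes are processed in ascending order, every removed entry advances totalAckDataLength, and the strict checkArgument(totalAckDataLength < length) catches any out-of-order removal. A hedged sketch of that invariant in isolation (class and field names are illustrative, not the actual BlockOutputStream members):

    import java.util.List;
    import java.util.concurrent.ConcurrentSkipListMap;

    public class AckLengthSketch {
      private final ConcurrentSkipListMap<Long, Long> indexToLength =
          new ConcurrentSkipListMap<>();
      private long totalAcked = 0;

      // Removes the given indexes (assumed ascending, as produced by a
      // ConcurrentSkipListMap key set) and advances the acked length.
      void ack(List<Long> indexes) {
        for (long index : indexes) {
          long length = indexToLength.remove(index);
          // Mirrors the Preconditions.checkArgument in the patch: each
          // removed entry must strictly advance the acknowledged length.
          if (totalAcked >= length) {
            throw new IllegalArgumentException("out-of-order removal");
          }
          totalAcked = length;
        }
      }
    }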
@@ -325,13 +337,13 @@ public class BlockOutputStream extends OutputStream {
   }

   private void adjustBuffers(long commitIndex) {
-    commitIndex2flushedDataMap.keySet().stream().forEach(index -> {
-      if (index <= commitIndex) {
-        updateFlushIndex(index);
-      } else {
-        return;
-      }
-    });
+    List<Long> keyList = commitIndex2flushedDataMap.keySet().stream()
+        .filter(p -> p <= commitIndex).collect(Collectors.toList());
+    if (keyList.isEmpty()) {
+      return;
+    } else {
+      updateFlushIndex(keyList);
+    }
   }

   // It may happen that once the exception is encountered, we still might
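Worth noting why the old adjustBuffers body was replaced rather than merely reordered: a return inside a forEach lambda only skips the current element (it behaves like continue, not break), and over a ConcurrentHashMap key set the traversal order was unspecified anyway, so entries could be acknowledged out of order. The new code first materializes the eligible keys, which stream out of the ConcurrentSkipListMap already sorted. A small illustration of the filter-and-collect step (sample values are hypothetical):

    import java.util.List;
    import java.util.concurrent.ConcurrentSkipListMap;
    import java.util.stream.Collectors;

    public class AdjustBuffersSketch {
      public static void main(String[] args) {
        ConcurrentSkipListMap<Long, Long> map = new ConcurrentSkipListMap<>();
        map.put(10L, 1000L);
        map.put(20L, 2000L);
        map.put(30L, 3000L);

        long commitIndex = 25L;
        // keySet() of a ConcurrentSkipListMap streams in ascending order,
        // so the collected list is already sorted.
        List<Long> acked = map.keySet().stream()
            .filter(index -> index <= commitIndex)
            .collect(Collectors.toList());
        System.out.println(acked); // prints [10, 20]
      }
    }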