Browse Source

HDDS-1173. Fix a data corruption bug in BlockOutputStream. Contributed by Shashikant Banerjee.

(cherry picked from commit b4aa24d3c5ad1b9309a58795e4b48e567695c4e4)
Shashikant Banerjee 6 years ago
parent
commit
4373c12701

+ 103 - 129
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java

@@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.Buffer;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.UUID;
@@ -87,7 +86,7 @@ public class BlockOutputStream extends OutputStream {
   private final long streamBufferFlushSize;
   private final long streamBufferMaxSize;
   private final long watchTimeout;
-  private List<ByteBuffer> bufferList;
+  private BufferPool bufferPool;
   // The IOException will be set by response handling thread in case there is an
   // exception received in the response. If the exception is set, the next
   // request will fail upfront.
@@ -111,8 +110,6 @@ public class BlockOutputStream extends OutputStream {
   // map containing mapping for putBlock logIndex to flushedDataLength Map.
   private ConcurrentHashMap<Long, Long> commitIndex2flushedDataMap;
 
-  private int currentBufferIndex;
-
   private List<DatanodeDetails> failedServers;
 
   /**
@@ -124,7 +121,7 @@ public class BlockOutputStream extends OutputStream {
    * @param pipeline             pipeline where block will be written
    * @param traceID              container protocol call args
    * @param chunkSize            chunk size
-   * @param bufferList           list of byte buffers
+   * @param bufferPool           pool of buffers
    * @param streamBufferFlushSize flush size
    * @param streamBufferMaxSize   max size of the currentBuffer
    * @param watchTimeout          watch timeout
@@ -135,7 +132,7 @@ public class BlockOutputStream extends OutputStream {
   public BlockOutputStream(BlockID blockID, String key,
       XceiverClientManager xceiverClientManager, Pipeline pipeline,
       String traceID, int chunkSize, long streamBufferFlushSize,
-      long streamBufferMaxSize, long watchTimeout, List<ByteBuffer> bufferList,
+      long streamBufferMaxSize, long watchTimeout, BufferPool bufferPool,
       ChecksumType checksumType, int bytesPerChecksum)
       throws IOException {
     this.blockID = blockID;
@@ -154,7 +151,7 @@ public class BlockOutputStream extends OutputStream {
     this.streamBufferFlushSize = streamBufferFlushSize;
     this.streamBufferMaxSize = streamBufferMaxSize;
     this.watchTimeout = watchTimeout;
-    this.bufferList = bufferList;
+    this.bufferPool = bufferPool;
     this.checksumType = checksumType;
     this.bytesPerChecksum = bytesPerChecksum;
 
@@ -164,7 +161,6 @@ public class BlockOutputStream extends OutputStream {
     totalAckDataLength = 0;
     futureMap = new ConcurrentHashMap<>();
     totalDataFlushedLength = 0;
-    currentBufferIndex = 0;
     writtenDataLength = 0;
     failedServers = Collections.emptyList();
   }
@@ -181,13 +177,6 @@ public class BlockOutputStream extends OutputStream {
     return writtenDataLength;
   }
 
-  private long computeBufferData() {
-    int dataLength =
-        bufferList.stream().mapToInt(Buffer::position).sum();
-    Preconditions.checkState(dataLength <= streamBufferMaxSize);
-    return dataLength;
-  }
-
   public List<DatanodeDetails> getFailedServers() {
     return failedServers;
   }
@@ -202,6 +191,7 @@ public class BlockOutputStream extends OutputStream {
 
 
   @Override
   public void write(byte[] b, int off, int len) throws IOException {
+    checkOpen();
     if (b == null) {
       throw new NullPointerException();
     }
@@ -213,53 +203,40 @@ public class BlockOutputStream extends OutputStream {
       return;
     }
     while (len > 0) {
-      checkOpen();
       int writeLen;
-      allocateBuffer();
-      ByteBuffer currentBuffer = getCurrentBuffer();
+
+      // Allocate a buffer if needed. The buffer will be allocated only
+      // once as needed and will be reused again for multiple blockOutputStream
+      // entries.
+      ByteBuffer currentBuffer = bufferPool.allocateBufferIfNeeded();
+      int pos = currentBuffer.position();
       writeLen =
-          Math.min(chunkSize - currentBuffer.position() % chunkSize, len);
+          Math.min(chunkSize - pos % chunkSize, len);
       currentBuffer.put(b, off, writeLen);
-      if (currentBuffer.position() % chunkSize == 0) {
-        int pos = currentBuffer.position() - chunkSize;
-        int limit = currentBuffer.position();
-        writeChunk(pos, limit);
+      if (!currentBuffer.hasRemaining()) {
+        writeChunk(currentBuffer);
       }
       off += writeLen;
       len -= writeLen;
       writtenDataLength += writeLen;
-      if (currentBuffer.position() == streamBufferFlushSize) {
+      if (shouldFlush()) {
         totalDataFlushedLength += streamBufferFlushSize;
         handlePartialFlush();
       }
-      long bufferedData = computeBufferData();
-      // Data in the bufferList can not exceed streamBufferMaxSize
-      if (bufferedData == streamBufferMaxSize) {
+      // Data in the bufferPool can not exceed streamBufferMaxSize
+      if (isBufferPoolFull()) {
         handleFullBuffer();
       }
     }
   }
 
-  private ByteBuffer getCurrentBuffer() {
-    ByteBuffer buffer = bufferList.get(currentBufferIndex);
-    if (!buffer.hasRemaining()) {
-      currentBufferIndex =
-          currentBufferIndex < getMaxNumBuffers() - 1 ? ++currentBufferIndex :
-              0;
-    }
-    return bufferList.get(currentBufferIndex);
-  }
-
-  private int getMaxNumBuffers() {
-    return (int)(streamBufferMaxSize/streamBufferFlushSize);
+  private boolean shouldFlush() {
+    return writtenDataLength % streamBufferFlushSize == 0;
   }
 
-  private void allocateBuffer() {
-    for (int i = bufferList.size(); i < getMaxNumBuffers(); i++) {
-      bufferList.add(ByteBuffer.allocate((int)streamBufferFlushSize));
-    }
+  private boolean isBufferPoolFull() {
+    return bufferPool.computeBufferData() == streamBufferMaxSize;
   }
-
   /**
    * Will be called on the retryPath in case closedContainerException/
    * TimeoutException.
@@ -272,36 +249,37 @@ public class BlockOutputStream extends OutputStream {
     if (len == 0) {
       return;
     }
-    int off = 0;
-    int pos = off;
+    int count = 0;
+    Preconditions.checkArgument(len <= streamBufferMaxSize);
     while (len > 0) {
       long writeLen;
       writeLen = Math.min(chunkSize, len);
       if (writeLen == chunkSize) {
-        int limit = pos + chunkSize;
-        writeChunk(pos, limit);
+        writeChunk(bufferPool.getBuffer(count));
       }
-      off += writeLen;
       len -= writeLen;
+      count++;
       writtenDataLength += writeLen;
-      if (off % streamBufferFlushSize == 0) {
-        // reset the position to zero as now we wll readng thhe next buffer in
-        // the list
-        pos = 0;
+      if (shouldFlush()) {
+        // reset the position to zero as now we will be reading the
+        // next buffer in the list
         totalDataFlushedLength += streamBufferFlushSize;
         handlePartialFlush();
       }
-      if (computeBufferData() % streamBufferMaxSize == 0) {
+
+      // we should not call isBufferPoolFull here. The buffer might already
+      // be full, as the whole data is already cached in the buffer. We should
+      // just validate that we wrote data of size streamBufferMaxSize to call
+      // for handling the full buffer condition.
+      if (writtenDataLength == streamBufferMaxSize) {
         handleFullBuffer();
       }
     }
   }
 
   /**
-   * just update the totalAckDataLength. Since we have allocated
-   * the currentBuffer more than the streamBufferMaxSize, we can keep on writing
-   * to the currentBuffer. In case of failure, we will read the data starting
-   * from totalAckDataLength.
+   * just update the totalAckDataLength. In case of failure,
+   * we will read the data starting from totalAckDataLength.
    */
   private void updateFlushIndex(long index) {
     if (!commitIndex2flushedDataMap.isEmpty()) {
@@ -310,13 +288,15 @@ public class BlockOutputStream extends OutputStream {
       LOG.debug("Total data successfully replicated: " + totalAckDataLength);
       futureMap.remove(totalAckDataLength);
       // Flush has been committed to required servers successfully.
-      // just swap the bufferList head and tail after clearing.
-      ByteBuffer currentBuffer = bufferList.remove(0);
-      currentBuffer.clear();
-      if (currentBufferIndex != 0) {
-        currentBufferIndex--;
+      // just release the current buffer from the buffer pool.
+
+      // every entry removed from the putBlock future Map signifies
+      // streamBufferFlushSize/chunkSize number of chunks successfully
+      // committed. Release the buffers from the buffer pool to be reused.
+      int chunkCount = (int) (streamBufferFlushSize / chunkSize);
+      for (int i = 0; i < chunkCount; i++) {
+        bufferPool.releaseBuffer();
       }
-      bufferList.add(currentBuffer);
     }
   }
 
 
@@ -450,91 +430,85 @@ public class BlockOutputStream extends OutputStream {
   @Override
   public void flush() throws IOException {
     if (xceiverClientManager != null && xceiverClient != null
-        && bufferList != null) {
-      checkOpen();
-      int bufferSize = bufferList.size();
-      if (bufferSize > 0) {
-        try {
-          // flush the last chunk data residing on the currentBuffer
-          if (totalDataFlushedLength < writtenDataLength) {
-            ByteBuffer currentBuffer = getCurrentBuffer();
-            int pos = currentBuffer.position() - (currentBuffer.position()
-                % chunkSize);
-            int limit = currentBuffer.position() - pos;
-            writeChunk(pos, currentBuffer.position());
-            totalDataFlushedLength += limit;
-            handlePartialFlush();
-          }
-          waitOnFlushFutures();
-          // just check again if the exception is hit while waiting for the
-          // futures to ensure flush has indeed succeeded
-          checkOpen();
-        } catch (InterruptedException | ExecutionException e) {
-          adjustBuffersOnException();
-          throw new IOException(
-              "Unexpected Storage Container Exception: " + e.toString(), e);
-        }
+        && bufferPool != null && bufferPool.getSize() > 0) {
+      try {
+        handleFlush();
+      } catch (InterruptedException | ExecutionException e) {
+        adjustBuffersOnException();
+        throw new IOException(
+            "Unexpected Storage Container Exception: " + e.toString(), e);
       }
     }
   }
 
-  private void writeChunk(int pos, int limit) throws IOException {
+
+  private void writeChunk(ByteBuffer buffer)
+      throws IOException {
     // Please note : We are not flipping the slice when we write since
     // the slices are pointing the currentBuffer start and end as needed for
     // the chunk write. Also please note, Duplicate does not create a
     // copy of data, it only creates metadata that points to the data
     // stream.
-    ByteBuffer chunk = bufferList.get(currentBufferIndex).duplicate();
-    chunk.position(pos);
-    chunk.limit(limit);
+    ByteBuffer chunk = buffer.duplicate();
+    chunk.position(0);
+    chunk.limit(buffer.position());
     writeChunkToContainer(chunk);
   }
 
+  private void handleFlush()
+      throws IOException, InterruptedException, ExecutionException {
+    checkOpen();
+    // flush the last chunk data residing on the currentBuffer
+    if (totalDataFlushedLength < writtenDataLength) {
+      ByteBuffer currentBuffer = bufferPool.getBuffer();
+      int pos = currentBuffer.position();
+      writeChunk(currentBuffer);
+      totalDataFlushedLength += pos;
+      handlePartialFlush();
+    }
+    waitOnFlushFutures();
+    // just check again if the exception is hit while waiting for the
+    // futures to ensure flush has indeed succeeded
+
+    // irrespective of whether the commitIndex2flushedDataMap is empty
+    // or not, ensure there is no exception set
+    checkOpen();
+
+  }
+
   @Override
   public void close() throws IOException {
     if (xceiverClientManager != null && xceiverClient != null
-        && bufferList != null) {
-      int bufferSize = bufferList.size();
-      if (bufferSize > 0) {
-        try {
-          // flush the last chunk data residing on the currentBuffer
-          if (totalDataFlushedLength < writtenDataLength) {
-            ByteBuffer currentBuffer = getCurrentBuffer();
-            int pos = currentBuffer.position() - (currentBuffer.position()
-                % chunkSize);
-            int limit = currentBuffer.position() - pos;
-            writeChunk(pos, currentBuffer.position());
-            totalDataFlushedLength += limit;
-            handlePartialFlush();
-          }
-          waitOnFlushFutures();
-          // irrespective of whether the commitIndex2flushedDataMap is empty
-          // or not, ensure there is no exception set
-          checkOpen();
-          if (!commitIndex2flushedDataMap.isEmpty()) {
-            // wait for the last commit index in the commitIndex2flushedDataMap
-            // to get committed to all or majority of nodes in case timeout
-            // happens.
-            long lastIndex =
-                commitIndex2flushedDataMap.keySet().stream()
-                    .mapToLong(v -> v).max().getAsLong();
-            LOG.debug(
-                "waiting for last flush Index " + lastIndex + " to catch up");
-            watchForCommit(lastIndex);
-          }
-        } catch (InterruptedException | ExecutionException e) {
-          adjustBuffersOnException();
-          throw new IOException(
-              "Unexpected Storage Container Exception: " + e.toString(), e);
-        } finally {
-          cleanup(false);
+        && bufferPool != null && bufferPool.getSize() > 0) {
+      try {
+        handleFlush();
+        if (!commitIndex2flushedDataMap.isEmpty()) {
+          // wait for the last commit index in the commitIndex2flushedDataMap
+          // to get committed to all or majority of nodes in case timeout
+          // happens.
+          long lastIndex =
+              commitIndex2flushedDataMap.keySet().stream().mapToLong(v -> v)
+                  .max().getAsLong();
+          LOG.debug(
+              "waiting for last flush Index " + lastIndex + " to catch up");
+          watchForCommit(lastIndex);
         }
+      } catch (InterruptedException | ExecutionException e) {
+        adjustBuffersOnException();
+        throw new IOException(
+            "Unexpected Storage Container Exception: " + e.toString(), e);
+      } finally {
+        cleanup(false);
       }
-      // clear the currentBuffer
-      bufferList.stream().forEach(ByteBuffer::clear);
+      // TODO: Turn the below buffer empty check on when Standalone pipeline
+      // is removed in the write path in tests
+      // Preconditions.checkArgument(buffer.position() == 0);
+      // bufferPool.checkBufferPoolEmpty();
+
     }
   }
 
+
   private void waitOnFlushFutures()
       throws InterruptedException, ExecutionException {
     CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
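
For readers following the new buffering contract, the loop below is a minimal, self-contained sketch of the rewritten write() path with the datanode I/O stubbed out. It reuses the BufferPool class added by this patch; the tiny constants and the emitChunk helper are illustrative assumptions, not Ozone configuration defaults or APIs.

import java.nio.ByteBuffer;

final class WriteLoopSketch {
  // Made-up sizes for illustration; real values come from client configuration.
  static final int CHUNK_SIZE = 4;   // chunk size
  static final int FLUSH_SIZE = 8;   // streamBufferFlushSize
  static final int MAX_SIZE = 16;    // streamBufferMaxSize

  // The pool holds chunk-sized buffers, MAX_SIZE / CHUNK_SIZE of them in total.
  private final BufferPool bufferPool =
      new BufferPool(CHUNK_SIZE, MAX_SIZE / CHUNK_SIZE);
  private long writtenDataLength;

  void write(byte[] b, int off, int len) {
    while (len > 0) {
      ByteBuffer currentBuffer = bufferPool.allocateBufferIfNeeded();
      int pos = currentBuffer.position();
      // Never copy across a chunk boundary in a single put().
      int writeLen = Math.min(CHUNK_SIZE - pos % CHUNK_SIZE, len);
      currentBuffer.put(b, off, writeLen);
      if (!currentBuffer.hasRemaining()) {
        emitChunk(currentBuffer);        // stands in for writeChunk(...)
      }
      off += writeLen;
      len -= writeLen;
      writtenDataLength += writeLen;
      if (writtenDataLength % FLUSH_SIZE == 0) {
        // shouldFlush(): the real code issues a putBlock here.
      }
      if (bufferPool.computeBufferData() == MAX_SIZE) {
        // isBufferPoolFull(): the real code blocks until acks free buffers.
      }
    }
  }

  private void emitChunk(ByteBuffer buffer) {
    // duplicate() copies only the buffer metadata, not the bytes.
    ByteBuffer chunk = buffer.duplicate();
    chunk.position(0);
    chunk.limit(buffer.position());
    // the real writeChunk(...) ships `chunk` to the datanodes here
  }
}

The key invariant is that a single put() never crosses a chunk boundary, so a completely filled buffer always corresponds to exactly one whole chunk.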

+ 106 - 0
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java

@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import com.google.common.base.Preconditions;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class creates and manages pool of n buffers.
+ */
+public class BufferPool {
+
+  private List<ByteBuffer> bufferList;
+  private int currentBufferIndex;
+  private final int bufferSize;
+  private final int capacity;
+
+  public BufferPool(int bufferSize, int capacity) {
+    this.capacity = capacity;
+    this.bufferSize = bufferSize;
+    bufferList = new ArrayList<>(capacity);
+    currentBufferIndex = -1;
+  }
+
+  public ByteBuffer getBuffer() {
+    return currentBufferIndex == -1 ? null : bufferList.get(currentBufferIndex);
+  }
+
+  /**
+   * If currentBufferIndex is less than the buffer list size - 1, the next
+   * buffer in the list has already been freed up for rewriting. Reuse that
+   * buffer in such cases.
+   *
+   * If currentBufferIndex equals the last index of the list and the list
+   * size is still less than the capacity, allocate a new buffer of chunk
+   * size and append it to the list.
+   */
+  public ByteBuffer allocateBufferIfNeeded() {
+    ByteBuffer buffer = getBuffer();
+    if (buffer != null && buffer.hasRemaining()) {
+      return buffer;
+    }
+    if (currentBufferIndex < bufferList.size() - 1) {
+      buffer = getBuffer(currentBufferIndex + 1);
+    } else {
+      buffer = ByteBuffer.allocate(bufferSize);
+      bufferList.add(buffer);
+    }
+    Preconditions.checkArgument(bufferList.size() <= capacity);
+    currentBufferIndex++;
+    // TODO: Turn the below precondition check on when Standalone pipeline
+    // is removed in the write path in tests
+    // Preconditions.checkArgument(buffer.position() == 0);
+    return buffer;
+  }
+
+  public void releaseBuffer() {
+    // always remove from head of the list and append at last
+    ByteBuffer buffer = bufferList.remove(0);
+    buffer.clear();
+    bufferList.add(buffer);
+    currentBufferIndex--;
+  }
+
+  public void clearBufferPool() {
+    bufferList.clear();
+    currentBufferIndex = -1;
+  }
+
+  public void checkBufferPoolEmpty() {
+    Preconditions.checkArgument(computeBufferData() == 0);
+  }
+
+  public long computeBufferData() {
+    return bufferList.stream().mapToInt(value -> value.position())
+        .sum();
+  }
+
+  public int getSize() {
+    return bufferList.size();
+  }
+
+  ByteBuffer getBuffer(int index) {
+    return bufferList.get(index);
+  }
+
+}
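
A short usage sketch of the pool's lifecycle (hypothetical driver code, assuming the BufferPool class above is on the classpath), walking one allocate/fill/release round trip with a deliberately tiny buffer size:

import java.nio.ByteBuffer;

public class BufferPoolDemo {
  public static void main(String[] args) {
    int bufferSize = 4;                              // tiny size, for illustration
    BufferPool pool = new BufferPool(bufferSize, 2); // at most 2 buffers

    ByteBuffer first = pool.allocateBufferIfNeeded(); // lazily allocates buffer 0
    first.put(new byte[bufferSize]);                  // fill it completely

    ByteBuffer second = pool.allocateBufferIfNeeded(); // buffer 0 is full,
    second.put(new byte[2]);                           // so buffer 1 is allocated

    System.out.println(pool.getSize());           // 2 buffers exist
    System.out.println(pool.computeBufferData()); // 6 bytes buffered overall

    // An acknowledged putBlock releases the head buffer: it is cleared and
    // appended to the tail so allocateBufferIfNeeded() can hand it out again.
    pool.releaseBuffer();
    System.out.println(pool.computeBufferData()); // 2 bytes remain
  }
}

Because releaseBuffer() always recycles the head of the list, buffers are acknowledged and reused in the same FIFO order in which they were filled.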

+ 11 - 12
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java

@@ -19,8 +19,6 @@ package org.apache.hadoop.ozone.client.io;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.List;
 
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -29,6 +27,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
+import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -58,14 +57,14 @@ public final class BlockOutputStreamEntry extends OutputStream {
   private final long streamBufferFlushSize;
   private final long streamBufferMaxSize;
   private final long watchTimeout;
-  private List<ByteBuffer> bufferList;
+  private BufferPool bufferPool;
 
   @SuppressWarnings("parameternumber")
   private BlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientManager xceiverClientManager,
       Pipeline pipeline, String requestId, int chunkSize,
       long length, long streamBufferFlushSize, long streamBufferMaxSize,
-      long watchTimeout, List<ByteBuffer> bufferList,
+      long watchTimeout, BufferPool bufferPool,
       ChecksumType checksumType, int bytesPerChecksum,
       Token<OzoneBlockTokenIdentifier> token) {
     this.outputStream = null;
@@ -81,7 +80,7 @@ public final class BlockOutputStreamEntry extends OutputStream {
     this.streamBufferFlushSize = streamBufferFlushSize;
     this.streamBufferMaxSize = streamBufferMaxSize;
     this.watchTimeout = watchTimeout;
-    this.bufferList = bufferList;
+    this.bufferPool = bufferPool;
     this.checksumType = checksumType;
     this.bytesPerChecksum = bytesPerChecksum;
   }
@@ -112,7 +111,7 @@ public final class BlockOutputStreamEntry extends OutputStream {
       this.outputStream =
           new BlockOutputStream(blockID, key, xceiverClientManager,
               pipeline, requestId, chunkSize, streamBufferFlushSize,
-              streamBufferMaxSize, watchTimeout, bufferList, checksumType,
+              streamBufferMaxSize, watchTimeout, bufferPool, checksumType,
               bytesPerChecksum);
     }
   }
@@ -212,7 +211,7 @@ public final class BlockOutputStreamEntry extends OutputStream {
     private long streamBufferFlushSize;
     private long streamBufferMaxSize;
     private long watchTimeout;
-    private List<ByteBuffer> bufferList;
+    private BufferPool bufferPool;
     private Token<OzoneBlockTokenIdentifier> token;
     private ChecksumType checksumType;
     private int bytesPerChecksum;
@@ -278,8 +277,8 @@ public final class BlockOutputStreamEntry extends OutputStream {
       return this;
     }
 
-    public Builder setBufferList(List<ByteBuffer> bffrLst) {
-      this.bufferList = bffrLst;
+    public Builder setbufferPool(BufferPool pool) {
+      this.bufferPool = pool;
       return this;
     }
 
@@ -292,7 +291,7 @@ public final class BlockOutputStreamEntry extends OutputStream {
       return new BlockOutputStreamEntry(blockID, key,
           xceiverClientManager, pipeline, requestId, chunkSize,
           length, streamBufferFlushSize, streamBufferMaxSize, watchTimeout,
-          bufferList, checksumType, bytesPerChecksum, token);
+          bufferPool, checksumType, bytesPerChecksum, token);
     }
   }
 
@@ -340,8 +339,8 @@ public final class BlockOutputStreamEntry extends OutputStream {
     return watchTimeout;
   }
 
-  public List<ByteBuffer> getBufferList() {
-    return bufferList;
+  public BufferPool getBufferPool() {
+    return bufferPool;
   }
 
   public void setCurrentPosition(long curPosition) {

+ 8 - 13
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -45,7 +46,6 @@ import org.slf4j.LoggerFactory;
 
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Collection;
@@ -83,7 +83,7 @@ public class KeyOutputStream extends OutputStream {
   private final long blockSize;
   private final int bytesPerChecksum;
   private final ChecksumType checksumType;
-  private List<ByteBuffer> bufferList;
+  private final BufferPool bufferPool;
   private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
   private FileEncryptionInfo feInfo;
   private ExcludeList excludeList;
@@ -104,9 +104,7 @@ public class KeyOutputStream extends OutputStream {
     closed = false;
     streamBufferFlushSize = 0;
     streamBufferMaxSize = 0;
-    bufferList = new ArrayList<>(1);
-    ByteBuffer buffer = ByteBuffer.allocate(1);
-    bufferList.add(buffer);
+    bufferPool = new BufferPool(chunkSize, 1);
     watchTimeout = 0;
     blockSize = 0;
     this.checksumType = ChecksumType.valueOf(
@@ -182,7 +180,8 @@ public class KeyOutputStream extends OutputStream {
     Preconditions.checkState(streamBufferFlushSize % chunkSize == 0);
     Preconditions.checkState(streamBufferMaxSize % streamBufferFlushSize == 0);
     Preconditions.checkState(blockSize % streamBufferMaxSize == 0);
-    this.bufferList = new ArrayList<>();
+    this.bufferPool =
+        new BufferPool(chunkSize, (int)streamBufferMaxSize / chunkSize);
     this.excludeList = new ExcludeList();
   }
 
@@ -228,7 +227,7 @@ public class KeyOutputStream extends OutputStream {
             .setStreamBufferFlushSize(streamBufferFlushSize)
             .setStreamBufferMaxSize(streamBufferMaxSize)
             .setWatchTimeout(watchTimeout)
-            .setBufferList(bufferList)
+            .setbufferPool(bufferPool)
             .setChecksumType(checksumType)
             .setBytesPerChecksum(bytesPerChecksum)
             .setToken(subKeyInfo.getToken());
@@ -272,8 +271,7 @@ public class KeyOutputStream extends OutputStream {
   }
 
   private long computeBufferData() {
-    return bufferList.stream().mapToInt(value -> value.position())
-        .sum();
+    return bufferPool.computeBufferData();
   }
 
   private void handleWrite(byte[] b, int off, long len, boolean retry)
@@ -580,10 +578,7 @@ public class KeyOutputStream extends OutputStream {
     } catch (IOException ioe) {
       throw ioe;
     } finally {
-      if (bufferList != null) {
-        bufferList.stream().forEach(e -> e.clear());
-      }
-      bufferList = null;
+      bufferPool.clearBufferPool();
     }
   }
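
The pool constructed above caps the per-key buffered data at streamBufferMaxSize, split into chunk-sized slices shared by all block streams of the key. A worked sizing example with assumed values (not Ozone defaults):

// Assumed example values, not Ozone defaults.
int chunkSize = 16 * 1024 * 1024;                 // 16 MB chunks
long streamBufferMaxSize = 64L * 1024 * 1024;     // 64 MB buffering cap

// Capacity is chosen so that capacity * chunkSize == streamBufferMaxSize.
int capacity = (int) streamBufferMaxSize / chunkSize;  // 4 buffers
BufferPool bufferPool = new BufferPool(chunkSize, capacity);

// The single pool instance is then handed to every BlockOutputStreamEntry
// through setbufferPool(bufferPool), so all block streams of a key share
// one bounded region of memory instead of per-stream buffer lists.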
 
 

+ 50 - 12
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java

@@ -87,6 +87,7 @@ public class TestCloseContainerHandlingByClient {
     conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
     conf.setQuietMode(false);
     conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
         StorageUnit.MB);
@@ -133,7 +134,7 @@ public class TestCloseContainerHandlingByClient {
         .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
         .build();
 
-    waitForContainerClose(keyName, key);
+    waitForContainerClose(key);
     key.write(data);
     key.flush();
     key.close();
@@ -166,7 +167,7 @@ public class TestCloseContainerHandlingByClient {
         .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
         .build();
 
-    waitForContainerClose(keyName, key);
+    waitForContainerClose(key);
     key.close();
     // read the key from OM again and match the length. The length will still
     // be equal to the original data size.
@@ -180,12 +181,12 @@ public class TestCloseContainerHandlingByClient {
 
 
     String keyName = getKeyName();
     OzoneOutputStream key =
-        createKey(keyName, ReplicationType.RATIS, (4 * blockSize));
+        createKey(keyName, ReplicationType.RATIS, (3 * blockSize));
     KeyOutputStream keyOutputStream =
         (KeyOutputStream) key.getOutputStream();
     // With the initial size provided, it should have preallocated 3 blocks
-    Assert.assertEquals(4, keyOutputStream.getStreamEntries().size());
-    // write data more than 1 chunk
+    Assert.assertEquals(3, keyOutputStream.getStreamEntries().size());
+    // write data more than 1 block
     byte[] data =
         ContainerTestHelper.getFixedLengthString(keyString, (3 * blockSize))
             .getBytes(UTF_8);
@@ -199,7 +200,7 @@ public class TestCloseContainerHandlingByClient {
         .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
         .build();
 
-    waitForContainerClose(keyName, key);
+    waitForContainerClose(key);
     // write 1 more block worth of data. It will fail and new block will be
     // allocated
     key.write(ContainerTestHelper.getFixedLengthString(keyString, blockSize)
@@ -258,7 +259,7 @@ public class TestCloseContainerHandlingByClient {
         .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
         .build();
 
-    waitForContainerClose(keyName, key);
+    waitForContainerClose(key);
 
     key.close();
     // read the key from OM again and match the length. The length will still
     // be equal to the original data size.
@@ -301,7 +302,7 @@ public class TestCloseContainerHandlingByClient {
         .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
         .build();
 
-    waitForContainerClose(keyName, key);
+    waitForContainerClose(key);
     // write 3 more chunks worth of data. It will fail and new block will be
     // allocated. This write completes 4 blocks worth of data written to key
     data = Arrays.copyOfRange(writtenData, 3 * blockSize + chunkSize, keyLen);
@@ -330,8 +331,7 @@ public class TestCloseContainerHandlingByClient {
     Assert.assertEquals(4 * blockSize, length);
   }
 
-  private void waitForContainerClose(String keyName,
-      OzoneOutputStream outputStream)
+  private void waitForContainerClose(OzoneOutputStream outputStream)
       throws Exception {
     KeyOutputStream keyOutputStream =
         (KeyOutputStream) outputStream.getOutputStream();
@@ -375,7 +375,7 @@ public class TestCloseContainerHandlingByClient {
             .getPipeline(container.getPipelineID());
     List<DatanodeDetails> datanodes = pipeline.getNodes();
     Assert.assertEquals(1, datanodes.size());
-    waitForContainerClose(keyName, key);
+    waitForContainerClose(key);
     dataString =
         ContainerTestHelper.getFixedLengthString(keyString, (1 * blockSize));
     data = dataString.getBytes(UTF_8);
@@ -421,7 +421,7 @@ public class TestCloseContainerHandlingByClient {
         .build();
 
     Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
-    waitForContainerClose(keyName, key);
+    waitForContainerClose(key);
     // Again Write the Data. This will throw an exception which will be handled
     // and new blocks will be allocated
     key.write(data);
@@ -435,4 +435,42 @@ public class TestCloseContainerHandlingByClient {
     Assert.assertEquals(2 * data.length, keyInfo.getDataSize());
     validateData(keyName, dataString.getBytes(UTF_8));
   }
+
+  @Test
+  public void testBlockWrites() throws Exception {
+    String keyName = getKeyName();
+    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+    // write data more than 1 chunk
+    byte[] data1 =
+        ContainerTestHelper.getFixedLengthString(keyString, 2 * chunkSize)
+            .getBytes(UTF_8);
+    key.write(data1);
+
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    //get the name of a valid container
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+        .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
+        .build();
+
+    waitForContainerClose(key);
+    byte[] data2 =
+        ContainerTestHelper.getFixedLengthString(keyString, 3 * chunkSize)
+            .getBytes(UTF_8);
+    key.write(data2);
+    key.flush();
+    key.close();
+    // read the key from OM again and match the length. The length will still
+    // be equal to the original data size.
+    OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+    Assert.assertEquals(5 * chunkSize, keyInfo.getDataSize());
+
+    // the complete key data is data1 followed by data2
+    String dataString = new String(data1, UTF_8);
+    String dataString2 = new String(data2, UTF_8);
+    dataString = dataString.concat(dataString2);
+    validateData(keyName, dataString.getBytes(UTF_8));
+  }
+
 }

+ 1 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java

@@ -142,7 +142,7 @@ public class TestBlockDeletion {
 
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
         .setBucketName(bucketName).setKeyName(keyName).setDataSize(0)
-        .setType(HddsProtos.ReplicationType.STAND_ALONE)
+        .setType(HddsProtos.ReplicationType.RATIS)
         .setFactor(HddsProtos.ReplicationFactor.ONE).build();
     List<OmKeyLocationInfoGroup> omKeyLocationInfoGroupList =
         om.lookupKey(keyArgs).getKeyLocationVersions();