Bläddra i källkod

HDDS-450. Generate BlockCommitSequenceId in ContainerStateMachine for every commit operation in Ratis. Contributed by Shashikant Banerjee.

Nanda kumar 6 år sedan
förälder
incheckning
7367ff333b

+ 11 - 1
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdds.scm.storage;
 
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.ratis.shaded.com.google.protobuf.ByteString;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
@@ -65,6 +66,7 @@ public class ChunkOutputStream extends OutputStream {
   private final String streamId;
   private int chunkIndex;
   private int chunkSize;
+  private long blockCommitSequenceId;
 
   /**
    * Creates a new ChunkOutputStream.
@@ -93,12 +95,17 @@ public class ChunkOutputStream extends OutputStream {
     this.buffer = ByteBuffer.allocate(chunkSize);
     this.streamId = UUID.randomUUID().toString();
     this.chunkIndex = 0;
+    blockCommitSequenceId = 0;
   }
 
   public ByteBuffer getBuffer() {
     return buffer;
   }
 
+  public long getBlockCommitSequenceId() {
+    return blockCommitSequenceId;
+  }
+
   @Override
   public void write(int b) throws IOException {
     checkOpen();
@@ -155,7 +162,10 @@ public class ChunkOutputStream extends OutputStream {
         writeChunkToContainer();
       }
       try {
-        putBlock(xceiverClient, containerBlockData.build(), traceID);
+        ContainerProtos.PutBlockResponseProto responseProto =
+            putBlock(xceiverClient, containerBlockData.build(), traceID);
+        blockCommitSequenceId =
+            responseProto.getCommittedBlockLength().getBlockCommitSequenceId();
       } catch (IOException e) {
         throw new IOException(
             "Unexpected Storage Container Exception: " + e.toString(), e);

+ 12 - 13
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java

@@ -141,24 +141,23 @@ public final class ContainerProtocolCalls  {
    * @param xceiverClient client to perform call
    * @param containerBlockData block data to identify container
    * @param traceID container protocol call args
+   * @return putBlockResponse
    * @throws IOException if there is an I/O error while performing the call
    */
-  public static void putBlock(XceiverClientSpi xceiverClient,
-      BlockData containerBlockData, String traceID) throws IOException {
-    PutBlockRequestProto.Builder createBlockRequest = PutBlockRequestProto
-        .newBuilder()
-        .setBlockData(containerBlockData);
+  public static ContainerProtos.PutBlockResponseProto putBlock(
+      XceiverClientSpi xceiverClient, BlockData containerBlockData,
+      String traceID) throws IOException {
+    PutBlockRequestProto.Builder createBlockRequest =
+        PutBlockRequestProto.newBuilder().setBlockData(containerBlockData);
     String id = xceiverClient.getPipeline().getLeader().getUuidString();
-    ContainerCommandRequestProto request = ContainerCommandRequestProto
-        .newBuilder()
-        .setCmdType(Type.PutBlock)
-        .setContainerID(containerBlockData.getBlockID().getContainerID())
-        .setTraceID(traceID)
-        .setDatanodeUuid(id)
-        .setPutBlock(createBlockRequest)
-        .build();
+    ContainerCommandRequestProto request =
+        ContainerCommandRequestProto.newBuilder().setCmdType(Type.PutBlock)
+            .setContainerID(containerBlockData.getBlockID().getContainerID())
+            .setTraceID(traceID).setDatanodeUuid(id)
+            .setPutBlock(createBlockRequest).build();
     ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
     validateContainerResponse(response);
+    return response.getPutBlock();
   }
 
   /**

+ 12 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockData.java

@@ -34,6 +34,7 @@ import java.util.ArrayList;
 public class BlockData {
   private final BlockID blockID;
   private final Map<String, String> metadata;
+  private long blockCommitSequenceId;
 
   /**
    * Represent a list of chunks.
@@ -64,6 +65,15 @@ public class BlockData {
     this.blockID = blockID;
     this.metadata = new TreeMap<>();
     this.size = 0;
+    blockCommitSequenceId = 0;
+  }
+
+  public long getBlockCommitSequenceId() {
+    return blockCommitSequenceId;
+  }
+
+  public void setBlockCommitSequenceId(long blockCommitSequenceId) {
+    this.blockCommitSequenceId = blockCommitSequenceId;
   }
 
   /**
@@ -85,6 +95,7 @@ public class BlockData {
     if (data.hasSize()) {
       Preconditions.checkArgument(data.getSize() == blockData.getSize());
     }
+    blockData.setBlockCommitSequenceId(data.getBlockCommitSequenceId());
     return blockData;
   }
 
@@ -104,6 +115,7 @@ public class BlockData {
     }
     builder.addAllChunks(getChunks());
     builder.setSize(size);
+    builder.setBlockCommitSequenceId(blockCommitSequenceId);
     return builder.build();
   }
 

+ 2 - 0
hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto

@@ -300,6 +300,7 @@ message BlockData {
   repeated KeyValue metadata = 3;
   repeated ChunkInfo chunks = 4;
   optional int64 size = 5;
+  optional uint64 blockCommitSequenceId = 6;
 }
 
 // Block Messages.
@@ -331,6 +332,7 @@ message  GetCommittedBlockLengthRequestProto {
 message  GetCommittedBlockLengthResponseProto {
   required DatanodeBlockID blockID = 1;
   required int64 blockLength = 2;
+  optional uint64 blockCommitSequenceId = 3;
 }
 
 message   DeleteBlockResponseProto {

+ 26 - 5
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java

@@ -21,6 +21,8 @@ package org.apache.hadoop.ozone.container.common.transport.server.ratis;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.server.RaftServer;
@@ -506,17 +508,36 @@ public class ContainerStateMachine extends BaseStateMachine {
     // on a container
 
     private CompletableFuture<Message> handlePutBlock(
-        ContainerCommandRequestProto requestProto) {
+        ContainerCommandRequestProto requestProto, long index) {
       List<CompletableFuture<Message>> futureList = new ArrayList<>();
-      long localId =
-          requestProto.getPutBlock().getBlockData().getBlockID().getLocalID();
+      BlockData blockData = null;
+      ContainerProtos.BlockData blockDataProto =
+          requestProto.getPutBlock().getBlockData();
+
+      // set the blockCommitSequenceId
+      try {
+        blockData = BlockData.getFromProtoBuf(blockDataProto);
+      } catch (IOException ioe) {
+        LOG.error("unable to retrieve blockData info for Block {}",
+            blockDataProto.getBlockID());
+        return completeExceptionally(ioe);
+      }
+      blockData.setBlockCommitSequenceId(index);
+      final ContainerProtos.PutBlockRequestProto putBlockRequestProto =
+          ContainerProtos.PutBlockRequestProto
+              .newBuilder(requestProto.getPutBlock())
+              .setBlockData(blockData.getProtoBufMessage()).build();
+      ContainerCommandRequestProto containerCommandRequestProto =
+          ContainerCommandRequestProto.newBuilder(requestProto)
+              .setPutBlock(putBlockRequestProto).build();
+      long localId = blockDataProto.getBlockID().getLocalID();
       // Need not wait for create container future here as it has already
       // finished.
       if (block2ChunkMap.get(localId) != null) {
         futureList.addAll(block2ChunkMap.get(localId).getAll());
       }
       CompletableFuture<Message> effectiveFuture =
-          runCommandAfterFutures(futureList, requestProto);
+          runCommandAfterFutures(futureList, containerCommandRequestProto);
 
       CompletableFuture<Message> putBlockFuture =
           effectiveFuture.thenApply(message -> {
@@ -616,7 +637,7 @@ public class ContainerStateMachine extends BaseStateMachine {
       case CloseContainer:
         return handleCloseContainer(requestProto);
       case PutBlock:
-        return handlePutBlock(requestProto);
+        return handlePutBlock(requestProto, index);
       case CreateContainer:
         return handleCreateContainer(requestProto);
       default:

+ 4 - 1
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java

@@ -133,10 +133,13 @@ public final class BlockUtils {
    */
   public static ContainerCommandResponseProto putBlockResponseSuccess(
       ContainerCommandRequestProto msg, long blockLength) {
+    ContainerProtos.BlockData blockData = msg.getPutBlock().getBlockData();
     GetCommittedBlockLengthResponseProto.Builder
         committedBlockLengthResponseBuilder =
         getCommittedBlockLengthResponseBuilder(blockLength,
-            msg.getPutBlock().getBlockData().getBlockID());
+            blockData.getBlockID());
+    committedBlockLengthResponseBuilder
+        .setBlockCommitSequenceId(blockData.getBlockCommitSequenceId());
     PutBlockResponseProto.Builder putKeyResponse =
         PutBlockResponseProto.newBuilder();
     putKeyResponse

+ 0 - 1
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java

@@ -85,7 +85,6 @@ public class BlockManagerImpl implements BlockManager {
     Preconditions.checkNotNull(db, "DB cannot be null here");
     db.put(Longs.toByteArray(data.getLocalID()), data.getProtoBufMessage()
         .toByteArray());
-
     // Increment keycount here
     container.getContainerData().incrKeyCount();
     return data.getSize();

+ 12 - 4
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java

@@ -117,13 +117,15 @@ public class ChunkGroupOutputStream extends OutputStream {
     return streamEntries;
   }
 
-  public List<OmKeyLocationInfo> getLocationInfoList() {
+  public List<OmKeyLocationInfo> getLocationInfoList() throws IOException {
     List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
     for (ChunkOutputStreamEntry streamEntry : streamEntries) {
       OmKeyLocationInfo info =
           new OmKeyLocationInfo.Builder().setBlockID(streamEntry.blockID)
               .setShouldCreateContainer(false)
-              .setLength(streamEntry.currentPosition).setOffset(0).build();
+              .setLength(streamEntry.currentPosition).setOffset(0)
+              .setBlockCommitSequenceId(streamEntry.getBlockCommitSequenceId())
+              .build();
       locationInfoList.add(info);
     }
     return locationInfoList;
@@ -153,8 +155,6 @@ public class ChunkGroupOutputStream extends OutputStream {
     this.chunkSize = chunkSize;
     this.requestID = requestId;
     this.retryPolicy = retryPolicy;
-    LOG.debug("Expecting open key with one block, but got" +
-        info.getKeyLocationVersions().size());
   }
 
   /**
@@ -708,6 +708,14 @@ public class ChunkGroupOutputStream extends OutputStream {
       throw new IOException("Invalid Output Stream for Key: " + key);
     }
 
+    long getBlockCommitSequenceId() throws IOException {
+      if (this.outputStream instanceof ChunkOutputStream) {
+        ChunkOutputStream out = (ChunkOutputStream) this.outputStream;
+        return out.getBlockCommitSequenceId();
+      }
+      throw new IOException("Invalid Output Stream for Key: " + key);
+    }
+
     public void cleanup() {
       checkStream();
       if (this.outputStream instanceof ChunkOutputStream) {

+ 14 - 3
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java

@@ -31,13 +31,15 @@ public final class OmKeyLocationInfo {
   private final long offset;
   // the version number indicating when this block was added
   private long createVersion;
+  private final long blockCommitSequenceId;
 
   private OmKeyLocationInfo(BlockID blockID, boolean shouldCreateContainer,
-                            long length, long offset) {
+      long length, long offset, long blockCommitSequenceId) {
     this.blockID = blockID;
     this.shouldCreateContainer = shouldCreateContainer;
     this.length = length;
     this.offset = offset;
+    this.blockCommitSequenceId = blockCommitSequenceId;
   }
 
   public void setCreateVersion(long version) {
@@ -84,6 +86,7 @@ public final class OmKeyLocationInfo {
     private boolean shouldCreateContainer;
     private long length;
     private long offset;
+    private long blockCommitSequenceId;
 
     public Builder setBlockID(BlockID blockId) {
       this.blockID = blockId;
@@ -105,9 +108,14 @@ public final class OmKeyLocationInfo {
       return this;
     }
 
+    public Builder setBlockCommitSequenceId(long sequenceId) {
+      this.blockCommitSequenceId = sequenceId;
+      return this;
+    }
+
     public OmKeyLocationInfo build() {
       return new OmKeyLocationInfo(blockID,
-          shouldCreateContainer, length, offset);
+          shouldCreateContainer, length, offset, blockCommitSequenceId);
     }
   }
 
@@ -118,6 +126,7 @@ public final class OmKeyLocationInfo {
         .setLength(length)
         .setOffset(offset)
         .setCreateVersion(createVersion)
+        .setBlockCommitSequenceId(blockCommitSequenceId)
         .build();
   }
 
@@ -126,7 +135,8 @@ public final class OmKeyLocationInfo {
         BlockID.getFromProtobuf(keyLocation.getBlockID()),
         keyLocation.getShouldCreateContainer(),
         keyLocation.getLength(),
-        keyLocation.getOffset());
+        keyLocation.getOffset(),
+        keyLocation.getBlockCommitSequenceId());
     info.setCreateVersion(keyLocation.getCreateVersion());
     return info;
   }
@@ -138,6 +148,7 @@ public final class OmKeyLocationInfo {
         ", shouldCreateContainer=" + shouldCreateContainer +
         ", length=" + length +
         ", offset=" + offset +
+        ", blockCommitSequenceId=" + blockCommitSequenceId +
         ", createVersion=" + createVersion + '}';
   }
 }

+ 1 - 0
hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto

@@ -244,6 +244,7 @@ message KeyLocation {
     required uint64 length = 4;
     // indicated at which version this block gets created.
     optional uint64 createVersion = 5;
+    optional uint64 blockCommitSequenceId = 6;
 }
 
 message KeyLocationList {

+ 3 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java

@@ -219,7 +219,7 @@ public class TestGetCommittedBlockLengthAndPutKey {
     ContainerProtos.PutBlockResponseProto response;
     String traceID = UUID.randomUUID().toString();
     ContainerWithPipeline container = storageContainerLocationClient
-        .allocateContainer(xceiverClientManager.getType(),
+        .allocateContainer(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.ONE, containerOwner);
     long containerID = container.getContainerInfo().getContainerID();
     Pipeline pipeline = container.getPipeline();
@@ -249,6 +249,8 @@ public class TestGetCommittedBlockLengthAndPutKey {
         blockID);
     Assert.assertEquals(
         response.getCommittedBlockLength().getBlockLength(), data.length);
+    Assert.assertTrue(
+        response.getCommittedBlockLength().getBlockCommitSequenceId() > 0);
     xceiverClientManager.releaseClient(client);
   }
 }

+ 2 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java

@@ -172,6 +172,7 @@ public class KeyManagerImpl implements KeyManager {
         .setBlockID(allocatedBlock.getBlockID())
         .setShouldCreateContainer(allocatedBlock.getCreateContainer())
         .setLength(scmBlockSize)
+        .setBlockCommitSequenceId(0)
         .setOffset(0)
         .build();
     // current version not committed, so new blocks coming now are added to
@@ -236,6 +237,7 @@ public class KeyManagerImpl implements KeyManager {
             .setBlockID(allocatedBlock.getBlockID())
             .setShouldCreateContainer(allocatedBlock.getCreateContainer())
             .setLength(allocateSize)
+            .setBlockCommitSequenceId(0)
             .setOffset(0)
             .build();
         locations.add(subKeyInfo);