
HDDS-629. Make ApplyTransaction calls in ContainerStateMachine idempotent. Contributed by Shashikant Banerjee.

Jitendra Pandey 6 years ago
Parent
Current commit
e13a38f4bc

+ 7 - 3
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java

@@ -241,9 +241,12 @@ public class KeyValueHandler extends Handler {
         newContainer.create(volumeSet, volumeChoosingPolicy, scmID);
         containerSet.addContainer(newContainer);
       } else {
-        throw new StorageContainerException("Container already exists with " +
-            "container Id " + containerID, ContainerProtos.Result
-            .CONTAINER_EXISTS);
+
+        // The create container request for an already existing container can
+        // arrive in case the ContainerStateMachine reapplies the transaction
+        // on datanode restart. Just log a warning message here.
+        LOG.warn("Container already exists with " +
+            "container Id " + containerID);
       }
     } catch (StorageContainerException ex) {
       return ContainerUtils.logAndReturnError(LOG, ex, request);
@@ -370,6 +373,7 @@ public class KeyValueHandler extends Handler {
 
   /**
    * Handles Close Container Request. An open container is closed.
+   * Close Container call is idempotent.
    */
   ContainerCommandResponseProto handleCloseContainer(
       ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
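The hunk above turns a duplicate createContainer into a logged no-op instead of a CONTAINER_EXISTS failure. A minimal sketch of the resulting control flow, with hypothetical names and the container set reduced to an in-memory map (not the Ozone API):

```java
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: idempotent create, mirroring the change above.
final class CreateHandlerSketch {
  private final Map<Long, Object> containerSet = new ConcurrentHashMap<>();

  void handleCreateContainer(long containerID) throws IOException {
    if (!containerSet.containsKey(containerID)) {
      containerSet.put(containerID, new Object()); // stand-in for the container
    } else {
      // Reapplied transaction (e.g. after a datanode restart): succeed
      // quietly instead of throwing CONTAINER_EXISTS.
      System.out.printf("Container already exists with container Id %d%n",
          containerID);
    }
  }
}
```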

+ 4 - 7
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java

@@ -231,21 +231,18 @@ public final class ChunkUtils {
    *
    * @param chunkFile - chunkFile to write data into.
    * @param info - chunk info.
-   * @return boolean isOverwrite
-   * @throws StorageContainerException
+   * @return true if the chunkFile exists and chunkOffset < chunkFile length,
+   *         false otherwise.
    */
   public static boolean validateChunkForOverwrite(File chunkFile,
-      ChunkInfo info) throws StorageContainerException {
+      ChunkInfo info) {
 
     Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
 
     if (isOverWriteRequested(chunkFile, info)) {
       if (!isOverWritePermitted(info)) {
-        log.error("Rejecting write chunk request. Chunk overwrite " +
+        log.warn("Duplicate write chunk request. Chunk overwrite " +
             "without explicit request. {}", info.toString());
-        throw new StorageContainerException("Rejecting write chunk request. " +
-            "OverWrite flag required." + info.toString(),
-            OVERWRITE_FLAG_REQUIRED);
       }
       return true;
     }
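With the exception removed, validateChunkForOverwrite becomes a pure predicate whose result the caller acts on. A condensed sketch of the contract the new javadoc states, under a simplified signature (the real method takes a ChunkInfo and reads the offset from it):

```java
import java.io.File;

final class OverwriteCheck {
  // An overwrite is in play exactly when the chunk file exists and the
  // chunk's offset falls inside the current file.
  static boolean validateChunkForOverwrite(File chunkFile, long chunkOffset) {
    return chunkFile.exists() && chunkOffset < chunkFile.length();
  }
}
```

Callers treat a true result as a duplicate-write signal rather than an error, which is what makes a replayed WriteChunk transaction safe.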

+ 23 - 1
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java

@@ -89,11 +89,33 @@ public class BlockManagerImpl implements BlockManager {
     Preconditions.checkNotNull(db, "DB cannot be null here");
 
     long blockCommitSequenceId = data.getBlockCommitSequenceId();
+    byte[] blockCommitSequenceIdKey =
+        DFSUtil.string2Bytes(OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID_PREFIX);
+    byte[] blockCommitSequenceIdValue = db.get(blockCommitSequenceIdKey);
+
+    // The default blockCommitSequenceId for any block is 0. If the putBlock
+    // request did not come via Ratis (test scenarios), it will remain 0.
+    // In such cases, we should overwrite the block as well.
+    if (blockCommitSequenceIdValue != null && blockCommitSequenceId != 0) {
+      if (blockCommitSequenceId <= Longs
+          .fromByteArray(blockCommitSequenceIdValue)) {
+        // Since the blockCommitSequenceId stored in the db is greater than
+        // or equal to the blockCommitSequenceId to be updated, the putBlock
+        // transaction is being reapplied in the ContainerStateMachine on
+        // restart. It also implies that the given block must already exist
+        // in the db. Just log and return.
+        LOG.warn("blockCommitSequenceId " + Longs
+            .fromByteArray(blockCommitSequenceIdValue)
+            + " in the Container Db is greater than or equal to the"
+            + " supplied value " + blockCommitSequenceId + ". Ignoring it.");
+        return data.getSize();
+      }
+    }
     // update the blockData as well as BlockCommitSequenceId here
     BatchOperation batch = new BatchOperation();
     batch.put(Longs.toByteArray(data.getLocalID()),
         data.getProtoBufMessage().toByteArray());
-    batch.put(DFSUtil.string2Bytes(OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID_PREFIX),
+    batch.put(blockCommitSequenceIdKey,
         Longs.toByteArray(blockCommitSequenceId));
     db.writeBatch(batch);
     container.updateBlockCommitSequenceId(blockCommitSequenceId);
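The guard amounts to a last-writer-wins check on a monotonically increasing sequence id. A self-contained sketch of the same idea, with the container DB reduced to an in-memory map (hypothetical class, not the Ozone API):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: replay detection via a persisted blockCommitSequenceId.
class BcsidGuard {
  private final Map<String, Long> db = new HashMap<>();
  private static final String KEY = "#BCSID";

  /** Returns false when the incoming put is a replay and must be skipped. */
  boolean shouldApply(long incomingBcsid) {
    Long stored = db.get(KEY);
    // bcsid 0 means the request bypassed Ratis (test paths); always apply.
    if (stored != null && incomingBcsid != 0 && incomingBcsid <= stored) {
      return false; // transaction reapplied; the block is already persisted
    }
    db.put(KEY, incomingBcsid);
    return true;
  }
}
```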

+ 43 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java

@@ -87,6 +87,32 @@ public class ChunkManagerImpl implements ChunkManager {
 
       switch (stage) {
       case WRITE_DATA:
+        if (isOverwrite) {
+          // If the actual chunk file already exists while the temp chunk file
+          // is being written, the same ozone client request has generated two
+          // raft log entries. This can happen when the retryCache expired in
+          // Ratis, or on a log index mismatch/corruption in Ratis. Two
+          // approaches are possible:
+          // 1. Read the complete data in the actual chunk file, verify its
+          //    integrity, and rewrite only on a mismatch.
+          // 2. Delete the chunk file and write the chunk again.
+          // For now, let's rewrite the chunk file.
+          // TODO: once checksum support for write chunks gets plugged in,
+          // verify the checksum of the actual chunk file against the data to
+          // be written here; if they match, we can safely return without
+          // rewriting.
+          LOG.warn("ChunkFile already exists: " + chunkFile + ". Deleting it.");
+          FileUtil.fullyDelete(chunkFile);
+        }
+        if (tmpChunkFile.exists()) {
+          // If the tmp chunk file already exists, it means the raft log got
+          // appended but the log entry was later truncated in Ratis, leaving
+          // behind garbage.
+          // TODO: once checksum support for data chunks gets plugged in,
+          // compare the checksums here instead of rewriting the chunk.
+          LOG.warn("tmpChunkFile already exists: " + tmpChunkFile
+              + ". Overwriting it.");
+        }
         // Initially writes to temporary chunk file.
         ChunkUtils.writeData(tmpChunkFile, info, data, volumeIOStats);
         // No need to increment container stats here, as still data is not
@@ -95,6 +121,15 @@ public class ChunkManagerImpl implements ChunkManager {
       case COMMIT_DATA:
         // commit the data, means move chunk data from temporary chunk file
         // to actual chunk file.
+        if (isOverwrite) {
+          // If the actual chunk file already exists, the write chunk
+          // transaction in the ContainerStateMachine is being reapplied,
+          // which can happen when a node restarts.
+          // TODO: verify the checksums of the existing chunkFile and the
+          // chunkInfo to be committed here.
+          LOG.warn("ChunkFile already exists: " + chunkFile);
+          return;
+        }
         commitChunk(tmpChunkFile, chunkFile);
         // Increment container stats here, as we commit the data.
         containerData.incrBytesUsed(info.getLen());
@@ -200,6 +235,14 @@ public class ChunkManagerImpl implements ChunkManager {
     if (containerData.getLayOutVersion() == ChunkLayOutVersion
         .getLatestVersion().getVersion()) {
       File chunkFile = ChunkUtils.getChunkFile(containerData, info);
+
+      // If the chunk file does not exist, it might have already been
+      // deleted. The call may be a reapplied transaction on datanode
+      // restart.
+      if (!chunkFile.exists()) {
+        LOG.warn("Chunk file does not exist. Chunk info: " + info.toString());
+        return;
+      }
       if ((info.getOffset() == 0) && (info.getLen() == chunkFile.length())) {
         FileUtil.fullyDelete(chunkFile);
         containerData.decrBytesUsed(chunkFile.length());
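Taken together, the three hunks make each stage of the chunk lifecycle replay-tolerant: WRITE_DATA may rewrite the temp file, COMMIT_DATA returns early if the rename already happened, and delete treats a missing file as success. A standalone sketch of that protocol using java.nio (file layout hypothetical):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Sketch: replay-tolerant two-phase chunk write plus idempotent delete.
final class TwoPhaseChunkWrite {
  static void writeData(Path tmpChunk, byte[] data) throws IOException {
    // A replay may find a stale tmp file; Files.write defaults to
    // CREATE + TRUNCATE_EXISTING, so the rewrite is safe.
    Files.write(tmpChunk, data);
  }

  static void commitData(Path tmpChunk, Path chunk) throws IOException {
    if (Files.exists(chunk)) {
      return; // already committed by an earlier apply; nothing to do
    }
    Files.move(tmpChunk, chunk, StandardCopyOption.ATOMIC_MOVE);
  }

  static void delete(Path chunk) throws IOException {
    // A replayed delete may find the file gone; that is not an error.
    Files.deleteIfExists(chunk);
  }
}
```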

+ 122 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java

@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.
+    ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.
+    SCMContainerPlacementCapacity;
+import org.apache.hadoop.hdds.scm.protocolPB.
+    StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/**
+ * Tests the idempotent operations in ContainerStateMachine.
+ */
+public class TestContainerStateMachineIdempotency {
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration ozoneConfig;
+  private static StorageContainerLocationProtocolClientSideTranslatorPB
+      storageContainerLocationClient;
+  private static XceiverClientManager xceiverClientManager;
+  private static String containerOwner = "OZONE";
+
+  @BeforeClass
+  public static void init() throws Exception {
+    ozoneConfig = new OzoneConfiguration();
+    ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
+        SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
+    cluster =
+        MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1).build();
+    cluster.waitForClusterToBeReady();
+    storageContainerLocationClient =
+        cluster.getStorageContainerLocationClient();
+    xceiverClientManager = new XceiverClientManager(ozoneConfig);
+  }
+
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    IOUtils.cleanupWithLogger(null, storageContainerLocationClient);
+  }
+
+  @Test
+  public void testContainerStateMachineIdempotency() throws Exception {
+    String traceID = UUID.randomUUID().toString();
+    ContainerWithPipeline container = storageContainerLocationClient
+        .allocateContainer(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.ONE, containerOwner);
+    long containerID = container.getContainerInfo().getContainerID();
+    Pipeline pipeline = container.getPipeline();
+    XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
+    try {
+      //create the container
+      ContainerProtocolCalls.createContainer(client, containerID, traceID);
+      // call createContainer again; a duplicate create must succeed
+      ContainerProtocolCalls.createContainer(client, containerID, traceID);
+      BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
+      byte[] data =
+          RandomStringUtils.random(RandomUtils.nextInt(0, 1024)).getBytes();
+      ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
+          ContainerTestHelper
+              .getWriteChunkRequest(container.getPipeline(), blockID,
+                  data.length);
+      client.sendCommand(writeChunkRequest);
+
+      //Make the write chunk request again without requesting for overWrite
+      client.sendCommand(writeChunkRequest);
+      // Now, explicitly make a putKey request for the block.
+      ContainerProtos.ContainerCommandRequestProto putKeyRequest =
+          ContainerTestHelper
+              .getPutBlockRequest(pipeline, writeChunkRequest.getWriteChunk());
+      client.sendCommand(putKeyRequest).getPutBlock();
+      // send the putBlock again
+      client.sendCommand(putKeyRequest);
+
+      // close container call
+      ContainerProtocolCalls.closeContainer(client, containerID, traceID);
+      ContainerProtocolCalls.closeContainer(client, containerID, traceID);
+    } catch (IOException ioe) {
+      Assert.fail("Container operation failed: " + ioe);
+    }
+    xceiverClientManager.releaseClient(client);
+  }
+}

+ 5 - 12
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java

@@ -462,15 +462,7 @@ public class TestContainerPersistence {
     byte[] data = getData(datalen);
     setDataChecksum(info, data);
     chunkManager.writeChunk(container, blockID, info, data, COMBINED);
-    try {
-      chunkManager.writeChunk(container, blockID, info, data, COMBINED);
-    } catch (StorageContainerException ex) {
-      Assert.assertTrue(ex.getMessage().contains(
-          "Rejecting write chunk request. OverWrite flag required"));
-      Assert.assertEquals(ex.getResult(),
-          ContainerProtos.Result.OVERWRITE_FLAG_REQUIRED);
-    }
-
+    chunkManager.writeChunk(container, blockID, info, data, COMBINED);
     // With the overwrite flag it should work now.
     info.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
     chunkManager.writeChunk(container, blockID, info, data, COMBINED);
@@ -478,7 +470,7 @@ public class TestContainerPersistence {
     Assert.assertEquals(datalen, bytesUsed);
 
     long bytesWrite = container.getContainerData().getWriteBytes();
-    Assert.assertEquals(datalen * 2, bytesWrite);
+    Assert.assertEquals(datalen * 3, bytesWrite);
   }
 
   /**
@@ -748,10 +740,11 @@ public class TestContainerPersistence {
 
   }
 
-  private BlockData writeBlockHelper(BlockID blockID)
+  private BlockData writeBlockHelper(BlockID blockID, int i)
       throws IOException, NoSuchAlgorithmException {
     ChunkInfo info = writeChunkHelper(blockID);
     BlockData blockData = new BlockData(blockID);
+    blockData.setBlockCommitSequenceId((long) i);
     List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
     chunkList.add(info.getProtoBufMessage());
     blockData.setChunks(chunkList);
@@ -766,7 +759,7 @@ public class TestContainerPersistence {
     for (int i = 0; i < 10; i++) {
       BlockID blockID = new BlockID(testContainerID, i);
       expectedBlocks.add(blockID);
-      BlockData kd = writeBlockHelper(blockID);
+      BlockData kd = writeBlockHelper(blockID, i);
       blockManager.putBlock(container, kd);
     }