Przeglądaj źródła

HDDS-1753. Datanode unable to find chunk while replication data using ratis. (#1318)

bshashikant 5 lat temu
rodzic
commit
5d31a4eff7
17 zmienionych plików z 564 dodań i 111 usunięć
  1. 6 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
  2. 5 0
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
  3. 6 15
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
  4. 6 0
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
  5. 5 0
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java
  6. 18 9
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
  7. 26 22
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
  8. 2 26
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
  9. 2 1
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java
  10. 78 14
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
  11. 18 1
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
  12. 5 4
      hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/testutils/BlockDeletingServiceTestImpl.java
  13. 291 0
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
  14. 27 6
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
  15. 27 7
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
  16. 30 6
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDeletionChoosingPolicy.java
  17. 12 0
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java

+ 6 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java

@@ -242,4 +242,10 @@ public interface RatisHelper {
         .retryUpToMaximumCountWithFixedSleep(maxRetryCount, sleepDuration);
     return retryPolicy;
   }
+
+  static Long getMinReplicatedIndex(
+      Collection<RaftProtos.CommitInfoProto> commitInfos) {
+    return commitInfos.stream().map(RaftProtos.CommitInfoProto::getCommitIndex)
+        .min(Long::compareTo).orElse(null);
+  }
 }

+ 5 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java

@@ -544,4 +544,9 @@ public abstract class ContainerData {
    * @return Protocol Buffer Message
    */
   public abstract ContainerProtos.ContainerDataProto getProtoBufMessage();
+
+  /**
+   * Returns the blockCommitSequenceId.
+   */
+  public abstract long getBlockCommitSequenceId();
 }

+ 6 - 15
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java

@@ -27,16 +27,15 @@ import org.apache.hadoop.hdds.protocol.proto
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.apache.hadoop.ozone.container.common
-    .interfaces.ContainerDeletionChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Set;
+import java.util.List;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -165,6 +164,10 @@ public class ContainerSet {
     return ImmutableMap.copyOf(containerMap);
   }
 
+  public Map<Long, Container> getContainerMap() {
+    return Collections.unmodifiableMap(containerMap);
+  }
+
   /**
    * A simple interface for container Iterations.
    * <p>
@@ -232,18 +235,6 @@ public class ContainerSet {
     return crBuilder.build();
   }
 
-  public List<ContainerData> chooseContainerForBlockDeletion(int count,
-      ContainerDeletionChoosingPolicy deletionPolicy)
-      throws StorageContainerException {
-    Map<Long, ContainerData> containerDataMap = containerMap.entrySet().stream()
-        .filter(e -> deletionPolicy.isValidContainerType(
-            e.getValue().getContainerType()))
-        .collect(Collectors.toMap(Map.Entry::getKey,
-            e -> e.getValue().getContainerData()));
-    return deletionPolicy
-        .chooseContainerForBlockDeletion(count, containerDataMap);
-  }
-
   public Set<Long> getMissingContainerSet() {
     return missingContainerSet;
   }

+ 6 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java

@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
@@ -81,6 +82,11 @@ public final class CommandDispatcher {
     return handlerMap.get(Type.closeContainerCommand);
   }
 
+  @VisibleForTesting
+  public CommandHandler getDeleteBlocksCommandHandler() {
+    return handlerMap.get(Type.deleteBlocksCommand);
+  }
+
   /**
    * Dispatch the command to the correct handler.
    *

+ 5 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java

@@ -171,6 +171,11 @@ public class CSMMetrics {
     return numReadStateMachineMissCount.value();
   }
 
+  @VisibleForTesting
+  public long getNumReadStateMachineOps() {
+    return numReadStateMachineOps.value();
+  }
+
   @VisibleForTesting
   public long getNumBytesWrittenCount() {
     return numBytesWrittenCount.value();

+ 18 - 9
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.container.common.transport.server.ratis;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
@@ -518,7 +519,8 @@ public class ContainerStateMachine extends BaseStateMachine {
   }
 
   private ByteString readStateMachineData(
-      ContainerCommandRequestProto requestProto, long term, long index) {
+      ContainerCommandRequestProto requestProto, long term, long index)
+      throws IOException {
     // the stateMachine data is not present in the stateMachine cache,
     // increment the stateMachine cache miss count
     metrics.incNumReadStateMachineMissCount();
@@ -532,18 +534,24 @@ public class ContainerStateMachine extends BaseStateMachine {
             .setChunkData(chunkInfo);
     ContainerCommandRequestProto dataContainerCommandProto =
         ContainerCommandRequestProto.newBuilder(requestProto)
-            .setCmdType(Type.ReadChunk)
-            .setReadChunk(readChunkRequestProto)
+            .setCmdType(Type.ReadChunk).setReadChunk(readChunkRequestProto)
             .build();
     DispatcherContext context =
-        new DispatcherContext.Builder()
-            .setTerm(term)
-            .setLogIndex(index)
-            .setReadFromTmpFile(true)
-            .build();
+        new DispatcherContext.Builder().setTerm(term).setLogIndex(index)
+            .setReadFromTmpFile(true).build();
     // read the chunk
     ContainerCommandResponseProto response =
         dispatchCommand(dataContainerCommandProto, context);
+    if (response.getResult() != ContainerProtos.Result.SUCCESS) {
+      StorageContainerException sce =
+          new StorageContainerException(response.getMessage(),
+              response.getResult());
+      LOG.error("gid {} : ReadStateMachine failed. cmd {} logIndex {} msg : "
+              + "{} Container Result: {}", gid, response.getCmdType(), index,
+          response.getMessage(), response.getResult());
+      throw sce;
+    }
+
     ReadChunkResponseProto responseProto = response.getReadChunk();
 
     ByteString data = responseProto.getData();
@@ -746,7 +754,8 @@ public class ContainerStateMachine extends BaseStateMachine {
     return future;
   }
 
-  private void evictStateMachineCache() {
+  @VisibleForTesting
+  public void evictStateMachineCache() {
     stateMachineDataCache.invalidateAll();
     stateMachineDataCache.cleanUp();
   }

+ 26 - 22
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java

@@ -22,15 +22,11 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
-        .StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineAction;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
 import org.apache.hadoop.hdds.scm.HddsServerUtil;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
@@ -50,14 +46,7 @@ import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.grpc.GrpcFactory;
 import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.apache.ratis.netty.NettyConfigKeys;
-import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.NotLeaderException;
-import org.apache.ratis.protocol.StateMachineException;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.*;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.RaftServer;
@@ -74,11 +63,11 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.Collections;
 import java.util.UUID;
+import java.util.ArrayList;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -139,10 +128,10 @@ public final class XceiverServerRatis extends XceiverServer {
         TimeUnit.MILLISECONDS);
     this.dispatcher = dispatcher;
 
-    RaftServer.Builder builder = RaftServer.newBuilder()
-        .setServerId(RatisHelper.toRaftPeerId(dd))
-        .setProperties(serverProperties)
-        .setStateMachineRegistry(this::getStateMachine);
+    RaftServer.Builder builder =
+        RaftServer.newBuilder().setServerId(RatisHelper.toRaftPeerId(dd))
+            .setProperties(serverProperties)
+            .setStateMachineRegistry(this::getStateMachine);
     if (tlsConfig != null) {
       builder.setParameters(GrpcFactory.newRaftParameters(tlsConfig));
     }
@@ -507,6 +496,13 @@ public final class XceiverServerRatis extends XceiverServer {
         null);
   }
 
+  private GroupInfoRequest createGroupInfoRequest(
+      HddsProtos.PipelineID pipelineID) {
+    return new GroupInfoRequest(clientId, server.getId(),
+        RaftGroupId.valueOf(PipelineID.getFromProtobuf(pipelineID).getId()),
+        nextCallId());
+  }
+
   private void handlePipelineFailure(RaftGroupId groupId,
       RoleInfoProto roleInfoProto) {
     String msg;
@@ -654,4 +650,12 @@ public final class XceiverServerRatis extends XceiverServer {
     triggerPipelineClose(groupId, msg,
         ClosePipelineInfo.Reason.PIPELINE_LOG_FAILED, true);
   }
+
+  public long getMinReplicatedIndex(PipelineID pipelineID) throws IOException {
+    Long minIndex;
+    GroupInfoReply reply = getServer()
+        .getGroupInfo(createGroupInfoRequest(pipelineID.getProtobuf()));
+    minIndex = RatisHelper.getMinReplicatedIndex(reply.getCommitInfos());
+    return minIndex == null ? -1 : minIndex.longValue();
+  }
 }

+ 2 - 26
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java

@@ -25,7 +25,6 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.conf.Configuration;
@@ -76,8 +75,6 @@ import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerImpl;
 import org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
-import org.apache.hadoop.ozone.container.keyvalue.statemachine.background
-    .BlockDeletingService;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.ReflectionUtils;
 
@@ -86,15 +83,8 @@ import com.google.common.base.Preconditions;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import static org.apache.hadoop.hdds.HddsConfigKeys
     .HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.*;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
+    Result.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -109,7 +99,6 @@ public class KeyValueHandler extends Handler {
   private final ContainerType containerType;
   private final BlockManager blockManager;
   private final ChunkManager chunkManager;
-  private final BlockDeletingService blockDeletingService;
   private final VolumeChoosingPolicy volumeChoosingPolicy;
   private final long maxContainerSize;
 
@@ -126,18 +115,6 @@ public class KeyValueHandler extends Handler {
         conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_CHUNK_WRITE_SYNC_KEY,
             OzoneConfigKeys.DFS_CONTAINER_CHUNK_WRITE_SYNC_DEFAULT);
     chunkManager = new ChunkManagerImpl(doSyncWrite);
-    long svcInterval = config
-        .getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
-            OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT,
-            TimeUnit.MILLISECONDS);
-    long serviceTimeout = config
-        .getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
-            OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
-            TimeUnit.MILLISECONDS);
-    this.blockDeletingService =
-        new BlockDeletingService(containerSet, svcInterval, serviceTimeout,
-            TimeUnit.MILLISECONDS, config);
-    blockDeletingService.start();
     volumeChoosingPolicy = ReflectionUtils.newInstance(conf.getClass(
         HDDS_DATANODE_VOLUME_CHOOSING_POLICY, RoundRobinVolumeChoosingPolicy
             .class, VolumeChoosingPolicy.class), conf);
@@ -160,7 +137,6 @@ public class KeyValueHandler extends Handler {
 
   @Override
   public void stop() {
-    blockDeletingService.shutdown();
   }
 
   @Override

+ 2 - 1
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java

@@ -207,7 +207,8 @@ public class ChunkManagerImpl implements ChunkManager {
 
         // In case the chunk file does not exist but tmp chunk file exist,
         // read from tmp chunk file if readFromTmpFile is set to true
-        if (!chunkFile.exists() && dispatcherContext.isReadFromTmpFile()) {
+        if (!chunkFile.exists() && dispatcherContext != null
+            && dispatcherContext.isReadFromTmpFile()) {
           chunkFile = getTmpChunkFile(chunkFile, dispatcherContext);
         }
         data = ChunkUtils.readData(chunkFile, info, volumeIOStats);

+ 78 - 14
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java

@@ -20,12 +20,14 @@ package org.apache.hadoop.ozone.container.keyvalue.statemachine.background;
 
 import com.google.common.collect.Lists;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
-import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.TopNOrderedContainerDeletionChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDeletionChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.ratis.thirdparty.com.google.protobuf
     .InvalidProtocolBufferException;
@@ -48,10 +50,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.ozone.OzoneConfigKeys
     .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
@@ -67,12 +72,12 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys
  * of deleting staled ozone blocks.
  */
 // TODO: Fix BlockDeletingService to work with new StorageLayer
-public class BlockDeletingService extends BackgroundService{
+public class BlockDeletingService extends BackgroundService {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(BlockDeletingService.class);
 
-  private ContainerSet containerSet;
+  private OzoneContainer ozoneContainer;
   private ContainerDeletionChoosingPolicy containerDeletionPolicy;
   private final Configuration conf;
 
@@ -88,22 +93,23 @@ public class BlockDeletingService extends BackgroundService{
   // Core pool size for container tasks
   private final static int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 10;
 
-  public BlockDeletingService(ContainerSet containerSet, long serviceInterval,
-      long serviceTimeout, TimeUnit timeUnit, Configuration conf) {
+  public BlockDeletingService(OzoneContainer ozoneContainer,
+      long serviceInterval, long serviceTimeout, TimeUnit timeUnit,
+      Configuration conf) {
     super("BlockDeletingService", serviceInterval, timeUnit,
         BLOCK_DELETING_SERVICE_CORE_POOL_SIZE, serviceTimeout);
-    this.containerSet = containerSet;
+    this.ozoneContainer = ozoneContainer;
     containerDeletionPolicy = ReflectionUtils.newInstance(conf.getClass(
         ScmConfigKeys.OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY,
         TopNOrderedContainerDeletionChoosingPolicy.class,
         ContainerDeletionChoosingPolicy.class), conf);
     this.conf = conf;
-    this.blockLimitPerTask = conf.getInt(
-        OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER,
-        OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT);
-    this.containerLimitPerInterval = conf.getInt(
-        OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL,
-        OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT);
+    this.blockLimitPerTask =
+        conf.getInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER,
+            OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT);
+    this.containerLimitPerInterval =
+        conf.getInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL,
+            OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT);
   }
 
 
@@ -117,8 +123,8 @@ public class BlockDeletingService extends BackgroundService{
       // We must ensure there is no empty container in this result.
       // The chosen result depends on what container deletion policy is
       // configured.
-      containers = containerSet.chooseContainerForBlockDeletion(
-          containerLimitPerInterval, containerDeletionPolicy);
+      containers = chooseContainerForBlockDeletion(containerLimitPerInterval,
+              containerDeletionPolicy);
       if (containers.size() > 0) {
         LOG.info("Plan to choose {} containers for block deletion, "
                 + "actually returns {} valid containers.",
@@ -143,6 +149,64 @@ public class BlockDeletingService extends BackgroundService{
     return queue;
   }
 
+  public List<ContainerData> chooseContainerForBlockDeletion(int count,
+      ContainerDeletionChoosingPolicy deletionPolicy)
+      throws StorageContainerException {
+    Map<Long, ContainerData> containerDataMap =
+        ozoneContainer.getContainerSet().getContainerMap().entrySet().stream()
+            .filter(e -> isDeletionAllowed(e.getValue().getContainerData(),
+                deletionPolicy)).collect(Collectors
+            .toMap(Map.Entry::getKey, e -> e.getValue().getContainerData()));
+    return deletionPolicy
+        .chooseContainerForBlockDeletion(count, containerDataMap);
+  }
+
+  private boolean isDeletionAllowed(ContainerData containerData,
+      ContainerDeletionChoosingPolicy deletionPolicy) {
+    if (!deletionPolicy
+        .isValidContainerType(containerData.getContainerType())) {
+      return false;
+    } else if (!containerData.isClosed()) {
+      return false;
+    } else {
+      if (ozoneContainer.getWriteChannel() instanceof XceiverServerRatis) {
+        XceiverServerRatis ratisServer =
+            (XceiverServerRatis) ozoneContainer.getWriteChannel();
+        PipelineID pipelineID = PipelineID
+            .valueOf(UUID.fromString(containerData.getOriginPipelineId()));
+        // in case te ratis group does not exist, just mark it for deletion.
+        if (!ratisServer.isExist(pipelineID.getProtobuf())) {
+          return true;
+        }
+        try {
+          long minReplicatedIndex =
+              ratisServer.getMinReplicatedIndex(pipelineID);
+          long containerBCSID = containerData.getBlockCommitSequenceId();
+          if (minReplicatedIndex >= 0 && minReplicatedIndex < containerBCSID) {
+            LOG.warn("Close Container log Index {} is not replicated across all"
+                    + "the servers in the pipeline {} as the min replicated "
+                    + "index is {}. Deletion is not allowed in this container "
+                    + "yet.", containerBCSID,
+                containerData.getOriginPipelineId(), minReplicatedIndex);
+            return false;
+          } else {
+            return true;
+          }
+        } catch (IOException ioe) {
+          // in case of any exception check again whether the pipeline exist
+          // and in case the pipeline got destroyed, just mark it for deletion
+          if (!ratisServer.isExist(pipelineID.getProtobuf())) {
+            return true;
+          } else {
+            LOG.info(ioe.getMessage());
+            return false;
+          }
+        }
+      }
+      return true;
+    }
+  }
+
   private static class ContainerBackgroundTaskResult
       implements BackgroundTaskResult {
     private List<String> deletedBlockIds;

+ 18 - 1
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java

@@ -42,6 +42,7 @@ import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverSe
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 
+import org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService;
 import org.apache.hadoop.ozone.container.replication.GrpcReplicationService;
 import org.apache.hadoop.ozone.container.replication
     .OnDemandContainerReplicationSource;
@@ -53,6 +54,9 @@ import java.io.*;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.*;
 
 /**
  * Ozone main class sets up the network servers and initializes the container
@@ -72,6 +76,7 @@ public class OzoneContainer {
   private final XceiverServerSpi readChannel;
   private final ContainerController controller;
   private ContainerScrubber scrubber;
+  private final BlockDeletingService blockDeletingService;
 
   /**
    * Construct OzoneContainer object.
@@ -111,7 +116,17 @@ public class OzoneContainer {
     this.readChannel = new XceiverServerGrpc(
         datanodeDetails, config, hddsDispatcher, certClient,
         createReplicationService());
-
+    long svcInterval = config
+        .getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
+            OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT,
+            TimeUnit.MILLISECONDS);
+    long serviceTimeout = config
+        .getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
+            OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
+            TimeUnit.MILLISECONDS);
+    this.blockDeletingService =
+        new BlockDeletingService(this, svcInterval, serviceTimeout,
+            TimeUnit.MILLISECONDS, config);
   }
 
   private GrpcReplicationService createReplicationService() {
@@ -189,6 +204,7 @@ public class OzoneContainer {
     readChannel.start();
     hddsDispatcher.init();
     hddsDispatcher.setScmId(scmId);
+    blockDeletingService.start();
   }
 
   /**
@@ -203,6 +219,7 @@ public class OzoneContainer {
     this.handlers.values().forEach(Handler::stop);
     hddsDispatcher.shutdown();
     volumeSet.shutdown();
+    blockDeletingService.shutdown();
   }
 
 

+ 5 - 4
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/testutils/BlockDeletingServiceTestImpl.java

@@ -19,9 +19,9 @@ package org.apache.hadoop.ozone.container.testutils;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.keyvalue.statemachine.background
     .BlockDeletingService;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
@@ -42,9 +42,9 @@ public class BlockDeletingServiceTestImpl
   private Thread testingThread;
   private AtomicInteger numOfProcessed = new AtomicInteger(0);
 
-  public BlockDeletingServiceTestImpl(ContainerSet containerSet,
+  public BlockDeletingServiceTestImpl(OzoneContainer container,
       int serviceInterval, Configuration conf) {
-    super(containerSet, serviceInterval, SERVICE_TIMEOUT_IN_MILLISECONDS,
+    super(container, serviceInterval, SERVICE_TIMEOUT_IN_MILLISECONDS,
         TimeUnit.MILLISECONDS, conf);
   }
 
@@ -67,7 +67,8 @@ public class BlockDeletingServiceTestImpl
   }
 
   // Override the implementation to start a single on-call control thread.
-  @Override public void start() {
+  @Override
+  public void start() {
     PeriodicalTask svc = new PeriodicalTask();
     // In test mode, relies on a latch countdown to runDeletingTasks tasks.
     Runnable r = () -> {

+ 291 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java

@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+
+/**
+ * Tests delete key operation with a slow follower in the datanode
+ * pipeline.
+ */
+public class TestDeleteWithSlowFollower {
+
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+  private static OzoneClient client;
+  private static ObjectStore objectStore;
+  private static String volumeName;
+  private static String bucketName;
+  private static String path;
+  private static XceiverClientManager xceiverClientManager;
+
+  /**
+   * Create a MiniDFSCluster for testing.
+   *
+   * @throws IOException
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    conf = new OzoneConfiguration();
+    path = GenericTestUtils
+        .getTempPath(TestContainerStateMachineFailures.class.getSimpleName());
+    File baseDir = new File(path);
+    baseDir.mkdirs();
+
+    conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
+        TimeUnit.MILLISECONDS);
+    // Make the stale, dead and server failure timeout higher so that a dead
+    // node is not detecte at SCM as well as the pipeline close action
+    // never gets initiated early at Datanode in the test.
+    conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200,
+        TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 1000, TimeUnit.SECONDS);
+    conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL, 2000,
+        TimeUnit.SECONDS);
+    conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 1000,
+        TimeUnit.SECONDS);
+    conf.setTimeDuration(OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY,
+        1000, TimeUnit.SECONDS);
+    conf.setTimeDuration(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
+        1, TimeUnit.SECONDS);
+
+    conf.setQuietMode(false);
+    cluster =
+        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).setHbInterval(100)
+            .build();
+    cluster.waitForClusterToBeReady();
+    //the easiest way to create an open container is creating a key
+    client = OzoneClientFactory.getClient(conf);
+    objectStore = client.getObjectStore();
+    xceiverClientManager = new XceiverClientManager(conf);
+    volumeName = "testcontainerstatemachinefailures";
+    bucketName = volumeName;
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+  }
+
+  /**
+   * Shutdown MiniDFSCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * The test simulates a slow follower by first writing key thereby creating a
+   * a container on 3 dns of the cluster. Then, a dn is shutdown and a close
+   * container cmd gets issued so that in the leader and the alive follower,
+   * container gets closed. And then, key is deleted and
+   * the node is started up again so that it
+   * rejoins the ring and starts applying the transaction from where it left
+   * by fetching the entries from the leader. Until and unless this follower
+   * catches up and its replica gets closed,
+   * the data is not deleted from any of the nodes which have the
+   * closed replica.
+   */
+  @Test
+  public void testDeleteKeyWithSlowFollower() throws Exception {
+
+    OzoneOutputStream key =
+        objectStore.getVolume(volumeName).getBucket(bucketName)
+            .createKey("ratis", 0, ReplicationType.RATIS,
+                ReplicationFactor.THREE, new HashMap<>());
+    byte[] testData = "ratis".getBytes();
+    // First write and flush creates a container in the datanode
+    key.write(testData);
+    key.flush();
+
+    KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
+    List<OmKeyLocationInfo> locationInfoList =
+        groupOutputStream.getLocationInfoList();
+    Assert.assertEquals(1, locationInfoList.size());
+    OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+    long containerID = omKeyLocationInfo.getContainerID();
+    // A container is created on the datanode. Now figure out a follower node to
+    // kill/slow down.
+    HddsDatanodeService follower = null;
+    HddsDatanodeService leader = null;
+
+    List<Pipeline> pipelineList =
+        cluster.getStorageContainerManager().getPipelineManager()
+            .getPipelines(HddsProtos.ReplicationType.RATIS,
+                HddsProtos.ReplicationFactor.THREE);
+    Assert.assertTrue(pipelineList.size() == 1);
+    Pipeline pipeline = pipelineList.get(0);
+    for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
+      if (ContainerTestHelper.isRatisFollower(dn, pipeline)) {
+        follower = dn;
+      } else if (ContainerTestHelper.isRatisLeader(dn, pipeline)) {
+        leader = dn;
+      }
+    }
+    Assert.assertNotNull(follower);
+    Assert.assertNotNull(leader);
+    // shutdown the slow follower
+    cluster.shutdownHddsDatanode(follower.getDatanodeDetails());
+    key.write(testData);
+    key.close();
+
+    // now move the container to the closed on the datanode.
+    XceiverClientSpi xceiverClient =
+        xceiverClientManager.acquireClient(pipeline);
+    ContainerProtos.ContainerCommandRequestProto.Builder request =
+        ContainerProtos.ContainerCommandRequestProto.newBuilder();
+    request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
+    request.setCmdType(ContainerProtos.Type.CloseContainer);
+    request.setContainerID(containerID);
+    request.setCloseContainer(
+        ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
+    xceiverClient.sendCommand(request.build());
+
+    ContainerStateMachine stateMachine =
+        (ContainerStateMachine) ContainerTestHelper
+            .getStateMachine(leader, pipeline);
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName).
+        setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName("ratis")
+        .build();
+    OmKeyInfo info = cluster.getOzoneManager().lookupKey(keyArgs);
+    BlockID blockID =
+        info.getKeyLocationVersions().get(0).getLocationList().get(0)
+            .getBlockID();
+    OzoneContainer ozoneContainer;
+    final DatanodeStateMachine dnStateMachine =
+        leader.getDatanodeStateMachine();
+    ozoneContainer = dnStateMachine.getContainer();
+    KeyValueHandler keyValueHandler =
+        (KeyValueHandler) ozoneContainer.getDispatcher()
+            .getHandler(ContainerProtos.ContainerType.KeyValueContainer);
+    Container container =
+        ozoneContainer.getContainerSet().getContainer(blockID.getContainerID());
+    KeyValueContainerData containerData =
+        ((KeyValueContainerData) container.getContainerData());
+    long delTrxId = containerData.getDeleteTransactionId();
+    long numPendingDeletionBlocks = containerData.getNumPendingDeletionBlocks();
+    BlockData blockData =
+        keyValueHandler.getBlockManager().getBlock(container, blockID);
+    cluster.getOzoneManager().deleteKey(keyArgs);
+    GenericTestUtils.waitFor(() -> {
+      return
+          dnStateMachine.getCommandDispatcher().getDeleteBlocksCommandHandler()
+              .getInvocationCount() >= 1;
+    }, 500, 100000);
+    Assert.assertTrue(containerData.getDeleteTransactionId() > delTrxId);
+    Assert.assertTrue(
+        containerData.getNumPendingDeletionBlocks() > numPendingDeletionBlocks);
+    // make sure the chunk was never deleted on the leader even though
+    // deleteBlock handler is invoked
+    try {
+      for (ContainerProtos.ChunkInfo chunkInfo : blockData.getChunks()) {
+        keyValueHandler.getChunkManager()
+            .readChunk(container, blockID, ChunkInfo.getFromProtoBuf(chunkInfo),
+                null);
+      }
+    } catch (IOException ioe) {
+      Assert.fail("Exception should not be thrown.");
+
+    }
+    long numReadStateMachineOps =
+        stateMachine.getMetrics().getNumReadStateMachineOps();
+    Assert.assertTrue(
+        stateMachine.getMetrics().getNumReadStateMachineFails() == 0);
+    stateMachine.evictStateMachineCache();
+    cluster.restartHddsDatanode(follower.getDatanodeDetails(), false);
+    // wait for the raft server to come up and join the ratis ring
+    Thread.sleep(10000);
+
+    // Make sure the readStateMachine call got triggered after the follower
+    // caught up
+    Assert.assertTrue(stateMachine.getMetrics().getNumReadStateMachineOps()
+        > numReadStateMachineOps);
+    Assert.assertTrue(
+        stateMachine.getMetrics().getNumReadStateMachineFails() == 0);
+    // wait for the chunk to get deleted now
+    Thread.sleep(10000);
+    for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
+      keyValueHandler =
+          (KeyValueHandler) dn.getDatanodeStateMachine().getContainer()
+              .getDispatcher()
+              .getHandler(ContainerProtos.ContainerType.KeyValueContainer);
+      // make sure the chunk is now deleted on the all dns
+      try {
+        for (ContainerProtos.ChunkInfo chunkInfo : blockData.getChunks()) {
+          keyValueHandler.getChunkManager().readChunk(container, blockID,
+              ChunkInfo.getFromProtoBuf(chunkInfo), null);
+        }
+        Assert.fail("Expected exception is not thrown");
+      } catch (IOException ioe) {
+        Assert.assertTrue(ioe instanceof StorageContainerException);
+        Assert.assertTrue(((StorageContainerException) ioe).getResult()
+            == ContainerProtos.Result.UNABLE_TO_FIND_CHUNK);
+      }
+    }
+
+  }
+}

+ 27 - 6
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java

@@ -45,6 +45,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerC
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.ratis.RatisHelper;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
@@ -873,13 +874,33 @@ public final class ContainerTestHelper {
 
   public static StateMachine getStateMachine(MiniOzoneCluster cluster)
       throws Exception {
-    XceiverServerSpi server =
-        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine().
-            getContainer().getWriteChannel();
+    return getStateMachine(cluster.getHddsDatanodes().get(0), null);
+  }
+
+  private static RaftServerImpl getRaftServerImpl(HddsDatanodeService dn,
+      Pipeline pipeline) throws Exception {
+    XceiverServerSpi server = dn.getDatanodeStateMachine().
+        getContainer().getWriteChannel();
     RaftServerProxy proxy =
         (RaftServerProxy) (((XceiverServerRatis) server).getServer());
-    RaftGroupId groupId = proxy.getGroupIds().iterator().next();
-    RaftServerImpl impl = proxy.getImpl(groupId);
-    return impl.getStateMachine();
+    RaftGroupId groupId =
+        pipeline == null ? proxy.getGroupIds().iterator().next() :
+            RatisHelper.newRaftGroup(pipeline).getGroupId();
+    return proxy.getImpl(groupId);
+  }
+
+  public static StateMachine getStateMachine(HddsDatanodeService dn,
+      Pipeline pipeline) throws Exception {
+    return getRaftServerImpl(dn, pipeline).getStateMachine();
+  }
+
+  public static boolean isRatisLeader(HddsDatanodeService dn, Pipeline pipeline)
+      throws Exception {
+    return getRaftServerImpl(dn, pipeline).isLeader();
+  }
+
+  public static boolean isRatisFollower(HddsDatanodeService dn,
+      Pipeline pipeline) throws Exception {
+    return getRaftServerImpl(dn, pipeline).isFollower();
   }
 }

+ 27 - 7
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.container.testutils.BlockDeletingServiceTestImpl;
 import org.apache.hadoop.ozone.container.common.impl.RandomContainerDeletionChoosingPolicy;
 import org.apache.hadoop.ozone.container.keyvalue.statemachine.background
@@ -50,6 +51,7 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.BeforeClass;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -113,6 +115,7 @@ public class TestBlockDeletingService {
       KeyValueContainerData data = new KeyValueContainerData(containerID,
           ContainerTestHelper.CONTAINER_MAX_SIZE, UUID.randomUUID().toString(),
           UUID.randomUUID().toString());
+      data.closeContainer();
       Container container = new KeyValueContainer(data, conf);
       container.create(new VolumeSet(scmId, clusterID, conf),
           new RoundRobinVolumeChoosingPolicy(), scmId);
@@ -196,7 +199,7 @@ public class TestBlockDeletingService {
     createToDeleteBlocks(containerSet, conf, 1, 3, 1);
 
     BlockDeletingServiceTestImpl svc =
-        new BlockDeletingServiceTestImpl(containerSet, 1000, conf);
+        getBlockDeletinService(containerSet, conf, 1000);
     svc.start();
     GenericTestUtils.waitFor(() -> svc.isStarted(), 100, 3000);
 
@@ -256,7 +259,7 @@ public class TestBlockDeletingService {
     createToDeleteBlocks(containerSet, conf, 1, 100, 1);
 
     BlockDeletingServiceTestImpl service =
-        new BlockDeletingServiceTestImpl(containerSet, 1000, conf);
+        getBlockDeletinService(containerSet, conf, 1000);
     service.start();
     GenericTestUtils.waitFor(() -> service.isStarted(), 100, 3000);
 
@@ -285,7 +288,12 @@ public class TestBlockDeletingService {
 
     // set timeout value as 1ns to trigger timeout behavior
     long timeout  = 1;
-    BlockDeletingService svc = new BlockDeletingService(containerSet,
+    OzoneContainer ozoneContainer = Mockito.mock(OzoneContainer.class);
+    Mockito.when(ozoneContainer.getContainerSet())
+        .thenReturn(containerSet);
+    Mockito.when(ozoneContainer.getWriteChannel())
+        .thenReturn(null);
+    BlockDeletingService svc = new BlockDeletingService(ozoneContainer,
         TimeUnit.MILLISECONDS.toNanos(1000), timeout, TimeUnit.NANOSECONDS,
         conf);
     svc.start();
@@ -307,7 +315,7 @@ public class TestBlockDeletingService {
     // test for normal case that doesn't have timeout limitation
     timeout  = 0;
     createToDeleteBlocks(containerSet, conf, 1, 3, 1);
-    svc = new BlockDeletingService(containerSet,
+    svc = new BlockDeletingService(ozoneContainer,
         TimeUnit.MILLISECONDS.toNanos(1000), timeout, TimeUnit.MILLISECONDS,
         conf);
     svc.start();
@@ -338,6 +346,16 @@ public class TestBlockDeletingService {
     svc.shutdown();
   }
 
+  private BlockDeletingServiceTestImpl getBlockDeletinService(
+      ContainerSet containerSet, Configuration conf, int timeout) {
+    OzoneContainer ozoneContainer = Mockito.mock(OzoneContainer.class);
+    Mockito.when(ozoneContainer.getContainerSet()).thenReturn(containerSet);
+    Mockito.when(ozoneContainer.getWriteChannel()).thenReturn(null);
+    BlockDeletingServiceTestImpl service =
+        new BlockDeletingServiceTestImpl(ozoneContainer, timeout, conf);
+    return service;
+  }
+
   @Test(timeout = 30000)
   public void testContainerThrottle() throws Exception {
     // Properties :
@@ -360,7 +378,7 @@ public class TestBlockDeletingService {
     createToDeleteBlocks(containerSet, conf, 2, 1, 10);
 
     BlockDeletingServiceTestImpl service =
-        new BlockDeletingServiceTestImpl(containerSet, 1000, conf);
+        getBlockDeletinService(containerSet, conf, 1000);
     service.start();
 
     try {
@@ -410,9 +428,11 @@ public class TestBlockDeletingService {
 
     // Make sure chunks are created
     Assert.assertEquals(15, getNumberOfChunksInContainers(containerSet));
-
+    OzoneContainer ozoneContainer = Mockito.mock(OzoneContainer.class);
+    Mockito.when(ozoneContainer.getContainerSet()).thenReturn(containerSet);
+    Mockito.when(ozoneContainer.getWriteChannel()).thenReturn(null);
     BlockDeletingServiceTestImpl service =
-        new BlockDeletingServiceTestImpl(containerSet, 1000, conf);
+        getBlockDeletinService(containerSet, conf, 1000);
     service.start();
 
     try {

+ 30 - 6
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDeletionChoosingPolicy.java

@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.RandomUtils;
@@ -35,18 +36,26 @@ import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDeletionChoosingPolicy;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 /**
  * The class for testing container deletion choosing policy.
  */
 public class TestContainerDeletionChoosingPolicy {
   private static String path;
-  private  ContainerSet containerSet;
+  private OzoneContainer ozoneContainer;
+  private ContainerSet containerSet;
   private OzoneConfiguration conf;
+  private BlockDeletingService blockDeletingService;
+  // the service timeout
+  private static final int SERVICE_TIMEOUT_IN_MILLISECONDS = 0;
+  private static final int SERVICE_INTERVAL_IN_MILLISECONDS = 1000;
 
   @Before
   public void init() throws Throwable {
@@ -75,23 +84,25 @@ public class TestContainerDeletionChoosingPolicy {
       KeyValueContainerData data = new KeyValueContainerData(i,
           ContainerTestHelper.CONTAINER_MAX_SIZE, UUID.randomUUID().toString(),
           UUID.randomUUID().toString());
+      data.closeContainer();
       KeyValueContainer container = new KeyValueContainer(data, conf);
       containerSet.addContainer(container);
       Assert.assertTrue(
           containerSet.getContainerMapCopy()
               .containsKey(data.getContainerID()));
     }
+    blockDeletingService = getBlockDeletingService();
 
     ContainerDeletionChoosingPolicy deletionPolicy =
         new RandomContainerDeletionChoosingPolicy();
     List<ContainerData> result0 =
-        containerSet.chooseContainerForBlockDeletion(5, deletionPolicy);
+        blockDeletingService.chooseContainerForBlockDeletion(5, deletionPolicy);
     Assert.assertEquals(5, result0.size());
 
     // test random choosing
-    List<ContainerData> result1 = containerSet
+    List<ContainerData> result1 = blockDeletingService
         .chooseContainerForBlockDeletion(numContainers, deletionPolicy);
-    List<ContainerData> result2 = containerSet
+    List<ContainerData> result2 = blockDeletingService
         .chooseContainerForBlockDeletion(numContainers, deletionPolicy);
 
     boolean hasShuffled = false;
@@ -137,18 +148,20 @@ public class TestContainerDeletionChoosingPolicy {
         name2Count.put(containerId, deletionBlocks);
       }
       KeyValueContainer container = new KeyValueContainer(data, conf);
+      data.closeContainer();
       containerSet.addContainer(container);
       Assert.assertTrue(
           containerSet.getContainerMapCopy().containsKey(containerId));
     }
 
+    blockDeletingService = getBlockDeletingService();
     ContainerDeletionChoosingPolicy deletionPolicy =
         new TopNOrderedContainerDeletionChoosingPolicy();
     List<ContainerData> result0 =
-        containerSet.chooseContainerForBlockDeletion(5, deletionPolicy);
+        blockDeletingService.chooseContainerForBlockDeletion(5, deletionPolicy);
     Assert.assertEquals(5, result0.size());
 
-    List<ContainerData> result1 = containerSet
+    List<ContainerData> result1 = blockDeletingService
         .chooseContainerForBlockDeletion(numContainers + 1, deletionPolicy);
     // the empty deletion blocks container should not be chosen
     Assert.assertEquals(numContainers, result1.size());
@@ -164,4 +177,15 @@ public class TestContainerDeletionChoosingPolicy {
     // ensure all the container data are compared
     Assert.assertEquals(0, name2Count.size());
   }
+
+  private BlockDeletingService getBlockDeletingService() {
+    ozoneContainer = Mockito.mock(OzoneContainer.class);
+    Mockito.when(ozoneContainer.getContainerSet()).thenReturn(containerSet);
+    Mockito.when(ozoneContainer.getWriteChannel()).thenReturn(null);
+    blockDeletingService = new BlockDeletingService(ozoneContainer,
+        SERVICE_INTERVAL_IN_MILLISECONDS, SERVICE_TIMEOUT_IN_MILLISECONDS,
+        TimeUnit.MILLISECONDS, conf);
+    return blockDeletingService;
+
+  }
 }

+ 12 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java

@@ -22,6 +22,7 @@ import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
@@ -45,6 +46,7 @@ import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.ozShell.TestOzoneShell;
 import org.apache.hadoop.ozone.protocol.commands.RetriableDatanodeEventWatcher;
@@ -172,6 +174,16 @@ public class TestBlockDeletion {
     OzoneTestUtils.closeContainers(omKeyLocationInfoGroupList, scm);
 
     waitForDatanodeCommandRetry();
+
+    // make sure the containers are closed on the dn
+    omKeyLocationInfoGroupList.forEach((group) -> {
+      List<OmKeyLocationInfo> locationInfo = group.getLocationList();
+      locationInfo.forEach(
+          (info) -> cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
+              .getContainer().getContainerSet()
+              .getContainer(info.getContainerID()).getContainerData()
+              .setState(ContainerProtos.ContainerDataProto.State.CLOSED));
+    });
     waitForDatanodeBlockDeletionStart();
     // The blocks should be deleted in the DN.
     verifyBlocksDeleted(omKeyLocationInfoGroupList);