Explorar o código

HDDS-1767: ContainerStateMachine should have its own executors for executing applyTransaction calls (#1087)

Lokesh Jain hai 6 anos
pai
achega
23e9bebe13

+ 22 - 6
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java

@@ -22,11 +22,13 @@ import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheBuilder;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 
 
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
 import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
@@ -82,6 +84,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 import java.util.stream.Collectors;
 import java.util.Set;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.Executors;
 import java.io.FileOutputStream;
 import java.io.FileOutputStream;
 import java.io.FileInputStream;
 import java.io.FileInputStream;
 import java.io.OutputStream;
 import java.io.OutputStream;
@@ -139,7 +142,6 @@ public class ContainerStateMachine extends BaseStateMachine {
   // keeps track of the containers created per pipeline
   // keeps track of the containers created per pipeline
   private final Set<Long> createContainerSet;
   private final Set<Long> createContainerSet;
   private ExecutorService[] executors;
   private ExecutorService[] executors;
-  private final int numExecutors;
   private final Map<Long, Long> applyTransactionCompletionMap;
   private final Map<Long, Long> applyTransactionCompletionMap;
   private final Cache<Long, ByteString> stateMachineDataCache;
   private final Cache<Long, ByteString> stateMachineDataCache;
   private final boolean isBlockTokenEnabled;
   private final boolean isBlockTokenEnabled;
@@ -152,15 +154,13 @@ public class ContainerStateMachine extends BaseStateMachine {
   @SuppressWarnings("parameternumber")
   @SuppressWarnings("parameternumber")
   public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher,
   public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher,
       ThreadPoolExecutor chunkExecutor, XceiverServerRatis ratisServer,
       ThreadPoolExecutor chunkExecutor, XceiverServerRatis ratisServer,
-      List<ExecutorService> executors, long expiryInterval,
-      boolean isBlockTokenEnabled, TokenVerifier tokenVerifier) {
+      long expiryInterval, boolean isBlockTokenEnabled,
+      TokenVerifier tokenVerifier, Configuration conf) {
     this.gid = gid;
     this.gid = gid;
     this.dispatcher = dispatcher;
     this.dispatcher = dispatcher;
     this.chunkExecutor = chunkExecutor;
     this.chunkExecutor = chunkExecutor;
     this.ratisServer = ratisServer;
     this.ratisServer = ratisServer;
     metrics = CSMMetrics.create(gid);
     metrics = CSMMetrics.create(gid);
-    this.numExecutors = executors.size();
-    this.executors = executors.toArray(new ExecutorService[numExecutors]);
     this.writeChunkFutureMap = new ConcurrentHashMap<>();
     this.writeChunkFutureMap = new ConcurrentHashMap<>();
     applyTransactionCompletionMap = new ConcurrentHashMap<>();
     applyTransactionCompletionMap = new ConcurrentHashMap<>();
     stateMachineDataCache = CacheBuilder.newBuilder()
     stateMachineDataCache = CacheBuilder.newBuilder()
@@ -171,6 +171,19 @@ public class ContainerStateMachine extends BaseStateMachine {
     this.isBlockTokenEnabled = isBlockTokenEnabled;
     this.isBlockTokenEnabled = isBlockTokenEnabled;
     this.tokenVerifier = tokenVerifier;
     this.tokenVerifier = tokenVerifier;
     this.createContainerSet = new ConcurrentSkipListSet<>();
     this.createContainerSet = new ConcurrentSkipListSet<>();
+
+    final int numContainerOpExecutors = conf.getInt(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_KEY,
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_DEFAULT);
+    this.executors = new ExecutorService[numContainerOpExecutors];
+    for (int i = 0; i < numContainerOpExecutors; i++) {
+      final int index = i;
+      this.executors[index] = Executors.newSingleThreadExecutor(r -> {
+        Thread t = new Thread(r);
+        t.setName("RatisApplyTransactionExecutor " + index);
+        return t;
+      });
+    }
   }
   }
 
 
   @Override
   @Override
@@ -367,7 +380,7 @@ public class ContainerStateMachine extends BaseStateMachine {
 
 
   private ExecutorService getCommandExecutor(
   private ExecutorService getCommandExecutor(
       ContainerCommandRequestProto requestProto) {
       ContainerCommandRequestProto requestProto) {
-    int executorId = (int)(requestProto.getContainerID() % numExecutors);
+    int executorId = (int)(requestProto.getContainerID() % executors.length);
     return executors[executorId];
     return executors[executorId];
   }
   }
 
 
@@ -700,5 +713,8 @@ public class ContainerStateMachine extends BaseStateMachine {
   @Override
   @Override
   public void close() throws IOException {
   public void close() throws IOException {
     evictStateMachineCache();
     evictStateMachineCache();
+    for (ExecutorService executor : executors) {
+      executor.shutdown();
+    }
   }
   }
 }
 }

+ 20 - 35
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java

@@ -81,8 +81,6 @@ import java.util.Objects;
 import java.util.UUID;
 import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLong;
 
 
@@ -102,7 +100,6 @@ public final class XceiverServerRatis extends XceiverServer {
   private int port;
   private int port;
   private final RaftServer server;
   private final RaftServer server;
   private ThreadPoolExecutor chunkExecutor;
   private ThreadPoolExecutor chunkExecutor;
-  private final List<ExecutorService> executors;
   private final ContainerDispatcher dispatcher;
   private final ContainerDispatcher dispatcher;
   private ClientId clientId = ClientId.randomId();
   private ClientId clientId = ClientId.randomId();
   private final StateContext context;
   private final StateContext context;
@@ -111,16 +108,18 @@ public final class XceiverServerRatis extends XceiverServer {
   private final long cacheEntryExpiryInteval;
   private final long cacheEntryExpiryInteval;
   private boolean isStarted = false;
   private boolean isStarted = false;
   private DatanodeDetails datanodeDetails;
   private DatanodeDetails datanodeDetails;
+  private final Configuration conf;
 
 
   private XceiverServerRatis(DatanodeDetails dd, int port,
   private XceiverServerRatis(DatanodeDetails dd, int port,
       ContainerDispatcher dispatcher, Configuration conf, StateContext
       ContainerDispatcher dispatcher, Configuration conf, StateContext
       context, GrpcTlsConfig tlsConfig, CertificateClient caClient)
       context, GrpcTlsConfig tlsConfig, CertificateClient caClient)
       throws IOException {
       throws IOException {
     super(conf, caClient);
     super(conf, caClient);
+    this.conf = conf;
     Objects.requireNonNull(dd, "id == null");
     Objects.requireNonNull(dd, "id == null");
     datanodeDetails = dd;
     datanodeDetails = dd;
     this.port = port;
     this.port = port;
-    RaftProperties serverProperties = newRaftProperties(conf);
+    RaftProperties serverProperties = newRaftProperties();
     final int numWriteChunkThreads = conf.getInt(
     final int numWriteChunkThreads = conf.getInt(
         OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY,
         OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY,
         OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT);
         OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT);
@@ -129,23 +128,16 @@ public final class XceiverServerRatis extends XceiverServer {
             100, TimeUnit.SECONDS,
             100, TimeUnit.SECONDS,
             new ArrayBlockingQueue<>(1024),
             new ArrayBlockingQueue<>(1024),
             new ThreadPoolExecutor.CallerRunsPolicy());
             new ThreadPoolExecutor.CallerRunsPolicy());
-    final int numContainerOpExecutors = conf.getInt(
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_KEY,
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_DEFAULT);
     this.context = context;
     this.context = context;
     this.replicationLevel =
     this.replicationLevel =
         conf.getEnum(OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY,
         conf.getEnum(OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY,
             OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT);
             OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT);
-    this.executors = new ArrayList<>();
     cacheEntryExpiryInteval = conf.getTimeDuration(OzoneConfigKeys.
     cacheEntryExpiryInteval = conf.getTimeDuration(OzoneConfigKeys.
             DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL,
             DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL,
         OzoneConfigKeys.
         OzoneConfigKeys.
             DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL_DEFAULT,
             DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL_DEFAULT,
         TimeUnit.MILLISECONDS);
         TimeUnit.MILLISECONDS);
     this.dispatcher = dispatcher;
     this.dispatcher = dispatcher;
-    for (int i = 0; i < numContainerOpExecutors; i++) {
-      executors.add(Executors.newSingleThreadExecutor());
-    }
 
 
     RaftServer.Builder builder = RaftServer.newBuilder()
     RaftServer.Builder builder = RaftServer.newBuilder()
         .setServerId(RatisHelper.toRaftPeerId(dd))
         .setServerId(RatisHelper.toRaftPeerId(dd))
@@ -159,22 +151,22 @@ public final class XceiverServerRatis extends XceiverServer {
 
 
   private ContainerStateMachine getStateMachine(RaftGroupId gid) {
   private ContainerStateMachine getStateMachine(RaftGroupId gid) {
     return new ContainerStateMachine(gid, dispatcher, chunkExecutor, this,
     return new ContainerStateMachine(gid, dispatcher, chunkExecutor, this,
-        Collections.unmodifiableList(executors), cacheEntryExpiryInteval,
-        getSecurityConfig().isBlockTokenEnabled(), getBlockTokenVerifier());
+        cacheEntryExpiryInteval, getSecurityConfig().isBlockTokenEnabled(),
+        getBlockTokenVerifier(), conf);
   }
   }
 
 
-  private RaftProperties newRaftProperties(Configuration conf) {
+  private RaftProperties newRaftProperties() {
     final RaftProperties properties = new RaftProperties();
     final RaftProperties properties = new RaftProperties();
 
 
     // Set rpc type
     // Set rpc type
-    final RpcType rpc = setRpcType(conf, properties);
+    final RpcType rpc = setRpcType(properties);
 
 
     // set raft segment size
     // set raft segment size
-    setRaftSegmentSize(conf, properties);
+    setRaftSegmentSize(properties);
 
 
     // set raft segment pre-allocated size
     // set raft segment pre-allocated size
     final int raftSegmentPreallocatedSize =
     final int raftSegmentPreallocatedSize =
-        setRaftSegmentPreallocatedSize(conf, properties);
+        setRaftSegmentPreallocatedSize(properties);
 
 
     // Set max write buffer size, which is the scm chunk size
     // Set max write buffer size, which is the scm chunk size
     final int maxChunkSize = setMaxWriteBuffer(properties);
     final int maxChunkSize = setMaxWriteBuffer(properties);
@@ -196,19 +188,19 @@ public final class XceiverServerRatis extends XceiverServer {
         .setSyncTimeout(properties, dataSyncTimeout);
         .setSyncTimeout(properties, dataSyncTimeout);
 
 
     // Set the server Request timeout
     // Set the server Request timeout
-    setServerRequestTimeout(conf, properties);
+    setServerRequestTimeout(properties);
 
 
     // set timeout for a retry cache entry
     // set timeout for a retry cache entry
-    setTimeoutForRetryCache(conf, properties);
+    setTimeoutForRetryCache(properties);
 
 
     // Set the ratis leader election timeout
     // Set the ratis leader election timeout
-    setRatisLeaderElectionTimeout(conf, properties);
+    setRatisLeaderElectionTimeout(properties);
 
 
     // Set the maximum cache segments
     // Set the maximum cache segments
     RaftServerConfigKeys.Log.setMaxCachedSegmentNum(properties, 2);
     RaftServerConfigKeys.Log.setMaxCachedSegmentNum(properties, 2);
 
 
     // set the node failure timeout
     // set the node failure timeout
-    setNodeFailureTimeout(conf, properties);
+    setNodeFailureTimeout(properties);
 
 
     // Set the ratis storage directory
     // Set the ratis storage directory
     String storageDir = HddsServerUtil.getOzoneDatanodeRatisDirectory(conf);
     String storageDir = HddsServerUtil.getOzoneDatanodeRatisDirectory(conf);
@@ -266,8 +258,7 @@ public final class XceiverServerRatis extends XceiverServer {
     return properties;
     return properties;
   }
   }
 
 
-  private void setNodeFailureTimeout(Configuration conf,
-                                     RaftProperties properties) {
+  private void setNodeFailureTimeout(RaftProperties properties) {
     TimeUnit timeUnit;
     TimeUnit timeUnit;
     long duration;
     long duration;
     timeUnit = OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT
     timeUnit = OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT
@@ -285,8 +276,7 @@ public final class XceiverServerRatis extends XceiverServer {
     nodeFailureTimeoutMs = nodeFailureTimeout.toLong(TimeUnit.MILLISECONDS);
     nodeFailureTimeoutMs = nodeFailureTimeout.toLong(TimeUnit.MILLISECONDS);
   }
   }
 
 
-  private void setRatisLeaderElectionTimeout(Configuration conf,
-                                             RaftProperties properties) {
+  private void setRatisLeaderElectionTimeout(RaftProperties properties) {
     long duration;
     long duration;
     TimeUnit leaderElectionMinTimeoutUnit =
     TimeUnit leaderElectionMinTimeoutUnit =
         OzoneConfigKeys.
         OzoneConfigKeys.
@@ -307,8 +297,7 @@ public final class XceiverServerRatis extends XceiverServer {
         TimeDuration.valueOf(leaderElectionMaxTimeout, TimeUnit.MILLISECONDS));
         TimeDuration.valueOf(leaderElectionMaxTimeout, TimeUnit.MILLISECONDS));
   }
   }
 
 
-  private void setTimeoutForRetryCache(Configuration conf,
-                                       RaftProperties properties) {
+  private void setTimeoutForRetryCache(RaftProperties properties) {
     TimeUnit timeUnit;
     TimeUnit timeUnit;
     long duration;
     long duration;
     timeUnit =
     timeUnit =
@@ -324,8 +313,7 @@ public final class XceiverServerRatis extends XceiverServer {
         .setExpiryTime(properties, retryCacheTimeout);
         .setExpiryTime(properties, retryCacheTimeout);
   }
   }
 
 
-  private void setServerRequestTimeout(Configuration conf,
-                                       RaftProperties properties) {
+  private void setServerRequestTimeout(RaftProperties properties) {
     TimeUnit timeUnit;
     TimeUnit timeUnit;
     long duration;
     long duration;
     timeUnit = OzoneConfigKeys.DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT
     timeUnit = OzoneConfigKeys.DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT
@@ -347,8 +335,7 @@ public final class XceiverServerRatis extends XceiverServer {
     return maxChunkSize;
     return maxChunkSize;
   }
   }
 
 
-  private int setRaftSegmentPreallocatedSize(Configuration conf,
-                                             RaftProperties properties) {
+  private int setRaftSegmentPreallocatedSize(RaftProperties properties) {
     final int raftSegmentPreallocatedSize = (int) conf.getStorageSize(
     final int raftSegmentPreallocatedSize = (int) conf.getStorageSize(
         OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY,
         OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY,
         OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT,
         OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT,
@@ -371,8 +358,7 @@ public final class XceiverServerRatis extends XceiverServer {
     return raftSegmentPreallocatedSize;
     return raftSegmentPreallocatedSize;
   }
   }
 
 
-  private void setRaftSegmentSize(Configuration conf,
-                                  RaftProperties properties) {
+  private void setRaftSegmentSize(RaftProperties properties) {
     final int raftSegmentSize = (int)conf.getStorageSize(
     final int raftSegmentSize = (int)conf.getStorageSize(
         OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY,
         OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY,
         OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT,
         OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT,
@@ -381,7 +367,7 @@ public final class XceiverServerRatis extends XceiverServer {
         SizeInBytes.valueOf(raftSegmentSize));
         SizeInBytes.valueOf(raftSegmentSize));
   }
   }
 
 
-  private RpcType setRpcType(Configuration conf, RaftProperties properties) {
+  private RpcType setRpcType(RaftProperties properties) {
     final String rpcType = conf.get(
     final String rpcType = conf.get(
         OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
         OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
         OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
         OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
@@ -447,7 +433,6 @@ public final class XceiverServerRatis extends XceiverServer {
         // some of the tasks would be executed using the executors.
         // some of the tasks would be executed using the executors.
         server.close();
         server.close();
         chunkExecutor.shutdown();
         chunkExecutor.shutdown();
-        executors.forEach(ExecutorService::shutdown);
         isStarted = false;
         isStarted = false;
       } catch (IOException e) {
       } catch (IOException e) {
         throw new RuntimeException(e);
         throw new RuntimeException(e);