소스 검색

HDDS-1816: ContainerStateMachine should limit number of pending apply transactions. Adds a config, uses snapshot threshold default value. (#1150)

Lokesh Jain 5 년 전
부모
커밋
d4ab9aea6f

+ 8 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java

@@ -90,6 +90,14 @@ public final class ScmConfigKeys {
       "dfs.container.ratis.statemachinedata.sync.retries";
   public static final int
       DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_RETRIES_DEFAULT = -1;
+  public static final String
+      DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TRANSACTIONS =
+      "dfs.container.ratis.statemachine.max.pending.apply-transactions";
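+  // The default maximum number of pending state machine apply transactions
+  // is kept the same as the default snapshot threshold.
+  // NOTE(review): ozone-default.xml ships 10000 for this key while the
+  // constant below is 100000 - confirm which default is intended.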
+  public static final int
+      DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TRANSACTIONS_DEFAULT =
+      100000;
   public static final String DFS_CONTAINER_RATIS_LOG_QUEUE_NUM_ELEMENTS =
       "dfs.container.ratis.log.queue.num-elements";
   public static final int DFS_CONTAINER_RATIS_LOG_QUEUE_NUM_ELEMENTS_DEFAULT =

+ 9 - 0
hadoop-hdds/common/src/main/resources/ozone-default.xml

@@ -186,6 +186,15 @@
       taken.
     </description>
   </property>
+  <property>
+    <name>dfs.container.ratis.statemachine.max.pending.apply-transactions</name>
+    <value>10000</value>
+    <tag>OZONE, RATIS</tag>
+    <description>Maximum number of pending apply transactions in a data
+      pipeline. The default value is kept the same as the default snapshot
+      threshold, dfs.ratis.snapshot.threshold.
+    </description>
+  </property>
   <property>
     <name>dfs.container.ratis.num.write.chunk.threads</name>
     <value>60</value>

+ 13 - 2
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -79,6 +80,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
@@ -146,6 +148,8 @@ public class ContainerStateMachine extends BaseStateMachine {
   private final Cache<Long, ByteString> stateMachineDataCache;
   private final boolean isBlockTokenEnabled;
   private final TokenVerifier tokenVerifier;
+
+  private final Semaphore applyTransactionSemaphore;
   /**
    * CSM metrics.
    */
@@ -175,6 +179,12 @@ public class ContainerStateMachine extends BaseStateMachine {
     final int numContainerOpExecutors = conf.getInt(
         OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_KEY,
         OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_DEFAULT);
+    int maxPendingApplyTransactions = conf.getInt(
+        ScmConfigKeys.
+            DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TRANSACTIONS,
+        ScmConfigKeys.
+            DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TRANSACTIONS_DEFAULT);
+    applyTransactionSemaphore = new Semaphore(maxPendingApplyTransactions);
     this.executors = new ExecutorService[numContainerOpExecutors];
     for (int i = 0; i < numContainerOpExecutors; i++) {
       final int index = i;
@@ -626,6 +636,7 @@ public class ContainerStateMachine extends BaseStateMachine {
             .setLogIndex(index);
 
     try {
+      applyTransactionSemaphore.acquire();
       metrics.incNumApplyTransactionsOps();
       ContainerCommandRequestProto requestProto =
           getContainerCommandRequestProto(
@@ -663,9 +674,9 @@ public class ContainerStateMachine extends BaseStateMachine {
               requestProto.getWriteChunk().getChunkData().getLen());
         }
         updateLastApplied();
-      });
+      }).whenComplete((r, t) -> applyTransactionSemaphore.release());
       return future;
-    } catch (IOException e) {
+    } catch (IOException | InterruptedException e) {
       metrics.incNumApplyTransactionsFails();
       return completeExceptionally(e);
     }