|
@@ -20,6 +20,7 @@ package org.apache.zookeeper.server.quorum;
|
|
|
|
|
|
import org.apache.zookeeper.server.Request;
|
|
|
import org.apache.zookeeper.server.RequestProcessor;
|
|
|
+import org.apache.zookeeper.server.ServerMetrics;
|
|
|
import org.apache.zookeeper.server.SyncRequestProcessor;
|
|
|
import org.apache.zookeeper.server.quorum.Leader.XidRolloverException;
|
|
|
import org.slf4j.Logger;
|
|
@@ -39,11 +40,22 @@ public class ProposalRequestProcessor implements RequestProcessor {
|
|
|
|
|
|
SyncRequestProcessor syncProcessor;
|
|
|
|
|
|
+ // If this property is set, requests from Learners won't be forwarded
|
|
|
+ // to the CommitProcessor in order to save resources
|
|
|
+ public static final String FORWARD_LEARNER_REQUESTS_TO_COMMIT_PROCESSOR_DISABLED =
|
|
|
+ "zookeeper.forward_learner_requests_to_commit_processor_disabled";
|
|
|
+ private final boolean forwardLearnerRequestsToCommitProcessorDisabled;
|
|
|
+
|
|
|
public ProposalRequestProcessor(LeaderZooKeeperServer zks, RequestProcessor nextProcessor) {
|
|
|
this.zks = zks;
|
|
|
this.nextProcessor = nextProcessor;
|
|
|
AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
|
|
|
syncProcessor = new SyncRequestProcessor(zks, ackProcessor);
|
|
|
+
|
|
|
+ forwardLearnerRequestsToCommitProcessorDisabled = Boolean.getBoolean(
|
|
|
+ FORWARD_LEARNER_REQUESTS_TO_COMMIT_PROCESSOR_DISABLED);
|
|
|
+ LOG.info("{} = {}", FORWARD_LEARNER_REQUESTS_TO_COMMIT_PROCESSOR_DISABLED,
|
|
|
+ forwardLearnerRequestsToCommitProcessorDisabled);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -70,7 +82,9 @@ public class ProposalRequestProcessor implements RequestProcessor {
|
|
|
if (request instanceof LearnerSyncRequest) {
|
|
|
zks.getLeader().processSync((LearnerSyncRequest) request);
|
|
|
} else {
|
|
|
- nextProcessor.processRequest(request);
|
|
|
+ if (shouldForwardToNextProcessor(request)) {
|
|
|
+ nextProcessor.processRequest(request);
|
|
|
+ }
|
|
|
if (request.getHdr() != null) {
|
|
|
// We need to sync and get consensus on any transactions
|
|
|
try {
|
|
@@ -89,4 +103,14 @@ public class ProposalRequestProcessor implements RequestProcessor {
|
|
|
syncProcessor.shutdown();
|
|
|
}
|
|
|
|
|
|
+ private boolean shouldForwardToNextProcessor(Request request) {
|
|
|
+ if (!forwardLearnerRequestsToCommitProcessorDisabled) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ if (request.getOwner() instanceof LearnerHandler) {
|
|
|
+ ServerMetrics.getMetrics().REQUESTS_NOT_FORWARDED_TO_COMMIT_PROCESSOR.add(1);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
}
|