|
@@ -77,30 +77,30 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
|
|
|
/**
|
|
|
* Requests that we are holding until the commit comes in.
|
|
|
*/
|
|
|
- private final LinkedBlockingQueue<Request> queuedRequests =
|
|
|
+ protected final LinkedBlockingQueue<Request> queuedRequests =
|
|
|
new LinkedBlockingQueue<Request>();
|
|
|
|
|
|
/**
|
|
|
* Requests that have been committed.
|
|
|
*/
|
|
|
- private final LinkedBlockingQueue<Request> committedRequests =
|
|
|
+ protected final LinkedBlockingQueue<Request> committedRequests =
|
|
|
new LinkedBlockingQueue<Request>();
|
|
|
|
|
|
/** Request for which we are currently awaiting a commit */
|
|
|
- private final AtomicReference<Request> nextPending =
|
|
|
+ protected final AtomicReference<Request> nextPending =
|
|
|
new AtomicReference<Request>();
|
|
|
/** Request currently being committed (ie, sent off to next processor) */
|
|
|
private final AtomicReference<Request> currentlyCommitting =
|
|
|
new AtomicReference<Request>();
|
|
|
|
|
|
/** The number of requests currently being processed */
|
|
|
- private AtomicInteger numRequestsProcessing = new AtomicInteger(0);
|
|
|
+ protected AtomicInteger numRequestsProcessing = new AtomicInteger(0);
|
|
|
|
|
|
RequestProcessor nextProcessor;
|
|
|
|
|
|
- private volatile boolean stopped = true;
|
|
|
+ protected volatile boolean stopped = true;
|
|
|
private long workerShutdownTimeoutMS;
|
|
|
- private WorkerService workerPool;
|
|
|
+ protected WorkerService workerPool;
|
|
|
|
|
|
/**
|
|
|
* This flag indicates whether we need to wait for a response to come back from the
|
|
@@ -181,37 +181,7 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
|
|
|
* came in for the pending request. We can only commit a
|
|
|
* request when there is no other request being processed.
|
|
|
*/
|
|
|
- if (!stopped && !isProcessingRequest() &&
|
|
|
- (request = committedRequests.poll()) != null) {
|
|
|
- /*
|
|
|
- * We match with nextPending so that we can move to the
|
|
|
- * next request when it is committed. We also want to
|
|
|
- * use nextPending because it has the cnxn member set
|
|
|
- * properly.
|
|
|
- */
|
|
|
- Request pending = nextPending.get();
|
|
|
- if (pending != null &&
|
|
|
- pending.sessionId == request.sessionId &&
|
|
|
- pending.cxid == request.cxid) {
|
|
|
- // we want to send our version of the request.
|
|
|
- // the pointer to the connection in the request
|
|
|
- pending.setHdr(request.getHdr());
|
|
|
- pending.setTxn(request.getTxn());
|
|
|
- pending.zxid = request.zxid;
|
|
|
- // Set currentlyCommitting so we will block until this
|
|
|
- // completes. Cleared by CommitWorkRequest after
|
|
|
- // nextProcessor returns.
|
|
|
- currentlyCommitting.set(pending);
|
|
|
- nextPending.set(null);
|
|
|
- sendToNextProcessor(pending);
|
|
|
- } else {
|
|
|
- // this request came from someone else so just
|
|
|
- // send the commit packet
|
|
|
- currentlyCommitting.set(request);
|
|
|
- sendToNextProcessor(request);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
+ processCommitted();
|
|
|
}
|
|
|
} catch (InterruptedException e) {
|
|
|
LOG.warn("Interrupted exception while waiting", e);
|
|
@@ -221,6 +191,56 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
|
|
|
LOG.info("CommitProcessor exited loop!");
|
|
|
}
|
|
|
|
|
|
+ /*
|
|
|
+ * Separated this method from the main run loop
|
|
|
+ * for test purposes (ZOOKEEPER-1863)
|
|
|
+ */
|
|
|
+ protected void processCommitted() {
|
|
|
+ Request request;
|
|
|
+
|
|
|
+ if (!stopped && !isProcessingRequest() &&
|
|
|
+ (committedRequests.peek() != null)) {
|
|
|
+
|
|
|
+ /*
|
|
|
+ * ZOOKEEPER-1863: continue only if there is no new request
|
|
|
+ * waiting in queuedRequests or it is waiting for a
|
|
|
+ * commit.
|
|
|
+ */
|
|
|
+ if ( !isWaitingForCommit() && !queuedRequests.isEmpty()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ request = committedRequests.poll();
|
|
|
+
|
|
|
+ /*
|
|
|
+ * We match with nextPending so that we can move to the
|
|
|
+ * next request when it is committed. We also want to
|
|
|
+ * use nextPending because it has the cnxn member set
|
|
|
+ * properly.
|
|
|
+ */
|
|
|
+ Request pending = nextPending.get();
|
|
|
+ if (pending != null &&
|
|
|
+ pending.sessionId == request.sessionId &&
|
|
|
+ pending.cxid == request.cxid) {
|
|
|
+ // we want to send our version of the request.
|
|
|
+ // the pointer to the connection in the request
|
|
|
+ pending.setHdr(request.getHdr());
|
|
|
+ pending.setTxn(request.getTxn());
|
|
|
+ pending.zxid = request.zxid;
|
|
|
+ // Set currentlyCommitting so we will block until this
|
|
|
+ // completes. Cleared by CommitWorkRequest after
|
|
|
+ // nextProcessor returns.
|
|
|
+ currentlyCommitting.set(pending);
|
|
|
+ nextPending.set(null);
|
|
|
+ sendToNextProcessor(pending);
|
|
|
+ } else {
|
|
|
+ // this request came from someone else so just
|
|
|
+ // send the commit packet
|
|
|
+ currentlyCommitting.set(request);
|
|
|
+ sendToNextProcessor(request);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public void start() {
|
|
|
int numCores = Runtime.getRuntime().availableProcessors();
|