|
@@ -1512,18 +1512,54 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
|
|
|
|
|
|
// entry point for quorum/Learner.java
|
|
|
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
|
|
|
- return processTxn(null, hdr, txn);
|
|
|
+ processTxnForSessionEvents(null, hdr, txn);
|
|
|
+ return processTxnInDB(hdr, txn);
|
|
|
}
|
|
|
|
|
|
// entry point for FinalRequestProcessor.java
|
|
|
public ProcessTxnResult processTxn(Request request) {
|
|
|
- return processTxn(request, request.getHdr(), request.getTxn());
|
|
|
+ TxnHeader hdr = request.getHdr();
|
|
|
+ processTxnForSessionEvents(request, hdr, request.getTxn());
|
|
|
+
|
|
|
+ final boolean writeRequest = (hdr != null);
|
|
|
+ final boolean quorumRequest = request.isQuorum();
|
|
|
+
|
|
|
+ // return fast w/o synchronization when we get a read
|
|
|
+ if (!writeRequest && !quorumRequest) {
|
|
|
+ return new ProcessTxnResult();
|
|
|
+ }
|
|
|
+ synchronized (outstandingChanges) {
|
|
|
+ ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn());
|
|
|
+
|
|
|
+ // request.hdr is set for write requests, which are the only ones
|
|
|
+ // that add to outstandingChanges.
|
|
|
+ if (writeRequest) {
|
|
|
+ long zxid = hdr.getZxid();
|
|
|
+ while (!outstandingChanges.isEmpty()
|
|
|
+ && outstandingChanges.peek().zxid <= zxid) {
|
|
|
+ ChangeRecord cr = outstandingChanges.remove();
|
|
|
+ ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
|
|
|
+ if (cr.zxid < zxid) {
|
|
|
+ LOG.warn("Zxid outstanding " + cr.zxid
|
|
|
+ + " is less than current " + zxid);
|
|
|
+ }
|
|
|
+ if (outstandingChangesForPath.get(cr.path) == cr) {
|
|
|
+ outstandingChangesForPath.remove(cr.path);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // do not add non quorum packets to the queue.
|
|
|
+ if (quorumRequest) {
|
|
|
+ getZKDatabase().addCommittedProposal(request);
|
|
|
+ }
|
|
|
+ return rc;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- private ProcessTxnResult processTxn(Request request, TxnHeader hdr, Record txn) {
|
|
|
- ProcessTxnResult rc;
|
|
|
- int opCode = request != null ? request.type : hdr.getType();
|
|
|
- long sessionId = request != null ? request.sessionId : hdr.getClientId();
|
|
|
+ private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn) {
|
|
|
+ int opCode = (request == null) ? hdr.getType() : request.type;
|
|
|
+ long sessionId = (request == null) ? hdr.getClientId() : request.sessionId;
|
|
|
|
|
|
if (opCode == OpCode.createSession) {
|
|
|
if (hdr != null && txn instanceof CreateSessionTxn) {
|
|
@@ -1535,13 +1571,14 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
|
|
|
} else if (opCode == OpCode.closeSession) {
|
|
|
sessionTracker.removeSession(sessionId);
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- if (hdr != null) {
|
|
|
- rc = getZKDatabase().processTxn(hdr, txn);
|
|
|
+ private ProcessTxnResult processTxnInDB(TxnHeader hdr, Record txn) {
|
|
|
+ if (hdr == null) {
|
|
|
+ return new ProcessTxnResult();
|
|
|
} else {
|
|
|
- rc = new ProcessTxnResult();
|
|
|
+ return getZKDatabase().processTxn(hdr, txn);
|
|
|
}
|
|
|
- return rc;
|
|
|
}
|
|
|
|
|
|
public Map<Long, Set<Long>> getSessionExpiryMap() {
|