Zab.tla 67 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251
  1. (*
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. *)
  18. -------------------------------- MODULE Zab ---------------------------------
  19. (* This is the formal specification of the Zab consensus algorithm as
  20. described in the DSN'2011 paper; it is the protocol-level specification in our work. *)
  21. EXTENDS Integers, FiniteSets, Sequences, Naturals, TLC
  22. -----------------------------------------------------------------------------
  23. \* The set of servers
  24. CONSTANT Server
  25. \* States of server
  26. CONSTANTS LOOKING, FOLLOWING, LEADING
  27. \* Zab states of server
  28. CONSTANTS ELECTION, DISCOVERY, SYNCHRONIZATION, BROADCAST
  29. \* Message types
  30. CONSTANTS CEPOCH, NEWEPOCH, ACKEPOCH, NEWLEADER, ACKLD, COMMITLD, PROPOSE, ACK, COMMIT
  31. \* [MaxTimeoutFailures, MaxTransactionNum, MaxEpoch, MaxRestarts]
  32. CONSTANT Parameters
  \* Upper bound of the epoch range 1..MAXEPOCH used as the domain of epochLeader.
  MAXEPOCH  == 10
  \* Some value that is not a server; used wherever "no server" must be recorded.
  NullPoint == CHOOSE p : p \notin Server
  \* All subsets of Server that contain a strict majority of the servers.
  Quorums   == { Q \in SUBSET Server : 2 * Cardinality(Q) > Cardinality(Server) }
  36. -----------------------------------------------------------------------------
  37. \* Variables that all servers use.
  38. VARIABLES state, \* State of server, in {LOOKING, FOLLOWING, LEADING}.
  39. zabState, \* Current phase of server, in
  40. \* {ELECTION, DISCOVERY, SYNCHRONIZATION, BROADCAST}.
  41. acceptedEpoch, \* Epoch of the last LEADERINFO packet accepted,
  42. \* namely f.p in paper.
  43. currentEpoch, \* Epoch of the last NEWLEADER packet accepted,
  44. \* namely f.a in paper.
  45. history, \* History of servers: sequence of transactions,
  46. \* containing: [zxid, value, ackSid, epoch].
  47. lastCommitted \* Maximum index and zxid known to be committed,
  48. \* namely 'lastCommitted' in Leader. Starts from 0,
  49. \* and increases monotonically before restarting.
  50. \* Variables only used for leader.
  51. VARIABLES learners, \* Set of servers leader connects.
  52. cepochRecv, \* Set of learners leader has received CEPOCH from.
  53. \* Set of record [sid, connected, epoch],
  54. \* where epoch means f.p from followers.
  55. ackeRecv, \* Set of learners leader has received ACKEPOCH from.
  56. \* Set of record
  57. \* [sid, connected, peerLastEpoch, peerHistory],
  58. \* to record f.a and h(f) from followers.
  59. ackldRecv, \* Set of learners leader has received ACKLD from.
  60. \* Set of record [sid, connected].
  61. sendCounter \* Count of txns leader has broadcast.
  62. \* Variables only used for follower.
  63. VARIABLES connectInfo \* If follower has connected with leader.
  64. \* If follower lost connection, then null.
  65. \* Variable representing oracle of leader.
  66. VARIABLE leaderOracle \* Current oracle.
  67. \* Variables about network channel.
  68. VARIABLE msgs \* Simulates network channel.
  69. \* msgs[i][j] means the input buffer of server j
  70. \* from server i.
  71. \* Variables only used in verifying properties.
  72. VARIABLES epochLeader, \* Set of leaders in every epoch.
  73. proposalMsgsLog, \* Set of all broadcast messages.
  74. violatedInvariants \* Check whether there are conditions
  75. \* contrary to the facts.
  76. \* Variable used for recording critical data,
  77. \* to constrain state space or update values.
  78. VARIABLE recorder \* Consists: members of Parameters and pc, values.
  79. \* Form is record:
  80. \* [pc, nTransaction, maxEpoch, nTimeout, nRestart, nClientRequest]
  \* Variable groupings, used to keep UNCHANGED clauses short.
  serverVars   == <<state, zabState, acceptedEpoch, currentEpoch, history, lastCommitted>>
  leaderVars   == <<learners, cepochRecv, ackeRecv, ackldRecv, sendCounter>>
  followerVars == connectInfo
  electionVars == leaderOracle
  msgVars      == msgs
  verifyVars   == <<proposalMsgsLog, epochLeader, violatedInvariants>>
  \* Every variable of the specification.
  vars         == <<serverVars, leaderVars, followerVars, electionVars,
                    msgVars, verifyVars, recorder>>
  91. -----------------------------------------------------------------------------
  \* Largest element of the set S of integers; -1 when S is empty.
  Maximum(S) == IF S /= {} THEN CHOOSE hi \in S : \A other \in S : other <= hi
                           ELSE -1
  \* Smallest element of the set S of integers; -1 when S is empty.
  Minimum(S) == IF S /= {} THEN CHOOSE lo \in S : \A other \in S : other >= lo
                           ELSE -1
  \* Role of server s, read from the state variable.
  IsLeader(s)   == LEADING = state[s]
  IsFollower(s) == FOLLOWING = state[s]
  IsLooking(s)  == LOOKING = state[s]
  \* TRUE iff the set s is one of the majority sets in Quorums.
  IsQuorum(s) == s \in Quorums
  \* Leader-side view: j is recorded in learners[i].
  IsMyLearner(i, j) == j \in learners[i]
  \* Follower-side view: connectInfo[i] names j.
  IsMyLeader(i, j)  == j = connectInfo[i]
  \* Whether connectInfo[i] currently names a leader at all.
  HasNoLeader(i) == connectInfo[i] = NullPoint
  HasLeader(i)   == ~(connectInfo[i] = NullPoint)
  108. -----------------------------------------------------------------------------
  \* Strict lexicographic order on two-component zxids:
  \* TRUE iff zxid1 > zxid2, FALSE iff zxid1 <= zxid2.
  ZxidCompare(zxid1, zxid2) == IF zxid1[1] = zxid2[1] THEN zxid1[2] > zxid2[2]
                                                       ELSE zxid1[1] > zxid2[1]
  \* Component-wise equality of two zxids.
  ZxidEqual(zxid1, zxid2) == /\ zxid1[1] = zxid2[1]
                             /\ zxid1[2] = zxid2[2]
  \* TRUE iff transaction txn carries zxid z.
  TxnZxidEqual(txn, z) == /\ txn.zxid[1] = z[1]
                          /\ txn.zxid[2] = z[2]
  \* Two transactions are equal when both zxid and value agree.
  TxnEqual(txn1, txn2) == ZxidEqual(txn1.zxid, txn2.zxid) /\ txn1.value = txn2.value
  \* TRUE iff the first zxid component of txn1 is smaller than that of txn2.
  EpochPrecedeInTxn(txn1, txn2) == txn2.zxid[1] > txn1.zxid[1]
  118. -----------------------------------------------------------------------------
  \* Helpers for reading Parameters and recorder.
  \* A key missing from the respective domain reads as 0.
  GetParameter(p) == IF p \notin DOMAIN Parameters THEN 0 ELSE Parameters[p]
  GetRecorder(p)  == IF p \notin DOMAIN recorder THEN 0 ELSE recorder[p]
  \* One-field functions: field m with its current value / its value plus one.
  RecorderGetHelper(m) == (m :> recorder[m])
  RecorderIncHelper(m) == (m :> (recorder[m] + 1))
  \* The same helpers specialised to the two failure counters.
  RecorderIncTimeout == RecorderIncHelper("nTimeout")
  RecorderGetTimeout == RecorderGetHelper("nTimeout")
  RecorderIncRestart == RecorderIncHelper("nRestart")
  RecorderGetRestart == RecorderGetHelper("nRestart")
  \* "nTransaction": after a LeaderProcessRequest step, the length of the longest
  \* history in the next state; otherwise the recorded value is kept.
  RecorderSetTransactionNum(pc) ==
      LET longest == CHOOSE i \in Server :
                         \A j \in Server : Len(history'[i]) >= Len(history'[j])
      IN ("nTransaction" :> (IF pc[1] = "LeaderProcessRequest"
                             THEN Len(history'[longest])
                             ELSE recorder["nTransaction"]))
  \* "maxEpoch": after a LeaderProcessCEPOCH step, the largest acceptedEpoch in
  \* the next state; otherwise the recorded value is kept.
  RecorderSetMaxEpoch(pc) ==
      LET highest == CHOOSE i \in Server :
                         \A j \in Server : acceptedEpoch'[i] >= acceptedEpoch'[j]
      IN ("maxEpoch" :> (IF pc[1] = "LeaderProcessCEPOCH"
                         THEN acceptedEpoch'[highest]
                         ELSE recorder["maxEpoch"]))
  \* "nClientRequest": incremented by a LeaderProcessRequest step only.
  RecorderSetRequests(pc) ==
      LET served == recorder["nClientRequest"]
      IN ("nClientRequest" :> (IF pc[1] = "LeaderProcessRequest"
                               THEN served + 1
                               ELSE served))
  \* "pc": the tuple naming the action just taken.
  RecorderSetPc(pc) == ("pc" :> pc)
  \* New values of the "nTimeout" and "nRestart" counters for the action named
  \* by pc[1]: the three timeout actions bump nTimeout only, "Restart" bumps
  \* both counters, and any other action leaves both as they are.
  RecorderSetFailure(pc) ==
      LET action == pc[1]
      IN IF action \in {"Timeout", "LeaderTimeout", "FollowerTimeout"}
         THEN RecorderIncTimeout @@ RecorderGetRestart
         ELSE IF action = "Restart"
              THEN RecorderIncTimeout @@ RecorderIncRestart
              ELSE RecorderGetTimeout @@ RecorderGetRestart
  \* Next-state value of recorder after the action described by pc. The
  \* freshly computed fields come first in the @@ chain so they take precedence
  \* over the old recorder, which only supplies whatever is left.
  UpdateRecorder(pc) ==
      LET failures == RecorderSetFailure(pc)
          txnCount == RecorderSetTransactionNum(pc)
          topEpoch == RecorderSetMaxEpoch(pc)
          location == RecorderSetPc(pc)
          requests == RecorderSetRequests(pc)
      IN recorder' = failures @@ txnCount @@ topEpoch @@ location @@ requests @@ recorder
  \* The recorder is left untouched.
  UnchangeRecorder == recorder' = recorder
  \* If Parameters defines p, apply Comp to n and that bound; an absent
  \* parameter imposes no restriction.
  CheckParameterHelper(n, p, Comp(_,_)) ==
      (p \in DOMAIN Parameters) => Comp(n, Parameters[p])
  \* n must stay strictly below the bound configured for p (if any).
  CheckParameterLimit(n, p) ==
      CheckParameterHelper(n, p, LAMBDA val, bound : val < bound)
  \* Individual limits on the counters kept in recorder.
  CheckTimeout        == CheckParameterLimit(recorder["nTimeout"], "MaxTimeoutFailures")
  CheckTransactionNum == CheckParameterLimit(recorder["nTransaction"], "MaxTransactionNum")
  CheckEpoch          == CheckParameterLimit(recorder["maxEpoch"], "MaxEpoch")
  \* A restart must respect both the timeout limit and the restart limit.
  CheckRestart        == CheckTimeout /\ CheckParameterLimit(recorder["nRestart"], "MaxRestarts")
  \* Conjunction of all limits.
  CheckStateConstraints == /\ CheckTimeout
                           /\ CheckTransactionNum
                           /\ CheckEpoch
                           /\ CheckRestart
  164. -----------------------------------------------------------------------------
  \* Predicates on the network: Pending<T>(i, j) holds when the channel from j
  \* to i is non-empty and the message at its head has type T.
  PendingCEPOCH(i, j)    == Len(msgs[j][i]) > 0 /\ Head(msgs[j][i]).mtype = CEPOCH
  PendingNEWEPOCH(i, j)  == Len(msgs[j][i]) > 0 /\ Head(msgs[j][i]).mtype = NEWEPOCH
  PendingACKEPOCH(i, j)  == Len(msgs[j][i]) > 0 /\ Head(msgs[j][i]).mtype = ACKEPOCH
  PendingNEWLEADER(i, j) == Len(msgs[j][i]) > 0 /\ Head(msgs[j][i]).mtype = NEWLEADER
  PendingACKLD(i, j)     == Len(msgs[j][i]) > 0 /\ Head(msgs[j][i]).mtype = ACKLD
  PendingCOMMITLD(i, j)  == Len(msgs[j][i]) > 0 /\ Head(msgs[j][i]).mtype = COMMITLD
  PendingPROPOSE(i, j)   == Len(msgs[j][i]) > 0 /\ Head(msgs[j][i]).mtype = PROPOSE
  PendingACK(i, j)       == Len(msgs[j][i]) > 0 /\ Head(msgs[j][i]).mtype = ACK
  PendingCOMMIT(i, j)    == Len(msgs[j][i]) > 0 /\ Head(msgs[j][i]).mtype = COMMIT
  \* Append message m to the channel from i to j.
  Send(i, j, m) == msgs' = [msgs EXCEPT ![i][j] = Append(@, m)]
  \* Drop the head of the channel from i to j; an empty channel is left as is.
  Discard(i, j) == IF msgs[i][j] = << >>
                   THEN msgs' = msgs
                   ELSE msgs' = [msgs EXCEPT ![i][j] = Tail(@)]
  \* Drop the head of the channel from j to i and, in the same step, append m
  \* to the channel from i to j.
  Reply(i, j, m) == msgs' = [msgs EXCEPT ![j][i] = Tail(@),
                                         ![i][j] = Append(msgs[i][j], m)]
  \* Empty both channels between i and j.
  Clean(i, j) == msgs' = [msgs EXCEPT ![i][j] = << >>, ![j][i] = << >>]
  \* Empty every channel whose receiver is a member of S.
  CleanInputBuffer(S) ==
      msgs' = [s \in Server |->
                  [v \in Server |-> IF v \notin S THEN msgs[s][v] ELSE << >>]]
  \* Leader i appends m to its outgoing channel towards every other server that
  \* is both one of its learners and recorded as connected in ackeRecv[i].
  \* Note: the paper leaves the recipient set Q fuzzy. Servers to which the
  \* leader broadcast NEWLEADER should receive every PROPOSE, hence ackeRecv is
  \* used as Q here. Because of that choice a follower could see a COMMIT before
  \* COMMITLD; COMMIT is therefore broadcast only over ackldRecv (see
  \* DiscardAndBroadcast).
  Broadcast(i, m) ==
      LET recipients == { v \in learners[i] \ {i} :
                              \E a \in ackeRecv[i] : a.connected = TRUE /\ a.sid = v }
      IN msgs' = [msgs EXCEPT ![i] =
                      [v \in Server |-> IF v \in recipients
                                        THEN Append(msgs[i][v], m)
                                        ELSE msgs[i][v]]]
  \* Drop the head of the channel from j to i and, in the same step, append m
  \* to leader i's outgoing channel towards every other learner recorded as
  \* connected in ackldRecv[i]. Used when the leader decides to broadcast COMMIT
  \* while processing an ACK; ackldRecv is the recipient set because a follower
  \* should not receive COMMIT before it has received COMMITLD.
  DiscardAndBroadcast(i, j, m) ==
      LET recipients == { v \in learners[i] \ {i} :
                              \E a \in ackldRecv[i] : a.connected = TRUE /\ a.sid = v }
      IN msgs' = [msgs EXCEPT ![j][i] = Tail(msgs[j][i]),
                              ![i] = [v \in Server |-> IF v \in recipients
                                                       THEN Append(msgs[i][v], m)
                                                       ELSE msgs[i][v]]]
  \* Drop the head of the channel from j to i and append m (the NEWEPOCH
  \* message) to leader i's channel towards every other learner recorded as
  \* connected in the NEXT-state value of cepochRecv[i].
  DiscardAndBroadcastNEWEPOCH(i, j, m) ==
      LET recipients == { v \in learners[i] \ {i} :
                              \E c \in cepochRecv'[i] : c.connected = TRUE /\ c.sid = v }
      IN msgs' = [msgs EXCEPT ![j][i] = Tail(msgs[j][i]),
                              ![i] = [v \in Server |-> IF v \in recipients
                                                       THEN Append(msgs[i][v], m)
                                                       ELSE msgs[i][v]]]
  \* Drop the head of the channel from j to i and append m (the NEWLEADER
  \* message) to leader i's channel towards every other learner recorded as
  \* connected in the NEXT-state value of ackeRecv[i].
  DiscardAndBroadcastNEWLEADER(i, j, m) ==
      LET recipients == { v \in learners[i] \ {i} :
                              \E a \in ackeRecv'[i] : a.connected = TRUE /\ a.sid = v }
      IN msgs' = [msgs EXCEPT ![j][i] = Tail(msgs[j][i]),
                              ![i] = [v \in Server |-> IF v \in recipients
                                                       THEN Append(msgs[i][v], m)
                                                       ELSE msgs[i][v]]]
  \* Drop the head of the channel from j to i and append m (the COMMITLD
  \* message) to leader i's channel towards every other learner recorded as
  \* connected in the NEXT-state value of ackldRecv[i].
  DiscardAndBroadcastCOMMITLD(i, j, m) ==
      LET recipients == { v \in learners[i] \ {i} :
                              \E a \in ackldRecv'[i] : a.connected = TRUE /\ a.sid = v }
      IN msgs' = [msgs EXCEPT ![j][i] = Tail(msgs[j][i]),
                              ![i] = [v \in Server |-> IF v \in recipients
                                                       THEN Append(msgs[i][v], m)
                                                       ELSE msgs[i][v]]]
  255. -----------------------------------------------------------------------------
  \* Initial values. Every server starts LOOKING in phase ELECTION with epoch 0,
  \* an empty history and nothing committed.
  InitServerVars ==
      /\ state         = [s \in Server |-> LOOKING]
      /\ zabState      = [s \in Server |-> ELECTION]
      /\ acceptedEpoch = [s \in Server |-> 0]
      /\ currentEpoch  = [s \in Server |-> 0]
      /\ history       = [s \in Server |-> << >>]
      /\ lastCommitted = [s \in Server |-> [zxid |-> <<0, 0>>, index |-> 0]]
  \* No server has learners or has collected any CEPOCH / ACKEPOCH / ACKLD.
  InitLeaderVars ==
      /\ learners    = [s \in Server |-> {}]
      /\ cepochRecv  = [s \in Server |-> {}]
      /\ ackeRecv    = [s \in Server |-> {}]
      /\ ackldRecv   = [s \in Server |-> {}]
      /\ sendCounter = [s \in Server |-> 0]
  \* No follower is connected to a leader.
  InitFollowerVars == connectInfo = [s \in Server |-> NullPoint]
  \* The oracle names no leader yet.
  InitElectionVars == leaderOracle = NullPoint
  \* Every channel is empty.
  InitMsgVars == msgs = [s \in Server |-> [v \in Server |-> << >>]]
  \* Verification bookkeeping starts empty and with no violation flagged.
  InitVerifyVars ==
      /\ proposalMsgsLog = {}
      /\ epochLeader = [e \in 1..MAXEPOCH |-> {}]
      /\ violatedInvariants = [ stateInconsistent    |-> FALSE,
                                proposalInconsistent |-> FALSE,
                                commitInconsistent   |-> FALSE,
                                ackInconsistent      |-> FALSE,
                                messageIllegal       |-> FALSE ]
  \* All counters start at 0 and pc names the initial step.
  InitRecorder ==
      recorder = [ pc             |-> <<"Init">>,
                   nTransaction   |-> 0,
                   maxEpoch       |-> 0,
                   nTimeout       |-> 0,
                   nRestart       |-> 0,
                   nClientRequest |-> 0 ]
  \* Initial-state predicate: the conjunction of all per-group initial values.
  Init ==
      /\ InitServerVars
      /\ InitLeaderVars
      /\ InitFollowerVars
      /\ InitElectionVars
      /\ InitMsgVars
      /\ InitVerifyVars
      /\ InitRecorder
  292. -----------------------------------------------------------------------------
  \* Utilities for state switching.
  \* Follower i goes back to LOOKING / ELECTION and forgets its leader.
  FollowerShutdown(i) ==
      /\ state'       = [state EXCEPT ![i] = LOOKING]
      /\ zabState'    = [zabState EXCEPT ![i] = ELECTION]
      /\ connectInfo' = [connectInfo EXCEPT ![i] = NullPoint]
  \* Every server in learners[i] goes back to LOOKING / ELECTION and forgets
  \* its leader, the channels towards those servers are emptied, and
  \* learners[i] becomes empty.
  LeaderShutdown(i) ==
      LET S == learners[i]
          Reset(f, v) == [s \in Server |-> IF s \in S THEN v ELSE f[s]]
      IN /\ state'       = Reset(state, LOOKING)
         /\ zabState'    = Reset(zabState, ELECTION)
         /\ connectInfo' = Reset(connectInfo, NullPoint)
         /\ CleanInputBuffer(S)
         /\ learners'    = [learners EXCEPT ![i] = {}]
  \* Server i becomes a follower and enters DISCOVERY.
  SwitchToFollower(i) ==
      /\ state'    = [state EXCEPT ![i] = FOLLOWING]
      /\ zabState' = [zabState EXCEPT ![i] = DISCOVERY]
  \* Server i becomes a leader and enters DISCOVERY. Its learner set and its
  \* three receive sets are re-initialised to contain only i itself, and its
  \* send counter restarts at 0.
  SwitchToLeader(i) ==
      LET selfCepoch == [ sid |-> i, connected |-> TRUE, epoch |-> acceptedEpoch[i] ]
          selfAcke   == [ sid           |-> i,
                          connected     |-> TRUE,
                          peerLastEpoch |-> currentEpoch[i],
                          peerHistory   |-> history[i] ]
          selfAckld  == [ sid |-> i, connected |-> TRUE ]
      IN /\ state'       = [state EXCEPT ![i] = LEADING]
         /\ zabState'    = [zabState EXCEPT ![i] = DISCOVERY]
         /\ learners'    = [learners EXCEPT ![i] = {i}]
         /\ cepochRecv'  = [cepochRecv EXCEPT ![i] = {selfCepoch}]
         /\ ackeRecv'    = [ackeRecv EXCEPT ![i] = {selfAcke}]
         /\ ackldRecv'   = [ackldRecv EXCEPT ![i] = {selfAckld}]
         /\ sendCounter' = [sendCounter EXCEPT ![i] = 0]
  \* In a set of CEPOCH records, mark the record of server sid as disconnected
  \* while keeping its epoch. A set without a record for sid is returned as is.
  RemoveCepochRecv(set, sid) ==
      IF \E s \in set : s.sid = sid
      THEN LET info     == CHOOSE s \in set : s.sid = sid
               new_info == [ sid |-> sid, connected |-> FALSE, epoch |-> info.epoch ]
           IN (set \ {info}) \union {new_info}
      ELSE set
  \* Same for ACKEPOCH records; peerLastEpoch and peerHistory are kept.
  RemoveAckeRecv(set, sid) ==
      IF \E s \in set : s.sid = sid
      THEN LET info     == CHOOSE s \in set : s.sid = sid
               new_info == [ sid           |-> sid,
                             connected     |-> FALSE,
                             peerLastEpoch |-> info.peerLastEpoch,
                             peerHistory   |-> info.peerHistory ]
           IN (set \ {info}) \union {new_info}
      ELSE set
  \* Same for ACKLD records, which carry only sid and connected.
  RemoveAckldRecv(set, sid) ==
      IF \E s \in set : s.sid = sid
      THEN LET info     == CHOOSE s \in set : s.sid = sid
               new_info == [ sid |-> sid, connected |-> FALSE ]
           IN (set \ {info}) \union {new_info}
      ELSE set
  \* Leader i drops learner j: j leaves learners[i] and is marked as
  \* disconnected in each of the leader's three receive sets.
  RemoveLearner(i, j) ==
      /\ learners'   = [learners EXCEPT ![i] = learners[i] \ {j}]
      /\ cepochRecv' = [cepochRecv EXCEPT ![i] = RemoveCepochRecv(cepochRecv[i], j)]
      /\ ackeRecv'   = [ackeRecv EXCEPT ![i] = RemoveAckeRecv(ackeRecv[i], j)]
      /\ ackldRecv'  = [ackldRecv EXCEPT ![i] = RemoveAckldRecv(ackldRecv[i], j)]
  351. -----------------------------------------------------------------------------
  \* Election actions.
  \* A LOOKING server i that the oracle does not currently name becomes the
  \* oracle's leader and switches to LEADING.
  UpdateLeader(i) ==
      /\ IsLooking(i)
      /\ ~(leaderOracle = i)
      /\ SwitchToLeader(i)
      /\ leaderOracle' = i
      /\ UNCHANGED <<acceptedEpoch, currentEpoch, history, lastCommitted,
                     followerVars, verifyVars, msgVars>>
      /\ UpdateRecorder(<<"UpdateLeader", i>>)
  \* A LOOKING server i acts on an already-chosen oracle: it becomes leader if
  \* the oracle names i itself, and a follower otherwise (in which case the
  \* leader-only variables stay unchanged).
  FollowLeader(i) ==
      /\ IsLooking(i)
      /\ leaderOracle /= NullPoint
      /\ IF leaderOracle = i
         THEN SwitchToLeader(i)
         ELSE /\ SwitchToFollower(i)
              /\ UNCHANGED leaderVars
      /\ UNCHANGED <<acceptedEpoch, currentEpoch, history, lastCommitted,
                     electionVars, followerVars, verifyVars, msgVars>>
      /\ UpdateRecorder(<<"FollowLeader", i>>)
  372. -----------------------------------------------------------------------------
  373. (* Actions modelling failures. Failure handling in this protocol spec differs
  374. from that in the later specs. To compress the state space, we model only the
  375. results of external events (e.g. network partition, node failure, etc.), so we do
  376. not need to add variables and actions about network conditions and node
  377. conditions. It is reasonable that we have action 'Restart' but no 'Crash',
  378. because a node that does not execute any internal events after restarting
  379. is equivalent to a node that has crashed.
  380. *)
  \* Timeout between leader i and its connected follower j, subject to the
  \* timeout limit. If the leader still has a quorum of learners without j,
  \* only j is removed, shut down, and the channels between i and j are
  \* emptied; otherwise the leader itself shuts down.
  Timeout(i, j) ==
      /\ CheckTimeout
      /\ IsLeader(i)
      /\ IsMyLearner(i, j)
      /\ IsFollower(j)
      /\ IsMyLeader(j, i)
      /\ IF IsQuorum(learners[i] \ {j})
         THEN \* just remove this learner
              /\ RemoveLearner(i, j)
              /\ FollowerShutdown(j)
              /\ Clean(i, j)
         ELSE \* leader switches to looking
              /\ LeaderShutdown(i)
              /\ UNCHANGED <<cepochRecv, ackeRecv, ackldRecv>>
      /\ UNCHANGED <<acceptedEpoch, currentEpoch, history, lastCommitted,
                     sendCounter, electionVars, verifyVars>>
      /\ UpdateRecorder(<<"Timeout", i, j>>)
  \* Restart(i): server i restarts, subject to CheckRestart (which also
  \* requires CheckTimeout). In every case lastCommitted[i] is reset to
  \* index 0 / zxid <<0, 0>>, while acceptedEpoch, currentEpoch, history,
  \* sendCounter, leaderOracle and the verification variables are unchanged.
  \* The recorder is updated with pc <<"Restart", i>>; RecorderSetFailure
  \* increments both nTimeout and nRestart for that pc.
  \* Cases, by the role of i:
  \*   - LOOKING: nothing else changes.
  \*   - FOLLOWING and connected to a leader: if the leader keeps a quorum
  \*     without i, the leader removes i, i shuts down, and the channels between
  \*     them are emptied; otherwise the leader shuts down.
  \*   - FOLLOWING without a leader: i shuts down and the channels towards i are
  \*     emptied.
  \*   - LEADING: i performs LeaderShutdown.
  \* NOTE(review): indentation was lost in this copy; the nesting described
  \* above is inferred from which variables each branch constrains - confirm
  \* against the canonical file.
  397. Restart(i) ==
  398. /\ CheckRestart \* test restrictions of restart
  399. /\ \/ /\ IsLooking(i)
  400. /\ UNCHANGED <<state, zabState, learners, followerVars, msgVars,
  401. cepochRecv, ackeRecv, ackldRecv>>
  402. \/ /\ IsFollower(i)
  403. /\ LET connectedWithLeader == HasLeader(i)
  404. IN \/ /\ connectedWithLeader
  405. /\ LET leader == connectInfo[i]
  406. newLearners == learners[leader] \ {i}
  407. IN
  408. \/ /\ IsQuorum(newLearners) \* leader remove learner i
  409. /\ RemoveLearner(leader, i)
  410. /\ FollowerShutdown(i)
  411. /\ Clean(leader, i)
  412. \/ /\ ~IsQuorum(newLearners) \* leader switches to looking
  413. /\ LeaderShutdown(leader)
  414. /\ UNCHANGED <<cepochRecv, ackeRecv, ackldRecv>>
  415. \/ /\ ~connectedWithLeader
  416. /\ FollowerShutdown(i)
  417. /\ CleanInputBuffer({i})
  418. /\ UNCHANGED <<learners, cepochRecv, ackeRecv, ackldRecv>>
  419. \/ /\ IsLeader(i)
  420. /\ LeaderShutdown(i)
  421. /\ UNCHANGED <<cepochRecv, ackeRecv, ackldRecv>>
  422. /\ lastCommitted' = [lastCommitted EXCEPT ![i] = [ index |-> 0,
  423. zxid |-> <<0, 0>> ] ]
  424. /\ UNCHANGED <<acceptedEpoch, currentEpoch, history,
  425. sendCounter, leaderOracle, verifyVars>>
  426. /\ UpdateRecorder(<<"Restart", i>>)
  427. -----------------------------------------------------------------------------
  (* Establish the connection between leader i and follower j: j must be a
     follower without a leader, i must be the leader named by the oracle and
     must not yet list j as a learner. Both sides record the connection and j
     sends CEPOCH carrying its acceptedEpoch (f.p) to i. *)
  ConnectAndFollowerSendCEPOCH(i, j) ==
      /\ IsLeader(i)
      /\ ~IsMyLearner(i, j)
      /\ IsFollower(j)
      /\ HasNoLeader(j)
      /\ leaderOracle = i
      /\ learners' = [learners EXCEPT ![i] = learners[i] \union {j}]
      /\ connectInfo' = [connectInfo EXCEPT ![j] = i]
      /\ LET cepoch == [ mtype |-> CEPOCH, mepoch |-> acceptedEpoch[j] ]
         IN Send(j, i, cepoch)
      /\ UNCHANGED <<serverVars, electionVars, verifyVars, cepochRecv,
                     ackeRecv, ackldRecv, sendCounter>>
      /\ UpdateRecorder(<<"ConnectAndFollowerSendCEPOCH", i, j>>)
  \* The servers recorded in cepochRecv[i] already form a quorum (current state).
  CepochRecvQuorumFormed(i) == IsQuorum({ c.sid : c \in cepochRecv[i] })
  \* The servers recorded in cepochRecv[i] form a quorum in the next state.
  CepochRecvBecomeQuorum(i) == IsQuorum({ c.sid : c \in cepochRecv'[i] })
  \* Record in oldSet that server sid is connected and reported peerEpoch.
  \* An existing record for sid is replaced; otherwise a new record is added.
  UpdateCepochRecv(oldSet, sid, peerEpoch) ==
      LET entry == [ sid |-> sid, connected |-> TRUE, epoch |-> peerEpoch ]
          known == \E s \in oldSet : s.sid = sid
      IN IF known
         THEN (oldSet \ {CHOOSE info \in oldSet : info.sid = sid}) \union {entry}
         ELSE oldSet \union {entry}
  \* New epoch e' for this round: one more than the largest epoch reported in
  \* the next-state value of cepochRecv[i].
  DetermineNewEpoch(i) == Maximum({ c.epoch : c \in cepochRecv'[i] }) + 1
  \* LeaderProcessCEPOCH(i, j): leader i handles the CEPOCH at the head of the
  \* channel from j, provided j is one of its learners and CheckEpoch holds.
  \* Case 1 - cepochRecv[i] is not yet a quorum (NEWEPOCH not broadcast yet):
  \*   the sender is recorded in cepochRecv'[i]; if that completes a quorum,
  \*   acceptedEpoch'[i] is set via DetermineNewEpoch and NEWEPOCH is broadcast,
  \*   otherwise the CEPOCH is just discarded. Being outside DISCOVERY in this
  \*   case is flagged as violatedInvariants.stateInconsistent.
  \* Case 2 - cepochRecv[i] already is a quorum: the sender is recorded and
  \*   answered directly with NEWEPOCH carrying the current acceptedEpoch[i].
  \* Order matters: cepochRecv' is constrained before CepochRecvBecomeQuorum and
  \* DetermineNewEpoch read it, and acceptedEpoch' before the message is built.
  \* NOTE(review): indentation was lost in this copy; the nesting described
  \* above is inferred from the inline case comments and from which variables
  \* each branch constrains - confirm against the canonical file.
  459. (* Leader waits for receiving FOLLOWERINFO from a quorum including itself,
  460. and chooses a new epoch e' as its own epoch and broadcasts NEWEPOCH. *)
  461. LeaderProcessCEPOCH(i, j) ==
  462. /\ CheckEpoch \* test restrictions of max epoch
  463. /\ IsLeader(i)
  464. /\ PendingCEPOCH(i, j)
  465. /\ LET msg == msgs[j][i][1]
  466. infoOk == IsMyLearner(i, j)
  467. IN /\ infoOk
  468. /\ \/ \* 1. has not broadcast NEWEPOCH
  469. /\ ~CepochRecvQuorumFormed(i)
  470. /\ \/ /\ zabState[i] = DISCOVERY
  471. /\ UNCHANGED violatedInvariants
  472. \/ /\ zabState[i] /= DISCOVERY
  473. /\ PrintT("Exception: CepochRecvQuorumFormed false," \o
  474. " while zabState not DISCOVERY.")
  475. /\ violatedInvariants' = [violatedInvariants
  476. EXCEPT !.stateInconsistent = TRUE]
  477. /\ cepochRecv' = [cepochRecv EXCEPT ![i] = UpdateCepochRecv(@, j, msg.mepoch) ]
  478. /\ \/ \* 1.1. cepochRecv becomes quorum,
  479. \* then determine e' and broadcasts NEWEPOCH in Q.
  480. /\ CepochRecvBecomeQuorum(i)
  481. /\ acceptedEpoch' = [acceptedEpoch EXCEPT ![i] = DetermineNewEpoch(i)]
  482. /\ LET m == [ mtype |-> NEWEPOCH,
  483. mepoch |-> acceptedEpoch'[i] ]
  484. IN DiscardAndBroadcastNEWEPOCH(i, j, m)
  485. \/ \* 1.2. cepochRecv still not quorum.
  486. /\ ~CepochRecvBecomeQuorum(i)
  487. /\ Discard(j, i)
  488. /\ UNCHANGED acceptedEpoch
  489. \/ \* 2. has broadcast NEWEPOCH
  490. /\ CepochRecvQuorumFormed(i)
  491. /\ cepochRecv' = [cepochRecv EXCEPT ![i] = UpdateCepochRecv(@, j, msg.mepoch) ]
  492. /\ Reply(i, j, [ mtype |-> NEWEPOCH,
  493. mepoch |-> acceptedEpoch[i] ])
  494. /\ UNCHANGED <<violatedInvariants, acceptedEpoch>>
  495. /\ UNCHANGED <<state, zabState, currentEpoch, history, lastCommitted, learners,
  496. ackeRecv, ackldRecv, sendCounter, followerVars,
  497. electionVars, proposalMsgsLog, epochLeader>>
  498. /\ UpdateRecorder(<<"LeaderProcessCEPOCH", i, j>>)
  (* Follower i handles the NEWEPOCH at the head of the channel from j; j must
     be the leader recorded in connectInfo[i].
     1. Normal case (msg.mepoch >= acceptedEpoch[i]):
        - in DISCOVERY, i adopts the epoch, replies with ACKEPOCH carrying its
          currentEpoch and history, and moves to SYNCHRONIZATION;
        - outside DISCOVERY, the message is discarded and
          violatedInvariants.stateInconsistent is raised.
     2. Abnormal case (msg.mepoch < acceptedEpoch[i]): i goes back to election,
        the channels between i and its leader are emptied, and the leader
        drops i from its learners.
     The diagnostic text now reads " while zabState not DISCOVERY.", matching
     the wording used by LeaderProcessCEPOCH (it used to print "whileZabState"). *)
  FollowerProcessNEWEPOCH(i, j) ==
      /\ IsFollower(i)
      /\ PendingNEWEPOCH(i, j)
      /\ LET msg     == msgs[j][i][1]
             infoOk  == IsMyLeader(i, j)
             stateOk == zabState[i] = DISCOVERY
             epochOk == msg.mepoch >= acceptedEpoch[i]
         IN /\ infoOk
            /\ \/ \* 1. Normal case
                  /\ epochOk
                  /\ \/ /\ stateOk
                        /\ acceptedEpoch' = [acceptedEpoch EXCEPT ![i] = msg.mepoch]
                        /\ LET m == [ mtype    |-> ACKEPOCH,
                                      mepoch   |-> currentEpoch[i],
                                      mhistory |-> history[i] ]
                           IN Reply(i, j, m)
                        /\ zabState' = [zabState EXCEPT ![i] = SYNCHRONIZATION]
                        /\ UNCHANGED violatedInvariants
                     \/ /\ ~stateOk
                        /\ PrintT("Exception: Follower receives NEWEPOCH," \o
                                  " while zabState not DISCOVERY.")
                        /\ violatedInvariants' = [violatedInvariants
                                                    EXCEPT !.stateInconsistent = TRUE]
                        /\ Discard(j, i)
                        /\ UNCHANGED <<acceptedEpoch, zabState>>
                  \* Common to both sub-cases of the normal case.
                  /\ UNCHANGED <<followerVars, learners, cepochRecv, ackeRecv,
                                 ackldRecv, state>>
               \/ \* 2. Abnormal case - go back to election
                  /\ ~epochOk
                  /\ FollowerShutdown(i)
                  /\ LET leader == connectInfo[i]
                     IN /\ Clean(i, leader)
                        /\ RemoveLearner(leader, i)
                  /\ UNCHANGED <<acceptedEpoch, violatedInvariants>>
      /\ UNCHANGED <<currentEpoch, history, lastCommitted, sendCounter,
                     electionVars, proposalMsgsLog, epochLeader>>
      /\ UpdateRecorder(<<"FollowerProcessNEWEPOCH", i, j>>)
  \* The servers recorded in ackeRecv[i] already form a quorum (current state).
  AckeRecvQuorumFormed(i) == IsQuorum({ a.sid : a \in ackeRecv[i] })
  \* The servers recorded in ackeRecv[i] form a quorum in the next state.
  AckeRecvBecomeQuorum(i) == IsQuorum({ a.sid : a \in ackeRecv'[i] })
  \* Record in oldSet that server sid is connected and reported peerEpoch and
  \* peerHistory. An existing record for sid is replaced; otherwise a new
  \* record is added.
  UpdateAckeRecv(oldSet, sid, peerEpoch, peerHistory) ==
      LET entry == [ sid           |-> sid,
                     connected     |-> TRUE,
                     peerLastEpoch |-> peerEpoch,
                     peerHistory   |-> peerHistory ]
      IN IF \E s \in oldSet : s.sid = sid
         THEN (oldSet \ {CHOOSE info \in oldSet : info.sid = sid}) \union {entry}
         ELSE oldSet \union {entry}
  \* For invariant checking: extend set with one proposal record
  \* [source, epoch, zxid, data] for each entry his[cur] .. his[end].
  \* When cur > end the index range is empty and set is returned unchanged.
  SetPacketsForChecking(set, src, ep, his, cur, end) ==
      set \union { [ source |-> src,
                     epoch  |-> ep,
                     zxid   |-> his[k].zxid,
                     data   |-> his[k].value ] : k \in cur..end }
  \* zxid of the last transaction in his; <<0, 0>> for an empty history.
  LastZxidOfHistory(his) == IF Len(his) > 0 THEN his[Len(his)].zxid
                                            ELSE <<0, 0>>
  \* TRUE iff state summary ss1 is at least as recent as ss2:
  \* f1.a > f2.a, or (f1.a = f2.a and f1.zxid >= f2.zxid).
  MoreResentOrEqual(ss1, ss2) ==
      IF ss1.currentEpoch = ss2.currentEpoch
      THEN ~ZxidCompare(ss2.lastZxid, ss1.lastZxid)
      ELSE ss1.currentEpoch > ss2.currentEpoch
  \* Initial history Ie' for this round: the peerHistory of a record in the
  \* next-state ackeRecv[i] whose state summary (currentEpoch, last zxid) is
  \* at least as recent as every other summary.
  DetermineInitialHistory(i) ==
      LET acks       == ackeRecv'[i]
          Summary(a) == [ sid          |-> a.sid,
                          currentEpoch |-> a.peerLastEpoch,
                          lastZxid     |-> LastZxidOfHistory(a.peerHistory) ]
          summaries  == { Summary(a) : a \in acks }
          best       == CHOOSE ss \in summaries :
                            \A other \in summaries \ {ss} : MoreResentOrEqual(ss, other)
          winner     == CHOOSE f \in acks : f.sid = best.sid
      IN winner.peerHistory
  579. RECURSIVE InitAcksidHelper(_,_)
  580. InitAcksidHelper(txns, src) == IF Len(txns) = 0 THEN << >>
  581. ELSE LET oldTxn == txns[1]
  582. newTxn == [ zxid |-> oldTxn.zxid,
  583. value |-> oldTxn.value,
  584. ackSid |-> {src},
  585. epoch |-> oldTxn.epoch ]
  586. IN <<newTxn>> \o InitAcksidHelper( Tail(txns), src)
  587. \* Atomically let all txns in initial history contain self's acks.
  588. InitAcksid(i, his) == InitAcksidHelper(his, i)
  589. (* Leader waits for receiving ACKEPOPCH from a quorum, and determines initialHistory
  590. according to history of whom has most recent state summary from them. After this,
  591. leader's zabState turns to SYNCHRONIZATION. *)
  592. LeaderProcessACKEPOCH(i, j) ==
  593. /\ IsLeader(i)
  594. /\ PendingACKEPOCH(i, j)
  595. /\ LET msg == msgs[j][i][1]
  596. infoOk == IsMyLearner(i, j)
  597. IN /\ infoOk
  598. /\ \/ \* 1. has broadcast NEWLEADER
  599. /\ AckeRecvQuorumFormed(i)
  600. /\ ackeRecv' = [ackeRecv EXCEPT ![i] = UpdateAckeRecv(@, j,
  601. msg.mepoch, msg.mhistory) ]
  602. /\ LET toSend == history[i] \* contains (Ie', Be')
  603. m == [ mtype |-> NEWLEADER,
  604. mepoch |-> acceptedEpoch[i],
  605. mhistory |-> toSend ]
  606. set_forChecking == SetPacketsForChecking({ }, i,
  607. acceptedEpoch[i], toSend, 1, Len(toSend))
  608. IN
  609. /\ Reply(i, j, m)
  610. /\ proposalMsgsLog' = proposalMsgsLog \union set_forChecking
  611. /\ UNCHANGED <<violatedInvariants, currentEpoch, history,
  612. zabState, epochLeader>>
  613. \/ \* 2. has not broadcast NEWLEADER
  614. /\ ~AckeRecvQuorumFormed(i)
  615. /\ \/ /\ zabState[i] = DISCOVERY
  616. /\ UNCHANGED violatedInvariants
  617. \/ /\ zabState[i] /= DISCOVERY
  618. /\ PrintT("Exception: AckeRecvQuorumFormed false," \o
  619. " while zabState not DISCOVERY.")
  620. /\ violatedInvariants' = [violatedInvariants EXCEPT
  621. !.stateInconsistent = TRUE]
  622. /\ ackeRecv' = [ackeRecv EXCEPT ![i] = UpdateAckeRecv(@, j,
  623. msg.mepoch, msg.mhistory) ]
  624. /\ \/ \* 2.1. ackeRecv becomes quorum, determine Ie'
  625. \* and broadcasts NEWLEADER in Q. (l.1.2 + l.2.1)
  626. /\ AckeRecvBecomeQuorum(i)
  627. /\ \* Update f.a
  628. LET newLeaderEpoch == acceptedEpoch[i] IN
  629. /\ currentEpoch' = [currentEpoch EXCEPT ![i] = newLeaderEpoch]
  630. /\ epochLeader' = [epochLeader EXCEPT ![newLeaderEpoch]
  631. = @ \union {i} ] \* for checking invariants
  632. /\ \* Determine initial history Ie'
  633. LET initialHistory == DetermineInitialHistory(i) IN
  634. history' = [history EXCEPT ![i] = InitAcksid(i, initialHistory) ]
  635. /\ \* Update zabState
  636. zabState' = [zabState EXCEPT ![i] = SYNCHRONIZATION]
  637. /\ \* Broadcast NEWLEADER with (e', Ie')
  638. LET toSend == history'[i]
  639. m == [ mtype |-> NEWLEADER,
  640. mepoch |-> acceptedEpoch[i],
  641. mhistory |-> toSend ]
  642. set_forChecking == SetPacketsForChecking({ }, i,
  643. acceptedEpoch[i], toSend, 1, Len(toSend))
  644. IN
  645. /\ DiscardAndBroadcastNEWLEADER(i, j, m)
  646. /\ proposalMsgsLog' = proposalMsgsLog \union set_forChecking
  647. \/ \* 2.2. ackeRecv still not quorum.
  648. /\ ~AckeRecvBecomeQuorum(i)
  649. /\ Discard(j, i)
  650. /\ UNCHANGED <<currentEpoch, history, zabState,
  651. proposalMsgsLog, epochLeader>>
  652. /\ UNCHANGED <<state, acceptedEpoch, lastCommitted, learners, cepochRecv, ackldRecv,
  653. sendCounter, followerVars, electionVars>>
  654. /\ UpdateRecorder(<<"LeaderProcessACKEPOCH", i, j>>)
  655. -----------------------------------------------------------------------------
  656. (* Follower receives NEWLEADER. Update f.a and history. *)
  657. FollowerProcessNEWLEADER(i, j) ==
  658. /\ IsFollower(i)
  659. /\ PendingNEWLEADER(i, j)
  660. /\ LET msg == msgs[j][i][1]
  661. infoOk == IsMyLeader(i, j)
  662. epochOk == acceptedEpoch[i] = msg.mepoch
  663. stateOk == zabState[i] = SYNCHRONIZATION
  664. IN /\ infoOk
  665. /\ \/ \* 1. f.p not equals e', starts a new iteration.
  666. /\ ~epochOk
  667. /\ FollowerShutdown(i)
  668. /\ LET leader == connectInfo[i]
  669. IN /\ Clean(i, leader)
  670. /\ RemoveLearner(leader, i)
  671. /\ UNCHANGED <<violatedInvariants, currentEpoch, history>>
  672. \/ \* 2. f.p equals e'.
  673. /\ epochOk
  674. /\ \/ /\ stateOk
  675. /\ UNCHANGED violatedInvariants
  676. \/ /\ ~stateOk
  677. /\ PrintT("Exception: Follower receives NEWLEADER," \o
  678. " whileZabState not SYNCHRONIZATION.")
  679. /\ violatedInvariants' = [violatedInvariants
  680. EXCEPT !.stateInconsistent = TRUE]
  681. /\ currentEpoch' = [currentEpoch EXCEPT ![i] = acceptedEpoch[i]]
  682. /\ history' = [history EXCEPT ![i] = msg.mhistory] \* no need to care ackSid
  683. /\ LET m == [ mtype |-> ACKLD,
  684. mzxid |-> LastZxidOfHistory(history'[i]) ]
  685. IN Reply(i, j, m)
  686. /\ UNCHANGED <<followerVars, state, zabState, learners, cepochRecv,
  687. ackeRecv, ackldRecv>>
  688. /\ UNCHANGED <<acceptedEpoch, lastCommitted, sendCounter, electionVars,
  689. proposalMsgsLog, epochLeader>>
  690. /\ UpdateRecorder(<<"FollowerProcessNEWLEADER", i, j>>)
  691. AckldRecvQuorumFormed(i) == LET sid_ackldRecv == {a.sid: a \in ackldRecv[i]}
  692. IN IsQuorum(sid_ackldRecv)
  693. AckldRecvBecomeQuorum(i) == LET sid_ackldRecv == {a.sid: a \in ackldRecv'[i]}
  694. IN IsQuorum(sid_ackldRecv)
  695. UpdateAckldRecv(oldSet, sid) ==
  696. LET sid_set == {s.sid: s \in oldSet}
  697. follower_info == [ sid |-> sid,
  698. connected |-> TRUE ]
  699. IN IF sid \in sid_set
  700. THEN LET old_info == CHOOSE info \in oldSet: info.sid = sid
  701. IN (oldSet \ {old_info}) \union {follower_info}
  702. ELSE oldSet \union {follower_info}
  703. LastZxid(i) == LastZxidOfHistory(history[i])
  704. RECURSIVE UpdateAcksidHelper(_,_,_)
  705. UpdateAcksidHelper(txns, target, endZxid) ==
  706. IF Len(txns) = 0 THEN << >>
  707. ELSE LET oldTxn == txns[1]
  708. IN IF ZxidCompare(oldTxn.zxid, endZxid) THEN txns
  709. ELSE LET newTxn == [ zxid |-> oldTxn.zxid,
  710. value |-> oldTxn.value,
  711. ackSid |-> IF target \in oldTxn.ackSid
  712. THEN oldTxn.ackSid
  713. ELSE oldTxn.ackSid \union {target},
  714. epoch |-> oldTxn.epoch ]
  715. IN <<newTxn>> \o UpdateAcksidHelper( Tail(txns), target, endZxid)
  716. \* Atomically add ackSid of one learner according to zxid in ACKLD.
  717. UpdateAcksid(his, target, endZxid) == UpdateAcksidHelper(his, target, endZxid)
  718. (* Leader waits for receiving ACKLD from a quorum including itself,
  719. and broadcasts COMMITLD and turns to BROADCAST. *)
  720. LeaderProcessACKLD(i, j) ==
  721. /\ IsLeader(i)
  722. /\ PendingACKLD(i, j)
  723. /\ LET msg == msgs[j][i][1]
  724. infoOk == IsMyLearner(i, j)
  725. IN /\ infoOk
  726. /\ \/ \* 1. has not broadcast COMMITLD
  727. /\ ~AckldRecvQuorumFormed(i)
  728. /\ \/ /\ zabState[i] = SYNCHRONIZATION
  729. /\ UNCHANGED violatedInvariants
  730. \/ /\ zabState[i] /= SYNCHRONIZATION
  731. /\ PrintT("Exception: AckldRecvQuorumFormed false," \o
  732. " while zabState not SYNCHRONIZATION.")
  733. /\ violatedInvariants' = [violatedInvariants
  734. EXCEPT !.stateInconsistent = TRUE]
  735. /\ ackldRecv' = [ackldRecv EXCEPT ![i] = UpdateAckldRecv(@, j) ]
  736. /\ history' = [history EXCEPT ![i] = UpdateAcksid(@, j, msg.mzxid)]
  737. /\ \/ \* 1.1. ackldRecv becomes quorum,
  738. \* then broadcasts COMMITLD and turns to BROADCAST.
  739. /\ AckldRecvBecomeQuorum(i)
  740. /\ lastCommitted' = [lastCommitted EXCEPT
  741. ![i] = [ index |-> Len(history[i]),
  742. zxid |-> LastZxid(i) ] ]
  743. /\ zabState' = [zabState EXCEPT ![i] = BROADCAST]
  744. /\ LET m == [ mtype |-> COMMITLD,
  745. mzxid |-> LastZxid(i) ]
  746. IN DiscardAndBroadcastCOMMITLD(i, j, m)
  747. \/ \* 1.2. ackldRecv still not quorum.
  748. /\ ~AckldRecvBecomeQuorum(i)
  749. /\ Discard(j, i)
  750. /\ UNCHANGED <<zabState, lastCommitted>>
  751. \/ \* 2. has broadcast COMMITLD
  752. /\ AckldRecvQuorumFormed(i)
  753. /\ \/ /\ zabState[i] = BROADCAST
  754. /\ UNCHANGED violatedInvariants
  755. \/ /\ zabState[i] /= BROADCAST
  756. /\ PrintT("Exception: AckldRecvQuorumFormed true," \o
  757. " while zabState not BROADCAST.")
  758. /\ violatedInvariants' = [violatedInvariants
  759. EXCEPT !.stateInconsistent = TRUE]
  760. /\ ackldRecv' = [ackldRecv EXCEPT ![i] = UpdateAckldRecv(@, j) ]
  761. /\ history' = [history EXCEPT ![i] = UpdateAcksid(@, j, msg.mzxid)]
  762. /\ Reply(i, j, [ mtype |-> COMMITLD,
  763. mzxid |-> lastCommitted[i].zxid ])
  764. /\ UNCHANGED <<zabState, lastCommitted>>
  765. /\ UNCHANGED <<state, acceptedEpoch, currentEpoch, learners, cepochRecv, ackeRecv,
  766. sendCounter, followerVars, electionVars, proposalMsgsLog, epochLeader>>
  767. /\ UpdateRecorder(<<"LeaderProcessACKLD", i, j>>)
  768. RECURSIVE ZxidToIndexHelper(_,_,_,_)
  769. ZxidToIndexHelper(his, zxid, cur, appeared) ==
  770. IF cur > Len(his) THEN cur
  771. ELSE IF TxnZxidEqual(his[cur], zxid)
  772. THEN CASE appeared = TRUE -> -1
  773. [] OTHER -> Minimum( { cur,
  774. ZxidToIndexHelper(his, zxid, cur + 1, TRUE) } )
  775. ELSE ZxidToIndexHelper(his, zxid, cur + 1, appeared)
  776. \* return -1: this zxid appears at least twice. Len(his) + 1: does not exist.
  777. \* 1 - Len(his): exists and appears just once.
  778. ZxidToIndex(his, zxid) == IF ZxidEqual( zxid, <<0, 0>> ) THEN 0
  779. ELSE IF Len(his) = 0 THEN 1
  780. ELSE LET len == Len(his) IN
  781. IF \E idx \in 1..len: TxnZxidEqual(his[idx], zxid)
  782. THEN ZxidToIndexHelper(his, zxid, 1, FALSE)
  783. ELSE len + 1
  784. (* Follower receives COMMITLD. Commit all txns. *)
  785. FollowerProcessCOMMITLD(i, j) ==
  786. /\ IsFollower(i)
  787. /\ PendingCOMMITLD(i, j)
  788. /\ LET msg == msgs[j][i][1]
  789. infoOk == IsMyLeader(i, j)
  790. index == IF ZxidEqual(msg.mzxid, <<0, 0>>) THEN 0
  791. ELSE ZxidToIndex(history[i], msg.mzxid)
  792. logOk == index >= 0 /\ index <= Len(history[i])
  793. IN /\ infoOk
  794. /\ \/ /\ logOk
  795. /\ UNCHANGED violatedInvariants
  796. \/ /\ ~logOk
  797. /\ PrintT("Exception: zxid in COMMITLD not exists in history.")
  798. /\ violatedInvariants' = [violatedInvariants
  799. EXCEPT !.proposalInconsistent = TRUE]
  800. /\ lastCommitted' = [lastCommitted EXCEPT ![i] = [ index |-> index,
  801. zxid |-> msg.mzxid ] ]
  802. /\ zabState' = [zabState EXCEPT ![i] = BROADCAST]
  803. /\ Discard(j, i)
  804. /\ UNCHANGED <<state, acceptedEpoch, currentEpoch, history, leaderVars,
  805. followerVars, electionVars, proposalMsgsLog, epochLeader>>
  806. /\ UpdateRecorder(<<"FollowerProcessCOMMITLD", i, j>>)
  807. ----------------------------------------------------------------------------
  808. IncZxid(s, zxid) == IF currentEpoch[s] = zxid[1] THEN <<zxid[1], zxid[2] + 1>>
  809. ELSE <<currentEpoch[s], 1>>
  810. (* Leader receives client request.
  811. Note: In production, any server in traffic can receive requests and
  812. forward it to leader if necessary. We choose to let leader be
  813. the sole one who can receive write requests, to simplify spec
  814. and keep correctness at the same time. *)
  815. LeaderProcessRequest(i) ==
  816. /\ CheckTransactionNum \* test restrictions of transaction num
  817. /\ IsLeader(i)
  818. /\ zabState[i] = BROADCAST
  819. /\ LET request_value == GetRecorder("nClientRequest") \* unique value
  820. newTxn == [ zxid |-> IncZxid(i, LastZxid(i)),
  821. value |-> request_value,
  822. ackSid |-> {i},
  823. epoch |-> currentEpoch[i] ]
  824. IN history' = [history EXCEPT ![i] = Append(@, newTxn) ]
  825. /\ UNCHANGED <<state, zabState, acceptedEpoch, currentEpoch, lastCommitted,
  826. leaderVars, followerVars, electionVars, msgVars, verifyVars>>
  827. /\ UpdateRecorder(<<"LeaderProcessRequest", i>>)
  828. \* Latest counter existing in history.
  829. CurrentCounter(i) == IF LastZxid(i)[1] = currentEpoch[i] THEN LastZxid(i)[2]
  830. ELSE 0
  831. (* Leader broadcasts PROPOSE when sendCounter < currentCounter. *)
  832. LeaderBroadcastPROPOSE(i) ==
  833. /\ IsLeader(i)
  834. /\ zabState[i] = BROADCAST
  835. /\ sendCounter[i] < CurrentCounter(i) \* there exists proposal to be sent
  836. /\ LET toSendCounter == sendCounter[i] + 1
  837. toSendZxid == <<currentEpoch[i], toSendCounter>>
  838. toSendIndex == ZxidToIndex(history[i], toSendZxid)
  839. toSendTxn == history[i][toSendIndex]
  840. m_proposal == [ mtype |-> PROPOSE,
  841. mzxid |-> toSendTxn.zxid,
  842. mdata |-> toSendTxn.value ]
  843. m_proposal_forChecking == [ source |-> i,
  844. epoch |-> currentEpoch[i],
  845. zxid |-> toSendTxn.zxid,
  846. data |-> toSendTxn.value ]
  847. IN /\ sendCounter' = [sendCounter EXCEPT ![i] = toSendCounter]
  848. /\ Broadcast(i, m_proposal)
  849. /\ proposalMsgsLog' = proposalMsgsLog \union {m_proposal_forChecking}
  850. /\ UNCHANGED <<serverVars, learners, cepochRecv, ackeRecv, ackldRecv,
  851. followerVars, electionVars, epochLeader, violatedInvariants>>
  852. /\ UpdateRecorder(<<"LeaderBroadcastPROPOSE", i>>)
  853. IsNextZxid(curZxid, nextZxid) ==
  854. \/ \* first PROPOSAL in this epoch
  855. /\ nextZxid[2] = 1
  856. /\ curZxid[1] < nextZxid[1]
  857. \/ \* not first PROPOSAL in this epoch
  858. /\ nextZxid[2] > 1
  859. /\ curZxid[1] = nextZxid[1]
  860. /\ curZxid[2] + 1 = nextZxid[2]
  861. (* Follower processes PROPOSE, saves it in history and replies ACK. *)
  862. FollowerProcessPROPOSE(i, j) ==
  863. /\ IsFollower(i)
  864. /\ PendingPROPOSE(i, j)
  865. /\ LET msg == msgs[j][i][1]
  866. infoOk == IsMyLeader(i, j)
  867. isNext == IsNextZxid(LastZxid(i), msg.mzxid)
  868. newTxn == [ zxid |-> msg.mzxid,
  869. value |-> msg.mdata,
  870. ackSid |-> {},
  871. epoch |-> currentEpoch[i] ]
  872. m_ack == [ mtype |-> ACK,
  873. mzxid |-> msg.mzxid ]
  874. IN /\ infoOk
  875. /\ \/ /\ isNext
  876. /\ history' = [history EXCEPT ![i] = Append(@, newTxn)]
  877. /\ Reply(i, j, m_ack)
  878. /\ UNCHANGED violatedInvariants
  879. \/ /\ ~isNext
  880. /\ LET index == ZxidToIndex(history[i], msg.mzxid)
  881. exist == index > 0 /\ index <= Len(history[i])
  882. IN \/ /\ exist
  883. /\ UNCHANGED violatedInvariants
  884. \/ /\ ~exist
  885. /\ PrintT("Exception: Follower receives PROPOSE, while" \o
  886. " txn is neither the next nor exists in history.")
  887. /\ violatedInvariants' = [violatedInvariants EXCEPT
  888. !.proposalInconsistent = TRUE]
  889. /\ Discard(j, i)
  890. /\ UNCHANGED history
  891. /\ UNCHANGED <<state, zabState, acceptedEpoch, currentEpoch, lastCommitted,
  892. leaderVars, followerVars, electionVars, proposalMsgsLog, epochLeader>>
  893. /\ UpdateRecorder(<<"FollowerProcessPROPOSE", i, j>>)
  894. LeaderTryToCommit(s, index, zxid, newTxn, follower) ==
  895. LET allTxnsBeforeCommitted == lastCommitted[s].index >= index - 1
  896. \* Only when all proposals before zxid has been committed,
  897. \* this proposal can be permitted to be committed.
  898. hasAllQuorums == IsQuorum(newTxn.ackSid)
  899. \* In order to be committed, a proposal must be accepted
  900. \* by a quorum.
  901. ordered == lastCommitted[s].index + 1 = index
  902. \* Commit proposals in order.
  903. IN \/ /\ \* Current conditions do not satisfy committing the proposal.
  904. \/ ~allTxnsBeforeCommitted
  905. \/ ~hasAllQuorums
  906. /\ Discard(follower, s)
  907. /\ UNCHANGED <<violatedInvariants, lastCommitted>>
  908. \/ /\ allTxnsBeforeCommitted
  909. /\ hasAllQuorums
  910. /\ \/ /\ ~ordered
  911. /\ PrintT("Warn: Committing zxid " \o ToString(zxid) \o " not first.")
  912. /\ violatedInvariants' = [violatedInvariants EXCEPT
  913. !.commitInconsistent = TRUE]
  914. \/ /\ ordered
  915. /\ UNCHANGED violatedInvariants
  916. /\ lastCommitted' = [lastCommitted EXCEPT ![s] = [ index |-> index,
  917. zxid |-> zxid ] ]
  918. /\ LET m_commit == [ mtype |-> COMMIT,
  919. mzxid |-> zxid ]
  920. IN DiscardAndBroadcast(s, follower, m_commit)
  921. LastAckIndexFromFollower(i, j) ==
  922. LET set_index == {idx \in 1..Len(history[i]): j \in history[i][idx].ackSid }
  923. IN Maximum(set_index)
  924. (* Leader Keeps a count of acks for a particular proposal, and try to
  925. commit the proposal. If committed, COMMIT of proposal will be broadcast. *)
  926. LeaderProcessACK(i, j) ==
  927. /\ IsLeader(i)
  928. /\ PendingACK(i, j)
  929. /\ LET msg == msgs[j][i][1]
  930. infoOk == IsMyLearner(i, j)
  931. index == ZxidToIndex(history[i], msg.mzxid)
  932. exist == index >= 1 /\ index <= Len(history[i]) \* proposal exists in history
  933. outstanding == lastCommitted[i].index < Len(history[i]) \* outstanding not null
  934. hasCommitted == ~ZxidCompare(msg.mzxid, lastCommitted[i].zxid)
  935. ackIndex == LastAckIndexFromFollower(i, j)
  936. monotonicallyInc == \/ ackIndex = -1
  937. \/ ackIndex + 1 = index
  938. IN /\ infoOk
  939. /\ \/ /\ exist
  940. /\ monotonicallyInc
  941. /\ LET txn == history[i][index]
  942. txnAfterAddAck == [ zxid |-> txn.zxid,
  943. value |-> txn.value,
  944. ackSid |-> txn.ackSid \union {j} ,
  945. epoch |-> txn.epoch ]
  946. IN
  947. /\ history' = [history EXCEPT ![i][index] = txnAfterAddAck ]
  948. /\ \/ /\ \* Note: outstanding is 0.
  949. \* / proposal has already been committed.
  950. \/ ~outstanding
  951. \/ hasCommitted
  952. /\ Discard(j, i)
  953. /\ UNCHANGED <<violatedInvariants, lastCommitted>>
  954. \/ /\ outstanding
  955. /\ ~hasCommitted
  956. /\ LeaderTryToCommit(i, index, msg.mzxid, txnAfterAddAck, j)
  957. \/ /\ \/ ~exist
  958. \/ ~monotonicallyInc
  959. /\ PrintT("Exception: No such zxid. " \o
  960. " / ackIndex doesn't inc monotonically.")
  961. /\ violatedInvariants' = [violatedInvariants
  962. EXCEPT !.ackInconsistent = TRUE]
  963. /\ Discard(j, i)
  964. /\ UNCHANGED <<history, lastCommitted>>
  965. /\ UNCHANGED <<state, zabState, acceptedEpoch, currentEpoch, leaderVars,
  966. followerVars, electionVars, proposalMsgsLog, epochLeader>>
  967. /\ UpdateRecorder(<<"LeaderProcessACK", i, j>>)
  968. (* Follower processes COMMIT. *)
  969. FollowerProcessCOMMIT(i, j) ==
  970. /\ IsFollower(i)
  971. /\ PendingCOMMIT(i, j)
  972. /\ LET msg == msgs[j][i][1]
  973. infoOk == IsMyLeader(i, j)
  974. pending == lastCommitted[i].index < Len(history[i])
  975. IN /\ infoOk
  976. /\ \/ /\ ~pending
  977. /\ PrintT("Warn: Committing zxid without seeing txn.")
  978. /\ UNCHANGED <<lastCommitted, violatedInvariants>>
  979. \/ /\ pending
  980. /\ LET firstElement == history[i][lastCommitted[i].index + 1]
  981. match == ZxidEqual(firstElement.zxid, msg.mzxid)
  982. IN
  983. \/ /\ ~match
  984. /\ PrintT("Exception: Committing zxid not equals" \o
  985. " next pending txn zxid.")
  986. /\ violatedInvariants' = [violatedInvariants EXCEPT
  987. !.commitInconsistent = TRUE]
  988. /\ UNCHANGED lastCommitted
  989. \/ /\ match
  990. /\ lastCommitted' = [lastCommitted EXCEPT ![i] =
  991. [ index |-> lastCommitted[i].index + 1,
  992. zxid |-> firstElement.zxid ] ]
  993. /\ UNCHANGED violatedInvariants
  994. /\ Discard(j, i)
  995. /\ UNCHANGED <<state, zabState, acceptedEpoch, currentEpoch, history,
  996. leaderVars, followerVars, electionVars, proposalMsgsLog, epochLeader>>
  997. /\ UpdateRecorder(<<"FollowerProcessCOMMIT", i, j>>)
  998. ----------------------------------------------------------------------------
  999. (* Used to discard some messages which should not exist in network channel.
  1000. This action should not be triggered. *)
  1001. FilterNonexistentMessage(i) ==
  1002. /\ \E j \in Server \ {i}: /\ msgs[j][i] /= << >>
  1003. /\ LET msg == msgs[j][i][1]
  1004. IN
  1005. \/ /\ IsLeader(i)
  1006. /\ LET infoOk == IsMyLearner(i, j)
  1007. IN
  1008. \/ msg.mtype = NEWEPOCH
  1009. \/ msg.mtype = NEWLEADER
  1010. \/ msg.mtype = COMMITLD
  1011. \/ msg.mtype = PROPOSE
  1012. \/ msg.mtype = COMMIT
  1013. \/ /\ ~infoOk
  1014. /\ \/ msg.mtype = CEPOCH
  1015. \/ msg.mtype = ACKEPOCH
  1016. \/ msg.mtype = ACKLD
  1017. \/ msg.mtype = ACK
  1018. \/ /\ IsFollower(i)
  1019. /\ LET infoOk == IsMyLeader(i, j)
  1020. IN
  1021. \/ msg.mtype = CEPOCH
  1022. \/ msg.mtype = ACKEPOCH
  1023. \/ msg.mtype = ACKLD
  1024. \/ msg.mtype = ACK
  1025. \/ /\ ~infoOk
  1026. /\ \/ msg.mtype = NEWEPOCH
  1027. \/ msg.mtype = NEWLEADER
  1028. \/ msg.mtype = COMMITLD
  1029. \/ msg.mtype = PROPOSE
  1030. \/ msg.mtype = COMMIT
  1031. \/ IsLooking(i)
  1032. /\ Discard(j, i)
  1033. /\ violatedInvariants' = [violatedInvariants EXCEPT !.messageIllegal = TRUE]
  1034. /\ UNCHANGED <<serverVars, leaderVars, followerVars, electionVars,
  1035. proposalMsgsLog, epochLeader>>
  1036. /\ UnchangeRecorder
  1037. ----------------------------------------------------------------------------
  1038. \* Defines how the variables may transition.
  1039. Next ==
  1040. (* Election *)
  1041. \/ \E i \in Server: UpdateLeader(i)
  1042. \/ \E i \in Server: FollowLeader(i)
  1043. (* Abnormal situations like failure, network disconnection *)
  1044. \/ \E i, j \in Server: Timeout(i, j)
  1045. \/ \E i \in Server: Restart(i)
  1046. (* Zab module - Discovery and Synchronization part *)
  1047. \/ \E i, j \in Server: ConnectAndFollowerSendCEPOCH(i, j)
  1048. \/ \E i, j \in Server: LeaderProcessCEPOCH(i, j)
  1049. \/ \E i, j \in Server: FollowerProcessNEWEPOCH(i, j)
  1050. \/ \E i, j \in Server: LeaderProcessACKEPOCH(i, j)
  1051. \/ \E i, j \in Server: FollowerProcessNEWLEADER(i, j)
  1052. \/ \E i, j \in Server: LeaderProcessACKLD(i, j)
  1053. \/ \E i, j \in Server: FollowerProcessCOMMITLD(i, j)
  1054. (* Zab module - Broadcast part *)
  1055. \/ \E i \in Server: LeaderProcessRequest(i)
  1056. \/ \E i \in Server: LeaderBroadcastPROPOSE(i)
  1057. \/ \E i, j \in Server: FollowerProcessPROPOSE(i, j)
  1058. \/ \E i, j \in Server: LeaderProcessACK(i, j)
  1059. \/ \E i, j \in Server: FollowerProcessCOMMIT(i, j)
  1060. (* An action used to judge whether there are redundant messages in network *)
  1061. \/ \E i \in Server: FilterNonexistentMessage(i)
  1062. Spec == Init /\ [][Next]_vars
  1063. ----------------------------------------------------------------------------
  1064. \* Define safety properties of Zab.
  1065. ShouldNotBeTriggered == \A p \in DOMAIN violatedInvariants: violatedInvariants[p] = FALSE
  1066. \* There is most one established leader for a certain epoch.
  1067. Leadership1 == \A i, j \in Server:
  1068. /\ IsLeader(i) /\ zabState[i] \in {SYNCHRONIZATION, BROADCAST}
  1069. /\ IsLeader(j) /\ zabState[j] \in {SYNCHRONIZATION, BROADCAST}
  1070. /\ currentEpoch[i] = currentEpoch[j]
  1071. => i = j
  1072. Leadership2 == \A epoch \in 1..MAXEPOCH: Cardinality(epochLeader[epoch]) <= 1
  1073. \* PrefixConsistency: The prefix that have been committed
  1074. \* in history in any process is the same.
  1075. PrefixConsistency == \A i, j \in Server:
  1076. LET smaller == Minimum({lastCommitted[i].index, lastCommitted[j].index})
  1077. IN \/ smaller = 0
  1078. \/ /\ smaller > 0
  1079. /\ \A index \in 1..smaller:
  1080. TxnEqual(history[i][index], history[j][index])
  1081. \* Integrity: If some follower delivers one transaction, then some primary has broadcast it.
  1082. Integrity == \A i \in Server:
  1083. /\ IsFollower(i)
  1084. /\ lastCommitted[i].index > 0
  1085. => \A idx \in 1..lastCommitted[i].index: \E proposal \in proposalMsgsLog:
  1086. LET txn_proposal == [ zxid |-> proposal.zxid,
  1087. value |-> proposal.data ]
  1088. IN TxnEqual(history[i][idx], txn_proposal)
  1089. \* Agreement: If some follower f delivers transaction a and some follower f' delivers transaction b,
  1090. \* then f' delivers a or f delivers b.
  1091. Agreement == \A i, j \in Server:
  1092. /\ IsFollower(i) /\ lastCommitted[i].index > 0
  1093. /\ IsFollower(j) /\ lastCommitted[j].index > 0
  1094. =>
  1095. \A idx1 \in 1..lastCommitted[i].index, idx2 \in 1..lastCommitted[j].index :
  1096. \/ \E idx_j \in 1..lastCommitted[j].index:
  1097. TxnEqual(history[j][idx_j], history[i][idx1])
  1098. \/ \E idx_i \in 1..lastCommitted[i].index:
  1099. TxnEqual(history[i][idx_i], history[j][idx2])
  1100. \* Total order: If some follower delivers a before b, then any process that delivers b
  1101. \* must also deliver a and deliver a before b.
  1102. TotalOrder == \A i, j \in Server:
  1103. LET committed1 == lastCommitted[i].index
  1104. committed2 == lastCommitted[j].index
  1105. IN committed1 >= 2 /\ committed2 >= 2
  1106. => \A idx_i1 \in 1..(committed1 - 1) : \A idx_i2 \in (idx_i1 + 1)..committed1 :
  1107. LET logOk == \E idx \in 1..committed2 :
  1108. TxnEqual(history[i][idx_i2], history[j][idx])
  1109. IN \/ ~logOk
  1110. \/ /\ logOk
  1111. /\ \E idx_j2 \in 1..committed2 :
  1112. /\ TxnEqual(history[i][idx_i2], history[j][idx_j2])
  1113. /\ \E idx_j1 \in 1..(idx_j2 - 1):
  1114. TxnEqual(history[i][idx_i1], history[j][idx_j1])
  1115. \* Local primary order: If a primary broadcasts a before it broadcasts b, then a follower that
  1116. \* delivers b must also deliver a before b.
  1117. LocalPrimaryOrder == LET p_set(i, e) == {p \in proposalMsgsLog: /\ p.source = i
  1118. /\ p.epoch = e }
  1119. txn_set(i, e) == { [ zxid |-> p.zxid,
  1120. value |-> p.data ] : p \in p_set(i, e) }
  1121. IN \A i \in Server: \A e \in 1..currentEpoch[i]:
  1122. \/ Cardinality(txn_set(i, e)) < 2
  1123. \/ /\ Cardinality(txn_set(i, e)) >= 2
  1124. /\ \E txn1, txn2 \in txn_set(i, e):
  1125. \/ TxnEqual(txn1, txn2)
  1126. \/ /\ ~TxnEqual(txn1, txn2)
  1127. /\ LET TxnPre == IF ZxidCompare(txn1.zxid, txn2.zxid) THEN txn2 ELSE txn1
  1128. TxnNext == IF ZxidCompare(txn1.zxid, txn2.zxid) THEN txn1 ELSE txn2
  1129. IN \A j \in Server: /\ lastCommitted[j].index >= 2
  1130. /\ \E idx \in 1..lastCommitted[j].index:
  1131. TxnEqual(history[j][idx], TxnNext)
  1132. => \E idx2 \in 1..lastCommitted[j].index:
  1133. /\ TxnEqual(history[j][idx2], TxnNext)
  1134. /\ idx2 > 1
  1135. /\ \E idx1 \in 1..(idx2 - 1):
  1136. TxnEqual(history[j][idx1], TxnPre)
  1137. \* Global primary order: A follower f delivers both a with epoch e and b with epoch e', and e < e',
  1138. \* then f must deliver a before b.
  1139. GlobalPrimaryOrder == \A i \in Server: lastCommitted[i].index >= 2
  1140. => \A idx1, idx2 \in 1..lastCommitted[i].index:
  1141. \/ ~EpochPrecedeInTxn(history[i][idx1], history[i][idx2])
  1142. \/ /\ EpochPrecedeInTxn(history[i][idx1], history[i][idx2])
  1143. /\ idx1 < idx2
  1144. \* Primary integrity: If primary p broadcasts a and some follower f delivers b such that b has epoch
  1145. \* smaller than epoch of p, then p must deliver b before it broadcasts a.
  1146. PrimaryIntegrity == \A i, j \in Server: /\ IsLeader(i) /\ IsMyLearner(i, j)
  1147. /\ IsFollower(j) /\ IsMyLeader(j, i)
  1148. /\ zabState[i] = BROADCAST
  1149. /\ zabState[j] = BROADCAST
  1150. /\ lastCommitted[j].index >= 1
  1151. => \A idx_j \in 1..lastCommitted[j].index:
  1152. \/ history[j][idx_j].zxid[1] >= currentEpoch[i]
  1153. \/ /\ history[j][idx_j].zxid[1] < currentEpoch[i]
  1154. /\ \E idx_i \in 1..lastCommitted[i].index:
  1155. TxnEqual(history[i][idx_i], history[j][idx_j])
  1156. =============================================================================
  1157. \* Modification History
  1158. \* Last modified Tue Jan 31 20:40:11 CST 2023 by huangbinyu
  1159. \* Last modified Sat Dec 11 22:31:08 CST 2021 by Dell
  1160. \* Created Thu Dec 02 20:49:23 CST 2021 by Dell