|
@@ -83,9 +83,9 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
LeaderElectionBean jmxLeaderElectionBean;
|
|
|
QuorumCnxManager qcm;
|
|
|
|
|
|
- /* ZKDatabase is a top level member of quorumpeer
|
|
|
+ /* ZKDatabase is a top level member of quorumpeer
|
|
|
* which will be used in all the zookeeperservers
|
|
|
- * instantiated later. Also, it is created once on
|
|
|
+ * instantiated later. Also, it is created once on
|
|
|
* bootup and only thrown away in case of a truncate
|
|
|
* message from the leader
|
|
|
*/
|
|
@@ -104,7 +104,7 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
this.addr = addr;
|
|
|
this.electionAddr = null;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public QuorumServer(long id, InetSocketAddress addr,
|
|
|
InetSocketAddress electionAddr, LearnerType type) {
|
|
|
this.id = id;
|
|
@@ -112,54 +112,54 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
this.electionAddr = electionAddr;
|
|
|
this.type = type;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public InetSocketAddress addr;
|
|
|
|
|
|
public InetSocketAddress electionAddr;
|
|
|
-
|
|
|
+
|
|
|
public long id;
|
|
|
-
|
|
|
+
|
|
|
public LearnerType type = LearnerType.PARTICIPANT;
|
|
|
}
|
|
|
|
|
|
public enum ServerState {
|
|
|
LOOKING, FOLLOWING, LEADING, OBSERVING;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/*
|
|
|
* A peer can either be participating, which implies that it is willing to
|
|
|
* both vote in instances of consensus and to elect or become a Leader, or
|
|
|
* it may be observing in which case it isn't.
|
|
|
- *
|
|
|
- * We need this distinction to decide which ServerState to move to when
|
|
|
- * conditions change (e.g. which state to become after LOOKING).
|
|
|
+ *
|
|
|
+ * We need this distinction to decide which ServerState to move to when
|
|
|
+ * conditions change (e.g. which state to become after LOOKING).
|
|
|
*/
|
|
|
public enum LearnerType {
|
|
|
PARTICIPANT, OBSERVER;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/*
|
|
|
* To enable observers to have no identifier, we need a generic identifier
|
|
|
* at least for QuorumCnxManager. We use the following constant to as the
|
|
|
- * value of such a generic identifier.
|
|
|
+ * value of such a generic identifier.
|
|
|
*/
|
|
|
-
|
|
|
+
|
|
|
static final long OBSERVER_ID = Long.MAX_VALUE;
|
|
|
|
|
|
/*
|
|
|
* Record leader election time
|
|
|
*/
|
|
|
public long start_fle, end_fle;
|
|
|
-
|
|
|
+
|
|
|
/*
|
|
|
* Default value of peer is participant
|
|
|
*/
|
|
|
private LearnerType learnerType = LearnerType.PARTICIPANT;
|
|
|
-
|
|
|
+
|
|
|
public LearnerType getLearnerType() {
|
|
|
return learnerType;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Sets the LearnerType both in the QuorumPeer and in the peerMap
|
|
|
*/
|
|
@@ -168,10 +168,10 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
if (quorumPeers.containsKey(this.myid)) {
|
|
|
this.quorumPeers.get(myid).type = p;
|
|
|
} else {
|
|
|
- LOG.error("Setting LearnerType to " + p + " but " + myid
|
|
|
+ LOG.error("Setting LearnerType to " + p + " but " + myid
|
|
|
+ " not in QuorumPeers. ");
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
}
|
|
|
/**
|
|
|
* The servers that make up the cluster
|
|
@@ -180,13 +180,13 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
public int getQuorumSize(){
|
|
|
return getVotingView().size();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * QuorumVerifier implementation; default (majority).
|
|
|
+ * QuorumVerifier implementation; default (majority).
|
|
|
*/
|
|
|
-
|
|
|
+
|
|
|
private QuorumVerifier quorumConfig;
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* My id
|
|
|
*/
|
|
@@ -204,14 +204,14 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
* This is who I think the leader currently is.
|
|
|
*/
|
|
|
volatile private Vote currentVote;
|
|
|
-
|
|
|
+
|
|
|
public synchronized Vote getCurrentVote(){
|
|
|
return currentVote;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public synchronized void setCurrentVote(Vote v){
|
|
|
currentVote = v;
|
|
|
- }
|
|
|
+ }
|
|
|
|
|
|
volatile boolean running = true;
|
|
|
|
|
@@ -251,8 +251,8 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
/**
|
|
|
* @deprecated As of release 3.4.0, this class has been deprecated, since
|
|
|
* it is used with one of the udp-based versions of leader election, which
|
|
|
- * we are also deprecating.
|
|
|
- *
|
|
|
+ * we are also deprecating.
|
|
|
+ *
|
|
|
* This class simply responds to requests for the current leader of this
|
|
|
* node.
|
|
|
* <p>
|
|
@@ -270,7 +270,7 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
}
|
|
|
|
|
|
volatile boolean running = true;
|
|
|
-
|
|
|
+
|
|
|
@Override
|
|
|
public void run() {
|
|
|
try {
|
|
@@ -316,7 +316,7 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
break;
|
|
|
case OBSERVING:
|
|
|
// Do nothing, Observers keep themselves to
|
|
|
- // themselves.
|
|
|
+ // themselves.
|
|
|
break;
|
|
|
}
|
|
|
packet.setData(b);
|
|
@@ -360,30 +360,30 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
private FileTxnSnapLog logFactory = null;
|
|
|
|
|
|
private final QuorumStats quorumStats;
|
|
|
-
|
|
|
+
|
|
|
public QuorumPeer() {
|
|
|
super("QuorumPeer");
|
|
|
quorumStats = new QuorumStats(this);
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
+
|
|
|
+
|
|
|
/**
|
|
|
* For backward compatibility purposes, we instantiate QuorumMaj by default.
|
|
|
*/
|
|
|
-
|
|
|
+
|
|
|
public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir,
|
|
|
File dataLogDir, int electionType,
|
|
|
long myid, int tickTime, int initLimit, int syncLimit,
|
|
|
ServerCnxnFactory cnxnFactory) throws IOException {
|
|
|
- this(quorumPeers, dataDir, dataLogDir, electionType, myid, tickTime,
|
|
|
- initLimit, syncLimit, cnxnFactory,
|
|
|
- new QuorumMaj(countParticipants(quorumPeers)));
|
|
|
+ this(quorumPeers, dataDir, dataLogDir, electionType, myid, tickTime,
|
|
|
+ initLimit, syncLimit, cnxnFactory,
|
|
|
+ new QuorumMaj(countParticipants(quorumPeers)));
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir,
|
|
|
File dataLogDir, int electionType,
|
|
|
long myid, int tickTime, int initLimit, int syncLimit,
|
|
|
- ServerCnxnFactory cnxnFactory,
|
|
|
+ ServerCnxnFactory cnxnFactory,
|
|
|
QuorumVerifier quorumConfig) throws IOException {
|
|
|
this();
|
|
|
this.cnxnFactory = cnxnFactory;
|
|
@@ -392,79 +392,79 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
this.myid = myid;
|
|
|
this.tickTime = tickTime;
|
|
|
this.initLimit = initLimit;
|
|
|
- this.syncLimit = syncLimit;
|
|
|
+ this.syncLimit = syncLimit;
|
|
|
this.logFactory = new FileTxnSnapLog(dataLogDir, dataDir);
|
|
|
this.zkDb = new ZKDatabase(this.logFactory);
|
|
|
if(quorumConfig == null)
|
|
|
this.quorumConfig = new QuorumMaj(countParticipants(quorumPeers));
|
|
|
else this.quorumConfig = quorumConfig;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
QuorumStats quorumStats() {
|
|
|
return quorumStats;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Override
|
|
|
public synchronized void start() {
|
|
|
loadDataBase();
|
|
|
- cnxnFactory.start();
|
|
|
+ cnxnFactory.start();
|
|
|
startLeaderElection();
|
|
|
super.start();
|
|
|
}
|
|
|
|
|
|
- private void loadDataBase() {
|
|
|
- try {
|
|
|
+ private void loadDataBase() {
|
|
|
+ try {
|
|
|
zkDb.loadDataBase();
|
|
|
|
|
|
// load the epochs
|
|
|
long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
|
|
|
- long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
|
|
|
+ long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
|
|
|
try {
|
|
|
- currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
|
|
|
+ currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
|
|
|
} catch(FileNotFoundException e) {
|
|
|
- // pick a reasonable epoch number
|
|
|
- // this should only happen once when moving to a
|
|
|
- // new code version
|
|
|
- LOG.info(CURRENT_EPOCH_FILENAME + " not found! Creating with a reasonable default. This should only happen when you are upgrading your installation");
|
|
|
- currentEpoch = epochOfZxid;
|
|
|
- writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
|
|
|
+ // pick a reasonable epoch number
|
|
|
+ // this should only happen once when moving to a
|
|
|
+ // new code version
|
|
|
+ LOG.info(CURRENT_EPOCH_FILENAME + " not found! Creating with a reasonable default. This should only happen when you are upgrading your installation");
|
|
|
+ currentEpoch = epochOfZxid;
|
|
|
+ writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
|
|
|
}
|
|
|
if (epochOfZxid > currentEpoch) {
|
|
|
- throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);
|
|
|
+ throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);
|
|
|
}
|
|
|
try {
|
|
|
- acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
|
|
|
+ acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
|
|
|
} catch(FileNotFoundException e) {
|
|
|
- // pick a reasonable epoch number
|
|
|
- // this should only happen once when moving to a
|
|
|
- // new code version
|
|
|
- LOG.info(ACCEPTED_EPOCH_FILENAME + " not found! Creating with a reasonable default. This should only happen when you are upgrading your installation");
|
|
|
- acceptedEpoch = epochOfZxid;
|
|
|
- writeLongToFile(CURRENT_EPOCH_FILENAME, acceptedEpoch);
|
|
|
+ // pick a reasonable epoch number
|
|
|
+ // this should only happen once when moving to a
|
|
|
+ // new code version
|
|
|
+ LOG.info(ACCEPTED_EPOCH_FILENAME + " not found! Creating with a reasonable default. This should only happen when you are upgrading your installation");
|
|
|
+ acceptedEpoch = epochOfZxid;
|
|
|
+ writeLongToFile(CURRENT_EPOCH_FILENAME, acceptedEpoch);
|
|
|
}
|
|
|
if (acceptedEpoch < currentEpoch) {
|
|
|
- throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + " is less than the accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch));
|
|
|
+ throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + " is less than the accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch));
|
|
|
}
|
|
|
} catch(IOException ie) {
|
|
|
LOG.error("Unable to load database on disk", ie);
|
|
|
throw new RuntimeException("Unable to run quorum server ", ie);
|
|
|
}
|
|
|
- }
|
|
|
+ }
|
|
|
|
|
|
ResponderThread responder;
|
|
|
-
|
|
|
+
|
|
|
synchronized public void stopLeaderElection() {
|
|
|
responder.running = false;
|
|
|
responder.interrupt();
|
|
|
}
|
|
|
synchronized public void startLeaderElection() {
|
|
|
- try {
|
|
|
- currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
|
|
|
- } catch(IOException e) {
|
|
|
- RuntimeException re = new RuntimeException(e.getMessage());
|
|
|
- re.setStackTrace(e.getStackTrace());
|
|
|
- throw re;
|
|
|
- }
|
|
|
+ try {
|
|
|
+ currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
|
|
|
+ } catch(IOException e) {
|
|
|
+ RuntimeException re = new RuntimeException(e.getMessage());
|
|
|
+ re.setStackTrace(e.getStackTrace());
|
|
|
+ throw re;
|
|
|
+ }
|
|
|
for (QuorumServer p : getView().values()) {
|
|
|
if (p.id == myid) {
|
|
|
myQuorumAddr = p.addr;
|
|
@@ -485,7 +485,7 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
}
|
|
|
this.electionAlg = createElectionAlgorithm(electionType);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Count the number of nodes in the map that could be followers.
|
|
|
* @param peers
|
|
@@ -500,7 +500,7 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
}
|
|
|
return count;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* This constructor is only used by the existing unit test code.
|
|
|
* It defaults to FileLogProvider persistence provider.
|
|
@@ -515,14 +515,14 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
ServerCnxnFactory.createFactory(new InetSocketAddress(clientPort), -1),
|
|
|
new QuorumMaj(countParticipants(quorumPeers)));
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* This constructor is only used by the existing unit test code.
|
|
|
* It defaults to FileLogProvider persistence provider.
|
|
|
*/
|
|
|
public QuorumPeer(Map<Long,QuorumServer> quorumPeers, File snapDir,
|
|
|
File logDir, int clientPort, int electionAlg,
|
|
|
- long myid, int tickTime, int initLimit, int syncLimit,
|
|
|
+ long myid, int tickTime, int initLimit, int syncLimit,
|
|
|
QuorumVerifier quorumConfig)
|
|
|
throws IOException
|
|
|
{
|
|
@@ -531,41 +531,42 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
ServerCnxnFactory.createFactory(new InetSocketAddress(clientPort), -1),
|
|
|
quorumConfig);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* returns the highest zxid that this host has seen
|
|
|
- *
|
|
|
+ *
|
|
|
* @return the highest zxid for this host
|
|
|
*/
|
|
|
public long getLastLoggedZxid() {
|
|
|
if (!zkDb.isInitialized()) {
|
|
|
- loadDataBase();
|
|
|
+ loadDataBase();
|
|
|
}
|
|
|
return zkDb.getDataTreeLastProcessedZxid();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public Follower follower;
|
|
|
public Leader leader;
|
|
|
public Observer observer;
|
|
|
|
|
|
protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
|
|
|
- return new Follower(this, new FollowerZooKeeperServer(logFactory,
|
|
|
+ return new Follower(this, new FollowerZooKeeperServer(logFactory,
|
|
|
this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
|
|
|
return new Leader(this, new LeaderZooKeeperServer(logFactory,
|
|
|
this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException {
|
|
|
return new Observer(this, new ObserverZooKeeperServer(logFactory,
|
|
|
this, new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
|
|
|
}
|
|
|
|
|
|
+ @SuppressWarnings("deprecation")
|
|
|
protected Election createElectionAlgorithm(int electionAlgorithm){
|
|
|
Election le=null;
|
|
|
-
|
|
|
+
|
|
|
//TODO: use a factory rather than a switch
|
|
|
switch (electionAlgorithm) {
|
|
|
case 0:
|
|
@@ -593,11 +594,12 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
return le;
|
|
|
}
|
|
|
|
|
|
+ @SuppressWarnings("deprecation")
|
|
|
protected Election makeLEStrategy(){
|
|
|
LOG.debug("Initializing leader election protocol...");
|
|
|
if (getElectionType() == 0) {
|
|
|
electionAlg = new LeaderElection(this);
|
|
|
- }
|
|
|
+ }
|
|
|
return electionAlg;
|
|
|
}
|
|
|
|
|
@@ -608,7 +610,7 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
synchronized protected void setFollower(Follower newFollower){
|
|
|
follower=newFollower;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
synchronized protected void setObserver(Observer newObserver){
|
|
|
observer=newObserver;
|
|
|
}
|
|
@@ -709,7 +711,7 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
setObserver(makeObserver(logFactory));
|
|
|
observer.observeLeader();
|
|
|
} catch (Exception e) {
|
|
|
- LOG.warn("Unexpected exception",e );
|
|
|
+ LOG.warn("Unexpected exception",e );
|
|
|
} finally {
|
|
|
observer.shutdown();
|
|
|
setObserver(null);
|
|
@@ -771,57 +773,57 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
if(udpSocket != null) {
|
|
|
udpSocket.close();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
if(getElectionAlg() != null){
|
|
|
this.interrupt();
|
|
|
- getElectionAlg().shutdown();
|
|
|
+ getElectionAlg().shutdown();
|
|
|
}
|
|
|
try {
|
|
|
zkDb.close();
|
|
|
} catch (IOException ie) {
|
|
|
LOG.warn("Error closing logs ", ie);
|
|
|
- }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* A 'view' is a node's current opinion of the membership of the entire
|
|
|
- * ensemble.
|
|
|
+ * ensemble.
|
|
|
*/
|
|
|
public Map<Long,QuorumPeer.QuorumServer> getView() {
|
|
|
return Collections.unmodifiableMap(this.quorumPeers);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * Observers are not contained in this view, only nodes with
|
|
|
- * PeerType=PARTICIPANT.
|
|
|
+ * Observers are not contained in this view, only nodes with
|
|
|
+ * PeerType=PARTICIPANT.
|
|
|
*/
|
|
|
public Map<Long,QuorumPeer.QuorumServer> getVotingView() {
|
|
|
- Map<Long,QuorumPeer.QuorumServer> ret =
|
|
|
+ Map<Long,QuorumPeer.QuorumServer> ret =
|
|
|
new HashMap<Long, QuorumPeer.QuorumServer>();
|
|
|
Map<Long,QuorumPeer.QuorumServer> view = getView();
|
|
|
- for (QuorumServer server : view.values()) {
|
|
|
+ for (QuorumServer server : view.values()) {
|
|
|
if (server.type == LearnerType.PARTICIPANT) {
|
|
|
ret.put(server.id, server);
|
|
|
}
|
|
|
- }
|
|
|
+ }
|
|
|
return ret;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Returns only observers, no followers.
|
|
|
*/
|
|
|
public Map<Long,QuorumPeer.QuorumServer> getObservingView() {
|
|
|
- Map<Long,QuorumPeer.QuorumServer> ret =
|
|
|
+ Map<Long,QuorumPeer.QuorumServer> ret =
|
|
|
new HashMap<Long, QuorumPeer.QuorumServer>();
|
|
|
Map<Long,QuorumPeer.QuorumServer> view = getView();
|
|
|
- for (QuorumServer server : view.values()) {
|
|
|
+ for (QuorumServer server : view.values()) {
|
|
|
if (server.type == LearnerType.OBSERVER) {
|
|
|
ret.put(server.id, server);
|
|
|
}
|
|
|
- }
|
|
|
+ }
|
|
|
return ret;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Check if a node is in the current view. With static membership, the
|
|
|
* result of this check will never change; only when dynamic membership
|
|
@@ -830,7 +832,7 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
public boolean viewContains(Long sid) {
|
|
|
return this.quorumPeers.containsKey(sid);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Only used by QuorumStats at the moment
|
|
|
*/
|
|
@@ -909,7 +911,7 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
}
|
|
|
return fac.getMaxClientCnxnsPerHost();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/** minimum session timeout in milliseconds */
|
|
|
public int getMinSessionTimeout() {
|
|
|
return minSessionTimeout == -1 ? tickTime * 2 : minSessionTimeout;
|
|
@@ -953,28 +955,28 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
public int getTick() {
|
|
|
return tick;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Return QuorumVerifier object
|
|
|
*/
|
|
|
-
|
|
|
+
|
|
|
public QuorumVerifier getQuorumVerifier(){
|
|
|
return quorumConfig;
|
|
|
-
|
|
|
+
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public void setQuorumVerifier(QuorumVerifier quorumConfig){
|
|
|
this.quorumConfig = quorumConfig;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Get an instance of LeaderElection
|
|
|
*/
|
|
|
-
|
|
|
+
|
|
|
public Election getElectionAlg(){
|
|
|
return electionAlg;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Get the synclimit
|
|
|
*/
|
|
@@ -1021,11 +1023,11 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
|
|
|
public void setClientPortAddress(InetSocketAddress addr) {
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public void setTxnFactory(FileTxnSnapLog factory) {
|
|
|
this.logFactory = factory;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public FileTxnSnapLog getTxnFactory() {
|
|
|
return this.logFactory;
|
|
|
}
|
|
@@ -1053,61 +1055,61 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
|
return qcm;
|
|
|
}
|
|
|
private long readLongFromFile(String name) throws IOException {
|
|
|
- File file = new File(logFactory.getSnapDir(), name);
|
|
|
- BufferedReader br = new BufferedReader(new FileReader(file));
|
|
|
- String line = "";
|
|
|
- try {
|
|
|
- line = br.readLine();
|
|
|
- return Long.parseLong(line);
|
|
|
- } catch(NumberFormatException e) {
|
|
|
- throw new IOException("Found " + line + " in " + file);
|
|
|
- } finally {
|
|
|
- br.close();
|
|
|
- }
|
|
|
+ File file = new File(logFactory.getSnapDir(), name);
|
|
|
+ BufferedReader br = new BufferedReader(new FileReader(file));
|
|
|
+ String line = "";
|
|
|
+ try {
|
|
|
+ line = br.readLine();
|
|
|
+ return Long.parseLong(line);
|
|
|
+ } catch(NumberFormatException e) {
|
|
|
+ throw new IOException("Found " + line + " in " + file);
|
|
|
+ } finally {
|
|
|
+ br.close();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private long acceptedEpoch = -1;
|
|
|
private long currentEpoch = -1;
|
|
|
|
|
|
- public static final String CURRENT_EPOCH_FILENAME = "currentEpoch";
|
|
|
+ public static final String CURRENT_EPOCH_FILENAME = "currentEpoch";
|
|
|
|
|
|
- public static final String ACCEPTED_EPOCH_FILENAME = "acceptedEpoch";
|
|
|
+ public static final String ACCEPTED_EPOCH_FILENAME = "acceptedEpoch";
|
|
|
|
|
|
private void writeLongToFile(String name, long value) throws IOException {
|
|
|
- File file = new File(logFactory.getSnapDir(), name);
|
|
|
- FileOutputStream out = new FileOutputStream(file);
|
|
|
- BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out));
|
|
|
- try {
|
|
|
- bw.write(Long.toString(value));
|
|
|
- bw.flush();
|
|
|
- out.getFD().sync();
|
|
|
- } finally {
|
|
|
- bw.close();
|
|
|
- }
|
|
|
+ File file = new File(logFactory.getSnapDir(), name);
|
|
|
+ FileOutputStream out = new FileOutputStream(file);
|
|
|
+ BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out));
|
|
|
+ try {
|
|
|
+ bw.write(Long.toString(value));
|
|
|
+ bw.flush();
|
|
|
+ out.getFD().sync();
|
|
|
+ } finally {
|
|
|
+ bw.close();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public long getCurrentEpoch() throws IOException {
|
|
|
- if (currentEpoch == -1) {
|
|
|
- currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
|
|
|
- }
|
|
|
- return currentEpoch;
|
|
|
- }
|
|
|
-
|
|
|
- public long getAcceptedEpoch() throws IOException {
|
|
|
- if (acceptedEpoch == -1) {
|
|
|
- acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
|
|
|
- }
|
|
|
- return acceptedEpoch;
|
|
|
- }
|
|
|
-
|
|
|
- public void setCurrentEpoch(long e) throws IOException {
|
|
|
- currentEpoch = e;
|
|
|
- writeLongToFile(CURRENT_EPOCH_FILENAME, e);
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- public void setAcceptedEpoch(long e) throws IOException {
|
|
|
- acceptedEpoch = e;
|
|
|
- writeLongToFile(ACCEPTED_EPOCH_FILENAME, e);
|
|
|
- }
|
|
|
+ if (currentEpoch == -1) {
|
|
|
+ currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
|
|
|
+ }
|
|
|
+ return currentEpoch;
|
|
|
+ }
|
|
|
+
|
|
|
+ public long getAcceptedEpoch() throws IOException {
|
|
|
+ if (acceptedEpoch == -1) {
|
|
|
+ acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
|
|
|
+ }
|
|
|
+ return acceptedEpoch;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setCurrentEpoch(long e) throws IOException {
|
|
|
+ currentEpoch = e;
|
|
|
+ writeLongToFile(CURRENT_EPOCH_FILENAME, e);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setAcceptedEpoch(long e) throws IOException {
|
|
|
+ acceptedEpoch = e;
|
|
|
+ writeLongToFile(ACCEPTED_EPOCH_FILENAME, e);
|
|
|
+ }
|
|
|
}
|