|
@@ -434,6 +434,10 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
//last proposed quorum verifier
|
|
|
public QuorumVerifier lastSeenQuorumVerifier = null;
|
|
|
|
|
|
+ // Lock object that guard access to quorumVerifier and lastSeenQuorumVerifier.
|
|
|
+ final Object QV_LOCK = new Object();
|
|
|
+
|
|
|
+
|
|
|
/**
|
|
|
* My id
|
|
|
*/
|
|
@@ -665,28 +669,40 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public synchronized InetSocketAddress getQuorumAddress(){
|
|
|
- return myQuorumAddr;
|
|
|
+ public InetSocketAddress getQuorumAddress(){
|
|
|
+ synchronized (QV_LOCK) {
|
|
|
+ return myQuorumAddr;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- public synchronized void setQuorumAddress(InetSocketAddress addr){
|
|
|
- myQuorumAddr = addr;
|
|
|
+ public void setQuorumAddress(InetSocketAddress addr){
|
|
|
+ synchronized (QV_LOCK) {
|
|
|
+ myQuorumAddr = addr;
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public InetSocketAddress getElectionAddress(){
|
|
|
- return myElectionAddr;
|
|
|
+ synchronized (QV_LOCK) {
|
|
|
+ return myElectionAddr;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public void setElectionAddress(InetSocketAddress addr){
|
|
|
- myElectionAddr = addr;
|
|
|
+ synchronized (QV_LOCK) {
|
|
|
+ myElectionAddr = addr;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public InetSocketAddress getClientAddress(){
|
|
|
- return myClientAddr;
|
|
|
+ synchronized (QV_LOCK) {
|
|
|
+ return myClientAddr;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public void setClientAddress(InetSocketAddress addr){
|
|
|
- myClientAddr = addr;
|
|
|
+ synchronized (QV_LOCK) {
|
|
|
+ myClientAddr = addr;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private int electionType;
|
|
@@ -1396,25 +1412,32 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Return QuorumVerifier object for the last committed configuration
|
|
|
+ * Return QuorumVerifier object for the last committed configuration.
|
|
|
*/
|
|
|
-
|
|
|
- public synchronized QuorumVerifier getQuorumVerifier(){
|
|
|
- return quorumVerifier;
|
|
|
-
|
|
|
+ public QuorumVerifier getQuorumVerifier(){
|
|
|
+ synchronized (QV_LOCK) {
|
|
|
+ return quorumVerifier;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- public synchronized QuorumVerifier getLastSeenQuorumVerifier(){
|
|
|
- return lastSeenQuorumVerifier;
|
|
|
+ /**
|
|
|
+ * Return QuorumVerifier object for the last proposed configuration.
|
|
|
+ */
|
|
|
+ public QuorumVerifier getLastSeenQuorumVerifier(){
|
|
|
+ synchronized (QV_LOCK) {
|
|
|
+ return lastSeenQuorumVerifier;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- public synchronized void connectNewPeers(){
|
|
|
- if (qcm!=null && getQuorumVerifier()!=null && getLastSeenQuorumVerifier()!=null) {
|
|
|
- Map<Long, QuorumServer> committedView = getQuorumVerifier().getAllMembers();
|
|
|
- for (Entry<Long, QuorumServer> e: getLastSeenQuorumVerifier().getAllMembers().entrySet()){
|
|
|
- if (e.getKey() != getId() && !committedView.containsKey(e.getKey()))
|
|
|
- qcm.connectOne(e.getKey());
|
|
|
- }
|
|
|
+ private void connectNewPeers(){
|
|
|
+ synchronized (QV_LOCK) {
|
|
|
+ if (qcm != null && quorumVerifier != null && lastSeenQuorumVerifier != null) {
|
|
|
+ Map<Long, QuorumServer> committedView = quorumVerifier.getAllMembers();
|
|
|
+ for (Entry<Long, QuorumServer> e : lastSeenQuorumVerifier.getAllMembers().entrySet()) {
|
|
|
+ if (e.getKey() != getId() && !committedView.containsKey(e.getKey()))
|
|
|
+ qcm.connectOne(e.getKey());
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1431,73 +1454,76 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
|
|
return configFilename + QuorumPeerConfig.nextDynamicConfigFileSuffix;
|
|
|
}
|
|
|
|
|
|
- public synchronized void setLastSeenQuorumVerifier(QuorumVerifier qv, boolean writeToDisk){
|
|
|
- if (lastSeenQuorumVerifier!=null && lastSeenQuorumVerifier.getVersion() > qv.getVersion()) {
|
|
|
- LOG.error("setLastSeenQuorumVerifier called with stale config " + qv.getVersion() +
|
|
|
- ". Current version: " + quorumVerifier.getVersion());
|
|
|
-
|
|
|
- }
|
|
|
- // assuming that a version uniquely identifies a configuration, so if
|
|
|
- // version is the same, nothing to do here.
|
|
|
- if (lastSeenQuorumVerifier != null &&
|
|
|
- lastSeenQuorumVerifier.getVersion() == qv.getVersion()) {
|
|
|
- return;
|
|
|
- }
|
|
|
- lastSeenQuorumVerifier = qv;
|
|
|
- connectNewPeers();
|
|
|
- if (writeToDisk) {
|
|
|
- try {
|
|
|
- QuorumPeerConfig.writeDynamicConfig(
|
|
|
- getNextDynamicConfigFilename(), qv, true);
|
|
|
- } catch(IOException e){
|
|
|
- LOG.error("Error writing next dynamic config file to disk: ", e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
+ public void setLastSeenQuorumVerifier(QuorumVerifier qv, boolean writeToDisk){
|
|
|
+ synchronized (QV_LOCK) {
|
|
|
+ if (lastSeenQuorumVerifier != null && lastSeenQuorumVerifier.getVersion() > qv.getVersion()) {
|
|
|
+ LOG.error("setLastSeenQuorumVerifier called with stale config " + qv.getVersion() +
|
|
|
+ ". Current version: " + quorumVerifier.getVersion());
|
|
|
|
|
|
- }
|
|
|
-
|
|
|
- public synchronized QuorumVerifier setQuorumVerifier(QuorumVerifier qv, boolean writeToDisk){
|
|
|
- if ((quorumVerifier != null) && (quorumVerifier.getVersion() >= qv.getVersion())) {
|
|
|
- // this is normal. For example - server found out about new config through FastLeaderElection gossiping
|
|
|
- // and then got the same config in UPTODATE message so its already known
|
|
|
- LOG.debug(getId() + " setQuorumVerifier called with known or old config " + qv.getVersion() +
|
|
|
- ". Current version: " + quorumVerifier.getVersion());
|
|
|
- return quorumVerifier;
|
|
|
- }
|
|
|
- QuorumVerifier prevQV = quorumVerifier;
|
|
|
- quorumVerifier = qv;
|
|
|
- if (lastSeenQuorumVerifier == null || (qv.getVersion() > lastSeenQuorumVerifier.getVersion()))
|
|
|
+ }
|
|
|
+ // assuming that a version uniquely identifies a configuration, so if
|
|
|
+ // version is the same, nothing to do here.
|
|
|
+ if (lastSeenQuorumVerifier != null &&
|
|
|
+ lastSeenQuorumVerifier.getVersion() == qv.getVersion()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
lastSeenQuorumVerifier = qv;
|
|
|
-
|
|
|
- if (writeToDisk) {
|
|
|
- // some tests initialize QuorumPeer without a static config file
|
|
|
- if (configFilename != null) {
|
|
|
+ connectNewPeers();
|
|
|
+ if (writeToDisk) {
|
|
|
try {
|
|
|
- String dynamicConfigFilename = makeDynamicConfigFilename(
|
|
|
- qv.getVersion());
|
|
|
QuorumPeerConfig.writeDynamicConfig(
|
|
|
- dynamicConfigFilename, qv, false);
|
|
|
- QuorumPeerConfig.editStaticConfig(configFilename,
|
|
|
- dynamicConfigFilename,
|
|
|
- needEraseClientInfoFromStaticConfig());
|
|
|
+ getNextDynamicConfigFilename(), qv, true);
|
|
|
} catch (IOException e) {
|
|
|
- LOG.error("Error closing file: ", e.getMessage());
|
|
|
+ LOG.error("Error writing next dynamic config file to disk: ", e.getMessage());
|
|
|
}
|
|
|
- } else {
|
|
|
- LOG.info("writeToDisk == true but configFilename == null");
|
|
|
}
|
|
|
}
|
|
|
+ }
|
|
|
+
|
|
|
+ public QuorumVerifier setQuorumVerifier(QuorumVerifier qv, boolean writeToDisk){
|
|
|
+ synchronized (QV_LOCK) {
|
|
|
+ if ((quorumVerifier != null) && (quorumVerifier.getVersion() >= qv.getVersion())) {
|
|
|
+ // this is normal. For example - server found out about new config through FastLeaderElection gossiping
|
|
|
+ // and then got the same config in UPTODATE message so its already known
|
|
|
+ LOG.debug(getId() + " setQuorumVerifier called with known or old config " + qv.getVersion() +
|
|
|
+ ". Current version: " + quorumVerifier.getVersion());
|
|
|
+ return quorumVerifier;
|
|
|
+ }
|
|
|
+ QuorumVerifier prevQV = quorumVerifier;
|
|
|
+ quorumVerifier = qv;
|
|
|
+ if (lastSeenQuorumVerifier == null || (qv.getVersion() > lastSeenQuorumVerifier.getVersion()))
|
|
|
+ lastSeenQuorumVerifier = qv;
|
|
|
+
|
|
|
+ if (writeToDisk) {
|
|
|
+ // some tests initialize QuorumPeer without a static config file
|
|
|
+ if (configFilename != null) {
|
|
|
+ try {
|
|
|
+ String dynamicConfigFilename = makeDynamicConfigFilename(
|
|
|
+ qv.getVersion());
|
|
|
+ QuorumPeerConfig.writeDynamicConfig(
|
|
|
+ dynamicConfigFilename, qv, false);
|
|
|
+ QuorumPeerConfig.editStaticConfig(configFilename,
|
|
|
+ dynamicConfigFilename,
|
|
|
+ needEraseClientInfoFromStaticConfig());
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.error("Error closing file: ", e.getMessage());
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ LOG.info("writeToDisk == true but configFilename == null");
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- if (qv.getVersion() == lastSeenQuorumVerifier.getVersion()){
|
|
|
- QuorumPeerConfig.deleteFile( getNextDynamicConfigFilename() );
|
|
|
- }
|
|
|
- QuorumServer qs = qv.getAllMembers().get(getId());
|
|
|
- if (qs!=null){
|
|
|
- setQuorumAddress(qs.addr);
|
|
|
- setElectionAddress(qs.electionAddr);
|
|
|
- setClientAddress(qs.clientAddr);
|
|
|
+ if (qv.getVersion() == lastSeenQuorumVerifier.getVersion()) {
|
|
|
+ QuorumPeerConfig.deleteFile(getNextDynamicConfigFilename());
|
|
|
+ }
|
|
|
+ QuorumServer qs = qv.getAllMembers().get(getId());
|
|
|
+ if (qs != null) {
|
|
|
+ setQuorumAddress(qs.addr);
|
|
|
+ setElectionAddress(qs.electionAddr);
|
|
|
+ setClientAddress(qs.clientAddr);
|
|
|
+ }
|
|
|
+ return prevQV;
|
|
|
}
|
|
|
- return prevQV;
|
|
|
}
|
|
|
|
|
|
private String makeDynamicConfigFilename(long version) {
|