|
@@ -15,21 +15,8 @@
|
|
* See the License for the specific language governing permissions and
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
* limitations under the License.
|
|
*/
|
|
*/
|
|
-
|
|
|
|
package org.apache.zookeeper.server.quorum;
|
|
package org.apache.zookeeper.server.quorum;
|
|
|
|
|
|
-
|
|
|
|
-import static org.apache.zookeeper.server.ServerConfig.getClientPort;
|
|
|
|
-import static org.apache.zookeeper.server.ServerConfig.getDataDir;
|
|
|
|
-import static org.apache.zookeeper.server.ServerConfig.getDataLogDir;
|
|
|
|
-import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getElectionAlg;
|
|
|
|
-import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getElectionPort;
|
|
|
|
-import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getInitLimit;
|
|
|
|
-import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getServerId;
|
|
|
|
-import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getServers;
|
|
|
|
-import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getSyncLimit;
|
|
|
|
-import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getTickTime;
|
|
|
|
-
|
|
|
|
import java.io.ByteArrayInputStream;
|
|
import java.io.ByteArrayInputStream;
|
|
import java.io.File;
|
|
import java.io.File;
|
|
import java.io.FileInputStream;
|
|
import java.io.FileInputStream;
|
|
@@ -42,10 +29,9 @@ import java.nio.ByteBuffer;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
|
|
|
-import org.apache.log4j.Logger;
|
|
|
|
-
|
|
|
|
import org.apache.jute.BinaryInputArchive;
|
|
import org.apache.jute.BinaryInputArchive;
|
|
import org.apache.jute.InputArchive;
|
|
import org.apache.jute.InputArchive;
|
|
|
|
+import org.apache.log4j.Logger;
|
|
import org.apache.zookeeper.server.NIOServerCnxn;
|
|
import org.apache.zookeeper.server.NIOServerCnxn;
|
|
import org.apache.zookeeper.server.ZooKeeperServer;
|
|
import org.apache.zookeeper.server.ZooKeeperServer;
|
|
import org.apache.zookeeper.txn.TxnHeader;
|
|
import org.apache.zookeeper.txn.TxnHeader;
|
|
@@ -76,28 +62,6 @@ import org.apache.zookeeper.txn.TxnHeader;
|
|
* </pre>
|
|
* </pre>
|
|
*
|
|
*
|
|
* The request for the current leader will consist solely of an xid: int xid;
|
|
* The request for the current leader will consist solely of an xid: int xid;
|
|
- *
|
|
|
|
- * <h2>Configuration file</h2>
|
|
|
|
- *
|
|
|
|
- * When the main() method of this class is used to start the program, the file
|
|
|
|
- * "zoo.cfg" in the current directory will be used to obtain configuration
|
|
|
|
- * information. zoo.cfg is a Properties file, so keys and values are separated
|
|
|
|
- * by equals (=) and the key/value pairs are separated by new lines. The
|
|
|
|
- * following keys are used in the configuration file:
|
|
|
|
- * <ol>
|
|
|
|
- * <li>dataDir - The directory where the zookeeper data is stored.</li>
|
|
|
|
- * <li>clientPort - The port used to communicate with clients.</li>
|
|
|
|
- * <li>tickTime - The duration of a tick in milliseconds. This is the basic
|
|
|
|
- * unit of time in zookeeper.</li>
|
|
|
|
- * <li>initLimit - The maximum number of ticks that a follower will wait to
|
|
|
|
- * initially synchronize with a leader.</li>
|
|
|
|
- * <li>syncLimit - The maximum number of ticks that a follower will wait for a
|
|
|
|
- * message (including heartbeats) from the leader.</li>
|
|
|
|
- * <li>server.<i>id</i> - This is the host:port that the server with the
|
|
|
|
- * given id will use for the quorum protocol.</li>
|
|
|
|
- * </ol>
|
|
|
|
- * In addition to the zoo.cfg file. There is a file in the data directory called
|
|
|
|
- * "myid" that contains the server id as an ASCII decimal value.
|
|
|
|
*/
|
|
*/
|
|
public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
private static final Logger LOG = Logger.getLogger(QuorumPeer.class);
|
|
private static final Logger LOG = Logger.getLogger(QuorumPeer.class);
|
|
@@ -267,16 +231,24 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
*/
|
|
*/
|
|
private File dataLogDir;
|
|
private File dataLogDir;
|
|
|
|
|
|
|
|
+ private int electionType;
|
|
|
|
+
|
|
Election electionAlg;
|
|
Election electionAlg;
|
|
|
|
|
|
int electionPort;
|
|
int electionPort;
|
|
|
|
|
|
NIOServerCnxn.Factory cnxnFactory;
|
|
NIOServerCnxn.Factory cnxnFactory;
|
|
|
|
|
|
|
|
+
|
|
|
|
+ public QuorumPeer() {
|
|
|
|
+ super("QuorumPeer");
|
|
|
|
+ }
|
|
|
|
+
|
|
public QuorumPeer(ArrayList<QuorumServer> quorumPeers, File dataDir,
|
|
public QuorumPeer(ArrayList<QuorumServer> quorumPeers, File dataDir,
|
|
- File dataLogDir, int electionAlg, int electionPort,long myid, int tickTime,
|
|
|
|
|
|
+ File dataLogDir, int electionType, int electionPort,long myid, int tickTime,
|
|
int initLimit, int syncLimit,NIOServerCnxn.Factory cnxnFactory) throws IOException {
|
|
int initLimit, int syncLimit,NIOServerCnxn.Factory cnxnFactory) throws IOException {
|
|
- super("QuorumPeer");
|
|
|
|
|
|
+ this();
|
|
|
|
+ this.electionType = electionType;
|
|
this.cnxnFactory = cnxnFactory;
|
|
this.cnxnFactory = cnxnFactory;
|
|
this.quorumPeers = quorumPeers;
|
|
this.quorumPeers = quorumPeers;
|
|
this.dataDir = dataDir;
|
|
this.dataDir = dataDir;
|
|
@@ -285,7 +257,14 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
this.myid = myid;
|
|
this.myid = myid;
|
|
this.tickTime = tickTime;
|
|
this.tickTime = tickTime;
|
|
this.initLimit = initLimit;
|
|
this.initLimit = initLimit;
|
|
- this.syncLimit = syncLimit;
|
|
|
|
|
|
+ this.syncLimit = syncLimit;
|
|
|
|
+
|
|
|
|
+ QuorumStats.getInstance().setStatsProvider(this);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public synchronized void start() {
|
|
|
|
+
|
|
currentVote = new Vote(myid, getLastLoggedZxid());
|
|
currentVote = new Vote(myid, getLastLoggedZxid());
|
|
for (QuorumServer p : quorumPeers) {
|
|
for (QuorumServer p : quorumPeers) {
|
|
if (p.id == myid) {
|
|
if (p.id == myid) {
|
|
@@ -294,16 +273,20 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if (myQuorumAddr == null) {
|
|
if (myQuorumAddr == null) {
|
|
- throw new SocketException("My id " + myid + " not in the peer list");
|
|
|
|
|
|
+ throw new RuntimeException("My id " + myid + " not in the peer list");
|
|
}
|
|
}
|
|
- if (electionAlg == 0) {
|
|
|
|
- udpSocket = new DatagramSocket(myQuorumAddr.getPort());
|
|
|
|
- new ResponderThread().start();
|
|
|
|
|
|
+ if (electionType == 0) {
|
|
|
|
+ try {
|
|
|
|
+ udpSocket = new DatagramSocket(myQuorumAddr.getPort());
|
|
|
|
+ new ResponderThread().start();
|
|
|
|
+ } catch (SocketException e) {
|
|
|
|
+ new RuntimeException(e);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- this.electionAlg = createElectionAlgorithm(electionAlg);
|
|
|
|
- QuorumStats.getInstance().setStatsProvider(this);
|
|
|
|
|
|
+ this.electionAlg = createElectionAlgorithm(electionType);
|
|
|
|
+ super.start();
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* This constructor is only used by the existing unit test code.
|
|
* This constructor is only used by the existing unit test code.
|
|
*/
|
|
*/
|
|
@@ -313,26 +296,23 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
this(quorumPeers,dataDir,dataLogDir,electionAlg,electionPort,myid,tickTime,
|
|
this(quorumPeers,dataDir,dataLogDir,electionAlg,electionPort,myid,tickTime,
|
|
initLimit,syncLimit,new NIOServerCnxn.Factory(clientPort));
|
|
initLimit,syncLimit,new NIOServerCnxn.Factory(clientPort));
|
|
}
|
|
}
|
|
- /**
|
|
|
|
- * The constructor uses the quorum peer config to instantiate the class
|
|
|
|
- */
|
|
|
|
- public QuorumPeer(NIOServerCnxn.Factory cnxnFactory) throws IOException {
|
|
|
|
- this(getServers(), new File(getDataDir()), new File(getDataLogDir()),
|
|
|
|
- getElectionAlg(), getElectionPort(),getServerId(),getTickTime(),
|
|
|
|
- getInitLimit(), getSyncLimit(),cnxnFactory);
|
|
|
|
- }
|
|
|
|
|
|
|
|
public Follower follower;
|
|
public Follower follower;
|
|
public Leader leader;
|
|
public Leader leader;
|
|
|
|
|
|
|
|
+ private int clientPort;
|
|
|
|
+
|
|
protected Follower makeFollower(File dataDir,File dataLogDir) throws IOException {
|
|
protected Follower makeFollower(File dataDir,File dataLogDir) throws IOException {
|
|
- return new Follower(this, new FollowerZooKeeperServer(dataDir,
|
|
|
|
- dataLogDir, this,new ZooKeeperServer.BasicDataTreeBuilder()));
|
|
|
|
|
|
+ FollowerZooKeeperServer zks = new FollowerZooKeeperServer(dataDir, dataLogDir, this,new ZooKeeperServer.BasicDataTreeBuilder());
|
|
|
|
+ zks.setClientPort(clientPort);
|
|
|
|
+ return new Follower(this, zks);
|
|
}
|
|
}
|
|
|
|
|
|
protected Leader makeLeader(File dataDir,File dataLogDir) throws IOException {
|
|
protected Leader makeLeader(File dataDir,File dataLogDir) throws IOException {
|
|
- return new Leader(this, new LeaderZooKeeperServer(dataDir, dataLogDir,
|
|
|
|
- this,new ZooKeeperServer.BasicDataTreeBuilder()));
|
|
|
|
|
|
+ LeaderZooKeeperServer zks = new LeaderZooKeeperServer(dataDir, dataLogDir,
|
|
|
|
+ this,new ZooKeeperServer.BasicDataTreeBuilder());
|
|
|
|
+ zks.setClientPort(clientPort);
|
|
|
|
+ return new Leader(this, zks);
|
|
}
|
|
}
|
|
|
|
|
|
private Election createElectionAlgorithm(int electionAlgorithm){
|
|
private Election createElectionAlgorithm(int electionAlgorithm){
|
|
@@ -502,18 +482,6 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
return zxid;
|
|
return zxid;
|
|
}
|
|
}
|
|
|
|
|
|
- public static void runPeer(QuorumPeer.Factory qpFactory) {
|
|
|
|
- try {
|
|
|
|
- QuorumStats.registerAsConcrete();
|
|
|
|
- QuorumPeer self = qpFactory.create(qpFactory.createConnectionFactory());
|
|
|
|
- self.start();
|
|
|
|
- self.join();
|
|
|
|
- } catch (Exception e) {
|
|
|
|
- LOG.fatal("Unexpected exception",e);
|
|
|
|
- }
|
|
|
|
- System.exit(2);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
public String[] getQuorumPeers() {
|
|
public String[] getQuorumPeers() {
|
|
List<String> l = new ArrayList<String>();
|
|
List<String> l = new ArrayList<String>();
|
|
synchronized (this) {
|
|
synchronized (this) {
|
|
@@ -547,27 +515,138 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
|
|
return QuorumStats.Provider.UNKNOWN_STATE;
|
|
return QuorumStats.Provider.UNKNOWN_STATE;
|
|
}
|
|
}
|
|
|
|
|
|
- public static void main(String args[]) {
|
|
|
|
- if (args.length == 2) {
|
|
|
|
- ZooKeeperServer.main(args);
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- QuorumPeerConfig.parse(args);
|
|
|
|
|
|
+ /**
|
|
|
|
+ * get the id of this quorum peer.
|
|
|
|
+ */
|
|
|
|
+ public long getMyid() {
|
|
|
|
+ return myid;
|
|
|
|
+ }
|
|
|
|
|
|
- if (!QuorumPeerConfig.isStandalone()) {
|
|
|
|
- runPeer(new QuorumPeer.Factory() {
|
|
|
|
- public QuorumPeer create(NIOServerCnxn.Factory cnxnFactory)
|
|
|
|
- throws IOException {
|
|
|
|
- return new QuorumPeer(cnxnFactory);
|
|
|
|
- }
|
|
|
|
- public NIOServerCnxn.Factory createConnectionFactory()
|
|
|
|
- throws IOException {
|
|
|
|
- return new NIOServerCnxn.Factory(getClientPort());
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
- }else{
|
|
|
|
- // there is only server in the quorum -- run as standalone
|
|
|
|
- ZooKeeperServer.main(args);
|
|
|
|
- }
|
|
|
|
|
|
+ /**
|
|
|
|
+ * set the id of this quorum peer.
|
|
|
|
+ */
|
|
|
|
+ public void setMyid(long myid) {
|
|
|
|
+ this.myid = myid;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Get the number of milliseconds of each tick
|
|
|
|
+ */
|
|
|
|
+ public int getTickTime() {
|
|
|
|
+ return tickTime;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Set the number of milliseconds of each tick
|
|
|
|
+ */
|
|
|
|
+ public void setTickTime(int tickTime) {
|
|
|
|
+ this.tickTime = tickTime;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Get the number of ticks that the initial synchronization phase can take
|
|
|
|
+ */
|
|
|
|
+ public int getInitLimit() {
|
|
|
|
+ return initLimit;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Set the number of ticks that the initial synchronization phase can take
|
|
|
|
+ */
|
|
|
|
+ public void setInitLimit(int initLimit) {
|
|
|
|
+ this.initLimit = initLimit;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Get the number of ticks that can pass between sending a request and getting
|
|
|
|
+ * an acknowledgement
|
|
|
|
+ */
|
|
|
|
+ public int getSyncLimit() {
|
|
|
|
+ return syncLimit;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Set the number of ticks that can pass between sending a request and getting
|
|
|
|
+ * an acknowledgement
|
|
|
|
+ */
|
|
|
|
+ public void setSyncLimit(int syncLimit) {
|
|
|
|
+ this.syncLimit = syncLimit;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Get the directory where the snapshot is stored.
|
|
|
|
+ */
|
|
|
|
+ public File getDataDir() {
|
|
|
|
+ return dataDir;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Set the directory where the snapshot is stored.
|
|
|
|
+ */
|
|
|
|
+ public void setDataDir(File dataDir) {
|
|
|
|
+ this.dataDir = dataDir;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Get the directory where the logs are stored.
|
|
|
|
+ */
|
|
|
|
+ public File getDataLogDir() {
|
|
|
|
+ return dataLogDir;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Set the directory where the logs are stored.
|
|
|
|
+ */
|
|
|
|
+ public void setDataLogDir(File dataLogDir) {
|
|
|
|
+ this.dataLogDir = dataLogDir;
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Gets the election port
|
|
|
|
+ */
|
|
|
|
+ public int getElectionPort() {
|
|
|
|
+ return electionPort;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Gets the election type
|
|
|
|
+ */
|
|
|
|
+ public int getElectionType() {
|
|
|
|
+ return electionType;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Sets the election type
|
|
|
|
+ */
|
|
|
|
+ public void setElectionType(int electionType) {
|
|
|
|
+ this.electionType = electionType;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Sets the election port
|
|
|
|
+ */
|
|
|
|
+ public void setElectionPort(int electionPort) {
|
|
|
|
+ this.electionPort = electionPort;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public NIOServerCnxn.Factory getCnxnFactory() {
|
|
|
|
+ return cnxnFactory;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void setCnxnFactory(NIOServerCnxn.Factory cnxnFactory) {
|
|
|
|
+ this.cnxnFactory = cnxnFactory;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void setQuorumPeers(ArrayList<QuorumServer> quorumPeers) {
|
|
|
|
+ this.quorumPeers = quorumPeers;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public int getClientPort() {
|
|
|
|
+ return clientPort;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void setClientPort(int clientPort) {
|
|
|
|
+ this.clientPort = clientPort;
|
|
|
|
+ }
|
|
|
|
+
|
|
}
|
|
}
|