|
@@ -22,63 +22,78 @@ import java.io.BufferedReader;
|
|
|
import java.io.File;
|
|
|
import java.io.FileInputStream;
|
|
|
import java.io.FileReader;
|
|
|
+import java.io.IOException;
|
|
|
import java.net.InetSocketAddress;
|
|
|
+import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
+import java.util.Map;
|
|
|
import java.util.Properties;
|
|
|
import java.util.Map.Entry;
|
|
|
|
|
|
import org.apache.log4j.Logger;
|
|
|
-import org.apache.zookeeper.server.ServerConfig;
|
|
|
+import org.apache.zookeeper.server.ZooKeeperServer;
|
|
|
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
|
|
|
|
|
|
-public class QuorumPeerConfig extends ServerConfig {
|
|
|
+public class QuorumPeerConfig {
|
|
|
private static final Logger LOG = Logger.getLogger(QuorumPeerConfig.class);
|
|
|
|
|
|
- private int initLimit;
|
|
|
- private int syncLimit;
|
|
|
- private int electionAlg;
|
|
|
- private int electionPort;
|
|
|
- private HashMap<Long,QuorumServer> servers = null;
|
|
|
- private long serverId;
|
|
|
-
|
|
|
- protected QuorumPeerConfig(int port, String dataDir, String dataLogDir,
|
|
|
- int tickTime)
|
|
|
- {
|
|
|
- super(port, dataDir, dataLogDir, tickTime);
|
|
|
- }
|
|
|
+ protected int clientPort;
|
|
|
+ protected String dataDir;
|
|
|
+ protected String dataLogDir;
|
|
|
+ protected int tickTime = ZooKeeperServer.DEFAULT_TICK_TIME;
|
|
|
+
|
|
|
+ protected int initLimit;
|
|
|
+ protected int syncLimit;
|
|
|
+ protected int electionAlg;
|
|
|
+ protected int electionPort;
|
|
|
+ protected final HashMap<Long,QuorumServer> servers =
|
|
|
+ new HashMap<Long, QuorumServer>();
|
|
|
|
|
|
- public static void parse(String[] args) throws Exception {
|
|
|
- if (instance != null)
|
|
|
- return;
|
|
|
- if (args.length != 1) {
|
|
|
- throw new IllegalArgumentException("Invalid usage.");
|
|
|
+ protected long serverId;
|
|
|
+
|
|
|
+ @SuppressWarnings("serial")
|
|
|
+ public static class ConfigException extends Exception {
|
|
|
+ public ConfigException(String msg) {
|
|
|
+ super(msg);
|
|
|
}
|
|
|
- File zooCfgFile = new File(args[0]);
|
|
|
- if (!zooCfgFile.exists()) {
|
|
|
- throw new IllegalArgumentException(zooCfgFile.toString()
|
|
|
- + " file is missing");
|
|
|
+ public ConfigException(String msg, Exception e) {
|
|
|
+ super(msg, e);
|
|
|
}
|
|
|
- Properties cfg = new Properties();
|
|
|
- FileInputStream zooCfgStream = new FileInputStream(zooCfgFile);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Parse a ZooKeeper configuration file
|
|
|
+ * @param path the patch of the configuration file
|
|
|
+ * @throws ConfigException error processing configuration
|
|
|
+ */
|
|
|
+ public void parse(String path) throws ConfigException {
|
|
|
+ File configFile = new File(path);
|
|
|
+
|
|
|
+ LOG.info("Reading configuration from: " + configFile);
|
|
|
+
|
|
|
try {
|
|
|
- cfg.load(zooCfgStream);
|
|
|
- } finally {
|
|
|
- zooCfgStream.close();
|
|
|
+ if (!configFile.exists()) {
|
|
|
+ throw new IllegalArgumentException(configFile.toString()
|
|
|
+ + " file is missing");
|
|
|
+ }
|
|
|
+
|
|
|
+ Properties cfg = new Properties();
|
|
|
+ FileInputStream in = new FileInputStream(configFile);
|
|
|
+ try {
|
|
|
+ cfg.load(in);
|
|
|
+ } finally {
|
|
|
+ in.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ parseProperties(cfg);
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new ConfigException("Error processing " + path, e);
|
|
|
+ } catch (IllegalArgumentException e) {
|
|
|
+ throw new ConfigException("Error processing " + path, e);
|
|
|
}
|
|
|
-
|
|
|
- parseProperties(cfg);
|
|
|
}
|
|
|
|
|
|
- protected static void parseProperties(Properties zkProp) throws Exception {
|
|
|
- HashMap<Long, QuorumServer> servers = new HashMap<Long, QuorumServer>();
|
|
|
- String dataDir = null;
|
|
|
- String dataLogDir = null;
|
|
|
- int clientPort = 0;
|
|
|
- int tickTime = 0;
|
|
|
- int initLimit = 0;
|
|
|
- int syncLimit = 0;
|
|
|
- int electionAlg = 3;
|
|
|
- int electionPort = 2182;
|
|
|
+ protected void parseProperties(Properties zkProp) throws IOException {
|
|
|
for (Entry<Object, Object> entry : zkProp.entrySet()) {
|
|
|
String key = entry.getKey().toString();
|
|
|
String value = entry.getValue().toString();
|
|
@@ -106,9 +121,9 @@ public class QuorumPeerConfig extends ServerConfig {
|
|
|
}
|
|
|
InetSocketAddress addr = new InetSocketAddress(parts[0],
|
|
|
Integer.parseInt(parts[1]));
|
|
|
- if (parts.length == 2)
|
|
|
+ if (parts.length == 2) {
|
|
|
servers.put(Long.valueOf(sid), new QuorumServer(sid, addr));
|
|
|
- else if (parts.length == 3) {
|
|
|
+ } else if (parts.length == 3) {
|
|
|
InetSocketAddress electionAddr = new InetSocketAddress(
|
|
|
parts[0], Integer.parseInt(parts[2]));
|
|
|
servers.put(Long.valueOf(sid), new QuorumServer(sid, addr,
|
|
@@ -135,19 +150,13 @@ public class QuorumPeerConfig extends ServerConfig {
|
|
|
if (tickTime == 0) {
|
|
|
throw new IllegalArgumentException("tickTime is not set");
|
|
|
}
|
|
|
- if (servers.size() > 1 && initLimit == 0) {
|
|
|
- throw new IllegalArgumentException("initLimit is not set");
|
|
|
- }
|
|
|
- if (servers.size() > 1 && syncLimit == 0) {
|
|
|
- throw new IllegalArgumentException("syncLimit is not set");
|
|
|
- }
|
|
|
- QuorumPeerConfig conf = new QuorumPeerConfig(clientPort, dataDir,
|
|
|
- dataLogDir, tickTime);
|
|
|
- conf.initLimit = initLimit;
|
|
|
- conf.syncLimit = syncLimit;
|
|
|
- conf.electionAlg = electionAlg;
|
|
|
- conf.servers = servers;
|
|
|
if (servers.size() > 1) {
|
|
|
+ if (initLimit == 0) {
|
|
|
+ throw new IllegalArgumentException("initLimit is not set");
|
|
|
+ }
|
|
|
+ if (syncLimit == 0) {
|
|
|
+ throw new IllegalArgumentException("syncLimit is not set");
|
|
|
+ }
|
|
|
/*
|
|
|
* If using FLE, then every server requires a separate election
|
|
|
* port.
|
|
@@ -173,48 +182,27 @@ public class QuorumPeerConfig extends ServerConfig {
|
|
|
br.close();
|
|
|
}
|
|
|
try {
|
|
|
- conf.serverId = Long.parseLong(myIdString);
|
|
|
+ serverId = Long.parseLong(myIdString);
|
|
|
} catch (NumberFormatException e) {
|
|
|
throw new IllegalArgumentException("serverid " + myIdString
|
|
|
+ " is not a number");
|
|
|
}
|
|
|
}
|
|
|
- instance = conf;
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- protected boolean isStandaloneServer(){
|
|
|
- return QuorumPeerConfig.getServers().size() <= 1;
|
|
|
}
|
|
|
|
|
|
- public static int getInitLimit() {
|
|
|
- assert instance instanceof QuorumPeerConfig;
|
|
|
- return ((QuorumPeerConfig)instance).initLimit;
|
|
|
- }
|
|
|
+ public int getClientPort() { return clientPort; }
|
|
|
+ public String getDataDir() { return dataDir; }
|
|
|
+ public String getDataLogDir() { return dataLogDir; }
|
|
|
+ public int getTickTime() { return tickTime; }
|
|
|
|
|
|
- public static int getSyncLimit() {
|
|
|
- assert instance instanceof QuorumPeerConfig;
|
|
|
- return ((QuorumPeerConfig)instance).syncLimit;
|
|
|
- }
|
|
|
+ public int getInitLimit() { return initLimit; }
|
|
|
+ public int getSyncLimit() { return syncLimit; }
|
|
|
+ public int getElectionAlg() { return electionAlg; }
|
|
|
+ public int getElectionPort() { return electionPort; }
|
|
|
|
|
|
- public static int getElectionAlg() {
|
|
|
- assert instance instanceof QuorumPeerConfig;
|
|
|
- return ((QuorumPeerConfig)instance).electionAlg;
|
|
|
- }
|
|
|
-
|
|
|
- public static HashMap<Long,QuorumServer> getServers() {
|
|
|
- assert instance instanceof QuorumPeerConfig;
|
|
|
- return ((QuorumPeerConfig)instance).servers;
|
|
|
+ public Map<Long,QuorumServer> getServers() {
|
|
|
+ return Collections.unmodifiableMap(servers);
|
|
|
}
|
|
|
|
|
|
- public static int getQuorumSize(){
|
|
|
- assert instance instanceof QuorumPeerConfig;
|
|
|
- return ((QuorumPeerConfig)instance).servers.size();
|
|
|
- }
|
|
|
-
|
|
|
- public static long getServerId() {
|
|
|
- assert instance instanceof QuorumPeerConfig;
|
|
|
- return ((QuorumPeerConfig)instance).serverId;
|
|
|
- }
|
|
|
+ public long getServerId() { return serverId; }
|
|
|
}
|