|
@@ -23,6 +23,7 @@ import java.io.File;
|
|
|
import java.io.FileInputStream;
|
|
|
import java.io.FileReader;
|
|
|
import java.io.IOException;
|
|
|
+import java.net.InetAddress;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
@@ -41,7 +42,7 @@ import org.apache.zookeeper.server.quorum.flexible.QuorumHierarchical;
|
|
|
public class QuorumPeerConfig {
|
|
|
private static final Logger LOG = Logger.getLogger(QuorumPeerConfig.class);
|
|
|
|
|
|
- protected int clientPort;
|
|
|
+ protected InetSocketAddress clientPortAddress;
|
|
|
protected String dataDir;
|
|
|
protected String dataLogDir;
|
|
|
protected int tickTime = ZooKeeperServer.DEFAULT_TICK_TIME;
|
|
@@ -61,7 +62,7 @@ public class QuorumPeerConfig {
|
|
|
protected HashMap<Long, Long> serverGroup = new HashMap<Long, Long>();
|
|
|
protected int numGroups = 0;
|
|
|
protected QuorumVerifier quorumVerifier;
|
|
|
-
|
|
|
+
|
|
|
protected LearnerType peerType = LearnerType.PARTICIPANT;
|
|
|
|
|
|
@SuppressWarnings("serial")
|
|
@@ -89,7 +90,7 @@ public class QuorumPeerConfig {
|
|
|
throw new IllegalArgumentException(configFile.toString()
|
|
|
+ " file is missing");
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
Properties cfg = new Properties();
|
|
|
FileInputStream in = new FileInputStream(configFile);
|
|
|
try {
|
|
@@ -97,7 +98,7 @@ public class QuorumPeerConfig {
|
|
|
} finally {
|
|
|
in.close();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
parseProperties(cfg);
|
|
|
} catch (IOException e) {
|
|
|
throw new ConfigException("Error processing " + path, e);
|
|
@@ -114,6 +115,8 @@ public class QuorumPeerConfig {
|
|
|
*/
|
|
|
public void parseProperties(Properties zkProp)
|
|
|
throws IOException, ConfigException {
|
|
|
+ int clientPort = 0;
|
|
|
+ String clientPortAddress = null;
|
|
|
for (Entry<Object, Object> entry : zkProp.entrySet()) {
|
|
|
String key = entry.getKey().toString().trim();
|
|
|
String value = entry.getValue().toString().trim();
|
|
@@ -123,6 +126,8 @@ public class QuorumPeerConfig {
|
|
|
dataLogDir = value;
|
|
|
} else if (key.equals("clientPort")) {
|
|
|
clientPort = Integer.parseInt(value);
|
|
|
+ } else if (key.equals("clientPortAddress")) {
|
|
|
+ clientPortAddress = value.trim();
|
|
|
} else if (key.equals("tickTime")) {
|
|
|
tickTime = Integer.parseInt(value);
|
|
|
} else if (key.equals("initLimit")) {
|
|
@@ -140,7 +145,7 @@ public class QuorumPeerConfig {
|
|
|
peerType = LearnerType.PARTICIPANT;
|
|
|
} else
|
|
|
{
|
|
|
- throw new ConfigException("Unrecognised peertype: " + value);
|
|
|
+ throw new ConfigException("Unrecognised peertype: " + value);
|
|
|
}
|
|
|
} else if (key.startsWith("server.")) {
|
|
|
int dot = key.indexOf('.');
|
|
@@ -174,14 +179,14 @@ public class QuorumPeerConfig {
|
|
|
electionAddr,type));
|
|
|
} else {
|
|
|
throw new ConfigException("Unrecognised peertype: " + value);
|
|
|
- }
|
|
|
+ }
|
|
|
}
|
|
|
} else if (key.startsWith("group")) {
|
|
|
int dot = key.indexOf('.');
|
|
|
long gid = Long.parseLong(key.substring(dot + 1));
|
|
|
-
|
|
|
+
|
|
|
numGroups++;
|
|
|
-
|
|
|
+
|
|
|
String parts[] = value.split(":");
|
|
|
for(String s : parts){
|
|
|
long sid = Long.parseLong(s);
|
|
@@ -189,8 +194,8 @@ public class QuorumPeerConfig {
|
|
|
throw new ConfigException("Server " + sid + "is in multiple groups");
|
|
|
else
|
|
|
serverGroup.put(sid, gid);
|
|
|
- }
|
|
|
-
|
|
|
+ }
|
|
|
+
|
|
|
} else if(key.startsWith("weight")) {
|
|
|
int dot = key.indexOf('.');
|
|
|
long sid = Long.parseLong(key.substring(dot + 1));
|
|
@@ -199,7 +204,7 @@ public class QuorumPeerConfig {
|
|
|
System.setProperty("zookeeper." + key, value);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
if (dataDir == null) {
|
|
|
throw new IllegalArgumentException("dataDir is not set");
|
|
|
}
|
|
@@ -214,6 +219,13 @@ public class QuorumPeerConfig {
|
|
|
if (clientPort == 0) {
|
|
|
throw new IllegalArgumentException("clientPort is not set");
|
|
|
}
|
|
|
+ if (clientPortAddress != null) {
|
|
|
+ this.clientPortAddress = new InetSocketAddress(
|
|
|
+ InetAddress.getByName(clientPortAddress), clientPort);
|
|
|
+ } else {
|
|
|
+ this.clientPortAddress = new InetSocketAddress(clientPort);
|
|
|
+ }
|
|
|
+
|
|
|
if (tickTime == 0) {
|
|
|
throw new IllegalArgumentException("tickTime is not set");
|
|
|
}
|
|
@@ -239,35 +251,35 @@ public class QuorumPeerConfig {
|
|
|
/*
|
|
|
* Default of quorum config is majority
|
|
|
*/
|
|
|
- if(serverGroup.size() > 0){
|
|
|
+ if(serverGroup.size() > 0){
|
|
|
if(servers.size() != serverGroup.size())
|
|
|
throw new ConfigException("Every server must be in exactly one group");
|
|
|
- /*
|
|
|
- * The deafult weight of a server is 1
|
|
|
- */
|
|
|
- for(QuorumServer s : servers.values()){
|
|
|
- if(!serverWeight.containsKey(s.id))
|
|
|
- serverWeight.put(s.id, (long) 1);
|
|
|
- }
|
|
|
-
|
|
|
- /*
|
|
|
- * Set the quorumVerifier to be QuorumHierarchical
|
|
|
- */
|
|
|
- quorumVerifier = new QuorumHierarchical(numGroups,
|
|
|
+ /*
|
|
|
+ * The deafult weight of a server is 1
|
|
|
+ */
|
|
|
+ for(QuorumServer s : servers.values()){
|
|
|
+ if(!serverWeight.containsKey(s.id))
|
|
|
+ serverWeight.put(s.id, (long) 1);
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Set the quorumVerifier to be QuorumHierarchical
|
|
|
+ */
|
|
|
+ quorumVerifier = new QuorumHierarchical(numGroups,
|
|
|
serverWeight, serverGroup);
|
|
|
} else {
|
|
|
- /*
|
|
|
- * The default QuorumVerifier is QuorumMaj
|
|
|
- */
|
|
|
-
|
|
|
+ /*
|
|
|
+ * The default QuorumVerifier is QuorumMaj
|
|
|
+ */
|
|
|
+
|
|
|
LOG.info("Defaulting to majority quorums");
|
|
|
quorumVerifier = new QuorumMaj(servers.size());
|
|
|
}
|
|
|
-
|
|
|
- // Now add observers to servers, once the quorums have been
|
|
|
+
|
|
|
+ // Now add observers to servers, once the quorums have been
|
|
|
// figured out
|
|
|
servers.putAll(observers);
|
|
|
-
|
|
|
+
|
|
|
File myIdFile = new File(dataDir, "myid");
|
|
|
if (!myIdFile.exists()) {
|
|
|
throw new IllegalArgumentException(myIdFile.toString()
|
|
@@ -289,7 +301,7 @@ public class QuorumPeerConfig {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public int getClientPort() { return clientPort; }
|
|
|
+ public InetSocketAddress getClientPortAddress() { return clientPortAddress; }
|
|
|
public String getDataDir() { return dataDir; }
|
|
|
public String getDataLogDir() { return dataLogDir; }
|
|
|
public int getTickTime() { return tickTime; }
|
|
@@ -297,13 +309,13 @@ public class QuorumPeerConfig {
|
|
|
public int getInitLimit() { return initLimit; }
|
|
|
public int getSyncLimit() { return syncLimit; }
|
|
|
public int getElectionAlg() { return electionAlg; }
|
|
|
- public int getElectionPort() { return electionPort; }
|
|
|
+ public int getElectionPort() { return electionPort; }
|
|
|
public int getMaxClientCnxns() { return maxClientCnxns; }
|
|
|
-
|
|
|
- public QuorumVerifier getQuorumVerifier() {
|
|
|
+
|
|
|
+ public QuorumVerifier getQuorumVerifier() {
|
|
|
return quorumVerifier;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public Map<Long,QuorumServer> getServers() {
|
|
|
return Collections.unmodifiableMap(servers);
|
|
|
}
|