|
@@ -132,7 +132,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
|
|
|
|
|
|
/*
|
|
/*
|
|
* Start up the ZooKeeper server.
|
|
* Start up the ZooKeeper server.
|
|
- *
|
|
|
|
|
|
+ *
|
|
* @param args the port and data directory
|
|
* @param args the port and data directory
|
|
*/
|
|
*/
|
|
public static void main(String[] args) {
|
|
public static void main(String[] args) {
|
|
@@ -175,7 +175,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
|
|
/**
|
|
/**
|
|
* Creates a ZooKeeperServer instance. It sets everything up, but doesn't
|
|
* Creates a ZooKeeperServer instance. It sets everything up, but doesn't
|
|
* actually start listening for clients until run() is invoked.
|
|
* actually start listening for clients until run() is invoked.
|
|
- *
|
|
|
|
|
|
+ *
|
|
* @param dataDir
|
|
* @param dataDir
|
|
* the directory to put the data
|
|
* the directory to put the data
|
|
* @throws IOException
|
|
* @throws IOException
|
|
@@ -210,7 +210,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
|
|
|
|
|
|
/**
|
|
/**
|
|
* Default constructor, relies on the config for its agrument values
|
|
* Default constructor, relies on the config for its agrument values
|
|
- *
|
|
|
|
|
|
+ *
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
public ZooKeeperServer(DataTreeBuilder treeBuilder) throws IOException {
|
|
public ZooKeeperServer(DataTreeBuilder treeBuilder) throws IOException {
|
|
@@ -267,39 +267,39 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
|
|
this.prefix = prefix;
|
|
this.prefix = prefix;
|
|
this.ascending = ascending;
|
|
this.ascending = ascending;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
public int compare(File o1, File o2) {
|
|
public int compare(File o1, File o2) {
|
|
long z1 = getZxidFromName(o1.getName(), prefix);
|
|
long z1 = getZxidFromName(o1.getName(), prefix);
|
|
long z2 = getZxidFromName(o2.getName(), prefix);
|
|
long z2 = getZxidFromName(o2.getName(), prefix);
|
|
int result = z1 < z2 ? -1 : (z1 > z2 ? 1 : 0);
|
|
int result = z1 < z2 ? -1 : (z1 > z2 ? 1 : 0);
|
|
return ascending ? result : -result;
|
|
return ascending ? result : -result;
|
|
- }
|
|
|
|
|
|
+ }
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
/**
|
|
/**
|
|
- * Sort the list of files. Recency as determined by the version component
|
|
|
|
|
|
+ * Sort the list of files. Recency as determined by the version component
|
|
* of the file name.
|
|
* of the file name.
|
|
- *
|
|
|
|
|
|
+ *
|
|
* @param files array of files
|
|
* @param files array of files
|
|
- * @param prefix files not matching this prefix are assumed to have a
|
|
|
|
|
|
+ * @param prefix files not matching this prefix are assumed to have a
|
|
* version = -1)
|
|
* version = -1)
|
|
- * @param ascending true sorted in ascending order, false results in
|
|
|
|
|
|
+ * @param ascending true sorted in ascending order, false results in
|
|
* descending order
|
|
* descending order
|
|
* @return sorted input files
|
|
* @return sorted input files
|
|
*/
|
|
*/
|
|
- static List<File>
|
|
|
|
- sortDataDir(File[] files, String prefix, boolean ascending)
|
|
|
|
|
|
+ static List<File>
|
|
|
|
+ sortDataDir(File[] files, String prefix, boolean ascending)
|
|
{
|
|
{
|
|
List<File> filelist = Arrays.asList(files);
|
|
List<File> filelist = Arrays.asList(files);
|
|
Collections.sort(filelist, new DataDirFileComparator(prefix, ascending));
|
|
Collections.sort(filelist, new DataDirFileComparator(prefix, ascending));
|
|
return filelist;
|
|
return filelist;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Find the log file that starts at, or just before, the snapshot. Return
|
|
* Find the log file that starts at, or just before, the snapshot. Return
|
|
- * this and all subsequent logs. Results are ordered by zxid of file,
|
|
|
|
|
|
+ * this and all subsequent logs. Results are ordered by zxid of file,
|
|
* ascending order.
|
|
* ascending order.
|
|
- *
|
|
|
|
|
|
+ *
|
|
* @param logDirList array of files
|
|
* @param logDirList array of files
|
|
* @param snapshotZxid return files at, or before this zxid
|
|
* @param snapshotZxid return files at, or before this zxid
|
|
* @return
|
|
* @return
|
|
@@ -346,23 +346,23 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
|
|
}
|
|
}
|
|
|
|
|
|
LOG.warn("Processing snapshot: " + f);
|
|
LOG.warn("Processing snapshot: " + f);
|
|
-
|
|
|
|
|
|
+
|
|
InputStream snapIS =
|
|
InputStream snapIS =
|
|
new BufferedInputStream(new FileInputStream(f));
|
|
new BufferedInputStream(new FileInputStream(f));
|
|
loadData(BinaryInputArchive.getArchive(snapIS));
|
|
loadData(BinaryInputArchive.getArchive(snapIS));
|
|
- snapIS.close();
|
|
|
|
-
|
|
|
|
|
|
+ snapIS.close();
|
|
|
|
+
|
|
dataTree.lastProcessedZxid = zxid;
|
|
dataTree.lastProcessedZxid = zxid;
|
|
-
|
|
|
|
|
|
+
|
|
// Apply the logs on/after the selected snapshot
|
|
// Apply the logs on/after the selected snapshot
|
|
File[] logfiles = getLogFiles(dataLogDir.listFiles(), zxid);
|
|
File[] logfiles = getLogFiles(dataLogDir.listFiles(), zxid);
|
|
for (File logfile : logfiles) {
|
|
for (File logfile : logfiles) {
|
|
LOG.warn("Processing log file: " + logfile);
|
|
LOG.warn("Processing log file: " + logfile);
|
|
-
|
|
|
|
|
|
+
|
|
InputStream logIS =
|
|
InputStream logIS =
|
|
new BufferedInputStream(new FileInputStream(logfile));
|
|
new BufferedInputStream(new FileInputStream(logfile));
|
|
zxid = playLog(BinaryInputArchive.getArchive(logIS));
|
|
zxid = playLog(BinaryInputArchive.getArchive(logIS));
|
|
- logIS.close();
|
|
|
|
|
|
+ logIS.close();
|
|
}
|
|
}
|
|
hzxid = zxid;
|
|
hzxid = zxid;
|
|
|
|
|
|
@@ -377,7 +377,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
|
|
|
|
|
|
public void loadData() throws IOException, InterruptedException {
|
|
public void loadData() throws IOException, InterruptedException {
|
|
loadSnapshotAndLogs();
|
|
loadSnapshotAndLogs();
|
|
-
|
|
|
|
|
|
+
|
|
// Clean up dead sessions
|
|
// Clean up dead sessions
|
|
LinkedList<Long> deadSessions = new LinkedList<Long>();
|
|
LinkedList<Long> deadSessions = new LinkedList<Long>();
|
|
for (long session : dataTree.getSessions()) {
|
|
for (long session : dataTree.getSessions()) {
|
|
@@ -402,8 +402,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
|
|
long id = ia.readLong("id");
|
|
long id = ia.readLong("id");
|
|
int to = ia.readInt("timeout");
|
|
int to = ia.readInt("timeout");
|
|
sessionsWithTimeouts.put(id, to);
|
|
sessionsWithTimeouts.put(id, to);
|
|
- ZooLog.logTextTraceMessage("loadData --- session in archive: " + id
|
|
|
|
- + " with timeout: " + to, ZooLog.SESSION_TRACE_MASK);
|
|
|
|
|
|
+ ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
|
|
|
|
+ "loadData --- session in archive: " + id
|
|
|
|
+ + " with timeout: " + to);
|
|
count--;
|
|
count--;
|
|
}
|
|
}
|
|
dataTree.deserialize(ia, "tree");
|
|
dataTree.deserialize(ia, "tree");
|
|
@@ -438,21 +439,21 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
|
|
case OpCode.createSession:
|
|
case OpCode.createSession:
|
|
sessionsWithTimeouts.put(hdr.getClientId(),
|
|
sessionsWithTimeouts.put(hdr.getClientId(),
|
|
((CreateSessionTxn) txn).getTimeOut());
|
|
((CreateSessionTxn) txn).getTimeOut());
|
|
- ZooLog.logTextTraceMessage(
|
|
|
|
|
|
+ ZooTrace.logTraceMessage(LOG,
|
|
|
|
+ ZooTrace.SESSION_TRACE_MASK,
|
|
"playLog --- create session in log: "
|
|
"playLog --- create session in log: "
|
|
+ Long.toHexString(hdr.getClientId())
|
|
+ Long.toHexString(hdr.getClientId())
|
|
+ " with timeout: "
|
|
+ " with timeout: "
|
|
- + ((CreateSessionTxn) txn).getTimeOut(),
|
|
|
|
- ZooLog.SESSION_TRACE_MASK);
|
|
|
|
|
|
+ + ((CreateSessionTxn) txn).getTimeOut());
|
|
// give dataTree a chance to sync its lastProcessedZxid
|
|
// give dataTree a chance to sync its lastProcessedZxid
|
|
dataTree.processTxn(hdr, txn);
|
|
dataTree.processTxn(hdr, txn);
|
|
break;
|
|
break;
|
|
case OpCode.closeSession:
|
|
case OpCode.closeSession:
|
|
sessionsWithTimeouts.remove(hdr.getClientId());
|
|
sessionsWithTimeouts.remove(hdr.getClientId());
|
|
- ZooLog.logTextTraceMessage(
|
|
|
|
|
|
+ ZooTrace.logTraceMessage(LOG,
|
|
|
|
+ ZooTrace.SESSION_TRACE_MASK,
|
|
"playLog --- close session in log: "
|
|
"playLog --- close session in log: "
|
|
- + Long.toHexString(hdr.getClientId()),
|
|
|
|
- ZooLog.SESSION_TRACE_MASK);
|
|
|
|
|
|
+ + Long.toHexString(hdr.getClientId()));
|
|
dataTree.processTxn(hdr, txn);
|
|
dataTree.processTxn(hdr, txn);
|
|
break;
|
|
break;
|
|
default:
|
|
default:
|
|
@@ -473,7 +474,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
|
|
/**
|
|
/**
|
|
* maintains a list of last 500 or so committed requests. This is used for
|
|
* maintains a list of last 500 or so committed requests. This is used for
|
|
* fast follower synchronization.
|
|
* fast follower synchronization.
|
|
- *
|
|
|
|
|
|
+ *
|
|
* @param r
|
|
* @param r
|
|
* committed request
|
|
* committed request
|
|
*/
|
|
*/
|
|
@@ -617,9 +618,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
|
|
|
|
|
|
public void snapshot() throws InterruptedException {
|
|
public void snapshot() throws InterruptedException {
|
|
long lastZxid = dataTree.lastProcessedZxid;
|
|
long lastZxid = dataTree.lastProcessedZxid;
|
|
- ZooLog.logTextTraceMessage(
|
|
|
|
- "Snapshotting: " + Long.toHexString(lastZxid),
|
|
|
|
- ZooLog.textTraceMask);
|
|
|
|
|
|
+ ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
|
|
|
|
+ "Snapshotting: " + Long.toHexString(lastZxid));
|
|
try {
|
|
try {
|
|
File f =new File(dataDir, "snapshot." + Long.toHexString(lastZxid));
|
|
File f =new File(dataDir, "snapshot." + Long.toHexString(lastZxid));
|
|
OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(f));
|
|
OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(f));
|
|
@@ -627,9 +627,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
|
|
snapshot(oa);
|
|
snapshot(oa);
|
|
sessOS.flush();
|
|
sessOS.flush();
|
|
sessOS.close();
|
|
sessOS.close();
|
|
- ZooLog.logTextTraceMessage(
|
|
|
|
- "Snapshotting finished: " + Long.toHexString(lastZxid),
|
|
|
|
- ZooLog.textTraceMask);
|
|
|
|
|
|
+ ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
|
|
|
|
+ "Snapshotting finished: " + Long.toHexString(lastZxid));
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
LOG.error("Severe error, exiting",e);
|
|
LOG.error("Severe error, exiting",e);
|
|
// This is a severe error that we cannot recover from,
|
|
// This is a severe error that we cannot recover from,
|
|
@@ -659,8 +658,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
|
|
|
|
|
|
public void closeSession(long sessionId) throws KeeperException,
|
|
public void closeSession(long sessionId) throws KeeperException,
|
|
InterruptedException {
|
|
InterruptedException {
|
|
- ZooLog.logTextTraceMessage("ZooKeeperServer --- Session to be closed: "
|
|
|
|
- + Long.toHexString(sessionId), ZooLog.SESSION_TRACE_MASK);
|
|
|
|
|
|
+ ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
|
|
|
|
+ "ZooKeeperServer --- Session to be closed: "
|
|
|
|
+ + Long.toHexString(sessionId));
|
|
// we do not want to wait for a session close. send it as soon as we
|
|
// we do not want to wait for a session close. send it as soon as we
|
|
// detect it!
|
|
// detect it!
|
|
submitRequest(null, sessionId, OpCode.closeSession, 0, null, null);
|
|
submitRequest(null, sessionId, OpCode.closeSession, 0, null, null);
|
|
@@ -668,8 +668,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
|
|
|
|
|
|
protected void killSession(long sessionId) {
|
|
protected void killSession(long sessionId) {
|
|
dataTree.killSession(sessionId);
|
|
dataTree.killSession(sessionId);
|
|
- ZooLog.logTextTraceMessage("ZooKeeperServer --- killSession: "
|
|
|
|
- + Long.toHexString(sessionId), ZooLog.SESSION_TRACE_MASK);
|
|
|
|
|
|
+ ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
|
|
|
|
+ "ZooKeeperServer --- killSession: "
|
|
|
|
+ + Long.toHexString(sessionId));
|
|
if (sessionTracker != null) {
|
|
if (sessionTracker != null) {
|
|
sessionTracker.removeSession(sessionId);
|
|
sessionTracker.removeSession(sessionId);
|
|
}
|
|
}
|
|
@@ -677,9 +678,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
|
|
|
|
|
|
public void expire(long sessionId) {
|
|
public void expire(long sessionId) {
|
|
try {
|
|
try {
|
|
- ZooLog.logTextTraceMessage(
|
|
|
|
- "ZooKeeperServer --- Session to expire: " + Long.toHexString(sessionId),
|
|
|
|
- ZooLog.SESSION_TRACE_MASK);
|
|
|
|
|
|
+ ZooTrace.logTraceMessage(LOG,
|
|
|
|
+ ZooTrace.SESSION_TRACE_MASK,
|
|
|
|
+ "ZooKeeperServer --- Session to expire: " + Long.toHexString(sessionId));
|
|
closeSession(sessionId);
|
|
closeSession(sessionId);
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
LOG.error("FIXMSG",e);
|
|
LOG.error("FIXMSG",e);
|
|
@@ -817,8 +818,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
|
|
protected void revalidateSession(ServerCnxn cnxn, long sessionId,
|
|
protected void revalidateSession(ServerCnxn cnxn, long sessionId,
|
|
int sessionTimeout) throws IOException, InterruptedException {
|
|
int sessionTimeout) throws IOException, InterruptedException {
|
|
boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout);
|
|
boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout);
|
|
- ZooLog.logTextTraceMessage("Session " + Long.toHexString(sessionId) +
|
|
|
|
- " is valid: " + rc,ZooLog.SESSION_TRACE_MASK);
|
|
|
|
|
|
+ ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
|
|
|
|
+ "Session " + Long.toHexString(sessionId) +
|
|
|
|
+ " is valid: " + rc);
|
|
cnxn.finishSessionInit(rc);
|
|
cnxn.finishSessionInit(rc);
|
|
}
|
|
}
|
|
|
|
|