FileTxnSnapLog.java 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. package org.apache.zookeeper.server.persistence;
  19. import java.io.File;
  20. import java.io.IOException;
  21. import java.util.Map;
  22. import java.util.concurrent.ConcurrentHashMap;
  23. import org.apache.jute.Record;
  24. import org.apache.log4j.Logger;
  25. import org.apache.zookeeper.ZooDefs.OpCode;
  26. import org.apache.zookeeper.server.DataTree;
  27. import org.apache.zookeeper.server.Request;
  28. import org.apache.zookeeper.server.ZooTrace;
  29. import org.apache.zookeeper.server.persistence.TxnLog.TxnIterator;
  30. import org.apache.zookeeper.txn.CreateSessionTxn;
  31. import org.apache.zookeeper.txn.TxnHeader;
  32. /**
  33. * This is a helper class
  34. * above the implementations
  35. * of txnlog and snapshot
  36. * classes
  37. */
  38. public class FileTxnSnapLog {
  39. //the direcotry containing the
  40. //the transaction logs
  41. File dataDir;
  42. //the directory containing the
  43. //the snapshot directory
  44. File snapDir;
  45. TxnLog txnLog;
  46. SnapShot snapLog;
  47. private static final Logger LOG = Logger.getLogger(FileTxnSnapLog.class);
  48. /**
  49. * This listener helps
  50. * the external apis calling
  51. * restore to gather information
  52. * while the data is being
  53. * restored.
  54. */
  55. public interface PlayBackListener {
  56. void onTxnLoaded(TxnHeader hdr, Record rec);
  57. }
  58. /**
  59. * the constructor which takes the datadir and
  60. * snapdir.
  61. * @param dataDir the trasaction directory
  62. * @param snapDir the snapshot directory
  63. */
  64. public FileTxnSnapLog(File dataDir, File snapDir) {
  65. this.dataDir = dataDir;
  66. this.snapDir = snapDir;
  67. txnLog = new FileTxnLog(dataDir);
  68. snapLog = new FileSnap(snapDir);
  69. }
  70. /**
  71. * this function restors the server
  72. * database after reading from the
  73. * snapshots and transaction logs
  74. * @param dt the datatree to be restored
  75. * @param sessions the sessions to be restored
  76. * @param listener the playback listener to run on the
  77. * database restoration
  78. * @return the highest zxid restored
  79. * @throws IOException
  80. */
  81. public long restore(DataTree dt, Map<Long, Integer> sessions,
  82. PlayBackListener listener) throws IOException {
  83. snapLog.deserialize(dt, sessions);
  84. FileTxnLog txnLog = new FileTxnLog(dataDir);
  85. TxnIterator itr = txnLog.read(dt.lastProcessedZxid);
  86. long highestZxid = dt.lastProcessedZxid;
  87. TxnHeader hdr;
  88. while (true) {
  89. // iterator points to
  90. // the first valid txn when initialized
  91. hdr = itr.getHeader();
  92. if (hdr == null) {
  93. //empty logs
  94. return dt.lastProcessedZxid;
  95. }
  96. if (hdr.getZxid() <= highestZxid && highestZxid != 0) {
  97. LOG.error(highestZxid + "(higestZxid) >= "
  98. + hdr.getZxid() + "(next log) for type "
  99. + hdr.getType());
  100. } else {
  101. highestZxid = hdr.getZxid();
  102. }
  103. processTransaction(hdr,dt,sessions, itr.getTxn());
  104. if (!itr.next())
  105. break;
  106. }
  107. return highestZxid;
  108. }
  109. /**
  110. * process the transaction on the datatree
  111. * @param hdr the hdr of the transaction
  112. * @param dt the datatree to apply transaction to
  113. * @param sessions the sessions to be restored
  114. * @param txn the transaction to be applied
  115. */
  116. private void processTransaction(TxnHeader hdr,DataTree dt,
  117. Map<Long, Integer> sessions, Record txn){
  118. switch (hdr.getType()) {
  119. case OpCode.createSession:
  120. sessions.put(hdr.getClientId(),
  121. ((CreateSessionTxn) txn).getTimeOut());
  122. ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
  123. "playLog --- create session in log: "
  124. + Long.toHexString(hdr.getClientId())
  125. + " with timeout: "
  126. + ((CreateSessionTxn) txn).getTimeOut());
  127. // give dataTree a chance to sync its lastProcessedZxid
  128. dt.processTxn(hdr, txn);
  129. break;
  130. case OpCode.closeSession:
  131. sessions.remove(hdr.getClientId());
  132. ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
  133. "playLog --- close session in log: "
  134. + Long.toHexString(hdr.getClientId()));
  135. dt.processTxn(hdr, txn);
  136. break;
  137. default:
  138. dt.processTxn(hdr, txn);
  139. }
  140. }
  141. /**
  142. * the last logged zxid on the transaction logs
  143. * @return the last logged zxid
  144. */
  145. public long getLastLoggedZxid() {
  146. FileTxnLog txnLog = new FileTxnLog(dataDir);
  147. return txnLog.getLastLoggedZxid();
  148. }
  149. /**
  150. * save the datatree and the sessions into a snapshot
  151. * @param dataTree the datatree to be serialized onto disk
  152. * @param sessionsWithTimeouts the sesssion timeouts to be
  153. * serialized onto disk
  154. * @throws IOException
  155. */
  156. public void save(DataTree dataTree,
  157. ConcurrentHashMap<Long, Integer> sessionsWithTimeouts) throws IOException {
  158. long lastZxid = dataTree.lastProcessedZxid;
  159. LOG.info("Snapshotting: " + Long.toHexString(lastZxid));
  160. File snapshot=new File(
  161. snapDir, Util.makeSnapshotName(lastZxid));
  162. snapLog.serialize(dataTree, sessionsWithTimeouts, snapshot);
  163. }
  164. /**
  165. * truncate the transaction logs the zxid
  166. * specified
  167. * @param zxid the zxid to truncate the logs to
  168. * @return true if able to truncate the log, false if not
  169. * @throws IOException
  170. */
  171. public boolean truncateLog(long zxid) throws IOException {
  172. FileTxnLog txnLog = new FileTxnLog(dataDir);
  173. return txnLog.truncate(zxid);
  174. }
  175. /**
  176. * the most recent snapshot in the snapshot
  177. * directory
  178. * @return the file that contains the most
  179. * recent snapshot
  180. * @throws IOException
  181. */
  182. public File findMostRecentSnapshot() throws IOException {
  183. FileSnap snaplog = new FileSnap(snapDir);
  184. return snaplog.findMostRecentSnapshot();
  185. }
  186. /**
  187. * get the snapshot logs that are greater than
  188. * the given zxid
  189. * @param zxid the zxid that contains logs greater than
  190. * zxid
  191. * @return
  192. */
  193. public File[] getSnapshotLogs(long zxid) {
  194. return FileTxnLog.getLogFiles(dataDir.listFiles(), zxid);
  195. }
  196. /**
  197. * append the request to the transaction logs
  198. * @param si the request to be appended
  199. * @throws IOException
  200. */
  201. public void append(Request si) throws IOException {
  202. txnLog.append(si.hdr, si.txn);
  203. }
  204. /**
  205. * commit the transaction of logs
  206. * @throws IOException
  207. */
  208. public void commit() throws IOException {
  209. txnLog.commit();
  210. }
  211. /**
  212. * roll the transaction logs
  213. */
  214. public void rollLog() {
  215. txnLog.rollLog();
  216. }
  217. }