SyncRequestProcessor.java 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. package org.apache.zookeeper.server;
  19. import java.io.ByteArrayOutputStream;
  20. import java.io.File;
  21. import java.io.FileOutputStream;
  22. import java.io.IOException;
  23. import java.nio.ByteBuffer;
  24. import java.nio.channels.FileChannel;
  25. import java.util.LinkedList;
  26. import java.util.Random;
  27. import java.util.concurrent.LinkedBlockingQueue;
  28. import org.apache.log4j.Logger;
  29. import org.apache.jute.BinaryOutputArchive;
  30. import org.apache.jute.Record;
  31. import org.apache.zookeeper.server.util.Profiler;
  32. import org.apache.zookeeper.txn.TxnHeader;
  33. /**
  34. * This RequestProcessor logs requests to disk. It batches the requests to do
  35. * the io efficiently. The request is not passed to the next RequestProcessor
  36. * until its log has been synced to disk.
  37. */
  38. public class SyncRequestProcessor extends Thread implements RequestProcessor {
  39. private static final Logger LOG = Logger.getLogger(SyncRequestProcessor.class);
  40. static final int PADDING_TIMEOUT=1000;
  41. ZooKeeperServer zks;
  42. LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
  43. static boolean forceSync;
  44. static {
  45. forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals(
  46. "no");
  47. }
  48. static long preAllocSize = 65536 * 1024;
  49. static {
  50. String size = System.getProperty("zookeeper.preAllocSize");
  51. if (size != null) {
  52. try {
  53. preAllocSize = Long.parseLong(size) * 1024;
  54. } catch (NumberFormatException e) {
  55. LOG.warn(size + " is not a valid value for preAllocSize");
  56. }
  57. }
  58. }
  59. /**
  60. * The number of log entries to log before starting a snapshot
  61. */
  62. static public int snapCount = ZooKeeperServer.getSnapCount();
  63. Thread snapInProcess;
  64. RequestProcessor nextProcessor;
  65. boolean timeToDie = false;
  66. public SyncRequestProcessor(ZooKeeperServer zks,
  67. RequestProcessor nextProcessor) {
  68. super("SyncThread");
  69. this.zks = zks;
  70. this.nextProcessor = nextProcessor;
  71. start();
  72. }
  73. /**
  74. * Transactions that have been written and are waiting to be flushed to
  75. * disk. Basically this is the list of SyncItems whose callbacks will be
  76. * invoked after flush returns successfully.
  77. */
  78. LinkedList<Request> toFlush = new LinkedList<Request>();
  79. FileOutputStream logStream;
  80. BinaryOutputArchive logArchive;
  81. Random r = new Random(System.nanoTime());
  82. int logCount = 0;
  83. Request requestOfDeath = Request.requestOfDeath;
  84. private static ByteBuffer fill = ByteBuffer.allocateDirect(1024);
  85. LinkedList<FileOutputStream> streamsToFlush = new LinkedList<FileOutputStream>();
  86. private long padLogFile(FileChannel fc,long fileSize) throws IOException{
  87. long position = fc.position();
  88. // We pad the file in 1M chunks to avoid syncing to
  89. // write the new filesize.
  90. if (position + 4096 >= fileSize) {
  91. fileSize = fileSize + preAllocSize;
  92. fill.position(0);
  93. fc.write(fill, fileSize);
  94. }
  95. return fileSize;
  96. }
  97. public void run() {
  98. try {
  99. long fileSize = 0;
  100. long lastZxidSeen = -1;
  101. FileChannel fc = null;
  102. while (true) {
  103. Request si = null;
  104. if (toFlush.isEmpty()) {
  105. si = queuedRequests.take();
  106. } else {
  107. si = queuedRequests.poll();
  108. if (si == null) {
  109. flush(toFlush);
  110. continue;
  111. }
  112. }
  113. if (si == requestOfDeath) {
  114. break;
  115. }
  116. if (si != null) {
  117. // LOG.warn("Sync>>> cxid = " + si.cxid + " type = " +
  118. // si.type + " id = " + si.sessionId + " zxid = " +
  119. // Long.toHexString(si.zxid));
  120. ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
  121. 'S', si, "");
  122. TxnHeader hdr = si.hdr;
  123. if (hdr != null) {
  124. if (hdr.getZxid() <= lastZxidSeen) {
  125. LOG.warn("Current zxid " + hdr.getZxid()
  126. + " is <= " + lastZxidSeen + " for "
  127. + hdr.getType());
  128. }
  129. Record txn = si.txn;
  130. if (logStream == null) {
  131. fileSize = 0;
  132. logStream = new FileOutputStream(new File(
  133. zks.dataLogDir, ZooKeeperServer
  134. .getLogName(hdr.getZxid())));
  135. synchronized (streamsToFlush) {
  136. streamsToFlush.add(logStream);
  137. }
  138. fc = logStream.getChannel();
  139. logArchive = BinaryOutputArchive
  140. .getArchive(logStream);
  141. }
  142. final long fsize=fileSize;
  143. final FileChannel ffc=fc;
  144. fileSize = Profiler.profile(
  145. new Profiler.Operation<Long>() {
  146. public Long execute() throws Exception {
  147. return SyncRequestProcessor.this
  148. .padLogFile(ffc, fsize);
  149. }
  150. }, PADDING_TIMEOUT,
  151. "Logfile padding exceeded time threshold"
  152. );
  153. ByteArrayOutputStream baos = new ByteArrayOutputStream();
  154. BinaryOutputArchive boa = BinaryOutputArchive
  155. .getArchive(baos);
  156. hdr.serialize(boa, "hdr");
  157. if (txn != null) {
  158. txn.serialize(boa, "txn");
  159. }
  160. logArchive.writeBuffer(baos.toByteArray(), "txnEntry");
  161. logArchive.writeByte((byte) 0x42, "EOR");
  162. logCount++;
  163. if (logCount > snapCount / 2
  164. && r.nextInt(snapCount / 2) == 0) {
  165. // We just want one snapshot going at a time
  166. if (snapInProcess != null
  167. && snapInProcess.isAlive()) {
  168. LOG.warn("Too busy to snap, skipping");
  169. } else {
  170. logStream = null;
  171. logArchive = null;
  172. snapInProcess = new Thread() {
  173. public void run() {
  174. try {
  175. zks.snapshot();
  176. } catch (Exception e) {
  177. LOG.warn("Unexpected exception",e);
  178. }
  179. }
  180. };
  181. snapInProcess.start();
  182. }
  183. logCount = 0;
  184. }
  185. }
  186. toFlush.add(si);
  187. if (toFlush.size() > 1000) {
  188. flush(toFlush);
  189. }
  190. }
  191. }
  192. } catch (Exception e) {
  193. LOG.error("Severe error, exiting",e);
  194. System.exit(11);
  195. }
  196. ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
  197. "SyncRequestProcessor exiyed!");
  198. }
  199. private void flush(LinkedList<Request> toFlush) throws IOException {
  200. if (toFlush.size() == 0) {
  201. return;
  202. }
  203. LinkedList<FileOutputStream> streamsToFlushNow;
  204. synchronized (streamsToFlush) {
  205. streamsToFlushNow = (LinkedList<FileOutputStream>) streamsToFlush
  206. .clone();
  207. }
  208. for (FileOutputStream fos : streamsToFlushNow) {
  209. fos.flush();
  210. if (forceSync) {
  211. ((FileChannel) fos.getChannel()).force(false);
  212. }
  213. }
  214. while (streamsToFlushNow.size() > 1) {
  215. FileOutputStream fos = streamsToFlushNow.removeFirst();
  216. fos.close();
  217. synchronized (streamsToFlush) {
  218. streamsToFlush.remove(fos);
  219. }
  220. }
  221. while (toFlush.size() > 0) {
  222. Request i = toFlush.remove();
  223. nextProcessor.processRequest(i);
  224. }
  225. }
  226. public void shutdown() {
  227. timeToDie = true;
  228. queuedRequests.add(requestOfDeath);
  229. nextProcessor.shutdown();
  230. }
  231. public void processRequest(Request request) {
  232. // request.addRQRec(">sync");
  233. queuedRequests.add(request);
  234. }
  235. }