|
@@ -27,7 +27,9 @@ import java.util.concurrent.BlockingQueue;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
import java.util.concurrent.Semaphore;
|
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
+import org.apache.zookeeper.common.Time;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
@@ -53,8 +55,6 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
|
|
|
|
|
|
private static final Logger LOG = LoggerFactory.getLogger(SyncRequestProcessor.class);
|
|
|
|
|
|
- private static final int FLUSH_SIZE = 1000;
|
|
|
-
|
|
|
private static final Request REQUEST_OF_DEATH = Request.requestOfDeath;
|
|
|
|
|
|
/** The number of log entries to log before starting a snapshot */
|
|
@@ -85,7 +85,8 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
|
|
|
* disk. Basically this is the list of SyncItems whose callbacks will be
|
|
|
* invoked after flush returns successfully.
|
|
|
*/
|
|
|
- private final Queue<Request> toFlush = new ArrayDeque<>(FLUSH_SIZE);
|
|
|
+ private final Queue<Request> toFlush;
|
|
|
+ private long lastFlushTime;
|
|
|
|
|
|
public SyncRequestProcessor(ZooKeeperServer zks,
|
|
|
RequestProcessor nextProcessor) {
|
|
@@ -93,6 +94,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
|
|
|
.getZooKeeperServerListener());
|
|
|
this.zks = zks;
|
|
|
this.nextProcessor = nextProcessor;
|
|
|
+ this.toFlush = new ArrayDeque<>(zks.getMaxBatchSize());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -112,6 +114,28 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
|
|
|
return snapCount;
|
|
|
}
|
|
|
|
|
|
+ private long getRemainingDelay() {
|
|
|
+ long flushDelay = zks.getFlushDelay();
|
|
|
+ long duration = Time.currentElapsedTime() - lastFlushTime;
|
|
|
+ if (duration < flushDelay) {
|
|
|
+ return flushDelay - duration;
|
|
|
+ }
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ /** If both flushDelay and maxMaxBatchSize are set (> 0), flush
|
|
|
+ * whenever either condition is hit. If only one or the other is
|
|
|
+ * set, flush only when the relevant condition is hit.
|
|
|
+ */
|
|
|
+ private boolean shouldFlush() {
|
|
|
+ long flushDelay = zks.getFlushDelay();
|
|
|
+ long maxBatchSize = zks.getMaxBatchSize();
|
|
|
+ if ((flushDelay > 0) && (getRemainingDelay() == 0)) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ return (maxBatchSize > 0) && (toFlush.size() >= maxBatchSize);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* used by tests to check for changing
|
|
|
* snapcounts
|
|
@@ -139,9 +163,12 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
|
|
|
// we do this in an attempt to ensure that not all of the servers
|
|
|
// in the ensemble take a snapshot at the same time
|
|
|
resetSnapshotStats();
|
|
|
+ lastFlushTime = Time.currentElapsedTime();
|
|
|
while (true) {
|
|
|
- Request si = queuedRequests.poll();
|
|
|
+ long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
|
|
|
+ Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
|
|
|
if (si == null) {
|
|
|
+ /* We timed out looking for more writes to batch, go ahead and flush immediately */
|
|
|
flush();
|
|
|
si = queuedRequests.take();
|
|
|
}
|
|
@@ -187,7 +214,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
|
|
|
continue;
|
|
|
}
|
|
|
toFlush.add(si);
|
|
|
- if (toFlush.size() == FLUSH_SIZE) {
|
|
|
+ if (shouldFlush()) {
|
|
|
flush();
|
|
|
}
|
|
|
}
|
|
@@ -213,7 +240,8 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
|
|
|
}
|
|
|
if (this.nextProcessor instanceof Flushable) {
|
|
|
((Flushable)this.nextProcessor).flush();
|
|
|
- }
|
|
|
+ }
|
|
|
+ lastFlushTime = Time.currentElapsedTime();
|
|
|
}
|
|
|
}
|
|
|
|