|
@@ -28,6 +28,7 @@ import java.net.MalformedURLException;
|
|
|
import java.net.URL;
|
|
|
import java.util.List;
|
|
|
|
|
|
+import com.google.common.math.LongMath;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
@@ -51,7 +52,7 @@ import com.google.common.collect.Lists;
|
|
|
* The Checkpointer is a daemon that periodically wakes up
|
|
|
* up (determined by the schedule specified in the configuration),
|
|
|
* triggers a periodic checkpoint and then goes back to sleep.
|
|
|
- *
|
|
|
+ *
|
|
|
* The start of a checkpoint is triggered by one of the two factors:
|
|
|
* (1) time or (2) the size of the edits file.
|
|
|
*/
|
|
@@ -126,14 +127,13 @@ class Checkpointer extends Daemon {
|
|
|
//
|
|
|
@Override
|
|
|
public void run() {
|
|
|
- // Check the size of the edit log once every 5 minutes.
|
|
|
- long periodMSec = 5 * 60; // 5 minutes
|
|
|
- if(checkpointConf.getPeriod() < periodMSec) {
|
|
|
- periodMSec = checkpointConf.getPeriod();
|
|
|
- }
|
|
|
- periodMSec *= 1000;
|
|
|
+ // How often to check the size of the edit log (min of checkpointCheckPeriod and checkpointPeriod)
|
|
|
+ long periodMSec = checkpointConf.getCheckPeriod() * 1000;
|
|
|
+ // How often to checkpoint regardless of number of txns
|
|
|
+ long checkpointPeriodMSec = checkpointConf.getPeriod() * 1000;
|
|
|
|
|
|
long lastCheckpointTime = 0;
|
|
|
+ long lastEditLogCheckTime =0;
|
|
|
if (!backupNode.shouldCheckpointAtStartup()) {
|
|
|
lastCheckpointTime = monotonicNow();
|
|
|
}
|
|
@@ -141,16 +141,18 @@ class Checkpointer extends Daemon {
|
|
|
try {
|
|
|
long now = monotonicNow();
|
|
|
boolean shouldCheckpoint = false;
|
|
|
- if(now >= lastCheckpointTime + periodMSec) {
|
|
|
+ if(now >= lastCheckpointTime + checkpointPeriodMSec) {
|
|
|
shouldCheckpoint = true;
|
|
|
- } else {
|
|
|
+ } else if(now >= lastEditLogCheckTime + periodMSec) {
|
|
|
long txns = countUncheckpointedTxns();
|
|
|
+ lastEditLogCheckTime = now;
|
|
|
if(txns >= checkpointConf.getTxnCount())
|
|
|
shouldCheckpoint = true;
|
|
|
}
|
|
|
if(shouldCheckpoint) {
|
|
|
doCheckpoint();
|
|
|
lastCheckpointTime = now;
|
|
|
+ lastEditLogCheckTime = now;
|
|
|
}
|
|
|
} catch(IOException e) {
|
|
|
LOG.error("Exception in doCheckpoint: ", e);
|
|
@@ -160,7 +162,7 @@ class Checkpointer extends Daemon {
|
|
|
break;
|
|
|
}
|
|
|
try {
|
|
|
- Thread.sleep(periodMSec);
|
|
|
+ Thread.sleep(LongMath.gcd(periodMSec, checkpointPeriodMSec));
|
|
|
} catch(InterruptedException ie) {
|
|
|
// do nothing
|
|
|
}
|