|
@@ -134,10 +134,17 @@ public class EditLogTailer {
|
|
|
private final ExecutorService rollEditsRpcExecutor;
|
|
|
|
|
|
/**
|
|
|
- * How often the Standby should check if there are new finalized segment(s)
|
|
|
- * available to be read from.
|
|
|
+ * How often the tailer should check if there are new edit log entries
|
|
|
+ * ready to be consumed. This is the initial delay before any backoff.
|
|
|
*/
|
|
|
private final long sleepTimeMs;
|
|
|
+ /**
|
|
|
+ * The maximum time the tailer should wait between checking for new edit log
|
|
|
+ * entries. Exponential backoff will be applied when an edit log tail is
|
|
|
+ * performed but no edits are available to be read. If this is less than or
|
|
|
+ * equal to 0, backoff is disabled.
|
|
|
+ */
|
|
|
+ private final long maxSleepTimeMs;
|
|
|
|
|
|
private final int nnCount;
|
|
|
private NamenodeProtocol cachedActiveProxy = null;
|
|
@@ -206,6 +213,20 @@ public class EditLogTailer {
|
|
|
DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY,
|
|
|
DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_DEFAULT,
|
|
|
TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
|
|
|
+ long maxSleepTimeMsTemp = conf.getTimeDuration(
|
|
|
+ DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_BACKOFF_MAX_KEY,
|
|
|
+ DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_BACKOFF_MAX_DEFAULT,
|
|
|
+ TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
|
|
|
+ if (maxSleepTimeMsTemp > 0 && maxSleepTimeMsTemp < sleepTimeMs) {
|
|
|
+ LOG.warn(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_BACKOFF_MAX_KEY
|
|
|
+ + " was configured to be " + maxSleepTimeMsTemp
|
|
|
+ + " ms, but this is less than "
|
|
|
+ + DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY
|
|
|
+ + ". Disabling backoff when tailing edit logs.");
|
|
|
+ maxSleepTimeMs = 0;
|
|
|
+ } else {
|
|
|
+ maxSleepTimeMs = maxSleepTimeMsTemp;
|
|
|
+ }
|
|
|
|
|
|
rollEditsTimeoutMs = conf.getTimeDuration(
|
|
|
DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY,
|
|
@@ -290,7 +311,7 @@ public class EditLogTailer {
|
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|
|
|
- public void doTailEdits() throws IOException, InterruptedException {
|
|
|
+ public long doTailEdits() throws IOException, InterruptedException {
|
|
|
// Write lock needs to be interruptible here because the
|
|
|
// transitionToActive RPC takes the write lock before calling
|
|
|
// tailer.stop() -- so if we're not interruptible, it will
|
|
@@ -315,7 +336,7 @@ public class EditLogTailer {
|
|
|
// edits file hasn't been started yet.
|
|
|
LOG.warn("Edits tailer failed to find any streams. Will try again " +
|
|
|
"later.", ioe);
|
|
|
- return;
|
|
|
+ return 0;
|
|
|
} finally {
|
|
|
NameNode.getNameNodeMetrics().addEditLogFetchTime(
|
|
|
Time.monotonicNow() - startTime);
|
|
@@ -346,6 +367,7 @@ public class EditLogTailer {
|
|
|
lastLoadTimeMs = monotonicNow();
|
|
|
}
|
|
|
lastLoadedTxnId = image.getLastAppliedTxId();
|
|
|
+ return editsLoaded;
|
|
|
} finally {
|
|
|
namesystem.writeUnlock();
|
|
|
}
|
|
@@ -406,6 +428,11 @@ public class EditLogTailer {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @VisibleForTesting
|
|
|
+ void sleep(long sleepTimeMillis) throws InterruptedException {
|
|
|
+ Thread.sleep(sleepTimeMillis);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* The thread which does the actual work of tailing edits journals and
|
|
|
* applying the transactions to the FSNS.
|
|
@@ -434,7 +461,9 @@ public class EditLogTailer {
|
|
|
}
|
|
|
|
|
|
private void doWork() {
|
|
|
+ long currentSleepTimeMs = sleepTimeMs;
|
|
|
while (shouldRun) {
|
|
|
+ long editsTailed = 0;
|
|
|
try {
|
|
|
// There's no point in triggering a log roll if the Standby hasn't
|
|
|
// read any more transactions since the last time a roll was
|
|
@@ -460,7 +489,7 @@ public class EditLogTailer {
|
|
|
try {
|
|
|
NameNode.getNameNodeMetrics().addEditLogTailInterval(
|
|
|
startTime - lastLoadTimeMs);
|
|
|
- doTailEdits();
|
|
|
+ editsTailed = doTailEdits();
|
|
|
} finally {
|
|
|
namesystem.cpUnlock();
|
|
|
NameNode.getNameNodeMetrics().addEditLogTailTime(
|
|
@@ -480,7 +509,17 @@ public class EditLogTailer {
|
|
|
}
|
|
|
|
|
|
try {
|
|
|
- Thread.sleep(sleepTimeMs);
|
|
|
+ if (editsTailed == 0 && maxSleepTimeMs > 0) {
|
|
|
+ // If no edits were tailed, apply exponential backoff
|
|
|
+ // before tailing again. Double the current sleep time on each
|
|
|
+ // empty response, but don't exceed the max. If the sleep time
|
|
|
+ // was configured as 0, start the backoff at 1 ms.
|
|
|
+ currentSleepTimeMs = Math.min(maxSleepTimeMs,
|
|
|
+ (currentSleepTimeMs == 0 ? 1 : currentSleepTimeMs) * 2);
|
|
|
+ } else {
|
|
|
+ currentSleepTimeMs = sleepTimeMs; // reset to initial sleep time
|
|
|
+ }
|
|
|
+ EditLogTailer.this.sleep(currentSleepTimeMs);
|
|
|
} catch (InterruptedException e) {
|
|
|
LOG.warn("Edit log tailer interrupted", e);
|
|
|
}
|