|
@@ -172,7 +172,6 @@ public class JournalNodeSyncer {
|
|
} else {
|
|
} else {
|
|
syncJournals();
|
|
syncJournals();
|
|
}
|
|
}
|
|
- Thread.sleep(journalSyncInterval);
|
|
|
|
} catch (Throwable t) {
|
|
} catch (Throwable t) {
|
|
if (!shouldSync) {
|
|
if (!shouldSync) {
|
|
if (t instanceof InterruptedException) {
|
|
if (t instanceof InterruptedException) {
|
|
@@ -194,6 +193,17 @@ public class JournalNodeSyncer {
|
|
LOG.error(
|
|
LOG.error(
|
|
"JournalNodeSyncer daemon received Runtime exception. ", t);
|
|
"JournalNodeSyncer daemon received Runtime exception. ", t);
|
|
}
|
|
}
|
|
|
|
+ try {
|
|
|
|
+ Thread.sleep(journalSyncInterval);
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ if (!shouldSync) {
|
|
|
|
+ LOG.info("Stopping JournalNode Sync.");
|
|
|
|
+ } else {
|
|
|
|
+ LOG.warn("JournalNodeSyncer interrupted", e);
|
|
|
|
+ }
|
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
});
|
|
});
|
|
syncJournalDaemon.start();
|
|
syncJournalDaemon.start();
|
|
@@ -320,30 +330,30 @@ public class JournalNodeSyncer {
|
|
|
|
|
|
List<RemoteEditLog> missingEditLogs = Lists.newArrayList();
|
|
List<RemoteEditLog> missingEditLogs = Lists.newArrayList();
|
|
|
|
|
|
- int thisJnIndex = 0, otherJnIndex = 0;
|
|
|
|
- int thisJnNumLogs = thisJournalEditLogs.size();
|
|
|
|
- int otherJnNumLogs = otherJournalEditLogs.size();
|
|
|
|
|
|
+ int localJnIndex = 0, remoteJnIndex = 0;
|
|
|
|
+ int localJnNumLogs = thisJournalEditLogs.size();
|
|
|
|
+ int remoteJnNumLogs = otherJournalEditLogs.size();
|
|
|
|
|
|
- while (thisJnIndex < thisJnNumLogs && otherJnIndex < otherJnNumLogs) {
|
|
|
|
- long localJNstartTxId = thisJournalEditLogs.get(thisJnIndex)
|
|
|
|
|
|
+ while (localJnIndex < localJnNumLogs && remoteJnIndex < remoteJnNumLogs) {
|
|
|
|
+ long localJNstartTxId = thisJournalEditLogs.get(localJnIndex)
|
|
.getStartTxId();
|
|
.getStartTxId();
|
|
- long remoteJNstartTxId = otherJournalEditLogs.get(otherJnIndex)
|
|
|
|
|
|
+ long remoteJNstartTxId = otherJournalEditLogs.get(remoteJnIndex)
|
|
.getStartTxId();
|
|
.getStartTxId();
|
|
|
|
|
|
if (localJNstartTxId == remoteJNstartTxId) {
|
|
if (localJNstartTxId == remoteJNstartTxId) {
|
|
- thisJnIndex++;
|
|
|
|
- otherJnIndex++;
|
|
|
|
|
|
+ localJnIndex++;
|
|
|
|
+ remoteJnIndex++;
|
|
} else if (localJNstartTxId > remoteJNstartTxId) {
|
|
} else if (localJNstartTxId > remoteJNstartTxId) {
|
|
- missingEditLogs.add(otherJournalEditLogs.get(otherJnIndex));
|
|
|
|
- otherJnIndex++;
|
|
|
|
|
|
+ missingEditLogs.add(otherJournalEditLogs.get(remoteJnIndex));
|
|
|
|
+ remoteJnIndex++;
|
|
} else {
|
|
} else {
|
|
- thisJnIndex++;
|
|
|
|
|
|
+ localJnIndex++;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- if (otherJnIndex < otherJnNumLogs) {
|
|
|
|
- for (; otherJnIndex < otherJnNumLogs; otherJnIndex++) {
|
|
|
|
- missingEditLogs.add(otherJournalEditLogs.get(otherJnIndex));
|
|
|
|
|
|
+ if (remoteJnIndex < remoteJnNumLogs) {
|
|
|
|
+ for (; remoteJnIndex < remoteJnNumLogs; remoteJnIndex++) {
|
|
|
|
+ missingEditLogs.add(otherJournalEditLogs.get(remoteJnIndex));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|