|
@@ -503,7 +503,8 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
@Override
|
|
@Override
|
|
public void selectInputStreams(Collection<EditLogInputStream> streams,
|
|
public void selectInputStreams(Collection<EditLogInputStream> streams,
|
|
long fromTxId, boolean inProgressOk) throws IOException {
|
|
long fromTxId, boolean inProgressOk) throws IOException {
|
|
- List<EditLogLedgerMetadata> currentLedgerList = getLedgerList(inProgressOk);
|
|
|
|
|
|
+ List<EditLogLedgerMetadata> currentLedgerList = getLedgerList(fromTxId,
|
|
|
|
+ inProgressOk);
|
|
try {
|
|
try {
|
|
BookKeeperEditLogInputStream elis = null;
|
|
BookKeeperEditLogInputStream elis = null;
|
|
for (EditLogLedgerMetadata l : currentLedgerList) {
|
|
for (EditLogLedgerMetadata l : currentLedgerList) {
|
|
@@ -511,6 +512,8 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
if (l.isInProgress()) {
|
|
if (l.isInProgress()) {
|
|
lastTxId = recoverLastTxId(l, false);
|
|
lastTxId = recoverLastTxId(l, false);
|
|
}
|
|
}
|
|
|
|
+ // Check once again, required in case of InProgress and is case of any
|
|
|
|
+ // gap.
|
|
if (fromTxId >= l.getFirstTxId() && fromTxId <= lastTxId) {
|
|
if (fromTxId >= l.getFirstTxId() && fromTxId <= lastTxId) {
|
|
LedgerHandle h;
|
|
LedgerHandle h;
|
|
if (l.isInProgress()) { // we don't want to fence the current journal
|
|
if (l.isInProgress()) { // we don't want to fence the current journal
|
|
@@ -523,6 +526,8 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
elis = new BookKeeperEditLogInputStream(h, l);
|
|
elis = new BookKeeperEditLogInputStream(h, l);
|
|
elis.skipTo(fromTxId);
|
|
elis.skipTo(fromTxId);
|
|
} else {
|
|
} else {
|
|
|
|
+ // If mismatches then there might be some gap, so we should not check
|
|
|
|
+ // further.
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
streams.add(elis);
|
|
streams.add(elis);
|
|
@@ -732,6 +737,11 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
*/
|
|
*/
|
|
List<EditLogLedgerMetadata> getLedgerList(boolean inProgressOk)
|
|
List<EditLogLedgerMetadata> getLedgerList(boolean inProgressOk)
|
|
throws IOException {
|
|
throws IOException {
|
|
|
|
+ return getLedgerList(-1, inProgressOk);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private List<EditLogLedgerMetadata> getLedgerList(long fromTxId,
|
|
|
|
+ boolean inProgressOk) throws IOException {
|
|
List<EditLogLedgerMetadata> ledgers
|
|
List<EditLogLedgerMetadata> ledgers
|
|
= new ArrayList<EditLogLedgerMetadata>();
|
|
= new ArrayList<EditLogLedgerMetadata>();
|
|
try {
|
|
try {
|
|
@@ -744,6 +754,12 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
try {
|
|
try {
|
|
EditLogLedgerMetadata editLogLedgerMetadata = EditLogLedgerMetadata
|
|
EditLogLedgerMetadata editLogLedgerMetadata = EditLogLedgerMetadata
|
|
.read(zkc, legderMetadataPath);
|
|
.read(zkc, legderMetadataPath);
|
|
|
|
+ if (editLogLedgerMetadata.getLastTxId() != HdfsConstants.INVALID_TXID
|
|
|
|
+ && editLogLedgerMetadata.getLastTxId() < fromTxId) {
|
|
|
|
+ // exclude already read closed edits, but include inprogress edits
|
|
|
|
+ // as this will be handled in caller
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
ledgers.add(editLogLedgerMetadata);
|
|
ledgers.add(editLogLedgerMetadata);
|
|
} catch (KeeperException.NoNodeException e) {
|
|
} catch (KeeperException.NoNodeException e) {
|
|
LOG.warn("ZNode: " + legderMetadataPath
|
|
LOG.warn("ZNode: " + legderMetadataPath
|