|
@@ -500,16 +500,18 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- EditLogInputStream getInputStream(long fromTxId, boolean inProgressOk)
|
|
|
- throws IOException {
|
|
|
- for (EditLogLedgerMetadata l : getLedgerList(inProgressOk)) {
|
|
|
- long lastTxId = l.getLastTxId();
|
|
|
- if (l.isInProgress()) {
|
|
|
- lastTxId = recoverLastTxId(l, false);
|
|
|
- }
|
|
|
-
|
|
|
- if (fromTxId >= l.getFirstTxId() && fromTxId <= lastTxId) {
|
|
|
- try {
|
|
|
+ @Override
|
|
|
+ public void selectInputStreams(Collection<EditLogInputStream> streams,
|
|
|
+ long fromTxId, boolean inProgressOk) throws IOException {
|
|
|
+ List<EditLogLedgerMetadata> currentLedgerList = getLedgerList(inProgressOk);
|
|
|
+ try {
|
|
|
+ BookKeeperEditLogInputStream elis = null;
|
|
|
+ for (EditLogLedgerMetadata l : currentLedgerList) {
|
|
|
+ long lastTxId = l.getLastTxId();
|
|
|
+ if (l.isInProgress()) {
|
|
|
+ lastTxId = recoverLastTxId(l, false);
|
|
|
+ }
|
|
|
+ if (fromTxId >= l.getFirstTxId() && fromTxId <= lastTxId) {
|
|
|
LedgerHandle h;
|
|
|
if (l.isInProgress()) { // we don't want to fence the current journal
|
|
|
h = bkc.openLedgerNoRecovery(l.getLedgerId(),
|
|
@@ -518,42 +520,22 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
|
h = bkc.openLedger(l.getLedgerId(), BookKeeper.DigestType.MAC,
|
|
|
digestpw.getBytes());
|
|
|
}
|
|
|
- BookKeeperEditLogInputStream s = new BookKeeperEditLogInputStream(h,
|
|
|
- l);
|
|
|
- s.skipTo(fromTxId);
|
|
|
- return s;
|
|
|
- } catch (BKException e) {
|
|
|
- throw new IOException("Could not open ledger for " + fromTxId, e);
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- Thread.currentThread().interrupt();
|
|
|
- throw new IOException("Interrupted opening ledger for "
|
|
|
- + fromTxId, ie);
|
|
|
+ elis = new BookKeeperEditLogInputStream(h, l);
|
|
|
+ elis.skipTo(fromTxId);
|
|
|
+ } else {
|
|
|
+ return;
|
|
|
}
|
|
|
+ streams.add(elis);
|
|
|
+ if (elis.getLastTxId() == HdfsConstants.INVALID_TXID) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ fromTxId = elis.getLastTxId() + 1;
|
|
|
}
|
|
|
- }
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void selectInputStreams(Collection<EditLogInputStream> streams,
|
|
|
- long fromTxId, boolean inProgressOk) {
|
|
|
- // NOTE: could probably be rewritten more efficiently
|
|
|
- while (true) {
|
|
|
- EditLogInputStream elis;
|
|
|
- try {
|
|
|
- elis = getInputStream(fromTxId, inProgressOk);
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.error(e);
|
|
|
- return;
|
|
|
- }
|
|
|
- if (elis == null) {
|
|
|
- return;
|
|
|
- }
|
|
|
- streams.add(elis);
|
|
|
- if (elis.getLastTxId() == HdfsConstants.INVALID_TXID) {
|
|
|
- return;
|
|
|
- }
|
|
|
- fromTxId = elis.getLastTxId() + 1;
|
|
|
+ } catch (BKException e) {
|
|
|
+ throw new IOException("Could not open ledger for " + fromTxId, e);
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ throw new IOException("Interrupted opening ledger for " + fromTxId, ie);
|
|
|
}
|
|
|
}
|
|
|
|