|
@@ -188,8 +188,11 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
|
|
|
|
bkc = new BookKeeper(new ClientConfiguration(),
|
|
|
zkc);
|
|
|
- } catch (Exception e) {
|
|
|
+ } catch (KeeperException e) {
|
|
|
throw new IOException("Error initializing zk", e);
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ throw new IOException("Interrupted while initializing bk journal manager",
|
|
|
+ ie);
|
|
|
}
|
|
|
|
|
|
ci = new CurrentInprogress(zkc, currentInprogressNodePath);
|
|
@@ -211,6 +214,7 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
|
throw new IOException("We've already seen " + txId
|
|
|
+ ". A new stream cannot be created with it");
|
|
|
}
|
|
|
+
|
|
|
try {
|
|
|
String existingInprogressNode = ci.read();
|
|
|
if (null != existingInprogressNode
|
|
@@ -224,6 +228,15 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
|
currentLedger = bkc.createLedger(ensembleSize, quorumSize,
|
|
|
BookKeeper.DigestType.MAC,
|
|
|
digestpw.getBytes());
|
|
|
+ } catch (BKException bke) {
|
|
|
+ throw new IOException("Error creating ledger", bke);
|
|
|
+ } catch (KeeperException ke) {
|
|
|
+ throw new IOException("Error in zookeeper while creating ledger", ke);
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ throw new IOException("Interrupted creating ledger", ie);
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
String znodePath = inprogressZNode(txId);
|
|
|
EditLogLedgerMetadata l = new EditLogLedgerMetadata(znodePath,
|
|
|
HdfsConstants.LAYOUT_VERSION, currentLedger.getId(), txId);
|
|
@@ -239,21 +252,27 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
|
maxTxId.store(txId);
|
|
|
ci.update(znodePath);
|
|
|
return new BookKeeperEditLogOutputStream(conf, currentLedger);
|
|
|
- } catch (Exception e) {
|
|
|
- if (currentLedger != null) {
|
|
|
- try {
|
|
|
- long id = currentLedger.getId();
|
|
|
- currentLedger.close();
|
|
|
- bkc.deleteLedger(id);
|
|
|
- } catch (Exception e2) {
|
|
|
- //log & ignore, an IOException will be thrown soon
|
|
|
- LOG.error("Error closing ledger", e2);
|
|
|
- }
|
|
|
- }
|
|
|
- throw new IOException("Error creating ledger", e);
|
|
|
+ } catch (KeeperException ke) {
|
|
|
+ cleanupLedger(currentLedger);
|
|
|
+ throw new IOException("Error storing ledger metadata", ke);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private void cleanupLedger(LedgerHandle lh) {
|
|
|
+ try {
|
|
|
+ long id = currentLedger.getId();
|
|
|
+ currentLedger.close();
|
|
|
+ bkc.deleteLedger(id);
|
|
|
+ } catch (BKException bke) {
|
|
|
+ //log & ignore, an IOException will be thrown soon
|
|
|
+ LOG.error("Error closing ledger", bke);
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ LOG.warn("Interrupted while closing ledger", ie);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
/**
|
|
|
* Finalize a log segment. If the journal manager is currently
|
|
|
* writing to a ledger, ensure that this is the ledger of the log segment
|
|
@@ -347,8 +366,11 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
|
l);
|
|
|
s.skipTo(fromTxId);
|
|
|
return s;
|
|
|
- } catch (Exception e) {
|
|
|
+ } catch (BKException e) {
|
|
|
throw new IOException("Could not open ledger for " + fromTxId, e);
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ throw new IOException("Interrupted opening ledger for "
|
|
|
+ + fromTxId, ie);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -479,8 +501,10 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
|
try {
|
|
|
bkc.close();
|
|
|
zkc.close();
|
|
|
- } catch (Exception e) {
|
|
|
- throw new IOException("Couldn't close zookeeper client", e);
|
|
|
+ } catch (BKException bke) {
|
|
|
+ throw new IOException("Couldn't close bookkeeper client", bke);
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ throw new IOException("Interrupted while closing journal manager", ie);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -525,9 +549,12 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
|
op = in.readOp();
|
|
|
}
|
|
|
return endTxId;
|
|
|
- } catch (Exception e) {
|
|
|
+ } catch (BKException e) {
|
|
|
throw new IOException("Exception retreiving last tx id for ledger " + l,
|
|
|
e);
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ throw new IOException("Interrupted while retreiving last tx id "
|
|
|
+ + "for ledger " + l, ie);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -542,8 +569,10 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
|
for (String n : ledgerNames) {
|
|
|
ledgers.add(EditLogLedgerMetadata.read(zkc, ledgerPath + "/" + n));
|
|
|
}
|
|
|
- } catch (Exception e) {
|
|
|
+ } catch (KeeperException e) {
|
|
|
throw new IOException("Exception reading ledger list from zk", e);
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ throw new IOException("Interrupted getting list of ledgers from zk", ie);
|
|
|
}
|
|
|
|
|
|
Collections.sort(ledgers, EditLogLedgerMetadata.COMPARATOR);
|