|
@@ -49,7 +49,7 @@ import java.net.URI;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
-
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
/**
|
|
|
* BookKeeper Journal Manager
|
|
|
*
|
|
@@ -122,7 +122,9 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
|
= "dfs.namenode.bookkeeperjournal.zk.session.timeout";
|
|
|
public static final int BKJM_ZK_SESSION_TIMEOUT_DEFAULT = 3000;
|
|
|
|
|
|
- private final ZooKeeper zkc;
|
|
|
+ private static final String BKJM_EDIT_INPROGRESS = "inprogress_";
|
|
|
+
|
|
|
+ private ZooKeeper zkc;
|
|
|
private final Configuration conf;
|
|
|
private final BookKeeper bkc;
|
|
|
private final CurrentInprogress ci;
|
|
@@ -351,13 +353,9 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
|
|
|
|
EditLogInputStream getInputStream(long fromTxId, boolean inProgressOk)
|
|
|
throws IOException {
|
|
|
- for (EditLogLedgerMetadata l : getLedgerList()) {
|
|
|
+ for (EditLogLedgerMetadata l : getLedgerList(inProgressOk)) {
|
|
|
long lastTxId = l.getLastTxId();
|
|
|
if (l.isInProgress()) {
|
|
|
- if (!inProgressOk) {
|
|
|
- continue;
|
|
|
- }
|
|
|
-
|
|
|
lastTxId = recoverLastTxId(l, false);
|
|
|
}
|
|
|
|
|
@@ -413,13 +411,9 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
|
throws IOException {
|
|
|
long count = 0;
|
|
|
long expectedStart = 0;
|
|
|
- for (EditLogLedgerMetadata l : getLedgerList()) {
|
|
|
+ for (EditLogLedgerMetadata l : getLedgerList(inProgressOk)) {
|
|
|
long lastTxId = l.getLastTxId();
|
|
|
if (l.isInProgress()) {
|
|
|
- if (!inProgressOk) {
|
|
|
- continue;
|
|
|
- }
|
|
|
-
|
|
|
lastTxId = recoverLastTxId(l, false);
|
|
|
if (lastTxId == HdfsConstants.INVALID_TXID) {
|
|
|
break;
|
|
@@ -457,7 +451,7 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
|
try {
|
|
|
List<String> children = zkc.getChildren(ledgerPath, false);
|
|
|
for (String child : children) {
|
|
|
- if (!child.startsWith("inprogress_")) {
|
|
|
+ if (!child.startsWith(BKJM_EDIT_INPROGRESS)) {
|
|
|
continue;
|
|
|
}
|
|
|
String znode = ledgerPath + "/" + child;
|
|
@@ -504,9 +498,8 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
|
@Override
|
|
|
public void purgeLogsOlderThan(long minTxIdToKeep)
|
|
|
throws IOException {
|
|
|
- for (EditLogLedgerMetadata l : getLedgerList()) {
|
|
|
- if (!l.isInProgress()
|
|
|
- && l.getLastTxId() < minTxIdToKeep) {
|
|
|
+ for (EditLogLedgerMetadata l : getLedgerList(false)) {
|
|
|
+ if (l.getLastTxId() < minTxIdToKeep) {
|
|
|
try {
|
|
|
Stat stat = zkc.exists(l.getZkPath(), false);
|
|
|
zkc.delete(l.getZkPath(), stat.getVersion());
|
|
@@ -597,13 +590,26 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
|
/**
|
|
|
* Get a list of all segments in the journal.
|
|
|
*/
|
|
|
- private List<EditLogLedgerMetadata> getLedgerList() throws IOException {
|
|
|
+ List<EditLogLedgerMetadata> getLedgerList(boolean inProgressOk)
|
|
|
+ throws IOException {
|
|
|
List<EditLogLedgerMetadata> ledgers
|
|
|
= new ArrayList<EditLogLedgerMetadata>();
|
|
|
try {
|
|
|
List<String> ledgerNames = zkc.getChildren(ledgerPath, false);
|
|
|
- for (String n : ledgerNames) {
|
|
|
- ledgers.add(EditLogLedgerMetadata.read(zkc, ledgerPath + "/" + n));
|
|
|
+ for (String ledgerName : ledgerNames) {
|
|
|
+ if (!inProgressOk && ledgerName.contains(BKJM_EDIT_INPROGRESS)) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ String legderMetadataPath = ledgerPath + "/" + ledgerName;
|
|
|
+ try {
|
|
|
+ EditLogLedgerMetadata editLogLedgerMetadata = EditLogLedgerMetadata
|
|
|
+ .read(zkc, legderMetadataPath);
|
|
|
+ ledgers.add(editLogLedgerMetadata);
|
|
|
+ } catch (KeeperException.NoNodeException e) {
|
|
|
+ LOG.warn("ZNode: " + legderMetadataPath
|
|
|
+ + " might have finalized and deleted."
|
|
|
+ + " So ignoring NoNodeException.");
|
|
|
+ }
|
|
|
}
|
|
|
} catch (KeeperException e) {
|
|
|
throw new IOException("Exception reading ledger list from zk", e);
|
|
@@ -630,6 +636,11 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
|
return ledgerPath + "/inprogress_" + Long.toString(startTxid, 16);
|
|
|
}
|
|
|
|
|
|
+ @VisibleForTesting
|
|
|
+ void setZooKeeper(ZooKeeper zk) {
|
|
|
+ this.zkc = zk;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Simple watcher to notify when zookeeper has connected
|
|
|
*/
|