|
@@ -39,6 +39,7 @@ import org.apache.zookeeper.KeeperException;
|
|
|
import org.apache.zookeeper.CreateMode;
|
|
|
import org.apache.zookeeper.ZooDefs.Ids;
|
|
|
import org.apache.zookeeper.AsyncCallback.StringCallback;
|
|
|
+import org.apache.zookeeper.ZKUtil;
|
|
|
|
|
|
import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
@@ -46,6 +47,7 @@ import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.io.IOException;
|
|
|
|
|
|
import java.net.URI;
|
|
@@ -142,13 +144,16 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
|
private final Configuration conf;
|
|
|
private final BookKeeper bkc;
|
|
|
private final CurrentInprogress ci;
|
|
|
+ private final String basePath;
|
|
|
private final String ledgerPath;
|
|
|
+ private final String versionPath;
|
|
|
private final MaxTxId maxTxId;
|
|
|
private final int ensembleSize;
|
|
|
private final int quorumSize;
|
|
|
private final String digestpw;
|
|
|
private final CountDownLatch zkConnectLatch;
|
|
|
private final NamespaceInfo nsInfo;
|
|
|
+ private boolean initialized = false;
|
|
|
private LedgerHandle currentLedger = null;
|
|
|
|
|
|
/**
|
|
@@ -160,16 +165,16 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
|
this.nsInfo = nsInfo;
|
|
|
|
|
|
String zkConnect = uri.getAuthority().replace(";", ",");
|
|
|
- String zkPath = uri.getPath();
|
|
|
+ basePath = uri.getPath();
|
|
|
ensembleSize = conf.getInt(BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
|
|
|
BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT);
|
|
|
quorumSize = conf.getInt(BKJM_BOOKKEEPER_QUORUM_SIZE,
|
|
|
BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT);
|
|
|
|
|
|
- ledgerPath = zkPath + "/ledgers";
|
|
|
- String maxTxIdPath = zkPath + "/maxtxid";
|
|
|
- String currentInprogressNodePath = zkPath + "/CurrentInprogress";
|
|
|
- String versionPath = zkPath + "/version";
|
|
|
+ ledgerPath = basePath + "/ledgers";
|
|
|
+ String maxTxIdPath = basePath + "/maxtxid";
|
|
|
+ String currentInprogressNodePath = basePath + "/CurrentInprogress";
|
|
|
+ versionPath = basePath + "/version";
|
|
|
digestpw = conf.get(BKJM_BOOKKEEPER_DIGEST_PW,
|
|
|
BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT);
|
|
|
|
|
@@ -180,47 +185,7 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
|
if (!zkConnectLatch.await(6000, TimeUnit.MILLISECONDS)) {
|
|
|
throw new IOException("Error connecting to zookeeper");
|
|
|
}
|
|
|
- if (zkc.exists(zkPath, false) == null) {
|
|
|
- zkc.create(zkPath, new byte[] {'0'},
|
|
|
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
- }
|
|
|
|
|
|
- Stat versionStat = zkc.exists(versionPath, false);
|
|
|
- if (versionStat != null) {
|
|
|
- byte[] d = zkc.getData(versionPath, false, versionStat);
|
|
|
- VersionProto.Builder builder = VersionProto.newBuilder();
|
|
|
- TextFormat.merge(new String(d, UTF_8), builder);
|
|
|
- if (!builder.isInitialized()) {
|
|
|
- throw new IOException("Invalid/Incomplete data in znode");
|
|
|
- }
|
|
|
- VersionProto vp = builder.build();
|
|
|
-
|
|
|
- // There's only one version at the moment
|
|
|
- assert vp.getLayoutVersion() == BKJM_LAYOUT_VERSION;
|
|
|
-
|
|
|
- NamespaceInfo readns = PBHelper.convert(vp.getNamespaceInfo());
|
|
|
-
|
|
|
- if (nsInfo.getNamespaceID() != readns.getNamespaceID() ||
|
|
|
- !nsInfo.clusterID.equals(readns.getClusterID()) ||
|
|
|
- !nsInfo.getBlockPoolID().equals(readns.getBlockPoolID())) {
|
|
|
- String err = String.format("Environment mismatch. Running process %s"
|
|
|
- +", stored in ZK %s", nsInfo, readns);
|
|
|
- LOG.error(err);
|
|
|
- throw new IOException(err);
|
|
|
- }
|
|
|
- } else if (nsInfo.getNamespaceID() > 0) {
|
|
|
- VersionProto.Builder builder = VersionProto.newBuilder();
|
|
|
- builder.setNamespaceInfo(PBHelper.convert(nsInfo))
|
|
|
- .setLayoutVersion(BKJM_LAYOUT_VERSION);
|
|
|
- byte[] data = TextFormat.printToString(builder.build()).getBytes(UTF_8);
|
|
|
- zkc.create(versionPath, data,
|
|
|
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
- }
|
|
|
-
|
|
|
- if (zkc.exists(ledgerPath, false) == null) {
|
|
|
- zkc.create(ledgerPath, new byte[] {'0'},
|
|
|
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
- }
|
|
|
prepareBookKeeperEnv();
|
|
|
bkc = new BookKeeper(new ClientConfiguration(), zkc);
|
|
|
} catch (KeeperException e) {
|
|
@@ -244,6 +209,7 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
|
BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT);
|
|
|
final CountDownLatch zkPathLatch = new CountDownLatch(1);
|
|
|
|
|
|
+ final AtomicBoolean success = new AtomicBoolean(false);
|
|
|
StringCallback callback = new StringCallback() {
|
|
|
@Override
|
|
|
public void processResult(int rc, String path, Object ctx, String name) {
|
|
@@ -251,22 +217,23 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
|
|| KeeperException.Code.NODEEXISTS.intValue() == rc) {
|
|
|
LOG.info("Successfully created bookie available path : "
|
|
|
+ zkAvailablePath);
|
|
|
- zkPathLatch.countDown();
|
|
|
+ success.set(true);
|
|
|
} else {
|
|
|
KeeperException.Code code = KeeperException.Code.get(rc);
|
|
|
- LOG
|
|
|
- .error("Error : "
|
|
|
+ LOG.error("Error : "
|
|
|
+ KeeperException.create(code, path).getMessage()
|
|
|
+ ", failed to create bookie available path : "
|
|
|
+ zkAvailablePath);
|
|
|
}
|
|
|
+ zkPathLatch.countDown();
|
|
|
}
|
|
|
};
|
|
|
ZkUtils.createFullPathOptimistic(zkc, zkAvailablePath, new byte[0],
|
|
|
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, callback, null);
|
|
|
|
|
|
try {
|
|
|
- if (!zkPathLatch.await(zkc.getSessionTimeout(), TimeUnit.MILLISECONDS)) {
|
|
|
+ if (!zkPathLatch.await(zkc.getSessionTimeout(), TimeUnit.MILLISECONDS)
|
|
|
+ || !success.get()) {
|
|
|
throw new IOException("Couldn't create bookie available path :"
|
|
|
+ zkAvailablePath + ", timed out " + zkc.getSessionTimeout()
|
|
|
+ " millis");
|
|
@@ -281,19 +248,101 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
|
|
|
|
@Override
|
|
|
public void format(NamespaceInfo ns) throws IOException {
|
|
|
- // Currently, BKJM automatically formats itself when first accessed.
|
|
|
- // TODO: change over to explicit formatting so that the admin can
|
|
|
- // clear out the BK storage when reformatting a cluster.
|
|
|
- LOG.info("Not formatting " + this + " - BKJM does not currently " +
|
|
|
- "support reformatting. If it has not been used before, it will" +
|
|
|
- "be formatted automatically upon first use.");
|
|
|
+ try {
|
|
|
+ // delete old info
|
|
|
+ Stat baseStat = null;
|
|
|
+ Stat ledgerStat = null;
|
|
|
+ if ((baseStat = zkc.exists(basePath, false)) != null) {
|
|
|
+ if ((ledgerStat = zkc.exists(ledgerPath, false)) != null) {
|
|
|
+ for (EditLogLedgerMetadata l : getLedgerList(true)) {
|
|
|
+ try {
|
|
|
+ bkc.deleteLedger(l.getLedgerId());
|
|
|
+ } catch (BKException.BKNoSuchLedgerExistsException bke) {
|
|
|
+ LOG.warn("Ledger " + l.getLedgerId() + " does not exist;"
|
|
|
+ + " Cannot delete.");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ ZKUtil.deleteRecursive(zkc, basePath);
|
|
|
+ }
|
|
|
+
|
|
|
+ // should be clean now.
|
|
|
+ zkc.create(basePath, new byte[] {'0'},
|
|
|
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
+
|
|
|
+ VersionProto.Builder builder = VersionProto.newBuilder();
|
|
|
+ builder.setNamespaceInfo(PBHelper.convert(ns))
|
|
|
+ .setLayoutVersion(BKJM_LAYOUT_VERSION);
|
|
|
+
|
|
|
+ byte[] data = TextFormat.printToString(builder.build()).getBytes(UTF_8);
|
|
|
+ zkc.create(versionPath, data,
|
|
|
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
+
|
|
|
+ zkc.create(ledgerPath, new byte[] {'0'},
|
|
|
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
+ } catch (KeeperException ke) {
|
|
|
+ LOG.error("Error accessing zookeeper to format", ke);
|
|
|
+ throw new IOException("Error accessing zookeeper to format", ke);
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ throw new IOException("Interrupted during format", ie);
|
|
|
+ } catch (BKException bke) {
|
|
|
+ throw new IOException("Error cleaning up ledgers during format", bke);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public boolean hasSomeData() throws IOException {
|
|
|
- // Don't confirm format on BKJM, since format() is currently a
|
|
|
- // no-op anyway
|
|
|
- return false;
|
|
|
+ try {
|
|
|
+ return zkc.exists(basePath, false) != null;
|
|
|
+ } catch (KeeperException ke) {
|
|
|
+ throw new IOException("Couldn't contact zookeeper", ke);
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ throw new IOException("Interrupted while checking for data", ie);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized private void checkEnv() throws IOException {
|
|
|
+ if (!initialized) {
|
|
|
+ try {
|
|
|
+ Stat versionStat = zkc.exists(versionPath, false);
|
|
|
+ if (versionStat == null) {
|
|
|
+ throw new IOException("Environment not initialized. "
|
|
|
+ +"Have you forgotten to format?");
|
|
|
+ }
|
|
|
+ byte[] d = zkc.getData(versionPath, false, versionStat);
|
|
|
+
|
|
|
+ VersionProto.Builder builder = VersionProto.newBuilder();
|
|
|
+ TextFormat.merge(new String(d, UTF_8), builder);
|
|
|
+ if (!builder.isInitialized()) {
|
|
|
+ throw new IOException("Invalid/Incomplete data in znode");
|
|
|
+ }
|
|
|
+ VersionProto vp = builder.build();
|
|
|
+
|
|
|
+ // There's only one version at the moment
|
|
|
+ assert vp.getLayoutVersion() == BKJM_LAYOUT_VERSION;
|
|
|
+
|
|
|
+ NamespaceInfo readns = PBHelper.convert(vp.getNamespaceInfo());
|
|
|
+
|
|
|
+ if (nsInfo.getNamespaceID() != readns.getNamespaceID() ||
|
|
|
+ !nsInfo.clusterID.equals(readns.getClusterID()) ||
|
|
|
+ !nsInfo.getBlockPoolID().equals(readns.getBlockPoolID())) {
|
|
|
+ String err = String.format("Environment mismatch. Running process %s"
|
|
|
+ +", stored in ZK %s", nsInfo, readns);
|
|
|
+ LOG.error(err);
|
|
|
+ throw new IOException(err);
|
|
|
+ }
|
|
|
+
|
|
|
+ ci.init();
|
|
|
+ initialized = true;
|
|
|
+ } catch (KeeperException ke) {
|
|
|
+ throw new IOException("Cannot access ZooKeeper", ke);
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ throw new IOException("Interrupted while checking environment", ie);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -307,6 +356,8 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
|
*/
|
|
|
@Override
|
|
|
public EditLogOutputStream startLogSegment(long txId) throws IOException {
|
|
|
+ checkEnv();
|
|
|
+
|
|
|
if (txId <= maxTxId.get()) {
|
|
|
throw new IOException("We've already seen " + txId
|
|
|
+ ". A new stream cannot be created with it");
|
|
@@ -384,6 +435,8 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
|
@Override
|
|
|
public void finalizeLogSegment(long firstTxId, long lastTxId)
|
|
|
throws IOException {
|
|
|
+ checkEnv();
|
|
|
+
|
|
|
String inprogressPath = inprogressZNode(firstTxId);
|
|
|
try {
|
|
|
Stat inprogressStat = zkc.exists(inprogressPath, false);
|
|
@@ -537,6 +590,8 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
|
|
|
|
@Override
|
|
|
public void recoverUnfinalizedSegments() throws IOException {
|
|
|
+ checkEnv();
|
|
|
+
|
|
|
synchronized (this) {
|
|
|
try {
|
|
|
List<String> children = zkc.getChildren(ledgerPath, false);
|
|
@@ -589,6 +644,8 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
|
@Override
|
|
|
public void purgeLogsOlderThan(long minTxIdToKeep)
|
|
|
throws IOException {
|
|
|
+ checkEnv();
|
|
|
+
|
|
|
for (EditLogLedgerMetadata l : getLedgerList(false)) {
|
|
|
if (l.getLastTxId() < minTxIdToKeep) {
|
|
|
try {
|