|
@@ -28,6 +28,7 @@ import org.apache.bookkeeper.conf.ClientConfiguration;
|
|
|
import org.apache.bookkeeper.client.BKException;
|
|
|
import org.apache.bookkeeper.client.BookKeeper;
|
|
|
import org.apache.bookkeeper.client.LedgerHandle;
|
|
|
+import org.apache.bookkeeper.util.ZkUtils;
|
|
|
|
|
|
import org.apache.zookeeper.data.Stat;
|
|
|
import org.apache.zookeeper.ZooKeeper;
|
|
@@ -36,6 +37,7 @@ import org.apache.zookeeper.WatchedEvent;
|
|
|
import org.apache.zookeeper.KeeperException;
|
|
|
import org.apache.zookeeper.CreateMode;
|
|
|
import org.apache.zookeeper.ZooDefs.Ids;
|
|
|
+import org.apache.zookeeper.AsyncCallback.StringCallback;
|
|
|
|
|
|
import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
@@ -124,6 +126,12 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
|
|
|
|
private static final String BKJM_EDIT_INPROGRESS = "inprogress_";
|
|
|
|
|
|
+ public static final String BKJM_ZK_LEDGERS_AVAILABLE_PATH
|
|
|
+ = "dfs.namenode.bookkeeperjournal.zk.availablebookies";
|
|
|
+
|
|
|
+ public static final String BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT
|
|
|
+ = "/ledgers/available";
|
|
|
+
|
|
|
private ZooKeeper zkc;
|
|
|
private final Configuration conf;
|
|
|
private final BookKeeper bkc;
|
|
@@ -196,7 +204,7 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
|
zkc.create(ledgerPath, new byte[] {'0'},
|
|
|
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
}
|
|
|
-
|
|
|
+ prepareBookKeeperEnv();
|
|
|
bkc = new BookKeeper(new ClientConfiguration(),
|
|
|
zkc);
|
|
|
} catch (KeeperException e) {
|
|
@@ -210,6 +218,50 @@ public class BookKeeperJournalManager implements JournalManager {
|
|
|
maxTxId = new MaxTxId(zkc, maxTxIdPath);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Pre-creating bookkeeper metadata path in zookeeper.
|
|
|
+ */
|
|
|
+ private void prepareBookKeeperEnv() throws IOException {
|
|
|
+ // create bookie available path in zookeeper if it doesn't exists
|
|
|
+ final String zkAvailablePath = conf.get(BKJM_ZK_LEDGERS_AVAILABLE_PATH,
|
|
|
+ BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT);
|
|
|
+ final CountDownLatch zkPathLatch = new CountDownLatch(1);
|
|
|
+
|
|
|
+ StringCallback callback = new StringCallback() {
|
|
|
+ @Override
|
|
|
+ public void processResult(int rc, String path, Object ctx, String name) {
|
|
|
+ if (KeeperException.Code.OK.intValue() == rc
|
|
|
+ || KeeperException.Code.NODEEXISTS.intValue() == rc) {
|
|
|
+ LOG.info("Successfully created bookie available path : "
|
|
|
+ + zkAvailablePath);
|
|
|
+ zkPathLatch.countDown();
|
|
|
+ } else {
|
|
|
+ KeeperException.Code code = KeeperException.Code.get(rc);
|
|
|
+ LOG
|
|
|
+ .error("Error : "
|
|
|
+ + KeeperException.create(code, path).getMessage()
|
|
|
+ + ", failed to create bookie available path : "
|
|
|
+ + zkAvailablePath);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ };
|
|
|
+ ZkUtils.createFullPathOptimistic(zkc, zkAvailablePath, new byte[0],
|
|
|
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, callback, null);
|
|
|
+
|
|
|
+ try {
|
|
|
+ if (!zkPathLatch.await(zkc.getSessionTimeout(), TimeUnit.MILLISECONDS)) {
|
|
|
+ throw new IOException("Couldn't create bookie available path :"
|
|
|
+ + zkAvailablePath + ", timed out " + zkc.getSessionTimeout()
|
|
|
+ + " millis");
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ throw new IOException(
|
|
|
+ "Interrupted when creating the bookie available path : "
|
|
|
+ + zkAvailablePath, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Start a new log segment in a BookKeeper ledger.
|
|
|
* First ensure that we have the write lock for this journal.
|