|
@@ -0,0 +1,508 @@
|
|
|
+/**
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+package org.apache.hadoop.contrib.bkjournal;
|
|
|
+
|
|
|
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.JournalManager;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+
|
|
|
+import org.apache.bookkeeper.conf.ClientConfiguration;
|
|
|
+import org.apache.bookkeeper.client.BKException;
|
|
|
+import org.apache.bookkeeper.client.BookKeeper;
|
|
|
+import org.apache.bookkeeper.client.LedgerHandle;
|
|
|
+
|
|
|
+import org.apache.zookeeper.data.Stat;
|
|
|
+import org.apache.zookeeper.ZooKeeper;
|
|
|
+import org.apache.zookeeper.Watcher;
|
|
|
+import org.apache.zookeeper.WatchedEvent;
|
|
|
+import org.apache.zookeeper.KeeperException;
|
|
|
+import org.apache.zookeeper.CreateMode;
|
|
|
+import org.apache.zookeeper.ZooDefs.Ids;
|
|
|
+
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.io.IOException;
|
|
|
+
|
|
|
+import java.net.URI;
|
|
|
+
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
+
|
|
|
+/**
|
|
|
+ * BookKeeper Journal Manager
|
|
|
+ *
|
|
|
+ * To use, add the following to hdfs-site.xml.
|
|
|
+ * <pre>
|
|
|
+ * {@code
|
|
|
+ * <property>
|
|
|
+ * <name>dfs.namenode.edits.dir</name>
|
|
|
+ * <value>bookkeeper://zk1:2181;zk2:2181;zk3:2181/hdfsjournal</value>
|
|
|
+ * </property>
|
|
|
+ *
|
|
|
+ * <property>
|
|
|
+ * <name>dfs.namenode.edits.journalPlugin.bookkeeper</name>
|
|
|
+ * <value>org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager</value>
|
|
|
+ * </property>
|
|
|
+ * }
|
|
|
+ * </pre>
|
|
|
+ * The URI format for bookkeeper is bookkeeper://[zkEnsemble]/[rootZnode]
|
|
|
+ * [zkEnsemble] is a list of semi-colon separated zookeeper host:port
|
|
|
+ * pairs. In the example above there are 3 servers in the ensemble,
|
|
|
+ * zk1, zk2 & zk3, each one listening on port 2181.
|
|
|
+ *
|
|
|
+ * [rootZnode] is the path of the zookeeper znode, under which the editlog
|
|
|
+ * information will be stored.
|
|
|
+ *
|
|
|
+ * Other configuration options are:
|
|
|
+ * <ul>
|
|
|
+ * <li><b>dfs.namenode.bookkeeperjournal.output-buffer-size</b>
|
|
|
+ * Number of bytes a bookkeeper journal stream will buffer before
|
|
|
+ * forcing a flush. Default is 1024.</li>
|
|
|
+ * <li><b>dfs.namenode.bookkeeperjournal.ensemble-size</b>
|
|
|
+ * Number of bookkeeper servers in edit log ledger ensembles. This
|
|
|
+ * is the number of bookkeeper servers which need to be available
|
|
|
+ * for the ledger to be writable. Default is 3.</li>
|
|
|
+ * <li><b>dfs.namenode.bookkeeperjournal.quorum-size</b>
|
|
|
+ * Number of bookkeeper servers in the write quorum. This is the
|
|
|
+ * number of bookkeeper servers which must have acknowledged the
|
|
|
+ * write of an entry before it is considered written.
|
|
|
+ * Default is 2.</li>
|
|
|
+ * <li><b>dfs.namenode.bookkeeperjournal.digestPw</b>
|
|
|
+ * Password to use when creating ledgers. </li>
|
|
|
+ * </ul>
|
|
|
+ */
|
|
|
+public class BookKeeperJournalManager implements JournalManager {
|
|
|
+ static final Log LOG = LogFactory.getLog(BookKeeperJournalManager.class);
|
|
|
+
|
|
|
+ public static final String BKJM_OUTPUT_BUFFER_SIZE
|
|
|
+ = "dfs.namenode.bookkeeperjournal.output-buffer-size";
|
|
|
+ public static final int BKJM_OUTPUT_BUFFER_SIZE_DEFAULT = 1024;
|
|
|
+
|
|
|
+ public static final String BKJM_BOOKKEEPER_ENSEMBLE_SIZE
|
|
|
+ = "dfs.namenode.bookkeeperjournal.ensemble-size";
|
|
|
+ public static final int BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT = 3;
|
|
|
+
|
|
|
+ public static final String BKJM_BOOKKEEPER_QUORUM_SIZE
|
|
|
+ = "dfs.namenode.bookkeeperjournal.quorum-size";
|
|
|
+ public static final int BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT = 2;
|
|
|
+
|
|
|
+ public static final String BKJM_BOOKKEEPER_DIGEST_PW
|
|
|
+ = "dfs.namenode.bookkeeperjournal.digestPw";
|
|
|
+ public static final String BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT = "";
|
|
|
+
|
|
|
+ private static final int BKJM_LAYOUT_VERSION = -1;
|
|
|
+
|
|
|
+ private final ZooKeeper zkc;
|
|
|
+ private final Configuration conf;
|
|
|
+ private final BookKeeper bkc;
|
|
|
+ private final WriteLock wl;
|
|
|
+ private final String ledgerPath;
|
|
|
+ private final MaxTxId maxTxId;
|
|
|
+ private final int ensembleSize;
|
|
|
+ private final int quorumSize;
|
|
|
+ private final String digestpw;
|
|
|
+ private final CountDownLatch zkConnectLatch;
|
|
|
+
|
|
|
+ private LedgerHandle currentLedger = null;
|
|
|
+
|
|
|
+ private int bytesToInt(byte[] b) {
|
|
|
+ assert b.length >= 4;
|
|
|
+ return b[0] << 24 | b[1] << 16 | b[2] << 8 | b[3];
|
|
|
+ }
|
|
|
+
|
|
|
+ private byte[] intToBytes(int i) {
|
|
|
+ return new byte[] {
|
|
|
+ (byte)(i >> 24),
|
|
|
+ (byte)(i >> 16),
|
|
|
+ (byte)(i >> 8),
|
|
|
+ (byte)(i) };
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Construct a Bookkeeper journal manager.
|
|
|
+ */
|
|
|
+ public BookKeeperJournalManager(Configuration conf, URI uri)
|
|
|
+ throws IOException {
|
|
|
+ this.conf = conf;
|
|
|
+ String zkConnect = uri.getAuthority().replace(";", ",");
|
|
|
+ String zkPath = uri.getPath();
|
|
|
+ ensembleSize = conf.getInt(BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
|
|
|
+ BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT);
|
|
|
+ quorumSize = conf.getInt(BKJM_BOOKKEEPER_QUORUM_SIZE,
|
|
|
+ BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT);
|
|
|
+
|
|
|
+ ledgerPath = zkPath + "/ledgers";
|
|
|
+ String maxTxIdPath = zkPath + "/maxtxid";
|
|
|
+ String lockPath = zkPath + "/lock";
|
|
|
+ String versionPath = zkPath + "/version";
|
|
|
+ digestpw = conf.get(BKJM_BOOKKEEPER_DIGEST_PW,
|
|
|
+ BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT);
|
|
|
+
|
|
|
+ try {
|
|
|
+ zkConnectLatch = new CountDownLatch(1);
|
|
|
+ zkc = new ZooKeeper(zkConnect, 3000, new ZkConnectionWatcher());
|
|
|
+ if (!zkConnectLatch.await(6000, TimeUnit.MILLISECONDS)) {
|
|
|
+ throw new IOException("Error connecting to zookeeper");
|
|
|
+ }
|
|
|
+ if (zkc.exists(zkPath, false) == null) {
|
|
|
+ zkc.create(zkPath, new byte[] {'0'},
|
|
|
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
+ }
|
|
|
+
|
|
|
+ Stat versionStat = zkc.exists(versionPath, false);
|
|
|
+ if (versionStat != null) {
|
|
|
+ byte[] d = zkc.getData(versionPath, false, versionStat);
|
|
|
+ // There's only one version at the moment
|
|
|
+ assert bytesToInt(d) == BKJM_LAYOUT_VERSION;
|
|
|
+ } else {
|
|
|
+ zkc.create(versionPath, intToBytes(BKJM_LAYOUT_VERSION),
|
|
|
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (zkc.exists(ledgerPath, false) == null) {
|
|
|
+ zkc.create(ledgerPath, new byte[] {'0'},
|
|
|
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
+ }
|
|
|
+
|
|
|
+ bkc = new BookKeeper(new ClientConfiguration(),
|
|
|
+ zkc);
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new IOException("Error initializing zk", e);
|
|
|
+ }
|
|
|
+
|
|
|
+ wl = new WriteLock(zkc, lockPath);
|
|
|
+ maxTxId = new MaxTxId(zkc, maxTxIdPath);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Start a new log segment in a BookKeeper ledger.
|
|
|
+ * First ensure that we have the write lock for this journal.
|
|
|
+ * Then create a ledger and stream based on that ledger.
|
|
|
+ * The ledger id is written to the inprogress znode, so that in the
|
|
|
+ * case of a crash, a recovery process can find the ledger we were writing
|
|
|
+ * to when we crashed.
|
|
|
+ * @param txId First transaction id to be written to the stream
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public EditLogOutputStream startLogSegment(long txId) throws IOException {
|
|
|
+ wl.acquire();
|
|
|
+
|
|
|
+ if (txId <= maxTxId.get()) {
|
|
|
+ throw new IOException("We've already seen " + txId
|
|
|
+ + ". A new stream cannot be created with it");
|
|
|
+ }
|
|
|
+ if (currentLedger != null) {
|
|
|
+ throw new IOException("Already writing to a ledger, id="
|
|
|
+ + currentLedger.getId());
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ currentLedger = bkc.createLedger(ensembleSize, quorumSize,
|
|
|
+ BookKeeper.DigestType.MAC,
|
|
|
+ digestpw.getBytes());
|
|
|
+ String znodePath = inprogressZNode();
|
|
|
+ EditLogLedgerMetadata l = new EditLogLedgerMetadata(znodePath,
|
|
|
+ HdfsConstants.LAYOUT_VERSION, currentLedger.getId(), txId);
|
|
|
+ /* Write the ledger metadata out to the inprogress ledger znode
|
|
|
+ * This can fail if for some reason our write lock has
|
|
|
+ * expired (@see WriteLock) and another process has managed to
|
|
|
+ * create the inprogress znode.
|
|
|
+ * In this case, throw an exception. We don't want to continue
|
|
|
+ * as this would lead to a split brain situation.
|
|
|
+ */
|
|
|
+ l.write(zkc, znodePath);
|
|
|
+
|
|
|
+ return new BookKeeperEditLogOutputStream(conf, currentLedger, wl);
|
|
|
+ } catch (Exception e) {
|
|
|
+ if (currentLedger != null) {
|
|
|
+ try {
|
|
|
+ currentLedger.close();
|
|
|
+ } catch (Exception e2) {
|
|
|
+ //log & ignore, an IOException will be thrown soon
|
|
|
+ LOG.error("Error closing ledger", e2);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ throw new IOException("Error creating ledger", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Finalize a log segment. If the journal manager is currently
|
|
|
+ * writing to a ledger, ensure that this is the ledger of the log segment
|
|
|
+ * being finalized.
|
|
|
+ *
|
|
|
+ * Otherwise this is the recovery case. In the recovery case, ensure that
|
|
|
+ * the firstTxId of the ledger matches firstTxId for the segment we are
|
|
|
+ * trying to finalize.
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void finalizeLogSegment(long firstTxId, long lastTxId)
|
|
|
+ throws IOException {
|
|
|
+ String inprogressPath = inprogressZNode();
|
|
|
+ try {
|
|
|
+ Stat inprogressStat = zkc.exists(inprogressPath, false);
|
|
|
+ if (inprogressStat == null) {
|
|
|
+ throw new IOException("Inprogress znode " + inprogressPath
|
|
|
+ + " doesn't exist");
|
|
|
+ }
|
|
|
+
|
|
|
+ wl.checkWriteLock();
|
|
|
+ EditLogLedgerMetadata l
|
|
|
+ = EditLogLedgerMetadata.read(zkc, inprogressPath);
|
|
|
+
|
|
|
+ if (currentLedger != null) { // normal, non-recovery case
|
|
|
+ if (l.getLedgerId() == currentLedger.getId()) {
|
|
|
+ try {
|
|
|
+ currentLedger.close();
|
|
|
+ } catch (BKException bke) {
|
|
|
+ LOG.error("Error closing current ledger", bke);
|
|
|
+ }
|
|
|
+ currentLedger = null;
|
|
|
+ } else {
|
|
|
+ throw new IOException(
|
|
|
+ "Active ledger has different ID to inprogress. "
|
|
|
+ + l.getLedgerId() + " found, "
|
|
|
+ + currentLedger.getId() + " expected");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (l.getFirstTxId() != firstTxId) {
|
|
|
+ throw new IOException("Transaction id not as expected, "
|
|
|
+ + l.getFirstTxId() + " found, " + firstTxId + " expected");
|
|
|
+ }
|
|
|
+
|
|
|
+ l.finalizeLedger(lastTxId);
|
|
|
+ String finalisedPath = finalizedLedgerZNode(firstTxId, lastTxId);
|
|
|
+ try {
|
|
|
+ l.write(zkc, finalisedPath);
|
|
|
+ } catch (KeeperException.NodeExistsException nee) {
|
|
|
+ if (!l.verify(zkc, finalisedPath)) {
|
|
|
+ throw new IOException("Node " + finalisedPath + " already exists"
|
|
|
+ + " but data doesn't match");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ maxTxId.store(lastTxId);
|
|
|
+ zkc.delete(inprogressPath, inprogressStat.getVersion());
|
|
|
+ } catch (KeeperException e) {
|
|
|
+ throw new IOException("Error finalising ledger", e);
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ throw new IOException("Error finalising ledger", ie);
|
|
|
+ } finally {
|
|
|
+ wl.release();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public EditLogInputStream getInputStream(long fromTxnId) throws IOException {
|
|
|
+ for (EditLogLedgerMetadata l : getLedgerList()) {
|
|
|
+ if (l.getFirstTxId() == fromTxnId) {
|
|
|
+ try {
|
|
|
+ LedgerHandle h = bkc.openLedger(l.getLedgerId(),
|
|
|
+ BookKeeper.DigestType.MAC,
|
|
|
+ digestpw.getBytes());
|
|
|
+ return new BookKeeperEditLogInputStream(h, l);
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new IOException("Could not open ledger for " + fromTxnId, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ throw new IOException("No ledger for fromTxnId " + fromTxnId + " found.");
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public long getNumberOfTransactions(long fromTxnId) throws IOException {
|
|
|
+ long count = 0;
|
|
|
+ long expectedStart = 0;
|
|
|
+ for (EditLogLedgerMetadata l : getLedgerList()) {
|
|
|
+ if (l.isInProgress()) {
|
|
|
+ long endTxId = recoverLastTxId(l);
|
|
|
+ if (endTxId == HdfsConstants.INVALID_TXID) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ count += (endTxId - l.getFirstTxId()) + 1;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (l.getFirstTxId() < fromTxnId) {
|
|
|
+ continue;
|
|
|
+ } else if (l.getFirstTxId() == fromTxnId) {
|
|
|
+ count = (l.getLastTxId() - l.getFirstTxId()) + 1;
|
|
|
+ expectedStart = l.getLastTxId() + 1;
|
|
|
+ } else {
|
|
|
+ if (expectedStart != l.getFirstTxId()) {
|
|
|
+ if (count == 0) {
|
|
|
+ throw new CorruptionException("StartTxId " + l.getFirstTxId()
|
|
|
+ + " is not as expected " + expectedStart
|
|
|
+ + ". Gap in transaction log?");
|
|
|
+ } else {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ count += (l.getLastTxId() - l.getFirstTxId()) + 1;
|
|
|
+ expectedStart = l.getLastTxId() + 1;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return count;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void recoverUnfinalizedSegments() throws IOException {
|
|
|
+ wl.acquire();
|
|
|
+
|
|
|
+ synchronized (this) {
|
|
|
+ try {
|
|
|
+ EditLogLedgerMetadata l
|
|
|
+ = EditLogLedgerMetadata.read(zkc, inprogressZNode());
|
|
|
+ long endTxId = recoverLastTxId(l);
|
|
|
+ if (endTxId == HdfsConstants.INVALID_TXID) {
|
|
|
+ LOG.error("Unrecoverable corruption has occurred in segment "
|
|
|
+ + l.toString() + " at path " + inprogressZNode()
|
|
|
+ + ". Unable to continue recovery.");
|
|
|
+ throw new IOException("Unrecoverable corruption, please check logs.");
|
|
|
+ }
|
|
|
+ finalizeLogSegment(l.getFirstTxId(), endTxId);
|
|
|
+ } catch (KeeperException.NoNodeException nne) {
|
|
|
+ // nothing to recover, ignore
|
|
|
+ } finally {
|
|
|
+ if (wl.haveLock()) {
|
|
|
+ wl.release();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void purgeLogsOlderThan(long minTxIdToKeep)
|
|
|
+ throws IOException {
|
|
|
+ for (EditLogLedgerMetadata l : getLedgerList()) {
|
|
|
+ if (!l.isInProgress()
|
|
|
+ && l.getLastTxId() < minTxIdToKeep) {
|
|
|
+ try {
|
|
|
+ Stat stat = zkc.exists(l.getZkPath(), false);
|
|
|
+ zkc.delete(l.getZkPath(), stat.getVersion());
|
|
|
+ bkc.deleteLedger(l.getLedgerId());
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ LOG.error("Interrupted while purging " + l, ie);
|
|
|
+ } catch (BKException bke) {
|
|
|
+ LOG.error("Couldn't delete ledger from bookkeeper", bke);
|
|
|
+ } catch (KeeperException ke) {
|
|
|
+ LOG.error("Error deleting ledger entry in zookeeper", ke);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void close() throws IOException {
|
|
|
+ try {
|
|
|
+ bkc.close();
|
|
|
+ zkc.close();
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new IOException("Couldn't close zookeeper client", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Set the amount of memory that this stream should use to buffer edits.
|
|
|
+ * Setting this will only affect future output stream. Streams
|
|
|
+ * which have currently be created won't be affected.
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void setOutputBufferCapacity(int size) {
|
|
|
+ conf.getInt(BKJM_OUTPUT_BUFFER_SIZE, size);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Find the id of the last edit log transaction writen to a edit log
|
|
|
+ * ledger.
|
|
|
+ */
|
|
|
+ private long recoverLastTxId(EditLogLedgerMetadata l) throws IOException {
|
|
|
+ try {
|
|
|
+ LedgerHandle lh = bkc.openLedger(l.getLedgerId(),
|
|
|
+ BookKeeper.DigestType.MAC,
|
|
|
+ digestpw.getBytes());
|
|
|
+ long lastAddConfirmed = lh.getLastAddConfirmed();
|
|
|
+ BookKeeperEditLogInputStream in
|
|
|
+ = new BookKeeperEditLogInputStream(lh, l, lastAddConfirmed);
|
|
|
+
|
|
|
+ long endTxId = HdfsConstants.INVALID_TXID;
|
|
|
+ FSEditLogOp op = in.readOp();
|
|
|
+ while (op != null) {
|
|
|
+ if (endTxId == HdfsConstants.INVALID_TXID
|
|
|
+ || op.getTransactionId() == endTxId+1) {
|
|
|
+ endTxId = op.getTransactionId();
|
|
|
+ }
|
|
|
+ op = in.readOp();
|
|
|
+ }
|
|
|
+ return endTxId;
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new IOException("Exception retreiving last tx id for ledger " + l,
|
|
|
+ e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get a list of all segments in the journal.
|
|
|
+ */
|
|
|
+ private List<EditLogLedgerMetadata> getLedgerList() throws IOException {
|
|
|
+ List<EditLogLedgerMetadata> ledgers
|
|
|
+ = new ArrayList<EditLogLedgerMetadata>();
|
|
|
+ try {
|
|
|
+ List<String> ledgerNames = zkc.getChildren(ledgerPath, false);
|
|
|
+ for (String n : ledgerNames) {
|
|
|
+ ledgers.add(EditLogLedgerMetadata.read(zkc, ledgerPath + "/" + n));
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new IOException("Exception reading ledger list from zk", e);
|
|
|
+ }
|
|
|
+
|
|
|
+ Collections.sort(ledgers, EditLogLedgerMetadata.COMPARATOR);
|
|
|
+ return ledgers;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get the znode path for a finalize ledger
|
|
|
+ */
|
|
|
+ String finalizedLedgerZNode(long startTxId, long endTxId) {
|
|
|
+ return String.format("%s/edits_%018d_%018d",
|
|
|
+ ledgerPath, startTxId, endTxId);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get the znode path for the inprogressZNode
|
|
|
+ */
|
|
|
+ String inprogressZNode() {
|
|
|
+ return ledgerPath + "/inprogress";
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Simple watcher to notify when zookeeper has connected
|
|
|
+ */
|
|
|
+ private class ZkConnectionWatcher implements Watcher {
|
|
|
+ public void process(WatchedEvent event) {
|
|
|
+ if (Event.KeeperState.SyncConnected.equals(event.getState())) {
|
|
|
+ zkConnectLatch.countDown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|