|
@@ -18,23 +18,55 @@
|
|
|
package org.apache.hadoop.contrib.bkjournal;
|
|
|
|
|
|
import static org.junit.Assert.*;
|
|
|
+
|
|
|
+import java.net.URI;
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.List;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Iterator;
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+
|
|
|
+import org.apache.bookkeeper.proto.BookieServer;
|
|
|
+import org.apache.bookkeeper.conf.ServerConfiguration;
|
|
|
+import org.apache.bookkeeper.util.LocalBookKeeper;
|
|
|
+
|
|
|
+import java.io.RandomAccessFile;
|
|
|
+import java.io.File;
|
|
|
+import java.io.FilenameFilter;
|
|
|
+import java.io.BufferedInputStream;
|
|
|
+import java.io.DataInputStream;
|
|
|
+import java.io.IOException;
|
|
|
+import org.apache.hadoop.io.Text;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
+import org.apache.hadoop.security.SecurityUtil;
|
|
|
import org.junit.Test;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.After;
|
|
|
import org.junit.BeforeClass;
|
|
|
import org.junit.AfterClass;
|
|
|
|
|
|
-import java.io.IOException;
|
|
|
-import org.apache.hadoop.conf.Configuration;
|
|
|
-
|
|
|
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
|
|
+import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.setupEdits;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
|
|
|
|
|
|
-import org.apache.bookkeeper.proto.BookieServer;
|
|
|
import org.apache.zookeeper.ZooKeeper;
|
|
|
+import org.apache.zookeeper.Watcher;
|
|
|
+import org.apache.zookeeper.WatchedEvent;
|
|
|
+import org.apache.zookeeper.KeeperException;
|
|
|
+
|
|
|
+import com.google.common.collect.ImmutableList;
|
|
|
+
|
|
|
+import java.util.zip.CheckedInputStream;
|
|
|
+import java.util.zip.Checksum;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -43,26 +75,125 @@ public class TestBookKeeperJournalManager {
|
|
|
static final Log LOG = LogFactory.getLog(TestBookKeeperJournalManager.class);
|
|
|
|
|
|
private static final long DEFAULT_SEGMENT_SIZE = 1000;
|
|
|
+ private static final String zkEnsemble = "localhost:2181";
|
|
|
+ final static private int numBookies = 5;
|
|
|
|
|
|
+ private static Thread bkthread;
|
|
|
protected static Configuration conf = new Configuration();
|
|
|
private ZooKeeper zkc;
|
|
|
- private static BKJMUtil bkutil;
|
|
|
- static int numBookies = 3;
|
|
|
|
|
|
- @BeforeClass
|
|
|
- public static void setupBookkeeper() throws Exception {
|
|
|
- bkutil = new BKJMUtil(numBookies);
|
|
|
- bkutil.start();
|
|
|
+
|
|
|
+ static int nextPort = 6000; // next port for additionally created bookies
|
|
|
+
|
|
|
+ private static ZooKeeper connectZooKeeper(String ensemble)
|
|
|
+ throws IOException, KeeperException, InterruptedException {
|
|
|
+ final CountDownLatch latch = new CountDownLatch(1);
|
|
|
+
|
|
|
+ ZooKeeper zkc = new ZooKeeper(zkEnsemble, 3600, new Watcher() {
|
|
|
+ public void process(WatchedEvent event) {
|
|
|
+ if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
|
|
|
+ latch.countDown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ if (!latch.await(3, TimeUnit.SECONDS)) {
|
|
|
+ throw new IOException("Zookeeper took too long to connect");
|
|
|
+ }
|
|
|
+ return zkc;
|
|
|
}
|
|
|
|
|
|
- @AfterClass
|
|
|
- public static void teardownBookkeeper() throws Exception {
|
|
|
- bkutil.teardown();
|
|
|
+ private static BookieServer newBookie() throws Exception {
|
|
|
+ int port = nextPort++;
|
|
|
+ ServerConfiguration bookieConf = new ServerConfiguration();
|
|
|
+ bookieConf.setBookiePort(port);
|
|
|
+ File tmpdir = File.createTempFile("bookie" + Integer.toString(port) + "_",
|
|
|
+ "test");
|
|
|
+ tmpdir.delete();
|
|
|
+ tmpdir.mkdir();
|
|
|
+
|
|
|
+ bookieConf.setZkServers(zkEnsemble);
|
|
|
+ bookieConf.setJournalDirName(tmpdir.getPath());
|
|
|
+ bookieConf.setLedgerDirNames(new String[] { tmpdir.getPath() });
|
|
|
+
|
|
|
+ BookieServer b = new BookieServer(bookieConf);
|
|
|
+ b.start();
|
|
|
+ for (int i = 0; i < 10 && !b.isRunning(); i++) {
|
|
|
+ Thread.sleep(10000);
|
|
|
+ }
|
|
|
+ if (!b.isRunning()) {
|
|
|
+ throw new IOException("Bookie would not start");
|
|
|
+ }
|
|
|
+ return b;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Check that a number of bookies are available
|
|
|
+ * @param count number of bookies required
|
|
|
+ * @param timeout number of seconds to wait for bookies to start
|
|
|
+ * @throws IOException if bookies are not started by the time the timeout hits
|
|
|
+ */
|
|
|
+ private static int checkBookiesUp(int count, int timeout) throws Exception {
|
|
|
+ ZooKeeper zkc = connectZooKeeper(zkEnsemble);
|
|
|
+ try {
|
|
|
+ boolean up = false;
|
|
|
+ int mostRecentSize = 0;
|
|
|
+ for (int i = 0; i < timeout; i++) {
|
|
|
+ try {
|
|
|
+ List<String> children = zkc.getChildren("/ledgers/available",
|
|
|
+ false);
|
|
|
+ mostRecentSize = children.size();
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Found " + mostRecentSize + " bookies up, "
|
|
|
+ + "waiting for " + count);
|
|
|
+ if (LOG.isTraceEnabled()) {
|
|
|
+ for (String child : children) {
|
|
|
+ LOG.trace(" server: " + child);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (mostRecentSize == count) {
|
|
|
+ up = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ } catch (KeeperException e) {
|
|
|
+ // ignore
|
|
|
+ }
|
|
|
+ Thread.sleep(1000);
|
|
|
+ }
|
|
|
+ return mostRecentSize;
|
|
|
+ } finally {
|
|
|
+ zkc.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @BeforeClass
|
|
|
+ public static void setupBookkeeper() throws Exception {
|
|
|
+ bkthread = new Thread() {
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ String[] args = new String[1];
|
|
|
+ args[0] = String.valueOf(numBookies);
|
|
|
+ LOG.info("Starting bk");
|
|
|
+ LocalBookKeeper.main(args);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ // go away quietly
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("Error starting local bk", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ };
|
|
|
+ bkthread.start();
|
|
|
+
|
|
|
+ if (!LocalBookKeeper.waitForServerUp(zkEnsemble, 10000)) {
|
|
|
+ throw new Exception("Error starting zookeeper/bookkeeper");
|
|
|
+ }
|
|
|
+ assertEquals("Not all bookies started",
|
|
|
+ numBookies, checkBookiesUp(numBookies, 10));
|
|
|
+ }
|
|
|
+
|
|
|
@Before
|
|
|
public void setup() throws Exception {
|
|
|
- zkc = BKJMUtil.connectZooKeeper();
|
|
|
+ zkc = connectZooKeeper(zkEnsemble);
|
|
|
}
|
|
|
|
|
|
@After
|
|
@@ -70,10 +201,18 @@ public class TestBookKeeperJournalManager {
|
|
|
zkc.close();
|
|
|
}
|
|
|
|
|
|
+ @AfterClass
|
|
|
+ public static void teardownBookkeeper() throws Exception {
|
|
|
+ if (bkthread != null) {
|
|
|
+ bkthread.interrupt();
|
|
|
+ bkthread.join();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testSimpleWrite() throws Exception {
|
|
|
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
|
|
- BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"));
|
|
|
+ URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simplewrite"));
|
|
|
long txid = 1;
|
|
|
EditLogOutputStream out = bkjm.startLogSegment(1);
|
|
|
for (long i = 1 ; i <= 100; i++) {
|
|
@@ -92,8 +231,8 @@ public class TestBookKeeperJournalManager {
|
|
|
|
|
|
@Test
|
|
|
public void testNumberOfTransactions() throws Exception {
|
|
|
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
|
|
- BKJMUtil.createJournalURI("/hdfsjournal-txncount"));
|
|
|
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
|
|
+ URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-txncount"));
|
|
|
long txid = 1;
|
|
|
EditLogOutputStream out = bkjm.startLogSegment(1);
|
|
|
for (long i = 1 ; i <= 100; i++) {
|
|
@@ -110,8 +249,8 @@ public class TestBookKeeperJournalManager {
|
|
|
|
|
|
@Test
|
|
|
public void testNumberOfTransactionsWithGaps() throws Exception {
|
|
|
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
|
|
- BKJMUtil.createJournalURI("/hdfsjournal-gaps"));
|
|
|
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
|
|
+ URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-gaps"));
|
|
|
long txid = 1;
|
|
|
for (long i = 0; i < 3; i++) {
|
|
|
long start = txid;
|
|
@@ -123,11 +262,9 @@ public class TestBookKeeperJournalManager {
|
|
|
}
|
|
|
out.close();
|
|
|
bkjm.finalizeLogSegment(start, txid-1);
|
|
|
- assertNotNull(
|
|
|
- zkc.exists(bkjm.finalizedLedgerZNode(start, txid-1), false));
|
|
|
+ assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(start, txid-1), false));
|
|
|
}
|
|
|
- zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE+1,
|
|
|
- DEFAULT_SEGMENT_SIZE*2), -1);
|
|
|
+ zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE+1, DEFAULT_SEGMENT_SIZE*2), -1);
|
|
|
|
|
|
long numTrans = bkjm.getNumberOfTransactions(1, true);
|
|
|
assertEquals(DEFAULT_SEGMENT_SIZE, numTrans);
|
|
@@ -145,8 +282,8 @@ public class TestBookKeeperJournalManager {
|
|
|
|
|
|
@Test
|
|
|
public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception {
|
|
|
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
|
|
- BKJMUtil.createJournalURI("/hdfsjournal-inprogressAtEnd"));
|
|
|
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
|
|
+ URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-inprogressAtEnd"));
|
|
|
long txid = 1;
|
|
|
for (long i = 0; i < 3; i++) {
|
|
|
long start = txid;
|
|
@@ -159,8 +296,7 @@ public class TestBookKeeperJournalManager {
|
|
|
|
|
|
out.close();
|
|
|
bkjm.finalizeLogSegment(start, (txid-1));
|
|
|
- assertNotNull(
|
|
|
- zkc.exists(bkjm.finalizedLedgerZNode(start, (txid-1)), false));
|
|
|
+ assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(start, (txid-1)), false));
|
|
|
}
|
|
|
long start = txid;
|
|
|
EditLogOutputStream out = bkjm.startLogSegment(start);
|
|
@@ -184,8 +320,8 @@ public class TestBookKeeperJournalManager {
|
|
|
*/
|
|
|
@Test
|
|
|
public void testWriteRestartFrom1() throws Exception {
|
|
|
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
|
|
- BKJMUtil.createJournalURI("/hdfsjournal-restartFrom1"));
|
|
|
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
|
|
+ URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-restartFrom1"));
|
|
|
long txid = 1;
|
|
|
long start = txid;
|
|
|
EditLogOutputStream out = bkjm.startLogSegment(txid);
|
|
@@ -239,10 +375,10 @@ public class TestBookKeeperJournalManager {
|
|
|
@Test
|
|
|
public void testTwoWriters() throws Exception {
|
|
|
long start = 1;
|
|
|
- BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf,
|
|
|
- BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"));
|
|
|
- BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf,
|
|
|
- BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"));
|
|
|
+ BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf,
|
|
|
+ URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-dualWriter"));
|
|
|
+ BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf,
|
|
|
+ URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-dualWriter"));
|
|
|
|
|
|
EditLogOutputStream out1 = bkjm1.startLogSegment(start);
|
|
|
try {
|
|
@@ -255,8 +391,8 @@ public class TestBookKeeperJournalManager {
|
|
|
|
|
|
@Test
|
|
|
public void testSimpleRead() throws Exception {
|
|
|
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
|
|
- BKJMUtil.createJournalURI("/hdfsjournal-simpleread"));
|
|
|
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
|
|
+ URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simpleread"));
|
|
|
long txid = 1;
|
|
|
final long numTransactions = 10000;
|
|
|
EditLogOutputStream out = bkjm.startLogSegment(1);
|
|
@@ -280,8 +416,8 @@ public class TestBookKeeperJournalManager {
|
|
|
|
|
|
@Test
|
|
|
public void testSimpleRecovery() throws Exception {
|
|
|
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
|
|
- BKJMUtil.createJournalURI("/hdfsjournal-simplerecovery"));
|
|
|
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
|
|
+ URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simplerecovery"));
|
|
|
EditLogOutputStream out = bkjm.startLogSegment(1);
|
|
|
long txid = 1;
|
|
|
for (long i = 1 ; i <= 100; i++) {
|
|
@@ -312,13 +448,13 @@ public class TestBookKeeperJournalManager {
|
|
|
*/
|
|
|
@Test
|
|
|
public void testAllBookieFailure() throws Exception {
|
|
|
- BookieServer bookieToFail = bkutil.newBookie();
|
|
|
+ BookieServer bookieToFail = newBookie();
|
|
|
BookieServer replacementBookie = null;
|
|
|
|
|
|
try {
|
|
|
int ensembleSize = numBookies + 1;
|
|
|
assertEquals("New bookie didn't start",
|
|
|
- ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
|
|
|
+ ensembleSize, checkBookiesUp(ensembleSize, 10));
|
|
|
|
|
|
// ensure that the journal manager has to use all bookies,
|
|
|
// so that a failure will fail the journal manager
|
|
@@ -329,7 +465,8 @@ public class TestBookKeeperJournalManager {
|
|
|
ensembleSize);
|
|
|
long txid = 1;
|
|
|
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
|
|
- BKJMUtil.createJournalURI("/hdfsjournal-allbookiefailure"));
|
|
|
+ URI.create("bookkeeper://" + zkEnsemble
|
|
|
+ + "/hdfsjournal-allbookiefailure"));
|
|
|
EditLogOutputStream out = bkjm.startLogSegment(txid);
|
|
|
|
|
|
for (long i = 1 ; i <= 3; i++) {
|
|
@@ -341,7 +478,7 @@ public class TestBookKeeperJournalManager {
|
|
|
out.flush();
|
|
|
bookieToFail.shutdown();
|
|
|
assertEquals("New bookie didn't die",
|
|
|
- numBookies, bkutil.checkBookiesUp(numBookies, 10));
|
|
|
+ numBookies, checkBookiesUp(numBookies, 10));
|
|
|
|
|
|
try {
|
|
|
for (long i = 1 ; i <= 3; i++) {
|
|
@@ -357,10 +494,10 @@ public class TestBookKeeperJournalManager {
|
|
|
assertTrue("Invalid exception message",
|
|
|
ioe.getMessage().contains("Failed to write to bookkeeper"));
|
|
|
}
|
|
|
- replacementBookie = bkutil.newBookie();
|
|
|
+ replacementBookie = newBookie();
|
|
|
|
|
|
assertEquals("New bookie didn't start",
|
|
|
- numBookies+1, bkutil.checkBookiesUp(numBookies+1, 10));
|
|
|
+ numBookies+1, checkBookiesUp(numBookies+1, 10));
|
|
|
out = bkjm.startLogSegment(txid);
|
|
|
for (long i = 1 ; i <= 3; i++) {
|
|
|
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
|
|
@@ -380,7 +517,7 @@ public class TestBookKeeperJournalManager {
|
|
|
}
|
|
|
bookieToFail.shutdown();
|
|
|
|
|
|
- if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) {
|
|
|
+ if (checkBookiesUp(numBookies, 30) != numBookies) {
|
|
|
LOG.warn("Not all bookies from this test shut down, expect errors");
|
|
|
}
|
|
|
}
|
|
@@ -393,13 +530,13 @@ public class TestBookKeeperJournalManager {
|
|
|
*/
|
|
|
@Test
|
|
|
public void testOneBookieFailure() throws Exception {
|
|
|
- BookieServer bookieToFail = bkutil.newBookie();
|
|
|
+ BookieServer bookieToFail = newBookie();
|
|
|
BookieServer replacementBookie = null;
|
|
|
|
|
|
try {
|
|
|
int ensembleSize = numBookies + 1;
|
|
|
assertEquals("New bookie didn't start",
|
|
|
- ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
|
|
|
+ ensembleSize, checkBookiesUp(ensembleSize, 10));
|
|
|
|
|
|
// ensure that the journal manager has to use all bookies,
|
|
|
// so that a failure will fail the journal manager
|
|
@@ -410,7 +547,8 @@ public class TestBookKeeperJournalManager {
|
|
|
ensembleSize);
|
|
|
long txid = 1;
|
|
|
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
|
|
- BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"));
|
|
|
+ URI.create("bookkeeper://" + zkEnsemble
|
|
|
+ + "/hdfsjournal-onebookiefailure"));
|
|
|
EditLogOutputStream out = bkjm.startLogSegment(txid);
|
|
|
for (long i = 1 ; i <= 3; i++) {
|
|
|
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
|
|
@@ -420,12 +558,12 @@ public class TestBookKeeperJournalManager {
|
|
|
out.setReadyToFlush();
|
|
|
out.flush();
|
|
|
|
|
|
- replacementBookie = bkutil.newBookie();
|
|
|
+ replacementBookie = newBookie();
|
|
|
assertEquals("replacement bookie didn't start",
|
|
|
- ensembleSize+1, bkutil.checkBookiesUp(ensembleSize+1, 10));
|
|
|
+ ensembleSize+1, checkBookiesUp(ensembleSize+1, 10));
|
|
|
bookieToFail.shutdown();
|
|
|
assertEquals("New bookie didn't die",
|
|
|
- ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
|
|
|
+ ensembleSize, checkBookiesUp(ensembleSize, 10));
|
|
|
|
|
|
for (long i = 1 ; i <= 3; i++) {
|
|
|
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
|
|
@@ -443,10 +581,10 @@ public class TestBookKeeperJournalManager {
|
|
|
}
|
|
|
bookieToFail.shutdown();
|
|
|
|
|
|
- if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) {
|
|
|
+ if (checkBookiesUp(numBookies, 30) != numBookies) {
|
|
|
LOG.warn("Not all bookies from this test shut down, expect errors");
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-}
|
|
|
+}
|