|
@@ -18,55 +18,23 @@
|
|
|
package org.apache.hadoop.contrib.bkjournal;
|
|
|
|
|
|
import static org.junit.Assert.*;
|
|
|
-
|
|
|
-import java.net.URI;
|
|
|
-import java.util.Collections;
|
|
|
-import java.util.Arrays;
|
|
|
-import java.util.List;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.Iterator;
|
|
|
-import java.util.concurrent.CountDownLatch;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
-
|
|
|
-import org.apache.bookkeeper.proto.BookieServer;
|
|
|
-import org.apache.bookkeeper.conf.ServerConfiguration;
|
|
|
-import org.apache.bookkeeper.util.LocalBookKeeper;
|
|
|
-
|
|
|
-import java.io.RandomAccessFile;
|
|
|
-import java.io.File;
|
|
|
-import java.io.FilenameFilter;
|
|
|
-import java.io.BufferedInputStream;
|
|
|
-import java.io.DataInputStream;
|
|
|
-import java.io.IOException;
|
|
|
-import org.apache.hadoop.io.Text;
|
|
|
-import org.apache.hadoop.conf.Configuration;
|
|
|
-import org.apache.hadoop.fs.FileSystem;
|
|
|
-import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
-import org.apache.hadoop.security.SecurityUtil;
|
|
|
import org.junit.Test;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.After;
|
|
|
import org.junit.BeforeClass;
|
|
|
import org.junit.AfterClass;
|
|
|
|
|
|
-import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
|
|
-import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.setupEdits;
|
|
|
+import java.io.IOException;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+
|
|
|
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
|
|
|
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
|
|
|
|
|
|
+import org.apache.bookkeeper.proto.BookieServer;
|
|
|
import org.apache.zookeeper.ZooKeeper;
|
|
|
-import org.apache.zookeeper.Watcher;
|
|
|
-import org.apache.zookeeper.WatchedEvent;
|
|
|
-import org.apache.zookeeper.KeeperException;
|
|
|
-
|
|
|
-import com.google.common.collect.ImmutableList;
|
|
|
-
|
|
|
-import java.util.zip.CheckedInputStream;
|
|
|
-import java.util.zip.Checksum;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -75,125 +43,26 @@ public class TestBookKeeperJournalManager {
|
|
|
static final Log LOG = LogFactory.getLog(TestBookKeeperJournalManager.class);
|
|
|
|
|
|
private static final long DEFAULT_SEGMENT_SIZE = 1000;
|
|
|
- private static final String zkEnsemble = "localhost:2181";
|
|
|
- final static private int numBookies = 5;
|
|
|
|
|
|
- private static Thread bkthread;
|
|
|
protected static Configuration conf = new Configuration();
|
|
|
private ZooKeeper zkc;
|
|
|
+ private static BKJMUtil bkutil;
|
|
|
+ static int numBookies = 3;
|
|
|
|
|
|
-
|
|
|
- static int nextPort = 6000; // next port for additionally created bookies
|
|
|
-
|
|
|
- private static ZooKeeper connectZooKeeper(String ensemble)
|
|
|
- throws IOException, KeeperException, InterruptedException {
|
|
|
- final CountDownLatch latch = new CountDownLatch(1);
|
|
|
-
|
|
|
- ZooKeeper zkc = new ZooKeeper(zkEnsemble, 3600, new Watcher() {
|
|
|
- public void process(WatchedEvent event) {
|
|
|
- if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
|
|
|
- latch.countDown();
|
|
|
- }
|
|
|
- }
|
|
|
- });
|
|
|
- if (!latch.await(3, TimeUnit.SECONDS)) {
|
|
|
- throw new IOException("Zookeeper took too long to connect");
|
|
|
- }
|
|
|
- return zkc;
|
|
|
- }
|
|
|
-
|
|
|
- private static BookieServer newBookie() throws Exception {
|
|
|
- int port = nextPort++;
|
|
|
- ServerConfiguration bookieConf = new ServerConfiguration();
|
|
|
- bookieConf.setBookiePort(port);
|
|
|
- File tmpdir = File.createTempFile("bookie" + Integer.toString(port) + "_",
|
|
|
- "test");
|
|
|
- tmpdir.delete();
|
|
|
- tmpdir.mkdir();
|
|
|
-
|
|
|
- bookieConf.setZkServers(zkEnsemble);
|
|
|
- bookieConf.setJournalDirName(tmpdir.getPath());
|
|
|
- bookieConf.setLedgerDirNames(new String[] { tmpdir.getPath() });
|
|
|
-
|
|
|
- BookieServer b = new BookieServer(bookieConf);
|
|
|
- b.start();
|
|
|
- for (int i = 0; i < 10 && !b.isRunning(); i++) {
|
|
|
- Thread.sleep(10000);
|
|
|
- }
|
|
|
- if (!b.isRunning()) {
|
|
|
- throw new IOException("Bookie would not start");
|
|
|
- }
|
|
|
- return b;
|
|
|
+ @BeforeClass
|
|
|
+ public static void setupBookkeeper() throws Exception {
|
|
|
+ bkutil = new BKJMUtil(numBookies);
|
|
|
+ bkutil.start();
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Check that a number of bookies are available
|
|
|
- * @param count number of bookies required
|
|
|
- * @param timeout number of seconds to wait for bookies to start
|
|
|
- * @throws IOException if bookies are not started by the time the timeout hits
|
|
|
- */
|
|
|
- private static int checkBookiesUp(int count, int timeout) throws Exception {
|
|
|
- ZooKeeper zkc = connectZooKeeper(zkEnsemble);
|
|
|
- try {
|
|
|
- boolean up = false;
|
|
|
- int mostRecentSize = 0;
|
|
|
- for (int i = 0; i < timeout; i++) {
|
|
|
- try {
|
|
|
- List<String> children = zkc.getChildren("/ledgers/available",
|
|
|
- false);
|
|
|
- mostRecentSize = children.size();
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Found " + mostRecentSize + " bookies up, "
|
|
|
- + "waiting for " + count);
|
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
- for (String child : children) {
|
|
|
- LOG.trace(" server: " + child);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- if (mostRecentSize == count) {
|
|
|
- up = true;
|
|
|
- break;
|
|
|
- }
|
|
|
- } catch (KeeperException e) {
|
|
|
- // ignore
|
|
|
- }
|
|
|
- Thread.sleep(1000);
|
|
|
- }
|
|
|
- return mostRecentSize;
|
|
|
- } finally {
|
|
|
- zkc.close();
|
|
|
- }
|
|
|
+ @AfterClass
|
|
|
+ public static void teardownBookkeeper() throws Exception {
|
|
|
+ bkutil.teardown();
|
|
|
}
|
|
|
|
|
|
- @BeforeClass
|
|
|
- public static void setupBookkeeper() throws Exception {
|
|
|
- bkthread = new Thread() {
|
|
|
- public void run() {
|
|
|
- try {
|
|
|
- String[] args = new String[1];
|
|
|
- args[0] = String.valueOf(numBookies);
|
|
|
- LOG.info("Starting bk");
|
|
|
- LocalBookKeeper.main(args);
|
|
|
- } catch (InterruptedException e) {
|
|
|
- // go away quietly
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.error("Error starting local bk", e);
|
|
|
- }
|
|
|
- }
|
|
|
- };
|
|
|
- bkthread.start();
|
|
|
-
|
|
|
- if (!LocalBookKeeper.waitForServerUp(zkEnsemble, 10000)) {
|
|
|
- throw new Exception("Error starting zookeeper/bookkeeper");
|
|
|
- }
|
|
|
- assertEquals("Not all bookies started",
|
|
|
- numBookies, checkBookiesUp(numBookies, 10));
|
|
|
- }
|
|
|
-
|
|
|
@Before
|
|
|
public void setup() throws Exception {
|
|
|
- zkc = connectZooKeeper(zkEnsemble);
|
|
|
+ zkc = BKJMUtil.connectZooKeeper();
|
|
|
}
|
|
|
|
|
|
@After
|
|
@@ -201,18 +70,10 @@ public class TestBookKeeperJournalManager {
|
|
|
zkc.close();
|
|
|
}
|
|
|
|
|
|
- @AfterClass
|
|
|
- public static void teardownBookkeeper() throws Exception {
|
|
|
- if (bkthread != null) {
|
|
|
- bkthread.interrupt();
|
|
|
- bkthread.join();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
@Test
|
|
|
public void testSimpleWrite() throws Exception {
|
|
|
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
|
|
- URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simplewrite"));
|
|
|
+ BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"));
|
|
|
long txid = 1;
|
|
|
EditLogOutputStream out = bkjm.startLogSegment(1);
|
|
|
for (long i = 1 ; i <= 100; i++) {
|
|
@@ -231,8 +92,8 @@ public class TestBookKeeperJournalManager {
|
|
|
|
|
|
@Test
|
|
|
public void testNumberOfTransactions() throws Exception {
|
|
|
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
|
|
- URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-txncount"));
|
|
|
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
|
|
+ BKJMUtil.createJournalURI("/hdfsjournal-txncount"));
|
|
|
long txid = 1;
|
|
|
EditLogOutputStream out = bkjm.startLogSegment(1);
|
|
|
for (long i = 1 ; i <= 100; i++) {
|
|
@@ -249,8 +110,8 @@ public class TestBookKeeperJournalManager {
|
|
|
|
|
|
@Test
|
|
|
public void testNumberOfTransactionsWithGaps() throws Exception {
|
|
|
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
|
|
- URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-gaps"));
|
|
|
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
|
|
+ BKJMUtil.createJournalURI("/hdfsjournal-gaps"));
|
|
|
long txid = 1;
|
|
|
for (long i = 0; i < 3; i++) {
|
|
|
long start = txid;
|
|
@@ -262,9 +123,11 @@ public class TestBookKeeperJournalManager {
|
|
|
}
|
|
|
out.close();
|
|
|
bkjm.finalizeLogSegment(start, txid-1);
|
|
|
- assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(start, txid-1), false));
|
|
|
+ assertNotNull(
|
|
|
+ zkc.exists(bkjm.finalizedLedgerZNode(start, txid-1), false));
|
|
|
}
|
|
|
- zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE+1, DEFAULT_SEGMENT_SIZE*2), -1);
|
|
|
+ zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE+1,
|
|
|
+ DEFAULT_SEGMENT_SIZE*2), -1);
|
|
|
|
|
|
long numTrans = bkjm.getNumberOfTransactions(1, true);
|
|
|
assertEquals(DEFAULT_SEGMENT_SIZE, numTrans);
|
|
@@ -282,8 +145,8 @@ public class TestBookKeeperJournalManager {
|
|
|
|
|
|
@Test
|
|
|
public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception {
|
|
|
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
|
|
- URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-inprogressAtEnd"));
|
|
|
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
|
|
+ BKJMUtil.createJournalURI("/hdfsjournal-inprogressAtEnd"));
|
|
|
long txid = 1;
|
|
|
for (long i = 0; i < 3; i++) {
|
|
|
long start = txid;
|
|
@@ -296,7 +159,8 @@ public class TestBookKeeperJournalManager {
|
|
|
|
|
|
out.close();
|
|
|
bkjm.finalizeLogSegment(start, (txid-1));
|
|
|
- assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(start, (txid-1)), false));
|
|
|
+ assertNotNull(
|
|
|
+ zkc.exists(bkjm.finalizedLedgerZNode(start, (txid-1)), false));
|
|
|
}
|
|
|
long start = txid;
|
|
|
EditLogOutputStream out = bkjm.startLogSegment(start);
|
|
@@ -320,8 +184,8 @@ public class TestBookKeeperJournalManager {
|
|
|
*/
|
|
|
@Test
|
|
|
public void testWriteRestartFrom1() throws Exception {
|
|
|
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
|
|
- URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-restartFrom1"));
|
|
|
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
|
|
+ BKJMUtil.createJournalURI("/hdfsjournal-restartFrom1"));
|
|
|
long txid = 1;
|
|
|
long start = txid;
|
|
|
EditLogOutputStream out = bkjm.startLogSegment(txid);
|
|
@@ -375,10 +239,10 @@ public class TestBookKeeperJournalManager {
|
|
|
@Test
|
|
|
public void testTwoWriters() throws Exception {
|
|
|
long start = 1;
|
|
|
- BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf,
|
|
|
- URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-dualWriter"));
|
|
|
- BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf,
|
|
|
- URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-dualWriter"));
|
|
|
+ BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf,
|
|
|
+ BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"));
|
|
|
+ BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf,
|
|
|
+ BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"));
|
|
|
|
|
|
EditLogOutputStream out1 = bkjm1.startLogSegment(start);
|
|
|
try {
|
|
@@ -391,8 +255,8 @@ public class TestBookKeeperJournalManager {
|
|
|
|
|
|
@Test
|
|
|
public void testSimpleRead() throws Exception {
|
|
|
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
|
|
- URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simpleread"));
|
|
|
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
|
|
+ BKJMUtil.createJournalURI("/hdfsjournal-simpleread"));
|
|
|
long txid = 1;
|
|
|
final long numTransactions = 10000;
|
|
|
EditLogOutputStream out = bkjm.startLogSegment(1);
|
|
@@ -416,8 +280,8 @@ public class TestBookKeeperJournalManager {
|
|
|
|
|
|
@Test
|
|
|
public void testSimpleRecovery() throws Exception {
|
|
|
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
|
|
- URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simplerecovery"));
|
|
|
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
|
|
+ BKJMUtil.createJournalURI("/hdfsjournal-simplerecovery"));
|
|
|
EditLogOutputStream out = bkjm.startLogSegment(1);
|
|
|
long txid = 1;
|
|
|
for (long i = 1 ; i <= 100; i++) {
|
|
@@ -448,13 +312,13 @@ public class TestBookKeeperJournalManager {
|
|
|
*/
|
|
|
@Test
|
|
|
public void testAllBookieFailure() throws Exception {
|
|
|
- BookieServer bookieToFail = newBookie();
|
|
|
+ BookieServer bookieToFail = bkutil.newBookie();
|
|
|
BookieServer replacementBookie = null;
|
|
|
|
|
|
try {
|
|
|
int ensembleSize = numBookies + 1;
|
|
|
assertEquals("New bookie didn't start",
|
|
|
- ensembleSize, checkBookiesUp(ensembleSize, 10));
|
|
|
+ ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
|
|
|
|
|
|
// ensure that the journal manager has to use all bookies,
|
|
|
// so that a failure will fail the journal manager
|
|
@@ -465,8 +329,7 @@ public class TestBookKeeperJournalManager {
|
|
|
ensembleSize);
|
|
|
long txid = 1;
|
|
|
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
|
|
- URI.create("bookkeeper://" + zkEnsemble
|
|
|
- + "/hdfsjournal-allbookiefailure"));
|
|
|
+ BKJMUtil.createJournalURI("/hdfsjournal-allbookiefailure"));
|
|
|
EditLogOutputStream out = bkjm.startLogSegment(txid);
|
|
|
|
|
|
for (long i = 1 ; i <= 3; i++) {
|
|
@@ -478,7 +341,7 @@ public class TestBookKeeperJournalManager {
|
|
|
out.flush();
|
|
|
bookieToFail.shutdown();
|
|
|
assertEquals("New bookie didn't die",
|
|
|
- numBookies, checkBookiesUp(numBookies, 10));
|
|
|
+ numBookies, bkutil.checkBookiesUp(numBookies, 10));
|
|
|
|
|
|
try {
|
|
|
for (long i = 1 ; i <= 3; i++) {
|
|
@@ -494,10 +357,10 @@ public class TestBookKeeperJournalManager {
|
|
|
assertTrue("Invalid exception message",
|
|
|
ioe.getMessage().contains("Failed to write to bookkeeper"));
|
|
|
}
|
|
|
- replacementBookie = newBookie();
|
|
|
+ replacementBookie = bkutil.newBookie();
|
|
|
|
|
|
assertEquals("New bookie didn't start",
|
|
|
- numBookies+1, checkBookiesUp(numBookies+1, 10));
|
|
|
+ numBookies+1, bkutil.checkBookiesUp(numBookies+1, 10));
|
|
|
out = bkjm.startLogSegment(txid);
|
|
|
for (long i = 1 ; i <= 3; i++) {
|
|
|
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
|
|
@@ -517,7 +380,7 @@ public class TestBookKeeperJournalManager {
|
|
|
}
|
|
|
bookieToFail.shutdown();
|
|
|
|
|
|
- if (checkBookiesUp(numBookies, 30) != numBookies) {
|
|
|
+ if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) {
|
|
|
LOG.warn("Not all bookies from this test shut down, expect errors");
|
|
|
}
|
|
|
}
|
|
@@ -530,13 +393,13 @@ public class TestBookKeeperJournalManager {
|
|
|
*/
|
|
|
@Test
|
|
|
public void testOneBookieFailure() throws Exception {
|
|
|
- BookieServer bookieToFail = newBookie();
|
|
|
+ BookieServer bookieToFail = bkutil.newBookie();
|
|
|
BookieServer replacementBookie = null;
|
|
|
|
|
|
try {
|
|
|
int ensembleSize = numBookies + 1;
|
|
|
assertEquals("New bookie didn't start",
|
|
|
- ensembleSize, checkBookiesUp(ensembleSize, 10));
|
|
|
+ ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
|
|
|
|
|
|
// ensure that the journal manager has to use all bookies,
|
|
|
// so that a failure will fail the journal manager
|
|
@@ -547,8 +410,7 @@ public class TestBookKeeperJournalManager {
|
|
|
ensembleSize);
|
|
|
long txid = 1;
|
|
|
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
|
|
- URI.create("bookkeeper://" + zkEnsemble
|
|
|
- + "/hdfsjournal-onebookiefailure"));
|
|
|
+ BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"));
|
|
|
EditLogOutputStream out = bkjm.startLogSegment(txid);
|
|
|
for (long i = 1 ; i <= 3; i++) {
|
|
|
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
|
|
@@ -558,12 +420,12 @@ public class TestBookKeeperJournalManager {
|
|
|
out.setReadyToFlush();
|
|
|
out.flush();
|
|
|
|
|
|
- replacementBookie = newBookie();
|
|
|
+ replacementBookie = bkutil.newBookie();
|
|
|
assertEquals("replacement bookie didn't start",
|
|
|
- ensembleSize+1, checkBookiesUp(ensembleSize+1, 10));
|
|
|
+ ensembleSize+1, bkutil.checkBookiesUp(ensembleSize+1, 10));
|
|
|
bookieToFail.shutdown();
|
|
|
assertEquals("New bookie didn't die",
|
|
|
- ensembleSize, checkBookiesUp(ensembleSize, 10));
|
|
|
+ ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
|
|
|
|
|
|
for (long i = 1 ; i <= 3; i++) {
|
|
|
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
|
|
@@ -581,10 +443,10 @@ public class TestBookKeeperJournalManager {
|
|
|
}
|
|
|
bookieToFail.shutdown();
|
|
|
|
|
|
- if (checkBookiesUp(numBookies, 30) != numBookies) {
|
|
|
+ if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) {
|
|
|
LOG.warn("Not all bookies from this test shut down, expect errors");
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-}
|
|
|
+}
|