فهرست منبع

ZOOKEEPER-436. Bookies should auto register to ZooKeeper

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@924942 13f79535-47bb-0310-9956-ffa450edef68
Benjamin Reed 15 سال پیش
والد
کامیت
65fc5e579a

+ 2 - 0
CHANGES.txt

@@ -304,6 +304,8 @@ BUGFIXES:
   ZOOKEEPER-708. zkpython failing due to undefined symbol
   deallocate_String_vector (mahadev via phunt)
 
+  ZOOKEEPER-436. Bookies should auto register to ZooKeeper (erwin tam & fpj via breed)
+
 IMPROVEMENTS:
   ZOOKEEPER-473. cleanup junit tests to eliminate false positives due to
   "socket reuse" and failure to close client (phunt via mahadev)

+ 53 - 2
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java

@@ -27,6 +27,7 @@ import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
@@ -39,6 +40,11 @@ import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
 
 
 
@@ -55,6 +61,13 @@ public class Bookie extends Thread {
 
     final File ledgerDirectories[];
     
+    // ZK registration path for this bookie
+    static final String BOOKIE_REGISTRATION_PATH = "/ledgers/available/";
+    static final String LEDGERS_PATH = "/ledgers";
+
+    // ZooKeeper client instance for the Bookie
+    ZooKeeper zk;
+
     public static class NoLedgerException extends IOException {
         private static final long serialVersionUID = 1L;
         private long ledgerId;
@@ -119,7 +132,8 @@ public class Bookie extends Thread {
         }
     }
     SyncThread syncThread = new SyncThread();
-    public Bookie(File journalDirectory, File ledgerDirectories[]) throws IOException {
+    public Bookie(int port, String zkServers, File journalDirectory, File ledgerDirectories[]) throws IOException {
+        instantiateZookeeperClient(port, zkServers);
         this.journalDirectory = journalDirectory;
         this.ledgerDirectories = ledgerDirectories;
         entryLogger = new EntryLogger(ledgerDirectories);
@@ -145,6 +159,9 @@ public class Bookie extends Thread {
             if (logs.size() == 0 || logs.get(0) != markedLogId) {
                 throw new IOException("Recovery log " + markedLogId + " is missing");
             }
+            // TODO: When reading in the journal logs that need to be synced, we
+            // should use BufferedChannels instead to minimize the amount of
+            // system calls done.
             ByteBuffer lenBuff = ByteBuffer.allocate(4);
             ByteBuffer recBuff = ByteBuffer.allocate(64*1024);
             for(Long id: logs) {
@@ -189,6 +206,36 @@ public class Bookie extends Thread {
         syncThread.start();
     }
 
+    // Method to instantiate the ZooKeeper client for the Bookie.
+    private void instantiateZookeeperClient(int port, String zkServers) throws IOException {
+        if (zkServers == null) {
+            LOG.warn("No ZK servers passed to Bookie constructor so BookKeeper clients won't know about this server!");
+            zk = null;
+            return;
+        }
+        // Create the ZooKeeper client instance
+        zk = new ZooKeeper(zkServers, 10000, new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                // TODO: handle session disconnects and expires
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Process: " + event.getType() + " " + event.getPath());
+                }
+            }
+        });
+        // Create the ZK ephemeral node for this Bookie.
+        try {
+            zk.create(BOOKIE_REGISTRATION_PATH + InetAddress.getLocalHost().getHostAddress() + ":" + port, new byte[0],
+                    Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+        } catch (Exception e) {
+            LOG.fatal("ZK exception registering ephemeral Znode for Bookie!", e);
+            // Throw an IOException back up. This will cause the Bookie
+            // constructor to error out. Alternatively, we could do a System
+            // exit here as this is a fatal error.
+            throw new IOException(e);
+        }
+    }
+
     private static int fullRead(FileChannel fc, ByteBuffer bb) throws IOException {
         int total = 0;
         while(bb.remaining() > 0) {
@@ -342,6 +389,9 @@ public class Bookie extends Thread {
             long nextPrealloc = preAllocSize;
             long lastFlushPosition = 0;
             logFile.write(zeros, nextPrealloc);
+            // TODO: Currently, when we roll over the journal logs, the older
+            // ones are never garbage collected. We should remove a journal log
+            // once all of its entries have been synced with the entry logs.
             while (true) {
                 QueueEntry qe = null;
                 if (toFlush.isEmpty()) {
@@ -391,6 +441,7 @@ public class Bookie extends Thread {
     }
 
     public void shutdown() throws InterruptedException {
+    	if(zk != null) zk.close();
         this.interrupt();
         this.join();
         syncThread.running = false;
@@ -462,7 +513,7 @@ public class Bookie extends Thread {
      */
     public static void main(String[] args) throws IOException,
             InterruptedException, BookieException {
-        Bookie b = new Bookie(new File("/tmp"), new File[] { new File("/tmp") });
+        Bookie b = new Bookie(5000, null, new File("/tmp"), new File[] { new File("/tmp") });
         CounterCallback cb = new CounterCallback();
         long start = System.currentTimeMillis();
         for (int i = 0; i < 100000; i++) {

+ 11 - 10
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java

@@ -42,9 +42,9 @@ public class BookieServer implements NIOServerFactory.PacketProcessor, Bookkeepe
     Bookie bookie;
     static Logger LOG = Logger.getLogger(BookieServer.class);
 
-    public BookieServer(int port, File journalDirectory, File ledgerDirectories[]) throws IOException {
+    public BookieServer(int port, String zkServers, File journalDirectory, File ledgerDirectories[]) throws IOException {
         this.port = port;
-        this.bookie = new Bookie(journalDirectory, ledgerDirectories);
+        this.bookie = new Bookie(port, zkServers, journalDirectory, ledgerDirectories);
     }
 
     public void start() throws IOException {
@@ -71,26 +71,27 @@ public class BookieServer implements NIOServerFactory.PacketProcessor, Bookkeepe
      * @throws InterruptedException
      */
     public static void main(String[] args) throws IOException, InterruptedException {
-        if (args.length < 3) {
-            System.err.println("USAGE: BookieServer port journalDirectory ledgerDirectory [ledgerDirectory]*");
+        if (args.length < 4) {
+            System.err.println("USAGE: BookieServer port zkServers journalDirectory ledgerDirectory [ledgerDirectory]*");
             return;
         }
         int port = Integer.parseInt(args[0]);
-        File journalDirectory = new File(args[1]);
-        File ledgerDirectory[] = new File[args.length - 2];
+        String zkServers = args[1];
+        File journalDirectory = new File(args[2]);
+        File ledgerDirectory[] = new File[args.length - 3];
         StringBuilder sb = new StringBuilder();
         for (int i = 0; i < ledgerDirectory.length; i++) {
-            ledgerDirectory[i] = new File(args[i + 2]);
+            ledgerDirectory[i] = new File(args[i + 3]);
             if (i != 0) {
                 sb.append(',');
             }
             sb.append(ledgerDirectory[i]);
         }
         String hello = String.format(
-                "Hello, I'm your bookie, listening on port %1$s. Journals are in %2$s. Ledgers are stored in %3$s.",
-                port, journalDirectory, sb);
+                "Hello, I'm your bookie, listening on port %1$s. ZKServers are on %2$s. Journals are in %3$s. Ledgers are stored in %4$s.",
+                port, zkServers, journalDirectory, sb);
         LOG.info(hello);
-        BookieServer bs = new BookieServer(port, journalDirectory, ledgerDirectory);
+        BookieServer bs = new BookieServer(port, zkServers, journalDirectory, ledgerDirectory);
         bs.start();
         bs.join();
     }

+ 5 - 6
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/LocalBookKeeper.java

@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 
@@ -105,11 +106,8 @@ public class LocalBookKeeper {
 			zkc = new ZooKeeper("127.0.0.1", ZooKeeperDefaultPort, new emptyWatcher());
 			zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 			zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-			// create an entry for each requested bookie
-			for(int i = 0; i < numberOfBookies; i++){
-				zkc.create("/ledgers/available/127.0.0.1:" + 
-					Integer.toString(initialPort + i), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-			}
+            // No need to create an entry for each requested bookie anymore as the 
+            // BookieServers will register themselves with ZooKeeper on startup.
 		} catch (KeeperException e) {
 			// TODO Auto-generated catch block
 			LOG.fatal("Exception while creating znodes", e);
@@ -133,7 +131,8 @@ public class LocalBookKeeper {
 			tmpDirs[i].delete();
 			tmpDirs[i].mkdir();
 			
-			bs[i] = new BookieServer(initialPort + i, tmpDirs[i], new File[]{tmpDirs[i]});
+			bs[i] = new BookieServer(initialPort + i, InetAddress.getLocalHost().getHostAddress() + ":"
+                    + ZooKeeperDefaultPort, tmpDirs[i], new File[]{tmpDirs[i]});
 			bs[i].start();
 		}		
 	}

+ 2 - 6
src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BaseTestCase.java

@@ -52,7 +52,7 @@ import junit.framework.TestCase;
 public abstract class BaseTestCase extends TestCase {
     static final Logger LOG = Logger.getLogger(BaseTestCase.class);
     // ZooKeeper related variables
-    private static final String HOSTPORT = "127.0.0.1:2181";
+    static final String HOSTPORT = "127.0.0.1:2181";
     static Integer ZooKeeperDefaultPort = 2181;
     ZooKeeperServer zks;
     ZooKeeper zkc; // zookeeper client
@@ -103,10 +103,6 @@ public abstract class BaseTestCase extends TestCase {
         // initialize the zk client with values
         zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        for (int i = 0; i < numBookies; i++) {
-            zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + i), new byte[0],
-                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        }
 
         // Create Bookie Servers (B1, B2, B3)
         for (int i = 0; i < numBookies; i++) {
@@ -115,7 +111,7 @@ public abstract class BaseTestCase extends TestCase {
             f.delete();
             f.mkdir();
 
-            BookieServer server = new BookieServer(initialPort + i, f, new File[] { f });
+            BookieServer server = new BookieServer(initialPort + i, HOSTPORT, f, new File[] { f });
             server.start();
             bs.add(server);
         }

+ 4 - 1
src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java

@@ -55,7 +55,10 @@ public class BookieClientTest extends TestCase {
         tmpDir = File.createTempFile("bookie", "test");
         tmpDir.delete();
         tmpDir.mkdir();
-        bs = new BookieServer(port, tmpDir, new File[] { tmpDir });
+        // Since this test does not rely on the BookKeeper client needing to
+        // know via ZooKeeper which Bookies are available, okay, so pass in null
+        // for the zkServers input parameter when constructing the BookieServer.
+        bs = new BookieServer(port, null, tmpDir, new File[] { tmpDir });
         bs.start();
         channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors
                 .newCachedThreadPool());

+ 1 - 1
src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieFailureTest.java

@@ -138,7 +138,7 @@ public class BookieFailureTest extends BaseTestCase implements AddCallback, Read
         }
         
         bs.get(3).shutdown();
-        BookieServer server = new BookieServer(initialPort + 3, tmpDirs.get(3), new File[] { tmpDirs.get(3)});
+        BookieServer server = new BookieServer(initialPort + 3, HOSTPORT, tmpDirs.get(3), new File[] { tmpDirs.get(3)});
         server.start();
         bs.set(3, server);
 

+ 1 - 1
src/contrib/bookkeeper/test/org/apache/bookkeeper/test/ConcurrentLedgerTest.java

@@ -65,7 +65,7 @@ public class ConcurrentLedgerTest extends TestCase {
         ledgerDir = new File(tmpFile.getParent(), tmpFile.getName()+".dir");
         ledgerDir.mkdirs();
         
-        bookie = new Bookie(txnDir, new File[] {ledgerDir});
+        bookie = new Bookie(5000, null, txnDir, new File[] {ledgerDir});
     }
     
     static void recursiveDelete(File f) {