|
@@ -0,0 +1,880 @@
|
|
|
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
|
|
|
+<html>
|
|
|
+<head>
|
|
|
+<META http-equiv="Content-Type" content="text/html; charset=UTF-8">
|
|
|
+<meta content="Apache Forrest" name="Generator">
|
|
|
+<meta name="Forrest-version" content="0.8">
|
|
|
+<meta name="Forrest-skin-name" content="pelt">
|
|
|
+<title>ZooKeeper Java Example</title>
|
|
|
+<link type="text/css" href="skin/basic.css" rel="stylesheet">
|
|
|
+<link media="screen" type="text/css" href="skin/screen.css" rel="stylesheet">
|
|
|
+<link media="print" type="text/css" href="skin/print.css" rel="stylesheet">
|
|
|
+<link type="text/css" href="skin/profile.css" rel="stylesheet">
|
|
|
+<script src="skin/getBlank.js" language="javascript" type="text/javascript"></script><script src="skin/getMenu.js" language="javascript" type="text/javascript"></script><script src="skin/fontsize.js" language="javascript" type="text/javascript"></script>
|
|
|
+<link rel="shortcut icon" href="images/favicon.ico">
|
|
|
+</head>
|
|
|
+<body onload="init()">
|
|
|
+<script type="text/javascript">ndeSetTextSize();</script>
|
|
|
+<div id="top">
|
|
|
+<!--+
|
|
|
+ |breadtrail
|
|
|
+ +-->
|
|
|
+<div class="breadtrail">
|
|
|
+<a href="http://www.apache.org/">Apache</a> > <a href="http://hadoop.apache.org/">Hadoop</a> > <a href="http://hadoop.apache.org/zookeeper/">ZooKeeper</a><script src="skin/breadcrumbs.js" language="JavaScript" type="text/javascript"></script>
|
|
|
+</div>
|
|
|
+<!--+
|
|
|
+ |header
|
|
|
+ +-->
|
|
|
+<div class="header">
|
|
|
+<!--+
|
|
|
+ |start group logo
|
|
|
+ +-->
|
|
|
+<div class="grouplogo">
|
|
|
+<a href="http://hadoop.apache.org/"><img class="logoImage" alt="Hadoop" src="images/hadoop-logo.jpg" title="Apache Hadoop"></a>
|
|
|
+</div>
|
|
|
+<!--+
|
|
|
+ |end group logo
|
|
|
+ +-->
|
|
|
+<!--+
|
|
|
+ |start Project Logo
|
|
|
+ +-->
|
|
|
+<div class="projectlogo">
|
|
|
+<a href="http://hadoop.apache.org/zookeeper/"><img class="logoImage" alt="ZooKeeper" src="images/zookeeper_small.gif" title="The Hadoop database"></a>
|
|
|
+</div>
|
|
|
+<!--+
|
|
|
+ |end Project Logo
|
|
|
+ +-->
|
|
|
+<!--+
|
|
|
+ |start Search
|
|
|
+ +-->
|
|
|
+<div class="searchbox">
|
|
|
+<form action="http://www.google.com/search" method="get" class="roundtopsmall">
|
|
|
+<input value="hadoop.apache.org" name="sitesearch" type="hidden"><input onFocus="getBlank (this, 'Search the site with google');" size="25" name="q" id="query" type="text" value="Search the site with google">
|
|
|
+ <input name="Search" value="Search" type="submit">
|
|
|
+</form>
|
|
|
+</div>
|
|
|
+<!--+
|
|
|
+ |end search
|
|
|
+ +-->
|
|
|
+<!--+
|
|
|
+ |start Tabs
|
|
|
+ +-->
|
|
|
+<ul id="tabs">
|
|
|
+<li>
|
|
|
+<a class="unselected" href="http://hadoop.apache.org/zookeeper/">Project</a>
|
|
|
+</li>
|
|
|
+<li>
|
|
|
+<a class="unselected" href="http://wiki.apache.org/hadoop/ZooKeeper">Wiki</a>
|
|
|
+</li>
|
|
|
+<li class="current">
|
|
|
+<a class="selected" href="index.html">ZooKeeper Documentation</a>
|
|
|
+</li>
|
|
|
+</ul>
|
|
|
+<!--+
|
|
|
+ |end Tabs
|
|
|
+ +-->
|
|
|
+</div>
|
|
|
+</div>
|
|
|
+<div id="main">
|
|
|
+<div id="publishedStrip">
|
|
|
+<!--+
|
|
|
+ |start Subtabs
|
|
|
+ +-->
|
|
|
+<div id="level2tabs"></div>
|
|
|
+<!--+
|
|
|
+ |end Endtabs
|
|
|
+ +-->
|
|
|
+<script type="text/javascript"><!--
|
|
|
+document.write("Last Published: " + document.lastModified);
|
|
|
+// --></script>
|
|
|
+</div>
|
|
|
+<!--+
|
|
|
+ |breadtrail
|
|
|
+ +-->
|
|
|
+<div class="breadtrail">
|
|
|
+
|
|
|
+
|
|
|
+ </div>
|
|
|
+<!--+
|
|
|
+ |start Menu, mainarea
|
|
|
+ +-->
|
|
|
+<!--+
|
|
|
+ |start Menu
|
|
|
+ +-->
|
|
|
+<div id="menu">
|
|
|
+<div onclick="SwitchMenu('menu_selected_1.1', 'skin/')" id="menu_selected_1.1Title" class="menutitle" style="background-image: url('skin/images/chapter_open.gif');">Documentation</div>
|
|
|
+<div id="menu_selected_1.1" class="selectedmenuitemgroup" style="display: block;">
|
|
|
+<div class="menuitem">
|
|
|
+<a href="index.html">Welcome</a>
|
|
|
+</div>
|
|
|
+<div class="menuitem">
|
|
|
+<a href="zookeeperOver.html">Zookeeper Overview</a>
|
|
|
+</div>
|
|
|
+<div class="menuitem">
|
|
|
+<a href="zookeeperStarted.html">Getting Started</a>
|
|
|
+</div>
|
|
|
+<div class="menuitem">
|
|
|
+<a href="zookeeperProgrammers.html">Programmer's Guide</a>
|
|
|
+</div>
|
|
|
+<div class="menuitem">
|
|
|
+<a href="recipes.html">Recipes</a>
|
|
|
+</div>
|
|
|
+<div class="menuitem">
|
|
|
+<a href="zookeeperAdmin.html">Administrator's Guide</a>
|
|
|
+</div>
|
|
|
+<div class="menupage">
|
|
|
+<div class="menupagetitle">Java Example</div>
|
|
|
+</div>
|
|
|
+<div class="menuitem">
|
|
|
+<a href="zookeeperTutorial.html">Barrie and Queue Tutorial</a>
|
|
|
+</div>
|
|
|
+<div class="menuitem">
|
|
|
+<a href="zookeeperInternals.html">ZooKeeper Internals</a>
|
|
|
+</div>
|
|
|
+<div class="menuitem">
|
|
|
+<a href="api/index.html">API Docs</a>
|
|
|
+</div>
|
|
|
+<div class="menuitem">
|
|
|
+<a href="http://wiki.apache.org/hadoop/ZooKeeper">Wiki</a>
|
|
|
+</div>
|
|
|
+<div class="menuitem">
|
|
|
+<a href="http://wiki.apache.org/hadoop/ZooKeeper/FAQ">FAQ</a>
|
|
|
+</div>
|
|
|
+<div class="menuitem">
|
|
|
+<a href="http://hadoop.apache.org/zookeeper/mailing_lists.html">Mailing Lists</a>
|
|
|
+</div>
|
|
|
+<div class="menuitem">
|
|
|
+<a href="zookeeperOtherInfo.html">Other Info</a>
|
|
|
+</div>
|
|
|
+</div>
|
|
|
+<div id="credit"></div>
|
|
|
+<div id="roundbottom">
|
|
|
+<img style="display: none" class="corner" height="15" width="15" alt="" src="skin/images/rc-b-l-15-1body-2menu-3menu.png"></div>
|
|
|
+<!--+
|
|
|
+ |alternative credits
|
|
|
+ +-->
|
|
|
+<div id="credit2"></div>
|
|
|
+</div>
|
|
|
+<!--+
|
|
|
+ |end Menu
|
|
|
+ +-->
|
|
|
+<!--+
|
|
|
+ |start content
|
|
|
+ +-->
|
|
|
+<div id="content">
|
|
|
+<div title="Portable Document Format" class="pdflink">
|
|
|
+<a class="dida" href="javaExample.pdf"><img alt="PDF -icon" src="skin/images/pdfdoc.gif" class="skin"><br>
|
|
|
+ PDF</a>
|
|
|
+</div>
|
|
|
+<h1>ZooKeeper Java Example</h1>
|
|
|
+<div id="minitoc-area">
|
|
|
+<ul class="minitoc">
|
|
|
+<li>
|
|
|
+<a href="#ch_Introduction">A Simple Watch Client</a>
|
|
|
+<ul class="minitoc">
|
|
|
+<li>
|
|
|
+<a href="#sc_requirements">Requirements</a>
|
|
|
+</li>
|
|
|
+<li>
|
|
|
+<a href="#sc_design">Program Design</a>
|
|
|
+</li>
|
|
|
+</ul>
|
|
|
+</li>
|
|
|
+<li>
|
|
|
+<a href="#sc_executor">The Executor Class</a>
|
|
|
+</li>
|
|
|
+<li>
|
|
|
+<a href="#sc_DataMonitor">The DataMonitor Class</a>
|
|
|
+</li>
|
|
|
+<li>
|
|
|
+<a href="#sc_completeSourceCode">Complete Source Listings</a>
|
|
|
+</li>
|
|
|
+</ul>
|
|
|
+</div>
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+<a name="N10009"></a><a name="ch_Introduction"></a>
|
|
|
+<h2 class="h3">A Simple Watch Client</h2>
|
|
|
+<div class="section">
|
|
|
+<p>To introduce you to the ZooKeeper Java API, we develop here a very simple
|
|
|
+ watch client. This ZooKeeper client watches a ZooKeeper node for changes
|
|
|
+ and responds to by starting or stopping a program.</p>
|
|
|
+<a name="N10012"></a><a name="sc_requirements"></a>
|
|
|
+<h3 class="h4">Requirements</h3>
|
|
|
+<p>The client has four requirements:</p>
|
|
|
+<ul>
|
|
|
+<li>
|
|
|
+<p>It takes as parameters:</p>
|
|
|
+
|
|
|
+<ul>
|
|
|
+
|
|
|
+<li>
|
|
|
+<p>the address of the ZooKeeper service</p>
|
|
|
+</li>
|
|
|
+
|
|
|
+<li>
|
|
|
+<p> a znode, by name</p>
|
|
|
+</li>
|
|
|
+
|
|
|
+<li>
|
|
|
+<p> an executable with arguments.</p>
|
|
|
+</li>
|
|
|
+</ul>
|
|
|
+</li>
|
|
|
+
|
|
|
+<li>
|
|
|
+<p>It fetches the data associated with the znode and starts the executable.</p>
|
|
|
+</li>
|
|
|
+
|
|
|
+<li>
|
|
|
+<p>If the znode changes, the client refetches the contents and restarts the executable.</p>
|
|
|
+</li>
|
|
|
+
|
|
|
+<li>
|
|
|
+<p>If the znode disappears, the client kills the executable.</p>
|
|
|
+</li>
|
|
|
+</ul>
|
|
|
+<a name="N1003B"></a><a name="sc_design"></a>
|
|
|
+<h3 class="h4">Program Design</h3>
|
|
|
+<p>Conventionally, ZooKeeper applications are broken into two units, one which maintains the connection,
|
|
|
+ and the other which monitors data. In this application, the class called the <strong>Executor</strong>
|
|
|
+ maintains the ZooKeeper connection, and the class called the <strong>DataMonitor</strong> monitors the data
|
|
|
+ in the ZooKeeper tree. Also, Executor contains the main thread and contains the execution logic.
|
|
|
+ It is responsible for what little user interaction there is, as well as interaction with the exectuable program you
|
|
|
+ pass in as an argument and which the sample (per the requirements) shuts down and restarts, according to the
|
|
|
+ state of the znode.</p>
|
|
|
+</div>
|
|
|
+
|
|
|
+
|
|
|
+<a name="N1004C"></a><a name="sc_executor"></a>
|
|
|
+<h2 class="h3">The Executor Class</h2>
|
|
|
+<div class="section">
|
|
|
+<p>The Executor object is the primary container of the sample application. It contains
|
|
|
+ both the <strong>ZooKeeper</strong> object, <strong>DataMonitor</strong>, as described above in
|
|
|
+ <a href="#sc_design">Program Design</a>. </p>
|
|
|
+<pre class="code">
|
|
|
+// from the Executor class...
|
|
|
+
|
|
|
+public static void main(String[] args) {
|
|
|
+ if (args.length < 4) {
|
|
|
+ System.err
|
|
|
+ .println("USAGE: Executor hostPort znode filename program [args ...]");
|
|
|
+ System.exit(2);
|
|
|
+ }
|
|
|
+ String hostPort = args[0];
|
|
|
+ String znode = args[1];
|
|
|
+ String filename = args[2];
|
|
|
+ String exec[] = new String[args.length - 3];
|
|
|
+ System.arraycopy(args, 3, exec, 0, exec.length);
|
|
|
+ try {
|
|
|
+ Executor theExectutor = new Executor(hostPort, znode, filename, exec);
|
|
|
+ theExectutor.run();
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+public Executor(String hostPort, String znode, String filename,
|
|
|
+ String exec[]) throws KeeperException, IOException {
|
|
|
+ this.filename = filename;
|
|
|
+ this.exec = exec;
|
|
|
+
|
|
|
+ //create a new zookeeper object, passing a self-reference in a the Watcher
|
|
|
+ zk = new ZooKeeper(hostPort, 3000, this);
|
|
|
+ dm = new DataMonitor(zk, znode, null, this);
|
|
|
+}
|
|
|
+
|
|
|
+public void run() {
|
|
|
+ try {
|
|
|
+ synchronized (this) {
|
|
|
+ while (!dm.dead) {
|
|
|
+ wait();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ }
|
|
|
+}
|
|
|
+</pre>
|
|
|
+<p>
|
|
|
+ Recall that the Executor's job is to starts and stop the executable whose name you pass in on the command line.
|
|
|
+ It does this in response to events fired by the ZooKeeper object. As you can see in the code above, the Executor passes
|
|
|
+ a reference to itself as the Watcher argument in the ZooKeeper constructor. It also passes a reference to itself
|
|
|
+ as DataMonitorListener argument to the DataMonitor constructor. Per the Executor's definition, it implements both these
|
|
|
+ interfaces:
|
|
|
+ </p>
|
|
|
+<pre class="code">
|
|
|
+public class Executor implements Watcher, Runnable, DataMonitorListener {
|
|
|
+...
|
|
|
+ </pre>
|
|
|
+<p>The <strong>Watcher</strong> interface is defined by the ZooKeeper Java API.
|
|
|
+ ZooKeeper uses it to communicate back to its container. It supports only one method, <span class="codefrag command">process()</span>, and ZooKeeper uses
|
|
|
+ it to communciates generic events that the main thread would be intersted in, such as the state of the ZooKeeper connection or the ZooKeeper session.The Executor
|
|
|
+ in this example simply forwards those events down to the DataMonitor to decide what to do with them. It does this simply to illustrate
|
|
|
+ the point that, by convention, the Executor or some Executor-like object "owns" the ZooKeeper connection, but it is free to delegate the events to other
|
|
|
+ events to other objects. It also uses this as the default channel on which to fire watch events. (More on this later.)</p>
|
|
|
+<pre class="code">
|
|
|
+public void process(WatcherEvent event) {
|
|
|
+ dm.process(event);
|
|
|
+}
|
|
|
+</pre>
|
|
|
+<p>The <strong>DataMonitorListener</strong>
|
|
|
+ interface, on the other hand, is not part of the the ZooKeeper API. It is a completely custom interface,
|
|
|
+ designed for this sample application. The DataMonitor object uses it to communicate back to its container, which
|
|
|
+ is also the the Executor object.The DataMonitorListener interface looks like this:</p>
|
|
|
+<pre class="code">
|
|
|
+public interface DataMonitorListener {
|
|
|
+ /**
|
|
|
+ * The existence status of the node has changed.
|
|
|
+ */
|
|
|
+ void exists(byte data[]);
|
|
|
+
|
|
|
+ /**
|
|
|
+ * The ZooKeeper session is no longer valid.
|
|
|
+ *
|
|
|
+ * @param rc
|
|
|
+ * the ZooKeeper reason code
|
|
|
+ */
|
|
|
+ void closing(int rc);
|
|
|
+}
|
|
|
+</pre>
|
|
|
+<p>This interface is defined in the DataMonitor class and implemented in the Executor class.
|
|
|
+ When <span class="codefrag command">Executor.exists()</span> is invoked,
|
|
|
+ the Executor decides whether to start up or shut down per the requirements. Recall that the requires say to kill the executable when the
|
|
|
+ znode ceases to <em>exist</em>. </p>
|
|
|
+<p>When <span class="codefrag command">Executor.closing()</span>
|
|
|
+ is invoked, the Executor decides whether or not to shut itself down in response to the ZooKeeper connection permanently disappearing.</p>
|
|
|
+<p>As you might have guessed, DataMonitor is the object that invokes
|
|
|
+ these methods, in response to changes in ZooKeeper's state.</p>
|
|
|
+<p>Here are Executor's implementation of
|
|
|
+ <span class="codefrag command">DataMonitorListener.exists()</span> and <span class="codefrag command">DataMonitorListener.closing</span>:
|
|
|
+ </p>
|
|
|
+<pre class="code">
|
|
|
+public void exists( byte[] data ) {
|
|
|
+ if (data == null) {
|
|
|
+ if (child != null) {
|
|
|
+ System.out.println("Killing process");
|
|
|
+ child.destroy();
|
|
|
+ try {
|
|
|
+ child.waitFor();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ }
|
|
|
+ }
|
|
|
+ child = null;
|
|
|
+ } else {
|
|
|
+ if (child != null) {
|
|
|
+ System.out.println("Stopping child");
|
|
|
+ child.destroy();
|
|
|
+ try {
|
|
|
+ child.waitFor();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ FileOutputStream fos = new FileOutputStream(filename);
|
|
|
+ fos.write(data);
|
|
|
+ fos.close();
|
|
|
+ } catch (IOException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ System.out.println("Starting child");
|
|
|
+ child = Runtime.getRuntime().exec(exec);
|
|
|
+ new StreamWriter(child.getInputStream(), System.out);
|
|
|
+ new StreamWriter(child.getErrorStream(), System.err);
|
|
|
+ } catch (IOException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+public void closing(int rc) {
|
|
|
+ synchronized (this) {
|
|
|
+ notifyAll();
|
|
|
+ }
|
|
|
+}
|
|
|
+</pre>
|
|
|
+</div>
|
|
|
+
|
|
|
+<a name="N100A0"></a><a name="sc_DataMonitor"></a>
|
|
|
+<h2 class="h3">The DataMonitor Class</h2>
|
|
|
+<div class="section">
|
|
|
+<p>
|
|
|
+The DataMonitor class has the meat of the ZooKeeper logic. It is mostly
|
|
|
+asynchronous and event driven. DataMonitor kicks things off in the constructor with:</p>
|
|
|
+<pre class="code">
|
|
|
+public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
|
|
|
+ DataMonitorListener listener) {
|
|
|
+ this.zk = zk;
|
|
|
+ this.znode = znode;
|
|
|
+ this.chainedWatcher = chainedWatcher;
|
|
|
+ this.listener = listener;
|
|
|
+
|
|
|
+ // Get things started by checking if the node exists. We are going
|
|
|
+ // to be completely event driven
|
|
|
+ <strong>zk.exists(znode, true, this, null);</strong>
|
|
|
+}
|
|
|
+</pre>
|
|
|
+<p>The call to <span class="codefrag command">ZooKeeper.exists()</span> checks for the existence of the znode,
|
|
|
+sets a watch, and passes a reference to itself (<span class="codefrag command">this</span>)
|
|
|
+as the completion callback object. In this sense, it kicks things off, since the
|
|
|
+real processing happens when the watch is triggered.</p>
|
|
|
+<div class="note">
|
|
|
+<div class="label">Note</div>
|
|
|
+<div class="content">
|
|
|
+
|
|
|
+<p>Don't confuse the completion callback with the watch callback. The <span class="codefrag command">ZooKeeper.exists()</span>
|
|
|
+completion callback, which happens to be the method <span class="codefrag command">StatCallback.processResult()</span> implemented
|
|
|
+in the DataMonitor object, is invoked when the asynchronous <em>setting of the watch</em> operation
|
|
|
+(by <span class="codefrag command">ZooKeeper.exists()</span>) completes on the server. </p>
|
|
|
+
|
|
|
+<p>
|
|
|
+The triggering of the watch, on the other hand, sends an event to the <em>Executor</em> object, since
|
|
|
+the Executor registered as the Watcher of the ZooKeeper object.</p>
|
|
|
+
|
|
|
+
|
|
|
+<p>As an aside, you might note that the DataMonitor could also register itself as the Watcher
|
|
|
+for this particular watch event. This is new to ZooKeeper 3.0.0 (the support of multiple Watchers). In this
|
|
|
+example, however, DataMonitor does not register as the Watcher.</p>
|
|
|
+
|
|
|
+</div>
|
|
|
+</div>
|
|
|
+<p>When the <span class="codefrag command">ZooKeeper.exists()</span> operation completes on the server, the ZooKeeper API invokes this completion callback on
|
|
|
+the client:</p>
|
|
|
+<pre class="code">
|
|
|
+public void processResult(int rc, String path, Object ctx, Stat stat) {
|
|
|
+ boolean exists;
|
|
|
+ switch (rc) {
|
|
|
+ case Code.Ok:
|
|
|
+ exists = true;
|
|
|
+ break;
|
|
|
+ case Code.NoNode:
|
|
|
+ exists = false;
|
|
|
+ break;
|
|
|
+ case Code.SessionExpired:
|
|
|
+ case Code.NoAuth:
|
|
|
+ dead = true;
|
|
|
+ listener.closing(rc);
|
|
|
+ return;
|
|
|
+ default:
|
|
|
+ // Retry errors
|
|
|
+ zk.exists(znode, true, this, null);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ byte b[] = null;
|
|
|
+ if (exists) {
|
|
|
+ try {
|
|
|
+ <strong>b = zk.getData(znode, false, null);</strong>
|
|
|
+ } catch (KeeperException e) {
|
|
|
+ // We don't need to worry about recovering now. The watch
|
|
|
+ // callbacks will kick off any exception handling
|
|
|
+ e.printStackTrace();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if ((b == null && b != prevData)
|
|
|
+ || (b != null && !Arrays.equals(prevData, b))) {
|
|
|
+ <strong>listener.exists(b);</strong>
|
|
|
+ prevData = b;
|
|
|
+ }
|
|
|
+}
|
|
|
+</pre>
|
|
|
+<p>
|
|
|
+The code first checks the error codes for znode existence, fatal errors, and
|
|
|
+recoverable errors. If the file (or znode) exists, it gets the data from the znode, and
|
|
|
+then invoke the exists() callback of Executor if the state has changed. Note,
|
|
|
+it doesn't have to do any Exception processing for the getData call because it
|
|
|
+has watches pending for anything that could cause an error: if the node is deleted
|
|
|
+before it calls <span class="codefrag command">ZooKeeper.getData()</span>, the watch event set by
|
|
|
+the <span class="codefrag command">ZooKeeper.exists()</span> triggers a callback;
|
|
|
+if there is a communication error, a connection watch event fires when
|
|
|
+the connection comes back up.
|
|
|
+</p>
|
|
|
+<p>Finally, notice how DataMonitor processes watch events: </p>
|
|
|
+<pre class="code">
|
|
|
+public void process(WatcherEvent event) {
|
|
|
+ String path = event.getPath();
|
|
|
+ if (event.getType() == Watcher.Event.EventNone) {
|
|
|
+ // We are are being told that the state of the
|
|
|
+ // connection has changed
|
|
|
+ switch (event.getState()) {
|
|
|
+ case Event.KeeperStateSyncConnected:
|
|
|
+ // Everything is happy. Lets kick things off
|
|
|
+ // again by checking the existence of the znode
|
|
|
+ zk.exists(znode, true, this, null);
|
|
|
+ break;
|
|
|
+ case Event.KeeperStateExpired:
|
|
|
+ // It's all over
|
|
|
+ dead = true;
|
|
|
+ listener.closing(KeeperException.Code.SessionExpired);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if (path != null && path.equals(znode)) {
|
|
|
+ // Something has changed on the node, let's find out
|
|
|
+ zk.exists(znode, true, this, null);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (chainedWatcher != null) {
|
|
|
+ chainedWatcher.process(event);
|
|
|
+ }
|
|
|
+}
|
|
|
+</pre>
|
|
|
+<p>
|
|
|
+If the client-side ZooKeeper libraries can reestablish the communication channel to ZooKeeper, DataMonitor simply kicks
|
|
|
+everything off again with the call to <span class="codefrag command">ZooKeeper.exists()</span>.
|
|
|
+If it gets an event for a znode, it calls <span class="codefrag command">ZooKeeper.exists()</span> to find out what has changed.
|
|
|
+</p>
|
|
|
+</div>
|
|
|
+
|
|
|
+
|
|
|
+<a name="N10104"></a><a name="sc_completeSourceCode"></a>
|
|
|
+<h2 class="h3">Complete Source Listings</h2>
|
|
|
+<div class="section">
|
|
|
+<div class="note example">
|
|
|
+<div class="label">Executor.java</div>
|
|
|
+<div class="content">
|
|
|
+<title>Executor.java</title>
|
|
|
+<pre class="code">
|
|
|
+/**
|
|
|
+ * A simple example program to use DataMonitor to start and
|
|
|
+ * stop executables based on a znode. The program watches the
|
|
|
+ * specified znode and saves the data that corresponds to the
|
|
|
+ * znode in the filesystem. It also starts the specified program
|
|
|
+ * with the specified arguments when the znode exists and kills
|
|
|
+ * the program if the znode goes away.
|
|
|
+ */
|
|
|
+package com.yahoo.zk.executor;
|
|
|
+
|
|
|
+import java.io.FileOutputStream;
|
|
|
+import java.io.IOException;
|
|
|
+import java.io.InputStream;
|
|
|
+import java.io.OutputStream;
|
|
|
+
|
|
|
+import com.yahoo.zk.executor.DataMonitor.DataMonitorListener;
|
|
|
+import com.yahoo.zookeeper.KeeperException;
|
|
|
+import com.yahoo.zookeeper.Watcher;
|
|
|
+import com.yahoo.zookeeper.ZooKeeper;
|
|
|
+import com.yahoo.zookeeper.proto.WatcherEvent;
|
|
|
+
|
|
|
+public class Executor implements Watcher, Runnable, DataMonitorListener {
|
|
|
+ String znode;
|
|
|
+
|
|
|
+ DataMonitor dm;
|
|
|
+
|
|
|
+ ZooKeeper zk;
|
|
|
+
|
|
|
+ String filename;
|
|
|
+
|
|
|
+ String exec[];
|
|
|
+
|
|
|
+ Process child;
|
|
|
+
|
|
|
+ public Executor(String hostPort, String znode, String filename,
|
|
|
+ String exec[]) throws KeeperException, IOException {
|
|
|
+ this.filename = filename;
|
|
|
+ this.exec = exec;
|
|
|
+ zk = new ZooKeeper(hostPort, 3000, this);
|
|
|
+ dm = new DataMonitor(zk, znode, null, this);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @param args
|
|
|
+ */
|
|
|
+ public static void main(String[] args) {
|
|
|
+ if (args.length < 4) {
|
|
|
+ System.err
|
|
|
+ .println("USAGE: Executor hostPort znode filename program [args ...]");
|
|
|
+ System.exit(2);
|
|
|
+ }
|
|
|
+ String hostPort = args[0];
|
|
|
+ String znode = args[1];
|
|
|
+ String filename = args[2];
|
|
|
+ String exec[] = new String[args.length - 3];
|
|
|
+ System.arraycopy(args, 3, exec, 0, exec.length);
|
|
|
+ try {
|
|
|
+ new Executor(hostPort, znode, filename, exec).run();
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /***************************************************************************
|
|
|
+ * We do process any events ourselves, we just need to forward them on.
|
|
|
+ *
|
|
|
+ * @see com.yahoo.zookeeper.Watcher#process(com.yahoo.zookeeper.proto.WatcherEvent)
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void process(WatcherEvent event) {
|
|
|
+ dm.process(event);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ synchronized (this) {
|
|
|
+ while (!dm.dead) {
|
|
|
+ wait();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void closing(int rc) {
|
|
|
+ synchronized (this) {
|
|
|
+ notifyAll();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ static class StreamWriter extends Thread {
|
|
|
+ OutputStream os;
|
|
|
+
|
|
|
+ InputStream is;
|
|
|
+
|
|
|
+ StreamWriter(InputStream is, OutputStream os) {
|
|
|
+ this.is = is;
|
|
|
+ this.os = os;
|
|
|
+ start();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void run() {
|
|
|
+ byte b[] = new byte[80];
|
|
|
+ int rc;
|
|
|
+ try {
|
|
|
+ while ((rc = is.read(b)) > 0) {
|
|
|
+ os.write(b, 0, rc);
|
|
|
+ }
|
|
|
+ } catch (IOException e) {
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void exists(byte[] data) {
|
|
|
+ if (data == null) {
|
|
|
+ if (child != null) {
|
|
|
+ System.out.println("Killing process");
|
|
|
+ child.destroy();
|
|
|
+ try {
|
|
|
+ child.waitFor();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ }
|
|
|
+ }
|
|
|
+ child = null;
|
|
|
+ } else {
|
|
|
+ if (child != null) {
|
|
|
+ System.out.println("Stopping child");
|
|
|
+ child.destroy();
|
|
|
+ try {
|
|
|
+ child.waitFor();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ FileOutputStream fos = new FileOutputStream(filename);
|
|
|
+ fos.write(data);
|
|
|
+ fos.close();
|
|
|
+ } catch (IOException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ System.out.println("Starting child");
|
|
|
+ child = Runtime.getRuntime().exec(exec);
|
|
|
+ new StreamWriter(child.getInputStream(), System.out);
|
|
|
+ new StreamWriter(child.getErrorStream(), System.err);
|
|
|
+ } catch (IOException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+</pre>
|
|
|
+
|
|
|
+
|
|
|
+</div>
|
|
|
+</div>
|
|
|
+<div class="note example">
|
|
|
+<div class="label">DataMonitor.java</div>
|
|
|
+<div class="content">
|
|
|
+
|
|
|
+<title>DataMonitor.java</title>
|
|
|
+
|
|
|
+<pre class="code">
|
|
|
+/**
|
|
|
+ * A simple class that monitors the data and existence of a ZooKeeper
|
|
|
+ * node. It uses asynchronous ZooKeeper APIs.
|
|
|
+ */
|
|
|
+package com.yahoo.zk.executor;
|
|
|
+
|
|
|
+import java.util.Arrays;
|
|
|
+
|
|
|
+import com.yahoo.zookeeper.KeeperException;
|
|
|
+import com.yahoo.zookeeper.Watcher;
|
|
|
+import com.yahoo.zookeeper.ZooKeeper;
|
|
|
+import com.yahoo.zookeeper.AsyncCallback.StatCallback;
|
|
|
+import com.yahoo.zookeeper.KeeperException.Code;
|
|
|
+import com.yahoo.zookeeper.data.Stat;
|
|
|
+import com.yahoo.zookeeper.proto.WatcherEvent;
|
|
|
+
|
|
|
+public class DataMonitor implements Watcher, StatCallback {
|
|
|
+
|
|
|
+ ZooKeeper zk;
|
|
|
+
|
|
|
+ String znode;
|
|
|
+
|
|
|
+ Watcher chainedWatcher;
|
|
|
+
|
|
|
+ boolean dead;
|
|
|
+
|
|
|
+ DataMonitorListener listener;
|
|
|
+
|
|
|
+ byte prevData[];
|
|
|
+
|
|
|
+ public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
|
|
|
+ DataMonitorListener listener) {
|
|
|
+ this.zk = zk;
|
|
|
+ this.znode = znode;
|
|
|
+ this.chainedWatcher = chainedWatcher;
|
|
|
+ this.listener = listener;
|
|
|
+ // Get things started by checking if the node exists. We are going
|
|
|
+ // to be completely event driven
|
|
|
+ zk.exists(znode, true, this, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Other classes use the DataMonitor by implementing this method
|
|
|
+ */
|
|
|
+ public interface DataMonitorListener {
|
|
|
+ /**
|
|
|
+ * The existence status of the node has changed.
|
|
|
+ */
|
|
|
+ void exists(byte data[]);
|
|
|
+
|
|
|
+ /**
|
|
|
+ * The ZooKeeper session is no longer valid.
|
|
|
+ *
|
|
|
+ * @param rc
|
|
|
+ * the ZooKeeper reason code
|
|
|
+ */
|
|
|
+ void closing(int rc);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ /**
|
|
|
+ * This is a watch event callback. The node we were watching has changed or
|
|
|
+ * something happened to our connection to ZooKeeper.
|
|
|
+ */
|
|
|
+ public void process(WatcherEvent event) {
|
|
|
+ String path = event.getPath();
|
|
|
+ if (event.getType() == Watcher.Event.EventNone) {
|
|
|
+ // We are are being told that the state of the
|
|
|
+ // connection has changed
|
|
|
+ switch (event.getState()) {
|
|
|
+ case Event.KeeperStateSyncConnected:
|
|
|
+ // Everything is happy. Lets kick things off
|
|
|
+ // again by checking the existence of the znode
|
|
|
+ zk.exists(znode, true, this, null);
|
|
|
+ break;
|
|
|
+ case Event.KeeperStateExpired:
|
|
|
+ // It's all over
|
|
|
+ dead = true;
|
|
|
+ listener.closing(KeeperException.Code.SessionExpired);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if (path != null && path.equals(znode)) {
|
|
|
+ // Something has changed on the node, let's find out
|
|
|
+ zk.exists(znode, true, this, null);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (chainedWatcher != null) {
|
|
|
+ chainedWatcher.process(event);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void processResult(int rc, String path, Object ctx, Stat stat) {
|
|
|
+ boolean exists;
|
|
|
+ switch (rc) {
|
|
|
+ case Code.Ok:
|
|
|
+ exists = true;
|
|
|
+ break;
|
|
|
+ case Code.NoNode:
|
|
|
+ exists = false;
|
|
|
+ break;
|
|
|
+ case Code.SessionExpired:
|
|
|
+ case Code.NoAuth:
|
|
|
+ dead = true;
|
|
|
+ listener.closing(rc);
|
|
|
+ return;
|
|
|
+ default:
|
|
|
+ // Retry errors
|
|
|
+ zk.exists(znode, true, this, null);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ byte b[] = null;
|
|
|
+ if (exists) {
|
|
|
+ try {
|
|
|
+ b = zk.getData(znode, false, null);
|
|
|
+ } catch (KeeperException e) {
|
|
|
+ // We don't need to worry about recovering now. The watch
|
|
|
+ // callbacks will kick off any exception handling
|
|
|
+ e.printStackTrace();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if ((b == null && b != prevData)
|
|
|
+ || (b != null && !Arrays.equals(prevData, b))) {
|
|
|
+ listener.exists(b);
|
|
|
+ prevData = b;
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+</pre>
|
|
|
+
|
|
|
+</div>
|
|
|
+</div>
|
|
|
+</div>
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+<p align="right">
|
|
|
+<font size="-2"></font>
|
|
|
+</p>
|
|
|
+</div>
|
|
|
+<!--+
|
|
|
+ |end content
|
|
|
+ +-->
|
|
|
+<div class="clearboth"> </div>
|
|
|
+</div>
|
|
|
+<div id="footer">
|
|
|
+<!--+
|
|
|
+ |start bottomstrip
|
|
|
+ +-->
|
|
|
+<div class="lastmodified">
|
|
|
+<script type="text/javascript"><!--
|
|
|
+document.write("Last Published: " + document.lastModified);
|
|
|
+// --></script>
|
|
|
+</div>
|
|
|
+<div class="copyright">
|
|
|
+ Copyright ©
|
|
|
+ 2008 <a href="http://www.apache.org/licenses/">The Apache Software Foundation.</a>
|
|
|
+</div>
|
|
|
+<!--+
|
|
|
+ |end bottomstrip
|
|
|
+ +-->
|
|
|
+</div>
|
|
|
+</body>
|
|
|
+</html>
|