12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889 |
- <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
- <html>
- <head>
- <META http-equiv="Content-Type" content="text/html; charset=UTF-8">
- <meta content="Apache Forrest" name="Generator">
- <meta name="Forrest-version" content="0.8">
- <meta name="Forrest-skin-name" content="pelt">
- <title>Programming with ZooKeeper - A basic tutorial</title>
- <link type="text/css" href="skin/basic.css" rel="stylesheet">
- <link media="screen" type="text/css" href="skin/screen.css" rel="stylesheet">
- <link media="print" type="text/css" href="skin/print.css" rel="stylesheet">
- <link type="text/css" href="skin/profile.css" rel="stylesheet">
- <script src="skin/getBlank.js" language="javascript" type="text/javascript"></script><script src="skin/getMenu.js" language="javascript" type="text/javascript"></script><script src="skin/fontsize.js" language="javascript" type="text/javascript"></script>
- <link rel="shortcut icon" href="images/favicon.ico">
- </head>
- <body onload="init()">
- <script type="text/javascript">ndeSetTextSize();</script>
- <div id="top">
- <!--+
- |breadtrail
- +-->
- <div class="breadtrail">
- <a href="http://www.apache.org/">Apache</a> > <a href="http://hadoop.apache.org/">Hadoop</a> > <a href="http://hadoop.apache.org/zookeeper/">ZooKeeper</a><script src="skin/breadcrumbs.js" language="JavaScript" type="text/javascript"></script>
- </div>
- <!--+
- |header
- +-->
- <div class="header">
- <!--+
- |start group logo
- +-->
- <div class="grouplogo">
- <a href="http://hadoop.apache.org/"><img class="logoImage" alt="Hadoop" src="images/hadoop-logo.jpg" title="Apache Hadoop"></a>
- </div>
- <!--+
- |end group logo
- +-->
- <!--+
- |start Project Logo
- +-->
- <div class="projectlogo">
- <a href="http://hadoop.apache.org/zookeeper/"><img class="logoImage" alt="ZooKeeper" src="images/zookeeper_small.gif" title="ZooKeeper: distributed coordination"></a>
- </div>
- <!--+
- |end Project Logo
- +-->
- <!--+
- |start Search
- +-->
- <div class="searchbox">
- <form action="http://www.google.com/search" method="get" class="roundtopsmall">
- <input value="hadoop.apache.org" name="sitesearch" type="hidden"><input onFocus="getBlank (this, 'Search the site with google');" size="25" name="q" id="query" type="text" value="Search the site with google">
- <input name="Search" value="Search" type="submit">
- </form>
- </div>
- <!--+
- |end search
- +-->
- <!--+
- |start Tabs
- +-->
- <ul id="tabs">
- <li>
- <a class="unselected" href="http://hadoop.apache.org/zookeeper/">Project</a>
- </li>
- <li>
- <a class="unselected" href="http://wiki.apache.org/hadoop/ZooKeeper">Wiki</a>
- </li>
- <li class="current">
- <a class="selected" href="index.html">ZooKeeper 3.3 Documentation</a>
- </li>
- </ul>
- <!--+
- |end Tabs
- +-->
- </div>
- </div>
- <div id="main">
- <div id="publishedStrip">
- <!--+
- |start Subtabs
- +-->
- <div id="level2tabs"></div>
- <!--+
- |end Endtabs
- +-->
- <script type="text/javascript"><!--
- document.write("Last Published: " + document.lastModified);
- // --></script>
- </div>
- <!--+
- |breadtrail
- +-->
- <div class="breadtrail">
-
- </div>
- <!--+
- |start Menu, mainarea
- +-->
- <!--+
- |start Menu
- +-->
- <div id="menu">
- <div onclick="SwitchMenu('menu_1.1', 'skin/')" id="menu_1.1Title" class="menutitle">Overview</div>
- <div id="menu_1.1" class="menuitemgroup">
- <div class="menuitem">
- <a href="index.html">Welcome</a>
- </div>
- <div class="menuitem">
- <a href="zookeeperOver.html">Overview</a>
- </div>
- <div class="menuitem">
- <a href="zookeeperStarted.html">Getting Started</a>
- </div>
- <div class="menuitem">
- <a href="releasenotes.html">Release Notes</a>
- </div>
- </div>
- <div onclick="SwitchMenu('menu_selected_1.2', 'skin/')" id="menu_selected_1.2Title" class="menutitle" style="background-image: url('skin/images/chapter_open.gif');">Developer</div>
- <div id="menu_selected_1.2" class="selectedmenuitemgroup" style="display: block;">
- <div class="menuitem">
- <a href="api/index.html">API Docs</a>
- </div>
- <div class="menuitem">
- <a href="zookeeperProgrammers.html">Programmer's Guide</a>
- </div>
- <div class="menuitem">
- <a href="javaExample.html">Java Example</a>
- </div>
- <div class="menupage">
- <div class="menupagetitle">Barrier and Queue Tutorial</div>
- </div>
- <div class="menuitem">
- <a href="recipes.html">Recipes</a>
- </div>
- </div>
- <div onclick="SwitchMenu('menu_1.3', 'skin/')" id="menu_1.3Title" class="menutitle">BookKeeper</div>
- <div id="menu_1.3" class="menuitemgroup">
- <div class="menuitem">
- <a href="bookkeeperStarted.html">Getting started</a>
- </div>
- <div class="menuitem">
- <a href="bookkeeperOverview.html">Overview</a>
- </div>
- <div class="menuitem">
- <a href="bookkeeperConfig.html">Setup guide</a>
- </div>
- <div class="menuitem">
- <a href="bookkeeperProgrammer.html">Programmer's guide</a>
- </div>
- </div>
- <div onclick="SwitchMenu('menu_1.4', 'skin/')" id="menu_1.4Title" class="menutitle">Admin & Ops</div>
- <div id="menu_1.4" class="menuitemgroup">
- <div class="menuitem">
- <a href="zookeeperAdmin.html">Administrator's Guide</a>
- </div>
- <div class="menuitem">
- <a href="zookeeperQuotas.html">Quota Guide</a>
- </div>
- <div class="menuitem">
- <a href="zookeeperJMX.html">JMX</a>
- </div>
- </div>
- <div onclick="SwitchMenu('menu_1.5', 'skin/')" id="menu_1.5Title" class="menutitle">Contributor</div>
- <div id="menu_1.5" class="menuitemgroup">
- <div class="menuitem">
- <a href="zookeeperInternals.html">ZooKeeper Internals</a>
- </div>
- </div>
- <div onclick="SwitchMenu('menu_1.6', 'skin/')" id="menu_1.6Title" class="menutitle">Miscellaneous</div>
- <div id="menu_1.6" class="menuitemgroup">
- <div class="menuitem">
- <a href="http://wiki.apache.org/hadoop/ZooKeeper">Wiki</a>
- </div>
- <div class="menuitem">
- <a href="http://wiki.apache.org/hadoop/ZooKeeper/FAQ">FAQ</a>
- </div>
- <div class="menuitem">
- <a href="http://hadoop.apache.org/zookeeper/mailing_lists.html">Mailing Lists</a>
- </div>
- </div>
- <div id="credit"></div>
- <div id="roundbottom">
- <img style="display: none" class="corner" height="15" width="15" alt="" src="skin/images/rc-b-l-15-1body-2menu-3menu.png"></div>
- <!--+
- |alternative credits
- +-->
- <div id="credit2"></div>
- </div>
- <!--+
- |end Menu
- +-->
- <!--+
- |start content
- +-->
- <div id="content">
- <div title="Portable Document Format" class="pdflink">
- <a class="dida" href="zookeeperTutorial.pdf"><img alt="PDF -icon" src="skin/images/pdfdoc.gif" class="skin"><br>
- PDF</a>
- </div>
- <h1>Programming with ZooKeeper - A basic tutorial</h1>
- <div id="minitoc-area">
- <ul class="minitoc">
- <li>
- <a href="#ch_Introduction">Introduction</a>
- </li>
- <li>
- <a href="#sc_barriers">Barriers</a>
- </li>
- <li>
- <a href="#sc_producerConsumerQueues">Producer-Consumer Queues</a>
- </li>
- <li>
- <a href="#sc_sourceListing">Complete Source Listing</a>
- </li>
- </ul>
- </div>
-
-
-
- <a name="N10009"></a><a name="ch_Introduction"></a>
- <h2 class="h3">Introduction</h2>
- <div class="section">
- <p>In this tutorial, we show simple implementations of barriers and
- producer-consumer queues using ZooKeeper. We call the respective classes Barrier and Queue.
- These examples assume that you have at least one ZooKeeper server running.</p>
- <p>Both primitives use the following common excerpt of code:</p>
- <pre class="code">
- static ZooKeeper zk = null;
- static Integer mutex;
- String root;
- SyncPrimitive(String address) {
- if(zk == null){
- try {
- System.out.println("Starting ZK:");
- zk = new ZooKeeper(address, 3000, this);
- mutex = new Integer(-1);
- System.out.println("Finished starting ZK: " + zk);
- } catch (IOException e) {
- System.out.println(e.toString());
- zk = null;
- }
- }
- }
- synchronized public void process(WatchedEvent event) {
- synchronized (mutex) {
- mutex.notify();
- }
- }
- </pre>
- <p>Both classes extend SyncPrimitive. In this way, we execute steps that are
- common to all primitives in the constructor of SyncPrimitive. To keep the examples
- simple, we create a ZooKeeper object the first time we instantiate either a barrier
- object or a queue object, and we declare a static variable that is a reference
- to this object. The subsequent instances of Barrier and Queue check whether a
- ZooKeeper object exists. Alternatively, we could have the application create a
- ZooKeeper object and pass it to the constructor of Barrier and Queue.</p>
- <p>
- We use the process() method to process notifications triggered due to watches.
- In the following discussion, we present code that sets watches. A watch is an internal
- structure that enables ZooKeeper to notify a client of a change to a node. For example,
- if a client is waiting for other clients to leave a barrier, then it can set a watch and
- wait for modifications to a particular node, which can indicate that it is the end of the wait.
- This point becomes clear once we go over the examples.
- </p>
- </div>
-
-
- <a name="N1001F"></a><a name="sc_barriers"></a>
- <h2 class="h3">Barriers</h2>
- <div class="section">
- <p>
- A barrier is a primitive that enables a group of processes to synchronize the
- beginning and the end of a computation. The general idea of this implementation
- is to have a barrier node that serves the purpose of being a parent for individual
- process nodes. Suppose that we call the barrier node "/b1". Each process "p" then
- creates a node "/b1/p". Once enough processes have created their corresponding
- nodes, joined processes can start the computation.
- </p>
- <p>In this example, each process instantiates a Barrier object, and its constructor takes as parameters:</p>
- <ul>
- <li>
- <p>the address of a ZooKeeper server (e.g., "zoo1.foo.com:2181")</p>
- </li>
- <li>
- <p>the path of the barrier node on ZooKeeper (e.g., "/b1")</p>
- </li>
- <li>
- <p>the size of the group of processes</p>
- </li>
- </ul>
- <p>The constructor of Barrier passes the address of the ZooKeeper server to the
- constructor of the parent class. The parent class creates a ZooKeeper instance if
- one does not exist. The constructor of Barrier then creates a
- barrier node on ZooKeeper if it does not already exist. This node is the parent node of all process nodes, and
- we call it root (<strong>Note:</strong> This is not the ZooKeeper root "/").</p>
- <pre class="code">
- /**
- * Barrier constructor
- *
- * @param address
- * @param root
- * @param size
- */
- Barrier(String address, String root, int size) {
- super(address);
- this.root = root;
- this.size = size;
- // Create barrier node
- if (zk != null) {
- try {
- Stat s = zk.exists(root, false);
- if (s == null) {
- zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- }
- } catch (KeeperException e) {
- System.out
- .println("Keeper exception when instantiating queue: "
- + e.toString());
- } catch (InterruptedException e) {
- System.out.println("Interrupted exception");
- }
- }
- // My node name
- try {
- name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
- } catch (UnknownHostException e) {
- System.out.println(e.toString());
- }
- }
- </pre>
- <p>
- To enter the barrier, a process calls enter(). The process creates a node under
- the root to represent it, using its host name to form the node name. It then waits
- until enough processes have entered the barrier. A process does this by checking
- the number of children the root node has with "getChildren()", and waiting for
- notifications in the case it does not have enough. To receive a notification when
- there is a change to the root node, a process has to set a watch, and does it
- through the call to "getChildren()". In the code, we have that "getChildren()"
- has two parameters. The first one states the node to read from, and the second is
- a boolean flag that enables the process to set a watch. In the code the flag is true.
- </p>
- <pre class="code">
- /**
- * Join barrier
- *
- * @return
- * @throws KeeperException
- * @throws InterruptedException
- */
- boolean enter() throws KeeperException, InterruptedException{
- zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.EPHEMERAL_SEQUENTIAL);
- while (true) {
- synchronized (mutex) {
- List<String> list = zk.getChildren(root, true);
- if (list.size() < size) {
- mutex.wait();
- } else {
- return true;
- }
- }
- }
- }
- </pre>
- <p>
- Note that enter() throws both KeeperException and InterruptedException, so it is
- the responsibility of the application to catch and handle such exceptions.</p>
- <p>
- Once the computation is finished, a process calls leave() to leave the barrier.
- First it deletes its corresponding node, and then it gets the children of the root
- node. If there is at least one child, then it waits for a notification (note
- that the second parameter of the call to getChildren() is true, meaning that
- ZooKeeper has to set a watch on the root node). Upon receiving a notification,
- it checks once more whether the root node has any child.</p>
- <pre class="code">
- /**
- * Wait until all reach barrier
- *
- * @return
- * @throws KeeperException
- * @throws InterruptedException
- */
- boolean leave() throws KeeperException, InterruptedException{
- zk.delete(root + "/" + name, 0);
- while (true) {
- synchronized (mutex) {
- List<String> list = zk.getChildren(root, true);
- if (list.size() > 0) {
- mutex.wait();
- } else {
- return true;
- }
- }
- }
- }
- }
- </pre>
- </div>
- <a name="N10051"></a><a name="sc_producerConsumerQueues"></a>
- <h2 class="h3">Producer-Consumer Queues</h2>
- <div class="section">
- <p>
- A producer-consumer queue is a distributed data structure that a group of processes
- uses to generate and consume items. Producer processes create new elements and add
- them to the queue. Consumer processes remove elements from the queue and process them.
- In this implementation, the elements are simple integers. The queue is represented
- by a root node, and to add an element to the queue, a producer process creates a new node,
- a child of the root node.
- </p>
- <p>
- The following excerpt of code corresponds to the constructor of the object. As
- with Barrier objects, it first calls the constructor of the parent class, SyncPrimitive,
- that creates a ZooKeeper object if one doesn't exist. It then verifies if the root
- node of the queue exists, and creates it if it doesn't.
- </p>
- <pre class="code">
- /**
- * Constructor of producer-consumer queue
- *
- * @param address
- * @param name
- */
- Queue(String address, String name) {
- super(address);
- this.root = name;
- // Create ZK node name
- if (zk != null) {
- try {
- Stat s = zk.exists(root, false);
- if (s == null) {
- zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- }
- } catch (KeeperException e) {
- System.out
- .println("Keeper exception when instantiating queue: "
- + e.toString());
- } catch (InterruptedException e) {
- System.out.println("Interrupted exception");
- }
- }
- }
- </pre>
- <p>
- A producer process calls "produce()" to add an element to the queue, and passes
- an integer as an argument. To add an element to the queue, the method creates a
- new node using "create()", and uses the PERSISTENT_SEQUENTIAL create mode to instruct ZooKeeper to
- append the value of the sequence counter associated with the root node to the node name. In this way,
- we impose a total order on the elements of the queue, thus guaranteeing that the
- oldest element of the queue is the next one consumed.
- </p>
- <pre class="code">
- /**
- * Add element to the queue.
- *
- * @param i
- * @return
- */
- boolean produce(int i) throws KeeperException, InterruptedException{
- ByteBuffer b = ByteBuffer.allocate(4);
- byte[] value;
- // Add child with value i
- b.putInt(i);
- value = b.array();
- zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT_SEQUENTIAL);
- return true;
- }
- </pre>
- <p>
- To consume an element, a consumer process obtains the children of the root node,
- reads the node with the smallest counter value, deletes it, and returns the element. Note that
- if there is a conflict, then one of the two contending processes won't be able to
- delete the node and the delete operation will throw an exception.</p>
- <p>
- A call to getChildren() returns the list of children in lexicographic order.
- As lexicographic order does not necessarily follow the numerical order of the counter
- values, we need to decide which element is the smallest. To decide which one has
- the smallest counter value, we traverse the list, and remove the prefix "element"
- from each one.</p>
- <pre class="code">
- /**
- * Remove first element from the queue.
- *
- * @return
- * @throws KeeperException
- * @throws InterruptedException
- */
- int consume() throws KeeperException, InterruptedException{
- int retvalue = -1;
- Stat stat = null;
- // Get the first element available
- while (true) {
- synchronized (mutex) {
- List<String> list = zk.getChildren(root, true);
- if (list.size() == 0) {
- System.out.println("Going to wait");
- mutex.wait();
- } else {
- Integer min = new Integer(list.get(0).substring(7));
- for(String s : list){
- Integer tempValue = new Integer(s.substring(7));
- //System.out.println("Temporary value: " + tempValue);
- if(tempValue < min) min = tempValue;
- }
- System.out.println("Temporary value: " + root + "/element" + min);
- byte[] b = zk.getData(root + "/element" + min,
- false, stat);
- zk.delete(root + "/element" + min, 0);
- ByteBuffer buffer = ByteBuffer.wrap(b);
- retvalue = buffer.getInt();
- return retvalue;
- }
- }
- }
- }
- }
- </pre>
- </div>
- <a name="N1006F"></a><a name="sc_sourceListing"></a>
- <h2 class="h3">Complete Source Listing</h2>
- <div class="section">
- <div class="note example">
- <div class="label">SyncPrimitive.java</div>
- <div class="content">
- <title>SyncPrimitive.java</title>
- <pre class="code">
- import java.io.IOException;
- import java.net.InetAddress;
- import java.net.UnknownHostException;
- import java.nio.ByteBuffer;
- import java.util.List;
- import java.util.Random;
- import org.apache.zookeeper.CreateMode;
- import org.apache.zookeeper.KeeperException;
- import org.apache.zookeeper.WatchedEvent;
- import org.apache.zookeeper.Watcher;
- import org.apache.zookeeper.ZooKeeper;
- import org.apache.zookeeper.ZooDefs.Ids;
- import org.apache.zookeeper.data.Stat;
- public class SyncPrimitive implements Watcher {
- static ZooKeeper zk = null;
- static Integer mutex;
- String root;
- SyncPrimitive(String address) {
- if(zk == null){
- try {
- System.out.println("Starting ZK:");
- zk = new ZooKeeper(address, 3000, this);
- mutex = new Integer(-1);
- System.out.println("Finished starting ZK: " + zk);
- } catch (IOException e) {
- System.out.println(e.toString());
- zk = null;
- }
- }
- //else mutex = new Integer(-1);
- }
- synchronized public void process(WatchedEvent event) {
- synchronized (mutex) {
- //System.out.println("Process: " + event.getType());
- mutex.notify();
- }
- }
- /**
- * Barrier
- */
- static public class Barrier extends SyncPrimitive {
- int size;
- String name;
- /**
- * Barrier constructor
- *
- * @param address
- * @param root
- * @param size
- */
- Barrier(String address, String root, int size) {
- super(address);
- this.root = root;
- this.size = size;
- // Create barrier node
- if (zk != null) {
- try {
- Stat s = zk.exists(root, false);
- if (s == null) {
- zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- }
- } catch (KeeperException e) {
- System.out
- .println("Keeper exception when instantiating queue: "
- + e.toString());
- } catch (InterruptedException e) {
- System.out.println("Interrupted exception");
- }
- }
- // My node name
- try {
- name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
- } catch (UnknownHostException e) {
- System.out.println(e.toString());
- }
- }
- /**
- * Join barrier
- *
- * @return
- * @throws KeeperException
- * @throws InterruptedException
- */
- boolean enter() throws KeeperException, InterruptedException{
- zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.EPHEMERAL_SEQUENTIAL);
- while (true) {
- synchronized (mutex) {
- List<String> list = zk.getChildren(root, true);
- if (list.size() < size) {
- mutex.wait();
- } else {
- return true;
- }
- }
- }
- }
- /**
- * Wait until all reach barrier
- *
- * @return
- * @throws KeeperException
- * @throws InterruptedException
- */
- boolean leave() throws KeeperException, InterruptedException{
- zk.delete(root + "/" + name, 0);
- while (true) {
- synchronized (mutex) {
- List<String> list = zk.getChildren(root, true);
- if (list.size() > 0) {
- mutex.wait();
- } else {
- return true;
- }
- }
- }
- }
- }
- /**
- * Producer-Consumer queue
- */
- static public class Queue extends SyncPrimitive {
- /**
- * Constructor of producer-consumer queue
- *
- * @param address
- * @param name
- */
- Queue(String address, String name) {
- super(address);
- this.root = name;
- // Create ZK node name
- if (zk != null) {
- try {
- Stat s = zk.exists(root, false);
- if (s == null) {
- zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- }
- } catch (KeeperException e) {
- System.out
- .println("Keeper exception when instantiating queue: "
- + e.toString());
- } catch (InterruptedException e) {
- System.out.println("Interrupted exception");
- }
- }
- }
- /**
- * Add element to the queue.
- *
- * @param i
- * @return
- */
- boolean produce(int i) throws KeeperException, InterruptedException{
- ByteBuffer b = ByteBuffer.allocate(4);
- byte[] value;
- // Add child with value i
- b.putInt(i);
- value = b.array();
- zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT_SEQUENTIAL);
- return true;
- }
- /**
- * Remove first element from the queue.
- *
- * @return
- * @throws KeeperException
- * @throws InterruptedException
- */
- int consume() throws KeeperException, InterruptedException{
- int retvalue = -1;
- Stat stat = null;
- // Get the first element available
- while (true) {
- synchronized (mutex) {
- List<String> list = zk.getChildren(root, true);
- if (list.size() == 0) {
- System.out.println("Going to wait");
- mutex.wait();
- } else {
- Integer min = new Integer(list.get(0).substring(7));
- for(String s : list){
- Integer tempValue = new Integer(s.substring(7));
- //System.out.println("Temporary value: " + tempValue);
- if(tempValue < min) min = tempValue;
- }
- System.out.println("Temporary value: " + root + "/element" + min);
- byte[] b = zk.getData(root + "/element" + min,
- false, stat);
- zk.delete(root + "/element" + min, 0);
- ByteBuffer buffer = ByteBuffer.wrap(b);
- retvalue = buffer.getInt();
- return retvalue;
- }
- }
- }
- }
- }
- public static void main(String args[]) {
- if (args[0].equals("qTest"))
- queueTest(args);
- else
- barrierTest(args);
- }
- public static void queueTest(String args[]) {
- Queue q = new Queue(args[1], "/app1");
- System.out.println("Input: " + args[1]);
- int i;
- Integer max = new Integer(args[2]);
- if (args[3].equals("p")) {
- System.out.println("Producer");
- for (i = 0; i < max; i++)
- try{
- q.produce(10 + i);
- } catch (KeeperException e){
- } catch (InterruptedException e){
- }
- } else {
- System.out.println("Consumer");
- for (i = 0; i < max; i++) {
- try{
- int r = q.consume();
- System.out.println("Item: " + r);
- } catch (KeeperException e){
- i--;
- } catch (InterruptedException e){
- }
- }
- }
- }
- public static void barrierTest(String args[]) {
- Barrier b = new Barrier(args[1], "/b1", new Integer(args[2]));
- try{
- boolean flag = b.enter();
- System.out.println("Entered barrier: " + args[2]);
- if(!flag) System.out.println("Error when entering the barrier");
- } catch (KeeperException e){
- } catch (InterruptedException e){
- }
- // Generate random integer
- Random rand = new Random();
- int r = rand.nextInt(100);
- // Loop for rand iterations
- for (int i = 0; i < r; i++) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- }
- }
- try{
- b.leave();
- } catch (KeeperException e){
- } catch (InterruptedException e){
- }
- System.out.println("Left barrier");
- }
- }
- </pre>
- </div>
- </div>
- </div>
- <p align="right">
- <font size="-2"></font>
- </p>
- </div>
- <!--+
- |end content
- +-->
- <div class="clearboth"> </div>
- </div>
- <div id="footer">
- <!--+
- |start bottomstrip
- +-->
- <div class="lastmodified">
- <script type="text/javascript"><!--
- document.write("Last Published: " + document.lastModified);
- // --></script>
- </div>
- <div class="copyright">
- Copyright ©
- 2008 <a href="http://www.apache.org/licenses/">The Apache Software Foundation.</a>
- </div>
- <!--+
- |end bottomstrip
- +-->
- </div>
- </body>
- </html>
|