|
@@ -211,35 +211,32 @@ document.write("Last Published: " + document.lastModified);
|
|
|
These examples assume that you have at least one ZooKeeper server running.</p>
|
|
|
<p>Both primitives use the following common excerpt of code:</p>
|
|
|
<pre class="code">
|
|
|
-static ZooKeeper zk = null;
|
|
|
-static Integer mutex;
|
|
|
-
|
|
|
-String root;
|
|
|
-
|
|
|
-SyncPrimitive(String address) {
|
|
|
- if(zk == null){
|
|
|
- try {
|
|
|
- System.out.println("Starting ZK:");
|
|
|
- zk = new ZooKeeper(address, 3000, this);
|
|
|
- mutex = new Integer(-1);
|
|
|
- System.out.println("Finished starting ZK: " + zk);
|
|
|
- } catch (KeeperException e) {
|
|
|
- System.out.println("Keeper exception when starting new session: "
|
|
|
- + e.toString());
|
|
|
- zk = null;
|
|
|
- } catch (IOException e) {
|
|
|
- System.out.println(e.toString());
|
|
|
- zk = null;
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
+ static ZooKeeper zk = null;
|
|
|
+ static Integer mutex;
|
|
|
|
|
|
-synchronized public void process(WatcherEvent event) {
|
|
|
- synchronized (mutex) {
|
|
|
- mutex.notify();
|
|
|
- }
|
|
|
-}
|
|
|
+ String root;
|
|
|
|
|
|
+ SyncPrimitive(String address) {
|
|
|
+ if(zk == null){
|
|
|
+ try {
|
|
|
+ System.out.println("Starting ZK:");
|
|
|
+ zk = new ZooKeeper(address, 3000, this);
|
|
|
+ mutex = new Integer(-1);
|
|
|
+ System.out.println("Finished starting ZK: " + zk);
|
|
|
+ } catch (IOException e) {
|
|
|
+ System.out.println(e.toString());
|
|
|
+ zk = null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //else mutex = new Integer(-1);
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized public void process(WatchedEvent event) {
|
|
|
+ synchronized (mutex) {
|
|
|
+ //System.out.println("Process: " + event.getType());
|
|
|
+ mutex.notify();
|
|
|
+ }
|
|
|
+ }
|
|
|
</pre>
|
|
|
<p>Both classes extend SyncPrimitive. In this way, we execute steps that are
|
|
|
common to all primitives in the constructor of SyncPrimitive. To keep the examples
|
|
@@ -291,40 +288,43 @@ one does not exist. The constructor of Barrier then creates a
|
|
|
barrier node on ZooKeeper, which is the parent node of all process nodes, and
|
|
|
we call root (<strong>Note:</strong> This is not the ZooKeeper root "/").</p>
|
|
|
<pre class="code">
|
|
|
- /**
|
|
|
- * Barrier constructor
|
|
|
- *
|
|
|
- * @param address
|
|
|
- * @param name
|
|
|
- * @param size
|
|
|
- */
|
|
|
-Barrier(String address, String name, int size) {
|
|
|
- super(address);
|
|
|
- this.root = name;
|
|
|
- this.size = size;
|
|
|
-
|
|
|
- // Create barrier node
|
|
|
- if (zk != null) {
|
|
|
- try {
|
|
|
- Stat s = zk.exists(root, false);
|
|
|
- if (s == null) {
|
|
|
- zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
|
|
|
- }
|
|
|
- } catch (KeeperException e) {
|
|
|
- System.out.println("Keeper exception when instantiating queue: " + e.toString());
|
|
|
- } catch (InterruptedException e) {
|
|
|
- System.out.println("Interrupted exception");
|
|
|
- }
|
|
|
- }
|
|
|
+ /**
|
|
|
+ * Barrier constructor
|
|
|
+ *
|
|
|
+ * @param address
|
|
|
+ * @param name
|
|
|
+ * @param size
|
|
|
+ */
|
|
|
+ Barrier(String address, String name, int size) {
|
|
|
+ super(address);
|
|
|
+ this.root = name;
|
|
|
+ this.size = size;
|
|
|
|
|
|
- // My node name
|
|
|
- try {
|
|
|
- name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
|
|
|
- } catch (UnknownHostException e) {
|
|
|
- System.out.println(e.toString());
|
|
|
- }
|
|
|
+ // Create barrier node
|
|
|
+ if (zk != null) {
|
|
|
+ try {
|
|
|
+ Stat s = zk.exists(root, false);
|
|
|
+ if (s == null) {
|
|
|
+ zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
|
|
|
+ CreateMode.PERSISTENT);
|
|
|
+ }
|
|
|
+ } catch (KeeperException e) {
|
|
|
+ System.out
|
|
|
+ .println("Keeper exception when instantiating queue: "
|
|
|
+ + e.toString());
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ System.out.println("Interrupted exception");
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
-}
|
|
|
+ // My node name
|
|
|
+ try {
|
|
|
+ name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
|
|
|
+ } catch (UnknownHostException e) {
|
|
|
+ System.out.println(e.toString());
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
</pre>
|
|
|
<p>
|
|
|
To enter the barrier, a process calls enter(). The process creates a node under
|
|
@@ -338,29 +338,29 @@ has two parameters. The first one states the node to read from, and the second i
|
|
|
a boolean flag that enables the process to set a watch. In the code the flag is true.
|
|
|
</p>
|
|
|
<pre class="code">
|
|
|
- /**
|
|
|
- * Join barrier
|
|
|
- *
|
|
|
- * @return
|
|
|
- * @throws KeeperException
|
|
|
- * @throws InterruptedException
|
|
|
- */
|
|
|
-
|
|
|
-boolean enter() throws KeeperException, InterruptedException{
|
|
|
- zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
|
|
|
- CreateFlags.EPHEMERAL);
|
|
|
- while (true) {
|
|
|
- synchronized (mutex) {
|
|
|
- ArrayList<String> list = zk.getChildren(root, true);
|
|
|
-
|
|
|
- if (list.size() < size) {
|
|
|
- mutex.wait();
|
|
|
- } else {
|
|
|
- return true;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
+ /**
|
|
|
+ * Join barrier
|
|
|
+ *
|
|
|
+ * @return
|
|
|
+ * @throws KeeperException
|
|
|
+ * @throws InterruptedException
|
|
|
+ */
|
|
|
+
|
|
|
+ boolean enter() throws KeeperException, InterruptedException{
|
|
|
+ zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
|
|
|
+ CreateMode.EPHEMERAL);
|
|
|
+ while (true) {
|
|
|
+ synchronized (mutex) {
|
|
|
+ List<String> list = zk.getChildren(root, true);
|
|
|
+
|
|
|
+ if (list.size() < size) {
|
|
|
+ mutex.wait();
|
|
|
+ } else {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
</pre>
|
|
|
<p>
|
|
|
Note that enter() throws both KeeperException and InterruptedException, so it is
|
|
@@ -373,27 +373,28 @@ that the second parameter of the call to getChildren() is true, meaning that
|
|
|
ZooKeeper has to set a watch on the the root node). Upon reception of a notification,
|
|
|
it checks once more whether the root node has any child.</p>
|
|
|
<pre class="code">
|
|
|
- /**
|
|
|
- * Wait until all reach barrier
|
|
|
- *
|
|
|
- * @return
|
|
|
- * @throws KeeperException
|
|
|
- * @throws InterruptedException
|
|
|
- */
|
|
|
-
|
|
|
-boolean leave() throws KeeperException, InterruptedException{
|
|
|
- zk.delete(root + "/" + name, 0);
|
|
|
- while (true) {
|
|
|
- synchronized (mutex) {
|
|
|
- ArrayList<String> list = zk.getChildren(root, true);
|
|
|
- if (list.size() > 0) {
|
|
|
- mutex.wait();
|
|
|
- } else {
|
|
|
- return true;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
+ /**
|
|
|
+ * Wait until all reach barrier
|
|
|
+ *
|
|
|
+ * @return
|
|
|
+ * @throws KeeperException
|
|
|
+ * @throws InterruptedException
|
|
|
+ */
|
|
|
+
|
|
|
+ boolean leave() throws KeeperException, InterruptedException{
|
|
|
+ zk.delete(root + "/" + name, 0);
|
|
|
+ while (true) {
|
|
|
+ synchronized (mutex) {
|
|
|
+ List<String> list = zk.getChildren(root, true);
|
|
|
+ if (list.size() > 0) {
|
|
|
+ mutex.wait();
|
|
|
+ } else {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
</pre>
|
|
|
</div>
|
|
|
|
|
@@ -415,32 +416,33 @@ that creates a ZooKeeper object if one doesn't exist. It then verifies if the ro
|
|
|
node of the queue exists, and creates if it doesn't.
|
|
|
</p>
|
|
|
<pre class="code">
|
|
|
-/**
|
|
|
- * Constructor of producer-consumer queue
|
|
|
- *
|
|
|
- * @param address
|
|
|
- * @param name
|
|
|
- */
|
|
|
-Queue(String address, String name) {
|
|
|
- super(address);
|
|
|
- this.root = name;
|
|
|
- // Create ZK node name
|
|
|
- if (zk != null) {
|
|
|
- try {
|
|
|
- Stat s = zk.exists(root, false);
|
|
|
- if (s == null) {
|
|
|
- zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
|
|
|
- }
|
|
|
- } catch (KeeperException e) {
|
|
|
- System.out
|
|
|
- .println("Keeper exception when instantiating queue: "
|
|
|
- + e.toString());
|
|
|
- } catch (InterruptedException e) {
|
|
|
- System.out.println("Interrupted exception");
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
- </pre>
|
|
|
+ /**
|
|
|
+ * Constructor of producer-consumer queue
|
|
|
+ *
|
|
|
+ * @param address
|
|
|
+ * @param name
|
|
|
+ */
|
|
|
+ Queue(String address, String name) {
|
|
|
+ super(address);
|
|
|
+ this.root = name;
|
|
|
+ // Create ZK node name
|
|
|
+ if (zk != null) {
|
|
|
+ try {
|
|
|
+ Stat s = zk.exists(root, false);
|
|
|
+ if (s == null) {
|
|
|
+ zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
|
|
|
+ CreateMode.PERSISTENT);
|
|
|
+ }
|
|
|
+ } catch (KeeperException e) {
|
|
|
+ System.out
|
|
|
+ .println("Keeper exception when instantiating queue: "
|
|
|
+ + e.toString());
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ System.out.println("Interrupted exception");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+</pre>
|
|
|
<p>
|
|
|
A producer process calls "produce()" to add an element to the queue, and passes
|
|
|
an integer as an argument. To add an element to the queue, the method creates a
|
|
@@ -450,25 +452,25 @@ we impose a total order on the elements of the queue, thus guaranteeing that the
|
|
|
oldest element of the queue is the next one consumed.
|
|
|
</p>
|
|
|
<pre class="code">
|
|
|
-/**
|
|
|
- * Add element to the queue.
|
|
|
- *
|
|
|
- * @param i
|
|
|
- * @return
|
|
|
- */
|
|
|
-
|
|
|
-boolean produce(int i) throws KeeperException, InterruptedException{
|
|
|
- ByteBuffer b = ByteBuffer.allocate(4);
|
|
|
- byte[] value;
|
|
|
-
|
|
|
- // Add child with value i
|
|
|
- b.putInt(i);
|
|
|
- value = b.array();
|
|
|
- zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
|
|
|
- CreateFlags.SEQUENCE);
|
|
|
-
|
|
|
- return true;
|
|
|
-}
|
|
|
+ /**
|
|
|
+ * Add element to the queue.
|
|
|
+ *
|
|
|
+ * @param i
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+
|
|
|
+ boolean produce(int i) throws KeeperException, InterruptedException{
|
|
|
+ ByteBuffer b = ByteBuffer.allocate(4);
|
|
|
+ byte[] value;
|
|
|
+
|
|
|
+ // Add child with value i
|
|
|
+ b.putInt(i);
|
|
|
+ value = b.array();
|
|
|
+ zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
|
|
|
+ CreateMode.PERSISTENT_SEQUENTIAL);
|
|
|
+
|
|
|
+ return true;
|
|
|
+ }
|
|
|
</pre>
|
|
|
<p>
|
|
|
To consume an element, a consumer process obtains the children of the root node,
|
|
@@ -482,41 +484,44 @@ values, we need to decide which element is the smallest. To decide which one has
|
|
|
the smallest counter value, we traverse the list, and remove the prefix "element"
|
|
|
from each one.</p>
|
|
|
<pre class="code">
|
|
|
-/**
|
|
|
- * Remove first element from the queue.
|
|
|
- *
|
|
|
- * @return
|
|
|
- * @throws KeeperException
|
|
|
- * @throws InterruptedException
|
|
|
- */
|
|
|
-int consume() throws KeeperException, InterruptedException{
|
|
|
- int retvalue = -1;
|
|
|
- Stat stat = null;
|
|
|
-
|
|
|
- // Get the first element available
|
|
|
- while (true) {
|
|
|
- synchronized (mutex) {
|
|
|
- ArrayList<String> list = zk.getChildren(root, true);
|
|
|
- if (list.size() == 0) {
|
|
|
- System.out.println("Going to wait");
|
|
|
- mutex.wait();
|
|
|
- } else {
|
|
|
- Integer min = new Integer(list.get(0).substring(7));
|
|
|
- for(String s : list){
|
|
|
- Integer tempValue = new Integer(s.substring(7));
|
|
|
- if(tempValue > min) min = tempValue;
|
|
|
- }
|
|
|
- System.out.println("Temporary value: " + root + "/element" + min);
|
|
|
- byte[] b = zk.getData(root + "/element" + min, false, stat);
|
|
|
- zk.delete(root + "/element" + min, 0);
|
|
|
- ByteBuffer buffer = ByteBuffer.wrap(b);
|
|
|
- retvalue = buffer.getInt();
|
|
|
-
|
|
|
- return retvalue;
|
|
|
- }
|
|
|
- }
|
|
|
+ /**
|
|
|
+ * Remove first element from the queue.
|
|
|
+ *
|
|
|
+ * @return
|
|
|
+ * @throws KeeperException
|
|
|
+ * @throws InterruptedException
|
|
|
+ */
|
|
|
+ int consume() throws KeeperException, InterruptedException{
|
|
|
+ int retvalue = -1;
|
|
|
+ Stat stat = null;
|
|
|
+
|
|
|
+ // Get the first element available
|
|
|
+ while (true) {
|
|
|
+ synchronized (mutex) {
|
|
|
+ List<String> list = zk.getChildren(root, true);
|
|
|
+ if (list.size() == 0) {
|
|
|
+ System.out.println("Going to wait");
|
|
|
+ mutex.wait();
|
|
|
+ } else {
|
|
|
+ Integer min = new Integer(list.get(0).substring(7));
|
|
|
+ for(String s : list){
|
|
|
+ Integer tempValue = new Integer(s.substring(7));
|
|
|
+ //System.out.println("Temporary value: " + tempValue);
|
|
|
+ if(tempValue < min) min = tempValue;
|
|
|
+ }
|
|
|
+ System.out.println("Temporary value: " + root + "/element" + min);
|
|
|
+ byte[] b = zk.getData(root + "/element" + min,
|
|
|
+ false, stat);
|
|
|
+ zk.delete(root + "/element" + min, 0);
|
|
|
+ ByteBuffer buffer = ByteBuffer.wrap(b);
|
|
|
+ retvalue = buffer.getInt();
|
|
|
+
|
|
|
+ return retvalue;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
-}
|
|
|
</pre>
|
|
|
</div>
|
|
|
|
|
@@ -530,29 +535,26 @@ int consume() throws KeeperException, InterruptedException{
|
|
|
<title>SyncPrimitive.Java</title>
|
|
|
|
|
|
<pre class="code">
|
|
|
-package com.yahoo.SyncPrimitive;
|
|
|
-
|
|
|
import java.io.IOException;
|
|
|
import java.net.InetAddress;
|
|
|
import java.net.UnknownHostException;
|
|
|
import java.nio.ByteBuffer;
|
|
|
-import java.lang.InterruptedException;
|
|
|
-import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
import java.util.Random;
|
|
|
|
|
|
-import com.yahoo.zookeeper.Watcher;
|
|
|
-import com.yahoo.zookeeper.data.Stat;
|
|
|
-import com.yahoo.zookeeper.proto.WatcherEvent;
|
|
|
-import com.yahoo.zookeeper.KeeperException;
|
|
|
-import com.yahoo.zookeeper.ZooDefs.Ids;
|
|
|
-import com.yahoo.zookeeper.ZooDefs.CreateFlags;
|
|
|
-import com.yahoo.zookeeper.ZooKeeper;
|
|
|
+import org.apache.zookeeper.CreateMode;
|
|
|
+import org.apache.zookeeper.KeeperException;
|
|
|
+import org.apache.zookeeper.WatchedEvent;
|
|
|
+import org.apache.zookeeper.Watcher;
|
|
|
+import org.apache.zookeeper.ZooKeeper;
|
|
|
+import org.apache.zookeeper.ZooDefs.Ids;
|
|
|
+import org.apache.zookeeper.data.Stat;
|
|
|
|
|
|
public class SyncPrimitive implements Watcher {
|
|
|
|
|
|
static ZooKeeper zk = null;
|
|
|
static Integer mutex;
|
|
|
-
|
|
|
+
|
|
|
String root;
|
|
|
|
|
|
SyncPrimitive(String address) {
|
|
@@ -562,10 +564,6 @@ public class SyncPrimitive implements Watcher {
|
|
|
zk = new ZooKeeper(address, 3000, this);
|
|
|
mutex = new Integer(-1);
|
|
|
System.out.println("Finished starting ZK: " + zk);
|
|
|
- } catch (KeeperException e) {
|
|
|
- System.out.println("Keeper exception when starting new session: "
|
|
|
- + e.toString());
|
|
|
- zk = null;
|
|
|
} catch (IOException e) {
|
|
|
System.out.println(e.toString());
|
|
|
zk = null;
|
|
@@ -574,7 +572,7 @@ public class SyncPrimitive implements Watcher {
|
|
|
//else mutex = new Integer(-1);
|
|
|
}
|
|
|
|
|
|
- synchronized public void process(WatcherEvent event) {
|
|
|
+ synchronized public void process(WatchedEvent event) {
|
|
|
synchronized (mutex) {
|
|
|
//System.out.println("Process: " + event.getType());
|
|
|
mutex.notify();
|
|
@@ -590,7 +588,7 @@ public class SyncPrimitive implements Watcher {
|
|
|
|
|
|
/**
|
|
|
* Barrier constructor
|
|
|
- *
|
|
|
+ *
|
|
|
* @param address
|
|
|
* @param name
|
|
|
* @param size
|
|
@@ -605,7 +603,8 @@ public class SyncPrimitive implements Watcher {
|
|
|
try {
|
|
|
Stat s = zk.exists(root, false);
|
|
|
if (s == null) {
|
|
|
- zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
|
|
|
+ zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
|
|
|
+ CreateMode.PERSISTENT);
|
|
|
}
|
|
|
} catch (KeeperException e) {
|
|
|
System.out
|
|
@@ -615,31 +614,31 @@ public class SyncPrimitive implements Watcher {
|
|
|
System.out.println("Interrupted exception");
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// My node name
|
|
|
try {
|
|
|
name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
|
|
|
} catch (UnknownHostException e) {
|
|
|
System.out.println(e.toString());
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Join barrier
|
|
|
- *
|
|
|
+ *
|
|
|
* @return
|
|
|
* @throws KeeperException
|
|
|
* @throws InterruptedException
|
|
|
*/
|
|
|
-
|
|
|
+
|
|
|
boolean enter() throws KeeperException, InterruptedException{
|
|
|
zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
|
|
|
- CreateFlags.EPHEMERAL);
|
|
|
+ CreateMode.EPHEMERAL);
|
|
|
while (true) {
|
|
|
- synchronized (mutex) {
|
|
|
- ArrayList<String> list = zk.getChildren(root, true);
|
|
|
-
|
|
|
+ synchronized (mutex) {
|
|
|
+ List<String> list = zk.getChildren(root, true);
|
|
|
+
|
|
|
if (list.size() < size) {
|
|
|
mutex.wait();
|
|
|
} else {
|
|
@@ -651,17 +650,17 @@ public class SyncPrimitive implements Watcher {
|
|
|
|
|
|
/**
|
|
|
* Wait until all reach barrier
|
|
|
- *
|
|
|
+ *
|
|
|
* @return
|
|
|
* @throws KeeperException
|
|
|
* @throws InterruptedException
|
|
|
*/
|
|
|
-
|
|
|
+
|
|
|
boolean leave() throws KeeperException, InterruptedException{
|
|
|
zk.delete(root + "/" + name, 0);
|
|
|
while (true) {
|
|
|
synchronized (mutex) {
|
|
|
- ArrayList<String> list = zk.getChildren(root, true);
|
|
|
+ List<String> list = zk.getChildren(root, true);
|
|
|
if (list.size() > 0) {
|
|
|
mutex.wait();
|
|
|
} else {
|
|
@@ -679,7 +678,7 @@ public class SyncPrimitive implements Watcher {
|
|
|
|
|
|
/**
|
|
|
* Constructor of producer-consumer queue
|
|
|
- *
|
|
|
+ *
|
|
|
* @param address
|
|
|
* @param name
|
|
|
*/
|
|
@@ -691,7 +690,8 @@ public class SyncPrimitive implements Watcher {
|
|
|
try {
|
|
|
Stat s = zk.exists(root, false);
|
|
|
if (s == null) {
|
|
|
- zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
|
|
|
+ zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
|
|
|
+ CreateMode.PERSISTENT);
|
|
|
}
|
|
|
} catch (KeeperException e) {
|
|
|
System.out
|
|
@@ -705,11 +705,11 @@ public class SyncPrimitive implements Watcher {
|
|
|
|
|
|
/**
|
|
|
* Add element to the queue.
|
|
|
- *
|
|
|
+ *
|
|
|
* @param i
|
|
|
* @return
|
|
|
*/
|
|
|
-
|
|
|
+
|
|
|
boolean produce(int i) throws KeeperException, InterruptedException{
|
|
|
ByteBuffer b = ByteBuffer.allocate(4);
|
|
|
byte[] value;
|
|
@@ -718,15 +718,15 @@ public class SyncPrimitive implements Watcher {
|
|
|
b.putInt(i);
|
|
|
value = b.array();
|
|
|
zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
|
|
|
- CreateFlags.SEQUENCE);
|
|
|
-
|
|
|
+ CreateMode.PERSISTENT_SEQUENTIAL);
|
|
|
+
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
|
|
|
/**
|
|
|
* Remove first element from the queue.
|
|
|
- *
|
|
|
+ *
|
|
|
* @return
|
|
|
* @throws KeeperException
|
|
|
* @throws InterruptedException
|
|
@@ -734,11 +734,11 @@ public class SyncPrimitive implements Watcher {
|
|
|
int consume() throws KeeperException, InterruptedException{
|
|
|
int retvalue = -1;
|
|
|
Stat stat = null;
|
|
|
-
|
|
|
+
|
|
|
// Get the first element available
|
|
|
while (true) {
|
|
|
synchronized (mutex) {
|
|
|
- ArrayList<String> list = zk.getChildren(root, true);
|
|
|
+ List<String> list = zk.getChildren(root, true);
|
|
|
if (list.size() == 0) {
|
|
|
System.out.println("Going to wait");
|
|
|
mutex.wait();
|
|
@@ -755,11 +755,11 @@ public class SyncPrimitive implements Watcher {
|
|
|
zk.delete(root + "/element" + min, 0);
|
|
|
ByteBuffer buffer = ByteBuffer.wrap(b);
|
|
|
retvalue = buffer.getInt();
|
|
|
-
|
|
|
+
|
|
|
return retvalue;
|
|
|
}
|
|
|
}
|
|
|
- }
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -781,16 +781,16 @@ public class SyncPrimitive implements Watcher {
|
|
|
if (args[3].equals("p")) {
|
|
|
System.out.println("Producer");
|
|
|
for (i = 0; i < max; i++)
|
|
|
- try{
|
|
|
+ try{
|
|
|
q.produce(10 + i);
|
|
|
} catch (KeeperException e){
|
|
|
-
|
|
|
+
|
|
|
} catch (InterruptedException e){
|
|
|
-
|
|
|
+
|
|
|
}
|
|
|
} else {
|
|
|
System.out.println("Consumer");
|
|
|
-
|
|
|
+
|
|
|
for (i = 0; i < max; i++) {
|
|
|
try{
|
|
|
int r = q.consume();
|
|
@@ -798,7 +798,7 @@ public class SyncPrimitive implements Watcher {
|
|
|
} catch (KeeperException e){
|
|
|
i--;
|
|
|
} catch (InterruptedException e){
|
|
|
-
|
|
|
+
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -811,9 +811,9 @@ public class SyncPrimitive implements Watcher {
|
|
|
System.out.println("Entered barrier: " + args[2]);
|
|
|
if(!flag) System.out.println("Error when entering the barrier");
|
|
|
} catch (KeeperException e){
|
|
|
-
|
|
|
+
|
|
|
} catch (InterruptedException e){
|
|
|
-
|
|
|
+
|
|
|
}
|
|
|
|
|
|
// Generate random integer
|
|
@@ -830,13 +830,14 @@ public class SyncPrimitive implements Watcher {
|
|
|
try{
|
|
|
b.leave();
|
|
|
} catch (KeeperException e){
|
|
|
-
|
|
|
+
|
|
|
} catch (InterruptedException e){
|
|
|
-
|
|
|
+
|
|
|
}
|
|
|
System.out.println("Left barrier");
|
|
|
}
|
|
|
-}</pre>
|
|
|
+}
|
|
|
+</pre>
|
|
|
</div>
|
|
|
</div>
|
|
|
</div>
|