|
@@ -20,13 +20,13 @@ package org.apache.hadoop.util.curator;
|
|
|
import java.io.IOException;
|
|
|
import java.nio.charset.Charset;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
|
|
|
import org.apache.curator.framework.AuthInfo;
|
|
|
import org.apache.curator.framework.CuratorFramework;
|
|
|
import org.apache.curator.framework.CuratorFrameworkFactory;
|
|
|
-import org.apache.curator.framework.api.transaction.CuratorTransaction;
|
|
|
-import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
|
|
|
+import org.apache.curator.framework.api.transaction.CuratorOp;
|
|
|
import org.apache.curator.retry.RetryNTimes;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
@@ -387,43 +387,45 @@ public final class ZKCuratorManager {
|
|
|
/**
|
|
|
* Use curator transactions to ensure zk-operations are performed in an all
|
|
|
* or nothing fashion. This is equivalent to using ZooKeeper#multi.
|
|
|
- *
|
|
|
- * TODO (YARN-3774): Curator 3.0 introduces CuratorOp similar to Op. We ll
|
|
|
- * have to rewrite this inner class when we adopt that.
|
|
|
*/
|
|
|
public class SafeTransaction {
|
|
|
- private CuratorTransactionFinal transactionFinal;
|
|
|
private String fencingNodePath;
|
|
|
+ private List<CuratorOp> curatorOperations = new LinkedList<>();
|
|
|
|
|
|
SafeTransaction(List<ACL> fencingACL, String fencingNodePath)
|
|
|
throws Exception {
|
|
|
this.fencingNodePath = fencingNodePath;
|
|
|
- CuratorTransaction transaction = curator.inTransaction();
|
|
|
- transactionFinal = transaction.create()
|
|
|
- .withMode(CreateMode.PERSISTENT).withACL(fencingACL)
|
|
|
- .forPath(fencingNodePath, new byte[0]).and();
|
|
|
+ curatorOperations.add(curator.transactionOp().create()
|
|
|
+ .withMode(CreateMode.PERSISTENT)
|
|
|
+ .withACL(fencingACL)
|
|
|
+ .forPath(fencingNodePath, new byte[0]));
|
|
|
}
|
|
|
|
|
|
public void commit() throws Exception {
|
|
|
- transactionFinal = transactionFinal.delete()
|
|
|
- .forPath(fencingNodePath).and();
|
|
|
- transactionFinal.commit();
|
|
|
+ curatorOperations.add(curator.transactionOp().delete()
|
|
|
+ .forPath(fencingNodePath));
|
|
|
+ curator.transaction().forOperations(curatorOperations);
|
|
|
+ curatorOperations.clear();
|
|
|
}
|
|
|
|
|
|
public void create(String path, byte[] data, List<ACL> acl, CreateMode mode)
|
|
|
throws Exception {
|
|
|
- transactionFinal = transactionFinal.create()
|
|
|
- .withMode(mode).withACL(acl).forPath(path, data).and();
|
|
|
+ curatorOperations.add(curator.transactionOp().create()
|
|
|
+ .withMode(mode)
|
|
|
+ .withACL(acl)
|
|
|
+ .forPath(path, data));
|
|
|
}
|
|
|
|
|
|
public void delete(String path) throws Exception {
|
|
|
- transactionFinal = transactionFinal.delete().forPath(path).and();
|
|
|
+ curatorOperations.add(curator.transactionOp().delete()
|
|
|
+ .forPath(path));
|
|
|
}
|
|
|
|
|
|
public void setData(String path, byte[] data, int version)
|
|
|
throws Exception {
|
|
|
- transactionFinal = transactionFinal.setData()
|
|
|
- .withVersion(version).forPath(path, data).and();
|
|
|
+ curatorOperations.add(curator.transactionOp().setData()
|
|
|
+ .withVersion(version)
|
|
|
+ .forPath(path, data));
|
|
|
}
|
|
|
}
|
|
|
}
|