|
@@ -33,6 +33,7 @@ import java.util.HashSet;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.Map;
|
|
|
import java.util.Random;
|
|
|
+import java.util.Set;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
@@ -45,6 +46,7 @@ import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.Options.Rename;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.RemoteIterator;
|
|
|
+import org.apache.hadoop.fs.XAttrSetFlag;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.hdfs.DFSClient;
|
|
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
@@ -126,6 +128,7 @@ public class TestRetryCacheWithHA {
|
|
|
conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES, ResponseSize);
|
|
|
conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES, ResponseSize);
|
|
|
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
|
|
|
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY, true);
|
|
|
cluster = new MiniDFSCluster.Builder(conf)
|
|
|
.nnTopology(MiniDFSNNTopology.simpleHATopology())
|
|
|
.numDataNodes(DataNodes).build();
|
|
@@ -1004,6 +1007,48 @@ public class TestRetryCacheWithHA {
|
|
|
return null;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /** setXAttr */
|
|
|
+ class SetXAttrOp extends AtMostOnceOp {
|
|
|
+ private final String src;
|
|
|
+
|
|
|
+ SetXAttrOp(DFSClient client, String src) {
|
|
|
+ super("setXAttr", client);
|
|
|
+ this.src = src;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ void prepare() throws Exception {
|
|
|
+ Path p = new Path(src);
|
|
|
+ if (!dfs.exists(p)) {
|
|
|
+ DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ void invoke() throws Exception {
|
|
|
+ client.setXAttr(src, "user.key", "value".getBytes(),
|
|
|
+ EnumSet.of(XAttrSetFlag.CREATE));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ boolean checkNamenodeBeforeReturn() throws Exception {
|
|
|
+ for (int i = 0; i < CHECKTIMES; i++) {
|
|
|
+ Map<String, byte[]> iter = dfs.getXAttrs(new Path(src));
|
|
|
+ Set<String> keySet = iter.keySet();
|
|
|
+ if (keySet.contains("user.key")) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ Thread.sleep(1000);
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ Object getResult() {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
@Test (timeout=60000)
|
|
|
public void testCreateSnapshot() throws Exception {
|
|
@@ -1133,6 +1178,13 @@ public class TestRetryCacheWithHA {
|
|
|
AtMostOnceOp op = new RemoveCachePoolOp(client, "pool");
|
|
|
testClientRetryWithFailover(op);
|
|
|
}
|
|
|
+
|
|
|
+ @Test (timeout=60000)
|
|
|
+ public void testSetXAttr() throws Exception {
|
|
|
+ DFSClient client = genClientWithDummyHandler();
|
|
|
+ AtMostOnceOp op = new SetXAttrOp(client, "/setxattr");
|
|
|
+ testClientRetryWithFailover(op);
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* When NN failover happens, if the client did not receive the response and
|