|
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
import static org.junit.Assert.assertTrue;
|
|
|
|
|
|
+import com.google.common.base.Supplier;
|
|
|
import java.io.FileNotFoundException;
|
|
|
import java.io.IOException;
|
|
|
import java.lang.reflect.InvocationHandler;
|
|
@@ -34,6 +35,7 @@ import java.util.Iterator;
|
|
|
import java.util.Map;
|
|
|
import java.util.Random;
|
|
|
import java.util.Set;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
@@ -84,6 +86,7 @@ import org.apache.hadoop.io.retry.RetryInvocationHandler;
|
|
|
import org.apache.hadoop.io.retry.RetryPolicies;
|
|
|
import org.apache.hadoop.io.retry.RetryPolicy;
|
|
|
import org.apache.hadoop.ipc.RetryCache.CacheEntry;
|
|
|
+import org.apache.hadoop.test.GenericTestUtils;
|
|
|
import org.apache.hadoop.util.LightWeightCache;
|
|
|
import org.junit.After;
|
|
|
import org.junit.Before;
|
|
@@ -1143,77 +1146,77 @@ public class TestRetryCacheWithHA {
|
|
|
AtMostOnceOp op = new CreateSnapshotOp(client, "/test", "s1");
|
|
|
testClientRetryWithFailover(op);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Test (timeout=60000)
|
|
|
public void testDeleteSnapshot() throws Exception {
|
|
|
final DFSClient client = genClientWithDummyHandler();
|
|
|
AtMostOnceOp op = new DeleteSnapshotOp(client, "/test", "s1");
|
|
|
testClientRetryWithFailover(op);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Test (timeout=60000)
|
|
|
public void testRenameSnapshot() throws Exception {
|
|
|
final DFSClient client = genClientWithDummyHandler();
|
|
|
AtMostOnceOp op = new RenameSnapshotOp(client, "/test", "s1", "s2");
|
|
|
testClientRetryWithFailover(op);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Test (timeout=60000)
|
|
|
public void testCreate() throws Exception {
|
|
|
final DFSClient client = genClientWithDummyHandler();
|
|
|
AtMostOnceOp op = new CreateOp(client, "/testfile");
|
|
|
testClientRetryWithFailover(op);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Test (timeout=60000)
|
|
|
public void testAppend() throws Exception {
|
|
|
final DFSClient client = genClientWithDummyHandler();
|
|
|
AtMostOnceOp op = new AppendOp(client, "/testfile");
|
|
|
testClientRetryWithFailover(op);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Test (timeout=60000)
|
|
|
public void testRename() throws Exception {
|
|
|
final DFSClient client = genClientWithDummyHandler();
|
|
|
AtMostOnceOp op = new RenameOp(client, "/file1", "/file2");
|
|
|
testClientRetryWithFailover(op);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Test (timeout=60000)
|
|
|
public void testRename2() throws Exception {
|
|
|
final DFSClient client = genClientWithDummyHandler();
|
|
|
AtMostOnceOp op = new Rename2Op(client, "/file1", "/file2");
|
|
|
testClientRetryWithFailover(op);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Test (timeout=60000)
|
|
|
public void testConcat() throws Exception {
|
|
|
final DFSClient client = genClientWithDummyHandler();
|
|
|
AtMostOnceOp op = new ConcatOp(client, new Path("/test/file"), 5);
|
|
|
testClientRetryWithFailover(op);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Test (timeout=60000)
|
|
|
public void testDelete() throws Exception {
|
|
|
final DFSClient client = genClientWithDummyHandler();
|
|
|
AtMostOnceOp op = new DeleteOp(client, "/testfile");
|
|
|
testClientRetryWithFailover(op);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Test (timeout=60000)
|
|
|
public void testCreateSymlink() throws Exception {
|
|
|
final DFSClient client = genClientWithDummyHandler();
|
|
|
AtMostOnceOp op = new CreateSymlinkOp(client, "/testfile", "/testlink");
|
|
|
testClientRetryWithFailover(op);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Test (timeout=60000)
|
|
|
public void testUpdatePipeline() throws Exception {
|
|
|
final DFSClient client = genClientWithDummyHandler();
|
|
|
AtMostOnceOp op = new UpdatePipelineOp(client, "/testfile");
|
|
|
testClientRetryWithFailover(op);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Test (timeout=60000)
|
|
|
public void testAddCacheDirectiveInfo() throws Exception {
|
|
|
DFSClient client = genClientWithDummyHandler();
|
|
@@ -1265,7 +1268,7 @@ public class TestRetryCacheWithHA {
|
|
|
AtMostOnceOp op = new RemoveCachePoolOp(client, "pool");
|
|
|
testClientRetryWithFailover(op);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Test (timeout=60000)
|
|
|
public void testSetXAttr() throws Exception {
|
|
|
DFSClient client = genClientWithDummyHandler();
|
|
@@ -1287,7 +1290,7 @@ public class TestRetryCacheWithHA {
|
|
|
*/
|
|
|
public void testClientRetryWithFailover(final AtMostOnceOp op)
|
|
|
throws Exception {
|
|
|
- final Map<String, Object> results = new HashMap<String, Object>();
|
|
|
+ final Map<String, Object> results = new ConcurrentHashMap<>();
|
|
|
|
|
|
op.prepare();
|
|
|
// set DummyRetryInvocationHandler#block to true
|
|
@@ -1300,10 +1303,7 @@ public class TestRetryCacheWithHA {
|
|
|
op.invoke();
|
|
|
Object result = op.getResult();
|
|
|
LOG.info("Operation " + op.name + " finished");
|
|
|
- synchronized (TestRetryCacheWithHA.this) {
|
|
|
- results.put(op.name, result == null ? "SUCCESS" : result);
|
|
|
- TestRetryCacheWithHA.this.notifyAll();
|
|
|
- }
|
|
|
+ results.put(op.name, result == null ? "SUCCESS" : result);
|
|
|
} catch (Exception e) {
|
|
|
LOG.info("Got Exception while calling " + op.name, e);
|
|
|
} finally {
|
|
@@ -1311,7 +1311,7 @@ public class TestRetryCacheWithHA {
|
|
|
}
|
|
|
}
|
|
|
}.start();
|
|
|
-
|
|
|
+
|
|
|
// make sure the client's call has actually been handled by the active NN
|
|
|
assertTrue("After waiting the operation " + op.name
|
|
|
+ " still has not taken effect on NN yet",
|
|
@@ -1323,40 +1323,59 @@ public class TestRetryCacheWithHA {
|
|
|
// disable the block in DummyHandler
|
|
|
LOG.info("Setting block to false");
|
|
|
DummyRetryInvocationHandler.block.set(false);
|
|
|
-
|
|
|
- synchronized (this) {
|
|
|
- while (!results.containsKey(op.name)) {
|
|
|
- this.wait();
|
|
|
+
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean get() {
|
|
|
+ return results.containsKey(op.name);
|
|
|
}
|
|
|
- LOG.info("Got the result of " + op.name + ": "
|
|
|
- + results.get(op.name));
|
|
|
- }
|
|
|
+ }, 5, 10000);
|
|
|
+ LOG.info("Got the result of " + op.name + ": "
|
|
|
+ + results.get(op.name));
|
|
|
|
|
|
// Waiting for failover.
|
|
|
- while (cluster.getNamesystem(1).isInStandbyState()) {
|
|
|
- Thread.sleep(10);
|
|
|
- }
|
|
|
-
|
|
|
- long hitNN0 = cluster.getNamesystem(0).getRetryCache().getMetricsForTests()
|
|
|
- .getCacheHit();
|
|
|
- long hitNN1 = cluster.getNamesystem(1).getRetryCache().getMetricsForTests()
|
|
|
- .getCacheHit();
|
|
|
- assertTrue("CacheHit: " + hitNN0 + ", " + hitNN1,
|
|
|
- hitNN0 + hitNN1 > 0);
|
|
|
- long updatedNN0 = cluster.getNamesystem(0).getRetryCache()
|
|
|
- .getMetricsForTests().getCacheUpdated();
|
|
|
- long updatedNN1 = cluster.getNamesystem(1).getRetryCache()
|
|
|
- .getMetricsForTests().getCacheUpdated();
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean get() {
|
|
|
+ return !cluster.getNamesystem(1).isInStandbyState();
|
|
|
+ }
|
|
|
+ }, 5, 10000);
|
|
|
+ final long[] hitsNN = new long[]{0, 0};
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean get() {
|
|
|
+ hitsNN[0] = cluster.getNamesystem(0).getRetryCache().getMetricsForTests()
|
|
|
+ .getCacheHit();
|
|
|
+ hitsNN[1] = cluster.getNamesystem(1).getRetryCache().getMetricsForTests()
|
|
|
+ .getCacheHit();
|
|
|
+ return (hitsNN[0] + hitsNN[1]) > 0;
|
|
|
+ }
|
|
|
+ }, 5, 10000);
|
|
|
+
|
|
|
+ assertTrue("CacheHit: " + hitsNN[0] + ", " + hitsNN[1],
|
|
|
+ hitsNN[0] + hitsNN[1] > 0);
|
|
|
+ final long[] updatesNN = new long[]{0, 0};
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean get() {
|
|
|
+ updatesNN[0] = cluster.getNamesystem(0).getRetryCache().getMetricsForTests()
|
|
|
+ .getCacheUpdated();
|
|
|
+ updatesNN[1] = cluster.getNamesystem(1).getRetryCache().getMetricsForTests()
|
|
|
+ .getCacheUpdated();
|
|
|
+ return updatesNN[0] > 0 && updatesNN[1] > 0;
|
|
|
+ }
|
|
|
+ }, 5, 10000);
|
|
|
+
|
|
|
// Cache updated metrics on NN0 should be >0 since the op was process on NN0
|
|
|
- assertTrue("CacheUpdated on NN0: " + updatedNN0, updatedNN0 > 0);
|
|
|
+ assertTrue("CacheUpdated on NN0: " + updatesNN[0], updatesNN[0] > 0);
|
|
|
// Cache updated metrics on NN0 should be >0 since NN1 applied the editlog
|
|
|
- assertTrue("CacheUpdated on NN1: " + updatedNN1, updatedNN1 > 0);
|
|
|
+ assertTrue("CacheUpdated on NN1: " + updatesNN[1], updatesNN[1] > 0);
|
|
|
long expectedUpdateCount = op.getExpectedCacheUpdateCount();
|
|
|
if (expectedUpdateCount > 0) {
|
|
|
- assertEquals("CacheUpdated on NN0: " + updatedNN0, expectedUpdateCount,
|
|
|
- updatedNN0);
|
|
|
- assertEquals("CacheUpdated on NN0: " + updatedNN1, expectedUpdateCount,
|
|
|
- updatedNN1);
|
|
|
+ assertEquals("CacheUpdated on NN0: " + updatesNN[0], expectedUpdateCount,
|
|
|
+ updatesNN[0]);
|
|
|
+ assertEquals("CacheUpdated on NN0: " + updatesNN[1], expectedUpdateCount,
|
|
|
+ updatesNN[1]);
|
|
|
}
|
|
|
}
|
|
|
|