|
@@ -18,8 +18,7 @@
|
|
|
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
|
|
|
|
|
import java.io.InputStream;
|
|
|
-import java.util.Arrays;
|
|
|
-import java.util.Collections;
|
|
|
+import java.util.concurrent.TimeoutException;
|
|
|
import java.util.function.Supplier;
|
|
|
|
|
|
import org.apache.hadoop.fs.DF;
|
|
@@ -227,7 +226,6 @@ public class TestFsDatasetImpl {
|
|
|
assertEquals(NUM_INIT_VOLUMES, getNumVolumes());
|
|
|
assertEquals(0, dataset.getNumFailedVolumes());
|
|
|
}
|
|
|
-
|
|
|
@Test(timeout=10000)
|
|
|
public void testReadLockEnabledByDefault()
|
|
|
throws Exception {
|
|
@@ -269,11 +267,12 @@ public class TestFsDatasetImpl {
|
|
|
waiter.join();
|
|
|
// The holder thread is still holding the lock, but the waiter can still
|
|
|
// run as the lock is a shared read lock.
|
|
|
+ // Otherwise test will timeout with deadlock.
|
|
|
assertEquals(true, accessed.get());
|
|
|
holder.interrupt();
|
|
|
}
|
|
|
|
|
|
- @Test(timeout=10000)
|
|
|
+ @Test(timeout=20000)
|
|
|
public void testReadLockCanBeDisabledByConfig()
|
|
|
throws Exception {
|
|
|
HdfsConfiguration conf = new HdfsConfiguration();
|
|
@@ -282,29 +281,20 @@ public class TestFsDatasetImpl {
|
|
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
|
|
.numDataNodes(1).build();
|
|
|
try {
|
|
|
+ AtomicBoolean accessed = new AtomicBoolean(false);
|
|
|
cluster.waitActive();
|
|
|
DataNode dn = cluster.getDataNodes().get(0);
|
|
|
final FsDatasetSpi<?> ds = DataNodeTestUtils.getFSDataset(dn);
|
|
|
|
|
|
CountDownLatch latch = new CountDownLatch(1);
|
|
|
CountDownLatch waiterLatch = new CountDownLatch(1);
|
|
|
- // create a synchronized list and verify the order of elements.
|
|
|
- List<Integer> syncList =
|
|
|
- Collections.synchronizedList(new ArrayList<>());
|
|
|
-
|
|
|
-
|
|
|
Thread holder = new Thread() {
|
|
|
public void run() {
|
|
|
- latch.countDown();
|
|
|
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
|
|
|
- syncList.add(0);
|
|
|
- } catch (Exception e) {
|
|
|
- return;
|
|
|
- }
|
|
|
- try {
|
|
|
+ latch.countDown();
|
|
|
+ // wait for the waiter thread to access the lock.
|
|
|
waiterLatch.await();
|
|
|
- syncList.add(2);
|
|
|
- } catch (InterruptedException e) {
|
|
|
+ } catch (Exception e) {
|
|
|
}
|
|
|
}
|
|
|
};
|
|
@@ -312,13 +302,15 @@ public class TestFsDatasetImpl {
|
|
|
Thread waiter = new Thread() {
|
|
|
public void run() {
|
|
|
try {
|
|
|
- // wait for holder to get into the critical section.
|
|
|
+ // Wait for holder to get ds read lock.
|
|
|
latch.await();
|
|
|
} catch (InterruptedException e) {
|
|
|
waiterLatch.countDown();
|
|
|
+ return;
|
|
|
}
|
|
|
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
|
|
|
- syncList.add(1);
|
|
|
+ accessed.getAndSet(true);
|
|
|
+ // signal the holder thread.
|
|
|
waiterLatch.countDown();
|
|
|
} catch (Exception e) {
|
|
|
}
|
|
@@ -326,14 +318,21 @@ public class TestFsDatasetImpl {
|
|
|
};
|
|
|
waiter.start();
|
|
|
holder.start();
|
|
|
-
|
|
|
- waiter.join();
|
|
|
+ // Wait for sometime to make sure we are in deadlock,
|
|
|
+ try {
|
|
|
+ GenericTestUtils.waitFor(() ->
|
|
|
+ accessed.get(),
|
|
|
+ 100, 10000);
|
|
|
+ fail("Waiter thread should not execute.");
|
|
|
+ } catch (TimeoutException e) {
|
|
|
+ }
|
|
|
+ // Release waiterLatch to exit deadlock.
|
|
|
+ waiterLatch.countDown();
|
|
|
holder.join();
|
|
|
-
|
|
|
- // verify that the synchronized list has the correct sequence.
|
|
|
- assertEquals(
|
|
|
- "The sequence of checkpoints does not correspond to shared lock",
|
|
|
- syncList, Arrays.asList(0, 1, 2));
|
|
|
+ waiter.join();
|
|
|
+ // After releasing waiterLatch water
|
|
|
+ // thread will be able to execute.
|
|
|
+ assertTrue(accessed.get());
|
|
|
} finally {
|
|
|
cluster.shutdown();
|
|
|
}
|