|
@@ -17,6 +17,8 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
|
|
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.Collections;
|
|
|
import java.util.function.Supplier;
|
|
|
import com.google.common.collect.Lists;
|
|
|
|
|
@@ -197,9 +199,9 @@ public class TestFsDatasetImpl {
|
|
|
assertEquals(0, dataset.getNumFailedVolumes());
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
+ @Test(timeout=10000)
|
|
|
public void testReadLockEnabledByDefault()
|
|
|
- throws IOException, InterruptedException {
|
|
|
+ throws Exception {
|
|
|
final FsDatasetSpi ds = dataset;
|
|
|
AtomicBoolean accessed = new AtomicBoolean(false);
|
|
|
CountDownLatch latch = new CountDownLatch(1);
|
|
@@ -209,7 +211,8 @@ public class TestFsDatasetImpl {
|
|
|
public void run() {
|
|
|
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
|
|
|
latch.countDown();
|
|
|
- sleep(10000);
|
|
|
+ // wait for the waiter thread to access the lock.
|
|
|
+ waiterLatch.await();
|
|
|
} catch (Exception e) {
|
|
|
}
|
|
|
}
|
|
@@ -217,29 +220,33 @@ public class TestFsDatasetImpl {
|
|
|
|
|
|
Thread waiter = new Thread() {
|
|
|
public void run() {
|
|
|
- try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
|
|
|
+ try {
|
|
|
+ latch.await();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
waiterLatch.countDown();
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
|
|
|
accessed.getAndSet(true);
|
|
|
+ // signal the holder thread.
|
|
|
+ waiterLatch.countDown();
|
|
|
} catch (Exception e) {
|
|
|
}
|
|
|
}
|
|
|
};
|
|
|
-
|
|
|
- holder.start();
|
|
|
- latch.await();
|
|
|
waiter.start();
|
|
|
- waiterLatch.await();
|
|
|
+ holder.start();
|
|
|
+ holder.join();
|
|
|
+ waiter.join();
|
|
|
// The holder thread is still holding the lock, but the waiter can still
|
|
|
// run as the lock is a shared read lock.
|
|
|
assertEquals(true, accessed.get());
|
|
|
holder.interrupt();
|
|
|
- holder.join();
|
|
|
- waiter.join();
|
|
|
}
|
|
|
|
|
|
@Test(timeout=10000)
|
|
|
public void testReadLockCanBeDisabledByConfig()
|
|
|
- throws IOException, InterruptedException {
|
|
|
+ throws Exception {
|
|
|
HdfsConfiguration conf = new HdfsConfiguration();
|
|
|
conf.setBoolean(
|
|
|
DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY, false);
|
|
@@ -252,41 +259,52 @@ public class TestFsDatasetImpl {
|
|
|
|
|
|
CountDownLatch latch = new CountDownLatch(1);
|
|
|
CountDownLatch waiterLatch = new CountDownLatch(1);
|
|
|
- AtomicBoolean accessed = new AtomicBoolean(false);
|
|
|
+ // create a synchronized list and verify the order of elements.
|
|
|
+ List<Integer> syncList =
|
|
|
+ Collections.synchronizedList(new ArrayList<>());
|
|
|
+
|
|
|
|
|
|
Thread holder = new Thread() {
|
|
|
public void run() {
|
|
|
+ latch.countDown();
|
|
|
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
|
|
|
- latch.countDown();
|
|
|
- sleep(10000);
|
|
|
+ syncList.add(0);
|
|
|
} catch (Exception e) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ waiterLatch.await();
|
|
|
+ syncList.add(2);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
}
|
|
|
}
|
|
|
};
|
|
|
|
|
|
Thread waiter = new Thread() {
|
|
|
public void run() {
|
|
|
+ try {
|
|
|
+ // wait for holder to get into the critical section.
|
|
|
+ latch.await();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ waiterLatch.countDown();
|
|
|
+ }
|
|
|
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
|
|
|
- accessed.getAndSet(true);
|
|
|
+ syncList.add(1);
|
|
|
waiterLatch.countDown();
|
|
|
} catch (Exception e) {
|
|
|
}
|
|
|
}
|
|
|
};
|
|
|
-
|
|
|
- holder.start();
|
|
|
- latch.await();
|
|
|
waiter.start();
|
|
|
- Thread.sleep(200);
|
|
|
- // Waiting thread should not have been able to update the variable
|
|
|
- // as the read lock is disabled and hence an exclusive lock.
|
|
|
- assertEquals(false, accessed.get());
|
|
|
- holder.interrupt();
|
|
|
- holder.join();
|
|
|
- waiterLatch.await();
|
|
|
- // After the holder thread exits, the variable is updated.
|
|
|
- assertEquals(true, accessed.get());
|
|
|
+ holder.start();
|
|
|
+
|
|
|
waiter.join();
|
|
|
+ holder.join();
|
|
|
+
|
|
|
+ // verify that the synchronized list has the correct sequence.
|
|
|
+ assertEquals(
|
|
|
+ "The sequence of checkpoints does not correspond to shared lock",
|
|
|
+ syncList, Arrays.asList(0, 1, 2));
|
|
|
} finally {
|
|
|
cluster.shutdown();
|
|
|
}
|