|
@@ -28,6 +28,7 @@ import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
import java.net.InetAddress;
|
|
|
import java.net.URI;
|
|
|
+import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
|
|
|
import com.google.common.base.Supplier;
|
|
@@ -59,6 +60,7 @@ import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Executors;
|
|
|
import java.util.concurrent.TimeoutException;
|
|
|
+import java.util.regex.Pattern;
|
|
|
|
|
|
public class TestFSNamesystem {
|
|
|
|
|
@@ -287,10 +289,11 @@ public class TestFSNamesystem {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Test when FSNamesystem lock is held for a long time, logger will report it.
|
|
|
+ * Test when FSNamesystem write lock is held for a long time,
|
|
|
+ * logger will report it.
|
|
|
*/
|
|
|
@Test(timeout=45000)
|
|
|
- public void testFSLockLongHoldingReport() throws Exception {
|
|
|
+ public void testFSWriteLockLongHoldingReport() throws Exception {
|
|
|
final long writeLockReportingThreshold = 100L;
|
|
|
Configuration conf = new Configuration();
|
|
|
conf.setLong(DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY,
|
|
@@ -298,7 +301,7 @@ public class TestFSNamesystem {
|
|
|
FSImage fsImage = Mockito.mock(FSImage.class);
|
|
|
FSEditLog fsEditLog = Mockito.mock(FSEditLog.class);
|
|
|
Mockito.when(fsImage.getEditLog()).thenReturn(fsEditLog);
|
|
|
- FSNamesystem fsn = new FSNamesystem(conf, fsImage);
|
|
|
+ final FSNamesystem fsn = new FSNamesystem(conf, fsImage);
|
|
|
|
|
|
LogCapturer logs = LogCapturer.captureLogs(FSNamesystem.LOG);
|
|
|
GenericTestUtils.setLogLevel(FSNamesystem.LOG, Level.INFO);
|
|
@@ -342,6 +345,143 @@ public class TestFSNamesystem {
|
|
|
assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName()));
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Test when FSNamesystem read lock is held for a long time,
|
|
|
+ * logger will report it.
|
|
|
+ */
|
|
|
+ @Test(timeout=45000)
|
|
|
+ public void testFSReadLockLongHoldingReport() throws Exception {
|
|
|
+ final long readLockReportingThreshold = 100L;
|
|
|
+ final String readLockLogStmt = "FSNamesystem read lock held for ";
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.setLong(
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY,
|
|
|
+ readLockReportingThreshold);
|
|
|
+ FSImage fsImage = Mockito.mock(FSImage.class);
|
|
|
+ FSEditLog fsEditLog = Mockito.mock(FSEditLog.class);
|
|
|
+ Mockito.when(fsImage.getEditLog()).thenReturn(fsEditLog);
|
|
|
+ final FSNamesystem fsn = new FSNamesystem(conf, fsImage);
|
|
|
+
|
|
|
+ LogCapturer logs = LogCapturer.captureLogs(FSNamesystem.LOG);
|
|
|
+ GenericTestUtils.setLogLevel(FSNamesystem.LOG, Level.INFO);
|
|
|
+
|
|
|
+ // Don't report if the read lock is held for a short time
|
|
|
+ fsn.readLock();
|
|
|
+ Thread.sleep(readLockReportingThreshold / 2);
|
|
|
+ fsn.readUnlock();
|
|
|
+ assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()) &&
|
|
|
+ logs.getOutput().contains(readLockLogStmt));
|
|
|
+
|
|
|
+ // Report if the read lock is held for a long time
|
|
|
+ fsn.readLock();
|
|
|
+ Thread.sleep(readLockReportingThreshold + 10);
|
|
|
+ logs.clearOutput();
|
|
|
+ fsn.readUnlock();
|
|
|
+ assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName())
|
|
|
+ && logs.getOutput().contains(readLockLogStmt));
|
|
|
+
|
|
|
+ // Report if it's held for a long time when re-entering read lock
|
|
|
+ fsn.readLock();
|
|
|
+ Thread.sleep(readLockReportingThreshold / 2 + 1);
|
|
|
+ fsn.readLock();
|
|
|
+ Thread.sleep(readLockReportingThreshold / 2 + 1);
|
|
|
+ logs.clearOutput();
|
|
|
+ fsn.readUnlock();
|
|
|
+ assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()) ||
|
|
|
+ logs.getOutput().contains(readLockLogStmt));
|
|
|
+ logs.clearOutput();
|
|
|
+ fsn.readUnlock();
|
|
|
+ assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName()) &&
|
|
|
+ logs.getOutput().contains(readLockLogStmt));
|
|
|
+
|
|
|
+ // Report if it's held for a long time while another thread also has the
|
|
|
+ // read lock. Let one thread hold the lock long enough to activate an
|
|
|
+ // alert, then have another thread grab the read lock to ensure that this
|
|
|
+ // doesn't reset the timing.
|
|
|
+ logs.clearOutput();
|
|
|
+ final CountDownLatch barrier = new CountDownLatch(1);
|
|
|
+ final CountDownLatch barrier2 = new CountDownLatch(1);
|
|
|
+ Thread t1 = new Thread() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ fsn.readLock();
|
|
|
+ Thread.sleep(readLockReportingThreshold + 1);
|
|
|
+ barrier.countDown(); // Allow for t2 to acquire the read lock
|
|
|
+ barrier2.await(); // Wait until t2 has the read lock
|
|
|
+ fsn.readUnlock();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ fail("Interrupted during testing");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ };
|
|
|
+ Thread t2 = new Thread() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ barrier.await(); // Wait until t1 finishes sleeping
|
|
|
+ fsn.readLock();
|
|
|
+ barrier2.countDown(); // Allow for t1 to unlock
|
|
|
+ fsn.readUnlock();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ fail("Interrupted during testing");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ };
|
|
|
+ t1.start();
|
|
|
+ t2.start();
|
|
|
+ t1.join();
|
|
|
+ t2.join();
|
|
|
+ Pattern t1Pattern = Pattern.compile(
|
|
|
+ String.format("\\Q%s\\E.+%s", t1.getName(), readLockLogStmt));
|
|
|
+ assertTrue(t1Pattern.matcher(logs.getOutput()).find());
|
|
|
+ Pattern t2Pattern = Pattern.compile(
|
|
|
+ String.format("\\Q%s\\E.+%s", t2.getName(), readLockLogStmt));
|
|
|
+ assertFalse(t2Pattern.matcher(logs.getOutput()).find());
|
|
|
+
|
|
|
+ // Spin up a bunch of threads all grabbing the lock at once; assign some
|
|
|
+ // to go over threshold and some under. Check that they all log correctly.
|
|
|
+ logs.clearOutput();
|
|
|
+ final int threadCount = 50;
|
|
|
+ List<Thread> threads = new ArrayList<>(threadCount);
|
|
|
+ for (int i = 0; i < threadCount; i++) {
|
|
|
+ threads.add(new Thread() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ long sleepTime;
|
|
|
+ if (this.getName().hashCode() % 2 == 0) {
|
|
|
+ sleepTime = readLockReportingThreshold + 10;
|
|
|
+ } else {
|
|
|
+ sleepTime = readLockReportingThreshold / 2;
|
|
|
+ }
|
|
|
+ fsn.readLock();
|
|
|
+ Thread.sleep(sleepTime);
|
|
|
+ fsn.readUnlock();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ fail("Interrupted during testing");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+ for (Thread t : threads) {
|
|
|
+ t.start();
|
|
|
+ }
|
|
|
+ for (Thread t : threads) {
|
|
|
+ t.join();
|
|
|
+ }
|
|
|
+ for (Thread t : threads) {
|
|
|
+ Pattern p = Pattern.compile(
|
|
|
+ String.format("\\Q%s\\E.+%s", t.getName(), readLockLogStmt));
|
|
|
+ boolean foundLog = p.matcher(logs.getOutput()).find();
|
|
|
+ if (t.getName().hashCode() % 2 == 0) {
|
|
|
+ assertTrue(foundLog);
|
|
|
+ } else {
|
|
|
+ assertFalse(foundLog);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testSafemodeReplicationConf() throws IOException {
|
|
|
Configuration conf = new Configuration();
|