|
@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
|
|
import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
|
|
import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
|
|
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
|
|
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
|
|
import org.apache.hadoop.hdfs.qjournal.server.JournalFaultInjector;
|
|
import org.apache.hadoop.hdfs.qjournal.server.JournalFaultInjector;
|
|
|
|
+import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
|
|
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
|
|
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
|
|
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
|
|
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
|
|
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
|
|
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
|
|
@@ -62,7 +63,9 @@ import org.apache.hadoop.test.GenericTestUtils;
|
|
import org.apache.log4j.Level;
|
|
import org.apache.log4j.Level;
|
|
import org.junit.After;
|
|
import org.junit.After;
|
|
import org.junit.Before;
|
|
import org.junit.Before;
|
|
|
|
+import org.junit.Rule;
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
|
|
+import org.junit.rules.TestName;
|
|
import org.mockito.Mockito;
|
|
import org.mockito.Mockito;
|
|
import org.mockito.stubbing.Stubber;
|
|
import org.mockito.stubbing.Stubber;
|
|
|
|
|
|
@@ -87,11 +90,17 @@ public class TestQuorumJournalManager {
|
|
GenericTestUtils.setLogLevel(ProtobufRpcEngine.LOG, Level.ALL);
|
|
GenericTestUtils.setLogLevel(ProtobufRpcEngine.LOG, Level.ALL);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Rule
|
|
|
|
+ public TestName name = new TestName();
|
|
|
|
+
|
|
@Before
|
|
@Before
|
|
public void setup() throws Exception {
|
|
public void setup() throws Exception {
|
|
conf = new Configuration();
|
|
conf = new Configuration();
|
|
- // Don't retry connections - it just slows down the tests.
|
|
|
|
- conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
|
|
|
|
|
|
+ if (!name.getMethodName().equals("testSelectThreadCounts")) {
|
|
|
|
+ // Don't retry connections - it just slows down the tests.
|
|
|
|
+ conf.setInt(
|
|
|
|
+ CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
|
|
|
|
+ }
|
|
// Turn off IPC client caching to handle daemon restarts.
|
|
// Turn off IPC client caching to handle daemon restarts.
|
|
conf.setInt(
|
|
conf.setInt(
|
|
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 0);
|
|
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 0);
|
|
@@ -1039,6 +1048,27 @@ public class TestQuorumJournalManager {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Test
|
|
|
|
+ public void testSelectThreadCounts() throws Exception {
|
|
|
|
+ EditLogOutputStream stm =
|
|
|
|
+ qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
|
|
|
|
+ writeTxns(stm, 1, 10);
|
|
|
|
+ JournalNode jn0 = cluster.getJournalNode(0);
|
|
|
|
+ String ipcAddr = cluster.getJournalNodeIpcAddress(0);
|
|
|
|
+ jn0.stopAndJoin(0);
|
|
|
|
+ for (int i = 0; i < 1000; i++) {
|
|
|
|
+ qjm.selectInputStreams(new ArrayList<>(), 1, true, false);
|
|
|
|
+ }
|
|
|
|
+ String expectedName =
|
|
|
|
+ "Logger channel (from parallel executor) to " + ipcAddr;
|
|
|
|
+ long num = Thread.getAllStackTraces().keySet().stream()
|
|
|
|
+ .filter((t) -> t.getName().contains(expectedName)).count();
|
|
|
|
+ // The number of threads for the stopped jn shouldn't be more than the
|
|
|
|
+ // configured value.
|
|
|
|
+ assertTrue("Number of threads are : " + num,
|
|
|
|
+ num <= DFSConfigKeys.DFS_QJOURNAL_PARALLEL_READ_NUM_THREADS_DEFAULT);
|
|
|
|
+ }
|
|
|
|
+
|
|
@Test
|
|
@Test
|
|
public void testSelectViaRpcTwoJNsError() throws Exception {
|
|
public void testSelectViaRpcTwoJNsError() throws Exception {
|
|
EditLogOutputStream stm =
|
|
EditLogOutputStream stm =
|