|
@@ -36,6 +36,7 @@ import java.net.InetSocketAddress;
|
|
|
import java.net.URISyntaxException;
|
|
|
import java.net.URL;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.TimeoutException;
|
|
@@ -49,6 +50,7 @@ import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
|
|
|
import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
|
|
|
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
|
|
|
import org.apache.hadoop.hdfs.qjournal.server.JournalFaultInjector;
|
|
|
+import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
|
|
@@ -62,7 +64,9 @@ import org.apache.hadoop.test.GenericTestUtils;
|
|
|
import org.apache.log4j.Level;
|
|
|
import org.junit.After;
|
|
|
import org.junit.Before;
|
|
|
+import org.junit.Rule;
|
|
|
import org.junit.Test;
|
|
|
+import org.junit.rules.TestName;
|
|
|
import org.mockito.Mockito;
|
|
|
import org.mockito.stubbing.Stubber;
|
|
|
|
|
@@ -88,11 +92,17 @@ public class TestQuorumJournalManager {
|
|
|
GenericTestUtils.setLogLevel(ProtobufRpcEngine.LOG, Level.ALL);
|
|
|
}
|
|
|
|
|
|
+ @Rule
|
|
|
+ public TestName name = new TestName();
|
|
|
+
|
|
|
@Before
|
|
|
public void setup() throws Exception {
|
|
|
conf = new Configuration();
|
|
|
- // Don't retry connections - it just slows down the tests.
|
|
|
- conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
|
|
|
+ if (!name.getMethodName().equals("testSelectThreadCounts")) {
|
|
|
+ // Don't retry connections - it just slows down the tests.
|
|
|
+ conf.setInt(
|
|
|
+ CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
|
|
|
+ }
|
|
|
// Turn off IPC client caching to handle daemon restarts.
|
|
|
conf.setInt(
|
|
|
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 0);
|
|
@@ -1041,6 +1051,34 @@ public class TestQuorumJournalManager {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testSelectThreadCounts() throws Exception {
|
|
|
+ EditLogOutputStream stm =
|
|
|
+ qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
|
|
|
+ writeTxns(stm, 1, 10);
|
|
|
+ JournalNode jn0 = cluster.getJournalNode(0);
|
|
|
+ String ipcAddr = cluster.getJournalNodeIpcAddress(0);
|
|
|
+ jn0.stopAndJoin(0);
|
|
|
+ for (int i = 0; i < 1000; i++) {
|
|
|
+ qjm.selectInputStreams(new ArrayList<EditLogInputStream>(), 1, true,
|
|
|
+ false);
|
|
|
+ }
|
|
|
+ String expectedName =
|
|
|
+ "Logger channel (from parallel executor) to " + ipcAddr;
|
|
|
+ Iterator<Thread> itr = Thread.getAllStackTraces().keySet().iterator();
|
|
|
+ int num = 0;
|
|
|
+ while (itr.hasNext()) {
|
|
|
+ Thread elem = itr.next();
|
|
|
+ if (elem.getName().contains(expectedName)) {
|
|
|
+ num++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // The number of threads for the stopped jn shouldn't be more than the
|
|
|
+ // configured value.
|
|
|
+ assertTrue("Number of threads are : " + num,
|
|
|
+ num <= DFSConfigKeys.DFS_QJOURNAL_PARALLEL_READ_NUM_THREADS_DEFAULT);
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testSelectViaRpcTwoJNsError() throws Exception {
|
|
|
EditLogOutputStream stm =
|