@@ -22,8 +22,10 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.inject.Provider;
 import junit.framework.Assert;
 
 import org.apache.ambari.server.AmbariException;
@@ -69,6 +71,9 @@ public class ClustersDeadlockTest {
 
   private final AtomicInteger hostNameCounter = new AtomicInteger(0);
 
+  private CountDownLatch writerStoppedSignal;
+  private CountDownLatch readerStoppedSignal;
+
   private final StackId stackId = new StackId("HDP-0.1");
 
   @Inject
@@ -109,6 +114,9 @@ public class ClustersDeadlockTest {
 
     // install HDFS
     installService("HDFS");
+
+    writerStoppedSignal = new CountDownLatch(NUMBER_OF_THREADS);
+    readerStoppedSignal = new CountDownLatch(NUMBER_OF_THREADS);
   }
 
   @After
@@ -116,6 +124,42 @@ public class ClustersDeadlockTest {
     injector.getInstance(PersistService.class).stop();
   }
 
+  /**
+   * Launches reader and writer threads simultaneously to check for a deadlock.
+   * The number of reader and writer threads launched is equal to
+   * {@code numberOfThreads}. This method expects the reader and writer
+   * threads to use {@code readerStoppedSignal} and
+   * {@code writerStoppedSignal} correctly.
+   *
+   * Reader threads should stop only after all writer threads have finished.
+   */
+  private void doLoadTest(Provider<? extends Thread> readerProvider,
+                          Provider<? extends Thread> writerProvider,
+                          final int numberOfThreads,
+                          CountDownLatch writerStoppedSignal,
+                          CountDownLatch readerStoppedSignal) throws Exception {
+    List<Thread> writerThreads = new ArrayList<Thread>();
+    for (int i = 0; i < numberOfThreads; i++) {
+      Thread readerThread = readerProvider.get();
+      Thread writerThread = writerProvider.get();
+
+      writerThreads.add(writerThread);
+
+      readerThread.start();
+      writerThread.start();
+    }
+
+    for (Thread writerThread : writerThreads) {
+      writerThread.join();
+      // Notify that one writer thread has stopped
+      writerStoppedSignal.countDown();
+    }
+
+    // All writer threads have stopped. Reader threads should finish now.
+    // Wait for all reader threads to stop
+    readerStoppedSignal.await();
+  }
+
   /**
    * Tests that no deadlock exists when adding hosts while reading from the
    * cluster.
@@ -124,21 +168,20 @@ public class ClustersDeadlockTest {
    */
   @Test(timeout = 40000)
   public void testDeadlockWhileMappingHosts() throws Exception {
-    List<Thread> threads = new ArrayList<Thread>();
-    for (int i = 0; i < NUMBER_OF_THREADS; i++) {
-      ClusterReaderThread readerThread = new ClusterReaderThread();
-      ClustersHostMapperThread writerThread = new ClustersHostMapperThread();
-
-      threads.add(readerThread);
-      threads.add(writerThread);
+    Provider<ClustersHostMapperThread> clustersHostMapperThreadFactory =
+        new Provider<ClustersHostMapperThread>() {
 
-      readerThread.start();
-      writerThread.start();
-    }
+          @Override
+          public ClustersHostMapperThread get() {
+            return new ClustersHostMapperThread();
+          }
+        };
 
-    for (Thread thread : threads) {
-      thread.join();
-    }
+    doLoadTest(new ClusterReaderThreadFactory(),
+               clustersHostMapperThreadFactory,
+               NUMBER_OF_THREADS,
+               writerStoppedSignal,
+               readerStoppedSignal);
 
     Assert.assertEquals(NUMBER_OF_THREADS * NUMBER_OF_HOSTS,
         clusters.getHostsForCluster(CLUSTER_NAME).size());
@@ -154,21 +197,20 @@ public class ClustersDeadlockTest {
   @Test(timeout = 40000)
   public void testDeadlockWhileMappingHostsWithExistingServices()
       throws Exception {
-    List<Thread> threads = new ArrayList<Thread>();
-    for (int i = 0; i < NUMBER_OF_THREADS; i++) {
-      ClusterReaderThread readerThread = new ClusterReaderThread();
-      ClustersHostAndComponentMapperThread writerThread = new ClustersHostAndComponentMapperThread();
-
-      threads.add(readerThread);
-      threads.add(writerThread);
+    Provider<ClustersHostAndComponentMapperThread> clustersHostAndComponentMapperThreadFactory =
+        new Provider<ClustersHostAndComponentMapperThread>() {
 
-      readerThread.start();
-      writerThread.start();
-    }
+          @Override
+          public ClustersHostAndComponentMapperThread get() {
+            return new ClustersHostAndComponentMapperThread();
+          }
+        };
 
-    for (Thread thread : threads) {
-      thread.join();
-    }
+    doLoadTest(new ClusterReaderThreadFactory(),
+               clustersHostAndComponentMapperThreadFactory,
+               NUMBER_OF_THREADS,
+               writerStoppedSignal,
+               readerStoppedSignal);
   }
 
   /**
@@ -179,26 +221,33 @@ public class ClustersDeadlockTest {
    */
   @Test(timeout = 40000)
   public void testDeadlockWhileUnmappingHosts() throws Exception {
-    List<Thread> threads = new ArrayList<Thread>();
-    for (int i = 0; i < NUMBER_OF_THREADS; i++) {
-      ClusterReaderThread readerThread = new ClusterReaderThread();
-      ClustersHostUnMapperThread writerThread = new ClustersHostUnMapperThread();
-
-      threads.add(readerThread);
-      threads.add(writerThread);
+    Provider<ClustersHostUnMapperThread> clustersHostUnMapperThreadFactory =
+        new Provider<ClustersHostUnMapperThread>() {
 
-      readerThread.start();
-      writerThread.start();
-    }
+          @Override
+          public ClustersHostUnMapperThread get() {
+            return new ClustersHostUnMapperThread();
+          }
+        };
 
-    for (Thread thread : threads) {
-      thread.join();
-    }
+    doLoadTest(new ClusterReaderThreadFactory(),
+               clustersHostUnMapperThreadFactory,
+               NUMBER_OF_THREADS,
+               writerStoppedSignal,
+               readerStoppedSignal);
 
     Assert.assertEquals(0,
         clusters.getHostsForCluster(CLUSTER_NAME).size());
   }
 
+  private final class ClusterReaderThreadFactory implements Provider<ClusterReaderThread> {
+
+    @Override
+    public ClusterReaderThread get() {
+      return new ClusterReaderThread();
+    }
+  }
+
   /**
    * The {@link ClusterReaderThread} reads from a cluster over and over again
    * with a slight pause.
@@ -211,12 +260,20 @@ public class ClustersDeadlockTest {
     @Override
     public void run() {
       try {
-        for (int i = 0; i < 1000; i++) {
+        // Keep reading until all writer threads have finished
+        while (true) {
+          if (writerStoppedSignal.getCount() == 0) {
+            break;
+          }
+
           cluster.convertToResponse();
           Thread.sleep(10);
         }
       } catch (Exception exception) {
         throw new RuntimeException(exception);
+      } finally {
+        // Notify that one reader thread has stopped
+        readerStoppedSignal.countDown();
       }
     }
   }