|
@@ -17,24 +17,31 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
|
|
|
|
|
+import com.google.common.base.Supplier;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.ha.ClientBaseWithFixes;
|
|
|
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
|
|
|
import org.apache.hadoop.ha.ServiceFailedException;
|
|
|
+import org.apache.hadoop.test.GenericTestUtils;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
+import org.mockito.invocation.InvocationOnMock;
|
|
|
+import org.mockito.stubbing.Answer;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.util.concurrent.TimeoutException;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
-import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
+
|
|
|
import static org.junit.Assert.fail;
|
|
|
import static org.mockito.Matchers.any;
|
|
|
-import static org.mockito.Mockito.atLeast;
|
|
|
-import static org.mockito.Mockito.atMost;
|
|
|
+import static org.mockito.Mockito.doAnswer;
|
|
|
import static org.mockito.Mockito.mock;
|
|
|
import static org.mockito.Mockito.never;
|
|
|
+import static org.mockito.Mockito.times;
|
|
|
import static org.mockito.Mockito.verify;
|
|
|
import static org.mockito.Mockito.when;
|
|
|
|
|
@@ -49,6 +56,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
|
|
|
|
|
private Configuration conf;
|
|
|
private AtomicBoolean callbackCalled;
|
|
|
+ private AtomicInteger transitionToActiveCounter;
|
|
|
+ private AtomicInteger transitionToStandbyCounter;
|
|
|
|
|
|
private enum SyncTestType {
|
|
|
ACTIVE,
|
|
@@ -76,6 +85,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
|
|
conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L);
|
|
|
|
|
|
callbackCalled = new AtomicBoolean(false);
|
|
|
+ transitionToActiveCounter = new AtomicInteger(0);
|
|
|
+ transitionToStandbyCounter = new AtomicInteger(0);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -104,7 +115,7 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
|
|
*/
|
|
|
@Test
|
|
|
public void testCallbackSynchronization()
|
|
|
- throws IOException, InterruptedException {
|
|
|
+ throws IOException, InterruptedException, TimeoutException {
|
|
|
testCallbackSynchronization(SyncTestType.ACTIVE);
|
|
|
testCallbackSynchronization(SyncTestType.STANDBY);
|
|
|
testCallbackSynchronization(SyncTestType.NEUTRAL);
|
|
@@ -118,9 +129,10 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
|
|
* @param type the type of test to run
|
|
|
* @throws IOException if there's an issue transitioning
|
|
|
* @throws InterruptedException if interrupted
|
|
|
+ * @throws TimeoutException if waitFor timeout reached
|
|
|
*/
|
|
|
private void testCallbackSynchronization(SyncTestType type)
|
|
|
- throws IOException, InterruptedException {
|
|
|
+ throws IOException, InterruptedException, TimeoutException {
|
|
|
AdminService as = mock(AdminService.class);
|
|
|
RMContext rc = mock(RMContext.class);
|
|
|
ResourceManager rm = mock(ResourceManager.class);
|
|
@@ -130,6 +142,23 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
|
|
when(rm.getRMContext()).thenReturn(rc);
|
|
|
when(rc.getRMAdminService()).thenReturn(as);
|
|
|
|
|
|
+ doAnswer(new Answer() {
|
|
|
+ @Override
|
|
|
+ public Object answer(InvocationOnMock invocation) {
|
|
|
+ transitionToActiveCounter.incrementAndGet();
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }).when(as).transitionToActive((StateChangeRequestInfo) any());
|
|
|
+ transitionToActiveCounter.set(0);
|
|
|
+ doAnswer(new Answer() {
|
|
|
+ @Override
|
|
|
+ public Object answer(InvocationOnMock invocation) {
|
|
|
+ transitionToStandbyCounter.incrementAndGet();
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }).when(as).transitionToStandby((StateChangeRequestInfo) any());
|
|
|
+ transitionToStandbyCounter.set(0);
|
|
|
+
|
|
|
ActiveStandbyElectorBasedElectorService ees =
|
|
|
new ActiveStandbyElectorBasedElectorService(rm);
|
|
|
ees.init(myConf);
|
|
@@ -166,16 +195,22 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
|
|
* @param ees the embedded elector service
|
|
|
* @throws IOException if there's an issue transitioning
|
|
|
* @throws InterruptedException if interrupted
|
|
|
+ * @throws TimeoutException if waitFor timeout reached
|
|
|
*/
|
|
|
private void testCallbackSynchronizationActive(AdminService as,
|
|
|
ActiveStandbyElectorBasedElectorService ees)
|
|
|
- throws IOException, InterruptedException {
|
|
|
+ throws IOException, InterruptedException, TimeoutException {
|
|
|
ees.becomeActive();
|
|
|
|
|
|
- Thread.sleep(100);
|
|
|
-
|
|
|
- verify(as).transitionToActive((StateChangeRequestInfo)any());
|
|
|
- verify(as, never()).transitionToStandby((StateChangeRequestInfo)any());
|
|
|
+ GenericTestUtils.waitFor(
|
|
|
+ new Supplier<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean get() {
|
|
|
+ return transitionToActiveCounter.get() >= 1;
|
|
|
+ }
|
|
|
+ }, 500, 10 * 1000);
|
|
|
+ verify(as, times(1)).transitionToActive((StateChangeRequestInfo) any());
|
|
|
+ verify(as, never()).transitionToStandby((StateChangeRequestInfo) any());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -186,16 +221,21 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
|
|
* @param ees the embedded elector service
|
|
|
* @throws IOException if there's an issue transitioning
|
|
|
* @throws InterruptedException if interrupted
|
|
|
+ * @throws TimeoutException if waitFor timeout reached
|
|
|
*/
|
|
|
private void testCallbackSynchronizationStandby(AdminService as,
|
|
|
ActiveStandbyElectorBasedElectorService ees)
|
|
|
- throws IOException, InterruptedException {
|
|
|
+ throws IOException, InterruptedException, TimeoutException {
|
|
|
ees.becomeStandby();
|
|
|
|
|
|
- Thread.sleep(100);
|
|
|
-
|
|
|
- verify(as, atLeast(1)).transitionToStandby((StateChangeRequestInfo)any());
|
|
|
- verify(as, atMost(1)).transitionToStandby((StateChangeRequestInfo)any());
|
|
|
+ GenericTestUtils.waitFor(
|
|
|
+ new Supplier<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean get() {
|
|
|
+ return transitionToStandbyCounter.get() >= 1;
|
|
|
+ }
|
|
|
+ }, 500, 10 * 1000);
|
|
|
+ verify(as, times(1)).transitionToStandby((StateChangeRequestInfo) any());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -205,16 +245,21 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
|
|
* @param ees the embedded elector service
|
|
|
* @throws IOException if there's an issue transitioning
|
|
|
* @throws InterruptedException if interrupted
|
|
|
+ * @throws TimeoutException if waitFor timeout reached
|
|
|
*/
|
|
|
private void testCallbackSynchronizationNeutral(AdminService as,
|
|
|
ActiveStandbyElectorBasedElectorService ees)
|
|
|
- throws IOException, InterruptedException {
|
|
|
+ throws IOException, InterruptedException, TimeoutException {
|
|
|
ees.enterNeutralMode();
|
|
|
|
|
|
- Thread.sleep(100);
|
|
|
-
|
|
|
- verify(as, atLeast(1)).transitionToStandby((StateChangeRequestInfo)any());
|
|
|
- verify(as, atMost(1)).transitionToStandby((StateChangeRequestInfo)any());
|
|
|
+ GenericTestUtils.waitFor(
|
|
|
+ new Supplier<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean get() {
|
|
|
+ return transitionToStandbyCounter.get() >= 1;
|
|
|
+ }
|
|
|
+ }, 500, 10 * 1000);
|
|
|
+ verify(as, times(1)).transitionToStandby((StateChangeRequestInfo) any());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -225,10 +270,11 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
|
|
* @param ees the embedded elector service
|
|
|
* @throws IOException if there's an issue transitioning
|
|
|
* @throws InterruptedException if interrupted
|
|
|
+ * @throws TimeoutException if waitFor timeout reached
|
|
|
*/
|
|
|
private void testCallbackSynchronizationTimingActive(AdminService as,
|
|
|
ActiveStandbyElectorBasedElectorService ees)
|
|
|
- throws IOException, InterruptedException {
|
|
|
+ throws IOException, InterruptedException, TimeoutException {
|
|
|
synchronized (ees.zkDisconnectLock) {
|
|
|
// Sleep while holding the lock so that the timer thread can't do
|
|
|
// anything when it runs. Sleep until we're pretty sure the timer thread
|
|
@@ -244,8 +290,15 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
|
|
// going to do, hopefully nothing.
|
|
|
Thread.sleep(50);
|
|
|
|
|
|
- verify(as).transitionToActive((StateChangeRequestInfo)any());
|
|
|
- verify(as, never()).transitionToStandby((StateChangeRequestInfo)any());
|
|
|
+ GenericTestUtils.waitFor(
|
|
|
+ new Supplier<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean get() {
|
|
|
+ return transitionToActiveCounter.get() >= 1;
|
|
|
+ }
|
|
|
+ }, 500, 10 * 1000);
|
|
|
+ verify(as, times(1)).transitionToActive((StateChangeRequestInfo) any());
|
|
|
+ verify(as, never()).transitionToStandby((StateChangeRequestInfo) any());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -256,10 +309,11 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
|
|
* @param ees the embedded elector service
|
|
|
* @throws IOException if there's an issue transitioning
|
|
|
* @throws InterruptedException if interrupted
|
|
|
+ * @throws TimeoutException if waitFor timeout reached
|
|
|
*/
|
|
|
private void testCallbackSynchronizationTimingStandby(AdminService as,
|
|
|
ActiveStandbyElectorBasedElectorService ees)
|
|
|
- throws IOException, InterruptedException {
|
|
|
+ throws IOException, InterruptedException, TimeoutException {
|
|
|
synchronized (ees.zkDisconnectLock) {
|
|
|
// Sleep while holding the lock so that the timer thread can't do
|
|
|
// anything when it runs. Sleep until we're pretty sure the timer thread
|
|
@@ -275,8 +329,14 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
|
|
// going to do, hopefully nothing.
|
|
|
Thread.sleep(50);
|
|
|
|
|
|
- verify(as, atLeast(1)).transitionToStandby((StateChangeRequestInfo)any());
|
|
|
- verify(as, atMost(1)).transitionToStandby((StateChangeRequestInfo)any());
|
|
|
+ GenericTestUtils.waitFor(
|
|
|
+ new Supplier<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean get() {
|
|
|
+ return transitionToStandbyCounter.get() >= 1;
|
|
|
+ }
|
|
|
+ }, 500, 10 * 1000);
|
|
|
+ verify(as, times(1)).transitionToStandby((StateChangeRequestInfo) any());
|
|
|
}
|
|
|
|
|
|
private class MockRMWithElector extends MockRM {
|