|
@@ -21,6 +21,11 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
|
|
|
import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
+import java.util.concurrent.Callable;
|
|
|
+import java.util.concurrent.ExecutionException;
|
|
|
+import java.util.concurrent.ExecutorCompletionService;
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
+import java.util.concurrent.Executors;
|
|
|
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
|
@@ -36,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
+import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
|
|
|
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
|
|
@@ -234,7 +240,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
|
|
|
RegisterApplicationMasterRequest registerReq =
|
|
|
Records.newRecord(RegisterApplicationMasterRequest.class);
|
|
|
registerReq.setHost(Integer.toString(testAppId));
|
|
|
- registerReq.setRpcPort(testAppId);
|
|
|
+ registerReq.setRpcPort(0);
|
|
|
registerReq.setTrackingUrl("");
|
|
|
|
|
|
RegisterApplicationMasterResponse registerResponse =
|
|
@@ -298,7 +304,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
|
|
|
RegisterApplicationMasterRequest registerReq =
|
|
|
Records.newRecord(RegisterApplicationMasterRequest.class);
|
|
|
registerReq.setHost(Integer.toString(testAppId));
|
|
|
- registerReq.setRpcPort(testAppId);
|
|
|
+ registerReq.setRpcPort(0);
|
|
|
registerReq.setTrackingUrl("");
|
|
|
|
|
|
RegisterApplicationMasterResponse registerResponse =
|
|
@@ -338,6 +344,78 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
|
|
|
Assert.assertEquals(true, finshResponse.getIsUnregistered());
|
|
|
}
|
|
|
|
|
|
+ /*
|
|
|
+ * Test concurrent register threads. This is possible because the timeout
|
|
|
+ * between AM and AMRMProxy is shorter than the timeout + failOver between
|
|
|
+ * FederationInterceptor (AMRMProxy) and RM. When first call is blocked due to
|
|
|
+ * RM failover and AM timeout, it will call us resulting in a second register
|
|
|
+ * thread.
|
|
|
+ */
|
|
|
+ @Test(timeout = 5000)
|
|
|
+ public void testConcurrentRegister()
|
|
|
+ throws InterruptedException, ExecutionException {
|
|
|
+ ExecutorService threadpool = Executors.newCachedThreadPool();
|
|
|
+ ExecutorCompletionService<RegisterApplicationMasterResponse> compSvc =
|
|
|
+ new ExecutorCompletionService<>(threadpool);
|
|
|
+
|
|
|
+ Object syncObj = MockResourceManagerFacade.getSyncObj();
|
|
|
+
|
|
|
+ // Two register threads
|
|
|
+ synchronized (syncObj) {
|
|
|
+ // Make sure first thread will block within RM, before the second thread
|
|
|
+ // starts
|
|
|
+ LOG.info("Starting first register thread");
|
|
|
+ compSvc.submit(new ConcurrentRegisterAMCallable());
|
|
|
+
|
|
|
+ try {
|
|
|
+ LOG.info("Test main starts waiting for the first thread to block");
|
|
|
+ syncObj.wait();
|
|
|
+ LOG.info("Test main wait finished");
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.info("Test main wait interrupted", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // The second thread will get already registered exception from RM.
|
|
|
+ LOG.info("Starting second register thread");
|
|
|
+ compSvc.submit(new ConcurrentRegisterAMCallable());
|
|
|
+
|
|
|
+ // Notify the first register thread to return
|
|
|
+ LOG.info("Let first blocked register thread move on");
|
|
|
+ synchronized (syncObj) {
|
|
|
+ syncObj.notifyAll();
|
|
|
+ }
|
|
|
+
|
|
|
+ // Both thread should return without exception
|
|
|
+ RegisterApplicationMasterResponse response = compSvc.take().get();
|
|
|
+ Assert.assertNotNull(response);
|
|
|
+
|
|
|
+ response = compSvc.take().get();
|
|
|
+ Assert.assertNotNull(response);
|
|
|
+
|
|
|
+ threadpool.shutdown();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * A callable that calls registerAM to RM with blocking.
|
|
|
+ */
|
|
|
+ public class ConcurrentRegisterAMCallable
|
|
|
+ implements Callable<RegisterApplicationMasterResponse> {
|
|
|
+ @Override
|
|
|
+ public RegisterApplicationMasterResponse call() throws Exception {
|
|
|
+ RegisterApplicationMasterResponse response = null;
|
|
|
+ try {
|
|
|
+ // Use port number 1001 to let mock RM block in the register call
|
|
|
+ response = interceptor.registerApplicationMaster(
|
|
|
+ RegisterApplicationMasterRequest.newInstance(null, 1001, null));
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.info("Register thread exception", e);
|
|
|
+ response = null;
|
|
|
+ }
|
|
|
+ return response;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testRequestInterceptorChainCreation() throws Exception {
|
|
|
RequestInterceptor root =
|
|
@@ -381,7 +459,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
|
|
|
RegisterApplicationMasterRequest registerReq =
|
|
|
Records.newRecord(RegisterApplicationMasterRequest.class);
|
|
|
registerReq.setHost(Integer.toString(testAppId));
|
|
|
- registerReq.setRpcPort(testAppId);
|
|
|
+ registerReq.setRpcPort(0);
|
|
|
registerReq.setTrackingUrl("");
|
|
|
|
|
|
for (int i = 0; i < 2; i++) {
|
|
@@ -397,7 +475,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
|
|
|
RegisterApplicationMasterRequest registerReq =
|
|
|
Records.newRecord(RegisterApplicationMasterRequest.class);
|
|
|
registerReq.setHost(Integer.toString(testAppId));
|
|
|
- registerReq.setRpcPort(testAppId);
|
|
|
+ registerReq.setRpcPort(0);
|
|
|
registerReq.setTrackingUrl("");
|
|
|
|
|
|
RegisterApplicationMasterResponse registerResponse =
|
|
@@ -407,7 +485,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
|
|
|
// Register the application second time with a different request obj
|
|
|
registerReq = Records.newRecord(RegisterApplicationMasterRequest.class);
|
|
|
registerReq.setHost(Integer.toString(testAppId));
|
|
|
- registerReq.setRpcPort(testAppId);
|
|
|
+ registerReq.setRpcPort(0);
|
|
|
registerReq.setTrackingUrl("different");
|
|
|
try {
|
|
|
registerResponse = interceptor.registerApplicationMaster(registerReq);
|