|
@@ -27,7 +27,9 @@ import java.io.IOException;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.security.PrivilegedExceptionAction;
|
|
|
import java.util.List;
|
|
|
+import java.util.concurrent.BrokenBarrierException;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.CyclicBarrier;
|
|
|
|
|
|
import junit.framework.Assert;
|
|
|
|
|
@@ -37,6 +39,7 @@ import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.io.Text;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.security.token.Token;
|
|
|
+import org.apache.hadoop.yarn.MockApps;
|
|
|
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
|
@@ -44,28 +47,36 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|
|
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|
|
import org.apache.hadoop.yarn.api.records.DelegationToken;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeReport;
|
|
|
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
|
|
+import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
|
+import org.apache.hadoop.yarn.event.Event;
|
|
|
+import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
|
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
|
|
import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
|
|
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
|
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
|
|
import org.apache.hadoop.yarn.util.Records;
|
|
|
-import org.junit.Test;
|
|
|
import org.junit.AfterClass;
|
|
|
import org.junit.BeforeClass;
|
|
|
-
|
|
|
+import org.junit.Test;
|
|
|
|
|
|
public class TestClientRMService {
|
|
|
|
|
@@ -235,6 +246,88 @@ public class TestClientRMService {
|
|
|
rmService.renewDelegationToken(request);
|
|
|
}
|
|
|
|
|
|
+ @Test(timeout=4000)
|
|
|
+ public void testConcurrentAppSubmit()
|
|
|
+ throws IOException, InterruptedException, BrokenBarrierException {
|
|
|
+ YarnScheduler yarnScheduler = mock(YarnScheduler.class);
|
|
|
+ RMContext rmContext = mock(RMContext.class);
|
|
|
+ mockRMContext(yarnScheduler, rmContext);
|
|
|
+ RMStateStore stateStore = mock(RMStateStore.class);
|
|
|
+ when(rmContext.getStateStore()).thenReturn(stateStore);
|
|
|
+ RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler,
|
|
|
+ null, mock(ApplicationACLsManager.class), new Configuration());
|
|
|
+
|
|
|
+ final ApplicationId appId1 = getApplicationId(100);
|
|
|
+ final ApplicationId appId2 = getApplicationId(101);
|
|
|
+ final SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest(appId1);
|
|
|
+ final SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest(appId2);
|
|
|
+
|
|
|
+ final CyclicBarrier startBarrier = new CyclicBarrier(2);
|
|
|
+ final CyclicBarrier endBarrier = new CyclicBarrier(2);
|
|
|
+
|
|
|
+ @SuppressWarnings("rawtypes")
|
|
|
+ EventHandler eventHandler = new EventHandler() {
|
|
|
+ @Override
|
|
|
+ public void handle(Event rawEvent) {
|
|
|
+ if (rawEvent instanceof RMAppEvent) {
|
|
|
+ RMAppEvent event = (RMAppEvent) rawEvent;
|
|
|
+ if (event.getApplicationId().equals(appId1)) {
|
|
|
+ try {
|
|
|
+ startBarrier.await();
|
|
|
+ endBarrier.await();
|
|
|
+ } catch (BrokenBarrierException e) {
|
|
|
+ LOG.warn("Broken Barrier", e);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ LOG.warn("Interrupted while awaiting barriers", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ when(rmContext.getDispatcher().getEventHandler()).thenReturn(eventHandler);
|
|
|
+
|
|
|
+ final ClientRMService rmService =
|
|
|
+ new ClientRMService(rmContext, yarnScheduler, appManager, null, null);
|
|
|
+
|
|
|
+ // submit an app and wait for it to block while in app submission
|
|
|
+ Thread t = new Thread() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ rmService.submitApplication(submitRequest1);
|
|
|
+ } catch (YarnRemoteException e) {}
|
|
|
+ }
|
|
|
+ };
|
|
|
+ t.start();
|
|
|
+
|
|
|
+ // submit another app, so go through while the first app is blocked
|
|
|
+ startBarrier.await();
|
|
|
+ rmService.submitApplication(submitRequest2);
|
|
|
+ endBarrier.await();
|
|
|
+ t.join();
|
|
|
+ }
|
|
|
+
|
|
|
+ private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId) {
|
|
|
+ String user = MockApps.newUserName();
|
|
|
+ String queue = MockApps.newQueue();
|
|
|
+
|
|
|
+ ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class);
|
|
|
+ Resource resource = mock(Resource.class);
|
|
|
+ when(amContainerSpec.getResource()).thenReturn(resource);
|
|
|
+
|
|
|
+ ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class);
|
|
|
+ when(submissionContext.getUser()).thenReturn(user);
|
|
|
+ when(submissionContext.getQueue()).thenReturn(queue);
|
|
|
+ when(submissionContext.getAMContainerSpec()).thenReturn(amContainerSpec);
|
|
|
+ when(submissionContext.getApplicationId()).thenReturn(appId);
|
|
|
+
|
|
|
+ SubmitApplicationRequest submitRequest =
|
|
|
+ recordFactory.newRecordInstance(SubmitApplicationRequest.class);
|
|
|
+ submitRequest.setApplicationSubmissionContext(submissionContext);
|
|
|
+ return submitRequest;
|
|
|
+ }
|
|
|
+
|
|
|
private void mockRMContext(YarnScheduler yarnScheduler, RMContext rmContext)
|
|
|
throws IOException {
|
|
|
Dispatcher dispatcher = mock(Dispatcher.class);
|