|
@@ -18,6 +18,7 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.server.nodemanager;
|
|
package org.apache.hadoop.yarn.server.nodemanager;
|
|
|
|
|
|
|
|
+import static org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils.newNodeHeartbeatResponse;
|
|
import static org.mockito.Mockito.mock;
|
|
import static org.mockito.Mockito.mock;
|
|
import static org.mockito.Mockito.when;
|
|
import static org.mockito.Mockito.when;
|
|
|
|
|
|
@@ -37,7 +38,10 @@ import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.Set;
|
|
import java.util.Set;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
import java.util.concurrent.CyclicBarrier;
|
|
import java.util.concurrent.CyclicBarrier;
|
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
|
+import java.util.concurrent.Executors;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
@@ -74,12 +78,14 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
|
|
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
|
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
|
import org.apache.hadoop.yarn.server.api.ResourceTracker;
|
|
import org.apache.hadoop.yarn.server.api.ResourceTracker;
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
|
|
|
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
|
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
|
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
|
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
|
@@ -1463,6 +1469,63 @@ public class TestNodeStatusUpdater {
|
|
nm.stop();
|
|
nm.stop();
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Test
|
|
|
|
+ public void testConcurrentAccessToSystemCredentials(){
|
|
|
|
+ final Map<ApplicationId, ByteBuffer> testCredentials = new HashMap<>();
|
|
|
|
+ ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[300]);
|
|
|
|
+ ApplicationId applicationId = ApplicationId.newInstance(123456, 120);
|
|
|
|
+ testCredentials.put(applicationId, byteBuffer);
|
|
|
|
+
|
|
|
|
+ final List<Throwable> exceptions = Collections.synchronizedList(new
|
|
|
|
+ ArrayList<Throwable>());
|
|
|
|
+
|
|
|
|
+ final int NUM_THREADS = 10;
|
|
|
|
+ final CountDownLatch allDone = new CountDownLatch(NUM_THREADS);
|
|
|
|
+ final ExecutorService threadPool = Executors.newFixedThreadPool(
|
|
|
|
+ NUM_THREADS);
|
|
|
|
+
|
|
|
|
+ final AtomicBoolean stop = new AtomicBoolean(false);
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ for (int i = 0; i < NUM_THREADS; i++) {
|
|
|
|
+ threadPool.submit(new Runnable() {
|
|
|
|
+ @Override
|
|
|
|
+ public void run() {
|
|
|
|
+ try {
|
|
|
|
+ for (int i = 0; i < 100 && !stop.get(); i++) {
|
|
|
|
+ NodeHeartbeatResponse nodeHeartBeatResponse =
|
|
|
|
+ newNodeHeartbeatResponse(0, NodeAction.NORMAL,
|
|
|
|
+ null, null, null, null, 0);
|
|
|
|
+ nodeHeartBeatResponse.setSystemCredentialsForApps(
|
|
|
|
+ testCredentials);
|
|
|
|
+ NodeHeartbeatResponseProto proto =
|
|
|
|
+ ((NodeHeartbeatResponsePBImpl)nodeHeartBeatResponse)
|
|
|
|
+ .getProto();
|
|
|
|
+ Assert.assertNotNull(proto);
|
|
|
|
+ }
|
|
|
|
+ } catch (Throwable t) {
|
|
|
|
+ exceptions.add(t);
|
|
|
|
+ stop.set(true);
|
|
|
|
+ } finally {
|
|
|
|
+ allDone.countDown();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ int testTimeout = 2;
|
|
|
|
+ Assert.assertTrue("Timeout waiting for more than " + testTimeout + " " +
|
|
|
|
+ "seconds",
|
|
|
|
+ allDone.await(testTimeout, TimeUnit.SECONDS));
|
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
|
+ exceptions.add(ie);
|
|
|
|
+ } finally {
|
|
|
|
+ threadPool.shutdownNow();
|
|
|
|
+ }
|
|
|
|
+ Assert.assertTrue("Test failed with exception(s)" + exceptions,
|
|
|
|
+ exceptions.isEmpty());
|
|
|
|
+ }
|
|
|
|
+
|
|
// Add new containers info into NM context each time node heart beats.
|
|
// Add new containers info into NM context each time node heart beats.
|
|
private class MyNMContext extends NMContext {
|
|
private class MyNMContext extends NMContext {
|
|
|
|
|