|
@@ -21,9 +21,9 @@ package org.apache.hadoop.mapreduce.v2.app.launcher;
|
|
|
import java.io.IOException;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.security.PrivilegedAction;
|
|
|
-import java.util.HashMap;
|
|
|
-import java.util.Map;
|
|
|
import java.util.concurrent.BlockingQueue;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.ConcurrentMap;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
@@ -79,8 +79,8 @@ public class ContainerLauncherImpl extends AbstractService implements
|
|
|
private RecordFactory recordFactory;
|
|
|
//have a cache/map of UGIs so as to avoid creating too many RPC
|
|
|
//client connection objects to the same NodeManager
|
|
|
- private Map<String, UserGroupInformation> ugiMap =
|
|
|
- new HashMap<String, UserGroupInformation>();
|
|
|
+ private ConcurrentMap<String, UserGroupInformation> ugiMap =
|
|
|
+ new ConcurrentHashMap<String, UserGroupInformation>();
|
|
|
|
|
|
public ContainerLauncherImpl(AppContext context) {
|
|
|
super(ContainerLauncherImpl.class.getName());
|
|
@@ -142,22 +142,19 @@ public class ContainerLauncherImpl extends AbstractService implements
|
|
|
|
|
|
UserGroupInformation user = UserGroupInformation.getCurrentUser();
|
|
|
|
|
|
- // TODO: Synchronization problems!!
|
|
|
if (UserGroupInformation.isSecurityEnabled()) {
|
|
|
- if(!ugiMap.containsKey(containerManagerBindAddr)) {
|
|
|
- Token<ContainerTokenIdentifier> token =
|
|
|
- new Token<ContainerTokenIdentifier>(
|
|
|
- containerToken.getIdentifier().array(),
|
|
|
- containerToken.getPassword().array(), new Text(
|
|
|
- containerToken.getKind()), new Text(
|
|
|
- containerToken.getService()));
|
|
|
- //the user in createRemoteUser in this context is not important
|
|
|
- user = UserGroupInformation.createRemoteUser(containerManagerBindAddr);
|
|
|
- user.addToken(token);
|
|
|
- ugiMap.put(containerManagerBindAddr, user);
|
|
|
- } else {
|
|
|
- user = ugiMap.get(containerManagerBindAddr);
|
|
|
- }
|
|
|
+
|
|
|
+ Token<ContainerTokenIdentifier> token = new Token<ContainerTokenIdentifier>(
|
|
|
+ containerToken.getIdentifier().array(), containerToken
|
|
|
+ .getPassword().array(), new Text(containerToken.getKind()),
|
|
|
+ new Text(containerToken.getService()));
|
|
|
+ // the user in createRemoteUser in this context is not important
|
|
|
+ UserGroupInformation ugi = UserGroupInformation
|
|
|
+ .createRemoteUser(containerManagerBindAddr);
|
|
|
+ ugi.addToken(token);
|
|
|
+ ugiMap.putIfAbsent(containerManagerBindAddr, ugi);
|
|
|
+
|
|
|
+ user = ugiMap.get(containerManagerBindAddr);
|
|
|
}
|
|
|
ContainerManager proxy =
|
|
|
user.doAs(new PrivilegedAction<ContainerManager>() {
|