瀏覽代碼

Fix for ConcurrentModification exception while iterating through tokens in a UGI in ContainerLauncherImpl. Contributed by Devaraj Das.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/MR-279@1138317 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 14 年之前
父節點
當前提交
2df37eb332

+ 3 - 0
mapreduce/CHANGES.txt

@@ -5,6 +5,9 @@ Trunk (unreleased changes)
 
     MAPREDUCE-279
 
+    Fix for ConcurrentModification exception while iterating through tokens in
+    a UGI in ContainerLauncherImpl. (ddas)
+
     MAPREDUCE-2611. Fix counters, finish times etc. in job history.
     (Siddharth Seth via llu)
 

+ 18 - 4
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java

@@ -20,6 +20,8 @@ package org.apache.hadoop.mapreduce.v2.app.launcher;
 
 import java.io.IOException;
 import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -71,6 +73,10 @@ public class ContainerLauncherImpl extends AbstractService implements
   private BlockingQueue<ContainerLauncherEvent> eventQueue =
       new LinkedBlockingQueue<ContainerLauncherEvent>();
   private RecordFactory recordFactory;
+  //have a cache/map of UGIs so as to avoid creating too many RPC
+  //client connection objects to the same NodeManager
+  private Map<String, UserGroupInformation> ugiMap = 
+    new HashMap<String, UserGroupInformation>();
 
   public ContainerLauncherImpl(AppContext context) {
     super(ContainerLauncherImpl.class.getName());
@@ -130,15 +136,23 @@ public class ContainerLauncherImpl extends AbstractService implements
       final String containerManagerBindAddr, ContainerToken containerToken)
       throws IOException {
 
-    UserGroupInformation user = UserGroupInformation.getLoginUser();
+    UserGroupInformation user = UserGroupInformation.getCurrentUser();
+
     if (UserGroupInformation.isSecurityEnabled()) {
-      Token<ContainerTokenIdentifier> token =
+      if(!ugiMap.containsKey(containerManagerBindAddr)) {
+        Token<ContainerTokenIdentifier> token =
           new Token<ContainerTokenIdentifier>(
               containerToken.getIdentifier().array(),
               containerToken.getPassword().array(), new Text(
                   containerToken.getKind()), new Text(
-                  containerToken.getService()));
-      user.addToken(token);
+                      containerToken.getService()));
+        //the user in createRemoteUser in this context is not important
+        user = UserGroupInformation.createRemoteUser(containerManagerBindAddr);
+        user.addToken(token);
+        ugiMap.put(containerManagerBindAddr, user);
+      } else {
+        user = ugiMap.get(containerManagerBindAddr);    
+      }
     }
     ContainerManager proxy =
         user.doAs(new PrivilegedAction<ContainerManager>() {

+ 1 - 1
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java

@@ -536,7 +536,7 @@ public class RMContainerAllocator extends RMContainerRequestor
       containersAllocated += allocatedContainers.size();
       while (it.hasNext()) {
         Container allocated = it.next();
-        LOG.info("Assiging container " + allocated);
+        LOG.info("Assigning container " + allocated);
         ContainerRequest assigned = assign(allocated);
           
         if (assigned != null) {