1
0
Переглянути джерело

MAPREDUCE-3684. LocalDistributedCacheManager does not shut down its thread pool.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1232981 13f79535-47bb-0310-9956-ffa450edef68
Thomas White 13 роки тому
батько
коміт
8b2f6909ec

+ 4 - 1
hadoop-mapreduce-project/CHANGES.txt

@@ -507,7 +507,10 @@ Release 0.23.1 - Unreleased
     leading to a long timeout in Task. (Rajesh Balamohan via acmurthy)
 
     MAPREDUCE-3669. Allow clients to talk to MR HistoryServer using both
-    delegation tokens and kerberos. (mahadev via acmurthy) 
+    delegation tokens and kerberos. (mahadev via acmurthy)
+
+    MAPREDUCE-3684. LocalDistributedCacheManager does not shut down its thread
+    pool (tomwhite)
 
 Release 0.23.0 - 2011-11-01 
 

+ 47 - 35
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java

@@ -39,6 +39,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -57,6 +58,8 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.FSDownload;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * A helper class for managing the distributed cache for {@link LocalJobRunner}.
  */
@@ -111,43 +114,52 @@ class LocalDistributedCacheManager {
     FileContext localFSFileContext = FileContext.getLocalFSFileContext();
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     
-    Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap();
-    ExecutorService exec = Executors.newCachedThreadPool();
-    Path destPath = localDirAllocator.getLocalPathForWrite(".", conf);
-    for (LocalResource resource : localResources.values()) {
-      Callable<Path> download = new FSDownload(localFSFileContext, ugi, conf,
-          destPath, resource, new Random());
-      Future<Path> future = exec.submit(download);
-      resourcesToPaths.put(resource, future);
-    }
-    for (LocalResource resource : localResources.values()) {
-      Path path;
-      try {
-        path = resourcesToPaths.get(resource).get();
-      } catch (InterruptedException e) {
-        throw new IOException(e);
-      } catch (ExecutionException e) {
-        throw new IOException(e);
+    ExecutorService exec = null;
+    try {
+      ThreadFactory tf = new ThreadFactoryBuilder()
+      .setNameFormat("LocalDistributedCacheManager Downloader #%d")
+      .build();
+      exec = Executors.newCachedThreadPool(tf);
+      Path destPath = localDirAllocator.getLocalPathForWrite(".", conf);
+      Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap();
+      for (LocalResource resource : localResources.values()) {
+        Callable<Path> download = new FSDownload(localFSFileContext, ugi, conf,
+            destPath, resource, new Random());
+        Future<Path> future = exec.submit(download);
+        resourcesToPaths.put(resource, future);
       }
-      String pathString = path.toUri().toString();
-      if (resource.getType() == LocalResourceType.ARCHIVE) {
-        localArchives.add(pathString);
-      } else if (resource.getType() == LocalResourceType.FILE) {
-        localFiles.add(pathString);
+      for (LocalResource resource : localResources.values()) {
+        Path path;
+        try {
+          path = resourcesToPaths.get(resource).get();
+        } catch (InterruptedException e) {
+          throw new IOException(e);
+        } catch (ExecutionException e) {
+          throw new IOException(e);
+        }
+        String pathString = path.toUri().toString();
+        if (resource.getType() == LocalResourceType.ARCHIVE) {
+          localArchives.add(pathString);
+        } else if (resource.getType() == LocalResourceType.FILE) {
+          localFiles.add(pathString);
+        }
+        Path resourcePath;
+        try {
+          resourcePath = ConverterUtils.getPathFromYarnURL(resource.getResource());
+        } catch (URISyntaxException e) {
+          throw new IOException(e);
+        }
+        LOG.info(String.format("Localized %s as %s", resourcePath, path));
+        String cp = resourcePath.toUri().getPath();
+        if (classpaths.keySet().contains(cp)) {
+          localClasspaths.add(path.toUri().getPath().toString());
+        }
       }
-      Path resourcePath;
-      try {
-        resourcePath = ConverterUtils.getPathFromYarnURL(resource.getResource());
-      } catch (URISyntaxException e) {
-        throw new IOException(e);
+    } finally {
+      if (exec != null) {
+        exec.shutdown();
       }
-      LOG.info(String.format("Localized %s as %s", resourcePath, path));
-      String cp = resourcePath.toUri().getPath();
-      if (classpaths.keySet().contains(cp)) {
-        localClasspaths.add(path.toUri().getPath().toString());
-      }
-    }
-    
+    }    
     // Update the configuration object with localized data.
     if (!localArchives.isEmpty()) {
       conf.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils
@@ -171,7 +183,7 @@ class LocalDistributedCacheManager {
     }
     setupCalled = true;
   }
-
+  
   /** 
    * Are the resources that should be added to the classpath? 
    * Should be called after setup().

+ 7 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -61,6 +62,8 @@ import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /** Implements MapReduce locally, in-process, for debugging. */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -302,7 +305,10 @@ public class LocalJobRunner implements ClientProtocol {
       LOG.debug("Map tasks to process: " + this.numMapTasks);
 
       // Create a new executor service to drain the work queue.
-      ExecutorService executor = Executors.newFixedThreadPool(maxMapThreads);
+      ThreadFactory tf = new ThreadFactoryBuilder()
+        .setNameFormat("LocalJobRunner Map Task Executor #%d")
+        .build();
+      ExecutorService executor = Executors.newFixedThreadPool(maxMapThreads, tf);
 
       return executor;
     }