浏览代码

YARN-4697. NM aggregation thread pool is not bound by limits (haibochen via rkanter)

Robert Kanter 9 年之前
父节点
当前提交
954dd57043

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -143,6 +143,9 @@ Release 2.9.0 - UNRELEASED
     YARN-4648. Move preemption related tests from TestFairScheduler to
     YARN-4648. Move preemption related tests from TestFairScheduler to
     TestFairSchedulerPreemption. (Kai Sasaki via ozawa)
     TestFairSchedulerPreemption. (Kai Sasaki via ozawa)
 
 
+    YARN-4697. NM aggregation thread pool is not bound by
+    limits (haibochen via rkanter)
+
   OPTIMIZATIONS
   OPTIMIZATIONS
 
 
   BUG FIXES
   BUG FIXES

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -759,6 +759,11 @@ public class YarnConfiguration extends Configuration {
   public static final String NM_LOG_DIRS = NM_PREFIX + "log-dirs";
   public static final String NM_LOG_DIRS = NM_PREFIX + "log-dirs";
   public static final String DEFAULT_NM_LOG_DIRS = "/tmp/logs";
   public static final String DEFAULT_NM_LOG_DIRS = "/tmp/logs";
 
 
+  /** The number of threads to handle log aggregation in node manager. */
+  public static final String NM_LOG_AGGREGATION_THREAD_POOL_SIZE =
+      NM_PREFIX + "logaggregation.threadpool-size-max";
+  public static final int DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE = 100;
+
   public static final String NM_RESOURCEMANAGER_MINIMUM_VERSION =
   public static final String NM_RESOURCEMANAGER_MINIMUM_VERSION =
       NM_PREFIX + "resourcemanager.minimum.version";
       NM_PREFIX + "resourcemanager.minimum.version";
   public static final String DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION = "NONE";
   public static final String DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION = "NONE";

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -1226,6 +1226,14 @@
     <value>1.0</value>
     <value>1.0</value>
   </property>
   </property>
 
 
+  <property>
+    <description>
+    Thread pool size for LogAggregationService in Node Manager.
+    </description>
+    <name>yarn.nodemanager.logaggregation.threadpool-size-max</name>
+    <value>100</value>
+  </property>
+
   <property>
   <property>
     <description>Percentage of CPU that can be allocated
     <description>Percentage of CPU that can be allocated
     for containers. This setting allows users to limit the amount of
     for containers. This setting allows users to limit the amount of

+ 29 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java

@@ -102,7 +102,8 @@ public class LogAggregationService extends AbstractService implements
 
 
   private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
   private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
 
 
-  private final ExecutorService threadPool;
+  @VisibleForTesting
+  ExecutorService threadPool;
   
   
   public LogAggregationService(Dispatcher dispatcher, Context context,
   public LogAggregationService(Dispatcher dispatcher, Context context,
       DeletionService deletionService, LocalDirsHandlerService dirsHandler) {
       DeletionService deletionService, LocalDirsHandlerService dirsHandler) {
@@ -113,10 +114,6 @@ public class LogAggregationService extends AbstractService implements
     this.dirsHandler = dirsHandler;
     this.dirsHandler = dirsHandler;
     this.appLogAggregators =
     this.appLogAggregators =
         new ConcurrentHashMap<ApplicationId, AppLogAggregator>();
         new ConcurrentHashMap<ApplicationId, AppLogAggregator>();
-    this.threadPool = HadoopExecutors.newCachedThreadPool(
-        new ThreadFactoryBuilder()
-          .setNameFormat("LogAggregationService #%d")
-          .build());
   }
   }
 
 
   protected void serviceInit(Configuration conf) throws Exception {
   protected void serviceInit(Configuration conf) throws Exception {
@@ -126,7 +123,11 @@ public class LogAggregationService extends AbstractService implements
     this.remoteRootLogDirSuffix =
     this.remoteRootLogDirSuffix =
         conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
         conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
             YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
             YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
-
+    int threadPoolSize = getAggregatorThreadPoolSize(conf);
+    this.threadPool = HadoopExecutors.newFixedThreadPool(threadPoolSize,
+        new ThreadFactoryBuilder()
+            .setNameFormat("LogAggregationService #%d")
+            .build());
     super.serviceInit(conf);
     super.serviceInit(conf);
   }
   }
 
 
@@ -487,4 +488,26 @@ public class LogAggregationService extends AbstractService implements
   public NodeId getNodeId() {
   public NodeId getNodeId() {
     return this.nodeId;
     return this.nodeId;
   }
   }
+
+
+  private int getAggregatorThreadPoolSize(Configuration conf) {
+    int threadPoolSize;
+    try {
+      threadPoolSize = conf.getInt(YarnConfiguration
+          .NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+          YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE);
+    } catch (NumberFormatException ex) {
+      LOG.warn("Invalid thread pool size. Setting it to the default value " +
+          "in YarnConfiguration");
+      threadPoolSize = YarnConfiguration.
+          DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE;
+    }
+    if(threadPoolSize <= 0) {
+      LOG.warn("Invalid thread pool size. Setting it to the default value " +
+          "in YarnConfiguration");
+      threadPoolSize = YarnConfiguration.
+          DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE;
+    }
+    return threadPoolSize;
+  }
 }
 }

+ 143 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java

@@ -55,6 +55,12 @@ import java.util.Map;
 import java.util.Set;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
@@ -1040,6 +1046,143 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     return appAcls;
     return appAcls;
   }
   }
 
 
+  @Test (timeout = 30000)
+  public void testFixedSizeThreadPool() throws Exception {
+    // store configured thread pool size temporarily for restoration
+    int initThreadPoolSize = conf.getInt(YarnConfiguration
+        .NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+        YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE);
+
+    int threadPoolSize = 3;
+    conf.setInt(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+        threadPoolSize);
+
+    DeletionService delSrvc = mock(DeletionService.class);
+
+    LocalDirsHandlerService dirSvc = mock(LocalDirsHandlerService.class);
+    when(dirSvc.getLogDirs()).thenThrow(new RuntimeException());
+
+    LogAggregationService logAggregationService =
+      new LogAggregationService(dispatcher, this.context, delSrvc, dirSvc);
+
+    logAggregationService.init(this.conf);
+    logAggregationService.start();
+
+    ExecutorService executorService = logAggregationService.threadPool;
+
+    // used to block threads in the thread pool because main thread always
+    // acquires the write lock first.
+    final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+    final Lock rLock = rwLock.readLock();
+    final Lock wLock = rwLock.writeLock();
+
+    try {
+      wLock.lock();
+      Runnable runnable = new Runnable() {
+        @Override
+        public void run() {
+          try {
+            // threads in the thread pool running this will be blocked
+            rLock.tryLock(35000, TimeUnit.MILLISECONDS);
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          } finally {
+            rLock.unlock();
+          }
+        }
+      };
+
+      // submit $(threadPoolSize + 1) runnables to the thread pool. If the thread
+      // pool size is set properly, only $(threadPoolSize) threads will be
+      // created in the thread pool, each of which is blocked on the read lock.
+      for(int i = 0; i < threadPoolSize + 1; i++)  {
+        executorService.submit(runnable);
+      }
+
+      // count the number of current running LogAggregationService threads
+      int runningThread = ((ThreadPoolExecutor) executorService).getActiveCount();
+      assertEquals(threadPoolSize, runningThread);
+    }
+    finally {
+      wLock.unlock();
+    }
+
+    logAggregationService.stop();
+    logAggregationService.close();
+
+    // restore the original configurations to avoid side effects
+    conf.setInt(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+        initThreadPoolSize);
+  }
+
+  @Test
+  public void testInvalidThreadPoolSizeNaN() throws IOException {
+      testInvalidThreadPoolSizeValue("NaN");
+  }
+
+  @Test
+  public void testInvalidThreadPoolSizeNegative() throws IOException {
+      testInvalidThreadPoolSizeValue("-100");
+  }
+
+  @Test
+  public void testInvalidThreadPoolSizeXLarge() throws  IOException {
+      testInvalidThreadPoolSizeValue("11111111111");
+  }
+
+  private void testInvalidThreadPoolSizeValue(final String threadPoolSize)
+      throws IOException {
+    Supplier<Boolean> isInputInvalid = new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          try {
+            int value = Integer.parseInt(threadPoolSize);
+            return value <= 0;
+          } catch (NumberFormatException ex) {
+            return true;
+          }
+        }
+    };
+
+    assertTrue("The thread pool size must be invalid to use with this " +
+        "method", isInputInvalid.get());
+
+
+    // store configured thread pool size temporarily for restoration
+    int initThreadPoolSize = conf.getInt(YarnConfiguration
+        .NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+        YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE);
+
+    conf.set(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+         threadPoolSize);
+
+    DeletionService delSrvc = mock(DeletionService.class);
+
+    LocalDirsHandlerService dirSvc = mock(LocalDirsHandlerService.class);
+    when(dirSvc.getLogDirs()).thenThrow(new RuntimeException());
+
+    LogAggregationService logAggregationService =
+         new LogAggregationService(dispatcher, this.context, delSrvc, dirSvc);
+
+    logAggregationService.init(this.conf);
+    logAggregationService.start();
+
+    ThreadPoolExecutor executorService = (ThreadPoolExecutor)
+        logAggregationService.threadPool;
+    assertEquals("The thread pool size should be set to the value of YARN" +
+        ".DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE because the configured "
+         + " thread pool size is " + "invalid.",
+        YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+        executorService.getMaximumPoolSize());
+
+    logAggregationService.stop();
+    logAggregationService.close();
+
+     // retore original configuration to aviod side effects
+     conf.setInt(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+         initThreadPoolSize);
+  }
+
   @Test(timeout=20000)
   @Test(timeout=20000)
   public void testStopAfterError() throws Exception {
   public void testStopAfterError() throws Exception {
     DeletionService delSrvc = mock(DeletionService.class);
     DeletionService delSrvc = mock(DeletionService.class);