Преглед изворни кода

YARN-9335 [atsv2] Restrict the number of elements held in timeline collector when backend is unreachable for async calls. Contributed by Abhishek Modi.

Vrushali C пре 6 година
родитељ
комит
22362c876d

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -2767,6 +2767,15 @@ public class YarnConfiguration extends Configuration {
   public static final int
       DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS = 60;
 
+  /** The setting that controls the capacity of the queue for async writes
+   * to the timeline collector.
+   */
+  public static final String TIMELINE_SERVICE_WRITER_ASYNC_QUEUE_CAPACITY =
+      TIMELINE_SERVICE_PREFIX + "writer.async.queue.capacity";
+
+  public static final int
+      DEFAULT_TIMELINE_SERVICE_WRITER_ASYNC_QUEUE_CAPACITY = 100;
+
   /**
    * The name for setting that controls how long the final value of
    * a metric of a completed app is retained before merging

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -2590,6 +2590,13 @@
     <value>60</value>
   </property>
 
+  <property>
+    <description>The setting that decides the capacity of the queue to hold
+    asynchronous timeline entities.</description>
+    <name>yarn.timeline-service.writer.async.queue.capacity</name>
+    <value>100</value>
+  </property>
+
   <property>
     <description>Time period till which the application collector will be alive
      in NM, after the  application master container finishes.</description>

+ 23 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java

@@ -23,8 +23,11 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -37,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,6 +65,7 @@ public abstract class TimelineCollector extends CompositeService {
       = new ConcurrentHashMap<>();
   private static Set<String> entityTypesSkipAggregation
       = new HashSet<>();
+  private ThreadPoolExecutor pool;
 
   private volatile boolean readyToAggregate = false;
 
@@ -73,6 +78,14 @@ public abstract class TimelineCollector extends CompositeService {
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     super.serviceInit(conf);
+    int capacity = conf.getInt(
+        YarnConfiguration.TIMELINE_SERVICE_WRITER_ASYNC_QUEUE_CAPACITY,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WRITER_ASYNC_QUEUE_CAPACITY
+    );
+    pool = new ThreadPoolExecutor(1, 1, 3, TimeUnit.SECONDS,
+        new ArrayBlockingQueue<>(capacity));
+    pool.setRejectedExecutionHandler(
+        new ThreadPoolExecutor.DiscardOldestPolicy());
   }
 
   @Override
@@ -83,6 +96,7 @@ public abstract class TimelineCollector extends CompositeService {
   @Override
   protected void serviceStop() throws Exception {
     isStopped = true;
+    pool.shutdownNow();
     super.serviceStop();
   }
 
@@ -213,7 +227,15 @@ public abstract class TimelineCollector extends CompositeService {
     LOG.debug("putEntitiesAsync(entities={}, callerUgi={})", entities,
         callerUgi);
 
-    writeTimelineEntities(entities, callerUgi);
+    pool.execute(new Runnable() {
+      @Override public void run() {
+        try {
+          writeTimelineEntities(entities, callerUgi);
+        } catch (IOException ie) {
+          LOG.error("Got an exception while writing entity", ie);
+        }
+      }
+    });
   }
 
   /**

+ 40 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java

@@ -27,11 +27,15 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector.AggregationStatusTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
 import org.junit.Test;
 
 import com.google.common.collect.Sets;
+import org.mockito.internal.stubbing.answers.AnswersWithDelay;
+import org.mockito.internal.stubbing.answers.Returns;
 
 import java.io.IOException;
 import java.util.HashSet;
@@ -46,6 +50,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class TestTimelineCollector {
 
@@ -165,17 +170,49 @@ public class TestTimelineCollector {
    * putEntityAsync() calls.
    */
   @Test
-  public void testPutEntityAsync() throws IOException {
+  public void testPutEntityAsync() throws Exception {
     TimelineWriter writer = mock(TimelineWriter.class);
     TimelineCollector collector = new TimelineCollectorForTest(writer);
-
+    collector.init(new Configuration());
+    collector.start();
     TimelineEntities entities = generateTestEntities(1, 1);
     collector.putEntitiesAsync(
         entities, UserGroupInformation.createRemoteUser("test-user"));
-
+    Thread.sleep(1000);
     verify(writer, times(1)).write(any(TimelineCollectorContext.class),
         any(TimelineEntities.class), any(UserGroupInformation.class));
     verify(writer, never()).flush();
+    collector.stop();
+  }
+
+  /**
+   * Test that TimelineCollector discards entities for async writes when the
+   * write is taking too much time.
+   */
+  @Test
+  public void testAsyncEntityDiscard() throws Exception {
+    TimelineWriter writer = mock(TimelineWriter.class);
+
+    when(writer.write(any(), any(), any())).thenAnswer(
+        new AnswersWithDelay(500, new Returns(new TimelineWriteResponse())));
+    TimelineCollector collector = new TimelineCollectorForTest(writer);
+    Configuration config = new Configuration();
+    config
+        .setInt(YarnConfiguration.TIMELINE_SERVICE_WRITER_ASYNC_QUEUE_CAPACITY,
+            3);
+    collector.init(config);
+    collector.start();
+    for (int i = 0; i < 10; ++i) {
+      TimelineEntities entities = generateTestEntities(i + 1, 1);
+      collector.putEntitiesAsync(entities,
+          UserGroupInformation.createRemoteUser("test-user"));
+    }
+    Thread.sleep(3000);
+    verify(writer, times(4))
+        .write(any(TimelineCollectorContext.class), any(TimelineEntities.class),
+            any(UserGroupInformation.class));
+    verify(writer, never()).flush();
+    collector.stop();
   }
 
   /**