Browse Source

YARN-6357. Implement putEntitiesAsync API in TimelineCollector (Haibo Chen via Varun Saxena)

Varun Saxena 8 years ago
parent
commit
063b513b1c

+ 25 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java

@@ -133,19 +133,35 @@ public abstract class TimelineCollector extends CompositeService {
   public TimelineWriteResponse putEntities(TimelineEntities entities,
       UserGroupInformation callerUgi) throws IOException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("SUCCESS - TIMELINE V2 PROTOTYPE");
       LOG.debug("putEntities(entities=" + entities + ", callerUgi="
           + callerUgi + ")");
     }
-    TimelineCollectorContext context = getTimelineEntityContext();
 
+    TimelineWriteResponse response = writeTimelineEntities(entities);
+    flushBufferedTimelineEntities();
+
+    return response;
+  }
+
+  private TimelineWriteResponse writeTimelineEntities(
+      TimelineEntities entities) throws IOException {
     // Update application metrics for aggregation
     updateAggregateStatus(entities, aggregationGroups,
         getEntityTypesSkipAggregation());
 
+    final TimelineCollectorContext context = getTimelineEntityContext();
     return writer.write(context.getClusterId(), context.getUserId(),
-        context.getFlowName(), context.getFlowVersion(), context.getFlowRunId(),
-        context.getAppId(), entities);
+        context.getFlowName(), context.getFlowVersion(),
+        context.getFlowRunId(), context.getAppId(), entities);
+  }
+
+  /**
+   * Flush buffered timeline entities, if any.
+   * @throws IOException if there is any exception encountered while
+   *      flushing buffered entities.
+   */
+  private void flushBufferedTimelineEntities() throws IOException {
+    writer.flush();
   }
 
   /**
@@ -158,14 +174,17 @@ public abstract class TimelineCollector extends CompositeService {
    *
    * @param entities entities to post
    * @param callerUgi the caller UGI
+   * @throws IOException if there is any exception encounted while putting
+   *     entities.
    */
   public void putEntitiesAsync(TimelineEntities entities,
-      UserGroupInformation callerUgi) {
-    // TODO implement
+      UserGroupInformation callerUgi) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("putEntitiesAsync(entities=" + entities + ", callerUgi=" +
           callerUgi + ")");
     }
+
+    writeTimelineEntities(entities);
   }
 
   /**

+ 8 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java

@@ -152,9 +152,6 @@ public class TimelineCollectorWebService {
       throw new ForbiddenException(msg);
     }
 
-    // TODO how to express async posts and handle them
-    boolean isAsync = async != null && async.trim().equalsIgnoreCase("true");
-
     try {
       ApplicationId appID = parseApplicationId(appId);
       if (appID == null) {
@@ -169,7 +166,14 @@ public class TimelineCollectorWebService {
         throw new NotFoundException(); // different exception?
       }
 
-      collector.putEntities(processTimelineEntities(entities), callerUgi);
+      boolean isAsync = async != null && async.trim().equalsIgnoreCase("true");
+      if (isAsync) {
+        collector.putEntitiesAsync(
+            processTimelineEntities(entities), callerUgi);
+      } else {
+        collector.putEntities(processTimelineEntities(entities), callerUgi);
+      }
+
       return Response.ok().build();
     } catch (Exception e) {
       LOG.error("Error putting entities", e);

+ 63 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java

@@ -18,17 +18,27 @@
 
 package org.apache.hadoop.yarn.server.timelineservice.collector;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 public class TestTimelineCollector {
 
@@ -124,4 +134,57 @@ public class TestTimelineCollector {
     }
 
   }
+
+  /**
+   * Test TimelineCollector's interaction with TimelineWriter upon
+   * putEntity() calls.
+   */
+  @Test
+  public void testPutEntity() throws IOException {
+    TimelineWriter writer = mock(TimelineWriter.class);
+    TimelineCollector collector = new TimelineCollectorForTest(writer);
+
+    TimelineEntities entities = generateTestEntities(1, 1);
+    collector.putEntities(
+        entities, UserGroupInformation.createRemoteUser("test-user"));
+
+    verify(writer, times(1)).write(
+        anyString(), anyString(), anyString(), anyString(), anyLong(),
+        anyString(), any(TimelineEntities.class));
+    verify(writer, times(1)).flush();
+  }
+
+  /**
+   * Test TimelineCollector's interaction with TimelineWriter upon
+   * putEntityAsync() calls.
+   */
+  @Test
+  public void testPutEntityAsync() throws IOException {
+    TimelineWriter writer = mock(TimelineWriter.class);
+    TimelineCollector collector = new TimelineCollectorForTest(writer);
+
+    TimelineEntities entities = generateTestEntities(1, 1);
+    collector.putEntitiesAsync(
+        entities, UserGroupInformation.createRemoteUser("test-user"));
+
+    verify(writer, times(1)).write(
+        anyString(), anyString(), anyString(), anyString(), anyLong(),
+        anyString(), any(TimelineEntities.class));
+    verify(writer, never()).flush();
+  }
+
+  private static class TimelineCollectorForTest extends TimelineCollector {
+    private final TimelineCollectorContext context =
+        new TimelineCollectorContext();
+
+    TimelineCollectorForTest(TimelineWriter writer) {
+      super("TimelineCollectorForTest");
+      setWriter(writer);
+    }
+
+    @Override
+    public TimelineCollectorContext getTimelineEntityContext() {
+      return context;
+    }
+  }
 }