Kaynağa Gözat

YARN-6375 App level aggregation should not consider metric values reported in the previous aggregation cycle (Varun Saxena via Vrushali C)

Vrushali Channapattan 8 yıl önce
ebeveyn
işleme
54e2b9e876

+ 14 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java

@@ -310,13 +310,15 @@ public abstract class TimelineCollector extends CompositeService {
         // Update aggregateTable
         Map<String, TimelineMetric> aggrRow = aggregateTable.get(m);
         if (aggrRow == null) {
-          Map<String, TimelineMetric> tempRow = new ConcurrentHashMap<>();
+          Map<String, TimelineMetric> tempRow = new HashMap<>();
           aggrRow = aggregateTable.putIfAbsent(m, tempRow);
           if (aggrRow == null) {
             aggrRow = tempRow;
           }
         }
-        aggrRow.put(entityId, m);
+        synchronized (aggrRow) {
+          aggrRow.put(entityId, m);
+        }
       }
     }
 
@@ -335,14 +337,17 @@ public abstract class TimelineCollector extends CompositeService {
         }
         aggrMetric.setRealtimeAggregationOp(TimelineMetricOperation.NOP);
         Map<Object, Object> status = new HashMap<>();
-        for (TimelineMetric m : aggrRow.values()) {
-          TimelineMetric.aggregateTo(m, aggrMetric, status);
-          // getRealtimeAggregationOp returns an enum so we can directly
-          // compare with "!=".
-          if (m.getRealtimeAggregationOp()
-              != aggrMetric.getRealtimeAggregationOp()) {
-            aggrMetric.setRealtimeAggregationOp(m.getRealtimeAggregationOp());
+        synchronized (aggrRow) {
+          for (TimelineMetric m : aggrRow.values()) {
+            TimelineMetric.aggregateTo(m, aggrMetric, status);
+            // getRealtimeAggregationOp returns an enum so we can directly
+            // compare with "!=".
+            if (m.getRealtimeAggregationOp()
+                != aggrMetric.getRealtimeAggregationOp()) {
+              aggrMetric.setRealtimeAggregationOp(m.getRealtimeAggregationOp());
+            }
           }
+          aggrRow.clear();
         }
         Set<TimelineMetric> metrics = e.getMetrics();
         metrics.remove(aggrMetric);

+ 94 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java

@@ -18,19 +18,27 @@
 
 package org.apache.hadoop.yarn.server.timelineservice.collector;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector.AggregationStatusTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
 import org.junit.Test;
 
+import com.google.common.collect.Sets;
+
 import java.io.IOException;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
@@ -187,4 +195,89 @@ public class TestTimelineCollector {
       return context;
     }
   }
-}
+
+  private static TimelineEntity createEntity(String id, String type) {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setId(id);
+    entity.setType(type);
+    return entity;
+  }
+
+  private static TimelineMetric createDummyMetric(long ts, Long value) {
+    TimelineMetric metric = new TimelineMetric();
+    metric.setId("dummy_metric");
+    metric.addValue(ts, value);
+    metric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
+    return metric;
+  }
+
+  @Test
+  public void testClearPreviousEntitiesOnAggregation() throws Exception {
+    final long ts = System.currentTimeMillis();
+    TimelineCollector collector = new TimelineCollector("") {
+        @Override
+        public TimelineCollectorContext getTimelineEntityContext() {
+          return new TimelineCollectorContext("cluster", "user", "flow", "1",
+              1L, ApplicationId.newInstance(ts, 1).toString());
+        }
+    };
+    collector.init(new Configuration());
+    collector.setWriter(mock(TimelineWriter.class));
+
+    // Put 5 entities with different metric values.
+    TimelineEntities entities = new TimelineEntities();
+    for (int i = 1; i <=5; i++) {
+      TimelineEntity entity = createEntity("e" + i, "type");
+      entity.addMetric(createDummyMetric(ts + i, Long.valueOf(i * 50)));
+      entities.addEntity(entity);
+    }
+    collector.putEntities(entities, UserGroupInformation.getCurrentUser());
+
+    TimelineCollectorContext currContext = collector.getTimelineEntityContext();
+    // Aggregate the entities.
+    Map<String, AggregationStatusTable> aggregationGroups
+        = collector.getAggregationGroups();
+    assertEquals(Sets.newHashSet("type"), aggregationGroups.keySet());
+    TimelineEntity aggregatedEntity = TimelineCollector.
+        aggregateWithoutGroupId(aggregationGroups, currContext.getAppId(),
+            TimelineEntityType.YARN_APPLICATION.toString());
+    TimelineMetric aggregatedMetric =
+        aggregatedEntity.getMetrics().iterator().next();
+    assertEquals(750L, aggregatedMetric.getValues().values().iterator().next());
+    assertEquals(TimelineMetricOperation.SUM,
+        aggregatedMetric.getRealtimeAggregationOp());
+
+    // Aggregate entities.
+    aggregatedEntity = TimelineCollector.
+        aggregateWithoutGroupId(aggregationGroups, currContext.getAppId(),
+            TimelineEntityType.YARN_APPLICATION.toString());
+    aggregatedMetric = aggregatedEntity.getMetrics().iterator().next();
+    // No values aggregated as no metrics put for an entity between this
+    // aggregation and the previous one.
+    assertTrue(aggregatedMetric.getValues().isEmpty());
+    assertEquals(TimelineMetricOperation.NOP,
+        aggregatedMetric.getRealtimeAggregationOp());
+
+    // Put 3 entities.
+    entities = new TimelineEntities();
+    for (int i = 1; i <=3; i++) {
+      TimelineEntity entity = createEntity("e" + i, "type");
+      entity.addMetric(createDummyMetric(System.currentTimeMillis() + i, 50L));
+      entities.addEntity(entity);
+    }
+    aggregationGroups = collector.getAggregationGroups();
+    collector.putEntities(entities, UserGroupInformation.getCurrentUser());
+
+    // Aggregate entities.
+    aggregatedEntity = TimelineCollector.
+        aggregateWithoutGroupId(aggregationGroups, currContext.getAppId(),
+            TimelineEntityType.YARN_APPLICATION.toString());
+    // Last 3 entities picked up for aggregation.
+    aggregatedMetric = aggregatedEntity.getMetrics().iterator().next();
+    assertEquals(150L, aggregatedMetric.getValues().values().iterator().next());
+    assertEquals(TimelineMetricOperation.SUM,
+        aggregatedMetric.getRealtimeAggregationOp());
+
+    collector.close();
+  }
+}