|
@@ -18,6 +18,7 @@
|
|
|
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.HBaseTimelineMetricStore;
|
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
@@ -114,45 +115,42 @@ public class AbstractTimelineAggregatorTest {
|
|
|
long currentTime = System.currentTimeMillis();
|
|
|
long roundedOffAggregatorTime = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(currentTime,
|
|
|
sleepIntervalMillis);
|
|
|
-
|
|
|
+
|
|
|
//Test first run of aggregator with no checkpoint
|
|
|
checkPoint.set(-1);
|
|
|
- agg.setLastAggregatedEndTime(-1l);
|
|
|
agg.runOnce(sleepIntervalMillis);
|
|
|
assertEquals("startTime should be zero", 0, startTimeInDoWork.get());
|
|
|
assertEquals("endTime should be zero", 0, endTimeInDoWork.get());
|
|
|
assertEquals(roundedOffAggregatorTime, checkPoint.get());
|
|
|
assertEquals("Do not aggregate on first run", 0, actualRuns);
|
|
|
|
|
|
- //Test first run with Too Old checkpoint
|
|
|
+// //Test first run with too "recent" checkpoint
|
|
|
currentTime = System.currentTimeMillis();
|
|
|
- checkPoint.set(currentTime - 16*60*1000); //Old checkpoint
|
|
|
- agg.setLastAggregatedEndTime(-1l);
|
|
|
+ checkPoint.set(currentTime);
|
|
|
+ agg.setSleepIntervalMillis(sleepIntervalMillis);
|
|
|
agg.runOnce(sleepIntervalMillis);
|
|
|
assertEquals("startTime should be zero", 0, startTimeInDoWork.get());
|
|
|
assertEquals("endTime should be zero", 0, endTimeInDoWork.get());
|
|
|
- assertEquals(roundedOffAggregatorTime, checkPoint.get());
|
|
|
assertEquals("Do not aggregate on first run", 0, actualRuns);
|
|
|
|
|
|
- //Test first run with too "recent" checkpoint
|
|
|
+ //Test first run with Too Old checkpoint
|
|
|
currentTime = System.currentTimeMillis();
|
|
|
- checkPoint.set(currentTime);
|
|
|
- agg.setLastAggregatedEndTime(-1l);
|
|
|
- agg.setSleepIntervalMillis(sleepIntervalMillis);
|
|
|
+ checkPoint.set(currentTime - 16*60*1000); //Old checkpoint
|
|
|
agg.runOnce(sleepIntervalMillis);
|
|
|
- assertEquals("startTime should be zero", 0, startTimeInDoWork.get());
|
|
|
- assertEquals("endTime should be zero", 0, endTimeInDoWork.get());
|
|
|
- assertEquals(agg.getLastAggregatedEndTime(), checkPoint.get());
|
|
|
- assertEquals("Do not aggregate on first run", 0, actualRuns);
|
|
|
+ long checkPointTime = AbstractTimelineAggregator.getRoundedAggregateTimeMillis(sleepIntervalMillis);
|
|
|
+ assertEquals("startTime should be zero", checkPointTime - sleepIntervalMillis, startTimeInDoWork.get());
|
|
|
+ assertEquals("endTime should be zero", checkPointTime, endTimeInDoWork.get());
|
|
|
+ assertEquals(roundedOffAggregatorTime, checkPoint.get());
|
|
|
+ assertEquals("Do not aggregate on first run", 1, actualRuns);
|
|
|
+
|
|
|
|
|
|
- //Test first run with perfect checkpoint (sleepIntervalMillis back)
|
|
|
+// //Test first run with perfect checkpoint (sleepIntervalMillis back)
|
|
|
currentTime = System.currentTimeMillis();
|
|
|
roundedOffAggregatorTime = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(currentTime,
|
|
|
sleepIntervalMillis);
|
|
|
- long checkPointTime = roundedOffAggregatorTime - sleepIntervalMillis;
|
|
|
+ checkPointTime = roundedOffAggregatorTime - sleepIntervalMillis;
|
|
|
long expectedCheckPoint = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(checkPointTime, sleepIntervalMillis);
|
|
|
checkPoint.set(checkPointTime);
|
|
|
- agg.setLastAggregatedEndTime(-1l);
|
|
|
agg.runOnce(sleepIntervalMillis);
|
|
|
assertEquals("startTime should the lower rounded time of the checkpoint time",
|
|
|
expectedCheckPoint, startTimeInDoWork.get());
|
|
@@ -160,20 +158,20 @@ public class AbstractTimelineAggregatorTest {
|
|
|
expectedCheckPoint + sleepIntervalMillis, endTimeInDoWork.get());
|
|
|
assertEquals(expectedCheckPoint + sleepIntervalMillis,
|
|
|
checkPoint.get());
|
|
|
- assertEquals("Aggregate on first run", 1, actualRuns);
|
|
|
+ assertEquals("Aggregate on first run", 2, actualRuns);
|
|
|
|
|
|
//Test edge case for checkpoint (2 x sleepIntervalMillis)
|
|
|
- checkPointTime = roundedOffAggregatorTime - 2*sleepIntervalMillis;
|
|
|
- expectedCheckPoint = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(checkPointTime, sleepIntervalMillis);
|
|
|
- checkPoint.set(checkPointTime);
|
|
|
+ currentTime = System.currentTimeMillis();
|
|
|
+ checkPoint.set(currentTime - 2*sleepIntervalMillis + 5000);
|
|
|
agg.runOnce(sleepIntervalMillis);
|
|
|
+ long expectedStartTime = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(currentTime - 2*sleepIntervalMillis + 5000, sleepIntervalMillis);
|
|
|
assertEquals("startTime should the lower rounded time of the checkpoint time",
|
|
|
- expectedCheckPoint, startTimeInDoWork.get());
|
|
|
+ expectedStartTime, startTimeInDoWork.get());
|
|
|
assertEquals("startTime should the lower rounded time of the checkpoint time + sleepIntervalMillis",
|
|
|
- expectedCheckPoint + sleepIntervalMillis, endTimeInDoWork.get());
|
|
|
- assertEquals(expectedCheckPoint + sleepIntervalMillis,
|
|
|
+ expectedStartTime + sleepIntervalMillis, endTimeInDoWork.get());
|
|
|
+ assertEquals(expectedStartTime + sleepIntervalMillis,
|
|
|
checkPoint.get());
|
|
|
- assertEquals("Aggregate on second run", 2, actualRuns);
|
|
|
+ assertEquals("Aggregate on second run", 3, actualRuns);
|
|
|
|
|
|
|
|
|
}
|