@@ -0,0 +1,723 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineServerUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invoked via the coprocessor when a Get or a Scan is issued for the flow run
+ * table. Looks through the list of cells per row, checks their tags, and
+ * operates on those cells as per the cell tags. Transforms reads of the
+ * stored metrics into calculated sums for each column. Also finds the min and
+ * max for start and end times in a flow run.
+ */
+class FlowScanner implements RegionScanner, Closeable {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FlowScanner.class);
+
+  /**
+   * Use a special application id to represent the flow id. This is needed
+   * since TimestampGenerator parses the app id to generate a cell timestamp.
+   */
+  private static final String FLOW_APP_ID = "application_00000000000_0000";
+
+  private final Region region;
+  private final InternalScanner flowRunScanner;
+  private final int batchSize;
+  private final long appFinalValueRetentionThreshold;
+  private RegionScanner regionScanner;
+  private boolean hasMore;
+  private byte[] currentRow;
+  private List<Cell> availableCells = new ArrayList<>();
+  private int currentIndex;
+  private FlowScannerOperation action = FlowScannerOperation.READ;
+
+  FlowScanner(RegionCoprocessorEnvironment env,
+      InternalScanner internalScanner, FlowScannerOperation action) {
+    this(env, null, internalScanner, action);
+  }
+
+  FlowScanner(RegionCoprocessorEnvironment env, Scan incomingScan,
+      InternalScanner internalScanner, FlowScannerOperation action) {
+    this.batchSize = incomingScan == null ? -1 : incomingScan.getBatch();
+    // TODO initialize other scan attributes like Scan#maxResultSize
+    this.flowRunScanner = internalScanner;
+    if (internalScanner instanceof RegionScanner) {
+      this.regionScanner = (RegionScanner) internalScanner;
+    }
+    this.action = action;
+    if (env == null) {
+      this.appFinalValueRetentionThreshold =
+          YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD;
+      this.region = null;
+    } else {
+      this.region = env.getRegion();
+      Configuration hbaseConf = env.getConfiguration();
+      this.appFinalValueRetentionThreshold = hbaseConf.getLong(
+          YarnConfiguration.APP_FINAL_VALUE_RETENTION_THRESHOLD,
+          YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("batch size=" + batchSize);
+    }
+  }
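+  // A minimal usage sketch (an editor's assumption for illustration, not part
+  // of this patch): the flow run coprocessor is expected to wrap the scanner
+  // it is handed in its scanner-open/flush/compaction hooks, e.g.
+  //
+  //   InternalScanner wrapped = ...; // scanner supplied by the coprocessor hook
+  //   return new FlowScanner(env, scan, wrapped, FlowScannerOperation.READ);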
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getRegionInfo()
+   */
+  @Override
+  public HRegionInfo getRegionInfo() {
+    return new HRegionInfo(region.getRegionInfo());
+  }
+
+  @Override
+  public boolean nextRaw(List<Cell> cells) throws IOException {
+    return nextRaw(cells, ScannerContext.newBuilder().build());
+  }
+
+  @Override
+  public boolean nextRaw(List<Cell> cells, ScannerContext scannerContext)
+      throws IOException {
+    return nextInternal(cells, scannerContext);
+  }
+
+  @Override
+  public boolean next(List<Cell> cells) throws IOException {
+    return next(cells, ScannerContext.newBuilder().build());
+  }
+
+  @Override
+  public boolean next(List<Cell> cells, ScannerContext scannerContext)
+      throws IOException {
+    return nextInternal(cells, scannerContext);
+  }
+
+  /**
+   * Gets the value converter associated with a column or a column prefix. If
+   * nothing matches, the generic converter is returned.
+   * @param colQualifierBytes the column qualifier in bytes.
+   * @return value converter implementation.
+   */
+  private static ValueConverter getValueConverter(byte[] colQualifierBytes) {
+    // Iterate over all the column prefixes for the flow run table and, if a
+    // prefix matches, return the converter for the column qualifier passed in.
+    for (FlowRunColumnPrefix colPrefix : FlowRunColumnPrefix.values()) {
+      byte[] colPrefixBytes = colPrefix.getColumnPrefixBytes("");
+      if (Bytes.compareTo(colPrefixBytes, 0, colPrefixBytes.length,
+          colQualifierBytes, 0, colPrefixBytes.length) == 0) {
+        return colPrefix.getValueConverter();
+      }
+    }
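+    // For example (illustrative qualifier; the actual prefix bytes come from
+    // FlowRunColumnPrefix): a metric qualifier such as "m!map_input_records"
+    // would match the METRIC column prefix above, so that prefix's numeric
+    // converter is returned.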
+    // Iterate over all the columns for the flow run table and get the
+    // appropriate converter for the column qualifier passed if a match occurs.
+    for (FlowRunColumn column : FlowRunColumn.values()) {
+      if (Bytes.compareTo(
+          column.getColumnQualifierBytes(), colQualifierBytes) == 0) {
+        return column.getValueConverter();
+      }
+    }
+    // Return the generic converter if nothing matches.
+    return GenericConverter.getInstance();
+  }
+
+
+  /**
+   * This method loops through the cells in a given row of the
+   * {@link FlowRunTable}. It looks at the tags of each cell to figure out how
+   * to process the contents. It then calculates the sum, min, or max for each
+   * column, or returns the cell as is.
+   *
+   * @param cells list that the output cells are added to.
+   * @param scannerContext context information for the batch of cells under
+   *          consideration.
+   * @return true if the next row is available for the scanner, false
+   *         otherwise.
+   * @throws IOException if any problem occurs while scanning.
+   */
+  private boolean nextInternal(List<Cell> cells, ScannerContext scannerContext)
+      throws IOException {
+    Cell cell = null;
+    startNext();
+    // Loop through all the cells in this row.
+    // For min/max/metrics we do need to scan the entire set of cells to get
+    // the right one, but with each flush/compaction the number of cells to be
+    // scanned goes down. Cells are grouped per column qualifier and, within a
+    // qualifier, sorted by cell timestamp (latest to oldest), so all cells of
+    // one qualifier come one after the other before we see the next column
+    // qualifier.
+    ByteArrayComparator comp = new ByteArrayComparator();
+    byte[] previousColumnQualifier = Separator.EMPTY_BYTES;
+    AggregationOperation currentAggOp = null;
+    SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
+    Set<String> alreadySeenAggDim = new HashSet<>();
+    int addedCnt = 0;
+    long currentTimestamp = System.currentTimeMillis();
+    ValueConverter converter = null;
+    int limit = batchSize;
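+    // A batch size of zero or less (the default when no Scan was supplied)
+    // means no limit: the loop runs until the current row's cells are
+    // exhausted.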
+
+    while (limit <= 0 || addedCnt < limit) {
+      cell = peekAtNextCell(scannerContext);
+      if (cell == null) {
+        break;
+      }
+      byte[] currentColumnQualifier = CellUtil.cloneQualifier(cell);
+      converter = getValueConverter(currentColumnQualifier);
+      if (comp.compare(previousColumnQualifier, currentColumnQualifier) != 0) {
+        addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
+            converter, currentTimestamp);
+        resetState(currentColumnCells, alreadySeenAggDim);
+        previousColumnQualifier = currentColumnQualifier;
+        currentAggOp = getCurrentAggOp(cell);
+      }
+      collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim,
+          converter, scannerContext);
+      nextCell(scannerContext);
+    }
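+    // Emit whatever accumulated for the last column qualifier of the row; the
+    // loop above only emits when it sees the qualifier change.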
+    if (!currentColumnCells.isEmpty() && (limit <= 0 || addedCnt < limit)) {
+      addedCnt += emitCells(cells, currentColumnCells, currentAggOp, converter,
+          currentTimestamp);
+      if (LOG.isDebugEnabled()) {
+        if (addedCnt > 0) {
+          LOG.debug("emitted " + addedCnt + " cells for " + this.action
+              + " rowKey="
+              + FlowRunRowKey.parseRowKey(CellUtil.cloneRow(cells.get(0))));
+        } else {
+          LOG.debug("emitted no cells for " + this.action);
+        }
+      }
+    }
+    return hasMore();
+  }
+
+  private AggregationOperation getCurrentAggOp(Cell cell) {
+    List<Tag> tags = HBaseTimelineServerUtils.convertCellAsTagList(cell);
+    // We assume that all the operations for a particular column are the same.
+    return HBaseTimelineServerUtils.getAggregationOperationFromTagsList(tags);
+  }
+
+  /**
+   * Resets the accumulation state to an initialized state for the next column
+   * qualifier.
+   */
+  private void resetState(SortedSet<Cell> currentColumnCells,
+      Set<String> alreadySeenAggDim) {
+    currentColumnCells.clear();
+    alreadySeenAggDim.clear();
+  }
+
+  private void collectCells(SortedSet<Cell> currentColumnCells,
+      AggregationOperation currentAggOp, Cell cell,
+      Set<String> alreadySeenAggDim, ValueConverter converter,
+      ScannerContext scannerContext) throws IOException {
+
+    if (currentAggOp == null) {
+      // not a min/max/metric cell, so just return it as is
+      currentColumnCells.add(cell);
+      return;
+    }
+
+    switch (currentAggOp) {
+    case GLOBAL_MIN:
+      if (currentColumnCells.isEmpty()) {
+        currentColumnCells.add(cell);
+      } else {
+        Cell currentMinCell = currentColumnCells.first();
+        Cell newMinCell = compareCellValues(currentMinCell, cell, currentAggOp,
+            (NumericValueConverter) converter);
+        if (!currentMinCell.equals(newMinCell)) {
+          currentColumnCells.remove(currentMinCell);
+          currentColumnCells.add(newMinCell);
+        }
+      }
+      break;
+    case GLOBAL_MAX:
+      if (currentColumnCells.isEmpty()) {
+        currentColumnCells.add(cell);
+      } else {
+        Cell currentMaxCell = currentColumnCells.first();
+        Cell newMaxCell = compareCellValues(currentMaxCell, cell, currentAggOp,
+            (NumericValueConverter) converter);
+        if (!currentMaxCell.equals(newMaxCell)) {
+          currentColumnCells.remove(currentMaxCell);
+          currentColumnCells.add(newMaxCell);
+        }
+      }
+      break;
+    case SUM:
+    case SUM_FINAL:
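+      // (Note: a SUM cell carries the running value of a still-active
+      // application, while a SUM_FINAL cell carries the final value of a
+      // completed application.)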
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("In collectCells"
+            + " FlowScannerOperation="
+            + this.action
+            + " currentAggOp="
+            + currentAggOp
+            + " cell qualifier="
+            + Bytes.toString(CellUtil.cloneQualifier(cell))
+            + " cell value="
+            + converter.decodeValue(CellUtil.cloneValue(cell))
+            + " timestamp=" + cell.getTimestamp());
+      }
+
+      // only if this app has not been seen yet, add to current column cells
+      List<Tag> tags = HBaseTimelineServerUtils.convertCellAsTagList(cell);
+      String aggDim = HBaseTimelineServerUtils
+          .getAggregationCompactionDimension(tags);
+      if (!alreadySeenAggDim.contains(aggDim)) {
+        // Cells show up in sorted order, so if this agg dimension has already
+        // been seen, the remaining cells for it are older and are dropped. In
+        // other words, when this agg dimension has not been seen yet, this
+        // cell is the latest one for it and belongs in our working set.
+        currentColumnCells.add(cell);
+        alreadySeenAggDim.add(aggDim);
+      }
+      break;
+    default:
+      break;
+    } // end of switch case
+  }
+
+  /*
+   * Processes the cells in the input param currentColumnCells and populates
+   * List<Cell> cells as the output, based on the input AggregationOperation
+   * parameter.
+   */
+  private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells,
+      AggregationOperation currentAggOp, ValueConverter converter,
+      long currentTimestamp) throws IOException {
+    if ((currentColumnCells == null) || currentColumnCells.isEmpty()) {
+      return 0;
+    }
+    if (currentAggOp == null) {
+      cells.addAll(currentColumnCells);
+      return currentColumnCells.size();
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("In emitCells " + this.action + " currentColumnCells size="
+          + currentColumnCells.size() + " currentAggOp=" + currentAggOp);
+    }
+
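+    // Decision summary: MIN/MAX cells, and SUM/SUM_FINAL cells during a flush
+    // or minor compaction, pass through unchanged; a READ collapses
+    // SUM/SUM_FINAL cells into a single summed cell; a major compaction
+    // additionally folds expired SUM_FINAL values into one flow-level sum
+    // cell.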
+    switch (currentAggOp) {
+    case GLOBAL_MIN:
+    case GLOBAL_MAX:
+      cells.addAll(currentColumnCells);
+      return currentColumnCells.size();
+    case SUM:
+    case SUM_FINAL:
+      switch (action) {
+      case FLUSH:
+      case MINOR_COMPACTION:
+        cells.addAll(currentColumnCells);
+        return currentColumnCells.size();
+      case READ:
+        Cell sumCell = processSummation(currentColumnCells,
+            (NumericValueConverter) converter);
+        cells.add(sumCell);
+        return 1;
+      case MAJOR_COMPACTION:
+        List<Cell> finalCells = processSummationMajorCompaction(
+            currentColumnCells, (NumericValueConverter) converter,
+            currentTimestamp);
+        cells.addAll(finalCells);
+        return finalCells.size();
+      default:
+        cells.addAll(currentColumnCells);
+        return currentColumnCells.size();
+      }
+    default:
+      cells.addAll(currentColumnCells);
+      return currentColumnCells.size();
+    }
+  }
+
+
+  /*
+   * Returns a cell whose value is the sum of all the cell values in the input
+   * set. The new cell created has the timestamp of the most recent metric
+   * cell. The sum of a metric for a flow run is its cumulative value up to
+   * the most recent metric update in that flow run.
+   */
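+  // Worked example (illustrative values): three cells with values 10, 20 and
+  // 30 and timestamps t1 < t2 < t3 produce one cell with value 60 and
+  // timestamp t3.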
+  private Cell processSummation(SortedSet<Cell> currentColumnCells,
+      NumericValueConverter converter) throws IOException {
+    Number sum = 0;
+    Number currentValue = 0;
+    long ts = 0L;
+    long mostCurrentTimestamp = 0L;
+    Cell mostRecentCell = null;
+    for (Cell cell : currentColumnCells) {
+      currentValue = (Number) converter.decodeValue(CellUtil.cloneValue(cell));
+      ts = cell.getTimestamp();
+      if (mostCurrentTimestamp < ts) {
+        mostCurrentTimestamp = ts;
+        mostRecentCell = cell;
+      }
+      sum = converter.add(sum, currentValue);
+    }
+    byte[] sumBytes = converter.encodeValue(sum);
+    Cell sumCell =
+        HBaseTimelineServerUtils.createNewCell(mostRecentCell, sumBytes);
+    return sumCell;
+  }
+
+
+  /**
+   * Returns a list of cells that contains
+   *
+   * A) the latest cells for applications that haven't finished yet, and
+   * B) a summation for the flow, based on applications that have completed
+   * and are older than a certain time.
+   *
+   * The new summation cell created has the timestamp of the most recent
+   * metric cell. The sum of a metric for a flow run is its cumulative value
+   * up to the most recent metric update in that flow run.
+   */
+  @VisibleForTesting
+  List<Cell> processSummationMajorCompaction(
+      SortedSet<Cell> currentColumnCells, NumericValueConverter converter,
+      long currentTimestamp)
+      throws IOException {
+    Number sum = 0;
+    Number currentValue = 0;
+    long ts = 0L;
+    boolean summationDone = false;
+    List<Cell> finalCells = new ArrayList<>();
+    if (currentColumnCells == null || currentColumnCells.isEmpty()) {
+      return finalCells;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("In processSummationMajorCompaction,"
+          + " will drop cells older than " + currentTimestamp
+          + " currentColumnCells size=" + currentColumnCells.size());
+    }
+
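+    // Each cell falls into one of three buckets:
+    // 1) the flow-level sum cell written by a previous major compaction
+    //    (tagged with FLOW_APP_ID): folded into the new sum;
+    // 2) a SUM_FINAL cell from a completed application that is older than the
+    //    retention threshold: folded into the new sum and dropped;
+    // 3) everything else: written back unchanged.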
+    for (Cell cell : currentColumnCells) {
+      AggregationOperation cellAggOp = getCurrentAggOp(cell);
+      // if this is the existing flow sum cell
+      List<Tag> tags = HBaseTimelineServerUtils.convertCellAsTagList(cell);
+      String appId = HBaseTimelineServerUtils
+          .getAggregationCompactionDimension(tags);
+      if (FLOW_APP_ID.equals(appId)) {
+        currentValue = (Number) converter.decodeValue(CellUtil
+            .cloneValue(cell));
+        sum = converter.add(sum, currentValue);
+        summationDone = true;
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("reading flow app id sum=" + sum);
+        }
+      } else {
+        currentValue = (Number) converter.decodeValue(CellUtil
+            .cloneValue(cell));
+        // read the timestamp truncated by the generator
+        ts = TimestampGenerator.getTruncatedTimestamp(cell.getTimestamp());
+        if ((cellAggOp == AggregationOperation.SUM_FINAL)
+            && ((ts + this.appFinalValueRetentionThreshold)
+                < currentTimestamp)) {
+          sum = converter.add(sum, currentValue);
+          summationDone = true;
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("MAJOR COMPACTION loop sum=" + sum
+                + " discarding now: qualifier="
+                + Bytes.toString(CellUtil.cloneQualifier(cell)) + " value="
+                + converter.decodeValue(CellUtil.cloneValue(cell))
+                + " timestamp=" + cell.getTimestamp() + " " + this.action);
+          }
+        } else {
+          // not a final value but it's the latest cell for this app,
+          // so include this cell in the list of cells to write back
+          finalCells.add(cell);
+        }
+      }
+    }
+    if (summationDone) {
+      Cell anyCell = currentColumnCells.first();
+      List<Tag> tags = new ArrayList<Tag>();
+      Tag t = HBaseTimelineServerUtils.createTag(
+          AggregationOperation.SUM_FINAL.getTagType(),
+          Bytes.toBytes(FLOW_APP_ID));
+      tags.add(t);
+      t = HBaseTimelineServerUtils.createTag(
+          AggregationCompactionDimension.APPLICATION_ID.getTagType(),
+          Bytes.toBytes(FLOW_APP_ID));
+      tags.add(t);
+      byte[] tagByteArray =
+          HBaseTimelineServerUtils.convertTagListToByteArray(tags);
+      Cell sumCell = HBaseTimelineServerUtils.createNewCell(
+          CellUtil.cloneRow(anyCell),
+          CellUtil.cloneFamily(anyCell),
+          CellUtil.cloneQualifier(anyCell),
+          TimestampGenerator.getSupplementedTimestamp(
+              System.currentTimeMillis(), FLOW_APP_ID),
+          converter.encodeValue(sum), tagByteArray);
+      finalCells.add(sumCell);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("MAJOR COMPACTION final sum=" + sum + " for "
+            + Bytes.toString(CellUtil.cloneQualifier(sumCell))
+            + " " + this.action);
+      }
+      LOG.info("After major compaction for qualifier="
+          + Bytes.toString(CellUtil.cloneQualifier(sumCell))
+          + " with currentColumnCells.size="
+          + currentColumnCells.size()
+          + " returning finalCells.size=" + finalCells.size()
+          + " with sum=" + sum.longValue()
+          + " with cell timestamp " + sumCell.getTimestamp());
+    } else {
+      String qualifier =
+          Bytes.toString(CellUtil.cloneQualifier(currentColumnCells.first()));
+      LOG.info("After major compaction for qualifier=" + qualifier
+          + " with currentColumnCells.size="
+          + currentColumnCells.size()
+          + " returning finalCells.size=" + finalCells.size()
+          + " with zero sum="
+          + sum.longValue());
+    }
+    return finalCells;
+  }
+
+
|
|
|
+ /**
|
|
|
+ * Determines which cell is to be returned based on the values in each cell
|
|
|
+ * and the comparison operation MIN or MAX.
|
|
|
+ *
|
|
|
+ * @param previouslyChosenCell
|
|
|
+ * @param currentCell
|
|
|
+ * @param currentAggOp
|
|
|
+ * @return the cell which is the min (or max) cell
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private Cell compareCellValues(Cell previouslyChosenCell, Cell currentCell,
|
|
|
+ AggregationOperation currentAggOp, NumericValueConverter converter)
|
|
|
+ throws IOException {
|
|
|
+ if (previouslyChosenCell == null) {
|
|
|
+ return currentCell;
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ Number previouslyChosenCellValue = (Number)converter.decodeValue(
|
|
|
+ CellUtil.cloneValue(previouslyChosenCell));
|
|
|
+ Number currentCellValue = (Number) converter.decodeValue(CellUtil
|
|
|
+ .cloneValue(currentCell));
|
|
|
+ switch (currentAggOp) {
|
|
|
+ case GLOBAL_MIN:
|
|
|
+ if (converter.compare(
|
|
|
+ currentCellValue, previouslyChosenCellValue) < 0) {
|
|
|
+ // new value is minimum, hence return this cell
|
|
|
+ return currentCell;
|
|
|
+ } else {
|
|
|
+ // previously chosen value is miniumum, hence return previous min cell
|
|
|
+ return previouslyChosenCell;
|
|
|
+ }
|
|
|
+ case GLOBAL_MAX:
|
|
|
+ if (converter.compare(
|
|
|
+ currentCellValue, previouslyChosenCellValue) > 0) {
|
|
|
+ // new value is max, hence return this cell
|
|
|
+ return currentCell;
|
|
|
+ } else {
|
|
|
+ // previously chosen value is max, hence return previous max cell
|
|
|
+ return previouslyChosenCell;
|
|
|
+ }
|
|
|
+ default:
|
|
|
+ return currentCell;
|
|
|
+ }
|
|
|
+ } catch (IllegalArgumentException iae) {
|
|
|
+ LOG.error("caught iae during conversion to long ", iae);
|
|
|
+ return currentCell;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+  @Override
+  public void close() throws IOException {
+    if (flowRunScanner != null) {
+      flowRunScanner.close();
+    } else {
+      LOG.warn("scanner close called but scanner is null");
+    }
+  }
+
+  /**
+   * Called to signal the start of the next() call by the scanner.
+   */
+  public void startNext() {
+    currentRow = null;
+  }
+
+  /**
+   * Returns whether or not the underlying scanner has more rows.
+   */
+  public boolean hasMore() {
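+    // Cells still buffered for the current row count as "more", even when the
+    // underlying scanner itself is exhausted.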
+    return currentIndex < availableCells.size() || hasMore;
+  }
+
+  /**
+   * Returns the next available cell for the current row and advances the
+   * pointer to the next cell. This method can be called multiple times in a
+   * row to advance through all the available cells.
+   *
+   * @param scannerContext
+   *          context information for the batch of cells under consideration
+   * @return the next available cell or null if no more cells are available
+   *         for the current row
+   * @throws IOException if any problem is encountered while grabbing the next
+   *           cell.
+   */
+  public Cell nextCell(ScannerContext scannerContext) throws IOException {
+    Cell cell = peekAtNextCell(scannerContext);
+    if (cell != null) {
+      currentIndex++;
+    }
+    return cell;
+  }
+
+  /**
+   * Returns the next available cell for the current row, without advancing
+   * the pointer. Calling this method multiple times in a row will continue to
+   * return the same cell.
+   *
+   * @param scannerContext
+   *          context information for the batch of cells under consideration
+   * @return the next available cell or null if no more cells are available
+   *         for the current row
+   * @throws IOException if any problem is encountered while grabbing the next
+   *           cell.
+   */
+  public Cell peekAtNextCell(ScannerContext scannerContext)
+      throws IOException {
+    if (currentIndex >= availableCells.size()) {
+      // done with current batch
+      availableCells.clear();
+      currentIndex = 0;
+      hasMore = flowRunScanner.next(availableCells, scannerContext);
+    }
+    Cell cell = null;
+    if (currentIndex < availableCells.size()) {
+      cell = availableCells.get(currentIndex);
+      if (currentRow == null) {
+        currentRow = CellUtil.cloneRow(cell);
+      } else if (!CellUtil.matchingRow(cell, currentRow)) {
+        // moved on to the next row
+        // don't use the current cell
+        // also signal no more cells for this row
+        return null;
+      }
+    }
+    return cell;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMaxResultSize()
+   */
+  @Override
+  public long getMaxResultSize() {
+    if (regionScanner == null) {
+      throw new IllegalStateException(
+          "RegionScanner.getMaxResultSize() called when the flow "
+              + "scanner's internal scanner is not a RegionScanner");
+    }
+    return regionScanner.getMaxResultSize();
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMvccReadPoint()
+   */
+  @Override
+  public long getMvccReadPoint() {
+    if (regionScanner == null) {
+      throw new IllegalStateException(
+          "RegionScanner.getMvccReadPoint() called when the flow "
+              + "scanner's internal scanner is not a RegionScanner");
+    }
+    return regionScanner.getMvccReadPoint();
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#isFilterDone()
+   */
+  @Override
+  public boolean isFilterDone() throws IOException {
+    if (regionScanner == null) {
+      throw new IllegalStateException(
+          "RegionScanner.isFilterDone() called when the flow "
+              + "scanner's internal scanner is not a RegionScanner");
+    }
+    return regionScanner.isFilterDone();
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#reseek(byte[])
+   */
+  @Override
+  public boolean reseek(byte[] bytes) throws IOException {
+    if (regionScanner == null) {
+      throw new IllegalStateException(
+          "RegionScanner.reseek() called when the flow "
+              + "scanner's internal scanner is not a RegionScanner");
+    }
+    return regionScanner.reseek(bytes);
+  }
+
+  @Override
+  public int getBatch() {
+    return batchSize;
+  }
+}