|
@@ -39,8 +39,10 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
|
import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
|
import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
|
import org.apache.hadoop.hbase.util.Bytes;
|
|
import org.apache.hadoop.hbase.util.Bytes;
|
|
import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
|
|
import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
|
|
-import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
|
|
|
|
|
|
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
|
|
|
|
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
|
|
|
|
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
|
|
|
|
|
|
/**
|
|
/**
|
|
* Invoked via the coprocessor when a Get or a Scan is issued for flow run
|
|
* Invoked via the coprocessor when a Get or a Scan is issued for flow run
|
|
@@ -113,6 +115,45 @@ class FlowScanner implements RegionScanner, Closeable {
|
|
return appId;
|
|
return appId;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Get value converter associated with a column or a column prefix. If nothing
|
|
|
|
+ * matches, generic converter is returned.
|
|
|
|
+ * @param colQualifierBytes
|
|
|
|
+ * @return value converter implementation.
|
|
|
|
+ */
|
|
|
|
+ private static ValueConverter getValueConverter(byte[] colQualifierBytes) {
|
|
|
|
+ // Iterate over all the column prefixes for flow run table and get the
|
|
|
|
+ // appropriate converter for the column qualifier passed if prefix matches.
|
|
|
|
+ for (FlowRunColumnPrefix colPrefix : FlowRunColumnPrefix.values()) {
|
|
|
|
+ byte[] colPrefixBytes = colPrefix.getColumnPrefixBytes("");
|
|
|
|
+ if (Bytes.compareTo(colPrefixBytes, 0, colPrefixBytes.length,
|
|
|
|
+ colQualifierBytes, 0, colPrefixBytes.length) == 0) {
|
|
|
|
+ return colPrefix.getValueConverter();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ // Iterate over all the columns for flow run table and get the
|
|
|
|
+ // appropriate converter for the column qualifier passed if match occurs.
|
|
|
|
+ for (FlowRunColumn column : FlowRunColumn.values()) {
|
|
|
|
+ if (Bytes.compareTo(
|
|
|
|
+ column.getColumnQualifierBytes(), colQualifierBytes) == 0) {
|
|
|
|
+ return column.getValueConverter();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ // Return generic converter if nothing matches.
|
|
|
|
+ return GenericConverter.getInstance();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Checks if the converter is a numeric converter or not. For a converter to
|
|
|
|
+ * be numeric, it must implement {@link NumericValueConverter} interface.
|
|
|
|
+ * @param converter
|
|
|
|
+ * @return true, if converter is of type NumericValueConverter, false
|
|
|
|
+ * otherwise.
|
|
|
|
+ */
|
|
|
|
+ private static boolean isNumericConverter(ValueConverter converter) {
|
|
|
|
+ return (converter instanceof NumericValueConverter);
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* This method loops through the cells in a given row of the
|
|
* This method loops through the cells in a given row of the
|
|
* {@link FlowRunTable}. It looks at the tags of each cell to figure out how
|
|
* {@link FlowRunTable}. It looks at the tags of each cell to figure out how
|
|
@@ -141,20 +182,32 @@ class FlowScanner implements RegionScanner, Closeable {
|
|
SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
|
|
SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
|
|
Set<String> alreadySeenAggDim = new HashSet<>();
|
|
Set<String> alreadySeenAggDim = new HashSet<>();
|
|
int addedCnt = 0;
|
|
int addedCnt = 0;
|
|
|
|
+ ValueConverter converter = null;
|
|
while (((cell = peekAtNextCell(limit)) != null)
|
|
while (((cell = peekAtNextCell(limit)) != null)
|
|
&& (limit <= 0 || addedCnt < limit)) {
|
|
&& (limit <= 0 || addedCnt < limit)) {
|
|
byte[] newColumnQualifier = CellUtil.cloneQualifier(cell);
|
|
byte[] newColumnQualifier = CellUtil.cloneQualifier(cell);
|
|
if (comp.compare(currentColumnQualifier, newColumnQualifier) != 0) {
|
|
if (comp.compare(currentColumnQualifier, newColumnQualifier) != 0) {
|
|
- addedCnt += emitCells(cells, currentColumnCells, currentAggOp);
|
|
|
|
|
|
+ if (converter != null && isNumericConverter(converter)) {
|
|
|
|
+ addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
|
|
|
|
+ (NumericValueConverter)converter);
|
|
|
|
+ }
|
|
resetState(currentColumnCells, alreadySeenAggDim);
|
|
resetState(currentColumnCells, alreadySeenAggDim);
|
|
currentColumnQualifier = newColumnQualifier;
|
|
currentColumnQualifier = newColumnQualifier;
|
|
currentAggOp = getCurrentAggOp(cell);
|
|
currentAggOp = getCurrentAggOp(cell);
|
|
|
|
+ converter = getValueConverter(newColumnQualifier);
|
|
}
|
|
}
|
|
- collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim);
|
|
|
|
|
|
+ // No operation needs to be performed on non numeric converters.
|
|
|
|
+ if (!isNumericConverter(converter)) {
|
|
|
|
+ nextCell(limit);
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+ collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim,
|
|
|
|
+ (NumericValueConverter)converter);
|
|
nextCell(limit);
|
|
nextCell(limit);
|
|
}
|
|
}
|
|
if (!currentColumnCells.isEmpty()) {
|
|
if (!currentColumnCells.isEmpty()) {
|
|
- emitCells(cells, currentColumnCells, currentAggOp);
|
|
|
|
|
|
+ emitCells(cells, currentColumnCells, currentAggOp,
|
|
|
|
+ (NumericValueConverter)converter);
|
|
}
|
|
}
|
|
return hasMore();
|
|
return hasMore();
|
|
}
|
|
}
|
|
@@ -183,7 +236,8 @@ class FlowScanner implements RegionScanner, Closeable {
|
|
|
|
|
|
private void collectCells(SortedSet<Cell> currentColumnCells,
|
|
private void collectCells(SortedSet<Cell> currentColumnCells,
|
|
AggregationOperation currentAggOp, Cell cell,
|
|
AggregationOperation currentAggOp, Cell cell,
|
|
- Set<String> alreadySeenAggDim) throws IOException {
|
|
|
|
|
|
+ Set<String> alreadySeenAggDim, NumericValueConverter converter)
|
|
|
|
+ throws IOException {
|
|
if (currentAggOp == null) {
|
|
if (currentAggOp == null) {
|
|
// not a min/max/metric cell, so just return it as is
|
|
// not a min/max/metric cell, so just return it as is
|
|
currentColumnCells.add(cell);
|
|
currentColumnCells.add(cell);
|
|
@@ -197,7 +251,8 @@ class FlowScanner implements RegionScanner, Closeable {
|
|
currentColumnCells.add(cell);
|
|
currentColumnCells.add(cell);
|
|
} else {
|
|
} else {
|
|
Cell currentMinCell = currentColumnCells.first();
|
|
Cell currentMinCell = currentColumnCells.first();
|
|
- Cell newMinCell = compareCellValues(currentMinCell, cell, currentAggOp);
|
|
|
|
|
|
+ Cell newMinCell = compareCellValues(currentMinCell, cell, currentAggOp,
|
|
|
|
+ converter);
|
|
if (!currentMinCell.equals(newMinCell)) {
|
|
if (!currentMinCell.equals(newMinCell)) {
|
|
currentColumnCells.remove(currentMinCell);
|
|
currentColumnCells.remove(currentMinCell);
|
|
currentColumnCells.add(newMinCell);
|
|
currentColumnCells.add(newMinCell);
|
|
@@ -209,7 +264,8 @@ class FlowScanner implements RegionScanner, Closeable {
|
|
currentColumnCells.add(cell);
|
|
currentColumnCells.add(cell);
|
|
} else {
|
|
} else {
|
|
Cell currentMaxCell = currentColumnCells.first();
|
|
Cell currentMaxCell = currentColumnCells.first();
|
|
- Cell newMaxCell = compareCellValues(currentMaxCell, cell, currentAggOp);
|
|
|
|
|
|
+ Cell newMaxCell = compareCellValues(currentMaxCell, cell, currentAggOp,
|
|
|
|
+ converter);
|
|
if (!currentMaxCell.equals(newMaxCell)) {
|
|
if (!currentMaxCell.equals(newMaxCell)) {
|
|
currentColumnCells.remove(currentMaxCell);
|
|
currentColumnCells.remove(currentMaxCell);
|
|
currentColumnCells.add(newMaxCell);
|
|
currentColumnCells.add(newMaxCell);
|
|
@@ -245,7 +301,8 @@ class FlowScanner implements RegionScanner, Closeable {
|
|
* parameter.
|
|
* parameter.
|
|
*/
|
|
*/
|
|
private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells,
|
|
private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells,
|
|
- AggregationOperation currentAggOp) throws IOException {
|
|
|
|
|
|
+ AggregationOperation currentAggOp, NumericValueConverter converter)
|
|
|
|
+ throws IOException {
|
|
if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) {
|
|
if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) {
|
|
return 0;
|
|
return 0;
|
|
}
|
|
}
|
|
@@ -261,7 +318,7 @@ class FlowScanner implements RegionScanner, Closeable {
|
|
return currentColumnCells.size();
|
|
return currentColumnCells.size();
|
|
case SUM:
|
|
case SUM:
|
|
case SUM_FINAL:
|
|
case SUM_FINAL:
|
|
- Cell sumCell = processSummation(currentColumnCells);
|
|
|
|
|
|
+ Cell sumCell = processSummation(currentColumnCells, converter);
|
|
cells.add(sumCell);
|
|
cells.add(sumCell);
|
|
return 1;
|
|
return 1;
|
|
default:
|
|
default:
|
|
@@ -276,24 +333,24 @@ class FlowScanner implements RegionScanner, Closeable {
|
|
* sum of a metric for a flow run is the summation at the point of the last
|
|
* sum of a metric for a flow run is the summation at the point of the last
|
|
* metric update in that flow till that time.
|
|
* metric update in that flow till that time.
|
|
*/
|
|
*/
|
|
- private Cell processSummation(SortedSet<Cell> currentColumnCells)
|
|
|
|
- throws IOException {
|
|
|
|
|
|
+ private Cell processSummation(SortedSet<Cell> currentColumnCells,
|
|
|
|
+ NumericValueConverter converter) throws IOException {
|
|
Number sum = 0;
|
|
Number sum = 0;
|
|
Number currentValue = 0;
|
|
Number currentValue = 0;
|
|
long ts = 0L;
|
|
long ts = 0L;
|
|
- long mostCurrentTimestamp = 0l;
|
|
|
|
|
|
+ long mostCurrentTimestamp = 0L;
|
|
Cell mostRecentCell = null;
|
|
Cell mostRecentCell = null;
|
|
for (Cell cell : currentColumnCells) {
|
|
for (Cell cell : currentColumnCells) {
|
|
- currentValue = (Number) GenericObjectMapper.read(CellUtil
|
|
|
|
- .cloneValue(cell));
|
|
|
|
|
|
+ currentValue = (Number) converter.decodeValue(CellUtil.cloneValue(cell));
|
|
ts = cell.getTimestamp();
|
|
ts = cell.getTimestamp();
|
|
if (mostCurrentTimestamp < ts) {
|
|
if (mostCurrentTimestamp < ts) {
|
|
mostCurrentTimestamp = ts;
|
|
mostCurrentTimestamp = ts;
|
|
mostRecentCell = cell;
|
|
mostRecentCell = cell;
|
|
}
|
|
}
|
|
- sum = sum.longValue() + currentValue.longValue();
|
|
|
|
|
|
+ sum = converter.add(sum, currentValue);
|
|
}
|
|
}
|
|
- Cell sumCell = createNewCell(mostRecentCell, sum);
|
|
|
|
|
|
+ byte[] sumBytes = converter.encodeValue(sum);
|
|
|
|
+ Cell sumCell = createNewCell(mostRecentCell, sumBytes);
|
|
return sumCell;
|
|
return sumCell;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -308,18 +365,20 @@ class FlowScanner implements RegionScanner, Closeable {
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
private Cell compareCellValues(Cell previouslyChosenCell, Cell currentCell,
|
|
private Cell compareCellValues(Cell previouslyChosenCell, Cell currentCell,
|
|
- AggregationOperation currentAggOp) throws IOException {
|
|
|
|
|
|
+ AggregationOperation currentAggOp, NumericValueConverter converter)
|
|
|
|
+ throws IOException {
|
|
if (previouslyChosenCell == null) {
|
|
if (previouslyChosenCell == null) {
|
|
return currentCell;
|
|
return currentCell;
|
|
}
|
|
}
|
|
try {
|
|
try {
|
|
- long previouslyChosenCellValue = ((Number) GenericObjectMapper
|
|
|
|
- .read(CellUtil.cloneValue(previouslyChosenCell))).longValue();
|
|
|
|
- long currentCellValue = ((Number) GenericObjectMapper.read(CellUtil
|
|
|
|
- .cloneValue(currentCell))).longValue();
|
|
|
|
|
|
+ Number previouslyChosenCellValue = (Number)converter.decodeValue(
|
|
|
|
+ CellUtil.cloneValue(previouslyChosenCell));
|
|
|
|
+ Number currentCellValue = (Number) converter.decodeValue(CellUtil
|
|
|
|
+ .cloneValue(currentCell));
|
|
switch (currentAggOp) {
|
|
switch (currentAggOp) {
|
|
case MIN:
|
|
case MIN:
|
|
- if (currentCellValue < previouslyChosenCellValue) {
|
|
|
|
|
|
+ if (converter.compare(
|
|
|
|
+ currentCellValue, previouslyChosenCellValue) < 0) {
|
|
// new value is minimum, hence return this cell
|
|
// new value is minimum, hence return this cell
|
|
return currentCell;
|
|
return currentCell;
|
|
} else {
|
|
} else {
|
|
@@ -327,7 +386,8 @@ class FlowScanner implements RegionScanner, Closeable {
|
|
return previouslyChosenCell;
|
|
return previouslyChosenCell;
|
|
}
|
|
}
|
|
case MAX:
|
|
case MAX:
|
|
- if (currentCellValue > previouslyChosenCellValue) {
|
|
|
|
|
|
+ if (converter.compare(
|
|
|
|
+ currentCellValue, previouslyChosenCellValue) > 0) {
|
|
// new value is max, hence return this cell
|
|
// new value is max, hence return this cell
|
|
return currentCell;
|
|
return currentCell;
|
|
} else {
|
|
} else {
|
|
@@ -343,8 +403,8 @@ class FlowScanner implements RegionScanner, Closeable {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private Cell createNewCell(Cell origCell, Number number) throws IOException {
|
|
|
|
- byte[] newValue = GenericObjectMapper.write(number);
|
|
|
|
|
|
+ private Cell createNewCell(Cell origCell, byte[] newValue)
|
|
|
|
+ throws IOException {
|
|
return CellUtil.createCell(CellUtil.cloneRow(origCell),
|
|
return CellUtil.createCell(CellUtil.cloneRow(origCell),
|
|
CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
|
|
CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
|
|
origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);
|
|
origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);
|