|
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.EnumSet;
|
|
|
+import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
@@ -28,11 +29,11 @@ import org.apache.hadoop.hbase.client.Result;
|
|
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
|
|
import org.apache.hadoop.hbase.client.Scan;
|
|
|
import org.apache.hadoop.hbase.filter.BinaryComparator;
|
|
|
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
|
|
import org.apache.hadoop.hbase.filter.FamilyFilter;
|
|
|
import org.apache.hadoop.hbase.filter.FilterList;
|
|
|
-import org.apache.hadoop.hbase.filter.QualifierFilter;
|
|
|
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
|
|
import org.apache.hadoop.hbase.filter.FilterList.Operator;
|
|
|
+import org.apache.hadoop.hbase.filter.QualifierFilter;
|
|
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
|
|
@@ -44,11 +45,16 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlow
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
|
|
|
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
|
|
|
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
|
|
|
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
|
|
|
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
|
|
|
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
|
|
|
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
|
|
|
|
@@ -66,6 +72,12 @@ class GenericEntityReader extends TimelineEntityReader {
|
|
|
*/
|
|
|
private final AppToFlowTable appToFlowTable = new AppToFlowTable();
|
|
|
|
|
|
+ /**
|
|
|
+ * Used to convert strings key components to and from storage format.
|
|
|
+ */
|
|
|
+ private final KeyConverter<String> stringKeyConverter =
|
|
|
+ new StringKeyConverter();
|
|
|
+
|
|
|
public GenericEntityReader(TimelineReaderContext ctxt,
|
|
|
TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve,
|
|
|
boolean sortedKeys) {
|
|
@@ -95,32 +107,29 @@ class GenericEntityReader extends TimelineEntityReader {
|
|
|
long createdTimeBegin = filters.getCreatedTimeBegin();
|
|
|
long createdTimeEnd = filters.getCreatedTimeEnd();
|
|
|
if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) {
|
|
|
- listBasedOnFilters.addFilter(
|
|
|
- TimelineFilterUtils.createSingleColValueFiltersByRange(
|
|
|
- EntityColumn.CREATED_TIME, createdTimeBegin, createdTimeEnd));
|
|
|
+ listBasedOnFilters.addFilter(TimelineFilterUtils
|
|
|
+ .createSingleColValueFiltersByRange(EntityColumn.CREATED_TIME,
|
|
|
+ createdTimeBegin, createdTimeEnd));
|
|
|
}
|
|
|
// Create filter list based on metric filters and add it to
|
|
|
// listBasedOnFilters.
|
|
|
TimelineFilterList metricFilters = filters.getMetricFilters();
|
|
|
if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) {
|
|
|
- listBasedOnFilters.addFilter(
|
|
|
- TimelineFilterUtils.createHBaseFilterList(
|
|
|
- EntityColumnPrefix.METRIC, metricFilters));
|
|
|
+ listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList(
|
|
|
+ EntityColumnPrefix.METRIC, metricFilters));
|
|
|
}
|
|
|
// Create filter list based on config filters and add it to
|
|
|
// listBasedOnFilters.
|
|
|
TimelineFilterList configFilters = filters.getConfigFilters();
|
|
|
if (configFilters != null && !configFilters.getFilterList().isEmpty()) {
|
|
|
- listBasedOnFilters.addFilter(
|
|
|
- TimelineFilterUtils.createHBaseFilterList(
|
|
|
- EntityColumnPrefix.CONFIG, configFilters));
|
|
|
+ listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList(
|
|
|
+ EntityColumnPrefix.CONFIG, configFilters));
|
|
|
}
|
|
|
// Create filter list based on info filters and add it to listBasedOnFilters
|
|
|
TimelineFilterList infoFilters = filters.getInfoFilters();
|
|
|
if (infoFilters != null && !infoFilters.getFilterList().isEmpty()) {
|
|
|
- listBasedOnFilters.addFilter(
|
|
|
- TimelineFilterUtils.createHBaseFilterList(
|
|
|
- EntityColumnPrefix.INFO, infoFilters));
|
|
|
+ listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList(
|
|
|
+ EntityColumnPrefix.INFO, infoFilters));
|
|
|
}
|
|
|
return listBasedOnFilters;
|
|
|
}
|
|
@@ -130,10 +139,10 @@ class GenericEntityReader extends TimelineEntityReader {
|
|
|
*
|
|
|
* @return true if we need to fetch some of the columns, false otherwise.
|
|
|
*/
|
|
|
- private static boolean fetchPartialEventCols(TimelineFilterList eventFilters,
|
|
|
+ private boolean fetchPartialEventCols(TimelineFilterList eventFilters,
|
|
|
EnumSet<Field> fieldsToRetrieve) {
|
|
|
return (eventFilters != null && !eventFilters.getFilterList().isEmpty() &&
|
|
|
- !TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS));
|
|
|
+ !hasField(fieldsToRetrieve, Field.EVENTS));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -141,10 +150,10 @@ class GenericEntityReader extends TimelineEntityReader {
|
|
|
*
|
|
|
* @return true if we need to fetch some of the columns, false otherwise.
|
|
|
*/
|
|
|
- private static boolean fetchPartialRelatesToCols(TimelineFilterList relatesTo,
|
|
|
+ private boolean fetchPartialRelatesToCols(TimelineFilterList relatesTo,
|
|
|
EnumSet<Field> fieldsToRetrieve) {
|
|
|
return (relatesTo != null && !relatesTo.getFilterList().isEmpty() &&
|
|
|
- !TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO));
|
|
|
+ !hasField(fieldsToRetrieve, Field.RELATES_TO));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -152,10 +161,10 @@ class GenericEntityReader extends TimelineEntityReader {
|
|
|
*
|
|
|
* @return true if we need to fetch some of the columns, false otherwise.
|
|
|
*/
|
|
|
- private static boolean fetchPartialIsRelatedToCols(
|
|
|
- TimelineFilterList isRelatedTo, EnumSet<Field> fieldsToRetrieve) {
|
|
|
+ private boolean fetchPartialIsRelatedToCols(TimelineFilterList isRelatedTo,
|
|
|
+ EnumSet<Field> fieldsToRetrieve) {
|
|
|
return (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty() &&
|
|
|
- !TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO));
|
|
|
+ !hasField(fieldsToRetrieve, Field.IS_RELATED_TO));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -163,19 +172,20 @@ class GenericEntityReader extends TimelineEntityReader {
|
|
|
* relatesto and isrelatedto from info family.
|
|
|
*
|
|
|
* @return true, if we need to fetch only some of the columns, false if we
|
|
|
- * need to fetch all the columns under info column family.
|
|
|
+ * need to fetch all the columns under info column family.
|
|
|
*/
|
|
|
protected boolean fetchPartialColsFromInfoFamily() {
|
|
|
EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
|
|
|
TimelineEntityFilters filters = getFilters();
|
|
|
- return fetchPartialEventCols(filters.getEventFilters(), fieldsToRetrieve) ||
|
|
|
- fetchPartialRelatesToCols(filters.getRelatesTo(), fieldsToRetrieve) ||
|
|
|
- fetchPartialIsRelatedToCols(filters.getIsRelatedTo(), fieldsToRetrieve);
|
|
|
+ return fetchPartialEventCols(filters.getEventFilters(), fieldsToRetrieve)
|
|
|
+ || fetchPartialRelatesToCols(filters.getRelatesTo(), fieldsToRetrieve)
|
|
|
+ || fetchPartialIsRelatedToCols(filters.getIsRelatedTo(),
|
|
|
+ fieldsToRetrieve);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Check if we need to create filter list based on fields. We need to create
|
|
|
- * a filter list iff all fields need not be retrieved or we have some specific
|
|
|
+ * Check if we need to create filter list based on fields. We need to create a
|
|
|
+ * filter list iff all fields need not be retrieved or we have some specific
|
|
|
* fields or metrics to retrieve. We also need to create a filter list if we
|
|
|
* have relationships(relatesTo/isRelatedTo) and event filters specified for
|
|
|
* the query.
|
|
@@ -188,22 +198,24 @@ class GenericEntityReader extends TimelineEntityReader {
|
|
|
// be retrieved, also check if we have some metrics or configs to
|
|
|
// retrieve specified for the query because then a filter list will have
|
|
|
// to be created.
|
|
|
- boolean flag = !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) ||
|
|
|
- (dataToRetrieve.getConfsToRetrieve() != null &&
|
|
|
- !dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty()) ||
|
|
|
- (dataToRetrieve.getMetricsToRetrieve() != null &&
|
|
|
- !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty());
|
|
|
+ boolean flag =
|
|
|
+ !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL)
|
|
|
+ || (dataToRetrieve.getConfsToRetrieve() != null && !dataToRetrieve
|
|
|
+ .getConfsToRetrieve().getFilterList().isEmpty())
|
|
|
+ || (dataToRetrieve.getMetricsToRetrieve() != null && !dataToRetrieve
|
|
|
+ .getMetricsToRetrieve().getFilterList().isEmpty());
|
|
|
// Filters need to be checked only if we are reading multiple entities. If
|
|
|
// condition above is false, we check if there are relationships(relatesTo/
|
|
|
// isRelatedTo) and event filters specified for the query.
|
|
|
if (!flag && !isSingleEntityRead()) {
|
|
|
TimelineEntityFilters filters = getFilters();
|
|
|
- flag = (filters.getEventFilters() != null &&
|
|
|
- !filters.getEventFilters().getFilterList().isEmpty()) ||
|
|
|
- (filters.getIsRelatedTo() != null &&
|
|
|
- !filters.getIsRelatedTo().getFilterList().isEmpty()) ||
|
|
|
- (filters.getRelatesTo() != null &&
|
|
|
- !filters.getRelatesTo().getFilterList().isEmpty());
|
|
|
+ flag =
|
|
|
+ (filters.getEventFilters() != null && !filters.getEventFilters()
|
|
|
+ .getFilterList().isEmpty())
|
|
|
+ || (filters.getIsRelatedTo() != null && !filters.getIsRelatedTo()
|
|
|
+ .getFilterList().isEmpty())
|
|
|
+ || (filters.getRelatesTo() != null && !filters.getRelatesTo()
|
|
|
+ .getFilterList().isEmpty());
|
|
|
}
|
|
|
return flag;
|
|
|
}
|
|
@@ -216,8 +228,8 @@ class GenericEntityReader extends TimelineEntityReader {
|
|
|
*/
|
|
|
protected void updateFixedColumns(FilterList list) {
|
|
|
for (EntityColumn column : EntityColumn.values()) {
|
|
|
- list.addFilter(new QualifierFilter(CompareOp.EQUAL,
|
|
|
- new BinaryComparator(column.getColumnQualifierBytes())));
|
|
|
+ list.addFilter(new QualifierFilter(CompareOp.EQUAL, new BinaryComparator(
|
|
|
+ column.getColumnQualifierBytes())));
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -226,30 +238,29 @@ class GenericEntityReader extends TimelineEntityReader {
|
|
|
* qualifiers in the info column family will be returned in result.
|
|
|
*
|
|
|
* @param isApplication If true, it means operations are to be performed for
|
|
|
- * application table, otherwise for entity table.
|
|
|
+ * application table, otherwise for entity table.
|
|
|
* @return filter list.
|
|
|
* @throws IOException if any problem occurs while creating filter list.
|
|
|
*/
|
|
|
- private FilterList createFilterListForColsOfInfoFamily()
|
|
|
- throws IOException {
|
|
|
+ private FilterList createFilterListForColsOfInfoFamily() throws IOException {
|
|
|
FilterList infoFamilyColsFilter = new FilterList(Operator.MUST_PASS_ONE);
|
|
|
// Add filters for each column in entity table.
|
|
|
updateFixedColumns(infoFamilyColsFilter);
|
|
|
EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
|
|
|
// If INFO field has to be retrieved, add a filter for fetching columns
|
|
|
// with INFO column prefix.
|
|
|
- if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
|
|
|
- infoFamilyColsFilter.addFilter(
|
|
|
- TimelineFilterUtils.createHBaseQualifierFilter(
|
|
|
+ if (hasField(fieldsToRetrieve, Field.INFO)) {
|
|
|
+ infoFamilyColsFilter
|
|
|
+ .addFilter(TimelineFilterUtils.createHBaseQualifierFilter(
|
|
|
CompareOp.EQUAL, EntityColumnPrefix.INFO));
|
|
|
}
|
|
|
TimelineFilterList relatesTo = getFilters().getRelatesTo();
|
|
|
- if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
|
|
|
+ if (hasField(fieldsToRetrieve, Field.RELATES_TO)) {
|
|
|
// If RELATES_TO field has to be retrieved, add a filter for fetching
|
|
|
// columns with RELATES_TO column prefix.
|
|
|
- infoFamilyColsFilter.addFilter(
|
|
|
- TimelineFilterUtils.createHBaseQualifierFilter(
|
|
|
- CompareOp.EQUAL, EntityColumnPrefix.RELATES_TO));
|
|
|
+ infoFamilyColsFilter.addFilter(TimelineFilterUtils
|
|
|
+ .createHBaseQualifierFilter(CompareOp.EQUAL,
|
|
|
+ EntityColumnPrefix.RELATES_TO));
|
|
|
} else if (relatesTo != null && !relatesTo.getFilterList().isEmpty()) {
|
|
|
// Even if fields to retrieve does not contain RELATES_TO, we still
|
|
|
// need to have a filter to fetch some of the column qualifiers if
|
|
@@ -257,17 +268,16 @@ class GenericEntityReader extends TimelineEntityReader {
|
|
|
// matched after fetching rows from HBase.
|
|
|
Set<String> relatesToCols =
|
|
|
TimelineFilterUtils.fetchColumnsFromFilterList(relatesTo);
|
|
|
- infoFamilyColsFilter.addFilter(
|
|
|
- TimelineFilterUtils.createFiltersFromColumnQualifiers(
|
|
|
- EntityColumnPrefix.RELATES_TO, relatesToCols));
|
|
|
+ infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
|
|
|
+ EntityColumnPrefix.RELATES_TO, relatesToCols));
|
|
|
}
|
|
|
TimelineFilterList isRelatedTo = getFilters().getIsRelatedTo();
|
|
|
- if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
|
|
|
+ if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
|
|
|
// If IS_RELATED_TO field has to be retrieved, add a filter for fetching
|
|
|
// columns with IS_RELATED_TO column prefix.
|
|
|
- infoFamilyColsFilter.addFilter(
|
|
|
- TimelineFilterUtils.createHBaseQualifierFilter(
|
|
|
- CompareOp.EQUAL, EntityColumnPrefix.IS_RELATED_TO));
|
|
|
+ infoFamilyColsFilter.addFilter(TimelineFilterUtils
|
|
|
+ .createHBaseQualifierFilter(CompareOp.EQUAL,
|
|
|
+ EntityColumnPrefix.IS_RELATED_TO));
|
|
|
} else if (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty()) {
|
|
|
// Even if fields to retrieve does not contain IS_RELATED_TO, we still
|
|
|
// need to have a filter to fetch some of the column qualifiers if
|
|
@@ -275,27 +285,26 @@ class GenericEntityReader extends TimelineEntityReader {
|
|
|
// matched after fetching rows from HBase.
|
|
|
Set<String> isRelatedToCols =
|
|
|
TimelineFilterUtils.fetchColumnsFromFilterList(isRelatedTo);
|
|
|
- infoFamilyColsFilter.addFilter(
|
|
|
- TimelineFilterUtils.createFiltersFromColumnQualifiers(
|
|
|
- EntityColumnPrefix.IS_RELATED_TO, isRelatedToCols));
|
|
|
+ infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
|
|
|
+ EntityColumnPrefix.IS_RELATED_TO, isRelatedToCols));
|
|
|
}
|
|
|
TimelineFilterList eventFilters = getFilters().getEventFilters();
|
|
|
- if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
|
|
|
+ if (hasField(fieldsToRetrieve, Field.EVENTS)) {
|
|
|
// If EVENTS field has to be retrieved, add a filter for fetching columns
|
|
|
// with EVENT column prefix.
|
|
|
- infoFamilyColsFilter.addFilter(
|
|
|
- TimelineFilterUtils.createHBaseQualifierFilter(
|
|
|
+ infoFamilyColsFilter
|
|
|
+ .addFilter(TimelineFilterUtils.createHBaseQualifierFilter(
|
|
|
CompareOp.EQUAL, EntityColumnPrefix.EVENT));
|
|
|
- } else if (eventFilters != null && !eventFilters.getFilterList().isEmpty()){
|
|
|
+ } else if (eventFilters != null &&
|
|
|
+ !eventFilters.getFilterList().isEmpty()) {
|
|
|
// Even if fields to retrieve does not contain EVENTS, we still need to
|
|
|
// have a filter to fetch some of the column qualifiers on the basis of
|
|
|
// event filters specified. Event filters will then be matched after
|
|
|
// fetching rows from HBase.
|
|
|
Set<String> eventCols =
|
|
|
TimelineFilterUtils.fetchColumnsFromFilterList(eventFilters);
|
|
|
- infoFamilyColsFilter.addFilter(
|
|
|
- TimelineFilterUtils.createFiltersFromColumnQualifiers(
|
|
|
- EntityColumnPrefix.EVENT, eventCols));
|
|
|
+ infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
|
|
|
+ EntityColumnPrefix.EVENT, eventCols));
|
|
|
}
|
|
|
return infoFamilyColsFilter;
|
|
|
}
|
|
@@ -310,28 +319,28 @@ class GenericEntityReader extends TimelineEntityReader {
|
|
|
private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) {
|
|
|
EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
|
|
|
// Events not required.
|
|
|
- if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
|
|
|
- infoColFamilyList.addFilter(
|
|
|
- TimelineFilterUtils.createHBaseQualifierFilter(
|
|
|
- CompareOp.NOT_EQUAL, EntityColumnPrefix.EVENT));
|
|
|
+ if (!hasField(fieldsToRetrieve, Field.EVENTS)) {
|
|
|
+ infoColFamilyList.addFilter(TimelineFilterUtils
|
|
|
+ .createHBaseQualifierFilter(CompareOp.NOT_EQUAL,
|
|
|
+ EntityColumnPrefix.EVENT));
|
|
|
}
|
|
|
// info not required.
|
|
|
- if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
|
|
|
- infoColFamilyList.addFilter(
|
|
|
- TimelineFilterUtils.createHBaseQualifierFilter(
|
|
|
- CompareOp.NOT_EQUAL, EntityColumnPrefix.INFO));
|
|
|
+ if (!hasField(fieldsToRetrieve, Field.INFO)) {
|
|
|
+ infoColFamilyList.addFilter(TimelineFilterUtils
|
|
|
+ .createHBaseQualifierFilter(CompareOp.NOT_EQUAL,
|
|
|
+ EntityColumnPrefix.INFO));
|
|
|
}
|
|
|
// is related to not required.
|
|
|
- if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
|
|
|
- infoColFamilyList.addFilter(
|
|
|
- TimelineFilterUtils.createHBaseQualifierFilter(
|
|
|
- CompareOp.NOT_EQUAL, EntityColumnPrefix.IS_RELATED_TO));
|
|
|
+ if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
|
|
|
+ infoColFamilyList.addFilter(TimelineFilterUtils
|
|
|
+ .createHBaseQualifierFilter(CompareOp.NOT_EQUAL,
|
|
|
+ EntityColumnPrefix.IS_RELATED_TO));
|
|
|
}
|
|
|
// relates to not required.
|
|
|
- if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
|
|
|
- infoColFamilyList.addFilter(
|
|
|
- TimelineFilterUtils.createHBaseQualifierFilter(
|
|
|
- CompareOp.NOT_EQUAL, EntityColumnPrefix.RELATES_TO));
|
|
|
+ if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) {
|
|
|
+ infoColFamilyList.addFilter(TimelineFilterUtils
|
|
|
+ .createHBaseQualifierFilter(CompareOp.NOT_EQUAL,
|
|
|
+ EntityColumnPrefix.RELATES_TO));
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -348,18 +357,18 @@ class GenericEntityReader extends TimelineEntityReader {
|
|
|
// CONFS to fields to retrieve in augmentParams() even if not specified.
|
|
|
if (dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS)) {
|
|
|
// Create a filter list for configs.
|
|
|
- listBasedOnFields.addFilter(TimelineFilterUtils.
|
|
|
- createFilterForConfsOrMetricsToRetrieve(
|
|
|
- dataToRetrieve.getConfsToRetrieve(),
|
|
|
- EntityColumnFamily.CONFIGS, EntityColumnPrefix.CONFIG));
|
|
|
+ listBasedOnFields.addFilter(TimelineFilterUtils
|
|
|
+ .createFilterForConfsOrMetricsToRetrieve(
|
|
|
+ dataToRetrieve.getConfsToRetrieve(), EntityColumnFamily.CONFIGS,
|
|
|
+ EntityColumnPrefix.CONFIG));
|
|
|
}
|
|
|
|
|
|
// Please note that if metricsToRetrieve is specified, we would have added
|
|
|
// METRICS to fields to retrieve in augmentParams() even if not specified.
|
|
|
if (dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS)) {
|
|
|
// Create a filter list for metrics.
|
|
|
- listBasedOnFields.addFilter(TimelineFilterUtils.
|
|
|
- createFilterForConfsOrMetricsToRetrieve(
|
|
|
+ listBasedOnFields.addFilter(TimelineFilterUtils
|
|
|
+ .createFilterForConfsOrMetricsToRetrieve(
|
|
|
dataToRetrieve.getMetricsToRetrieve(),
|
|
|
EntityColumnFamily.METRICS, EntityColumnPrefix.METRIC));
|
|
|
}
|
|
@@ -375,8 +384,8 @@ class GenericEntityReader extends TimelineEntityReader {
|
|
|
FilterList infoColFamilyList = new FilterList();
|
|
|
// By default fetch everything in INFO column family.
|
|
|
FamilyFilter infoColumnFamily =
|
|
|
- new FamilyFilter(CompareOp.EQUAL,
|
|
|
- new BinaryComparator(EntityColumnFamily.INFO.getBytes()));
|
|
|
+ new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(
|
|
|
+ EntityColumnFamily.INFO.getBytes()));
|
|
|
infoColFamilyList.addFilter(infoColumnFamily);
|
|
|
if (!isSingleEntityRead() && fetchPartialColsFromInfoFamily()) {
|
|
|
// We can fetch only some of the columns from info family.
|
|
@@ -394,27 +403,27 @@ class GenericEntityReader extends TimelineEntityReader {
|
|
|
/**
|
|
|
* Looks up flow context from AppToFlow table.
|
|
|
*
|
|
|
- * @param clusterId Cluster Id.
|
|
|
- * @param appId App Id.
|
|
|
+ * @param appToFlowRowKey to identify Cluster and App Ids.
|
|
|
* @param hbaseConf HBase configuration.
|
|
|
* @param conn HBase Connection.
|
|
|
* @return flow context information.
|
|
|
* @throws IOException if any problem occurs while fetching flow information.
|
|
|
*/
|
|
|
- protected FlowContext lookupFlowContext(String clusterId, String appId,
|
|
|
+ protected FlowContext lookupFlowContext(AppToFlowRowKey appToFlowRowKey,
|
|
|
Configuration hbaseConf, Connection conn) throws IOException {
|
|
|
- byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
|
|
|
+ byte[] rowKey = appToFlowRowKey.getRowKey();
|
|
|
Get get = new Get(rowKey);
|
|
|
Result result = appToFlowTable.getResult(hbaseConf, conn, get);
|
|
|
if (result != null && !result.isEmpty()) {
|
|
|
- return new FlowContext(
|
|
|
- AppToFlowColumn.USER_ID.readResult(result).toString(),
|
|
|
- AppToFlowColumn.FLOW_ID.readResult(result).toString(),
|
|
|
- ((Number)AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue());
|
|
|
+ return new FlowContext(AppToFlowColumn.USER_ID.readResult(result)
|
|
|
+ .toString(), AppToFlowColumn.FLOW_ID.readResult(result).toString(),
|
|
|
+ ((Number) AppToFlowColumn.FLOW_RUN_ID.readResult(result))
|
|
|
+ .longValue());
|
|
|
} else {
|
|
|
throw new NotFoundException(
|
|
|
- "Unable to find the context flow ID and flow run ID for clusterId=" +
|
|
|
- clusterId + ", appId=" + appId);
|
|
|
+ "Unable to find the context flow ID and flow run ID for clusterId="
|
|
|
+ + appToFlowRowKey.getClusterId() + ", appId="
|
|
|
+ + appToFlowRowKey.getAppId());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -425,17 +434,21 @@ class GenericEntityReader extends TimelineEntityReader {
|
|
|
private final String userId;
|
|
|
private final String flowName;
|
|
|
private final Long flowRunId;
|
|
|
+
|
|
|
public FlowContext(String user, String flowName, Long flowRunId) {
|
|
|
this.userId = user;
|
|
|
this.flowName = flowName;
|
|
|
this.flowRunId = flowRunId;
|
|
|
}
|
|
|
+
|
|
|
protected String getUserId() {
|
|
|
return userId;
|
|
|
}
|
|
|
+
|
|
|
protected String getFlowName() {
|
|
|
return flowName;
|
|
|
}
|
|
|
+
|
|
|
protected Long getFlowRunId() {
|
|
|
return flowRunId;
|
|
|
}
|
|
@@ -444,8 +457,8 @@ class GenericEntityReader extends TimelineEntityReader {
|
|
|
@Override
|
|
|
protected void validateParams() {
|
|
|
Preconditions.checkNotNull(getContext(), "context shouldn't be null");
|
|
|
- Preconditions.checkNotNull(
|
|
|
- getDataToRetrieve(), "data to retrieve shouldn't be null");
|
|
|
+ Preconditions.checkNotNull(getDataToRetrieve(),
|
|
|
+ "data to retrieve shouldn't be null");
|
|
|
Preconditions.checkNotNull(getContext().getClusterId(),
|
|
|
"clusterId shouldn't be null");
|
|
|
Preconditions.checkNotNull(getContext().getAppId(),
|
|
@@ -463,11 +476,13 @@ class GenericEntityReader extends TimelineEntityReader {
|
|
|
throws IOException {
|
|
|
TimelineReaderContext context = getContext();
|
|
|
// In reality all three should be null or neither should be null
|
|
|
- if (context.getFlowName() == null || context.getFlowRunId() == null ||
|
|
|
- context.getUserId() == null) {
|
|
|
+ if (context.getFlowName() == null || context.getFlowRunId() == null
|
|
|
+ || context.getUserId() == null) {
|
|
|
// Get flow context information from AppToFlow table.
|
|
|
- FlowContext flowContext = lookupFlowContext(
|
|
|
- context.getClusterId(), context.getAppId(), hbaseConf, conn);
|
|
|
+ AppToFlowRowKey appToFlowRowKey =
|
|
|
+ new AppToFlowRowKey(context.getClusterId(), context.getAppId());
|
|
|
+ FlowContext flowContext =
|
|
|
+ lookupFlowContext(appToFlowRowKey, hbaseConf, conn);
|
|
|
context.setFlowName(flowContext.flowName);
|
|
|
context.setFlowRunId(flowContext.flowRunId);
|
|
|
context.setUserId(flowContext.userId);
|
|
@@ -485,9 +500,9 @@ class GenericEntityReader extends TimelineEntityReader {
|
|
|
FilterList filterList) throws IOException {
|
|
|
TimelineReaderContext context = getContext();
|
|
|
byte[] rowKey =
|
|
|
- EntityRowKey.getRowKey(context.getClusterId(), context.getUserId(),
|
|
|
+ new EntityRowKey(context.getClusterId(), context.getUserId(),
|
|
|
context.getFlowName(), context.getFlowRunId(), context.getAppId(),
|
|
|
- context.getEntityType(), context.getEntityId());
|
|
|
+ context.getEntityType(), context.getEntityId()).getRowKey();
|
|
|
Get get = new Get(rowKey);
|
|
|
get.setMaxVersions(getDataToRetrieve().getMetricsLimit());
|
|
|
if (filterList != null && !filterList.getFilters().isEmpty()) {
|
|
@@ -497,15 +512,17 @@ class GenericEntityReader extends TimelineEntityReader {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- protected ResultScanner getResults(Configuration hbaseConf,
|
|
|
- Connection conn, FilterList filterList) throws IOException {
|
|
|
+ protected ResultScanner getResults(Configuration hbaseConf, Connection conn,
|
|
|
+ FilterList filterList) throws IOException {
|
|
|
// Scan through part of the table to find the entities belong to one app
|
|
|
// and one type
|
|
|
Scan scan = new Scan();
|
|
|
TimelineReaderContext context = getContext();
|
|
|
- scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
|
|
|
- context.getClusterId(), context.getUserId(), context.getFlowName(),
|
|
|
- context.getFlowRunId(), context.getAppId(), context.getEntityType()));
|
|
|
+ RowKeyPrefix<EntityRowKey> entityRowKeyPrefix =
|
|
|
+ new EntityRowKeyPrefix(context.getClusterId(), context.getUserId(),
|
|
|
+ context.getFlowName(), context.getFlowRunId(), context.getAppId(),
|
|
|
+ context.getEntityType());
|
|
|
+ scan.setRowPrefixFilter(entityRowKeyPrefix.getRowKeyPrefix());
|
|
|
scan.setMaxVersions(getDataToRetrieve().getMetricsLimit());
|
|
|
if (filterList != null && !filterList.getFilters().isEmpty()) {
|
|
|
scan.setFilter(filterList);
|
|
@@ -535,18 +552,16 @@ class GenericEntityReader extends TimelineEntityReader {
|
|
|
// locally as relevant HBase filters to filter out rows on the basis of
|
|
|
// isRelatedTo are not set in HBase scan.
|
|
|
boolean checkIsRelatedTo =
|
|
|
- !isSingleEntityRead() && filters.getIsRelatedTo() != null &&
|
|
|
- filters.getIsRelatedTo().getFilterList().size() > 0;
|
|
|
- if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO) ||
|
|
|
- checkIsRelatedTo) {
|
|
|
- TimelineStorageUtils.readRelationship(
|
|
|
- entity, result, EntityColumnPrefix.IS_RELATED_TO, true);
|
|
|
- if (checkIsRelatedTo && !TimelineStorageUtils.matchIsRelatedTo(entity,
|
|
|
- filters.getIsRelatedTo())) {
|
|
|
+ !isSingleEntityRead() && filters.getIsRelatedTo() != null
|
|
|
+ && filters.getIsRelatedTo().getFilterList().size() > 0;
|
|
|
+ if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO) || checkIsRelatedTo) {
|
|
|
+ readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true);
|
|
|
+ if (checkIsRelatedTo
|
|
|
+ && !TimelineStorageUtils.matchIsRelatedTo(entity,
|
|
|
+ filters.getIsRelatedTo())) {
|
|
|
return null;
|
|
|
}
|
|
|
- if (!TimelineStorageUtils.hasField(fieldsToRetrieve,
|
|
|
- Field.IS_RELATED_TO)) {
|
|
|
+ if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
|
|
|
entity.getIsRelatedToEntities().clear();
|
|
|
}
|
|
|
}
|
|
@@ -556,31 +571,29 @@ class GenericEntityReader extends TimelineEntityReader {
|
|
|
// locally as relevant HBase filters to filter out rows on the basis of
|
|
|
// relatesTo are not set in HBase scan.
|
|
|
boolean checkRelatesTo =
|
|
|
- !isSingleEntityRead() && filters.getRelatesTo() != null &&
|
|
|
- filters.getRelatesTo().getFilterList().size() > 0;
|
|
|
- if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO) ||
|
|
|
- checkRelatesTo) {
|
|
|
- TimelineStorageUtils.readRelationship(
|
|
|
- entity, result, EntityColumnPrefix.RELATES_TO, false);
|
|
|
- if (checkRelatesTo && !TimelineStorageUtils.matchRelatesTo(entity,
|
|
|
- filters.getRelatesTo())) {
|
|
|
+ !isSingleEntityRead() && filters.getRelatesTo() != null
|
|
|
+ && filters.getRelatesTo().getFilterList().size() > 0;
|
|
|
+ if (hasField(fieldsToRetrieve, Field.RELATES_TO)
|
|
|
+ || checkRelatesTo) {
|
|
|
+ readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false);
|
|
|
+ if (checkRelatesTo
|
|
|
+ && !TimelineStorageUtils.matchRelatesTo(entity,
|
|
|
+ filters.getRelatesTo())) {
|
|
|
return null;
|
|
|
}
|
|
|
- if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
|
|
|
+ if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) {
|
|
|
entity.getRelatesToEntities().clear();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// fetch info if fieldsToRetrieve contains INFO or ALL.
|
|
|
- if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
|
|
|
- TimelineStorageUtils.readKeyValuePairs(
|
|
|
- entity, result, EntityColumnPrefix.INFO, false);
|
|
|
+ if (hasField(fieldsToRetrieve, Field.INFO)) {
|
|
|
+ readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false);
|
|
|
}
|
|
|
|
|
|
// fetch configs if fieldsToRetrieve contains CONFIGS or ALL.
|
|
|
- if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.CONFIGS)) {
|
|
|
- TimelineStorageUtils.readKeyValuePairs(
|
|
|
- entity, result, EntityColumnPrefix.CONFIG, true);
|
|
|
+ if (hasField(fieldsToRetrieve, Field.CONFIGS)) {
|
|
|
+ readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true);
|
|
|
}
|
|
|
|
|
|
// fetch events and match event filters if they exist. If event filters do
|
|
@@ -588,24 +601,48 @@ class GenericEntityReader extends TimelineEntityReader {
|
|
|
// as relevant HBase filters to filter out rows on the basis of events
|
|
|
// are not set in HBase scan.
|
|
|
boolean checkEvents =
|
|
|
- !isSingleEntityRead() && filters.getEventFilters() != null &&
|
|
|
- filters.getEventFilters().getFilterList().size() > 0;
|
|
|
- if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS) ||
|
|
|
- checkEvents) {
|
|
|
- TimelineStorageUtils.readEvents(entity, result, EntityColumnPrefix.EVENT);
|
|
|
- if (checkEvents && !TimelineStorageUtils.matchEventFilters(entity,
|
|
|
- filters.getEventFilters())) {
|
|
|
+ !isSingleEntityRead() && filters.getEventFilters() != null
|
|
|
+ && filters.getEventFilters().getFilterList().size() > 0;
|
|
|
+ if (hasField(fieldsToRetrieve, Field.EVENTS) || checkEvents) {
|
|
|
+ readEvents(entity, result, EntityColumnPrefix.EVENT);
|
|
|
+ if (checkEvents
|
|
|
+ && !TimelineStorageUtils.matchEventFilters(entity,
|
|
|
+ filters.getEventFilters())) {
|
|
|
return null;
|
|
|
}
|
|
|
- if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
|
|
|
+ if (!hasField(fieldsToRetrieve, Field.EVENTS)) {
|
|
|
entity.getEvents().clear();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// fetch metrics if fieldsToRetrieve contains METRICS or ALL.
|
|
|
- if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.METRICS)) {
|
|
|
+ if (hasField(fieldsToRetrieve, Field.METRICS)) {
|
|
|
readMetrics(entity, result, EntityColumnPrefix.METRIC);
|
|
|
}
|
|
|
return entity;
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Helper method for reading key-value pairs for either info or config.
|
|
|
+ *
|
|
|
+ * @param <T> Describes the type of column prefix.
|
|
|
+ * @param entity entity to fill.
|
|
|
+ * @param result result from HBase.
|
|
|
+ * @param prefix column prefix.
|
|
|
+ * @param isConfig if true, means we are reading configs, otherwise info.
|
|
|
+ * @throws IOException if any problem is encountered while reading result.
|
|
|
+ */
|
|
|
+ protected <T> void readKeyValuePairs(TimelineEntity entity, Result result,
|
|
|
+ ColumnPrefix<T> prefix, boolean isConfig) throws IOException {
|
|
|
+ // info and configuration are of type Map<String, Object or String>
|
|
|
+ Map<String, Object> columns =
|
|
|
+ prefix.readResults(result, stringKeyConverter);
|
|
|
+ if (isConfig) {
|
|
|
+ for (Map.Entry<String, Object> column : columns.entrySet()) {
|
|
|
+ entity.addConfig(column.getKey(), column.getValue().toString());
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ entity.addInfo(columns);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|