|
@@ -19,13 +19,8 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.EnumSet;
|
|
|
-import java.util.HashMap;
|
|
|
-import java.util.HashSet;
|
|
|
-import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
|
|
|
-import org.apache.commons.logging.Log;
|
|
|
-import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.hbase.client.Connection;
|
|
|
import org.apache.hadoop.hbase.client.Get;
|
|
@@ -33,28 +28,22 @@ import org.apache.hadoop.hbase.client.Result;
|
|
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
|
|
import org.apache.hadoop.hbase.client.Scan;
|
|
|
import org.apache.hadoop.hbase.filter.BinaryComparator;
|
|
|
-import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
|
|
|
import org.apache.hadoop.hbase.filter.FamilyFilter;
|
|
|
import org.apache.hadoop.hbase.filter.FilterList;
|
|
|
import org.apache.hadoop.hbase.filter.QualifierFilter;
|
|
|
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
|
|
import org.apache.hadoop.hbase.filter.FilterList.Operator;
|
|
|
-import org.apache.hadoop.hbase.util.Bytes;
|
|
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
|
|
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
|
|
|
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
|
|
|
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
|
|
|
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
|
|
|
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
|
|
|
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
|
|
@@ -71,7 +60,6 @@ import com.google.common.base.Preconditions;
|
|
|
*/
|
|
|
class GenericEntityReader extends TimelineEntityReader {
|
|
|
private static final EntityTable ENTITY_TABLE = new EntityTable();
|
|
|
- private static final Log LOG = LogFactory.getLog(GenericEntityReader.class);
|
|
|
|
|
|
/**
|
|
|
* Used to look up the flow context.
|
|
@@ -97,92 +85,322 @@ class GenericEntityReader extends TimelineEntityReader {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- protected FilterList constructFilterListBasedOnFields() {
|
|
|
- FilterList list = new FilterList(Operator.MUST_PASS_ONE);
|
|
|
- TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
|
|
|
- // Fetch all the columns.
|
|
|
- if (dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) &&
|
|
|
- (dataToRetrieve.getConfsToRetrieve() == null ||
|
|
|
- dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty()) &&
|
|
|
- (dataToRetrieve.getMetricsToRetrieve() == null ||
|
|
|
- dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty())) {
|
|
|
- return list;
|
|
|
+ protected FilterList constructFilterListBasedOnFilters() throws IOException {
|
|
|
+ // Filters here cannot be null for multiple entity reads as they are set in
|
|
|
+ // augmentParams if null.
|
|
|
+ FilterList listBasedOnFilters = new FilterList();
|
|
|
+ TimelineEntityFilters filters = getFilters();
|
|
|
+ // Create filter list based on created time range and add it to
|
|
|
+ // listBasedOnFilters.
|
|
|
+ long createdTimeBegin = filters.getCreatedTimeBegin();
|
|
|
+ long createdTimeEnd = filters.getCreatedTimeEnd();
|
|
|
+ if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) {
|
|
|
+ listBasedOnFilters.addFilter(
|
|
|
+ TimelineFilterUtils.createSingleColValueFiltersByRange(
|
|
|
+ EntityColumn.CREATED_TIME, createdTimeBegin, createdTimeEnd));
|
|
|
}
|
|
|
- FilterList infoColFamilyList = new FilterList();
|
|
|
- // By default fetch everything in INFO column family.
|
|
|
- FamilyFilter infoColumnFamily =
|
|
|
- new FamilyFilter(CompareOp.EQUAL,
|
|
|
- new BinaryComparator(EntityColumnFamily.INFO.getBytes()));
|
|
|
- infoColFamilyList.addFilter(infoColumnFamily);
|
|
|
+ // Create filter list based on metric filters and add it to
|
|
|
+ // listBasedOnFilters.
|
|
|
+ TimelineFilterList metricFilters = filters.getMetricFilters();
|
|
|
+ if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) {
|
|
|
+ listBasedOnFilters.addFilter(
|
|
|
+ TimelineFilterUtils.createHBaseFilterList(
|
|
|
+ EntityColumnPrefix.METRIC, metricFilters));
|
|
|
+ }
|
|
|
+ // Create filter list based on config filters and add it to
|
|
|
+ // listBasedOnFilters.
|
|
|
+ TimelineFilterList configFilters = filters.getConfigFilters();
|
|
|
+ if (configFilters != null && !configFilters.getFilterList().isEmpty()) {
|
|
|
+ listBasedOnFilters.addFilter(
|
|
|
+ TimelineFilterUtils.createHBaseFilterList(
|
|
|
+ EntityColumnPrefix.CONFIG, configFilters));
|
|
|
+ }
|
|
|
+ // Create filter list based on info filters and add it to listBasedOnFilters
|
|
|
+ TimelineFilterList infoFilters = filters.getInfoFilters();
|
|
|
+ if (infoFilters != null && !infoFilters.getFilterList().isEmpty()) {
|
|
|
+ listBasedOnFilters.addFilter(
|
|
|
+ TimelineFilterUtils.createHBaseFilterList(
|
|
|
+ EntityColumnPrefix.INFO, infoFilters));
|
|
|
+ }
|
|
|
+ return listBasedOnFilters;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Check if we need to fetch only some of the event columns.
|
|
|
+ *
|
|
|
+ * @return true if we need to fetch some of the columns, false otherwise.
|
|
|
+ */
|
|
|
+ private static boolean fetchPartialEventCols(TimelineFilterList eventFilters,
|
|
|
+ EnumSet<Field> fieldsToRetrieve) {
|
|
|
+ return (eventFilters != null && !eventFilters.getFilterList().isEmpty() &&
|
|
|
+ !TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Check if we need to fetch only some of the relates_to columns.
|
|
|
+ *
|
|
|
+ * @return true if we need to fetch some of the columns, false otherwise.
|
|
|
+ */
|
|
|
+ private static boolean fetchPartialRelatesToCols(TimelineFilterList relatesTo,
|
|
|
+ EnumSet<Field> fieldsToRetrieve) {
|
|
|
+ return (relatesTo != null && !relatesTo.getFilterList().isEmpty() &&
|
|
|
+ !TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Check if we need to fetch only some of the is_related_to columns.
|
|
|
+ *
|
|
|
+ * @return true if we need to fetch some of the columns, false otherwise.
|
|
|
+ */
|
|
|
+ private static boolean fetchPartialIsRelatedToCols(
|
|
|
+ TimelineFilterList isRelatedTo, EnumSet<Field> fieldsToRetrieve) {
|
|
|
+ return (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty() &&
|
|
|
+ !TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Check if we need to fetch only some of the columns based on event filters,
|
|
|
+ * relatesto and isrelatedto from info family.
|
|
|
+ *
|
|
|
+ * @return true, if we need to fetch only some of the columns, false if we
|
|
|
+ * need to fetch all the columns under info column family.
|
|
|
+ */
|
|
|
+ protected boolean fetchPartialColsFromInfoFamily() {
|
|
|
+ EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
|
|
|
TimelineEntityFilters filters = getFilters();
|
|
|
+ return fetchPartialEventCols(filters.getEventFilters(), fieldsToRetrieve) ||
|
|
|
+ fetchPartialRelatesToCols(filters.getRelatesTo(), fieldsToRetrieve) ||
|
|
|
+ fetchPartialIsRelatedToCols(filters.getIsRelatedTo(), fieldsToRetrieve);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Check if we need to create filter list based on fields. We need to create
|
|
|
+ * a filter list iff all fields need not be retrieved or we have some specific
|
|
|
+ * fields or metrics to retrieve. We also need to create a filter list if we
|
|
|
+ * have relationships(relatesTo/isRelatedTo) and event filters specified for
|
|
|
+ * the query.
|
|
|
+ *
|
|
|
+ * @return true if we need to create the filter list, false otherwise.
|
|
|
+ */
|
|
|
+ protected boolean needCreateFilterListBasedOnFields() {
|
|
|
+ TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
|
|
|
+ // Check if all fields are to be retrieved or not. If all fields have to
|
|
|
+ // be retrieved, also check if we have some metrics or configs to
|
|
|
+ // retrieve specified for the query because then a filter list will have
|
|
|
+ // to be created.
|
|
|
+ boolean flag = !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) ||
|
|
|
+ (dataToRetrieve.getConfsToRetrieve() != null &&
|
|
|
+ !dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty()) ||
|
|
|
+ (dataToRetrieve.getMetricsToRetrieve() != null &&
|
|
|
+ !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty());
|
|
|
+ // Filters need to be checked only if we are reading multiple entities. If
|
|
|
+ // condition above is false, we check if there are relationships(relatesTo/
|
|
|
+ // isRelatedTo) and event filters specified for the query.
|
|
|
+ if (!flag && !isSingleEntityRead()) {
|
|
|
+ TimelineEntityFilters filters = getFilters();
|
|
|
+ flag = (filters.getEventFilters() != null &&
|
|
|
+ !filters.getEventFilters().getFilterList().isEmpty()) ||
|
|
|
+ (filters.getIsRelatedTo() != null &&
|
|
|
+ !filters.getIsRelatedTo().getFilterList().isEmpty()) ||
|
|
|
+ (filters.getRelatesTo() != null &&
|
|
|
+ !filters.getRelatesTo().getFilterList().isEmpty());
|
|
|
+ }
|
|
|
+ return flag;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Add {@link QualifierFilter} filters to filter list for each column of
|
|
|
+ * entity table.
|
|
|
+ *
|
|
|
+ * @param list filter list to which qualifier filters have to be added.
|
|
|
+ */
|
|
|
+ protected void updateFixedColumns(FilterList list) {
|
|
|
+ for (EntityColumn column : EntityColumn.values()) {
|
|
|
+ list.addFilter(new QualifierFilter(CompareOp.EQUAL,
|
|
|
+ new BinaryComparator(column.getColumnQualifierBytes())));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Creates a filter list which indicates that only some of the column
|
|
|
+ * qualifiers in the info column family will be returned in result.
|
|
|
+ *
|
|
|
+ * @param isApplication If true, it means operations are to be performed for
|
|
|
+ * application table, otherwise for entity table.
|
|
|
+ * @return filter list.
|
|
|
+ * @throws IOException if any problem occurs while creating filter list.
|
|
|
+ */
|
|
|
+ private FilterList createFilterListForColsOfInfoFamily()
|
|
|
+ throws IOException {
|
|
|
+ FilterList infoFamilyColsFilter = new FilterList(Operator.MUST_PASS_ONE);
|
|
|
+ // Add filters for each column in entity table.
|
|
|
+ updateFixedColumns(infoFamilyColsFilter);
|
|
|
+ EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
|
|
|
+ // If INFO field has to be retrieved, add a filter for fetching columns
|
|
|
+ // with INFO column prefix.
|
|
|
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
|
|
|
+ infoFamilyColsFilter.addFilter(
|
|
|
+ TimelineFilterUtils.createHBaseQualifierFilter(
|
|
|
+ CompareOp.EQUAL, EntityColumnPrefix.INFO));
|
|
|
+ }
|
|
|
+ TimelineFilterList relatesTo = getFilters().getRelatesTo();
|
|
|
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
|
|
|
+ // If RELATES_TO field has to be retrieved, add a filter for fetching
|
|
|
+ // columns with RELATES_TO column prefix.
|
|
|
+ infoFamilyColsFilter.addFilter(
|
|
|
+ TimelineFilterUtils.createHBaseQualifierFilter(
|
|
|
+ CompareOp.EQUAL, EntityColumnPrefix.RELATES_TO));
|
|
|
+ } else if (relatesTo != null && !relatesTo.getFilterList().isEmpty()) {
|
|
|
+ // Even if fields to retrieve does not contain RELATES_TO, we still
|
|
|
+ // need to have a filter to fetch some of the column qualifiers if
|
|
|
+ // relatesTo filters are specified. relatesTo filters will then be
|
|
|
+ // matched after fetching rows from HBase.
|
|
|
+ Set<String> relatesToCols =
|
|
|
+ TimelineFilterUtils.fetchColumnsFromFilterList(relatesTo);
|
|
|
+ infoFamilyColsFilter.addFilter(
|
|
|
+ TimelineFilterUtils.createFiltersFromColumnQualifiers(
|
|
|
+ EntityColumnPrefix.RELATES_TO, relatesToCols));
|
|
|
+ }
|
|
|
+ TimelineFilterList isRelatedTo = getFilters().getIsRelatedTo();
|
|
|
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
|
|
|
+ // If IS_RELATED_TO field has to be retrieved, add a filter for fetching
|
|
|
+ // columns with IS_RELATED_TO column prefix.
|
|
|
+ infoFamilyColsFilter.addFilter(
|
|
|
+ TimelineFilterUtils.createHBaseQualifierFilter(
|
|
|
+ CompareOp.EQUAL, EntityColumnPrefix.IS_RELATED_TO));
|
|
|
+ } else if (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty()) {
|
|
|
+ // Even if fields to retrieve does not contain IS_RELATED_TO, we still
|
|
|
+ // need to have a filter to fetch some of the column qualifiers if
|
|
|
+ // isRelatedTo filters are specified. isRelatedTo filters will then be
|
|
|
+ // matched after fetching rows from HBase.
|
|
|
+ Set<String> isRelatedToCols =
|
|
|
+ TimelineFilterUtils.fetchColumnsFromFilterList(isRelatedTo);
|
|
|
+ infoFamilyColsFilter.addFilter(
|
|
|
+ TimelineFilterUtils.createFiltersFromColumnQualifiers(
|
|
|
+ EntityColumnPrefix.IS_RELATED_TO, isRelatedToCols));
|
|
|
+ }
|
|
|
+ TimelineFilterList eventFilters = getFilters().getEventFilters();
|
|
|
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
|
|
|
+ // If EVENTS field has to be retrieved, add a filter for fetching columns
|
|
|
+ // with EVENT column prefix.
|
|
|
+ infoFamilyColsFilter.addFilter(
|
|
|
+ TimelineFilterUtils.createHBaseQualifierFilter(
|
|
|
+ CompareOp.EQUAL, EntityColumnPrefix.EVENT));
|
|
|
+ } else if (eventFilters != null && !eventFilters.getFilterList().isEmpty()){
|
|
|
+ // Even if fields to retrieve does not contain EVENTS, we still need to
|
|
|
+ // have a filter to fetch some of the column qualifiers on the basis of
|
|
|
+ // event filters specified. Event filters will then be matched after
|
|
|
+ // fetching rows from HBase.
|
|
|
+ Set<String> eventCols =
|
|
|
+ TimelineFilterUtils.fetchColumnsFromFilterList(eventFilters);
|
|
|
+ infoFamilyColsFilter.addFilter(
|
|
|
+ TimelineFilterUtils.createFiltersFromColumnQualifiers(
|
|
|
+ EntityColumnPrefix.EVENT, eventCols));
|
|
|
+ }
|
|
|
+ return infoFamilyColsFilter;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Exclude column prefixes via filters which are not required(based on fields
|
|
|
+ * to retrieve) from info column family. These filters are added to filter
|
|
|
+ * list which contains a filter for getting info column family.
|
|
|
+ *
|
|
|
+ * @param infoColFamilyList filter list for info column family.
|
|
|
+ */
|
|
|
+ private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) {
|
|
|
+ EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
|
|
|
// Events not required.
|
|
|
- if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.EVENTS) &&
|
|
|
- !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) &&
|
|
|
- (isSingleEntityRead() || filters.getEventFilters() == null)) {
|
|
|
+ if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
|
|
|
infoColFamilyList.addFilter(
|
|
|
- new QualifierFilter(CompareOp.NOT_EQUAL,
|
|
|
- new BinaryPrefixComparator(
|
|
|
- EntityColumnPrefix.EVENT.getColumnPrefixBytes(""))));
|
|
|
+ TimelineFilterUtils.createHBaseQualifierFilter(
|
|
|
+ CompareOp.NOT_EQUAL, EntityColumnPrefix.EVENT));
|
|
|
}
|
|
|
// info not required.
|
|
|
- if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.INFO) &&
|
|
|
- !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) &&
|
|
|
- (isSingleEntityRead() || filters.getInfoFilters() == null)) {
|
|
|
+ if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
|
|
|
infoColFamilyList.addFilter(
|
|
|
- new QualifierFilter(CompareOp.NOT_EQUAL,
|
|
|
- new BinaryPrefixComparator(
|
|
|
- EntityColumnPrefix.INFO.getColumnPrefixBytes(""))));
|
|
|
+ TimelineFilterUtils.createHBaseQualifierFilter(
|
|
|
+ CompareOp.NOT_EQUAL, EntityColumnPrefix.INFO));
|
|
|
}
|
|
|
// is related to not required.
|
|
|
- if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.IS_RELATED_TO) &&
|
|
|
- !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) &&
|
|
|
- (isSingleEntityRead() || filters.getIsRelatedTo() == null)) {
|
|
|
+ if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
|
|
|
infoColFamilyList.addFilter(
|
|
|
- new QualifierFilter(CompareOp.NOT_EQUAL,
|
|
|
- new BinaryPrefixComparator(
|
|
|
- EntityColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes(""))));
|
|
|
+ TimelineFilterUtils.createHBaseQualifierFilter(
|
|
|
+ CompareOp.NOT_EQUAL, EntityColumnPrefix.IS_RELATED_TO));
|
|
|
}
|
|
|
// relates to not required.
|
|
|
- if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.RELATES_TO) &&
|
|
|
- !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) &&
|
|
|
- (isSingleEntityRead() || filters.getRelatesTo() == null)) {
|
|
|
+ if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
|
|
|
infoColFamilyList.addFilter(
|
|
|
- new QualifierFilter(CompareOp.NOT_EQUAL,
|
|
|
- new BinaryPrefixComparator(
|
|
|
- EntityColumnPrefix.RELATES_TO.getColumnPrefixBytes(""))));
|
|
|
+ TimelineFilterUtils.createHBaseQualifierFilter(
|
|
|
+ CompareOp.NOT_EQUAL, EntityColumnPrefix.RELATES_TO));
|
|
|
}
|
|
|
- list.addFilter(infoColFamilyList);
|
|
|
- if ((dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS) ||
|
|
|
- (!isSingleEntityRead() && filters.getConfigFilters() != null)) ||
|
|
|
- (dataToRetrieve.getConfsToRetrieve() != null &&
|
|
|
- !dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty())) {
|
|
|
- FilterList filterCfg =
|
|
|
- new FilterList(new FamilyFilter(CompareOp.EQUAL,
|
|
|
- new BinaryComparator(EntityColumnFamily.CONFIGS.getBytes())));
|
|
|
- if (dataToRetrieve.getConfsToRetrieve() != null &&
|
|
|
- !dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty()) {
|
|
|
- filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList(
|
|
|
- EntityColumnPrefix.CONFIG, dataToRetrieve.getConfsToRetrieve()));
|
|
|
- }
|
|
|
- list.addFilter(filterCfg);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Updates filter list based on fields for confs and metrics to retrieve.
|
|
|
+ *
|
|
|
+ * @param listBasedOnFields filter list based on fields.
|
|
|
+ * @throws IOException if any problem occurs while updating filter list.
|
|
|
+ */
|
|
|
+ private void updateFilterForConfsAndMetricsToRetrieve(
|
|
|
+ FilterList listBasedOnFields) throws IOException {
|
|
|
+ TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
|
|
|
+ // Please note that if confsToRetrieve is specified, we would have added
|
|
|
+ // CONFS to fields to retrieve in augmentParams() even if not specified.
|
|
|
+ if (dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS)) {
|
|
|
+ // Create a filter list for configs.
|
|
|
+ listBasedOnFields.addFilter(TimelineFilterUtils.
|
|
|
+ createFilterForConfsOrMetricsToRetrieve(
|
|
|
+ dataToRetrieve.getConfsToRetrieve(),
|
|
|
+ EntityColumnFamily.CONFIGS, EntityColumnPrefix.CONFIG));
|
|
|
}
|
|
|
- if ((dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS) ||
|
|
|
- (!isSingleEntityRead() && filters.getMetricFilters() != null)) ||
|
|
|
- (dataToRetrieve.getMetricsToRetrieve() != null &&
|
|
|
- !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty())) {
|
|
|
- FilterList filterMetrics =
|
|
|
- new FilterList(new FamilyFilter(CompareOp.EQUAL,
|
|
|
- new BinaryComparator(EntityColumnFamily.METRICS.getBytes())));
|
|
|
- if (dataToRetrieve.getMetricsToRetrieve() != null &&
|
|
|
- !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty()) {
|
|
|
- filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList(
|
|
|
- EntityColumnPrefix.METRIC, dataToRetrieve.getMetricsToRetrieve()));
|
|
|
- }
|
|
|
- list.addFilter(filterMetrics);
|
|
|
+
|
|
|
+ // Please note that if metricsToRetrieve is specified, we would have added
|
|
|
+ // METRICS to fields to retrieve in augmentParams() even if not specified.
|
|
|
+ if (dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS)) {
|
|
|
+ // Create a filter list for metrics.
|
|
|
+ listBasedOnFields.addFilter(TimelineFilterUtils.
|
|
|
+ createFilterForConfsOrMetricsToRetrieve(
|
|
|
+ dataToRetrieve.getMetricsToRetrieve(),
|
|
|
+ EntityColumnFamily.METRICS, EntityColumnPrefix.METRIC));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected FilterList constructFilterListBasedOnFields() throws IOException {
|
|
|
+ if (!needCreateFilterListBasedOnFields()) {
|
|
|
+ // Fetch all the columns. No need of a filter.
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ FilterList listBasedOnFields = new FilterList(Operator.MUST_PASS_ONE);
|
|
|
+ FilterList infoColFamilyList = new FilterList();
|
|
|
+ // By default fetch everything in INFO column family.
|
|
|
+ FamilyFilter infoColumnFamily =
|
|
|
+ new FamilyFilter(CompareOp.EQUAL,
|
|
|
+ new BinaryComparator(EntityColumnFamily.INFO.getBytes()));
|
|
|
+ infoColFamilyList.addFilter(infoColumnFamily);
|
|
|
+ if (!isSingleEntityRead() && fetchPartialColsFromInfoFamily()) {
|
|
|
+ // We can fetch only some of the columns from info family.
|
|
|
+ infoColFamilyList.addFilter(createFilterListForColsOfInfoFamily());
|
|
|
+ } else {
|
|
|
+ // Exclude column prefixes in info column family which are not required
|
|
|
+ // based on fields to retrieve.
|
|
|
+ excludeFieldsFromInfoColFamily(infoColFamilyList);
|
|
|
}
|
|
|
- return list;
|
|
|
+ listBasedOnFields.addFilter(infoColFamilyList);
|
|
|
+ updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields);
|
|
|
+ return listBasedOnFields;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Looks up flow context from AppToFlow table.
|
|
|
+ *
|
|
|
+ * @param clusterId Cluster Id.
|
|
|
+ * @param appId App Id.
|
|
|
+ * @param hbaseConf HBase configuration.
|
|
|
+ * @param conn HBase Connection.
|
|
|
+ * @return flow context information.
|
|
|
+ * @throws IOException if any problem occurs while fetching flow information.
|
|
|
+ */
|
|
|
protected FlowContext lookupFlowContext(String clusterId, String appId,
|
|
|
Configuration hbaseConf, Connection conn) throws IOException {
|
|
|
byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
|
|
@@ -200,6 +418,9 @@ class GenericEntityReader extends TimelineEntityReader {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Encapsulates flow context information.
|
|
|
+ */
|
|
|
protected static class FlowContext {
|
|
|
private final String userId;
|
|
|
private final String flowName;
|
|
@@ -222,6 +443,9 @@ class GenericEntityReader extends TimelineEntityReader {
|
|
|
|
|
|
@Override
|
|
|
protected void validateParams() {
|
|
|
+ Preconditions.checkNotNull(getContext(), "context shouldn't be null");
|
|
|
+ Preconditions.checkNotNull(
|
|
|
+ getDataToRetrieve(), "data to retrieve shouldn't be null");
|
|
|
Preconditions.checkNotNull(getContext().getClusterId(),
|
|
|
"clusterId shouldn't be null");
|
|
|
Preconditions.checkNotNull(getContext().getAppId(),
|
|
@@ -241,13 +465,19 @@ class GenericEntityReader extends TimelineEntityReader {
|
|
|
// In reality all three should be null or neither should be null
|
|
|
if (context.getFlowName() == null || context.getFlowRunId() == null ||
|
|
|
context.getUserId() == null) {
|
|
|
+ // Get flow context information from AppToFlow table.
|
|
|
FlowContext flowContext = lookupFlowContext(
|
|
|
context.getClusterId(), context.getAppId(), hbaseConf, conn);
|
|
|
context.setFlowName(flowContext.flowName);
|
|
|
context.setFlowRunId(flowContext.flowRunId);
|
|
|
context.setUserId(flowContext.userId);
|
|
|
}
|
|
|
+ // Add configs/metrics to fields to retrieve if confsToRetrieve and/or
|
|
|
+ // metricsToRetrieve are specified.
|
|
|
getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve();
|
|
|
+ if (!isSingleEntityRead()) {
|
|
|
+ createFiltersIfNull();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -298,215 +528,84 @@ class GenericEntityReader extends TimelineEntityReader {
|
|
|
// fetch created time
|
|
|
Number createdTime = (Number)EntityColumn.CREATED_TIME.readResult(result);
|
|
|
entity.setCreatedTime(createdTime.longValue());
|
|
|
- if (!isSingleEntityRead() &&
|
|
|
- (entity.getCreatedTime() < filters.getCreatedTimeBegin() ||
|
|
|
- entity.getCreatedTime() > filters.getCreatedTimeEnd())) {
|
|
|
- return null;
|
|
|
- }
|
|
|
+
|
|
|
EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
|
|
|
- // fetch is related to entities
|
|
|
+ // fetch is related to entities and match isRelatedTo filter. If isRelatedTo
|
|
|
+ // filters do not match, entity would be dropped. We have to match filters
|
|
|
+ // locally as relevant HBase filters to filter out rows on the basis of
|
|
|
+ // isRelatedTo are not set in HBase scan.
|
|
|
boolean checkIsRelatedTo =
|
|
|
- filters != null && filters.getIsRelatedTo() != null &&
|
|
|
- filters.getIsRelatedTo().size() > 0;
|
|
|
- if (fieldsToRetrieve.contains(Field.ALL) ||
|
|
|
- fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
|
|
|
- readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true);
|
|
|
- if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations(
|
|
|
- entity.getIsRelatedToEntities(), filters.getIsRelatedTo())) {
|
|
|
+ !isSingleEntityRead() && filters.getIsRelatedTo() != null &&
|
|
|
+ filters.getIsRelatedTo().getFilterList().size() > 0;
|
|
|
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO) ||
|
|
|
+ checkIsRelatedTo) {
|
|
|
+ TimelineStorageUtils.readRelationship(
|
|
|
+ entity, result, EntityColumnPrefix.IS_RELATED_TO, true);
|
|
|
+ if (checkIsRelatedTo && !TimelineStorageUtils.matchIsRelatedTo(entity,
|
|
|
+ filters.getIsRelatedTo())) {
|
|
|
return null;
|
|
|
}
|
|
|
- if (!fieldsToRetrieve.contains(Field.ALL) &&
|
|
|
- !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) {
|
|
|
+ if (!TimelineStorageUtils.hasField(fieldsToRetrieve,
|
|
|
+ Field.IS_RELATED_TO)) {
|
|
|
entity.getIsRelatedToEntities().clear();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // fetch relates to entities
|
|
|
+ // fetch relates to entities and match relatesTo filter. If relatesTo
|
|
|
+ // filters do not match, entity would be dropped. We have to match filters
|
|
|
+ // locally as relevant HBase filters to filter out rows on the basis of
|
|
|
+ // relatesTo are not set in HBase scan.
|
|
|
boolean checkRelatesTo =
|
|
|
- filters != null && filters.getRelatesTo() != null &&
|
|
|
- filters.getRelatesTo().size() > 0;
|
|
|
- if (fieldsToRetrieve.contains(Field.ALL) ||
|
|
|
- fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
|
|
|
- readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false);
|
|
|
- if (checkRelatesTo && !TimelineStorageUtils.matchRelations(
|
|
|
- entity.getRelatesToEntities(), filters.getRelatesTo())) {
|
|
|
+ !isSingleEntityRead() && filters.getRelatesTo() != null &&
|
|
|
+ filters.getRelatesTo().getFilterList().size() > 0;
|
|
|
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO) ||
|
|
|
+ checkRelatesTo) {
|
|
|
+ TimelineStorageUtils.readRelationship(
|
|
|
+ entity, result, EntityColumnPrefix.RELATES_TO, false);
|
|
|
+ if (checkRelatesTo && !TimelineStorageUtils.matchRelatesTo(entity,
|
|
|
+ filters.getRelatesTo())) {
|
|
|
return null;
|
|
|
}
|
|
|
- if (!fieldsToRetrieve.contains(Field.ALL) &&
|
|
|
- !fieldsToRetrieve.contains(Field.RELATES_TO)) {
|
|
|
+ if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
|
|
|
entity.getRelatesToEntities().clear();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // fetch info
|
|
|
- boolean checkInfo = filters != null && filters.getInfoFilters() != null &&
|
|
|
- filters.getInfoFilters().size() > 0;
|
|
|
- if (fieldsToRetrieve.contains(Field.ALL) ||
|
|
|
- fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
|
|
|
- readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false);
|
|
|
- if (checkInfo &&
|
|
|
- !TimelineStorageUtils.matchFilters(
|
|
|
- entity.getInfo(), filters.getInfoFilters())) {
|
|
|
- return null;
|
|
|
- }
|
|
|
- if (!fieldsToRetrieve.contains(Field.ALL) &&
|
|
|
- !fieldsToRetrieve.contains(Field.INFO)) {
|
|
|
- entity.getInfo().clear();
|
|
|
- }
|
|
|
+ // fetch info if fieldsToRetrieve contains INFO or ALL.
|
|
|
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
|
|
|
+ TimelineStorageUtils.readKeyValuePairs(
|
|
|
+ entity, result, EntityColumnPrefix.INFO, false);
|
|
|
}
|
|
|
|
|
|
- // fetch configs
|
|
|
- boolean checkConfigs =
|
|
|
- filters != null && filters.getConfigFilters() != null &&
|
|
|
- filters.getConfigFilters().size() > 0;
|
|
|
- if (fieldsToRetrieve.contains(Field.ALL) ||
|
|
|
- fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
|
|
|
- readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true);
|
|
|
- if (checkConfigs && !TimelineStorageUtils.matchFilters(
|
|
|
- entity.getConfigs(), filters.getConfigFilters())) {
|
|
|
- return null;
|
|
|
- }
|
|
|
- if (!fieldsToRetrieve.contains(Field.ALL) &&
|
|
|
- !fieldsToRetrieve.contains(Field.CONFIGS)) {
|
|
|
- entity.getConfigs().clear();
|
|
|
- }
|
|
|
+ // fetch configs if fieldsToRetrieve contains CONFIGS or ALL.
|
|
|
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.CONFIGS)) {
|
|
|
+ TimelineStorageUtils.readKeyValuePairs(
|
|
|
+ entity, result, EntityColumnPrefix.CONFIG, true);
|
|
|
}
|
|
|
|
|
|
- // fetch events
|
|
|
+ // fetch events and match event filters if they exist. If event filters do
|
|
|
+ // not match, entity would be dropped. We have to match filters locally
|
|
|
+ // as relevant HBase filters to filter out rows on the basis of events
|
|
|
+ // are not set in HBase scan.
|
|
|
boolean checkEvents =
|
|
|
- filters != null && filters.getEventFilters() != null &&
|
|
|
- filters.getEventFilters().size() > 0;
|
|
|
- if (fieldsToRetrieve.contains(Field.ALL) ||
|
|
|
- fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
|
|
|
- readEvents(entity, result, false);
|
|
|
- if (checkEvents && !TimelineStorageUtils.matchEventFilters(
|
|
|
- entity.getEvents(), filters.getEventFilters())) {
|
|
|
+ !isSingleEntityRead() && filters.getEventFilters() != null &&
|
|
|
+ filters.getEventFilters().getFilterList().size() > 0;
|
|
|
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS) ||
|
|
|
+ checkEvents) {
|
|
|
+ TimelineStorageUtils.readEvents(entity, result, EntityColumnPrefix.EVENT);
|
|
|
+ if (checkEvents && !TimelineStorageUtils.matchEventFilters(entity,
|
|
|
+ filters.getEventFilters())) {
|
|
|
return null;
|
|
|
}
|
|
|
- if (!fieldsToRetrieve.contains(Field.ALL) &&
|
|
|
- !fieldsToRetrieve.contains(Field.EVENTS)) {
|
|
|
+ if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
|
|
|
entity.getEvents().clear();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // fetch metrics
|
|
|
- boolean checkMetrics =
|
|
|
- filters != null && filters.getMetricFilters() != null &&
|
|
|
- filters.getMetricFilters().size() > 0;
|
|
|
- if (fieldsToRetrieve.contains(Field.ALL) ||
|
|
|
- fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
|
|
|
+ // fetch metrics if fieldsToRetrieve contains METRICS or ALL.
|
|
|
+ if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.METRICS)) {
|
|
|
readMetrics(entity, result, EntityColumnPrefix.METRIC);
|
|
|
- if (checkMetrics && !TimelineStorageUtils.matchMetricFilters(
|
|
|
- entity.getMetrics(), filters.getMetricFilters())) {
|
|
|
- return null;
|
|
|
- }
|
|
|
- if (!fieldsToRetrieve.contains(Field.ALL) &&
|
|
|
- !fieldsToRetrieve.contains(Field.METRICS)) {
|
|
|
- entity.getMetrics().clear();
|
|
|
- }
|
|
|
}
|
|
|
return entity;
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * Helper method for reading relationship.
|
|
|
- *
|
|
|
- * @param <T> Describes the type of column prefix.
|
|
|
- * @param entity entity to fill.
|
|
|
- * @param result result from HBase.
|
|
|
- * @param prefix column prefix.
|
|
|
- * @param isRelatedTo if true, means relationship is to be added to
|
|
|
- * isRelatedTo, otherwise its added to relatesTo.
|
|
|
- * @throws IOException if any problem is encountered while reading result.
|
|
|
- */
|
|
|
- protected <T> void readRelationship(
|
|
|
- TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
|
|
|
- boolean isRelatedTo) throws IOException {
|
|
|
- // isRelatedTo and relatesTo are of type Map<String, Set<String>>
|
|
|
- Map<String, Object> columns = prefix.readResults(result);
|
|
|
- for (Map.Entry<String, Object> column : columns.entrySet()) {
|
|
|
- for (String id : Separator.VALUES.splitEncoded(
|
|
|
- column.getValue().toString())) {
|
|
|
- if (isRelatedTo) {
|
|
|
- entity.addIsRelatedToEntity(column.getKey(), id);
|
|
|
- } else {
|
|
|
- entity.addRelatesToEntity(column.getKey(), id);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Helper method for reading key-value pairs for either info or config.
|
|
|
- *
|
|
|
- * @param <T> Describes the type of column prefix.
|
|
|
- * @param entity entity to fill.
|
|
|
- * @param result result from HBase.
|
|
|
- * @param prefix column prefix.
|
|
|
- * @param isConfig if true, means we are reading configs, otherwise info.
|
|
|
- * @throws IOException if any problem is encountered while reading result.
|
|
|
- */
|
|
|
- protected <T> void readKeyValuePairs(
|
|
|
- TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
|
|
|
- boolean isConfig) throws IOException {
|
|
|
- // info and configuration are of type Map<String, Object or String>
|
|
|
- Map<String, Object> columns = prefix.readResults(result);
|
|
|
- if (isConfig) {
|
|
|
- for (Map.Entry<String, Object> column : columns.entrySet()) {
|
|
|
- entity.addConfig(column.getKey(), column.getValue().toString());
|
|
|
- }
|
|
|
- } else {
|
|
|
- entity.addInfo(columns);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Read events from the entity table or the application table. The column name
|
|
|
- * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted
|
|
|
- * if there is no info associated with the event.
|
|
|
- *
|
|
|
- * @param entity entity to fill.
|
|
|
- * @param result HBase Result.
|
|
|
- * @param isApplication if true, event read is for application table,
|
|
|
- * otherwise its being read for entity table.
|
|
|
- * @throws IOException if any problem is encountered while reading result.
|
|
|
- *
|
|
|
- * See {@link EntityTable} and {@link ApplicationTable} for a more detailed
|
|
|
- * schema description.
|
|
|
- */
|
|
|
- protected void readEvents(TimelineEntity entity, Result result,
|
|
|
- boolean isApplication) throws IOException {
|
|
|
- Map<String, TimelineEvent> eventsMap = new HashMap<>();
|
|
|
- Map<?, Object> eventsResult = isApplication ?
|
|
|
- ApplicationColumnPrefix.EVENT.
|
|
|
- readResultsHavingCompoundColumnQualifiers(result) :
|
|
|
- EntityColumnPrefix.EVENT.
|
|
|
- readResultsHavingCompoundColumnQualifiers(result);
|
|
|
- for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) {
|
|
|
- byte[][] karr = (byte[][])eventResult.getKey();
|
|
|
- // the column name is of the form "eventId=timestamp=infoKey"
|
|
|
- if (karr.length == 3) {
|
|
|
- String id = Bytes.toString(karr[0]);
|
|
|
- long ts = TimelineStorageUtils.invertLong(Bytes.toLong(karr[1]));
|
|
|
- String key = Separator.VALUES.joinEncoded(id, Long.toString(ts));
|
|
|
- TimelineEvent event = eventsMap.get(key);
|
|
|
- if (event == null) {
|
|
|
- event = new TimelineEvent();
|
|
|
- event.setId(id);
|
|
|
- event.setTimestamp(ts);
|
|
|
- eventsMap.put(key, event);
|
|
|
- }
|
|
|
- // handle empty info
|
|
|
- String infoKey = karr[2].length == 0 ? null : Bytes.toString(karr[2]);
|
|
|
- if (infoKey != null) {
|
|
|
- event.addInfo(infoKey, eventResult.getValue());
|
|
|
- }
|
|
|
- } else {
|
|
|
- LOG.warn("incorrectly formatted column name: it will be discarded");
|
|
|
- continue;
|
|
|
- }
|
|
|
- }
|
|
|
- Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
|
|
|
- entity.addEvents(eventsSet);
|
|
|
- }
|
|
|
}
|