|
@@ -30,11 +30,16 @@ import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.hbase.client.Connection;
|
|
|
import org.apache.hadoop.hbase.client.Result;
|
|
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
|
|
+import org.apache.hadoop.hbase.filter.BinaryComparator;
|
|
|
import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
|
|
|
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
|
|
+import org.apache.hadoop.hbase.filter.FamilyFilter;
|
|
|
+import org.apache.hadoop.hbase.filter.Filter;
|
|
|
import org.apache.hadoop.hbase.filter.FilterList;
|
|
|
import org.apache.hadoop.hbase.filter.FilterList.Operator;
|
|
|
import org.apache.hadoop.hbase.filter.QualifierFilter;
|
|
|
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
|
|
|
+import org.apache.hadoop.hbase.util.Bytes;
|
|
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
|
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
|
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
|
@@ -122,11 +127,12 @@ public abstract class TimelineEntityReader extends
|
|
|
* results fetched from HBase back-end storage. This is called only for
|
|
|
* multiple entity reads.
|
|
|
*
|
|
|
+ * @param cfsInFields column families in the fields
|
|
|
* @return a {@link FilterList} object.
|
|
|
* @throws IOException if any problem occurs while creating filter list.
|
|
|
*/
|
|
|
- protected abstract FilterList constructFilterListBasedOnFields()
|
|
|
- throws IOException;
|
|
|
+ protected abstract FilterList constructFilterListBasedOnFields(
|
|
|
+ Set<String> cfsInFields) throws IOException;
|
|
|
|
|
|
/**
|
|
|
* Creates a {@link FilterList} based on info, config and metric filters. This
|
|
@@ -151,7 +157,9 @@ public abstract class TimelineEntityReader extends
|
|
|
FilterList listBasedOnFilters = constructFilterListBasedOnFilters();
|
|
|
boolean hasListBasedOnFilters = listBasedOnFilters != null &&
|
|
|
!listBasedOnFilters.getFilters().isEmpty();
|
|
|
- FilterList listBasedOnFields = constructFilterListBasedOnFields();
|
|
|
+ Set<String> cfsInListBasedOnFields = new HashSet<>(0);
|
|
|
+ FilterList listBasedOnFields =
|
|
|
+ constructFilterListBasedOnFields(cfsInListBasedOnFields);
|
|
|
boolean hasListBasedOnFields = listBasedOnFields != null &&
|
|
|
!listBasedOnFields.getFilters().isEmpty();
|
|
|
// If filter lists based on both filters and fields can be created,
|
|
@@ -164,6 +172,21 @@ public abstract class TimelineEntityReader extends
|
|
|
if (hasListBasedOnFilters && hasListBasedOnFields) {
|
|
|
FilterList list = new FilterList();
|
|
|
list.addFilter(listBasedOnFilters);
|
|
|
+
|
|
|
+ Set<String> cfsInListBasedOnFilters = new HashSet<>(0);
|
|
|
+ extractColumnFamiliesFromFiltersBasedOnFilters(
|
|
|
+ listBasedOnFilters, cfsInListBasedOnFilters);
|
|
|
+
|
|
|
+ // must exclude cfs that are already covered in fields-based filters
|
|
|
+ // otherwise we will return the whole cf
|
|
|
+ cfsInListBasedOnFilters.removeAll(cfsInListBasedOnFields);
|
|
|
+
|
|
|
+ if (!cfsInListBasedOnFilters.isEmpty()) {
|
|
|
+ for (String cf: cfsInListBasedOnFilters) {
|
|
|
+ listBasedOnFields.addFilter(new FamilyFilter(CompareOp.EQUAL,
|
|
|
+ new BinaryComparator(Bytes.toBytes(cf))));
|
|
|
+ }
|
|
|
+ }
|
|
|
list.addFilter(listBasedOnFields);
|
|
|
return list;
|
|
|
} else if (hasListBasedOnFilters) {
|
|
@@ -174,6 +197,21 @@ public abstract class TimelineEntityReader extends
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
+ private static void extractColumnFamiliesFromFiltersBasedOnFilters(
|
|
|
+ Filter hbaseFilterBasedOnTLSFilter, Set<String> columnFamilies) {
|
|
|
+ if (hbaseFilterBasedOnTLSFilter instanceof SingleColumnValueFilter) {
|
|
|
+ byte[] cf = ((SingleColumnValueFilter)
|
|
|
+ hbaseFilterBasedOnTLSFilter).getFamily();
|
|
|
+ columnFamilies.add(Bytes.toString(cf));
|
|
|
+ } else if (hbaseFilterBasedOnTLSFilter instanceof FilterList) {
|
|
|
+ FilterList filterListBase = (FilterList) hbaseFilterBasedOnTLSFilter;
|
|
|
+ for (Filter fs: filterListBase.getFilters()) {
|
|
|
+ extractColumnFamiliesFromFiltersBasedOnFilters(fs, columnFamilies);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
protected TimelineDataToRetrieve getDataToRetrieve() {
|
|
|
return dataToRetrieve;
|
|
|
}
|
|
@@ -206,7 +244,7 @@ public abstract class TimelineEntityReader extends
|
|
|
validateParams();
|
|
|
augmentParams(hbaseConf, conn);
|
|
|
|
|
|
- FilterList filterList = constructFilterListBasedOnFields();
|
|
|
+ FilterList filterList = constructFilterListBasedOnFields(new HashSet<>(0));
|
|
|
if (LOG.isDebugEnabled() && filterList != null) {
|
|
|
LOG.debug("FilterList created for get is - " + filterList);
|
|
|
}
|