|
@@ -1026,6 +1026,78 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
|
|
|
);
|
|
|
}
|
|
|
|
|
|
+ @SuppressWarnings("nls")
|
|
|
+ private OperatorInfo genGroupByPlanGroupByOpForward(
|
|
|
+ QBParseInfo parseInfo, String dest, OperatorInfo forwardOpInfo,
|
|
|
+ groupByDesc.Mode mode)
|
|
|
+ throws SemanticException {
|
|
|
+ RowResolver inputRS = forwardOpInfo.getRowResolver();
|
|
|
+ RowResolver outputRS = new RowResolver();
|
|
|
+ outputRS.setIsExprResolver(true);
|
|
|
+ ArrayList<exprNodeDesc> groupByKeys = new ArrayList<exprNodeDesc>();
|
|
|
+ ArrayList<aggregationDesc> aggregations = new ArrayList<aggregationDesc>();
|
|
|
+ List<CommonTree> grpByExprs = getGroupByForClause(parseInfo, dest);
|
|
|
+ for (int i = 0; i < grpByExprs.size(); i++) {
|
|
|
+ CommonTree grpbyExpr = grpByExprs.get(i);
|
|
|
+ String text = grpbyExpr.toStringTree();
|
|
|
+ ColumnInfo exprInfo = inputRS.get("",text);
|
|
|
+
|
|
|
+ if (exprInfo == null) {
|
|
|
+ throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg(grpbyExpr));
|
|
|
+ }
|
|
|
+
|
|
|
+ groupByKeys.add(new exprNodeColumnDesc(exprInfo.getType(), exprInfo.getInternalName()));
|
|
|
+ String field = (Integer.valueOf(i)).toString();
|
|
|
+ outputRS.put("", text,
|
|
|
+ new ColumnInfo(field, exprInfo.getType(), exprInfo.getIsVirtual()));
|
|
|
+ }
|
|
|
+
|
|
|
+ // For each aggregation
|
|
|
+ HashMap<String, CommonTree> aggregationTrees = parseInfo
|
|
|
+ .getAggregationExprsForClause(dest);
|
|
|
+ assert (aggregationTrees != null);
|
|
|
+ for (Map.Entry<String, CommonTree> entry : aggregationTrees.entrySet()) {
|
|
|
+ CommonTree value = entry.getValue();
|
|
|
+ String aggName = value.getChild(0).getText();
|
|
|
+ Class<? extends UDAF> aggClass = UDAFRegistry.getUDAF(aggName);
|
|
|
+ assert (aggClass != null);
|
|
|
+ ArrayList<exprNodeDesc> aggParameters = new ArrayList<exprNodeDesc>();
|
|
|
+ ArrayList<Class<?>> aggClasses = new ArrayList<Class<?>>();
|
|
|
+ // 0 is the function name
|
|
|
+ for (int i = 1; i < value.getChildCount(); i++) {
|
|
|
+ String text = value.getChild(i).toStringTree();
|
|
|
+ CommonTree paraExpr = (CommonTree)value.getChild(i);
|
|
|
+ ColumnInfo paraExprInfo = inputRS.get("", text);
|
|
|
+ if (paraExprInfo == null) {
|
|
|
+ throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg(paraExpr));
|
|
|
+ }
|
|
|
+
|
|
|
+ String paraExpression = paraExprInfo.getInternalName();
|
|
|
+ assert(paraExpression != null);
|
|
|
+ aggParameters.add(new exprNodeColumnDesc(paraExprInfo.getType(), paraExprInfo.getInternalName()));
|
|
|
+ aggClasses.add(paraExprInfo.getType().getPrimitiveClass());
|
|
|
+ }
|
|
|
+
|
|
|
+ if (null == UDAFRegistry.getUDAFMethod(aggName, aggClasses)) {
|
|
|
+ String reason = "Looking for UDAF \"" + aggName + "\" with parameters " + aggClasses;
|
|
|
+ throw new SemanticException(ErrorMsg.INVALID_FUNCTION_SIGNATURE.getMsg((CommonTree)value.getChild(0), reason));
|
|
|
+ }
|
|
|
+
|
|
|
+ aggregations.add(new aggregationDesc(aggClass, aggParameters,
|
|
|
+ value.getToken().getType() == HiveParser.TOK_FUNCTIONDI));
|
|
|
+ outputRS.put("",value.toStringTree(),
|
|
|
+ new ColumnInfo(Integer.valueOf(groupByKeys.size() + aggregations.size() -1).toString(),
|
|
|
+ String.class, false)); // Everything is a string right now
|
|
|
+ }
|
|
|
+
|
|
|
+ return new OperatorInfo(
|
|
|
+ OperatorFactory.getAndMakeChild(new groupByDesc(mode, groupByKeys, aggregations),
|
|
|
+ new RowSchema(outputRS.getColumnInfos()),
|
|
|
+ forwardOpInfo.getOp()),
|
|
|
+ outputRS
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
@SuppressWarnings("nls")
|
|
|
private OperatorInfo genGroupByPlanReduceSinkOperator(QBParseInfo parseInfo,
|
|
|
String dest, OperatorInfo inputOperatorInfo, int numPartitionFields)
|
|
@@ -1094,6 +1166,106 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
|
|
|
);
|
|
|
}
|
|
|
|
|
|
+ @SuppressWarnings("nls")
|
|
|
+ private OperatorInfo genGroupByPlanReduceSinkOperator(QBParseInfo parseInfo,
|
|
|
+ OperatorInfo input, CommonTree distinctText, TreeSet<String> ks)
|
|
|
+ throws SemanticException {
|
|
|
+ RowResolver inputRS = input.getRowResolver();
|
|
|
+ RowResolver outputRS = new RowResolver();
|
|
|
+ outputRS.setIsExprResolver(true);
|
|
|
+ ArrayList<exprNodeDesc> reduceKeys = new ArrayList<exprNodeDesc>();
|
|
|
+
|
|
|
+ // Spray on distinctText first
|
|
|
+ if (distinctText != null)
|
|
|
+ {
|
|
|
+ reduceKeys.add(genExprNodeDesc(distinctText, parseInfo.getAlias(), inputRS));
|
|
|
+ String text = distinctText.toStringTree();
|
|
|
+ assert (outputRS.get("", text) == null);
|
|
|
+ outputRS.put("", text,
|
|
|
+ new ColumnInfo(Utilities.ReduceField.KEY.toString() + "." + Integer.valueOf(reduceKeys.size() - 1).toString(),
|
|
|
+ String.class, false));
|
|
|
+ }
|
|
|
+ else
|
|
|
+ reduceKeys.add(new exprNodeNullDesc());
|
|
|
+
|
|
|
+ // copy the input row resolver
|
|
|
+ ArrayList<exprNodeDesc> reduceValues = new ArrayList<exprNodeDesc>();
|
|
|
+ Iterator<String> keysIter = inputRS.getTableNames().iterator();
|
|
|
+ while (keysIter.hasNext())
|
|
|
+ {
|
|
|
+ String key = keysIter.next();
|
|
|
+ HashMap<String, ColumnInfo> map = inputRS.getFieldMap(key);
|
|
|
+ Iterator<String> fNamesIter = map.keySet().iterator();
|
|
|
+ while (fNamesIter.hasNext())
|
|
|
+ {
|
|
|
+ String field = fNamesIter.next();
|
|
|
+ ColumnInfo valueInfo = inputRS.get(key, field);
|
|
|
+
|
|
|
+ if (outputRS.get(key, field) == null)
|
|
|
+ {
|
|
|
+ reduceValues.add(new exprNodeColumnDesc(valueInfo.getType(), valueInfo.getInternalName()));
|
|
|
+ outputRS.put(key, field, new ColumnInfo(Utilities.ReduceField.VALUE.toString() + "." + Integer.valueOf(reduceValues.size() - 1).toString(), valueInfo.getType(),
|
|
|
+ valueInfo.getIsVirtual()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ for (String dest : ks) {
|
|
|
+ List<CommonTree> grpByExprs = getGroupByForClause(parseInfo, dest);
|
|
|
+
|
|
|
+ // send all the group by expressions
|
|
|
+ for (int i = 0; i < grpByExprs.size(); ++i) {
|
|
|
+ CommonTree grpbyExpr = grpByExprs.get(i);
|
|
|
+ String text = grpbyExpr.toStringTree();
|
|
|
+ if (outputRS.get("", text) == null) {
|
|
|
+ exprNodeDesc grpbyExprNode = genExprNodeDesc(grpbyExpr, parseInfo.getAlias(), inputRS);
|
|
|
+ reduceValues.add(grpbyExprNode);
|
|
|
+ outputRS.put("", text,
|
|
|
+ new ColumnInfo(Utilities.ReduceField.VALUE.toString() + "." + Integer.valueOf(reduceValues.size() - 1).toString(),
|
|
|
+ grpbyExprNode.getTypeInfo(), false));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // send all the aggregation expressions
|
|
|
+ HashMap<String, CommonTree> aggregationTrees = parseInfo.getAggregationExprsForClause(dest);
|
|
|
+ for (Map.Entry<String, CommonTree> entry : aggregationTrees.entrySet()) {
|
|
|
+ CommonTree value = entry.getValue();
|
|
|
+ // 0 is function name
|
|
|
+ for (int i = 1; i < value.getChildCount(); i++) {
|
|
|
+ CommonTree parameter = (CommonTree) value.getChild(i);
|
|
|
+ String text = parameter.toStringTree();
|
|
|
+ if (outputRS.get("",text) == null) {
|
|
|
+ exprNodeDesc pNode = genExprNodeDesc(parameter, parseInfo.getAlias(), inputRS);
|
|
|
+ reduceValues.add(pNode);
|
|
|
+ outputRS.put("", text,
|
|
|
+ new ColumnInfo(Utilities.ReduceField.VALUE.toString() + "." + Integer.valueOf(reduceValues.size() - 1).toString(),
|
|
|
+ pNode.getTypeInfo(), false));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return new OperatorInfo(
|
|
|
+ OperatorFactory.getAndMakeChild(new reduceSinkDesc(reduceKeys, reduceValues, distinctText == null ? -1 : 1),
|
|
|
+ new RowSchema(outputRS.getColumnInfos()), input.getOp()),
|
|
|
+ outputRS);
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings("nls")
|
|
|
+ private OperatorInfo genGroupByPlanForwardOperator(QBParseInfo parseInfo, OperatorInfo input)
|
|
|
+ throws SemanticException {
|
|
|
+ RowResolver outputRS = input.getRowResolver();;
|
|
|
+
|
|
|
+ Operator<? extends Serializable> forward = OperatorFactory.get(forwardDesc.class,
|
|
|
+ new RowSchema(outputRS.getColumnInfos()));
|
|
|
+ // set forward operator as child of each of input
|
|
|
+ List<Operator<? extends Serializable>> child = new ArrayList<Operator<? extends Serializable>>();
|
|
|
+ child.add(forward);
|
|
|
+ input.getOp().setChildOperators(child);
|
|
|
+
|
|
|
+ return new OperatorInfo(forward, outputRS);
|
|
|
+ }
|
|
|
+
|
|
|
@SuppressWarnings("nls")
|
|
|
private OperatorInfo genGroupByPlanReduceSinkOperator2MR(
|
|
|
QBParseInfo parseInfo, String dest, OperatorInfo groupByOperatorInfo,
|
|
@@ -1298,6 +1470,46 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
|
|
|
return output;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Generate a Group-By plan using a 2 map-reduce jobs. The first map-reduce
|
|
|
+ * job has already been constructed. Evaluate partial aggregates first,
|
|
|
+ * followed by actual aggregates. The first map-reduce stage will be
|
|
|
+ * shared by all groupbys.
|
|
|
+ */
|
|
|
+ @SuppressWarnings("nls")
|
|
|
+ private OperatorInfoList genGroupByPlan3MR(String dest, QB qb,
|
|
|
+ OperatorInfoList inputList) throws SemanticException {
|
|
|
+
|
|
|
+ // We can assert here that the input list is of size one
|
|
|
+ if (inputList.size() != 1) {
|
|
|
+ throw new SemanticException("Select has more than one inputs");
|
|
|
+ }
|
|
|
+ OperatorInfo inputOperatorInfo = inputList.get(0);
|
|
|
+ QBParseInfo parseInfo = qb.getParseInfo();
|
|
|
+
|
|
|
+ // ////// Generate GroupbyOperator
|
|
|
+ OperatorInfo groupByOperatorInfo = genGroupByPlanGroupByOpForward(parseInfo,
|
|
|
+ dest, inputOperatorInfo, groupByDesc.Mode.PARTIAL1);
|
|
|
+
|
|
|
+ // ////// Generate ReduceSinkOperator2
|
|
|
+ OperatorInfo reduceSinkOperatorInfo2 = genGroupByPlanReduceSinkOperator2MR(
|
|
|
+ parseInfo, dest, groupByOperatorInfo,
|
|
|
+ getGroupByForClause(parseInfo, dest).size());
|
|
|
+
|
|
|
+ // ////// Generate GroupbyOperator2
|
|
|
+ OperatorInfo groupByOperatorInfo2 = genGroupByPlanGroupByOperator2MR(
|
|
|
+ parseInfo, dest, reduceSinkOperatorInfo2);
|
|
|
+
|
|
|
+ // ////// Generate SelectOperator
|
|
|
+ OperatorInfo selectOperatorInfo = genGroupByPlanSelectOperator(parseInfo,
|
|
|
+ dest, groupByOperatorInfo2);
|
|
|
+
|
|
|
+ // ////// Create output
|
|
|
+ OperatorInfoList output = new OperatorInfoList();
|
|
|
+ output.add(selectOperatorInfo);
|
|
|
+ return output;
|
|
|
+ }
|
|
|
+
|
|
|
@SuppressWarnings("nls")
|
|
|
private OperatorInfoList genConversionOps(String dest, QB qb,
|
|
|
OperatorInfoList input) throws SemanticException {
|
|
@@ -1870,26 +2082,66 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
|
|
|
TreeSet<String> ks = new TreeSet<String>();
|
|
|
ks.addAll(qbp.getClauseNames());
|
|
|
|
|
|
- // Go over all the destination tables
|
|
|
+ String distinctText = null;
|
|
|
+ CommonTree distn = null;
|
|
|
+ OperatorInfoList opList = null;
|
|
|
+ boolean grpBy = false;
|
|
|
+
|
|
|
+ // In case of a multiple group bys, all of them should have the same distinct key
|
|
|
for (String dest : ks) {
|
|
|
+ // is it a group by
|
|
|
+ if ((qbp.getAggregationExprsForClause(dest).size() != 0)
|
|
|
+ || (getGroupByForClause(qbp, dest).size() > 0)) {
|
|
|
+ grpBy = true;
|
|
|
+
|
|
|
+ // If there is a distinctFuncExp, add all parameters to the reduceKeys.
|
|
|
+ if (qbp.getDistinctFuncExprForClause(dest) != null) {
|
|
|
+ CommonTree value = qbp.getDistinctFuncExprForClause(dest);
|
|
|
+ if (value.getChildCount() != 2)
|
|
|
+ throw new SemanticException(ErrorMsg.UNSUPPORTED_MULTIPLE_DISTINCTS.getMsg(value));
|
|
|
+ distn = (CommonTree)value.getChild(1);
|
|
|
+ String dist = distn.toStringTree();;
|
|
|
+ if (distinctText == null)
|
|
|
+ distinctText = dist;
|
|
|
+ if (!distinctText.equals(dist))
|
|
|
+ throw new SemanticException(ErrorMsg.UNSUPPORTED_MULTIPLE_DISTINCTS.getMsg(value));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // In the first stage, copy the input and all the group by expressions
|
|
|
+ // and aggregate paramaters. This can be optimized in the future to only
|
|
|
+ // evaluate expressions that occur frequently
|
|
|
+ if (grpBy) {
|
|
|
+ OperatorInfo reduceSinkOperatorInfo =
|
|
|
+ genGroupByPlanReduceSinkOperator(qbp, input.get(0), distn, ks);
|
|
|
|
|
|
- OperatorInfoList curr = input;
|
|
|
+ // ////// 2. Generate GroupbyOperator
|
|
|
+ OperatorInfo forwardOperatorInfo = genGroupByPlanForwardOperator(qbp, reduceSinkOperatorInfo);
|
|
|
+ opList = new OperatorInfoList();
|
|
|
+ opList.add(forwardOperatorInfo);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Go over all the destination tables
|
|
|
+ for (String dest : ks) {
|
|
|
+ boolean groupByExpr = false;
|
|
|
+ if (qbp.getAggregationExprsForClause(dest).size() != 0
|
|
|
+ || getGroupByForClause(qbp, dest).size() > 0)
|
|
|
+ groupByExpr = true;
|
|
|
+
|
|
|
+ OperatorInfoList curr = input;
|
|
|
+ if (groupByExpr)
|
|
|
+ curr = opList;
|
|
|
|
|
|
if (qbp.getWhrForClause(dest) != null) {
|
|
|
curr = genFilterPlan(dest, qb, curr);
|
|
|
}
|
|
|
|
|
|
if (qbp.getAggregationExprsForClause(dest).size() != 0
|
|
|
- || getGroupByForClause(qbp, dest).size() > 0) {
|
|
|
- // We always do 2MR group-by for now.
|
|
|
- // In the future, we will do 1MR group-by for
|
|
|
- // 1. Queries without DISTINCT (we will add combiner to 1MR group-by);
|
|
|
- // 2. Queries with UDAFs that does not support partial aggregations.
|
|
|
- curr = genGroupByPlan2MR(dest, qb, curr);
|
|
|
- // curr = genGroupByPlan1MR(dest, qb, curr);
|
|
|
- } else {
|
|
|
+ || getGroupByForClause(qbp, dest).size() > 0)
|
|
|
+ curr = genGroupByPlan3MR(dest, qb, curr);
|
|
|
+ else
|
|
|
curr = genSelectPlan(dest, qb, curr);
|
|
|
- }
|
|
|
|
|
|
if (qbp.getClusterByForClause(dest) != null) {
|
|
|
curr = genReduceSinkPlan(dest, qb, curr);
|