|
@@ -26,6 +26,9 @@ import java.util.Map;
|
|
|
import java.util.Stack;
|
|
|
import java.util.Vector;
|
|
|
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
+import org.apache.hadoop.hive.conf.HiveConf;
|
|
|
import org.apache.hadoop.hive.ql.metadata.HiveException;
|
|
|
import org.apache.hadoop.hive.ql.plan.exprNodeColumnDesc;
|
|
|
import org.apache.hadoop.hive.ql.plan.exprNodeDesc;
|
|
@@ -42,7 +45,9 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
|
|
|
*/
|
|
|
public class JoinOperator extends Operator<joinDesc> implements Serializable {
|
|
|
|
|
|
- // a list of value expressions for each alias are maintained
|
|
|
+ static final private Log LOG = LogFactory.getLog(JoinOperator.class.getName());
|
|
|
+
|
|
|
+ // a list of value expressions is maintained for each alias
|
|
|
public static class JoinExprMap {
|
|
|
ExprNodeEvaluator[] valueFields;
|
|
|
|
|
@@ -56,62 +61,79 @@ public class JoinOperator extends Operator<joinDesc> implements Serializable {
|
|
|
|
|
|
}
|
|
|
|
|
|
- public static class IntermediateObject{
|
|
|
+ public static class IntermediateObject {
|
|
|
ArrayList<Object>[] objs;
|
|
|
int curSize;
|
|
|
|
|
|
public IntermediateObject(ArrayList<Object>[] objs, int curSize) {
|
|
|
- this.objs = objs;
|
|
|
+ this.objs = objs;
|
|
|
this.curSize = curSize;
|
|
|
}
|
|
|
|
|
|
- public ArrayList<Object>[] getObjs() { return objs; }
|
|
|
- public int getCurSize() { return curSize; }
|
|
|
- public void pushObj(ArrayList<Object> obj) { objs[curSize++] = obj; }
|
|
|
- public void popObj() { curSize--; }
|
|
|
+ public ArrayList<Object>[] getObjs() {
|
|
|
+ return objs;
|
|
|
+ }
|
|
|
+
|
|
|
+ public int getCurSize() {
|
|
|
+ return curSize;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void pushObj(ArrayList<Object> obj) {
|
|
|
+ objs[curSize++] = obj;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void popObj() {
|
|
|
+ curSize--;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
transient protected int numValues; // number of aliases
|
|
|
transient static protected ExprNodeEvaluator aliasField;
|
|
|
+ transient static protected ExprNodeEvaluator keyField;
|
|
|
transient protected HashMap<Byte, JoinExprMap> joinExprs;
|
|
|
- transient static protected Byte[] order; // order in which the results should be outputted
|
|
|
+ transient static protected Byte[] order; // order in which the results should
|
|
|
+ // be outputted
|
|
|
transient protected joinCond[] condn;
|
|
|
transient protected boolean noOuterJoin;
|
|
|
- transient private Object[] dummyObj; // for outer joins, contains the potential nulls for the concerned aliases
|
|
|
+ transient private Object[] dummyObj; // for outer joins, contains the
|
|
|
+ // potential nulls for the concerned
|
|
|
+ // aliases
|
|
|
transient private Vector<ArrayList<Object>>[] dummyObjVectors;
|
|
|
transient private Stack<Iterator<ArrayList<Object>>> iterators;
|
|
|
transient private int totalSz; // total size of the composite object
|
|
|
transient ObjectInspector joinOutputObjectInspector;
|
|
|
-
|
|
|
- static
|
|
|
- {
|
|
|
- aliasField = ExprNodeEvaluatorFactory.get(new exprNodeColumnDesc(String.class, Utilities.ReduceField.ALIAS.toString()));
|
|
|
+
|
|
|
+ static {
|
|
|
+ aliasField = ExprNodeEvaluatorFactory.get(new exprNodeColumnDesc(
|
|
|
+ String.class, Utilities.ReduceField.ALIAS.toString()));
|
|
|
+ keyField = ExprNodeEvaluatorFactory.get(new exprNodeColumnDesc(
|
|
|
+ String.class, Utilities.ReduceField.KEY.toString()));
|
|
|
}
|
|
|
-
|
|
|
- HashMap<Byte, Vector<ArrayList<Object>>> storage;
|
|
|
|
|
|
+ HashMap<Byte, Vector<ArrayList<Object>>> storage;
|
|
|
+ int joinEmitInterval = -1;
|
|
|
+
|
|
|
public void initialize(Configuration hconf) throws HiveException {
|
|
|
super.initialize(hconf);
|
|
|
totalSz = 0;
|
|
|
// Map that contains the rows for each alias
|
|
|
storage = new HashMap<Byte, Vector<ArrayList<Object>>>();
|
|
|
-
|
|
|
+
|
|
|
numValues = conf.getExprs().size();
|
|
|
joinExprs = new HashMap<Byte, JoinExprMap>();
|
|
|
- if (order == null)
|
|
|
- {
|
|
|
+ if (order == null) {
|
|
|
order = new Byte[numValues];
|
|
|
for (int i = 0; i < numValues; i++)
|
|
|
- order[i] = (byte)i;
|
|
|
+ order[i] = (byte) i;
|
|
|
}
|
|
|
condn = conf.getConds();
|
|
|
noOuterJoin = conf.getNoOuterJoin();
|
|
|
Map<Byte, ArrayList<exprNodeDesc>> map = conf.getExprs();
|
|
|
Iterator entryIter = map.entrySet().iterator();
|
|
|
while (entryIter.hasNext()) {
|
|
|
- Map.Entry e = (Map.Entry)entryIter.next();
|
|
|
- Byte key = (Byte)e.getKey();
|
|
|
- ArrayList<exprNodeDesc> expr = (ArrayList<exprNodeDesc>)e.getValue();
|
|
|
+ Map.Entry e = (Map.Entry) entryIter.next();
|
|
|
+ Byte key = (Byte) e.getKey();
|
|
|
+ ArrayList<exprNodeDesc> expr = (ArrayList<exprNodeDesc>) e.getValue();
|
|
|
int sz = expr.size();
|
|
|
totalSz += sz;
|
|
|
|
|
@@ -123,12 +145,15 @@ public class JoinOperator extends Operator<joinDesc> implements Serializable {
|
|
|
joinExprs.put(key, new JoinExprMap(valueFields));
|
|
|
}
|
|
|
|
|
|
- ArrayList<ObjectInspector> structFieldObjectInspectors = new ArrayList<ObjectInspector>(totalSz);
|
|
|
- for(int i=0; i<totalSz; i++) {
|
|
|
- structFieldObjectInspectors.add(ObjectInspectorFactory.getStandardPrimitiveObjectInspector(String.class));
|
|
|
+ ArrayList<ObjectInspector> structFieldObjectInspectors = new ArrayList<ObjectInspector>(
|
|
|
+ totalSz);
|
|
|
+ for (int i = 0; i < totalSz; i++) {
|
|
|
+ structFieldObjectInspectors.add(ObjectInspectorFactory
|
|
|
+ .getStandardPrimitiveObjectInspector(String.class));
|
|
|
}
|
|
|
- joinOutputObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
|
|
|
- ObjectInspectorUtils.getIntegerArray(totalSz), structFieldObjectInspectors);
|
|
|
+ joinOutputObjectInspector = ObjectInspectorFactory
|
|
|
+ .getStandardStructObjectInspector(ObjectInspectorUtils
|
|
|
+ .getIntegerArray(totalSz), structFieldObjectInspectors);
|
|
|
|
|
|
dummyObj = new Object[numValues];
|
|
|
dummyObjVectors = new Vector[numValues];
|
|
@@ -149,6 +174,8 @@ public class JoinOperator extends Operator<joinDesc> implements Serializable {
|
|
|
}
|
|
|
|
|
|
iterators = new Stack<Iterator<ArrayList<Object>>>();
|
|
|
+
|
|
|
+ joinEmitInterval = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEJOINEMITINTERVAL); // per-alias buffered-row threshold checked in process()
|
|
|
}
|
|
|
|
|
|
public void startGroup() throws HiveException {
|
|
@@ -159,7 +186,9 @@ public class JoinOperator extends Operator<joinDesc> implements Serializable {
|
|
|
}
|
|
|
|
|
|
InspectableObject tempAliasInspectableObject = new InspectableObject();
|
|
|
- public void process(Object row, ObjectInspector rowInspector) throws HiveException {
|
|
|
+
|
|
|
+ public void process(Object row, ObjectInspector rowInspector)
|
|
|
+ throws HiveException {
|
|
|
try {
|
|
|
// get alias
|
|
|
aliasField.evaluate(row, rowInspector, tempAliasInspectableObject);
|
|
@@ -176,15 +205,40 @@ public class JoinOperator extends Operator<joinDesc> implements Serializable {
|
|
|
nr.add(tempAliasInspectableObject.o);
|
|
|
}
|
|
|
|
|
|
+ // Are we consuming too much memory
|
|
|
+ if (storage.get(alias).size() == joinEmitInterval) {
|
|
|
+ if (alias == numValues - 1) {
|
|
|
+ // The input is sorted by alias, so if we are already in the last join
|
|
|
+ // operand,
|
|
|
+ // we can emit some results now.
|
|
|
+ // Note this has to be done before adding the current row to the
|
|
|
+ // storage,
|
|
|
+ // to preserve the correctness for outer joins.
|
|
|
+ checkAndGenObject();
|
|
|
+ storage.get(alias).clear();
|
|
|
+ } else {
|
|
|
+ // Output a warning if we reached joinEmitInterval rows for a join
|
|
|
+ // operand
|
|
|
+ // We won't output a warning for the last join operand since the size
|
|
|
+ // will never reach joinEmitInterval.
|
|
|
+ InspectableObject io = new InspectableObject();
|
|
|
+ keyField.evaluate(row, rowInspector, io);
|
|
|
+ LOG.warn("table " + alias
|
|
|
+ + " has more than joinEmitInterval rows for join key " + io.o);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
// Add the value to the vector
|
|
|
storage.get(alias).add(nr);
|
|
|
+
|
|
|
} catch (Exception e) {
|
|
|
e.printStackTrace();
|
|
|
throw new HiveException(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void createForwardJoinObject(IntermediateObject intObj, boolean[] nullsArr) throws HiveException {
|
|
|
+ private void createForwardJoinObject(IntermediateObject intObj,
|
|
|
+ boolean[] nullsArr) throws HiveException {
|
|
|
ArrayList<Object> nr = new ArrayList<Object>(totalSz);
|
|
|
for (int i = 0; i < numValues; i++) {
|
|
|
Byte alias = order[i];
|
|
@@ -204,15 +258,17 @@ public class JoinOperator extends Operator<joinDesc> implements Serializable {
|
|
|
}
|
|
|
|
|
|
private void copyOldArray(boolean[] src, boolean[] dest) {
|
|
|
- for (int i = 0; i < src.length; i++) dest[i] = src[i];
|
|
|
+ for (int i = 0; i < src.length; i++)
|
|
|
+ dest[i] = src[i];
|
|
|
}
|
|
|
|
|
|
- private Vector<boolean[]> joinObjectsInnerJoin(Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls, ArrayList<Object> newObj, IntermediateObject intObj, int left, boolean newObjNull)
|
|
|
- {
|
|
|
- if (newObjNull) return resNulls;
|
|
|
+ private Vector<boolean[]> joinObjectsInnerJoin(Vector<boolean[]> resNulls,
|
|
|
+ Vector<boolean[]> inputNulls, ArrayList<Object> newObj,
|
|
|
+ IntermediateObject intObj, int left, boolean newObjNull) {
|
|
|
+ if (newObjNull)
|
|
|
+ return resNulls;
|
|
|
Iterator<boolean[]> nullsIter = inputNulls.iterator();
|
|
|
- while (nullsIter.hasNext())
|
|
|
- {
|
|
|
+ while (nullsIter.hasNext()) {
|
|
|
boolean[] oldNulls = nullsIter.next();
|
|
|
boolean oldObjNull = oldNulls[left];
|
|
|
if (!oldObjNull) {
|
|
@@ -224,12 +280,13 @@ public class JoinOperator extends Operator<joinDesc> implements Serializable {
|
|
|
}
|
|
|
return resNulls;
|
|
|
}
|
|
|
-
|
|
|
- private Vector<boolean[]> joinObjectsLeftOuterJoin(Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls, ArrayList<Object> newObj, IntermediateObject intObj, int left, boolean newObjNull)
|
|
|
- {
|
|
|
+
|
|
|
+ private Vector<boolean[]> joinObjectsLeftOuterJoin(
|
|
|
+ Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls,
|
|
|
+ ArrayList<Object> newObj, IntermediateObject intObj, int left,
|
|
|
+ boolean newObjNull) {
|
|
|
Iterator<boolean[]> nullsIter = inputNulls.iterator();
|
|
|
- while (nullsIter.hasNext())
|
|
|
- {
|
|
|
+ while (nullsIter.hasNext()) {
|
|
|
boolean[] oldNulls = nullsIter.next();
|
|
|
boolean oldObjNull = oldNulls[left];
|
|
|
boolean[] newNulls = new boolean[intObj.getCurSize()];
|
|
@@ -243,25 +300,25 @@ public class JoinOperator extends Operator<joinDesc> implements Serializable {
|
|
|
return resNulls;
|
|
|
}
|
|
|
|
|
|
- private Vector<boolean[]> joinObjectsRightOuterJoin(Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls, ArrayList<Object> newObj, IntermediateObject intObj, int left, boolean newObjNull)
|
|
|
- {
|
|
|
- if (newObjNull) return resNulls;
|
|
|
+ private Vector<boolean[]> joinObjectsRightOuterJoin(
|
|
|
+ Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls,
|
|
|
+ ArrayList<Object> newObj, IntermediateObject intObj, int left,
|
|
|
+ boolean newObjNull) {
|
|
|
+ if (newObjNull)
|
|
|
+ return resNulls;
|
|
|
boolean allOldObjsNull = true;
|
|
|
|
|
|
Iterator<boolean[]> nullsIter = inputNulls.iterator();
|
|
|
- while (nullsIter.hasNext())
|
|
|
- {
|
|
|
+ while (nullsIter.hasNext()) {
|
|
|
boolean[] oldNulls = nullsIter.next();
|
|
|
- if (!oldNulls[left])
|
|
|
- {
|
|
|
+ if (!oldNulls[left]) {
|
|
|
allOldObjsNull = false;
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
nullsIter = inputNulls.iterator();
|
|
|
- while (nullsIter.hasNext())
|
|
|
- {
|
|
|
+ while (nullsIter.hasNext()) {
|
|
|
boolean[] oldNulls = nullsIter.next();
|
|
|
boolean oldObjNull = oldNulls[left];
|
|
|
|
|
@@ -270,8 +327,7 @@ public class JoinOperator extends Operator<joinDesc> implements Serializable {
|
|
|
copyOldArray(oldNulls, newNulls);
|
|
|
newNulls[oldNulls.length] = newObjNull;
|
|
|
resNulls.add(newNulls);
|
|
|
- }
|
|
|
- else if (allOldObjsNull) {
|
|
|
+ } else if (allOldObjsNull) {
|
|
|
boolean[] newNulls = new boolean[intObj.getCurSize()];
|
|
|
for (int i = 0; i < intObj.getCurSize() - 1; i++)
|
|
|
newNulls[i] = true;
|
|
@@ -282,12 +338,13 @@ public class JoinOperator extends Operator<joinDesc> implements Serializable {
|
|
|
return resNulls;
|
|
|
}
|
|
|
|
|
|
- private Vector<boolean[]> joinObjectsFullOuterJoin(Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls, ArrayList<Object> newObj, IntermediateObject intObj, int left, boolean newObjNull)
|
|
|
- {
|
|
|
+ private Vector<boolean[]> joinObjectsFullOuterJoin(
|
|
|
+ Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls,
|
|
|
+ ArrayList<Object> newObj, IntermediateObject intObj, int left,
|
|
|
+ boolean newObjNull) {
|
|
|
if (newObjNull) {
|
|
|
Iterator<boolean[]> nullsIter = inputNulls.iterator();
|
|
|
- while (nullsIter.hasNext())
|
|
|
- {
|
|
|
+ while (nullsIter.hasNext()) {
|
|
|
boolean[] oldNulls = nullsIter.next();
|
|
|
boolean[] newNulls = new boolean[intObj.getCurSize()];
|
|
|
copyOldArray(oldNulls, newNulls);
|
|
@@ -296,15 +353,13 @@ public class JoinOperator extends Operator<joinDesc> implements Serializable {
|
|
|
}
|
|
|
return resNulls;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
boolean allOldObjsNull = true;
|
|
|
|
|
|
Iterator<boolean[]> nullsIter = inputNulls.iterator();
|
|
|
- while (nullsIter.hasNext())
|
|
|
- {
|
|
|
+ while (nullsIter.hasNext()) {
|
|
|
boolean[] oldNulls = nullsIter.next();
|
|
|
- if (!oldNulls[left])
|
|
|
- {
|
|
|
+ if (!oldNulls[left]) {
|
|
|
allOldObjsNull = false;
|
|
|
break;
|
|
|
}
|
|
@@ -312,24 +367,21 @@ public class JoinOperator extends Operator<joinDesc> implements Serializable {
|
|
|
boolean rhsPreserved = false;
|
|
|
|
|
|
nullsIter = inputNulls.iterator();
|
|
|
- while (nullsIter.hasNext())
|
|
|
- {
|
|
|
+ while (nullsIter.hasNext()) {
|
|
|
boolean[] oldNulls = nullsIter.next();
|
|
|
boolean oldObjNull = oldNulls[left];
|
|
|
|
|
|
- if (!oldObjNull)
|
|
|
- {
|
|
|
+ if (!oldObjNull) {
|
|
|
boolean[] newNulls = new boolean[intObj.getCurSize()];
|
|
|
copyOldArray(oldNulls, newNulls);
|
|
|
newNulls[oldNulls.length] = newObjNull;
|
|
|
resNulls.add(newNulls);
|
|
|
- }
|
|
|
- else if (oldObjNull) {
|
|
|
+ } else if (oldObjNull) {
|
|
|
boolean[] newNulls = new boolean[intObj.getCurSize()];
|
|
|
copyOldArray(oldNulls, newNulls);
|
|
|
newNulls[oldNulls.length] = true;
|
|
|
resNulls.add(newNulls);
|
|
|
-
|
|
|
+
|
|
|
if (allOldObjsNull && !rhsPreserved) {
|
|
|
newNulls = new boolean[intObj.getCurSize()];
|
|
|
for (int i = 0; i < oldNulls.length; i++)
|
|
@@ -344,35 +396,35 @@ public class JoinOperator extends Operator<joinDesc> implements Serializable {
|
|
|
}
|
|
|
|
|
|
/*
|
|
|
- * The new input is added to the list of existing inputs. Each entry in the
|
|
|
- * array of inputNulls denotes the entries in the intermediate object to
|
|
|
- * be used. The intermediate object is augmented with the new object, and
|
|
|
- * list of nulls is changed appropriately. The list will contain all non-nulls
|
|
|
- * for a inner join. The outer joins are processed appropriately.
|
|
|
+ * The new input is added to the list of existing inputs. Each entry in the
|
|
|
+ * array of inputNulls denotes the entries in the intermediate object to be
|
|
|
+ * used. The intermediate object is augmented with the new object, and list of
|
|
|
+ * nulls is changed appropriately. The list will contain all non-nulls for an
|
|
|
+ * inner join. The outer joins are processed appropriately.
|
|
|
*/
|
|
|
- private Vector<boolean[]> joinObjects(Vector<boolean[]> inputNulls, ArrayList<Object> newObj, IntermediateObject intObj, int joinPos)
|
|
|
- {
|
|
|
+ private Vector<boolean[]> joinObjects(Vector<boolean[]> inputNulls,
|
|
|
+ ArrayList<Object> newObj, IntermediateObject intObj, int joinPos) {
|
|
|
Vector<boolean[]> resNulls = new Vector<boolean[]>();
|
|
|
boolean newObjNull = newObj == dummyObj[joinPos] ? true : false;
|
|
|
- if (joinPos == 0)
|
|
|
- {
|
|
|
- if (newObjNull) return null;
|
|
|
+ if (joinPos == 0) {
|
|
|
+ if (newObjNull)
|
|
|
+ return null;
|
|
|
boolean[] nulls = new boolean[1];
|
|
|
nulls[0] = newObjNull;
|
|
|
resNulls.add(nulls);
|
|
|
return resNulls;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
int left = condn[joinPos - 1].getLeft();
|
|
|
int type = condn[joinPos - 1].getType();
|
|
|
-
|
|
|
+
|
|
|
// process all nulls for RIGHT and FULL OUTER JOINS
|
|
|
- if (((type == joinDesc.RIGHT_OUTER_JOIN) || (type == joinDesc.FULL_OUTER_JOIN))
|
|
|
- && !newObjNull && (inputNulls == null)) {
|
|
|
+ if (((type == joinDesc.RIGHT_OUTER_JOIN) || (type == joinDesc.FULL_OUTER_JOIN))
|
|
|
+ && !newObjNull && (inputNulls == null)) {
|
|
|
boolean[] newNulls = new boolean[intObj.getCurSize()];
|
|
|
for (int i = 0; i < newNulls.length - 1; i++)
|
|
|
newNulls[i] = true;
|
|
|
- newNulls[newNulls.length-1] = false;
|
|
|
+ newNulls[newNulls.length - 1] = false;
|
|
|
resNulls.add(newNulls);
|
|
|
return resNulls;
|
|
|
}
|
|
@@ -380,41 +432,45 @@ public class JoinOperator extends Operator<joinDesc> implements Serializable {
|
|
|
if (inputNulls == null)
|
|
|
return null;
|
|
|
|
|
|
- if (type == joinDesc.INNER_JOIN)
|
|
|
- return joinObjectsInnerJoin(resNulls, inputNulls, newObj, intObj, left, newObjNull);
|
|
|
- else if (type == joinDesc.LEFT_OUTER_JOIN)
|
|
|
- return joinObjectsLeftOuterJoin(resNulls, inputNulls, newObj, intObj, left, newObjNull);
|
|
|
- else if (type == joinDesc.RIGHT_OUTER_JOIN)
|
|
|
- return joinObjectsRightOuterJoin(resNulls, inputNulls, newObj, intObj, left, newObjNull);
|
|
|
+ if (type == joinDesc.INNER_JOIN)
|
|
|
+ return joinObjectsInnerJoin(resNulls, inputNulls, newObj, intObj, left,
|
|
|
+ newObjNull);
|
|
|
+ else if (type == joinDesc.LEFT_OUTER_JOIN)
|
|
|
+ return joinObjectsLeftOuterJoin(resNulls, inputNulls, newObj, intObj,
|
|
|
+ left, newObjNull);
|
|
|
+ else if (type == joinDesc.RIGHT_OUTER_JOIN)
|
|
|
+ return joinObjectsRightOuterJoin(resNulls, inputNulls, newObj, intObj,
|
|
|
+ left, newObjNull);
|
|
|
assert (type == joinDesc.FULL_OUTER_JOIN);
|
|
|
- return joinObjectsFullOuterJoin(resNulls, inputNulls, newObj, intObj, left, newObjNull);
|
|
|
+ return joinObjectsFullOuterJoin(resNulls, inputNulls, newObj, intObj, left,
|
|
|
+ newObjNull);
|
|
|
}
|
|
|
-
|
|
|
- /*
|
|
|
- * genObject is a recursive function. For the inputs, a array of
|
|
|
- * bitvectors is maintained (inputNulls) where each entry denotes whether
|
|
|
- * the element is to be used or not (whether it is null or not). The size of
|
|
|
- * the bitvector is same as the number of inputs under consideration
|
|
|
- * currently. When all inputs are accounted for, the output is forwared
|
|
|
- * appropriately.
|
|
|
+
|
|
|
+ /*
|
|
|
+ * genObject is a recursive function. For the inputs, an array of bitvectors is
|
|
|
+ * maintained (inputNulls) where each entry denotes whether the element is to
|
|
|
+ * be used or not (whether it is null or not). The size of the bitvector is
|
|
|
+ * same as the number of inputs under consideration currently. When all inputs
|
|
|
+ * are accounted for, the output is forwarded appropriately.
|
|
|
*/
|
|
|
- private void genObject(Vector<boolean[]> inputNulls, int aliasNum, IntermediateObject intObj)
|
|
|
- throws HiveException {
|
|
|
+ private void genObject(Vector<boolean[]> inputNulls, int aliasNum,
|
|
|
+ IntermediateObject intObj) throws HiveException {
|
|
|
if (aliasNum < numValues) {
|
|
|
Iterator<ArrayList<Object>> aliasRes = storage.get(order[aliasNum])
|
|
|
- .iterator();
|
|
|
+ .iterator();
|
|
|
iterators.push(aliasRes);
|
|
|
while (aliasRes.hasNext()) {
|
|
|
ArrayList<Object> newObj = aliasRes.next();
|
|
|
intObj.pushObj(newObj);
|
|
|
- Vector<boolean[]> newNulls = joinObjects(inputNulls, newObj, intObj, aliasNum);
|
|
|
+ Vector<boolean[]> newNulls = joinObjects(inputNulls, newObj, intObj,
|
|
|
+ aliasNum);
|
|
|
genObject(newNulls, aliasNum + 1, intObj);
|
|
|
intObj.popObj();
|
|
|
}
|
|
|
iterators.pop();
|
|
|
- }
|
|
|
- else {
|
|
|
- if (inputNulls == null) return;
|
|
|
+ } else {
|
|
|
+ if (inputNulls == null)
|
|
|
+ return;
|
|
|
Iterator<boolean[]> nullsIter = inputNulls.iterator();
|
|
|
while (nullsIter.hasNext()) {
|
|
|
boolean[] nullsVec = nullsIter.next();
|
|
@@ -429,29 +485,27 @@ public class JoinOperator extends Operator<joinDesc> implements Serializable {
|
|
|
* @throws HiveException
|
|
|
*/
|
|
|
public void endGroup() throws HiveException {
|
|
|
- try {
|
|
|
- LOG.trace("Join Op: endGroup called: numValues=" + numValues);
|
|
|
-
|
|
|
- // does any result need to be emitted
|
|
|
- for (int i = 0; i < numValues; i++) {
|
|
|
- Byte alias = order[i];
|
|
|
- if (storage.get(alias).iterator().hasNext() == false) {
|
|
|
- if (noOuterJoin) {
|
|
|
- LOG.trace("No data for alias=" + i);
|
|
|
- return;
|
|
|
- } else {
|
|
|
- storage.put(alias, dummyObjVectors[i]);
|
|
|
- }
|
|
|
+ LOG.trace("Join Op: endGroup called: numValues=" + numValues);
|
|
|
+ checkAndGenObject();
|
|
|
+ }
|
|
|
+
|
|
|
+ private void checkAndGenObject() throws HiveException {
|
|
|
+ // does any result need to be emitted
|
|
|
+ for (int i = 0; i < numValues; i++) {
|
|
|
+ Byte alias = order[i];
|
|
|
+ if (storage.get(alias).iterator().hasNext() == false) {
|
|
|
+ if (noOuterJoin) {
|
|
|
+ LOG.trace("No data for alias=" + i);
|
|
|
+ return;
|
|
|
+ } else {
|
|
|
+ storage.put(alias, dummyObjVectors[i]);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- LOG.trace("calling genObject");
|
|
|
- genObject(null, 0, new IntermediateObject(new ArrayList[numValues], 0));
|
|
|
- LOG.trace("called genObject");
|
|
|
- } catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
- throw new HiveException(e);
|
|
|
}
|
|
|
+
|
|
|
+ LOG.trace("calling genObject");
|
|
|
+ genObject(null, 0, new IntermediateObject(new ArrayList[numValues], 0));
|
|
|
+ LOG.trace("called genObject");
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -462,6 +516,5 @@ public class JoinOperator extends Operator<joinDesc> implements Serializable {
|
|
|
LOG.trace("Join Op close");
|
|
|
super.close(abort);
|
|
|
}
|
|
|
-}
|
|
|
-
|
|
|
|
|
|
+}
|