hive-udaf-maxrow
hive-udaf-maxrow copied to clipboard
GenericUDAFCollectSetArray
Hi, all. I'm writting a GenericUDAFCollectSetArray, which should worked like:
id ts somedata 1 2 data-1,2 1 3 data-1,3 1 4 data-1,4 2 5 data-2,5 2 3 data-2,3 2 4 data-2,4 3 6 data-3,6 3 1 data-3,1 3 4 data-3,4
SELECT id, collectSetArray(ts, somedata) FROM sometable GROUP BY id; result:
id col1 1 [{"_col0": "2", "_col1": "data-1,2"}, {"_col0": "3", "_col1": "data-1,3"}, {"_col0": "4", "_col1": "data-1,4"}] 2 [{"_col0": "3", "_col1": "data-2,3"}, {"_col0": "4", "_col1": "data-2,4"}, {"_col0": "5", "_col1": "data-2,5"}] 3 [{"_col0": "1", "_col1": "data-3,1"}, {"_col0": "4", "_col1": "data-3,4"}, {"_col0": "6", "_col1": "data-3,6"}]
And I'm stuck in init() return and merge(). The below would be my own code which imitate GenericUDAFMaxRow and GenericUDAFCollectSet.
Thanks for your patience for the long code. And any hint would be helpful.
public class GenericUDAFCollectSet extends AbstractGenericUDAFResolver {
public GenericUDAFCollectSet() {
}
@Override
public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
throws SemanticException {
if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentTypeException(0,
"Only primitive type arguments are accepted but "
+ parameters[0].getTypeName() + " was passed as parameter 1.");
}
return new GenericUDAFMkSetEvaluator();
}
public static class GenericUDAFMkSetEvaluator extends GenericUDAFEvaluator {
private StandardListObjectInspector internalMergeOI;
private PrimitiveObjectInspector internalMergeElementOI;
private ObjectInspector[] inputOIs;
private ObjectInspector[] outputOIs;
private ObjectInspector structOI;
private StandardListObjectInspector loi;
public ObjectInspector init(Mode m, ObjectInspector[] parameters)
throws HiveException {
super.init(m, parameters);
System.out.println("init() mode: " + m);
int paramsLength = parameters.length;
if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
assert(parameters instanceof Object[]);
inputOIs = parameters;
} else {
assert(paramsLength == 1);
internalMergeOI = (StandardListObjectInspector) parameters[0];
internalMergeElementOI = (PrimitiveObjectInspector) internalMergeOI.getListElementObjectInspector();
System.out.println("internalMergeOI: " + internalMergeOI.getTypeName());
System.out.println("internalMergeElementOI: " + internalMergeElementOI.getTypeName());
}
outputOIs = new ObjectInspector[paramsLength];
List<String> fieldNames = new ArrayList<String>(paramsLength);
List<ObjectInspector> fieldOIs = Arrays.asList(outputOIs);
for (int i = 0; i < paramsLength; i++) {
fieldNames.add("_col" + i);
outputOIs[i] = ObjectInspectorUtils.getStandardObjectInspector(parameters[i]);
}
structOI = ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
loi = ObjectInspectorFactory.getStandardListObjectInspector(structOI);
System.out.println("return from init() ");
return loi;
}
static class MkArrayAggregationBuffer implements AggregationBuffer {
// What you see is what you get.
List<Object[]> container;
}
@Override
public void reset(AggregationBuffer agg) throws HiveException {
((MkArrayAggregationBuffer) agg).container = new ArrayList<Object[]>();
}
@Override
public AggregationBuffer getNewAggregationBuffer() throws HiveException {
MkArrayAggregationBuffer ret = new MkArrayAggregationBuffer();
reset(ret);
return ret;
}
//mapside
@Override
public void iterate(AggregationBuffer agg, Object[] parameters)
throws HiveException {
System.out.println("iterate() p.length: " + parameters.length);
if (parameters.length > 0) {
MkArrayAggregationBuffer listAgg = (MkArrayAggregationBuffer) agg;
putIntoSet(parameters, listAgg);
}
printAgg(agg);
}
//mapside
@Override
public Object terminatePartial(AggregationBuffer agg) throws HiveException {
System.out.println("terminatePartial() ");
printAgg(agg);
MkArrayAggregationBuffer listAgg = (MkArrayAggregationBuffer) agg;
// However, the log said it seems to be ArrayList<Object>.
ArrayList<Object[]> ret = new ArrayList<Object[]>(listAgg.container.size());
ret.addAll(listAgg.container);
return ret;
}
@Override
public void merge(AggregationBuffer agg, Object partial)
throws HiveException {
System.out.println("merge() ");
MkArrayAggregationBuffer listAgg = (MkArrayAggregationBuffer) agg;
ArrayList<Object[]> partialResult = (ArrayList<Object[]>) internalMergeOI.getList(partial);
for(Object[] i : partialResult) {
putIntoSet(i, listAgg);
}
printAgg(agg);
}
@Override
public Object terminate(AggregationBuffer agg) throws HiveException {
System.out.println("terminate() ");
printAgg(agg);
MkArrayAggregationBuffer listAgg = (MkArrayAggregationBuffer) agg;
ArrayList<Object[]> ret = new ArrayList<Object[]>(listAgg.container.size());
ret.addAll(listAgg.container);
return ret;
}
private void putIntoSet(Object[] p, MkArrayAggregationBuffer myagg) {
System.out.println("putIntoSet() ");
Object[] objects = new Object[p.length];
for (int i = 0; i < p.length; i++) {
objects[i] = ObjectInspectorUtils.copyToStandardObject(p[i], this.inputOIs[i]);
}
myagg.container.add(objects);
}
}
}
Check your email, @CipherChen.