hive-udaf-maxrow icon indicating copy to clipboard operation
hive-udaf-maxrow copied to clipboard

GenericUDAFCollectSetArray

Open CipherChen opened this issue 12 years ago • 1 comments

Hi, all. I'm writing a GenericUDAFCollectSetArray, which should work like this:

id  ts  somedata
1   2   data-1,2
1   3   data-1,3
1   4   data-1,4
2   5   data-2,5
2   3   data-2,3
2   4   data-2,4
3   6   data-3,6
3   1   data-3,1
3   4   data-3,4

SELECT id, collectSetArray(ts, somedata) FROM sometable GROUP BY id; result:

id  col1
1   [{"_col0": "2", "_col1": "data-1,2"}, {"_col0": "3", "_col1": "data-1,3"}, {"_col0": "4", "_col1": "data-1,4"}]
2   [{"_col0": "3", "_col1": "data-2,3"}, {"_col0": "4", "_col1": "data-2,4"}, {"_col0": "5", "_col1": "data-2,5"}]
3   [{"_col0": "1", "_col1": "data-3,1"}, {"_col0": "4", "_col1": "data-3,4"}, {"_col0": "6", "_col1": "data-3,6"}]

I'm stuck on what init() should return and on how to implement merge(). Below is my own code, which imitates GenericUDAFMaxRow and GenericUDAFCollectSet.

Thanks for your patience with the long code. Any hint would be helpful.

/**
 * Hive UDAF resolver that gathers, for each group, every input row of its arguments into an
 * array of structs. Each struct holds one row, with fields named {@code _col0}, {@code _col1},
 * ... in argument order.
 *
 * <p>NOTE(review): despite the "Set" in the name, rows are appended to a list and are neither
 * de-duplicated nor sorted — confirm whether set semantics or an ordering are required.
 */
public class GenericUDAFCollectSet extends AbstractGenericUDAFResolver {

    public GenericUDAFCollectSet() {
    }

    /**
     * Validates the call-site argument types and hands back the evaluator.
     *
     * @param parameters type information of the arguments passed in the query
     * @return a new {@link GenericUDAFMkSetEvaluator}
     * @throws SemanticException if no argument is given, or if the first argument is not a
     *         primitive type (only the first argument is type-checked)
     */
    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
            throws SemanticException {
        // Guard before indexing: an empty argument list would otherwise fail with an
        // ArrayIndexOutOfBoundsException instead of a readable error.
        if (parameters.length == 0) {
            throw new SemanticException("At least one argument is expected.");
        }

        if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentTypeException(0,
                    "Only primitive type arguments are accepted but "
                            + parameters[0].getTypeName() + " was passed as parameter 1.");
        }

        return new GenericUDAFMkSetEvaluator();
    }

    /**
     * Evaluator that buffers one {@code Object[]} per row and emits the buffer as a list of
     * structs, both as the partial and as the final result.
     */
    public static class GenericUDAFMkSetEvaluator extends GenericUDAFEvaluator {

        /** Inspectors of the raw arguments; assigned only in PARTIAL1 and COMPLETE. */
        private ObjectInspector[] inputOIs;

        /** Inspector of the incoming partial (a list of structs); assigned only in PARTIAL2 and FINAL. */
        private ObjectInspector internalMergeOI;

        /**
         * Records the inspectors needed for the given mode and declares the output type.
         *
         * <p>In PARTIAL1/COMPLETE the parameters are the original arguments, so the output is a
         * list of structs built from their standard inspectors. In PARTIAL2/FINAL the single
         * parameter already is that list of structs, so its standard form is returned as-is
         * rather than being wrapped in another struct and list.
         *
         * @param m aggregation mode
         * @param parameters argument inspectors (PARTIAL1/COMPLETE) or the partial's inspector
         * @return inspector describing what terminatePartial()/terminate() return
         * @throws HiveException if a merge mode does not receive exactly one list-typed partial
         */
        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters)
                throws HiveException {
            super.init(m, parameters);

            System.out.println("init() mode: " + m);

            if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
                inputOIs = parameters;

                int paramsLength = parameters.length;
                List<String> fieldNames = new ArrayList<String>(paramsLength);
                List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(paramsLength);
                for (int i = 0; i < paramsLength; i++) {
                    fieldNames.add("_col" + i);
                    fieldOIs.add(ObjectInspectorUtils.getStandardObjectInspector(parameters[i]));
                }

                ObjectInspector rowOI =
                        ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
                ObjectInspector resultOI =
                        ObjectInspectorFactory.getStandardListObjectInspector(rowOI);

                System.out.println("return from init() ");
                return resultOI;
            }

            // Explicit check instead of assert: assertions are normally disabled at runtime.
            if (parameters.length != 1
                    || parameters[0].getCategory() != ObjectInspector.Category.LIST) {
                throw new HiveException(
                        "Expected exactly one list-typed partial aggregation in mode " + m);
            }

            // Kept as a plain ObjectInspector: it is only handed to ObjectInspectorUtils, so no
            // assumption is made about the concrete list/struct inspector implementation.
            internalMergeOI = parameters[0];
            System.out.println("internalMergeOI: " + internalMergeOI.getTypeName());

            ObjectInspector resultOI = ObjectInspectorUtils.getStandardObjectInspector(internalMergeOI);

            System.out.println("return from init() ");
            return resultOI;
        }

        /** Aggregation state: one {@code Object[]} of standard-copied field values per row. */
        static class MkArrayAggregationBuffer implements AggregationBuffer {
            List<Object[]> container;
        }

        /** Replaces the buffer's row list with a fresh empty one. */
        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            ((MkArrayAggregationBuffer) agg).container = new ArrayList<Object[]>();
        }

        /** Creates an empty aggregation buffer. */
        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            MkArrayAggregationBuffer ret = new MkArrayAggregationBuffer();
            reset(ret);
            return ret;
        }

        /**
         * Adds one input row to the buffer (PARTIAL1/COMPLETE).
         *
         * @param agg the buffer to append to
         * @param parameters the argument values of the current row
         */
        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters)
                throws HiveException {
            System.out.println("iterate() p.length: " + parameters.length);

            if (parameters.length > 0) {
                MkArrayAggregationBuffer listAgg = (MkArrayAggregationBuffer) agg;

                putIntoSet(parameters, listAgg);
            }

            printAgg(agg);
        }

        /**
         * Returns a copy of the buffered rows as the partial result.
         *
         * @param agg the buffer to snapshot
         * @return a new list holding the buffered rows
         */
        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            System.out.println("terminatePartial() ");
            printAgg(agg);

            MkArrayAggregationBuffer listAgg = (MkArrayAggregationBuffer) agg;
            return new ArrayList<Object[]>(listAgg.container);
        }

        /**
         * Folds a partial result into the buffer (PARTIAL2/FINAL).
         *
         * <p>The partial is decoded through {@code internalMergeOI} rather than cast directly:
         * its rows are structs described by that inspector and are not guaranteed to be Java
         * arrays. The argument inspectors are unavailable in merge modes, so the rows must not
         * be routed through {@code putIntoSet}.
         *
         * @param agg the buffer to append to
         * @param partial a value produced by terminatePartial(), or null
         */
        @Override
        public void merge(AggregationBuffer agg, Object partial)
                throws HiveException {
            System.out.println("merge() ");

            if (partial != null) {
                MkArrayAggregationBuffer listAgg = (MkArrayAggregationBuffer) agg;

                // The standard copy of a list of structs is a List whose elements are Lists of
                // field values; each inner list becomes one Object[] row.
                List<?> rows =
                        (List<?>) ObjectInspectorUtils.copyToStandardObject(partial, internalMergeOI);
                for (Object row : rows) {
                    if (row != null) {
                        listAgg.container.add(((List<?>) row).toArray());
                    }
                }
            }

            printAgg(agg);
        }

        /**
         * Returns a copy of the buffered rows as the final result.
         *
         * @param agg the buffer to snapshot
         * @return a new list holding the buffered rows
         */
        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            System.out.println("terminate() ");
            printAgg(agg);

            MkArrayAggregationBuffer listAgg = (MkArrayAggregationBuffer) agg;
            return new ArrayList<Object[]>(listAgg.container);
        }

        /**
         * Copies each argument value to its standard form and appends the row to the buffer.
         * The copy is needed so the buffer does not keep references to the caller's objects.
         * Relies on {@code inputOIs}, so it is only valid in PARTIAL1/COMPLETE.
         */
        private void putIntoSet(Object[] p, MkArrayAggregationBuffer myagg) {
            System.out.println("putIntoSet() ");

            Object[] objects = new Object[p.length];

            for (int i = 0; i < p.length; i++) {
                objects[i] = ObjectInspectorUtils.copyToStandardObject(p[i], this.inputOIs[i]);
            }

            myagg.container.add(objects);
        }

        /** Debug helper: prints every buffered row on one line. */
        private void printAgg(AggregationBuffer agg) {
            List<Object[]> rows = ((MkArrayAggregationBuffer) agg).container;

            StringBuilder sb = new StringBuilder("agg: [");
            for (int i = 0; i < rows.size(); i++) {
                if (i > 0) {
                    sb.append(", ");
                }
                sb.append(Arrays.toString(rows.get(i)));
            }
            sb.append(']');

            System.out.println(sb);
        }
    }

}

CipherChen avatar Aug 29 '13 11:08 CipherChen

Check your email, @CipherChen.

mbirk avatar Aug 29 '13 15:08 mbirk