ballerina-lang

Support for `group by` clause

Open · KavinduZoysa opened this issue 2 years ago · 13 comments

Description: Add support for the `group by` clause in query expressions.

group-by-clause := "group" "by" grouping-key ["," grouping-key]*
grouping-key :=
   variable-name
   | inferable-type-descriptor variable-name "=" expression

Under this issue, we need to make sure the basic `group by` clause works.

Examples:

   Order[] orders = [{buyer: "b1", price: 12}, {buyer: "b2", price: 13}, {buyer: "b3", price: 14}, {buyer: "b3", price: 15}];
   string[] ordersDup = from var {buyer, price} in orders
         group by buyer
         select buyer; // @output ["b1", "b2", "b3"]

   // Reuses `orders`; `price` is not a grouping key, so it is aggregated with `sum`.
   Order[] aggregatedOrders = from var {buyer, price} in orders
         group by buyer
         select {buyer, price: sum(price)}; // @output [{buyer: "b1", price: 12}, {buyer: "b2", price: 13}, {buyer: "b3", price: 29}]

As mentioned in the spec issue, this clause should be executed as follows.

  • Iterate over each input frame f:
    • partition the input frames into groups, where two frames are in the same group if they have the same (==) value for all grouping-key variables
    • within a group, frames are in incoming order
    • groups are ordered by earliest member in incoming order
  • for each group
    • emit a frame that has
      • each grouping key variable bound to the value common to the group
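      • every other (non-grouping) variable becomes indexed, i.e. bound to the list of its values across the group's frames, in incoming order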
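The execution steps above can be sketched in plain Ballerina over a concrete record type. This is a minimal sketch, not the `lang.query` implementation: the `Order` shape, the `OrderGroup` type and the `groupByBuyer` function are hypothetical, and the single grouping key `buyer` is hard-coded. It consumes the whole input, compares keys with `==`, and turns the non-grouping variable `price` into a list per group.

```ballerina
import ballerina/io;

// Hypothetical record type standing in for the issue's `Order`.
type Order record {|
    string buyer;
    int price;
|};

// One output frame per group: `buyer` keeps the common value and the
// non-grouping variable `price` becomes indexed (a list, incoming order).
type OrderGroup record {|
    string buyer;
    int[] price;
|};

// Sketch of `group by buyer` over a fully consumed input.
function groupByBuyer(Order[] orders) returns OrderGroup[] {
    OrderGroup[] groups = [];
    foreach Order o in orders {
        boolean found = false;
        // Two frames are in the same group if their keys are equal (==).
        foreach OrderGroup g in groups {
            if g.buyer == o.buyer {
                g.price.push(o.price);
                found = true;
                break;
            }
        }
        if !found {
            // First member of a new group: groups are appended in the
            // order of their earliest member.
            groups.push({buyer: o.buyer, price: [o.price]});
        }
    }
    return groups;
}

public function main() {
    Order[] orders = [
        {buyer: "b1", price: 12}, {buyer: "b2", price: 13},
        {buyer: "b3", price: 14}, {buyer: "b3", price: 15}
    ];
    io:println(groupByBuyer(orders));
}
```

With the four orders from the issue example, this yields three groups, with `b3` collecting the prices `[14, 15]`.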

KavinduZoysa, Jul 01 '22

We need to create a class called `_GroupByFunction` and implement its `process` function, which returns frames grouped by the key(s).

KavinduZoysa, Jul 01 '22

I have not yet finalized how to represent the final frame (the frame that maps the grouping keys to the indexed variables).

KavinduZoysa, Jul 01 '22

I suggest the following design for `_GroupByFunction`.

class _GroupByFunction {
    *_StreamFunction;

    _Iterator? itr = ();
    # + _inputFrame - The input frame which came through the pipeline
    # + _frames - This represents the set of output frames that is created after grouping.
    #             For each `_inputFrame`, `_frames` should be updated.
    #             After this is updated for each `_inputFrame`, an iterator of this is created and
    #             each value is passed via pipeline.
    function (_Frame _inputFrame, _Frame[] _frames) returns error? groupByFunc;

    function init(function (_Frame _frame, _Frame[] frames) returns error? groupByFunc) {
        self.groupByFunc = groupByFunc;
        self.prevFunc = ();
    }

    public function process() returns _Frame|error? {
        // 1. If `itr` is (), create it:
        //      1.1 Initialize an array of `_Frame` (call it `_frames`); this is the 2nd argument to `groupByFunc`.
        //          Each `_Frame` element of `_frames` corresponds to a single group of the group-by operation.
        //          Each `_Frame` element consists of:
        //              1. the grouping keys and their values, and
        //              2. the non-grouping variables, each with its values collected in an array.
        //          Refer to the example implementation of `groupByFunc`.
        //      1.2 Iterate over each input frame and call `groupByFunc`.
        //      1.3 Assign the iterator of `_frames` to `itr`.
        // 2. Otherwise, return `itr.next()`.
    }

    public function reset() {
    }
}

Consider the following example:

   Order[] orders = [{buyer: "b1", price: 12}, {buyer: "b2", price: 13}, {buyer: "b3", price: 14}, {buyer: "b3", price: 15}];
   string[] ordersDup = from var {buyer, price} in orders
         group by buyer
         select buyer; // @output ["b1", "b2", "b3"]

Since the grouping key is `buyer`, at the end of step 1 of the `process()` function the `_frames` array should look like this:

[{"buyer": "b1", "price": [12]},
{"buyer": "b2", "price": [13]},
{"buyer": "b3", "price": [14, 15]}]

I think the desugared code for `groupByFunc` in the above example would look like this:

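function groupByFunc(_Frame inputFrame, _Frame[] frames) returns error? {
    string buyer = <string>(check inputFrame["buyer"]);
    int price = <int>(check inputFrame["price"]);
    foreach _Frame frame in frames {
        string existing = <string>(check frame["buyer"]);
        if existing == buyer {
            // Arrays are reference values: pushing here updates the group's frame.
            int[] groupPrices = <int[]>(check frame["price"]);
            groupPrices.push(price);
            return;
        }
    }
    // No existing group for this buyer: start a new one.
    int[] newPrices = [price];
    frames.push({"buyer": buyer, "price": newPrices});
}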

KavinduZoysa, Jul 01 '22

With the above approach, even though the `process` function is simple, desugaring `groupByFunc` can get complicated for complex examples that have multiple keys.

I think we need to pass the grouping keys to the `process` function and update the `_frames` array within that function (similar to order by). I'm currently trying to figure out a new approach.

KavinduZoysa, Jul 02 '22

Grouping keys are known at compile time, right? If we know the keys, we can simply access the key values from the input frame; that's part of the desugared code inside `groupByFunc`. We can think of the grouping keys as primary keys: if frames with the same key values come in, we put them together.

gimantha, Jul 04 '22

I have a few concerns:

  1. If the query output is not an array (e.g. a stream), how will we support this? We may have to maintain a separate structure.
  2. If we are to group by, shouldn't we consume the whole input as we do in order by, and maintain separate stacks?

pcnfernando, Jul 04 '22

Regarding @gimantha's suggestion to treat the grouping keys, which are known at compile time, as primary keys:

Since group by can have any expression, we won't be able to do this, will we?

pcnfernando, Jul 04 '22

Here is Kavindu's example with a simple invocation (`foo`) added to the grouping key.

string[] ordersDup = from var {buyer, price} in orders
         group by var buyerKey = foo(buyer)
         select buyerKey;

This will be desugared to

function groupByFunc(_Frame inputFrame, _Frame[] frames) returns error? {
    string key = foo(<string> check inputFrame["buyer"]);
    ...
}

We need to improve the desugared logic so that it does not iterate over the frames array every time.
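One way to avoid rescanning the frames array for every input frame is to remember where each group lives. The sketch below assumes a string-valued computed key, with a hypothetical `foo` that upper-cases the buyer, so that `group by var buyerKey = foo(buyer)` is handled by a `map<int>` from key to group position. `Order`, `KeyedGroup` and `groupByComputedKey` are illustrative names only. For non-string or multi-value keys, a table keyed by the grouping value is a better fit.

```ballerina
import ballerina/io;

// Hypothetical record type standing in for the issue's `Order`.
type Order record {|
    string buyer;
    int price;
|};

// Output frame for `group by var buyerKey = foo(buyer)`: the computed key is
// bound once per group, while `buyer` and `price` become indexed.
type KeyedGroup record {|
    string buyerKey;
    string[] buyer;
    int[] price;
|};

// Hypothetical `foo`: any string -> string function works the same way.
function foo(string buyer) returns string => buyer.toUpperAscii();

function groupByComputedKey(Order[] orders) returns KeyedGroup[] {
    KeyedGroup[] groups = [];
    // Computed key -> position in `groups`: one map lookup per input frame
    // instead of a scan over every group seen so far.
    map<int> positionOf = {};
    foreach Order o in orders {
        string buyerKey = foo(o.buyer);
        int? pos = positionOf[buyerKey];
        if pos is int {
            groups[pos].buyer.push(o.buyer);
            groups[pos].price.push(o.price);
        } else {
            positionOf[buyerKey] = groups.length();
            groups.push({buyerKey, buyer: [o.buyer], price: [o.price]});
        }
    }
    return groups;
}

public function main() {
    Order[] orders = [
        {buyer: "b1", price: 12}, {buyer: "B1", price: 20}, {buyer: "b2", price: 13}
    ];
    io:println(groupByComputedKey(orders));
}
```

Here `"b1"` and `"B1"` land in the same group because the key is evaluated per frame before comparison, which is why an arbitrary expression can still be used as a grouping key.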

gimantha, Jul 04 '22

As per the offline discussion with @gimantha, @pcnfernando and @suleka96, we concluded that a table can be used to internally represent the frames that result from the group by operation. The `_GroupByFunction` class can therefore be implemented as follows.

type RowDataType record {|
    readonly string hash;
    _Frame[] frames;
|};

class _GroupByFunction {
    *_StreamFunction;

    _Iterator? itr = ();
    function (_Frame _inputFrame) returns string|error groupByFunc;
    table<RowDataType> key(hash) tbl = table [];
    string[] groupingKeys = [];
    string[] nonGroupingKeys = [];
    _Frame[] groupedFrames = [];

    function init(function (_Frame _frame) returns string|error groupByFunc,
                    function () returns string[] groupingKeys,
                    function () returns string[] nonGroupingKeys) {
        self.groupByFunc = groupByFunc;
        self.groupingKeys = groupingKeys();
        self.nonGroupingKeys = nonGroupingKeys();
        self.prevFunc = ();
    }

    public function process() returns _Frame|error? {
        if (self.itr is ()) {
            _StreamFunction pf = <_StreamFunction>self.prevFunc;
            _Frame|error? pFrame = pf.process();
            while pFrame is _Frame {
                // 1. Call `groupByFunc` and get `hash`.
                // 2. If `tbl` already has an entry for `hash`, add `pFrame` to its `frames` array.
                // 3. Otherwise, create a new entry.
                pFrame = pf.process();
            }
            // Now, each entry of table has same set of frames that belong to each group
            // Iterate over all entries and create output frames list `groupedFrames`.
            self.itr = self.groupedFrames.iterator();
        }

        _Iterator itr = <_Iterator>self.itr;
        record {|(any|error) value;|}|error? v = itr.next();
        if (v is record {|(any|error) value;|}) {
            record {|(any|error)...;|} _frame = {...v};
            return _frame;
        }
        return v;
    }

    public function reset() {
    }
}

// function groupingKeys() returns string[] {
//     return ["buyer", "price"];
// }

// function nonGroupingKeys() returns string[] {
//     return ["count"];
// }

// function groupByFunc(_Frame incomingFrame) returns string|error {
//     return hash(check incomingFrame["buyer"], check incomingFrame["price"]);
//     // group by var buyerPrice = buyer + price
//     // string groupingValue1 = (check incomingFrame["buyer"] + check incomingFrame["price"]).toString();
// }

We can reuse the hash function used in the table implementation.

KavinduZoysa, Jul 06 '22

  • [x] #36939
  • [ ] https://github.com/ballerina-platform/ballerina-lang/issues/37888
  • [ ] #37160
  • [ ] Add desugar changes for group by clause

KavinduZoysa, Jul 14 '22

After further discussion, we decided on the following points:

  • Desugar grouping keys that are not variable refs (e.g. `group by var c = a + b`) similarly to the let clause.
  • There is no need to send the non-grouping keys to the `init` function of the `_GroupByFunction` class.
  • For grouping keys that are not variable refs (e.g. `group by var c = a + b`), the hash should be calculated on `c`.
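A minimal sketch of the first and third points, assuming frames are modelled as `map<anydata>` (a hypothetical `Frame` type, not the internal `_Frame`): `c = a + b` is first bound into each frame as a let clause would, and only `c` is then used to pick the group. The helper names `bindC` and `groupByC` are illustrative, and a string form of `c` stands in for the hash.

```ballerina
import ballerina/io;

// Hypothetical stand-in for `_Frame`: variable name -> value.
type Frame map<anydata>;

// Let-like step: evaluate `a + b` once per frame and bind the result to `c`.
function bindC(Frame f) returns Frame|error {
    int a = check f["a"].ensureType(int);
    int b = check f["b"].ensureType(int);
    Frame out = f.clone();
    out["c"] = a + b;
    return out;
}

// Grouping step: only the bound variable `c` feeds the grouping key;
// `a` and `b` stay in each member frame and would become indexed.
function groupByC(Frame[] input) returns map<Frame[]>|error {
    map<Frame[]> groups = {};
    foreach Frame f in input {
        Frame bound = check bindC(f);
        string groupKey = bound["c"].toString();
        Frame[]? members = groups[groupKey];
        if members is Frame[] {
            members.push(bound);
        } else {
            groups[groupKey] = [bound];
        }
    }
    return groups;
}

public function main() returns error? {
    Frame[] input = [{"a": 1, "b": 2}, {"a": 2, "b": 1}, {"a": 0, "b": 5}];
    map<Frame[]> groups = check groupByC(input);
    foreach var [c, members] in groups.entries() {
        io:println("c = ", c, ": ", members.length(), " frame(s)");
    }
}
```

The frames `{a: 1, b: 2}` and `{a: 2, b: 1}` differ in `a` and `b` but share `c = 3`, so they fall into one group.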

suleka96, Oct 11 '22

As per the discussion with @gimantha and @KavinduZoysa, the `_GroupByFunction` class implementation was updated as follows.


type RowDataType record {|
    readonly anydata groupByKey;
    _Frame[] frames;
|};

class _GroupByFunction {
    *_StreamFunction;

    _Iterator? itr = ();
    function (_Frame _inputFrame) returns anydata|error groupByFunc;
    table<RowDataType> key(groupByKey) tbl = table [];
    _Frame[] groupedFrames = [];

    function init(function (_Frame _frame) returns anydata|error groupByFunc) {
        self.groupByFunc = groupByFunc;
        self.prevFunc = ();
    }

    public function process() returns _Frame|error? {
        if (self.itr is ()) {
            _StreamFunction pf = <_StreamFunction>self.prevFunc;
            _Frame|error? pFrame = pf.process();
            while pFrame is _Frame {
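                // 1. Call `groupByFunc` to get the grouping key.
                anydata|error groupingKey = self.groupByFunc(pFrame);
                if groupingKey is error {
                    return groupingKey;
                }
                // Table keys must be immutable, so freeze the key first.
                anydata & readonly groupByKey = groupingKey.cloneReadOnly();
                // 2. If `tbl` already has this key, add `pFrame` to that group's frames.
                RowDataType? row = self.tbl[groupByKey];
                if row is RowDataType {
                    row.frames.push(pFrame);
                } else {
                    // 3. Otherwise, create a new entry for this group.
                    self.tbl.add({groupByKey, frames: [pFrame]});
                }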
                pFrame = pf.process();
            }
            // Now, each entry of table has same set of frames that belong to each group
            // Iterate over all entries and create output frames list `groupedFrames`.
            self.itr = self.groupedFrames.iterator();
        }

        _Iterator itr = <_Iterator>self.itr;
        record {|(any|error) value;|}|error? v = itr.next();
        if (v is record {|(any|error) value;|}) {
            record {|(any|error)...; |} _frame = {...v};
            return _frame;
        }
        return v;
    }

    public function reset() {
    }

}

// Example `groupByFunc` for `group by price1, price`: the key is a list of both values.
function groupByFunc(_Frame incomingFrame) returns anydata|error {
    return [<anydata>(check incomingFrame["price1"]), <anydata>(check incomingFrame["price"])];
}
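The `process()` loop above can be sketched outside the class, assuming frames are `map<anydata>` (a hypothetical `Frame`) and a key function with the same shape as `groupByFunc`. `GroupRow` mirrors `RowDataType`; `groupInto` and `priceKeys` are illustrative names. Because table keys must be immutable, the computed key is frozen with `cloneReadOnly()` before lookup, and since tables iterate in insertion order, groups come out ordered by their earliest member.

```ballerina
import ballerina/io;

// Hypothetical stand-in for `_Frame`: variable name -> value.
type Frame map<anydata>;

// Mirrors `RowDataType`: one row per group, keyed by the grouping value.
type GroupRow record {|
    readonly anydata groupByKey;
    Frame[] frames;
|};

// Fills the table the way steps 1 to 3 of `process()` describe.
function groupInto(Frame[] input, function (Frame) returns anydata|error keyFunc)
        returns table<GroupRow> key(groupByKey)|error {
    table<GroupRow> key(groupByKey) tbl = table [];
    foreach Frame f in input {
        // Table keys must be immutable, so freeze the computed key.
        anydata & readonly k = (check keyFunc(f)).cloneReadOnly();
        GroupRow? row = tbl[k];
        if row is GroupRow {
            row.frames.push(f);
        } else {
            tbl.add({groupByKey: k, frames: [f]});
        }
    }
    return tbl;
}

// Example key function for `group by price1, price` (a two-element list).
function priceKeys(Frame f) returns anydata|error {
    return [f["price1"], f["price"]];
}

public function main() returns error? {
    Frame[] input = [
        {"price1": 1, "price": 12}, {"price1": 1, "price": 13},
        {"price1": 1, "price": 12}
    ];
    table<GroupRow> key(groupByKey) tbl = check groupInto(input, priceKeys);
    foreach GroupRow row in tbl {
        io:println(row.groupByKey, ": ", row.frames.length(), " frame(s)");
    }
}
```

Using the list of key values itself as the table key, rather than a separately computed hash, lets the table compare keys with `==` directly.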

suleka96, Oct 12 '22

As per the discussion with @gimantha, @pcnfernando and @KavinduZoysa, we decided on the following points:

  • Send the grouping-key list to `init()` instead of the lambda function that returns the desugared grouping key.
  • Create the table's grouping key from that list in Ballerina itself.
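With only the list of grouping-key names passed to `init()`, both the table key and the output frame of each group can be derived in Ballerina. This is a sketch under the same hypothetical `Frame = map<anydata>` assumption; `groupingKey` and `groupFrame` are illustrative names, not part of `lang.query`, and a group is assumed to have at least one member.

```ballerina
import ballerina/io;

// Hypothetical stand-in for `_Frame`: variable name -> value.
type Frame map<anydata>;

// Builds the table key from the grouping-key names passed to `init()`:
// an immutable list of the values, in the order of the names.
function groupingKey(Frame f, string[] groupingKeyNames) returns anydata & readonly {
    anydata[] values = from string name in groupingKeyNames select f[name];
    return values.cloneReadOnly();
}

// Builds the output frame for one (non-empty) group: grouping-key variables
// keep the common value; every other variable becomes indexed (a list).
function groupFrame(Frame[] members, string[] groupingKeyNames) returns Frame {
    Frame out = {};
    foreach string name in groupingKeyNames {
        out[name] = members[0][name];
    }
    foreach Frame m in members {
        foreach var [name, value] in m.entries() {
            if groupingKeyNames.indexOf(name) is int {
                continue;
            }
            if !out.hasKey(name) {
                anydata[] empty = [];
                out[name] = empty;
            }
            // Lists are reference values, so this push updates `out`.
            anydata[] indexed = <anydata[]>out[name];
            indexed.push(value);
        }
    }
    return out;
}

public function main() {
    Frame[] members = [{"buyer": "b3", "price": 14}, {"buyer": "b3", "price": 15}];
    io:println(groupingKey(members[0], ["buyer"]));
    io:println(groupFrame(members, ["buyer"]));
}
```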

suleka96, Oct 14 '22