ballerina-lang
Support for `group by` clause
Description: $subject.
group-by-clause := "group" "by" grouping-key ["," grouping-key]*
grouping-key :=
   variable-name
   | inferable-type-descriptor variable-name "=" expression
Under this issue, we need to make sure the basic `group by` clause works.
ex:
type Order record {|
    string buyer;
    int price;
|};
Order[] orders = [{buyer: "b1", price: 12}, {buyer: "b2", price: 13}, {buyer: "b3", price: 14}, {buyer: "b3", price: 15}];
string[] ordersDup = from var {buyer, price} in orders
    group by buyer
    select buyer; // @output ["b1", "b2", "b3"]
Order[] aggregatedOrders = from var {buyer, price} in orders
    group by buyer
    select {buyer, price: sum(price)}; // @output [{buyer: "b1", price: 12}, {buyer: "b2", price: 13}, {buyer: "b3", price: 29}]
As mentioned in the spec issue, this clause should be executed as follows.
- Iterate over each input frame f:
  - partition the input frames into groups, where two frames are in the same group if they have the same (==) value for all grouping-key variables
  - within a group, frames are in incoming order
  - groups are ordered by earliest member in incoming order
- for each group
  - emit a frame that has
    - each grouping key variable bound to the value common to the group
    - other variables become indexed
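The execution steps above can be sketched in plain Ballerina. This is a minimal illustration, not the `lang.query` implementation: it assumes a hypothetical `Frame` type (`map<anydata>`) in place of the internal `_Frame`, and `groupFrames` is a made-up name. It finds each frame's group with a linear scan and an `==` comparison of the grouping-key values.

```ballerina
import ballerina/io;

// Hypothetical stand-in for the internal `_Frame`: variable name -> value.
type Frame map<anydata>;

// Partitions `frames` into groups whose members have equal (==) values for every
// grouping-key variable. Groups are ordered by their earliest member and members
// keep incoming order. Each emitted frame binds the grouping keys to the common
// value and turns every other variable into an array (an "indexed" variable).
function groupFrames(Frame[] frames, string[] groupingKeys) returns Frame[] {
    anydata[][] groupKeyValues = []; // grouping-key values of each group, by group index
    Frame[] grouped = [];
    foreach Frame f in frames {
        anydata[] keyValues = from string k in groupingKeys select f[k];
        // Linear scan for an existing group with equal key values.
        int? idx = ();
        foreach int i in 0 ..< groupKeyValues.length() {
            if groupKeyValues[i] == keyValues {
                idx = i;
                break;
            }
        }
        if idx is () {
            // First member of a new group.
            groupKeyValues.push(keyValues);
            Frame g = {};
            foreach var [varName, varValue] in f.entries() {
                if groupingKeys.indexOf(varName) is int {
                    g[varName] = varValue;
                } else {
                    anydata[] indexed = [varValue];
                    g[varName] = indexed;
                }
            }
            grouped.push(g);
        } else {
            // Append this frame's non-grouping values to the existing group.
            Frame g = grouped[idx];
            foreach var [varName, varValue] in f.entries() {
                if groupingKeys.indexOf(varName) is () {
                    anydata[] indexed = <anydata[]>g[varName];
                    indexed.push(varValue);
                }
            }
        }
    }
    return grouped;
}

public function main() {
    Frame[] orders = [
        {buyer: "b1", price: 12},
        {buyer: "b2", price: 13},
        {buyer: "b3", price: 14},
        {buyer: "b3", price: 15}
    ];
    io:println(groupFrames(orders, ["buyer"]));
}
```

For the example orders, this yields the same three groups as the `_frames` structure described further down, with `b3` holding prices `[14, 15]`. The linear scan is what the table-based design later in this thread replaces.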
We need to create a class called `_GroupByFunction` and implement its `process` function, which returns a frame grouped by the key(s).
I have not yet finalized how to represent the final frame (the frame that maps the grouping keys to the indexed variables).
I would suggest the following design for `_GroupByFunction`.
class _GroupByFunction {
    *_StreamFunction;
    _Iterator? itr = ();
    # + _inputFrame - The input frame which came through the pipeline
    # + _frames - The set of output frames created after grouping.
    # For each `_inputFrame`, `_frames` should be updated.
    # After `_frames` is updated for every `_inputFrame`, an iterator over it is created and
    # each value is passed via the pipeline.
    function (_Frame _inputFrame, _Frame[] _frames) returns error? groupByFunc;
    function init(function (_Frame _frame, _Frame[] frames) returns error? groupByFunc) {
        self.groupByFunc = groupByFunc;
        self.prevFunc = ();
    }
    public function process() returns _Frame|error? {
        // 1. If `itr` is (), `itr` should be created.
        //   1.1 Initialize an array of `_Frame` (let's call it `_frames`). This is the 2nd argument for `groupByFunc`.
        //       Each `_Frame` element of `_frames` corresponds to a single group of the group-by operation.
        //       Each `_Frame` element consists of:
        //         1. the grouping keys and their values.
        //         2. the non-grouping variables and their values as an array.
        //       Refer to the example implementation of `groupByFunc`.
        //   1.2 Iterate over each input frame and call `groupByFunc`.
        //   1.3 Assign the iterator of `_Frame[]` to `itr`.
        // 2. Else, `itr.next()` should be returned.
    }
    public function reset() {
    }
}
Consider the following example:
Order[] orders = [{buyer: "b1", price: 12}, {buyer: "b2", price: 13}, {buyer: "b3", price: 14}, {buyer: "b3", price: 15}];
string[] ordersDup = from var {buyer, price} in orders
group by buyer
select buyer; // @output ["b1", "b2", "b3"]
Since the grouping key is `buyer`, at the end of the first step of the `process()` function, the `_frames` array should look like:
[{"buyer": "b1", "price": [12]},
{"buyer": "b2", "price": [13]},
{"buyer": "b3", "price": [14, 15]}]
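I think the desugared code for `groupByFunc` for the above example will look like this:
function groupByFunc(_Frame inputFrame, _Frame[] frames) returns error? {
    string buyer = <string> check inputFrame["buyer"];
    int price = <int> check inputFrame["price"];
    foreach var frame in frames {
        if (<string> check frame["buyer"]) == buyer {
            // Existing group: append the non-grouping value.
            int[] arr = <int[]> check frame["price"];
            arr.push(price);
            return;
        }
    }
    // No group has this key yet, so start a new one.
    int[] prices = [price];
    frames.push({"buyer": buyer, "price": prices});
}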
I think that in the above approach, even though the `process` function is simple, desugaring `groupByFunc` can be complicated for complex examples that have multiple keys.
I think we need to pass the grouping keys to the `process` function and update the `_frames` array within that function (similar to `order by`). I am currently trying to figure out a new approach.
Grouping keys are known at compile time, right? If we know the keys, we can simply access the key values from the input frame. That is part of the desugared code inside `groupByFunc`. We can think of the grouping keys as primary keys: if the same keys come in an input frame, we put those frames together.
I have a few concerns:
- If the query output is not an array (e.g. a stream), how are we supporting this? We may have to maintain a separate struct.
- If we are to group, shouldn't we consume the whole input as we do in `order by` and maintain different stacks?
Since `group by` can have any expression, we won't be able to simply read the key values from the input frame, will we?
Here is Kavindu's example with a simple function invocation (`foo`) added.
string[] ordersDup = from var {buyer, price} in orders
group by foo(buyer)
select buyer; // @output ["b1", "b2", "b3"]
This will be desugared to
function groupByFunc(_Frame inputFrame, _Frame[] frames) returns error? {
string key = foo(<string> check inputFrame["buyer"]);
...
}
We have to improve the desugared logic so that it does not iterate over the `frames` array every time.
As per the offline discussion with @gimantha, @pcnfernando and @suleka96, it was concluded that we can use a table to internally represent the frames that result from the group by operation. Therefore, the `_GroupByFunction` class can be implemented as follows.
type RowDataType record {|
readonly string hash;
_Frame[] frames;
|};
class _GroupByFunction {
*_StreamFunction;
_Iterator? itr = ();
function (_Frame _inputFrame) returns string|error groupByFunc;
table <RowDataType> key(hash) tbl = table [];
string[] groupingKeys = [];
string[] nonGroupingKeys = [];
_Frame[] groupedFrames = [];
function init(function (_Frame _frame) returns string|error groupByFunc,
function () returns string[] groupingKeys,
function () returns string[] nonGroupingKeys) {
self.groupByFunc = groupByFunc;
self.groupingKeys = groupingKeys();
self.nonGroupingKeys = nonGroupingKeys();
self.prevFunc = ();
}
public function process() returns _Frame|error? {
if (self.itr is ()) {
_StreamFunction pf = <_StreamFunction>self.prevFunc;
_Frame|error? pFrame = pf.process();
while pFrame is _Frame {
// 1. Call `groupByFunc` and get the `hash`.
// 2. If `tbl` contains the hash, add `pFrame` to that entry's frames array.
// 3. Else, create a new entry.
pFrame = pf.process();
}
// Now, each entry of table has same set of frames that belong to each group
// Iterate over all entries and create output frames list `groupedFrames`.
self.itr = self.groupedFrames.iterator();
}
_Iterator itr = <_Iterator>self.itr;
record {|(any|error) value;|}|error? v = itr.next();
if (v is record {|(any|error) value;|}) {
record {|(any|error)...;|} _frame = {...v};
return _frame;
}
return v;
}
public function reset() {
}
}
// function groupingKeys() returns string[] {
// return ["buyer", "price"];
// }
// function nonGroupingKeys() returns string[] {
// return ["count"];
// }
// function groupByFunc(_Frame incomingFrame) returns string|error {
// return hash(check incomingFrame["buyer"], check incomingFrame["price"]);
// // group by var buyerPrice = buyer + price
// // string groupingValue1 = (check incomingFrame["buyer"] + check incomingFrame["price"]).toString()
// }
We can use the hash function that is used in the table implementation.
- [x] #36939
- [ ] https://github.com/ballerina-platform/ballerina-lang/issues/37888
- [ ] #37160
- [ ] Add desugar changes for the `group by` clause
After further discussions, we decided on the following points:
- Desugar grouping keys that are not variable refs (e.g. `group by var c = a + b`) similarly to the `let` clause.
- There is no need to send the non-grouping keys to the `init` function of the `_GroupByFunction` class.
- The hash for a non-variable-ref grouping key, e.g. `group by var c = a + b`, should be calculated for `c`.
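A minimal sketch of the first and third points, assuming a hypothetical `Frame` type (`map<anydata>`) and made-up helper names (`bindC`, `groupingKeyOfC`): the derived variable `c` is bound into each frame in a `let`-like step, so the grouping step only needs the value of `c`.

```ballerina
import ballerina/io;

// Hypothetical stand-in for the internal `_Frame`.
type Frame map<anydata>;

// let-like step for `group by var c = a + b`: bind the derived variable `c`
// in the frame before grouping (assumes `a` and `b` are ints).
function bindC(Frame f) returns Frame|error {
    anydata a = f["a"];
    anydata b = f["b"];
    if a is int && b is int {
        f["c"] = a + b;
        return f;
    }
    return error("`a` and `b` must be ints");
}

// After that step, the value used for grouping is simply `c`.
function groupingKeyOfC(Frame f) returns anydata => f["c"];

public function main() returns error? {
    Frame[] frames = [{a: 1, b: 2}, {a: 2, b: 1}, {a: 0, b: 5}];
    foreach Frame f in frames {
        Frame bound = check bindC(f);
        // Frames with equal `c` (here the first two) fall into the same group.
        io:println(bound, " -> grouping key: ", groupingKeyOfC(bound));
    }
}
```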
As per the discussion with @gimantha and @KavinduZoysa, the `_GroupByFunction` class implementation was updated as follows.
type RowDataType record {|
readonly anydata groupByKey;
_Frame[] frames;
|};
class _GroupByFunction {
*_StreamFunction;
_Iterator? itr = ();
function (_Frame _inputFrame) returns anydata|error groupByFunc;
table<RowDataType> key(groupByKey) tbl = table [];
_Frame[] groupedFrames = [];
function init(function (_Frame _frame) returns anydata|error groupByFunc) {
self.groupByFunc = groupByFunc;
self.prevFunc = ();
}
public function process() returns _Frame|error? {
if (self.itr is ()) {
_StreamFunction pf = <_StreamFunction>self.prevFunc;
_Frame|error? pFrame = pf.process();
while pFrame is _Frame {
// 1. Call `groupByFunc` and get the `groupByKey`.
var groupingKey = self.groupByFunc(pFrame);
// 2. If `tbl` contains the key, add `pFrame` to that entry's frames array.
// 3. Else, create a new entry.
pFrame = pf.process();
}
// Now, each entry of table has same set of frames that belong to each group
// Iterate over all entries and create output frames list `groupedFrames`.
self.itr = self.groupedFrames.iterator();
}
_Iterator itr = <_Iterator>self.itr;
record {|(any|error) value;|}|error? v = itr.next();
if (v is record {|(any|error) value;|}) {
record {|(any|error)...; |} _frame = {...v};
return _frame;
}
return v;
}
public function reset() {
}
}
function groupByFunc(_Frame incomingFrame) returns anydata|error {
return [<anydata>check incomingFrame["price1"], <anydata>check incomingFrame["price"]];
}
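Steps 2 and 3 of `process()` above can be sketched with a keyed table shaped like `RowDataType`. This is a standalone approximation, not the actual class: `Frame`, `GroupRow`, `GroupTable`, `groupWithTable` and `buyerKey` are hypothetical names. Because a key field must be `readonly`, the computed key goes through `cloneReadOnly()`; a table iterates in insertion order, so groups stay ordered by their earliest member.

```ballerina
import ballerina/io;

// Hypothetical stand-in for the internal `_Frame`.
type Frame map<anydata>;

// Mirrors `RowDataType` above: one row per group, keyed by the grouping key.
type GroupRow record {|
    readonly anydata groupByKey;
    Frame[] frames;
|};

type GroupTable table<GroupRow> key(groupByKey);

// Steps 2 and 3 of `process()`: a keyed lookup replaces the scan over all groups.
function groupWithTable(Frame[] frames, function (Frame) returns anydata|error groupByFunc)
        returns GroupTable|error {
    GroupTable tbl = table [];
    foreach Frame f in frames {
        // Key fields must be immutable, so freeze the computed key.
        anydata & readonly groupingKey = (check groupByFunc(f)).cloneReadOnly();
        if tbl.hasKey(groupingKey) {
            // 2. The group exists: add the frame to its frames array.
            tbl.get(groupingKey).frames.push(f);
        } else {
            // 3. Otherwise create a new entry (appended after existing rows).
            tbl.add({groupByKey: groupingKey, frames: [f]});
        }
    }
    return tbl;
}

// Hypothetical desugared key function for `group by buyer`.
function buyerKey(Frame f) returns anydata|error => f["buyer"];

public function main() returns error? {
    Frame[] orders = [{buyer: "b1", price: 12}, {buyer: "b3", price: 14}, {buyer: "b3", price: 15}];
    GroupTable groups = check groupWithTable(orders, buyerKey);
    foreach GroupRow row in groups {
        io:println(row.groupByKey, ": ", row.frames.length(), " frame(s)");
    }
}
```

For multiple grouping keys, the key function can return a list (as the `groupByFunc` example above does), since table keys are compared by value.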
As per the discussion with @gimantha, @pcnfernando and @KavinduZoysa, we decided on the following points:
- Send the grouping key list to `init()` instead of sending the lambda function that returns the desugared grouping key.
- Create the grouping key for the table from the above-mentioned list in Ballerina itself.
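Under this approach, the library derives the table key from the list of grouping-key names passed to `init()`. A minimal sketch, assuming a hypothetical `Frame` type and a made-up `createGroupingKey` helper: the key is the list of the frame's values for those names, in the given order, frozen with `cloneReadOnly()` so it can serve as a `readonly` table key.

```ballerina
import ballerina/io;

// Hypothetical stand-in for the internal `_Frame`.
type Frame map<anydata>;

// Builds the table key from the grouping-key names given to `init()`,
// e.g. ["buyer", "price"], instead of calling a desugared lambda.
function createGroupingKey(Frame f, string[] groupingKeys) returns (anydata[] & readonly)|error {
    anydata[] keyValues = [];
    foreach string k in groupingKeys {
        if !f.hasKey(k) {
            return error(string `grouping key '${k}' is not bound in the frame`);
        }
        keyValues.push(f[k]);
    }
    // A table key field must be readonly, so return an immutable copy.
    return keyValues.cloneReadOnly();
}

public function main() returns error? {
    Frame f = {buyer: "b3", price: 14};
    io:println(check createGroupingKey(f, ["buyer", "price"]));
}
```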