
Support aggregation of multiple values in a single function

Open jonathanmaw opened this issue 2 years ago • 7 comments

There are times when a query will aggregate multiple values at once, e.g.

source = [{cost=1, weight=2}, {cost=3, weight=4}]
testMultiple source =
    [{
        lightest =
            source |> List.map .weight |> List.minimum,
        dearest =
            source |> List.map .cost |> List.maximum
    }]

which we want to be translated into

source.select(
    org.apache.spark.sql.functions.min(org.apache.spark.sql.functions.col("weight")).alias("lightest"),
    org.apache.spark.sql.functions.max(org.apache.spark.sql.functions.col("cost")).alias("dearest"),
)
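The intended semantics of aggregating several values in one pass can be restated in plain Python (a sketch for illustration only — this is neither the Morphir nor the Spark code, and only the cost/weight fields come from the example above):

```python
# Plain-Python sketch of computing several aggregations over one source.
source = [{"cost": 1, "weight": 2}, {"cost": 3, "weight": 4}]

result = {
    "lightest": min(row["weight"] for row in source),
    "dearest": max(row["cost"] for row in source),
}

print(result)  # {'lightest': 2, 'dearest': 3}
```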

This work should involve:

  • [ ] Agree on an example that represents this problem succinctly and what the output should look like
  • [ ] Decide on a way to implement this
  • [ ] Write documentation, examples, implementation and tests to cover this behaviour.

jonathanmaw avatar Jul 11 '22 16:07 jonathanmaw

@AttilaMihaly I've created an issue to track our need to handle aggregation in more general cases. I'm a little unclear about exactly what's required, so I'd appreciate it if you could give it a look and make sure I understand this issue correctly.

jonathanmaw avatar Jul 11 '22 16:07 jonathanmaw

Attila has created a Pull Request for a generic implementation of Aggregations (https://github.com/finos/morphir-elm/pull/800).

It recommends we support aggregations by providing a Morphir.SDK.Aggregate module in which users can write aggregations that Morphir can parse, instead of trying to interpret aggregations as they might be written in plain Elm.

jonathanmaw avatar Jul 15 '22 15:07 jonathanmaw

Using the examples and documentation at https://spark.apache.org/docs/2.4.0/api/scala/index.html#org.apache.spark.sql.Dataset, I think that while Morphir's aggregation supports a separate filter for each aggregation expression, Spark does not. Based on those examples, I think filters must be applied in the function chain before the groupBy and aggregation.

I think it's fair for the Spark backend not to support per-aggregation-expression filters, with documentation that makes this limitation clear and explains how filtering can be done in the Spark backend instead.
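The difference between the two filter placements can be sketched in plain Python (illustrative values only, not code from either project):

```python
rows = [1, 2, 3, 8, 9]

# Morphir-style: the filter applies to a single aggregation expression;
# other aggregations in the same pass still see every row.
count_small = sum(1 for v in rows if v < 7)  # filtered
total = sum(rows)                            # unfiltered, same pass

# Spark-style: a filter in the chain before groupBy/agg affects
# every aggregation that follows it.
filtered = [v for v in rows if v < 7]
total_after_filter = sum(filtered)

print(count_small, total, total_after_filter)  # 3 23 6
```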

jonathanmaw avatar Jul 20 '22 15:07 jonathanmaw

I have found another limitation of Spark's aggregations compared to the Morphir SDK's aggregations: giving the key an arbitrary name. For example, in a Morphir SDK aggregate, we do:

testDataSet =
        [ TestInput1 "k1_1" "k2_1" 1
        , TestInput1 "k1_1" "k2_1" 2
        , TestInput1 "k1_1" "k2_2" 3
        , TestInput1 "k1_1" "k2_2" 4
        , TestInput1 "k1_2" "k2_1" 5
        , TestInput1 "k1_2" "k2_1" 6
        , TestInput1 "k1_2" "k2_2" 7
        , TestInput1 "k1_2" "k2_2" 8
        ]

testDataSet
    |> groupBy .key1
    |> aggregate
        (\key inputs ->
            { key = key
            , count = inputs (count |> withFilter (\a -> a.value < 7))
            , sum = inputs (sumOf .value)
            , max = inputs (maximumOf .value)
            , min = inputs (minimumOf .value)
            }
        )

and get the results

[ { key = "k1_1", count = 4, sum = 10, max = 4, min = 1 }
, { key = "k1_2", count = 2, sum = 26, max = 8, min = 5 }
]
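The grouping and aggregation above (including the per-expression filter) can be restated in plain Python to make the expected results concrete — a sketch only, with the data and field names taken from the example:

```python
from collections import defaultdict

# (key1, key2, value) rows mirroring the TestInput1 data above.
data = [
    ("k1_1", "k2_1", 1), ("k1_1", "k2_1", 2),
    ("k1_1", "k2_2", 3), ("k1_1", "k2_2", 4),
    ("k1_2", "k2_1", 5), ("k1_2", "k2_1", 6),
    ("k1_2", "k2_2", 7), ("k1_2", "k2_2", 8),
]

# Group by key1, collecting each group's values.
groups = defaultdict(list)
for key1, _key2, value in data:
    groups[key1].append(value)

results = [
    {
        "key": key,
        # the per-aggregation filter applies only to this count
        "count": sum(1 for v in values if v < 7),
        "sum": sum(values),
        "max": max(values),
        "min": min(values),
    }
    for key, values in groups.items()
]
print(results)
```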

Specifically, the key = key part of the aggregate expression creates a column named "key".

In Spark, a similar aggregation is:

val simpleData = Seq(("James","Sales","NY",90000,34,10000),
  ("Michael","Sales","NY",86000,56,20000),
  ("Robert","Sales","CA",81000,30,23000),
  ("Maria","Finance","CA",90000,24,23000),
  ("Raman","Finance","CA",99000,40,24000),
  ("Scott","Finance","NY",83000,36,19000),
  ("Jen","Finance","NY",79000,53,15000),
  ("Jeff","Marketing","CA",80000,25,18000),
  ("Kumar","Marketing","NY",91000,50,21000)
)
val df = simpleData.toDF("employee_name","department","state","salary","age","bonus")
df.groupBy("department")
    .agg(
      sum("salary").as("sum_salary"),
      avg("salary").as("avg_salary"),
      sum("bonus").as("sum_bonus"),
      max("bonus").as("max_bonus"))

which generates

+----------+----------+-----------------+---------+---------+
|department|sum_salary|avg_salary       |sum_bonus|max_bonus|
+----------+----------+-----------------+---------+---------+
|Sales     |257000    |85666.66666666667|53000    |23000    |
|Finance   |351000    |87750.0          |81000    |24000    |
|Marketing |171000    |85500.0          |39000    |21000    |
+----------+----------+-----------------+---------+---------+

i.e. the column used to group by, "department", is always given that name in the aggregation.
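As a cross-check of the table above, the same grouping can be computed in plain Python (a sketch; everything outside the data itself is illustrative):

```python
from collections import defaultdict

# (name, department, salary, bonus) rows mirroring simpleData above.
rows = [
    ("James", "Sales", 90000, 10000), ("Michael", "Sales", 86000, 20000),
    ("Robert", "Sales", 81000, 23000), ("Maria", "Finance", 90000, 23000),
    ("Raman", "Finance", 99000, 24000), ("Scott", "Finance", 83000, 19000),
    ("Jen", "Finance", 79000, 15000), ("Jeff", "Marketing", 80000, 18000),
    ("Kumar", "Marketing", 91000, 21000),
]

by_dept = defaultdict(list)
for _name, dept, salary, bonus in rows:
    by_dept[dept].append((salary, bonus))

# (sum_salary, avg_salary, sum_bonus, max_bonus) per department
summary = {}
for dept, pairs in by_dept.items():
    salaries = [s for s, _ in pairs]
    bonuses = [b for _, b in pairs]
    summary[dept] = (sum(salaries), sum(salaries) / len(salaries),
                     sum(bonuses), max(bonuses))

print(summary["Sales"])  # (257000, 85666.66666666667, 53000, 23000)
```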

It's possible to rename the columns by following this up with a Select ObjectExpression, but it's not possible to omit that column (which is possible in Morphir SDK's aggregate), and I don't know if renaming that column is specifically required.

For now, I will be proceeding with the assumption that "key" is always used to create a column with the exact same name as the one grouped by, and the Spark backend will deliberately discard any fields that use "key", leaving that to Spark's default behaviour.
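The rename workaround described above (a follow-up select that gives the key column an arbitrary name) can be sketched in plain Python; the column names here are hypothetical, not taken from the Morphir or Spark code:

```python
# One aggregated row as Spark's groupBy would produce it: the key column
# keeps the name of the column grouped by ("department").
aggregated = {"department": "Sales", "sum_salary": 257000}

# A follow-up select/projection that gives the key an arbitrary name
# (the Select ObjectExpression approach mentioned above).
renamed = {"dept_name": aggregated["department"],
           "sum_salary": aggregated["sum_salary"]}

print(renamed)  # {'dept_name': 'Sales', 'sum_salary': 257000}
```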

jonathanmaw avatar Jul 21 '22 16:07 jonathanmaw

Following discussions, it was decided that:

  • Spark AST should have a data structure that encompasses the whole of the Morphir.SDK.Aggregation structure, including renaming "key" fields and per-field filters.
  • Per-field filters are possible using case expressions (as in SQL) or window functions. Implementation of this can be moved to a subsequent issue.
  • Renaming "key" fields is possible with a subsequent Select expression. We should implement this when the specified key is different from the default. Implementation of this can be moved to a subsequent issue.
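The case-expression approach to per-field filters can be sketched in plain Python — the conditional lives inside the aggregation rather than before the group-by, analogous to SQL's CASE WHEN (or Spark's when/otherwise). The values here are illustrative:

```python
values = [1, 2, 3, 4, 5, 6, 7, 8]

# Per-field filter via a case expression: each value contributes 1 if it
# passes the predicate and 0 otherwise, so other aggregations in the same
# pass are unaffected. Equivalent SQL shape (not executed here):
#   SUM(CASE WHEN value < 7 THEN 1 ELSE 0 END)
count_filtered = sum(1 if v < 7 else 0 for v in values)

print(count_filtered)  # 6
```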

jonathanmaw avatar Jul 25 '22 11:07 jonathanmaw

One correction to the first bullet-point: We want the Spark AST to represent Spark group-by and aggregation operations, not the Aggregation API in the Morphir SDK. The rest looks correct.

AttilaMihaly avatar Jul 25 '22 12:07 AttilaMihaly

Pull Request #848 has been made for this issue.

jonathanmaw avatar Aug 08 '22 15:08 jonathanmaw