multidplyr icon indicating copy to clipboard operation
multidplyr copied to clipboard

GROUP BY semantics not honored

Open xabriel opened this issue 2 years ago • 1 comments

It seems multiplyr is not honoring GROUP BY semantics as seen on reprex below.

library(tidyverse)
library(multidplyr)

data <-
  tibble(
    group = c("a", "a", "b", "b"),
    int = c(1, 1, 1, 1)
  )

# group_by works as expected w/o multidplyr
data %>%
  group_by(group) %>%
  summarise(sum = sum(int), n = n())
#> # A tibble: 2 × 3
#>   group   sum     n
#>   <chr> <dbl> <int>
#> 1 a         2     2
#> 2 b         2     2

cluster <- new_cluster(2)
data_partitioned <- data %>%
  partition(cluster)

# semantics are changed!
# multidplyr should honor that a GROUP BY is inherently non-paralellizable
# I would have expected multidplyr to automatically shuffle.
data_partitioned %>%
  group_by(group) %>%
  summarise(sum = sum(int))
#> Source: party_df [4 x 2]
#> Shards: 2 [2--2 rows]
#> 
#>   group   sum
#>   <chr> <dbl>
#> 1 a         1
#> 2 b         1
#> 3 a         1
#> 4 b         1

# if we group by *before* partitioning, we do get correct behavior:
# however this limits significantly since we'd want to do other calculations before grouping.
data_grouped_partitioned <- data %>%
  group_by(group) %>%
  partition(cluster)

data_grouped_partitioned %>%
  summarise(sum = sum(int))
#> Source: party_df [2 x 2]
#> Shards: 2 [1--1 rows]
#> 
#>   group   sum
#>   <chr> <dbl>
#> 1 a         2
#> 2 b         2

Created on 2022-03-02 by the reprex package (v2.0.1)

xabriel avatar Mar 02 '22 17:03 xabriel

I'm agree with this, group by can be a way to sort everything, but replicate the behavior when we need group by and not can be hard, I think can be more simpler, a function that helps us to choose how to split the data in the clusters.

latot avatar Oct 03 '22 15:10 latot

From my understanding, workers cannot communicate. As such, this is an expected behavior. Each worker performs the group_by operation. To a large extent, once you partition your tibble, start thinking of it as you would after splitting. E.g:

library(purrr)
library(dplyr, warn.conflicts = FALSE)
tibble(
    group = c("a", "a", "b", "b"),
    int = c(1, 1, 1, 1)
) %>% 
    split.data.frame(rep(c(1,2), times = 2)) %>% 
    map(
        ~ .x %>% 
            group_by(group) %>% 
            summarise(sum = sum(int), n = n())
    ) %>% 
    bind_rows()
#> # A tibble: 4 × 3
#>   group   sum     n
#>   <chr> <dbl> <int>
#> 1 a         1     1
#> 2 b         1     1
#> 3 a         1     1
#> 4 b         1     1

Created on 2023-04-06 with reprex v2.0.2

PhilipBerg avatar Apr 07 '23 01:04 PhilipBerg

From my understanding, workers cannot communicate. As such, this is an expected behavior.

Seems like we are rationalizing a bug. Consider that GROUP BY semantics have been long established by the SQL community. I can name many systems that work like I expect GROUP BY to work, including IBM DB2, MySQL, Postgres, Apache Spark and, yes, tibbles without multiplyr. I cannot name a single one that doesn't.

Perhaps workers cannot communicate. In that case, why wouldn't it be reasonable for multiplyr to implement some sort of IPC?

xabriel avatar Apr 07 '23 13:04 xabriel

Yeah, this is expected behaviour. Everything is implicitly grouped by the partition and there's no way around in multidplyr. If this is the behaviour you want, I'd suggest you use a system that does support it.

hadley avatar Oct 31 '23 17:10 hadley