npartitions ignored
Hello. I'm using swifter 1.4.0 from pip with Python 3.11.2. Swifter works fine for my use cases, except that the configuration variables are ignored. In particular, I want to set a much larger number of partitions (20*cpu_count()) than the default, but every attempt has failed: swifter just sticks to its default of 2*cpu_count() = 24 (I have 12 vCPUs).
I have attempted the following:
grouped = (
    df.swifter
    .progress_bar(enable=True)
    .force_parallel(enable=True)
    .set_npartitions(npartitions=20*os.cpu_count())
    .groupby(symbol_col, group_keys=False)[[]]
)
result_df = grouped.apply(...)

swifter.set_defaults(
    npartitions=20*os.cpu_count(),
    dask_threshold=0,
    progress_bar=True,
    progress_bar_desc='customized',
    allow_dask_on_strings=True,
    force_parallel=True,
    scheduler='processes',
)

swifter.swifter.DEFAULT_KWARGS.update(dict(
    npartitions=20*os.cpu_count(),
    dask_threshold=0,
    progress_bar=True,
    progress_bar_desc='customized',
    allow_dask_on_strings=True,
    force_parallel=True,
    scheduler='processes',
))
In all of the above cases, swifter starts a local ray cluster and runs the computation to completion, but every configuration option is ignored, as can be seen from this screenshot of the progress bar: it shows only 24 partitions and does not use the "customized" description.
What am I doing wrong?
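For reference, the same kind of grouped apply can be written directly in dask, where the partition count is passed explicitly to from_pandas and is therefore under your control. This is only a rough fallback sketch, not swifter's API; the DataFrame, the "symbol" column, and the summarise function are invented for illustration:

import os

import dask.dataframe as dd
import numpy as np
import pandas as pd

def summarise(group: pd.DataFrame) -> pd.Series:
    # per-group work; stands in for the real apply(...) above
    return pd.Series({"n": len(group), "mean_price": group["price"].mean()})

if __name__ == "__main__":
    df = pd.DataFrame({
        "symbol": np.random.randint(0, 1_000, 1_000_000),
        "price": np.random.random(1_000_000),
    })
    ddf = dd.from_pandas(df, npartitions=20 * os.cpu_count())
    result = (
        ddf.groupby("symbol")
           .apply(summarise, meta={"n": "i8", "mean_price": "f8"})
           .compute(scheduler="processes")
    )
    print(result.head())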
On a separate note, to mitigate the fat-tailed distribution of per-group runtimes in a GroupBy, I would recommend raising the default number of partitions considerably. In a typical GroupBy there is a large number of groups of negligible size, while a handful of groups hold the majority of the data. With only 2*cpu_count() partitions, parallelism collapses once the short partitions finish: the straggler partitions keep running on only a few cores while the rest sit idle. 20*cpu_count() is probably a reasonable default, and an even higher partition count might still be beneficial.
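To make the straggler point concrete, here is a toy simulation (the numbers and the random group-to-partition bucketing are assumptions for illustration, not swifter internals): many tiny groups plus a few huge ones, partition runtime taken as proportional to its row count, and 12 cores that pull partitions as they become free. With 2*cpu_count() partitions, two of the huge groups regularly land in the same partition and that partition dominates the runtime; with 20*cpu_count() partitions that collision becomes rare.

import heapq

import numpy as np

rng = np.random.default_rng(0)
ncores = 12  # matches the 12 vCPUs mentioned above
# 10,000 one-row groups plus 5 groups of 100,000 rows each (made up)
group_sizes = np.r_[np.ones(10_000, dtype=np.int64), np.full(5, 100_000)]

def simulated_makespan(npartitions: int) -> float:
    # bucket each group into a random partition; partition cost = its row count
    buckets = rng.integers(0, npartitions, size=len(group_sizes))
    loads = np.bincount(buckets, weights=group_sizes, minlength=npartitions)
    # cores take the next partition as soon as they become free
    finish_times = [0.0] * ncores
    heapq.heapify(finish_times)
    for load in loads:
        heapq.heappush(finish_times, heapq.heappop(finish_times) + load)
    return max(finish_times)

for k in (2 * ncores, 20 * ncores):
    mean = np.mean([simulated_makespan(k) for _ in range(200)])
    print(f"{k:4d} partitions -> mean simulated makespan ~ {mean:,.0f} row-units")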
Same problem here.
On our HPC cluster I request 4 cores and set npartitions to 8; the job is then dispatched to a compute node with 128 cores, and swifter falls back to its default of 2*cpu_count() even though I set npartitions explicitly.
I guess one problem here is that cpu_count() is used, which counts the cores in the server, not the cores the computation is actually allowed to use. On the cluster, all processes/threads within a job are bound to the requested cores, so swifter assumes it can use 128 cores while every process is pinned to 4.
I replaced the default npartitions value of 2*cpu_count() in base.py with 2*int(os.environ["SLURM_CPUS_PER_NODE"]) and hoped that this would resolve the issue. The number of partitions is now correct.
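A slightly more general sketch of that idea (my assumption, not swifter code): prefer the SLURM variable, then fall back to the process's CPU affinity mask, which on Linux typically reflects the cgroup/affinity binding the job runs under, and only then to os.cpu_count(). Note that os.environ values are strings and need an int() conversion.

import os

def usable_cpu_count() -> int:
    slurm_cpus = os.environ.get("SLURM_CPUS_PER_NODE")
    if slurm_cpus is not None:
        return int(slurm_cpus)              # environment values are strings
    if hasattr(os, "sched_getaffinity"):    # Linux only
        return len(os.sched_getaffinity(0))
    return os.cpu_count() or 1

print(os.cpu_count(), "cores on the node vs", usable_cpu_count(), "actually usable")
npartitions = 2 * usable_cpu_count()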
The swifter computation still starts 128 processes even though it can only use 4 cores, which makes the program unusable on our HPC cluster, since all processes are bound to the requested 4 cores via cgroups.
I will ask our users to find another solution instead of using swifter.