parallel evaluation inconsistencies with persist settings
Parallel evaluation of a flow doesn't seem to work when `core__persist_by_default` is `False`, even if every entity in the flow (in particular, the ones on the parallel branches of the DAG) is explicitly persisted with `@bn.persist(True)`. FWIW, I get the same result whether the parallel structure of the flow is implicit or explicit (e.g., setting multiplicity with `values=` vs. defining a separate entity for each branch).
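The two `subject` values in the minimal example give the DAG two independent branches. The following is a minimal sketch in plain Python, not bionic's API. It encodes the example's dependencies as a hand-written dict and groups the nodes by longest-path depth. Node labels like `subject_slow[Earth]` are hypothetical, not bionic identifiers. Nodes at the same depth have no dependency path between them, so a parallel executor could in principle run them at the same time.

```python
from collections import defaultdict

# Hypothetical labels: one copy of each per-subject entity for every value
# passed to flow.setting('subject', values=[...]) in the example.
SUBJECTS = ["Earth", "Mars"]


def build_dag(subjects):
    """Map each node to the nodes it depends on, mirroring the example flow."""
    deps = {"greeting": []}
    for s in subjects:
        deps[f"subject[{s}]"] = []
        deps[f"subject_slow[{s}]"] = [f"subject[{s}]"]
        deps[f"message[{s}]"] = ["greeting", f"subject_slow[{s}]"]
    # all_messages gathers the message from every subject branch.
    deps["all_messages"] = [f"message[{s}]" for s in subjects]
    return deps


def levels(deps):
    """Group nodes by longest-path depth; same-level nodes are independent."""
    depth = {}

    def visit(node):
        if node not in depth:
            depth[node] = 1 + max((visit(d) for d in deps[node]), default=-1)
        return depth[node]

    for node in deps:
        visit(node)
    grouped = defaultdict(list)
    for node, d in depth.items():
        grouped[d].append(node)
    return [sorted(grouped[d]) for d in sorted(grouped)]


if __name__ == "__main__":
    for i, level in enumerate(levels(build_dag(SUBJECTS))):
        print(i, level)
```

Both `subject_slow` nodes land in the same level, which is why the two 5-second sleeps would be expected to overlap when parallel execution is enabled.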
Minimal example below:

```python
import logging
import time

import bionic as bn

logging.basicConfig()
logger = logging.getLogger()
logger.setLevel(logging.INFO)

builder = bn.FlowBuilder('my_flow')
# Enable parallel execution, but turn off persistence by default.
builder.set('core__parallel_execution__enabled', True)
builder.set('core__persist_by_default', False)

builder.assign('greeting', 'Hello')
builder.assign('subject', 'world')


# Slow step; it runs once per `subject` value and should run in parallel.
@builder
@bn.persist(True)
def subject_slow(subject):
    logger.info('started slow execution')
    start = time.time()
    time.sleep(5)
    end = time.time()
    logger.info(f'ended slow execution in {end - start:0.2f}s')
    return subject


@builder
@bn.persist(True)
def message(greeting, subject_slow):
    return f'{greeting} {subject_slow}!'


# Collect the per-subject messages into a single string.
@builder
@bn.persist(True)
@bn.gather(over='subject', also='message', into='gdf')
def all_messages(gdf):
    return ' '.join(gdf.message.tolist())


flow = builder.build()
# Two subject values -> two branches through subject_slow and message.
flow.setting('subject', values=['Earth', 'Mars']).get('all_messages')
```
This flow runs sequentially (the two `subject_slow` calls run one after the other rather than concurrently), even though every entity is marked as persisted. With `core__persist_by_default=True`, it runs in parallel as expected.
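You can check "sequential vs. parallel" without reading the scheduler by comparing wall-clock time with per-branch time. Overlapping branches take roughly as long as the slowest branch, while sequential branches take at least the sum of all branches. The following is a minimal sketch of that heuristic. `slow_branch`, `run`, and `looks_parallel` are hypothetical helpers. The threads only stand in for whatever bionic uses to execute branches; this says nothing about how bionic actually schedules work.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_branch(seconds: float) -> float:
    """Stand-in for subject_slow: sleep, then return the measured duration."""
    start = time.perf_counter()
    time.sleep(seconds)
    return time.perf_counter() - start


def looks_parallel(wall_time: float, branch_times: list[float]) -> bool:
    """Heuristic: overlapping branches finish closer to the longest branch
    than to the sum of all branches."""
    midpoint = (max(branch_times) + sum(branch_times)) / 2
    return wall_time < midpoint


def run(branches: list[float], parallel: bool) -> tuple[float, list[float]]:
    """Run the branches concurrently or one after another; return timings."""
    start = time.perf_counter()
    if parallel:
        with ThreadPoolExecutor(max_workers=len(branches)) as pool:
            times = list(pool.map(slow_branch, branches))
    else:
        times = [slow_branch(s) for s in branches]
    return time.perf_counter() - start, times


if __name__ == "__main__":
    for parallel in (False, True):
        wall, times = run([0.2, 0.2], parallel)
        print(f"parallel={parallel}: wall={wall:.2f}s, "
              f"looks_parallel={looks_parallel(wall, times)}")
```

Applied to the example, two 5-second `subject_slow` calls would take at least about 10 seconds if run sequentially and closer to 5 seconds if they overlap. The per-call timings can be read from the "ended slow execution" log lines.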