Optimize WorkflowChannel PubSub with workflow-specific topics
Details
The WorkflowChannel currently subscribes to project-level work order events (WorkOrders.subscribe(project_id)), which means every socket receives ALL work order and run events for the entire project, then filters them down to only the relevant workflow.
Current implementation (workflow_channel.ex:64):
WorkOrders.subscribe(project.id) # Gets ALL project events
This causes performance issues:
-
WorkOrder events - Filtered in handle_info (lines 349, 366):
if wo.workflow_id == socket.assigns.workflow_id do # Push update end -
Run events - Require database queries to filter (lines 385, 409):
case WorkOrders.get(run.work_order_id, include: [:workflow]) do %{workflow_id: workflow_id} when workflow_id == socket.assigns.workflow_id -> # Push update _ -> :ok end
Scale problem:
- Project with 10 workflows
- 20 users editing different workflows
- Work order created in workflow A
- Result: ALL 20 sockets receive event, 19 discard it after checking/querying
Even worse for runs: Every run event requires a database query per socket just to check if it should be ignored.
Implementation notes
Recommended approach: Multi-level topic broadcasting
The current implementation subscribes WorkflowChannel to project-level topics, then filters out 90% of events. This violates the principle: "if we're listening for it, we probably want it."
Current subscribers and their needs:
-
WorkListener - Subscribes to
"all_events", needs ALL runs globally -
RunLive.Index - Subscribes to
"project:#{project_id}", needs ALL events for all workflows in project -
WorkflowChannel - Subscribes to
"project:#{project_id}", needs ONLY events for ONE workflow (currently filters!)
Solution: Broadcast to multiple topic levels
Add workflow-specific topics and broadcast events to all relevant levels:
# workorders/events.ex - Add workflow topic function
defp topic(project_id, workflow_id), do: "workflow:#{workflow_id}"
# Broadcast work order events to 2 levels
# WorkOrder has workflow_id field, so we can extract it
def work_order_created(project_id, work_order) do
event = %WorkOrderCreated{work_order: work_order, project_id: project_id}
Lightning.broadcast(topic(project_id), event) # RunLive.Index
Lightning.broadcast(topic(project_id, work_order.workflow_id), event) # WorkflowChannel
end
# Broadcast run events to 3 levels
# Run doesn't have workflow_id field, so must be passed as parameter
def run_created(project_id, workflow_id, run) do
event = %RunCreated{run: run, project_id: project_id}
Lightning.broadcast(topic(), event) # WorkListener (global)
Lightning.broadcast(topic(project_id), event) # RunLive.Index (project)
Lightning.broadcast(topic(project_id, workflow_id), event) # WorkflowChannel (workflow)
end
Update WorkflowChannel subscription:
# workflow_channel.ex:64 - Subscribe to workflow-specific topic
WorkOrders.subscribe(project.id, workflow.id)
# No filtering needed - all received events are relevant!
def handle_info(%WorkOrders.Events.RunCreated{run: run}, socket) do
formatted_run = format_run_for_history(run)
push(socket, "history_updated", %{...})
{:noreply, socket}
end
Benefits:
- No filtering logic in WorkflowChannel
- No database queries to check relevance
- Each subscriber only receives events it cares about
- Follows "subscribe to what you need" principle
Related code
Event broadcasting functions to update:
-
lib/lightning/workorders/events.ex:24-48- Add workflow_id param, broadcast to multiple topics -
lib/lightning/workorders/events.ex:55-57- Add subscribe(project_id, workflow_id) function
Callers to update:
-
lib/lightning/work_orders.ex:137-138- work_order_created + run_created (workorder has workflow_id field) -
lib/lightning/work_orders.ex:367-368- work_order_updated + run_created (workorder has workflow_id field) -
lib/lightning/work_orders.ex:733-736- work_order_updated + run_created (workorder has workflow_id field) -
lib/lightning/runs/handlers.ex:152- run_updated (run.workflow preloaded, use run.workflow.id)
Note: WorkOrder structs have workflow_id as a field (via belongs_to :workflow), so event functions can extract it directly from the work_order without needing it passed separately.
Subscription to update:
-
lib/lightning_web/channels/workflow_channel.ex:64- Subscribe to workflow-specific topic
Filtering to remove:
-
lib/lightning_web/channels/workflow_channel.ex:349,366- WorkOrder workflow_id checks -
lib/lightning_web/channels/workflow_channel.ex:379-401- RunCreated with DB query -
lib/lightning_web/channels/workflow_channel.ex:404-425- RunUpdated with DB query
Release notes
Optimized collaborative editor real-time updates to reduce database queries and event processing overhead when multiple workflows are being edited simultaneously.
User acceptance criteria
- [ ] Workflow-specific PubSub topics added:
"workflow:#{workflow_id}" - [ ] Event functions accept workflow_id parameter and broadcast to multiple topics
- [ ] All callers pass workflow_id (4 locations in work_orders.ex and runs/handlers.ex)
- [ ] WorkflowChannel subscribes to workflow-specific topic, not project-level
- [ ] WorkflowChannel has NO filtering logic (all received events are relevant)
- [ ] WorkflowChannel has NO database queries in handle_info for filtering
- [ ] RunLive.Index still receives all project-wide events (unchanged behavior)
- [ ] WorkListener still receives all global run events (unchanged behavior)
- [ ] Existing history panel functionality works identically
- [ ] Tests updated for new subscription patterns
- [ ] Manual test: Multiple workflows editing simultaneously shows no N+1 queries