Optimize WorkflowChannel PubSub with workflow-specific topics

Open stuartc opened this issue 5 months ago • 0 comments

Details

The WorkflowChannel currently subscribes to project-level work order events (WorkOrders.subscribe(project_id)), so every socket receives all work order and run events for the entire project and then has to filter them down to the one workflow it is showing.

Current implementation (workflow_channel.ex:64):

WorkOrders.subscribe(project.id)  # Gets ALL project events

This causes performance issues:

  1. WorkOrder events - Filtered in handle_info (lines 349, 366):

    if wo.workflow_id == socket.assigns.workflow_id do
      # Push update
    end
    
  2. Run events - Require database queries to filter (lines 385, 409):

    case WorkOrders.get(run.work_order_id, include: [:workflow]) do
      %{workflow_id: workflow_id} when workflow_id == socket.assigns.workflow_id ->
        # Push update
      _ ->
        :ok
    end
    

Scale problem:

  • Project with 10 workflows
  • 20 users editing different workflows
  • Work order created in workflow A
  • Result: all 20 sockets receive the event, and 19 discard it after checking or querying (assuming only one of them has workflow A open)

Even worse for runs: Every run event requires a database query per socket just to check if it should be ignored.
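To put numbers on the scenario above, here is a rough sketch of the per-event cost under each subscription scheme. The module name FanoutSketch, its functions, and the way editors are spread across workflows are illustrative assumptions, not part of Lightning. The only inputs taken from this issue are the 10 workflows, the 20 sockets, and the one-query-per-socket cost of filtering a run event.

```elixir
defmodule FanoutSketch do
  # `editors` maps a workflow name to the number of open sockets on it.

  # Project-level topic: every socket in the project gets the event, and
  # for run events each socket runs one query to decide whether to keep it.
  def project_level(editors, target_workflow) do
    total = editors |> Map.values() |> Enum.sum()
    relevant = Map.get(editors, target_workflow, 0)
    %{delivered: total, discarded: total - relevant, run_filter_queries: total}
  end

  # Workflow-level topic: only sockets on the target workflow get the event,
  # so nothing is discarded and no filtering query is needed.
  def workflow_level(editors, target_workflow) do
    relevant = Map.get(editors, target_workflow, 0)
    %{delivered: relevant, discarded: 0, run_filter_queries: 0}
  end
end

# Hypothetical distribution: 10 workflows, 20 sockets, one socket on "A".
editors = Map.merge(%{"A" => 1, "B" => 3}, Map.new(~w(C D E F G H I J), &{&1, 2}))

FanoutSketch.project_level(editors, "A")
FanoutSketch.workflow_level(editors, "A")
```

With this distribution, a run event in workflow A costs 20 deliveries, 19 discards and 20 filtering queries on the project-level topic, against 1 delivery and no queries on a workflow-level topic.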

Implementation notes

Recommended approach: Multi-level topic broadcasting

The current implementation subscribes WorkflowChannel to project-level topics, then discards most of what it receives (about 90% in a project with 10 equally active workflows). This violates the principle: "if we're listening for it, we probably want it."

Current subscribers and their needs:

  1. WorkListener - Subscribes to "all_events", needs ALL runs globally
  2. RunLive.Index - Subscribes to "project:#{project_id}", needs ALL events for all workflows in project
  3. WorkflowChannel - Subscribes to "project:#{project_id}", needs ONLY events for ONE workflow (currently filters!)

Solution: Broadcast to multiple topic levels

Add workflow-specific topics and broadcast events to all relevant levels:

# workorders/events.ex - Add workflow topic function
# project_id is unused in the topic name, so it is underscored to avoid a compiler warning
defp topic(_project_id, workflow_id), do: "workflow:#{workflow_id}"

# Broadcast work order events to 2 levels
# WorkOrder has workflow_id field, so we can extract it
def work_order_created(project_id, work_order) do
  event = %WorkOrderCreated{work_order: work_order, project_id: project_id}
  Lightning.broadcast(topic(project_id), event)                        # RunLive.Index
  Lightning.broadcast(topic(project_id, work_order.workflow_id), event)  # WorkflowChannel
end

# Broadcast run events to 3 levels
# Run doesn't have workflow_id field, so must be passed as parameter
def run_created(project_id, workflow_id, run) do
  event = %RunCreated{run: run, project_id: project_id}
  Lightning.broadcast(topic(), event)                      # WorkListener (global)
  Lightning.broadcast(topic(project_id), event)            # RunLive.Index (project)
  Lightning.broadcast(topic(project_id, workflow_id), event)  # WorkflowChannel (workflow)
end

Update WorkflowChannel subscription:

# workflow_channel.ex:64 - Subscribe to workflow-specific topic
WorkOrders.subscribe(project.id, workflow.id)

# No filtering needed - all received events are relevant!
def handle_info(%WorkOrders.Events.RunCreated{run: run}, socket) do
  formatted_run = format_run_for_history(run)
  push(socket, "history_updated", %{...})
  {:noreply, socket}
end
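The behaviour of the three topic levels can be tried in isolation with the sketch below. It is not Lightning code: it assumes a stdlib Registry with duplicate keys as a stand-in for Lightning's PubSub, uses a plain tuple in place of the %RunCreated{} struct, and the module name TopicSketch is made up for illustration. Only the topic strings and the shape of run_created/3 and subscribe/2 follow the proposal above.

```elixir
defmodule TopicSketch do
  # Stand-in for Lightning's PubSub layer (assumption): a stdlib Registry
  # with duplicate keys, where each key is a topic string.
  @registry TopicSketch.Registry

  def start_link, do: Registry.start_link(keys: :duplicate, name: @registry)

  # Topic levels named in this issue.
  defp topic, do: "all_events"
  defp topic(project_id), do: "project:#{project_id}"
  defp topic(_project_id, workflow_id), do: "workflow:#{workflow_id}"

  # Project-level subscription (what RunLive.Index keeps using).
  def subscribe(project_id), do: register(topic(project_id))

  # Workflow-level subscription (what WorkflowChannel would switch to).
  def subscribe(project_id, workflow_id),
    do: register(topic(project_id, workflow_id))

  # Broadcast a run event to the global, project and workflow topics.
  # The tuple is a simplified stand-in for the %RunCreated{} struct.
  def run_created(project_id, workflow_id, run) do
    event = {:run_created, project_id, run}

    for key <- [topic(), topic(project_id), topic(project_id, workflow_id)] do
      broadcast(key, event)
    end

    :ok
  end

  # Register the calling process under the given topic.
  defp register(key), do: Registry.register(@registry, key, [])

  # Send the event to every process registered under the topic.
  defp broadcast(key, event) do
    Registry.dispatch(@registry, key, fn entries ->
      for {pid, _value} <- entries, do: send(pid, event)
    end)
  end
end
```

After TopicSketch.start_link() and TopicSketch.subscribe("p1", "wf-a"), a call to TopicSketch.run_created("p1", "wf-b", run) leaves the subscriber's mailbox empty, while TopicSketch.run_created("p1", "wf-a", run) delivers exactly one message. A process that called TopicSketch.subscribe("p1") instead would receive both, which is the behaviour RunLive.Index relies on.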

Benefits:

  • No filtering logic in WorkflowChannel
  • No database queries to check relevance
  • Each subscriber only receives events it cares about
  • Follows "subscribe to what you need" principle

Related code

Event broadcasting functions to update:

  • lib/lightning/workorders/events.ex:24-48 - Add workflow_id param to the run event functions (work order event functions read it from the work order), broadcast to multiple topics
  • lib/lightning/workorders/events.ex:55-57 - Add subscribe(project_id, workflow_id) function

Callers to update:

  • lib/lightning/work_orders.ex:137-138 - work_order_created + run_created (workorder has workflow_id field)
  • lib/lightning/work_orders.ex:367-368 - work_order_updated + run_created (workorder has workflow_id field)
  • lib/lightning/work_orders.ex:733-736 - work_order_updated + run_created (workorder has workflow_id field)
  • lib/lightning/runs/handlers.ex:152 - run_updated (run.workflow preloaded, use run.workflow.id)

Note: WorkOrder structs have workflow_id as a field (via belongs_to :workflow), so event functions can extract it directly from the work_order without needing it passed separately.
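The two ways of obtaining a workflow id described above could be captured in one small helper, sketched below. The module and function names (WorkflowIdSketch, fetch_workflow_id/1) are hypothetical, and plain maps stand in for the WorkOrder and Run structs. With real Ecto structs, a run whose workflow association has not been preloaded would not match the second clause and would fall through to :error.

```elixir
defmodule WorkflowIdSketch do
  # Work orders carry workflow_id as a field, so read it directly.
  def fetch_workflow_id(%{workflow_id: id}) when is_binary(id), do: {:ok, id}

  # Runs have no workflow_id field; use the preloaded workflow instead.
  def fetch_workflow_id(%{workflow: %{id: id}}) when is_binary(id), do: {:ok, id}

  # Anything else (for example a run without its workflow preloaded).
  def fetch_workflow_id(_other), do: :error
end

# Map stand-ins for a work order and a run with its workflow preloaded.
WorkflowIdSketch.fetch_workflow_id(%{id: "wo-1", workflow_id: "wf-a"})
WorkflowIdSketch.fetch_workflow_id(%{id: "run-1", workflow: %{id: "wf-b"}})
```

Returning :error rather than raising lets a caller decide whether a missing preload should skip the workflow-level broadcast or be treated as a bug.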

Subscription to update:

  • lib/lightning_web/channels/workflow_channel.ex:64 - Subscribe to workflow-specific topic

Filtering to remove:

  • lib/lightning_web/channels/workflow_channel.ex:349,366 - WorkOrder workflow_id checks
  • lib/lightning_web/channels/workflow_channel.ex:379-401 - RunCreated with DB query
  • lib/lightning_web/channels/workflow_channel.ex:404-425 - RunUpdated with DB query

Release notes

Optimized collaborative editor real-time updates to reduce database queries and event processing overhead when multiple workflows are being edited simultaneously.

User acceptance criteria

  • [ ] Workflow-specific PubSub topics added: "workflow:#{workflow_id}"
  • [ ] Run event functions accept a workflow_id parameter, work order event functions read it from the work order, and both broadcast to multiple topics
  • [ ] All callers pass workflow_id (4 locations in work_orders.ex and runs/handlers.ex)
  • [ ] WorkflowChannel subscribes to workflow-specific topic, not project-level
  • [ ] WorkflowChannel has NO filtering logic (all received events are relevant)
  • [ ] WorkflowChannel has NO database queries in handle_info for filtering
  • [ ] RunLive.Index still receives all project-wide events (unchanged behavior)
  • [ ] WorkListener still receives all global run events (unchanged behavior)
  • [ ] Existing history panel functionality works identically
  • [ ] Tests updated for new subscription patterns
  • [ ] Manual test: editing multiple workflows simultaneously produces no N+1 queries

stuartc • Oct 30 '25 06:10