
feat: edfs refactor kafka and nats as datasource in router

Open · alepane21 opened this issue 8 months ago · 1 comment

Motivation and Context

Today, changing or adding an EDFS provider requires coordinated changes to both cosmo and the engine, which makes it harder than it should be. The main goal of this PR is to make it easier to add future EDFS providers.

To that end, I moved all the pubsub code from the engine into the pubsub package in the router and refactored it to fit the new structure.

There is a README that will help with implementing new EDFS providers (manually or with AI assistance).

Important points

  • I added a test verifying that Kafka publish works; nothing tested it before.
  • Connection operations (connecting, closing connections) behave as before.
  • Switching from a previous version to the new one should just work, without any change to the configuration or schema.

Design

Goals

  1. Plug‑n‑play providers – adding Kafka, NATS, etc. must not touch core engine code.

  2. Provider freedom – each provider may implement protocol‑specific quirks without affecting others.


Current pain point

Engine and Router share provider logic; every new provider requires coordinated changes in both repositories.


Architecture

The refactor introduces four interfaces:

| Interface | Responsibility |
| --- | --- |
| `ProviderFactory` | Creates a concrete `PubSubProvider` given a `DataSourceConfiguration` plus the router event config. |
| `PubSubProvider` | Owns provider‑wide resources (e.g. the NATS connection pool). |
| `PubSubDataSource` | Implements `ResolveDataSource` and `SubscribeDataSource` for one GraphQL field. |
| `Adapter` | Thin, protocol‑specific driver that actually publishes / subscribes. |

Execution flow

When the router’s Loader (router/core/factoryresolver.go) encounters a PubSub node, it:

  1. Iterates over every registered ProviderFactory, passing each factory

    • the node’s DataSourceConfiguration, and

    • the router’s event‑configuration block.

      Each factory creates (or retrieves) its own PubSubProvider instance as needed.

  2. Builds a new datasource.Factory, injecting the full slice of initialised providers.

    This factory implements Planner(), returning a datasource.Planner that keeps the list of PubSubProviders.

  3. At execution time the engine calls the planner for each GraphQL field.

    The planner scans its providers and picks the matching PubSubDataSource—e.g. a NatsDataSource for a field annotated with @edfs_natsSubscribe.

    The data source already contains a pre‑configured Adapter.

  4. The chosen PubSubDataSource supplies the engine with both resolve.DataSource and resolve.SubscribeDataSource, always propagating the Adapter so downstream code can read any value from the event configuration before invoking it.

  5. Finally, the Adapter hands off the actual publish / subscribe work to the appropriate external driver (Kafka, NATS, etc.).


Lifecycle notes

  • A single PubSubProvider instance lives as long as the graphServer, opening connections when the server starts and shutting them down when the router is stopped;

  • The Adapter hides third‑party SDKs; swap it to migrate from one client library to another.


Outcome

With this structure, adding Redis now means:

  1. Implement redis.ProviderFactory, redis.Provider, redis.DataSource, redis.Adapter.

  2. Register the factory once.

  3. Implement the required changes on the composition side.

No engine or router code changes → Goal 1 satisfied.

Provider can decide batching, ACK strategy, etc. internally → Goal 2 satisfied.

Checklist

  • [x] I have discussed my proposed changes in an issue and have received approval to proceed.
  • [x] I have followed the coding standards of the project.
  • [x] Tests or benchmarks have been added or updated.
  • [x] Documentation has been updated on https://github.com/wundergraph/cosmo-docs.
  • [x] I have read the Contributors Guide.

alepane21 avatar Mar 18 '25 15:03 alepane21

Router image scan passed

:white_check_mark: No security vulnerabilities found in image:

ghcr.io/wundergraph/cosmo/router:sha-8138e124a9aefb7e5e2eedf3287b2b7917117b4a

github-actions[bot] avatar Mar 24 '25 17:03 github-actions[bot]

The most questionable thing is that we are still looking up all providers/datasources at planning time.

I would prefer to have a single datasource per event or per provider id.

Fixed in https://github.com/wundergraph/cosmo/pull/1848

alepane21 avatar May 27 '25 08:05 alepane21