cosmo
cosmo copied to clipboard
feat: edfs refactor kafka and nats as datasource in router
Motivation and Context
Today changing or adding and EDFS provider requires changes to cosmo and the engine, making it harder than it should. The main point of this PR is to make it easier to add future EDFS providers.
So I moved all the pubsub code from the engine to the pubsub package in the router and refactored it to make more sense with the new structure.
There is a README that will help implementing new EDFS providers (manually or using AI).
Important points
- I added a test to verify that kafka publish is working, there was nothing testing it before;
- connections operations (connection, closing connection) should happen as before;
- switching from a previous version to the new one should just works, without any change to the configuration or schema.
Design
Goals
Plug‑n‑play providers – adding Kafka, NATS, etc. must not touch core engine code.
Provider freedom – each provider may implement protocol‑specific quirks without affecting others.
Current pain point
Engine and Router share provider logic; every new provider requires coordinated changes in both repositories.
Architecture
The refactor introduces four interfaces:
| Interface | Responsibility |
|---|---|
| ProviderFactory | Creates a concrete PubSubProvider given a DataSourceConfiguration + router event config. |
| PubSubProvider | Owns provider‑wide resources (e.g. NATS connection pool). |
| PubSubDataSource | Implements ResolveDataSource + SubscribeDataSource for one GraphQL field. |
| Adapter | Thin, protocol‑specific driver that actually publishes / subscribes. |
Execution flow
When the router’s Loader (router/core/factoryresolver.go) encounters a PubSub node, it:
Iterates over every registered ProviderFactory, passing each factory
the node’s DataSourceConfiguration, and
the router’s event‑configuration block.
Each factory creates (or retrieves) its own PubSubProvider instance as needed.
Builds a new datasource.Factory, injecting the full slice of initialised providers.
This factory implements Planner(), returning a datasource.Planner that keeps the list of PubSubProviders.
At execution time the engine calls the planner for each GraphQL field.
The planner scans its providers and picks the matching PubSubDataSource—e.g. a NatsDataSource for a field annotated with @edfs_natsSubscribe.
The data source already contains a pre‑configured Adapter.
The chosen PubSubDataSource supplies the engine with both resolve.DataSource and resolve.SubscribeDataSource, always propagating the Adapter so downstream code can read any value from the event configuration before invoking it.
Finally, the Adapter hands off the actual publish / subscribe work to the appropriate external driver (Kafka, NATS, etc.).
Lifecycle notes
A single PubSubProvider instance lives as long as the graphServer, opening connections when is started, and shutting them down when the router is stopped;
The Adapter hides third‑party SDKs; swap it to migrate from one client library to another.
Outcome
So adding Redis now means:
Implement redis.ProviderFactory, redis.Provider, redis.DataSource, redis.Adapter.
Register the factory once.
Implements the changes to the composition side of the thing
No engine or router code changes → Goal 1 satisfied.
Provider can decide batching, ACK strategy, etc. internally → Goal 2 satisfied.
Checklist
- [x] I have discussed my proposed changes in an issue and have received approval to proceed.
- [x] I have followed the coding standards of the project.
- [x] Tests or benchmarks have been added or updated.
- [x] Documentation has been updated on https://github.com/wundergraph/cosmo-docs.
- [x] I have read the Contributors Guide.
Router image scan passed
:white_check_mark: No security vulnerabilities found in image:
ghcr.io/wundergraph/cosmo/router:sha-8138e124a9aefb7e5e2eedf3287b2b7917117b4a
The most questionable thing is that we are still looking for all providers/datasources during planning time
I would prefer to have a single datasource per event or per single provider id
Fixed in https://github.com/wundergraph/cosmo/pull/1848