Multi-cluster Stream (Kafka/Kinesis) Druid Ingest Proposal
Description
I'm currently building support for ingesting from multiple Kafka clusters simultaneously in the same datasource/supervisor (e.g. have multiple consumer/broker pairs). This issue is for tracking this feature as well as design discussion.
Motivation
Ingesting from multiple Kafka clusters simultaneously is useful when data lives in multiple regions but Druid runs in a single region. Rather than paying the cost of mirroring the data into a region-local topic, this would allow tasks to do cross-region reads from multiple Kafka clusters simultaneously.
Proposal
- Decouple supervisor ID from datasource. This will allow for multiple supervisors to run concurrently and ingest data into the same datasource.
- Update any logic, APIs, and metrics outside of StreamSupervisor that rely on there being a 1:1 stream supervisor:datasource relationship.
- Add a new API to fetch all the supervisors related to a specific datasource.
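To make the third point concrete, a rough sketch of what such an endpoint could look like, modeled on the JAX-RS style of the existing SupervisorResource (the path, method name, and the SupervisorRegistry lookup interface below are all hypothetical, not a committed API; in practice this information would likely come from the SupervisorManager's in-memory state):

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

// Hypothetical lookup component standing in for the real supervisor state.
interface SupervisorRegistry
{
  Set<String> getAllSupervisorIds();

  String getDatasource(String supervisorId);
}

@Path("/druid/indexer/v1")
public class SupervisorLookupResource
{
  private final SupervisorRegistry registry;

  public SupervisorLookupResource(SupervisorRegistry registry)
  {
    this.registry = registry;
  }

  @GET
  @Path("/datasources/{dataSource}/supervisors")
  @Produces(MediaType.APPLICATION_JSON)
  public Response getSupervisorsForDatasource(@PathParam("dataSource") String dataSource)
  {
    // Filter all known supervisors down to those ingesting into this datasource.
    final List<String> ids = registry.getAllSupervisorIds()
                                     .stream()
                                     .filter(id -> dataSource.equals(registry.getDatasource(id)))
                                     .collect(Collectors.toList());
    return Response.ok(ids).build();
  }
}
```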
This could be an extension of the multiTopic functionality, I suppose: the different topics may be able to come from different Kafka clusters. Is that how you were thinking of doing it, or some other way?
This could be an extension of the multiTopic functionality, I suppose: the different topics may be able to come from different Kafka clusters.
Yeah. I just prefix the partition IDs with a unique cluster name; I'll raise a draft PR with my POC. My main concerns were avoiding noisy neighbors (a single task reading from multiple consumers ends up waiting on the slower, cross-region ones), compatibility with the current auto-scaling, and backwards compatibility with the existing supervisor spec. The POC should address these concerns.
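For concreteness, the general idea of cluster-prefixed partition IDs could look something like the sketch below (the "clusterName:partition" encoding and the class name are illustrative assumptions, not the actual POC code):

```java
// Sketch of a cluster-qualified partition id, assuming a simple
// "clusterName:partition" string encoding. Names are illustrative.
public final class ClusterPartitionId
{
  private static final char SEPARATOR = ':';

  private final String clusterName;
  private final int partition;

  public ClusterPartitionId(String clusterName, int partition)
  {
    this.clusterName = clusterName;
    this.partition = partition;
  }

  /** Serialized form used wherever a plain partition id string is expected. */
  @Override
  public String toString()
  {
    return clusterName + SEPARATOR + partition;
  }

  /** Parses e.g. "us-east-1:42" back into (cluster, partition). */
  public static ClusterPartitionId parse(String encoded)
  {
    final int idx = encoded.lastIndexOf(SEPARATOR);
    if (idx < 0) {
      throw new IllegalArgumentException("Not a cluster-qualified partition id: " + encoded);
    }
    return new ClusterPartitionId(
        encoded.substring(0, idx),
        Integer.parseInt(encoded.substring(idx + 1))
    );
  }
}
```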
+1 on this feature. @abhishekagarwal87 and I had this discussion a while back at https://github.com/apache/druid/pull/14424#issuecomment-1933738231. Looking forward to the PR!
I had a similar thought - I was wondering if it would be possible to have multiple supervisors ingest into the same Druid datasource. That way, we could scale independently, configure separate consumerProperties for different clusters, etc.
One current limitation is that the streaming supervisor’s ID is tied to the datasource name. If we could relax that constraint and make the supervisor ID unique (e.g., datasource + id) in a backwards-compatible way, I think this could potentially work.
I haven’t fully thought through the implications of this design; I’d imagine the real-time query logic would need some changes, but it’s something I thought might be worth considering as a design.
That way, we could scale independently, configure separate consumerProperties for different clusters, etc.
With a single supervisor, I think the latter requirement is still doable. The former is the main concern: the autoscaler might need to be adjusted to allow for scaling in unequal proportions between the different clusters (i.e. one region is lagging, so add N more tasks for that one region rather than N/M tasks per region).
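As a toy illustration of what "unequal proportions" could mean, a lag-weighted split might look like the following (purely a sketch of the idea; this is not how Druid's autoscaler works today, and all names are made up):

```java
import java.util.HashMap;
import java.util.Map;

// Toy sketch: split a total task count across clusters in proportion to
// each cluster's lag, instead of N/M tasks per cluster. Not Druid code.
public class LagWeightedSplit
{
  /** Assumes lagByCluster is non-empty; gives each cluster at least one task. */
  public static Map<String, Integer> allocate(Map<String, Long> lagByCluster, int totalTasks)
  {
    final long totalLag = lagByCluster.values().stream().mapToLong(Long::longValue).sum();
    final Map<String, Integer> tasks = new HashMap<>();
    int assigned = 0;
    for (Map.Entry<String, Long> entry : lagByCluster.entrySet()) {
      // Floor of the proportional share, with a minimum of one task per cluster.
      final int share = totalLag == 0
                        ? totalTasks / lagByCluster.size()
                        : (int) (totalTasks * entry.getValue() / totalLag);
      final int n = Math.max(1, share);
      tasks.put(entry.getKey(), n);
      assigned += n;
    }
    // Hand any rounding remainder to the most-lagged cluster.
    if (assigned < totalTasks) {
      final String laggard = lagByCluster.entrySet()
                                         .stream()
                                         .max(Map.Entry.comparingByValue())
                                         .get()
                                         .getKey();
      tasks.merge(laggard, totalTasks - assigned, Integer::sum);
    }
    return tasks;
  }
}
```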
Had a brief discussion with @jtuglu-netflix on this.
I agree with @abhishekrb19 that separate supervisors would be a preferable approach for this feature. As the feature evolves, we would feel the need to have cluster-specific configs, and keeping them in a single supervisor makes things more error-prone and difficult to debug. Separate supervisors would allow ingestion from the two sources to be completely independent of each other, except that they both ingest into the same sink (the datasource).
(This also makes me wonder why we didn't do this when we wanted to implement multi-topic Kafka ingestion. Perhaps reading from multiple topics in a single supervisor was more straightforward since a lot of the configs would be common. @abhishekagarwal87 would have more info on that.)
Having multiple supervisors appending to the same datasource is not inherently wrong and is in fact the preferred approach.
- We already support concurrent appends (the concurrent append and replace feature would come in handy here)
- We would need to remove the constraint of 1-to-1 mapping from supervisor ID to datasource with back compat as pointed out by @abhishekrb19
- Persistence of datasource metadata would have to be against supervisor ID instead of datasource name
- Realtime querying should probably just work as-is, or might need some minor tweaks
- Supervisor A would be completely agnostic of Supervisor B, and they would also write to different segment IDs (thanks to the way segment IDs are built using the taskAllocatorId)
Perhaps reading from multiple topics in a single supervisor was more straightforward.
Regarding this, I think that was a much simpler issue since it was still only reading from a single consumer (single region). The consumer can already natively handle reading from multi-topic partitions, so it was a matter of just plugging into that functionality.
The difference here is that we have N consumers, so the ingest lag, throughput values, etc. differ across each consumer. While the draft I raised above works well if task counts are inflated enough, with the current task-scaling scheme you can run into asymmetrical lag when you have a laggard region. Scaling up in this case doesn't do enough to split the load across all regions, so you run into scaling loops. Adding some sort of per-cluster task count just sounds like adding a new supervisor.
It was indeed simpler to extend the single supervisor to read from multiple topics in the same cluster. You also get the benefit of auto-discovering new topics against a regex and don't have to make any changes to supervisor for ingestion to continue.
With a multi-supervisor approach, you would need to add one every time a new topic is added. That's OK when topics sit in different Kafka clusters, since you would need to pass on Kafka cluster settings for the new topic anyway.
IMO a multi-supervisor approach does seem cleaner than stuffing various topic-related settings into the same supervisor. You can also choose to write a different supervisor altogether that knows how to consume from topics across multiple clusters, reusing what code you can. The task logic will remain the same, but the supervisor will be different.
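For reference, the regex-based topic discovery mentioned above leans on the Kafka consumer's native pattern subscription, along these lines (standard kafka-clients usage; the broker address and topic pattern are made up):

```java
import java.time.Duration;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PatternSubscribeExample
{
  public static void main(String[] args)
  {
    final Properties props = new Properties();
    props.put("bootstrap.servers", "broker:9092"); // made-up address
    props.put("group.id", "example-group");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      // Topics matching the pattern are picked up automatically as they are
      // created, which is what enables auto-discovery against a regex.
      consumer.subscribe(Pattern.compile("metrics-.*"));
      consumer.poll(Duration.ofSeconds(1));
    }
  }
}
```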
It was indeed simpler to extend the single supervisor to read from multiple topics in the same cluster. You also get the benefit of auto-discovering new topics against a regex and don't have to make any changes to supervisor for ingestion to continue.
Yeah, the regex is a neat trick!
You can also choose to write a different supervisor altogether that knows how to consume from topics across multiple clusters, reusing what code you can. The task logic will remain the same, but the supervisor will be different.
I suppose a composite supervisor (internally made up of one or more Kafka supervisors) could be doable. But it would be fairly complicated to orchestrate this supervisor and would lead to the same complications down the line as a single Kafka supervisor would, if it were to read from multiple clusters.
With the multi-topic feature already in place, I think the recommended approach should be "one supervisor per Kafka cluster, regardless of the number of topics".
I wasn't imagining that the composite supervisor itself has multiple supervisors. It's just a separate supervisor that directly manages various Kafka tasks that could be consuming data from different clusters. I don't imagine it to be a generic composite supervisor.
Ah, I see, thanks for the clarification, @abhishekagarwal87.
But we would still need to have configs for multiple clusters in the same (new) supervisor. I feel that the challenges with that would remain the same.
The one clear benefit would be that we do not modify any of the existing code.
IMO, conceptually, there should be no problem having multiple supervisors per datasource. Nothing about the design of supervisors should limit that. Certain APIs do assume the supervisor ID and datasource name are the same, but this limitation could be unraveled. I think this is preferable to a composite supervisor, so each one could be managed separately.
Identifying supervisors needs to be reworked a bit. IMO, each supervisor should still be associated with a specific datasource (i.e. there should be a getDataSource method on the supervisor). To differentiate the supervisors I think we should have a "supervisor name" that is logically namespaced within the datasource. The unique key for a supervisor would be the pair of dataSource + supervisorName. In particular, two supervisors for different datasources should be able to have the same name.
IMO, we should avoid having an "ID" for supervisors that is a single string. This creates possibilities for difficult-to-resolve conflicts. For example: just from looking at the string, it's ambiguous whether foo_bar_baz is the supervisor baz for datasource foo_bar, or the supervisor bar_baz for datasource foo. This suggests we should change the getId method on SupervisorSpec to return a SupervisorId (which itself contains two Strings, one for the datasource and one for the name) rather than returning a String. For compatibility with currently-existing supervisors, we should allow a String to be read into a SupervisorId, creating one where the string is used as the datasource name and the supervisor name is set according to some convention (e.g. set it to the same as the datasource). We'd need to include a database migration to make this work.
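A minimal sketch of what such a SupervisorId could look like, including the legacy-string path described above (the field names and the default-name convention are assumptions, not a settled design):

```java
import java.util.Objects;

// Minimal sketch of the proposed SupervisorId: the unique key is the
// (dataSource, name) pair rather than a single string.
public final class SupervisorId
{
  private final String dataSource;
  private final String name;

  public SupervisorId(String dataSource, String name)
  {
    this.dataSource = dataSource;
    this.name = name;
  }

  /**
   * Back-compat: a bare string from a pre-existing supervisor is read as
   * the datasource name, with the supervisor name defaulting to the same
   * value (one possible convention).
   */
  public static SupervisorId fromLegacyString(String legacyId)
  {
    return new SupervisorId(legacyId, legacyId);
  }

  public String getDataSource()
  {
    return dataSource;
  }

  public String getName()
  {
    return name;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (!(o instanceof SupervisorId)) {
      return false;
    }
    final SupervisorId that = (SupervisorId) o;
    return dataSource.equals(that.dataSource) && name.equals(that.name);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(dataSource, name);
  }
}
```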
A database migration may also be needed to store multiple metadata rows (one per supervisor, not one per datasource).
That makes sense, @gianm . I agree that separate supervisors are much more easily maintainable than a composite supervisor.
Although, there is one concern with making the SupervisorId a POJO rather than a plain String. All the GET APIs in SupervisorResource use the supervisor ID as a path parameter. So, I suppose we would need to expose new APIs for all of these operations. These new APIs could either be GETs with two path params, one for the datasource and one for the supervisor name, or POSTs that accept a SupervisorId in the payload. While the POST approach would be more extensible in the future, it would be weird to have to make POST calls for even simple get requests.
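To make the GET variant concrete, it could look something like the following hypothetical addition (the path and names are illustrative only, and it reuses the SupervisorId sketch above):

```java
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

@Path("/druid/indexer/v1")
public class SupervisorStatusResource
{
  @GET
  @Path("/datasources/{dataSource}/supervisors/{supervisorName}/status")
  @Produces(MediaType.APPLICATION_JSON)
  public Response getSupervisorStatus(
      @PathParam("dataSource") String dataSource,
      @PathParam("supervisorName") String supervisorName
  )
  {
    // The two path params together form the SupervisorId discussed above.
    final SupervisorId id = new SupervisorId(dataSource, supervisorName);
    // ... look up the supervisor by id and return its actual status ...
    return Response.ok(id.getDataSource() + "/" + id.getName()).build();
  }
}
```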
As an alternative, how do you feel about using a UUID for the supervisor ID? Having a unique String supervisor ID allows for easy identification and has other conveniences like the GET APIs mentioned above. It also becomes completely independent of the datasource name, so we are not bound in the future to any relationship between a supervisor and a datasource. We could potentially have supervisors in the future that are able to operate on multiple datasources.
DB migration also becomes much simpler if we start using a UUID. We get to keep all the niceties of a unique supervisor ID, so the druid_supervisors table doesn't need to change. The druid_dataSource table would just need a new supervisor_id column, and any metadata entry would need to specify a value for either the dataSource column or the supervisor_id column. If both are specified, we could just honor the value of the supervisor_id column since it is newer.
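That precedence rule is simple enough to state as code (column and method names here are made up for illustration):

```java
// Toy helper for the precedence rule above: honor the newer supervisor_id
// column when both it and dataSource are populated in a metadata row.
public class MetadataKeyResolver
{
  public static String resolveKey(String dataSourceColumn, String supervisorIdColumn)
  {
    return supervisorIdColumn != null ? supervisorIdColumn : dataSourceColumn;
  }
}
```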
@kfaraz I'm slightly against supervisor UUIDs since that might make things less readable on the console (users would have to look at the spec to get an idea of what topic, etc. it's pulling from). My thought is we keep the same supervisor ID scheme and just validate as we normally do via the ID uniqueness constraint in the druid_supervisors table.
I do see the value in having a unique supervisor ID, but for the time being, as we require a 1:N relationship between datasource:supervisor, I feel like this isn't solving much for us. If the time comes when that relationship does become M:N, we can switch to UUIDs then.
I agree that druid_dataSource would need a supervisor_id column. As for APIs, I feel like keeping them the same for now is fine. The only new addition might be getting the full list of supervisors for a given datasource, which can be done by pulling the list of supervisors from druid_supervisors and filtering on datasource outside the query.
I'm slightly against supervisor UUIDs since that might make things less readable on the console (users would have to look at the spec to get an idea of what topic, etc. it's pulling from).
@jtuglu-netflix, that is easily remedied since the web-console is powered by the sys.supervisors table, which itself reads from an Overlord API. So, it would be fairly straightforward to add a datasource column to the web-console. We would need to add this column anyway, regardless of the approach we take, since supervisor IDs can now deviate from the datasource name.
My thought is we keep the same supervisor ID scheme and just validate as we normally do via the ID uniqueness constraint in the druid_supervisors table.
Do you mean something like supervisor 1 with id = user-given-id-1 and supervisor 2 with id = user-given-id-2, both of which can write to the same datasource? That would work too. In fact, that is what I had in mind when I mentioned removing the 1:1 relationship between datasource and supervisor in this older comment.
Optional: We might also want to consider adding some more validations in the same vein as #17955 so that users don't accidentally update a supervisor when they actually intended to create a new supervisor to write to the same datasource but read from a different topic/cluster.
I agree that druid_dataSource would need a supervisor_id column. As for APIs, I feel like keeping them the same for now is fine. The only new addition might be getting the full list of supervisors for a given datasource, which can be done by pulling the list of supervisors from druid_supervisors and filtering on datasource outside the query.
Sounds good. This new API need not read from the metadata store though; the SupervisorManager should already have all the necessary info cached in memory. Also, if we make the modifications to the sys.supervisors table suggested above, we probably wouldn't need this API.
Going to keep this open for now.