spring-cloud-dataflow icon indicating copy to clipboard operation
spring-cloud-dataflow copied to clipboard

Consuming from multiple topics using single SCDF stream

Open ramit0407 opened this issue 1 year ago • 3 comments

While creating a stream, we are allowed to give only a single topic. To override the destination, we use the spring.cloud.stream.bindings.input.destination property to set multiple Kafka topics. However, when attempting to do so, the following error message occurs, even though the streams still get deployed successfully.

Error Message: 115E:(pos 5): unexpected data in stream definition ',' :trip,duty > trip-office --management.endpoints.web.exposure.include=health,info,bindings,prometheus,metrics --management.metrics.export.prometheus.enabled=true --management.prometheus.metrics.export.rsocket.port=7001 --management.metrics.tags.instance.index=${vcap.application.instance_index:${spring.cloud.stream.instanceIndex:0}} --spring.cloud.stream.kafka.binder.configuration.max.poll.records=22 --wavefront.application.service=${spring.cloud.dataflow.stream.app.label:unknown}-${spring.cloud.dataflow.stream.app.type:unknown}-${vcap.application.instance_index:${spring.cloud.stream.instanceIndex:0}} --spring.cloud.stream.instanceCount=1 --management.metrics.tags.application.guid=${spring.cloud.application.guid:unknown} --management.metrics.tags.application.name=${vcap.application.application_name:${spring.cloud.dataflow.stream.app.label:unknown}} --management.metrics.export.prometheus.rsocket.host=spring-cloud-dataflow-prometheus-proxy --management.prometheus.metrics.export.rsocket.enabled=true --wavefront.application.name=${spring.cloud.dataflow.stream.name:unknown} --management.prometheus.metrics.export.enabled=true --management.metrics.tags.application.type=${spring.cloud.dataflow.stream.app.type:unknown} --management.metrics.tags.stream.name=${spring.cloud.dataflow.stream.name:unknown} --management.prometheus.metrics.export.rsocket.host=spring-cloud-dataflow-prometheus-proxy --management.metrics.tags.application=${spring.cloud.dataflow.stream.name:unknown}-${spring.cloud.dataflow.stream.app.label:unknown}-${spring.cloud.dataflow.stream.app.type:unknown} --management.metrics.export.prometheus.rsocket.port=7001 --management.metrics.export.prometheus.rsocket.enabled=true ^

Steps to Reproduce: Create a stream using Spring Cloud Data Flow with a single topic. Override the destination with the spring.cloud.stream.bindings.input.destination property to specify multiple Kafka topics, e.g., trip,duty. Deploy the stream.

Spring Cloud Data Flow Version: 3.2.5

ramit0407 avatar Oct 19 '24 05:10 ramit0407

@cppwfs

ramit0407 avatar Oct 19 '24 08:10 ramit0407

Create streams for each topic using kafa-source and send to tap kafa > :MY_TAP Then create a stream that uses tap to start your stream. My later comment provides a solution.

corneil avatar Oct 19 '24 09:10 corneil

But doing so, I will have to deploy 3 times if we have 2 topics. I want to do this in one deployment.

ramit0407 avatar Oct 19 '24 09:10 ramit0407

Hello @ramit0407 , This will be a feature add to SCDF. I'll add it to our backlog.

cppwfs avatar Oct 23 '24 11:10 cppwfs

@ramit0407

I you use kafka-source-kafka you will be able to specify multiple topics that already exist using property kafka.supplier.topics

You will need to register using Applications -> Add Applications -> from properties file

source.kafka.uri=springcloudstream/kafka-source-kafka:5.0.0
source.kafka.bootVersion=3
source.kafka.metadata=maven://org.springframework.cloud.stream.app:kafka-source-kafka:jar:metadata:5.0.0

Assuming your DSL was:

:trip,duty > x | y

It will now be: kafka --kafka.supplier.topics=trip,duty | x | y

corneil avatar Oct 23 '24 13:10 corneil

Closing this issue due to inactivity. If this has been closed in error please leave a comment letting us know to reopen it. Thank you.

cppwfs avatar Nov 12 '24 14:11 cppwfs