Add Kafka support

Open borjaferv opened this issue 3 years ago • 3 comments

This PR adds Kafka support, using Confluent.Kafka to send and receive events. It uses a work manager to deploy a worker with a consumer for each topic-group pair configured in the workflow activities. It also adds support for headers, so that specific events can be filtered based on the flow.

Event reception works either as the first activity of a workflow (Trigger) or as a suspended activity (Bookmark).
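To illustrate the header-based filtering mentioned above, here is a minimal sketch. It is not the code from this PR: `HeaderFilter` and `Matches` are hypothetical names, and plain dictionaries stand in for the Kafka message headers so the example has no external dependencies. It assumes header values are UTF-8 strings (Kafka itself carries header values as raw bytes) and that an event matches when every expected header is present with the same value.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

// Hypothetical helper, not part of Elsa or Confluent.Kafka.
public static class HeaderFilter
{
    // Returns true when every expected header is present on the message
    // and its bytes decode (as UTF-8) to the expected value.
    // An empty set of expected headers matches every message.
    public static bool Matches(
        IReadOnlyDictionary<string, byte[]> messageHeaders,
        IReadOnlyDictionary<string, string> expectedHeaders)
    {
        return expectedHeaders.All(expected =>
            messageHeaders.TryGetValue(expected.Key, out var raw)
            && raw != null
            && Encoding.UTF8.GetString(raw) == expected.Value);
    }
}
```

A consumer worker could call something like `HeaderFilter.Matches(...)` for each received event and only trigger or resume the workflows whose configured headers match.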

Known issues

On M1 Macs (ARM), the Kafka client (Confluent.Kafka) has trouble locating the librdkafka library.

The workaround I came up with was to install the dependencies with Homebrew and compile librdkafka manually from source:

    git clone https://github.com/edenhill/librdkafka.git
    cd librdkafka
    ./configure --install-deps
    brew install openssl zstd pkg-config
    ./configure
    make
    sudo make install

borjaferv avatar Jun 27 '22 10:06 borjaferv

Thanks for this PR. Have you taken a look at the Service Bus implementation of the request/response pattern using correlation? If you need to send a message and receive the response in the next activity, you can hit a race condition, depending on the speed of the process.

In the Service Bus activity, some lines were added:

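    protected override IActivityExecutionResult OnExecute(ActivityExecutionContext context)
    {
        var message = CreateMessage(Message);

        // Propagate the workflow's correlation ID so the response can be matched to this workflow.
        if (!string.IsNullOrWhiteSpace(context.WorkflowExecutionContext.CorrelationId))
            message.CorrelationId = context.WorkflowExecutionContext.CorrelationId;

        // Complete the activity and hand the send to ServiceBusActionResult,
        // which can defer it until the workflow suspends (SendMessageOnSuspend).
        return Combine(Done(), new ServiceBusActionResult(GetQueue(), GetTopic(), message, SendMessageOnSuspend));
    }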

If we want to be sure that Elsa bookmarks are indexed before the response is received, we need to defer the send until the workflow suspends, using (in the case of Service Bus) the ServiceBusActionResult.

https://github.com/elsa-workflows/elsa-core/blob/c35e3a4995a374020ffcfd14c51058b25915a428/src/activities/Elsa.Activities.AzureServiceBus/ActivityExecutionResults/ServiceBusActionResult.cs#L11
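The ordering described above can be sketched roughly as follows. This is a simplified model, not Elsa's actual implementation: `ActivityResult`, `WorkflowRunner` and `DeferredSendDemo` are hypothetical types, and the "broker" is simulated by replying synchronously inside the send. The point it illustrates is only that the send runs after the bookmark has been indexed, so even an immediate response finds something to resume.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified model; none of these types are Elsa's real API.
public sealed class ActivityResult
{
    // Actions the runner executes only after the bookmark has been indexed.
    public List<Action> OnSuspend { get; } = new List<Action>();

    // Correlation ID of the bookmark the workflow will wait on, if any.
    public string BookmarkCorrelationId { get; set; }
}

public sealed class WorkflowRunner
{
    private readonly HashSet<string> _bookmarkIndex = new HashSet<string>();

    public List<string> Log { get; } = new List<string>();

    public bool HasBookmark(string correlationId) => _bookmarkIndex.Contains(correlationId);

    public void Run(ActivityResult result)
    {
        // 1. Index the bookmark first...
        if (result.BookmarkCorrelationId != null)
        {
            _bookmarkIndex.Add(result.BookmarkCorrelationId);
            Log.Add($"bookmark indexed: {result.BookmarkCorrelationId}");
        }

        // 2. ...then run the deferred sends, so a fast response cannot win the race.
        foreach (var action in result.OnSuspend)
            action();
    }
}

public static class DeferredSendDemo
{
    public static List<string> Run()
    {
        var runner = new WorkflowRunner();
        var result = new ActivityResult { BookmarkCorrelationId = "order-42" };

        result.OnSuspend.Add(() =>
        {
            runner.Log.Add("message sent: order-42");
            // Simulate a response that arrives immediately after the send.
            runner.Log.Add(runner.HasBookmark("order-42")
                ? "response matched bookmark"
                : "response lost");
        });

        runner.Run(result);
        return runner.Log;
    }
}
```

If the send happened inside the activity itself, before the runner indexed the bookmark, the simulated response in this model would be logged as lost; deferring it to the suspend step is what closes that window.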

What do you think about that? For this kind of flow with this connector, I think it would be good to implement the same code here as well. @sfmskywalker, any thoughts on this point?

jdevillard avatar Jun 27 '22 11:06 jdevillard

@jdevillard Agreed, it would be good to apply the same strategy to avoid the race condition you mentioned.

sfmskywalker avatar Jul 11 '22 09:07 sfmskywalker

What do you think about integrating these activities with SlimMessageBus? https://github.com/zarusz/SlimMessageBus It would allow us to use different event buses.

Juandavi1 avatar Jul 15 '22 18:07 Juandavi1