cloudflow
cloudflow copied to clipboard
More powerful and flexible API for sending messages to outlets
Is your feature request related to a problem? Please describe.
Background: I'm working on an integration project where I'm using Cloudflow with various other Akka streams sinks and sources.
There are some scenarios which do not seem to be supported well by Cloudflow API:
- After a message is sent successfully to an outlet, some actions need to be performed other than committing a Kafka offset.
- For each message received from a Kafka topic, a collection of outbound messages is generated. The offset for inbound message should be committed after all outbound messages generated from it are successfully written to outlet.
- Same as case 2, but an Akka stream of outbound messages is generated instead of a strict collection. This stream can be potentially very large.
There could be also combinations of cases 1 and 2 or 3 when some element received from a non-Kafka Source
is transformed into multiple Kafka messages. After sending is finished, some action needs to be performed.
Some less abstract examples:
- An S3 bucket is monitored for new files. When a new file is detected, it is parsed as CSV, then every line is converted to an outbound Kafka message. After all messages are sent successfully, file needs to be deleted.
- Some huge database table needs to be scanned periodically (say, every hour). Then every row is sent as a Kafka message. On successful completion of the whole process, some SQL update statement needs to be executed.
It's not hard to support these scenarios with Alpakka Kafka:
-
Producer.flexiFlow
is useful for the scenario 1. It allows to pass some data through and to perform some actions after messages are sent. However, Cloudflow does not seem to have anything similar to it. -
ProducerMessage.MultiMessage
used together with acommittableSink
provides a solution for the scenario 2. However, Cloudflow'scommittableSink
takes only one outbound message paired withCommittable
. - One of the solutions for scenario 3 can be spawning a nested Akka stream inside
mapAsync
, running it to completion, and then performing necessary actions. To help with that, bothProducer.plainSink
andProducer.committableSink
return aFuture
as a materialized value allowing to monitor stream termination. However, Cloudflow counterparts are not designed to be used in a stream which lifetime is shorter that that of the streamlet.
Is your feature request related to a specific runtime of cloudflow or applicable for all runtimes?
It's related to Akka Streamlets.
Describe the solution you'd like
One option would be adding more sinks and flows to Cloudflow API:
- flow similar to
flexiFlow
, - sink that materializes to a
Future
and does not try to terminate the streamlet on stream completion (optional, as this can be done withflexiFlow
), - committable sink which takes
(Seq[T], Committable)
as its element.
Another option might be providing some API for adapting Alpakka Kafka sinks and flows to work with Cloudflow CodecOutlet
.
Describe alternatives you've considered
I tried to add a Flow similar to a flexiFlow
from Alpakka Kafka. For that I used some of AkkaStreamletContext
public methods and copy-pasted some code from private ones:
import akka.NotUsed
import akka.kafka.{ProducerMessage, ProducerSettings}
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Flow
import cloudflow.akkastream.AkkaStreamletContext
import cloudflow.streamlets.{CodecOutlet, Topic}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.ByteArraySerializer
import java.nio.charset.StandardCharsets
object AkkaStreamletContextExt {
def flexiFlow[T, PassThrough](
outlet: CodecOutlet[T]
)(implicit context: AkkaStreamletContext): Flow[(T, PassThrough), PassThrough, NotUsed] = {
val topic = context.findTopicForPort(outlet)
val producerSettings = ProducerSettings(context.system, new ByteArraySerializer, new ByteArraySerializer)
.withBootstrapServers(context.runtimeBootstrapServers(topic))
.withProperties(topic.kafkaProducerProperties)
Flow[(T, PassThrough)]
.map { case (value, committable) =>
ProducerMessage.Message(producerRecord(outlet, topic, value), committable)
}
.via(Producer.flexiFlow(producerSettings))
.map(results => results.passThrough)
}
private def producerRecord[T](outlet: CodecOutlet[T], topic: Topic, value: T) = {
val key = outlet.partitioner(value)
val bytesKey = keyBytes(key)
val bytesValue = outlet.codec.encode(value)
new ProducerRecord(topic.name, bytesKey, bytesValue)
}
private def keyBytes(key: String) = if (key != null) key.getBytes(StandardCharsets.UTF_8) else null
}
However, in this case I can no longer test my streamlet with AkkaStreamletTestKit
.
Hi @vkorenev thanks for the feedback!
In general, there is absolutely nothing wrong with using Alpakka Kafka directly. I agree it would be nice, as you say:
Another option might be providing some API for adapting Alpakka Kafka sinks and flows to work with Cloudflow CodecOutlet.
We can check what we can do in this regard.
Right now, (as you show in your example) you can use the codec directly on CodecOutlet
/ CodecInlet
, which is available as codec
method.
The StreamletContext provides findTopicForPort(port)
, where port
is an inlet
or an outlet
you can pass in, which returns a Topic
, which you can use to get the Kafka bootstrapServers with the method runtimeBootstrapServers(topic)
.
This provides you with everything you need to create an Alpakka Kafka Source / Sink from ConsumerSettings or ProducerSettings. But I agree this is not the most user friendly right now, which we can improve upon.
(Sorry, I just noticed that this is exactly what you did)
I'm thinking maybe we can add some convenience methods to create a ConsumerSettings or ProducerSettings from an inlet, outlet respectively. Another convenience is to add a method to create a ProducerRecord, as you have in your example. What do you think?
If Alpakka Kafka would support the idea of a topic-id, a way to configure topics by id in the configuration, and methods to create a source / sink from a particular topic-id, we could leverage this in Cloudflow. All that would be left then is to plugin the codec, which could possible also be added in some way. This is just me brainstorming, let me know if this makes sense.
Some more comments, and questions for clarification:
For:
- After a message is sent successfully to an outlet, some actions need to be performed other than committing a Kafka offset.
I assume when you say "other than committing a Kafka offset", you mean you want to commit the offset, and do something after the commit is successful? This assumes that commits are completed per message, and that the next operation only occurs after the commit has occurred? (Please note that this has performance consequences.)
For points 2 and 3:
- For each message received from a Kafka topic, a collection of outbound messages is generated. The offset for inbound message should be committed after all outbound messages generated from it are successfully written to outlet.
- Same as case 2, but an Akka stream of outbound messages is generated instead of a strict collection. This stream can be potentially very large.
In general, the committableSink(outlet)
can be used to acknowledge reading from an inlet, as well as writing messages to the outlet, at the same time. The committableSink
commits in batches. You can configure the committing / acknowledging reads behaviour with CommitterSettings
. Batching acknowledgements is good for performance, but it obviously results in more duplicate messages on restart. So I at least wanted to point out that you can configure this CommitterSettings
differently.
If you create many messages for every message in the source, commits happen after a change of the offset is observed. In CommitterSettings
this is set by default to NextOffsetObserved
So if you use, say mapConcat
on a source, this already works. For this to work you need to use the sourceWithCommittableContext
. if you generate more messages per incoming message, or even streams of messages per message, as long as the context is retained, the committer will commit on next offset observed.
Let me know if I am misunderstanding your need, or if there is something that you need specifically in this case, that is not supported by ...WithCommittableContext
methods and the CommitterSettings
, defaulting to NextOffsetObserved
.
Please also note that the AkkaStreamletTestkit
does not provide features for acknowledging / committing offsets.
Hi @RayRoestenburg, thank you for your reply!
In general, there is absolutely nothing wrong with using Alpakka Kafka directly. I agree it would be nice, as you say:
Another option might be providing some API for adapting Alpakka Kafka sinks and flows to work with Cloudflow CodecOutlet.
We can check what we can do in this regard. Right now, (as you show in your example) you can use the codec directly on
CodecOutlet
/CodecInlet
, which is available ascodec
method. The StreamletContext providesfindTopicForPort(port)
, whereport
is aninlet
or anoutlet
you can pass in, which returns aTopic
, which you can use to get the Kafka bootstrapServers with the methodruntimeBootstrapServers(topic)
. This provides you with everything you need to create an Alpakka Kafka Source / Sink from ConsumerSettings or ProducerSettings. But I agree this is not the most user friendly right now, which we can improve upon. (Sorry, I just noticed that this is exactly what you did)I'm thinking maybe we can add some convenience methods to create a ConsumerSettings or ProducerSettings from an inlet, outlet respectively. Another convenience is to add a method to create a ProducerRecord, as you have in your example. What do you think?
That would be nice. The only possible problem which I see here is if this can be made to work with the test kit. For example, the code which I provided throws at val topic = context.findTopicForPort(outlet)
in tests. That's because the AkkaStreamletTestKit
sets the context to cloudflow.akkastream.testkit.TestContext
which overrides everything related to Kafka with its own implementation.
- After a message is sent successfully to an outlet, some actions need to be performed other than committing a Kafka offset.
I assume when you say "other than committing a Kafka offset", you mean you want to commit the offset, and do something after the commit is successful? This assumes that commits are completed per message, and that the next operation only occurs after the commit has occurred? (Please note that this has performance consequences.)
I mean some non-Kafka source and Kafka sink. Since the source is not Kafka, the "commit" action is not committing Kafka offset, but some other source-specific action. For example, if the source is the Alpakka adapter for JMS, then the action may be calling commit()
on the source element. If the source is a directory with files, the action could be deleting or moving the file. But what is important is that the action needs to be performed after the message(s) generated for the source element have been successfully written to the Kafka producer.
The most straightforward approach would probably be implementing custom Committable
and passing it to Cloudflow's committableSink(outlet)
. However, Committable
is not supposed to be extended as it is declared with @DoNotInherit
annotation. Also my understanding is that the default behaviour of the committableSink
is to commit once in a while for performance reasons, which is good for Kafka source. But other sources may require every element to be "committed". However, this might be solved by providing CommitterSettings
configured to commit every single message.
- For each message received from a Kafka topic, a collection of outbound messages is generated. The offset for inbound message should be committed after all outbound messages generated from it are successfully written to outlet.
- Same as case 2, but an Akka stream of outbound messages is generated instead of a strict collection. This stream can be potentially very large.
In general, the
committableSink(outlet)
can be used to acknowledge reading from an inlet, as well as writing messages to the outlet, at the same time. ThecommittableSink
commits in batches. You can configure the committing / acknowledging reads behaviour withCommitterSettings
. Batching acknowledgements is good for performance, but it obviously results in more duplicate messages on restart. So I at least wanted to point out that you can configure thisCommitterSettings
differently.If you create many messages for every message in the source, commits happen after a change of the offset is observed. In
CommitterSettings
this is set by default to NextOffsetObservedSo if you use, say
mapConcat
on a source, this already works. For this to work you need to use thesourceWithCommittableContext
. if you generate more messages per incoming message, or even streams of messages per message, as long as the context is retained, the committer will commit on next offset observed.
Nice! Thanks for explaining! I didn't know about that. Yes, this is a solution for one input Kafka message being mapped to many output Kafka messages. However, there might be a case with a non-Kafka source element being mapped to multiple Kafka output elements.