streamz
streamz copied to clipboard
Application level ACK of in-only message exchanges
In addition to replying to consumed in-out message exchanges also support delayed ack of consumed in-only message exchanges e.g. by extending the DSL with an .ack
element. Implementation of this features should also cover #32.
Stream errors (see also difference between errors and failures) should be translatable to negative acknowledgements (exceptions or faults) on individual message exchanges while stream failures should fail all message exchanges that are currently being processed.
Where does this ticket lie on the development roadmap? I have a project requiring application-level JMS ack, and need to decide between waiting for this ticket or implementing another solution.
Thanks.
I also need this feature! how can I help implementing it?
@igreenfield glad to hear 😃. Essentially, this is going to be a special case of (a more generic) EndpointConsumerReplier which is going to replace EndpointConsumer. I still plan working on it in September but PR are welcome of course!
I could not find where the actual ack is happening. could you point me to it?
Here: https://github.com/krasserm/streamz/blob/master/streamz-camel-akka/src/main/scala/streamz/camel/akka/EndpointConsumerReplier.scala#L76
What do you think is the right way of giving the callback object to the application?
What are you trying to achieve?
How the application can ack the exchange without getting it?
By adding an ack
to the DSL that works like reply except that ack
only completes the exchange without setting a response.
ok. thanks. I still don't understand how in so high level I can ack the exchange.
What exactly is unclear?
ok I start adding this to the DSL:
class AckDsl[A, M](val self: Flow[A, A, M]) {
def ack: RunnableGraph[M] = self.joinMat(Flow[A])(Keep.left)
}
implicit def ackDsl[A, M](self: Flow[A, A, M]): AckDsl[A, M] =
new AckDsl(self)
def ackExchange[A](exchange: Exchange, parallelism: Int = 1)(implicit context: StreamContext): Graph[FlowShape[StreamMessage[A], StreamMessage[A]], NotUsed] =
Flow[StreamMessage[A]].mapAsync(parallelism)(ack[A](exchange))
private def ack[A](exchange: Exchange)(implicit context: StreamContext): Future[StreamMessage[A]] = {
val promise = Promise[StreamMessage[A]]()
Future {
try {
val message = StreamMessage.from[A](exchange.getIn)
context.consumerTemplate.doneUoW(exchange)
promise.success(message)
} catch {
case e: Exception => promise.failure(e)
}
}(ExecutionContext.fromExecutorService(context.executorService))
promise.future
}
but this can't work... hence I don't have the exchange...
reply
works in conjunction with receiveReuqest which uses EndpointConsumerReplier under the hood.
ack
requires a re-implementation of receive so that is uses a modified version of EndpointConsumerReplier
under the hood (i.e. a EndpointConsumerAcker
or whatever, I'm sure you find a better name 😃).
They'll have much logic in common, so a proper abstraction is needed here.
I see I don't have time to work on this now. sorry,
No problem, thanks for your interest.
@krasserm: I'm interested in taking a shot at this and I have some questions:
Essentially, this is going to be a special case of (a more generic)
EndpointConsumerReplier
which is going to replaceEndpointConsumer
.
ack
requires a re-implementation ofreceive
so that is uses a modified version ofEndpointConsumerReplier
under the hood....
They'll have much logic in common, so a proper abstraction is needed here.
-
Do you mean that
EndpointConsumerReplier
will become "more generic"? If so, what will be its new type? -
Will the replacement for
EndpointConsumer
be an implementation of a modifiedEndpointConsumerReplier
?
It's unclear to me what the new types should be, given that the current EndpointConsumerReplier
is of type GraphStage[FlowShape[StreamMessage[A], StreamMessage[B]]]
and EndpointConsumer
is a GraphStage[SourceShape[StreamMessage[A]]]
.
@chunjef sorry for the late reply. I didn't think through the design yet but, in a first step, I'd start re-implementing only EndpointConsumer
to achieve the goal of this this ticket, leaving EndpointConsumerReplier
as-is for the moment. You'll most likely end up with something that is highly redundant to EndpointConsumerReplier
. In a second step, I'd then create new abstractions that serve as implementation basis for both concrete solutions. This should avoid the problem of premature abstraction I think. Hope that helps.