
Application level ACK of in-only message exchanges

Open krasserm opened this issue 7 years ago • 17 comments

In addition to replying to consumed in-out message exchanges, also support delayed acks of consumed in-only message exchanges, e.g. by extending the DSL with an .ack element. The implementation of this feature should also cover #32.

Stream errors (see also the difference between errors and failures) should be translatable to negative acknowledgements (exceptions or faults) on individual message exchanges, while stream failures should fail all message exchanges that are currently being processed.
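
A minimal sketch of that distinction, using only the Scala standard library. The names PendingExchange and InFlight are hypothetical and are not part of streamz or Camel; the sketch only models the bookkeeping, under the assumption that each consumed exchange stays registered until it is acknowledged.

```scala
import scala.collection.mutable
import scala.concurrent.Promise
import scala.util.{Success, Try}

object NegativeAckSketch {
  // Hypothetical stand-in for a consumed in-only exchange awaiting acknowledgement.
  final class PendingExchange(val id: Long) {
    private val completion = Promise[Unit]()
    def ack(): Unit = { completion.trySuccess(()); () }
    def nack(cause: Throwable): Unit = { completion.tryFailure(cause); () }
    // None while pending, Some(Success) after ack, Some(Failure) after nack.
    def outcome: Option[Try[Unit]] = completion.future.value
  }

  // Tracks exchanges that were consumed but not yet acknowledged.
  // Not thread-safe; a real stage would confine this state to one thread.
  final class InFlight {
    private val pending = mutable.LinkedHashMap.empty[Long, PendingExchange]

    def register(exchange: PendingExchange): Unit = pending.update(exchange.id, exchange)

    def ack(id: Long): Unit = pending.remove(id).foreach(_.ack())

    // A stream error concerns a single element: only its exchange is nacked.
    def error(id: Long, cause: Throwable): Unit = pending.remove(id).foreach(_.nack(cause))

    // A stream failure terminates the stream: every pending exchange is nacked.
    def failure(cause: Throwable): Unit = {
      pending.values.foreach(_.nack(cause))
      pending.clear()
    }

    def size: Int = pending.size
  }
}
```

With three registered exchanges, acking the first and reporting an error for the second leaves the third pending until a failure (or a later ack) settles it.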

krasserm avatar Aug 02 '17 14:08 krasserm

Where does this ticket lie on the development roadmap? I have a project requiring application-level JMS ack, and need to decide between waiting for this ticket or implementing another solution.

Thanks.

ssturdivant avatar Aug 25 '17 20:08 ssturdivant

I also need this feature! How can I help implement it?

igreenfield avatar Sep 10 '17 08:09 igreenfield

@igreenfield glad to hear 😃. Essentially, this is going to be a special case of (a more generic) EndpointConsumerReplier which is going to replace EndpointConsumer. I still plan to work on it in September, but PRs are welcome of course!

krasserm avatar Sep 10 '17 08:09 krasserm

I could not find where the actual ack is happening. Could you point me to it?

igreenfield avatar Sep 10 '17 10:09 igreenfield

Here: https://github.com/krasserm/streamz/blob/master/streamz-camel-akka/src/main/scala/streamz/camel/akka/EndpointConsumerReplier.scala#L76

krasserm avatar Sep 10 '17 12:09 krasserm

What do you think is the right way of giving the callback object to the application?

igreenfield avatar Sep 14 '17 09:09 igreenfield

What are you trying to achieve?

krasserm avatar Sep 14 '17 10:09 krasserm

How can the application ack the exchange without getting it?

igreenfield avatar Sep 14 '17 10:09 igreenfield

By adding an ack to the DSL that works like reply except that ack only completes the exchange without setting a response.
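
A minimal sketch of the difference, assuming a simplified exchange model. ToyExchange, reply and ack below are hypothetical illustrations, not the streamz DSL or the Camel Exchange API.

```scala
import scala.concurrent.Promise

object AckVsReplySketch {
  // Hypothetical, simplified stand-in for a consumed exchange.
  final class ToyExchange[A, B](val in: A) {
    var out: Option[B] = None
    private val donePromise = Promise[Unit]()
    def complete(): Unit = { donePromise.trySuccess(()); () }
    def isDone: Boolean = donePromise.isCompleted
  }

  // reply: set a response on the exchange, then complete it (in-out).
  def reply[A, B](exchange: ToyExchange[A, B], response: B): Unit = {
    exchange.out = Some(response)
    exchange.complete()
  }

  // ack: complete the exchange without setting a response (in-only).
  def ack[A, B](exchange: ToyExchange[A, B]): Unit =
    exchange.complete()
}
```

In both cases the exchange stays open until the application reaches the corresponding DSL element, which is what makes the acknowledgement "delayed".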

krasserm avatar Sep 14 '17 10:09 krasserm

OK, thanks. I still don't understand how I can ack the exchange at such a high level.

igreenfield avatar Sep 14 '17 11:09 igreenfield

What exactly is unclear?

krasserm avatar Sep 14 '17 12:09 krasserm

OK, I started adding this to the DSL:

  class AckDsl[A, M](val self: Flow[A, A, M]) {
    def ack: RunnableGraph[M] = self.joinMat(Flow[A])(Keep.left)
  }

  implicit def ackDsl[A, M](self: Flow[A, A, M]): AckDsl[A, M] =
    new AckDsl(self)

  def ackExchange[A](exchange: Exchange, parallelism: Int = 1)(implicit context: StreamContext): Graph[FlowShape[StreamMessage[A], StreamMessage[A]], NotUsed] =
    Flow[StreamMessage[A]].mapAsync(parallelism)(ack[A](exchange))

  private def ack[A](exchange: Exchange)(implicit context: StreamContext): Future[StreamMessage[A]] = {
    val promise = Promise[StreamMessage[A]]()

    Future {
      try {
        val message = StreamMessage.from[A](exchange.getIn)
        context.consumerTemplate.doneUoW(exchange)
        promise.success(message)
      } catch {
        case e: Exception => promise.failure(e)
      }
    }(ExecutionContext.fromExecutorService(context.executorService))

    promise.future
  }

But this can't work, since I don't have the exchange.

igreenfield avatar Sep 14 '17 12:09 igreenfield

reply works in conjunction with receiveRequest, which uses EndpointConsumerReplier under the hood.

ack requires a re-implementation of receive so that it uses a modified version of EndpointConsumerReplier under the hood (i.e. an EndpointConsumerAcker or whatever, I'm sure you'll find a better name 😃).

They'll have much logic in common, so a proper abstraction is needed here.
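
One possible shape for such an abstraction, sketched with plain Scala classes rather than Akka graph stages. ConsumerBase, Replier, Acker and ToyExchange are hypothetical names; the sketch assumes the two variants differ only in what they do with the exchange just before completing it.

```scala
object SharedConsumerSketch {
  // Hypothetical, simplified stand-in for a consumed exchange.
  final class ToyExchange(val in: String) {
    var out: Option[String] = None
    var done: Boolean = false
  }

  // Shared logic lives in the base class; variants supply a single hook.
  abstract class ConsumerBase[R] {
    // Variant-specific step, run before the exchange is completed.
    protected def applyResult(exchange: ToyExchange, result: R): Unit

    // Common step: apply the result, then complete the exchange.
    final def complete(exchange: ToyExchange, result: R): Unit = {
      applyResult(exchange, result)
      exchange.done = true
    }
  }

  // Replier variant: the downstream result becomes the response.
  final class Replier extends ConsumerBase[String] {
    protected def applyResult(exchange: ToyExchange, result: String): Unit =
      exchange.out = Some(result)
  }

  // Acker variant: there is nothing to set, completion is the acknowledgement.
  final class Acker extends ConsumerBase[Unit] {
    protected def applyResult(exchange: ToyExchange, result: Unit): Unit = ()
  }
}
```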

krasserm avatar Sep 14 '17 13:09 krasserm

I see I don't have time to work on this now. Sorry.

igreenfield avatar Sep 17 '17 06:09 igreenfield

No problem, thanks for your interest.

krasserm avatar Sep 17 '17 13:09 krasserm

@krasserm: I'm interested in taking a shot at this and I have some questions:

Essentially, this is going to be a special case of (a more generic) EndpointConsumerReplier which is going to replace EndpointConsumer.

ack requires a re-implementation of receive so that it uses a modified version of EndpointConsumerReplier under the hood....

They'll have much logic in common, so a proper abstraction is needed here.

  1. Do you mean that EndpointConsumerReplier will become "more generic"? If so, what will be its new type?

  2. Will the replacement for EndpointConsumer be an implementation of a modified EndpointConsumerReplier?

It's unclear to me what the new types should be, given that the current EndpointConsumerReplier is of type GraphStage[FlowShape[StreamMessage[A], StreamMessage[B]]] and EndpointConsumer is a GraphStage[SourceShape[StreamMessage[A]]].

chunjef avatar Nov 10 '17 21:11 chunjef

@chunjef sorry for the late reply. I haven't thought through the design yet but, as a first step, I'd start by re-implementing only EndpointConsumer to achieve the goal of this ticket, leaving EndpointConsumerReplier as-is for the moment. You'll most likely end up with something that is highly redundant with EndpointConsumerReplier. As a second step, I'd then create new abstractions that serve as the implementation basis for both concrete solutions. This should avoid the problem of premature abstraction, I think. Hope that helps.

krasserm avatar Nov 14 '17 08:11 krasserm