Proposal: Implement transformation middleware

Open thanpolas opened this issue 7 years ago • 10 comments

When implementing kafka-avro I resorted to hacking node-rdkafka so I could intercept Produced and Consumed messages and encode or decode them using the appropriate avro schema.

I wanted to know whether, if I put in the effort to design and spec this (with your approval) and implement a middleware transformation pattern for all producing and consuming methods, you would accept such a pull request.

The idea is to provide a method for end-users or libraries to be able to transform a message before it will be actually produced or consumed. Something like:

consumer.transform(function(message) {
  message.id = message.uuid;

  return message;
});

The transform() method could be called multiple times; the middleware would be stored and executed serially, in registration order. A transform could also be asynchronous if the value it returns is a promise.
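
For illustration, the chaining could work roughly like the sketch below. This is only a sketch of the proposed semantics; createTransformChain, transform, and run are hypothetical names, not node-rdkafka APIs.

// Hypothetical sketch of the proposed semantics: transforms are stored in
// registration order and applied one after another; a transform may return
// the message directly or a promise that resolves to it.
function createTransformChain() {
  var transforms = [];

  return {
    transform: function(fn) {
      transforms.push(fn);
    },
    run: function(message) {
      // Chaining via .then() handles both synchronous and asynchronous
      // transforms transparently.
      return transforms.reduce(function(prev, fn) {
        return prev.then(fn);
      }, Promise.resolve(message));
    }
  };
}

// Usage:
// chain.transform(function(message) { message.id = message.uuid; return message; });
// chain.run(incomingMessage).then(handleTransformedMessage);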

What do you think?

thanpolas avatar Feb 17 '17 11:02 thanpolas

I definitely see the appeal and use case for using Kafka this way. There is even a utility in the Java client called Kafka Streams which is meant to transform data like this. However, given that the spirit of the library is to be as close to a 1:1 mapping to the underlying C++ as possible, with language-specific enhancements, I think it doesn't necessarily fit in this package.

However, node is great at doing things like this with prototyping, so I don't think there is anything stopping you from achieving this pretty easily.

const Producer = require('node-rdkafka').Producer;

Producer.prototype._oldproduce = Producer.prototype.produce;

Producer.prototype.produce = function(obj) {
  // JSON encode the data (as a Buffer, which produce() expects) before we produce it
  return this._oldproduce('topic', null, Buffer.from(JSON.stringify(obj)), null);
};

If this doesn't work for your case I definitely would support this being added as a supporting library to add this functionality, but for now I even hold myself back from adding new features that I feel don't fit with the main spirit of the library, which is to be bindings.

webmakersteve avatar Feb 22 '17 22:02 webmakersteve

If this doesn't work for your case I definitely would support this being added as a supporting library to add this functionality,

That is exactly what kafka-avro does, the only difference being that it also couples the logic of avro de/serializing into that pipeline.

However, I have to tell you that things don't look pretty when trying to do so. The produce() part that you mentioned is the easiest of the four parts that need to be implemented:

  • Produce using produce().
  • Produce using writeable streams.
  • Consume using consumer.on().
  • Consume using readable streams.

It is a nasty hack that has to go all the way down to the EventEmitter's on method in the case of consumer.on() usage, and an even uglier hack involving a transformation stream. You can view the implementation details in the kafka-consumer.js module of kafka-avro.
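
For illustration only, the kind of .on() interception being described might look roughly like the following; this is a simplified sketch rather than the actual kafka-avro code, and deserialize is a placeholder for the avro decoding step.

// Simplified sketch: shadow consumer.on so that 'data' listeners receive a
// message that has already been deserialized. `deserialize` is a placeholder.
function interceptDataEvents(consumer, deserialize) {
  var originalOn = consumer.on.bind(consumer);

  consumer.on = function(event, listener) {
    if (event !== 'data') {
      return originalOn(event, listener);
    }
    return originalOn('data', function(message) {
      message.parsed = deserialize(message.value);
      listener(message);
    });
  };
}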

So, yea, it's already done, it's ugly and dangerous. What I am trying to accomplish here is to make node-rdkafka more extensible and safer to use for all scenarios. Avro de/serializing and Confluent tooling are not that much of an edge case when it comes to Kafka, so I'd encourage you to reconsider and green-light this, since, as you already know, there is a paradigm shift in how we do microservices and Kafka will get into the spotlight in the following years.

If you're still not convinced feel free to close the issue.

thanpolas avatar Mar 02 '17 09:03 thanpolas

Anything I can do to make consuming and producing data this way easier is something I'd be happy to help with. I think one of our Kafka use-cases is pretty similar to what you're describing:

  1. We read data off of kafka. The data is in Protocol Buffer serialized binary.
  2. We deserialize the data into a JavaScript object representation of the protocol buffer.
  3. We transform the data into a new schema, filling in certain fields that our transformer is responsible for, and fetch a more specific schema to fit the message into.
  4. We then serialize this as binary and produce it to a new topic, which is determined by parts of the data in the payload.
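
As a rough sketch, assuming consumer and producer are already-connected node-rdkafka instances, and with decodeProto, transformRecord, encodeProto, and pickTopic standing in for the protobuf- and schema-specific steps, that pipeline could look something like:

// Sketch only: decodeProto, transformRecord, encodeProto and pickTopic are
// placeholders for the protobuf- and schema-specific logic described above.
consumer.on('data', function(message) {
  var record = decodeProto(message.value);      // binary -> JS object
  var transformed = transformRecord(record);    // reshape into the new schema
  var payload = encodeProto(transformed);       // serialize back to a Buffer
  producer.produce(pickTopic(transformed), null, payload, message.key);
});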

I imagine this is pretty similar to what you're doing in your transform steps. Looking at your code, you're using the kafka.KafkaConsumer class as your consumer without any composition. This is definitely the dream! - If it's possible.

But I feel that this proposal is more evidence of the failure of the Readable/Writable streams to be as useful as they should be. And I definitely get that. There are some changes I'd like to make to them that I haven't gotten to. Perhaps we can work together to draft a proposal for how those should look to make this use case work better, so that transforming your data is as easy as piping into a generic transform stream.
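
That end state might look something like the sketch below, using node's core stream.Transform; deserialize is a placeholder, and consumerStream is assumed to be an object-mode consumer read stream (check the stream API of the node-rdkafka version you're on).

var stream = require('stream');

// Generic transform stream: deserialize each message's value before handing
// it downstream. `deserialize` is a placeholder for avro/protobuf/etc.
var deserializeStream = new stream.Transform({
  objectMode: true,
  transform: function(message, encoding, callback) {
    try {
      message.parsed = deserialize(message.value);
      callback(null, message);
    } catch (err) {
      callback(err);
    }
  }
});

// `consumerStream` is assumed to be an object-mode node-rdkafka consumer
// read stream emitting one kafka message per chunk.
consumerStream.pipe(deserializeStream).on('data', function(message) {
  // message.parsed now holds the decoded payload
});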

webmakersteve avatar Mar 02 '17 17:03 webmakersteve

While I like the idea of simplifying use-cases, the primary goal of this library is to be a performant way to produce/consume from Kafka. This falls outside of that goal since it's merely a convenient way to do transforms that can already be done before/after the publish/consume step (or, as Stephen suggested, prototyped in).

Beyond being outside of primary goals, a couple of other considerations:

  • How to handle transform failures?
  • Simply checking whether the event was hooked for every message would be expensive, so attempting to optimize that check away would introduce code complexity and risk
  • Even with optimizations, the performance hit would still be significant at scale (thousands/millions per second)

jordansirwin avatar Mar 02 '17 17:03 jordansirwin

But I feel that this proposal is more an evidence of the failure of the Readable/Writable streams being as useful as it should be.

Please correct me if I'm wrong here, because it's not very clear, but I am under the impression (I think I read it somewhere in the docs) that using the .on() event listener gives higher throughput than streams. So naturally, that is why we chose to go with .on().

I'm really glad you laid out your workflow @webmakersteve, we now have a common denominator. I am assuming that your protobuf de/serialize methods exist in every consumer/producer throughout your codebase; I'd ask you to think about how you would go about turning that into a library for all your services to re-use. We, at Waldo, are investing heavily in this, bringing online a new node service depending on node-rdkafka nearly every week, so normalization of the methods and DRY principles had to apply.

There are 3 touch points that need to be addressed:

  1. Configuring and connecting the Consumers and Producers.
  2. Deserializing on consume.
  3. Serializing on produce.

Ideally, we would want all of this to happen from a single library. Thus the "full-wrapper" decision in kafka-avro. It would simplify my life to have the node-rdkafka Consumer or Producer instances configured and have a "thin-wrapper" around that, re-emitting events after deserialization and providing a produce() interface which serializes behind the scenes.
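
A thin wrapper along those lines might look roughly like this sketch; serialize and deserialize are placeholders for the schema-registry logic, and this is not the actual kafka-avro implementation.

var EventEmitter = require('events').EventEmitter;
var util = require('util');

// Sketch of a thin wrapper: re-emit 'data' after deserialization and expose a
// produce() that serializes behind the scenes. `serialize`/`deserialize` are
// placeholders for the avro / schema-registry logic.
function KafkaWrapper(consumer, producer, serialize, deserialize) {
  EventEmitter.call(this);
  this._producer = producer;
  this._serialize = serialize;

  var self = this;
  consumer.on('data', function(message) {
    message.parsed = deserialize(message.value);
    self.emit('data', message);
  });
}
util.inherits(KafkaWrapper, EventEmitter);

KafkaWrapper.prototype.produce = function(topic, partition, value, key) {
  return this._producer.produce(topic, partition, this._serialize(value), key);
};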

The catch here is that while it would simplify my life, it would give a really hard time to developers, who would now have to manage two third-party dependencies, keep their versions in parity, and wire things up at runtime. In the long run, there would be measurable losses in efficiency with this solution.

So I think it would be wise to have a single do-it-all library. My proposal, and my response to @jordansirwin's concerns, is to apply the middleware pattern, a well-established technique championed by Express.

The performance concerns are real, but I do not believe they will be so dramatic; if anything, one could not call Express non-performant, so we can get there. Plus, in today's cloud world we depend on multiple instances running the same service, so this concern is somewhat alleviated by that fact.

I am really excited to be having this conversation and grateful for all the work you've done. I'm sorry if I sound like I'm pressing for anything here, it's just the way I express myself. We can do our job today as things are, I'm just letting you know that this use case exists.

thanpolas avatar Mar 03 '17 11:03 thanpolas

Here's my two cents :)

It is oh so tempting to expand on this library's features, but we have to convince even ourselves that this is merely a mapping to librdkafka and nothing more. I think the fact that it's a bit messy for you to add transforms is actually evidence we didn't stick to that well enough! If there were one and only one way to produce/consume, you'd have only one set of methods to prototype. Since we have streams and events, it's that much more work. Of course, I still think it was the right thing to do, since streams are basically node's expected access pattern for this kind of thing.

We do in fact have a common library that is shared and takes care of the heavy lifting of transformations. But it's used as a separate stream we pipe into.

Instead of making rdkafka your shared lib with common transform config, can you simply fully wrap it, and not expect your services to use it directly? Then you are in complete control over how rdkafka is used and need not support transforms for every possible produce/consume pattern.

InfinitiesLoop avatar Mar 03 '17 15:03 InfinitiesLoop

Instead of making rdkafka your shared lib with common transform config, can you simply fully wrap it, and not expect your services to use it directly?

We've come full circle here; this is the starting point, this is where we are today (see the first posts).

I've seen the point made throughout this thread about node-rdkafka's purpose, and how its job is to wrap around librdkafka and do only that.

Nobody said anything to the contrary. I don't know at which point I might have been misunderstood, but I do not expect anything more from node-rdkafka than to be a wrapper for the native librdkafka.

What I'm merely saying is that the way this is done is problematic for the library's extensibility. Two patterns for message consumption exist today: events and streams.

Both of them are problematic when trying to extend (or wrap) the library. Events would have to be re-emitted or hacked (like kafka-avro does), and streams require a complex transformation-type stream which, in the end, is ultimately also an event the end consumer has to listen to (albeit via a different mechanism).

Stream transformations as a way of manipulating data have always been on the fringes of the node community, if only because of their complexity and the higher skill level required to handle them (my point being that we're not helping the library be ubiquitous to all developers).

The same concerns apply to the producing side.

So I think this issue requires some deeper thinking than just "adding a new feature or a switch", it involves the strategy and architecture of the library wholesale.

thanpolas avatar Mar 04 '17 12:03 thanpolas

If I'm understanding correctly it looks like this may make it into librdkafka:

https://github.com/edenhill/librdkafka/pull/1191

webmakersteve avatar Apr 29 '17 20:04 webmakersteve

Note that the librdkafka interceptor interface does not support message mutability; interceptors are for intercepting, not transforming/enveloping/serializing.

edenhill avatar May 10 '17 09:05 edenhill

It's also worth keeping in mind that wrapping your produce and consume calls in something like higher-order functions that do the (de-)serialization before sending / after receiving is not exactly rocket surgery in JavaScript.
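
For example, something along these lines; a sketch only, with serialize and deserialize standing in for whatever codec you use:

// Sketch of the higher-order-function approach: wrap produce and the 'data'
// handler so (de-)serialization lives in one place.
function withSerialization(producer, serialize) {
  return function produceObject(topic, partition, obj, key) {
    return producer.produce(topic, partition, serialize(obj), key);
  };
}

function withDeserialization(handler, deserialize) {
  return function onData(message) {
    return handler(deserialize(message.value), message);
  };
}

// Usage (names are illustrative):
// var produceUser = withSerialization(producer, encodeUser);
// consumer.on('data', withDeserialization(processUser, decodeUser));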

So while I can see the appeal of having that ability as part of this library out-of-the-box, keeping its purpose as a straightforward wrapper of rdkafka seems more valuable in the long run.

I reckon most people are likely to wrap their kafka interactions in custom provider functions anyway, so adding the (de-)serialization there would be the natural thing to do.

batjko avatar Mar 11 '18 12:03 batjko