
Allow separation of event types into different Kafka topics

Open friso opened this issue 10 years ago • 8 comments

Similar to creating different HDFS files for different event types, it would be nice to be able to do the same for Kafka topics.

friso avatar Aug 13 '14 09:08 friso

Are you planning on implementing this feature anytime soon? It would be really useful for tracking the interesting events directly in Spark Streaming.

lucleray avatar Jul 20 '15 13:07 lucleray

There is a larger effort underway to allow different mappings to send events in different directions, which means that separating certain events onto different Kafka topics would come for free.

Could you elaborate a bit on your use case? Perhaps I can provide some guidance on how to work around this with the current version of the collector. You can comment here or, if you like, post to the mailing list.

friso avatar Jul 20 '15 13:07 friso

I will detail the goal that lucleray mentioned.

Currently we have a Kafka consumer reading the topic "divolte" and storing all the data in a Cassandra DB for two months, but at the same time we want different topics for specific event_type values.

Example: in our divolte topic we have event_type values like displayPage, showBanner, and clickBanner. We want to store all of these events in Cassandra, but at the same time retrieve only events of type showBanner to update a MAB process elsewhere.

I was thinking about creating a dispatcher Kafka consumer that rewrites the data onto a new queue, but I'm stuck on the converters and data format.

So if divolte-collector could provide a queue for specific events, I would be pleased. Otherwise, any advice on how to create such a dispatcher would be welcome.

Thanks in advance

BHMath avatar Jul 21 '15 09:07 BHMath

Here's something I just did that seems to work perfectly:

import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.avro.generic.GenericRecord
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils

val ctx = getStreamingContext(Seconds(1), "divolte-collector-streaming-dispatcher")

// Consume the raw byte stream from the "divolte" topic, one receiver thread per core.
val topicSettings = Map("divolte" -> Runtime.getRuntime.availableProcessors())
val byteStream = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](
  ctx, getConsumerConfig(), topicSettings, StorageLevel.MEMORY_ONLY)

// Avro decoder for the record schema produced by the Divolte mapping.
val decoder = AvroDecoder[GenericRecord](MyEventRecord.getClassSchema)

byteStream.foreachRDD { byteRdd =>
  // collect() pulls each micro-batch to the driver, where banner events are
  // re-published to their own topic with key and payload unchanged.
  byteRdd.collect().foreach { case (key, bytes) =>
    val eventType = decoder.decode(bytes).get("event_type").toString

    eventType match {
      case "showBanner" =>
        println("BANNER showed")
        MyProducer.sendMessage("divolte-banner", key, bytes)
      case "clickBanner" =>
        println("BANNER clicked")
        MyProducer.sendMessage("divolte-banner", key, bytes)
      case _ => println(eventType)
    }
  }
}

// Start the stream and await termination.
ctx.start()
ctx.awaitTermination()

Then I can easily connect to the Kafka topic divolte-banner and use the toRecords option or the fields.

BHMath avatar Jul 22 '15 09:07 BHMath

Re-producing from an initial consumer is certainly an option. It would generate some overhead in the form of additional messages, though.

Another option is to just create two consumers: one for clicks and one for shows. The former filters the stream on click events, the latter on show events, and both discard the events they don't require. If you put these consumers in different consumer groups, they can co-exist without interfering. (See the sketch below.)

This way I think you get a bit more flexibility and looser coupling between separate components. Also, when we get to the point where Divolte allows you to separate events onto different topics based on event type, the code changes needed to benefit from this would be minimal on your side.
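A minimal sketch of that two-consumer approach, in the same Spark Streaming style as the snippet above (getConsumerConfig(), AvroDecoder, and MyEventRecord are the assumed helpers from that snippet, and the group IDs are made up):

import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.avro.generic.GenericRecord
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

// One stream per event type, each in its own consumer group so they keep
// independent offsets and don't interfere with one another.
def eventStream(ctx: StreamingContext, groupId: String, eventType: String): DStream[(String, Array[Byte])] = {
  // Assumes getConsumerConfig() returns the kafkaParams as a Map[String, String].
  val kafkaParams = getConsumerConfig() + ("group.id" -> groupId)
  val stream = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](
    ctx, kafkaParams, Map("divolte" -> 1), StorageLevel.MEMORY_ONLY)
  stream.mapPartitions { partition =>
    // Decoder created per partition so it never has to be shipped through a closure.
    val decoder = AvroDecoder[GenericRecord](MyEventRecord.getClassSchema)
    partition.filter { case (_, bytes) =>
      decoder.decode(bytes).get("event_type").toString == eventType
    }
  }
}

val showStream  = eventStream(ctx, "banner-show-group",  "showBanner")
val clickStream = eventStream(ctx, "banner-click-group", "clickBanner")

Each stream can then feed its own sink (Cassandra for one, the MAB update for the other) without re-publishing anything to Kafka.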

friso avatar Jul 22 '15 11:07 friso

I find it simpler just to have a Kafka connector that swallows from the Divolte producer's topic and reroutes the events as needed (generally publishing to another Kafka topic). Sure, it increases the load on Kafka, but that tends to be an easily scalable resource, and it makes a lot of operational matters simpler.

Honestly, we're using the HDFS appender, but I'm sorely tempted to turn it off and just have a Kafka connector do that job too. It gives me the flexibility of writing directly to cheaper storage than HDFS as well (like S3 ;-)).
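As a rough sketch, such a reroute bridge can be a plain consumer/producer loop; the broker address, group ID, and topic names below are placeholders, and the Avro decoding reuses the assumed AvroDecoder helper from earlier in the thread:

import java.util.{Arrays, Properties}
import scala.collection.JavaConverters._
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val consumerProps = new Properties()
consumerProps.put("bootstrap.servers", "localhost:9092") // placeholder broker
consumerProps.put("group.id", "divolte-rerouter")
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

val producerProps = new Properties()
producerProps.put("bootstrap.servers", "localhost:9092")
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

val consumer = new KafkaConsumer[String, Array[Byte]](consumerProps)
val producer = new KafkaProducer[String, Array[Byte]](producerProps)
val decoder = AvroDecoder[GenericRecord](MyEventRecord.getClassSchema) // assumed helper

consumer.subscribe(Arrays.asList("divolte"))
while (true) {
  // Route banner events to their own topic, passing the raw bytes through unchanged.
  for (record <- consumer.poll(1000L).asScala) {
    val eventType = decoder.decode(record.value).get("event_type").toString
    if (eventType == "showBanner" || eventType == "clickBanner") {
      producer.send(new ProducerRecord("divolte-banner", record.key, record.value))
    }
  }
}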

cbsmith avatar Dec 04 '15 06:12 cbsmith

Yeah, re-routing by reproducing the events onto other topics is pretty common as well. I think the decision for one or the other will depend on your use case and context.

BTW: note that Divolte is writing to HDFS through the normal HDFS client API, which incidentally also supports writing to S3 directly. Still, using only Kafka and taking care of sending events to storage yourself is definitely a good option. The reason for having both HDFS and Kafka support is really that we'd like to have a nicer out-of-the-box experience for users who don't need streaming support (managing Kafka can be painful).
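For reference, a minimal sketch of what that looks like through the Hadoop FileSystem API (the bucket name and path are placeholders, and the S3 filesystem connector plus credentials would need to be configured separately):

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// The same client API that writes to hdfs:// URIs can target S3,
// selected purely by the URI scheme.
val conf = new Configuration()
val fs = FileSystem.get(new URI("s3a://my-bucket/"), conf) // placeholder bucket
val out = fs.create(new Path("/divolte/session-data.avro"))
try {
  out.write("Avro bytes would go here".getBytes("UTF-8"))
} finally {
  out.close()
}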

friso avatar Dec 04 '15 06:12 friso

Yeah, I grok the use case for having both. I hadn't thought about using the HDFS client API with an S3 URL. I may try that out.

cbsmith avatar Dec 04 '15 07:12 cbsmith