
no .mapWrapKafkaKey() & .wrapAsKafkaKey()

Open cnwme opened this issue 4 years ago • 4 comments

Hi, after a .filter() I have not been able to set a key on the produced messages: the key is always null, even though the consumed message has one. Is there another way to pass it through to the producer via .to()? Regards, Martin

cnwme avatar Aug 07 '20 14:08 cnwme

Hi @cnwme , Are you able to provide more context and some code examples of what you are trying to achieve?

rob3000 avatar Aug 11 '20 00:08 rob3000

Hi, first of all, sorry about the delay. I'm trying to test a Kafka stream that reads topic A and, when a filter condition matches, writes the data to topic B exactly as it appears on topic A (both key and value of the event). With .mapWrapKafkaValue and .wrapAsKafkaValue the message value arrives on topic B, but the key is still null even though it has a value on topic A.

here is my code.

/** Load environment configuration */
const dotenv = require('dotenv');
dotenv.config({ path: '.env' });

const { KafkaStreams } = require('kafka-streams');
const { nativeConfig: configure } = require('./config.js');

/** Topics and brokers, with fallbacks for local testing */
const mainTopics = process.env.KAFKA_C_TOPIC ? process.env.KAFKA_C_TOPIC.split(',') : ['TEST_KAFKA', 'TEST_KAFKA2'];
const brokerString = process.env.KAFKA_BROKERS_STRING ? process.env.KAFKA_BROKERS_STRING.split(',') : ['kafka:9092'];
const kafkaSubtopicBAL = process.env.KAFKA_P_STREAM_BAL;

/** INIT filtering stream */
const FilteringStreamMonoKafka = new KafkaStreams(configure);
FilteringStreamMonoKafka.on('error', (error) => console.error(error));

const stream = FilteringStreamMonoKafka.getKStream();

stream
    /** from consumer topic(s) */
    .from(mainTopics)
    /** keep only messages matching the brand filter */
    .filter((message) => {
        console.log(message.value.toString());
        return JSON.parse(message.value.toString()).values.brand[0].data === `${process.env.FILTER_BRAND_BAL}`;
    })
    .mapJSONConvenience()
    .mapWrapKafkaValue()
    .wrapAsKafkaValue(kafkaSubtopicBAL)
    .to(kafkaSubtopicBAL, 'auto', 'buffer');

stream.start();
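(For reference, the filter condition boils down to the pure predicate below, which can be unit-tested without a running broker. The sample payload and the brand value 'BAL' are made up for illustration; the real value comes from FILTER_BRAND_BAL.)

```javascript
// Pure predicate extracted from the .filter() above: true when the
// message's first brand entry matches the configured brand string.
function matchesBrand(rawValue, brand) {
    const payload = JSON.parse(rawValue.toString());
    return payload.values.brand[0].data === brand;
}

// Hypothetical sample payload shaped like the messages on topic A.
const sample = Buffer.from(JSON.stringify({ values: { brand: [{ data: 'BAL' }] } }));
console.log(matchesBrand(sample, 'BAL'));   // true
console.log(matchesBrand(sample, 'OTHER')); // false
```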

Regards Martin

cnwme avatar Aug 17 '20 08:08 cnwme

Thanks for the info @cnwme i'll take a look into this shortly.
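In the meantime, one thing worth trying: instead of the value wrappers (which only deal with message.value), map each message into a produce object that carries the key through explicitly before calling .to(). This is an untested sketch, and it assumes the 'buffer' produce type accepts { key, value } objects:

```javascript
// Mapper that carries the consumed key through to the producer explicitly.
// NOTE: untested sketch -- assumes the 'buffer' produce type accepts
// { key, value } objects; the value wrappers only touch message.value.
function keepKey(message) {
    return {
        key: message.key ? message.key.toString() : null,
        value: message.value.toString(),
    };
}

// In the stream it would slot in before .to(), e.g.:
//   stream.from(mainTopics)
//         .filter(...)
//         .map(keepKey)
//         .to(kafkaSubtopicBAL, 'auto', 'buffer');

// Quick check with a fake consumed message:
console.log(keepKey({ key: Buffer.from('id-1'), value: Buffer.from('{"a":1}') }));
```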

rob3000 avatar Aug 18 '20 03:08 rob3000

Hi @rob3000, any update?

Regards Martin

cnwme avatar Sep 07 '20 10:09 cnwme