kafka-streams
no .mapWrapKafkaKey() & .wrapAsKafkaKey()
Hi, I was not able to set a key on produced messages after .filter(); the key is always null even when the consumed message has one. Is there another way to pass it through to the producer via .to()? Regards Martin
Hi @cnwme , Are you able to provide more context and some code examples of what you are trying to achieve?
Hi, first of all I'm sorry about the delay. I'm trying to test a Kafka stream that reads topic A and, when a filter condition matches, writes the event to topic B exactly as it appears on topic A (both key and value). When using .mapWrapKafkaValue & .wrapAsKafkaValue we get the message value on topic B, but the key is still null even though it has a value on topic A.
Here is my code:
const dotenv = require('dotenv');
dotenv.config({ path: '.env' });
const { KafkaStreams } = require('kafka-streams');
const { nativeConfig: configure } = require('./config.js');
const idGroup = process.env.KAFKA_C_GROUPID;
const idProducer = process.env.PRODUCERID;
const mainTopics = process.env.KAFKA_C_TOPIC ? process.env.KAFKA_C_TOPIC.split(',') : ['TEST_KAFKA', 'TEST_KAFKA2'];
const subTopics = process.env.KAFKA_P_TOPIC ? process.env.KAFKA_P_TOPIC.split(',') : ['TEST_KAFKA', 'TEST_KAFKA2'];
const brokerString = process.env.KAFKA_BROKERS_STRING ? process.env.KAFKA_BROKERS_STRING.split(',') : ['kafka:9092'];
/** INIT Filtering */
const FilteringStreamMonoKafka = new KafkaStreams(configure);
FilteringStreamMonoKafka.on('error', (error) => console.error(error));
/** INIT consumer */
const kafkaMainTopic = mainTopics;
/** INIT Producers */
const kafkaSubtopicBAL = process.env.KAFKA_P_STREAM_BAL;
const stream = FilteringStreamMonoKafka.getKStream();
/** Start stream */
stream
  /** from topic consumer */
  .from(kafkaMainTopic)
  /** condition: push message to producer only when the brand matches */
  .filter((message) => {
    console.log(message.value.toString());
    return JSON.parse(message.value.toString()).values.brand[0].data === process.env.FILTER_BRAND_BAL;
  })
  /** parse the buffer value to JSON, then wrap it as a Kafka value */
  .mapJSONConvenience()
  .mapWrapKafkaValue()
  .wrapAsKafkaValue()
  .to(kafkaSubtopicBAL, 'auto', 'buffer');
stream.start();
Regards Martin
Thanks for the info @cnwme, I'll take a look into this shortly.
Hi @rob3000, any update?
Regards Martin
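For anyone landing on this thread: since the wrap helpers only carry the value, one possible workaround is to build the { key, value } object yourself with .map() before calling .to(). This is only a sketch, not a confirmed fix from the maintainers: it assumes that the "buffer" produce type of kafka-streams uses message.key when it is set (verify against the library version you run), and the helper name toKeyedMessage is illustrative, not part of the library API.

```javascript
// Hypothetical key-preserving mapper: copies the key and value from the
// consumed topic-A message into a plain { key, value } object. Buffers are
// stringified; a missing key stays null rather than being invented.
function toKeyedMessage(message) {
  return {
    key: message.key ? message.key.toString() : null,
    value: message.value ? message.value.toString() : null,
  };
}

// Possible wiring (needs a running broker and the same config as above);
// shown as comments because it cannot run standalone:
//
// stream
//   .from(kafkaMainTopic)
//   .filter((message) => { /* same brand filter as above */ })
//   .map(toKeyedMessage)
//   .to(kafkaSubtopicBAL, 'auto', 'buffer');
```

Whether the produced record actually carries the key still depends on how the chosen produce type treats message.key, so this is worth testing against topic B before relying on it.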