
Need replacement for kafka transporter

Open nimdeveloper opened this issue 2 years ago • 4 comments

kafka-node is no longer actively maintained; I think it's better to move to a new library for the Kafka transporter.

See: kafka-node

nimdeveloper avatar Mar 23 '23 12:03 nimdeveloper

I wrote a custom transporter based on kafkajs and the current kafka.js file. I hope it helps.

/*
 * moleculer
 * Copyright (c) 2019 MoleculerJS (https://github.com/moleculerjs/moleculer)
 * MIT Licensed
 */

"use strict";

const { defaultsDeep } = require("lodash");
const Transporter = require("moleculer").Transporters.Base;
const C = require("./constants");

/**
 * Lightweight transporter for Kafka
 *
 * For test:
 *   1. clone https://github.com/wurstmeister/kafka-docker.git repo
 *   2. follow instructions on https://github.com/wurstmeister/kafka-docker#pre-requisites
 *   3. start containers with Docker Compose
 *
 * 			docker-compose -f docker-compose-single-broker.yml up -d
 *
 * @class KafkaTransporter
 * @property {ServiceBroker} broker
 * @property {GenericObject} opts
 * @property {LoggerInstance} logger
 * @property {boolean} connected
 * @extends {Transporter}
 */
class KafkaTransporter extends Transporter {
	/**
	 * Creates an instance of KafkaTransporter.
	 *
	 * @param {any} opts
	 *
	 * @memberof KafkaTransporter
	 */
	constructor(opts) {
		if (typeof opts === "string") {
			// Support "kafka://host1:9092,host2:9092" style connection strings
			opts = { brokers: opts.replace("kafka://", "").split(",") };
		} else if (opts == null) {
			opts = {};
		}

		opts = defaultsDeep(opts, {
			// KafkaClient options. More info: https://kafka.js.org/docs/configuration
			client: {
				brokers: (Array.isArray(opts.brokers) ? opts.brokers : [opts.brokers])
			},

			// KafkaProducer options. More info: https://kafka.js.org/docs/producing#options
			producer: {},

			// ConsumerGroup options. More info: https://kafka.js.org/docs/consuming#a-name-options-a-options
			consumer: {},

			// Advanced options for `send`. More info: https://kafka.js.org/docs/producing#producing-messages
			publish: {
				partition: 0,
				// Extra kafkajs send() options (e.g. acks, timeout, compression)
				attributes: {}
			}
		});

		super(opts);

		this.client = null;
		this.producer = null;
		this.consumer = null;
		this.admin = null;
	}

	/**
	 * Connect to the server
	 *
	 * @memberof KafkaTransporter
	 */
	connect() {
		return new this.broker.Promise((resolve, reject) => {
			let Kafka;
			try {
				Kafka = require("kafkajs").Kafka;
			} catch (err) {
				/* istanbul ignore next */
				this.broker.fatal(
					"The 'kafkajs' package is missing. Please install it with 'npm install kafkajs' command.",
					err,
					true
				);
			}

			this.client = new Kafka(this.opts.client);

			// Create Producer
			this.producer = this.client.producer(this.opts.producer);
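			// The admin client is connected here and used later in makeSubscriptions() to create topics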
			this.admin = this.client.admin();
			this.admin.connect().then(() => {
				this.producer.connect().then(() => {
					/* Moved to ConsumerGroup
					// Create Consumer
					this.consumer = new Kafka.Consumer(this.client, this.opts.consumerPayloads || [], this.opts.consumer);
					this.consumer.on("error", e => {
						this.logger.error("Kafka Consumer error", e.message);
						this.logger.debug(e);
						if (!this.connected)
							reject(e);
					});
					this.consumer.on("message", message => {
						const topic = message.topic;
						const cmd = topic.split(".")[1];
						console.log(cmd);
						this.incomingMessage(cmd, message.value);
					});*/

					this.logger.info("Kafka client is connected.");

					this.onConnected().then(resolve);
				}).catch((e) => {
					this.logger.error("Kafka Producer error", e.message);
					this.logger.debug(e);

					this.broker.broadcastLocal("$transporter.error", {
						error: e,
						module: "transporter",
						type: C.FAILED_PUBLISHER_ERROR
					});

					if (!this.connected) reject(e);
				});
			}).catch((e) => {
				this.logger.error("Kafka Admin client error", e.message);
				this.logger.debug(e);

				this.broker.broadcastLocal("$transporter.error", {
					error: e,
					module: "transporter",
					type: C.FAILED_PUBLISHER_ERROR
				});

				if (!this.connected) reject(e);
			});
		});
	}

	/**
	 * Disconnect from the server
	 *
	 * @memberof KafkaTransporter
	 */
	disconnect() {
		// kafkajs disconnect() returns a Promise and takes no callback,
		// so chain the calls and resolve when everything is closed.
		return this.broker.Promise.resolve()
			.then(() => (this.consumer ? this.consumer.disconnect() : null))
			.then(() => (this.producer ? this.producer.disconnect() : null))
			.then(() => (this.admin ? this.admin.disconnect() : null))
			.then(() => {
				this.consumer = null;
				this.producer = null;
				this.admin = null;
				// this.client = null;
			});
	}

	/**
	 * Subscribe to all topics
	 *
	 * @param {Array<Object>} topics
	 *
	 * @memberof BaseTransporter
	 */
	makeSubscriptions(topics) {
		topics = topics.map(({ cmd, nodeID }) => ({topic: this.getTopicName(cmd, nodeID)}));

		return new this.broker.Promise((resolve, reject) => {
			this.admin.createTopics({topics: topics}).then(() => {
				// kafkajs consumer options; groupId is required, extra options come from opts.consumer
				const consumerOptions = Object.assign(
					{
						groupId: this.broker.instanceID // this.nodeID
					},
					this.opts.consumer
				);

				this.consumer = this.client.consumer(consumerOptions);
				this.consumer.connect().then(() => {
					// subscribe() returns a Promise, so chain it before starting the runner
					return this.consumer.subscribe({ topics: topics.map(topic => topic.topic) });
				}).then(() => {
					// Ref: https://kafka.js.org/docs/consuming#a-name-each-message-a-eachmessage
					return this.consumer.run({
						eachMessage: async ({ topic, message }) => {
							const cmd = topic.split(".")[1];
							await this.receive(cmd, message.value);
						}
					});
				}).then(() => {
					resolve();
				}).catch((e)=> {
					/* istanbul ignore next */
					this.logger.error("Kafka Consumer error", e.message);
					this.logger.debug(e);

					this.broker.broadcastLocal("$transporter.error", {
						error: e,
						module: "transporter",
						type: C.FAILED_CONSUMER_ERROR
					});

					if (!this.connected) reject(e);
				});
			}).catch(err => {
				/* istanbul ignore next */
				if (err) {
					this.logger.error("Unable to create topics!", topics, err);

					this.broker.broadcastLocal("$transporter.error", {
						error: err,
						module: "transporter",
						type: C.FAILED_TOPIC_CREATION
					});
					return reject(err);
				}
			});
		});
	}

	/**
	 * Send data buffer.
	 *
	 * @param {String} topic
	 * @param {Buffer} data
	 * @param {Object} meta
	 *
	 * @returns {Promise}
	 */
	send(topic, data, { packet }) {
		if (!this.client) return this.broker.Promise.resolve();

		return new this.broker.Promise((resolve, reject) => {
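			// producer.send() returns a Promise; extra send() options can be merged in through opts.publish.attributes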
			this.producer.send({
				topic: this.getTopicName(packet.type, packet.target),
				messages: [{value: data, partition: this.opts.publish.partition}], // Ref: https://kafka.js.org/docs/producing#message-structure
				...this.opts.publish.attributes
			}).then(() => {
				resolve();
			}).catch(err => {
				if (err) {
					this.logger.error("Publish error", err);

					this.broker.broadcastLocal("$transporter.error", {
						error: err,
						module: "transporter",
						type: C.FAILED_PUBLISHER_ERROR
					});
					reject(err);
				}
			});
		});
	}
}

module.exports = KafkaTransporter;
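A minimal usage sketch (not part of the snippet above): it assumes the class is saved locally as kafkajs-transporter.js next to a constants.js that exports the FAILED_* types it requires, and that a Kafka broker is reachable at localhost:9092; adjust the path and address to your setup.

"use strict";

const { ServiceBroker } = require("moleculer");
// Hypothetical local path; the class above would be saved as this file
const KafkaTransporter = require("./kafkajs-transporter");

const broker = new ServiceBroker({
	nodeID: "node-1",
	// The constructor also accepts a full options object ({ client, producer, consumer, publish })
	transporter: new KafkaTransporter("kafka://localhost:9092")
});

broker.start()
	.then(() => broker.logger.info("Broker started with the kafkajs-based transporter."))
	.catch(err => broker.logger.error(err));

Passing a transporter instance instead of a connection string is the standard way to plug a custom transporter into a Moleculer broker.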

nimdeveloper avatar Mar 26 '23 12:03 nimdeveloper

Cool, thanks for sharing. I will check it, and if it looks good, I'll use it in the next (0.15) branch.

icebob avatar Mar 26 '23 13:03 icebob

FYI:

I am using a custom Kafka transporter implemented with KafkaJS. It is working, but the CPU usage is really high; there are existing issues about this.

The bad news is that KafkaJS is looking for maintainers. https://github.com/tulios/kafkajs/issues/1603

The good news, as mentioned in https://github.com/tulios/kafkajs/issues/1603#issuecomment-1969110879, is that Confluent is now officially supporting a JavaScript library.

davidnussio avatar Mar 07 '24 08:03 davidnussio

Awesome. However, the Confluent solution uses the rdkafka bindings. I used it some years ago, and I don't have good memories of it.

If kafkajs doesn't find a maintainer, I will remove the Kafka transporter from the core and move it to a separate repo, so anybody can choose whether to use the kafkajs transporter or build their own based on another library.

icebob avatar Mar 31 '24 15:03 icebob