laravel-kafka
laravel-kafka copied to clipboard
Can't send message
Hi, my Laravel version is 11 and my PHP version is 8.3.8. I can send data to a specific topic with this custom class:
<?php

declare(strict_types=1);

namespace App\Library\Kafka;

use RdKafka\Conf;
use RdKafka\Exception as RdKafkaException;
use RdKafka\Headers;
use RdKafka\Producer;

/**
 * Thin wrapper around the php-rdkafka extension for publishing
 * JSON-encoded messages to Kafka.
 */
class Publish
{
    private Producer $producer;

    public function __construct()
    {
        $conf = new Conf();
        $conf->set('bootstrap.servers', config('kafka.brokers'));
        // Allow payloads up to 200 MiB (librdkafka default is 1 MiB).
        // Conf::set() expects string values, so pass the number as a string.
        $conf->set('message.max.bytes', '209715200');
        $this->producer = new Producer($conf);
    }

    /**
     * Produce a message on the given topic and block until delivery completes.
     *
     * @param string      $topic     Target topic name.
     * @param array       $message   Payload; JSON-encoded before sending.
     * @param array       $headers   Optional message headers (name => value).
     * @param string|null $key       Optional partitioning key.
     * @param int|null    $partition Explicit partition, or null to let
     *                               librdkafka's partitioner choose.
     *
     * @throws \JsonException      if the payload cannot be JSON-encoded.
     * @throws RdKafkaException    on produce failure.
     * @throws \RuntimeException   if flush() times out before delivery.
     */
    public function produce(string $topic, array $message, array $headers = [], ?string $key = null, ?int $partition = null): void
    {
        $topicHandle = $this->producer->newTopic($topic);

        // Fail loudly on un-encodable payloads instead of producing "false".
        $messageJson = json_encode($message, JSON_THROW_ON_ERROR);

        if (is_null($partition)) {
            $partition = RD_KAFKA_PARTITION_UA;
        }

        try {
            if (count($headers)) {
                // producev() supports per-message headers; produce() does not.
                $topicHandle->producev($partition, 0, $messageJson, $key, $headers);
            } else {
                $topicHandle->produce($partition, 0, $messageJson, $key);
            }

            // flush() blocks until queued messages are delivered or the timeout
            // elapses. Checking its result is required: a timeout here means the
            // message was silently dropped.
            $result = $this->producer->flush(86400000); // 24h
            if ($result !== RD_KAFKA_RESP_ERR_NO_ERROR) {
                throw new \RuntimeException('Kafka flush timed out; message may not have been delivered.');
            }
        } catch (RdKafkaException $e) {
            // BUG FIX: the original `catch (RdKafka\Exception $e)` resolved to
            // App\Library\Kafka\RdKafka\Exception inside this namespace, so the
            // handler never matched. The aliased import catches the real class.
            report($e);
            throw $e;
        }
    }
}
And when I use this code:
// Reproduction: publishing via the laravel-kafka v2 facade reports success,
// but the message never appears on the Confluent panel.
try {
$message = new \Junges\Kafka\Message\Message(
headers: ['a' => 'b'],
body: ['c' => 'd'],
key: '1'
);
\Junges\Kafka\Facades\Kafka::publish()
->onTopic('test_topic')
->withMessage($message)
->usingSerializer(new \Junges\Kafka\Message\Serializers\JsonSerializer())
->send();
return 'Success';
// NOTE(review): the caught exception is swallowed — report()/log $e to see why
// publishing failed. Also, ConsumerException is referenced without an import;
// presumably \Junges\Kafka\Exceptions\ConsumerException — verify.
} catch (ConsumerException|\Exception $e) {
return 'Fail';
}
I receive a success message, but I can't see any message on the Confluent panel. Below is a sample log.
%7|1719055458.966|WAKEUPFD|rdkafka#producer-20| [thrd:app]: broker:29092/bootstrap: Enabled low-latency ops queue wake-ups
%7|1719055458.966|BRKMAIN|rdkafka#producer-20| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1719055458.967|BROKER|rdkafka#producer-20| [thrd:app]: broker:29092/bootstrap: Added new broker with NodeId -1
%7|1719055458.967|CONNECT|rdkafka#producer-20| [thrd:app]: broker:29092/bootstrap: Selected for cluster connection: bootstrap servers added (broker has 0 connection attempt(s))
%7|1719055458.967|INIT|rdkafka#producer-20| [thrd:app]: librdkafka v2.0.2 (0x20002ff) rdkafka#producer-20 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer, GCC GXX INSTALL GNULD LDS C11THREADS LIBDL PLUGINS ZLIB SSL SASL_CYRUS ZSTD HDRHISTOGRAM LZ4_EXT SYSLOG RAPIDJSON SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0xfffff)
%7|1719055458.967|CONF|rdkafka#producer-20| [thrd:app]: Client configuration:
%7|1719055458.967|CONF|rdkafka#producer-20| [thrd:app]: client.software.version = 2.0.2
%7|1719055458.967|CONF|rdkafka#producer-20| [thrd:app]: metadata.broker.list = broker:29092
%7|1719055458.967|CONF|rdkafka#producer-20| [thrd:app]: debug = generic,broker,topic,metadata,feature,queue,msg,protocol,cgrp,security,fetch,interceptor,plugin,consumer,admin,eos,mock,assignor,conf,all
%7|1719055458.967|CONF|rdkafka#producer-20| [thrd:app]: log_level = 7
%7|1719055458.967|CONF|rdkafka#producer-20| [thrd:app]: opaque = 0x7f4ea4736290
%7|1719055458.967|CONF|rdkafka#producer-20| [thrd:app]: ssl_key = [redacted]
%7|1719055458.967|CONF|rdkafka#producer-20| [thrd:app]: compression.codec = snappy
%7|1719055458.967|CONF|rdkafka#producer-20| [thrd:app]: dr_msg_cb = 0x7f4ea6901160
%7|1719055458.967|TOPIC|rdkafka#producer-20| [thrd:app]: New local topic: test_topic
%7|1719055458.967|TOPPARNEW|rdkafka#producer-20| [thrd:app]: NEW test_topic [-1] 0x55558c1a6810 refcnt 0x55558c1a68a0 (at rd_kafka_topic_new0:468)
%7|1719055458.967|CONNECT|rdkafka#producer-20| [thrd:app]: Not selecting any broker for cluster connection: still suppressed for 49ms: leader query
%7|1719055458.967|METADATA|rdkafka#producer-20| [thrd:app]: Hinted cache of 1/1 topic(s) being queried
%7|1719055458.967|METADATA|rdkafka#producer-20| [thrd:app]: Skipping metadata refresh of 1 topic(s): leader query: no usable brokers
%7|1719055458.967|PURGE|rdkafka#producer-20| [thrd:app]: test_topic [-1]: purging queues (purge_flags 0x3, exclude xmit_msgq)
%7|1719055458.967|BRKMAIN|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Enter main broker thread
%7|1719055458.967|CONNECT|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Received CONNECT op
%7|1719055458.967|STATE|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1719055458.967|BROADCAST|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: Broadcasting state change
%7|1719055458.967|CONNECT|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: broker in state TRY_CONNECT connecting
%7|1719055458.967|STATE|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
%7|1719055458.967|BROADCAST|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: Broadcasting state change
%7|1719055458.967|CONNECT|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Connecting to ipv4#172.18.0.3:29092 (plaintext) with socket 9
%7|1719055458.967|CONNECT|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Connected to ipv4#172.18.0.3:29092
%7|1719055458.967|CONNECTED|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Connected (#1)
%7|1719055458.967|FEATURE|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1719055458.967|STATE|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1719055458.967|BROADCAST|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: Broadcasting state change
%7|1719055458.967|PURGE|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Purging queues with flags queue,inflight
%7|1719055458.967|PURGEQ|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Purged 0 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%7|1719055458.967|PURGEQ|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Purged 0 message(s) from 0 partition(s)
%7|1719055458.968|PURGEQ|rdkafka#producer-20| [thrd:app]: Purged 0 message(s) from 0 UA-partition(s)
%7|1719055458.968|WAKEUP|rdkafka#producer-20| [thrd:app]: broker:29092/bootstrap: Wake-up: flushing
%7|1719055458.968|WAKEUP|rdkafka#producer-20| [thrd:app]: Wake-up sent to 1 broker thread in state >= UP: flushing
%7|1719055458.968|DESTROY|rdkafka#producer-20| [thrd:app]: Terminating instance (destroy flags none (0x0))
%7|1719055458.968|TERMINATE|rdkafka#producer-20| [thrd:app]: Interrupting timers
%7|1719055458.968|TERMINATE|rdkafka#producer-20| [thrd:app]: Sending TERMINATE to internal main thread
%7|1719055458.968|TERMINATE|rdkafka#producer-20| [thrd:app]: Joining internal main thread
%7|1719055458.968|TERMINATE|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Handle is terminating in state APIVERSION_QUERY: 3 refcnts (0x55558c13ac38), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 1 request(s) in retry+outbuf
%7|1719055458.968|TERMINATE|rdkafka#producer-20| [thrd:main]: Internal main thread terminating
%7|1719055458.968|DESTROY|rdkafka#producer-20| [thrd:main]: Destroy internal
%7|1719055458.968|BROADCAST|rdkafka#producer-20| [thrd:main]: Broadcasting state change
%7|1719055458.968|DESTROY|rdkafka#producer-20| [thrd:main]: Removing all topics
%7|1719055458.968|TOPPARREMOVE|rdkafka#producer-20| [thrd:main]: Removing toppar test_topic [-1] 0x55558c1a6810
%7|1719055458.968|DESTROY|rdkafka#producer-20| [thrd:main]: test_topic [-1]: 0x55558c1a6810 DESTROY_FINAL
%7|1719055458.968|DESTROY|rdkafka#producer-20| [thrd:main]: Sending TERMINATE to broker:29092/bootstrap
%7|1719055458.968|TERMINATE|rdkafka#producer-20| [thrd:main]: Purging reply queue
%7|1719055458.968|TERMINATE|rdkafka#producer-20| [thrd:main]: Decommissioning internal broker
%7|1719055458.968|TERMINATE|rdkafka#producer-20| [thrd:main]: Join 2 broker thread(s)
%7|1719055458.968|TERM|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Received TERMINATE op in state APIVERSION_QUERY: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
%7|1719055458.968|TERM|rdkafka#producer-20| [thrd::0/internal]: :0/internal: Received TERMINATE op in state INIT: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
%7|1719055458.968|FAIL|rdkafka#producer-20| [thrd::0/internal]: :0/internal: Client is terminating (after 1ms in state INIT) (_DESTROY)
%7|1719055458.968|STATE|rdkafka#producer-20| [thrd::0/internal]: :0/internal: Broker changed state INIT -> DOWN
%7|1719055458.968|FAIL|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Client is terminating (after 0ms in state APIVERSION_QUERY) (_DESTROY)
%7|1719055458.968|FEATURE|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Updated enabled protocol features -ApiVersion to
%7|1719055458.968|STATE|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Broker changed state APIVERSION_QUERY -> DOWN
%7|1719055458.968|BROADCAST|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: Broadcasting state change
%7|1719055458.968|BUFQ|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Purging bufq with 0 buffers
%7|1719055458.968|BUFQ|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Purging bufq with 0 buffers
%7|1719055458.968|BUFQ|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Updating 0 buffers on connection reset
%7|1719055458.968|BRKTERM|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
%7|1719055458.968|TERMINATE|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Handle is terminating in state DOWN: 1 refcnts (0x55558c13ac38), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
%7|1719055458.968|FAIL|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
%7|1719055458.968|BUFQ|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Purging bufq with 0 buffers
%7|1719055458.968|BUFQ|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Purging bufq with 0 buffers
%7|1719055458.968|BUFQ|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Updating 0 buffers on connection reset
%7|1719055458.968|BROADCAST|rdkafka#producer-20| [thrd::0/internal]: Broadcasting state change
%7|1719055458.968|BUFQ|rdkafka#producer-20| [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
%7|1719055458.968|BUFQ|rdkafka#producer-20| [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
%7|1719055458.968|BUFQ|rdkafka#producer-20| [thrd::0/internal]: :0/internal: Updating 0 buffers on connection reset
%7|1719055458.968|BRKTERM|rdkafka#producer-20| [thrd::0/internal]: :0/internal: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
%7|1719055458.968|TERMINATE|rdkafka#producer-20| [thrd::0/internal]: :0/internal: Handle is terminating in state DOWN: 1 refcnts (0x55558c13a008), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
%7|1719055458.968|FAIL|rdkafka#producer-20| [thrd::0/internal]: :0/internal: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
%7|1719055458.968|BUFQ|rdkafka#producer-20| [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
%7|1719055458.968|BUFQ|rdkafka#producer-20| [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
%7|1719055458.968|BUFQ|rdkafka#producer-20| [thrd::0/internal]: :0/internal: Updating 0 buffers on connection reset
%7|1719055458.969|TERMINATE|rdkafka#producer-20| [thrd:main]: Internal main thread termination done
%7|1719055458.969|TERMINATE|rdkafka#producer-20| [thrd:app]: Destroying op queues
%7|1719055458.969|TERMINATE|rdkafka#producer-20| [thrd:app]: Termination done: freeing resources
= "Success"
Below is an image showing that when I use my custom class, the message is sent successfully.
You don't need to configure the native RdKafka extension yourself. You just need to use the producer and consumer from the package; you can find them in the docs. This might be a mismatch between this package and the rdkafka lib extension.
@aungmyatmoethegreat No, it's not like that. I checked the package codes and found out that it uses php native rdkafka lib.
Correct, this package uses RdKafka internally because RdKafka is the de facto standard for PHP to communicate with Kafka servers. But you don't need to use it like that — you only need to use the abstracted API this package provides.
@aungmyatmoethegreat You didn't understand my question. When I send a message with my custom class, everything is fine. But when I use the package to send a message, I receive a success response, yet the message is not shown on the Confluent panel.
I see your issue; it might be a problem with this package. I tested with Confluent last month and it was fine. Check the SASL config.
which version of laravel-kafka are you using?
@mateusjunges V2
Can you please try with the following docker-compose file? I have no issues publishing/consuming messages using v2.
---
# Confluent Platform 7.7.0 stack (KRaft single-node broker plus Schema Registry,
# Connect, Control Center, ksqlDB, and REST Proxy) for testing laravel-kafka v2.
# Indentation restored: the file as pasted was flush-left, which is invalid YAML.
version: '2'
services:
  broker:
    image: confluentinc/cp-kafka:7.7.0
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

  schema-registry:
    image: confluentinc/cp-schema-registry:7.7.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  connect:
    image: cnfldemos/cp-server-connect-datagen:0.6.4-7.6.0
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.7.0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.7.0
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
      - ksqldb-server
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT: '/connectors'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.7.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.7.0
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  ksql-datagen:
    image: confluentinc/ksqldb-examples:7.7.0
    hostname: ksql-datagen
    container_name: ksql-datagen
    depends_on:
      - ksqldb-server
      - broker
      - schema-registry
      - connect
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
                       cub kafka-ready -b broker:29092 1 40 && \
                       echo Waiting for Confluent Schema Registry to be ready... && \
                       cub sr-ready schema-registry 8081 40 && \
                       echo Waiting a few seconds for topic creation to finish... && \
                       sleep 11 && \
                       tail -f /dev/null'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      STREAMS_BOOTSTRAP_SERVERS: broker:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081

  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.7.0
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'