librdkafka
librdkafka copied to clipboard
rdkafka EXC_BAD_INSTRUCTION (code=1, subcode=0x616b6661)
Read the FAQ first: https://github.com/edenhill/librdkafka/wiki/FAQ
Do NOT create issues for questions, use the discussion forum: https://github.com/edenhill/librdkafka/discussions
Description
How to reproduce
Create RdKafka::KafkaConsumer instances on multiple threads and consume messages, which are produced roughly 10 times per second. Normally it runs fine, but it occasionally crashes.
IMPORTANT: Always try to reproduce the issue on the latest released version (see https://github.com/edenhill/librdkafka/releases), if it can't be reproduced on the latest version the issue has been fixed.
Checklist
IMPORTANT: We will close issues where the checklist has not been completed.
Please provide the following information:
- [x] librdkafka version (release number or git tag): v1.9.2
- [ ] Apache Kafka version: 2.3.0
- [ ] librdkafka client configuration:
<enable.partition.eof=true, session.timeout.ms=200000, auto.offset.reset=latest>
- [ ] Operating system:
<mac os 12.5.1>
the demo code below:
===========
// // Created by 王冠中 on 2022/6/1. //
#pragma once
#include <rdkafkacpp.h>
#include <string>
#include <vector>
#include <csignal>
/// Simple wrapper around a RdKafka::KafkaConsumer: builds the config,
/// subscribes to a topic list and polls messages one at a time.
class RdKafkaConsumer {
public:
    /// @param brokers bootstrap broker list ("host:port,...")
    /// @param groupId consumer-group id
    /// @param topics  topics to subscribe to
    RdKafkaConsumer(std::string brokers, std::string groupId, std::vector<std::string> topics)
        : Run(true)
        , M_Brokers(std::move(brokers))
        , M_GroupId(std::move(groupId))
        , M_TopicVector(std::move(topics))
    {
    }

    RdKafkaConsumer(const RdKafkaConsumer& Other) = delete;
    RdKafkaConsumer& operator=(const RdKafkaConsumer& Other) = delete;

    volatile sig_atomic_t Run;   // consume-loop flag; cleared on fatal errors / EOF
    bool exit_eof = true;        // stop once every assigned partition reports EOF
    bool do_conf_dump = true;    // print the effective config after creation
    int partition_cnt = 0;       // partitions currently assigned (set by rebalance cb)
    int eof_cnt = 0;             // partitions that have reached EOF
    std::string M_Brokers;
    std::string M_GroupId;
    std::vector<std::string> M_TopicVector;

    // BUG FIX: these pointers were left uninitialized. If ConsumeMsg() is
    // never called (or fails before assigning them) the destructor would
    // `delete` indeterminate pointers — undefined behavior and a likely
    // source of the reported crash. Always start them at nullptr.
    RdKafka::Conf* M_Config = nullptr;            // GLOBAL-level (consumer client) configuration
    // RdKafka::Conf *m_topicConfig;              // TOPIC-level configuration (unused)
    RdKafka::KafkaConsumer* M_Consumer = nullptr; // consumer client instance

public:
    bool KafkaClientWork(std::string& Msg);
    bool ConsumeMsg();
    bool Msg_Consume(RdKafka::Message* message, void* opaque, std::string& Msg);
    void dumpConfig(RdKafka::Conf* mConfig);

private:
    void sigterm(int sig);

public:
    virtual ~RdKafkaConsumer();
};
/// Event callback: logs librdkafka events and, on a fatal error, clears the
/// owning consumer's Run flag so its consume loop terminates.
class FEventCb final : public RdKafka::EventCb {
public:
    RdKafkaConsumer* Consumer; // non-owning back-pointer to the wrapper

    explicit FEventCb(RdKafkaConsumer* Consumer) : Consumer(Consumer) {}

    void event_cb(RdKafka::Event& Event) override;
};
/// Rebalance callback: applies (incremental) partition assignments and keeps
/// the owning consumer's partition/EOF counters in sync.
class ExampleRebalanceCb : public RdKafka::RebalanceCb {
public:
    RdKafkaConsumer* Consumer; // non-owning back-pointer to the wrapper

    explicit ExampleRebalanceCb(RdKafkaConsumer* Consumer) : Consumer(Consumer) {}

private:
    // Log every topic/partition pair in the list.
    static void part_list_print(const std::vector<RdKafka::TopicPartition*>& partitions);

public:
    void rebalance_cb(RdKafka::KafkaConsumer* consumer,
                      RdKafka::ErrorCode err,
                      std::vector<RdKafka::TopicPartition*>& partitions);
};
========
// // Created by 王冠中 on 2022/6/1. //
#include "../Public/RdKafkaConsumer.h"
#include <iostream>
// static volatile sig_atomic_t run = 1; // static volatile int run = 1;
/* void* RdKafkaConsumer::sigterm(int sig) { std::cout << "recive sig: " << sig << std::endl; Run = 0; return nullptr; } */
// Dispatch on the event type and log it; fatal errors additionally stop the
// owning consumer's loop. (err2str resolves to RdKafka::err2str via ADL in
// the original; it is written fully qualified here.)
void FEventCb::event_cb(RdKafka::Event& Event) {
    switch (Event.type()) {
    case RdKafka::Event::EVENT_ERROR: {
        if (Event.fatal()) {
            // The client can no longer operate — signal the loop to stop.
            std::cout << "[ERROR]" << "FATAL ";
            Consumer->Run = false;
        }
        std::cout << "[ERROR]" << "ERROR (" << RdKafka::err2str(Event.err()) << "): "
                  << Event.str() << std::endl;
        break;
    }
    case RdKafka::Event::EVENT_STATS: {
        std::cout << "[ERROR]" << "\"STATS\": " << Event.str() << std::endl;
        break;
    }
    case RdKafka::Event::EVENT_LOG: {
        fprintf(stderr, "LOG-%i-%s: %s\n", Event.severity(), Event.fac().c_str(),
                Event.str().c_str());
        break;
    }
    case RdKafka::Event::EVENT_THROTTLE: {
        std::cout << "[ERROR]" << "THROTTLED: " << Event.throttle_time() << "ms by "
                  << Event.broker_name() << " id " << (int) Event.broker_id() << std::endl;
        break;
    }
    default: {
        std::cout << "[ERROR]" << "EVENT " << Event.type() << " ("
                  << RdKafka::err2str(Event.err()) << "): " << Event.str() << std::endl;
        break;
    }
    }
}
// Log each topic/partition pair of the assignment list, one per line.
void ExampleRebalanceCb::part_list_print(const std::vector<RdKafka::TopicPartition*>& partitions) {
    for (const auto* part : partitions) {
        std::cout << "[ERROR]" << part->topic() << "[" << part->partition() << "], " << std::endl;
    }
}
// Apply the assignment/revocation delivered by librdkafka, using the
// incremental API when the group runs the COOPERATIVE protocol, and keep the
// wrapper's partition counter in step with it.
void ExampleRebalanceCb::rebalance_cb(RdKafka::KafkaConsumer* consumer,
                                      RdKafka::ErrorCode err,
                                      std::vector<RdKafka::TopicPartition*>& partitions) {
    std::cout << "[ERROR]" << "RebalanceCb: " << RdKafka::err2str(err) << ": ";
    part_list_print(partitions);

    RdKafka::Error* error = NULL;
    RdKafka::ErrorCode ret_err = RdKafka::ERR_NO_ERROR;
    const bool cooperative = (consumer->rebalance_protocol() == "COOPERATIVE");

    if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
        if (cooperative)
            error = consumer->incremental_assign(partitions);
        else
            ret_err = consumer->assign(partitions);
        Consumer->partition_cnt += (int) partitions.size();
    } else if (cooperative) {
        error = consumer->incremental_unassign(partitions);
        Consumer->partition_cnt -= (int) partitions.size();
    } else {
        ret_err = consumer->unassign();
        Consumer->partition_cnt = 0;
    }
    Consumer->eof_cnt = 0; /* FIXME: Won't work with COOPERATIVE */

    if (error) {
        std::cout << "[ERROR]" << "incremental assign failed: " << error->str() << "\n";
        delete error;
    } else if (ret_err) {
        std::cout << "[ERROR]" << "assign failed: " << RdKafka::err2str(ret_err) << "\n";
    }
}
/**
 * Classify a polled message and extract its payload.
 *
 * @param message the message/event returned by KafkaConsumer::consume()
 * @param opaque  unused
 * @param Msg     out-parameter; receives the payload on success
 * @return true if a real message was delivered, false on timeout/EOF/error
 */
bool RdKafkaConsumer::Msg_Consume(RdKafka::Message* message, void* opaque, std::string& Msg) {
    bool Result;
    switch (message->err()) {
    case RdKafka::ERR__TIMED_OUT:
        // No message arrived within the poll timeout — not an error.
        Result = false;
        break;
    case RdKafka::ERR_NO_ERROR: {
        RdKafka::MessageTimestamp ts = message->timestamp();
        // BUG FIX: the payload is a raw byte buffer and is NOT guaranteed to
        // be NUL-terminated, so `Msg = static_cast<char*>(payload())` could
        // read past the end of the buffer (heap over-read / garbage tail).
        // Construct the string with the explicit length instead.
        if (message->len() > 0 && message->payload() != nullptr) {
            Msg.assign(static_cast<const char*>(message->payload()), message->len());
        } else {
            Msg.clear();
        }
        std::cout << "[ " << pthread_self() << "] " << "Timestamp: " << ts.timestamp << "message len: "<< static_cast<int>(message->len()) << "payload: " << Msg << std::endl;
        Result = true;
        Run = true;
        break;
    }
    case RdKafka::ERR__PARTITION_EOF:
        // Reached end of a partition; stop only when ALL partitions hit EOF.
        if (exit_eof && ++eof_cnt == partition_cnt) {
            std::cout << "[ERROR]" << "%% EOF reached for all " << partition_cnt << " partition(s)" << std::endl;
            Run = false;
        }
        std::cout << "[ERROR]" << "ERR__PARTITION_EOF" <<std::endl;
        Result = false;
        break;
    case RdKafka::ERR__UNKNOWN_TOPIC:
    case RdKafka::ERR__UNKNOWN_PARTITION:
        std::cout << "[ERROR]" << "Consume failed:-topic-partion " << message->errstr() << std::endl;
        Run = false;
        Result = false;
        break;
    default:
        /* All other errors are treated as terminal for this consumer. */
        std::cout << "[ERROR]" << "Consume failed: -other " << message->errstr() << std::endl;
        Run = false;
        Result = false;
    }
    return Result;
}
/**
 * Poll one message (2s timeout) and hand it to Msg_Consume().
 *
 * @param Msg out-parameter; receives the payload on success
 * @return true if a real message was delivered
 */
bool RdKafkaConsumer::KafkaClientWork(std::string &Msg) {
    if (this->M_Brokers == "") {
        Run = false;
        return false;
    }
    // BUG FIX: if ConsumeMsg() failed (or was never called) M_Consumer is
    // null and calling consume() on it would crash. Bail out instead.
    if (!M_Consumer) {
        Run = false;
        return false;
    }
    RdKafka::Message *Message = M_Consumer->consume(2000);
    bool Result = Msg_Consume(Message, nullptr, Msg);
    // consume() returns an event object even on timeout; always release it.
    delete Message;
    return Result;
}
bool RdKafkaConsumer::ConsumeMsg() { std::string Error_Str; RdKafka::Conf::ConfResult result;
M_Config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if (!M_Config) {
//UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: [Create Kafka-client] Failed to create Config."));
std::cout << "[ERROR]" << "Failed to create Config." << std::endl;
return false;
}
//ExampleRebalanceCb ex_rebalance_cb(this);
//result = M_Config->set("rebalance_cb", &ex_rebalance_cb, Error_Str);
//if (result != RdKafka::Conf::CONF_OK)
//{
// //UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: [Create Kafka-client] Conf set 'rebalance_cb' failed: %s."),UTF8_TO_TCHAR(Error_Str.c_str()));
// std::cout << "Conf set 'rebalance_cb' failed: " << Error_Str << std::endl;
// return false;
//}
result = M_Config->set("enable.partition.eof", "true", Error_Str);
if (result != RdKafka::Conf::CONF_OK) {
//UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: [Create Kafka-client] Conf set 'enable.partition.eof' failed:: %s."),UTF8_TO_TCHAR(Error_Str.c_str()));
std::cout << "Conf set 'enable.partition.eof' failed: " << Error_Str << std::endl;
return false;
}
result = M_Config->set("bootstrap.servers", M_Brokers, Error_Str);
if (result != RdKafka::Conf::CONF_OK) {
//UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: [Create Kafka-client] Conf set 'bootstrap.servers' failed:: %s."),UTF8_TO_TCHAR(Error_Str.c_str()));
std::cout << "Conf set 'bootstrap.servers' failed: " << Error_Str << std::endl;
return false;
}
result = M_Config->set("session.timeout.ms", "200000", Error_Str);
if (result != RdKafka::Conf::CONF_OK) {
//UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: [Create Kafka-client] Conf set 'session.timeout.ms' failed:: %s."),UTF8_TO_TCHAR(Error_Str.c_str()));
std::cout << "session.timeout.ms' failed: " << Error_Str << std::endl;
return false;
}
result = M_Config->set("group.id", M_GroupId, Error_Str); //设置消费组名:group.id(string类型)
if (result != RdKafka::Conf::CONF_OK) {
//UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: [Create Kafka-client] Conf set 'group.id' failed:: %s."),UTF8_TO_TCHAR(Error_Str.c_str()));
std::cout << "Conf set 'group.id' failed: " << Error_Str << std::endl;
return false;
}
/*
result = M_Config->set("max.partition.fetch.bytes", "10240000", Error_Str); //消费消息的最大大小
if (result != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set 'max.partition.fetch.bytes' failed: " << Error_Str << std::endl;
}
result = M_Config->set("statistics.interval.ms", "10240000", Error_Str); //是否启用统计
if (result != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set 'statistics.interval.ms' failed: " << Error_Str << std::endl;
}
result = M_Config->set("compression.codec", "none", Error_Str); //是否启用压缩
if (result != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set 'compression.codec' failed: " << Error_Str << std::endl;
}*/
//"earliest"
//"latest"
result = M_Config->set("auto.offset.reset", "latest", Error_Str);
if (result != RdKafka::Conf::CONF_OK) {
//UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: [Create Kafka-client] Conf set 'auto.offset.reset' failed:: %s."),UTF8_TO_TCHAR(Error_Str.c_str()));
std::cout << "Conf set 'auto.offset.reset' failed: " << Error_Str << std::endl;
return false;
}
FEventCb FEventCb(this);
result = M_Config->set("event_cb", &FEventCb, Error_Str);
if (result != RdKafka::Conf::CONF_OK) {
//UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: [Create Kafka-client] Conf set 'event_cb' failed: %s "),UTF8_TO_TCHAR(Error_Str.c_str()));
std::cout << "Conf set 'event_cb' failed: " << Error_Str << std::endl;
return false;
}
//signal(SIGINT, &RdKafkaConsumer::sigterm);
//signal(SIGTERM, &RdKafkaConsumer::sigterm);
// RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(m_config, error_str);
M_Consumer = RdKafka::KafkaConsumer::create(M_Config, Error_Str);
if (!M_Consumer) {
//UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: [Create Kafka-client] Failed to create consumer: %s "),UTF8_TO_TCHAR(Error_Str.c_str()));
std::cout << "[ERROR]" << "Failed to create consumer: " << Error_Str << std::endl;
return false;
}
//UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: [Create Kafka-client] ==== consumer name === %s"),UTF8_TO_TCHAR(M_Consumer->name().c_str()));
std::cout << "[ "<< pthread_self() << "]" << "% Created consumer " << M_Consumer->name() << std::endl;
const RdKafka::ErrorCode err = M_Consumer->subscribe(M_TopicVector);
if (err) {
//UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: [Create Kafka-client] Failed to subscribe to the topic: %s"),UTF8_TO_TCHAR(err2str(err).c_str()));
std::cout << "[ERROR]" << "Failed to subscribe to " << M_TopicVector.size() << " topics: " << RdKafka::err2str(err)
<< std::endl;
return false;
}
dumpConfig(M_Config);
return true;
/*while (run) {
KafkaClientWork();
}*/
}
/**
 * Print the effective configuration. Conf::dump() returns a flat list of
 * alternating name/value strings; they are printed as "name = value" pairs.
 *
 * @param mConfig configuration to dump; no-op when null or do_conf_dump is off
 */
void RdKafkaConsumer::dumpConfig(RdKafka::Conf *mConfig) {
    if (!do_conf_dump || mConfig == nullptr) {
        return;
    }
    std::list<std::string>* dump = mConfig->dump();
    std::cout << "# Global config" << std::endl;
    std::string strMsg;
    std::list<std::string>::iterator it = dump->begin();
    while (it != dump->end()) {
        strMsg = strMsg + *it + " = ";
        std::cout << *it << " = ";
        ++it;
        // BUG FIX: the original advanced the iterator twice per pass without
        // re-checking end(), dereferencing past-the-end (UB) if the list ever
        // had an odd number of entries.
        if (it == dump->end()) {
            std::cout << std::endl;
            break;
        }
        strMsg = strMsg + *it + ",";
        std::cout << *it << std::endl;
        ++it;
    }
    delete dump;
}
// Tear down the consumer: leave the group cleanly, then release the client
// and configuration objects.
// NOTE(review): this assumes M_Consumer/M_Config are null or valid — they
// must be initialized to nullptr in the class for that to hold; confirm.
RdKafkaConsumer::~RdKafkaConsumer() {
    std::cout <<"release the kafka resource"<<std::endl;

    // Shut the consumer down first so it stops all activity before anything
    // else is released.
    if (M_Consumer) {
        M_Consumer->close();
        delete M_Consumer;
    }
    if (M_Config) {
        delete M_Config;
    }

    /*
     * Wait for RdKafka to decommission.
     * This is not strictly needed (with check outq_len() above), but
     * allows RdKafka to clean up all its resources before the application
     * exits so that memory profilers such as valgrind wont complain about
     * memory leaks.
     */
    RdKafka::wait_destroyed(5000);
}
========
#include "Public/RdKafkaConsumer.h"
#include <iostream>
#include <thread>
#include <unistd.h>
void kafkaRun(const bool &bIsRunning,const std::string &broke, const std::string &groupId, const std::vectorstd::string &topics) { std::cout << "["<< pthread_self() << " kafka begin]" << std::endl;
RdKafkaConsumer rdKafkaConsumer(broke, groupId, topics);
bool ResultConfig = rdKafkaConsumer.ConsumeMsg();
if (!ResultConfig) {
std::cout << "["<< pthread_self() << " create failed]" << "rdKafkaConsumer create:" << std::endl;
return;
}
//卡夫卡运转
while (rdKafkaConsumer.Run) {
std::string Msg;
bool ResultMsg = rdKafkaConsumer.KafkaClientWork(Msg);
if (!ResultMsg ) {
std::cout << "["<< pthread_self() << " -rlt error]" << "rdKafkaConsumer msg:" << Msg << std::endl;
continue;
}
if (Msg.empty()) {
std::cout << "["<< pthread_self() << "-rlt empty]" << "rdKafkaConsumer msg:" << Msg << std::endl;
continue;
}
std::cout << "["<< pthread_self() << "]" << "rdKafkaConsumer msg:" << Msg << std::endl;
}
} void Runnable1(std::string broke, std::string groupId, std::vectorstd::string topics) { bool bIsRunning = true;
std::cout <<"["<< pthread_self()<<"]" << "runnable" <<std::endl;
while (bIsRunning) {
kafkaRun(bIsRunning,broke, groupId, topics);
std::cout << "["<< pthread_self() << " quit kafka work]" << std::endl;
sleep(5);
}
}
// Spin up two consumer threads against the same broker/group, each
// subscribed to a different topic, and wait for both (they run forever).
int main() {
    std::cout << "Hello, World!" << std::endl;

    const std::string broke = "111.175.186.147:9092";
    const std::string groupId = "myGroup21";

    std::vector<std::string> topics1;
    topics1.push_back("nanjingjxz-dev-c2bs_mec_up_participants");

    std::vector<std::string> topics2;
    topics2.push_back("nanjingjxz-dev-c2bs_vehicle_realtime");

    std::thread f1(Runnable1,broke, groupId, topics1);
    std::thread f2(Runnable1,broke, groupId, topics2);
    f1.join();
    f2.join();

    std::cout<<"main over"<<std::endl;
    return 0;
}
==========
cmkefilelist.txt
cmake_minimum_required(VERSION 3.22)
project(test3)

set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

add_executable(test3 main.cpp Private/RdKafkaConsumer.cpp Public/RdKafkaConsumer.h)

# Prefer target-scoped commands over directory-scoped
# include_directories()/link_libraries(): same effect for this single target,
# but the usage requirements don't leak to unrelated targets.
target_include_directories(test3 PRIVATE /opt/homebrew/Cellar/librdkafka/1.9.2/include/librdkafka)
target_link_directories(test3 PRIVATE /opt/homebrew/Cellar/librdkafka/1.9.2/lib)
target_link_libraries(test3 PRIVATE rdkafka++)