librdkafka icon indicating copy to clipboard operation
librdkafka copied to clipboard

rdkafka EXC_BAD_INSTRUCTION (code=1, subcode=0x616b6661)

Open guan25 opened this issue 3 years ago • 0 comments

Read the FAQ first: https://github.com/edenhill/librdkafka/wiki/FAQ

Do NOT create issues for questions, use the discussion forum: https://github.com/edenhill/librdkafka/discussions

Description

0x000000016bb7ab60 RdKafka::log_cb_trampoline(rd_kafka_s const*, int, char const*, char const*) 0x00000001043f3354 rd_kafka_log0 0x00000001047a9a18 rd_kafka_broker_bufq_timeout_scan 0x00000001047bfb78 rd_kafka_broker_ops_io_serve 0x00000001047c1194 rd_kafka_broker_serve 0x00000001047bf6f8 rd_kafka_broker_thread_main 0x00000001047bb21c _thrd_wrapper_function 0x0000000104815b74 _pthread_start 0x000000018febc26c

How to reproduce

create multi-thread create rdkakfka with RdKafka::KafkaConsumer, and consume the msg,which is producting by 10 time per second, Nomally,it running ok,but it broke by accident。

IMPORTANT: Always try to reproduce the issue on the latest released version (see https://github.com/edenhill/librdkafka/releases), if it can't be reproduced on the latest ver.5sion the issue has been fixed.

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

  • [x] librdkafka version (release number or git tag): v1.9.2
  • [ ] Apache Kafka version: 2.3.0
  • [ ] librdkafka client configuration: <"enable.partition.eof", "true" "session.timeout.ms", "200000" "auto.offset.reset", "latest">
  • [ ] Operating system: <mac os 12.5.1>

the demo code below:

===========

// // Created by 王冠中 on 2022/6/1. //

#pragma once

#include #include <rdkafkacpp.h>

class RdKafkaConsumer { public: RdKafkaConsumer(std::string brokers, std::string groupId, std::vectorstd::string topics): Run(true), M_Brokers(brokers), M_GroupId(groupId), M_TopicVector(topics) { }

RdKafkaConsumer(const RdKafkaConsumer& Other)=delete;

RdKafkaConsumer& operator=(const RdKafkaConsumer& Other) = delete;

volatile sig_atomic_t Run;
bool exit_eof = true;
bool do_conf_dump = true;
int partition_cnt = 0;
int eof_cnt = 0;

std::string M_Brokers;
std::string M_GroupId;

std::vector<std::string> M_TopicVector;

RdKafka::Conf* M_Config; //GLOBAL 级别的配置(Consumer客户端级别)
//RdKafka::Conf *m_topicConfig;        //TOPIC	级别的配置

RdKafka::KafkaConsumer* M_Consumer; //消费者客户端实例

public: bool KafkaClientWork(std::string& Msg); bool ConsumeMsg(); bool Msg_Consume(RdKafka::Message* message, void* opaque, std::string& Msg);

void dumpConfig(RdKafka::Conf* mConfig);

private: void sigterm(int sig);

public: virtual ~RdKafkaConsumer(); };

class FEventCb final : public RdKafka::EventCb { public: RdKafkaConsumer* Consumer; // std::function<>

explicit FEventCb(RdKafkaConsumer* Consumer)
	: Consumer(Consumer)
{
}

virtual void event_cb(RdKafka::Event& Event) override;

// ~FEventCb();

};

class ExampleRebalanceCb : public RdKafka::RebalanceCb { public: RdKafkaConsumer* Consumer;

explicit ExampleRebalanceCb(RdKafkaConsumer* Consumer)
	: Consumer(Consumer)
{
}

private: static void part_list_print(const std::vectorRdKafka::TopicPartition*& partitions);

public: void rebalance_cb(RdKafka::KafkaConsumer* consumer, RdKafka::ErrorCode err, std::vectorRdKafka::TopicPartition*& partitions); };

========

// // Created by 王冠中 on 2022/6/1. //

#include "../Public/RdKafkaConsumer.h" #include

// static volatile sig_atomic_t run = 1; // static volatile int run = 1;

/void * RdKafkaConsumer::sigterm(int sig) { std::cout << "recive sig: " << sig << std::endl; Run = 0; return nullptr; }/

void FEventCb::event_cb(RdKafka::Event &Event) { switch (Event.type()) { case RdKafka::Event::EVENT_ERROR:

        if (Event.fatal()) {
            //UE_LOG(LogTemp, Error, TEXT(">>> RdKafkaConsumer: FATAL"))
            std::cout << "[ERROR]" << "FATAL ";
            Consumer->Run = false;
        }

        //UE_LOG(LogTemp, Error, TEXT(">>> RdKafkaConsumer: ERROR ( %s ): %s"), UTF8_TO_TCHAR(err2str(Event.err()).c_str()),
        //       UTF8_TO_TCHAR(Event.str().c_str()));
        std::cout << "[ERROR]" << "ERROR (" << err2str(Event.err()) << "): " << Event.str() << std::endl;
        break;

    case RdKafka::Event::EVENT_STATS:
        //UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: STATS ( %s )"), UTF8_TO_TCHAR( Event.str().c_str()));
        std::cout << "[ERROR]" << "\"STATS\": " << Event.str() << std::endl;
        break;

    case RdKafka::Event::EVENT_LOG:
        //UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: LOG-%i-%s: %s"), Event.severity(),UTF8_TO_TCHAR( Event.fac().c_str()),UTF8_TO_TCHAR(Event.str().c_str()));
        fprintf(stderr, "LOG-%i-%s: %s\n", Event.severity(), Event.fac().c_str(), Event.str().c_str());
        break;

    case RdKafka::Event::EVENT_THROTTLE:
        //UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: THROTTLED: %i,ms by: %s  id: %i"),Event.throttle_time(),UTF8_TO_TCHAR(Event.broker_name().c_str()),Event.broker_id());
        std::cout << "[ERROR]" << "THROTTLED: " << Event.throttle_time() << "ms by " << Event.broker_name() << " id "
                  << (int) Event.broker_id() << std::endl;
        break;

    default:
        //UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: EVENT: %i,( %s ) : %s"),Event.type(),UTF8_TO_TCHAR(err2str(Event.err()).c_str()),UTF8_TO_TCHAR(Event.str().c_str()));
        std::cout << "[ERROR]" << "EVENT " << Event.type() << " (" << RdKafka::err2str(Event.err()) << "): " << Event.str()
                  << std::endl;
        break;
}

}

void ExampleRebalanceCb::part_list_print(const std::vector<RdKafka::TopicPartition *> &partitions) { for (unsigned int i = 0; i < partitions.size(); i++) { //UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: partitions topic:( %s ) : %i"),UTF8_TO_TCHAR(partitions[i]->topic().c_str()),partitions[i]->partition()); std::cout << "[ERROR]" << partitions[i]->topic() << "[" << partitions[i]->partition() << "], " << std::endl; } }

void ExampleRebalanceCb::rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector<RdKafka::TopicPartition *> &partitions) { //UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: RebalanceCb :( %s )"),UTF8_TO_TCHAR(err2str(err).c_str())); std::cout << "[ERROR]" << "RebalanceCb: " << RdKafka::err2str(err) << ": ";

part_list_print(partitions);

RdKafka::Error *error = NULL;
RdKafka::ErrorCode ret_err = RdKafka::ERR_NO_ERROR;

if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
    if (consumer->rebalance_protocol() == "COOPERATIVE")
        error = consumer->incremental_assign(partitions);
    else
        ret_err = consumer->assign(partitions);
    Consumer->partition_cnt += (int) partitions.size();
} else {
    if (consumer->rebalance_protocol() == "COOPERATIVE") {
        error = consumer->incremental_unassign(partitions);
        Consumer->partition_cnt -= (int) partitions.size();
    } else {
        ret_err = consumer->unassign();
        Consumer->partition_cnt = 0;
    }
}
Consumer->eof_cnt = 0; /* FIXME: Won't work with COOPERATIVE */

if (error) {
    //UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: incremental assign failed: ( %s )"),UTF8_TO_TCHAR(error->str().c_str()));
    std::cout << "[ERROR]" << "incremental assign failed: " << error->str() << "\n";
    delete error;
} else if (ret_err)
    //UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: assign failed: ( %s )"),UTF8_TO_TCHAR(err2str(ret_err).c_str()));
    std::cout << "[ERROR]" << "assign failed: " << RdKafka::err2str(ret_err) << "\n";

}

bool RdKafkaConsumer::Msg_Consume(RdKafka::Message *message, void *opaque, std::string &Msg) { //std::string Msg; bool Result; switch (message->err()) { case RdKafka::ERR__TIMED_OUT: Result = false; break;

    case RdKafka::ERR_NO_ERROR:

        RdKafka::MessageTimestamp ts;
        ts = message->timestamp();
        // Msg = * static_cast<std::string*>(message->payload());
        Msg = static_cast<char *>(message->payload());
        //UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: === MESSAGE === Timestamp: %i , message len: %i, payload: %s"),ts.timestamp,static_cast<int>(message->len()),UTF8_TO_TCHAR(Msg.c_str()));
        std::cout << "[ " << pthread_self() << "] " << "Timestamp: " << ts.timestamp << "message len: "<< static_cast<int>(message->len()) << "payload: " << Msg << std::endl;
        Result = true;
        Run = true;
        break;

    case RdKafka::ERR__PARTITION_EOF:

        if (exit_eof && ++eof_cnt == partition_cnt) {
            //UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: EOF reached for all: partitions ( %i )"),partition_cnt);
            std::cout << "[ERROR]" << "%% EOF reached for all " << partition_cnt << " partition(s)" << std::endl;
            Run = false;
        }
        std::cout << "[ERROR]" << "ERR__PARTITION_EOF" <<std::endl;
        Result = false;
        break;

    case RdKafka::ERR__UNKNOWN_TOPIC:
    case RdKafka::ERR__UNKNOWN_PARTITION:
        //UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: Consume failed: partitions ( %s )"),UTF8_TO_TCHAR(message->errstr().c_str()));
        std::cout << "[ERROR]" << "Consume failed:-topic-partion " << message->errstr() << std::endl;
        Run = false;
        Result = false;
        break;

    default:
        /* Errors */
        //UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: Consume failed others: partitions ( %s )"),UTF8_TO_TCHAR(message->errstr().c_str()));
        std::cout << "[ERROR]" << "Consume failed: -other " << message->errstr() << std::endl;
        Run = false;
        Result = false;
}
return Result;

}

bool RdKafkaConsumer::KafkaClientWork(std::string &Msg) { if (this->M_Brokers == "") { Run = false; return false; }

//RdKafka::Message* Message;
RdKafka::Message *Message = M_Consumer->consume(2000);
// std::string Msg = Msg_Consume(Message, nullptr);
bool Result = Msg_Consume(Message, nullptr, Msg);
if (Message) {
    delete Message;
}
return Result;

}

bool RdKafkaConsumer::ConsumeMsg() { std::string Error_Str; RdKafka::Conf::ConfResult result;

M_Config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if (!M_Config) {
    //UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: [Create Kafka-client] Failed to create Config."));
    std::cout << "[ERROR]" << "Failed to create Config." << std::endl;
    return false;
}

//ExampleRebalanceCb ex_rebalance_cb(this);
//result = M_Config->set("rebalance_cb", &ex_rebalance_cb, Error_Str);
//if (result != RdKafka::Conf::CONF_OK)
//{
//	//UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: [Create Kafka-client] Conf set 'rebalance_cb' failed: %s."),UTF8_TO_TCHAR(Error_Str.c_str()));
//	 std::cout << "Conf set 'rebalance_cb' failed: " << Error_Str << std::endl;
//	return false;
//}

result = M_Config->set("enable.partition.eof", "true", Error_Str);
if (result != RdKafka::Conf::CONF_OK) {
    //UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: [Create Kafka-client] Conf set 'enable.partition.eof' failed:: %s."),UTF8_TO_TCHAR(Error_Str.c_str()));
    std::cout << "Conf set 'enable.partition.eof' failed: " << Error_Str << std::endl;
    return false;
}

result = M_Config->set("bootstrap.servers", M_Brokers, Error_Str);
if (result != RdKafka::Conf::CONF_OK) {
    //UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: [Create Kafka-client] Conf set 'bootstrap.servers' failed:: %s."),UTF8_TO_TCHAR(Error_Str.c_str()));
    std::cout << "Conf set 'bootstrap.servers' failed: " << Error_Str << std::endl;
    return false;
}

result = M_Config->set("session.timeout.ms", "200000", Error_Str);
if (result != RdKafka::Conf::CONF_OK) {
    //UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: [Create Kafka-client] Conf set 'session.timeout.ms' failed:: %s."),UTF8_TO_TCHAR(Error_Str.c_str()));
    std::cout << "session.timeout.ms' failed: " << Error_Str << std::endl;
    return false;
}

result = M_Config->set("group.id", M_GroupId, Error_Str); //设置消费组名:group.id(string类型)
if (result != RdKafka::Conf::CONF_OK) {
    //UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: [Create Kafka-client] Conf set 'group.id' failed:: %s."),UTF8_TO_TCHAR(Error_Str.c_str()));
    std::cout << "Conf set 'group.id' failed: " << Error_Str << std::endl;
    return false;
}

/*
result = M_Config->set("max.partition.fetch.bytes", "10240000", Error_Str); //消费消息的最大大小
if (result != RdKafka::Conf::CONF_OK)
{
    std::cout << "Conf set 'max.partition.fetch.bytes' failed: " << Error_Str << std::endl;
}

result = M_Config->set("statistics.interval.ms", "10240000", Error_Str); //是否启用统计
if (result != RdKafka::Conf::CONF_OK)
{
    std::cout << "Conf set 'statistics.interval.ms' failed: " << Error_Str << std::endl;
}

result = M_Config->set("compression.codec", "none", Error_Str); //是否启用压缩
if (result != RdKafka::Conf::CONF_OK)
{
    std::cout << "Conf set 'compression.codec' failed: " << Error_Str << std::endl;
}*/

//"earliest"
//"latest"
result = M_Config->set("auto.offset.reset", "latest", Error_Str);
if (result != RdKafka::Conf::CONF_OK) {
    //UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: [Create Kafka-client] Conf set 'auto.offset.reset' failed:: %s."),UTF8_TO_TCHAR(Error_Str.c_str()));
    std::cout << "Conf set 'auto.offset.reset' failed: " << Error_Str << std::endl;
    return false;
}

FEventCb FEventCb(this);
result = M_Config->set("event_cb", &FEventCb, Error_Str);
if (result != RdKafka::Conf::CONF_OK) {
    //UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: [Create Kafka-client] Conf set 'event_cb' failed: %s "),UTF8_TO_TCHAR(Error_Str.c_str()));
    std::cout << "Conf set 'event_cb' failed: " << Error_Str << std::endl;
    return false;
}

//signal(SIGINT, &RdKafkaConsumer::sigterm);
//signal(SIGTERM, &RdKafkaConsumer::sigterm);

// RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(m_config, error_str);
M_Consumer = RdKafka::KafkaConsumer::create(M_Config, Error_Str);
if (!M_Consumer) {
    //UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: [Create Kafka-client] Failed to create consumer: %s "),UTF8_TO_TCHAR(Error_Str.c_str()));
    std::cout << "[ERROR]" << "Failed to create consumer: " << Error_Str << std::endl;
    return false;
}

//UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: [Create Kafka-client] ==== consumer name === %s"),UTF8_TO_TCHAR(M_Consumer->name().c_str()));
std::cout << "[ "<< pthread_self() << "]" << "% Created consumer " << M_Consumer->name() << std::endl;

const RdKafka::ErrorCode err = M_Consumer->subscribe(M_TopicVector);
if (err) {
    //UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: [Create Kafka-client] Failed to subscribe to the topic: %s"),UTF8_TO_TCHAR(err2str(err).c_str()));
    std::cout << "[ERROR]" << "Failed to subscribe to " << M_TopicVector.size() << " topics: " << RdKafka::err2str(err)
              << std::endl;
    return false;
}

dumpConfig(M_Config);

return true;
/*while (run) {
    KafkaClientWork();
}*/

}

void RdKafkaConsumer::dumpConfig(RdKafka::Conf *mConfig) { if (do_conf_dump) { std::liststd::string *dump; dump = mConfig->dump(); std::cout << "# Global config" << std::endl;

    std::string strMsg;
    for (std::list<std::string>::iterator it = dump->begin(); it != dump->end();) {
        strMsg = strMsg + *it + " = ";
        std::cout << *it << " = ";
        it++;
        strMsg = strMsg + *it + ",";
        std::cout << *it << std::endl;
        it++;
    }

    // std::cout << std::endl;
    //UE_LOG(LogTemp, Warning, TEXT(">>> RdKafkaConsumer: [Create Kafka-client] # Global config: %s"),UTF8_TO_TCHAR(strMsg.c_str()));
    delete dump;
}

}

RdKafkaConsumer::~RdKafkaConsumer() {

std::cout <<"release the kafka resource"<<std::endl;

if (M_Config) {
    delete M_Config;
}
if (M_Consumer) {
    M_Consumer->close();
    delete M_Consumer;
}
/*
 * Wait for RdKafka to decommission.
 * This is not strictly needed (with check outq_len() above), but
 * allows RdKafka to clean up all its resources before the application
 * exits so that memory profilers such as valgrind wont complain about
 * memory leaks.
 */
RdKafka::wait_destroyed(5000);

}

========

#include #include <libc.h> #include "Public/RdKafkaConsumer.h" #include #include <pthread.h>

void kafkaRun(const bool &bIsRunning,const std::string &broke, const std::string &groupId, const std::vectorstd::string &topics) { std::cout << "["<< pthread_self() << " kafka begin]" << std::endl;

RdKafkaConsumer rdKafkaConsumer(broke, groupId, topics);

bool ResultConfig = rdKafkaConsumer.ConsumeMsg();
if (!ResultConfig) {
    std::cout << "["<< pthread_self() << " create failed]" << "rdKafkaConsumer  create:" << std::endl;
    return;
}

//卡夫卡运转
while (rdKafkaConsumer.Run) {
    std::string Msg;


    bool ResultMsg = rdKafkaConsumer.KafkaClientWork(Msg);

    if (!ResultMsg ) {
        std::cout << "["<< pthread_self() << " -rlt error]" << "rdKafkaConsumer  msg:" << Msg << std::endl;
        continue;
    }

    if (Msg.empty()) {
        std::cout << "["<< pthread_self() << "-rlt empty]" << "rdKafkaConsumer  msg:" << Msg << std::endl;
        continue;
    }
    std::cout << "["<< pthread_self() << "]" << "rdKafkaConsumer  msg:" << Msg << std::endl;
}

} void Runnable1(std::string broke, std::string groupId, std::vectorstd::string topics) { bool bIsRunning = true;

std::cout <<"["<< pthread_self()<<"]" << "runnable" <<std::endl;

while (bIsRunning) {

    kafkaRun(bIsRunning,broke, groupId, topics);
    std::cout << "["<< pthread_self() << " quit kafka work]"  << std::endl;
    sleep(5);
}

}

int main() { std::cout << "Hello, World!" << std::endl;

//std::string broke = "180.76.149.48:9093";
std::string broke = "111.175.186.147:9092";
std::string groupId = "myGroup21";
std::vector<std::string> topics1;
topics1.push_back("nanjingjxz-dev-c2bs_mec_up_participants");
//topics1.push_back("nanjingjxz-dev-c2bs_vehicle_realtime");

std::vector<std::string> topics2;
//topics2.push_back("nanjingjxz-dev-c2bs_mec_up_participants");
topics2.push_back("nanjingjxz-dev-c2bs_vehicle_realtime");

std::thread f1(Runnable1,broke, groupId, topics1);
std::thread f2(Runnable1,broke, groupId, topics2);

f1.join();
f2.join();

std::cout<<"main over"<<std::endl;

return 0;

}

==========

cmkefilelist.txt

cmake_minimum_required(VERSION 3.22) project(test3)

set(CMAKE_CXX_STANDARD 14)

include_directories(/opt/homebrew/Cellar/librdkafka/1.9.2/include/librdkafka) link_directories(/opt/homebrew/Cellar/librdkafka/1.9.2/lib) link_libraries(rdkafka++)

add_executable(test3 main.cpp Private/RdKafkaConsumer.cpp Public/RdKafkaConsumer.h)

guan25 avatar Aug 20 '22 09:08 guan25