RDKafka.jl

Failed to send messages of 1MB or greater

Open · EnalLS opened this issue 2 years ago · 10 comments

Hi, I'm trying to send messages of different sizes from my producer, after changing the default max message bytes configuration. Messages smaller than 1MB are all sent, but when sending messages of 1MB or greater I keep getting an Unknown Error (code: -1) that appears to come from the librdkafka DLL used by the RDKafka.jl package. It reproduces every time.

Messages of this type and size work with the same Kafka cluster from other, non-Julia clients, so the issue appears to come from the RDKafka.jl package.

Producer code:

using RDKafka
using Random   # for randstring

# KAFKA_BROKER_ADDRESS and KAFKA_TOPIC are defined elsewhere in the script
p = KafkaProducer(KAFKA_BROKER_ADDRESS)
partition = 0
# Tried both
p.client.conf["message.max.bytes"] = 36423360
p.client.conf["max.message.bytes"] = 36423360

function send_data(message, nameOfdata)
    println("sending $nameOfdata")
    produce(p, KAFKA_TOPIC, partition, nameOfdata, message)
end

# Prepare random data with different sizes
rand_string_10B = randstring(10);
rand_string_100B = randstring(100);
rand_string_1KB = randstring(1024);
rand_string_1MB = randstring(1048576);
rand_string_5MB = randstring(5242880);
rand_string_20MB = randstring(20971520);

for i in 1:10
    send_data(rand_string_10B, "10B String")
    send_data(rand_string_100B, "100B String")
    send_data(rand_string_1KB, "1KB String")
    send_data(rand_string_1MB, "1MB String")
    send_data(rand_string_5MB, "15MB String")   # label says 15MB; the payload is 5MB
    send_data(rand_string_20MB, "20MB String")
end

Producer output:

sending 10B String
sending 100B String
sending 1KB String
sending 1MB String
-1
ERROR: LoadError: Produce request failed with error code: -1 : Unknown e
Stacktrace:
 [1] error(s::String)
   @ Base .\error.jl:33
 [2] produce(rkt::Ptr{Nothing}, partition::Int64, key::Vector{UInt8}, pa
   @ RDKafka C:\Users\EnalMulla.julia\packages\RDKafka\GlW4o\src\wrappe
 [3] produce(kt::RDKafka.KafkaTopic, partition::Int64, key::String, payl
   @ RDKafka C:\Users\EnalMulla.julia\packages\RDKafka\GlW4o\src\produc
 [4] produce(p::KafkaProducer, topic::String, partition::Int64, key::Str
   @ RDKafka C:\Users\EnalMulla.julia\packages\RDKafka\GlW4o\src\produc
 [5] prepare_data_and_send_it(message::String, nameOfdata::String)
   @ Main c:\S\repos\dummyRepo\JuliaKafka\producer\producerJuliaServer.j
 [6] top-level scope
   @ c:\S\repos\dummyRepo\JuliaKafka\producer\producerJuliaServer.jl:38
in expression starting at c:\S\repos\dummyRepo\JuliaKafka\producer\produ

Consumer code:

c = KafkaConsumer(KAFKA_BROKER_ADDRESS, "my-consumer-group")
parlist = [(KAFKA_TOPIC, 0)]
subscribe(c, parlist)
timeout_ms = 1000
while true
    msg = poll(String, String, c, timeout_ms)
    println(msg)
    # @show(msg);
end

Consumer output:

nothing nothing nothing Message(10B String: eYcYGpFuqe) Message(100B String: o4dyZXBcCxsewjpOQosoxS6aiuWy2lrcofluVJJt79QXtW2f58DzRy1mZEnPc2HAReQHoW8BWc4blxtlHzQd0AwKFnUCPvcNLPCb)
Message(1KB String: nCcOw31WrRMiJUtTIPaQr4ZDGrVpfFXydPvGiscqyyJqqteutcXV6AL2wCrzhO31j2Bu6RKgVp2XD1yPVIMM59PxzXr7CipaHogkycYDDwV2IO7nCfRfad75DUxEE16QVB7XjXctu2EXiBnTAKdekPTFQovIzJvBE5iE4BXhLCfdhex2Z3vUxo5S92B0V2DhPNKTJIzIPjEenKjbPFNaaJOFOYusBmq7gkbgeZmImDCCug3YGOqKfFOH1xNUjmDW6wA0ONaRZj9CbSNMaXCgj2ykyFsoaXfTPvpCOsLFhfjLzJXRJK81Qyu6mPtxSIv19pvBOGgeIvps50RkgFvu1t3mK4L13VI4T5oMUVVYm8SVeZt5NlQV9Oc1L8FCZxAS9R63683g1z8o5razbogqRfJ37YyKJzPgCgmKNtYyH8L9FJH98YOLQ3QptBX6mEkv9579m4JppOHM12Uxw2pAeojYL5PnDHek5wHvX4rkTO8AZumRqRg70UkskceKXpn9CP6XHCIDqBmFGBSzB5OMuFZvcJS9eq0scGijmtheIqTdTLJ0LtELRtbONzw4X5Sl59f4IzkJSf03sK1XwOnKUFjr3GWc3YF8rYcL3EXgAfoWc5AqCoOupu6o9J5wQVTehmzgF0o2GjVNuSpk1gijAvs8t6cdoq1PayAPFGgb8fhtg0jQGBQeSSTAlPDpn3dC3COLiJlhpVbsfIDPf1dIyffKDa2igpyPwvwSFfG7cndH2890f7McCxOXCQRQw1EVuM5dPGmmUIHCIMILpwgGoFFV9SOyuuugQC9watQ0aY5cI0te61GyH2EyfKt6D8t3zNAgSfDBR2Y1AjpBRM0JVOB7E3pd2vtznP1OzsMZybGSSbZuIYnlaInGxtPCogoivdpI3LsdHtYRoW3BNW9hAe5S1eia4ivwca6dV7SUV3q5vNZtHf5khPdYpVsRt6LxfGDeIO1mRrdmSMwUBYZrP8vpKDuOdeu4uahT0SzJiVMGxJPRBDMRzb5HnRGgEzxG) nothing nothing nothing

Thanks, Enal.

EnalLS, Oct 28 '21 14:10

p.client.conf["message.max.bytes"] = 36423360
p.client.conf["max.message.bytes"] = 36423360

Ouch, this is not supposed to work :fearful: conf is passed to the KafkaClient constructor and used to instantiate a C-level client object. conf is then saved as a field of the client, but changing it afterwards has no effect on the client's behavior.

Try the following instead:

conf = Dict("message.max.bytes" => 36423360, "max.message.bytes" => 36423360)
p = KafkaProducer(KAFKA_BROKER_ADDRESS, conf)
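The follow-up comment reports a type error with this snippet. A likely reason, sketched in plain Julia with no Kafka involved: a `Dict` literal infers a concrete value type from its values, and Julia's parametric types are invariant, so a `Dict{String, Int}` is not a `Dict{String, Any}`. That RDKafka.jl's constructor needs exactly `Dict{String, Any}` is an assumption drawn from the workaround reported in this thread; `count_settings` is a made-up stand-in, not part of RDKafka.jl.

```julia
# Compare an inferred Dict literal with an explicitly typed Dict{String, Any}.
conf_inferred = Dict("message.max.bytes" => 36423360, "max.message.bytes" => 36423360)
conf_any = Dict{String, Any}("message.max.bytes" => 36423360, "max.message.bytes" => 36423360)

# The literal's value type is inferred from its values (Int here).
println(valtype(conf_inferred))
# Parametric types are invariant: Dict{String, Int} is not a Dict{String, Any}.
println(conf_inferred isa Dict{String, Any})
println(conf_any isa Dict{String, Any})

# Hypothetical function restricted to Dict{String, Any}, standing in for any
# API with that signature (not RDKafka.jl's actual constructor).
count_settings(conf::Dict{String, Any}) = length(conf)

# count_settings(conf_inferred) would raise a MethodError; converting first works.
println(count_settings(Dict{String, Any}(conf_inferred)))
```

Writing `Dict{String, Any}(...)` up front, as done later in this thread, avoids the conversion step.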

dfdx, Oct 28 '21 18:10

The given example does not work: it raises a type-related error. So we had to add types to the Dict constructor to avoid it:

conf = Dict{String, Any}("message.max.bytes" => 364233060, "max.message.bytes" => 364233060)

When creating the configuration this way we don't get an error, and we find the expected values in the configuration object. However, we still have a few issues. When I run the code, the producer output is:

julia> include("producerJuliaServer.jl")
1 sending 10B String sending 100B String sending 1KB String sending 1MB String sending 15MB String sending 20MB String
2 sending 10B String sending 100B String sending 1KB String sending 1MB String sending 15MB String sending 20MB String
3 sending 10B String sending 100B String sending 1KB String sending 1MB String sending 15MB String sending 20MB String
4 sending 10B String sending 100B String sending 1KB String sending 1MB String sending 15MB String sending 20MB String
5 sending 10B String sending 100B String sending 1KB String sending 1MB String sending 15MB String sending 20MB String
6 sending 10B String sending 100B String sending 1KB String sending 1MB String sending 15MB String sending 20MB String
7 sending 10B String sending 100B String sending 1KB String sending 1MB String sending 15MB String sending 20MB String
8 sending 10B String sending 100B String sending 1KB String sending 1MB String sending 15MB String sending 20MB String
9 sending 10B String sending 100B String sending 1KB String sending 1MB String sending 15MB String sending 20MB String
10 sending 10B String sending 100B String sending 1KB String sending 1MB String sending 15MB String sending 20MB String
Producer Done!
julia>
Please submit a bug report with steps to reproduce this fault, and any error messages that follow (in their entirety). Thanks.
Exception: EXCEPTION_ACCESS_VIOLATION at 0x63471770 -- crc32c at C:\Users\EnalMulla.julia\artifacts\be727c9eb35aad630606a7ebef96d85f1ccf6825\bin\librdkafka.dll (unknown line)
in expression starting at none:0

On the consumer side, no message is received.

I also tried sending messages of the same size several times (20MB, 15MB, 1MB, and even 1KB). After 2, sometimes 3, messages the terminal crashed. Producer code:

for i in 1:1
    println(i)
    # prepare_data_and_send_it(rand_string_10B,"10B String")
    # prepare_data_and_send_it(rand_string_100B,"100B String")
    prepare_data_and_send_it(rand_string_1KB,"1KB String")
    # prepare_data_and_send_it(rand_string_1MB,"1MB String")
    # prepare_data_and_send_it(rand_string_5MB,"15MB String")
    # prepare_data_and_send_it(rand_string_20MB,"20MB String")
end

I ran it twice in a row. After the second run, the producer output was:

julia> include("producerJuliaServer.jl")
1
sending 1KB String
Producer Done!
julia> include("producerJuliaServer.jl")
1
sending 1KB String
Producer Done!
julia>
Please submit a bug report with steps to reproduce this fault, and any error messages that follow (in their entirety). Thanks.
Exception: EXCEPTION_ACCESS_VIOLATION at 0x63574eea -- mtx_lock at C:\Users\EnalMulla.julia\artifacts\be727c9eb35aad630606a7ebef96d85f1ccf6825\bin\librdkafka.dll (unknown line)
in expression starting at none:0
mtx_lock at C:\Users\EnalMulla.julia\artifacts\be727c9eb35aad630606a7ebef96d85f1ccf6825\bin\librdkafka.dll (unknown line)
rd_kafka_q_serve at C:\Users\EnalMulla.julia\artifacts\be727c9eb35aad630606a7ebef96d85f1ccf6825\bin\librdkafka.dll (unknown line)
rd_kafka_poll at C:\Users\EnalMulla.julia\artifacts\be727c9eb35aad630606a7ebef96d85f1ccf6825\bin\librdkafka.dll (unknown line)
kafka_poll at C:\Users\EnalMulla.julia\packages\RDKafka\GlW4o\src\wrapper.jl:114 [inlined]
#3 at C:\Users\EnalMulla.julia\packages\RDKafka\GlW4o\src\client.jl:52 [inlined]
macro expansion at .\asyncevent.jl:252 [inlined]
#583 at .\task.jl:411
unknown function (ip: 0000000061126423)
jl_apply at /cygdrive/c/buildbot/worker/package_win64/build/src\julia.h:1703 [inlined]
start_task at /cygdrive/c/buildbot/worker/package_win64/build/src\task.c:839
Allocations: 885083 (Pool: 884745; Big: 338); GC: 3

Consumer output:

listening to messages...
got message
finished writing message
got message
finished writing message

No matter what the message size is, we get an error after sending it 2-3 times.

Some of the behavior looks like issue #11, so we modified our code to create the producer only once per session, but we still get exceptions. They seem to come from thread- or memory-related issues inside librdkafka.dll. We got the following exceptions:

Exception: EXCEPTION_ACCESS_VIOLATION at 0x63471770 -- crc32c at C:\Users\EnalMulla.julia\artifacts\be727c9eb35aad630606a7ebef96d85f1ccf6825\bin\librdkafka.dll (unknown line) in expression starting at REPL[3]:1
Exception: EXCEPTION_ACCESS_VIOLATION at 0x63574eea -- mtx_lock at C:\Users\EnalMulla.julia\artifacts\be727c9eb35aad630606a7ebef96d85f1ccf6825\bin\librdkafka.dll (unknown line) in expression starting at none:0

Thanks, Enal.

EnalLS, Oct 31 '21 12:10

librdkafka should be thread-safe by itself, but we may corrupt memory e.g. while destroying the allocated resources. Can you post a minimal reproducible example (the producer side should be enough)? It's hard to see what may go wrong without the actual code.

dfdx, Nov 01 '21 11:11

Producer code:

using RDKafka
using Random
import RDKafka.produce
import RDKafka.KafkaProducer

KAFKA_BROKER_ADDRESS = "*********************************"
KAFKA_TOPIC = "test"

partition = 0

conf = Dict{String, Any}("message.max.bytes" => 364233060, "max.message.bytes" => 364233060)
p = KafkaProducer(KAFKA_BROKER_ADDRESS, conf)

rand_string_10B = randstring(10)
rand_string_100B = randstring(100);
rand_string_1KB = randstring(1024);
rand_string_1MB = randstring(1048576);
rand_string_5MB = randstring(5242880);
rand_string_20MB = randstring(20971520);
rand_arr_20MB = rand(UInt8, 1048576);   # unused below; despite the name, this is 1MB of bytes

function prepare_data_and_send_it(message,nameOfdata)
    println("sending $nameOfdata")
    message = string("### $nameOfdata ###", message)
    produce(p, KAFKA_TOPIC, nameOfdata, message)
end

function produce_much_stuff(iterationsNum)
    for i in 1:iterationsNum
        println(i)
        prepare_data_and_send_it(rand_string_10B,"10B String")
        prepare_data_and_send_it(rand_string_100B,"100B String")
        prepare_data_and_send_it(rand_string_1KB,"1KB String")
        prepare_data_and_send_it(rand_string_1MB,"1MB String")
        prepare_data_and_send_it(rand_string_5MB,"15MB String")   # label says 15MB; the payload is 5MB
        prepare_data_and_send_it(rand_string_20MB,"20MB String")
    end
end


print("Producer Done!")

EnalLS, Nov 01 '21 12:11

I cannot reproduce the crash with a freshly installed RDKafka.jl on Linux. Messages larger than 1MB are indeed not delivered, but that might be related to other settings (e.g. see this question), so I haven't looked into it yet.

Could you please verify what exactly fails for you - sending several messages (even of a small size) in a row, or sending large messages? Also, is it correct that you get the access violation error after the producer has finished its job (the message "Producer Done!" is printed by this time)?
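One way to separate the two cases asked about here (many sends in a row vs. a single large send) is to repeat one payload size and record the first attempt that throws. A minimal sketch that assumes nothing about RDKafka.jl beyond a callable that sends one message: `find_first_failure` and `fake_send` are hypothetical names, and the 1_000_000-byte cap in `fake_send` exists only so the helper can run without a broker.

```julia
# Hypothetical diagnostic helper (not part of RDKafka.jl): sends the same
# payload `repeats` times through `send(key, payload)` and returns the first
# attempt that throws, or `nothing` if every attempt succeeds.
function find_first_failure(send, key::AbstractString, payload::AbstractString, repeats::Integer)
    for attempt in 1:repeats
        try
            send(key, payload)
        catch err
            return (attempt = attempt, message = sprint(showerror, err))
        end
    end
    return nothing
end

# Stand-in sender so the helper can be tried offline: rejects payloads over
# an assumed 1_000_000-byte cap, loosely mimicking a client-side size limit.
function fake_send(key, payload)
    sizeof(payload) > 1_000_000 && error("payload too large: $(sizeof(payload)) bytes")
    return nothing
end

small = repeat("x", 1024)
large = repeat("x", 1_048_576)

println(find_first_failure(fake_send, "1KB String", small, 5))
println(find_first_failure(fake_send, "1MB String", large, 5))
```

With a real producer, `send` could wrap the four-argument call from the producer script, e.g. `(k, v) -> produce(p, KAFKA_TOPIC, k, v)`, run once per payload size: a failure at attempt 1 points at size, a failure only after several attempts points at repeated sends.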

dfdx, Nov 01 '21 21:11

What fails for us is sending several messages in a row, usually when the messages are 1MB or larger. We do see "Producer Done!" printed most of the time, but then we get the error, which seems to be printed by a different thread. In addition, the Kafka connection sometimes seems to remain open for 2-3 minutes after the producer has finished: we often get "broker not available" messages after running the producer code a few times.

EnalLS, Nov 02 '21 09:11

If you continue sending messages endlessly (so producer is never "done"), do you see the same error?

dfdx, Nov 02 '21 12:11

Yes, I tried this:

function produce_much_stuff(iterationsNum)   # iterationsNum is unused: the loop below never ends on its own
    while true
    # for i in 1:iterationsNum
        # println(i)
        # prepare_data_and_send_it(rand_string_10B,"10B String")
        # prepare_data_and_send_it(rand_string_100B,"100B String")
        # prepare_data_and_send_it(rand_string_1KB,"1KB String")
        # prepare_data_and_send_it(rand_string_1MB,"1MB String")
        # prepare_data_and_send_it(rand_string_5MB,"15MB String")
        prepare_data_and_send_it(rand_string_20MB,"20MB String")
    end
end

The output was:

produce_much_stuff(10) sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String -1 ERROR: Produce request failed with error code: -1 : Unknown error Stacktrace: [1] error(s::String) @ Base .\error.jl:33 [2] produce(rkt::Ptr{Nothing}, partition::Int64, key::Vector{UInt8}, payload::Vector{UInt8}) @ RDKafka C:\Users\EnalMulla.julia\packages\RDKafka\GlW4o\src\wrapper.jl:142 [3] produce(kt::RDKafka.KafkaTopic, partition::Int64, key::String, payload::String) @ RDKafka C:\Users\EnalMulla.julia\packages\RDKafka\GlW4o\src\producer.jl:28 [4] produce(p::KafkaProducer, topic::String, partition::Int64, key::String, payload::String) @ RDKafka C:\Users\EnalMulla.julia\packages\RDKafka\GlW4o\src\producer.jl:36 [5] produce @ C:\Users\EnalMulla.julia\packages\RDKafka\GlW4o\src\producer.jl:42 [inlined] [6] prepare_data_and_send_it(message::String, nameOfdata::String) @ Main C:\S\repos\dummyRepo\JuliaKafka\producer\producerJuliaServer.jl:34 [7] produce_much_stuff(iterationsNum::Int64) @ Main 
C:\S\repos\dummyRepo\JuliaKafka\producer\producerJuliaServer.jl:46 [8] top-level scope @ REPL[3]:1

EnalLS, Nov 08 '21 11:11

Another output:

julia> produce_much_stuff(10)
sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String sending 20MB String Please submit a bug report with steps to reproduce this fault, and any error messages that follow (in their entirety). Thanks. Exception: EXCEPTION_ACCESS_VIOLATION at 0xff1770 -- crc32c at C:\Users\EnalMulla.julia\artifacts\be727c9eb35aad630606a7ebef96d85f1ccf6825\bin\librdkafka.dll (unknown line) in expression starting at REPL[3]:1

And no message is received on the consumer side.

EnalLS, Nov 08 '21 11:11

Does the script work fine with messages smaller than 1MB? Large messages may be blocked by many settings on both the client and the server side, so one direction to go is to inspect those settings thoroughly. On the other hand, if you see the same error with smaller messages, or smaller messages are not delivered to the consumer, the problem might be a difference between the Windows and Linux builds of librdkafka, because on Linux I cannot reproduce the described behavior.
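To make the 1MB threshold concrete, a small sketch comparing the payload sizes from the producer script against librdkafka's documented default for the client-side `message.max.bytes` property (1000000 bytes). It only models that one client setting; the broker's `message.max.bytes` and the topic's `max.message.bytes` are separate limits and are not checked. The exact size librdkafka compares may also include the key and framing overhead, so treat the result as approximate. `payload_bytes` and `exceeds_default` are illustrative names, not RDKafka.jl functions.

```julia
# librdkafka's documented default for the client-side message.max.bytes property.
const DEFAULT_MESSAGE_MAX_BYTES = 1_000_000

# Labels and payload sizes (bytes) from the producer script in this thread.
const PAYLOADS = [
    ("10B String", 10),
    ("100B String", 100),
    ("1KB String", 1024),
    ("1MB String", 1_048_576),
    ("15MB String", 5_242_880),   # label as in the script; the payload is 5MB
    ("20MB String", 20_971_520),
]

# prepare_data_and_send_it prepends "### <label> ###"; randstring output is ASCII,
# so each character is one byte. Key and protocol framing are not counted here.
payload_bytes(label, n) = sizeof("### $label ###") + n
exceeds_default(label, n) = payload_bytes(label, n) > DEFAULT_MESSAGE_MAX_BYTES

for (label, n) in PAYLOADS
    status = exceeds_default(label, n) ? "exceeds" : "within"
    println(rpad(label, 12), lpad(payload_bytes(label, n), 10), " bytes  ", status, " the default")
end
```

Under these assumptions the 1MB payload is already over the default, which lines up with the report that failures start at 1MB; it says nothing about the crashes seen with smaller messages.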

dfdx, Nov 08 '21 12:11