
Update to Kafka 1.0.0 and Julia 0.6

Open GearsAD opened this issue 8 years ago • 8 comments

Hi,

I got the library working in Julia 0.6 but it doesn't work with Kafka 1.0.0. Is there any chance you could take a look?

Julia 0.6 updates - https://github.com/GearsAD/Kafka.jl

  • Mostly simple changes to remove deprecations
  • Small change with readbytes! - not sure if I did the best thing there, but the tests seem happy

Thanks!

GearsAD avatar Nov 26 '17 17:11 GearsAD

I checked out Kafka 1.0 (Scala 2.12), but it doesn't send a response to my metadata request. It seems like the protocol has changed (despite the stated backward compatibility). I will take a look, but it basically means that I need to go through the docs once again, so it may take time.

dfdx avatar Nov 26 '17 22:11 dfdx

I had the same issue, definitely looked like they shifted something.

Sure, please let me know if I can help. I'm using a Python wrapper at the moment to process the Kafka stream, but a native Kafka implementation would be infinitely better.

GearsAD avatar Nov 27 '17 05:11 GearsAD

Ok, I was looking in the wrong place: on Julia 0.5 this package works fine on Kafka 0.9, 0.10 and 1.0. But on Julia 0.6 it magically stops working, with the server not sending any response. No idea what has happened in the latest Julia version, but it gets interesting :)

dfdx avatar Nov 28 '17 20:11 dfdx

I think I have fixed it. The problem was in request size calculation: to tell Kafka how long the request is I used:

function obj_size(objs...)
    buf = IOBuffer()
    for obj in objs
        writeobj(buf, obj)
    end
    return Int32(length(buf.data))
end

Note the last line - in Julia 0.5 the .data field contains exactly what has been written to the buffer, so its length is effectively what I intended to send to Kafka. But in Julia 0.6 data is initially a 32-byte array, even if you haven't written anything to it. Thus I was telling Kafka to wait for 32 bytes instead of the actual (and smaller) request size. Naturally, Kafka never responded.
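To illustrate why an overstated size makes the server go silent, here is a minimal sketch of length-prefixed framing. The Kafka wire protocol prefixes each request with a 4-byte big-endian size, but the helper names `frame` and `try_unframe` below are hypothetical and not part of Kafka.jl, and the code assumes a recent Julia (1.x), where `bytesavailable` is available:

```julia
# Hypothetical helper (not part of Kafka.jl): prefix a payload with its
# length as a big-endian Int32.
function frame(payload::Vector{UInt8})
    io = IOBuffer()
    write(io, hton(Int32(length(payload))))  # size prefix in network byte order
    write(io, payload)
    return take!(io)
end

# Hypothetical reader: returns the payload only if the declared number of
# bytes has actually arrived; otherwise returns nothing.
function try_unframe(bytes::Vector{UInt8})
    io = IOBuffer(bytes)
    declared = ntoh(read(io, Int32))
    # A real server would keep waiting for more bytes at this point.
    bytesavailable(io) < declared && return nothing
    return read(io, declared)
end
```

If the prefix says 32 but fewer bytes follow, a reader like `try_unframe` never sees a complete request, which matches the symptom described above.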

Now I changed size calculation to this:

function obj_size(objs...)
    buf = IOBuffer()
    for obj in objs
        writeobj(buf, obj)
    end
    return Int32(buf.size)
end

which works fine on Julia 0.6 (at least KafkaClient is being created). I still need to test it on the previous Julia version (or drop support for it in REQUIRE), but you can already test it in the julia-0.6-fixes branch.
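As a hedged alternative that does not depend on IOBuffer's internal fields (`.data` and `.size` are implementation details that can change between Julia versions), the size can also be taken from the bytes that `take!` returns. In this sketch, `written_size` is a hypothetical name and plain `write` stands in for the package's `writeobj`:

```julia
# Hypothetical variant of obj_size; `write` is a stand-in for Kafka.jl's writeobj.
function written_size(objs...)
    buf = IOBuffer()
    for obj in objs
        write(buf, obj)
    end
    # take! returns only the bytes actually written, regardless of how much
    # capacity the buffer allocated internally.
    return Int32(length(take!(buf)))
end

n = written_size(Int32(1), Int16(2), UInt8(3))  # 4 + 2 + 1 bytes
```

The trade-off is one extra copy of the buffer contents, which is usually negligible for small requests.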

dfdx avatar Nov 28 '17 23:11 dfdx

Done, master now supports Julia 0.6 and Kafka from 0.8 to 1.0.

Would you mind rebasing and making a PR to fix the deprecations, based on the work in your fork?

dfdx avatar Nov 29 '17 22:11 dfdx

That’s great!

Sure, will do tomorrow night - much appreciated for fixing it.

GearsAD avatar Nov 30 '17 07:11 GearsAD

Hi dfdx,

I pulled your latest master; there seems to be an issue when creating the metadata. Not sure where the AbstractString is coming from:

Pkg.test("KafkaOrig")

INFO: Testing KafkaOrig
ERROR: LoadError: LoadError: MethodError: no constructors have been defined for AbstractString
Stacktrace:
 [1] readobj(::TCPSocket, ::Type{AbstractString}) at /home/gears/.julia/v0.6/Kafka/src/io.jl:80
 [2] readobj(::TCPSocket, ::Type{Kafka.Broker}) at /home/gears/.julia/v0.6/Kafka/src/io.jl:78
 [3] readobj(::TCPSocket, ::Type{Array{Kafka.Broker,1}}) at /home/gears/.julia/v0.6/Kafka/src/io.jl:34
 [4] readobj(::TCPSocket, ::Type{Kafka.TopicMetadataResponse}) at /home/gears/.julia/v0.6/Kafka/src/io.jl:78
 [5] init_metadata(::TCPSocket) at /home/gears/.julia/v0.6/Kafka/src/requests.jl:9
 [6] #KafkaClient#1(::Bool, ::Type{T} where T, ::String, ::Int64) at /home/gears/.julia/v0.6/Kafka/src/client.jl:24
 [7] Kafka.KafkaClient(::String, ::Int64) at /home/gears/.julia/v0.6/Kafka/src/client.jl:23
 [8] include_from_node1(::String) at ./loading.jl:576
 [9] include(::String) at ./sysimg.jl:14
 [10] include_from_node1(::String) at ./loading.jl:576
 [11] include(::String) at ./sysimg.jl:14
 [12] process_options(::Base.JLOptions) at ./client.jl:305
 [13] _start() at ./client.jl:371
while loading /home/gears/.julia/v0.6/KafkaOrig/test/integration.jl, in expression starting on line 5
while loading /home/gears/.julia/v0.6/KafkaOrig/test/runtests.jl, in expression starting on line 19

GearsAD avatar Dec 01 '17 16:12 GearsAD

Hi dfdx,

Sorry, disregard the above, I think my environment is just messy.

GearsAD avatar Dec 01 '17 16:12 GearsAD