Kafka.jl
Update to Kafka 1.0.0 and Julia 0.6
Hi,
I got the library working in Julia 0.6 but it doesn't work with Kafka 1.0.0. Is there any chance you could take a look?
Julia 0.6 updates - https://github.com/GearsAD/Kafka.jl
- Mostly simple changes to remove deprecations
- Small change with readbytes! - not sure if I did the best thing there, but the tests seem happy (see the sketch below)
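For reviewers, a minimal sketch of the sort of readbytes! call involved on Julia 0.6 - the read_exact helper is hypothetical and not necessarily the exact change in the fork:

# Hypothetical helper: read exactly nb bytes from a socket on Julia 0.6.
function read_exact(sock::TCPSocket, nb::Integer)
    buf = Vector{UInt8}(nb)        # uninitialized byte buffer (Julia 0.6 constructor)
    readbytes!(sock, buf, nb)      # blocks until nb bytes are read (or EOF)
    return buf
end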
Thanks!
I checked out Kafka 1.0 (Scala 2.12), but it doesn't send a response to my metadata request. It seems like the protocol has changed (despite the stated backward compatibility). I will take a look, but it basically means that I need to go through the docs once again, so it may take time.
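For context on why this shows up as silence rather than an error: every Kafka request is framed as a 4-byte big-endian length followed by the payload, so if the framing is off the broker just keeps waiting for more bytes. A minimal sketch (send_framed is illustrative, not package code):

# Send a size-prefixed Kafka frame: a 4-byte big-endian length, then the payload.
function send_framed(sock::TCPSocket, payload::Vector{UInt8})
    write(sock, hton(Int32(length(payload))))   # length prefix in network byte order
    write(sock, payload)
end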
I had the same issue - it definitely looked like they shifted something.
Sure, please let me know if I can help. I'm using a Python wrapper at the moment to process the Kafka stream, but a native implementation would be infinitely better.
Ok, I was looking in the wrong place: on Julia 0.5 this package works fine on Kafka 0.9, 0.10 and 1.0. But on Julia 0.6 it magically stops working, with the server not sending any response. No idea what has happened in the latest Julia version, but it gets interesting :)
I think I have fixed it. The problem was in the request size calculation: to tell Kafka how long the request is, I used:
function obj_size(objs...)
    buf = IOBuffer()
    for obj in objs
        writeobj(buf, obj)
    end
    return Int32(length(buf.data))
end
Note the last line: in Julia 0.5 the .data field contains exactly what has been written to the buffer, so its length is effectively what I intended to send to Kafka. But in Julia 0.6 data is initially a 32-byte array, even if you haven't written anything to it. Thus I was telling Kafka to wait for 32 bytes instead of the actual (and smaller) request size. Naturally, Kafka never responded.
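A quick way to see the difference on Julia 0.6 (a minimal sketch; the 32 is the preallocated backing array mentioned above):

buf = IOBuffer()
write(buf, Int32(7))    # writes 4 bytes
length(buf.data)        # 32 on Julia 0.6: length of the preallocated backing array
buf.size                # 4: number of bytes actually written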
Now I changed size calculation to this:
function obj_size(objs...)
    buf = IOBuffer()
    for obj in objs
        writeobj(buf, obj)
    end
    return Int32(buf.size)
end
which works fine on Julia 0.6 (at least a KafkaClient is created). I still need to test it on the previous Julia version (or drop support for it in REQUIRE), but you can already test it in the julia-0.6-fixes branch.
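For anyone testing, a minimal smoke test (a sketch assuming a broker on localhost:9092; constructing the client sends the initial metadata request that was affected):

using Kafka

# Constructing the client opens the connection and sends the initial
# metadata request whose size prefix was miscalculated before the fix.
kc = KafkaClient("localhost", 9092)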
Done, master now supports Julia 0.6 and Kafka from 0.8 to 1.0.
Would you mind rebasing and making a PR with the deprecation fixes from your fork?
That’s great!
Sure, will do tomorrow night - much appreciated, thanks for fixing it.
Hi dfdx,
I pulled your latest master; there seems to be an issue when creating the metadata. Not sure where the AbstractString is coming from:
Pkg.test("KafkaOrig")
INFO: Testing KafkaOrig
ERROR: LoadError: LoadError: MethodError: no constructors have been defined for AbstractString
Stacktrace:
 [1] readobj(::TCPSocket, ::Type{AbstractString}) at /home/gears/.julia/v0.6/Kafka/src/io.jl:80
 [2] readobj(::TCPSocket, ::Type{Kafka.Broker}) at /home/gears/.julia/v0.6/Kafka/src/io.jl:78
 [3] readobj(::TCPSocket, ::Type{Array{Kafka.Broker,1}}) at /home/gears/.julia/v0.6/Kafka/src/io.jl:34
 [4] readobj(::TCPSocket, ::Type{Kafka.TopicMetadataResponse}) at /home/gears/.julia/v0.6/Kafka/src/io.jl:78
 [5] init_metadata(::TCPSocket) at /home/gears/.julia/v0.6/Kafka/src/requests.jl:9
 [6] #KafkaClient#1(::Bool, ::Type{T} where T, ::String, ::Int64) at /home/gears/.julia/v0.6/Kafka/src/client.jl:24
 [7] Kafka.KafkaClient(::String, ::Int64) at /home/gears/.julia/v0.6/Kafka/src/client.jl:23
 [8] include_from_node1(::String) at ./loading.jl:576
 [9] include(::String) at ./sysimg.jl:14
 [10] include_from_node1(::String) at ./loading.jl:576
 [11] include(::String) at ./sysimg.jl:14
 [12] process_options(::Base.JLOptions) at ./client.jl:305
 [13] _start() at ./client.jl:371
while loading /home/gears/.julia/v0.6/KafkaOrig/test/integration.jl, in expression starting on line 5
while loading /home/gears/.julia/v0.6/KafkaOrig/test/runtests.jl, in expression starting on line 19
Hi dfdx,
Sorry, disregard the above - I think my environment is just messy.