Allow passing userdata to thread

Open kelvinhammond opened this issue 6 years ago • 15 comments

Currently work and threads do not support passing userdata.

Note: I'm using sol2 defined userdata but this shouldn't matter. Example:

local channel = UserDataChannel.new()
local thread_id = uv.new_thread(function(chan)
  -- use the argument passed into the thread, not the outer local
  print("channel: ", tostring(chan))
end, channel)

uv.thread_join(thread_id)

Gives the error: bad argument #2 to 'new_thread' (Expected uv_handle userdata)

kelvinhammond avatar Aug 03 '19 03:08 kelvinhammond

There's a pretty good reason not to support this. It would drastically increase the complexity of the serializers, and even then the library that produced the userdata might have built-in assumptions that cause bugs when the userdata is sent between threads. This isn't a feature that can reasonably be supported: it would require modifications to the library that produced the userdata and a significant body of new code. Sending arbitrary userdata between states is impossible. Creating a standard that lets conforming userdata, representing a useful subset of things, be sent between threads is possible but difficult, and it would require a source fork of every native module that needs to be used between threads.

What are you actually trying to do and why do you think you need that? There might be an easier way to accomplish it.

aiverson avatar Aug 03 '19 04:08 aiverson

I have multiple lua states running in their own threads. One thread may send a message to another thread. I have an object channel using a shared_ptr to boost fiber's buffered channel as userdata.

I can push and pop on the channel, but that blocks the thread. I tried an idle handle with try_pop, but that causes 100% CPU usage, most likely because of the locks; this happens even with thread_yield() in C++.

So I need a way to send, check, and recv messages from another thread without 100% cpu usage and without blocking the event loop.

kelvinhammond avatar Aug 03 '19 05:08 kelvinhammond

Idle handles cause 100% CPU usage because they prevent the loop from waiting for the next event: the loop does a non-blocking check for new events and, if there aren't any, immediately runs the idle function again.

A check handle leaves the waits intact, so if no event is available the loop waits until one is instead of using the CPU completely, but it still runs the check on every iteration of the event loop.

What you might want to do is use an async to notify the thread that it has something new on one of your custom channels, and then install an idle that pops the next item out of the queue and processes it if available, but cancels the idle if the queue is empty. That would accomplish the usage you described with the channels you already have.
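The async-plus-idle pattern described above could be sketched roughly as follows. This is only an illustration: make_drainer, try_pop, and handle_item are hypothetical names, and uv is assumed to be the luv module (require("luv"), or vim.loop inside Neovim) passed in by the caller.

```lua
-- Sketch only. `uv` is assumed to be the luv module; `try_pop` and
-- `handle_item` are hypothetical callbacks supplied by the caller.
-- `try_pop` must return nil when the queue is empty.
local function make_drainer(uv, try_pop, handle_item)
  local idle = uv.new_idle()
  local draining = false

  local function drain_one()
    local item = try_pop()
    if item == nil then
      -- Queue is empty: stop the idle handle so the loop can block again.
      idle:stop()
      draining = false
    else
      handle_item(item)
    end
  end

  -- The producer calls async:send() after pushing onto the queue.
  local async = uv.new_async(function()
    if not draining then
      draining = true
      idle:start(drain_one)
    end
  end)

  return async
end
```

The idle handle only runs while there is work queued, so the loop goes back to sleeping once the queue is drained. Getting the async handle to the producing thread is a separate problem and is not covered by this sketch.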

The simple way to do what you want is to just create a socket that you can connect to from each thread and send serialized data between them, accomplishing the buffering, notification, and sequencing by just delegating it to libuv and the OS.

A more complicated option is to use the FFI to create a buffered channel and pass a handle to one of its ends between threads by marshalling it into the bytes of a string and unmarshalling it on the other end. Your buffered channel can then support sending typed FFI data between threads, potentially including copyless memory buffers and other advanced features.

Is any of this helpful?

aiverson avatar Aug 03 '19 05:08 aiverson

I'd like to avoid FFI. I haven't figured out how to get the channel that was passed into the Lua script into a thread created with new_thread, where I could then call async:send().

Do you have a suggestion for this?

Thank you for your help.

kelvinhammond avatar Aug 03 '19 05:08 kelvinhammond

The userdata would have to be created on the new thread, not passed across to it. If you don't mind writing some custom C++ code for this, you can make a pair of functions that marshal and unmarshal the channel handles. For example, one function can use reinterpret_cast<> to write a pointer to the channel into the bytes of a string of the appropriate length and return it as a Lua string that you can pass between the threads. On the other thread, a second function uses reinterpret_cast<> to get the pointer to the channel back from the bytes of the string, which, when wrapped by sol2, gets you the channel userdata back again. You would need to be careful with garbage collection anchoring and avoid a double-free bug, but that is one way to make that happen.

Why couldn't you just use a libuv socket or pipe?

What is the task you are actually trying to do for which you reached for boost fiber channels?

aiverson avatar Aug 03 '19 05:08 aiverson

I haven't used the socket or pipe because I'm unsure how to use them correctly. I will be sending messages of varying sizes over the pipe.

I used a channel because I have multiple states; each state will have its own event loop, variables, etc. in a sandbox. One state may tell another state to exit, or a message received from one input may be copied to multiple states for processing.

I also have custom user types defined which don't get passed / registered in new_thread.

Flow:

state main:
- event loop
  - timers for periodic tasks
  - process messages (ex: exit, tasks from sub1)
  - process user input and send to sub states

state sub1:
- event loop
  - timers for periodic tasks
  - process messages (ex: exit, user input)

kelvinhammond avatar Aug 03 '19 05:08 kelvinhammond

Do you want a many-to-many architecture or a one-to-many architecture? That is, do you want any thread to be able to communicate with any thread it has acquired a channel for, and any thread to be able to spawn new threads and pass channels around? Or do you want a single main thread that can communicate with a pool of workers, where each worker can communicate with the main thread but workers don't need to communicate among themselves?

aiverson avatar Aug 03 '19 05:08 aiverson

Sidenote, building a better way to do this into luv is an ongoing project of mine, but unfortunately it isn't done yet so I can't just point you to a branch on my fork and say to try using it.

aiverson avatar Aug 03 '19 05:08 aiverson

Many-to-many, so that sub states may communicate with other sub states, and the main state and the sub states may communicate with each other.

kelvinhammond avatar Aug 03 '19 05:08 kelvinhammond

Is it an acceptable solution to have the main state proxy the communications to the substate in question? Getting communication between one main state and a bunch of child states is easily doable with a pipe, and proxying the communications between different states is doable in a fairly straightforward manner. If that is an acceptable solution, I can tell you how to do it; otherwise, waiting until I have made my planned threading extensions to luv, getting into FFI stuff, or using the custom C++ code to do the marshalling are the only reasonable options I can think of off the top of my head.

aiverson avatar Aug 03 '19 06:08 aiverson

Having a main state proxy communications to substates seems like it would be fine, how would something like this work?

What is the best way to correctly use a pipe? My main issue is partial reads and figuring out when a message is complete when using sockets or socket-like streams.

kelvinhammond avatar Aug 03 '19 06:08 kelvinhammond

You can do this by using local pipe = uv.new_pipe(false) to create a new pipe with the ipc flag off (so it does no handle passing), then pipe:bind(pipename) to give it an accessible name. This creates the pipe on the filesystem using the name as a path on Unix, or as a named pipe on Windows. From there, you start listening on the pipe using pipe:listen(n, fn); I use 128 for n. The function is what should be done with each new connection. The body of this function will look something like

local client = uv.new_pipe(false)
pipe:accept(client)
-- variables for dispatcher state
local function dispatch(err, data)
  -- body of dispatching logic that decodes and routes the data from that connection
end
-- codec is a user-written framing module, not part of luv
client:read_start(codec.decode(dispatch))

That sets up the main state's side of the pipe.

To connect from the child state it is just local pipe = uv.new_pipe(false) followed by pipe:connect(pipename). Then the child state can read from and write to the pipe just like any other stream.
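A rough sketch of the child side, for illustration only. start_child and on_data are hypothetical names, the greeting string is a placeholder, and uv is assumed to be the luv module passed in by the caller.

```lua
-- Sketch only. `uv` is assumed to be the luv module; `start_child` and
-- `on_data` are hypothetical names chosen for this example.
local function start_child(uv, pipename, on_data)
  local pipe = uv.new_pipe(false)
  pipe:connect(pipename, function(err)
    if err then
      -- Connection failed: report the error through the same callback.
      return on_data(err, nil)
    end
    -- Once connected, the pipe behaves like any other stream.
    pipe:read_start(on_data)
    pipe:write("hello from child")
  end)
  return pipe
end
```

Inside a thread created with uv.new_thread, this would presumably be called as start_child(require("luv"), pipename, callback) and followed by uv.run(), on the assumption that the thread's own event loop has to be run before any callback fires.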

To avoid partial reads and handle message completion, you would make a codec that can chunk the raw bytestream into the appropriate sections. The easy way to do this is to prepend a message length to each chunk. The codec first makes sure it has all the bytes of the length field, then makes sure it has all the bytes of the message that the length announces, waiting for more data if necessary before passing each complete message on.
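A minimal sketch of such a length-prefixed codec in plain Lua. The codec table, the 4-byte big-endian length field, and the callback shape are assumptions made for this example; nothing here is provided by luv. codec.decode is shaped so its result can be handed to read_start, as in the snippet above.

```lua
-- Hypothetical framing codec: each message is sent as a 4-byte
-- big-endian length followed by the payload. Not part of luv.
local codec = {}

-- Encode one message as <4-byte length><payload>.
function codec.encode(msg)
  local n = #msg
  local b4 = n % 256; n = math.floor(n / 256)
  local b3 = n % 256; n = math.floor(n / 256)
  local b2 = n % 256; n = math.floor(n / 256)
  local b1 = n % 256
  return string.char(b1, b2, b3, b4) .. msg
end

-- Wrap on_message(err, msg) in a read callback that buffers raw chunks
-- and calls on_message once per complete message.
function codec.decode(on_message)
  local buffer = ""
  return function(err, chunk)
    if err or not chunk then
      -- Pass errors and end-of-stream (nil chunk) straight through.
      return on_message(err, nil)
    end
    buffer = buffer .. chunk
    while #buffer >= 4 do
      local b1, b2, b3, b4 = buffer:byte(1, 4)
      local len = ((b1 * 256 + b2) * 256 + b3) * 256 + b4
      if #buffer < 4 + len then
        break -- payload incomplete, wait for more data
      end
      local msg = buffer:sub(5, 4 + len)
      buffer = buffer:sub(5 + len)
      on_message(nil, msg)
    end
  end
end
```

The sender would write codec.encode(message) to the pipe. The receiver gets whole messages no matter how the stream splits or merges the bytes in transit.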

aiverson avatar Aug 03 '19 06:08 aiverson

Thank you, I'll try this and get back to you tomorrow.

kelvinhammond avatar Aug 03 '19 06:08 kelvinhammond

If you're interested in using coroutines instead of callbacks, coro-net has support for pipes, where codec functions can be applied according to coro-wrapper's description.

SinisterRectus avatar Aug 03 '19 12:08 SinisterRectus

Hi @aiverson, I tried code very similar to what you suggested in the comment, in the context of Neovim. Since luv is integrated into Neovim, I was trying to write an async multithreaded plugin using luv's capabilities.

In my code, there are 2 threads (one main thread and one child thread) and 2 pipes (one for writing from the main thread to the child thread and one for writing from the child thread to the main thread).

I am writing some data from the main thread to the child thread on the first pipe, and when child thread gets the data it writes the same data to another pipe destined for main thread.

Here is the code I am referring to:

local uv = vim.loop

local M = {}


local function entryFunction()
	local uv = require'luv'
	local pipe = uv.new_pipe(false)
	local another_pipe = uv.new_pipe(false)
	pipe:connect("/tmp/socket.async.tmp")
	another_pipe:connect("/tmp/socket.aync.another")
	pipe:read_start(function(err, chunk)
		if err then
			another_pipe:write("error")
		else
			another_pipe:write(chunk)
		end
	end)
end

function M.asyncLoad()
	local pipe = uv.new_pipe(false)
	local another_pipe = uv.new_pipe(false)
	pipe:bind("/tmp/socket.async.tmp")
	pipe:listen(128, function()
		local client = uv.new_pipe(false)
		pipe:accept(client)
		for i = 1, 3 do
			local data = string.format("Here %d", i)
			client:write(data)
			-- os.execute("sleep 1")
		end
	end)
	another_pipe:bind("/tmp/socket.aync.another")
	another_pipe:listen(128, function()
		local client = uv.new_pipe(false)
		another_pipe:accept(client)
		client:read_start(function(err, chunk)
			if err then
				print(err)
			else
				print(chunk)
			end
		end)
	end)
	M.thread = uv.new_thread(entryFunction)
end

return M

asyncLoad is the function that is called as a plugin from the Neovim context.

This program is expected to print Here 1, Here 2, and Here 3 line by line. However, it prints null (from reading the chunk on the main thread's another_pipe).

It also seems that the child thread is not able to read any data sent by the main thread, because putting a print statement in the read_start function of the child thread doesn't print anything.

Is there something I am missing? My guess is that the child thread is exiting before it receives any data. However, putting an infinite loop in the child thread seems like a waste of resources, as it would be busy waiting. So, if my guess about the child thread exiting is right, what should I do so that the child thread persists without busy waiting?

Thanks

RishabhRD avatar Nov 26 '20 22:11 RishabhRD