lahja
[WIP] APIs I expected/wanted on "first contact"
Since this is roughly my first real contact with lahja, I wanted to write down my first impressions. I'll leave this open and [WIP] for a bit, as I collect my thoughts.
Custom APIs
An acknowledgment response
I have a request/response pair, but the response is super simple: basically, I just want to know when the request succeeds. So I don't want to have to define a custom response event type; I just want the caller to block until the server responds.
This could look almost identical to the current Endpoint.subscribe() API (except I also want an async version, mentioned below). As soon as the handler exits, a vanilla response event is sent to acknowledge completion.
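A rough sketch of the semantics I'm after, as a toy in-memory stand-in (ToyEndpoint, request_ack, and Ping are all made-up names, not the real lahja API; a real bus would run the handler in another process and send an ack event back):

```python
import asyncio

class Ping:
    """Hypothetical event type; note that no response event is defined anywhere."""

class ToyEndpoint:
    # Toy stand-in for an endpoint: request_ack() returns as soon as the
    # registered handler exits, and that return *is* the acknowledgment.
    def __init__(self):
        self._handlers = {}

    def subscribe(self, event_type, handler):
        self._handlers[event_type] = handler

    async def request_ack(self, event):
        # In a real bus the handler runs elsewhere and a vanilla ack event
        # comes back on handler exit; here we simply await the handler.
        result = self._handlers[type(event)](event)
        if asyncio.iscoroutine(result):
            await result

async def main():
    endpoint = ToyEndpoint()

    async def handle_ping(event):
        await asyncio.sleep(0)  # pretend to do the work

    endpoint.subscribe(Ping, handle_ping)
    await endpoint.request_ack(Ping())  # resolves only after handle_ping exits

asyncio.run(main())
```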
Response with a primitive value
I have a request, but just need a simple value back. Do I really need to define the response event?
Something like, client-side:
result = await endpoint.request(NeedsDoubling(3))
assert result == 6
# also, a blocking version:
endpoint.request_blocking(NeedsDoubling(3))
Server-side
async def doubler(event: NeedsDoubling):
    return event.operand * 2
endpoint.add_async_handler(NeedsDoubling, doubler)
# also
endpoint.add_handler(NeedsDoubling, lambda event: event.operand * 2)
Maybe the acknowledgement becomes redundant with this API, because you could just return None.
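To make the desired routing concrete, here is a toy in-memory sketch (again, ToyEndpoint is hypothetical, not lahja's API): the handler's plain return value becomes the response, and returning None would double as a bare ack.

```python
import asyncio

class NeedsDoubling:
    # Hypothetical request event carrying a single operand.
    def __init__(self, operand):
        self.operand = operand

class ToyEndpoint:
    def __init__(self):
        self._handlers = {}

    def add_handler(self, event_type, handler):
        # wrap the sync handler so both flavors share one code path
        async def async_handler(event):
            return handler(event)
        self._handlers[event_type] = async_handler

    def add_async_handler(self, event_type, handler):
        self._handlers[event_type] = handler

    async def request(self, event):
        # look up the handler by event type and hand its return value
        # straight back to the requester
        return await self._handlers[type(event)](event)

async def main():
    endpoint = ToyEndpoint()
    endpoint.add_handler(NeedsDoubling, lambda event: event.operand * 2)
    result = await endpoint.request(NeedsDoubling(3))
    assert result == 6

asyncio.run(main())
```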
Connection Retries
If I start both the server and the client, and the client comes up slightly before the server, I get a connection-refused error. I'd prefer that it retry, at least for a while. Probably something like:
connect_to_endpoints(ConnectionConfig, timeout=seconds_im_willing_to_wait)
There are likely more APIs that would benefit. Unsure what the default timeout should be, if any.
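For illustration, a retry loop like this could sit under any of those APIs (a sketch only; `connect_with_retries` and its parameters are made up, not a proposed signature):

```python
import asyncio

async def connect_with_retries(connect, timeout=5.0, interval=0.1):
    # Keep calling the `connect` coroutine function until it succeeds or
    # `timeout` seconds have elapsed; re-raise the last error on timeout.
    # FileNotFoundError covers an IPC socket path that doesn't exist yet.
    loop = asyncio.get_event_loop()
    deadline = loop.time() + timeout
    while True:
        try:
            return await connect()
        except (ConnectionRefusedError, FileNotFoundError):
            if loop.time() >= deadline:
                raise
            await asyncio.sleep(interval)
```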
Synchronous versions
Sync versions of most APIs. I expect the call to block until success (probably by launching a new thread under the hood):
Endpoint.connect_to_endpoints()
Endpoint.broadcast()
Async versions
like Endpoint.subscribe(event, some_awaitable_handler)
Distilled to the things that aren't already addressed in master or via one of the open pull requests.
- add Endpoints.connect_to_endpoint_nowait to the core API
- ensure that connect_to_endpoint tolerates slow-to-appear IPC sockets, as a tested feature
- sending back primitives: https://github.com/ethereum/lahja/issues/90
- allow requesting an ack when an event is received: https://github.com/ethereum/lahja/issues/91
This also touches on something that is now possible, at least once #70 is merged: setting up the server-side request/response handlers using the EndpointAPI.subscribe API, now that it accepts a coroutine as the handler.
I suppose another thing to do is update the examples in https://github.com/ethereum/lahja/tree/master/examples -- which was where I ended up on first contact.
So here's a toy example of something I'd like to do from inside a BaseIsolatedPlugin. I thought it would help give color to the API desires...
def stacked_calls_demo(bus):
    bus.start_serving('task-runner')
    # This ^ is where task requesters will send events

    # Task execution will sometimes need extra data; connect below.
    # Block on the connection below until the endpoint becomes available:
    data_gatherer = bus.connect_to_endpoint('DataGatherer', timeout=10)
    # In the real implementation, this is probably lazier, and doesn't connect until
    # the first data is needed.

    # spawn a thread when this event is received; the event expects a primitive bool response
    bus.add_handler(RequestLongTask, long_task_runner(data_gatherer))

    # block until sigint || sigterm
    bus.serve_forever()
@curry
def long_task_runner(data_gatherer, event) -> bool:
    """
    returns bool: whether the long task was completed successfully
    """
    if not is_data_available():
        # block until data retrieved
        data = data_gatherer.request(WantData())
        # in the real beam sync, this ^ would probably just be an ACK
        # and the data would be loaded direct from the DB
    else:
        data = get_data()
    result = do_long_processing(data)
    return is_result_valid(result)
Also, maybe this requires making a sync version of the isolated plugin. Maybe it doesn't matter that we're blocking the event loop if we're not using it anywhere in this plugin, but it feels wrong.
@carver I've updated the examples and added tests to CI so that they don't break again https://github.com/ethereum/lahja/pull/101
Also want to be able to run a broadcast from another thread using something like:
future = asyncio.run_coroutine_threadsafe(event_bus.request(GetSomeData(key)), loop)
data = future.result(timeout=10)
(courtesy of Brian)
This crashes (in v0.12.0) with:
File ".../venv/lib/python3.6/site-packages/lahja/endpoint.py", line 179, in run
if self._loop != asyncio.get_event_loop():
File "/usr/lib/python3.6/asyncio/events.py", line 694, in get_event_loop
return get_event_loop_policy().get_event_loop()
File "/usr/lib/python3.6/asyncio/events.py", line 602, in get_event_loop
% threading.current_thread().name)
RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-0_0'.
@carver I think you already figured this out since you got beam sync working, but the RuntimeError is because the decorator runs too soon; it's not async-aware. Wrapping the call to event_bus.request() in another coro is enough to get this working:
async def run_request(endpoint):
    return await endpoint.request(Request('hi'))
Here's a working (on 3.7.0) example of blocking an event-loop-less thread on a response from the endpoint: https://gist.github.com/lithp/13ca70023dfedb25ceab7703ed6e4207
I'm not sure exactly what we could do to make the decorator wait a little before checking the event loop, but I'm sure it's possible to fix this. It might be worth opening a ticket just for this problem.
I guess it's all really a tangent, because what I really want is a ThreadedEndpoint that will block on:
endpoint.request(GetSomeData(key))
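A minimal sketch of what such a ThreadedEndpoint could look like, built on the run_coroutine_threadsafe pattern above (ThreadedEndpoint is a made-up wrapper, not a lahja class; it wraps any object exposing an async request() method):

```python
import asyncio
import threading

class ThreadedEndpoint:
    # Owns an event loop running in a background thread, so that threads
    # without their own event loop can block on request().
    def __init__(self, async_endpoint):
        self._endpoint = async_endpoint
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def request(self, event, timeout=10):
        # hop onto the endpoint's loop and block this thread on the result
        future = asyncio.run_coroutine_threadsafe(
            self._endpoint.request(event), self._loop
        )
        return future.result(timeout=timeout)

    def stop(self):
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()
```

Calling `threaded.request(GetSomeData(key))` from any thread would then block that thread until the response arrives, without the caller needing an event loop of its own.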
FWIW, this is a local lahja patch on v0.12.0 that I'm running, so I don't have to deal with the RuntimeError:
diff --git a/lahja/endpoint.py b/lahja/endpoint.py
index 20dae5e..366aa5b 100644
--- a/lahja/endpoint.py
+++ b/lahja/endpoint.py
@@ -175,11 +175,16 @@ class Endpoint:
     def run(self, *args, **kwargs):  # type: ignore
         if not self._loop:
             self._loop = asyncio.get_event_loop()
-
-        if self._loop != asyncio.get_event_loop():
-            raise Exception(
-                'All endpoint methods must be called from the same event loop'
-            )
+        else:
+            try:
+                current_loop = asyncio.get_event_loop()
+            except RuntimeError:
+                pass
+            else:
+                if self._loop != current_loop:
+                    raise Exception(
+                        'All endpoint methods must be called from the same event loop'
+                    )
         return func(self, *args, **kwargs)
     return cast(TFunc, run)