[WIP] APIs I expected/wanted on "first contact"

Open carver opened this issue 6 years ago • 8 comments

Since this is roughly my first real contact with lahja, I wanted to write down my first impressions. I'll leave this open and [WIP] for a bit, as I collect my thoughts.

Custom APIs

An acknowledgment response

I have a request/response pair, but the response is super simple. Basically, I just want to know when it succeeds. So I don't want to have to define a custom response event type. I just want the caller to hang until the server responds.

This could look almost identical to the current Endpoint.subscribe() API (except I also want an async version, mentioned below). As soon as the handler exits, the vanilla response event is sent to acknowledge completion.

Response with a primitive value

I have a request, but just need a simple value back. Do I really need to define the response event?

Something like, client-side:

result = await endpoint.request(NeedsDoubling(3))
assert result == 6

# also, a blocking version:
endpoint.request_blocking(NeedsDoubling(3))

Server-side

async def doubler(event: NeedsDoubling):
  return event.operand * 2
endpoint.add_async_handler(NeedsDoubling, doubler)

# also
endpoint.add_handler(NeedsDoubling, lambda event: event.operand * 2)

Maybe the acknowledgement becomes redundant with this API, because you could just return None.
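To make the shape of that API concrete, here is a minimal in-process sketch. ToyEndpoint is a hypothetical stand-in written for illustration, not lahja's Endpoint; add_async_handler, add_handler, request and request_blocking are the wished-for names from above, not existing lahja methods. The handler's return value is treated as the response, so returning None doubles as a bare acknowledgement.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


class NeedsDoubling:
    def __init__(self, operand: int) -> None:
        self.operand = operand


class ToyEndpoint:
    """Hypothetical in-process stand-in; this is NOT lahja's Endpoint."""

    def __init__(self) -> None:
        self._handlers: Dict[type, Callable[[Any], Awaitable[Any]]] = {}

    def add_async_handler(self, event_type: type, handler: Callable[[Any], Awaitable[Any]]) -> None:
        self._handlers[event_type] = handler

    def add_handler(self, event_type: type, handler: Callable[[Any], Any]) -> None:
        # Wrap a plain function so request() can always await the handler.
        async def wrapper(event: Any) -> Any:
            return handler(event)

        self._handlers[event_type] = wrapper

    async def request(self, event: Any) -> Any:
        # The handler's return value is the response; None acts as a bare ack.
        return await self._handlers[type(event)](event)

    def request_blocking(self, event: Any) -> Any:
        # Only valid from a thread that has no running event loop.
        return asyncio.run(self.request(event))


async def doubler(event: NeedsDoubling) -> int:
    return event.operand * 2


endpoint = ToyEndpoint()
endpoint.add_async_handler(NeedsDoubling, doubler)
result = endpoint.request_blocking(NeedsDoubling(3))
```

A real implementation would have to carry the value across the process boundary in some generic response event, which this toy skips entirely.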

Connection Retries

If I start both the server and client and the client starts slightly before the server, I get a connection refused error. I'd prefer that it retry, at least for a while. Probably something like:

  • connect_to_endpoints(ConnectionConfig, timeout=seconds_im_willing_to_wait)

There are likely more APIs that would benefit. Unsure what the default timeout should be, if any.
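A rough sketch of the retry behaviour I have in mind, independent of lahja. connect_with_retry and retry_interval are hypothetical names, and the set of exceptions worth retrying on (connection refused, socket file not yet created) is my assumption:

```python
import asyncio
import time
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def connect_with_retry(
    connect: Callable[[], Awaitable[T]],
    timeout: float,
    retry_interval: float = 0.05,
) -> T:
    """Call `connect` until it succeeds or `timeout` seconds have elapsed."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            return await connect()
        except (ConnectionRefusedError, FileNotFoundError):
            # The server socket is not there (yet). Re-raise the last error
            # once the deadline has passed, otherwise wait and try again.
            if time.monotonic() >= deadline:
                raise
            await asyncio.sleep(retry_interval)
```

With a plain Unix socket this could be used as `await connect_with_retry(lambda: asyncio.open_unix_connection(path), timeout=10)`. Re-raising the original error on timeout keeps the failure mode the same as today, just delayed.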

Synchronous versions

Sync versions of most APIs. I expect the call to block until success (probably by launching a new thread under the hood):

  • Endpoint.connect_to_endpoints()
  • Endpoint.broadcast()

Async versions

like Endpoint.subscribe(event, some_awaitable_handler)

carver avatar May 24 '19 23:05 carver

Distilled to the things that aren't already addressed in master or via one of the open pull requests.

  • add Endpoints.connect_to_endpoint_nowait to core API
  • ensure that connect_to_endpoint tolerates slow-to-appear IPC sockets as a tested feature.
  • sending back primitives: https://github.com/ethereum/lahja/issues/90
  • allow requesting an ack when an event is received: https://github.com/ethereum/lahja/issues/91

This also touches on something that is now possible, at least once #70 is merged, which is setting up the server-side request/response handlers using the EndpointAPI.subscribe API now that it accepts a coroutine as the handler.

pipermerriam avatar May 25 '19 05:05 pipermerriam

I suppose another thing to do is update the examples in https://github.com/ethereum/lahja/tree/master/examples -- which was where I ended up on first contact.

carver avatar May 27 '19 21:05 carver

So here's a toy example of something I'd like to do from inside a BaseIsolatedPlugin. I thought it would help give color to the API desires...

def stacked_calls_demo(bus):
    bus.start_serving('task-runner')
    # This ^ is where task requesters will send events

    # Task execution will sometimes need extra data, connect below...

    # Block below connection until the endpoint becomes available...
    data_gatherer = bus.connect_to_endpoint('DataGatherer', timeout=10)
    # In the real implementation, this is probably lazier, and doesn't connect until
    # the first data is needed.

    # spawn a thread when this event is received. event expects primitive bool response
    bus.add_handler(RequestLongTask, long_task_runner(data_gatherer))

    # block until sigint || sigterm
    bus.serve_forever()

@curry
def long_task_runner(data_gatherer, event) -> bool:
    """
    returns bool: whether the long task was completed successfully
    """
    if not is_data_available():
        # block until data retrieved
        data = data_gatherer.request(WantData())
        # in the real beam sync, this ^ would probably just be an ACK
        # and the data would be loaded direct from the DB
    else:
        data = get_data()
    
    result = do_long_processing(data)
    
    return is_result_valid(result)

Also, maybe this requires making a sync version of the isolated plugin. Maybe it doesn't matter that we're blocking the event loop if we're not using it anywhere in this plugin, but it feels wrong.

carver avatar May 28 '19 00:05 carver

@carver I've updated the examples and added tests to CI so that they don't break again https://github.com/ethereum/lahja/pull/101

cburgdorf avatar May 28 '19 12:05 cburgdorf

Also want to be able to run a broadcast from another thread using something like:

future = asyncio.run_coroutine_threadsafe(event_bus.request(GetSomeData(key)), loop)
data = future.result(timeout=10)

(courtesy of Brian)

This crashes (in v0.12.0) with:

  File ".../venv/lib/python3.6/site-packages/lahja/endpoint.py", line 179, in run
    if self._loop != asyncio.get_event_loop():
  File "/usr/lib/python3.6/asyncio/events.py", line 694, in get_event_loop
    return get_event_loop_policy().get_event_loop()
  File "/usr/lib/python3.6/asyncio/events.py", line 602, in get_event_loop
    % threading.current_thread().name)
RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-0_0'.

carver avatar May 29 '19 21:05 carver

@carver I think you already figured this out, since you got beam sync working, but the RuntimeError occurs because the decorator runs too soon; it's not async-aware. Wrapping the call to event_bus.request() in another coro is enough to get this working:

async def run_request(endpoint):
    return await endpoint.request(Request('hi'))

Here's a working (on 3.7.0) example of blocking an event-loop-less thread on a response from the endpoint: https://gist.github.com/lithp/13ca70023dfedb25ceab7703ed6e4207
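For anyone who wants to see the timing issue without lahja, here is a self-contained toy reproduction. check_loop is a hypothetical stand-in for the endpoint decorator, and it uses asyncio.get_running_loop() rather than get_event_loop() so that it fails the same way on every Python version; the point is only that a non-async decorator runs its check when the method is called, not when the coroutine is awaited.

```python
import asyncio
import functools
import threading


def check_loop(func):
    # Toy stand-in for an eager, non-async decorator: the check runs when the
    # method is *called*, not when the returned coroutine is awaited.
    @functools.wraps(func)
    def run(*args, **kwargs):
        asyncio.get_running_loop()  # raises RuntimeError outside a running loop
        return func(*args, **kwargs)

    return run


@check_loop
async def request(payload):
    return payload.upper()


async def run_request(payload):
    # request() is now called on the loop's thread, so the check passes.
    return await request(payload)


# Run an event loop in a background thread; this thread has no loop of its own.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

try:
    # Direct call: the decorator body executes here, in the loop-less thread.
    asyncio.run_coroutine_threadsafe(request("hi"), loop)
    direct_call_failed = False
except RuntimeError:
    direct_call_failed = True

# Wrapped call: creating the coroutine runs no code until the loop picks it up.
future = asyncio.run_coroutine_threadsafe(run_request("hi"), loop)
wrapped_result = future.result(timeout=10)
loop.call_soon_threadsafe(loop.stop)
```

The direct call blows up before anything is handed to the loop, while the wrapped call defers the check until the coroutine is already running on the loop's thread.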

I'm not sure exactly what we could do to make the decorator wait a little before checking the event loop, but I'm sure it's possible to fix this. It might be worth opening a ticket just for this problem.

lithp avatar May 30 '19 05:05 lithp

I guess it's all really a tangent, because what I really want is a ThreadedEndpoint that will block on:

endpoint.request(GetSomeData(key))
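Roughly, I picture a thin blocking facade like the sketch below. ThreadedEndpoint does not exist in lahja; this is a hypothetical wrapper around any object with an async request() method, and EchoEndpoint is a toy used only to exercise it. It owns a background thread with its own event loop and funnels every call through that loop.

```python
import asyncio
import threading


class ThreadedEndpoint:
    """Hypothetical blocking facade over any object with an async request()."""

    def __init__(self, async_endpoint) -> None:
        self._endpoint = async_endpoint
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    async def _request(self, event):
        # Runs on the background loop, so the wrapped endpoint only ever sees
        # calls coming from that one loop.
        return await self._endpoint.request(event)

    def request(self, event, timeout=None):
        # Block the calling thread until the response arrives or timeout expires.
        future = asyncio.run_coroutine_threadsafe(self._request(event), self._loop)
        return future.result(timeout)

    def stop(self) -> None:
        # Stop the background loop, wait for its thread, then release the loop.
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


class EchoEndpoint:
    """Toy async endpoint used only to exercise the facade."""

    async def request(self, event):
        await asyncio.sleep(0)
        return event
```

Usage would be `threaded = ThreadedEndpoint(some_async_endpoint)` followed by `threaded.request(GetSomeData(key), timeout=10)` from any thread. The wrapped endpoint would also have to be connected and served from that same background loop, which this sketch leaves out.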

carver avatar May 31 '19 18:05 carver

FWIW, this is a local lahja patch on v0.12.0 that I'm running, so I don't have to deal with the RuntimeError:

diff --git a/lahja/endpoint.py b/lahja/endpoint.py
index 20dae5e..366aa5b 100644
--- a/lahja/endpoint.py
+++ b/lahja/endpoint.py
@@ -175,11 +175,16 @@ class Endpoint:
         def run(self, *args, **kwargs):  # type: ignore
             if not self._loop:
                 self._loop = asyncio.get_event_loop()
-
-            if self._loop != asyncio.get_event_loop():
-                raise Exception(
-                    'All endpoint methods must be called from the same event loop'
-                )
+            else:
+                try:
+                    current_loop = asyncio.get_event_loop()
+                except RuntimeError:
+                    pass
+                else:
+                    if self._loop != current_loop:
+                        raise Exception(
+                            'All endpoint methods must be called from the same event loop'
+                        )
 
             return func(self, *args, **kwargs)
         return cast(TFunc, run)

carver avatar May 31 '19 22:05 carver