Add a "listen for event" function
Overview
This is functionality I would have liked to see in brownie.
Specification
Maybe the API looks something like:
from ape import networks
event = networks.listen_for_event(contract_address, event_name, timeout=200)
I made a custom version of this in my chainlink-mix; it would be great for ape to have this built in.
Maybe contract_address is optional. This would be a blocking call; it might make sense to integrate with asyncio in the distant future. For now, I think having it block is perfect.
Perhaps not well documented, but we do have Contract Event polling for this purpose
It doesn't look like it actually listens for events, though? I don't see a way to get events from that.
I wrote a Python function that is almost general enough (it takes a contract and does no event handling).
Seems like this could be pretty easy to implement.
import time

def wait_for_randomness(vrf_consumer, timeout=200, poll_interval=2):
    print("Waiting for random response...")
    start_time = time.time()
    current_time = time.time()
    while current_time - start_time < timeout:
        # Would swap this out for some event stuff
        response = vrf_consumer.random_words(0)
        if response > 0:
            return response
        time.sleep(poll_interval)
        current_time = time.time()
    print("Done waiting!")
    return None
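The polling loop above can be generalized so the check is passed in as a callable. The following is only a minimal sketch, not part of ape or brownie: the name `wait_for` and its parameters are hypothetical, and it treats any truthy return value as "done".

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def wait_for(
    check: Callable[[], Optional[T]],
    timeout: float = 200,
    poll_interval: float = 2,
) -> Optional[T]:
    """Call `check` until it returns a truthy value or `timeout` seconds pass.

    Hypothetical helper for illustration; returns None on timeout.
    """
    # time.monotonic() is not affected by system clock adjustments.
    deadline = time.monotonic() + timeout
    while True:
        result = check()
        if result:
            return result
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None
        # Sleep for the poll interval, but never past the deadline.
        time.sleep(min(poll_interval, remaining))
```

With the contract from the example above, the call would presumably look like `wait_for(lambda: vrf_consumer.random_words(0))`.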
It calls out to BlockManager.poll_blocks, which does a cyclical polling action just like that example does.
EDIT: although looking further it's likely to be extremely inefficient and should use the query manager instead
@PatrickAlphaC It does listen for events, that is exactly what it does!
For example the script:
import click
from ape import config, networks, project

# TODO: Parametrize and make a better example
def main():
    instance = project.TestContractVy.at("0x274b028b03A250cA03644E6c578D81f019eE1323")
    event_type = instance.NumberChange
    click.echo(f"Polling for new '{event_type.name}' logs...")
    for new_log in event_type.poll_logs():
        click.echo(new_log.newNum)
It will wait for new events as they come in and then log them. The process never ends.
@fubuloubu How would the query manager help here? These are all new logs coming in that have not yet been entered into any DB.
The one missing piece I see is a way to exit the poll upon some condition... Perhaps you are only waiting for a single event or something.
OK yeah reading more closely, you are trying to basically have an async type deal that waits for a log and then carries on with something else once it arrives, right?
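One way to exit the poll on a condition is to stop iterating as soon as a matching log arrives. This is a rough sketch under assumptions: `first_matching` and `predicate` are hypothetical names, and `logs` stands for any iterator of logs, such as the `event_type.poll_logs()` generator from the script above. It has no timeout, so it blocks until a match arrives or the iterator ends.

```python
from typing import Callable, Iterable, Optional, TypeVar

T = TypeVar("T")


def first_matching(
    logs: Iterable[T],
    predicate: Callable[[T], bool] = lambda log: True,
) -> Optional[T]:
    """Return the first log satisfying `predicate`, then stop consuming `logs`.

    Hypothetical helper for illustration; returns None if `logs` ends
    without a match (a never-ending poll simply keeps blocking).
    """
    for log in logs:
        if predicate(log):
            # Returning here leaves the loop, so the underlying
            # generator is not advanced any further.
            return log
    return None
```

Because the generator is only advanced on demand, returning from the loop ends the polling and the script can carry on with the next step.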
@fubuloubu How would the query manager help here? These are all new logs coming in that have not yet been entered into any DB.
To get the historical logs from the query manager; waiting on any new ones stays as it is today.
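The idea of serving historical logs first and then waiting on new ones can be sketched as two iterators consumed back to back. This is an illustration only: `history_then_live` and `key` are hypothetical names, and how the historical logs are fetched from the query manager is left out. The `key` function is assumed to identify a log uniquely (for example by block number and log index) so that a log seen in both sources is yielded once.

```python
from typing import Callable, Hashable, Iterable, Iterator, TypeVar

T = TypeVar("T")


def history_then_live(
    historical: Iterable[T],
    live: Iterable[T],
    key: Callable[[T], Hashable],
) -> Iterator[T]:
    """Yield all historical logs, then live logs not already seen.

    Hypothetical helper for illustration; `key` must return a hashable
    identifier that is unique per log.
    """
    seen = set()
    for log in historical:
        seen.add(key(log))
        yield log
    for log in live:
        # Skip logs at the boundary that the historical query already returned.
        if key(log) in seen:
            continue
        yield log
```

The `live` argument can be a never-ending polling generator; it is only started once the historical logs are exhausted.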
If the docs gave an example of how to implement EventWatcher, this would be very helpful. After importing with from brownie.network.event import EventWatcher, I see I can print out the directory with print(f' {dir(EventWatcher)}'), but it is unclear how I can make use of these commands.
I have posted a question on StackExchange.
In ape, we added special methods to create exactly this type of event handling loop: https://docs.apeworx.io/ape/stable/methoddocs/contracts.html#ape.contracts.base.ContractEvent.poll_logs
you would use it like this:
for log in my_contract.MyEvent.poll_logs(...):
    # Do something with log e.g. `assert log.arg1 == 1`
    ...
You are correct that we don't have this very well documented at present, so I'll make this issue into a documentation update to add both ContractEvent.poll_logs and its companion chain.blocks.poll_blocks to the userguides.
To give us some insight, where in our docs would you expect this to live?
To follow up on this issue, we've actually made a whole platform from this idea which allows provisioning event handlers using a message queue processing architecture, made super simple using our SDK: https://github.com/SilverBackLtd/silverback
A snapshot from the example:
from ape_tokens import tokens
from silverback import SilverBackApp

app = SilverBackApp()
YFI = tokens["YFI"]

@app.on_(YFI.Approval)
async def exec_event2(log):
    return log.value
Ohh, maybe I am out of place here. I just now saw this is the ApeWorX page. Beginner's mistake, but I did not even check what page I was posting to. But I think ANY page that a basic Google search might turn up on how to use EventWatcher in the context of eth-brownie would be helpful. Since commenting I've managed to figure it out, though, so maybe it can be said that if someone is at the point where they are using EventWatcher, they can probably figure it out as well. Still, some more detailed examples would have been helpful. To this end I answered my own question on Stack Exchange.