python3-gpiod
python3-gpiod copied to clipboard
Add interrupt-based callback API for GPIO inputs
i'd like an API call where I may provide a callback for GPIO inputs which is called asynchronously on incoming interrupts (e.g. by a button).
I'm currently implementing this functionality elsewhere around this lib, but I could imagine this could be handy for the lib itself - what do you think?
The old fs-type approaches of GPIO access (e.g. RPi.GPIO) offer some sort of interrupt-based callback registration.
@w4tsn I think so. But I'm waiting for functions using callback to be added to libgpiodcxx.
If a user wants to add callback feature quickly, there are also plans to add it in advance, with a warning that it may disappear later. :)
I've added this functionality in the pi-mqtt-gpio project with the usage of the threading package in a pretty naive way.
In a callback register function the pin is initialized and a thread is spawned running a while True
loop.
It's working fine like this from what I can tell, but there are better ways I suppose.
I'm not quite sure however, how a first community-implementation of this feature could look like in the context of this project.
https://github.com/w4tsn/pi-mqtt-gpio/blob/feature/module-gpiod/pi_mqtt_gpio/modules/gpiod.py
import threading
import queue
from datetime import datetime, timedelta
class GpioThread(threading.Thread):
def __init__(self, chip, offset, config, callback, bouncetime):
super().__init__()
self.daemon = True
self._queue = queue.Queue()
self.pin = chip.get_line(offset)
self.pin.request(config)
self.callback = callback
self.bouncetime = timedelta(microseconds=bouncetime)
def run(self):
previous_event_time = datetime.now()
while True:
if self.pin.event_wait():
event = self.pin.event_read()
if event.timestamp - previous_event_time > self.bouncetime:
previous_event_time = event.timestamp
ret = self.callback()
self._queue.put(
{
"type": event.event_type,
"time": event.timestamp,
"result": ret,
}
)
@property
def handle(self):
if self._queue.empty():
return None
return self._queue.get()
def setup_interrupt(self, handle, pin, edge, callback, bouncetime=100):
"""
install interrupt callback function
handle: is returned in the callback function as identification
pin: gpio to watch for interrupts
edge: triggering edge: RISING, FALLING or BOTH
callback: the callback function to be called, when interrupt occurs
bouncetime: minimum time between two interrupts
"""
config = self.io.line_request()
config.consumer = 'pi-mqtt-gpio'
config.request_type = INTERRUPT[edge]
t = GpioThread(chip=self.chip, offset=pin, config=config,
callback=callback, bouncetime=bouncetime)
t.start()
self.watchers[offset] = t
# I'm not sure if the property itself will go or the return value, when input t.handle into dict.
self.GPIO_INTERRUPT_CALLBACK_LOOKUP[offset] = {"handle": t.handle,
"callback": callback}
I didn't test the above code.
Priority is to remove the dependency on libgpiod.so, so I haven't checked the threading side yet.
I think it would be better to minimize the sharing of the resources of the main thread and the resources of the gpio thread, so I redefined the thread and got the return value through the queue.
Thanks a lot. This is a nice idea and is a nice starting point for an enhancement. I'll check and incorporate your suggestion
Test code.
import gpiod
import asyncio
import select
import threading
import time
chip = gpiod.chip(1)
line = chip.get_line(70)
line2 = chip.get_line(69)
config = gpiod.line_request()
config.consumer = "test"
config.request_type = gpiod.line_request.EVENT_BOTH_EDGES
loop = asyncio.get_event_loop()
t = threading.Thread(target=loop.run_forever, daemon=True)
t.start()
line.request(config)
poll = select.epoll()
poll.register(line.event_get_fd(), select.POLLIN | select.POLLPRI)
def callback():
event = line.event_read()
print(line.offset, event.timestamp)
loop.add_reader(poll.fileno(), callback)
time.sleep(5)
line2.request(config)
poll2 = select.epoll()
poll2.register(line2.event_get_fd(), select.POLLIN | select.POLLPRI)
def callback2():
event = line2.event_read()
print(line2.offset, event.timestamp)
loop.add_reader(poll2.fileno(), callback2)
while True:
print("main")
time.sleep(10000)