pybricksdev
Use pybricksdev programmatically
I want to run code on my hub from a Python script rather than from the command line. How is that possible?
FYI, the best place to ask questions about anything Pybricks is https://github.com/orgs/pybricks/discussions
Here is a basic script to get you started:
#!/usr/bin/env python3
# run this script on your computer
import asyncio
import contextlib

from pybricksdev.ble import find_device
from pybricksdev.connections.pybricks import PybricksHub

# this script must be in the current working directory and will be sent to the hub
MY_PROGRAM = "hub_program.py"

async def main():
    async with contextlib.AsyncExitStack() as stack:
        dev = await find_device()
        hub = PybricksHub()
        await hub.connect(dev)
        stack.push_async_callback(hub.disconnect)
        await hub.run(MY_PROGRAM, print_output=True, wait=True)

asyncio.run(main())
Oh thanks, I didn't know that. And with the script I get an error:
Traceback (most recent call last):
File "C:\Koner\projects\python\robi-ai\connect-tests\main.py", line 18, in <module>
asyncio.run(main())
File "C:\Users\Koner\AppData\Local\Programs\Python\Python311\Lib\asyncio\runners.py", line 190, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "C:\Users\Koner\AppData\Local\Programs\Python\Python311\Lib\asyncio\runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Koner\AppData\Local\Programs\Python\Python311\Lib\asyncio\base_events.py", line 653, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "C:\Koner\projects\python\robi-ai\connect-tests\main.py", line 11, in main
dev = await find_device()
^^^^^^^^^^^^^^^^^^^
File "C:\Users\Koner\AppData\Local\Programs\Python\Python311\Lib\site-packages\pybricksdev\ble\__init__.py", line 63, in find_device
device = await BleakScanner.find_device_by_filter(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Koner\AppData\Local\Programs\Python\Python311\Lib\site-packages\bleak\__init__.py", line 369, in find_device_by_filter
async with cls(detection_callback=apply_filter, **kwargs):
File "C:\Users\Koner\AppData\Local\Programs\Python\Python311\Lib\site-packages\bleak\__init__.py", line 152, in __aenter__
await self._backend.start()
File "C:\Users\Koner\AppData\Local\Programs\Python\Python311\Lib\site-packages\bleak\backends\winrt\scanner.py", line 241, in start
self.watcher.start()
OSError: [WinError -2147020577] Das Gerät kann nicht verwendet werden
In English, this means "The device cannot be used".
It looks like this happened because Bluetooth was turned off. How can I turn it on automatically?
I also have one more question: is it possible to send individual commands instead of entire files?
It is, but the only available commands are to download a file, to run a file that has already been downloaded to the hub, or to stop a running program.
But you can write a program that runs on the hub that receives commands from the program running on your computer.
Do you have an example of this? And on the code.pybricks.com website you can also execute individual commands, how does it work there?
There are some examples at https://pybricks.com/projects/tutorials/wireless/hub-to-device/
Thank you, but can I somehow write the code shown in the resource you gave me into one single file? So first I would need to connect with the hub using this code, right?
device = await find_device()
hub = PybricksHub()
await hub.connect(device)
Then somehow upload the hub program (of course, the command doesn't work as shown):
hub.upload_program('hub_program.py')
Then start it:
hub.run_program()
And afterward, I would need to send the commands. Can't I do this somehow using:
hub.write(b"fwd")
instead of using BleakClient?
Yes, you can do that using pybricksdev.
Thanks, but how can I upload the program to the hub, because
hub.upload_program('hub_program.py')
of course doesn't work?
I think this is what you are looking for:
https://github.com/pybricks/pybricksdev/blob/11667cb05427b2638fb475c1561fdfa380f59998/pybricksdev/connections/pybricks.py#L431-L447
You will need to compile the program first. You can have a look at the source of run() in the same file to see how it is done.
Thank you so much for your help! While reading through the source code, I noticed that you can pass wait=False to hub.run(). That was exactly what I was looking for, but I seem to have overlooked it.
But I still have a few questions:
1. How can I use hub.run() without the tqdm progress bar?
2. How can I receive the data that the hub sends to the computer? With hub.read_line()?
3. How can I send commands to the hub that have different lengths, for example up to 20 characters?
1. How can I use hub.run() without the tqdm progress bar?
You can set the environment variable TQDM_DISABLE=1.
2. How can I receive the data that the hub sends to the computer? With hub.read_line()?
Yes, this is the simplest way.
3. How can I send commands to the hub that have different lengths, for example up to 20 characters?
There is also a write_line() method.
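Newline framing is what allows commands of different lengths: the receiver collects incoming bytes until it sees a line terminator. The sketch below illustrates that idea in plain Python. `LineBuffer` is a hypothetical helper written for this example, not part of pybricksdev, and the actual write_line()/read_line() implementation may differ.

```python
class LineBuffer:
    """Hypothetical helper: reassembles newline-terminated commands from byte chunks."""

    def __init__(self):
        self._pending = b""

    def feed(self, chunk: bytes) -> list:
        """Add received bytes and return every complete line, without the terminator."""
        self._pending += chunk
        # Everything before the last newline is complete; the tail stays buffered.
        *complete, self._pending = self._pending.split(b"\n")
        return [line.rstrip(b"\r").decode() for line in complete]


buf = LineBuffer()
# Chunks may split a command anywhere; a line is emitted only once it is complete.
first = buf.feed(b"fwd\nturn_le")
second = buf.feed(b"ft_90\nbye\n")
print(first, second)
```

Because the newline marks the end of each command, "fwd" and "turn_left_90" can travel over the same channel without a fixed length.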
What am I doing wrong?
main.py
import asyncio
import os

from pybricksdev.ble import find_device
from pybricksdev.connections.pybricks import PybricksHub

os.environ["TQDM_DISABLE"] = "1"

async def connect_to_hub():
    try:
        device = await find_device()
        hub = PybricksHub()
        await hub.connect(device)
        return hub
    except asyncio.TimeoutError:
        raise Exception("No device found")
    except OSError:
        raise Exception("Please turn bluetooth on")

async def main():
    hub = await connect_to_hub()
    await hub.run("hub.py", wait=False)
    await hub.write_line("rev")
    await asyncio.sleep(3)
    await hub.write_line("fwd")
    await asyncio.sleep(3)
    await hub.write_line("bye")
    print(await hub.read_line())

asyncio.run(main())
hub.py
from pybricks.pupdevices import Motor
from pybricks.parameters import Port

# Standard MicroPython modules
from usys import stdin, stdout

motor = Motor(Port.A)

while True:
    # Read three bytes.
    cmd = stdin.buffer.read()
    # Decide what to do based on the command.
    if cmd == b"fwd":
        motor.dc(50)
    elif cmd == b"rev":
        motor.dc(-50)
    elif cmd == b"bye":
        break
    # Send a response.
    stdout.buffer.write(b"OK")
My hub doesn't do anything anymore, I don't get an OK back, and the tqdm progress bar is still displayed.
The hub program doesn't take newlines into account. Also, if everything is text-based, we should be able to use stdin directly instead of stdin.buffer.
Also, the program on the computer should wait for something like an "OK" from the hub first before sending commands so that it knows that the hub program is loaded and running.
Sadly it still does not work
hub.py
from pybricks.pupdevices import Motor
from pybricks.parameters import Port

# Standard MicroPython modules
from usys import stdin, stdout

motor = Motor(Port.A)

while True:
    # Read three bytes.
    cmd = stdin.read()
    # Decide what to do based on the command.
    if cmd == b"fwd\n":
        motor.dc(50)
    elif cmd == b"rev\n":
        motor.dc(-50)
    elif cmd == b"bye\n":
        break
    # Send a response.
    stdout.buffer.write(b"OK\n")
It looks like this program still sends the OK at the end instead of at the beginning. Also, it looks like it is doing a read() instead of readline().
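Two details are easy to miss here. read() without a size argument reads until the end of input, which would typically never arrive while the hub is still connected, whereas readline() returns after each newline. In addition, a text stream yields str, so a comparison against a bytes literal such as b"fwd\n" never matches. The following sketch shows both effects on a PC, using io.StringIO as a stand-in for the hub's stdin (an assumption made purely for illustration):

```python
import io

# io.StringIO stands in for the hub's text-mode stdin in this sketch.
stream = io.StringIO("fwd\nrev\n")

first = stream.readline()  # stops at the first newline
rest = stream.read()       # consumes everything that is left

# Text streams return str, so a bytes literal never compares equal.
matches_bytes = first == b"fwd\n"
matches_text = first.strip() == "fwd"
print(repr(first), repr(rest), matches_bytes, matches_text)
```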
Unfortunately it still doesn't work:
main.py
import asyncio
import os

from pybricksdev.ble import find_device
from pybricksdev.connections.pybricks import PybricksHub

os.environ["TQDM_DISABLE"] = "1"

async def connect_to_hub():
    try:
        device = await find_device()
        hub = PybricksHub()
        await hub.connect(device)
        return hub
    except asyncio.TimeoutError:
        raise Exception("No device found")
    except OSError:
        raise Exception("Please turn bluetooth on")

async def main():
    hub = await connect_to_hub()
    await hub.run("hub.py", wait=False)
    if await hub.read_line():
        await hub.write_line("rev")
        await asyncio.sleep(3)
        await hub.write_line("fwd")
        await asyncio.sleep(3)
        await hub.write_line("bye")
        print(await hub.read_line())

asyncio.run(main())
hub.py
from pybricks.pupdevices import Motor
from pybricks.parameters import Port

# Standard MicroPython modules
from usys import stdin, stdout

motor = Motor(Port.A)

# Program is ready
stdout.buffer.write(b"OK\n")

while True:
    # Read three bytes.
    cmd = stdin.readline()
    # Decide what to do based on the command.
    if cmd == b"fwd\n":
        motor.dc(50)
    elif cmd == b"rev\n":
        motor.dc(-50)
    elif cmd == b"bye\n":
        break
    # Send a response.
    stdout.buffer.write(b"OK\n")
The OK is still at the end instead of the beginning on the hub program and the PC program does not wait for OK before sending each command.
Like this?
main.py
import asyncio
import os

from pybricksdev.ble import find_device
from pybricksdev.connections.pybricks import PybricksHub

os.environ["TQDM_DISABLE"] = "1"

async def connect_to_hub():
    try:
        device = await find_device()
        hub = PybricksHub()
        await hub.connect(device)
        return hub
    except asyncio.TimeoutError:
        raise Exception("No device found")
    except OSError:
        raise Exception("Please turn bluetooth on")

async def main():
    hub = await connect_to_hub()
    await hub.run("hub.py", wait=False)
    if await hub.read_line():
        await hub.write_line("rev")
    if await hub.read_line():
        await asyncio.sleep(3)
        await hub.write_line("fwd")
    if await hub.read_line():
        await asyncio.sleep(3)
        await hub.write_line("bye")
    print(await hub.read_line())

asyncio.run(main())
hub.py
from pybricks.pupdevices import Motor
from pybricks.parameters import Port

# Standard MicroPython modules
from usys import stdin, stdout

motor = Motor(Port.A)

# Program is ready
stdout.buffer.write(b"OK\n")

while True:
    # Read three bytes.
    cmd = stdin.readline()
    # Decide what to do based on the command.
    if cmd == b"fwd\n":
        motor.dc(50)
    elif cmd == b"rev\n":
        motor.dc(-50)
    elif cmd == b"bye\n":
        break
    # Send a response.
    stdout.buffer.write(b"OK\n")
This is how I would do it:
main.py
import asyncio
import contextlib
import os

# must be set before tqdm is imported!
os.environ["TQDM_DISABLE"] = "1"

from pybricksdev.ble import find_device
from pybricksdev.connections.pybricks import PybricksHub

async def connect_to_hub():
    try:
        device = await find_device()
        hub = PybricksHub()
        await hub.connect(device)
        return hub
    except asyncio.TimeoutError:
        raise RuntimeError("No device found")
    except OSError:
        raise RuntimeError("Please turn bluetooth on")

async def send_command(hub: PybricksHub, cmd: str):
    # wait until the hub reports that it is ready for the next command
    line = await asyncio.wait_for(hub.read_line(), timeout=5)
    if line != "OK":
        raise RuntimeError(f"Unexpected response: '{line}'")
    await hub.write_line(cmd)

async def stop_if_running(hub: PybricksHub):
    try:
        await hub.stop_user_program()
    except Exception:
        # ignore error, e.g. if hub is already disconnected
        pass

async def main():
    async with contextlib.AsyncExitStack() as stack:
        hub = await connect_to_hub()
        stack.push_async_callback(hub.disconnect)
        await hub.run("hub.py", print_output=False, wait=False)
        stack.push_async_callback(stop_if_running, hub)
        await send_command(hub, "rev")
        await asyncio.sleep(3)
        await send_command(hub, "fwd")
        await asyncio.sleep(3)
        await send_command(hub, "bye")

asyncio.run(main())
hub.py
from pybricks.pupdevices import Motor
from pybricks.parameters import Port

# Standard MicroPython modules
from usys import stdin

motor = Motor(Port.A)

while True:
    # let PC know we are ready for a command
    print("OK")
    # wait for command from PC
    cmd = stdin.readline().strip()
    # Decide what to do based on the command.
    if cmd == "fwd":
        motor.dc(50)
    elif cmd == "rev":
        motor.dc(-50)
    elif cmd == "bye":
        break
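The "OK, then command" handshake above can also be tried without a hub. The sketch below is only a simulation under stated assumptions: two asyncio.Queue objects stand in for the Bluetooth link, and `fake_hub` is a hypothetical stand-in for hub.py that records commands instead of driving a motor. The `send_command` logic mirrors the one in main.py but takes the queues instead of a PybricksHub.

```python
import asyncio


async def fake_hub(to_hub: asyncio.Queue, to_pc: asyncio.Queue, log: list):
    """Hypothetical stand-in for hub.py: announce readiness, then read one command."""
    while True:
        await to_pc.put("OK")  # let the PC know we are ready for a command
        cmd = (await to_hub.get()).strip()
        log.append(cmd)  # a real hub would drive the motor here
        if cmd == "bye":
            break


async def send_command(to_hub: asyncio.Queue, to_pc: asyncio.Queue, cmd: str, timeout: float = 5):
    """Wait for the hub's "OK" before sending the next newline-terminated command."""
    line = await asyncio.wait_for(to_pc.get(), timeout=timeout)
    if line != "OK":
        raise RuntimeError(f"Unexpected response: '{line}'")
    await to_hub.put(cmd + "\n")


async def main():
    to_hub, to_pc = asyncio.Queue(), asyncio.Queue()
    log = []
    hub_task = asyncio.create_task(fake_hub(to_hub, to_pc, log))
    for cmd in ("rev", "fwd", "bye"):
        await send_command(to_hub, to_pc, cmd)
    await hub_task  # the fake hub exits after "bye"
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Since each command is sent only after an "OK" has been received, the PC cannot get ahead of the hub, and the timeout turns a stalled hub program into an error rather than a hang.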
Hi, unfortunately I still have a problem with my code. When I run it, an error occurs in the console.
main.py
import asyncio
import os

os.environ["TQDM_DISABLE"] = "1"

from pybricksdev.ble import find_device
from pybricksdev.connections.pybricks import PybricksHub

class MyHub:
    def __init__(self):
        print("MyHub class initialized")

    # Get response from hub
    async def read_line(self):
        return await self.hub.read_line()

    # Send message to hub
    async def write_line(self, value: str):
        await self.hub.write_line(value)

    # Connect with hub
    async def connect(self):
        try:
            device = await find_device()
            self.hub = PybricksHub()
            await self.hub.connect(device)
            print("Connected to hub successfully")
            await self.hub.run("hub.py", print_output=False, wait=False)
            # Wait for hub loading the program
            response = await self.read_line()
            if response != "OK":
                print(f"Unexpected hub response: {response}")
                await self.disconnect()
                exit()
            else:
                print("Running hub script...")
        except asyncio.TimeoutError:
            print("Hub not found")
            exit()
        except OSError:
            print("Bluetooth is turned off")
            exit()

    # Disconnect from hub
    async def disconnect(self):
        print("Disconnecting from hub...")
        await self.hub.stop_user_program()
        await self.hub.disconnect()

myhub = MyHub()
asyncio.run(myhub.connect())
asyncio.run(myhub.write_line("rev"))
asyncio.run(myhub.read_line())
asyncio.run(myhub.disconnect())
Error
Traceback (most recent call last):
File "C:\...\projects\python\robi-ai\new\notworking.py", line 58, in <module>
asyncio.run(robi.read_line())
File "C:\Users\...\AppData\Local\Programs\Python\Python311\Lib\asyncio\runners.py", line 190, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "C:\Users\...\AppData\Local\Programs\Python\Python311\Lib\asyncio\runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\...\AppData\Local\Programs\Python\Python311\Lib\asyncio\base_events.py", line 653, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "C:\...\projects\python\robi-ai\new\notworking.py", line 17, in read_line
return await self.hub.read_line()
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\...\AppData\Local\Programs\Python\Python311\Lib\site-packages\pybricksdev\connections\pybricks.py", line 429, in read_line
return await self.race_disconnect(self._stdout_line_queue.get())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\...\AppData\Local\Programs\Python\Python311\Lib\site-packages\pybricksdev\connections\pybricks.py", line 365, in race_disconnect
return awaitable_task.result()
^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\...\AppData\Local\Programs\Python\Python311\Lib\asyncio\queues.py", line 155, in get
getter = self._get_loop().create_future()
^^^^^^^^^^^^^^^^
File "C:\Users\...\AppData\Local\Programs\Python\Python311\Lib\asyncio\mixins.py", line 20, in _get_loop
raise RuntimeError(f'{self!r} is bound to a different event loop')
RuntimeError: <Queue at 0x14eb1bce1d0 maxsize=0 tasks=1> is bound to a different event loop
You can only use one asyncio.run() in your script.
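The reason is that every asyncio.run() call creates a fresh event loop, while the queue in the traceback stays bound to the loop it was first used in. One way to restructure the script is to move all steps into a single coroutine and call asyncio.run() once. The sketch below uses a hypothetical `FakeHub` in place of PybricksHub so that it runs without hardware; its methods only imitate the shape of connect(), write_line() and read_line().

```python
import asyncio


class FakeHub:
    """Hypothetical stand-in for PybricksHub so the sketch runs without a hub."""

    def __init__(self):
        self._lines = asyncio.Queue()

    async def connect(self):
        await self._lines.put("OK")  # pretend the hub program reported readiness

    async def write_line(self, value: str):
        await self._lines.put(f"echo {value}")  # pretend the hub answers each command

    async def read_line(self) -> str:
        return await self._lines.get()


async def main():
    # Every await below runs on the same event loop.
    hub = FakeHub()
    await hub.connect()
    ready = await hub.read_line()
    await hub.write_line("rev")
    reply = await hub.read_line()
    return ready, reply


if __name__ == "__main__":
    # The only asyncio.run() call in the script.
    print(asyncio.run(main()))
```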
Unfortunately I get an error from the hub
hub.py
from pybricks.pupdevices import Motor
from pybricks.parameters import Port, Color
from pybricks.hubs import InventorHub
from pybricks.tools import wait
from usys import stdin
import threading
import random

# [...]

# Setup hub
hub = InventorHub()
hub.light.off()

# Face animation
animatedFaceFrames = [
    ((100, 100, 0, 100, 100), (100, 100, 0, 100, 100), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((0, 0, 0, 0, 0), (70, 70, 0, 70, 70), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((0, 0, 0, 0, 0), (80, 80, 0, 80, 80), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((100, 100, 0, 100, 100), (100, 100, 0, 100, 100), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
]

def animatedFaceController():
    while True:
        for frame in animatedFaceFrames:
            hub.display.icon(frame)
            wait(50)
        wait(random.uniform(1, 5))

animatedFaceThread = threading.Thread(target=animatedFaceController)
animatedFaceThread.start()
# [...]
Error
[15:27:44 INFO]: Traceback (most recent call last):
[15:27:44 INFO]: File "hub.py", line 6, in <module>
[15:27:44 INFO]: ImportError: no module named 'threading'
What's wrong with multithreading?
Pybricks does not support threading. The current beta version supports cooperative multitasking with coroutines (async/await). I don't think we have much documentation on it yet though.
Can you show me the code with this cooperative multitasking? And where can I get the beta firmware? And should I just "install" the beta over the old version or should I first uninstall the old one and then install the beta?
And where can I get the beta firmware?
https://beta.pybricks.com
And should I just "install" the beta over the old version or should I first uninstall the old one and then install the beta?
These are independent applications. One does not replace the other.
Can you show me the code with this cooperative multitasking?
from pybricks.pupdevices import Motor
from pybricks.parameters import Port, Color
from pybricks.hubs import InventorHub
from pybricks.tools import wait, multitask, run_task
from usys import stdin
import random

# [...]

# Setup hub
hub = InventorHub()
hub.light.off()

# Face animation
animatedFaceFrames = [
    ((100, 100, 0, 100, 100), (100, 100, 0, 100, 100), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((0, 0, 0, 0, 0), (70, 70, 0, 70, 70), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((0, 0, 0, 0, 0), (80, 80, 0, 80, 80), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((100, 100, 0, 100, 100), (100, 100, 0, 100, 100), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
]

async def animatedFaceController():
    while True:
        for frame in animatedFaceFrames:
            hub.display.icon(frame)
            await wait(50)
        await wait(random.uniform(1, 5))

async def other_task():
    ...

async def main():
    await multitask(
        animatedFaceController(),
        other_task(),
    )

run_task(main())
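For readers who want to see the interleaving without a hub, here is a rough desktop analogue. It is only a sketch: it uses standard asyncio on a PC, with asyncio.gather() playing the role of multitask() and asyncio.run() the role of run_task(). The task names `blink` and `drive` are made up for this example, and the hub firmware's scheduler may behave differently in detail.

```python
import asyncio


async def blink(events: list, count: int):
    """Hypothetical task: records a step, then yields so other tasks can run."""
    for i in range(count):
        events.append(f"blink {i}")
        await asyncio.sleep(0)  # cooperative yield point


async def drive(events: list, count: int):
    """Second hypothetical task, interleaved with blink() at each await."""
    for i in range(count):
        events.append(f"drive {i}")
        await asyncio.sleep(0)


async def main():
    events = []
    # Both coroutines make progress in turns; neither needs a thread.
    await asyncio.gather(blink(events, 2), drive(events, 2))
    return events


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Tasks only switch at an await, so a loop that never awaits would starve the other task. This is why the animation code awaits wait() instead of calling it directly.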
So can I just install the beta firmware while the normal firmware is still installed? And how can I then tell the hub to use the beta firmware? Or have I misunderstood something?
You have to flash the firmware on the hub to change it. You can't have two firmwares installed at the same time. You can read the current version of the firmware on the hub in the status bar of the Pybricks Code/Pybricks Beta web apps or you can write a program.
from pybricks import version
print(version)