
Use pybricksdev programmatically

Open KonerDev opened this issue 1 year ago • 39 comments

I want to run code on my hub from a Python script instead of from the command line. How is that possible?

KonerDev avatar Aug 04 '23 17:08 KonerDev

FYI, the best place to ask questions about anything Pybricks is https://github.com/orgs/pybricks/discussions

Here is a basic script to get you started:

#!/usr/bin/env python3

# run this script on your computer

import asyncio
import contextlib

from pybricksdev.ble import find_device
from pybricksdev.connections.pybricks import PybricksHub

# this script must be in the current working directory and will be sent to the hub
MY_PROGRAM = "hub_program.py"

async def main():
    async with contextlib.AsyncExitStack() as stack:
        dev = await find_device()
        hub = PybricksHub()
        await hub.connect(dev)
        stack.push_async_callback(hub.disconnect)
        await hub.run(MY_PROGRAM, print_output=True, wait=True)

asyncio.run(main())

dlech avatar Aug 04 '23 18:08 dlech

Oh thanks, I didn't know that. And with the script I get an error:

Traceback (most recent call last):
  File "C:\Koner\projects\python\robi-ai\connect-tests\main.py", line 18, in <module>
    asyncio.run(main())
  File "C:\Users\Koner\AppData\Local\Programs\Python\Python311\Lib\asyncio\runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "C:\Users\Koner\AppData\Local\Programs\Python\Python311\Lib\asyncio\runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Koner\AppData\Local\Programs\Python\Python311\Lib\asyncio\base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "C:\Koner\projects\python\robi-ai\connect-tests\main.py", line 11, in main
    dev = await find_device()
          ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Koner\AppData\Local\Programs\Python\Python311\Lib\site-packages\pybricksdev\ble\__init__.py", line 63, in find_device
    device = await BleakScanner.find_device_by_filter(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Koner\AppData\Local\Programs\Python\Python311\Lib\site-packages\bleak\__init__.py", line 369, in find_device_by_filter
    async with cls(detection_callback=apply_filter, **kwargs):
  File "C:\Users\Koner\AppData\Local\Programs\Python\Python311\Lib\site-packages\bleak\__init__.py", line 152, in __aenter__
    await self._backend.start()
  File "C:\Users\Koner\AppData\Local\Programs\Python\Python311\Lib\site-packages\bleak\backends\winrt\scanner.py", line 241, in start
    self.watcher.start()
OSError: [WinError -2147020577] Das Gerät kann nicht verwendet werden

In English, this means "The device cannot be used".
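
The negative number in the last line of the traceback is a Windows HRESULT printed as a signed 32-bit integer. As a rough sketch (the helper name decode_winerror is made up for this example), it can be converted to the usual hex form and split into its facility and error code, which makes it easier to search for:

```python
def decode_winerror(winerror: int) -> tuple:
    """Split a signed HRESULT into (unsigned value, facility, error code)."""
    # Reinterpret the signed value as an unsigned 32-bit number.
    hresult = winerror & 0xFFFFFFFF
    # Bits 16-26 hold the facility, the low 16 bits hold the error code.
    facility = (hresult >> 16) & 0x7FF
    code = hresult & 0xFFFF
    return hresult, facility, code


hresult, facility, code = decode_winerror(-2147020577)
print(f"0x{hresult:08X}", facility, code)
```

Facility 7 marks a wrapped Win32 error, so the low 16 bits can be looked up in Microsoft's list of system error codes.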

KonerDev avatar Aug 05 '23 06:08 KonerDev

It looks like this happened because Bluetooth was turned off. How can I turn it on automatically?

KonerDev avatar Aug 05 '23 09:08 KonerDev

I have one more question: is it also possible to send individual commands instead of entire files?

KonerDev avatar Aug 05 '23 10:08 KonerDev

It is, but the only available commands are to download a file, run a file that has already been downloaded to the hub, or stop a running program.

But you can write a program that runs on the hub that receives commands from the program running on your computer.

dlech avatar Aug 05 '23 15:08 dlech

Do you have an example of this? Also, on the code.pybricks.com website you can execute individual commands. How does that work there?

KonerDev avatar Aug 05 '23 15:08 KonerDev

There are some examples at https://pybricks.com/projects/tutorials/wireless/hub-to-device/

dlech avatar Aug 07 '23 16:08 dlech

Thank you, but can I somehow write the code shown in the resource you gave me into one single file? So first I would need to connect with the hub using this code, right?

device = await find_device()
hub = PybricksHub()
await hub.connect(device)

Then somehow upload the hub program (of course, the command doesn't work as shown):

hub.upload_program('hub_program.py')

Then start it:

hub.run_program()

And afterward, I would need to send the commands. Can't I do this somehow using:

hub.write(b"fwd")

instead of using BleakClient?

KonerDev avatar Aug 26 '23 11:08 KonerDev

Yes, you can do that using pybricksdev.

dlech avatar Aug 26 '23 13:08 dlech

Thanks, but how can I upload the program to the hub, because

hub.upload_program('hub_program.py')

of course doesn't work?

KonerDev avatar Sep 03 '23 14:09 KonerDev

I think this is what you are looking for:

https://github.com/pybricks/pybricksdev/blob/11667cb05427b2638fb475c1561fdfa380f59998/pybricksdev/connections/pybricks.py#L431-L447

You will need to compile the program first. You can have a look at the source of run() in the same file to see how it is done.

dlech avatar Sep 04 '23 17:09 dlech

Thank you so much for your help! While reading through the source code, I noticed that you can pass wait=False to hub.run(). That was exactly what I was looking for, but I seem to have overlooked it.

KonerDev avatar Sep 04 '23 18:09 KonerDev

But I still have a few questions:

  1. How can I use hub.run() without the tqdm progress bar?
  2. How can I receive the data that the hub sends to the computer? With hub.read_line()?
  3. How can I send commands to the hub that have different lengths, for example up to 20 characters?

KonerDev avatar Sep 04 '23 18:09 KonerDev

1. How can I use hub.run() without the tqdm progress bar?

You can set the environment variable TQDM_DISABLE=1.

2. How can I receive the data that the hub sends to the computer? With hub.read_line()?

Yes, this is the simplest way.

3. How can I send commands to the hub that have different lengths, for example up to 20 characters?

There is also a write_line() method.
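
For the first answer, one detail is easy to miss: the variable has to be in the environment before tqdm is imported, and that includes indirect imports through other packages. A minimal sketch, assuming a tqdm release that honors TQDM_* environment variables:

```python
import os

# Set this before any import that pulls in tqdm, directly or indirectly.
os.environ["TQDM_DISABLE"] = "1"

# Imports that use tqdm (for example the pybricksdev modules) would go
# below this line, not above it.
```

Setting the variable in the shell before starting the script has the same effect and avoids the import-order constraint.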

dlech avatar Sep 04 '23 18:09 dlech

What am I doing wrong?

main.py

import asyncio
import os
from pybricksdev.ble import find_device
from pybricksdev.connections.pybricks import PybricksHub

os.environ["TQDM_DISABLE"] = "1"


async def connect_to_hub():
    try:
        device = await find_device()
        hub = PybricksHub()
        await hub.connect(device)
        return hub
    except asyncio.TimeoutError:
        raise Exception("No device found")
    except OSError:
        raise Exception("Please turn bluetooth on")


async def main():
    hub = await connect_to_hub()
    await hub.run("hub.py", wait=False)
    await hub.write_line("rev")
    await asyncio.sleep(3)
    await hub.write_line("fwd")
    await asyncio.sleep(3)
    await hub.write_line("bye")
    print(await hub.read_line())


asyncio.run(main())

hub.py

from pybricks.pupdevices import Motor
from pybricks.parameters import Port

# Standard MicroPython modules
from usys import stdin, stdout

motor = Motor(Port.A)

while True:
    # Read three bytes.
    cmd = stdin.buffer.read()

    # Decide what to do based on the command.
    if cmd == b"fwd":
        motor.dc(50)
    elif cmd == b"rev":
        motor.dc(-50)
    elif cmd == b"bye":
        break

    # Send a response.
    stdout.buffer.write(b"OK")

My hub no longer does anything, I don't get an OK back, and the tqdm progress bar is still displayed.

KonerDev avatar Sep 05 '23 07:09 KonerDev

The hub program doesn't take into account newlines. Also, if everything is text-based, we should be able to use stdin directly instead of stdin.buffer.

Also, the program on the computer should wait for something like an "OK" from the hub first before sending commands so that it knows that the hub program is loaded and running.
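
To illustrate the newline point: a line sent with write_line() arrives with a line terminator attached, so comparing it directly with "fwd" fails. The sketch below uses a hypothetical helper, normalize_command, that accepts text or bytes and strips the terminator before comparing. It is plain Python and only assumes that the terminator is some combination of "\r" and "\n":

```python
def normalize_command(raw) -> str:
    """Return the command as text without its trailing line terminator."""
    if isinstance(raw, (bytes, bytearray)):
        raw = raw.decode("utf-8")
    # strip() removes "\n" as well as "\r\n".
    return raw.strip()


# The raw line is not equal to the bare command...
print("fwd\n" == "fwd")
# ...but the normalized one is, for both text and bytes input.
print(normalize_command("fwd\n") == "fwd")
print(normalize_command(b"rev\r\n") == "rev")
```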

dlech avatar Sep 05 '23 13:09 dlech

Sadly, it still does not work:

hub.py

from pybricks.pupdevices import Motor
from pybricks.parameters import Port

# Standard MicroPython modules
from usys import stdin, stdout

motor = Motor(Port.A)

while True:
    # Read three bytes.
    cmd = stdin.read()

    # Decide what to do based on the command.
    if cmd == b"fwd\n":
        motor.dc(50)
    elif cmd == b"rev\n":
        motor.dc(-50)
    elif cmd == b"bye\n":
        break

    # Send a response.
    stdout.buffer.write(b"OK\n")

KonerDev avatar Sep 06 '23 08:09 KonerDev

It looks like this program still sends the OK at the end instead of at the beginning. Also, it looks like it is doing a read() instead of readline().
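
The difference between the two calls can be seen with an ordinary in-memory stream on the computer. This is only an illustration with io.StringIO, on the assumption that the hub's stdin behaves like a regular stream in this respect: read() without an argument consumes everything up to the end of the stream, while readline() returns as soon as one line is complete.

```python
import io

# Three commands, each terminated by a newline.
stream = io.StringIO("rev\nfwd\nbye\n")

# readline() stops after the first newline.
first = stream.readline()
# read() takes everything that is left, up to the end of the stream.
rest = stream.read()

print(repr(first))
print(repr(rest))
```

On a connection that never reaches an end of stream, a bare read() has nothing to stop at, which would explain why the loop never gets to the comparison.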

dlech avatar Sep 06 '23 23:09 dlech

Unfortunately it still doesn't work:

main.py

import asyncio
import os
from pybricksdev.ble import find_device
from pybricksdev.connections.pybricks import PybricksHub

os.environ["TQDM_DISABLE"] = "1"


async def connect_to_hub():
    try:
        device = await find_device()
        hub = PybricksHub()
        await hub.connect(device)
        return hub
    except asyncio.TimeoutError:
        raise Exception("No device found")
    except OSError:
        raise Exception("Please turn bluetooth on")


async def main():
    hub = await connect_to_hub()
    await hub.run("hub.py", wait=False)

    if await hub.read_line():
        await hub.write_line("rev")
        await asyncio.sleep(3)
        await hub.write_line("fwd")
        await asyncio.sleep(3)
        await hub.write_line("bye")
        print(await hub.read_line())


asyncio.run(main())

hub.py

from pybricks.pupdevices import Motor
from pybricks.parameters import Port

# Standard MicroPython modules
from usys import stdin, stdout

motor = Motor(Port.A)

# Program is ready
stdout.buffer.write(b"OK\n")

while True:
    # Read three bytes.
    cmd = stdin.readline()

    # Decide what to do based on the command.
    if cmd == b"fwd\n":
        motor.dc(50)
    elif cmd == b"rev\n":
        motor.dc(-50)
    elif cmd == b"bye\n":
        break

    # Send a response.
    stdout.buffer.write(b"OK\n")

KonerDev avatar Sep 07 '23 11:09 KonerDev

The OK is still at the end instead of the beginning on the hub program and the PC program does not wait for OK before sending each command.
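
The intended handshake can be tried out on the computer without a hub. The sketch below is a simulation only: FakeHub is a made-up stand-in built on asyncio queues and is not part of pybricksdev. The hub side announces "OK" at the top of every loop iteration, and the PC side waits for that "OK" before each command.

```python
import asyncio


class FakeHub:
    """Hypothetical in-memory stand-in for a hub, for illustration only."""

    def __init__(self) -> None:
        self.to_hub = asyncio.Queue()
        self.from_hub = asyncio.Queue()
        self.executed = []

    async def program(self) -> None:
        while True:
            # Tell the PC we are ready, then wait for exactly one command.
            await self.from_hub.put("OK")
            cmd = (await self.to_hub.get()).strip()
            self.executed.append(cmd)
            if cmd == "bye":
                break


async def send_command(hub: FakeHub, cmd: str) -> None:
    # Wait for the ready marker before every command, with a timeout.
    line = await asyncio.wait_for(hub.from_hub.get(), timeout=1)
    if line != "OK":
        raise RuntimeError(f"Unexpected response: {line!r}")
    await hub.to_hub.put(cmd + "\n")


async def main() -> list:
    hub = FakeHub()
    task = asyncio.create_task(hub.program())
    for cmd in ("rev", "fwd", "bye"):
        await send_command(hub, cmd)
    await task
    return hub.executed


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because the "OK" comes before each read rather than after it, the PC never sends a command that the hub is not yet waiting for.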

dlech avatar Sep 07 '23 20:09 dlech

Like this?

main.py

import asyncio
import os
from pybricksdev.ble import find_device
from pybricksdev.connections.pybricks import PybricksHub

os.environ["TQDM_DISABLE"] = "1"


async def connect_to_hub():
    try:
        device = await find_device()
        hub = PybricksHub()
        await hub.connect(device)
        return hub
    except asyncio.TimeoutError:
        raise Exception("No device found")
    except OSError:
        raise Exception("Please turn bluetooth on")


async def main():
    hub = await connect_to_hub()
    await hub.run("hub.py", wait=False)

    if await hub.read_line():
        await hub.write_line("rev")

        if await hub.read_line():
            await asyncio.sleep(3)
            await hub.write_line("fwd")

            if await hub.read_line():
                await asyncio.sleep(3)
                await hub.write_line("bye")

                print(await hub.read_line())


asyncio.run(main())

hub.py

from pybricks.pupdevices import Motor
from pybricks.parameters import Port

# Standard MicroPython modules
from usys import stdin, stdout

motor = Motor(Port.A)

# Program is ready
stdout.buffer.write(b"OK\n")

while True:
    # Read three bytes.
    cmd = stdin.readline()

    # Decide what to do based on the command.
    if cmd == b"fwd\n":
        motor.dc(50)
    elif cmd == b"rev\n":
        motor.dc(-50)
    elif cmd == b"bye\n":
        break

    # Send a response.
    stdout.buffer.write(b"OK\n")

KonerDev avatar Sep 08 '23 10:09 KonerDev

This is how I would do it:

main.py

import asyncio
import contextlib
import os

# must be set before tqdm is imported!
os.environ["TQDM_DISABLE"] = "1"

from pybricksdev.ble import find_device
from pybricksdev.connections.pybricks import PybricksHub


async def connect_to_hub():
    try:
        device = await find_device()
        hub = PybricksHub()
        await hub.connect(device)
        return hub
    except asyncio.TimeoutError:
        raise RuntimeError("No device found")
    except OSError:
        raise RuntimeError("Please turn bluetooth on")


async def send_command(hub: PybricksHub, cmd: str):
    line = await asyncio.wait_for(hub.read_line(), timeout=5)

    if line != "OK":
        raise RuntimeError(f"Unexpected response: '{line}'")

    await hub.write_line(cmd)


async def stop_if_running(hub: PybricksHub):
    try:
        await hub.stop_user_program()
    except Exception:
        # ignore error, e.g. if hub is already disconnected
        pass


async def main():
    async with contextlib.AsyncExitStack() as stack:
        hub = await connect_to_hub()
        stack.push_async_callback(hub.disconnect)

        await hub.run("hub.py", print_output=False, wait=False)
        stack.push_async_callback(stop_if_running, hub)

        await send_command(hub, "rev")
        await asyncio.sleep(3)
        await send_command(hub, "fwd")
        await asyncio.sleep(3)
        await send_command(hub, "bye")


asyncio.run(main())

hub.py

from pybricks.pupdevices import Motor
from pybricks.parameters import Port

# Standard MicroPython modules
from usys import stdin

motor = Motor(Port.A)

while True:
    # let PC know we are ready for a command
    print("OK")

    # wait for command from PC
    cmd = stdin.readline().strip()

    # Decide what to do based on the command.
    if cmd == "fwd":
        motor.dc(50)
    elif cmd == "rev":
        motor.dc(-50)
    elif cmd == "bye":
        break

dlech avatar Sep 08 '23 23:09 dlech

Hi, unfortunately I still have a problem with my code: when I run it, an error occurs in the console.

main.py

import asyncio
import os

os.environ["TQDM_DISABLE"] = "1"

from pybricksdev.ble import find_device
from pybricksdev.connections.pybricks import PybricksHub

class MyHub:
    def __init__(self):
        print("MyHub class initialized")

    # Get response from hub
    async def read_line(self):
        return await self.hub.read_line()

    # Send message to hub
    async def write_line(self, value: str):
        await self.hub.write_line(value)

    # Connect with hub
    async def connect(self):
        try:
            device = await find_device()
            self.hub = PybricksHub()
            await self.hub.connect(device)
            print("Connected to hub successfully")
            await self.hub.run("hub.py", print_output=False, wait=False)

            # Wait for hub loading the program
            response = await self.read_line()
            if response != "OK":
                print(f"Unexpected hub response: {response}")
                await self.disconnect()
                exit()
            else:
                print("Running hub script...")
        except asyncio.TimeoutError:
            print("Hub not found")
            exit()
        except OSError:
            print("Bluetooth is turned off")
            exit()

    # Disconnect from hub
    async def disconnect(self):
        print("Disconnecting from hub...")
        await self.hub.stop_user_program()
        await self.hub.disconnect()


myhub = MyHub()
asyncio.run(myhub.connect())
asyncio.run(myhub.write_line("rev"))
asyncio.run(myhub.read_line())
asyncio.run(myhub.disconnect())

Error

Traceback (most recent call last):
  File "C:\...\projects\python\robi-ai\new\notworking.py", line 58, in <module>
    asyncio.run(robi.read_line())
  File "C:\Users\...\AppData\Local\Programs\Python\Python311\Lib\asyncio\runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "C:\Users\...\AppData\Local\Programs\Python\Python311\Lib\asyncio\runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\...\AppData\Local\Programs\Python\Python311\Lib\asyncio\base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "C:\...\projects\python\robi-ai\new\notworking.py", line 17, in read_line
    return await self.hub.read_line()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\...\AppData\Local\Programs\Python\Python311\Lib\site-packages\pybricksdev\connections\pybricks.py", line 429, in read_line
    return await self.race_disconnect(self._stdout_line_queue.get())
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\...\AppData\Local\Programs\Python\Python311\Lib\site-packages\pybricksdev\connections\pybricks.py", line 365, in race_disconnect
    return awaitable_task.result()
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\...\AppData\Local\Programs\Python\Python311\Lib\asyncio\queues.py", line 155, in get
    getter = self._get_loop().create_future()
             ^^^^^^^^^^^^^^^^
  File "C:\Users\...\AppData\Local\Programs\Python\Python311\Lib\asyncio\mixins.py", line 20, in _get_loop
    raise RuntimeError(f'{self!r} is bound to a different event loop')
RuntimeError: <Queue at 0x14eb1bce1d0 maxsize=0 tasks=1> is bound to a different event loop

KonerDev avatar Sep 22 '23 10:09 KonerDev

You can only use one asyncio.run() in your script.
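
Each asyncio.run() call creates a fresh event loop and closes it when the coroutine finishes, so objects created during the first call (such as the queue named in the traceback) cannot be used in a later call. One way to restructure the script is to put all steps into a single coroutine and run that once. The sketch below shows only the structure; FakeHub is a made-up stand-in that echoes lines, not the real hub class.

```python
import asyncio


class FakeHub:
    """Hypothetical stand-in for the MyHub wrapper; it only echoes lines."""

    def __init__(self) -> None:
        self.lines = None  # created in connect(), inside the running loop

    async def connect(self) -> None:
        self.lines = asyncio.Queue()

    async def write_line(self, value: str) -> None:
        await self.lines.put(f"echo:{value}")

    async def read_line(self) -> str:
        return await self.lines.get()

    async def disconnect(self) -> None:
        self.lines = None


async def main() -> str:
    hub = FakeHub()
    await hub.connect()
    try:
        await hub.write_line("rev")
        return await hub.read_line()
    finally:
        # Runs even if one of the steps above raises.
        await hub.disconnect()


if __name__ == "__main__":
    # The only asyncio.run() call in the script.
    print(asyncio.run(main()))
```

The try/finally block also replaces the exit() calls inside connect(), since the cleanup then happens in one place.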

dlech avatar Sep 22 '23 11:09 dlech

Unfortunately I get an error from the hub

hub.py

from pybricks.pupdevices import Motor
from pybricks.parameters import Port, Color
from pybricks.hubs import InventorHub
from pybricks.tools import wait
from usys import stdin
import threading
import random

# [...]

# Setup hub
hub = InventorHub()
hub.light.off()

# Face animation
animatedFaceFrames = [
    ((100, 100, 0, 100, 100), (100, 100, 0, 100, 100), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((0, 0, 0, 0, 0), (70, 70, 0, 70, 70), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((0, 0, 0, 0, 0), (80, 80, 0, 80, 80), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((100, 100, 0, 100, 100), (100, 100, 0, 100, 100), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
]

def animatedFaceController():
    while True:
        for frame in animatedFaceFrames:
            hub.display.icon(frame)
            wait(50)
        wait(random.uniform(1, 5))

animatedFaceThread = threading.Thread(target=animatedFaceController)
animatedFaceThread.start()

# [...]

Error

[15:27:44 INFO]: Traceback (most recent call last):
[15:27:44 INFO]:   File "hub.py", line 6, in <module>
[15:27:44 INFO]: ImportError: no module named 'threading'

What's wrong with multithreading?

KonerDev avatar Sep 30 '23 13:09 KonerDev

Pybricks does not support threading. The current beta version supports cooperative multitasking with coroutines (async/await). I don't think we have much documentation on it yet though.

dlech avatar Sep 30 '23 14:09 dlech

Can you show me the code with this cooperative multitasking? And where can I get the beta firmware? And should I just "install" the beta over the old version or should I first uninstall the old one and then install the beta?

KonerDev avatar Sep 30 '23 15:09 KonerDev

And where can I get the beta firmware?

https://beta.pybricks.com

And should I just "install" the beta over the old version or should I first uninstall the old one and then install the beta?

These are independent applications. One does not replace the other.

Can you show me the code with this cooperative multitasking?

from pybricks.pupdevices import Motor
from pybricks.parameters import Port, Color
from pybricks.hubs import InventorHub
from pybricks.tools import wait, multitask, run_task
from usys import stdin
import random

# [...]

# Setup hub
hub = InventorHub()
hub.light.off()

# Face animation
animatedFaceFrames = [
    ((100, 100, 0, 100, 100), (100, 100, 0, 100, 100), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((0, 0, 0, 0, 0), (70, 70, 0, 70, 70), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((0, 0, 0, 0, 0), (80, 80, 0, 80, 80), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
    ((100, 100, 0, 100, 100), (100, 100, 0, 100, 100), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0, 0, 0, 0)),
]

async def animatedFaceController():
    while True:
        for frame in animatedFaceFrames:
            hub.display.icon(frame)
            await wait(50)
        await wait(random.uniform(1, 5))

async def other_task():
    ...

async def main():
    await multitask(
        animatedFaceController(),
        other_task(),
    )

run_task(main())

dlech avatar Sep 30 '23 16:09 dlech

So can I just install the beta firmware while the normal firmware is still installed? And how can I then tell the hub to use the beta firmware? Or have I misunderstood something?

KonerDev avatar Sep 30 '23 16:09 KonerDev

You have to flash the firmware on the hub to change it. You can't have two firmwares installed at the same time. You can read the current firmware version in the status bar of the Pybricks Code/Pybricks Beta web apps, or you can print it from a program:

from pybricks import version

print(version)

dlech avatar Sep 30 '23 16:09 dlech