kuksa-databroker icon indicating copy to clipboard operation
kuksa-databroker copied to clipboard

Prototype Persistence provider for AGL persistence API

Open SebastianSchildt opened this issue 5 months ago • 2 comments

Hello there,

as we do not yet know where the persistence API will end up, and want to write this in Rust/C++ anyway I just open this issue for discussion.

here is a complete - albeit beyond ugly - provider that does everything:

  • Starts up and reads a complete VSS json file, and filters out all entires where "persist" is set to true
  • Checlk AGL persistence store and in case data exists publish it t KUKSA
  • Subscribing all paths and in case of changes pushing it back to persoistence layer

The code


import json
import subprocess
from kuksa_client.grpc import VSSClient
from kuksa_client.grpc import Datapoint

KUKSA_PORT=55556

def extract(path, vssdata, persistPaths):
    #print(f"Extracting {path}")
    if vssdata['type'] == "branch":
        if "children" in vssdata:
            for k, v in vssdata['children'].items():
                extract(path + "." + k, v, persistPaths)
    elif vssdata['type'] == "sensor" or vssdata['type'] == "actuator":
        #print(f"Checking {path}")
        if "persist" in vssdata and vssdata['persist'] is True:
            persistPaths.append(path)
            #print(f"Persisting {path}")
    

def extract_path_to_persist(vssdata, persistPaths):
    extract("Vehicle", vssdata['Vehicle'], persistPaths)

def read_and_publish_to_kuksa(persistPaths):
    for path in persistPaths:
        print(f"Reading {path}")
        output = subprocess.check_output(["grpcurl", "-plaintext", "-d", f'{{"key": "{path}", "namespace": "vss"}}', "localhost:50054", "storage_api.Database/Read"])
        outputDict = json.loads(output)
        with VSSClient('127.0.0.1', KUKSA_PORT) as client:
            if "success" in outputDict and outputDict["success"] is True:
                print(f"Pushing value {outputDict['result']} to {path} is ")
                client.set_current_values({ path: Datapoint(outputDict['result']),
    })
            else:
                print(f"No data for {path}")
   
def main():
    print("Hello Persistent world!")
    # we are reading it from a complete VSS JSON, parsing may
    # be more complicated tha reading a simple config file
    # but the VSS way would be to define a persist field in
    # an overlay, and furthermore, the datatype information
    # might be needed when doing this in Rust or C++ as the 
    # GRPC API of KUKSA is typed for performance reasons
    # (Python bindings just hid this fact. At a cost)
    vssdata = json.load(open("vss_release_4.0.json"))
    persistPaths = []
    extract_path_to_persist(vssdata, persistPaths)

    print(f"Persisting {persistPaths}")

    # try to get path form AGL persistence API and push
    # to KUKSA. 
    read_and_publish_to_kuksa(persistPaths)

    # No we subscribe all Paths and update them in
    # AGL persistence API
    print("Subscribing to all paths")
    with VSSClient('127.0.0.1', KUKSA_PORT) as client:
        for updates in client.subscribe_current_values(persistPaths):
            for path, data in updates.items():
                if data is not None:
                    print(f"Updating {path} with {data.value}")
                    # update path in AGL persistence API
                    subprocess.check_output(["grpcurl", "-plaintext", "-d", f'{{"key": "{path}", "namespace": "vss", "value": "{data.value}"}}', "localhost:50054", "storage_api.Database/Write"])




if __name__ == "__main__":
    main()

This is based on https://github.com/LSchwiedrzik/agl-persistent-storage-api/tree/master

@LSchwiedrzik "just" need to rewirte in Rust.... As seperate component, or maybe an (optional) feature directly in persistent-storage

Databroker needs to run, everything to get started with that should be in https://github.com/eclipse-kuksa/kuksa-databroker/blob/main/doc/quickstart.md

fyi @eriksven @g-scott-murray

SebastianSchildt avatar Sep 10 '24 13:09 SebastianSchildt