Dev/database store

Open · mydmdm opened this pull request 2 months ago

This pull request implements a SqlLightningStore which uses a SQL database for persistent storage and online querying of rollouts, attempts, and spans.

Key Changes

The main additions are in the following folder:

agentlightning/store/database
  └── sqlite.py       # New SqlLightningStore implementation
  └── retry_helper.py # A configurable async retry helper
  └── orm
      └── base.py     # Base ORM models
      └── rollout.py  # ORM model `RolloutInDB` for rollouts
      └── attempt.py  # ORM model `AttemptInDB` for attempts
      └── span.py     # ORM model `SpanInDB` for spans
      └── resource.py # ORM model `ResourcesUpdateInDB` for resources

Usage

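import asyncio
from agentlightning.store.database.sqlite import SqlLightningStore  # module path per the file layout above

async def main() -> None:
    sqlstore = SqlLightningStore(
        database_url="sqlite+aiosqlite:///path/to/db.sqlite3",
    )
    await sqlstore.start()  # Initialize the database connection
    ...  # Use the store as usual
    await sqlstore.stop()  # Close the database connection

asyncio.run(main())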

The key parameter is database_url, which specifies the database connection string. The implementation is based on SQLAlchemy, and more details about supported database URLs can be found in the SQLAlchemy documentation.
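In SQLAlchemy URLs, the part after sqlite+aiosqlite:// is the database path preceded by a slash, so the example above (three slashes) points to the relative path path/to/db.sqlite3, while an absolute path ends up with four slashes. The +aiosqlite suffix selects SQLAlchemy's async SQLite driver. The helper below is a hypothetical convenience, not part of the PR, that builds such a URL from a filesystem path:

```python
from __future__ import annotations

from pathlib import Path


def sqlite_async_url(db_path: str | Path) -> str:
    """Build a SQLAlchemy URL for SQLite using the aiosqlite async driver.

    Hypothetical helper. SQLAlchemy places the database path after "///":
    a relative path gives three slashes in total, while an absolute POSIX
    path contributes its own leading "/" and therefore gives four.
    """
    return f"sqlite+aiosqlite:///{Path(db_path).as_posix()}"
```

Other backends use the same dialect+driver://user:password@host:port/database shape (for example postgresql+asyncpg://...), although, per the Known Issues below, only SQLite has been tested so far.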

The constructor exposes further configuration options, such as the retry behavior of wait_for_rollouts() and the periodic timeout checks for attempts.
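The description does not spell out the API of retry_helper.py. As a rough illustration of what a configurable async retry helper commonly looks like, the sketch below retries a coroutine with capped exponential backoff and jitter. The name retry_async and its parameters (max_attempts, base_delay, max_delay, retry_on) are hypothetical and not taken from the PR:

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_async(
    fn: Callable[[], Awaitable[T]],
    *,
    max_attempts: int = 5,
    base_delay: float = 0.1,
    max_delay: float = 2.0,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
) -> T:
    """Await fn(), retrying on `retry_on` exceptions with capped exponential backoff.

    Hypothetical sketch; parameter names are assumptions, not the PR's API.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return await fn()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of retries: surface the last error to the caller
            # Delay doubles each round (base, 2*base, 4*base, ...) up to max_delay,
            # plus up to 10% random jitter so concurrent callers do not retry in lockstep.
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            await asyncio.sleep(delay * (1 + 0.1 * random.random()))
    raise AssertionError("unreachable")
```

Since asyncio.CancelledError derives from BaseException (Python 3.8+), the default retry_on=(Exception,) does not swallow task cancellation. How such a helper is actually wired into wait_for_rollouts() in this PR may differ.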

The required methods of the LightningStore interface are implemented, including start_rollout(), update_attempt(), add_span(), and the query methods.

Data Model

The data model is defined using SQLAlchemy ORM classes, which map to database tables. Key models include:

  • RolloutInDB: Represents a rollout in the database.
  • AttemptInDB: Represents an attempt in the database.
  • SpanInDB: Represents a span in the database.
  • ResourcesUpdateInDB: Represents a resources update in the database.
  • SpanSeqIdInDB: Used for generating unique span sequence IDs.
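To make the relationships between these models concrete, here is a rough, stdlib-only approximation of the tables they could map to, written as plain SQLite DDL rather than SQLAlchemy ORM classes. All table and column names are assumptions for illustration; only the five models themselves and the version counters on RolloutInDB and SpanSeqIdInDB come from this description:

```python
import sqlite3

# Hypothetical table layout approximating the ORM models; names are illustrative.
SCHEMA = """
CREATE TABLE rollouts (                 -- RolloutInDB
    rollout_id TEXT PRIMARY KEY,
    status     TEXT NOT NULL,           -- queuing, running, requeuing, succeeded, failed, ...
    version    INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE attempts (                 -- AttemptInDB: many attempts per rollout
    attempt_id  TEXT PRIMARY KEY,
    rollout_id  TEXT NOT NULL REFERENCES rollouts(rollout_id),
    sequence_id INTEGER NOT NULL,       -- 1 for the first attempt, 2 for the first retry, ...
    status      TEXT NOT NULL
);
CREATE TABLE spans (                    -- SpanInDB: many spans per attempt
    rollout_id  TEXT NOT NULL REFERENCES rollouts(rollout_id),
    attempt_id  TEXT NOT NULL REFERENCES attempts(attempt_id),
    sequence_id INTEGER NOT NULL,       -- allocated through span_seq_ids
    payload     TEXT,
    PRIMARY KEY (rollout_id, sequence_id)
);
CREATE TABLE resources_updates (        -- ResourcesUpdateInDB
    resources_id TEXT PRIMARY KEY,
    payload      TEXT
);
CREATE TABLE span_seq_ids (             -- SpanSeqIdInDB: one counter row per rollout (assumed)
    rollout_id  TEXT PRIMARY KEY,
    current_seq INTEGER NOT NULL DEFAULT 0,
    version     INTEGER NOT NULL DEFAULT 0
);
"""


def create_schema() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute("PRAGMA foreign_keys = ON")  # SQLite leaves FK enforcement off by default
    conn.executescript(SCHEMA)
    return conn
```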

To keep data consistent under concurrent updates, optimistic concurrency control (OCC), implemented with a version counter and compare-and-swap (CAS), is applied to RolloutInDB (for rollout dequeuing) and SpanSeqIdInDB (for span sequence ID generation). This prevents race conditions such as two workers dequeuing the same rollout or two spans receiving the same sequence ID.
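A minimal sketch of version-based CAS, using the stdlib sqlite3 module instead of SQLAlchemy: a writer reads a row's version, then issues an UPDATE that only matches if that version is unchanged, and treats rowcount == 0 as a lost race. The table layouts, the enqueued_at ordering column, and the functions make_db, try_dequeue, and next_span_seq_id are hypothetical, not the PR's code:

```python
import sqlite3
from typing import Optional


def make_db() -> sqlite3.Connection:
    """In-memory database with the two OCC-protected tables (hypothetical columns)."""
    conn = sqlite3.connect(":memory:", isolation_level=None)  # autocommit: each statement commits itself
    conn.executescript(
        """
        CREATE TABLE rollouts (
            rollout_id  TEXT PRIMARY KEY,
            status      TEXT NOT NULL,
            enqueued_at REAL NOT NULL,
            version     INTEGER NOT NULL DEFAULT 0
        );
        CREATE TABLE span_seq_ids (
            rollout_id  TEXT PRIMARY KEY,
            current_seq INTEGER NOT NULL DEFAULT 0,
            version     INTEGER NOT NULL DEFAULT 0
        );
        """
    )
    return conn


def try_dequeue(conn: sqlite3.Connection) -> Optional[str]:
    """Claim the oldest queuing/requeuing rollout via CAS on its version column."""
    while True:
        row = conn.execute(
            "SELECT rollout_id, version FROM rollouts "
            "WHERE status IN ('queuing', 'requeuing') ORDER BY enqueued_at LIMIT 1"
        ).fetchone()
        if row is None:
            return None  # nothing left to dequeue
        rollout_id, version = row
        # Matches only if no other writer touched the row since our SELECT.
        cur = conn.execute(
            "UPDATE rollouts SET status = 'running', version = version + 1 "
            "WHERE rollout_id = ? AND version = ?",
            (rollout_id, version),
        )
        if cur.rowcount == 1:
            return rollout_id  # CAS succeeded: this caller owns the rollout
        # rowcount == 0: another worker won the race; re-read and try again.


def next_span_seq_id(conn: sqlite3.Connection, rollout_id: str) -> int:
    """Allocate the next span sequence ID for a rollout via CAS on its counter row."""
    conn.execute("INSERT OR IGNORE INTO span_seq_ids (rollout_id) VALUES (?)", (rollout_id,))
    while True:
        current, version = conn.execute(
            "SELECT current_seq, version FROM span_seq_ids WHERE rollout_id = ?",
            (rollout_id,),
        ).fetchone()
        cur = conn.execute(
            "UPDATE span_seq_ids SET current_seq = ?, version = version + 1 "
            "WHERE rollout_id = ? AND version = ?",
            (current + 1, rollout_id, version),
        )
        if cur.rowcount == 1:
            return current + 1  # no concurrent allocation slipped in between
```

Because the check relies only on a conditional UPDATE, the pattern needs no database-specific locking syntax such as SELECT ... FOR UPDATE, which SQLite does not support.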

Status Transition Diagram

The status transitions of rollouts and attempts are illustrated in the following nested diagram (the cancelled status is not shown):

  • The outer graph shows rollout status transitions (with preparing merged into running), and the inner graph shows attempt status transitions while a rollout is running.
  • The green edges mark the API calls that bring a rollout into the running status: enqueue_rollout() places it in queuing, after which dequeue_rollout() moves it to running, while start_rollout() moves it to running directly.
  • When the latest attempt finishes, whether it succeeds or fails, the rollout leaves the running status and moves to succeeded, failed, or requeuing (if more retries are allowed).
  • Once an attempt is started for a rollout, the rollout's status mirrors the status of its latest attempt until that attempt finishes (succeeded or failed; timeout and unresponsive are mapped to failed).
---
config:
  layout: dagre
  theme: default
  look: classic
---
flowchart TB
    subgraph running["running of rollout"]
        direction LR
        st["st"]
        prep("preparing")
        r("running")
        suc("succeeded")
        f("failed")
        u("unresponsive")
        t("timeout")
        st2["st2"]
        st3["st3"]
        %% a1@{ shape: text, label: "attempt is<br>succeeded" }
        %% a2@{ shape: text, label: "attempt is<br>failed" }
        %% a3@{ shape: text, label: "attempt created<br>for rollout" }
    end

    start["start"] -->|"enqueue_rollout()"| queuing("queuing")
    queuing -->|"dequeue_rollout()"| running
    start --->|"start_rollout()"| running
    requeuing("requeuing") -->|"dequeue_rollout()<br>or any attempt<br>unresponsive -> running"| running
    running -->|"latest attempt<br>is succeeded"| succeeded("succeeded")
    running -->|"all attempts<br>are failed and <br>no retries allowed"| failed("failed")
    running -->|"all attempts are failed<br>and <br>more retries allowed"| requeuing
    failed -->|"any attempt<br>unresponsive -> running"| running
    failed --> stop["stop"]
    succeeded --> stop

    linkStyle 0,1,2,3 stroke:green,stroke-width:4px,color:green;

    %% a3 -.-
    st -->|"attempt created<br>for rollout"| prep
    prep -->|"add_span()"| r
    r -->|"update_attempt()"| suc
    r -->|"update_attempt()"| f
    r -->|"unresponsive_seconds<br>exceeds"| u
    r -->|"timeout_seconds<br>exceeds"| t
    u -->|"add_span()"| r
    u -->|"attempt is<br>failed"| st3
    prep -->|"timeout_seconds<br>exceeds"| t
    suc -->|"attempt is<br>succeeded"| st2
    f -->|"attempt is<br>failed"| st3
    t -->|"attempt is<br>failed"| st3
    st@{ shape: sm-circ}
    st2@{ shape: framed-circle}
    st3@{ shape: framed-circle}
    %% st2 -.- a1
    %% st3 -.- a2
    start@{ shape: sm-circ}
    stop@{ shape: framed-circle}

    linkStyle 11,16 stroke:blue,stroke-width:4px,color:blue;
    linkStyle 12,13 stroke:violet,stroke-width:4px,color:violet;
    linkStyle 18,14,15 stroke:grey,stroke-width:4px,color:grey;
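The diagram can also be read as a transition table. The sketch below encodes the inner (attempt) graph as a dictionary of allowed transitions, plus the outer graph's rule for a rollout's status given its latest attempt. The enum, the function names, and the attempts_made/max_attempts retry parameters are hypothetical; cancelled, and the edges that bring a failed or requeuing rollout back to running when an unresponsive attempt reports again, are left out:

```python
from enum import Enum
from typing import Dict, FrozenSet


class AttemptStatus(str, Enum):
    PREPARING = "preparing"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    UNRESPONSIVE = "unresponsive"
    TIMEOUT = "timeout"


S = AttemptStatus
# Inner graph: allowed attempt status changes (hypothetical encoding of the diagram).
ATTEMPT_TRANSITIONS: Dict[AttemptStatus, FrozenSet[AttemptStatus]] = {
    S.PREPARING: frozenset({S.RUNNING, S.TIMEOUT}),  # add_span() / timeout_seconds exceeded
    S.RUNNING: frozenset({S.SUCCEEDED, S.FAILED, S.UNRESPONSIVE, S.TIMEOUT}),
    S.UNRESPONSIVE: frozenset({S.RUNNING}),  # a late add_span() revives the attempt
    S.SUCCEEDED: frozenset(),
    S.FAILED: frozenset(),
    S.TIMEOUT: frozenset(),
}


def can_transition(src: AttemptStatus, dst: AttemptStatus) -> bool:
    return dst in ATTEMPT_TRANSITIONS[src]


def rollout_status_after(attempt: AttemptStatus, attempts_made: int, max_attempts: int) -> str:
    """Outer graph: the rollout status implied by its latest attempt's status."""
    if attempt in (S.PREPARING, S.RUNNING):
        return attempt.value  # rollout mirrors the attempt (drawn as "running" above)
    if attempt is S.SUCCEEDED:
        return "succeeded"
    # failed, timeout, and unresponsive all count as a failed attempt
    return "requeuing" if attempts_made < max_attempts else "failed"
```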

Testing

To keep tests consistent across store implementations, the existing inmemory_store test fixture has been updated to also cover database-backed stores that use SQLite files.

Known Issues

  • [ ] Currently only tested on SQLite. Further testing is needed for other databases such as PostgreSQL and possibly MySQL.
  • [ ] Performance benchmarking to evaluate the overhead of database operations is still pending. Additional connection arguments, such as connection pooling options, may need to be exposed in the constructor.
  • [ ] Periodic cleanup of old rollouts, attempts, and spans is not yet implemented.
  • [x] (resolved) Some status transition edge cases differed from the existing store implementations, as noted above.

mydmdm, Nov 05 '25 07:11