Dev/database store
This pull request implements `SqlLightningStore`, which uses a SQL database for persistent storage and online querying of rollouts, attempts, and spans.
Key Changes
The main additions are in the following folder:
```
agentlightning/store/database
├── sqlite.py          # New SqlLightningStore implementation
├── retry_helper.py    # A configurable async retry helper
└── orm
    ├── base.py        # Base ORM models
    ├── rollout.py     # ORM model RolloutInDB for rollouts
    ├── attempt.py     # ORM model AttemptInDB for attempts
    ├── span.py        # ORM model SpanInDB for spans
    └── resource.py    # ORM model ResourcesUpdateInDB for resources
```
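`retry_helper.py` is described only as a configurable async retry helper. The following is a minimal sketch of what such a helper commonly looks like, assuming exponential backoff with optional jitter; the name `retry_async` and all of its parameters are hypothetical and are not the module's actual API.

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_async(
    func: Callable[[], Awaitable[T]],
    *,
    max_attempts: int = 3,
    initial_delay: float = 0.1,
    backoff: float = 2.0,
    jitter: float = 0.0,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
) -> T:
    """Hypothetical helper: await `func()` until it succeeds or attempts run out."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    delay = initial_delay
    for attempt in range(1, max_attempts + 1):
        try:
            return await func()
        except retry_on:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the last error to the caller
            # Wait before the next try; jitter spreads out concurrent retries
            await asyncio.sleep(delay + random.uniform(0, jitter))
            delay *= backoff
    raise AssertionError("unreachable")  # The loop always returns or raises
```

Exceptions outside `retry_on` propagate immediately, so only errors known to be transient (for example, a locked database) are retried.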
Usage
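```python
import asyncio
from agentlightning.store.database.sqlite import SqlLightningStore

async def main() -> None:
    sqlstore = SqlLightningStore(
        database_url="sqlite+aiosqlite:///path/to/db.sqlite3",
    )
    await sqlstore.start()  # Initialize the database connection
    ...  # Use the store as usual
    await sqlstore.stop()  # Close the database connection

asyncio.run(main())
```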
The key parameter is `database_url`, which specifies the database connection string. The implementation is based on SQLAlchemy, and more details about supported database URLs can be found in the SQLAlchemy documentation.
More configuration options are available in the constructor, such as the retry behavior of `wait_for_rollouts()` and periodic timeout checks for attempts.
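The periodic timeout checks are not detailed in this description. Assuming they apply the `timeout_seconds` and `unresponsive_seconds` thresholds that appear in the status diagram, one scan could look like the sketch below. `AttemptRecord`, `check_attempt`, `periodic_check`, and the `last_heartbeat_time` field are hypothetical names used for illustration only, not the store's API.

```python
import asyncio
import time
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class AttemptRecord:
    """Hypothetical in-memory stand-in for an attempt row."""

    attempt_id: str
    status: str  # e.g. "preparing", "running", "unresponsive", "timeout"
    start_time: float
    last_heartbeat_time: Optional[float] = None  # e.g. time of the latest add_span()


def check_attempt(
    attempt: AttemptRecord,
    now: float,
    timeout_seconds: Optional[float],
    unresponsive_seconds: Optional[float],
) -> str:
    """Return the status the attempt should move to (possibly unchanged)."""
    if attempt.status not in ("preparing", "running"):
        return attempt.status  # Finished or already-flagged attempts are left alone
    if timeout_seconds is not None and now - attempt.start_time > timeout_seconds:
        return "timeout"  # Both preparing and running attempts can time out
    if attempt.status == "running" and unresponsive_seconds is not None:
        last_seen = (
            attempt.last_heartbeat_time
            if attempt.last_heartbeat_time is not None
            else attempt.start_time
        )
        if now - last_seen > unresponsive_seconds:
            return "unresponsive"  # Only running attempts become unresponsive
    return attempt.status


async def periodic_check(
    attempts: List[AttemptRecord],
    interval: float,
    timeout_seconds: Optional[float],
    unresponsive_seconds: Optional[float],
    stop: asyncio.Event,
    clock: Callable[[], float] = time.monotonic,
) -> None:
    """Re-scan the attempts every `interval` seconds until `stop` is set."""
    while not stop.is_set():
        now = clock()
        for attempt in attempts:
            attempt.status = check_attempt(
                attempt, now, timeout_seconds, unresponsive_seconds
            )
        try:
            # Sleep for `interval`, but wake up immediately if `stop` is set
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass
```

Keeping the per-attempt decision in a pure function makes the transition rules testable without a database or a running event loop.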
The required interface methods of `LightningStore` are implemented, such as `start_rollout()`, `update_attempt()`, `add_span()`, and the query methods.
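Because `stop()` should run even if the code between `start()` and `stop()` raises, one option is to wrap the lifecycle in an async context manager. This is a minimal sketch that assumes only that the store exposes awaitable `start()` and `stop()` methods, as in the usage example; `open_store` is a hypothetical helper and is not part of this PR.

```python
from contextlib import asynccontextmanager
from typing import AsyncIterator, Protocol, TypeVar


class StartStoppable(Protocol):
    """Anything with awaitable start()/stop(), such as the store above."""

    async def start(self) -> None: ...
    async def stop(self) -> None: ...


S = TypeVar("S", bound=StartStoppable)


@asynccontextmanager
async def open_store(store: S) -> AsyncIterator[S]:
    await store.start()  # Initialize the database connection
    try:
        yield store
    finally:
        await store.stop()  # Always close the connection, even on error
```

Usage would then read `async with open_store(SqlLightningStore(database_url=...)) as store: ...`, with the connection closed when the block exits.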
Data Model
The data model is defined using SQLAlchemy ORM classes, which map to database tables. Key models include:
- `RolloutInDB`: Represents a rollout in the database.
- `AttemptInDB`: Represents an attempt in the database.
- `SpanInDB`: Represents a span in the database.
- `ResourcesUpdateInDB`: Represents a resources update in the database.
- `SpanSeqIdInDB`: Used for generating unique span sequence IDs.
To ensure data consistency during concurrent updates, the optimistic concurrency control (OCC) mechanism with version counting and CAS (compare-and-swap) is applied to RolloutInDB (for rollout dequeuing) and SpanSeqIdInDB (for span sequence ID generation) to prevent race conditions.
Status Transition Diagram
The rollout and attempt status transitions are illustrated in the following nested diagram (`cancelled` is not shown):
- The outer graph shows the rollout status transitions (with `preparing` merged into `running`), and the inner graph shows the attempt status transitions during the `running` phase of a rollout.
- The green edges indicate transitions caused by `start_rollout()` and `dequeue_rollout()`, which make the rollout enter the `running` status.
- When the latest attempt finishes, whether it succeeded or failed, the rollout leaves the `running` status for `succeeded`, `failed`, or `requeuing` (if more retries are allowed).
- Once an attempt is started for a rollout, the status of the latest attempt is reflected in the status of the rollout until the attempt finishes (either succeeds or fails; `timeout` and `unresponsive` are mapped to `failed`).
```mermaid
---
config:
  layout: dagre
  theme: default
  look: classic
---
flowchart TB
    subgraph running["running of rollout"]
        direction LR
        st["st"]
        prep("preparing")
        r("running")
        suc("succeeded")
        f("failed")
        u("unresponsive")
        t("timeout")
        st2["st2"]
        st3["st3"]
        %% a1@{ shape: text, label: "attempt is<br>succeeded" }
        %% a2@{ shape: text, label: "attempt is<br>failed" }
        %% a3@{ shape: text, label: "attempt created<br>for rollout" }
    end
    start["start"] -->|"enqueue_rollout()"| queuing("queuing")
    queuing -->|"dequeue_rollout()"| running
    start --->|"start_rollout()"| running
    requeuing("requeuing") -->|"dequeue_rollout()<br>or any attempt<br>unresponsive -> running"| running
    running -->|"latest attempt<br>is succeeded"| succeeded("succeeded")
    running -->|"all attempts<br>are failed and <br>no retry allowed"| failed("failed")
    running -->|"all attempts are failed<br>and <br>more retry allowed"| requeuing
    failed -->|"any attempt<br>unresponsive -> running"| running
    failed --> stop["stop"]
    succeeded --> stop
    linkStyle 0,1,2,3 stroke:green,stroke-width:4px,color:green;
    %% a3 -.-
    st -->|"attempt created<br>for rollout"| prep
    prep -->|"add_span()"| r
    r -->|"update_attempt()"| suc
    r -->|"update_attempt()"| f
    r -->|"unresponsive_seconds<br>exceeds"| u
    r -->|"timeout_seconds<br>exceeds"| t
    u -->|"add_span()"| r
    u -->|"attempt is<br>failed"| st3
    prep -->|"timeout_seconds<br>exceeds"| t
    suc -->|"attempt is<br>succeeded"| st2
    f -->|"attempt is<br>failed"| st3
    t -->|"attempt is<br>failed"| st3
    st@{ shape: sm-circ}
    st2@{ shape: framed-circle}
    st3@{ shape: framed-circle}
    %% st2 -.- a1
    %% st3 -.- a2
    start@{ shape: sm-circ}
    stop@{ shape: framed-circle}
    linkStyle 11,16 stroke:blue,stroke-width:4px,color:blue;
    linkStyle 12,13 stroke:violet,stroke-width:4px,color:violet;
    linkStyle 18,14,15 stroke:grey,stroke-width:4px,color:grey;
```
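The mapping from the latest attempt's status to the rollout's status, as described in the bullets before the diagram, can be condensed into a small pure function. This is only a sketch: the function name and the `retries_left` parameter are hypothetical, and `cancelled` is ignored, as in the diagram.

```python
from typing import Literal

AttemptStatus = Literal["preparing", "running", "succeeded", "failed", "unresponsive", "timeout"]
RolloutStatus = Literal["preparing", "running", "succeeded", "failed", "requeuing"]


def rollout_status_from_latest_attempt(
    attempt_status: AttemptStatus, retries_left: int
) -> RolloutStatus:
    """Derive a rollout's status from its latest attempt (cancelled ignored)."""
    # While the attempt is live, the rollout mirrors it
    if attempt_status == "preparing":
        return "preparing"  # Drawn as part of `running` in the outer graph
    if attempt_status == "running":
        return "running"
    if attempt_status == "succeeded":
        return "succeeded"
    if attempt_status in ("failed", "timeout", "unresponsive"):
        # timeout and unresponsive both count as a failed attempt
        return "requeuing" if retries_left > 0 else "failed"
    raise ValueError(f"unknown attempt status: {attempt_status!r}")
```

Since `unresponsive` is treated as a failed attempt here, an `add_span()` that later revives the attempt would move both the attempt and the rollout back to `running`, which corresponds to the `any attempt unresponsive -> running` edges in the diagram.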
Testing
To keep tests consistent across different store implementations, the existing `inmemory_store` test fixture has been updated to support database-backed stores using SQLite files.
Known Issues
- [ ] Currently only tested on SQLite. Further testing is needed for other databases like PostgreSQL, and perhaps MySQL.
- [ ] Performance benchmarking is pending to evaluate the overhead introduced by database operations. More connection arguments, such as pooling options, may need to be exposed in the constructor.
- [ ] Periodic cleanup of old rollouts/attempts/spans is not yet implemented.
- [x] (resolved) Some known status transition edge cases differ from existing store implementations, as noted above.