golem
golem copied to clipboard
db transaction query stream durability fix
The query stream in a transaction did not use the proper begin oplog index.
snippet of oplog data - without fix
[
{
"type": "BeginRemoteWrite",
"timestamp": "2025-05-25T18:02:34.118Z"
},
{
"type": "ImportedFunctionInvoked",
"timestamp": "2025-05-25T18:02:34.119Z",
"functionName": "rdbms::postgres::db-transaction::query-stream",
"request": {
"typ": { ... },
"value": {
"params": [],
"pool-key": {
"address": "postgres://postgres:postgres@localhost:55006/postgres"
},
"statement": "SELECT user_id, name, tags FROM test_users_idem ORDER BY created_on ASC"
}
},
"response": {
"typ": { ... },
"value": {
"ok": {
"params": [],
"pool-key": {
"address": "postgres://postgres:postgres@localhost:55006/postgres"
},
"statement": "SELECT user_id, name, tags FROM test_users_idem ORDER BY created_on ASC"
}
}
},
"wrappedFunctionType": {
"type": "WriteRemoteBatched",
"index": 40 // bug: the query stream impl. created a new begin oplog index
}
},
{
"type": "ImportedFunctionInvoked",
"timestamp": "2025-05-25T18:02:34.123Z",
"functionName": "rdbms::postgres::db-result-stream::get-columns",
"request": {
"typ": {
"type": "Option",
"inner": {
"type": "Str"
}
},
"value": null
},
"response": {
"typ": { ... },
"value": {
"ok": [
{
"db-type": {
"uuid": null
},
"db-type-name": "UUID",
"name": "user_id",
"ordinal": 0
},
{
"db-type": {
"text": null
},
"db-type-name": "TEXT",
"name": "name",
"ordinal": 1
},
{
"db-type": {
"array": {
"text": null
}
},
"db-type-name": "TEXT[]",
"name": "tags",
"ordinal": 2
}
]
}
},
"wrappedFunctionType": {
"type": "WriteRemoteBatched",
"index": 40
}
},
{
"type": "EndRemoteWrite",
"timestamp": "2025-05-25T18:02:34.135Z",
"beginIndex": 39 // because the bug in the query stream impl. created a new begin oplog index, the index here differs from the one in the wrappedFunctionType entries of the individual function invocations
}
]
snippet of oplog data - with fix
[
{
"type": "BeginRemoteWrite",
"timestamp": "2025-05-25T17:53:49.005Z"
},
{
"type": "ImportedFunctionInvoked",
"timestamp": "2025-05-25T17:53:49.006Z",
"functionName": "rdbms::postgres::db-transaction::query-stream",
"request": {
"typ": { ... },
"value": {
"params": [],
"pool-key": {
"address": "postgres://postgres:postgres@localhost:55002/postgres"
},
"statement": "SELECT user_id, name, tags FROM test_users_idem ORDER BY created_on ASC"
}
},
"response": {
"typ": { ...},
"value": {
"ok": {
"params": [],
"pool-key": {
"address": "postgres://postgres:postgres@localhost:55002/postgres"
},
"statement": "SELECT user_id, name, tags FROM test_users_idem ORDER BY created_on ASC"
}
}
},
"wrappedFunctionType": {
"type": "WriteRemoteBatched",
"index": 39
}
},
{
"type": "ImportedFunctionInvoked",
"timestamp": "2025-05-25T17:53:49.009Z",
"functionName": "rdbms::postgres::db-result-stream::get-columns",
"request": {
"typ": {
"type": "Option",
"inner": {
"type": "Str"
}
},
"value": null
},
"response": {
"typ": { ... },
"value": {
"ok": [
{
"db-type": {
"uuid": null
},
"db-type-name": "UUID",
"name": "user_id",
"ordinal": 0
},
{
"db-type": {
"text": null
},
"db-type-name": "TEXT",
"name": "name",
"ordinal": 1
},
{
"db-type": {
"array": {
"text": null
}
},
"db-type-name": "TEXT[]",
"name": "tags",
"ordinal": 2
}
]
}
},
"wrappedFunctionType": {
"type": "WriteRemoteBatched",
"index": 39
}
},
{
"type": "EndRemoteWrite",
"timestamp": "2025-05-25T17:53:49.017Z",
"beginIndex": 39
}
]
All oplog indexes in the WriteRemoteBatched entries now have the same value as the begin index in EndRemoteWrite.
Hi @vigoo, thanks for the review. The build is green and the PR has been rebased, so I think this can be merged now.