golem icon indicating copy to clipboard operation
golem copied to clipboard

db transaction query stream durability fix

Open justcoon opened this issue 6 months ago • 1 comments

The query stream in a transaction did not use the proper begin oplog index.

snippet of oplog data - without fix

[
    {
        "type": "BeginRemoteWrite",
        "timestamp": "2025-05-25T18:02:34.118Z"
    },
    {
        "type": "ImportedFunctionInvoked",
        "timestamp": "2025-05-25T18:02:34.119Z",
        "functionName": "rdbms::postgres::db-transaction::query-stream",
        "request": {
            "typ":  { ... },
            "value": {
                "params": [],
                "pool-key": {
                    "address": "postgres://postgres:postgres@localhost:55006/postgres"
                },
                "statement": "SELECT user_id, name, tags FROM test_users_idem ORDER BY created_on ASC"
            }
        },
        "response": {
            "typ":  { ... },
            "value": {
                "ok": {
                    "params": [],
                    "pool-key": {
                        "address": "postgres://postgres:postgres@localhost:55006/postgres"
                    },
                    "statement": "SELECT user_id, name, tags FROM test_users_idem ORDER BY created_on ASC"
                }
            }
        },
        "wrappedFunctionType": {
            "type": "WriteRemoteBatched",
            "index": 40  // bug: the query stream implementation created a new begin oplog index instead of reusing the transaction's
        }
    },
    {
        "type": "ImportedFunctionInvoked",
        "timestamp": "2025-05-25T18:02:34.123Z",
        "functionName": "rdbms::postgres::db-result-stream::get-columns",
        "request": {
            "typ": {
                "type": "Option",
                "inner": {
                    "type": "Str"
                }
            },
            "value": null
        },
        "response": {
            "typ":  { ... },
            "value": {
                "ok": [
                    {
                        "db-type": {
                            "uuid": null
                        },
                        "db-type-name": "UUID",
                        "name": "user_id",
                        "ordinal": 0
                    },
                    {
                        "db-type": {
                            "text": null
                        },
                        "db-type-name": "TEXT",
                        "name": "name",
                        "ordinal": 1
                    },
                    {
                        "db-type": {
                            "array": {
                                "text": null
                            }
                        },
                        "db-type-name": "TEXT[]",
                        "name": "tags",
                        "ordinal": 2
                    }
                ]
            }
        },
        "wrappedFunctionType": {
            "type": "WriteRemoteBatched",
            "index": 40
        }
    },
    
    {
        "type": "EndRemoteWrite",
        "timestamp": "2025-05-25T18:02:34.135Z",
        "beginIndex": 39  // because the query stream implementation created a new begin oplog index, the index here differs from the one in the wrappedFunctionType of the individual function invocations
    }
]

snippet of oplog data - with fix

 [
    {
        "type": "BeginRemoteWrite",
        "timestamp": "2025-05-25T17:53:49.005Z"
    },
    {
        "type": "ImportedFunctionInvoked",
        "timestamp": "2025-05-25T17:53:49.006Z",
        "functionName": "rdbms::postgres::db-transaction::query-stream",
        "request": {
            "typ": { ... },
            "value": {
                "params": [],
                "pool-key": {
                    "address": "postgres://postgres:postgres@localhost:55002/postgres"
                },
                "statement": "SELECT user_id, name, tags FROM test_users_idem ORDER BY created_on ASC"
            }
        },
        "response": {
            "typ": { ...},
            "value": {
                "ok": {
                    "params": [],
                    "pool-key": {
                        "address": "postgres://postgres:postgres@localhost:55002/postgres"
                    },
                    "statement": "SELECT user_id, name, tags FROM test_users_idem ORDER BY created_on ASC"
                }
            }
        },
        "wrappedFunctionType": {
            "type": "WriteRemoteBatched",
            "index": 39
        }
    },
    {
        "type": "ImportedFunctionInvoked",
        "timestamp": "2025-05-25T17:53:49.009Z",
        "functionName": "rdbms::postgres::db-result-stream::get-columns",
        "request": {
            "typ": {
                "type": "Option",
                "inner": {
                    "type": "Str"
                }
            },
            "value": null
        },
        "response": {
            "typ": { ... },
            "value": {
                "ok": [
                    {
                        "db-type": {
                            "uuid": null
                        },
                        "db-type-name": "UUID",
                        "name": "user_id",
                        "ordinal": 0
                    },
                    {
                        "db-type": {
                            "text": null
                        },
                        "db-type-name": "TEXT",
                        "name": "name",
                        "ordinal": 1
                    },
                    {
                        "db-type": {
                            "array": {
                                "text": null
                            }
                        },
                        "db-type-name": "TEXT[]",
                        "name": "tags",
                        "ordinal": 2
                    }
                ]
            }
        },
        "wrappedFunctionType": {
            "type": "WriteRemoteBatched",
            "index": 39
        }
    }, 
    {
        "type": "EndRemoteWrite",
        "timestamp": "2025-05-25T17:53:49.017Z",
        "beginIndex": 39
    }
]

All oplog indexes in WriteRemoteBatched now have the same value as the begin index in EndRemoteWrite.

justcoon avatar May 13 '25 00:05 justcoon

Hi @vigoo, thanks for the review. The build is green and the PR has been rebased, so I think this can be merged now.

justcoon avatar May 29 '25 19:05 justcoon