Use Kotlinx Coroutines SharedFlow to subscribe to updates
I would like to use a Kotlinx Coroutines SharedFlow to subscribe to entity updates.
Example usage
Here's a very quick demo showing how I'd like to use it:
object StarWarsFilms : IntIdTable() {
    val sequelId: Column<Int> = integer("sequel_id").uniqueIndex()
    val name: Column<String> = varchar("name", 50)
    val director: Column<String> = varchar("director", 50)
}

// `sharedFlow` is the API proposed in this request, not an existing Exposed function
val starWarsFilmNames: SharedFlow<String> =
    StarWarsFilms
        .select {
            StarWarsFilms.sequelId eq 8
        }
        .sharedFlow {
            // map the entity
            it[StarWarsFilms.name]
        }

suspend fun main() {
    // launchIn needs a CoroutineScope; coroutineScope provides one as `this`
    coroutineScope {
        starWarsFilmNames.onEach { name ->
            println("Star Wars film name $name") // will print the name every time a new entity is added
        }.launchIn(this)
    }
}
Updating
I would also like to be able to push updates into a table using a MutableSharedFlow. Although I think this can be achieved already, having a built-in library function would help with ergonomics.
Restrictions
I understand that there are issues because the database drivers are inherently synchronous ('Working with Coroutines', https://github.com/JetBrains/Exposed/issues/1551#issuecomment-1198542569). However, I would like this functionality to be implemented on a best-effort basis, even if the underlying driver is not optimal for it.
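To illustrate what "best-effort" could mean with a blocking driver, here is a rough sketch that uses only kotlinx.coroutines. It polls a blocking query on a fixed interval and emits only when the result changes. The names `pollingFlow`, `pollingSharedFlow` and `pollingDemo` are made up for this example, and the `query` lambda is a stand-in for a blocking Exposed `transaction { ... }` call; none of this is an existing Exposed API.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: re-runs a blocking query every `intervalMillis` and
// emits the result only when it differs from the previously emitted one.
fun <T> pollingFlow(
    intervalMillis: Long,
    query: () -> T, // assumption: wraps a blocking database call
): Flow<T> = flow {
    while (true) {
        emit(query())
        delay(intervalMillis)
    }
}
    .distinctUntilChanged()  // suppress polls that returned the same result
    .flowOn(Dispatchers.IO)  // keep the blocking call off the collector's thread

// Hypothetical helper: shares one polling loop between all subscribers.
fun <T> CoroutineScope.pollingSharedFlow(
    intervalMillis: Long,
    query: () -> T,
): SharedFlow<T> =
    pollingFlow(intervalMillis, query)
        .shareIn(this, SharingStarted.WhileSubscribed(), replay = 1)

// Demo with a fake query that returns "A" twice and then "B".
fun pollingDemo(): List<String> = runBlocking {
    var calls = 0
    pollingFlow(intervalMillis = 1L) { if (calls++ < 2) "A" else "B" }
        .take(2)
        .toList()
}

fun main() {
    println(pollingDemo())
}
```

Polling trades latency and extra queries for simplicity. A driver-level notification mechanism would be better where one exists, but this shape would at least work with any synchronous driver.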
Current options?
I couldn't see any easy way to do this presently.
I couldn't find any 'subscribe' or 'listening' options described in the DSL or DAO docs, and I couldn't see any existing usage of a Flow in the project.
SqlDelight has similar functionality: https://cashapp.github.io/sqldelight/jvm_sqlite/coroutines/
Related
- https://github.com/JetBrains/Exposed/issues/326#issuecomment-1199431112
- https://github.com/JetBrains/Exposed/issues/662
- https://github.com/JetBrains/Exposed/issues/1551#issuecomment-1198542569
👍 Kotlin Flow support would be super nice. Being able to process data as streams rather than batches is a huge plus. Also, it decreases the load on the DB because of the backpressure mechanism of flows. Makes processing huge amounts of data very easy.
I think the backpressure mechanism can be implemented by using pagination (limit - offset) with a synchronous driver.
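A minimal sketch of that pagination idea, using only kotlinx.coroutines: the next page is requested only after the collector has consumed the current one, so a slow consumer naturally slows down the queries. `pagedFlow` and `pagedDemo` are hypothetical names, and `fetchPage` is a stand-in for a blocking limit/offset query; this is not an existing Exposed API.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: streams rows page by page. `emit` suspends until the
// collector is ready, so no further page is fetched while it is still busy.
fun <T> pagedFlow(
    pageSize: Int,
    fetchPage: (limit: Int, offset: Long) -> List<T>, // assumption: blocking limit/offset query
): Flow<T> = flow {
    var offset = 0L
    while (true) {
        val page = fetchPage(pageSize, offset)
        for (row in page) emit(row)
        if (page.size < pageSize) break // a short page means there are no more rows
        offset += page.size
    }
}

// Demo against an in-memory "table" of 10 rows. Only 4 rows are collected,
// so only the first two pages are ever fetched.
fun pagedDemo(): Pair<List<Int>, Int> = runBlocking {
    val table = (1..10).toList()
    var pagesFetched = 0
    val rows = pagedFlow(pageSize = 3) { limit, offset ->
        pagesFetched++
        table.drop(offset.toInt()).take(limit)
    }.take(4).toList()
    rows to pagesFetched
}

fun main() {
    println(pagedDemo())
}
```

Note that plain limit/offset paging can skip or repeat rows if the table changes between pages, so a real implementation would probably want a stable ordering or keyset pagination.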
Any updates or plans?