Exposed icon indicating copy to clipboard operation
Exposed copied to clipboard

Use Kotlinx Coroutines SharedFlow to subscribe to updates

Open aSemy opened this issue 3 years ago • 3 comments

I would like to use a Kotlinx Coroutines SharedFlow to subscribe to entity updates.

Example usage

Here's a very quick demo showing how I'd like to use it:

object StarWarsFilms : IntIdTable() {
  val sequelId: Column<Int> = integer("sequel_id").uniqueIndex()
  val name: Column<String> = varchar("name", 50)
  val director: Column<String> = varchar("director", 50)
}

val starWarsFilmNames: SharedFlow<String> =
  StarWarsFilms
    .select { 
      StarWarsFilms.sequelId eq 8
    }
    .sharedFlow { 
      // map the entity
      it[StarWarsFilms.name]
    }

suspend fun main() {
  starWarsFilmNames.onEach { name ->
    println("Star Wars film name $name") // will print the name every time a new entity is added
  }.launchIn(this)
}

Updating

I would also like to be able to push updates into a table using a MutableSharedFlow. Although I think this can be achieved already, having a built-in library function would help with ergonomics.

Restrictions

I understand that there are issues with the database drivers being inherently synchronous ('Working with Coroutines', https://github.com/JetBrains/Exposed/issues/1551#issuecomment-1198542569). However, I would like it if this functionality was implemented as best-effort, even if the underlying driver was not optimally.

Current options?

I couldn't see any easy way to do this presently.

I couldn't find any 'subscribe' or 'listening' options described in the DSL or DAO docs, and I couldn't see any existing usage of a Flow in the project.

SqlDelight has similar functionality: https://cashapp.github.io/sqldelight/jvm_sqlite/coroutines/

Related

  • https://github.com/JetBrains/Exposed/issues/326#issuecomment-1199431112
  • https://github.com/JetBrains/Exposed/issues/662
  • https://github.com/JetBrains/Exposed/issues/1551#issuecomment-1198542569

aSemy avatar Aug 13 '22 12:08 aSemy

👍 Kotlin Flow support would be super nice. Being able to process data as streams rather than batches is a huge plus. Also, it decreases the load on the DB because of the backpressure mechanism of flows. Makes processing huge amounts of data very easy.

I think the backpressure mechanism can be implemented by using pagination (limit - offset) with a synchronous drivers.

Dogacel avatar Jan 05 '23 09:01 Dogacel

any updates or plan?

wwalkingg avatar May 24 '23 07:05 wwalkingg