sse-eventbus

Customize output format for different connected clients?

Open JogoShugh opened this issue 1 year ago • 7 comments

Hello,

I was wondering if it's possible to dispatch a single event, but then format it differently based on the client. I couldn't see a built-in way to do this, as the conversion logic is global, not client-specific. So, I forked the project and have spiked out an initial approach to doing this.

Let me know if you'd like to support this. I would be happy to clean up what I have done and do it in a way you'd prefer to see it. I imagine rather than subclassing, we could use a bean injection approach that passes a custom instance into SseEventBus.

In any case, here is what I've had to change in your source so far:

https://github.com/JogoShugh/sse-eventbus/pull/1/files

I wouldn't really call what I've done true "mediatype" support, since it's still streaming out text/event-stream in the "event: data: " format, but for now it works for me.
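For reference, the text/event-stream framing mentioned here puts an optional `event:` line first, then one `data:` line per payload line, then a blank line to end the message. A minimal sketch in plain Kotlin; `formatSseFrame` is a hypothetical helper for illustration and not part of sse-eventbus:

```kotlin
// Hypothetical helper, not part of sse-eventbus: renders one message in the
// text/event-stream wire format.
fun formatSseFrame(eventName: String?, data: String): String {
    val sb = StringBuilder()
    if (eventName != null) sb.append("event: ").append(eventName).append('\n')
    // Each line of the payload gets its own "data:" field.
    data.lines().forEach { sb.append("data: ").append(it).append('\n') }
    sb.append('\n') // a blank line terminates the message
    return sb.toString()
}
```

Whatever per-client formatting is applied (JSON, an emoji, an HTML fragment), it only changes the `data` argument; the framing around it stays the same.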

Then, in my own project code:

package org.starbornag.api

import ch.rasc.sse.eventbus.*
import ch.rasc.sse.eventbus.config.SseEventBusConfigurer
import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Import
import org.springframework.http.MediaType
import org.starbornag.api.domain.bed.BedCellPlanted
import org.starbornag.api.domain.bed.BedEvent

class BedSseEventBus(configurer: SseEventBusConfigurer?, subscriptionRegistry: SubscriptionRegistry?) :
    SseEventBus(configurer, subscriptionRegistry) {

    private val iconMap = mapOf(
        "BedCellWatered" to "💧",
        "BedFertilized" to "🌿",
        "BedMulched" to "🪵"
    )

    private fun plantTypeToIcon(plantType: String) =
        when (plantType.lowercase()) {
            "tomato" -> "🍅"
            "eggplant" -> "🍆"
            "potato" -> "🥔"
            "carrot" -> "🥕"
            "hot pepper" -> "🌶️"
            // etc.....
            else -> "" // Default case (for unknown plant types)
        }

    override fun convertObjectForClient(event: SseEvent, client: Client): String {
        val mediaType = client.mediaType()
        return when (mediaType) {
            MediaType.APPLICATION_JSON -> super.convertObjectForClient(event, client)
            else -> {
                when (val data = event.data()) {
                    is BedCellPlanted -> plantTypeToIcon(data.plantType)
                    is BedEvent -> iconMap[data.javaClass.simpleName] ?: ""
                    else -> super.convertObjectForClient(event, client)
                }
            }
        }
    }
}

@Configuration
class BedSseEventBusConfiguration {
    @Autowired(required = false)
    protected var objectMapper: ObjectMapper? = null

    @Autowired(required = false)
    protected var dataObjectConverters: List<DataObjectConverter>? = null

    @Bean
    fun eventBus(): SseEventBus {
        val config = object : SseEventBusConfigurer {} // Use object expression

        val sseEventBus = BedSseEventBus(config, DefaultSubscriptionRegistry()) // Your derived class

        val converters = dataObjectConverters?.toMutableList() ?: mutableListOf()

        if (objectMapper != null) {
            converters.add(JacksonDataObjectConverter(objectMapper!!))
        } else {
            converters.add(DefaultDataObjectConverter())
        }

        sseEventBus.dataObjectConverters = converters

        return sseEventBus
    }
}

@Retention(AnnotationRetention.RUNTIME)
@Target(AnnotationTarget.CLASS)
@MustBeDocumented
@Import(
    BedSseEventBusConfiguration::class
)
annotation class EnableBedSseEventBus

Then, the only other change is adding the media type during creation of the client-specific emitter:

@GetMapping("/api/beds/{bedId}/events", produces =
    [MediaType.TEXT_EVENT_STREAM_VALUE, MediaType.APPLICATION_JSON_VALUE])
fun events(@PathVariable bedId: UUID,
           @RequestParam clientId: UUID,
           @RequestHeader("Accept") acceptHeader: MediaType?
): ResponseEntity<SseEmitter> {
    val bed = BedRepository.getBed(bedId)
    val eventNames = getEventNamesFromBedCells(bed)
    val mediaType = acceptHeader ?: MediaType.TEXT_PLAIN
    return ResponseEntity.ok(
        sseEventBus.createSseEmitter(
            clientId.toString(),
            120_000L,
            mediaType,
            *eventNames.toTypedArray()
        )
    )
}

private fun getEventNamesFromBedCells(bed: BedAggregate?) =
    bed!!.rows.flatMap { row ->
        row.cells.flatMap { cell ->
            listOf("events-$cell", "plants-$cell")
        }
    }
}
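The `acceptHeader ?: MediaType.TEXT_PLAIN` fallback in the handler above is one possible default. A rough sketch of the alternative, falling back to text/event-stream when the client states no preference, could look like the following. It works on plain strings so it runs without Spring; `resolveClientMediaType` is a hypothetical name, and the sketch deliberately ignores q-values and simply takes the first concrete type listed:

```kotlin
// Hypothetical helper: picks the media type used to format events for a client.
// Assumption: fall back to text/event-stream when the Accept header is missing
// or only contains the wildcard "*/*". Quality values (q=...) are ignored.
fun resolveClientMediaType(acceptHeader: String?): String {
    val firstConcrete = acceptHeader
        ?.split(',')
        ?.map { it.substringBefore(';').trim().lowercase() }
        ?.firstOrNull { it.isNotEmpty() && it != "*/*" }
    return firstConcrete ?: "text/event-stream"
}
```

For example, `resolveClientMediaType("application/json")` yields `application/json`, while a missing header yields `text/event-stream`.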

Test case

Here I have one browser client and two CLIs. The browser client actually sends text/event-stream (maybe that should be the default instead of text/plain) and one of the CLIs sends application/json.

The UI is using HTMX, so all I really need to do is stream fragments of HTML (so far just emojis lol) to populate the cells with current events:

image

But I also want to be able to listen for JSON events from the same source and do other things with them.

So, this is working for me, but if you think there is some value here, let me know and I'll be happy to clean it up and submit a PR.

JogoShugh avatar Oct 03 '24 18:10 JogoShugh

Well, I made another commit that uses a bean injection approach. It's much better already:

https://github.com/JogoShugh/sse-eventbus/commit/9617522d85f52da6fece0bca50839cec44f693f5

So, in my code, I no longer subclass; I now just have:

class BedEventConverter : MediaTypeAwareDataObjectConverter {
    private val iconMap = mapOf(
        "BedCellWatered" to "💧",
        "BedFertilized" to "🌿",
        "BedMulched" to "🪵"
    )

    private fun plantTypeToIcon(plantType: String) =
        when (plantType.lowercase()) {
            "tomato" -> "🍅"
            "eggplant" -> "🍆"
            "potato" -> "🥔"
            "carrot" -> "🥕"
            "corn" -> "🌽" // or "ear of corn"
            "hot pepper" -> "🌶️"
            "bell pepper" -> "🫑"
            "cucumber" -> "🥒"
            "broccoli" -> "🥦"
            "garlic" -> "🧄"
            "onion" -> "🧅"
            "lettuce" -> "🥬"
            "sweet potato" -> "🍠"
            "chili pepper" -> "🌶"
            "mushroom" -> "🍄"
            "peanuts" -> "🥜"
            "beans" -> "🫘"
            "chestnut" -> "🌰"
            "ginger root" -> "🫚"
            "shallot" -> "🫛"
            "herb" -> "🌿" // If you want to include herbs
            else -> "" // Default case (for unknown plant types)
        }

    // Only BedEvent payloads are handled by this converter.
    override fun supports(event: SseEvent, mediaType: MediaType): Boolean =
        event.data() is BedEvent

    override fun convert(event: SseEvent, mediaType: MediaType): String? {
        return when (mediaType) {
            MediaType.APPLICATION_JSON -> null
            else -> {
                when (val data = event.data()) {
                    is BedCellPlanted -> plantTypeToIcon(data.plantType)
                    is BedEvent -> iconMap[data.javaClass.simpleName] ?: ""
                    else -> null
                }
            }
        }
    }
}

@Configuration
class BedEventConverterConfiguration {
    @Bean
    fun bedEventFormatter() : MediaTypeAwareDataObjectConverter = BedEventConverter()
}
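How the bus consults such a converter is not shown in the thread. One plausible reading of the `supports` / `convert` contract, where returning `null` means "not handled, use the default conversion", is sketched below in plain Kotlin. `Event`, `MediaTypeAwareConverter`, `convertForClient` and `IconConverter` are simplified stand-ins invented for this sketch, not the types from the fork:

```kotlin
// Simplified stand-ins for the types in the thread; names and shapes are assumptions.
data class Event(val name: String, val data: Any)

interface MediaTypeAwareConverter {
    fun supports(event: Event, mediaType: String): Boolean
    fun convert(event: Event, mediaType: String): String? // null = "not handled"
}

// Tries each media-type-aware converter in order and falls back to the default
// conversion when none supports the event or all of them return null.
fun convertForClient(
    event: Event,
    mediaType: String,
    converters: List<MediaTypeAwareConverter>,
    fallback: (Event) -> String
): String =
    converters.asSequence()
        .filter { it.supports(event, mediaType) }
        .mapNotNull { it.convert(event, mediaType) }
        .firstOrNull()
        ?: fallback(event)

// Example converter: an emoji for non-JSON clients, null (defer) for JSON clients.
object IconConverter : MediaTypeAwareConverter {
    override fun supports(event: Event, mediaType: String) = event.data is String
    override fun convert(event: Event, mediaType: String): String? =
        if (mediaType == "application/json") null else "💧"
}
```

With this shape, a JSON client and an HTML client subscribed to the same event each get their own rendering, and existing global converters keep working as the fallback.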

JogoShugh avatar Oct 03 '24 19:10 JogoShugh

Hi

That looks great. I'll be happy to integrate it when you create a pull request.

Ralph

ralscha avatar Oct 04 '24 12:10 ralscha

Great! I'll work on it this weekend, will add tests and such

JogoShugh avatar Oct 04 '24 13:10 JogoShugh

Still hacking on this...

I wanted to dig deeper into SseEmitter and, in particular, understand DataWithMediaType.

With Spring's HttpMessageConverter mechanism (which SseEmitter uses), I am able to do it like this:

package org.starbornag.api

import org.springframework.http.HttpInputMessage
import org.springframework.http.HttpOutputMessage
import org.springframework.http.MediaType
import org.springframework.http.converter.HttpMessageConverter
import org.springframework.stereotype.Component
import org.starbornag.api.domain.bed.BedEvent

@Component
class BedEventToHtmlConverter : HttpMessageConverter<BedEvent> {

    override fun canRead(clazz: Class<*>, mediaType: MediaType?): Boolean = false

    override fun canWrite(clazz: Class<*>, mediaType: MediaType?): Boolean {
        return BedEvent::class.java.isAssignableFrom(clazz) &&
                mediaType?.isCompatibleWith(MediaType.TEXT_HTML) == true
    }

    override fun getSupportedMediaTypes(): List<MediaType> = listOf(MediaType.TEXT_HTML)

    override fun read(clazz: Class<out BedEvent>, inputMessage: HttpInputMessage): BedEvent {
        throw UnsupportedOperationException("Reading BedEvent from HTML is not supported")
    }

    override fun write(bedEvent: BedEvent, contentType: MediaType?, outputMessage: HttpOutputMessage) {
        val html = BedEventHtmlFormatter.convert(bedEvent)
        outputMessage.headers.contentType = MediaType.TEXT_HTML
        outputMessage.body.write(html.toByteArray())
    }
}

This then allows us to use the sseBuilder.data(Object, MediaType) overload which produces an instance of this:

https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.DataWithMediaType.html

And the SseEmitter itself knows how to deal with that and call the appropriate implementation of HttpMessageConverter<BedEvent>, as shown above.
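As a rough mental model of that selection step: the emitter walks its list of message converters and uses the first one whose `canWrite` accepts the payload class and the requested media type. The sketch below models this in plain Kotlin. It is not Spring's actual code, and `Writer`, `SampleEvent`, `HtmlEventWriter`, `ToStringWriter` and `writeWithFirstMatch` are hypothetical names:

```kotlin
// Simplified model of first-match converter selection; not Spring's actual code.
interface Writer {
    fun canWrite(clazz: Class<*>, mediaType: String): Boolean
    fun write(value: Any): String
}

// Stand-in for a domain event hierarchy such as BedEvent.
open class SampleEvent(val kind: String)
class SampleWatered : SampleEvent("Watered")

// Handles SampleEvent subclasses, but only when text/html is requested.
object HtmlEventWriter : Writer {
    override fun canWrite(clazz: Class<*>, mediaType: String) =
        SampleEvent::class.java.isAssignableFrom(clazz) && mediaType == "text/html"
    override fun write(value: Any) = "<span>${(value as SampleEvent).kind}</span>"
}

// Catch-all writer, registered last.
object ToStringWriter : Writer {
    override fun canWrite(clazz: Class<*>, mediaType: String) = true
    override fun write(value: Any) = value.toString()
}

// Uses the first writer that accepts the payload; throws NoSuchElementException
// if the list contains no suitable writer.
fun writeWithFirstMatch(value: Any, mediaType: String, writers: List<Writer>): String =
    writers.first { it.canWrite(value.javaClass, mediaType) }.write(value)
```

Because the first match wins, registration order matters: a catch-all writer placed ahead of the HTML writer would shadow it.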

So, this approach would remove the need entirely for the "MediaTypeAware" converter in my first pass.

Afterward, I realized the built-in Jackson converter was taking precedence, so I added a configurer option to bypass the DataObjectConverters, defaulting to false.

Then, in my project, I just did:

@Configuration
class SseEventBusConfiguration : SseEventBusConfigurer {
    override fun bypassDataObjectConverters(): Boolean = true
}
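The library-side change is not shown in the thread. A minimal sketch of how an opt-in flag like this could gate conversion follows; `BusConfigurer` and `payloadFor` are hypothetical names, and the wiring is an assumption rather than sse-eventbus source:

```kotlin
// Hypothetical model of the opt-in bypass; the method name mirrors the thread,
// everything else is an assumption.
interface BusConfigurer {
    // Defaulting to false keeps the existing behavior for current users.
    fun bypassDataObjectConverters(): Boolean = false
}

// Returns either the raw object (left for the emitter's own message converters)
// or the string produced by the bus's converter.
fun payloadFor(data: Any, configurer: BusConfigurer, toText: (Any) -> String): Any =
    if (configurer.bypassDataObjectConverters()) data else toText(data)
```

With the default configurer the bus still stringifies the payload itself; only a configurer that overrides the method to return `true` hands the raw object through.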

And thus, I still get exactly what I'm hoping for:

Two listeners, one wanting application/json and the other wanting text/html:

image

And, firing off the behavior that triggers the events:

image

JogoShugh avatar Oct 17 '24 01:10 JogoShugh

That looks very good

ralscha avatar Oct 17 '24 07:10 ralscha

OK cool, I'll go with this approach then for the final PR. Since the bypass option will be additive / opt-in, it shouldn't break any existing behaviors.

JogoShugh avatar Oct 17 '24 13:10 JogoShugh

Note: As a tangent to this, I'm working on using the Actson library to do streaming JSON object "emission" (Kotlin Flow terminology), since OpenAI streams JSON to you in chunks like:

{ "comm

ands": [

{ "plan

tType": "tom

ato"

etc etc..

So, when combined with your library, I'm now able to turn the streamed JSON translated by OpenAI into commands as soon as just enough of a fragment comes in, process each one, and fire off the SSE notifications as quickly as possible.
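To illustrate the idea of emitting each command as soon as its closing brace arrives, here is a deliberately simplified stand-in for a real streaming parser such as Actson. It does not use Actson's API and does not validate JSON. It only tracks brace depth (ignoring braces inside strings) and emits every object that closes at a chosen nesting depth. `ObjectExtractor` and `targetDepth` are hypothetical names:

```kotlin
// Simplified stand-in for a streaming JSON parser: emits each object that
// closes at targetDepth (2 = objects nested inside the top-level object).
class ObjectExtractor(private val targetDepth: Int = 2) {
    private val current = StringBuilder()
    private var depth = 0
    private var inString = false
    private var escaped = false

    // Feeds one chunk and returns the objects completed by this chunk.
    fun feed(chunk: String): List<String> {
        val completed = mutableListOf<String>()
        for (c in chunk) {
            if (inString) {
                if (depth >= targetDepth) current.append(c)
                when {
                    escaped -> escaped = false   // character after a backslash
                    c == '\\' -> escaped = true
                    c == '"' -> inString = false // closing quote
                }
                continue
            }
            if (c == '{') depth++
            if (depth >= targetDepth) current.append(c)
            when (c) {
                '"' -> inString = true
                '}' -> {
                    if (depth == targetDepth) {
                        completed += current.toString()
                        current.setLength(0)
                    }
                    depth--
                }
            }
        }
        return completed
    }
}
```

Feeding the chunks `{ "comm`, `ands": [`, `{ "plan`, `tType": "tom` returns empty lists; the next chunk that closes the inner object returns it whole, so the command can be processed and published while the rest of the response is still streaming.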

I fiddled around with the Function Calling / "toolCalls" support that they provide, but it doesn't seem any more effective for what I'm doing than carefully constructing the prompt and giving it examples.

Example of that (still working bugs out) -> https://github.com/michel-kraemer/actson/issues/91

Most critically:

  • Controller starts processing at 1728965311009
  • 2.6 seconds after that, Spring AI has finished getting the first translated command back utilizing the Flow operator in the linked issue
  • 31 ms after that, I'm launching coroutine to process the command
  • 3 ms after that, I'm publishing events through your event bus! -- and all that is done very quickly indeed
plantSeedling controller start: 1728965311009
Found complete json object: {"bedId":"2fbda883-d49d-4067-8e16-2b04cc523111","plantType":"tomato","plantCultivar":"roma","location":{"rows":[1]}}: 2:606
Processing command: PlantSeedling(bedId=2fbda883-d49d-4067-8e16-2b04cc523111, started=Tue Oct 15 00:08:33 EDT 2024, plantType=tomato, plantCultivar=roma, location=org.starbornag.api.domain.bed.command.CellsSelection@3f8a11d9): 2:637
publishEvent: 2:640
publishEvent: 2:641
publishEvent: 2:641
publishEvent: 2:641
publishEvent: 2:641
publishEvent: 2:641
publishEvent: 2:641
publishEvent: 2:641
publishEvent: 2:641
publishEvent: 2:641
Found complete json object: {"bedId":"2fbda883-d49d-4067-8e16-2b04cc523111","plantType":"cucumber","plantCultivar":"pickling","location":{"rows":[2]}}: 4:162
Processing command: PlantSeedling(bedId=2fbda883-d49d-4067-8e16-2b04cc523111, started=Tue Oct 15 00:08:35 EDT 2024, plantType=cucumber, plantCultivar=pickling, location=org.starbornag.api.domain.bed.command.CellsSelection@3f58d719): 4:162
publishEvent: 4:162
publishEvent: 4:163
publishEvent: 4:163
publishEvent: 4:163
publishEvent: 4:163
publishEvent: 4:163
publishEvent: 4:163
publishEvent: 4:163
publishEvent: 4:163
publishEvent: 4:163
etc etc

Even just calling OpenAI directly with curl and the same prompting takes 4 to 6 seconds when the embedded phrase results in N distinct commands. So, I will be looking into fine-tuning a base model, but that's all a sub-tangent, of course.

JogoShugh avatar Oct 17 '24 13:10 JogoShugh