grpc-kotlin

Flow and implementation ... ServiceCoroutineImplBase

Open ashNext opened this issue 3 years ago • 9 comments

I want to build a simple chat in which one client sends a message and the other clients receive it. I already did this with a bidirectional streaming RPC and StreamObserver, via the ... ServiceImplBase implementation.

ChatServer.kt

private val clients = mutableMapOf<String, MutableSet<StreamObserver<Chat.MessageTextRequest>>>()

...

private class ChatService : ChatSendServiceGrpc.ChatSendServiceImplBase() {
    override fun send(responseObserver: StreamObserver<Chat.MessageTextRequest>): StreamObserver<Chat.MessageTextRequest> {

        return object : StreamObserver<Chat.MessageTextRequest> {
            override fun onNext(value: Chat.MessageTextRequest) {
                val observers = clients.computeIfAbsent(value.streamId) {
                    logger.info("New streamId ${value.streamId}")
                    mutableSetOf()
                }

                if (observers.add(responseObserver)) {
                    logger.info("Add responseObserver $responseObserver")
                }

                for (observer in observers) {
                    observer.onNext(messageTextRequest {
                        msgText = value.msgText
                    })
                }
            }

            override fun onError(t: Throwable?) {...}

            override fun onCompleted() {...}
        }
    }
}

But I can't get the same thing to work through Flow and the ... ServiceCoroutineImplBase implementation. I don't understand how to "remember" the streams.

private class ChatService : ChatSendServiceGrpcKt.ChatSendServiceCoroutineImplBase() {
    override fun send(requests: Flow<Chat.MessageTextRequest>): Flow<Chat.MessageTextRequest> {

        return flow {
            requests.collect { value ->
			
                emit(messageTextRequest {
                    msgText = value.msgText
                })
            }
        }
    }
}

ashNext avatar Dec 02 '21 21:12 ashNext

I have a gRPC Kotlin sample that uses BiDi for a chat client & server: https://github.com/jamesward/cloud-native-grpc-kotlin

Maybe that can help you figure out what is going on?

jamesward avatar Dec 03 '21 15:12 jamesward

@jamesward Hi, thanks for the reply, but your example is very simple. How can I get the chatId for my shared flows? I need many chats.

z0mb1ek avatar Dec 07 '21 02:12 z0mb1ek

Architecturally there are a lot of variations. In a real-world system I'd use Kafka as the hub. For a single-node configuration you can do something like what you and I have, but you should definitely use MutableSharedFlow as the hub, like: https://github.com/jamesward/cloud-native-grpc-kotlin/blob/main/chat-server/src/main/kotlin/cngk/ChatServer.kt#L17

Note how the sharedFlow.emitAll(filteredFlow) has to be run in a separate coroutine.
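For readers following along, here is a minimal sketch of that hub pattern. It is not the code from the linked repository: `Message`, `ChatHub` and `demoEcho` are hypothetical names standing in for the generated gRPC types and service, and the only library assumed is kotlinx.coroutines. The sketch subscribes to the hub first and only then starts forwarding the client's requests in a separate coroutine, so the client does not miss its own first message.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the generated protobuf message type.
data class Message(val text: String)

class ChatHub {
    // Single shared hub: every connected client emits into it and collects from it.
    private val hub = MutableSharedFlow<Message>()

    // Same shape as a bidirectional streaming method in a ...CoroutineImplBase.
    fun send(requests: Flow<Message>): Flow<Message> = channelFlow {
        hub.onSubscription {
            // Runs once this client is subscribed to the hub.
            // Forwarding must happen in its own coroutine, because the
            // collect call below never returns on a shared flow.
            this@channelFlow.launch { hub.emitAll(requests) }
        }.collect { send(it) } // relay everything on the hub to this client
    }
}

// Illustrative helper: one client that receives its own two messages back.
fun demoEcho(): List<Message> = runBlocking {
    ChatHub()
        .send(flowOf(Message("a"), Message("b")))
        .take(2) // the response flow never completes on its own
        .toList()
}
```

Note that the returned flow stays open until the collector cancels it, which is why the helper uses `take(2)`.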

jamesward avatar Dec 07 '21 16:12 jamesward

@jamesward How can I get the chatId from the requests flow in the first request? The only way I see is something like this:

            sharedFlow.combine(requests) { a, b ->
                b
            }.collect {
                println(it.t)
            }
        }

 return sharedFlow

But this checks the id for every message on collect. Is there any way to get the chatId from the first message and then save the sharedFlow per chatId only once?

z0mb1ek avatar Dec 07 '21 16:12 z0mb1ek

I'm not totally understanding your architecture and what you mean by chatId. Can you provide more details on that?

jamesward avatar Dec 07 '21 16:12 jamesward

There is a method with bidi streaming for exchanging messages between clients. A client sends a message to this method with a chatId, and I need to send that message only to the other clients with the same chatId. In your example you send the message to ALL clients.

z0mb1ek avatar Dec 07 '21 16:12 z0mb1ek

I see. Yeah, you will have to put that state somewhere. Map<String, MutableSharedFlow> might work but you'll have to be careful with concurrency. Ultimately this approach is only good for a toy. For prod you'll need to externalize that state or use Kafka (etc).
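One way the `Map<String, MutableSharedFlow>` idea could look is sketched below, for a single node only. All names (`ChatMessage`, `ChatRooms`, `demoRooms`) are hypothetical, and the message type is assumed to carry a `chatId` field. A `ConcurrentHashMap` handles concurrent room creation, and each client subscribes to a room once, the first time it sends a message with that chatId.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical message type; assumes every request carries its chatId.
data class ChatMessage(val chatId: String, val text: String)

class ChatRooms {
    // One hub per chatId. Rooms are never removed in this sketch.
    private val rooms = ConcurrentHashMap<String, MutableSharedFlow<ChatMessage>>()

    private fun room(chatId: String): MutableSharedFlow<ChatMessage> =
        rooms.computeIfAbsent(chatId) { MutableSharedFlow<ChatMessage>() }

    fun send(requests: Flow<ChatMessage>): Flow<ChatMessage> = channelFlow {
        val joined = mutableSetOf<String>() // chats this client already listens to
        requests.collect { msg ->
            val room = room(msg.chatId)
            if (joined.add(msg.chatId)) {
                // First message for this chatId: subscribe exactly once,
                // in a separate coroutine, and wait until it is active.
                val subscribed = CompletableDeferred<Unit>()
                launch {
                    room.onSubscription { subscribed.complete(Unit) }
                        .collect { send(it) }
                }
                subscribed.await()
            }
            room.emit(msg) // delivered only to subscribers of this room
        }
    }
}

// Illustrative helper: one client talking in two chats.
fun demoRooms(): List<ChatMessage> = runBlocking {
    ChatRooms().send(
        flowOf(
            ChatMessage("x", "hi"),
            ChatMessage("y", "yo"),
            ChatMessage("x", "bye"),
        )
    ).take(3).toList()
}
```

The room lookup still happens per message, but it is a single map read; the subscription itself is set up once per chatId. As noted above, this keeps all state in one process.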

jamesward avatar Dec 07 '21 16:12 jamesward

No, I cannot, because the method returns the flow before collect is called. Or can you give an example with code, please?
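On the ordering concern, it may help that a flow built with `flow { }` is cold: the method does return before anything is collected, but the builder body only runs once the returned flow is collected, and it can read the first request there. The sketch below illustrates this with hypothetical names (`Req`, `handle`, `demoLazy`) and plain kotlinx.coroutines, without gRPC.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical request type; assumes every request carries its chatId.
data class Req(val chatId: String, val text: String)

fun handle(requests: Flow<Req>, log: MutableList<String>): Flow<String> {
    log += "handle() returned" // runs immediately, before any request is read
    return flow {
        log += "collection started" // runs only when the returned flow is collected
        var chatId: String? = null
        requests.collect { req ->
            if (chatId == null) {
                chatId = req.chatId // captured once, from the first message
                log += "joined $chatId"
            }
            emit("[$chatId] ${req.text}")
        }
    }
}

// Illustrative helper: records the order in which things happen.
fun demoLazy(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val responses = handle(flowOf(Req("room-1", "hi"), Req("room-1", "bye")), log)
    log += responses.toList() // collection happens here
    log
}
```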

z0mb1ek avatar Dec 07 '21 16:12 z0mb1ek