rsocket-js icon indicating copy to clipboard operation
rsocket-js copied to clipboard

Rsocket Websocket Client - issue with composite metadata at browsers

Open maxim-bandurko-lsvt opened this issue 6 years ago • 26 comments

Hello.

With new release 0.0.18 were implemented Composite Metadata: https://github.com/rsocket/rsocket-js/issues/49

Was trying to start using it and got to issue with composite metadata when using rsocket websocket client at browsers: TypeError: Unknown encoding: undefined

in LiteBuffer.js at line 255

Buffer.prototype.write = function write(input, offset, length, encoding) {
  switch (encoding) {
    case 'utf8':
      return utf8Write(this, input, offset, length);
    default:
      throw new TypeError('Unknown encoding: ' + encoding);
  }
};

@OlegDokuka Can you please check these screenshots:

Screen Shot 2019-11-22 at 11 09 39 AM

Screen Shot 2019-11-22 at 11 10 44 AM

Screen Shot 2019-11-22 at 11 15 32 AM

As it looks like at CompositeMetadata.js at line 191: const customMimeLength = metadataHeader.write(customMime, 1);

is needed 2 more parameters.

I had used example from: https://github.com/rsocket/rsocket-js/blob/master/packages/rsocket-examples/src/CompositeMetadataExample.js

Thank you.

maxim-bandurko-lsvt avatar Nov 22 '19 19:11 maxim-bandurko-lsvt

Hello @maxim-bandurko-lsvt!

Thanks for reporting the issue! I would like to clarify, did you use the example for the repository and it did not work well for you?

Regards, Oleh

OlegDokuka avatar Nov 22 '19 21:11 OlegDokuka

Oh yeah! I see. I tested that with the node, but the node has a native Buffer that works differently. So, due to the node Buffer spec, the default encoding is UTF-8, that is basically why I ignored the rest of params, but the polyfill that we keep for browser missing that one. So, good catch up!

Thank you for reporting that!

OlegDokuka avatar Nov 22 '19 22:11 OlegDokuka

@maxim-bandurko-lsvt created a PR

OlegDokuka avatar Nov 22 '19 22:11 OlegDokuka

@OlegDokuka Thank you!

maxim-bandurko-lsvt avatar Nov 22 '19 23:11 maxim-bandurko-lsvt

Hi @OlegDokuka

with js client with composite metadata

     this.socket
         .requestResponse({
           data:  Buffer.from(JSON.stringify(scannerReq)),
           metadata: encodeAndAddWellKnownMetadata(
                           encodeAndAddCustomMetadata(
                             Buffer.alloc(0),
                             TEXT_PLAIN.string,
                             Buffer.from('Hello World')
                          ),
                          
                           MESSAGE_RSOCKET_ROUTING,
                          // meta
                           Buffer.from('irl.user.location.user1')
                     )
         })

Rsocket springboot server throws an exception for routing metadata

java.lang.IndexOutOfBoundsException: readerIndex(1) + length(105) exceeds writerIndex(23): UnpooledSlicedByteBuf(ridx: 1, widx: 23, cap: 23/23, unwrapped: PooledUnsafeDirectByteBuf(ridx: 0, widx: 231, cap: 231))
	at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1495)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoError] :
	reactor.core.publisher.Mono.error(Mono.java:281)
	io.rsocket.RSocketResponder.requestResponse(RSocketResponder.java:199)
Error has been observed at the following site(s):
	|_ Mono.error ⇢ at io.rsocket.RSocketResponder.requestResponse(RSocketResponder.java:199)
Stack trace:
		at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1495)
		at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1478)
		at io.netty.buffer.AbstractByteBuf.readSlice(AbstractByteBuf.java:894)
		at io.rsocket.metadata.TaggingMetadata$1.next(TaggingMetadata.java:47)
		at io.rsocket.metadata.TaggingMetadata$1.next(TaggingMetadata.java:37)
		at org.springframework.messaging.rsocket.DefaultMetadataExtractor.extractEntry(DefaultMetadataExtractor.java:136)
		at org.springframework.messaging.rsocket.DefaultMetadataExtractor.extract(DefaultMetadataExtractor.java:115)
		at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.createHeaders(MessagingRSocket.java:195)
		at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.handleAndReply(MessagingRSocket.java:167)
		at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.requestResponse(MessagingRSocket.java:122)
		at io.rsocket.RSocketResponder.requestResponse(RSocketResponder.java:193)
		at io.rsocket.RSocketResponder.handleFrame(RSocketResponder.java:299)

nofelkad avatar Jan 21 '20 07:01 nofelkad

@nofelkad Fix is inside the gitgub master branch already, but @OlegDokuka would be nice if it can go to PR also.

maxim-bandurko-lsvt avatar Jan 21 '20 22:01 maxim-bandurko-lsvt

@maxim-bandurko-lsvt i have incorporated this changes. This error is post that. Prior i was getting the same error for which this issue was opened (TypeError: Unknown encoding: undefined).

nofelkad avatar Jan 22 '20 05:01 nofelkad

@nofelkad I am getting the same issue with spring boot:

java.lang.IndexOutOfBoundsException: readerIndex(1) + length(99) exceeds writerIndex(22): UnpooledSlicedByteBuf(ridx: 1, widx: 22, cap: 22/22, unwrapped: UnpooledUnsafeDirectByteBuf(ridx: 0, widx: 44, cap: 44/44)) at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1495) at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1478) at io.netty.buffer.AbstractByteBuf.readSlice(AbstractByteBuf.java:894) at io.rsocket.metadata.TaggingMetadata$1.next(TaggingMetadata.java:47) at io.rsocket.metadata.TaggingMetadata$1.next(TaggingMetadata.java:37) at org.springframework.messaging.rsocket.DefaultMetadataExtractor.extractEntry(DefaultMetadataExtractor.java:136) at org.springframework.messaging.rsocket.DefaultMetadataExtractor.extract(DefaultMetadataExtractor.java:115) at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.createHeaders(MessagingRSocket.java:195) at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.handleAndReply(MessagingRSocket.java:167) at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.requestStream(MessagingRSocket.java:127) at io.rsocket.RSocketResponder.requestStream(RSocketResponder.java:207) at io.rsocket.RSocketResponder.handleFrame(RSocketResponder.java:310) at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242) at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554) at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630) at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:696) at reactor.core.publisher.Flux.subscribe(Flux.java:8174) at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:188) at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1637) at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:317) at io.rsocket.internal.ClientServerInputMultiplexer.lambda$new$1(ClientServerInputMultiplexer.java:116) at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) at reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380) at reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:316) at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201) at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:218) at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:351) at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348) at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:493) at reactor.netty.http.server.WebsocketServerOperations.onInboundNext(WebsocketServerOperations.java:158) at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:326) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:300) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1478) at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1227) at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1274) at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:503) at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:442) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:281) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514) at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.base/java.lang.Thread.run(Thread.java:830)

Using simple MessageMapping in controller:

@MessageMapping("test") public Flux<DataBuffer> collection(DataBuffer request) { .... }

@OlegDokuka I assume that implementation of Composite Metadata is not implemented yet?

maxim-bandurko-lsvt avatar Feb 11 '20 19:02 maxim-bandurko-lsvt

WS messages dump:

����������ê�¿ 'message/x.rsocket.composite-metadata.v0application/octet-stream ���������, text/plain��Someþ��test{"action": 1}`

maxim-bandurko-lsvt avatar Feb 11 '20 19:02 maxim-bandurko-lsvt

@OlegDokuka @nofelkad Just figured out that example at: https://github.com/rsocket/rsocket-js/blob/master/packages/rsocket-examples/src/CompositeMetadataExample.js Is missing the route length inside buffer.

Line 65: Buffer.from('test.service'),

Buffer should include the length for route before actual route. Buffer.from(String.fromCharCode(route.length) + route)

maxim-bandurko-lsvt avatar Feb 11 '20 20:02 maxim-bandurko-lsvt

@maxim-bandurko-lsvt can you please check if this is still an issue in the latest rsocket release? All of these should be fixed

OlegDokuka avatar Jan 25 '21 20:01 OlegDokuka

@OlegDokuka Sure!

maxim-bandurko-lsvt avatar Jan 26 '21 23:01 maxim-bandurko-lsvt

@OlegDokuka Can't make it working any more. Latest JS client release 0.0.23 with latest Spring Boot release 2.5.0 (M1).

Used your example from https://github.com/rsocket/rsocket/issues/281: https://github.com/rsocket/rsocket-js/blob/master/packages/rsocket-examples/src/CompositeMetadataExample.js#L47

if have metadata as a buffer like in your sample, it just disconnects client from server. If passing just null instead of buffer - it keeps connection with no responses after that.

Old example with encodeAndAddWellKnownMetadata is not working any more also: https://github.com/rsocket/rsocket-js/commit/46fc7dde0bf122fcabf038fe529bd41c3d09f0f7#diff-2deb4e362a27c00f3ae7336914902a0c9c5ea581b0de3586b3e21a70e4962373

Only JS client release 0.0.19 is most stable one that works with Java server.

maxim-bandurko-lsvt avatar Feb 03 '21 01:02 maxim-bandurko-lsvt

@maxim-bandurko-lsvt do you have something that I can reproduce locally to see what is going wrong.

OlegDokuka avatar Feb 03 '21 12:02 OlegDokuka

@OlegDokuka I did example for you using more recent versions of Webpack with core-js and added "buffer" module (as it's polyfill was dropped from Webpack v5 https://github.com/rsocket/rsocket-js/issues/110 ), and all started working properly! So issue is with my JS frameworks and I need to switch version of Webpack from v4 to v5.

Anyway, attaching example of JS client with Java server, in case if somebody needs simple example for composite metadata with binary mime type.

example.zip

The only thing is that can't remind myself how exactly to extract those metadata from message inside the @MessageMapping not modifying RSocketStrategies bean (had example above year ago and was waiting using it in production till composite metadata will be finally implemented).

    @MessageMapping("test.requestStream")
    public Flux<DataBuffer> testRequestStream(DataBuffer data, @Headers Map<String, DataBuffer> headers) {
        log.info("-->> testRequestStream");
        log.info("payload: " + new String(data.asByteBuffer().array()));
        log.info("headers: " + headers);

        return Flux
            .interval(Duration.ofSeconds(3))
            .map(
                i -> new DefaultDataBufferFactory().wrap(("ID: " + i).getBytes())
            )
            ;
    }

maxim-bandurko-lsvt avatar Feb 03 '21 22:02 maxim-bandurko-lsvt

@OlegDokuka Just did all updates at my JS framework builds and everything works as a charm!

I know, may be this thread is not correct one for a question about extracting composite metadata at MessageMapping in Spring Java, but will be very thankful to you with guide on this.

maxim-bandurko-lsvt avatar Feb 03 '21 23:02 maxim-bandurko-lsvt

@maxim-bandurko-lsvt, just FYI, we have our own buffer polyfill, thus adding an external one is redundant

OlegDokuka avatar Feb 04 '21 07:02 OlegDokuka

@OlegDokuka Yes, and with previous versions it was used as a substitution on default. Coming from https://github.com/rsocket/rsocket-js/issues/110 it is not included on default as I see. This actually can be confusing, as it works for Node, but doesn't work for browser app and depends on 3 options how to make it working.

I think it makes sense to come with new Buffer abstraction similar to DataBuffer in Spring. So, no matter what buffer polyfill is used, or native Node Buffer, the constructor/factory of this DataBuffer will figure out and will use polyfill object isolated in a scope automatically if global one is missing. In this case developer can just use rsocket modules with no conflicts with certain polyfills that their apps have.

Honestly, wish JS had all those framework that Spring has!

maxim-bandurko-lsvt avatar Feb 04 '21 17:02 maxim-bandurko-lsvt

@maxim-bandurko-lsvt have no idea, looks like it is your webpack setup. Here is the example where there are no other Buffer polyfill included -> https://github.com/OlegDokuka/rsocket-crosslanguage-example/blob/master/js-client/webpack.config.js and only lite-buffer impl is used as a Buffer polyfill.

In any case, I'm working toward excluding litebuffer from the core at all. Please see #112

OlegDokuka avatar Feb 04 '21 17:02 OlegDokuka

@OlegDokuka those example is not working.

Any way #112 is good idea.

Also, just tested passing encoded data by encodeCompositeMetadata method into data of payload, and server tolerated that very good ;)

            .requestStream({
                data: encodeCompositeMetadata(
                    [
                        [TEXT_PLAIN, Buffer.from('Hello World')],
                        ['custom/test/metadata', Buffer.from([1, 2, 3])],
                    ]
                ),
                metadata: encodeCompositeMetadata(
                    [
                        [TEXT_PLAIN, Buffer.from('Hello World')],
                        [MESSAGE_RSOCKET_ROUTING, encodeRoute("test.requestStream")],
                        [
                            MESSAGE_RSOCKET_AUTHENTICATION,
                            encodeSimpleAuthMetadata('user', 'pass'),
                        ],
                        ['custom/test/metadata', Buffer.from([1, 2, 3])],
                    ]
                ),
            })

So, the actual encoding way can be used in data and anybody can go with "JSON less" payload with composed multiple data chunks in RSocket. Server got:

UnpooledHeapByteBuf(ridx: 0, widx: 42, cap: 42/42)
 �
                                                                                                               Hello Worldcustom/test/metadata

How can we extract those data from UnpooledHeapByteBuf? As if that is tolerated very good in RSocket logic, there will be no need to use JSON serialization/deserialization for composited "sub" payloads. Server can just extract any of it that is needed, and use buffer for needed things (or deserialize if need an object of it).

maxim-bandurko-lsvt avatar Feb 04 '21 18:02 maxim-bandurko-lsvt

those example is not working

Can you please point (error, etc) what does not work for you in my example since it "works on my machine" ™️

OlegDokuka avatar Feb 04 '21 18:02 OlegDokuka

Also, can you clarify please, what do you mean by 'JSON less' ? Any pointers in the code?

Also, composite metadata is note supposed to be passed as data in a payload. Though, in rsocket-Java, there is a CompositeMetadata class which may decode given ByteBuf

OlegDokuka avatar Feb 04 '21 18:02 OlegDokuka

those example is not working

Can you please point (error, etc) what does not work for you in my example since it "works on my machine" ™️

It hasn't any errors. It just get connected to Java server and nothing more after that. WS messages dump has only connection message:

����������ˆ��ê`'message/x.rsocket.composite-metadata.v0
text/plainYourRandomIDHere

no more messages.

I've cleaned up 'node_modules' and did install again. It is still the same.

maxim-bandurko-lsvt avatar Feb 04 '21 20:02 maxim-bandurko-lsvt

Also, composite metadata is note supposed to be passed as data in a payload. Though, in rsocket-Java, there is a CompositeMetadata class which may decode given ByteBuf

Yep, composite metadata inside data of payload worked very good and seems to be very promising to have wide usage! Here is simple example how to get it at Java in case somebody will need to use that logic:

@Controller
public class SocketController {

    private static MimeType COMPOSITE_METADATA = MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());


    @Autowired
    private RSocketStrategies strategies;


    @MessageMapping("test.requestStream")
    public Flux<DataBuffer> testRequestStream(DataBuffer data, @Headers Map<String, Object> headers) {
        log.info("-->> testRequestStream");
        log.info("data: " + data);
        log.info("payload: " + new String(data.asByteBuffer().array()));
        log.info("headers: " + headers);

        
        DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(
            StringDecoder.allMimeTypes()
        );

        extractor.metadataToExtract(
            MimeType.valueOf(WellKnownMimeType.TEXT_PLAIN.getString()),
            String.class, 
            "text/plain"
        );

        extractor.metadataToExtract(
            MimeType.valueOf("custom/test"),
            String.class, 
            "custom/test"
        );


        Payload payload = PayloadUtils.createPayload(this.strategies.dataBufferFactory().allocateBuffer(), data);

        Map<String, Object> parsed = extractor.extract(payload, COMPOSITE_METADATA);
        log.info("parsed: " + parsed);
        

        return Flux
            .interval(Duration.ofSeconds(3))
            .map(
                i -> new DefaultDataBufferFactory().wrap(("ID: " + i).getBytes())
            )
            ;
    }
    
}

maxim-bandurko-lsvt avatar Feb 05 '21 04:02 maxim-bandurko-lsvt

Also, can you clarify please, what do you mean by 'JSON less' ? Any pointers in the code?

Sometimes payloads can be very huge and consist not only with simple short strings, numbers, booleans. They may have dozen thousands lines of plain text, base64 data, etc. Common tactic is just serialize that into JSON at client and deserialize that to object at server (or reverse direction if that is response). And that is normal in terms of small payloads, but when payload is getting huge part of just string data that no need to be serialized (as it is just huge string) and size of this payload gets in megabytes and such payloads are very often in application, that takes a lot of resources to both server and client because of serialization and deserialization. To help make such requests processing logic cheaper a lot of developers starting using array of buffers that is can say "composited" into one payload buffer using certain byte as a delimiter. That works, but not very convenient, as it is needed to create small factories to each payload as to get correct types of properties values in payload and do object mapping in some way.

So, it is very straight forward to have some kind "CompositeBuffer" that keeps buffer indexes with key representation and value type in the head of this buffer. Encoder makes this header and adds to top of buffer and decoder extracts the header, parses structure by index for each key and allows to extract all property keys converted to correct type, and finally map that to object if that is needed for convenient usage.

I understand that RSocket uses such principe of encoding composite metadata for another things, but it worked very well out of box just for data on both javascript and Java sides ;)

maxim-bandurko-lsvt avatar Feb 05 '21 04:02 maxim-bandurko-lsvt

@maxim-bandurko-lsvt

Yep, composite metadata inside data of payload worked very good and seems to be very promising to have wide usage! Here is simple example how to get it at Java in case somebody will need to use that logic:

think about composite metadata as of HTTP headers. Do you ever encode your logical message inside heders?

I understand that RSocket uses such principe of encoding composite metadata for another things, but it worked very well out of box just for data on both javascript and Java sides ;)

Yeah. As you wish :D

OlegDokuka avatar Feb 05 '21 08:02 OlegDokuka