r2dbc-postgresql
Persisting a Spring Data entity with an OffsetDateTime field does not work correctly
Bug Report
Versions
- Driver: 0.9.1
- Database: 11.9
- Java: 17
- OS: Amazon Linux 2
Current Behavior
When saving an entity that has an OffsetDateTime field, the following stack trace appears when the Spring Boot application runs in a Docker container based on Amazon Linux 2.
Stack trace
Caused by: java.lang.ClassCastException: class java.time.OffsetDateTime cannot be cast to class java.time.LocalTime (java.time.OffsetDateTime and java.time.LocalTime are in module java.base of loader 'bootstrap')
at io.r2dbc.postgresql.codec.BuiltinCodecSupport.encodeToText(BuiltinCodecSupport.java:83)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoSupplier] :
reactor.core.publisher.Mono.fromSupplier
io.r2dbc.postgresql.codec.AbstractCodec.create(AbstractCodec.java:150)
Error has been observed at the following site(s):
*__Mono.fromSupplier ? at io.r2dbc.postgresql.codec.AbstractCodec.create(AbstractCodec.java:150)
|_ Flux.from ? at io.r2dbc.postgresql.PostgresqlStatement.lambda$collectBindingParameters$10(PostgresqlStatement.java:275)
|_ Flux.reduce ? at io.r2dbc.postgresql.PostgresqlStatement.lambda$collectBindingParameters$10(PostgresqlStatement.java:276)
*_____Flux.concatMap ? at io.r2dbc.postgresql.PostgresqlStatement.collectBindingParameters(PostgresqlStatement.java:271)
|_ Flux.collectList ? at io.r2dbc.postgresql.PostgresqlStatement.collectBindingParameters(PostgresqlStatement.java:279)
|_ Mono.flatMapMany ? at io.r2dbc.postgresql.PostgresqlStatement.lambda$execute$7(PostgresqlStatement.java:216)
|_ Flux.handle ? at io.r2dbc.postgresql.PostgresqlResult.map(PostgresqlResult.java:107)
*_______Flux.flatMap ? at org.springframework.r2dbc.core.DefaultFetchSpec.lambda$all$2(DefaultFetchSpec.java:89)
*_____Flux.usingWhen ? at org.springframework.r2dbc.core.DefaultDatabaseClient.inConnectionMany(DefaultDatabaseClient.java:134)
|_ Flux.onErrorMap ? at org.springframework.r2dbc.core.DefaultDatabaseClient.inConnectionMany(DefaultDatabaseClient.java:146)
|_ ? at org.springframework.r2dbc.core.DefaultFetchSpec.all(DefaultFetchSpec.java:87)
|_ Flux.last ? at org.springframework.data.r2dbc.core.R2dbcEntityTemplate.doInsert(R2dbcEntityTemplate.java:648)
|_ Mono.flatMap ? at org.springframework.data.r2dbc.core.R2dbcEntityTemplate.doInsert(R2dbcEntityTemplate.java:648)
|_ ? at org.springframework.data.r2dbc.core.R2dbcEntityTemplate.lambda$null$6(R2dbcEntityTemplate.java:590)
*_______Mono.flatMap ? at org.springframework.data.r2dbc.core.R2dbcEntityTemplate.lambda$doInsert$7(R2dbcEntityTemplate.java:590)
*_______Mono.flatMap ? at org.springframework.data.r2dbc.core.R2dbcEntityTemplate.doInsert(R2dbcEntityTemplate.java:581)
|_ ? at org.springframework.data.r2dbc.core.R2dbcEntityTemplate.insert(R2dbcEntityTemplate.java:574)
|_ ? at org.springframework.data.r2dbc.repository.support.SimpleR2dbcRepository.save(SimpleR2dbcRepository.java:146)
|_ ? at org.springframework.data.repository.core.support.RepositoryMethodInvoker$ReactiveInvocationListenerDecorator.lambda$decorate$1(RepositoryMethodInvoker.java:229)
*_____Mono.usingWhen ? at org.springframework.data.repository.core.support.RepositoryMethodInvoker$ReactiveInvocationListenerDecorator.decorate(RepositoryMethodInvoker.java:226)
|_ ? at com.klarna.connected.card.transaction.service.repository.$Proxy116.save(null:-1)
|_ Mono.mapNotNull ? at com.klarna.connected.card.transaction.service.repository.TransactionRepositoryCustom.save(TransactionRepositoryCustom.java:49)
|_ Mono.flatMap ? at com.klarna.connected.card.transaction.service.repository.TransactionRepositoryCustom.save(TransactionRepositoryCustom.java:50)
Original Stack Trace:
at io.r2dbc.postgresql.codec.BuiltinCodecSupport.encodeToText(BuiltinCodecSupport.java:83)
at io.r2dbc.postgresql.codec.BuiltinCodecSupport.lambda$doEncode$0(BuiltinCodecSupport.java:76)
at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:57)
at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:451)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:219)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:165)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87)
at reactor.core.publisher.Flux.subscribe(Flux.java:8469)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:426)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:371)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
at reactor.core.publisher.FluxJust.subscribe(FluxJust.java:68)
at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)
at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)
at reactor.core.publisher.Flux.subscribe(Flux.java:8469)
at reactor.core.publisher.FluxUsingWhen$ResourceSubscriber.onNext(FluxUsingWhen.java:195)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.MonoDelayUntil$DelayUntilCoordinator.complete(MonoDelayUntil.java:418)
at reactor.core.publisher.MonoDelayUntil$DelayUntilTrigger.onComplete(MonoDelayUntil.java:531)
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89)
at reactor.core.publisher.FluxConcatIterable$ConcatIterableSubscriber.onComplete(FluxConcatIterable.java:121)
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:846)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:608)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:465)
at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onComplete(FluxDiscardOnCancel.java:104)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:152)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126)
at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.checkTerminated(FluxWindowPredicate.java:538)
at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.drainLoop(FluxWindowPredicate.java:486)
at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.drain(FluxWindowPredicate.java:430)
at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.onComplete(FluxWindowPredicate.java:310)
at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onComplete(FluxDiscardOnCancel.java:104)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126)
at reactor.core.publisher.FluxCreate$BaseSink.complete(FluxCreate.java:460)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:805)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.complete(FluxCreate.java:753)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.drainLoop(FluxCreate.java:247)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.drain(FluxCreate.java:213)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.complete(FluxCreate.java:204)
at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.complete(ReactorNettyClient.java:619)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:885)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:761)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:667)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:191)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.onNext(FluxPeekFuseable.java:503)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:279)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:388)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:404)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:487)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:385)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833)
Table schema
Input Code
create table transaction_service.TRANSACTION
(
    event_id             uuid primary key default uuid_generate_v4(),
    persona_id           uuid,
    k_merchant_id        varchar(100),
    connected_card_id    bigint       not null,
    merchant_id          varchar(255) not null,
    merchant_name        text         not null,
    merchant_location_id int          not null,
    created_at           timestamp    not null,
    updated_at           timestamp    not null,
    transaction_amount   bigint       not null,
    transaction_currency varchar(3)   not null,
    transaction_at       timestamptz  not null
);
Steps to reproduce
In the entity backing this table I declare a field of type OffsetDateTime. Saving the entity through a Spring Data repository produces the stack trace above. With an Instant field instead, the save works as expected.
Input Code
@Table("transaction_service.TRANSACTION")
@Data
@Builder
@With
@NoArgsConstructor
@AllArgsConstructor
@Valid
public class Transaction implements Persistable<UUID> {

    @Id
    @Getter(AccessLevel.NONE)
    private UUID eventId;

    private UUID personaId;

    private String kMerchantId;

    @NotNull
    private Long connectedCardId;

    @NotNull
    private String merchantId;

    private String merchantName;

    private Long merchantLocationId;

    @NotNull
    @CreatedDate
    private Instant createdAt;

    @NotNull
    @LastModifiedDate
    private Instant updatedAt;

    @NotNull
    private Long transactionAmount;

    @NotNull
    private String transactionCurrency;

    // Field that triggers the ClassCastException on save
    @NotNull
    private OffsetDateTime transactionAt;

    @Override
    public UUID getId() {
        return eventId;
    }

    // The entity is treated as new (insert) while no id has been assigned
    @Override
    public boolean isNew() {
        return eventId == null;
    }
}

// Repository
interface TransactionRepository extends ReactiveCrudRepository<Transaction, UUID> {
}
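Since the report notes that an Instant field saves correctly, one possible interim workaround is to convert the OffsetDateTime to an Instant before persisting and back after loading. The sketch below uses only java.time. The class and method names (TransactionAtMapping, toColumnValue, fromColumnValue) are hypothetical and not part of the driver or Spring Data. It assumes that losing the original offset is acceptable, which matches how a timestamptz column stores a point in time rather than an offset.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

// Hypothetical helper, not part of r2dbc-postgresql or Spring Data.
public class TransactionAtMapping {

    /** Value to store: the absolute point in time, without the offset. */
    public static Instant toColumnValue(OffsetDateTime transactionAt) {
        return transactionAt.toInstant();
    }

    /** Value to expose: the stored instant viewed at UTC (the original offset is not recoverable). */
    public static OffsetDateTime fromColumnValue(Instant stored) {
        return stored.atOffset(ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        OffsetDateTime original = OffsetDateTime.parse("2022-06-01T12:30:00+02:00");
        Instant stored = toColumnValue(original);
        OffsetDateTime restored = fromColumnValue(stored);

        System.out.println(stored);                     // 2022-06-01T10:30:00Z
        System.out.println(restored.isEqual(original)); // true: same point in time
    }
}
```

In the entity, this would mean declaring transactionAt as an Instant and exposing the OffsetDateTime view through accessor methods. It sidesteps the failing encode path but does not explain or fix it.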
Expected behavior/code
I would expect the driver to correctly handle the save of an OffsetDateTime as it should be supported according to the documentation.
Possible Solution
Additional context
Thanks for the details. Can you reliably reproduce the issue? I wasn't able to reproduce it with a minimal set of code. There seems to be something off with the codec lookup or codec caching. If you can reproduce the issue, I'd like to ask you to debug into the driver code and inspect the state of CachedCodecLookup.encodeCodecsCache, specifically by stepping through the findEncodeCodec(T value) method, to verify which codec is being used and why.
I will try to reproduce it here with a minimal amount of code and will report back as soon as I have tried.
Hmm, this was not so easy for me to reproduce either. I will keep trying to see whether I can actually debug this one.
I think I might have a clue regarding the reproducer. With 9b4dacb8d13f6be9ca49b702a1371c3e62d616d5 we changed the caching mechanism because we discovered that a cache keyed on hashes may lead to conflicting duplicates, so we updated the cache keys. I would suggest putting the 0.9.2 snapshot into your production environment: we wanted to ship another service release anyway (based on the current state), and confirmation that this issue no longer happens would be valuable before we ship the release.
What do you think?
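To illustrate the general failure mode mentioned in the comment above, here is a minimal sketch of how a cache keyed only on hash codes can hand back an entry that belongs to a different key. It is not the driver's code: the class name, the maps, and the "codec-for-..." strings are hypothetical stand-ins. The strings "Aa" and "BB" are used because they are distinct but share the same hashCode in Java.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration, not taken from r2dbc-postgresql.
public class HashKeyedCacheSketch {

    // Cache keyed only on the hash code: distinct keys with equal hashes share one slot.
    public static final Map<Integer, String> BY_HASH = new HashMap<>();

    // Cache keyed on the key itself: equals() disambiguates colliding hashes.
    public static final Map<String, String> BY_KEY = new HashMap<>();

    public static String lookupByHash(String key) {
        return BY_HASH.get(key.hashCode());
    }

    public static String lookupByKey(String key) {
        return BY_KEY.get(key);
    }

    public static void main(String[] args) {
        // Register an entry for "Aa" only.
        BY_HASH.put("Aa".hashCode(), "codec-for-Aa");
        BY_KEY.put("Aa", "codec-for-Aa");

        // "BB" was never registered, yet the hash-keyed cache returns the entry for "Aa".
        System.out.println(lookupByHash("BB")); // codec-for-Aa
        System.out.println(lookupByKey("BB"));  // null
    }
}
```

If a lookup of this kind returned a codec registered for another type, the later cast inside that codec would fail in a way similar to the ClassCastException in the report. Whether that is what happened here was not confirmed in this thread.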
Interestingly, I can reproduce the problem, but when I attach the debugger it works. It only fails when the container runs without the debugger. I will keep trying.
I do not know how to reproduce this reliably: when I run the Docker container with the Java debugger attached it runs correctly, but without it I get the stack trace. I am not sure why this happens.
Closing due to lack of actionability. If we have a way to reproduce the issue or have more insights, happy to reopen this ticket again.