ydb-java-sdk

SyncWriter and flush problem throwing Exception

Open ekuvardin opened this issue 7 months ago • 0 comments

When writing to a topic, the code may produce a RuntimeException wrapped in a CompletionException.

For example, writing with codec = 10000 or 7.


    WriterSettings settings = WriterSettings.newBuilder()
            .setTopicPath(topicName)
            .setCodec(7)
            .build();
    SyncWriter writer = client.createSyncWriter(settings);
    writer.initAndWait();

    for (byte[] testMessage : testMessages) {
        writer.send(Message.newBuilder().setData(testMessage).build());
    }

    writer.flush();
    writer.shutdown(1, TimeUnit.MINUTES);

The writer.flush() call is the cause of our problem. The correct behavior is to work without an exception, because we wait for all jobs to be completed and return their results.

But if your machine is fast enough, you will get a CompletionException:

    java.util.concurrent.CompletionException: java.lang.RuntimeException: Unsupported codec: 7
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_44]
        at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_44]
        at java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1621) ~[?:1.8.0_44]
        at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java) ~[?:1.8.0_44]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[?:1.8.0_44]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[?:1.8.0_44]
        at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_44]
    Caused by: java.lang.RuntimeException: Unsupported codec: 7
        at tech.ydb.topic.utils.Encoder.getCodec(Encoder.java:96) ~[classes/:?]
        at tech.ydb.topic.utils.Encoder.makeOutputStream(Encoder.java:80) ~[classes/:?]
        at tech.ydb.topic.utils.Encoder.encode(Encoder.java:41) ~[classes/:?]
        at tech.ydb.topic.write.impl.WriterImpl.encode(WriterImpl.java:195) ~[classes/:?]
        at tech.ydb.topic.write.impl.WriterImpl.lambda$acceptMessageIntoSendingQueue$0(WriterImpl.java:176) ~[classes/:?]
        at tech.ydb.topic.write.impl.WriterImpl$$Lambda$189/2108708444.run(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1618) ~[?:1.8.0_44]
        ... 4 more
    11:15:44.949 [pool-4-thread-2] ERROR tech.ydb.topic.write.impl.WriterImpl - [yNNtF0] Exception while encoding message:
    java.util.concurrent.CompletionException: java.lang.RuntimeException: Unsupported codec: 7
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_44]
        at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_44]
        at java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1621) ~[?:1.8.0_44]
        at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java) ~[?:1.8.0_44]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[?:1.8.0_44]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[?:1.8.0_44]
        at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_44]
    Caused by: java.lang.RuntimeException: Unsupported codec: 7
        at tech.ydb.topic.utils.Encoder.getCodec(Encoder.java:96) ~[classes/:?]
        at tech.ydb.topic.utils.Encoder.makeOutputStream(Encoder.java:80) ~[classes/:?]
        at tech.ydb.topic.utils.Encoder.encode(Encoder.java:41) ~[classes/:?]
        at tech.ydb.topic.write.impl.WriterImpl.encode(WriterImpl.java:195) ~[classes/:?]
        at tech.ydb.topic.write.impl.WriterImpl.lambda$acceptMessageIntoSendingQueue$0(WriterImpl.java:176) ~[classes/:?]
        at tech.ydb.topic.write.impl.WriterImpl$$Lambda$189/2108708444.run(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1618) ~[?:1.8.0_44]
        ... 4 more
    11:15:44.950 [pool-4-thread-5] ERROR tech.ydb.topic.write.impl.WriterImpl - [yNNtF0] Exception while encoding message:
    java.util.concurrent.CompletionException: java.lang.RuntimeException: Unsupported codec: 7
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_44]
        at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_44]
        at java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1621) ~[?:1.8.0_44]
        at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java) ~[?:1.8.0_44]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[?:1.8.0_44]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[?:1.8.0_44]
        at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_44]
    Caused by: java.lang.RuntimeException: Unsupported codec: 7
        at tech.ydb.topic.utils.Encoder.getCodec(Encoder.java:96) ~[classes/:?]
        at tech.ydb.topic.utils.Encoder.makeOutputStream(Encoder.java:80) ~[classes/:?]
        at tech.ydb.topic.utils.Encoder.encode(Encoder.java:41) ~[classes/:?]
        at tech.ydb.topic.write.impl.WriterImpl.encode(WriterImpl.java:195) ~[classes/:?]
        at tech.ydb.topic.write.impl.WriterImpl.lambda$acceptMessageIntoSendingQueue$0(WriterImpl.java:176) ~[classes/:?]
        at tech.ydb.topic.write.impl.WriterImpl$$Lambda$189/2108708444.run(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1618) ~[?:1.8.0_44]
        ... 4 more
    11:15:44.950 [pool-4-thread-3] ERROR tech.ydb.topic.write.impl.WriterImpl - [yNNtF0] Exception while encoding message:
    java.util.concurrent.CompletionException: java.lang.RuntimeException: Unsupported codec: 7
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_44]
        at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_44]
        at java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1621) ~[?:1.8.0_44]
        at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java) ~[?:1.8.0_44]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[?:1.8.0_44]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[?:1.8.0_44]
        at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_44]
    Caused by: java.lang.RuntimeException: Unsupported codec: 7
        at tech.ydb.topic.utils.Encoder.getCodec(Encoder.java:96) ~[classes/:?]
        at tech.ydb.topic.utils.Encoder.makeOutputStream(Encoder.java:80) ~[classes/:?]
        at tech.ydb.topic.utils.Encoder.encode(Encoder.java:41) ~[classes/:?]
        at tech.ydb.topic.write.impl.WriterImpl.encode(WriterImpl.java:195) ~[classes/:?]
        at tech.ydb.topic.write.impl.WriterImpl.lambda$acceptMessageIntoSendingQueue$0(WriterImpl.java:176) ~[classes/:?]
        at tech.ydb.topic.write.impl.WriterImpl$$Lambda$189/2108708444.run(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1618) ~[?:1.8.0_44]
        ... 4 more
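For anyone reading the trace: the CompletionException wrapper is plain JDK behavior and can be reproduced without the SDK. The sketch below is only an illustration. CompletionWrapDemo and its encode method are hypothetical stand-ins (not SDK classes), and the rule that only codec 1 is supported is an assumption made for the demo.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical demo class, not part of ydb-java-sdk.
class CompletionWrapDemo {
    // Stand-in for an encoder that rejects unknown codec ids.
    // Assumption for this sketch: only codec 1 is supported.
    static void encode(int codec) {
        if (codec != 1) {
            throw new RuntimeException("Unsupported codec: " + codec);
        }
    }

    // Runs the encode step asynchronously, waits for it, and returns a short
    // description of the failure reported by join(), or null on success.
    static String describeFailure(int codec) {
        CompletableFuture<Void> task = CompletableFuture.runAsync(() -> encode(codec));
        try {
            task.join();
            return null;
        } catch (CompletionException e) {
            // join() reports the wrapper; the original exception is its cause.
            return e.getClass().getSimpleName() + " <- " + e.getCause().getMessage();
        }
    }
}
```

Calling CompletionWrapDemo.describeFailure(7) goes through the catch branch, which matches the "CompletionException ... Caused by: RuntimeException" shape in the trace.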

After some research I was able to make the test run more stably, without throwing the exception (maybe I was just lucky). This is only a guess and should be checked:

  1. WriterImpl - make lastAcceptedMessageFuture volatile. (It seems to be modified from different threads)
  2. WriterImpl - acceptMessageIntoSendingQueue should return CompletableFuture<Void>
  3. WriterImpl - tryToEnqueue should return acceptMessageIntoSendingQueue(message);
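To make the three points easier to discuss, here is a rough sketch of the shape they describe. SketchWriter is a hypothetical stand-in and is NOT the SDK's WriterImpl. Only the names lastAcceptedMessageFuture, acceptMessageIntoSendingQueue and tryToEnqueue are taken from the list above; the method bodies, the byte[] parameter and the "only codec 1 is supported" rule are assumptions.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in, NOT the real WriterImpl.
class SketchWriter {
    private final int codec;

    // Point 1: volatile, so a write made by the sending thread is visible
    // to whichever thread later calls flush().
    private volatile CompletableFuture<Void> lastAcceptedMessageFuture =
            CompletableFuture.completedFuture(null);

    SketchWriter(int codec) {
        this.codec = codec;
    }

    // Assumed encoder: only codec 1 is treated as supported in this sketch.
    private void encode(byte[] data) {
        if (codec != 1) {
            throw new RuntimeException("Unsupported codec: " + codec);
        }
    }

    // Point 2: return the future of the asynchronous encode step.
    CompletableFuture<Void> acceptMessageIntoSendingQueue(byte[] data) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> encode(data));
        lastAcceptedMessageFuture = future;
        return future;
    }

    // Point 3: hand that future back to the caller.
    CompletableFuture<Void> tryToEnqueue(byte[] data) {
        return acceptMessageIntoSendingQueue(data);
    }

    // Waits for the most recently accepted message and rethrows its failure, if any.
    void flush() {
        lastAcceptedMessageFuture.join();
    }
}
```

With this shape the caller gets a future per message and can see an encoding failure directly instead of only through flush(). Whether this matches what the real writer needs is exactly the part that should be checked.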

ekuvardin • May 05 '25 08:05