pulsar-flink
pulsar-flink copied to clipboard
[BUG] NoClassDefFoundError and NPE when using PulsarDeserializer to deserialize messages in avro to flink row
Describe the bug
Basicially, I want to use PulsarDeserializer
to get avro data converted to flink row. But somehow, it keeps telling me NoClassDefFoundError and NPE.
The stdout in flink web ui shows the following
Exception in thread "AsyncHttpClient-7-1" java.lang.NoClassDefFoundError: org/apache/pulsar/shade/io/netty/buffer/PoolArena$1
at org.apache.pulsar.shade.io.netty.buffer.PoolArena.freeChunk(PoolArena.java:298)
at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache$MemoryRegionCache.freeEntry(PoolThreadCache.java:465)
at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache$MemoryRegionCache.free(PoolThreadCache.java:431)
at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache$MemoryRegionCache.free(PoolThreadCache.java:423)
at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache.free(PoolThreadCache.java:277)
at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache.free(PoolThreadCache.java:268)
at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache.free(PoolThreadCache.java:239)
at org.apache.pulsar.shade.io.netty.buffer.PooledByteBufAllocator$PoolThreadLocalCache.onRemoval(PooledByteBufAllocator.java:491)
at org.apache.pulsar.shade.io.netty.buffer.PooledByteBufAllocator$PoolThreadLocalCache.onRemoval(PooledByteBufAllocator.java:458)
at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal.remove(FastThreadLocal.java:256)
at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal.removeAll(FastThreadLocal.java:67)
at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:1042)
at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.pulsar.shade.io.netty.buffer.PoolArena$1
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 15 more
which is quite confusing to me, because I checked this class org/apache/pulsar/shade/io/netty/buffer/PoolArena$1
is in the fat job jar.
As for why avro->flink row, I want to process pulsar messages with field names and field positions. Since I have tons of topics and I don't want to have tons of useless classes the whole purpose of which is to do deserialization, that's why AvroDeser
is no go.
To Reproduce
It's very simple code.
JSONOptions jsonOptions = new JSONOptions(new HashMap<>(), "", "");
boolean useExtendField = true;
PulsarDeserializationSchema<Row> deserializer = new PulsarDeserializer(schemaInfo, jsonOptions, useExtendField);
FlinkPulsarSource<Row> source = new FlinkPulsarSource<>(serviceUrl, adminUrl, deserializer, props);
source.setStartFromEarliest();
DataStream<Row> stream = env.addSource(source);
stream.print();
Expected behavior
Print flink row.
Log messages
The cli stdout
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 744d6c7ce1cb2f9e7ffc8b4a377b0ee5)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 744d6c7ce1cb2f9e7ffc8b4a377b0ee5)
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123)
at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
at org.myorg.quickstart.StreamingJob.main(StreamingJob.java:131)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
... 8 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 744d6c7ce1cb2f9e7ffc8b4a377b0ee5)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:125)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:665)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:123)
... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.CompletionException: java.lang.NullPointerException
at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:978)
at org.apache.pulsar.client.impl.ReaderBuilderImpl.create(ReaderBuilderImpl.java:77)
at org.apache.flink.streaming.connectors.pulsar.internal.ReaderThread.createActualReader(ReaderThread.java:134)
at org.apache.flink.streaming.connectors.pulsar.internal.ReaderThread.run(ReaderThread.java:96)
Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.CompletionException: java.lang.NullPointerException
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.pulsar.client.impl.ReaderBuilderImpl.create(ReaderBuilderImpl.java:75)
... 2 more
Caused by: org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.CompletionException: java.lang.NullPointerException
at org.apache.pulsar.client.impl.ConnectionHandler.handleConnectionError(ConnectionHandler.java:88)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.pulsar.client.impl.BinaryProtoLookupService.lambda$null$1(BinaryProtoLookupService.java:156)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.pulsar.client.impl.ClientCnx.handleLookupResponse(ClientCnx.java:519)
at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:150)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at org.apache.pulsar.shade.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: java.lang.NullPointerException
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:673)
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
... 30 more
Caused by: java.lang.NullPointerException
at org.apache.pulsar.client.impl.ConsumerImpl.connectionOpened(ConsumerImpl.java:870)
at org.apache.pulsar.client.impl.ConnectionHandler.lambda$grabCnx$0(ConnectionHandler.java:73)
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
... 31 more
The flink taskmanager log
2021-05-13 19:11:07,170 DEBUG org.apache.pulsar.common.protocol.PulsarDecoder [] - [localhost/127.0.0.1:6650] Received cmd LOOKUP_RESPONSE
2021-05-13 19:11:07,171 DEBUG org.apache.pulsar.client.impl.ClientCnx [] - Received Broker lookup response: Connect
2021-05-13 19:11:07,172 INFO org.apache.pulsar.client.impl.ConsumerImpl [] - [persistent://aaa/bbb/ccc][reader-caf46c8cf0] Subscribing to topic on cnx [id: 0xb4519008, L:/127.0.0.1:57752 - R:localhost/127.0.0.1:6650], consumerId 0
2021-05-13 19:11:07,176 WARN org.apache.pulsar.client.impl.ConnectionHandler [] - [persistent://aaa/bbb/ccc] [reader-caf46c8cf0] Error connecting to broker: java.lang.NullPointerException
2021-05-13 19:11:07,179 WARN org.apache.pulsar.client.impl.ConnectionHandler [] - [persistent://aaa/bbb/ccc] [reader-caf46c8cf0] Could not get connection to broker: java.lang.NullPointerException -- Will try again in 0.1 s
Possible cause
In PulsarDeserializer
, getSchema
returns null
https://github.com/streamnative/pulsar-flink/blob/9094562681be84698bf8f159e0ea7c10b87048f7/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarDeserializer.java#L515
which is causing ConsumerImpl.schema
to be null
and then this line errors out.
https://github.com/apache/pulsar/blob/1926dee082c70f4d5dcad54a94fdee57e197901a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L727
I got it working with PulsarDeserializationSchema.valueOnley(new AvroRowDeserializationSchema(new String(schemaInfo.getSchema(), StandardCharsets.UTF_8)))
.
Now I'm not sure what's the purpose of PulsarDeserializer
if what I was describing is not a bug.
@longtengz Please tell me the version of the connector you are using~
PulsarDeserializer
is a class that is about to be deprecated, and it is still used sparingly in the basic types of Pulsar. Please use PulsarDeserializationSchema.valueOnley(new AvroRowDeserializationSchema(new String(schemaInfo.getSchema(), StandardCharsets.UTF_8)))
to decode the avro data.
Regarding the problem of org/apache/pulsar/shade/io/netty/buffer/PoolArena$1
, it should be that the same class exists in flink_lib. You can keep any copy. This class is in the pulsar-client-all
package. Also exists in the fat connector jar.
@jianyun8023 I think this is a bug, we should drop the PulsarDeserializer if it's about to be deprecated.
@syhily @longtengz Well, PulsarDeserializer
is the class that should be dropped