[Bug] NullPointerException with dlogNamespace when running Pulsar Functions with etcd metadata store
Search before asking
- [X] I searched in the issues and found nothing similar.
Version
OS version: Ubuntu 22.04
Pulsar version: 2.11.0 (using apachepulsar/pulsar-all:2.11.0 Docker images)
Minimal reproduce step
It's not the cleanest, but here's the Docker Compose file I've been working with. It spins up 3 etcd containers, 3 bookie containers and 1 broker container.
```yaml
version: "3.9"
services:
  # etcd cluster.
  etcd1:
    image: quay.io/coreos/etcd:v3.5.7
    container_name: etcd1
    restart: always
    networks:
      - pulsar
    command: >
      /usr/local/bin/etcd
      --name node1
      --initial-advertise-peer-urls http://etcd1:2380
      --listen-peer-urls http://0.0.0.0:2380
      --advertise-client-urls http://etcd1:2379
      --listen-client-urls http://0.0.0.0:2379
      --initial-cluster "node1=http://etcd1:2380,node2=http://etcd2:2380,node3=http://etcd3:2380"
      --initial-cluster-state new
      --initial-cluster-token initial-token
  etcd2:
    image: quay.io/coreos/etcd:v3.5.7
    container_name: etcd2
    restart: always
    networks:
      - pulsar
    command: >
      /usr/local/bin/etcd
      --name node2
      --initial-advertise-peer-urls http://etcd2:2380
      --listen-peer-urls http://0.0.0.0:2380
      --advertise-client-urls http://etcd2:2379
      --listen-client-urls http://0.0.0.0:2379
      --initial-cluster "node1=http://etcd1:2380,node2=http://etcd2:2380,node3=http://etcd3:2380"
      --initial-cluster-state new
      --initial-cluster-token initial-token
  etcd3:
    image: quay.io/coreos/etcd:v3.5.7
    container_name: etcd3
    restart: always
    networks:
      - pulsar
    command: >
      /usr/local/bin/etcd
      --name node3
      --initial-advertise-peer-urls http://etcd3:2380
      --listen-peer-urls http://0.0.0.0:2380
      --advertise-client-urls http://etcd3:2379
      --listen-client-urls http://0.0.0.0:2379
      --initial-cluster "node1=http://etcd1:2380,node2=http://etcd2:2380,node3=http://etcd3:2380"
      --initial-cluster-state new
      --initial-cluster-token initial-token
  # Container that only runs once to initialise metadata.
  # The folded scalar (>) joins these lines with spaces, so no trailing backslashes are needed.
  bootstrap:
    image: apachepulsar/pulsar-all:2.11.0
    container_name: bootstrap
    depends_on:
      - etcd1
      - etcd2
      - etcd3
    networks:
      - pulsar
    command: >
      bin/pulsar initialize-cluster-metadata
      --cluster my-pulsar
      --metadata-store etcd:http://etcd1:2379,http://etcd2:2379,http://etcd3:2379
      --configuration-metadata-store etcd:http://etcd1:2379,http://etcd2:2379,http://etcd3:2379
      --web-service-url http://broker1:8080
      --broker-service-url pulsar://broker1:6650
  # BookKeeper cluster.
  bookie1:
    image: apachepulsar/pulsar-all:2.11.0
    container_name: bookie1
    restart: always
    depends_on:
      bootstrap:
        condition: service_completed_successfully
    networks:
      - pulsar
    volumes:
      - ${PWD}/bookkeeper.conf:/pulsar/conf/bookkeeper.conf
    command: bin/pulsar bookie
  bookie2:
    image: apachepulsar/pulsar-all:2.11.0
    container_name: bookie2
    restart: always
    depends_on:
      bootstrap:
        condition: service_completed_successfully
    networks:
      - pulsar
    volumes:
      - ${PWD}/bookkeeper.conf:/pulsar/conf/bookkeeper.conf
    command: bin/pulsar bookie
  bookie3:
    image: apachepulsar/pulsar-all:2.11.0
    container_name: bookie3
    restart: always
    depends_on:
      bootstrap:
        condition: service_completed_successfully
    networks:
      - pulsar
    volumes:
      - ${PWD}/bookkeeper.conf:/pulsar/conf/bookkeeper.conf
    command: bin/pulsar bookie
  # Broker.
  broker:
    image: apachepulsar/pulsar-all:2.11.0
    container_name: broker
    restart: always
    depends_on:
      - bookie1
      - bookie2
      - bookie3
    networks:
      - pulsar
    volumes:
      - ${PWD}/broker.conf:/pulsar/conf/broker.conf
      - ${PWD}/functions_worker.yml:/pulsar/conf/functions_worker.yml
    command: bin/pulsar broker
networks:
  pulsar:
    driver: bridge
```
Config changes are as follows:
`bookkeeper.conf`:
```properties
useHostNameAsBookieID=true
metadataServiceUri=metadata-store:etcd:http://etcd1:2379,http://etcd2:2379,http://etcd3:2379
```
`broker.conf`:
```properties
clusterName=my-pulsar
metadataStoreUrl=etcd:http://etcd1:2379,http://etcd2:2379,http://etcd3:2379
functionsWorkerEnabled=true
```
`functions_worker.yml`:
```yaml
pulsarFunctionsCluster: my-pulsar
configurationMetadataStoreUrl: metadata-store:etcd:http://etcd1:2379,http://etcd2:2379,http://etcd3:2379
```
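The failure hinges on `brokerConfig.isMetadataStoreBackedByZookeeper()`, which the analysis in this report identifies as the gate for setting `dlogURI`. The sketch below is a hypothetical, simplified classifier (`looksZooKeeperBacked` is an invented name, and its prefix rules are assumptions, not Pulsar's actual logic) showing why the `etcd:` URL in `broker.conf` would not count as ZooKeeper-backed.

```java
import java.util.List;

/**
 * Hypothetical, simplified classifier for metadata store URLs.
 * ASSUMPTIONS (not Pulsar's actual implementation): an optional "metadata-store:"
 * wrapper is stripped first; "zk:" or a bare host:port list means ZooKeeper;
 * "etcd:", "memory:" and "rocksdb:" mean some other backend.
 */
public class MetadataUrlCheck {

    private static final String WRAPPER = "metadata-store:";
    // Illustrative list of prefixes assumed to select a non-ZooKeeper backend.
    private static final List<String> NON_ZK_PREFIXES = List.of("etcd:", "memory:", "rocksdb:");

    static boolean looksZooKeeperBacked(String url) {
        // Drop the optional wrapper used by e.g. BookKeeper's metadataServiceUri.
        String u = url.startsWith(WRAPPER) ? url.substring(WRAPPER.length()) : url;
        if (u.startsWith("zk:")) {
            return true;
        }
        for (String prefix : NON_ZK_PREFIXES) {
            if (u.startsWith(prefix)) {
                return false;
            }
        }
        // No recognised prefix: treat it as a plain ZooKeeper connect string.
        return true;
    }

    public static void main(String[] args) {
        // metadataStoreUrl from broker.conf in this report.
        String brokerUrl = "etcd:http://etcd1:2379,http://etcd2:2379,http://etcd3:2379";
        // Under these assumptions this prints "false", i.e. the dlogURI branch would be skipped.
        System.out.println(looksZooKeeperBacked(brokerUrl));
    }
}
```

Under these assumptions every URL in this setup, including the `metadata-store:etcd:` one in `functions_worker.yml`, classifies as non-ZooKeeper.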
To trigger the error, from inside the broker container:
- Create a Pulsar function file `test_function.py` (alternatively, `touch test_function.py` is enough to trigger the error).
- Run:
```shell
$ bin/pulsar-admin functions create \
    --py test_function.py \
    --classname test_function.ExamplePulsarFunction \
    --tenant public \
    --namespace default \
    --name test-function \
    --inputs persistent://public/default/test-input-topic \
    --output persistent://public/default/test-output-topic
```
What did you expect to see?
The Pulsar function being created successfully (or, with the `touch` variant above, Pulsar complaining that `test_function.ExamplePulsarFunction` doesn't exist).
What did you see instead?
The error seen is:
```
Cannot invoke "org.apache.distributedlog.api.namespace.Namespace.logExists(String)" because "dlogNamespace" is null
Reason: Cannot invoke "org.apache.distributedlog.api.namespace.Namespace.logExists(String)" because "dlogNamespace" is null
```
Traceback from the broker image:
```
2023-02-28T05:49:23,816+0000 [pulsar-web-38-11] ERROR org.apache.pulsar.functions.worker.rest.api.FunctionsImpl - Failed process Function public/default/test-function package:
java.lang.NullPointerException: Cannot invoke "org.apache.distributedlog.api.namespace.Namespace.logExists(String)" because "dlogNamespace" is null
    at org.apache.pulsar.functions.worker.WorkerUtils.uploadToBookKeeper(WorkerUtils.java:90) ~[org.apache.pulsar-pulsar-functions-worker-2.11.0.jar:2.11.0]
    at org.apache.pulsar.functions.worker.WorkerUtils.uploadFileToBookkeeper(WorkerUtils.java:80) ~[org.apache.pulsar-pulsar-functions-worker-2.11.0.jar:2.11.0]
    at org.apache.pulsar.functions.worker.rest.api.ComponentImpl.getFunctionPackageLocation(ComponentImpl.java:416) ~[org.apache.pulsar-pulsar-functions-worker-2.11.0.jar:2.11.0]
    at org.apache.pulsar.functions.worker.rest.api.FunctionsImpl.registerFunction(FunctionsImpl.java:240) ~[org.apache.pulsar-pulsar-functions-worker-2.11.0.jar:2.11.0]
    at org.apache.pulsar.broker.admin.impl.FunctionsBase.registerFunction(FunctionsBase.java:200) ~[org.apache.pulsar-pulsar-broker-2.11.0.jar:2.11.0]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
    at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
    at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:124) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:167) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:159) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:79) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:475) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:397) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:255) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
    at org.glassfish.jersey.internal.Errors.process(Errors.java:292) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
    at org.glassfish.jersey.internal.Errors.process(Errors.java:274) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
    at org.glassfish.jersey.internal.Errors.process(Errors.java:244) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
    at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:234) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:680) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]
    at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:366) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:319) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799) ~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1656) ~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.apache.pulsar.broker.web.ResponseHandlerFilter.doFilter(ResponseHandlerFilter.java:67) ~[org.apache.pulsar-pulsar-broker-2.11.0.jar:2.11.0]
    at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) ~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626) ~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.servlets.QoSFilter.doFilter(QoSFilter.java:202) ~[org.eclipse.jetty-jetty-servlets-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) ~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626) ~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:552) ~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1624) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1440) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:505) ~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1594) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1355) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:181) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.Server.handle(Server.java:516) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:487) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:732) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:479) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311) ~[org.eclipse.jetty-jetty-io-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105) ~[org.eclipse.jetty-jetty-io-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104) ~[org.eclipse.jetty-jetty-io-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338) ~[org.eclipse.jetty-jetty-util-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315) ~[org.eclipse.jetty-jetty-util-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173) ~[org.eclipse.jetty-jetty-util-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131) ~[org.eclipse.jetty-jetty-util-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:409) ~[org.eclipse.jetty-jetty-util-9.4.48.v20220622.jar:9.4.48.v20220622]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.86.Final.jar:4.1.86.Final]
    at java.lang.Thread.run(Thread.java:833) ~[?:?]
```
Anything else?
I suspect the main issue is that various parts of the DistributedLog (dlog) implementation are still ZooKeeper-specific, for example:
- `initInBroker()` in `PulsarWorkerService` only sets `dlogURI` if `brokerConfig.isMetadataStoreBackedByZookeeper()` is true.
- `initializeCluster()` in `PulsarClusterMetadataSetup` (used by `bin/pulsar initialize-cluster-metadata`) only initializes the dlog namespace if `localStore instanceof ZKMetadataStore && configStore instanceof ZKMetadataStore` is true. This means that setting `initializedDlogMetadata: true` in `functions_worker.yml` doesn't help, since the metadata is never actually set up.

As a result, by the time `uploadToBookKeeper()` is called in `WorkerUtils`, `dlogNamespace` is null and `dlogNamespace.logExists()` throws a `NullPointerException`.
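To make the suspected failure path concrete, here is a heavily simplified, hypothetical model of it. None of these classes are Pulsar's real code: `Namespace` is a one-method stand-in for `org.apache.distributedlog.api.namespace.Namespace`, and `WorkerService`, `uploadUnguarded` and `uploadGuarded` are invented names that only mirror the identifiers mentioned above. The guarded variant is one possible way to fail fast with a descriptive error instead of an NPE, not an actual fix for etcd support.

```java
import java.net.URI;

/** Hypothetical model of the dlogNamespace == null path; not Pulsar's actual code. */
public class DlogNullSketch {

    /** Stand-in for the DistributedLog Namespace, reduced to the one method that fails. */
    interface Namespace {
        boolean logExists(String logName);
    }

    /** Simplified worker service: dlog state is only created for ZooKeeper-backed stores. */
    static final class WorkerService {
        URI dlogURI;             // stays null when the metadata store is not ZooKeeper
        Namespace dlogNamespace; // derived from dlogURI, so also null in that case

        void initInBroker(boolean metadataStoreBackedByZookeeper) {
            if (metadataStoreBackedByZookeeper) {
                dlogURI = URI.create("distributedlog://zk1:2181/ledgers"); // hypothetical value
                dlogNamespace = logName -> false; // fake namespace in which no logs exist yet
            }
            // etcd case: nothing is initialised, matching the behaviour described in this issue.
        }
    }

    /** Mirrors the reported crash: dereferences the namespace without a null check. */
    static boolean uploadUnguarded(Namespace dlogNamespace, String path) {
        return dlogNamespace.logExists(path); // NullPointerException if dlogNamespace is null
    }

    /** Possible fail-fast alternative that surfaces the underlying cause. */
    static boolean uploadGuarded(Namespace dlogNamespace, String path) {
        if (dlogNamespace == null) {
            throw new IllegalStateException(
                    "DistributedLog namespace is not initialised; package upload to BookKeeper "
                    + "appears to require a ZooKeeper-backed metadata store");
        }
        return dlogNamespace.logExists(path);
    }

    public static void main(String[] args) {
        WorkerService etcdWorker = new WorkerService();
        etcdWorker.initInBroker(false); // broker.conf uses metadataStoreUrl=etcd:...
        try {
            uploadUnguarded(etcdWorker.dlogNamespace, "public/default/test-function");
        } catch (NullPointerException e) {
            System.out.println("unguarded: NullPointerException");
        }
        try {
            uploadGuarded(etcdWorker.dlogNamespace, "public/default/test-function");
        } catch (IllegalStateException e) {
            System.out.println("guarded: " + e.getMessage());
        }
    }
}
```

The guard only improves the error message. Supporting etcd would still require the dlog URI and namespace metadata to be created for non-ZooKeeper metadata stores, as the two points above suggest.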
Are you willing to submit a PR?
- [ ] I'm willing to submit a PR!
The issue had no activity for 30 days, mark with Stale label.
@abeliangroupie I encountered the same problem, is there any update from your side?
Unfortunately, I ended up giving up on etcd and went back to ZooKeeper for now.
@packageman Which Pulsar version were you using when you hit this problem? Is it the same as this issue on Pulsar 2.11? Have you tested any newer Pulsar versions?
> Which Pulsar version were you using when you hit this problem?

@liangyuanpeng My Pulsar version is 3.1.0. Yes, it's the same issue as on Pulsar 2.11.
The latest version is currently 3.1.2; I haven't tested it on that version.