
[Bug] NullPointerException with dlogNamespace when running Pulsar Functions with etcd metadata store

Open abeliangroupie opened this issue 2 years ago • 5 comments

Search before asking

  • [X] I searched in the issues and found nothing similar.

Version

OS version: Ubuntu 22.04
Pulsar version: 2.11.0 (using apachepulsar/pulsar-all:2.11.0 Docker images)

Minimal reproduce step

It's not the cleanest, but here's the Docker Compose file I've been working with. It spins up 3 etcd, 3 bookie, and 1 broker container, plus a one-shot bootstrap container that initialises the cluster metadata.

version: "3.9"

services:
  # etcd cluster.
  etcd1:
    image: quay.io/coreos/etcd:v3.5.7
    container_name: etcd1
    restart: always
    networks:
      - pulsar
    command: >
      /usr/local/bin/etcd
      --name node1
      --initial-advertise-peer-urls http://etcd1:2380
      --listen-peer-urls http://0.0.0.0:2380
      --advertise-client-urls http://etcd1:2379
      --listen-client-urls http://0.0.0.0:2379
      --initial-cluster "node1=http://etcd1:2380,node2=http://etcd2:2380,node3=http://etcd3:2380"
      --initial-cluster-state new
      --initial-cluster-token initial-token

  etcd2:
    image: quay.io/coreos/etcd:v3.5.7
    container_name: etcd2
    restart: always
    networks:
      - pulsar
    command: >
      /usr/local/bin/etcd
      --name node2
      --initial-advertise-peer-urls http://etcd2:2380
      --listen-peer-urls http://0.0.0.0:2380
      --advertise-client-urls http://etcd2:2379
      --listen-client-urls http://0.0.0.0:2379
      --initial-cluster "node1=http://etcd1:2380,node2=http://etcd2:2380,node3=http://etcd3:2380"
      --initial-cluster-state new
      --initial-cluster-token initial-token

  etcd3:
    image: quay.io/coreos/etcd:v3.5.7
    container_name: etcd3
    restart: always
    networks:
      - pulsar
    command: >
      /usr/local/bin/etcd
      --name node3
      --initial-advertise-peer-urls http://etcd3:2380
      --listen-peer-urls http://0.0.0.0:2380
      --advertise-client-urls http://etcd3:2379
      --listen-client-urls http://0.0.0.0:2379
      --initial-cluster "node1=http://etcd1:2380,node2=http://etcd2:2380,node3=http://etcd3:2380"
      --initial-cluster-state new
      --initial-cluster-token initial-token

  # Container that only runs once to initialise metadata.
  bootstrap:
    image: apachepulsar/pulsar-all:2.11.0
    container_name: bootstrap
    depends_on:
      - etcd1
      - etcd2
      - etcd3
    networks:
      - pulsar
    command: >
      bin/pulsar initialize-cluster-metadata \
        --cluster my-pulsar \
        --metadata-store etcd:http://etcd1:2379,http://etcd2:2379,http://etcd3:2379 \
        --configuration-metadata-store etcd:http://etcd1:2379,http://etcd2:2379,http://etcd3:2379 \
        --web-service-url http://broker1:8080 \
        --broker-service-url pulsar://broker1:6650 \

  # BookKeeper cluster.
  bookie1:
    image: apachepulsar/pulsar-all:2.11.0
    container_name: bookie1
    restart: always
    depends_on:
      bootstrap:
        condition: service_completed_successfully
    networks:
      - pulsar
    volumes:
      - ${PWD}/bookkeeper.conf:/pulsar/conf/bookkeeper.conf
    command: bin/pulsar bookie

  bookie2:
    image: apachepulsar/pulsar-all:2.11.0
    container_name: bookie2
    restart: always
    depends_on:
      bootstrap:
        condition: service_completed_successfully
    networks:
      - pulsar
    volumes:
      - ${PWD}/bookkeeper.conf:/pulsar/conf/bookkeeper.conf
    command: bin/pulsar bookie

  bookie3:
    image: apachepulsar/pulsar-all:2.11.0
    container_name: bookie3
    restart: always
    depends_on:
      bootstrap:
        condition: service_completed_successfully
    networks:
      - pulsar
    volumes:
      - ${PWD}/bookkeeper.conf:/pulsar/conf/bookkeeper.conf
    command: bin/pulsar bookie

  # Broker.
  broker:
    image: apachepulsar/pulsar-all:2.11.0
    container_name: broker
    restart: always
    depends_on:
      - bookie1
      - bookie2
      - bookie3
    networks:
      - pulsar
    volumes:
      - ${PWD}/broker.conf:/pulsar/conf/broker.conf
      - ${PWD}/functions_worker.yml:/pulsar/conf/functions_worker.yml
    command: bin/pulsar broker

networks:
  pulsar:
    driver: bridge

Config changes are as follows:

bookkeeper.conf:

useHostNameAsBookieID=true
metadataServiceUri=metadata-store:etcd:http://etcd1:2379,http://etcd2:2379,http://etcd3:2379

broker.conf:

clusterName=my-pulsar
metadataStoreUrl=etcd:http://etcd1:2379,http://etcd2:2379,http://etcd3:2379
functionsWorkerEnabled=true

functions_worker.yml:

pulsarFunctionsCluster: my-pulsar
configurationMetadataStoreUrl: metadata-store:etcd:http://etcd1:2379,http://etcd2:2379,http://etcd3:2379

To trigger the error, from inside the broker container:

  • Create a Pulsar function file test_function.py (an empty file created with touch test_function.py is enough to trigger the error)
  • Run:
$ bin/pulsar-admin functions create \
    --py test_function.py \
    --classname test_function.ExamplePulsarFunction \
    --tenant public \
    --namespace default \
    --name test-function \
    --inputs persistent://public/default/test-input-topic \
    --output persistent://public/default/test-output-topic
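
For reference, a minimal test_function.py matching the class name in the command above could look like the sketch below. This is an illustration, not the file from the original report. It assumes the `Function` base class from the Pulsar Python client with its `process(self, input, context)` method; the fallback stub is hypothetical and only exists so the file can be imported on a machine without the client installed.

```python
# test_function.py: illustrative sketch, not the file from the original report.
try:
    # Base class shipped with the Pulsar Python client.
    from pulsar import Function
except ImportError:
    # Hypothetical fallback stub so the module still imports for a quick
    # local check when the Pulsar client is not installed.
    class Function:
        def process(self, input, context):
            raise NotImplementedError


class ExamplePulsarFunction(Function):
    """Appends an exclamation mark to every incoming message."""

    def process(self, input, context):
        # The return value is what gets published to the output topic.
        return input + "!"
```

With the bug described in this issue, the upload fails before the class is ever loaded, so the body of the function has no effect on the error.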

What did you expect to see?

The Pulsar function successfully being added (or, with the test command using touch above, Pulsar complaining that test_function.ExamplePulsarFunction doesn't exist).

What did you see instead?

The error seen is:

Cannot invoke "org.apache.distributedlog.api.namespace.Namespace.logExists(String)" because "dlogNamespace" is null

Reason: Cannot invoke "org.apache.distributedlog.api.namespace.Namespace.logExists(String)" because "dlogNamespace" is null

Stack trace from the broker container:

2023-02-28T05:49:23,816+0000 [pulsar-web-38-11] ERROR org.apache.pulsar.functions.worker.rest.api.FunctionsImpl - Failed process Function public/default/test-function package: 
java.lang.NullPointerException: Cannot invoke "org.apache.distributedlog.api.namespace.Namespace.logExists(String)" because "dlogNamespace" is null
	at org.apache.pulsar.functions.worker.WorkerUtils.uploadToBookKeeper(WorkerUtils.java:90) ~[org.apache.pulsar-pulsar-functions-worker-2.11.0.jar:2.11.0]
	at org.apache.pulsar.functions.worker.WorkerUtils.uploadFileToBookkeeper(WorkerUtils.java:80) ~[org.apache.pulsar-pulsar-functions-worker-2.11.0.jar:2.11.0]
	at org.apache.pulsar.functions.worker.rest.api.ComponentImpl.getFunctionPackageLocation(ComponentImpl.java:416) ~[org.apache.pulsar-pulsar-functions-worker-2.11.0.jar:2.11.0]
	at org.apache.pulsar.functions.worker.rest.api.FunctionsImpl.registerFunction(FunctionsImpl.java:240) ~[org.apache.pulsar-pulsar-functions-worker-2.11.0.jar:2.11.0]
	at org.apache.pulsar.broker.admin.impl.FunctionsBase.registerFunction(FunctionsBase.java:200) ~[org.apache.pulsar-pulsar-broker-2.11.0.jar:2.11.0]
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
	at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
	at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:124) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:167) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
	at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:159) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:79) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:475) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:397) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
	at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:255) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
	at org.glassfish.jersey.internal.Errors.process(Errors.java:292) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
	at org.glassfish.jersey.internal.Errors.process(Errors.java:274) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
	at org.glassfish.jersey.internal.Errors.process(Errors.java:244) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
	at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
	at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:234) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
	at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:680) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
	at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]
	at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:366) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:319) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]
	at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799) ~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1656) ~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.apache.pulsar.broker.web.ResponseHandlerFilter.doFilter(ResponseHandlerFilter.java:67) ~[org.apache.pulsar-pulsar-broker-2.11.0.jar:2.11.0]
	at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) ~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626) ~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.servlets.QoSFilter.doFilter(QoSFilter.java:202) ~[org.eclipse.jetty-jetty-servlets-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) ~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626) ~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:552) ~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1624) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1440) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:505) ~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1594) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1355) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:181) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.server.Server.handle(Server.java:516) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:487) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:732) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:479) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311) ~[org.eclipse.jetty-jetty-io-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105) ~[org.eclipse.jetty-jetty-io-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104) ~[org.eclipse.jetty-jetty-io-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338) ~[org.eclipse.jetty-jetty-util-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315) ~[org.eclipse.jetty-jetty-util-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173) ~[org.eclipse.jetty-jetty-util-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131) ~[org.eclipse.jetty-jetty-util-9.4.48.v20220622.jar:9.4.48.v20220622]
	at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:409) ~[org.eclipse.jetty-jetty-util-9.4.48.v20220622.jar:9.4.48.v20220622]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.86.Final.jar:4.1.86.Final]
	at java.lang.Thread.run(Thread.java:833) ~[?:?]

Anything else?

I suspect the main issue is that various parts of the dlog (DistributedLog) implementation are still ZooKeeper-specific, for example:

  • initInBroker() in PulsarWorkerService only sets dlogURI if brokerConfig.isMetadataStoreBackedByZookeeper() is true
  • initializeCluster() in PulsarClusterMetadataSetup (for bin/pulsar initialize-cluster-metadata) only initializes the dlog namespace if localStore instanceof ZKMetadataStore && configStore instanceof ZKMetadataStore is true. As a result, setting initializedDlogMetadata: true in functions_worker.yml doesn't help, since the metadata is never actually set up.

This all means that by the time uploadToBookKeeper() is called in WorkerUtils, dlogNamespace is null and dlogNamespace.logExists() fails.
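
The failure mode described above can be modelled with a small toy sketch. This is not Pulsar's code: `WorkerModel`, `FakeNamespace`, and `is_zookeeper_url` are hypothetical names, and the "zk:" prefix check is a simplifying assumption standing in for the real ZooKeeper detection. It only shows how a ZooKeeper-only guard at initialisation turns into a null dereference at upload time.

```python
from typing import Optional


class FakeNamespace:
    """Hypothetical stand-in for a DistributedLog namespace handle."""

    def log_exists(self, name: str) -> bool:
        return False


def is_zookeeper_url(metadata_store_url: str) -> bool:
    # Assumption for this sketch: only an explicit "zk:" prefix counts.
    return metadata_store_url.startswith("zk:")


class WorkerModel:
    def __init__(self, metadata_store_url: str) -> None:
        self.metadata_store_url = metadata_store_url
        self.dlog_namespace: Optional[FakeNamespace] = None

    def init_in_broker(self) -> None:
        # Models the guard: the namespace is only set up for ZooKeeper.
        if is_zookeeper_url(self.metadata_store_url):
            self.dlog_namespace = FakeNamespace()

    def upload_to_bookkeeper(self, package_path: str) -> bool:
        # No None check, so a non-ZooKeeper setup fails at this call.
        return self.dlog_namespace.log_exists(package_path)


if __name__ == "__main__":
    worker = WorkerModel("etcd:http://etcd1:2379")
    worker.init_in_broker()
    try:
        worker.upload_to_bookkeeper("public/default/test-function")
    except AttributeError as exc:
        print("upload failed:", exc)
```

In this model the etcd-backed worker fails with Python's equivalent of the NullPointerException above, while a worker created with a "zk:" URL gets a namespace and the upload check runs normally.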

Are you willing to submit a PR?

  • [ ] I'm willing to submit a PR!

abeliangroupie, Feb 28 '23 06:02

The issue had no activity for 30 days, mark with Stale label.

github-actions[bot], Mar 31 '23 01:03

@abeliangroupie I encountered the same problem, is there any update from your side?

packageman, Oct 10 '23 08:10

> @abeliangroupie I encountered the same problem, is there any update from your side?

I ended up giving up on etcd and went back to ZooKeeper for now, unfortunately.

abeliangroupie, Oct 19 '23 03:10

@packageman Which Pulsar version were you running when you hit this problem? Is it 2.11, the same as in this issue? Have you tested any newer Pulsar version?

liangyuanpeng, Jan 13 '24 08:01

> Which Pulsar version were you running when you hit this problem?

@liangyuanpeng My Pulsar version is 3.1.0. Yes, it's the same problem as reported in this issue for Pulsar 2.11.

The latest version is currently 3.1.2; I haven't tested on it.

packageman, Jan 31 '24 01:01