
[Bug] Geo-replication doesn't create a topic and replicate messages before the topic exists in the remote cluster

Open aweiri1 opened this issue 2 months ago • 7 comments

Search before reporting

  • [x] I searched in the issues and found nothing similar.

Read release policy

  • [x] I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

User environment

pulsar version: 4.0.4, helm chart version: 4.0.1. Running two Kubernetes Pulsar clusters: an OpenShift cluster and a Talos cluster.

Linux pulsar-talos-toolset-0 6.12.25-talos #1 SMP Mon Apr 28 10:05:42 UTC 2025 x86_64 GNU/Linux
Linux pulsar-okd1-toolset-0 6.3.12-200.fc38.x86_64 #1 SMP PREEMPT_DYNAMIC Thu Jul 6 04:05:18 UTC 2023 x86_64 GNU/Linux

Using the Python client (Python 3.10.12) for the test producer/consumer.

Issue Description

I am trying to configure geo-replication for a two-cluster Pulsar setup.

I have an okd1 Pulsar cluster running on an OpenShift Kubernetes cluster, a talos Pulsar cluster running on a Talos Kubernetes cluster, and a global configuration/metadata store running as a ZooKeeper-only Pulsar cluster on the Talos Kubernetes cluster.

Each cluster exposes a proxy service with an external load balancer IP, which I use in the configuration.

I have a two-cluster geo-replication setup deployed via Kubernetes. When I run a producer on cluster A for the first time (relying on auto topic creation, I did not manually create the topic), the producer connects to the cluster A service URL and sends its messages successfully. Before starting the consumer on cluster B for that same topic, I wanted to check the topic stats on cluster B, but it reports that the topic doesn't exist. Once I start the consumer against the cluster B service URL, it receives the messages successfully. But I thought topics were supposed to be replicated across clusters? I waited over 30 minutes for the topic to show up on cluster B, to rule out timing issues, but it never appeared until the consumer was started on cluster B.

When I connect the cluster B consumer, it receives all of the messages from the cluster A producer. Until I start that cluster B consumer, neither the topic nor any of its messages exist on cluster B.

This only happens when the topic is first auto-created by the cluster A producer. Once the consumer runs once on cluster B (which creates the topic there), I don't hit this issue on that topic again, since the topic already exists. From that point on, the consumer on cluster B does not need to be running for me to see messages sitting in the backlog.
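
For reference, the same flow can be reproduced with the bundled CLI client instead of the Python client. A rough sketch only; the topic name and the load balancer IPs are placeholders from my setup:

# produce 100 messages on cluster A (talos); the topic is auto-created there
bin/pulsar-client --url pulsar://<talos-lb-ip>:6650 produce persistent://geo-replication/testing/test -m "hello" -n 100

# ask cluster B (okd1) for the topic stats; this is where it reports the topic doesn't exist
bin/pulsar-admin --admin-url http://<okd1-lb-ip>:8080 topics stats persistent://geo-replication/testing/test

# only after starting a consumer on cluster B does the topic (and its backlog) appear there
bin/pulsar-client --url pulsar://<okd1-lb-ip>:6650 consume persistent://geo-replication/testing/test -s test-sub -n 0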

Error messages

An error we get in the cluster A (talos) broker logs immediately on producer send is:

2025-09-09T22:20:32,547+0000 [broker-client-shared-scheduled-executor-7-1] WARN  org.apache.pulsar.client.impl.PulsarClientImpl - [topic: persistent://geo-replication-2/testing/__change_events] Could not get connection while getPartitionedTopicMetadata -- Will try again in 754 ms
2025-09-09T22:20:32,551+0000 [pulsar-io-3-15] ERROR org.apache.pulsar.client.impl.ClientCnx - [id: 0x714b44aa, L:/ - R:] Close connection because received internal-server error {"errorMsg":"","reqId":1946041700241505712, "remote":"pulsar-okd1-broker.pulsar.svc.cluster.local/", "local":"/"}
2025-09-09T22:20:32,552+0000 [pulsar-io-3-15] WARN  org.apache.pulsar.client.impl.BinaryProtoLookupService - [persistent://geo-replication-2/testing/__change_events] failed to get Partitioned metadata : {"errorMsg":"{"errorMsg":"","reqId":1946041700241505712, "remote":"pulsar-okd1-broker.pulsar.svc.cluster.local/", "local":"/"}","reqId":1229027975051488438, "remote":"", "local":"/"}
org.apache.pulsar.client.api.PulsarClientException$LookupException: {"errorMsg":"{"errorMsg":"","reqId":1946041700241505712, "remote":"pulsar-okd1-broker.pulsar.svc.cluster.local/", "local":"/"}","reqId":1229027975051488438, "remote":"", "local":"/"}
                                                                                                                                                      
I assumed this could also be a timing issue, because the broker immediately tries to look up the topic on cluster B (okd1) before it exists there.

Topic stats on cluster A show the following in the replication field:

"replication" : {
    "pulsar-okd1" : {
      "msgRateIn" : 0.0,
      "msgInCount" : 0,
      "msgThroughputIn" : 0.0,
      "bytesInCount" : 0,
      "msgRateOut" : 0.0,
      "msgOutCount" : 0,
      "msgThroughputOut" : 0.0,
      "bytesOutCount" : 0,
      "msgRateExpired" : 0.0,
      "replicationBacklog" : 100,
      "connected" : false,
      "replicationDelayInSeconds" : 0,
      "msgExpiredCount" : 0
    }
}
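
For reference, this replication block can be pulled directly from the admin REST API on cluster A (the IP and topic name are placeholders from my setup; jq is optional):

curl -s http://<talos-lb-ip>:8080/admin/v2/persistent/geo-replication/testing/test/stats | jq .replication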

I did enable debug logging on both clusters. There are no topic creation logs, but the debug logs on okd1 do show the metadata lookup for the topic created on the talos cluster:


2025-10-02T23:16:33,265+0000 [pulsar-io-3-5] DEBUG org.apache.pulsar.broker.service.BrokerService - No autoTopicCreateOverride policy found for persistent://geo-replication/testing/test
2025-10-02T23:16:33,471+0000 [pulsar-io-3-8] DEBUG org.apache.pulsar.broker.service.ServerCnx - [persistent://geo-replication/testing/test] Received PartitionMetadataLookup from /10.128.2.45:43198 for 770175375804561621
2025-10-02T23:16:33,471+0000 [pulsar-io-3-8] DEBUG

Reproducing the issue

I am using 3 Kubernetes clusters. One of them is the ZooKeeper-only cluster that acts as the global metadata store for the okd1 and talos clusters. The other two are full Pulsar clusters, each fronted by a proxy: a Kubernetes proxy service behind a load balancer with an external IP, and that external IP is what I use as my Pulsar service URL. On the talos cluster I run the following to enable geo-replication:

bin/pulsar-admin tenants create geo-replication --allowed-clusters pulsar-okd1,pulsar-talos

bin/pulsar-admin namespaces create geo-replication/testing

bin/pulsar-admin namespaces set-clusters geo-replication/testing --clusters pulsar-talos,pulsar-okd1

Since we're using the global config store, the clusters and tenant already exist on the okd1 cluster; all I did was assign the clusters to the tenant/namespace.

I did pass in the right service URL for okd1 by running a clusters update on the okd1 cluster and changing the URLs to use the load balancer IP address, then restarting the brokers (I did this for both clusters). I am not using any authentication credentials for either cluster. I do have permission to create a topic on that namespace, verified by running a topic create command against it.
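
For completeness, the namespace replication setting and the topic-create permission were checked with commands roughly like these (names are from the setup above):

# confirm the namespace is assigned to both clusters
bin/pulsar-admin namespaces get-clusters geo-replication/testing

# confirm topic creation works on the namespace
bin/pulsar-admin topics create persistent://geo-replication/testing/manual-test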

Additional information

After more discussion with David K in the Slack channel, he concluded:

It sounds like the metadata on cluster B doesn’t get updated until a consumer attaches to the replicated topic even though the underlying topic data is there. This behavior is wrong, and the topic should exist in the target cluster’s metadata.

Are you willing to submit a PR?

  • [ ] I'm willing to submit a PR!

aweiri1 · Oct 03 '25 16:10

@aweiri1 Do you use the default configuration of createTopicToRemoteClusterForReplication=true in broker.conf? Does the Pulsar Admin connectivity work for the target remote cluster from the source cluster? Is the remote cluster properly created in the source cluster with the correct service url (--url / --url-secure)? You could check with pulsar-admin clusters get [cluster].
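
For example, something like this on the source cluster (the output shape is shown for illustration only, values will differ):

# how is the remote cluster registered in the source cluster's metadata?
bin/pulsar-admin clusters get pulsar-okd1
# {
#   "serviceUrl" : "http://<okd1-lb-ip>:8080",
#   "brokerServiceUrl" : "pulsar://<okd1-lb-ip>:6650"
# }

# check whether the setting has been overridden (empty output means the default, true, is in effect)
grep createTopicToRemoteClusterForReplication conf/broker.conf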

lhotari · Oct 06 '25 19:10

@lhotari

  1. Yes, I am using the default configuration and it is set to true.
  2. The admin connectivity works both ways; running the command below on both clusters returns the same output:
  • curl -s http://<load-balancer-ip>:8080/admin/v2/clusters returns ["pulsar-okd1","pulsar-talos"]
  3. Yes, when spinning up the clusters via the Helm charts, I run a clusters update command to point the clusters at the load balancer external IP address (rather than the Kubernetes DNS address):
  • bin/pulsar-admin clusters update pulsar-talos --broker-url pulsar://<load-balancer-ip>:6650 --url http://<load-balancer-ip>:8080

I do this for both the pulsar-talos and pulsar-okd1 clusters. After I spin up both clusters via the Helm charts and run a clusters list (on either cluster), both clusters already exist there (pulsar-talos and pulsar-okd1). This is because of the shared global config store: the clusters already know about each other, so I never have to do a clusters create, just a clusters update. I restart the brokers after the update.

aweiri1 · Oct 07 '25 01:10

pulsar version: 4.0.4

@aweiri1 Does the problem reproduce with Pulsar 4.0.7? IIRC, several fixes have been made between 4.0.4 and 4.0.7 to geo-replication and topic creation.

Diff between 4.0.4 and 4.0.7:

  • https://github.com/apache/pulsar/compare/v4.0.4...v4.0.7

Release notes:

  • https://pulsar.apache.org/release-notes/versioned/pulsar-4.0.7/
  • https://pulsar.apache.org/release-notes/versioned/pulsar-4.0.6/
  • https://pulsar.apache.org/release-notes/versioned/pulsar-4.0.5/

lhotari · Oct 07 '25 09:10

@lhotari We have not yet upgraded our Kubernetes Pulsar clusters, but plan to soon. In the meantime, I am doing more testing on the issue.

I decided to try a bare-metal deployment on my VMs to see if I hit the same topic replication issue. This was a very simple setup: two one-node Pulsar v4.0.4 clusters plus a global config store (3 VMs total). There is no proxy or load balancer in use, and I did not enable functions.

When configuring and testing geo-replication on the bare-metal setup, I started a producer on cluster 1, which auto-created the topic on cluster 1. It began sending messages steadily, and after only ~5 messages (without starting any consumers) the topic existed on cluster 2 and I was able to view its topic stats.

This tells me it could be a Kubernetes deployment issue rather than a version issue, but I'm not sure what in the K8s cluster would cause this error. Do you have any feedback?

aweiri1 · Oct 21 '25 21:10

@lhotari I am still working on resolving this issue, and I have documented the following findings:

I believe I have made progress narrowing down the issue: cross-cluster communication for the replicators.

As we know, geo-replication is semi-working:

Issue recap:

  • 2 pulsar kubernetes cluster set-up (okd1 and talos)
  • updated cluster urls to load balancer IPs (manually) and restarted brokers with success
  • can use the cluster urls successfully with producers and consumers
  • enabled geo-replication on replication tenant/ns
  • produced 100 messages on talos cluster with no consumers running on either cluster
  • immediately triggers connection/internal-server error in talos broker logs
  • topic never exists on okd1 cluster
  • started consumer on okd1 cluster and it receives the 100 messages

Result: geo-replication is semi-working, with a strange topic replication issue. The topic is never created on the remote cluster, even though the messages replicate once a consumer starts there.

Theory: by default, and behind the scenes, the talos replicator for geo-replication communicates with the okd1 broker, NOT the LB/proxy. That is why the error log shows the connection error against the internal okd1 broker DNS name. But we can't do cross-cluster communication with the brokers directly, because they only expose an internal endpoint, not an external one; the replicator is trying to connect over the internal pod network of okd1, which won't work. What we have been using is a proxy service in Kubernetes that sits behind a load balancer with an external IP, and this is the only external IP in our Kubernetes setup. I am trying to find Pulsar configurations that would resolve this, but I am not sure whether it is caused by our Kubernetes setup.
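
A quick way to test this theory is to check reachability from inside a talos broker pod (where the replicator client actually runs) toward okd1, both via the load balancer URL registered in the cluster metadata and via the internal DNS name that appears in the error log. A rough sketch; the pod name is a placeholder from my deployment:

# via the registered load balancer URL
kubectl exec -n pulsar pulsar-talos-broker-0 -- /pulsar/bin/pulsar-admin --admin-url http://<okd1-lb-ip>:8080 clusters list

# via the internal DNS name from the error log (expected to fail across clusters)
kubectl exec -n pulsar pulsar-talos-broker-0 -- /pulsar/bin/pulsar-admin --admin-url http://pulsar-okd1-broker.pulsar.svc.cluster.local:8080 clusters list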

Configurations in broker.conf that may help resolve the issue:

  • advertisedAddress - I changed advertisedAddress to our LB IP in broker.configData in our Helm chart, and it breaks the deployment; the pods aren't able to come up.
  • advertisedListeners
  • internalListenerName
  • bindAddress
  • createTopicToRemoteClusterForReplication - this is set to true for our brokers, so this shouldn't be the issue.

Any feedback you may have is appreciated!

aweiri1 · Nov 20 '25 21:11

But we can’t do cross-cluster-communication with brokers because they do not expose an external endpoint, only internal.

You didn't mention which method you use for the Kubernetes deployment. For Apache Pulsar Helm chart deployments, there's some advice and an important security notice: https://github.com/apache/pulsar-helm-chart?tab=readme-ov-file#external-access-recommendations . In the Apache Pulsar Helm chart, external access goes via the Pulsar Proxy and requires changing the default configuration. There are also comments and a security notice in the values.yaml file: https://github.com/apache/pulsar-helm-chart/blob/c341e7d65d8d351adc1f866acc2bf67c51a96b5c/charts/pulsar/values.yaml#L1448-L1468 .

The advertisedListeners / bindAddresses configuration doesn't have good examples at the moment. There are some docs, but they're not complete. The main use case in Kubernetes would be to enable external access using NodePorts: there would have to be a NodePort service for each broker pod, and the pod would have to dynamically configure advertisedListeners when it starts up on a specific node, so that routing happens directly to the node where the pod is running. Regarding the advertisedListeners/bindAddresses config, I used the minimal config in an integration test where Kubernetes runs in a k3s docker container: https://github.com/apache/pulsar/blob/807dcaf5d928f8202c1bf8b8402cfcf72a41e63d/tests/integration/src/test/java/org/apache/pulsar/tests/integration/k8s/AbstractPulsarStandaloneK8STest.java#L355-L357 For a real cluster, it would be different since the advertisedListeners value would have to be set dynamically.
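
As a rough illustration only (not taken from that test; the hostnames and addresses are placeholders, and each broker pod would need its own values set at startup), a multi-listener broker.conf could look like this:

# broker.conf, per broker pod
advertisedListeners=internal:pulsar://<pod-internal-dns>:6650,external:pulsar://<externally-reachable-address>:6650
internalListenerName=internal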

lhotari · Nov 21 '25 08:11

Before enabling geo-replication, you should validate the connection between the clusters, so that you can produce messages into a topic on the target cluster from the source cluster. You can use the command line tool PULSAR_CLIENT_CONF=$PWD/myconfig.conf /pulsar/bin/pulsar-client produce to perform this test, where you'd put the client's configuration in the myconfig.conf file. You can copy /pulsar/conf/client.conf to myconfig.conf and then modify the values. In the Apache Pulsar Helm chart, there isn't a great solution from a security perspective for authorization and authentication across the clusters. If you use JWT token auth, the private key would have to be shared. This is not great since rotating the keys is very hard without significant service impact. The more recommended approach is to use OIDC authentication, and there's support for it in the Helm chart, but there isn't much documentation about how to configure it with multiple clusters.
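
Roughly like this, for example (paths are the defaults inside the Pulsar image; the URLs are whatever the remote target cluster is registered with, and the topic name is just an example):

# copy the default client config and point it at the remote (target) cluster
cp /pulsar/conf/client.conf /pulsar/myconfig.conf
# in myconfig.conf, set:
#   webServiceUrl=http://<remote-cluster-url>:8080
#   brokerServiceUrl=pulsar://<remote-cluster-url>:6650

# then produce a test message from the source cluster's environment
PULSAR_CLIENT_CONF=/pulsar/myconfig.conf /pulsar/bin/pulsar-client produce persistent://geo-replication/testing/connectivity-test -m "connectivity check" -n 1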

lhotari · Nov 21 '25 08:11