pulsar-client-python
Python client does not handle default schema values
Describe the bug When trying to update a schema on a topic (with the compatibility check strategy set to BACKWARD_TRANSITIVE or BACKWARD in my case), Pulsar returns the error "IncompatibleSchema" even though all the guidelines have been followed.
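(For context, the compatibility strategy mentioned above is a per-namespace setting that can be read or changed through the admin API. The snippet below is a rough sketch, not part of the original report: the standalone admin URL on port 8080 and the schemaCompatibilityStrategy REST path are assumptions; adjust tenant/namespace to match your setup.)
import requests

# Assumption: standalone broker admin endpoint and the per-namespace
# schemaCompatibilityStrategy path; not verified against this exact version.
base = "http://localhost:8080/admin/v2/namespaces/tenant/namespace"

# Read the strategy currently applied to the namespace.
print(requests.get(base + "/schemaCompatibilityStrategy").text)

# Set it to BACKWARD_TRANSITIVE (one of the strategies used in this report).
requests.put(base + "/schemaCompatibilityStrategy", json="BACKWARD_TRANSITIVE")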
To Reproduce Steps to reproduce the behavior:
- Create the producer:
import pulsar
from pulsar.schema import *

class Example(Record):
    a = String()
    b = Integer()
    c = Integer()

client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer(
    topic='tenant/namespace/my-topic',
    schema=JsonSchema(Example))

producer.send(Example(a='Hello', b=1))
- Create the consumer:
import pulsar
from pulsar.schema import *

class Example(Record):
    a = String()
    b = Integer()
    c = Integer()

client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe(
    topic='tenant/namespace/my-topic',
    subscription_name='my-subscription',
    schema=JsonSchema(Example))

while True:
    msg = consumer.receive()
    ex = msg.value()
    try:
        print("Received message a={} b={} c={}".format(ex.a, ex.b, ex.c))
        # Acknowledge successful processing of the message
        consumer.acknowledge(msg)
    except:
        # Message failed to be processed
        consumer.negative_acknowledge(msg)
- Run the consumer first and then the producer; the schema is created on the topic correctly and the consumer reads the message correctly.
- Stop the consumer.
- Make a change to the consumer: add a float field to the Example class, but with a default value:
class Example(Record):
    a = String()
    b = Integer()
    c = Integer()
    d = Float(default=1.0)
- Run the consumer again; it throws the IncompatibleSchema error.
Expected behavior Since the schema compatibility check is set to BACKWARD_TRANSITIVE or BACKWARD (so the policy is to upgrade consumers first) and the property added to the schema is optional, I expect the new schema to be registered correctly.
Desktop (please complete the following information):
- OS: macOS Mojave
Additional context Using the Pulsar Docker container v2.6.0 and pulsar-client v2.6.0 for Python.
Looking at the logs inside the container, I noticed that when the schema is sent to Pulsar from the Python client, it does not include the default values of the fields that define one. Here's an example:
Error during schema compatibility check: Unable to read schema:
{
"type" : "record",
"name" : "Example",
"fields" : [ {
"name" : "a",
"type" : [ "null", "string" ]
}, {
"name" : "b",
"type" : [ "null", "int" ]
}, {
"name" : "c",
"type" : [ "null", "boolean" ]
}]
}
using schema:
{
"type" : "record",
"name" : "Example",
"fields" : [ {
"name" : "a",
"type" : [ "null", "string" ]
}, {
"name" : "b",
"type" : [ "null", "int" ]
}, {
"name" : "c",
"type" : [ "null", "boolean" ]
}, {
"name" : "d",
"type" : [ "null", "float" ]
}]
}
As you can see, the second schema is missing the "default": "1.0" key-value pair for the property "d". This makes the new schema incompatible with the old one.
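Concretely, the entry being sent for "d" differs from the one the compatibility check would need to see only in the "default" key. The "expected" entry below is sketched from the description above; the exact encoding of the default value is an assumption.
# What the Python client currently sends for field "d":
sent = {"name": "d", "type": ["null", "float"]}

# What the description above says the broker needs to see (sketch only;
# the exact representation of the default value is an assumption):
expected = {"name": "d", "type": ["null", "float"], "default": 1.0}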
I dug a little deeper into the Python client's code and noticed that in the Record class (which my Example class extends), and specifically in its schema() method, the definition of the default property in the generated schema is completely missing.
@classmethod
def schema(cls):
    schema = {
        'name': str(cls.__name__),
        'type': 'record',
        'fields': []
    }
    for name in sorted(cls._fields.keys()):
        field = cls._fields[name]
        field_type = field.schema() if field._required else ['null', field.schema()]
        schema['fields'].append({
            'name': name,
            'type': field_type
        })
    return schema
I think that this piece of code:
schema['fields'].append({
    'name': name,
    'type': field_type
})
should also handle the default value of the field itself, e.g.:
schema['fields'].append({
    'name': name,
    'type': field_type,
    'default': field.default()
})
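A slightly fuller sketch of how the loop body inside schema() could look, assuming field.default() returns the user-supplied default (and None when no default was given); this is an illustration of the idea, not the actual patch:
# Sketch only: assumes field.default() returns the user-supplied default,
# or None when no default was given.
for name in sorted(cls._fields.keys()):
    field = cls._fields[name]
    field_type = field.schema() if field._required else ['null', field.schema()]
    field_def = {
        'name': name,
        'type': field_type
    }
    if field.default() is not None:
        # Only emit "default" when the user actually supplied one, so fields
        # without a default keep the schema shape they have today.
        field_def['default'] = field.default()
    schema['fields'].append(field_def)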
@giacomo-porro Looks like you have found the root cause. Are you interested in pushing a PR to fix it?
@codelipenghui Sure, as soon as I'm back from vacation :)
@codelipenghui @giacomo-porro I ran into the same issue today. Could anyone do a PR? Or has this issue been fixed somewhere else and I made a different mistake?
@cuzyoucant Hi, sorry, I didn't really have the chance to work on it since I came back from vacation. For now I'm living with the workaround I implemented this summer: I created a class that inherits from the Record class and overrode the schema() method with the correct logic inside.
I've been trying to override this classmethod myself, as you did, but I'm not sure what it's supposed to look like.
I managed to override the schema classmethod correctly (I think):
import pulsar
from pulsar.schema import *

class Example(Record):
    @classmethod
    def schema(cls):
        schema = super().schema()
        for i, field_default in enumerate([cls._fields[field['name']].default() for field in schema['fields']]):
            schema['fields'][i]['default'] = field_default
        return schema

    a = String()
    b = Integer()
    c = Boolean()
    d = String()

client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer(
    topic='my-topic3',
    schema=JsonSchema(Example))

for i in range(10):
    producer.send(Example(a='Hello!', b=1, c=True, d='new field'))

client.close()
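One quick way to check whether an override like this actually changes the generated schema, before involving the broker at all, is to dump it locally. This is just a local sanity check, assuming the Example class above:
import json

# Print the Avro/JSON schema the Python client will send to the broker,
# so you can see whether the "default" keys are present.
print(json.dumps(Example.schema(), indent=2))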
However it doesn't seem to work; after adding a new field and running it again, I receive the error:
Traceback (most recent call last):
File "producer.py", line 28, in <module>
schema=JsonSchema(Example) )
File "/home/joel/miniconda3/envs/pulsar/lib/python3.7/site-packages/pulsar/__init__.py", line 531, in create_producer
p._producer = self._client.create_producer(topic, conf)
Exception: Pulsar error: UnknownError
Further to my previous comment, it DOES work... eventually. Checking the docker standalone log messages upon sending the updated schema the first time:
12:49:55.948 [pulsar-io-51-3] INFO org.apache.pulsar.broker.service.ServerCnx - New connection from /172.26.0.1:38094
12:49:55.954 [pulsar-io-51-3] INFO org.apache.pulsar.broker.service.ServerCnx - [/172.26.0.1:38094][persistent://public/default/my-topic4] Creating producer. producerId=0
12:49:55.958 [BookieHighPriorityThread-3181-OrderedExecutor-6-0] WARN org.apache.bookkeeper.proto.ReadEntryProcessor - Ledger: 3470 fenced by: /127.0.0.1:55506
12:49:55.962 [BookKeeperClientWorker-OrderedExecutor-2-0] INFO org.apache.bookkeeper.client.ReadOnlyLedgerHandle - Closing recovered ledger 3470 at entry 0
12:49:55.972 [pulsar-io-51-3] INFO org.apache.pulsar.broker.service.ServerCnx - Closed connection from /172.26.0.1:38094
Later, other messages that seem to show the new schema has been applied:
12:51:15.776 [pulsar-inactivity-monitor-53-1] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/my-topic4] Global topic inactive for 60 seconds, closed repl producers
12:51:15.778 [ForkJoinPool.commonPool-worker-1] INFO org.apache.pulsar.broker.service.AbstractTopic - Delete schema storage of id: public/default/my-topic4
12:51:15.780 [main-EventThread] WARN org.apache.bookkeeper.meta.AbstractZkLedgerManager - Ledger node does not exist in ZooKeeper: ledgerId=2327. Returning success.
12:51:15.781 [main-EventThread] WARN org.apache.bookkeeper.meta.AbstractZkLedgerManager - Ledger node does not exist in ZooKeeper: ledgerId=3436. Returning success.
12:51:15.781 [main-EventThread] WARN org.apache.bookkeeper.meta.AbstractZkLedgerManager - Ledger node does not exist in ZooKeeper: ledgerId=3438. Returning success.
12:51:15.781 [main-EventThread] WARN org.apache.bookkeeper.meta.AbstractZkLedgerManager - Ledger node does not exist in ZooKeeper: ledgerId=3440. Returning success.
12:51:15.781 [main-EventThread] WARN org.apache.bookkeeper.meta.AbstractZkLedgerManager - Ledger node does not exist in ZooKeeper: ledgerId=3442. Returning success.
12:51:15.781 [main-EventThread] WARN org.apache.bookkeeper.meta.AbstractZkLedgerManager - Ledger node does not exist in ZooKeeper: ledgerId=3468. Returning success.
12:51:15.783 [pulsar-ordered-OrderedExecutor-6-0-EventThread] INFO org.apache.pulsar.zookeeper.ZooKeeperCache - [State:CONNECTED Timeout:30000 sessionid:0x1004461a54b0009 local:/127.0.0.1:58388 remoteserver:localhost/127.0.0.1:2181 lastZxid:41837 xid:52259 sent:52259 recv:52296 queuedpkts:0 pendingresp:0 queuedevents:1] Received ZooKeeper watch event: WatchedEvent state:SyncConnected type:NodeDeleted path:/schemas/public/default/my-topic4
12:51:15.784 [BookKeeperClientWorker-OrderedExecutor-1-0] INFO org.apache.bookkeeper.mledger.impl.MetaStoreImpl - [public/default/persistent/my-topic4] Remove ManagedLedger
12:51:15.785 [pulsar-ordered-OrderedExecutor-6-0-EventThread] INFO org.apache.pulsar.zookeeper.ZooKeeperCache - [State:CONNECTED Timeout:30000 sessionid:0x1004461a54b0009 local:/127.0.0.1:58388 remoteserver:localhost/127.0.0.1:2181 lastZxid:41839 xid:52261 sent:52261 recv:52299 queuedpkts:0 pendingresp:0 queuedevents:1] Received ZooKeeper watch event: WatchedEvent state:SyncConnected type:NodeDeleted path:/managed-ledgers/public/default/persistent/my-topic4
12:51:15.785 [bookkeeper-ml-workers-OrderedExecutor-2-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/my-topic4] Successfully deleted managed ledger
12:51:15.785 [bookkeeper-ml-workers-OrderedExecutor-2-0] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/my-topic4] Topic deleted
12:51:15.785 [bookkeeper-ml-workers-OrderedExecutor-2-0] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/my-topic4] Topic deleted successfully due to inactivity
12:51:23.115 [pulsar-web-69-6] INFO org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [26/Nov/2020:12:51:23 +0000] "GET /admin/v2/persistent/public/functions/coordinate/stats?getPreciseBacklog=false HTTP/1.1" 200 1278 "-" "Pulsar-Java-v2.6.2" 7
The next producer message sent after these log messages is accepted without error.
Edit: it's not working at all now; it seems to be random.
I've fiddled around some more, testing a Python and a Java producer side by side, each one iterating the schema incrementally. It seems that to make the Python client produce a schema compatible with the one produced by the Java client, the following modification would be necessary:
class Example(Record):
    @classmethod
    def schema(cls):
        schema = super(Example, cls).schema()
        for i, x in enumerate([cls._fields[x['name']].default() for x in schema['fields']]):
            if schema['fields'][i]['type'][1] == "string":
                schema['fields'][i]['default'] = None
            if schema['fields'][i]['type'][1] == "int":
                schema['fields'][i]['type'] = 'int'
            else:
                pass
        return schema
For example, in the generated JSON schema Python sets an empty string as the default for a String, whereas Java sets it to null; Python emits an Integer type as [ "null", "int" ], whereas Java emits "int". I'm sure there are other data types to consider. I'm out of my depth here, so I'll have to defer to others.
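When comparing the two producers, it can also help to pull the schema that is actually registered on the broker and diff it against what the Python client generates locally. A rough sketch, assuming the standalone broker's default admin endpoint on port 8080 and the usual /admin/v2/schemas/{tenant}/{namespace}/{topic}/schema REST path:
import json
import requests

# Assumptions: standalone admin service at localhost:8080 and the topic
# name used earlier in this thread; adjust both for your environment.
resp = requests.get(
    "http://localhost:8080/admin/v2/schemas/public/default/my-topic4/schema")
resp.raise_for_status()

# The schema the broker has registered, for comparison with Example.schema().
print(json.dumps(resp.json(), indent=2))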
Is there still any work being done on this? I've recently discovered as well that the Python client doesn't correctly include the default values, at least when creating the JSON representation of the schema. I'm not sure what other technical issues there might be with it.