[fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic
Fixes #17354
Motivation
Creating a consumer with a schema fails after an AUTO_CONSUME consumer has subscribed to an empty topic. The broker returns the error IncompatibleSchemaException("Topic does not have schema to check").
Modifications
In PersistentTopic::addSchemaIfIdleOrCheckCompatible, an active consumer that subscribed with the AUTO_CONSUME schema still counts as an active consumer, so creating another consumer with a concrete schema on the same topic fails.
- When numActiveConsumers != 0, check whether the currently existing consumers are using the AUTO_CONSUME schema.
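The modification described above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: `SketchSchemaType`, `IdleOrCheckSketch`, and `mustCheckCompatibility` are hypothetical stand-ins for the broker classes, and the sketch assumes that consumers subscribed with AUTO_CONSUME should simply be skipped when deciding whether the topic is idle.

```java
import java.util.List;

// Hypothetical stand-in for the schema type a consumer subscribed with;
// this is not the real Pulsar SchemaType enum.
enum SketchSchemaType { BYTES, AUTO_CONSUME, AVRO }

final class IdleOrCheckSketch {
    /**
     * Returns true when the new consumer's schema must go through the
     * compatibility check, false when it can be added to the topic directly.
     */
    static boolean mustCheckCompatibility(boolean hasSchema,
                                          boolean hasProducers,
                                          List<SketchSchemaType> activeConsumers,
                                          long ledgerTotalSize) {
        // Assumption: AUTO_CONSUME consumers do not pin the topic to a schema,
        // so only the remaining consumers count as active for this decision.
        boolean hasPinningConsumer = activeConsumers.stream()
                .anyMatch(type -> type != SketchSchemaType.AUTO_CONSUME);
        return hasSchema || hasProducers || hasPinningConsumer || ledgerTotalSize != 0;
    }

    public static void main(String[] args) {
        // Empty topic with one AUTO_CONSUME consumer: the new schema can be added.
        System.out.println(mustCheckCompatibility(false, false,
                List.of(SketchSchemaType.AUTO_CONSUME), 0L));
        // Empty topic with one byte[] consumer: the compatibility check still applies.
        System.out.println(mustCheckCompatibility(false, false,
                List.of(SketchSchemaType.BYTES), 0L));
    }
}
```

The sketch only works if the broker knows which schema type each consumer subscribed with.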
Verifying this change
- [x] Make sure that the change passes the CI checks.
Documentation
Check the box below or label this PR directly.
Need to update docs?
- [ ] doc-required (Your PR needs to update docs and you will update later)
- [x] doc-not-needed (Please explain why)
- [ ] doc (Your PR contains doc changes)
- [ ] doc-complete (Docs have been already added)
Hi @Denovo1998, we'd better add a test for this.
Already done @Technoboy-
/pulsarbot rerun-failure-checks
@congbobo184 @mattisonchao PTAL
@codelipenghui @gaoran10 @liangyepianzhou PTAL
What if the producer writes the data but hasn't uploaded the schema yet? https://github.com/apache/pulsar/pull/9853#issuecomment-794646589 #2669
When a consumer with Schema.AUTO_CONSUME subscribes to the topic, the schema in CommandSubscribe is null, so schema.getType() == SchemaType.AUTO_CONSUME is always false.
The problem is not caused by the check of the AUTO_CONSUME.
It is that there are some active consumers with AUTO_CONSUME.
A new consumer connecting with any other schema then fails at addSchemaIfIdleOrCheckCompatible with the exception IncompatibleSchemaException("Topic does not have schema to check").
IMO, the check is confusing. If hasSchema is false, we should call addSchema without any other conditions.
https://github.com/apache/pulsar/blob/fe1963988fc6883f52826069a781b91aba0405bf/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L3037
if (hasSchema || (!producers.isEmpty()) || (numActiveConsumers != 0) || (ledger.getTotalSize() != 0)) {
    return checkSchemaCompatibleForConsumer(schema);
} else {
    return addSchema(schema).thenCompose(schemaVersion -> CompletableFuture.completedFuture(null));
}
Why don't we change it to
if (hasSchema) {
    return checkSchemaCompatibleForConsumer(schema);
} else {
    return addSchema(schema).thenCompose(schemaVersion -> CompletableFuture.completedFuture(null));
}
And store the schema of the byte[]:
if (si != null && (SchemaType.BYTES == si.getType() || SchemaType.NONE == si.getType())) {
    // don't set schema for Schema.BYTES
    si = null;
}
change it to
if (si != null && SchemaType.NONE == si.getType()) {
    // don't set schema for Schema.BYTES
    si = null;
}
Codecov Report
Merging #17449 (ba186d7) into master (69fb3c2) will increase coverage by 36.98%. The diff coverage is 86.66%.
@@ Coverage Diff @@
## master #17449 +/- ##
=============================================
+ Coverage 25.24% 62.22% +36.98%
- Complexity 205 3497 +3292
=============================================
Files 1668 1844 +176
Lines 126201 135563 +9362
Branches 13776 14923 +1147
=============================================
+ Hits 31858 84355 +52497
+ Misses 89386 43468 -45918
- Partials 4957 7740 +2783
| Flag | Coverage Δ | |
|---|---|---|
| inttests | 24.53% <48.88%> (?) | |
| systests | 25.31% <66.66%> (+0.06%) | :arrow_up: |
| unittests | 59.48% <86.66%> (?) | |
Flags with carried forward coverage won't be shown.
| Impacted Files | Coverage Δ | |
|---|---|---|
| ...va/org/apache/pulsar/client/impl/ConsumerImpl.java | 70.22% <66.66%> (+41.94%) | :arrow_up: |
| ...va/org/apache/pulsar/common/protocol/Commands.java | 80.00% <71.42%> (+33.13%) | :arrow_up: |
| ...ava/org/apache/pulsar/broker/service/Consumer.java | 76.01% <83.33%> (+31.70%) | :arrow_up: |
| ...sar/broker/service/persistent/PersistentTopic.java | 64.06% <85.71%> (+21.29%) | :arrow_up: |
| ...va/org/apache/pulsar/broker/service/ServerCnx.java | 55.01% <100.00%> (+24.78%) | :arrow_up: |
| ...ache/pulsar/broker/service/SubscriptionOption.java | 86.36% <100.00%> (+0.64%) | :arrow_up: |
| ...oker/service/nonpersistent/NonPersistentTopic.java | 62.96% <100.00%> (+51.13%) | :arrow_up: |
| ...e/pulsar/client/impl/schema/AutoConsumeSchema.java | 72.95% <100.00%> (+11.26%) | :arrow_up: |
| ...lsar/functions/runtime/process/ProcessRuntime.java | 47.44% <0.00%> (-2.05%) | :arrow_down: |
| ...in/java/org/apache/pulsar/common/api/AuthData.java | 71.42% <0.00%> (ø) | |
| ... and 1302 more | | |
https://github.com/apache/pulsar/blob/adae4ae2b06304eae9ec001357e0bd6ee6ffb053/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L3030-L3046 I think this method only needs to check whether the topic has a schema: if it doesn't have a schema, add the schema; if it does, check compatibility. So the code would look like this:
if (hasSchema) {
    return checkSchemaCompatibleForConsumer(schema);
} else {
    return addSchema(schema).thenCompose(schemaVersion ->
            CompletableFuture.completedFuture(null));
}
Active producers or active consumers shouldn't affect this method. When there are no active producers or consumers, the schema can still be added successfully, so users can still use it incorrectly. The check blocks only some cases, not all, which probably doesn't make sense. The current code also cannot be guaranteed to be correct under concurrency. We should either keep the current logic or think of a better solution that keeps it simple. @codelipenghui @liangyepianzhou
> The problem is not caused by the check of the AUTO_CONSUME. It is that there are some activeConsumer with AUTO_CONSUME.

Oh, got it.

> IMO, The check is confusing. If there is no hasSchema, then we should addSchema without other conditions.

No. If we have active producers or consumers, the new schema might break the existing producers and consumers, because they are using the byte[] schema in this case. The main reason is that the client side does not upload the byte[] schema.
I think the current fix will break the existing behavior (if there are active consumers with the byte[] schema, the new schema should not be added to the schema registry).
The client side does not upload AUTO_CONSUME either, so it is a challenge for us to know whether all the active consumers are using the AUTO_CONSUME schema or the byte[] schema.
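The objection above can be made concrete with a small model. This is only a sketch under stated assumptions: `HasSchemaOnlySketch` and both methods are hypothetical names, and the model reduces a topic to the four values used in the condition quoted earlier in this thread. It compares the existing rule with the proposed hasSchema-only rule for a topic whose only consumer uses byte[] (which the client does not upload, so hasSchema is false).

```java
// Hypothetical model of the two rules discussed in this thread;
// none of these names are part of the Pulsar API.
final class HasSchemaOnlySketch {
    /** Existing rule: a schema is added only when the topic is completely idle. */
    static boolean existingRuleAddsSchema(boolean hasSchema, int producers,
                                          int activeConsumers, long ledgerTotalSize) {
        return !(hasSchema || producers > 0 || activeConsumers != 0 || ledgerTotalSize != 0);
    }

    /** Proposed rule: a schema is added whenever no schema is registered yet. */
    static boolean hasSchemaOnlyRuleAddsSchema(boolean hasSchema) {
        return !hasSchema;
    }

    public static void main(String[] args) {
        // One active byte[] consumer, no registered schema, no data, no producers.
        boolean hasSchema = false;
        int producers = 0;
        int activeConsumers = 1;
        long ledgerTotalSize = 0L;

        // The existing rule refuses to add a new schema while the consumer is connected.
        System.out.println("existing rule adds schema: "
                + existingRuleAddsSchema(hasSchema, producers, activeConsumers, ledgerTotalSize));
        // The proposed rule would add it, which may break the byte[] consumer.
        System.out.println("hasSchema-only rule adds schema: "
                + hasSchemaOnlyRuleAddsSchema(hasSchema));
    }
}
```

In this model the two rules disagree exactly when the topic has no registered schema but is not idle, which is the case the comment above is concerned about.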
> No, if we have active producers or consumers. The new schema might break the existing producer and consumers because they are using the byte[] schema in this case. The main reason is the client side will not upload the byte[] schema.

@codelipenghui Yes, I have taken this into consideration as well and have given a suggestion:
If a topic has a byte[] schema, we do not want it to have another schema.
That is to say, the byte[] schema is a valid schema that is exclusive, so we need to store it and mark the topic as hasSchema.
And store the schema of the byte[]:
if (si != null && (SchemaType.BYTES == si.getType() || SchemaType.NONE == si.getType())) {
    // don't set schema for Schema.BYTES
    si = null;
}
change it to
if (si != null && SchemaType.NONE == si.getType()) {
    // don't set schema for Schema.BYTES
    si = null;
}
@liangyepianzhou
I think the code snippet is from the client side, right? Here is a related discussion: https://lists.apache.org/thread/3js51tq2p3c3oldfrhprn4kcohx7h1wv
I think we should not check the schema compatibility for the AUTO_CONSUME and AUTO_PUBLISH schemas.
https://github.com/apache/pulsar/blob/fe1963988fc6883f52826069a781b91aba0405bf/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1135
I think we should change it to
if (schema != null && schema.getType() != SchemaType.AUTO_PUBLISH
        && schema.getType() != SchemaType.AUTO_CONSUME) {
    return topic.addSchemaIfIdleOrCheckCompatible(schema)
            .thenCompose(v -> topic.subscribe(option));
} else {
    return topic.subscribe(option);
}
What do you think? @congbobo184 @liangyepianzhou
This doesn't fix the bug that this PR is trying to fix. For example, if the new consumer's schema is AVRO but two AUTO_CONSUME consumers have already subscribed to the topic, the schema compatibility will still be checked, because numActiveConsumers != 0 is true. So active consumers that are all AUTO_CONSUME consumers shouldn't trigger the check. Of course, we should add your code too, so I made some changes. @codelipenghui @liangyepianzhou @congbobo184 PTAL
The pr had no activity for 30 days, mark with Stale label.
I think a better way to resolve this bug is to record the schemaType in the Consumer class:
https://github.com/apache/pulsar/blob/a95bb630d76c3234ef4660d722518e642341ad3b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java#L71-L81
Then when we calculate the active consumer num, we can ignore the consumers without a specific schema:
https://github.com/apache/pulsar/blob/a95bb630d76c3234ef4660d722518e642341ad3b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java#L1162-L1173
I can provide another PR to implement it.
@labuladong When did I say I needed your help?
I resolved this bug last night by recording SchemaData in Consumer.
@congbobo184 PTAL.
@congbobo184 The commits in my PR are a bit messy. Should I open a new PR and close this one?

> @congbobo184 The Commit of my PR is a bit dirty. Should I open a PR and close this?

No need to open a new PR; please merge apache/master to resolve the conflict.
@congbobo184 The conflict has been resolved. What else should I do?
@codelipenghui @congbobo184
- https://github.com/apache/pulsar/blob/22866bd19c231e85ddff4acee4dad1f895cbbc72/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L813-L817 change here to:
      SchemaInfo si = schema.getSchemaInfo();
      if (si != null && SchemaType.NONE == si.getType()) {
          // don't set schema for Schema.BYTES
          si = null;
      }
  But here it will change BYTES to NONE: https://github.com/apache/pulsar/blob/22866bd19c231e85ddff4acee4dad1f895cbbc72/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java#L641-L651
- Then: https://github.com/Denovo1998/pulsar/blob/33c838af4d4a70e40e6782d37579c67e3f338237/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1133-L1138 in handleSubscribe():
      if (schema != null && schema.getType() != SchemaType.NONE) {
          return topic.addSchemaIfIdleOrCheckCompatible(schema)
                  .thenCompose(v -> topic.subscribe(option, schema));
      } else {
          return topic.subscribe(option, schema);
      }
Is this OK?
- And:
https://github.com/apache/pulsar/blob/22866bd19c231e85ddff4acee4dad1f895cbbc72/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1023
https://github.com/apache/pulsar/blob/22866bd19c231e85ddff4acee4dad1f895cbbc72/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1209-L1222
Do the other two, handleProducer() and handleGetOrCreateSchema(), need to be changed?
We only need to upload AUTO_CONSUME, without changing the behavior of the BYTES schema (don't upload the NONE and BYTES schemas; in the broker, schema = null represents the BYTES schema). How about this? @Denovo1998 @codelipenghui
Hi @Denovo1998! I'm glad that you have this idea too, so I'll close my pr.
For your discussion above, my solution is to only count the consumer which specifies a schema:
int numActiveConsumersWithSchema = subscriptions.values().stream()
        .mapToInt(subscription -> (int) subscription.getConsumers().stream()
                // keep only the consumers that have a specific schema
                .filter((c) -> c.getSchemaType() != null && c.getSchemaType().getValue() > 0).count())
        .sum();
This way, the original schema won't be changed, and the code ignores consumers with a null, AUTO_CONSUME, NONE, or BYTES schema, because these schema types all have non-positive enum values:
https://github.com/apache/pulsar/blob/90f67587e31cc0cfb27773f85f18d607c9c5c324/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java#L140-L175
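The counting idea above can be tried in isolation with a self-contained sketch. Everything here is a stand-in: `SignedSchemaType` is a hypothetical enum whose numeric values are assumptions chosen only to match the signs described in the comment (positive for concrete schemas, zero or negative otherwise), and the map of subscription name to consumer schema types replaces the broker's subscription objects.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in enum; the numeric values are assumptions that only
// mirror the sign convention described above, not the real SchemaType values.
enum SignedSchemaType {
    NONE(0), BYTES(-1), AUTO_CONSUME(-3), STRING(1), AVRO(4);

    private final int value;

    SignedSchemaType(int value) {
        this.value = value;
    }

    int getValue() {
        return value;
    }
}

final class SchemaConsumerCountSketch {
    /** Counts consumers, across all subscriptions, that subscribed with a concrete schema. */
    static int countConsumersWithSchema(Map<String, List<SignedSchemaType>> consumersBySubscription) {
        return consumersBySubscription.values().stream()
                .mapToInt(consumers -> (int) consumers.stream()
                        // keep only consumers whose schema type has a positive value
                        .filter(type -> type != null && type.getValue() > 0)
                        .count())
                .sum();
    }

    public static void main(String[] args) {
        Map<String, List<SignedSchemaType>> subscriptions = Map.of(
                "sub-a", Arrays.asList(SignedSchemaType.AUTO_CONSUME, null, SignedSchemaType.BYTES),
                "sub-b", List.of(SignedSchemaType.AVRO, SignedSchemaType.NONE));
        // Only the AVRO consumer in sub-b has a concrete schema.
        System.out.println(countConsumersWithSchema(subscriptions));
    }
}
```

A count of zero would then mean that no connected consumer pins the topic to a concrete schema, whatever the total number of connected consumers is.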
But when an AUTO_CONSUME consumer subscribes to an empty topic first, the schemaInfo is null, so there seems to be no way to fix this bug. https://github.com/apache/pulsar/blob/d8569cd4ec6da14f8b2b9338db1ed2f6a3eacf0a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java#L270-L280 https://github.com/apache/pulsar/blob/d8569cd4ec6da14f8b2b9338db1ed2f6a3eacf0a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L813-L817 @congbobo184 @codelipenghui
We need to send the AUTO_CONSUME type to the broker: https://github.com/apache/pulsar/blob/d8569cd4ec6da14f8b2b9338db1ed2f6a3eacf0a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java#L773-L788
https://github.com/apache/pulsar/blob/d8569cd4ec6da14f8b2b9338db1ed2f6a3eacf0a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L813-L817
Change this so that the AUTO_CONSUME type can be sent to the broker.
@congbobo184 There is nothing wrong with my test consumer, but will this affect the behavior of Producer?
https://github.com/apache/pulsar/blob/d8569cd4ec6da14f8b2b9338db1ed2f6a3eacf0a/pulsar-common/src/main/proto/PulsarApi.proto#L47
We should add the AUTO_CONSUME schema type to this proto; otherwise, AUTO_CONSUME can't be uploaded to the broker.
@congbobo184 Is that all right? Is there anything else I need to do?